hotshot_task_impls/quorum_proposal_recv/
mod.rs1#![allow(unused_imports)]
8
9use std::{collections::BTreeMap, sync::Arc};
10
11use async_broadcast::{broadcast, Receiver, Sender};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use either::Either;
15use futures::future::{err, join_all};
16use hotshot_task::task::{Task, TaskState};
17use hotshot_types::{
18 consensus::{Consensus, OuterConsensus},
19 data::{EpochNumber, Leaf, ViewChangeEvidence2},
20 epoch_membership::{self, EpochMembership, EpochMembershipCoordinator},
21 event::Event,
22 message::UpgradeLock,
23 simple_certificate::UpgradeCertificate,
24 simple_vote::HasEpoch,
25 traits::{
26 block_contents::BlockHeader,
27 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28 signature_key::SignatureKey,
29 },
30 utils::option_epoch_from_block_number,
31 vote::{Certificate, HasViewNumber},
32};
33use hotshot_utils::anytrace::{bail, Result};
34use tokio::task::JoinHandle;
35use tracing::{debug, error, info, instrument, warn};
36use vbs::version::Version;
37
38use self::handlers::handle_quorum_proposal_recv;
39use crate::{
40 events::{HotShotEvent, ProposalMissing},
41 helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
42};
43mod handlers;
45
46pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
49 pub public_key: TYPES::SignatureKey,
51
52 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
54
55 pub consensus: OuterConsensus<TYPES>,
57
58 pub cur_view: TYPES::View,
60
61 pub cur_epoch: Option<TYPES::Epoch>,
63
64 pub membership: EpochMembershipCoordinator<TYPES>,
66
67 pub timeout: u64,
69
70 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
72
73 pub storage: I::Storage,
75
76 pub spawned_tasks: BTreeMap<TYPES::View, Vec<JoinHandle<()>>>,
79
80 pub id: u64,
82
83 pub upgrade_lock: UpgradeLock<TYPES, V>,
85
86 pub epoch_height: u64,
88
89 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
91}
92
93pub(crate) struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
96 pub id: u64,
98
99 pub(crate) public_key: TYPES::SignatureKey,
101
102 pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
104
105 pub(crate) consensus: OuterConsensus<TYPES>,
107
108 pub membership: EpochMembership<TYPES>,
110
111 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
113
114 pub(crate) storage: I::Storage,
116
117 pub(crate) upgrade_lock: UpgradeLock<TYPES, V>,
119
120 pub epoch_height: u64,
122
123 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
125}
126
127impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
128 QuorumProposalRecvTaskState<TYPES, I, V>
129{
130 pub fn cancel_tasks(&mut self, view: TYPES::View) {
132 let keep = self.spawned_tasks.split_off(&view);
133 while let Some((_, tasks)) = self.spawned_tasks.pop_first() {
134 for task in tasks {
135 task.abort();
136 }
137 }
138 self.spawned_tasks = keep;
139 }
140
141 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error")]
143 #[allow(unused_variables)]
144 pub async fn handle(
145 &mut self,
146 event: Arc<HotShotEvent<TYPES>>,
147 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
148 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
149 ) {
150 match event.as_ref() {
151 HotShotEvent::QuorumProposalRecv(proposal, sender) => {
152 tracing::debug!(
153 "Quorum proposal recv for view {}",
154 proposal.data.view_number()
155 );
156 if self.consensus.read().await.cur_view() > proposal.data.view_number()
157 || self.cur_view > proposal.data.view_number()
158 {
159 tracing::warn!(
160 "Throwing away old proposal for view {}",
161 proposal.data.view_number()
162 );
163 return;
164 }
165 let proposal_epoch = option_epoch_from_block_number::<TYPES>(
166 proposal.data.proposal.epoch().is_some(),
167 proposal.data.block_header().block_number(),
168 self.epoch_height,
169 );
170 let Ok(epoch_membership) =
171 self.membership.membership_for_epoch(proposal_epoch).await
172 else {
173 tracing::warn!("No Stake table for epoch = {proposal_epoch:?}");
174 return;
175 };
176 let validation_info = ValidationInfo::<TYPES, I, V> {
177 id: self.id,
178 public_key: self.public_key.clone(),
179 private_key: self.private_key.clone(),
180 consensus: self.consensus.clone(),
181 membership: epoch_membership,
182 output_event_stream: self.output_event_stream.clone(),
183 storage: self.storage.clone(),
184 upgrade_lock: self.upgrade_lock.clone(),
185 epoch_height: self.epoch_height,
186 first_epoch: self.first_epoch,
187 };
188 match handle_quorum_proposal_recv(
189 proposal,
190 sender,
191 &event_sender,
192 &event_receiver,
193 validation_info,
194 )
195 .await
196 {
197 Ok(()) => {},
198 Err(e) => tracing::error!(?e, "Failed to validate the proposal"),
199 }
200 },
201 HotShotEvent::ViewChange(view, epoch) => {
202 if *epoch > self.cur_epoch {
203 self.cur_epoch = *epoch;
204 }
205 if self.cur_view >= *view {
206 return;
207 }
208 self.cur_view = *view;
209 let oldest_view_to_keep = TYPES::View::new(view.saturating_sub(1));
214 self.cancel_tasks(oldest_view_to_keep);
215 },
216 HotShotEvent::SetFirstEpoch(view, epoch) => {
217 self.first_epoch = Some((*view, *epoch));
218 },
219 _ => {},
220 }
221 }
222}
223
224#[async_trait]
225impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
226 for QuorumProposalRecvTaskState<TYPES, I, V>
227{
228 type Event = HotShotEvent<TYPES>;
229
230 async fn handle_event(
231 &mut self,
232 event: Arc<Self::Event>,
233 sender: &Sender<Arc<Self::Event>>,
234 receiver: &Receiver<Arc<Self::Event>>,
235 ) -> Result<()> {
236 self.handle(event, sender.clone(), receiver.clone()).await;
237
238 Ok(())
239 }
240
241 fn cancel_subtasks(&mut self) {
242 while !self.spawned_tasks.is_empty() {
243 let Some((_, handles)) = self.spawned_tasks.pop_first() else {
244 break;
245 };
246 for handle in handles {
247 handle.abort();
248 }
249 }
250 }
251}