hotshot_task_impls/quorum_proposal_recv/
mod.rs1#![allow(unused_imports)]
8
9use std::{collections::BTreeMap, sync::Arc};
10
11use async_broadcast::{Receiver, Sender, broadcast};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use either::Either;
15use futures::future::{err, join_all};
16use hotshot_task::task::{Task, TaskState};
17use hotshot_types::{
18 consensus::{Consensus, OuterConsensus},
19 data::{EpochNumber, Leaf, ViewChangeEvidence2, ViewNumber},
20 epoch_membership::{self, EpochMembership, EpochMembershipCoordinator},
21 event::Event,
22 message::UpgradeLock,
23 simple_certificate::UpgradeCertificate,
24 simple_vote::HasEpoch,
25 traits::{
26 block_contents::BlockHeader,
27 node_implementation::{NodeImplementation, NodeType},
28 signature_key::SignatureKey,
29 },
30 utils::option_epoch_from_block_number,
31 vote::{Certificate, HasViewNumber},
32};
33use hotshot_utils::anytrace::{Result, bail};
34use tokio::task::JoinHandle;
35use tracing::{debug, error, info, instrument, warn};
36use vbs::version::Version;
37
38use self::handlers::handle_quorum_proposal_recv;
39use crate::{
40 events::{HotShotEvent, ProposalMissing},
41 helpers::{broadcast_event, fetch_proposal, parent_leaf_and_state},
42};
43mod handlers;
45
46pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
49 pub public_key: TYPES::SignatureKey,
51
52 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
54
55 pub consensus: OuterConsensus<TYPES>,
57
58 pub cur_view: ViewNumber,
60
61 pub cur_epoch: Option<EpochNumber>,
63
64 pub membership: EpochMembershipCoordinator<TYPES>,
66
67 pub timeout: u64,
69
70 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
72
73 pub storage: I::Storage,
75
76 pub spawned_tasks: BTreeMap<ViewNumber, Vec<JoinHandle<()>>>,
79
80 pub id: u64,
82
83 pub upgrade_lock: UpgradeLock<TYPES>,
85
86 pub epoch_height: u64,
88
89 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
91}
92
93pub(crate) struct ValidationInfo<TYPES: NodeType, I: NodeImplementation<TYPES>> {
96 pub id: u64,
98
99 pub(crate) public_key: TYPES::SignatureKey,
101
102 pub(crate) private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
104
105 pub(crate) consensus: OuterConsensus<TYPES>,
107
108 pub membership: EpochMembership<TYPES>,
110
111 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
113
114 pub(crate) storage: I::Storage,
116
117 pub(crate) upgrade_lock: UpgradeLock<TYPES>,
119
120 pub epoch_height: u64,
122
123 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
125}
126
127impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalRecvTaskState<TYPES, I> {
128 pub fn cancel_tasks(&mut self, view: ViewNumber) {
130 let keep = self.spawned_tasks.split_off(&view);
131 while let Some((_, tasks)) = self.spawned_tasks.pop_first() {
132 for task in tasks {
133 task.abort();
134 }
135 }
136 self.spawned_tasks = keep;
137 }
138
139 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error")]
141 #[allow(unused_variables)]
142 pub async fn handle(
143 &mut self,
144 event: Arc<HotShotEvent<TYPES>>,
145 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
146 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
147 ) {
148 match event.as_ref() {
149 HotShotEvent::QuorumProposalRecv(proposal, sender) => {
150 tracing::debug!(
151 "Quorum proposal recv for view {}",
152 proposal.data.view_number()
153 );
154 if self.consensus.read().await.cur_view() > proposal.data.view_number() + 1
155 || self.cur_view > proposal.data.view_number() + 1
156 {
157 tracing::warn!(
158 "Throwing away old proposal for view {}",
159 proposal.data.view_number()
160 );
161 return;
162 }
163 let proposal_epoch = option_epoch_from_block_number(
164 proposal.data.proposal.epoch().is_some(),
165 proposal.data.block_header().block_number(),
166 self.epoch_height,
167 );
168 let Ok(epoch_membership) =
169 self.membership.membership_for_epoch(proposal_epoch).await
170 else {
171 tracing::warn!("No Stake table for epoch = {proposal_epoch:?}");
172 return;
173 };
174 let validation_info = ValidationInfo::<TYPES, I> {
175 id: self.id,
176 public_key: self.public_key.clone(),
177 private_key: self.private_key.clone(),
178 consensus: self.consensus.clone(),
179 membership: epoch_membership,
180 output_event_stream: self.output_event_stream.clone(),
181 storage: self.storage.clone(),
182 upgrade_lock: self.upgrade_lock.clone(),
183 epoch_height: self.epoch_height,
184 first_epoch: self.first_epoch,
185 };
186 match handle_quorum_proposal_recv(
187 proposal,
188 sender,
189 &event_sender,
190 &event_receiver,
191 validation_info,
192 )
193 .await
194 {
195 Ok(()) => {},
196 Err(e) => tracing::error!(?e, "Failed to validate the proposal"),
197 }
198 },
199 HotShotEvent::ViewChange(view, epoch) => {
200 if *epoch > self.cur_epoch {
201 self.cur_epoch = *epoch;
202 }
203 if self.cur_view >= *view {
204 return;
205 }
206 self.cur_view = *view;
207 let oldest_view_to_keep = ViewNumber::new(view.saturating_sub(1));
212 self.cancel_tasks(oldest_view_to_keep);
213 },
214 HotShotEvent::SetFirstEpoch(view, epoch) => {
215 self.first_epoch = Some((*view, *epoch));
216 },
217 _ => {},
218 }
219 }
220}
221
222#[async_trait]
223impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState
224 for QuorumProposalRecvTaskState<TYPES, I>
225{
226 type Event = HotShotEvent<TYPES>;
227
228 async fn handle_event(
229 &mut self,
230 event: Arc<Self::Event>,
231 sender: &Sender<Arc<Self::Event>>,
232 receiver: &Receiver<Arc<Self::Event>>,
233 ) -> Result<()> {
234 self.handle(event, sender.clone(), receiver.clone()).await;
235
236 Ok(())
237 }
238
239 fn cancel_subtasks(&mut self) {
240 while !self.spawned_tasks.is_empty() {
241 let Some((_, handles)) = self.spawned_tasks.pop_first() else {
242 break;
243 };
244 for handle in handles {
245 handle.abort();
246 }
247 }
248 }
249}