1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency, OrDependency},
14 dependency_task::DependencyTask,
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::OuterConsensus,
19 epoch_membership::EpochMembershipCoordinator,
20 message::UpgradeLock,
21 simple_certificate::{
22 EpochRootQuorumCertificate, LightClientStateUpdateCertificateV2,
23 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::StakeTableEntries,
26 traits::{
27 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28 signature_key::SignatureKey,
29 storage::Storage,
30 },
31 utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32 vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39 events::HotShotEvent, helpers::broadcast_view_change,
40 quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
45pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
47 pub latest_proposed_view: TYPES::View,
49
50 pub cur_epoch: Option<TYPES::Epoch>,
52
53 pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,
55
56 pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,
58
59 pub formed_next_epoch_quorum_certificates:
61 BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,
62
63 pub instance_state: Arc<TYPES::InstanceState>,
65
66 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
68
69 pub public_key: TYPES::SignatureKey,
71
72 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
74
75 pub timeout: u64,
77
78 pub storage: I::Storage,
80
81 pub consensus: OuterConsensus<TYPES>,
83
84 pub id: u64,
86
87 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
94
95 pub upgrade_lock: UpgradeLock<TYPES, V>,
97
98 pub epoch_height: u64,
100
101 pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
103
104 pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
106}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109 QuorumProposalTaskState<TYPES, I, V>
110{
111 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113 fn create_event_dependency(
114 &self,
115 dependency_type: ProposalDependency,
116 view_number: TYPES::View,
117 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118 cancel_receiver: Receiver<()>,
119 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120 let id = self.id;
121 EventDependency::new(
122 event_receiver,
123 cancel_receiver,
124 format!(
125 "ProposalDependency::{:?} for view {:?}, my id {:?}",
126 dependency_type, view_number, self.id
127 ),
128 Box::new(move |event| {
129 let event = event.as_ref();
130 let event_view = match dependency_type {
131 ProposalDependency::Qc => {
132 if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133 qc.view_number() + 1
134 } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135 root_qc.view_number() + 1
136 } else {
137 return false;
138 }
139 },
140 ProposalDependency::TimeoutCert => {
141 if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142 timeout.view_number() + 1
143 } else {
144 return false;
145 }
146 },
147 ProposalDependency::ViewSyncCert => {
148 if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149 {
150 view_sync_cert.view_number()
151 } else {
152 return false;
153 }
154 },
155 ProposalDependency::Proposal => {
156 if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157 {
158 proposal.data.view_number() + 1
159 } else {
160 return false;
161 }
162 },
163 ProposalDependency::PayloadAndMetadata => {
164 if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165 _payload_commitment,
166 _builder_commitment,
167 _metadata,
168 view_number,
169 _fee,
170 ) = event
171 {
172 *view_number
173 } else {
174 return false;
175 }
176 },
177 ProposalDependency::VidShare => {
178 if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179 vid_disperse.data.view_number()
180 } else {
181 return false;
182 }
183 },
184 };
185 let valid = event_view == view_number;
186 if valid {
187 tracing::debug!(
188 "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
189 id is {id:?}!",
190 );
191 }
192 valid
193 }),
194 )
195 }
196
197 fn create_and_complete_dependencies(
199 &self,
200 view_number: TYPES::View,
201 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
202 event: Arc<HotShotEvent<TYPES>>,
203 cancel_receiver: &Receiver<()>,
204 ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
205 let mut proposal_dependency = self.create_event_dependency(
206 ProposalDependency::Proposal,
207 view_number,
208 event_receiver.clone(),
209 cancel_receiver.clone(),
210 );
211
212 let mut qc_dependency = self.create_event_dependency(
213 ProposalDependency::Qc,
214 view_number,
215 event_receiver.clone(),
216 cancel_receiver.clone(),
217 );
218
219 let mut view_sync_dependency = self.create_event_dependency(
220 ProposalDependency::ViewSyncCert,
221 view_number,
222 event_receiver.clone(),
223 cancel_receiver.clone(),
224 );
225
226 let mut timeout_dependency = self.create_event_dependency(
227 ProposalDependency::TimeoutCert,
228 view_number,
229 event_receiver.clone(),
230 cancel_receiver.clone(),
231 );
232
233 let mut payload_commitment_dependency = self.create_event_dependency(
234 ProposalDependency::PayloadAndMetadata,
235 view_number,
236 event_receiver.clone(),
237 cancel_receiver.clone(),
238 );
239
240 let mut vid_share_dependency = self.create_event_dependency(
241 ProposalDependency::VidShare,
242 view_number,
243 event_receiver.clone(),
244 cancel_receiver.clone(),
245 );
246
247 let epoch_height = self.epoch_height;
248
249 let mut next_epoch_qc_dependency = EventDependency::new(
252 event_receiver.clone(),
253 cancel_receiver.clone(),
254 format!(
255 "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
256 view_number, self.id
257 ),
258 Box::new(move |event| {
259 if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
260 event.as_ref()
261 {
262 return next_epoch_qc.view_number() + 1 == view_number;
263 }
264 if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
265 return true;
267 }
268 if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
269 if qc.view_number() + 1 == view_number {
270 return qc
271 .data
272 .block_number
273 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274 }
275 }
276 false
277 }),
278 );
279
280 match event.as_ref() {
281 HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
282 payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
283 },
284 HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
285 proposal_dependency.mark_as_completed(event);
286 },
287 HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
288 Either::Right(_) => timeout_dependency.mark_as_completed(event),
289 Either::Left(qc) => {
290 if qc
291 .data
292 .block_number
293 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
294 {
295 next_epoch_qc_dependency.mark_as_completed(event.clone());
296 }
297 qc_dependency.mark_as_completed(event);
298 },
299 },
300 HotShotEvent::EpochRootQcFormed(..) => {
301 next_epoch_qc_dependency.mark_as_completed(event.clone());
303 qc_dependency.mark_as_completed(event);
304 },
305 HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
306 view_sync_dependency.mark_as_completed(event);
307 },
308 HotShotEvent::VidDisperseSend(..) => {
309 vid_share_dependency.mark_as_completed(event);
310 },
311 HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
312 next_epoch_qc_dependency.mark_as_completed(event);
313 },
314 _ => {},
315 };
316
317 let mut secondary_deps = vec![
319 AndDependency::from_deps(vec![timeout_dependency]),
321 AndDependency::from_deps(vec![view_sync_dependency]),
323 ];
324 if *view_number > 1 {
326 secondary_deps.push(AndDependency::from_deps(vec![
327 qc_dependency,
328 proposal_dependency,
329 next_epoch_qc_dependency,
330 ]));
331 } else {
332 secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
333 }
334
335 let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
336
337 AndDependency::from_deps(vec![OrDependency::from_deps(vec![
338 AndDependency::from_deps(vec![
339 OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
340 OrDependency::from_deps(secondary_deps),
341 ]),
342 ])])
343 }
344
345 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
351 async fn create_dependency_task_if_new(
352 &mut self,
353 view_number: TYPES::View,
354 epoch_number: Option<TYPES::Epoch>,
355 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
356 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
357 event: Arc<HotShotEvent<TYPES>>,
358 epoch_transition_indicator: EpochTransitionIndicator,
359 ) -> Result<()> {
360 let epoch_membership = self
361 .membership_coordinator
362 .membership_for_epoch(epoch_number)
363 .await?;
364 let leader_in_current_epoch =
365 epoch_membership.leader(view_number).await? == self.public_key;
366 let leader_in_next_epoch = !leader_in_current_epoch
370 && epoch_number.is_some()
371 && matches!(
372 epoch_transition_indicator,
373 EpochTransitionIndicator::InTransition
374 )
375 && epoch_membership
376 .next_epoch()
377 .await
378 .context(warn!(
379 "Missing the randomized stake table for epoch {}",
380 epoch_number.unwrap() + 1
381 ))?
382 .leader(view_number)
383 .await?
384 == self.public_key;
385
386 ensure!(
388 leader_in_current_epoch || leader_in_next_epoch,
389 debug!("We are not the leader of the next view")
390 );
391
392 ensure!(
394 view_number > self.latest_proposed_view,
395 "We have already proposed for this view"
396 );
397
398 tracing::debug!(
399 "Attempting to make dependency task for view {view_number} and event {event:?}"
400 );
401
402 ensure!(
403 !self.proposal_dependencies.contains_key(&view_number),
404 "Task already exists"
405 );
406
407 let (cancel_sender, cancel_receiver) = broadcast(1);
408
409 let dependency_chain = self.create_and_complete_dependencies(
410 view_number,
411 &event_receiver,
412 event,
413 &cancel_receiver,
414 );
415
416 let dependency_task = DependencyTask::new(
417 dependency_chain,
418 ProposalDependencyHandle {
419 latest_proposed_view: self.latest_proposed_view,
420 view_number,
421 sender: event_sender,
422 receiver: event_receiver,
423 membership: epoch_membership,
424 public_key: self.public_key.clone(),
425 private_key: self.private_key.clone(),
426 instance_state: Arc::clone(&self.instance_state),
427 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
428 timeout: self.timeout,
429 formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
430 upgrade_lock: self.upgrade_lock.clone(),
431 id: self.id,
432 view_start_time: Instant::now(),
433 epoch_height: self.epoch_height,
434 },
435 );
436 self.proposal_dependencies
437 .insert(view_number, cancel_sender);
438
439 dependency_task.run();
440
441 Ok(())
442 }
443
444 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
446 async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
447 if *self.latest_proposed_view < *new_view {
448 tracing::debug!(
449 "Updating latest proposed view from {} to {}",
450 *self.latest_proposed_view,
451 *new_view
452 );
453
454 for view in (*self.latest_proposed_view + 1)..=(*new_view) {
456 let maybe_cancel_sender =
457 self.proposal_dependencies.remove(&TYPES::View::new(view));
458 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
459 tracing::error!("Aborting proposal dependency task for view {view}");
460 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
461 }
462 }
463
464 self.latest_proposed_view = new_view;
465
466 return true;
467 }
468 false
469 }
470
471 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
473 pub async fn handle(
474 &mut self,
475 event: Arc<HotShotEvent<TYPES>>,
476 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
477 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
478 ) -> Result<()> {
479 let epoch_number = self.cur_epoch;
480 let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
481 let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
482 is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
483 }) {
484 EpochTransitionIndicator::InTransition
485 } else {
486 EpochTransitionIndicator::NotInTransition
487 };
488 match event.as_ref() {
489 HotShotEvent::UpgradeCertificateFormed(cert) => {
490 tracing::debug!(
491 "Upgrade certificate received for view {}!",
492 *cert.view_number
493 );
494 if cert.data.decide_by >= self.latest_proposed_view + 3 {
496 tracing::debug!("Updating current formed_upgrade_certificate");
497
498 self.formed_upgrade_certificate = Some(cert.clone());
499 }
500 },
501 HotShotEvent::Qc2Formed(cert) => match cert.clone() {
502 either::Right(timeout_cert) => {
503 let view_number = timeout_cert.view_number + 1;
504 self.create_dependency_task_if_new(
505 view_number,
506 epoch_number,
507 event_receiver,
508 event_sender,
509 Arc::clone(&event),
510 epoch_transition_indicator,
511 )
512 .await?;
513 },
514 either::Left(qc) => {
515 if qc.view_number <= self.consensus.read().await.high_qc().view_number {
517 tracing::trace!(
518 "Received a QC for a view that was not > than our current high QC"
519 );
520 }
521
522 self.formed_quorum_certificates
523 .insert(qc.view_number(), qc.clone());
524
525 handle_eqc_formed(
526 qc.view_number(),
527 qc.data.leaf_commit,
528 qc.data.block_number,
529 self,
530 &event_sender,
531 )
532 .await;
533
534 let view_number = qc.view_number() + 1;
535 if !qc
536 .data
537 .block_number
538 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
539 {
540 broadcast_view_change(
541 &event_sender,
542 view_number,
543 qc.data.epoch,
544 self.first_epoch,
545 )
546 .await;
547 }
548 self.create_dependency_task_if_new(
549 view_number,
550 epoch_number,
551 event_receiver,
552 event_sender,
553 Arc::clone(&event),
554 epoch_transition_indicator,
555 )
556 .await?;
557 },
558 },
559
560 HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificate { qc, state_cert }) => {
561 if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
563 tracing::trace!(
564 "Received a QC for a view that was not > than our current high QC"
565 );
566 }
567
568 self.formed_quorum_certificates
569 .insert(qc.view_number(), qc.clone());
570 self.formed_state_cert
571 .insert(state_cert.epoch, state_cert.clone());
572
573 self.storage
574 .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
575 .await
576 .wrap()
577 .context(error!(
578 "Failed to update the epoch root QC and state cert in storage!"
579 ))?;
580
581 let view_number = qc.view_number() + 1;
582 broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
583 .await;
584 self.create_dependency_task_if_new(
585 view_number,
586 epoch_number,
587 event_receiver,
588 event_sender,
589 Arc::clone(&event),
590 epoch_transition_indicator,
591 )
592 .await?;
593 },
594 HotShotEvent::SendPayloadCommitmentAndMetadata(
595 _payload_commitment,
596 _builder_commitment,
597 _metadata,
598 view_number,
599 _fee,
600 ) => {
601 let view_number = *view_number;
602
603 self.create_dependency_task_if_new(
604 view_number,
605 epoch_number,
606 event_receiver,
607 event_sender,
608 Arc::clone(&event),
609 epoch_transition_indicator,
610 )
611 .await?;
612 },
613 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
614 let epoch_number = certificate.data.epoch;
615 let epoch_membership = self
616 .membership_coordinator
617 .stake_table_for_epoch(epoch_number)
618 .await
619 .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
620
621 let membership_stake_table = epoch_membership.stake_table().await;
622 let membership_success_threshold = epoch_membership.success_threshold().await;
623
624 certificate
625 .is_valid_cert(
626 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
627 membership_success_threshold,
628 &self.upgrade_lock,
629 )
630 .await
631 .context(|e| {
632 warn!(
633 "View Sync Finalize certificate {:?} was invalid: {}",
634 certificate.data(),
635 e
636 )
637 })?;
638
639 let view_number = certificate.view_number;
640
641 self.create_dependency_task_if_new(
642 view_number,
643 epoch_number,
644 event_receiver,
645 event_sender,
646 event,
647 epoch_transition_indicator,
648 )
649 .await?;
650 },
651 HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
652 let view_number = proposal.data.view_number();
653 if !self.update_latest_proposed_view(view_number).await {
655 tracing::trace!("Failed to update latest proposed view");
656 }
657
658 self.create_dependency_task_if_new(
659 view_number + 1,
660 epoch_number,
661 event_receiver,
662 event_sender,
663 Arc::clone(&event),
664 epoch_transition_indicator,
665 )
666 .await?;
667 },
668 HotShotEvent::QuorumProposalSend(proposal, _) => {
669 let view = proposal.data.view_number();
670
671 ensure!(
672 self.update_latest_proposed_view(view).await,
673 "Failed to update latest proposed view"
674 );
675 },
676 HotShotEvent::VidDisperseSend(vid_disperse, _) => {
677 let view_number = vid_disperse.data.view_number();
678 self.create_dependency_task_if_new(
679 view_number,
680 epoch_number,
681 event_receiver,
682 event_sender,
683 Arc::clone(&event),
684 epoch_transition_indicator,
685 )
686 .await?;
687 },
688 HotShotEvent::ViewChange(view, epoch) => {
689 if epoch > &self.cur_epoch {
690 self.cur_epoch = *epoch;
691 }
692 let keep_view = TYPES::View::new(view.saturating_sub(1));
693 self.cancel_tasks(keep_view);
694 },
695 HotShotEvent::Timeout(view, ..) => {
696 let keep_view = TYPES::View::new(view.saturating_sub(1));
697 self.cancel_tasks(keep_view);
698 },
699 HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
700 let current_next_epoch_qc =
702 self.consensus.read().await.next_epoch_high_qc().cloned();
703 ensure!(
704 current_next_epoch_qc.is_none()
705 || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
706 debug!(
707 "Received a next epoch QC for a view that was not > than our current next \
708 epoch high QC"
709 )
710 );
711
712 self.formed_next_epoch_quorum_certificates
713 .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
714
715 handle_eqc_formed(
716 next_epoch_qc.view_number(),
717 next_epoch_qc.data.leaf_commit,
718 next_epoch_qc.data.block_number,
719 self,
720 &event_sender,
721 )
722 .await;
723
724 let view_number = next_epoch_qc.view_number() + 1;
725 self.create_dependency_task_if_new(
726 view_number,
727 epoch_number,
728 event_receiver,
729 event_sender,
730 Arc::clone(&event),
731 epoch_transition_indicator,
732 )
733 .await?;
734 },
735 HotShotEvent::SetFirstEpoch(view, epoch) => {
736 self.first_epoch = Some((*view, *epoch));
737 },
738 _ => {},
739 }
740 Ok(())
741 }
742
743 pub fn cancel_tasks(&mut self, view: TYPES::View) {
745 let keep = self.proposal_dependencies.split_off(&view);
746 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
747 if !cancel_sender.is_closed() {
748 tracing::error!("Aborting proposal dependency task for view {view}");
749 let _ = cancel_sender.try_broadcast(());
750 }
751 }
752 self.proposal_dependencies = keep;
753 }
754}
755
756#[async_trait]
757impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
758 for QuorumProposalTaskState<TYPES, I, V>
759{
760 type Event = HotShotEvent<TYPES>;
761
762 async fn handle_event(
763 &mut self,
764 event: Arc<Self::Event>,
765 sender: &Sender<Arc<Self::Event>>,
766 receiver: &Receiver<Arc<Self::Event>>,
767 ) -> Result<()> {
768 self.handle(event, receiver.clone(), sender.clone()).await
769 }
770
771 fn cancel_subtasks(&mut self) {
772 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
773 if !cancel_sender.is_closed() {
774 tracing::error!("Aborting proposal dependency task for view {view}");
775 let _ = cancel_sender.try_broadcast(());
776 }
777 }
778 }
779}