1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency, OrDependency},
14 dependency_task::DependencyTask,
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::OuterConsensus,
19 epoch_membership::EpochMembershipCoordinator,
20 message::UpgradeLock,
21 simple_certificate::{
22 EpochRootQuorumCertificateV2, LightClientStateUpdateCertificateV2,
23 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::StakeTableEntries,
26 traits::{
27 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28 signature_key::SignatureKey,
29 storage::Storage,
30 },
31 utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32 vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39 events::HotShotEvent, helpers::broadcast_view_change,
40 quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
/// State for the quorum proposal task: tracks which views have been proposed for, the
/// certificates formed so far, and the per-view proposal dependency tasks that are in flight.
pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Highest view recorded as proposed; only advanced by `update_latest_proposed_view`.
    pub latest_proposed_view: TYPES::View,

    /// Current epoch; raised when a `ViewChange` event carries a greater epoch.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Cancel senders for the spawned proposal dependency tasks, keyed by the view each task
    /// targets.
    pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// Quorum certificates seen via `Qc2Formed` / `EpochRootQcFormed`, keyed by the QC's view.
    pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,

    /// Next-epoch quorum certificates seen via `NextEpochQc2Formed`, keyed by the QC's view.
    pub formed_next_epoch_quorum_certificates:
        BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,

    /// Instance state shared (by `Arc`) with every spawned proposal dependency handle.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Source of per-epoch membership and stake tables (used for leader lookup and
    /// certificate validation).
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; compared against the view leader's key.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; cloned into each proposal dependency handle
    /// (presumably for signing proposals; not visible in this file).
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// View timeout forwarded to each proposal dependency handle.
    /// NOTE(review): units are not visible here; confirm against the configuration.
    pub timeout: u64,

    /// Storage backend; used here to persist the epoch root QC together with its state cert.
    pub storage: I::Storage,

    /// Shared consensus state (read here for the high QC and the next-epoch high QC).
    pub consensus: OuterConsensus<TYPES>,

    /// Node identifier, used in tracing spans and dependency labels.
    pub id: u64,

    /// Most recent upgrade certificate accepted from an `UpgradeCertificateFormed` event; it is
    /// only stored when its `decide_by` view is at least `latest_proposed_view + 3`.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Upgrade lock passed to certificate validation and to each proposal dependency handle.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Epoch height handed to `is_epoch_transition` / `is_last_block`.
    /// NOTE(review): presumably the number of blocks per epoch; confirm.
    pub epoch_height: u64,

    /// Light client state update certificates from `EpochRootQcFormed`, keyed by the
    /// certificate's epoch.
    pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,

    /// View/epoch pair recorded from `SetFirstEpoch` and forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109 QuorumProposalTaskState<TYPES, I, V>
110{
111 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113 fn create_event_dependency(
114 &self,
115 dependency_type: ProposalDependency,
116 view_number: TYPES::View,
117 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118 cancel_receiver: Receiver<()>,
119 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120 let id = self.id;
121 EventDependency::new(
122 event_receiver,
123 cancel_receiver,
124 format!(
125 "ProposalDependency::{:?} for view {:?}, my id {:?}",
126 dependency_type, view_number, self.id
127 ),
128 Box::new(move |event| {
129 let event = event.as_ref();
130 let event_view = match dependency_type {
131 ProposalDependency::Qc => {
132 if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133 qc.view_number() + 1
134 } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135 root_qc.view_number() + 1
136 } else {
137 return false;
138 }
139 },
140 ProposalDependency::TimeoutCert => {
141 if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142 timeout.view_number() + 1
143 } else {
144 return false;
145 }
146 },
147 ProposalDependency::ViewSyncCert => {
148 if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149 {
150 view_sync_cert.view_number()
151 } else {
152 return false;
153 }
154 },
155 ProposalDependency::Proposal => {
156 if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157 {
158 proposal.data.view_number() + 1
159 } else {
160 return false;
161 }
162 },
163 ProposalDependency::PayloadAndMetadata => {
164 if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165 _payload_commitment,
166 _builder_commitment,
167 _metadata,
168 view_number,
169 _fee,
170 ) = event
171 {
172 *view_number
173 } else {
174 return false;
175 }
176 },
177 ProposalDependency::VidShare => {
178 if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179 vid_disperse.data.view_number()
180 } else {
181 return false;
182 }
183 },
184 };
185 let valid = event_view == view_number;
186 if valid {
187 tracing::debug!(
188 "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
189 id is {id:?}!",
190 );
191 }
192 valid
193 }),
194 )
195 }
196
197 fn create_and_complete_dependencies(
199 &self,
200 view_number: TYPES::View,
201 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
202 event: Arc<HotShotEvent<TYPES>>,
203 cancel_receiver: &Receiver<()>,
204 ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
205 let mut proposal_dependency = self.create_event_dependency(
206 ProposalDependency::Proposal,
207 view_number,
208 event_receiver.clone(),
209 cancel_receiver.clone(),
210 );
211
212 let mut qc_dependency = self.create_event_dependency(
213 ProposalDependency::Qc,
214 view_number,
215 event_receiver.clone(),
216 cancel_receiver.clone(),
217 );
218
219 let mut view_sync_dependency = self.create_event_dependency(
220 ProposalDependency::ViewSyncCert,
221 view_number,
222 event_receiver.clone(),
223 cancel_receiver.clone(),
224 );
225
226 let mut timeout_dependency = self.create_event_dependency(
227 ProposalDependency::TimeoutCert,
228 view_number,
229 event_receiver.clone(),
230 cancel_receiver.clone(),
231 );
232
233 let mut payload_commitment_dependency = self.create_event_dependency(
234 ProposalDependency::PayloadAndMetadata,
235 view_number,
236 event_receiver.clone(),
237 cancel_receiver.clone(),
238 );
239
240 let mut vid_share_dependency = self.create_event_dependency(
241 ProposalDependency::VidShare,
242 view_number,
243 event_receiver.clone(),
244 cancel_receiver.clone(),
245 );
246
247 let epoch_height = self.epoch_height;
248
249 let mut next_epoch_qc_dependency = EventDependency::new(
252 event_receiver.clone(),
253 cancel_receiver.clone(),
254 format!(
255 "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
256 view_number, self.id
257 ),
258 Box::new(move |event| {
259 if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
260 event.as_ref()
261 {
262 return next_epoch_qc.view_number() + 1 == view_number;
263 }
264 if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
265 return true;
267 }
268 if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
269 if qc.view_number() + 1 == view_number {
270 return qc
271 .data
272 .block_number
273 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274 }
275 }
276 false
277 }),
278 );
279
280 match event.as_ref() {
281 HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
282 payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
283 },
284 HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
285 proposal_dependency.mark_as_completed(event);
286 },
287 HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
288 Either::Right(_) => timeout_dependency.mark_as_completed(event),
289 Either::Left(qc) => {
290 if qc
291 .data
292 .block_number
293 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
294 {
295 next_epoch_qc_dependency.mark_as_completed(event.clone());
296 }
297 qc_dependency.mark_as_completed(event);
298 },
299 },
300 HotShotEvent::EpochRootQcFormed(..) => {
301 next_epoch_qc_dependency.mark_as_completed(event.clone());
303 qc_dependency.mark_as_completed(event);
304 },
305 HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
306 view_sync_dependency.mark_as_completed(event);
307 },
308 HotShotEvent::VidDisperseSend(..) => {
309 vid_share_dependency.mark_as_completed(event);
310 },
311 HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
312 next_epoch_qc_dependency.mark_as_completed(event);
313 },
314 _ => {},
315 };
316
317 let mut secondary_deps = vec![
319 AndDependency::from_deps(vec![timeout_dependency]),
321 AndDependency::from_deps(vec![view_sync_dependency]),
323 ];
324 if *view_number > 1 {
326 secondary_deps.push(AndDependency::from_deps(vec![
327 qc_dependency,
328 proposal_dependency,
329 next_epoch_qc_dependency,
330 ]));
331 } else {
332 secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
333 }
334
335 let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
336
337 AndDependency::from_deps(vec![OrDependency::from_deps(vec![
338 AndDependency::from_deps(vec![
339 OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
340 OrDependency::from_deps(secondary_deps),
341 ]),
342 ])])
343 }
344
345 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
351 async fn create_dependency_task_if_new(
352 &mut self,
353 view_number: TYPES::View,
354 epoch_number: Option<TYPES::Epoch>,
355 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
356 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
357 event: Arc<HotShotEvent<TYPES>>,
358 epoch_transition_indicator: EpochTransitionIndicator,
359 ) -> Result<()> {
360 let epoch_membership = self
361 .membership_coordinator
362 .membership_for_epoch(epoch_number)
363 .await?;
364 let leader_in_current_epoch =
365 epoch_membership.leader(view_number).await? == self.public_key;
366 let leader_in_next_epoch = !leader_in_current_epoch
370 && epoch_number.is_some()
371 && matches!(
372 epoch_transition_indicator,
373 EpochTransitionIndicator::InTransition
374 )
375 && epoch_membership
376 .next_epoch()
377 .await
378 .context(warn!(
379 "Missing the randomized stake table for epoch {}",
380 epoch_number.unwrap() + 1
381 ))?
382 .leader(view_number)
383 .await?
384 == self.public_key;
385
386 ensure!(
388 leader_in_current_epoch || leader_in_next_epoch,
389 debug!("We are not the leader of the next view")
390 );
391
392 ensure!(
394 view_number > self.latest_proposed_view,
395 "We have already proposed for this view"
396 );
397
398 tracing::debug!(
399 "Attempting to make dependency task for view {view_number} and event {event:?}"
400 );
401
402 ensure!(
403 !self.proposal_dependencies.contains_key(&view_number),
404 "Task already exists"
405 );
406
407 let (cancel_sender, cancel_receiver) = broadcast(1);
408
409 let dependency_chain = self.create_and_complete_dependencies(
410 view_number,
411 &event_receiver,
412 event,
413 &cancel_receiver,
414 );
415
416 let dependency_task = DependencyTask::new(
417 dependency_chain,
418 ProposalDependencyHandle {
419 latest_proposed_view: self.latest_proposed_view,
420 view_number,
421 sender: event_sender,
422 receiver: event_receiver,
423 membership: epoch_membership,
424 public_key: self.public_key.clone(),
425 private_key: self.private_key.clone(),
426 instance_state: Arc::clone(&self.instance_state),
427 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
428 timeout: self.timeout,
429 formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
430 upgrade_lock: self.upgrade_lock.clone(),
431 id: self.id,
432 view_start_time: Instant::now(),
433 epoch_height: self.epoch_height,
434 cancel_receiver,
435 },
436 );
437 self.proposal_dependencies
438 .insert(view_number, cancel_sender);
439
440 dependency_task.run();
441
442 Ok(())
443 }
444
445 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
447 async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
448 if *self.latest_proposed_view < *new_view {
449 tracing::debug!(
450 "Updating latest proposed view from {} to {}",
451 *self.latest_proposed_view,
452 *new_view
453 );
454
455 for view in (*self.latest_proposed_view + 1)..=(*new_view) {
457 let maybe_cancel_sender =
458 self.proposal_dependencies.remove(&TYPES::View::new(view));
459 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
460 tracing::error!("Aborting proposal dependency task for view {view}");
461 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
462 }
463 }
464
465 self.latest_proposed_view = new_view;
466
467 return true;
468 }
469 false
470 }
471
/// Handles one event for the quorum proposal task.
///
/// Depending on the event this records formed certificates, updates view/epoch bookkeeping,
/// cancels outdated dependency tasks, and attempts to spawn a proposal dependency task for
/// the view the event enables (via `create_dependency_task_if_new`).
///
/// # Errors
/// Propagates any error from `create_dependency_task_if_new` (including "not the leader" and
/// "task already exists"), from storage or certificate validation, and from the `ensure!`
/// checks in the individual arms.
#[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
pub async fn handle(
    &mut self,
    event: Arc<HotShotEvent<TYPES>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
) -> Result<()> {
    let epoch_number = self.cur_epoch;
    // "In transition" means the high QC's block is inside the epoch transition but is not
    // the last block of the epoch. A high QC without a block number is never in transition.
    let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
    let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
        is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
    }) {
        EpochTransitionIndicator::InTransition
    } else {
        EpochTransitionIndicator::NotInTransition
    };
    match event.as_ref() {
        HotShotEvent::UpgradeCertificateFormed(cert) => {
            tracing::debug!(
                "Upgrade certificate received for view {}!",
                *cert.view_number
            );
            // Only keep the certificate if its `decide_by` view is at least three views
            // past the latest proposed view.
            if cert.data.decide_by >= self.latest_proposed_view + 3 {
                tracing::debug!("Updating current formed_upgrade_certificate");

                self.formed_upgrade_certificate = Some(cert.clone());
            }
        },
        HotShotEvent::Qc2Formed(cert) => match cert.clone() {
            // Timeout certificate for view v: try to propose in view v + 1.
            either::Right(timeout_cert) => {
                let view_number = timeout_cert.view_number + 1;
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            either::Left(qc) => {
                // Log only: a QC that is not newer than the high QC is still processed.
                if qc.view_number <= self.consensus.read().await.high_qc().view_number {
                    tracing::trace!(
                        "Received a QC for a view that was not > than our current high QC"
                    );
                }

                self.formed_quorum_certificates
                    .insert(qc.view_number(), qc.clone());

                handle_eqc_formed(
                    qc.view_number(),
                    qc.data.leaf_commit,
                    qc.data.block_number,
                    self,
                    &event_sender,
                )
                .await;

                let view_number = qc.view_number() + 1;
                // The view change is not broadcast here when the QC is for the last block
                // of an epoch.
                // NOTE(review): presumably `handle_eqc_formed` covers that case; confirm.
                if !qc
                    .data
                    .block_number
                    .is_some_and(|bn| is_last_block(bn, self.epoch_height))
                {
                    broadcast_view_change(
                        &event_sender,
                        view_number,
                        qc.data.epoch,
                        self.first_epoch,
                    )
                    .await;
                }
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
        },

        HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificateV2 { qc, state_cert }) => {
            // Log only: a QC that is not newer than the high QC is still processed.
            if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
                tracing::trace!(
                    "Received a QC for a view that was not > than our current high QC"
                );
            }

            self.formed_quorum_certificates
                .insert(qc.view_number(), qc.clone());
            self.formed_state_cert
                .insert(state_cert.epoch, state_cert.clone());

            // Persist the QC together with its state cert; a storage failure aborts the
            // handling of this event before any view change or proposal attempt.
            self.storage
                .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
                .await
                .wrap()
                .context(error!(
                    "Failed to update the epoch root QC and state cert in storage!"
                ))?;

            let view_number = qc.view_number() + 1;
            broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
                .await;
            self.create_dependency_task_if_new(
                view_number,
                epoch_number,
                event_receiver,
                event_sender,
                Arc::clone(&event),
                epoch_transition_indicator,
            )
            .await?;
        },
        // The payload commitment carries the view it was built for; no `+ 1` here.
        HotShotEvent::SendPayloadCommitmentAndMetadata(
            _payload_commitment,
            _builder_commitment,
            _metadata,
            view_number,
            _fee,
        ) => {
            let view_number = *view_number;

            self.create_dependency_task_if_new(
                view_number,
                epoch_number,
                event_receiver,
                event_sender,
                Arc::clone(&event),
                epoch_transition_indicator,
            )
            .await?;
        },
        HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
            // Shadows the outer `epoch_number`: the certificate's own epoch is used both
            // for validation and for the dependency task.
            let epoch_number = certificate.data.epoch;
            let epoch_membership = self
                .membership_coordinator
                .stake_table_for_epoch(epoch_number)
                .await
                .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;

            let membership_stake_table = epoch_membership.stake_table().await;
            let membership_success_threshold = epoch_membership.success_threshold().await;

            // Reject the certificate unless it validates against that epoch's stake table
            // and success threshold.
            certificate
                .is_valid_cert(
                    &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                    membership_success_threshold,
                    &self.upgrade_lock,
                )
                .await
                .context(|e| {
                    warn!(
                        "View Sync Finalize certificate {:?} was invalid: {}",
                        certificate.data(),
                        e
                    )
                })?;

            // The proposal is attempted for the certificate's own view (no `+ 1`).
            let view_number = certificate.view_number;

            self.create_dependency_task_if_new(
                view_number,
                epoch_number,
                event_receiver,
                event_sender,
                event,
                epoch_transition_indicator,
            )
            .await?;
        },
        HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
            let view_number = proposal.data.view_number();
            // A failed update (view not newer) is only traced; the proposal attempt for the
            // next view still goes ahead.
            if !self.update_latest_proposed_view(view_number).await {
                tracing::trace!("Failed to update latest proposed view");
            }

            self.create_dependency_task_if_new(
                view_number + 1,
                epoch_number,
                event_receiver,
                event_sender,
                Arc::clone(&event),
                epoch_transition_indicator,
            )
            .await?;
        },
        HotShotEvent::QuorumProposalSend(proposal, _) => {
            let view = proposal.data.view_number();

            // Unlike the arm above, a view that does not advance is reported as an error.
            ensure!(
                self.update_latest_proposed_view(view).await,
                "Failed to update latest proposed view"
            );
        },
        // The VID disperse carries the view it belongs to; no `+ 1` here.
        HotShotEvent::VidDisperseSend(vid_disperse, _) => {
            let view_number = vid_disperse.data.view_number();
            self.create_dependency_task_if_new(
                view_number,
                epoch_number,
                event_receiver,
                event_sender,
                Arc::clone(&event),
                epoch_transition_indicator,
            )
            .await?;
        },
        HotShotEvent::ViewChange(view, epoch) => {
            // The epoch only ever moves forward.
            if epoch > &self.cur_epoch {
                self.cur_epoch = *epoch;
            }
            // Cancel tasks for views below `view - 1` (saturating at 0); tasks for
            // `view - 1` and later are kept.
            let keep_view = TYPES::View::new(view.saturating_sub(1));
            self.cancel_tasks(keep_view);
        },
        HotShotEvent::Timeout(view, ..) => {
            // Same cancellation rule as for `ViewChange`.
            let keep_view = TYPES::View::new(view.saturating_sub(1));
            self.cancel_tasks(keep_view);
        },
        HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
            // Only accept a next-epoch QC that is newer than the one consensus already
            // holds (or when consensus holds none).
            let current_next_epoch_qc =
                self.consensus.read().await.next_epoch_high_qc().cloned();
            ensure!(
                current_next_epoch_qc.is_none()
                    || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
                debug!(
                    "Received a next epoch QC for a view that was not > than our current next \
                     epoch high QC"
                )
            );

            self.formed_next_epoch_quorum_certificates
                .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());

            handle_eqc_formed(
                next_epoch_qc.view_number(),
                next_epoch_qc.data.leaf_commit,
                next_epoch_qc.data.block_number,
                self,
                &event_sender,
            )
            .await;

            let view_number = next_epoch_qc.view_number() + 1;
            self.create_dependency_task_if_new(
                view_number,
                epoch_number,
                event_receiver,
                event_sender,
                Arc::clone(&event),
                epoch_transition_indicator,
            )
            .await?;
        },
        HotShotEvent::SetFirstEpoch(view, epoch) => {
            self.first_epoch = Some((*view, *epoch));
        },
        // Every other event is irrelevant to this task.
        _ => {},
    }
    Ok(())
}
743
744 pub fn cancel_tasks(&mut self, view: TYPES::View) {
746 let keep = self.proposal_dependencies.split_off(&view);
747 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
748 if !cancel_sender.is_closed() {
749 tracing::error!("Aborting proposal dependency task for view {view}");
750 let _ = cancel_sender.try_broadcast(());
751 }
752 }
753 self.proposal_dependencies = keep;
754 }
755}
756
757#[async_trait]
758impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
759 for QuorumProposalTaskState<TYPES, I, V>
760{
761 type Event = HotShotEvent<TYPES>;
762
763 async fn handle_event(
764 &mut self,
765 event: Arc<Self::Event>,
766 sender: &Sender<Arc<Self::Event>>,
767 receiver: &Receiver<Arc<Self::Event>>,
768 ) -> Result<()> {
769 self.handle(event, receiver.clone(), sender.clone()).await
770 }
771
772 fn cancel_subtasks(&mut self) {
773 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
774 if !cancel_sender.is_closed() {
775 tracing::error!("Aborting proposal dependency task for view {view}");
776 let _ = cancel_sender.try_broadcast(());
777 }
778 }
779 }
780}