1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency, OrDependency},
14 dependency_task::DependencyTask,
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{EpochNumber, ViewNumber},
20 epoch_membership::EpochMembershipCoordinator,
21 message::UpgradeLock,
22 simple_certificate::{
23 EpochRootQuorumCertificateV2, LightClientStateUpdateCertificateV2,
24 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
25 },
26 stake_table::StakeTableEntries,
27 traits::{
28 node_implementation::{NodeImplementation, NodeType},
29 signature_key::SignatureKey,
30 storage::Storage,
31 },
32 utils::{EpochTransitionIndicator, is_epoch_transition, is_last_block},
33 vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use self::handlers::{ProposalDependency, ProposalDependencyHandle};
39use crate::{
40 events::HotShotEvent, helpers::broadcast_view_change,
41 quorum_proposal::handlers::handle_eqc_formed,
42};
43
44mod handlers;
45
/// State of the quorum proposal task.
///
/// The task reacts to consensus events (see `handle`) and, for views in which this node is the
/// leader, builds and runs a `DependencyTask` that waits on the events needed for that view's
/// proposal.
pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Highest view recorded by `update_latest_proposed_view`; dependency tasks are only created
    /// for views strictly greater than this one.
    pub latest_proposed_view: ViewNumber,

    /// Current epoch, raised when a `ViewChange` event carries a larger epoch. Used as the epoch
    /// for membership lookups when creating dependency tasks.
    pub cur_epoch: Option<EpochNumber>,

    /// For each view with a dependency task created by this state, the sender used to signal
    /// cancellation of that task.
    pub proposal_dependencies: BTreeMap<ViewNumber, Sender<()>>,

    /// Quorum certificates received through `Qc2Formed` and `EpochRootQcFormed` events, keyed by
    /// the certificate's view number.
    pub formed_quorum_certificates: BTreeMap<ViewNumber, QuorumCertificate2<TYPES>>,

    /// Next-epoch quorum certificates received through `NextEpochQc2Formed` events, keyed by the
    /// certificate's view number.
    pub formed_next_epoch_quorum_certificates:
        BTreeMap<ViewNumber, NextEpochQuorumCertificate2<TYPES>>,

    /// Shared instance state; a clone of the `Arc` is handed to every dependency task handle.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Coordinator used to look up the membership and stake table for a given epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; compared against the leader of a view to decide whether to
    /// create a dependency task.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; cloned into every dependency task handle.
    /// NOTE(review): presumably used there to sign the proposal — confirm in `handlers`.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Timeout value copied into every dependency task handle.
    /// NOTE(review): the unit is not visible in this file — confirm.
    pub timeout: u64,

    /// Storage backend; used here to persist the epoch root QC together with its state
    /// certificate.
    pub storage: I::Storage,

    /// Shared consensus state; read here for the high QC and the next-epoch high QC.
    pub consensus: OuterConsensus<TYPES>,

    /// Identifier of this node, used in tracing fields and log messages.
    pub id: u64,

    /// Most recently accepted upgrade certificate. It is replaced on `UpgradeCertificateFormed`
    /// only when the certificate's `decide_by` is at least `latest_proposed_view + 3`.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Upgrade lock; used when validating view sync finalize certificates and cloned into every
    /// dependency task handle.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Second argument to `is_epoch_transition` / `is_last_block`.
    /// NOTE(review): presumably the number of blocks per epoch — confirm.
    pub epoch_height: u64,

    /// State certificates received through `EpochRootQcFormed` events, keyed by the
    /// certificate's epoch.
    pub formed_state_cert: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<TYPES>>,

    /// View and epoch set by the `SetFirstEpoch` event; forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
}
108
109impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPES, I> {
110 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
112 fn create_event_dependency(
113 &self,
114 dependency_type: ProposalDependency,
115 view_number: ViewNumber,
116 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
117 cancel_receiver: Receiver<()>,
118 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
119 let id = self.id;
120 EventDependency::new(
121 event_receiver,
122 cancel_receiver,
123 format!(
124 "ProposalDependency::{:?} for view {:?}, my id {:?}",
125 dependency_type, view_number, self.id
126 ),
127 Box::new(move |event| {
128 let event = event.as_ref();
129 let event_view = match dependency_type {
130 ProposalDependency::Qc => {
131 if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
132 qc.view_number() + 1
133 } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
134 root_qc.view_number() + 1
135 } else {
136 return false;
137 }
138 },
139 ProposalDependency::TimeoutCert => {
140 if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
141 timeout.view_number() + 1
142 } else {
143 return false;
144 }
145 },
146 ProposalDependency::ViewSyncCert => {
147 if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
148 {
149 view_sync_cert.view_number()
150 } else {
151 return false;
152 }
153 },
154 ProposalDependency::Proposal => {
155 if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
156 {
157 proposal.data.view_number() + 1
158 } else {
159 return false;
160 }
161 },
162 ProposalDependency::PayloadAndMetadata => {
163 if let HotShotEvent::SendPayloadCommitmentAndMetadata(
164 _payload_commitment,
165 _builder_commitment,
166 _metadata,
167 view_number,
168 _fee,
169 ) = event
170 {
171 *view_number
172 } else {
173 return false;
174 }
175 },
176 ProposalDependency::VidShare => {
177 if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
178 vid_disperse.data.view_number()
179 } else {
180 return false;
181 }
182 },
183 };
184 let valid = event_view == view_number;
185 if valid {
186 tracing::debug!(
187 "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
188 id is {id:?}!",
189 );
190 }
191 valid
192 }),
193 )
194 }
195
196 fn create_and_complete_dependencies(
198 &self,
199 view_number: ViewNumber,
200 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
201 event: Arc<HotShotEvent<TYPES>>,
202 cancel_receiver: &Receiver<()>,
203 ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
204 let mut proposal_dependency = self.create_event_dependency(
205 ProposalDependency::Proposal,
206 view_number,
207 event_receiver.clone(),
208 cancel_receiver.clone(),
209 );
210
211 let mut qc_dependency = self.create_event_dependency(
212 ProposalDependency::Qc,
213 view_number,
214 event_receiver.clone(),
215 cancel_receiver.clone(),
216 );
217
218 let mut view_sync_dependency = self.create_event_dependency(
219 ProposalDependency::ViewSyncCert,
220 view_number,
221 event_receiver.clone(),
222 cancel_receiver.clone(),
223 );
224
225 let mut timeout_dependency = self.create_event_dependency(
226 ProposalDependency::TimeoutCert,
227 view_number,
228 event_receiver.clone(),
229 cancel_receiver.clone(),
230 );
231
232 let mut payload_commitment_dependency = self.create_event_dependency(
233 ProposalDependency::PayloadAndMetadata,
234 view_number,
235 event_receiver.clone(),
236 cancel_receiver.clone(),
237 );
238
239 let mut vid_share_dependency = self.create_event_dependency(
240 ProposalDependency::VidShare,
241 view_number,
242 event_receiver.clone(),
243 cancel_receiver.clone(),
244 );
245
246 let epoch_height = self.epoch_height;
247
248 let mut next_epoch_qc_dependency = EventDependency::new(
251 event_receiver.clone(),
252 cancel_receiver.clone(),
253 format!(
254 "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
255 view_number, self.id
256 ),
257 Box::new(move |event| {
258 if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
259 event.as_ref()
260 {
261 return next_epoch_qc.view_number() + 1 == view_number;
262 }
263 if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
264 return true;
266 }
267 if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref()
268 && qc.view_number() + 1 == view_number
269 {
270 return qc
271 .data
272 .block_number
273 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274 }
275 false
276 }),
277 );
278
279 match event.as_ref() {
280 HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
281 payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
282 },
283 HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
284 proposal_dependency.mark_as_completed(event);
285 },
286 HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
287 Either::Right(_) => timeout_dependency.mark_as_completed(event),
288 Either::Left(qc) => {
289 if qc
290 .data
291 .block_number
292 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
293 {
294 next_epoch_qc_dependency.mark_as_completed(event.clone());
295 }
296 qc_dependency.mark_as_completed(event);
297 },
298 },
299 HotShotEvent::EpochRootQcFormed(..) => {
300 next_epoch_qc_dependency.mark_as_completed(event.clone());
302 qc_dependency.mark_as_completed(event);
303 },
304 HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
305 view_sync_dependency.mark_as_completed(event);
306 },
307 HotShotEvent::VidDisperseSend(..) => {
308 vid_share_dependency.mark_as_completed(event);
309 },
310 HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
311 next_epoch_qc_dependency.mark_as_completed(event);
312 },
313 _ => {},
314 };
315
316 let mut secondary_deps = vec![
318 AndDependency::from_deps(vec![timeout_dependency]),
320 AndDependency::from_deps(vec![view_sync_dependency]),
322 ];
323 if *view_number > 1 {
325 secondary_deps.push(AndDependency::from_deps(vec![
326 qc_dependency,
327 proposal_dependency,
328 next_epoch_qc_dependency,
329 ]));
330 } else {
331 secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
332 }
333
334 let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
335
336 AndDependency::from_deps(vec![OrDependency::from_deps(vec![
337 AndDependency::from_deps(vec![
338 OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
339 OrDependency::from_deps(secondary_deps),
340 ]),
341 ])])
342 }
343
344 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
350 async fn create_dependency_task_if_new(
351 &mut self,
352 view_number: ViewNumber,
353 epoch_number: Option<EpochNumber>,
354 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
355 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
356 event: Arc<HotShotEvent<TYPES>>,
357 epoch_transition_indicator: EpochTransitionIndicator,
358 ) -> Result<()> {
359 let epoch_membership = self
360 .membership_coordinator
361 .membership_for_epoch(epoch_number)
362 .await?;
363 let leader_in_current_epoch =
364 epoch_membership.leader(view_number).await? == self.public_key;
365 let leader_in_next_epoch = !leader_in_current_epoch
369 && epoch_number.is_some()
370 && matches!(
371 epoch_transition_indicator,
372 EpochTransitionIndicator::InTransition
373 )
374 && epoch_membership
375 .next_epoch()
376 .await
377 .context(warn!(
378 "Missing the randomized stake table for epoch {}",
379 epoch_number.unwrap() + 1
380 ))?
381 .leader(view_number)
382 .await?
383 == self.public_key;
384
385 ensure!(
387 leader_in_current_epoch || leader_in_next_epoch,
388 debug!("We are not the leader of the next view")
389 );
390
391 ensure!(
393 view_number > self.latest_proposed_view,
394 "We have already proposed for this view"
395 );
396
397 tracing::debug!(
398 "Attempting to make dependency task for view {view_number} and event {event:?}"
399 );
400
401 ensure!(
402 !self.proposal_dependencies.contains_key(&view_number),
403 "Task already exists"
404 );
405
406 let (cancel_sender, cancel_receiver) = broadcast(1);
407
408 let dependency_chain = self.create_and_complete_dependencies(
409 view_number,
410 &event_receiver,
411 event,
412 &cancel_receiver,
413 );
414
415 let dependency_task = DependencyTask::new(
416 dependency_chain,
417 ProposalDependencyHandle {
418 latest_proposed_view: self.latest_proposed_view,
419 view_number,
420 sender: event_sender,
421 receiver: event_receiver,
422 membership: epoch_membership,
423 public_key: self.public_key.clone(),
424 private_key: self.private_key.clone(),
425 instance_state: Arc::clone(&self.instance_state),
426 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
427 timeout: self.timeout,
428 formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
429 upgrade_lock: self.upgrade_lock.clone(),
430 id: self.id,
431 view_start_time: Instant::now(),
432 epoch_height: self.epoch_height,
433 cancel_receiver,
434 },
435 );
436 self.proposal_dependencies
437 .insert(view_number, cancel_sender);
438
439 dependency_task.run();
440
441 Ok(())
442 }
443
444 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
446 async fn update_latest_proposed_view(&mut self, new_view: ViewNumber) -> bool {
447 if *self.latest_proposed_view < *new_view {
448 tracing::debug!(
449 "Updating latest proposed view from {} to {}",
450 *self.latest_proposed_view,
451 *new_view
452 );
453
454 for view in (*self.latest_proposed_view + 1)..=(*new_view) {
456 let maybe_cancel_sender = self.proposal_dependencies.remove(&ViewNumber::new(view));
457 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
458 tracing::debug!("Aborting proposal dependency task for view {view}");
459 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
460 }
461 }
462
463 self.latest_proposed_view = new_view;
464
465 return true;
466 }
467 false
468 }
469
    /// Handles a single event on behalf of the quorum proposal task.
    ///
    /// Depending on the event this records formed certificates, advances the latest proposed
    /// view, cancels stale dependency tasks, and/or tries to create the dependency task for the
    /// view the event targets (see `create_dependency_task_if_new`). Events not listed in the
    /// `match` are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `create_dependency_task_if_new` fails (this includes not being the leader, having
    ///   already proposed for the view, or a task already existing),
    /// - storing the epoch root QC and state certificate fails,
    /// - the stake table for a view sync certificate's epoch is unavailable or the certificate
    ///   is invalid,
    /// - a `QuorumProposalSend` does not advance the latest proposed view,
    /// - a next-epoch QC is not newer than the current next-epoch high QC.
    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    ) -> Result<()> {
        let epoch_number = self.cur_epoch;
        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
        // "In transition" means the high QC's block is inside the epoch transition but is not
        // the last block of the epoch. A high QC without a block number is never in transition.
        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
        }) {
            EpochTransitionIndicator::InTransition
        } else {
            EpochTransitionIndicator::NotInTransition
        };
        match event.as_ref() {
            HotShotEvent::UpgradeCertificateFormed(cert) => {
                tracing::debug!(
                    "Upgrade certificate received for view {}!",
                    *cert.view_number
                );
                // Only keep the certificate if its `decide_by` is at least three views past the
                // latest proposed view.
                if cert.data.decide_by >= self.latest_proposed_view + 3 {
                    tracing::debug!("Updating current formed_upgrade_certificate");

                    self.formed_upgrade_certificate = Some(cert.clone());
                }
            },
            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
                // Timeout certificate: try to propose in the view after the timed-out one.
                either::Right(timeout_cert) => {
                    let view_number = timeout_cert.view_number + 1;
                    self.create_dependency_task_if_new(
                        view_number,
                        epoch_number,
                        event_receiver,
                        event_sender,
                        Arc::clone(&event),
                        epoch_transition_indicator,
                    )
                    .await?;
                },
                either::Left(qc) => {
                    // This only logs; processing continues even when the QC is not newer than
                    // the current high QC.
                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
                        tracing::trace!(
                            "Received a QC for a view that was not > than our current high QC"
                        );
                    }

                    self.formed_quorum_certificates
                        .insert(qc.view_number(), qc.clone());

                    // NOTE(review): `handle_eqc_formed` is defined in `handlers`; presumably it
                    // reacts to this QC completing an extended QC — confirm there.
                    handle_eqc_formed(
                        qc.view_number(),
                        qc.data.leaf_commit,
                        qc.data.block_number,
                        self,
                        &event_sender,
                    )
                    .await;

                    let view_number = qc.view_number() + 1;
                    // The view change is not broadcast when the QC's block is the last block of
                    // an epoch.
                    if !qc
                        .data
                        .block_number
                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
                    {
                        broadcast_view_change(
                            &event_sender,
                            view_number,
                            qc.data.epoch,
                            self.first_epoch,
                        )
                        .await;
                    }
                    self.create_dependency_task_if_new(
                        view_number,
                        epoch_number,
                        event_receiver,
                        event_sender,
                        Arc::clone(&event),
                        epoch_transition_indicator,
                    )
                    .await?;
                },
            },

            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificateV2 { qc, state_cert }) => {
                // This only logs; processing continues even when the QC is not newer than the
                // current high QC.
                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
                    tracing::trace!(
                        "Received a QC for a view that was not > than our current high QC"
                    );
                }

                self.formed_quorum_certificates
                    .insert(qc.view_number(), qc.clone());
                self.formed_state_cert
                    .insert(state_cert.epoch, state_cert.clone());

                // Persist the QC and state certificate before announcing the view change; a
                // storage failure aborts handling of this event.
                self.storage
                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
                    .await
                    .wrap()
                    .context(error!(
                        "Failed to update the epoch root QC and state cert in storage!"
                    ))?;

                let view_number = qc.view_number() + 1;
                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
                    .await;
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::SendPayloadCommitmentAndMetadata(
                _payload_commitment,
                _builder_commitment,
                _metadata,
                view_number,
                _fee,
            ) => {
                let view_number = *view_number;

                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
                // Unlike the other arms, this one uses the certificate's own epoch (shadowing
                // the task's current epoch) for both validation and task creation.
                let epoch_number = certificate.data.epoch;
                let epoch_membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(epoch_number)
                    .await
                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;

                let membership_stake_table = epoch_membership.stake_table().await;
                let membership_success_threshold = epoch_membership.success_threshold().await;

                // Reject the event if the certificate does not validate against that epoch's
                // stake table and success threshold.
                certificate
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &self.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!(
                            "View Sync Finalize certificate {:?} was invalid: {}",
                            certificate.data(),
                            e
                        )
                    })?;

                // The certificate targets its own view, not the next one.
                let view_number = certificate.view_number;

                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    event,
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
                let view_number = proposal.data.view_number();
                // Failing to advance the latest proposed view is not an error here; it is only
                // traced.
                if !self.update_latest_proposed_view(view_number).await {
                    tracing::trace!("Failed to update latest proposed view");
                }

                self.create_dependency_task_if_new(
                    view_number + 1,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::QuorumProposalSend(proposal, _) => {
                let view = proposal.data.view_number();

                // Unlike the preliminarily validated case, a proposal send that does not advance
                // the latest proposed view is reported as an error.
                ensure!(
                    self.update_latest_proposed_view(view).await,
                    "Failed to update latest proposed view"
                );
            },
            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
                let view_number = vid_disperse.data.view_number();
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::ViewChange(view, epoch) => {
                // The stored epoch only ever moves forward.
                if epoch > &self.cur_epoch {
                    self.cur_epoch = *epoch;
                }
                // Keep tasks for `view - 1` and later; cancel everything older.
                let keep_view = ViewNumber::new(view.saturating_sub(1));
                self.cancel_tasks(keep_view);
            },
            HotShotEvent::Timeout(view, ..) => {
                // Keep tasks for `view - 1` and later; cancel everything older.
                let keep_view = ViewNumber::new(view.saturating_sub(1));
                self.cancel_tasks(keep_view);
            },
            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
                let current_next_epoch_qc =
                    self.consensus.read().await.next_epoch_high_qc().cloned();
                // Unlike the regular QC arm, a next-epoch QC that is not newer than the current
                // next-epoch high QC aborts handling with an error.
                ensure!(
                    current_next_epoch_qc.is_none()
                        || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
                    debug!(
                        "Received a next epoch QC for a view that was not > than our current next \
                         epoch high QC"
                    )
                );

                self.formed_next_epoch_quorum_certificates
                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());

                // NOTE(review): see the note on `handle_eqc_formed` in the `Qc2Formed` arm.
                handle_eqc_formed(
                    next_epoch_qc.view_number(),
                    next_epoch_qc.data.leaf_commit,
                    next_epoch_qc.data.block_number,
                    self,
                    &event_sender,
                )
                .await;

                let view_number = next_epoch_qc.view_number() + 1;
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::SetFirstEpoch(view, epoch) => {
                self.first_epoch = Some((*view, *epoch));
            },
            _ => {},
        }
        Ok(())
    }
741
742 pub fn cancel_tasks(&mut self, view: ViewNumber) {
744 let keep = self.proposal_dependencies.split_off(&view);
745 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
746 if !cancel_sender.is_closed() {
747 tracing::debug!("Aborting proposal dependency task for view {view}");
748 let _ = cancel_sender.try_broadcast(());
749 }
750 }
751 self.proposal_dependencies = keep;
752 }
753}
754
755#[async_trait]
756impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState
757 for QuorumProposalTaskState<TYPES, I>
758{
759 type Event = HotShotEvent<TYPES>;
760
761 async fn handle_event(
762 &mut self,
763 event: Arc<Self::Event>,
764 sender: &Sender<Arc<Self::Event>>,
765 receiver: &Receiver<Arc<Self::Event>>,
766 ) -> Result<()> {
767 self.handle(event, receiver.clone(), sender.clone()).await
768 }
769
770 fn cancel_subtasks(&mut self) {
771 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
772 if !cancel_sender.is_closed() {
773 tracing::debug!("Aborting proposal dependency task for view {view}");
774 let _ = cancel_sender.try_broadcast(());
775 }
776 }
777 }
778}