1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{broadcast, Receiver, Sender};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency, OrDependency},
14 dependency_task::DependencyTask,
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::OuterConsensus,
19 epoch_membership::EpochMembershipCoordinator,
20 message::UpgradeLock,
21 simple_certificate::{
22 EpochRootQuorumCertificate, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
23 QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::StakeTableEntries,
26 traits::{
27 node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
28 signature_key::SignatureKey,
29 storage::Storage,
30 },
31 utils::{is_epoch_transition, is_last_block, EpochTransitionIndicator},
32 vote::{Certificate, HasViewNumber},
33};
34use hotshot_utils::anytrace::*;
35use tracing::instrument;
36
37use self::handlers::{ProposalDependency, ProposalDependencyHandle};
38use crate::{
39 events::HotShotEvent, helpers::broadcast_view_change,
40 quorum_proposal::handlers::handle_eqc_formed,
41};
42
43mod handlers;
44
/// State of the quorum proposal task.
///
/// Holds the bookkeeping needed to decide when this node should propose: the per-view
/// dependency tasks that have been spawned, the certificates observed so far, and the
/// keys/handles that are copied into every spawned proposal dependency handle.
pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> {
    /// Latest view for which a proposal has been observed. Only ever moved forward (see
    /// `update_latest_proposed_view`); no dependency task is created for a view at or below it.
    pub latest_proposed_view: TYPES::View,

    /// Current epoch. Only replaced when a `ViewChange` event carries a greater epoch.
    pub cur_epoch: Option<TYPES::Epoch>,

    /// Cancel senders of the spawned proposal dependency tasks, keyed by the view the task
    /// would propose in. Broadcasting on a sender aborts the corresponding task.
    pub proposal_dependencies: BTreeMap<TYPES::View, Sender<()>>,

    /// Quorum certificates recorded from `Qc2Formed` and `EpochRootQcFormed` events, keyed by
    /// the certificate's own view.
    pub formed_quorum_certificates: BTreeMap<TYPES::View, QuorumCertificate2<TYPES>>,

    /// Next-epoch quorum certificates recorded from `NextEpochQc2Formed` events, keyed by the
    /// certificate's own view.
    pub formed_next_epoch_quorum_certificates:
        BTreeMap<TYPES::View, NextEpochQuorumCertificate2<TYPES>>,

    /// Shared instance state; a clone of the `Arc` is handed to every dependency handle.
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Coordinator used to look up epoch membership (leaders, stake tables, thresholds).
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; compared against the leader of a view to decide whether to
    /// spawn a dependency task.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; cloned into every dependency handle.
    /// NOTE(review): presumably used there to sign the proposal -- confirm in `handlers`.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Timeout value copied into every dependency handle.
    /// NOTE(review): units are not visible in this file -- confirm.
    pub timeout: u64,

    /// Storage backend; used in this file to persist the epoch root QC together with its
    /// state certificate.
    pub storage: I::Storage,

    /// Shared consensus state; read here for the high QC and the next-epoch high QC.
    pub consensus: OuterConsensus<TYPES>,

    /// Identifier of this node, used in log messages and tracing span fields.
    pub id: u64,

    /// Upgrade certificate taken from the most recent `UpgradeCertificateFormed` event whose
    /// `decide_by` is at least `latest_proposed_view + 3`; cloned into every dependency handle.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Upgrade lock; passed to certificate validation and cloned into every dependency handle.
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// Epoch height passed to `is_epoch_transition` / `is_last_block`.
    /// NOTE(review): presumably the number of blocks per epoch -- confirm.
    pub epoch_height: u64,

    /// Light client state update certificates recorded from `EpochRootQcFormed` events, keyed
    /// by the certificate's epoch.
    pub formed_state_cert: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificate<TYPES>>,

    /// View/epoch pair stored on `SetFirstEpoch` and forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
}
107
108impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
109 QuorumProposalTaskState<TYPES, I, V>
110{
111 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
113 fn create_event_dependency(
114 &self,
115 dependency_type: ProposalDependency,
116 view_number: TYPES::View,
117 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
118 cancel_receiver: Receiver<()>,
119 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
120 let id = self.id;
121 EventDependency::new(
122 event_receiver,
123 cancel_receiver,
124 format!(
125 "ProposalDependency::{:?} for view {:?}, my id {:?}",
126 dependency_type, view_number, self.id
127 ),
128 Box::new(move |event| {
129 let event = event.as_ref();
130 let event_view = match dependency_type {
131 ProposalDependency::Qc => {
132 if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
133 qc.view_number() + 1
134 } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
135 root_qc.view_number() + 1
136 } else {
137 return false;
138 }
139 },
140 ProposalDependency::TimeoutCert => {
141 if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
142 timeout.view_number() + 1
143 } else {
144 return false;
145 }
146 },
147 ProposalDependency::ViewSyncCert => {
148 if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
149 {
150 view_sync_cert.view_number()
151 } else {
152 return false;
153 }
154 },
155 ProposalDependency::Proposal => {
156 if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
157 {
158 proposal.data.view_number() + 1
159 } else {
160 return false;
161 }
162 },
163 ProposalDependency::PayloadAndMetadata => {
164 if let HotShotEvent::SendPayloadCommitmentAndMetadata(
165 _payload_commitment,
166 _builder_commitment,
167 _metadata,
168 view_number,
169 _fee,
170 ) = event
171 {
172 *view_number
173 } else {
174 return false;
175 }
176 },
177 ProposalDependency::VidShare => {
178 if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
179 vid_disperse.data.view_number()
180 } else {
181 return false;
182 }
183 },
184 };
185 let valid = event_view == view_number;
186 if valid {
187 tracing::debug!(
188 "Dependency {dependency_type:?} is complete for view {event_view:?}, my id is {id:?}!",
189 );
190 }
191 valid
192 }),
193 )
194 }
195
196 fn create_and_complete_dependencies(
198 &self,
199 view_number: TYPES::View,
200 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
201 event: Arc<HotShotEvent<TYPES>>,
202 cancel_receiver: &Receiver<()>,
203 ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
204 let mut proposal_dependency = self.create_event_dependency(
205 ProposalDependency::Proposal,
206 view_number,
207 event_receiver.clone(),
208 cancel_receiver.clone(),
209 );
210
211 let mut qc_dependency = self.create_event_dependency(
212 ProposalDependency::Qc,
213 view_number,
214 event_receiver.clone(),
215 cancel_receiver.clone(),
216 );
217
218 let mut view_sync_dependency = self.create_event_dependency(
219 ProposalDependency::ViewSyncCert,
220 view_number,
221 event_receiver.clone(),
222 cancel_receiver.clone(),
223 );
224
225 let mut timeout_dependency = self.create_event_dependency(
226 ProposalDependency::TimeoutCert,
227 view_number,
228 event_receiver.clone(),
229 cancel_receiver.clone(),
230 );
231
232 let mut payload_commitment_dependency = self.create_event_dependency(
233 ProposalDependency::PayloadAndMetadata,
234 view_number,
235 event_receiver.clone(),
236 cancel_receiver.clone(),
237 );
238
239 let mut vid_share_dependency = self.create_event_dependency(
240 ProposalDependency::VidShare,
241 view_number,
242 event_receiver.clone(),
243 cancel_receiver.clone(),
244 );
245
246 let epoch_height = self.epoch_height;
247
248 let mut next_epoch_qc_dependency = EventDependency::new(
251 event_receiver.clone(),
252 cancel_receiver.clone(),
253 format!(
254 "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
255 view_number, self.id
256 ),
257 Box::new(move |event| {
258 if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
259 event.as_ref()
260 {
261 return next_epoch_qc.view_number() + 1 == view_number;
262 }
263 if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
264 return true;
266 }
267 if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref() {
268 if qc.view_number() + 1 == view_number {
269 return qc
270 .data
271 .block_number
272 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
273 }
274 }
275 false
276 }),
277 );
278
279 match event.as_ref() {
280 HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
281 payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
282 },
283 HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
284 proposal_dependency.mark_as_completed(event);
285 },
286 HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
287 Either::Right(_) => timeout_dependency.mark_as_completed(event),
288 Either::Left(qc) => {
289 if qc
290 .data
291 .block_number
292 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
293 {
294 next_epoch_qc_dependency.mark_as_completed(event.clone());
295 }
296 qc_dependency.mark_as_completed(event);
297 },
298 },
299 HotShotEvent::EpochRootQcFormed(..) => {
300 next_epoch_qc_dependency.mark_as_completed(event.clone());
302 qc_dependency.mark_as_completed(event);
303 },
304 HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
305 view_sync_dependency.mark_as_completed(event);
306 },
307 HotShotEvent::VidDisperseSend(..) => {
308 vid_share_dependency.mark_as_completed(event);
309 },
310 HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
311 next_epoch_qc_dependency.mark_as_completed(event);
312 },
313 _ => {},
314 };
315
316 let mut secondary_deps = vec![
318 AndDependency::from_deps(vec![timeout_dependency]),
320 AndDependency::from_deps(vec![view_sync_dependency]),
322 ];
323 if *view_number > 1 {
325 secondary_deps.push(AndDependency::from_deps(vec![
326 qc_dependency,
327 proposal_dependency,
328 next_epoch_qc_dependency,
329 ]));
330 } else {
331 secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
332 }
333
334 let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
335
336 AndDependency::from_deps(vec![OrDependency::from_deps(vec![
337 AndDependency::from_deps(vec![
338 OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
339 OrDependency::from_deps(secondary_deps),
340 ]),
341 ])])
342 }
343
344 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
350 async fn create_dependency_task_if_new(
351 &mut self,
352 view_number: TYPES::View,
353 epoch_number: Option<TYPES::Epoch>,
354 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
355 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
356 event: Arc<HotShotEvent<TYPES>>,
357 epoch_transition_indicator: EpochTransitionIndicator,
358 ) -> Result<()> {
359 let epoch_membership = self
360 .membership_coordinator
361 .membership_for_epoch(epoch_number)
362 .await?;
363 let leader_in_current_epoch =
364 epoch_membership.leader(view_number).await? == self.public_key;
365 let leader_in_next_epoch = !leader_in_current_epoch
369 && epoch_number.is_some()
370 && matches!(
371 epoch_transition_indicator,
372 EpochTransitionIndicator::InTransition
373 )
374 && epoch_membership
375 .next_epoch()
376 .await
377 .context(warn!(
378 "Missing the randomized stake table for epoch {}",
379 epoch_number.unwrap() + 1
380 ))?
381 .leader(view_number)
382 .await?
383 == self.public_key;
384
385 ensure!(
387 leader_in_current_epoch || leader_in_next_epoch,
388 debug!("We are not the leader of the next view")
389 );
390
391 ensure!(
393 view_number > self.latest_proposed_view,
394 "We have already proposed for this view"
395 );
396
397 tracing::debug!(
398 "Attempting to make dependency task for view {view_number} and event {event:?}"
399 );
400
401 ensure!(
402 !self.proposal_dependencies.contains_key(&view_number),
403 "Task already exists"
404 );
405
406 let (cancel_sender, cancel_receiver) = broadcast(1);
407
408 let dependency_chain = self.create_and_complete_dependencies(
409 view_number,
410 &event_receiver,
411 event,
412 &cancel_receiver,
413 );
414
415 let dependency_task = DependencyTask::new(
416 dependency_chain,
417 ProposalDependencyHandle {
418 latest_proposed_view: self.latest_proposed_view,
419 view_number,
420 sender: event_sender,
421 receiver: event_receiver,
422 membership: epoch_membership,
423 public_key: self.public_key.clone(),
424 private_key: self.private_key.clone(),
425 instance_state: Arc::clone(&self.instance_state),
426 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
427 timeout: self.timeout,
428 formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
429 upgrade_lock: self.upgrade_lock.clone(),
430 id: self.id,
431 view_start_time: Instant::now(),
432 epoch_height: self.epoch_height,
433 },
434 );
435 self.proposal_dependencies
436 .insert(view_number, cancel_sender);
437
438 dependency_task.run();
439
440 Ok(())
441 }
442
443 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
445 async fn update_latest_proposed_view(&mut self, new_view: TYPES::View) -> bool {
446 if *self.latest_proposed_view < *new_view {
447 tracing::debug!(
448 "Updating latest proposed view from {} to {}",
449 *self.latest_proposed_view,
450 *new_view
451 );
452
453 for view in (*self.latest_proposed_view + 1)..=(*new_view) {
455 let maybe_cancel_sender =
456 self.proposal_dependencies.remove(&TYPES::View::new(view));
457 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
458 tracing::error!("Aborting proposal dependency task for view {view}");
459 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
460 }
461 }
462
463 self.latest_proposed_view = new_view;
464
465 return true;
466 }
467 false
468 }
469
    /// Handles a single event for the quorum proposal task.
    ///
    /// Depending on the event this records certificates, advances the latest proposed view or
    /// the current epoch, cancels obsolete dependency tasks, and -- for events that can
    /// contribute to a proposal -- tries to spawn a dependency task for the view the event
    /// applies to. Events not listed in the `match` are ignored.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_dependency_task_if_new` (which includes "not the
    /// leader" and "task already exists"), from storing the epoch root QC, from the stake table
    /// lookup or validation of a view-sync certificate, and from the `ensure!` checks on
    /// `QuorumProposalSend` and `NextEpochQc2Formed`.
    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
    pub async fn handle(
        &mut self,
        event: Arc<HotShotEvent<TYPES>>,
        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    ) -> Result<()> {
        // Snapshot of the epoch taken before the event is processed; an epoch change made while
        // handling this event does not affect this local.
        let epoch_number = self.cur_epoch;
        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
        // "In transition" means the high QC's block is inside the epoch transition but is not
        // the last block of the epoch. A high QC without a block number is never in transition.
        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
        }) {
            EpochTransitionIndicator::InTransition
        } else {
            EpochTransitionIndicator::NotInTransition
        };
        match event.as_ref() {
            HotShotEvent::UpgradeCertificateFormed(cert) => {
                tracing::debug!(
                    "Upgrade certificate received for view {}!",
                    *cert.view_number
                );
                // Only keep the certificate if its decide-by view is at least three views past
                // the latest proposed view; otherwise the previously stored one is retained.
                if cert.data.decide_by >= self.latest_proposed_view + 3 {
                    tracing::debug!("Updating current formed_upgrade_certificate");

                    self.formed_upgrade_certificate = Some(cert.clone());
                }
            },
            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
                // Timeout certificate: try to propose in the view right after the timed-out one.
                either::Right(timeout_cert) => {
                    let view_number = timeout_cert.view_number + 1;
                    self.create_dependency_task_if_new(
                        view_number,
                        epoch_number,
                        event_receiver,
                        event_sender,
                        Arc::clone(&event),
                        epoch_transition_indicator,
                    )
                    .await?;
                },
                either::Left(qc) => {
                    // A QC that is not newer than the high QC is only traced; processing
                    // continues regardless.
                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
                        tracing::trace!(
                            "Received a QC for a view that was not > than our current high QC"
                        );
                    }

                    self.formed_quorum_certificates
                        .insert(qc.view_number(), qc.clone());

                    // NOTE(review): `handle_eqc_formed` lives in `handlers`; what it does with
                    // the task state and the sender is not visible from this file.
                    handle_eqc_formed(
                        qc.view_number(),
                        qc.data.leaf_commit,
                        qc.data.block_number,
                        self,
                        &event_sender,
                    )
                    .await;

                    let view_number = qc.view_number() + 1;
                    // The view change is announced unless the QC's block is the last block of
                    // an epoch. NOTE(review): presumably that case is announced elsewhere --
                    // confirm.
                    if !qc
                        .data
                        .block_number
                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
                    {
                        broadcast_view_change(
                            &event_sender,
                            view_number,
                            qc.data.epoch,
                            self.first_epoch,
                        )
                        .await;
                    }
                    self.create_dependency_task_if_new(
                        view_number,
                        epoch_number,
                        event_receiver,
                        event_sender,
                        Arc::clone(&event),
                        epoch_transition_indicator,
                    )
                    .await?;
                },
            },

            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificate { qc, state_cert }) => {
                // As for a regular QC, a stale certificate is only traced.
                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
                    tracing::trace!(
                        "Received a QC for a view that was not > than our current high QC"
                    );
                }

                self.formed_quorum_certificates
                    .insert(qc.view_number(), qc.clone());
                self.formed_state_cert
                    .insert(state_cert.epoch, state_cert.clone());

                // Persist the QC and state certificate before announcing the view change; a
                // storage failure aborts handling of this event.
                self.storage
                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
                    .await
                    .wrap()
                    .context(error!(
                        "Failed to update the epoch root QC and state cert in storage!"
                    ))?;

                let view_number = qc.view_number() + 1;
                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
                    .await;
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::SendPayloadCommitmentAndMetadata(
                _payload_commitment,
                _builder_commitment,
                _metadata,
                view_number,
                _fee,
            ) => {
                // The payload commitment applies to the view it carries, not the following one.
                let view_number = *view_number;

                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
                // For this event the epoch comes from the certificate itself and shadows the
                // snapshot of `self.cur_epoch` taken above.
                let epoch_number = certificate.data.epoch;
                let epoch_membership = self
                    .membership_coordinator
                    .stake_table_for_epoch(epoch_number)
                    .await
                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;

                let membership_stake_table = epoch_membership.stake_table().await;
                let membership_success_threshold = epoch_membership.success_threshold().await;

                // Validate the certificate against that epoch's stake table and success
                // threshold; an invalid certificate aborts handling of this event.
                certificate
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &self.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!(
                            "View Sync Finalize certificate {:?} was invalid: {}",
                            certificate.data(),
                            e
                        )
                    })?;

                // A view-sync certificate applies to its own view.
                let view_number = certificate.view_number;

                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    event,
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
                let view_number = proposal.data.view_number();
                // Not being able to advance the view is only traced here; the task for the
                // following view is attempted either way.
                if !self.update_latest_proposed_view(view_number).await {
                    tracing::trace!("Failed to update latest proposed view");
                }

                self.create_dependency_task_if_new(
                    view_number + 1,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::QuorumProposalSend(proposal, _) => {
                let view = proposal.data.view_number();

                // Unlike the validated-proposal case above, a proposal for a view that is not
                // newer than the latest proposed view is reported as an error.
                ensure!(
                    self.update_latest_proposed_view(view).await,
                    "Failed to update latest proposed view"
                );
            },
            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
                let view_number = vid_disperse.data.view_number();
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::ViewChange(view, epoch) => {
                // The epoch only ever moves forward.
                if epoch > &self.cur_epoch {
                    self.cur_epoch = *epoch;
                }
                // Keep the tasks for the previous view and later; cancel everything older.
                let keep_view = TYPES::View::new(view.saturating_sub(1));
                self.cancel_tasks(keep_view);
            },
            HotShotEvent::Timeout(view, ..) => {
                // Same pruning as on a view change.
                let keep_view = TYPES::View::new(view.saturating_sub(1));
                self.cancel_tasks(keep_view);
            },
            // Only the `Left` variant is handled; any other `NextEpochQc2Formed` falls through
            // to the catch-all arm below.
            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
                // Only proceed if there is no next epoch high QC yet or this one is newer.
                let current_next_epoch_qc =
                    self.consensus.read().await.next_epoch_high_qc().cloned();
                ensure!(current_next_epoch_qc.is_none() ||
                    next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
                    debug!("Received a next epoch QC for a view that was not > than our current next epoch high QC")
                );

                self.formed_next_epoch_quorum_certificates
                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());

                // NOTE(review): see the note on `handle_eqc_formed` in the `Qc2Formed` arm.
                handle_eqc_formed(
                    next_epoch_qc.view_number(),
                    next_epoch_qc.data.leaf_commit,
                    next_epoch_qc.data.block_number,
                    self,
                    &event_sender,
                )
                .await;

                let view_number = next_epoch_qc.view_number() + 1;
                self.create_dependency_task_if_new(
                    view_number,
                    epoch_number,
                    event_receiver,
                    event_sender,
                    Arc::clone(&event),
                    epoch_transition_indicator,
                )
                .await?;
            },
            HotShotEvent::SetFirstEpoch(view, epoch) => {
                self.first_epoch = Some((*view, *epoch));
            },
            _ => {},
        }
        Ok(())
    }
737
738 pub fn cancel_tasks(&mut self, view: TYPES::View) {
740 let keep = self.proposal_dependencies.split_off(&view);
741 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
742 if !cancel_sender.is_closed() {
743 tracing::error!("Aborting proposal dependency task for view {view}");
744 let _ = cancel_sender.try_broadcast(());
745 }
746 }
747 self.proposal_dependencies = keep;
748 }
749}
750
751#[async_trait]
752impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
753 for QuorumProposalTaskState<TYPES, I, V>
754{
755 type Event = HotShotEvent<TYPES>;
756
757 async fn handle_event(
758 &mut self,
759 event: Arc<Self::Event>,
760 sender: &Sender<Arc<Self::Event>>,
761 receiver: &Receiver<Arc<Self::Event>>,
762 ) -> Result<()> {
763 self.handle(event, receiver.clone(), sender.clone()).await
764 }
765
766 fn cancel_subtasks(&mut self) {
767 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
768 if !cancel_sender.is_closed() {
769 tracing::error!("Aborting proposal dependency task for view {view}");
770 let _ = cancel_sender.try_broadcast(());
771 }
772 }
773 }
774}