1use std::{
11 marker::PhantomData,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20 consensus::{CommitmentAndMetadata, OuterConsensus},
21 data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22 epoch_membership::EpochMembership,
23 message::Proposal,
24 simple_certificate::{
25 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
26 UpgradeCertificate,
27 },
28 traits::{
29 block_contents::BlockHeader,
30 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31 signature_key::SignatureKey,
32 storage::Storage,
33 BlockPayload,
34 },
35 utils::{
36 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
37 is_transition_block, option_epoch_from_block_number,
38 },
39 vote::HasViewNumber,
40};
41use hotshot_utils::anytrace::*;
42use tracing::instrument;
43use vbs::version::StaticVersionType;
44
45use crate::{
46 events::HotShotEvent,
47 helpers::{
48 broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
49 validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
50 },
51 quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
52};
53
54#[derive(PartialEq, Debug)]
56pub(crate) enum ProposalDependency {
57 PayloadAndMetadata,
59
60 Qc,
62
63 ViewSyncCert,
65
66 TimeoutCert,
68
69 Proposal,
71
72 VidShare,
74}
75
76pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
78 pub latest_proposed_view: TYPES::View,
80
81 pub view_number: TYPES::View,
83
84 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87 pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
89
90 pub instance_state: Arc<TYPES::InstanceState>,
92
93 pub membership: EpochMembership<TYPES>,
95
96 pub public_key: TYPES::SignatureKey,
98
99 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
101
102 pub consensus: OuterConsensus<TYPES>,
104
105 pub timeout: u64,
107
108 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
115
116 pub upgrade_lock: UpgradeLock<TYPES, V>,
118
119 pub id: u64,
121
122 pub view_start_time: Instant,
124
125 pub epoch_height: u64,
127
128 pub cancel_receiver: Receiver<()>,
129}
130
131impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
132 async fn wait_for_qc_event(
134 &self,
135 mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
136 ) -> Option<(
137 QuorumCertificate2<TYPES>,
138 Option<NextEpochQuorumCertificate2<TYPES>>,
139 Option<LightClientStateUpdateCertificateV2<TYPES>>,
140 )> {
141 while let Ok(event) = rx.recv_direct().await {
142 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
143 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
144 (qc, maybe_next_epoch_qc, None)
145 },
146 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
147 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
148 },
149 _ => continue,
150 };
151 if validate_qc_and_next_epoch_qc(
152 qc,
153 maybe_next_epoch_qc.as_ref(),
154 &self.consensus,
155 &self.membership.coordinator,
156 &self.upgrade_lock,
157 self.epoch_height,
158 )
159 .await
160 .is_ok()
161 {
162 if qc
163 .data
164 .block_number
165 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
166 {
167 if let Some(state_cert) = &maybe_state_cert {
169 if validate_light_client_state_update_certificate(
170 state_cert,
171 &self.membership.coordinator,
172 &self.upgrade_lock,
173 )
174 .await
175 .is_err()
176 || !check_qc_state_cert_correspondence(
177 qc,
178 state_cert,
179 self.epoch_height,
180 )
181 {
182 tracing::error!("Failed to validate state cert");
183 return None;
184 }
185 } else {
186 tracing::error!(
187 "Received an epoch root QC but we don't have the corresponding state \
188 cert."
189 );
190 return None;
191 }
192 } else {
193 maybe_state_cert = None;
194 }
195 return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
196 }
197 }
198 None
199 }
200
201 async fn wait_for_transition_qc(
202 &self,
203 ) -> Result<
204 Option<(
205 QuorumCertificate2<TYPES>,
206 NextEpochQuorumCertificate2<TYPES>,
207 )>,
208 > {
209 ensure!(
210 self.upgrade_lock.epochs_enabled(self.view_number).await,
211 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
212 );
213
214 let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
215
216 let wait_duration = Duration::from_millis(self.timeout / 2);
217
218 let mut rx = self.receiver.clone();
219
220 while let Ok(event) = rx.try_recv() {
223 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
224 if let Some(block_number) = qc.data.block_number {
225 if !is_transition_block(block_number, self.epoch_height) {
226 continue;
227 }
228 } else {
229 continue;
230 }
231 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
232 continue;
233 };
234 if validate_qc_and_next_epoch_qc(
235 qc,
236 Some(next_epoch_qc),
237 &self.consensus,
238 &self.membership.coordinator,
239 &self.upgrade_lock,
240 self.epoch_height,
241 )
242 .await
243 .is_ok()
244 && transition_qc
245 .as_ref()
246 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
247 {
248 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
249 }
250 }
251 }
252 while self.view_start_time.elapsed() < wait_duration {
254 let time_spent = Instant::now()
255 .checked_duration_since(self.view_start_time)
256 .ok_or(error!(
257 "Time elapsed since the start of the task is negative. This should never \
258 happen."
259 ))?;
260 let time_left = wait_duration
261 .checked_sub(time_spent)
262 .ok_or(info!("No time left"))?;
263 let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
264 return Ok(transition_qc);
265 };
266 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
267 if let Some(block_number) = qc.data.block_number {
268 if !is_transition_block(block_number, self.epoch_height) {
269 continue;
270 }
271 } else {
272 continue;
273 }
274 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
275 continue;
276 };
277 if validate_qc_and_next_epoch_qc(
278 qc,
279 Some(next_epoch_qc),
280 &self.consensus,
281 &self.membership.coordinator,
282 &self.upgrade_lock,
283 self.epoch_height,
284 )
285 .await
286 .is_ok()
287 && transition_qc
288 .as_ref()
289 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
290 {
291 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
292 }
293 }
294 }
295 Ok(transition_qc)
296 }
297 async fn wait_for_highest_qc(
301 &self,
302 ) -> Result<(
303 QuorumCertificate2<TYPES>,
304 Option<NextEpochQuorumCertificate2<TYPES>>,
305 Option<LightClientStateUpdateCertificateV2<TYPES>>,
306 )> {
307 tracing::debug!("waiting for QC");
308 ensure!(
310 self.upgrade_lock.epochs_enabled(self.view_number).await,
311 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
312 );
313
314 let consensus_reader = self.consensus.read().await;
315 let mut highest_qc = consensus_reader.high_qc().clone();
316 let mut state_cert = if highest_qc
317 .data
318 .block_number
319 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
320 {
321 consensus_reader.state_cert().cloned()
322 } else {
323 None
324 };
325 let mut next_epoch_qc = if highest_qc
326 .data
327 .block_number
328 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
329 {
330 let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
331 if maybe_neqc
332 .as_ref()
333 .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
334 {
335 maybe_neqc
336 } else {
337 None
338 }
339 } else {
340 None
341 };
342 drop(consensus_reader);
343
344 let wait_duration = Duration::from_millis(self.timeout / 2);
345
346 let mut rx = self.receiver.clone();
347
348 while let Ok(event) = rx.try_recv() {
350 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
351 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
352 (qc, maybe_next_epoch_qc, None)
353 },
354 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
355 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
356 },
357 _ => continue,
358 };
359 if validate_qc_and_next_epoch_qc(
360 qc,
361 maybe_next_epoch_qc.as_ref(),
362 &self.consensus,
363 &self.membership.coordinator,
364 &self.upgrade_lock,
365 self.epoch_height,
366 )
367 .await
368 .is_ok()
369 {
370 if qc
371 .data
372 .block_number
373 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
374 {
375 if let Some(state_cert) = &maybe_state_cert {
377 if validate_light_client_state_update_certificate(
378 state_cert,
379 &self.membership.coordinator,
380 &self.upgrade_lock,
381 )
382 .await
383 .is_err()
384 || !check_qc_state_cert_correspondence(
385 qc,
386 state_cert,
387 self.epoch_height,
388 )
389 {
390 tracing::error!("Failed to validate state cert");
391 continue;
392 }
393 } else {
394 tracing::error!(
395 "Received an epoch root QC but we don't have the corresponding state \
396 cert."
397 );
398 continue;
399 }
400 } else {
401 maybe_state_cert = None;
402 }
403 if qc.view_number() > highest_qc.view_number() {
404 highest_qc = qc.clone();
405 next_epoch_qc = maybe_next_epoch_qc.clone();
406 state_cert = maybe_state_cert;
407 }
408 }
409 }
410
411 while self.view_start_time.elapsed() < wait_duration {
413 let time_spent = Instant::now()
414 .checked_duration_since(self.view_start_time)
415 .ok_or(error!(
416 "Time elapsed since the start of the task is negative. This should never \
417 happen."
418 ))?;
419 let time_left = wait_duration
420 .checked_sub(time_spent)
421 .ok_or(info!("No time left"))?;
422 let Ok(maybe_qc_state_cert) =
423 tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
424 else {
425 tracing::info!(
426 "Some nodes did not respond with their HighQc in time. Continuing with the \
427 highest QC that we received: {highest_qc:?}"
428 );
429 return Ok((highest_qc, next_epoch_qc, state_cert));
430 };
431 let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
432 continue;
433 };
434 if qc.view_number() > highest_qc.view_number() {
435 highest_qc = qc;
436 next_epoch_qc = maybe_next_epoch_qc;
437 state_cert = maybe_state_cert;
438 }
439 }
440 Ok((highest_qc, next_epoch_qc, state_cert))
441 }
442 #[allow(clippy::too_many_arguments)]
446 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
447 async fn publish_proposal(
448 &self,
449 commitment_and_metadata: CommitmentAndMetadata<TYPES>,
450 _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
451 view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
452 formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
453 parent_qc: QuorumCertificate2<TYPES>,
454 maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
455 maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
456 ) -> Result<()> {
457 let (parent_leaf, state) = parent_leaf_and_state(
458 &self.sender,
459 &self.receiver,
460 self.membership.coordinator.clone(),
461 self.public_key.clone(),
462 self.private_key.clone(),
463 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
464 &self.upgrade_lock,
465 &parent_qc,
466 self.epoch_height,
467 )
468 .await?;
469
470 let mut upgrade_certificate = parent_leaf
485 .upgrade_certificate()
486 .or(formed_upgrade_certificate);
487
488 if let Some(cert) = upgrade_certificate.clone() {
489 if cert.is_relevant(self.view_number).await.is_err() {
490 upgrade_certificate = None;
491 }
492 }
493
494 let proposal_certificate = view_change_evidence
495 .as_ref()
496 .filter(|cert| cert.is_valid_for_view(&self.view_number))
497 .cloned();
498
499 ensure!(
500 commitment_and_metadata.block_view == self.view_number,
501 "Cannot propose because our VID payload commitment and metadata is for an older view."
502 );
503
504 let version = self.upgrade_lock.version(self.view_number).await?;
505
506 let builder_commitment = commitment_and_metadata.builder_commitment.clone();
507 let metadata = commitment_and_metadata.metadata.clone();
508
509 if version >= V::Epochs::VERSION
510 && parent_qc.view_number()
511 > self
512 .upgrade_lock
513 .upgrade_view()
514 .await
515 .unwrap_or(TYPES::View::new(0))
516 {
517 let Some(parent_block_number) = parent_qc.data.block_number else {
518 tracing::error!("Parent QC does not have a block number. Do not propose.");
519 return Ok(());
520 };
521 if is_epoch_transition(parent_block_number, self.epoch_height)
522 && !is_last_block(parent_block_number, self.epoch_height)
523 {
524 let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
525 tracing::info!("Reached end of epoch.");
526 ensure!(
527 builder_commitment == empty_payload.builder_commitment(&metadata)
528 && metadata == empty_metadata,
529 "We're trying to propose non empty block in the epoch transition. Do not \
530 propose. View number: {}. Parent Block number: {}",
531 self.view_number,
532 parent_block_number,
533 );
534 }
535 if is_epoch_root(parent_block_number, self.epoch_height) {
536 ensure!(
537 maybe_state_cert.as_ref().is_some_and(|state_cert| {
538 check_qc_state_cert_correspondence(
539 &parent_qc,
540 state_cert,
541 self.epoch_height,
542 )
543 }),
544 "We are proposing with parent epoch root QC but we don't have the \
545 corresponding state cert."
546 );
547 }
548 }
549 let block_header = TYPES::BlockHeader::new(
550 state.as_ref(),
551 self.instance_state.as_ref(),
552 &parent_leaf,
553 commitment_and_metadata.commitment,
554 builder_commitment,
555 metadata,
556 commitment_and_metadata.fees.first().clone(),
557 version,
558 *self.view_number,
559 )
560 .await
561 .wrap()
562 .context(warn!("Failed to construct block header"))?;
563 let epoch = option_epoch_from_block_number::<TYPES>(
564 version >= V::Epochs::VERSION,
565 block_header.block_number(),
566 self.epoch_height,
567 );
568
569 let epoch_membership = self
570 .membership
571 .coordinator
572 .membership_for_epoch(epoch)
573 .await?;
574 if epoch_membership.leader(self.view_number).await? != self.public_key {
577 tracing::warn!(
578 "We are not the leader in the epoch for which we are about to propose. Do not \
579 send the quorum proposal."
580 );
581 return Ok(());
582 }
583 let is_high_qc_for_transition_block = parent_qc
584 .data
585 .block_number
586 .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
587 let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
588 && is_high_qc_for_transition_block
589 {
590 ensure!(
591 maybe_next_epoch_qc
592 .as_ref()
593 .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
594 "Justify QC on our proposal is for an epoch transition block but we don't have \
595 the corresponding next epoch QC. Do not propose."
596 );
597 maybe_next_epoch_qc
598 } else {
599 None
600 };
601 let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
602 {
603 if let Some(epoch_val) = &epoch {
604 let drb_result = epoch_membership
605 .next_epoch()
606 .await
607 .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
608 .get_epoch_drb()
609 .await
610 .clone()
611 .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
612
613 Some(drb_result)
614 } else {
615 None
616 }
617 } else {
618 None
619 };
620
621 let proposal = QuorumProposalWrapper {
622 proposal: QuorumProposal2 {
623 block_header,
624 view_number: self.view_number,
625 epoch,
626 justify_qc: parent_qc,
627 next_epoch_justify_qc: next_epoch_qc,
628 upgrade_certificate,
629 view_change_evidence: proposal_certificate,
630 next_drb_result,
631 state_cert: maybe_state_cert,
632 },
633 };
634
635 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
636 ensure!(
637 proposed_leaf.parent_commitment() == parent_leaf.commit(),
638 "Proposed leaf parent does not equal high qc"
639 );
640
641 let signature =
642 TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
643 .wrap()
644 .context(error!("Failed to compute proposed_leaf.commit()"))?;
645
646 let message = Proposal {
647 data: proposal,
648 signature,
649 _pd: PhantomData,
650 };
651 tracing::info!(
652 "Sending proposal for view {}, height {}, justify_qc view: {}",
653 proposed_leaf.view_number(),
654 proposed_leaf.height(),
655 proposed_leaf.justify_qc().view_number()
656 );
657
658 broadcast_event(
659 Arc::new(HotShotEvent::QuorumProposalSend(
660 message.clone(),
661 self.public_key.clone(),
662 )),
663 &self.sender,
664 )
665 .await;
666
667 Ok(())
668 }
669
670 fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
671 let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
672 tracing::warn!("Failed to propose, events: {:#?}", events);
673 }
674
675 async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
676 let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
677 let mut timeout_certificate = None;
678 let mut view_sync_finalize_cert = None;
679 let mut vid_share = None;
680 let mut parent_qc = None;
681 let mut next_epoch_qc = None;
682 let mut state_cert = None;
683 for event in res.iter().flatten().flatten() {
684 match event.as_ref() {
685 HotShotEvent::SendPayloadCommitmentAndMetadata(
686 payload_commitment,
687 builder_commitment,
688 metadata,
689 view,
690 fees,
691 ) => {
692 commit_and_metadata = Some(CommitmentAndMetadata {
693 commitment: *payload_commitment,
694 builder_commitment: builder_commitment.clone(),
695 metadata: metadata.clone(),
696 fees: fees.clone(),
697 block_view: *view,
698 });
699 },
700 HotShotEvent::Qc2Formed(cert) => match cert {
701 either::Right(timeout) => {
702 timeout_certificate = Some(timeout.clone());
703 },
704 either::Left(qc) => {
705 parent_qc = Some(qc.clone());
706 },
707 },
708 HotShotEvent::EpochRootQcFormed(root_qc) => {
709 parent_qc = Some(root_qc.qc.clone());
710 state_cert = Some(root_qc.state_cert.clone());
711 },
712 HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
713 view_sync_finalize_cert = Some(cert.clone());
714 },
715 HotShotEvent::VidDisperseSend(share, _) => {
716 vid_share = Some(share.clone());
717 },
718 HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
719 next_epoch_qc = Some(qc.clone());
720 },
721 _ => {},
722 }
723 }
724
725 let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
726 bail!(error!(
727 "Failed to get version for view {:?}, not proposing",
728 self.view_number
729 ));
730 };
731
732 let mut maybe_epoch = None;
733 let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
734 maybe_epoch = view_sync_cert.data.epoch;
735 Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
736 } else {
737 match timeout_certificate {
738 Some(timeout_cert) => {
739 maybe_epoch = timeout_cert.data.epoch;
740 Some(ViewChangeEvidence2::Timeout(timeout_cert))
741 },
742 None => None,
743 }
744 };
745
746 let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
747 if qc
748 .data
749 .block_number
750 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
751 && next_epoch_qc
752 .as_ref()
753 .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
754 {
755 bail!(error!(
756 "We've formed a transition QC but we haven't formed the corresponding next \
757 epoch QC. Do not propose."
758 ));
759 }
760 (qc, next_epoch_qc, state_cert)
761 } else if version < V::Epochs::VERSION {
762 (self.consensus.read().await.high_qc().clone(), None, None)
763 } else if proposal_cert.is_some() {
764 if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
766 let Some(epoch) = maybe_epoch else {
767 bail!(error!(
768 "No epoch found on view change evidence, but we are in epoch mode"
769 ));
770 };
771 if qc
772 .data
773 .block_number
774 .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
775 {
776 (qc, Some(next_epoch_qc), None)
777 } else {
778 match self.wait_for_highest_qc().await {
779 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
780 (qc, maybe_next_epoch_qc, maybe_state_cert)
781 },
782 Err(e) => {
783 bail!(error!("Error while waiting for highest QC: {e:?}"));
784 },
785 }
786 }
787 } else {
788 let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
789 self.wait_for_highest_qc().await
790 else {
791 bail!(error!("Error while waiting for highest QC"));
792 };
793 if qc.data.block_number.is_some_and(|bn| {
794 is_epoch_transition(bn, self.epoch_height)
795 && !is_last_block(bn, self.epoch_height)
796 }) {
797 bail!(error!(
798 "High is in transition but we need to propose with transition QC, do \
799 nothing"
800 ));
801 }
802 (qc, maybe_next_epoch_qc, maybe_state_cert)
803 }
804 } else {
805 match self.wait_for_highest_qc().await {
806 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
807 (qc, maybe_next_epoch_qc, maybe_state_cert)
808 },
809 Err(e) => {
810 bail!(error!("Error while waiting for highest QC: {e:?}"));
811 },
812 }
813 };
814
815 ensure!(
816 commit_and_metadata.is_some(),
817 error!(
818 "Somehow completed the proposal dependency task without a commitment and metadata"
819 )
820 );
821
822 ensure!(
823 vid_share.is_some(),
824 error!("Somehow completed the proposal dependency task without a VID share")
825 );
826
827 self.publish_proposal(
828 commit_and_metadata.unwrap(),
829 vid_share.unwrap(),
830 proposal_cert,
831 self.formed_upgrade_certificate.clone(),
832 parent_qc,
833 maybe_next_epoch_qc,
834 maybe_state_cert,
835 )
836 .await
837 .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
838 }
839}
840
841impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
842 type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
843
844 #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
845 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
846 async fn handle_dep_result(self, res: Self::Output) {
847 let mut cancel_receiver = self.cancel_receiver.clone();
848 let result = tokio::select! {
849 result = self.handle_proposal_deps(&res) => {
850 result
851 }
852 _ = cancel_receiver.recv() => {
853 tracing::warn!("Proposal dependency task cancelled");
854 return;
855 }
856 };
857 if result.is_err() {
858 log!(result);
859 self.print_proposal_events(&res)
860 }
861 }
862}
863
864pub(super) async fn handle_eqc_formed<
865 TYPES: NodeType,
866 I: NodeImplementation<TYPES>,
867 V: Versions,
868>(
869 cert_view: TYPES::View,
870 leaf_commit: Commitment<Leaf2<TYPES>>,
871 block_number: Option<u64>,
872 task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
873 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
874) {
875 if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
876 tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
877 return;
878 }
879 if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
880 tracing::debug!("We formed QC but not eQC. Do nothing");
881 return;
882 }
883
884 let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
885 tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
886 return;
887 };
888 if current_epoch_qc.view_number() != cert_view
889 || current_epoch_qc.data.leaf_commit != leaf_commit
890 {
891 tracing::debug!("We haven't yet formed the eQC. Do nothing");
892 return;
893 }
894 let Some(next_epoch_qc) = task_state
895 .formed_next_epoch_quorum_certificates
896 .get(&cert_view)
897 else {
898 tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
899 return;
900 };
901 if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
902 tracing::debug!(
903 "We formed the eQC but the current and next epoch QCs do not correspond to each other."
904 );
905 return;
906 }
907 let current_epoch_qc_clone = current_epoch_qc.clone();
908
909 let mut consensus_writer = task_state.consensus.write().await;
910 let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
911 let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
912 drop(consensus_writer);
913
914 if let Err(e) = task_state
915 .storage
916 .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
917 .await
918 {
919 tracing::error!("Failed to store EQC: {}", e);
920 }
921
922 task_state.formed_quorum_certificates =
923 task_state.formed_quorum_certificates.split_off(&cert_view);
924 task_state.formed_next_epoch_quorum_certificates = task_state
925 .formed_next_epoch_quorum_certificates
926 .split_off(&cert_view);
927
928 broadcast_event(
929 Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
930 event_sender,
931 )
932 .await;
933}