1use std::{
11 marker::PhantomData,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20 consensus::{CommitmentAndMetadata, OuterConsensus},
21 data::{
22 Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2, ViewNumber,
23 },
24 epoch_membership::EpochMembership,
25 message::Proposal,
26 simple_certificate::{
27 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28 UpgradeCertificate,
29 },
30 traits::{
31 BlockPayload,
32 block_contents::BlockHeader,
33 node_implementation::{NodeImplementation, NodeType},
34 signature_key::SignatureKey,
35 storage::Storage,
36 },
37 utils::{
38 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
39 is_transition_block, option_epoch_from_block_number,
40 },
41 vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44use tracing::instrument;
45use versions::EPOCH_VERSION;
46
47use crate::{
48 events::HotShotEvent,
49 helpers::{
50 broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
51 validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
52 },
53 quorum_proposal::{QuorumProposalTaskState, UpgradeLock},
54};
55
56#[derive(PartialEq, Debug)]
58pub(crate) enum ProposalDependency {
59 PayloadAndMetadata,
61
62 Qc,
64
65 ViewSyncCert,
67
68 TimeoutCert,
70
71 Proposal,
73
74 VidShare,
76}
77
78pub struct ProposalDependencyHandle<TYPES: NodeType> {
80 pub latest_proposed_view: ViewNumber,
82
83 pub view_number: ViewNumber,
85
86 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
88
89 pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
91
92 pub instance_state: Arc<TYPES::InstanceState>,
94
95 pub membership: EpochMembership<TYPES>,
97
98 pub public_key: TYPES::SignatureKey,
100
101 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
103
104 pub consensus: OuterConsensus<TYPES>,
106
107 pub timeout: u64,
109
110 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
117
118 pub upgrade_lock: UpgradeLock<TYPES>,
120
121 pub id: u64,
123
124 pub view_start_time: Instant,
126
127 pub epoch_height: u64,
129
130 pub cancel_receiver: Receiver<()>,
131}
132
133impl<TYPES: NodeType> ProposalDependencyHandle<TYPES> {
134 async fn wait_for_qc_event(
136 &self,
137 mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
138 ) -> Option<(
139 QuorumCertificate2<TYPES>,
140 Option<NextEpochQuorumCertificate2<TYPES>>,
141 Option<LightClientStateUpdateCertificateV2<TYPES>>,
142 )> {
143 while let Ok(event) = rx.recv_direct().await {
144 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
145 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
146 (qc, maybe_next_epoch_qc, None)
147 },
148 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
149 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
150 },
151 _ => continue,
152 };
153 if validate_qc_and_next_epoch_qc(
154 qc,
155 maybe_next_epoch_qc.as_ref(),
156 &self.consensus,
157 &self.membership.coordinator,
158 &self.upgrade_lock,
159 self.epoch_height,
160 )
161 .await
162 .is_ok()
163 {
164 if qc
165 .data
166 .block_number
167 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
168 {
169 if let Some(state_cert) = &maybe_state_cert {
171 if validate_light_client_state_update_certificate(
172 state_cert,
173 &self.membership.coordinator,
174 &self.upgrade_lock,
175 )
176 .await
177 .is_err()
178 || !check_qc_state_cert_correspondence(
179 qc,
180 state_cert,
181 self.epoch_height,
182 )
183 {
184 tracing::error!("Failed to validate state cert");
185 return None;
186 }
187 } else {
188 tracing::error!(
189 "Received an epoch root QC but we don't have the corresponding state \
190 cert."
191 );
192 return None;
193 }
194 } else {
195 maybe_state_cert = None;
196 }
197 return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
198 }
199 }
200 None
201 }
202
203 async fn wait_for_transition_qc(
204 &self,
205 ) -> Result<
206 Option<(
207 QuorumCertificate2<TYPES>,
208 NextEpochQuorumCertificate2<TYPES>,
209 )>,
210 > {
211 ensure!(
212 self.upgrade_lock.epochs_enabled(self.view_number),
213 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
214 );
215
216 let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
217
218 let wait_duration = Duration::from_millis(self.timeout / 2);
219
220 let mut rx = self.receiver.clone();
221
222 while let Ok(event) = rx.try_recv() {
225 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
226 if let Some(block_number) = qc.data.block_number {
227 if !is_transition_block(block_number, self.epoch_height) {
228 continue;
229 }
230 } else {
231 continue;
232 }
233 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
234 continue;
235 };
236 if validate_qc_and_next_epoch_qc(
237 qc,
238 Some(next_epoch_qc),
239 &self.consensus,
240 &self.membership.coordinator,
241 &self.upgrade_lock,
242 self.epoch_height,
243 )
244 .await
245 .is_ok()
246 && transition_qc
247 .as_ref()
248 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
249 {
250 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
251 }
252 }
253 }
254 while self.view_start_time.elapsed() < wait_duration {
256 let time_spent = Instant::now()
257 .checked_duration_since(self.view_start_time)
258 .ok_or(error!(
259 "Time elapsed since the start of the task is negative. This should never \
260 happen."
261 ))?;
262 let time_left = wait_duration
263 .checked_sub(time_spent)
264 .ok_or(info!("No time left"))?;
265 let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
266 return Ok(transition_qc);
267 };
268 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
269 if let Some(block_number) = qc.data.block_number {
270 if !is_transition_block(block_number, self.epoch_height) {
271 continue;
272 }
273 } else {
274 continue;
275 }
276 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
277 continue;
278 };
279 if validate_qc_and_next_epoch_qc(
280 qc,
281 Some(next_epoch_qc),
282 &self.consensus,
283 &self.membership.coordinator,
284 &self.upgrade_lock,
285 self.epoch_height,
286 )
287 .await
288 .is_ok()
289 && transition_qc
290 .as_ref()
291 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
292 {
293 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
294 }
295 }
296 }
297 Ok(transition_qc)
298 }
299 async fn wait_for_highest_qc(
303 &self,
304 ) -> Result<(
305 QuorumCertificate2<TYPES>,
306 Option<NextEpochQuorumCertificate2<TYPES>>,
307 Option<LightClientStateUpdateCertificateV2<TYPES>>,
308 )> {
309 tracing::debug!("waiting for QC");
310 ensure!(
312 self.upgrade_lock.epochs_enabled(self.view_number),
313 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
314 );
315
316 let consensus_reader = self.consensus.read().await;
317 let mut highest_qc = consensus_reader.high_qc().clone();
318 let mut state_cert = if highest_qc
319 .data
320 .block_number
321 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
322 {
323 consensus_reader.state_cert().cloned()
324 } else {
325 None
326 };
327 let mut next_epoch_qc = if highest_qc
328 .data
329 .block_number
330 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
331 {
332 let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
333 if maybe_neqc
334 .as_ref()
335 .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
336 {
337 maybe_neqc
338 } else {
339 None
340 }
341 } else {
342 None
343 };
344 drop(consensus_reader);
345
346 let wait_duration = Duration::from_millis(self.timeout / 2);
347
348 let mut rx = self.receiver.clone();
349
350 while let Ok(event) = rx.try_recv() {
352 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
353 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
354 (qc, maybe_next_epoch_qc, None)
355 },
356 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
357 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
358 },
359 _ => continue,
360 };
361 if validate_qc_and_next_epoch_qc(
362 qc,
363 maybe_next_epoch_qc.as_ref(),
364 &self.consensus,
365 &self.membership.coordinator,
366 &self.upgrade_lock,
367 self.epoch_height,
368 )
369 .await
370 .is_ok()
371 {
372 if qc
373 .data
374 .block_number
375 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
376 {
377 if let Some(state_cert) = &maybe_state_cert {
379 if validate_light_client_state_update_certificate(
380 state_cert,
381 &self.membership.coordinator,
382 &self.upgrade_lock,
383 )
384 .await
385 .is_err()
386 || !check_qc_state_cert_correspondence(
387 qc,
388 state_cert,
389 self.epoch_height,
390 )
391 {
392 tracing::error!("Failed to validate state cert");
393 continue;
394 }
395 } else {
396 tracing::error!(
397 "Received an epoch root QC but we don't have the corresponding state \
398 cert."
399 );
400 continue;
401 }
402 } else {
403 maybe_state_cert = None;
404 }
405 if qc.view_number() > highest_qc.view_number() {
406 highest_qc = qc.clone();
407 next_epoch_qc = maybe_next_epoch_qc.clone();
408 state_cert = maybe_state_cert;
409 }
410 }
411 }
412
413 while self.view_start_time.elapsed() < wait_duration {
415 let time_spent = Instant::now()
416 .checked_duration_since(self.view_start_time)
417 .ok_or(error!(
418 "Time elapsed since the start of the task is negative. This should never \
419 happen."
420 ))?;
421 let time_left = wait_duration
422 .checked_sub(time_spent)
423 .ok_or(info!("No time left"))?;
424 let Ok(maybe_qc_state_cert) =
425 tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
426 else {
427 tracing::info!(
428 "Some nodes did not respond with their HighQc in time. Continuing with the \
429 highest QC that we received: {highest_qc:?}"
430 );
431 return Ok((highest_qc, next_epoch_qc, state_cert));
432 };
433 let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
434 continue;
435 };
436 if qc.view_number() > highest_qc.view_number() {
437 highest_qc = qc;
438 next_epoch_qc = maybe_next_epoch_qc;
439 state_cert = maybe_state_cert;
440 }
441 }
442 Ok((highest_qc, next_epoch_qc, state_cert))
443 }
444 #[allow(clippy::too_many_arguments)]
448 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
449 async fn publish_proposal(
450 &self,
451 commitment_and_metadata: CommitmentAndMetadata<TYPES>,
452 _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
453 view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
454 formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
455 parent_qc: QuorumCertificate2<TYPES>,
456 maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
457 maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
458 ) -> Result<()> {
459 let (parent_leaf, state) = parent_leaf_and_state(
460 &self.sender,
461 &self.receiver,
462 self.membership.coordinator.clone(),
463 self.public_key.clone(),
464 self.private_key.clone(),
465 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
466 &self.upgrade_lock,
467 &parent_qc,
468 self.epoch_height,
469 )
470 .await?;
471
472 let mut upgrade_certificate = parent_leaf
487 .upgrade_certificate()
488 .or(formed_upgrade_certificate);
489
490 if let Some(cert) = upgrade_certificate.clone()
491 && cert.is_relevant(self.view_number).await.is_err()
492 {
493 upgrade_certificate = None;
494 }
495
496 let proposal_certificate = view_change_evidence
497 .as_ref()
498 .filter(|cert| cert.is_valid_for_view(&self.view_number))
499 .cloned();
500
501 ensure!(
502 commitment_and_metadata.block_view == self.view_number,
503 "Cannot propose because our VID payload commitment and metadata is for an older view."
504 );
505
506 let version = self.upgrade_lock.version(self.view_number)?;
507
508 let builder_commitment = commitment_and_metadata.builder_commitment.clone();
509 let metadata = commitment_and_metadata.metadata.clone();
510
511 if version >= EPOCH_VERSION
512 && parent_qc.view_number()
513 > self
514 .upgrade_lock
515 .upgrade_view()
516 .unwrap_or(ViewNumber::new(0))
517 {
518 let Some(parent_block_number) = parent_qc.data.block_number else {
519 tracing::error!("Parent QC does not have a block number. Do not propose.");
520 return Ok(());
521 };
522 if is_epoch_transition(parent_block_number, self.epoch_height)
523 && !is_last_block(parent_block_number, self.epoch_height)
524 {
525 let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
526 tracing::info!("Reached end of epoch.");
527 ensure!(
528 builder_commitment == empty_payload.builder_commitment(&metadata)
529 && metadata == empty_metadata,
530 "We're trying to propose non empty block in the epoch transition. Do not \
531 propose. View number: {}. Parent Block number: {}",
532 self.view_number,
533 parent_block_number,
534 );
535 }
536 if is_epoch_root(parent_block_number, self.epoch_height) {
537 ensure!(
538 maybe_state_cert.as_ref().is_some_and(|state_cert| {
539 check_qc_state_cert_correspondence(
540 &parent_qc,
541 state_cert,
542 self.epoch_height,
543 )
544 }),
545 "We are proposing with parent epoch root QC but we don't have the \
546 corresponding state cert."
547 );
548 }
549 }
550 let block_header = TYPES::BlockHeader::new(
551 state.as_ref(),
552 self.instance_state.as_ref(),
553 &parent_leaf,
554 commitment_and_metadata.commitment,
555 builder_commitment,
556 metadata,
557 commitment_and_metadata.fees.first().clone(),
558 version,
559 *self.view_number,
560 )
561 .await
562 .wrap()
563 .context(warn!("Failed to construct block header"))?;
564 let epoch = option_epoch_from_block_number(
565 version >= EPOCH_VERSION,
566 block_header.block_number(),
567 self.epoch_height,
568 );
569
570 let epoch_membership = self
571 .membership
572 .coordinator
573 .membership_for_epoch(epoch)
574 .await?;
575 if epoch_membership.leader(self.view_number).await? != self.public_key {
578 tracing::warn!(
579 "We are not the leader in the epoch for which we are about to propose. Do not \
580 send the quorum proposal."
581 );
582 return Ok(());
583 }
584 let is_high_qc_for_transition_block = parent_qc
585 .data
586 .block_number
587 .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
588 let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number)
589 && is_high_qc_for_transition_block
590 {
591 ensure!(
592 maybe_next_epoch_qc
593 .as_ref()
594 .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
595 "Justify QC on our proposal is for an epoch transition block but we don't have \
596 the corresponding next epoch QC. Do not propose."
597 );
598 maybe_next_epoch_qc
599 } else {
600 None
601 };
602 let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
603 {
604 if let Some(epoch_val) = &epoch {
605 let drb_result = epoch_membership
606 .next_epoch()
607 .await
608 .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
609 .get_epoch_drb()
610 .await
611 .clone()
612 .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
613
614 Some(drb_result)
615 } else {
616 None
617 }
618 } else {
619 None
620 };
621
622 let proposal = QuorumProposalWrapper {
623 proposal: QuorumProposal2 {
624 block_header,
625 view_number: self.view_number,
626 epoch,
627 justify_qc: parent_qc,
628 next_epoch_justify_qc: next_epoch_qc,
629 upgrade_certificate,
630 view_change_evidence: proposal_certificate,
631 next_drb_result,
632 state_cert: maybe_state_cert,
633 },
634 };
635
636 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
637 ensure!(
638 proposed_leaf.parent_commitment() == parent_leaf.commit(),
639 "Proposed leaf parent does not equal high qc"
640 );
641
642 let signature =
643 TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
644 .wrap()
645 .context(error!("Failed to compute proposed_leaf.commit()"))?;
646
647 let message = Proposal {
648 data: proposal,
649 signature,
650 _pd: PhantomData,
651 };
652 tracing::info!(
653 "Sending proposal for view {}, height {}, justify_qc view: {}",
654 proposed_leaf.view_number(),
655 proposed_leaf.height(),
656 proposed_leaf.justify_qc().view_number()
657 );
658
659 broadcast_event(
660 Arc::new(HotShotEvent::QuorumProposalSend(
661 message.clone(),
662 self.public_key.clone(),
663 )),
664 &self.sender,
665 )
666 .await;
667
668 Ok(())
669 }
670
671 fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
672 let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
673 tracing::warn!("Failed to propose, events: {:#?}", events);
674 }
675
676 async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
677 let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
678 let mut timeout_certificate = None;
679 let mut view_sync_finalize_cert = None;
680 let mut vid_share = None;
681 let mut parent_qc = None;
682 let mut next_epoch_qc = None;
683 let mut state_cert = None;
684 for event in res.iter().flatten().flatten() {
685 match event.as_ref() {
686 HotShotEvent::SendPayloadCommitmentAndMetadata(
687 payload_commitment,
688 builder_commitment,
689 metadata,
690 view,
691 fees,
692 ) => {
693 commit_and_metadata = Some(CommitmentAndMetadata {
694 commitment: *payload_commitment,
695 builder_commitment: builder_commitment.clone(),
696 metadata: metadata.clone(),
697 fees: fees.clone(),
698 block_view: *view,
699 });
700 },
701 HotShotEvent::Qc2Formed(cert) => match cert {
702 either::Right(timeout) => {
703 timeout_certificate = Some(timeout.clone());
704 },
705 either::Left(qc) => {
706 parent_qc = Some(qc.clone());
707 },
708 },
709 HotShotEvent::EpochRootQcFormed(root_qc) => {
710 parent_qc = Some(root_qc.qc.clone());
711 state_cert = Some(root_qc.state_cert.clone());
712 },
713 HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
714 view_sync_finalize_cert = Some(cert.clone());
715 },
716 HotShotEvent::VidDisperseSend(share, _) => {
717 vid_share = Some(share.clone());
718 },
719 HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
720 next_epoch_qc = Some(qc.clone());
721 },
722 _ => {},
723 }
724 }
725
726 let Ok(version) = self.upgrade_lock.version(self.view_number) else {
727 bail!(error!(
728 "Failed to get version for view {:?}, not proposing",
729 self.view_number
730 ));
731 };
732
733 let mut maybe_epoch = None;
734 let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
735 maybe_epoch = view_sync_cert.data.epoch;
736 Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
737 } else {
738 match timeout_certificate {
739 Some(timeout_cert) => {
740 maybe_epoch = timeout_cert.data.epoch;
741 Some(ViewChangeEvidence2::Timeout(timeout_cert))
742 },
743 None => None,
744 }
745 };
746
747 let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
748 if qc
749 .data
750 .block_number
751 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
752 && next_epoch_qc
753 .as_ref()
754 .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
755 {
756 bail!(error!(
757 "We've formed a transition QC but we haven't formed the corresponding next \
758 epoch QC. Do not propose."
759 ));
760 }
761 (qc, next_epoch_qc, state_cert)
762 } else if version < EPOCH_VERSION {
763 (self.consensus.read().await.high_qc().clone(), None, None)
764 } else if proposal_cert.is_some() {
765 if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
767 let Some(epoch) = maybe_epoch else {
768 bail!(error!(
769 "No epoch found on view change evidence, but we are in epoch mode"
770 ));
771 };
772 if qc
773 .data
774 .block_number
775 .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
776 {
777 (qc, Some(next_epoch_qc), None)
778 } else {
779 match self.wait_for_highest_qc().await {
780 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
781 (qc, maybe_next_epoch_qc, maybe_state_cert)
782 },
783 Err(e) => {
784 bail!(error!("Error while waiting for highest QC: {e:?}"));
785 },
786 }
787 }
788 } else {
789 let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
790 self.wait_for_highest_qc().await
791 else {
792 bail!(error!("Error while waiting for highest QC"));
793 };
794 if qc.data.block_number.is_some_and(|bn| {
795 is_epoch_transition(bn, self.epoch_height)
796 && !is_last_block(bn, self.epoch_height)
797 }) {
798 bail!(error!(
799 "High is in transition but we need to propose with transition QC, do \
800 nothing"
801 ));
802 }
803 (qc, maybe_next_epoch_qc, maybe_state_cert)
804 }
805 } else {
806 match self.wait_for_highest_qc().await {
807 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
808 (qc, maybe_next_epoch_qc, maybe_state_cert)
809 },
810 Err(e) => {
811 bail!(error!("Error while waiting for highest QC: {e:?}"));
812 },
813 }
814 };
815
816 ensure!(
817 commit_and_metadata.is_some(),
818 error!(
819 "Somehow completed the proposal dependency task without a commitment and metadata"
820 )
821 );
822
823 ensure!(
824 vid_share.is_some(),
825 error!("Somehow completed the proposal dependency task without a VID share")
826 );
827
828 self.publish_proposal(
829 commit_and_metadata.unwrap(),
830 vid_share.unwrap(),
831 proposal_cert,
832 self.formed_upgrade_certificate.clone(),
833 parent_qc,
834 maybe_next_epoch_qc,
835 maybe_state_cert,
836 )
837 .await
838 .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
839 }
840}
841
842impl<TYPES: NodeType> HandleDepOutput for ProposalDependencyHandle<TYPES> {
843 type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
844
845 #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
846 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
847 async fn handle_dep_result(self, res: Self::Output) {
848 let mut cancel_receiver = self.cancel_receiver.clone();
849 let result = tokio::select! {
850 result = self.handle_proposal_deps(&res) => {
851 result
852 }
853 _ = cancel_receiver.recv() => {
854 tracing::warn!("Proposal dependency task cancelled");
855 return;
856 }
857 };
858 if result.is_err() {
859 log!(result);
860 self.print_proposal_events(&res)
861 }
862 }
863}
864
865pub(super) async fn handle_eqc_formed<TYPES: NodeType, I: NodeImplementation<TYPES>>(
866 cert_view: ViewNumber,
867 leaf_commit: Commitment<Leaf2<TYPES>>,
868 block_number: Option<u64>,
869 task_state: &mut QuorumProposalTaskState<TYPES, I>,
870 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
871) {
872 if !task_state.upgrade_lock.epochs_enabled(cert_view) {
873 tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
874 return;
875 }
876 if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
877 tracing::debug!("We formed QC but not eQC. Do nothing");
878 return;
879 }
880
881 let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
882 tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
883 return;
884 };
885 if current_epoch_qc.view_number() != cert_view
886 || current_epoch_qc.data.leaf_commit != leaf_commit
887 {
888 tracing::debug!("We haven't yet formed the eQC. Do nothing");
889 return;
890 }
891 let Some(next_epoch_qc) = task_state
892 .formed_next_epoch_quorum_certificates
893 .get(&cert_view)
894 else {
895 tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
896 return;
897 };
898 if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
899 tracing::debug!(
900 "We formed the eQC but the current and next epoch QCs do not correspond to each other."
901 );
902 return;
903 }
904 let current_epoch_qc_clone = current_epoch_qc.clone();
905
906 let mut consensus_writer = task_state.consensus.write().await;
907 let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
908 let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
909 drop(consensus_writer);
910
911 if let Err(e) = task_state
912 .storage
913 .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
914 .await
915 {
916 tracing::error!("Failed to store EQC: {}", e);
917 }
918
919 task_state.formed_quorum_certificates =
920 task_state.formed_quorum_certificates.split_off(&cert_view);
921 task_state.formed_next_epoch_quorum_certificates = task_state
922 .formed_next_epoch_quorum_certificates
923 .split_off(&cert_view);
924
925 broadcast_event(
926 Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
927 event_sender,
928 )
929 .await;
930}