1use std::{
11 marker::PhantomData,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20 consensus::{CommitmentAndMetadata, OuterConsensus},
21 data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22 epoch_membership::EpochMembership,
23 message::Proposal,
24 simple_certificate::{
25 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
26 UpgradeCertificate,
27 },
28 traits::{
29 block_contents::BlockHeader,
30 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31 signature_key::SignatureKey,
32 storage::Storage,
33 BlockPayload,
34 },
35 utils::{
36 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
37 is_transition_block, option_epoch_from_block_number,
38 },
39 vote::HasViewNumber,
40};
41use hotshot_utils::anytrace::*;
42use tracing::instrument;
43use vbs::version::StaticVersionType;
44
45use crate::{
46 events::HotShotEvent,
47 helpers::{
48 broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
49 validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
50 },
51 quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
52};
53
54#[derive(PartialEq, Debug)]
56pub(crate) enum ProposalDependency {
57 PayloadAndMetadata,
59
60 Qc,
62
63 ViewSyncCert,
65
66 TimeoutCert,
68
69 Proposal,
71
72 VidShare,
74}
75
76pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
78 pub latest_proposed_view: TYPES::View,
80
81 pub view_number: TYPES::View,
83
84 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87 pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
89
90 pub instance_state: Arc<TYPES::InstanceState>,
92
93 pub membership: EpochMembership<TYPES>,
95
96 pub public_key: TYPES::SignatureKey,
98
99 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
101
102 pub consensus: OuterConsensus<TYPES>,
104
105 pub timeout: u64,
107
108 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
115
116 pub upgrade_lock: UpgradeLock<TYPES, V>,
118
119 pub id: u64,
121
122 pub view_start_time: Instant,
124
125 pub epoch_height: u64,
127}
128
129impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
130 async fn wait_for_qc_event(
132 &self,
133 mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
134 ) -> Option<(
135 QuorumCertificate2<TYPES>,
136 Option<NextEpochQuorumCertificate2<TYPES>>,
137 Option<LightClientStateUpdateCertificateV2<TYPES>>,
138 )> {
139 while let Ok(event) = rx.recv_direct().await {
140 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
141 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
142 (qc, maybe_next_epoch_qc, None)
143 },
144 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
145 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
146 },
147 _ => continue,
148 };
149 if validate_qc_and_next_epoch_qc(
150 qc,
151 maybe_next_epoch_qc.as_ref(),
152 &self.consensus,
153 &self.membership.coordinator,
154 &self.upgrade_lock,
155 self.epoch_height,
156 )
157 .await
158 .is_ok()
159 {
160 if qc
161 .data
162 .block_number
163 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
164 {
165 if let Some(state_cert) = &maybe_state_cert {
167 if validate_light_client_state_update_certificate(
168 state_cert,
169 &self.membership.coordinator,
170 &self.upgrade_lock,
171 )
172 .await
173 .is_err()
174 || !check_qc_state_cert_correspondence(
175 qc,
176 state_cert,
177 self.epoch_height,
178 )
179 {
180 tracing::error!("Failed to validate state cert");
181 return None;
182 }
183 } else {
184 tracing::error!(
185 "Received an epoch root QC but we don't have the corresponding state \
186 cert."
187 );
188 return None;
189 }
190 } else {
191 maybe_state_cert = None;
192 }
193 return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
194 }
195 }
196 None
197 }
198
199 async fn wait_for_transition_qc(
200 &self,
201 ) -> Result<
202 Option<(
203 QuorumCertificate2<TYPES>,
204 NextEpochQuorumCertificate2<TYPES>,
205 )>,
206 > {
207 ensure!(
208 self.upgrade_lock.epochs_enabled(self.view_number).await,
209 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
210 );
211
212 let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
213
214 let wait_duration = Duration::from_millis(self.timeout / 2);
215
216 let mut rx = self.receiver.clone();
217
218 while let Ok(event) = rx.try_recv() {
221 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
222 if let Some(block_number) = qc.data.block_number {
223 if !is_transition_block(block_number, self.epoch_height) {
224 continue;
225 }
226 } else {
227 continue;
228 }
229 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
230 continue;
231 };
232 if validate_qc_and_next_epoch_qc(
233 qc,
234 Some(next_epoch_qc),
235 &self.consensus,
236 &self.membership.coordinator,
237 &self.upgrade_lock,
238 self.epoch_height,
239 )
240 .await
241 .is_ok()
242 && transition_qc
243 .as_ref()
244 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
245 {
246 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
247 }
248 }
249 }
250 while self.view_start_time.elapsed() < wait_duration {
252 let time_spent = Instant::now()
253 .checked_duration_since(self.view_start_time)
254 .ok_or(error!(
255 "Time elapsed since the start of the task is negative. This should never \
256 happen."
257 ))?;
258 let time_left = wait_duration
259 .checked_sub(time_spent)
260 .ok_or(info!("No time left"))?;
261 let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
262 return Ok(transition_qc);
263 };
264 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
265 if let Some(block_number) = qc.data.block_number {
266 if !is_transition_block(block_number, self.epoch_height) {
267 continue;
268 }
269 } else {
270 continue;
271 }
272 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
273 continue;
274 };
275 if validate_qc_and_next_epoch_qc(
276 qc,
277 Some(next_epoch_qc),
278 &self.consensus,
279 &self.membership.coordinator,
280 &self.upgrade_lock,
281 self.epoch_height,
282 )
283 .await
284 .is_ok()
285 && transition_qc
286 .as_ref()
287 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
288 {
289 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
290 }
291 }
292 }
293 Ok(transition_qc)
294 }
295 async fn wait_for_highest_qc(
299 &self,
300 ) -> Result<(
301 QuorumCertificate2<TYPES>,
302 Option<NextEpochQuorumCertificate2<TYPES>>,
303 Option<LightClientStateUpdateCertificateV2<TYPES>>,
304 )> {
305 tracing::debug!("waiting for QC");
306 ensure!(
308 self.upgrade_lock.epochs_enabled(self.view_number).await,
309 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
310 );
311
312 let consensus_reader = self.consensus.read().await;
313 let mut highest_qc = consensus_reader.high_qc().clone();
314 let mut state_cert = if highest_qc
315 .data
316 .block_number
317 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
318 {
319 consensus_reader.state_cert().cloned()
320 } else {
321 None
322 };
323 let mut next_epoch_qc = if highest_qc
324 .data
325 .block_number
326 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
327 {
328 let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
329 if maybe_neqc
330 .as_ref()
331 .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
332 {
333 maybe_neqc
334 } else {
335 None
336 }
337 } else {
338 None
339 };
340 drop(consensus_reader);
341
342 let wait_duration = Duration::from_millis(self.timeout / 2);
343
344 let mut rx = self.receiver.clone();
345
346 while let Ok(event) = rx.try_recv() {
348 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
349 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
350 (qc, maybe_next_epoch_qc, None)
351 },
352 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
353 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
354 },
355 _ => continue,
356 };
357 if validate_qc_and_next_epoch_qc(
358 qc,
359 maybe_next_epoch_qc.as_ref(),
360 &self.consensus,
361 &self.membership.coordinator,
362 &self.upgrade_lock,
363 self.epoch_height,
364 )
365 .await
366 .is_ok()
367 {
368 if qc
369 .data
370 .block_number
371 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
372 {
373 if let Some(state_cert) = &maybe_state_cert {
375 if validate_light_client_state_update_certificate(
376 state_cert,
377 &self.membership.coordinator,
378 &self.upgrade_lock,
379 )
380 .await
381 .is_err()
382 || !check_qc_state_cert_correspondence(
383 qc,
384 state_cert,
385 self.epoch_height,
386 )
387 {
388 tracing::error!("Failed to validate state cert");
389 continue;
390 }
391 } else {
392 tracing::error!(
393 "Received an epoch root QC but we don't have the corresponding state \
394 cert."
395 );
396 continue;
397 }
398 } else {
399 maybe_state_cert = None;
400 }
401 if qc.view_number() > highest_qc.view_number() {
402 highest_qc = qc.clone();
403 next_epoch_qc = maybe_next_epoch_qc.clone();
404 state_cert = maybe_state_cert;
405 }
406 }
407 }
408
409 while self.view_start_time.elapsed() < wait_duration {
411 let time_spent = Instant::now()
412 .checked_duration_since(self.view_start_time)
413 .ok_or(error!(
414 "Time elapsed since the start of the task is negative. This should never \
415 happen."
416 ))?;
417 let time_left = wait_duration
418 .checked_sub(time_spent)
419 .ok_or(info!("No time left"))?;
420 let Ok(maybe_qc_state_cert) =
421 tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
422 else {
423 tracing::info!(
424 "Some nodes did not respond with their HighQc in time. Continuing with the \
425 highest QC that we received: {highest_qc:?}"
426 );
427 return Ok((highest_qc, next_epoch_qc, state_cert));
428 };
429 let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
430 continue;
431 };
432 if qc.view_number() > highest_qc.view_number() {
433 highest_qc = qc;
434 next_epoch_qc = maybe_next_epoch_qc;
435 state_cert = maybe_state_cert;
436 }
437 }
438 Ok((highest_qc, next_epoch_qc, state_cert))
439 }
440 #[allow(clippy::too_many_arguments)]
444 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
445 async fn publish_proposal(
446 &self,
447 commitment_and_metadata: CommitmentAndMetadata<TYPES>,
448 _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
449 view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
450 formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
451 parent_qc: QuorumCertificate2<TYPES>,
452 maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
453 maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
454 ) -> Result<()> {
455 let (parent_leaf, state) = parent_leaf_and_state(
456 &self.sender,
457 &self.receiver,
458 self.membership.coordinator.clone(),
459 self.public_key.clone(),
460 self.private_key.clone(),
461 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
462 &self.upgrade_lock,
463 &parent_qc,
464 self.epoch_height,
465 )
466 .await?;
467
468 let mut upgrade_certificate = parent_leaf
483 .upgrade_certificate()
484 .or(formed_upgrade_certificate);
485
486 if let Some(cert) = upgrade_certificate.clone() {
487 if cert.is_relevant(self.view_number).await.is_err() {
488 upgrade_certificate = None;
489 }
490 }
491
492 let proposal_certificate = view_change_evidence
493 .as_ref()
494 .filter(|cert| cert.is_valid_for_view(&self.view_number))
495 .cloned();
496
497 ensure!(
498 commitment_and_metadata.block_view == self.view_number,
499 "Cannot propose because our VID payload commitment and metadata is for an older view."
500 );
501
502 let version = self.upgrade_lock.version(self.view_number).await?;
503
504 let builder_commitment = commitment_and_metadata.builder_commitment.clone();
505 let metadata = commitment_and_metadata.metadata.clone();
506
507 if version >= V::Epochs::VERSION
508 && parent_qc.view_number()
509 > self
510 .upgrade_lock
511 .upgrade_view()
512 .await
513 .unwrap_or(TYPES::View::new(0))
514 {
515 let Some(parent_block_number) = parent_qc.data.block_number else {
516 tracing::error!("Parent QC does not have a block number. Do not propose.");
517 return Ok(());
518 };
519 if is_epoch_transition(parent_block_number, self.epoch_height)
520 && !is_last_block(parent_block_number, self.epoch_height)
521 {
522 let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
523 tracing::info!("Reached end of epoch.");
524 ensure!(
525 builder_commitment == empty_payload.builder_commitment(&metadata)
526 && metadata == empty_metadata,
527 "We're trying to propose non empty block in the epoch transition. Do not \
528 propose. View number: {}. Parent Block number: {}",
529 self.view_number,
530 parent_block_number,
531 );
532 }
533 if is_epoch_root(parent_block_number, self.epoch_height) {
534 ensure!(
535 maybe_state_cert.as_ref().is_some_and(|state_cert| {
536 check_qc_state_cert_correspondence(
537 &parent_qc,
538 state_cert,
539 self.epoch_height,
540 )
541 }),
542 "We are proposing with parent epoch root QC but we don't have the \
543 corresponding state cert."
544 );
545 }
546 }
547 let block_header = TYPES::BlockHeader::new(
548 state.as_ref(),
549 self.instance_state.as_ref(),
550 &parent_leaf,
551 commitment_and_metadata.commitment,
552 builder_commitment,
553 metadata,
554 commitment_and_metadata.fees.first().clone(),
555 version,
556 *self.view_number,
557 )
558 .await
559 .wrap()
560 .context(warn!("Failed to construct block header"))?;
561 let epoch = option_epoch_from_block_number::<TYPES>(
562 version >= V::Epochs::VERSION,
563 block_header.block_number(),
564 self.epoch_height,
565 );
566
567 let epoch_membership = self
568 .membership
569 .coordinator
570 .membership_for_epoch(epoch)
571 .await?;
572 if epoch_membership.leader(self.view_number).await? != self.public_key {
575 tracing::warn!(
576 "We are not the leader in the epoch for which we are about to propose. Do not \
577 send the quorum proposal."
578 );
579 return Ok(());
580 }
581 let is_high_qc_for_transition_block = parent_qc
582 .data
583 .block_number
584 .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
585 let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
586 && is_high_qc_for_transition_block
587 {
588 ensure!(
589 maybe_next_epoch_qc
590 .as_ref()
591 .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
592 "Jusify QC on our proposal is for an epoch transition block but we don't have the \
593 corresponding next epoch QC. Do not propose."
594 );
595 maybe_next_epoch_qc
596 } else {
597 None
598 };
599 let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
600 {
601 if let Some(epoch_val) = &epoch {
602 let drb_result = epoch_membership
603 .next_epoch()
604 .await
605 .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
606 .get_epoch_drb()
607 .await
608 .clone()
609 .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
610
611 Some(drb_result)
612 } else {
613 None
614 }
615 } else {
616 None
617 };
618
619 let proposal = QuorumProposalWrapper {
620 proposal: QuorumProposal2 {
621 block_header,
622 view_number: self.view_number,
623 epoch,
624 justify_qc: parent_qc,
625 next_epoch_justify_qc: next_epoch_qc,
626 upgrade_certificate,
627 view_change_evidence: proposal_certificate,
628 next_drb_result,
629 state_cert: maybe_state_cert,
630 },
631 };
632
633 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
634 ensure!(
635 proposed_leaf.parent_commitment() == parent_leaf.commit(),
636 "Proposed leaf parent does not equal high qc"
637 );
638
639 let signature =
640 TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
641 .wrap()
642 .context(error!("Failed to compute proposed_leaf.commit()"))?;
643
644 let message = Proposal {
645 data: proposal,
646 signature,
647 _pd: PhantomData,
648 };
649 tracing::info!(
650 "Sending proposal for view {}, height {}, justify_qc view: {}",
651 proposed_leaf.view_number(),
652 proposed_leaf.height(),
653 proposed_leaf.justify_qc().view_number()
654 );
655
656 broadcast_event(
657 Arc::new(HotShotEvent::QuorumProposalSend(
658 message.clone(),
659 self.public_key.clone(),
660 )),
661 &self.sender,
662 )
663 .await;
664
665 Ok(())
666 }
667
668 fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
669 let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
670 tracing::warn!("Failed to propose, events: {:#?}", events);
671 }
672
673 async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
674 let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
675 let mut timeout_certificate = None;
676 let mut view_sync_finalize_cert = None;
677 let mut vid_share = None;
678 let mut parent_qc = None;
679 let mut next_epoch_qc = None;
680 let mut state_cert = None;
681 for event in res.iter().flatten().flatten() {
682 match event.as_ref() {
683 HotShotEvent::SendPayloadCommitmentAndMetadata(
684 payload_commitment,
685 builder_commitment,
686 metadata,
687 view,
688 fees,
689 ) => {
690 commit_and_metadata = Some(CommitmentAndMetadata {
691 commitment: *payload_commitment,
692 builder_commitment: builder_commitment.clone(),
693 metadata: metadata.clone(),
694 fees: fees.clone(),
695 block_view: *view,
696 });
697 },
698 HotShotEvent::Qc2Formed(cert) => match cert {
699 either::Right(timeout) => {
700 timeout_certificate = Some(timeout.clone());
701 },
702 either::Left(qc) => {
703 parent_qc = Some(qc.clone());
704 },
705 },
706 HotShotEvent::EpochRootQcFormed(root_qc) => {
707 parent_qc = Some(root_qc.qc.clone());
708 state_cert = Some(root_qc.state_cert.clone());
709 },
710 HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
711 view_sync_finalize_cert = Some(cert.clone());
712 },
713 HotShotEvent::VidDisperseSend(share, _) => {
714 vid_share = Some(share.clone());
715 },
716 HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
717 next_epoch_qc = Some(qc.clone());
718 },
719 _ => {},
720 }
721 }
722
723 let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
724 bail!(error!(
725 "Failed to get version for view {:?}, not proposing",
726 self.view_number
727 ));
728 };
729
730 let mut maybe_epoch = None;
731 let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
732 maybe_epoch = view_sync_cert.data.epoch;
733 Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
734 } else {
735 match timeout_certificate {
736 Some(timeout_cert) => {
737 maybe_epoch = timeout_cert.data.epoch;
738 Some(ViewChangeEvidence2::Timeout(timeout_cert))
739 },
740 None => None,
741 }
742 };
743
744 let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
745 if qc
746 .data
747 .block_number
748 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
749 && next_epoch_qc
750 .as_ref()
751 .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
752 {
753 bail!(error!(
754 "We've formed a transition QC but we haven't formed the corresponding next \
755 epoch QC. Do not propose."
756 ));
757 }
758 (qc, next_epoch_qc, state_cert)
759 } else if version < V::Epochs::VERSION {
760 (self.consensus.read().await.high_qc().clone(), None, None)
761 } else if proposal_cert.is_some() {
762 if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
764 let Some(epoch) = maybe_epoch else {
765 bail!(error!(
766 "No epoch found on view change evidence, but we are in epoch mode"
767 ));
768 };
769 if qc
770 .data
771 .block_number
772 .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
773 {
774 (qc, Some(next_epoch_qc), None)
775 } else {
776 match self.wait_for_highest_qc().await {
777 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
778 (qc, maybe_next_epoch_qc, maybe_state_cert)
779 },
780 Err(e) => {
781 bail!(error!("Error while waiting for highest QC: {e:?}"));
782 },
783 }
784 }
785 } else {
786 let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
787 self.wait_for_highest_qc().await
788 else {
789 bail!(error!("Error while waiting for highest QC"));
790 };
791 if qc.data.block_number.is_some_and(|bn| {
792 is_epoch_transition(bn, self.epoch_height)
793 && !is_last_block(bn, self.epoch_height)
794 }) {
795 bail!(error!(
796 "High is in transition but we need to propose with transition QC, do \
797 nothing"
798 ));
799 }
800 (qc, maybe_next_epoch_qc, maybe_state_cert)
801 }
802 } else {
803 match self.wait_for_highest_qc().await {
804 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
805 (qc, maybe_next_epoch_qc, maybe_state_cert)
806 },
807 Err(e) => {
808 bail!(error!("Error while waiting for highest QC: {e:?}"));
809 },
810 }
811 };
812
813 ensure!(
814 commit_and_metadata.is_some(),
815 error!(
816 "Somehow completed the proposal dependency task without a commitment and metadata"
817 )
818 );
819
820 ensure!(
821 vid_share.is_some(),
822 error!("Somehow completed the proposal dependency task without a VID share")
823 );
824
825 self.publish_proposal(
826 commit_and_metadata.unwrap(),
827 vid_share.unwrap(),
828 proposal_cert,
829 self.formed_upgrade_certificate.clone(),
830 parent_qc,
831 maybe_next_epoch_qc,
832 maybe_state_cert,
833 )
834 .await
835 }
836}
837
838impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
839 type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
840
841 #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
842 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
843 async fn handle_dep_result(self, res: Self::Output) {
844 let result = self.handle_proposal_deps(&res).await;
845 if result.is_err() {
846 log!(result);
847 self.print_proposal_events(&res)
848 }
849 }
850}
851
852pub(super) async fn handle_eqc_formed<
853 TYPES: NodeType,
854 I: NodeImplementation<TYPES>,
855 V: Versions,
856>(
857 cert_view: TYPES::View,
858 leaf_commit: Commitment<Leaf2<TYPES>>,
859 block_number: Option<u64>,
860 task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
861 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
862) {
863 if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
864 tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
865 return;
866 }
867 if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
868 tracing::debug!("We formed QC but not eQC. Do nothing");
869 return;
870 }
871
872 let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
873 tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
874 return;
875 };
876 if current_epoch_qc.view_number() != cert_view
877 || current_epoch_qc.data.leaf_commit != leaf_commit
878 {
879 tracing::debug!("We haven't yet formed the eQC. Do nothing");
880 return;
881 }
882 let Some(next_epoch_qc) = task_state
883 .formed_next_epoch_quorum_certificates
884 .get(&cert_view)
885 else {
886 tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
887 return;
888 };
889 if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
890 tracing::debug!(
891 "We formed the eQC but the current and next epoch QCs do not correspond to each other."
892 );
893 return;
894 }
895 let current_epoch_qc_clone = current_epoch_qc.clone();
896
897 let mut consensus_writer = task_state.consensus.write().await;
898 let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
899 let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
900 drop(consensus_writer);
901
902 if let Err(e) = task_state
903 .storage
904 .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
905 .await
906 {
907 tracing::error!("Failed to store EQC: {}", e);
908 }
909
910 task_state.formed_quorum_certificates =
911 task_state.formed_quorum_certificates.split_off(&cert_view);
912 task_state.formed_next_epoch_quorum_certificates = task_state
913 .formed_next_epoch_quorum_certificates
914 .split_off(&cert_view);
915
916 broadcast_event(
917 Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
918 event_sender,
919 )
920 .await;
921}