1use std::{
11 marker::PhantomData,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20 consensus::{CommitmentAndMetadata, OuterConsensus},
21 data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22 epoch_membership::EpochMembership,
23 message::Proposal,
24 simple_certificate::{
25 LightClientStateUpdateCertificate, NextEpochQuorumCertificate2, QuorumCertificate2,
26 UpgradeCertificate,
27 },
28 traits::{
29 block_contents::BlockHeader,
30 node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31 signature_key::SignatureKey,
32 BlockPayload,
33 },
34 utils::{
35 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
36 is_transition_block, option_epoch_from_block_number,
37 },
38 vote::HasViewNumber,
39};
40use hotshot_utils::anytrace::*;
41use tracing::instrument;
42use vbs::version::StaticVersionType;
43
44use crate::{
45 events::HotShotEvent,
46 helpers::{
47 broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
48 validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
49 },
50 quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
51};
52
53#[derive(PartialEq, Debug)]
55pub(crate) enum ProposalDependency {
56 PayloadAndMetadata,
58
59 Qc,
61
62 ViewSyncCert,
64
65 TimeoutCert,
67
68 Proposal,
70
71 VidShare,
73}
74
75pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
77 pub latest_proposed_view: TYPES::View,
79
80 pub view_number: TYPES::View,
82
83 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
85
86 pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
88
89 pub instance_state: Arc<TYPES::InstanceState>,
91
92 pub membership: EpochMembership<TYPES>,
94
95 pub public_key: TYPES::SignatureKey,
97
98 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
100
101 pub consensus: OuterConsensus<TYPES>,
103
104 pub timeout: u64,
106
107 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
114
115 pub upgrade_lock: UpgradeLock<TYPES, V>,
117
118 pub id: u64,
120
121 pub view_start_time: Instant,
123
124 pub epoch_height: u64,
126}
127
128impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
129 async fn wait_for_qc_event(
131 &self,
132 mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
133 ) -> Option<(
134 QuorumCertificate2<TYPES>,
135 Option<NextEpochQuorumCertificate2<TYPES>>,
136 Option<LightClientStateUpdateCertificate<TYPES>>,
137 )> {
138 while let Ok(event) = rx.recv_direct().await {
139 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
140 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
141 (qc, maybe_next_epoch_qc, None)
142 },
143 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
144 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
145 },
146 _ => continue,
147 };
148 if validate_qc_and_next_epoch_qc(
149 qc,
150 maybe_next_epoch_qc.as_ref(),
151 &self.consensus,
152 &self.membership.coordinator,
153 &self.upgrade_lock,
154 self.epoch_height,
155 )
156 .await
157 .is_ok()
158 {
159 if qc
160 .data
161 .block_number
162 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
163 {
164 if let Some(state_cert) = &maybe_state_cert {
166 if validate_light_client_state_update_certificate(
167 state_cert,
168 &self.membership.coordinator,
169 )
170 .await
171 .is_err()
172 || !check_qc_state_cert_correspondence(
173 qc,
174 state_cert,
175 self.epoch_height,
176 )
177 {
178 tracing::error!("Failed to validate state cert");
179 return None;
180 }
181 } else {
182 tracing::error!(
183 "Received an epoch root QC but we don't have the corresponding state \
184 cert."
185 );
186 return None;
187 }
188 } else {
189 maybe_state_cert = None;
190 }
191 return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
192 }
193 }
194 None
195 }
196
197 async fn wait_for_transition_qc(
198 &self,
199 ) -> Result<
200 Option<(
201 QuorumCertificate2<TYPES>,
202 NextEpochQuorumCertificate2<TYPES>,
203 )>,
204 > {
205 ensure!(
206 self.upgrade_lock.epochs_enabled(self.view_number).await,
207 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
208 );
209
210 let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
211
212 let wait_duration = Duration::from_millis(self.timeout / 2);
213
214 let mut rx = self.receiver.clone();
215
216 while let Ok(event) = rx.try_recv() {
219 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
220 if let Some(block_number) = qc.data.block_number {
221 if !is_transition_block(block_number, self.epoch_height) {
222 continue;
223 }
224 } else {
225 continue;
226 }
227 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
228 continue;
229 };
230 if validate_qc_and_next_epoch_qc(
231 qc,
232 Some(next_epoch_qc),
233 &self.consensus,
234 &self.membership.coordinator,
235 &self.upgrade_lock,
236 self.epoch_height,
237 )
238 .await
239 .is_ok()
240 && transition_qc
241 .as_ref()
242 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
243 {
244 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
245 }
246 }
247 }
248 while self.view_start_time.elapsed() < wait_duration {
250 let time_spent = Instant::now()
251 .checked_duration_since(self.view_start_time)
252 .ok_or(error!(
253 "Time elapsed since the start of the task is negative. This should never \
254 happen."
255 ))?;
256 let time_left = wait_duration
257 .checked_sub(time_spent)
258 .ok_or(info!("No time left"))?;
259 let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
260 return Ok(transition_qc);
261 };
262 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
263 if let Some(block_number) = qc.data.block_number {
264 if !is_transition_block(block_number, self.epoch_height) {
265 continue;
266 }
267 } else {
268 continue;
269 }
270 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
271 continue;
272 };
273 if validate_qc_and_next_epoch_qc(
274 qc,
275 Some(next_epoch_qc),
276 &self.consensus,
277 &self.membership.coordinator,
278 &self.upgrade_lock,
279 self.epoch_height,
280 )
281 .await
282 .is_ok()
283 && transition_qc
284 .as_ref()
285 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
286 {
287 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
288 }
289 }
290 }
291 Ok(transition_qc)
292 }
293 async fn wait_for_highest_qc(
297 &self,
298 ) -> Result<(
299 QuorumCertificate2<TYPES>,
300 Option<NextEpochQuorumCertificate2<TYPES>>,
301 Option<LightClientStateUpdateCertificate<TYPES>>,
302 )> {
303 tracing::debug!("waiting for QC");
304 ensure!(
306 self.upgrade_lock.epochs_enabled(self.view_number).await,
307 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
308 );
309
310 let consensus_reader = self.consensus.read().await;
311 let mut highest_qc = consensus_reader.high_qc().clone();
312 let mut state_cert = if highest_qc
313 .data
314 .block_number
315 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
316 {
317 consensus_reader.state_cert().cloned()
318 } else {
319 None
320 };
321 let mut next_epoch_qc = if highest_qc
322 .data
323 .block_number
324 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
325 {
326 let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
327 if maybe_neqc
328 .as_ref()
329 .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
330 {
331 maybe_neqc
332 } else {
333 None
334 }
335 } else {
336 None
337 };
338 drop(consensus_reader);
339
340 let wait_duration = Duration::from_millis(self.timeout / 2);
341
342 let mut rx = self.receiver.clone();
343
344 while let Ok(event) = rx.try_recv() {
346 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
347 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
348 (qc, maybe_next_epoch_qc, None)
349 },
350 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
351 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
352 },
353 _ => continue,
354 };
355 if validate_qc_and_next_epoch_qc(
356 qc,
357 maybe_next_epoch_qc.as_ref(),
358 &self.consensus,
359 &self.membership.coordinator,
360 &self.upgrade_lock,
361 self.epoch_height,
362 )
363 .await
364 .is_ok()
365 {
366 if qc
367 .data
368 .block_number
369 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
370 {
371 if let Some(state_cert) = &maybe_state_cert {
373 if validate_light_client_state_update_certificate(
374 state_cert,
375 &self.membership.coordinator,
376 )
377 .await
378 .is_err()
379 || !check_qc_state_cert_correspondence(
380 qc,
381 state_cert,
382 self.epoch_height,
383 )
384 {
385 tracing::error!("Failed to validate state cert");
386 continue;
387 }
388 } else {
389 tracing::error!(
390 "Received an epoch root QC but we don't have the corresponding state \
391 cert."
392 );
393 continue;
394 }
395 } else {
396 maybe_state_cert = None;
397 }
398 if qc.view_number() > highest_qc.view_number() {
399 highest_qc = qc.clone();
400 next_epoch_qc = maybe_next_epoch_qc.clone();
401 state_cert = maybe_state_cert;
402 }
403 }
404 }
405
406 while self.view_start_time.elapsed() < wait_duration {
408 let time_spent = Instant::now()
409 .checked_duration_since(self.view_start_time)
410 .ok_or(error!(
411 "Time elapsed since the start of the task is negative. This should never \
412 happen."
413 ))?;
414 let time_left = wait_duration
415 .checked_sub(time_spent)
416 .ok_or(info!("No time left"))?;
417 let Ok(maybe_qc_state_cert) =
418 tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
419 else {
420 tracing::info!(
421 "Some nodes did not respond with their HighQc in time. Continuing with the \
422 highest QC that we received: {highest_qc:?}"
423 );
424 return Ok((highest_qc, next_epoch_qc, state_cert));
425 };
426 let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
427 continue;
428 };
429 if qc.view_number() > highest_qc.view_number() {
430 highest_qc = qc;
431 next_epoch_qc = maybe_next_epoch_qc;
432 state_cert = maybe_state_cert;
433 }
434 }
435 Ok((highest_qc, next_epoch_qc, state_cert))
436 }
437 #[allow(clippy::too_many_arguments)]
441 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
442 async fn publish_proposal(
443 &self,
444 commitment_and_metadata: CommitmentAndMetadata<TYPES>,
445 _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
446 view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
447 formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
448 parent_qc: QuorumCertificate2<TYPES>,
449 maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
450 maybe_state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
451 ) -> Result<()> {
452 let (parent_leaf, state) = parent_leaf_and_state(
453 &self.sender,
454 &self.receiver,
455 self.membership.coordinator.clone(),
456 self.public_key.clone(),
457 self.private_key.clone(),
458 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
459 &self.upgrade_lock,
460 &parent_qc,
461 self.epoch_height,
462 )
463 .await?;
464
465 let mut upgrade_certificate = parent_leaf
480 .upgrade_certificate()
481 .or(formed_upgrade_certificate);
482
483 if let Some(cert) = upgrade_certificate.clone() {
484 if cert.is_relevant(self.view_number).await.is_err() {
485 upgrade_certificate = None;
486 }
487 }
488
489 let proposal_certificate = view_change_evidence
490 .as_ref()
491 .filter(|cert| cert.is_valid_for_view(&self.view_number))
492 .cloned();
493
494 ensure!(
495 commitment_and_metadata.block_view == self.view_number,
496 "Cannot propose because our VID payload commitment and metadata is for an older view."
497 );
498
499 let version = self.upgrade_lock.version(self.view_number).await?;
500
501 let builder_commitment = commitment_and_metadata.builder_commitment.clone();
502 let metadata = commitment_and_metadata.metadata.clone();
503
504 if version >= V::Epochs::VERSION
505 && parent_qc.view_number()
506 > self
507 .upgrade_lock
508 .upgrade_view()
509 .await
510 .unwrap_or(TYPES::View::new(0))
511 {
512 let Some(parent_block_number) = parent_qc.data.block_number else {
513 tracing::error!("Parent QC does not have a block number. Do not propose.");
514 return Ok(());
515 };
516 if is_epoch_transition(parent_block_number, self.epoch_height)
517 && !is_last_block(parent_block_number, self.epoch_height)
518 {
519 let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
520 tracing::info!("Reached end of epoch.");
521 ensure!(
522 builder_commitment == empty_payload.builder_commitment(&metadata)
523 && metadata == empty_metadata,
524 "We're trying to propose non empty block in the epoch transition. Do not \
525 propose. View number: {}. Parent Block number: {}",
526 self.view_number,
527 parent_block_number,
528 );
529 }
530 if is_epoch_root(parent_block_number, self.epoch_height) {
531 ensure!(
532 maybe_state_cert.as_ref().is_some_and(|state_cert| {
533 check_qc_state_cert_correspondence(
534 &parent_qc,
535 state_cert,
536 self.epoch_height,
537 )
538 }),
539 "We are proposing with parent epoch root QC but we don't have the \
540 corresponding state cert."
541 );
542 }
543 }
544 let block_header = TYPES::BlockHeader::new(
545 state.as_ref(),
546 self.instance_state.as_ref(),
547 &parent_leaf,
548 commitment_and_metadata.commitment,
549 builder_commitment,
550 metadata,
551 commitment_and_metadata.fees.first().clone(),
552 version,
553 *self.view_number,
554 )
555 .await
556 .wrap()
557 .context(warn!("Failed to construct block header"))?;
558 let epoch = option_epoch_from_block_number::<TYPES>(
559 version >= V::Epochs::VERSION,
560 block_header.block_number(),
561 self.epoch_height,
562 );
563
564 let epoch_membership = self
565 .membership
566 .coordinator
567 .membership_for_epoch(epoch)
568 .await?;
569 if epoch_membership.leader(self.view_number).await? != self.public_key {
572 tracing::warn!(
573 "We are not the leader in the epoch for which we are about to propose. Do not \
574 send the quorum proposal."
575 );
576 return Ok(());
577 }
578 let is_high_qc_for_transition_block = parent_qc
579 .data
580 .block_number
581 .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
582 let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
583 && is_high_qc_for_transition_block
584 {
585 ensure!(
586 maybe_next_epoch_qc
587 .as_ref()
588 .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
589 "Jusify QC on our proposal is for an epoch transition block but we don't have the \
590 corresponding next epoch QC. Do not propose."
591 );
592 maybe_next_epoch_qc
593 } else {
594 None
595 };
596 let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
597 {
598 if let Some(epoch_val) = &epoch {
599 self.consensus
600 .read()
601 .await
602 .drb_results
603 .results
604 .get(&(*epoch_val + 1))
605 .copied()
606 } else {
607 None
608 }
609 } else {
610 None
611 };
612
613 let proposal = QuorumProposalWrapper {
614 proposal: QuorumProposal2 {
615 block_header,
616 view_number: self.view_number,
617 epoch,
618 justify_qc: parent_qc,
619 next_epoch_justify_qc: next_epoch_qc,
620 upgrade_certificate,
621 view_change_evidence: proposal_certificate,
622 next_drb_result,
623 state_cert: maybe_state_cert,
624 },
625 };
626
627 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
628 ensure!(
629 proposed_leaf.parent_commitment() == parent_leaf.commit(),
630 "Proposed leaf parent does not equal high qc"
631 );
632
633 let signature =
634 TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
635 .wrap()
636 .context(error!("Failed to compute proposed_leaf.commit()"))?;
637
638 let message = Proposal {
639 data: proposal,
640 signature,
641 _pd: PhantomData,
642 };
643 tracing::info!(
644 "Sending proposal for view {}, height {}, justify_qc view: {}",
645 proposed_leaf.view_number(),
646 proposed_leaf.height(),
647 proposed_leaf.justify_qc().view_number()
648 );
649
650 broadcast_event(
651 Arc::new(HotShotEvent::QuorumProposalSend(
652 message.clone(),
653 self.public_key.clone(),
654 )),
655 &self.sender,
656 )
657 .await;
658
659 Ok(())
660 }
661
662 fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
663 let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
664 tracing::warn!("Failed to propose, events: {:#?}", events);
665 }
666
667 async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
668 let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
669 let mut timeout_certificate = None;
670 let mut view_sync_finalize_cert = None;
671 let mut vid_share = None;
672 let mut parent_qc = None;
673 let mut next_epoch_qc = None;
674 let mut state_cert = None;
675 for event in res.iter().flatten().flatten() {
676 match event.as_ref() {
677 HotShotEvent::SendPayloadCommitmentAndMetadata(
678 payload_commitment,
679 builder_commitment,
680 metadata,
681 view,
682 fees,
683 ) => {
684 commit_and_metadata = Some(CommitmentAndMetadata {
685 commitment: *payload_commitment,
686 builder_commitment: builder_commitment.clone(),
687 metadata: metadata.clone(),
688 fees: fees.clone(),
689 block_view: *view,
690 });
691 },
692 HotShotEvent::Qc2Formed(cert) => match cert {
693 either::Right(timeout) => {
694 timeout_certificate = Some(timeout.clone());
695 },
696 either::Left(qc) => {
697 parent_qc = Some(qc.clone());
698 },
699 },
700 HotShotEvent::EpochRootQcFormed(root_qc) => {
701 parent_qc = Some(root_qc.qc.clone());
702 state_cert = Some(root_qc.state_cert.clone());
703 },
704 HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
705 view_sync_finalize_cert = Some(cert.clone());
706 },
707 HotShotEvent::VidDisperseSend(share, _) => {
708 vid_share = Some(share.clone());
709 },
710 HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
711 next_epoch_qc = Some(qc.clone());
712 },
713 _ => {},
714 }
715 }
716
717 let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
718 bail!(error!(
719 "Failed to get version for view {:?}, not proposing",
720 self.view_number
721 ));
722 };
723
724 let mut maybe_epoch = None;
725 let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
726 maybe_epoch = view_sync_cert.data.epoch;
727 Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
728 } else {
729 match timeout_certificate {
730 Some(timeout_cert) => {
731 maybe_epoch = timeout_cert.data.epoch;
732 Some(ViewChangeEvidence2::Timeout(timeout_cert))
733 },
734 None => None,
735 }
736 };
737
738 let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
739 if qc
740 .data
741 .block_number
742 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
743 && next_epoch_qc
744 .as_ref()
745 .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
746 {
747 bail!(error!(
748 "We've formed a transition QC but we haven't formed the corresponding next \
749 epoch QC. Do not propose."
750 ));
751 }
752 (qc, next_epoch_qc, state_cert)
753 } else if version < V::Epochs::VERSION {
754 (self.consensus.read().await.high_qc().clone(), None, None)
755 } else if proposal_cert.is_some() {
756 if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
758 let Some(epoch) = maybe_epoch else {
759 bail!(error!(
760 "No epoch found on view change evidence, but we are in epoch mode"
761 ));
762 };
763 if qc
764 .data
765 .block_number
766 .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
767 {
768 (qc, Some(next_epoch_qc), None)
769 } else {
770 match self.wait_for_highest_qc().await {
771 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
772 (qc, maybe_next_epoch_qc, maybe_state_cert)
773 },
774 Err(e) => {
775 bail!(error!("Error while waiting for highest QC: {e:?}"));
776 },
777 }
778 }
779 } else {
780 let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
781 self.wait_for_highest_qc().await
782 else {
783 bail!(error!("Error while waiting for highest QC"));
784 };
785 if qc.data.block_number.is_some_and(|bn| {
786 is_epoch_transition(bn, self.epoch_height)
787 && !is_last_block(bn, self.epoch_height)
788 }) {
789 bail!(error!(
790 "High is in transition but we need to propose with transition QC, do \
791 nothing"
792 ));
793 }
794 (qc, maybe_next_epoch_qc, maybe_state_cert)
795 }
796 } else {
797 match self.wait_for_highest_qc().await {
798 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
799 (qc, maybe_next_epoch_qc, maybe_state_cert)
800 },
801 Err(e) => {
802 bail!(error!("Error while waiting for highest QC: {e:?}"));
803 },
804 }
805 };
806
807 ensure!(
808 commit_and_metadata.is_some(),
809 error!(
810 "Somehow completed the proposal dependency task without a commitment and metadata"
811 )
812 );
813
814 ensure!(
815 vid_share.is_some(),
816 error!("Somehow completed the proposal dependency task without a VID share")
817 );
818
819 self.publish_proposal(
820 commit_and_metadata.unwrap(),
821 vid_share.unwrap(),
822 proposal_cert,
823 self.formed_upgrade_certificate.clone(),
824 parent_qc,
825 maybe_next_epoch_qc,
826 maybe_state_cert,
827 )
828 .await
829 }
830}
831
832impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
833 type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
834
835 #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
836 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
837 async fn handle_dep_result(self, res: Self::Output) {
838 let result = self.handle_proposal_deps(&res).await;
839 if result.is_err() {
840 log!(result);
841 self.print_proposal_events(&res)
842 }
843 }
844}
845
846pub(super) async fn handle_eqc_formed<
847 TYPES: NodeType,
848 I: NodeImplementation<TYPES>,
849 V: Versions,
850>(
851 cert_view: TYPES::View,
852 leaf_commit: Commitment<Leaf2<TYPES>>,
853 block_number: Option<u64>,
854 task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
855 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
856) {
857 if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
858 tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
859 return;
860 }
861 if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
862 tracing::debug!("We formed QC but not eQC. Do nothing");
863 return;
864 }
865
866 let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
867 tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
868 return;
869 };
870 if current_epoch_qc.view_number() != cert_view
871 || current_epoch_qc.data.leaf_commit != leaf_commit
872 {
873 tracing::debug!("We haven't yet formed the eQC. Do nothing");
874 return;
875 }
876 let Some(next_epoch_qc) = task_state
877 .formed_next_epoch_quorum_certificates
878 .get(&cert_view)
879 else {
880 tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
881 return;
882 };
883 if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
884 tracing::debug!(
885 "We formed the eQC but the current and next epoch QCs do not correspond to each other."
886 );
887 return;
888 }
889 let current_epoch_qc_clone = current_epoch_qc.clone();
890
891 let mut consensus_writer = task_state.consensus.write().await;
892 let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
893 let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
894 drop(consensus_writer);
895
896 task_state.formed_quorum_certificates =
897 task_state.formed_quorum_certificates.split_off(&cert_view);
898 task_state.formed_next_epoch_quorum_certificates = task_state
899 .formed_next_epoch_quorum_certificates
900 .split_off(&cert_view);
901
902 broadcast_event(
903 Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
904 event_sender,
905 )
906 .await;
907}