hotshot_task_impls/quorum_proposal/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could
8//! initiate a proposal occurs.
9
10use std::{
11    marker::PhantomData,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20    consensus::{CommitmentAndMetadata, OuterConsensus},
21    data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22    epoch_membership::EpochMembership,
23    message::Proposal,
24    simple_certificate::{
25        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
26        UpgradeCertificate,
27    },
28    traits::{
29        block_contents::BlockHeader,
30        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31        signature_key::SignatureKey,
32        storage::Storage,
33        BlockPayload,
34    },
35    utils::{
36        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
37        is_transition_block, option_epoch_from_block_number,
38    },
39    vote::HasViewNumber,
40};
41use hotshot_utils::anytrace::*;
42use tracing::instrument;
43use vbs::version::StaticVersionType;
44
45use crate::{
46    events::HotShotEvent,
47    helpers::{
48        broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
49        validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
50    },
51    quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
52};
53
/// Proposal dependency types. These types represent events that precipitate a proposal.
///
/// The enum carries no data, so it is cheap to copy and compare by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ProposalDependency {
    /// For the `SendPayloadCommitmentAndMetadata` event.
    PayloadAndMetadata,

    /// For the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` event.
    Qc,

    /// For the `ViewSyncFinalizeCertificateRecv` event.
    ViewSyncCert,

    /// For the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` event timeout branch.
    TimeoutCert,

    /// For the `QuorumProposalRecv` event.
    Proposal,

    /// For the `VidShareValidated` event.
    VidShare,
}
75
/// Handler for the proposal dependency
///
/// Bundles the keys, channels, membership and shared consensus state that the methods on
/// this type read while assembling and broadcasting a quorum proposal for `view_number`.
pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
    /// Latest view number that has been proposed for (proxy for cur_view).
    pub latest_proposed_view: TYPES::View,

    /// The view number to propose for.
    pub view_number: TYPES::View,

    /// The event sender.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// The event receiver. The QC wait helpers clone it and read events from the clone.
    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum Certs/votes
    pub membership: EpochMembership<TYPES>,

    /// Our public key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Shared consensus task state
    pub consensus: OuterConsensus<TYPES>,

    /// View timeout from config, interpreted as milliseconds. The QC wait helpers wait for
    /// at most half of this value, measured from `view_start_time`.
    pub timeout: u64,

    /// The most recent upgrade certificate this node formed.
    /// Note: this is ONLY for certificates that have been formed internally,
    /// so that we can propose with them.
    ///
    /// Certificates received from other nodes will get reattached regardless of this field,
    /// since they will be present in the leaf we propose off of.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// The node's id (recorded on the tracing span of `publish_proposal`)
    pub id: u64,

    /// The time this view started; the reference point for the QC wait deadlines
    pub view_start_time: Instant,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}
128
129impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
130    /// Return the next HighQc we get from the event stream
131    async fn wait_for_qc_event(
132        &self,
133        mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
134    ) -> Option<(
135        QuorumCertificate2<TYPES>,
136        Option<NextEpochQuorumCertificate2<TYPES>>,
137        Option<LightClientStateUpdateCertificateV2<TYPES>>,
138    )> {
139        while let Ok(event) = rx.recv_direct().await {
140            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
141                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
142                    (qc, maybe_next_epoch_qc, None)
143                },
144                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
145                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
146                },
147                _ => continue,
148            };
149            if validate_qc_and_next_epoch_qc(
150                qc,
151                maybe_next_epoch_qc.as_ref(),
152                &self.consensus,
153                &self.membership.coordinator,
154                &self.upgrade_lock,
155                self.epoch_height,
156            )
157            .await
158            .is_ok()
159            {
160                if qc
161                    .data
162                    .block_number
163                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
164                {
165                    // Validate the state cert
166                    if let Some(state_cert) = &maybe_state_cert {
167                        if validate_light_client_state_update_certificate(
168                            state_cert,
169                            &self.membership.coordinator,
170                            &self.upgrade_lock,
171                        )
172                        .await
173                        .is_err()
174                            || !check_qc_state_cert_correspondence(
175                                qc,
176                                state_cert,
177                                self.epoch_height,
178                            )
179                        {
180                            tracing::error!("Failed to validate state cert");
181                            return None;
182                        }
183                    } else {
184                        tracing::error!(
185                            "Received an epoch root QC but we don't have the corresponding state \
186                             cert."
187                        );
188                        return None;
189                    }
190                } else {
191                    maybe_state_cert = None;
192                }
193                return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
194            }
195        }
196        None
197    }
198
199    async fn wait_for_transition_qc(
200        &self,
201    ) -> Result<
202        Option<(
203            QuorumCertificate2<TYPES>,
204            NextEpochQuorumCertificate2<TYPES>,
205        )>,
206    > {
207        ensure!(
208            self.upgrade_lock.epochs_enabled(self.view_number).await,
209            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
210        );
211
212        let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
213
214        let wait_duration = Duration::from_millis(self.timeout / 2);
215
216        let mut rx = self.receiver.clone();
217
218        // drain any qc off the queue
219        // We don't watch for EpochRootQcRecv events here because it's not in transition.
220        while let Ok(event) = rx.try_recv() {
221            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
222                if let Some(block_number) = qc.data.block_number {
223                    if !is_transition_block(block_number, self.epoch_height) {
224                        continue;
225                    }
226                } else {
227                    continue;
228                }
229                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
230                    continue;
231                };
232                if validate_qc_and_next_epoch_qc(
233                    qc,
234                    Some(next_epoch_qc),
235                    &self.consensus,
236                    &self.membership.coordinator,
237                    &self.upgrade_lock,
238                    self.epoch_height,
239                )
240                .await
241                .is_ok()
242                    && transition_qc
243                        .as_ref()
244                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
245                {
246                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
247                }
248            }
249        }
250        // TODO configure timeout
251        while self.view_start_time.elapsed() < wait_duration {
252            let time_spent = Instant::now()
253                .checked_duration_since(self.view_start_time)
254                .ok_or(error!(
255                    "Time elapsed since the start of the task is negative. This should never \
256                     happen."
257                ))?;
258            let time_left = wait_duration
259                .checked_sub(time_spent)
260                .ok_or(info!("No time left"))?;
261            let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
262                return Ok(transition_qc);
263            };
264            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
265                if let Some(block_number) = qc.data.block_number {
266                    if !is_transition_block(block_number, self.epoch_height) {
267                        continue;
268                    }
269                } else {
270                    continue;
271                }
272                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
273                    continue;
274                };
275                if validate_qc_and_next_epoch_qc(
276                    qc,
277                    Some(next_epoch_qc),
278                    &self.consensus,
279                    &self.membership.coordinator,
280                    &self.upgrade_lock,
281                    self.epoch_height,
282                )
283                .await
284                .is_ok()
285                    && transition_qc
286                        .as_ref()
287                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
288                {
289                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
290                }
291            }
292        }
293        Ok(transition_qc)
294    }
295    /// Waits for the configured timeout for nodes to send HighQc messages to us.  We'll
296    /// then propose with the highest QC from among these proposals. A light client state
297    /// update certificate is also returned if the highest QC is an epoch root QC.
298    async fn wait_for_highest_qc(
299        &self,
300    ) -> Result<(
301        QuorumCertificate2<TYPES>,
302        Option<NextEpochQuorumCertificate2<TYPES>>,
303        Option<LightClientStateUpdateCertificateV2<TYPES>>,
304    )> {
305        tracing::debug!("waiting for QC");
306        // If we haven't upgraded to Hotstuff 2 just return the high qc right away
307        ensure!(
308            self.upgrade_lock.epochs_enabled(self.view_number).await,
309            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
310        );
311
312        let consensus_reader = self.consensus.read().await;
313        let mut highest_qc = consensus_reader.high_qc().clone();
314        let mut state_cert = if highest_qc
315            .data
316            .block_number
317            .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
318        {
319            consensus_reader.state_cert().cloned()
320        } else {
321            None
322        };
323        let mut next_epoch_qc = if highest_qc
324            .data
325            .block_number
326            .is_some_and(|bn| is_last_block(bn, self.epoch_height))
327        {
328            let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
329            if maybe_neqc
330                .as_ref()
331                .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
332            {
333                maybe_neqc
334            } else {
335                None
336            }
337        } else {
338            None
339        };
340        drop(consensus_reader);
341
342        let wait_duration = Duration::from_millis(self.timeout / 2);
343
344        let mut rx = self.receiver.clone();
345
346        // drain any qc off the queue
347        while let Ok(event) = rx.try_recv() {
348            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
349                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
350                    (qc, maybe_next_epoch_qc, None)
351                },
352                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
353                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
354                },
355                _ => continue,
356            };
357            if validate_qc_and_next_epoch_qc(
358                qc,
359                maybe_next_epoch_qc.as_ref(),
360                &self.consensus,
361                &self.membership.coordinator,
362                &self.upgrade_lock,
363                self.epoch_height,
364            )
365            .await
366            .is_ok()
367            {
368                if qc
369                    .data
370                    .block_number
371                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
372                {
373                    // Validate the state cert
374                    if let Some(state_cert) = &maybe_state_cert {
375                        if validate_light_client_state_update_certificate(
376                            state_cert,
377                            &self.membership.coordinator,
378                            &self.upgrade_lock,
379                        )
380                        .await
381                        .is_err()
382                            || !check_qc_state_cert_correspondence(
383                                qc,
384                                state_cert,
385                                self.epoch_height,
386                            )
387                        {
388                            tracing::error!("Failed to validate state cert");
389                            continue;
390                        }
391                    } else {
392                        tracing::error!(
393                            "Received an epoch root QC but we don't have the corresponding state \
394                             cert."
395                        );
396                        continue;
397                    }
398                } else {
399                    maybe_state_cert = None;
400                }
401                if qc.view_number() > highest_qc.view_number() {
402                    highest_qc = qc.clone();
403                    next_epoch_qc = maybe_next_epoch_qc.clone();
404                    state_cert = maybe_state_cert;
405                }
406            }
407        }
408
409        // TODO configure timeout
410        while self.view_start_time.elapsed() < wait_duration {
411            let time_spent = Instant::now()
412                .checked_duration_since(self.view_start_time)
413                .ok_or(error!(
414                    "Time elapsed since the start of the task is negative. This should never \
415                     happen."
416                ))?;
417            let time_left = wait_duration
418                .checked_sub(time_spent)
419                .ok_or(info!("No time left"))?;
420            let Ok(maybe_qc_state_cert) =
421                tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
422            else {
423                tracing::info!(
424                    "Some nodes did not respond with their HighQc in time. Continuing with the \
425                     highest QC that we received: {highest_qc:?}"
426                );
427                return Ok((highest_qc, next_epoch_qc, state_cert));
428            };
429            let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
430                continue;
431            };
432            if qc.view_number() > highest_qc.view_number() {
433                highest_qc = qc;
434                next_epoch_qc = maybe_next_epoch_qc;
435                state_cert = maybe_state_cert;
436            }
437        }
438        Ok((highest_qc, next_epoch_qc, state_cert))
439    }
440    /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
441    /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
442    /// with optional [`ViewChangeEvidence`].
443    #[allow(clippy::too_many_arguments)]
444    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
445    async fn publish_proposal(
446        &self,
447        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
448        _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
449        view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
450        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
451        parent_qc: QuorumCertificate2<TYPES>,
452        maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
453        maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
454    ) -> Result<()> {
455        let (parent_leaf, state) = parent_leaf_and_state(
456            &self.sender,
457            &self.receiver,
458            self.membership.coordinator.clone(),
459            self.public_key.clone(),
460            self.private_key.clone(),
461            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
462            &self.upgrade_lock,
463            &parent_qc,
464            self.epoch_height,
465        )
466        .await?;
467
468        // In order of priority, we should try to attach:
469        //   - the parent certificate if it exists, or
470        //   - our own certificate that we formed.
471        // In either case, we need to ensure that the certificate is still relevant.
472        //
473        // Note: once we reach a point of potentially propose with our formed upgrade certificate,
474        // we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose
475        // to discard it.
476        //
477        // It is possible that multiple nodes form separate upgrade certificates for the some
478        // upgrade if we are not careful about voting. But this shouldn't bother us: the first
479        // leader to propose is the one whose certificate will be used. And if that fails to reach
480        // a decide for whatever reason, we may lose our own certificate, but something will likely
481        // have gone wrong there anyway.
482        let mut upgrade_certificate = parent_leaf
483            .upgrade_certificate()
484            .or(formed_upgrade_certificate);
485
486        if let Some(cert) = upgrade_certificate.clone() {
487            if cert.is_relevant(self.view_number).await.is_err() {
488                upgrade_certificate = None;
489            }
490        }
491
492        let proposal_certificate = view_change_evidence
493            .as_ref()
494            .filter(|cert| cert.is_valid_for_view(&self.view_number))
495            .cloned();
496
497        ensure!(
498            commitment_and_metadata.block_view == self.view_number,
499            "Cannot propose because our VID payload commitment and metadata is for an older view."
500        );
501
502        let version = self.upgrade_lock.version(self.view_number).await?;
503
504        let builder_commitment = commitment_and_metadata.builder_commitment.clone();
505        let metadata = commitment_and_metadata.metadata.clone();
506
507        if version >= V::Epochs::VERSION
508            && parent_qc.view_number()
509                > self
510                    .upgrade_lock
511                    .upgrade_view()
512                    .await
513                    .unwrap_or(TYPES::View::new(0))
514        {
515            let Some(parent_block_number) = parent_qc.data.block_number else {
516                tracing::error!("Parent QC does not have a block number. Do not propose.");
517                return Ok(());
518            };
519            if is_epoch_transition(parent_block_number, self.epoch_height)
520                && !is_last_block(parent_block_number, self.epoch_height)
521            {
522                let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
523                tracing::info!("Reached end of epoch.");
524                ensure!(
525                    builder_commitment == empty_payload.builder_commitment(&metadata)
526                        && metadata == empty_metadata,
527                    "We're trying to propose non empty block in the epoch transition. Do not \
528                     propose. View number: {}. Parent Block number: {}",
529                    self.view_number,
530                    parent_block_number,
531                );
532            }
533            if is_epoch_root(parent_block_number, self.epoch_height) {
534                ensure!(
535                    maybe_state_cert.as_ref().is_some_and(|state_cert| {
536                        check_qc_state_cert_correspondence(
537                            &parent_qc,
538                            state_cert,
539                            self.epoch_height,
540                        )
541                    }),
542                    "We are proposing with parent epoch root QC but we don't have the \
543                     corresponding state cert."
544                );
545            }
546        }
547        let block_header = TYPES::BlockHeader::new(
548            state.as_ref(),
549            self.instance_state.as_ref(),
550            &parent_leaf,
551            commitment_and_metadata.commitment,
552            builder_commitment,
553            metadata,
554            commitment_and_metadata.fees.first().clone(),
555            version,
556            *self.view_number,
557        )
558        .await
559        .wrap()
560        .context(warn!("Failed to construct block header"))?;
561        let epoch = option_epoch_from_block_number::<TYPES>(
562            version >= V::Epochs::VERSION,
563            block_header.block_number(),
564            self.epoch_height,
565        );
566
567        let epoch_membership = self
568            .membership
569            .coordinator
570            .membership_for_epoch(epoch)
571            .await?;
572        // Make sure we are the leader for the view and epoch.
573        // We might have ended up here because we were in the epoch transition.
574        if epoch_membership.leader(self.view_number).await? != self.public_key {
575            tracing::warn!(
576                "We are not the leader in the epoch for which we are about to propose. Do not \
577                 send the quorum proposal."
578            );
579            return Ok(());
580        }
581        let is_high_qc_for_transition_block = parent_qc
582            .data
583            .block_number
584            .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
585        let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
586            && is_high_qc_for_transition_block
587        {
588            ensure!(
589                maybe_next_epoch_qc
590                    .as_ref()
591                    .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
592                "Jusify QC on our proposal is for an epoch transition block but we don't have the \
593                 corresponding next epoch QC. Do not propose."
594            );
595            maybe_next_epoch_qc
596        } else {
597            None
598        };
599        let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
600        {
601            if let Some(epoch_val) = &epoch {
602                let drb_result = epoch_membership
603                    .next_epoch()
604                    .await
605                    .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
606                    .get_epoch_drb()
607                    .await
608                    .clone()
609                    .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
610
611                Some(drb_result)
612            } else {
613                None
614            }
615        } else {
616            None
617        };
618
619        let proposal = QuorumProposalWrapper {
620            proposal: QuorumProposal2 {
621                block_header,
622                view_number: self.view_number,
623                epoch,
624                justify_qc: parent_qc,
625                next_epoch_justify_qc: next_epoch_qc,
626                upgrade_certificate,
627                view_change_evidence: proposal_certificate,
628                next_drb_result,
629                state_cert: maybe_state_cert,
630            },
631        };
632
633        let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
634        ensure!(
635            proposed_leaf.parent_commitment() == parent_leaf.commit(),
636            "Proposed leaf parent does not equal high qc"
637        );
638
639        let signature =
640            TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
641                .wrap()
642                .context(error!("Failed to compute proposed_leaf.commit()"))?;
643
644        let message = Proposal {
645            data: proposal,
646            signature,
647            _pd: PhantomData,
648        };
649        tracing::info!(
650            "Sending proposal for view {}, height {}, justify_qc view: {}",
651            proposed_leaf.view_number(),
652            proposed_leaf.height(),
653            proposed_leaf.justify_qc().view_number()
654        );
655
656        broadcast_event(
657            Arc::new(HotShotEvent::QuorumProposalSend(
658                message.clone(),
659                self.public_key.clone(),
660            )),
661            &self.sender,
662        )
663        .await;
664
665        Ok(())
666    }
667
668    fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
669        let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
670        tracing::warn!("Failed to propose, events: {:#?}", events);
671    }
672
673    async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
674        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
675        let mut timeout_certificate = None;
676        let mut view_sync_finalize_cert = None;
677        let mut vid_share = None;
678        let mut parent_qc = None;
679        let mut next_epoch_qc = None;
680        let mut state_cert = None;
681        for event in res.iter().flatten().flatten() {
682            match event.as_ref() {
683                HotShotEvent::SendPayloadCommitmentAndMetadata(
684                    payload_commitment,
685                    builder_commitment,
686                    metadata,
687                    view,
688                    fees,
689                ) => {
690                    commit_and_metadata = Some(CommitmentAndMetadata {
691                        commitment: *payload_commitment,
692                        builder_commitment: builder_commitment.clone(),
693                        metadata: metadata.clone(),
694                        fees: fees.clone(),
695                        block_view: *view,
696                    });
697                },
698                HotShotEvent::Qc2Formed(cert) => match cert {
699                    either::Right(timeout) => {
700                        timeout_certificate = Some(timeout.clone());
701                    },
702                    either::Left(qc) => {
703                        parent_qc = Some(qc.clone());
704                    },
705                },
706                HotShotEvent::EpochRootQcFormed(root_qc) => {
707                    parent_qc = Some(root_qc.qc.clone());
708                    state_cert = Some(root_qc.state_cert.clone());
709                },
710                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
711                    view_sync_finalize_cert = Some(cert.clone());
712                },
713                HotShotEvent::VidDisperseSend(share, _) => {
714                    vid_share = Some(share.clone());
715                },
716                HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
717                    next_epoch_qc = Some(qc.clone());
718                },
719                _ => {},
720            }
721        }
722
723        let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
724            bail!(error!(
725                "Failed to get version for view {:?}, not proposing",
726                self.view_number
727            ));
728        };
729
730        let mut maybe_epoch = None;
731        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
732            maybe_epoch = view_sync_cert.data.epoch;
733            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
734        } else {
735            match timeout_certificate {
736                Some(timeout_cert) => {
737                    maybe_epoch = timeout_cert.data.epoch;
738                    Some(ViewChangeEvidence2::Timeout(timeout_cert))
739                },
740                None => None,
741            }
742        };
743
744        let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
745            if qc
746                .data
747                .block_number
748                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
749                && next_epoch_qc
750                    .as_ref()
751                    .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
752            {
753                bail!(error!(
754                    "We've formed a transition QC but we haven't formed the corresponding next \
755                     epoch QC. Do not propose."
756                ));
757            }
758            (qc, next_epoch_qc, state_cert)
759        } else if version < V::Epochs::VERSION {
760            (self.consensus.read().await.high_qc().clone(), None, None)
761        } else if proposal_cert.is_some() {
762            // If we have a view change evidence, we need to wait to propose with the transition QC
763            if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
764                let Some(epoch) = maybe_epoch else {
765                    bail!(error!(
766                        "No epoch found on view change evidence, but we are in epoch mode"
767                    ));
768                };
769                if qc
770                    .data
771                    .block_number
772                    .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
773                {
774                    (qc, Some(next_epoch_qc), None)
775                } else {
776                    match self.wait_for_highest_qc().await {
777                        Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
778                            (qc, maybe_next_epoch_qc, maybe_state_cert)
779                        },
780                        Err(e) => {
781                            bail!(error!("Error while waiting for highest QC: {e:?}"));
782                        },
783                    }
784                }
785            } else {
786                let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
787                    self.wait_for_highest_qc().await
788                else {
789                    bail!(error!("Error while waiting for highest QC"));
790                };
791                if qc.data.block_number.is_some_and(|bn| {
792                    is_epoch_transition(bn, self.epoch_height)
793                        && !is_last_block(bn, self.epoch_height)
794                }) {
795                    bail!(error!(
796                        "High is in transition but we need to propose with transition QC, do \
797                         nothing"
798                    ));
799                }
800                (qc, maybe_next_epoch_qc, maybe_state_cert)
801            }
802        } else {
803            match self.wait_for_highest_qc().await {
804                Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
805                    (qc, maybe_next_epoch_qc, maybe_state_cert)
806                },
807                Err(e) => {
808                    bail!(error!("Error while waiting for highest QC: {e:?}"));
809                },
810            }
811        };
812
813        ensure!(
814            commit_and_metadata.is_some(),
815            error!(
816                "Somehow completed the proposal dependency task without a commitment and metadata"
817            )
818        );
819
820        ensure!(
821            vid_share.is_some(),
822            error!("Somehow completed the proposal dependency task without a VID share")
823        );
824
825        self.publish_proposal(
826            commit_and_metadata.unwrap(),
827            vid_share.unwrap(),
828            proposal_cert,
829            self.formed_upgrade_certificate.clone(),
830            parent_qc,
831            maybe_next_epoch_qc,
832            maybe_state_cert,
833        )
834        .await
835    }
836}
837
838impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
839    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
840
841    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
842    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
843    async fn handle_dep_result(self, res: Self::Output) {
844        let result = self.handle_proposal_deps(&res).await;
845        if result.is_err() {
846            log!(result);
847            self.print_proposal_events(&res)
848        }
849    }
850}
851
852pub(super) async fn handle_eqc_formed<
853    TYPES: NodeType,
854    I: NodeImplementation<TYPES>,
855    V: Versions,
856>(
857    cert_view: TYPES::View,
858    leaf_commit: Commitment<Leaf2<TYPES>>,
859    block_number: Option<u64>,
860    task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
861    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
862) {
863    if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
864        tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
865        return;
866    }
867    if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
868        tracing::debug!("We formed QC but not eQC. Do nothing");
869        return;
870    }
871
872    let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
873        tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
874        return;
875    };
876    if current_epoch_qc.view_number() != cert_view
877        || current_epoch_qc.data.leaf_commit != leaf_commit
878    {
879        tracing::debug!("We haven't yet formed the eQC. Do nothing");
880        return;
881    }
882    let Some(next_epoch_qc) = task_state
883        .formed_next_epoch_quorum_certificates
884        .get(&cert_view)
885    else {
886        tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
887        return;
888    };
889    if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
890        tracing::debug!(
891            "We formed the eQC but the current and next epoch QCs do not correspond to each other."
892        );
893        return;
894    }
895    let current_epoch_qc_clone = current_epoch_qc.clone();
896
897    let mut consensus_writer = task_state.consensus.write().await;
898    let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
899    let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
900    drop(consensus_writer);
901
902    if let Err(e) = task_state
903        .storage
904        .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
905        .await
906    {
907        tracing::error!("Failed to store EQC: {}", e);
908    }
909
910    task_state.formed_quorum_certificates =
911        task_state.formed_quorum_certificates.split_off(&cert_view);
912    task_state.formed_next_epoch_quorum_certificates = task_state
913        .formed_next_epoch_quorum_certificates
914        .split_off(&cert_view);
915
916    broadcast_event(
917        Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
918        event_sender,
919    )
920    .await;
921}