hotshot_task_impls/quorum_proposal/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could
8//! initiate a proposal occurs.
9
10use std::{
11    marker::PhantomData,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20    consensus::{CommitmentAndMetadata, OuterConsensus},
21    data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22    epoch_membership::EpochMembership,
23    message::Proposal,
24    simple_certificate::{
25        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
26        UpgradeCertificate,
27    },
28    traits::{
29        block_contents::BlockHeader,
30        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31        signature_key::SignatureKey,
32        storage::Storage,
33        BlockPayload,
34    },
35    utils::{
36        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
37        is_transition_block, option_epoch_from_block_number,
38    },
39    vote::HasViewNumber,
40};
41use hotshot_utils::anytrace::*;
42use tracing::instrument;
43use vbs::version::StaticVersionType;
44
45use crate::{
46    events::HotShotEvent,
47    helpers::{
48        broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
49        validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
50    },
51    quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
52};
53
/// Proposal dependency types. Each variant names a kind of event that can precipitate a
/// proposal.
///
/// The enum is field-less, so equality is total and `Eq` is derived alongside `PartialEq`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum ProposalDependency {
    /// For the `SendPayloadCommitmentAndMetadata` event.
    PayloadAndMetadata,

    /// For the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` event.
    Qc,

    /// For the `ViewSyncFinalizeCertificateRecv` event.
    ViewSyncCert,

    /// For the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` event timeout branch.
    TimeoutCert,

    /// For the `QuorumProposalRecv` event.
    Proposal,

    /// For the `VidShareValidated` event.
    VidShare,
}
75
/// Handler for the proposal dependency.
///
/// Bundles the state (keys, membership, consensus handle, event channels and timing
/// information) that the proposal-building methods of this type read when a proposal for
/// `view_number` is assembled.
pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
    /// Latest view number that has been proposed for (proxy for cur_view).
    pub latest_proposed_view: TYPES::View,

    /// The view number to propose for.
    pub view_number: TYPES::View,

    /// The event sender.
    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,

    /// The event receiver.
    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum Certs/votes
    pub membership: EpochMembership<TYPES>,

    /// Our public key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Shared consensus task state
    pub consensus: OuterConsensus<TYPES>,

    /// View timeout from config, in milliseconds.
    pub timeout: u64,

    /// The most recent upgrade certificate this node formed.
    /// Note: this is ONLY for certificates that have been formed internally,
    /// so that we can propose with them.
    ///
    /// Certificates received from other nodes will get reattached regardless of this field,
    /// since they will be present in the leaf we propose off of.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES, V>,

    /// The node's id
    pub id: u64,

    /// The time this view started
    pub view_start_time: Instant,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Receiver of `()` cancellation signals.
    /// NOTE(review): presumably used to abort this dependency task early; its consumer is not
    /// visible in this part of the file — confirm against the code that reads it.
    pub cancel_receiver: Receiver<()>,
}
130
131impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
132    /// Return the next HighQc we get from the event stream
133    async fn wait_for_qc_event(
134        &self,
135        mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
136    ) -> Option<(
137        QuorumCertificate2<TYPES>,
138        Option<NextEpochQuorumCertificate2<TYPES>>,
139        Option<LightClientStateUpdateCertificateV2<TYPES>>,
140    )> {
141        while let Ok(event) = rx.recv_direct().await {
142            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
143                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
144                    (qc, maybe_next_epoch_qc, None)
145                },
146                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
147                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
148                },
149                _ => continue,
150            };
151            if validate_qc_and_next_epoch_qc(
152                qc,
153                maybe_next_epoch_qc.as_ref(),
154                &self.consensus,
155                &self.membership.coordinator,
156                &self.upgrade_lock,
157                self.epoch_height,
158            )
159            .await
160            .is_ok()
161            {
162                if qc
163                    .data
164                    .block_number
165                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
166                {
167                    // Validate the state cert
168                    if let Some(state_cert) = &maybe_state_cert {
169                        if validate_light_client_state_update_certificate(
170                            state_cert,
171                            &self.membership.coordinator,
172                            &self.upgrade_lock,
173                        )
174                        .await
175                        .is_err()
176                            || !check_qc_state_cert_correspondence(
177                                qc,
178                                state_cert,
179                                self.epoch_height,
180                            )
181                        {
182                            tracing::error!("Failed to validate state cert");
183                            return None;
184                        }
185                    } else {
186                        tracing::error!(
187                            "Received an epoch root QC but we don't have the corresponding state \
188                             cert."
189                        );
190                        return None;
191                    }
192                } else {
193                    maybe_state_cert = None;
194                }
195                return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
196            }
197        }
198        None
199    }
200
201    async fn wait_for_transition_qc(
202        &self,
203    ) -> Result<
204        Option<(
205            QuorumCertificate2<TYPES>,
206            NextEpochQuorumCertificate2<TYPES>,
207        )>,
208    > {
209        ensure!(
210            self.upgrade_lock.epochs_enabled(self.view_number).await,
211            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
212        );
213
214        let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
215
216        let wait_duration = Duration::from_millis(self.timeout / 2);
217
218        let mut rx = self.receiver.clone();
219
220        // drain any qc off the queue
221        // We don't watch for EpochRootQcRecv events here because it's not in transition.
222        while let Ok(event) = rx.try_recv() {
223            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
224                if let Some(block_number) = qc.data.block_number {
225                    if !is_transition_block(block_number, self.epoch_height) {
226                        continue;
227                    }
228                } else {
229                    continue;
230                }
231                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
232                    continue;
233                };
234                if validate_qc_and_next_epoch_qc(
235                    qc,
236                    Some(next_epoch_qc),
237                    &self.consensus,
238                    &self.membership.coordinator,
239                    &self.upgrade_lock,
240                    self.epoch_height,
241                )
242                .await
243                .is_ok()
244                    && transition_qc
245                        .as_ref()
246                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
247                {
248                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
249                }
250            }
251        }
252        // TODO configure timeout
253        while self.view_start_time.elapsed() < wait_duration {
254            let time_spent = Instant::now()
255                .checked_duration_since(self.view_start_time)
256                .ok_or(error!(
257                    "Time elapsed since the start of the task is negative. This should never \
258                     happen."
259                ))?;
260            let time_left = wait_duration
261                .checked_sub(time_spent)
262                .ok_or(info!("No time left"))?;
263            let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
264                return Ok(transition_qc);
265            };
266            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
267                if let Some(block_number) = qc.data.block_number {
268                    if !is_transition_block(block_number, self.epoch_height) {
269                        continue;
270                    }
271                } else {
272                    continue;
273                }
274                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
275                    continue;
276                };
277                if validate_qc_and_next_epoch_qc(
278                    qc,
279                    Some(next_epoch_qc),
280                    &self.consensus,
281                    &self.membership.coordinator,
282                    &self.upgrade_lock,
283                    self.epoch_height,
284                )
285                .await
286                .is_ok()
287                    && transition_qc
288                        .as_ref()
289                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
290                {
291                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
292                }
293            }
294        }
295        Ok(transition_qc)
296    }
297    /// Waits for the configured timeout for nodes to send HighQc messages to us.  We'll
298    /// then propose with the highest QC from among these proposals. A light client state
299    /// update certificate is also returned if the highest QC is an epoch root QC.
300    async fn wait_for_highest_qc(
301        &self,
302    ) -> Result<(
303        QuorumCertificate2<TYPES>,
304        Option<NextEpochQuorumCertificate2<TYPES>>,
305        Option<LightClientStateUpdateCertificateV2<TYPES>>,
306    )> {
307        tracing::debug!("waiting for QC");
308        // If we haven't upgraded to Hotstuff 2 just return the high qc right away
309        ensure!(
310            self.upgrade_lock.epochs_enabled(self.view_number).await,
311            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
312        );
313
314        let consensus_reader = self.consensus.read().await;
315        let mut highest_qc = consensus_reader.high_qc().clone();
316        let mut state_cert = if highest_qc
317            .data
318            .block_number
319            .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
320        {
321            consensus_reader.state_cert().cloned()
322        } else {
323            None
324        };
325        let mut next_epoch_qc = if highest_qc
326            .data
327            .block_number
328            .is_some_and(|bn| is_last_block(bn, self.epoch_height))
329        {
330            let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
331            if maybe_neqc
332                .as_ref()
333                .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
334            {
335                maybe_neqc
336            } else {
337                None
338            }
339        } else {
340            None
341        };
342        drop(consensus_reader);
343
344        let wait_duration = Duration::from_millis(self.timeout / 2);
345
346        let mut rx = self.receiver.clone();
347
348        // drain any qc off the queue
349        while let Ok(event) = rx.try_recv() {
350            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
351                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
352                    (qc, maybe_next_epoch_qc, None)
353                },
354                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
355                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
356                },
357                _ => continue,
358            };
359            if validate_qc_and_next_epoch_qc(
360                qc,
361                maybe_next_epoch_qc.as_ref(),
362                &self.consensus,
363                &self.membership.coordinator,
364                &self.upgrade_lock,
365                self.epoch_height,
366            )
367            .await
368            .is_ok()
369            {
370                if qc
371                    .data
372                    .block_number
373                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
374                {
375                    // Validate the state cert
376                    if let Some(state_cert) = &maybe_state_cert {
377                        if validate_light_client_state_update_certificate(
378                            state_cert,
379                            &self.membership.coordinator,
380                            &self.upgrade_lock,
381                        )
382                        .await
383                        .is_err()
384                            || !check_qc_state_cert_correspondence(
385                                qc,
386                                state_cert,
387                                self.epoch_height,
388                            )
389                        {
390                            tracing::error!("Failed to validate state cert");
391                            continue;
392                        }
393                    } else {
394                        tracing::error!(
395                            "Received an epoch root QC but we don't have the corresponding state \
396                             cert."
397                        );
398                        continue;
399                    }
400                } else {
401                    maybe_state_cert = None;
402                }
403                if qc.view_number() > highest_qc.view_number() {
404                    highest_qc = qc.clone();
405                    next_epoch_qc = maybe_next_epoch_qc.clone();
406                    state_cert = maybe_state_cert;
407                }
408            }
409        }
410
411        // TODO configure timeout
412        while self.view_start_time.elapsed() < wait_duration {
413            let time_spent = Instant::now()
414                .checked_duration_since(self.view_start_time)
415                .ok_or(error!(
416                    "Time elapsed since the start of the task is negative. This should never \
417                     happen."
418                ))?;
419            let time_left = wait_duration
420                .checked_sub(time_spent)
421                .ok_or(info!("No time left"))?;
422            let Ok(maybe_qc_state_cert) =
423                tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
424            else {
425                tracing::info!(
426                    "Some nodes did not respond with their HighQc in time. Continuing with the \
427                     highest QC that we received: {highest_qc:?}"
428                );
429                return Ok((highest_qc, next_epoch_qc, state_cert));
430            };
431            let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
432                continue;
433            };
434            if qc.view_number() > highest_qc.view_number() {
435                highest_qc = qc;
436                next_epoch_qc = maybe_next_epoch_qc;
437                state_cert = maybe_state_cert;
438            }
439        }
440        Ok((highest_qc, next_epoch_qc, state_cert))
441    }
442    /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
443    /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
444    /// with optional [`ViewChangeEvidence`].
445    #[allow(clippy::too_many_arguments)]
446    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
447    async fn publish_proposal(
448        &self,
449        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
450        _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
451        view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
452        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
453        parent_qc: QuorumCertificate2<TYPES>,
454        maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
455        maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
456    ) -> Result<()> {
457        let (parent_leaf, state) = parent_leaf_and_state(
458            &self.sender,
459            &self.receiver,
460            self.membership.coordinator.clone(),
461            self.public_key.clone(),
462            self.private_key.clone(),
463            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
464            &self.upgrade_lock,
465            &parent_qc,
466            self.epoch_height,
467        )
468        .await?;
469
470        // In order of priority, we should try to attach:
471        //   - the parent certificate if it exists, or
472        //   - our own certificate that we formed.
473        // In either case, we need to ensure that the certificate is still relevant.
474        //
475        // Note: once we reach a point of potentially propose with our formed upgrade certificate,
476        // we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose
477        // to discard it.
478        //
479        // It is possible that multiple nodes form separate upgrade certificates for the some
480        // upgrade if we are not careful about voting. But this shouldn't bother us: the first
481        // leader to propose is the one whose certificate will be used. And if that fails to reach
482        // a decide for whatever reason, we may lose our own certificate, but something will likely
483        // have gone wrong there anyway.
484        let mut upgrade_certificate = parent_leaf
485            .upgrade_certificate()
486            .or(formed_upgrade_certificate);
487
488        if let Some(cert) = upgrade_certificate.clone() {
489            if cert.is_relevant(self.view_number).await.is_err() {
490                upgrade_certificate = None;
491            }
492        }
493
494        let proposal_certificate = view_change_evidence
495            .as_ref()
496            .filter(|cert| cert.is_valid_for_view(&self.view_number))
497            .cloned();
498
499        ensure!(
500            commitment_and_metadata.block_view == self.view_number,
501            "Cannot propose because our VID payload commitment and metadata is for an older view."
502        );
503
504        let version = self.upgrade_lock.version(self.view_number).await?;
505
506        let builder_commitment = commitment_and_metadata.builder_commitment.clone();
507        let metadata = commitment_and_metadata.metadata.clone();
508
509        if version >= V::Epochs::VERSION
510            && parent_qc.view_number()
511                > self
512                    .upgrade_lock
513                    .upgrade_view()
514                    .await
515                    .unwrap_or(TYPES::View::new(0))
516        {
517            let Some(parent_block_number) = parent_qc.data.block_number else {
518                tracing::error!("Parent QC does not have a block number. Do not propose.");
519                return Ok(());
520            };
521            if is_epoch_transition(parent_block_number, self.epoch_height)
522                && !is_last_block(parent_block_number, self.epoch_height)
523            {
524                let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
525                tracing::info!("Reached end of epoch.");
526                ensure!(
527                    builder_commitment == empty_payload.builder_commitment(&metadata)
528                        && metadata == empty_metadata,
529                    "We're trying to propose non empty block in the epoch transition. Do not \
530                     propose. View number: {}. Parent Block number: {}",
531                    self.view_number,
532                    parent_block_number,
533                );
534            }
535            if is_epoch_root(parent_block_number, self.epoch_height) {
536                ensure!(
537                    maybe_state_cert.as_ref().is_some_and(|state_cert| {
538                        check_qc_state_cert_correspondence(
539                            &parent_qc,
540                            state_cert,
541                            self.epoch_height,
542                        )
543                    }),
544                    "We are proposing with parent epoch root QC but we don't have the \
545                     corresponding state cert."
546                );
547            }
548        }
549        let block_header = TYPES::BlockHeader::new(
550            state.as_ref(),
551            self.instance_state.as_ref(),
552            &parent_leaf,
553            commitment_and_metadata.commitment,
554            builder_commitment,
555            metadata,
556            commitment_and_metadata.fees.first().clone(),
557            version,
558            *self.view_number,
559        )
560        .await
561        .wrap()
562        .context(warn!("Failed to construct block header"))?;
563        let epoch = option_epoch_from_block_number::<TYPES>(
564            version >= V::Epochs::VERSION,
565            block_header.block_number(),
566            self.epoch_height,
567        );
568
569        let epoch_membership = self
570            .membership
571            .coordinator
572            .membership_for_epoch(epoch)
573            .await?;
574        // Make sure we are the leader for the view and epoch.
575        // We might have ended up here because we were in the epoch transition.
576        if epoch_membership.leader(self.view_number).await? != self.public_key {
577            tracing::warn!(
578                "We are not the leader in the epoch for which we are about to propose. Do not \
579                 send the quorum proposal."
580            );
581            return Ok(());
582        }
583        let is_high_qc_for_transition_block = parent_qc
584            .data
585            .block_number
586            .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
587        let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
588            && is_high_qc_for_transition_block
589        {
590            ensure!(
591                maybe_next_epoch_qc
592                    .as_ref()
593                    .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
594                "Justify QC on our proposal is for an epoch transition block but we don't have \
595                 the corresponding next epoch QC. Do not propose."
596            );
597            maybe_next_epoch_qc
598        } else {
599            None
600        };
601        let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
602        {
603            if let Some(epoch_val) = &epoch {
604                let drb_result = epoch_membership
605                    .next_epoch()
606                    .await
607                    .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
608                    .get_epoch_drb()
609                    .await
610                    .clone()
611                    .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
612
613                Some(drb_result)
614            } else {
615                None
616            }
617        } else {
618            None
619        };
620
621        let proposal = QuorumProposalWrapper {
622            proposal: QuorumProposal2 {
623                block_header,
624                view_number: self.view_number,
625                epoch,
626                justify_qc: parent_qc,
627                next_epoch_justify_qc: next_epoch_qc,
628                upgrade_certificate,
629                view_change_evidence: proposal_certificate,
630                next_drb_result,
631                state_cert: maybe_state_cert,
632            },
633        };
634
635        let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
636        ensure!(
637            proposed_leaf.parent_commitment() == parent_leaf.commit(),
638            "Proposed leaf parent does not equal high qc"
639        );
640
641        let signature =
642            TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
643                .wrap()
644                .context(error!("Failed to compute proposed_leaf.commit()"))?;
645
646        let message = Proposal {
647            data: proposal,
648            signature,
649            _pd: PhantomData,
650        };
651        tracing::info!(
652            "Sending proposal for view {}, height {}, justify_qc view: {}",
653            proposed_leaf.view_number(),
654            proposed_leaf.height(),
655            proposed_leaf.justify_qc().view_number()
656        );
657
658        broadcast_event(
659            Arc::new(HotShotEvent::QuorumProposalSend(
660                message.clone(),
661                self.public_key.clone(),
662            )),
663            &self.sender,
664        )
665        .await;
666
667        Ok(())
668    }
669
670    fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
671        let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
672        tracing::warn!("Failed to propose, events: {:#?}", events);
673    }
674
675    async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
676        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
677        let mut timeout_certificate = None;
678        let mut view_sync_finalize_cert = None;
679        let mut vid_share = None;
680        let mut parent_qc = None;
681        let mut next_epoch_qc = None;
682        let mut state_cert = None;
683        for event in res.iter().flatten().flatten() {
684            match event.as_ref() {
685                HotShotEvent::SendPayloadCommitmentAndMetadata(
686                    payload_commitment,
687                    builder_commitment,
688                    metadata,
689                    view,
690                    fees,
691                ) => {
692                    commit_and_metadata = Some(CommitmentAndMetadata {
693                        commitment: *payload_commitment,
694                        builder_commitment: builder_commitment.clone(),
695                        metadata: metadata.clone(),
696                        fees: fees.clone(),
697                        block_view: *view,
698                    });
699                },
700                HotShotEvent::Qc2Formed(cert) => match cert {
701                    either::Right(timeout) => {
702                        timeout_certificate = Some(timeout.clone());
703                    },
704                    either::Left(qc) => {
705                        parent_qc = Some(qc.clone());
706                    },
707                },
708                HotShotEvent::EpochRootQcFormed(root_qc) => {
709                    parent_qc = Some(root_qc.qc.clone());
710                    state_cert = Some(root_qc.state_cert.clone());
711                },
712                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
713                    view_sync_finalize_cert = Some(cert.clone());
714                },
715                HotShotEvent::VidDisperseSend(share, _) => {
716                    vid_share = Some(share.clone());
717                },
718                HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
719                    next_epoch_qc = Some(qc.clone());
720                },
721                _ => {},
722            }
723        }
724
725        let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
726            bail!(error!(
727                "Failed to get version for view {:?}, not proposing",
728                self.view_number
729            ));
730        };
731
732        let mut maybe_epoch = None;
733        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
734            maybe_epoch = view_sync_cert.data.epoch;
735            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
736        } else {
737            match timeout_certificate {
738                Some(timeout_cert) => {
739                    maybe_epoch = timeout_cert.data.epoch;
740                    Some(ViewChangeEvidence2::Timeout(timeout_cert))
741                },
742                None => None,
743            }
744        };
745
746        let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
747            if qc
748                .data
749                .block_number
750                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
751                && next_epoch_qc
752                    .as_ref()
753                    .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
754            {
755                bail!(error!(
756                    "We've formed a transition QC but we haven't formed the corresponding next \
757                     epoch QC. Do not propose."
758                ));
759            }
760            (qc, next_epoch_qc, state_cert)
761        } else if version < V::Epochs::VERSION {
762            (self.consensus.read().await.high_qc().clone(), None, None)
763        } else if proposal_cert.is_some() {
764            // If we have a view change evidence, we need to wait to propose with the transition QC
765            if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
766                let Some(epoch) = maybe_epoch else {
767                    bail!(error!(
768                        "No epoch found on view change evidence, but we are in epoch mode"
769                    ));
770                };
771                if qc
772                    .data
773                    .block_number
774                    .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
775                {
776                    (qc, Some(next_epoch_qc), None)
777                } else {
778                    match self.wait_for_highest_qc().await {
779                        Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
780                            (qc, maybe_next_epoch_qc, maybe_state_cert)
781                        },
782                        Err(e) => {
783                            bail!(error!("Error while waiting for highest QC: {e:?}"));
784                        },
785                    }
786                }
787            } else {
788                let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
789                    self.wait_for_highest_qc().await
790                else {
791                    bail!(error!("Error while waiting for highest QC"));
792                };
793                if qc.data.block_number.is_some_and(|bn| {
794                    is_epoch_transition(bn, self.epoch_height)
795                        && !is_last_block(bn, self.epoch_height)
796                }) {
797                    bail!(error!(
798                        "High is in transition but we need to propose with transition QC, do \
799                         nothing"
800                    ));
801                }
802                (qc, maybe_next_epoch_qc, maybe_state_cert)
803            }
804        } else {
805            match self.wait_for_highest_qc().await {
806                Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
807                    (qc, maybe_next_epoch_qc, maybe_state_cert)
808                },
809                Err(e) => {
810                    bail!(error!("Error while waiting for highest QC: {e:?}"));
811                },
812            }
813        };
814
815        ensure!(
816            commit_and_metadata.is_some(),
817            error!(
818                "Somehow completed the proposal dependency task without a commitment and metadata"
819            )
820        );
821
822        ensure!(
823            vid_share.is_some(),
824            error!("Somehow completed the proposal dependency task without a VID share")
825        );
826
827        self.publish_proposal(
828            commit_and_metadata.unwrap(),
829            vid_share.unwrap(),
830            proposal_cert,
831            self.formed_upgrade_certificate.clone(),
832            parent_qc,
833            maybe_next_epoch_qc,
834            maybe_state_cert,
835        )
836        .await
837        .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
838    }
839}
840
841impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
842    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
843
844    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
845    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
846    async fn handle_dep_result(self, res: Self::Output) {
847        let mut cancel_receiver = self.cancel_receiver.clone();
848        let result = tokio::select! {
849            result = self.handle_proposal_deps(&res) => {
850                result
851            }
852            _ = cancel_receiver.recv() => {
853                tracing::warn!("Proposal dependency task cancelled");
854                return;
855            }
856        };
857        if result.is_err() {
858            log!(result);
859            self.print_proposal_events(&res)
860        }
861    }
862}
863
864pub(super) async fn handle_eqc_formed<
865    TYPES: NodeType,
866    I: NodeImplementation<TYPES>,
867    V: Versions,
868>(
869    cert_view: TYPES::View,
870    leaf_commit: Commitment<Leaf2<TYPES>>,
871    block_number: Option<u64>,
872    task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
873    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
874) {
875    if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
876        tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
877        return;
878    }
879    if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
880        tracing::debug!("We formed QC but not eQC. Do nothing");
881        return;
882    }
883
884    let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
885        tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
886        return;
887    };
888    if current_epoch_qc.view_number() != cert_view
889        || current_epoch_qc.data.leaf_commit != leaf_commit
890    {
891        tracing::debug!("We haven't yet formed the eQC. Do nothing");
892        return;
893    }
894    let Some(next_epoch_qc) = task_state
895        .formed_next_epoch_quorum_certificates
896        .get(&cert_view)
897    else {
898        tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
899        return;
900    };
901    if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
902        tracing::debug!(
903            "We formed the eQC but the current and next epoch QCs do not correspond to each other."
904        );
905        return;
906    }
907    let current_epoch_qc_clone = current_epoch_qc.clone();
908
909    let mut consensus_writer = task_state.consensus.write().await;
910    let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
911    let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
912    drop(consensus_writer);
913
914    if let Err(e) = task_state
915        .storage
916        .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
917        .await
918    {
919        tracing::error!("Failed to store EQC: {}", e);
920    }
921
922    task_state.formed_quorum_certificates =
923        task_state.formed_quorum_certificates.split_off(&cert_view);
924    task_state.formed_next_epoch_quorum_certificates = task_state
925        .formed_next_epoch_quorum_certificates
926        .split_off(&cert_view);
927
928    broadcast_event(
929        Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
930        event_sender,
931    )
932    .await;
933}