hotshot_task_impls/quorum_proposal/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could
8//! initiate a proposal occurs.
9
10use std::{
11    marker::PhantomData,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20    consensus::{CommitmentAndMetadata, OuterConsensus},
21    data::{
22        Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2, ViewNumber,
23    },
24    epoch_membership::EpochMembership,
25    message::Proposal,
26    simple_certificate::{
27        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28        UpgradeCertificate,
29    },
30    traits::{
31        BlockPayload,
32        block_contents::BlockHeader,
33        node_implementation::{NodeImplementation, NodeType},
34        signature_key::SignatureKey,
35        storage::Storage,
36    },
37    utils::{
38        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
39        is_transition_block, option_epoch_from_block_number,
40    },
41    vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44use tracing::instrument;
45use versions::EPOCH_VERSION;
46
47use crate::{
48    events::HotShotEvent,
49    helpers::{
50        broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
51        validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
52    },
53    quorum_proposal::{QuorumProposalTaskState, UpgradeLock},
54};
55
/// The kinds of dependency a proposal can be waiting on. Each variant names the event (or
/// event branch) that satisfies it.
#[derive(Debug, PartialEq)]
pub(crate) enum ProposalDependency {
    /// Satisfied by the `SendPayloadCommitmentAndMetadata` event.
    PayloadAndMetadata,

    /// Satisfied by the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` events.
    Qc,

    /// Satisfied by the `ViewSyncFinalizeCertificateRecv` event.
    ViewSyncCert,

    /// Satisfied by the timeout branch of the `Qc2Formed`, `ExtendedQc2Formed`, and
    /// `EpochRootQcFormed` events.
    TimeoutCert,

    /// Satisfied by the `QuorumProposalRecv` event.
    Proposal,

    /// Satisfied by the `VidShareValidated` event.
    VidShare,
}
77
78/// Handler for the proposal dependency
79pub struct ProposalDependencyHandle<TYPES: NodeType> {
80    /// Latest view number that has been proposed for (proxy for cur_view).
81    pub latest_proposed_view: ViewNumber,
82
83    /// The view number to propose for.
84    pub view_number: ViewNumber,
85
86    /// The event sender.
87    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
88
89    /// The event receiver.
90    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
91
92    /// Immutable instance state
93    pub instance_state: Arc<TYPES::InstanceState>,
94
95    /// Membership for Quorum Certs/votes
96    pub membership: EpochMembership<TYPES>,
97
98    /// Our public key
99    pub public_key: TYPES::SignatureKey,
100
101    /// Our Private Key
102    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
103
104    /// Shared consensus task state
105    pub consensus: OuterConsensus<TYPES>,
106
107    /// View timeout from config.
108    pub timeout: u64,
109
110    /// The most recent upgrade certificate this node formed.
111    /// Note: this is ONLY for certificates that have been formed internally,
112    /// so that we can propose with them.
113    ///
114    /// Certificates received from other nodes will get reattached regardless of this fields,
115    /// since they will be present in the leaf we propose off of.
116    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
117
118    /// Lock for a decided upgrade
119    pub upgrade_lock: UpgradeLock<TYPES>,
120
121    /// The node's id
122    pub id: u64,
123
124    /// The time this view started
125    pub view_start_time: Instant,
126
127    /// Number of blocks in an epoch, zero means there are no epochs
128    pub epoch_height: u64,
129
130    pub cancel_receiver: Receiver<()>,
131}
132
133impl<TYPES: NodeType> ProposalDependencyHandle<TYPES> {
134    /// Return the next HighQc we get from the event stream
135    async fn wait_for_qc_event(
136        &self,
137        mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
138    ) -> Option<(
139        QuorumCertificate2<TYPES>,
140        Option<NextEpochQuorumCertificate2<TYPES>>,
141        Option<LightClientStateUpdateCertificateV2<TYPES>>,
142    )> {
143        while let Ok(event) = rx.recv_direct().await {
144            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
145                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
146                    (qc, maybe_next_epoch_qc, None)
147                },
148                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
149                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
150                },
151                _ => continue,
152            };
153            if validate_qc_and_next_epoch_qc(
154                qc,
155                maybe_next_epoch_qc.as_ref(),
156                &self.consensus,
157                &self.membership.coordinator,
158                &self.upgrade_lock,
159                self.epoch_height,
160            )
161            .await
162            .is_ok()
163            {
164                if qc
165                    .data
166                    .block_number
167                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
168                {
169                    // Validate the state cert
170                    if let Some(state_cert) = &maybe_state_cert {
171                        if validate_light_client_state_update_certificate(
172                            state_cert,
173                            &self.membership.coordinator,
174                            &self.upgrade_lock,
175                        )
176                        .await
177                        .is_err()
178                            || !check_qc_state_cert_correspondence(
179                                qc,
180                                state_cert,
181                                self.epoch_height,
182                            )
183                        {
184                            tracing::error!("Failed to validate state cert");
185                            return None;
186                        }
187                    } else {
188                        tracing::error!(
189                            "Received an epoch root QC but we don't have the corresponding state \
190                             cert."
191                        );
192                        return None;
193                    }
194                } else {
195                    maybe_state_cert = None;
196                }
197                return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
198            }
199        }
200        None
201    }
202
203    async fn wait_for_transition_qc(
204        &self,
205    ) -> Result<
206        Option<(
207            QuorumCertificate2<TYPES>,
208            NextEpochQuorumCertificate2<TYPES>,
209        )>,
210    > {
211        ensure!(
212            self.upgrade_lock.epochs_enabled(self.view_number),
213            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
214        );
215
216        let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
217
218        let wait_duration = Duration::from_millis(self.timeout / 2);
219
220        let mut rx = self.receiver.clone();
221
222        // drain any qc off the queue
223        // We don't watch for EpochRootQcRecv events here because it's not in transition.
224        while let Ok(event) = rx.try_recv() {
225            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
226                if let Some(block_number) = qc.data.block_number {
227                    if !is_transition_block(block_number, self.epoch_height) {
228                        continue;
229                    }
230                } else {
231                    continue;
232                }
233                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
234                    continue;
235                };
236                if validate_qc_and_next_epoch_qc(
237                    qc,
238                    Some(next_epoch_qc),
239                    &self.consensus,
240                    &self.membership.coordinator,
241                    &self.upgrade_lock,
242                    self.epoch_height,
243                )
244                .await
245                .is_ok()
246                    && transition_qc
247                        .as_ref()
248                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
249                {
250                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
251                }
252            }
253        }
254        // TODO configure timeout
255        while self.view_start_time.elapsed() < wait_duration {
256            let time_spent = Instant::now()
257                .checked_duration_since(self.view_start_time)
258                .ok_or(error!(
259                    "Time elapsed since the start of the task is negative. This should never \
260                     happen."
261                ))?;
262            let time_left = wait_duration
263                .checked_sub(time_spent)
264                .ok_or(info!("No time left"))?;
265            let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
266                return Ok(transition_qc);
267            };
268            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
269                if let Some(block_number) = qc.data.block_number {
270                    if !is_transition_block(block_number, self.epoch_height) {
271                        continue;
272                    }
273                } else {
274                    continue;
275                }
276                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
277                    continue;
278                };
279                if validate_qc_and_next_epoch_qc(
280                    qc,
281                    Some(next_epoch_qc),
282                    &self.consensus,
283                    &self.membership.coordinator,
284                    &self.upgrade_lock,
285                    self.epoch_height,
286                )
287                .await
288                .is_ok()
289                    && transition_qc
290                        .as_ref()
291                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
292                {
293                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
294                }
295            }
296        }
297        Ok(transition_qc)
298    }
299    /// Waits for the configured timeout for nodes to send HighQc messages to us.  We'll
300    /// then propose with the highest QC from among these proposals. A light client state
301    /// update certificate is also returned if the highest QC is an epoch root QC.
302    async fn wait_for_highest_qc(
303        &self,
304    ) -> Result<(
305        QuorumCertificate2<TYPES>,
306        Option<NextEpochQuorumCertificate2<TYPES>>,
307        Option<LightClientStateUpdateCertificateV2<TYPES>>,
308    )> {
309        tracing::debug!("waiting for QC");
310        // If we haven't upgraded to Hotstuff 2 just return the high qc right away
311        ensure!(
312            self.upgrade_lock.epochs_enabled(self.view_number),
313            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
314        );
315
316        let consensus_reader = self.consensus.read().await;
317        let mut highest_qc = consensus_reader.high_qc().clone();
318        let mut state_cert = if highest_qc
319            .data
320            .block_number
321            .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
322        {
323            consensus_reader.state_cert().cloned()
324        } else {
325            None
326        };
327        let mut next_epoch_qc = if highest_qc
328            .data
329            .block_number
330            .is_some_and(|bn| is_last_block(bn, self.epoch_height))
331        {
332            let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
333            if maybe_neqc
334                .as_ref()
335                .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
336            {
337                maybe_neqc
338            } else {
339                None
340            }
341        } else {
342            None
343        };
344        drop(consensus_reader);
345
346        let wait_duration = Duration::from_millis(self.timeout / 2);
347
348        let mut rx = self.receiver.clone();
349
350        // drain any qc off the queue
351        while let Ok(event) = rx.try_recv() {
352            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
353                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
354                    (qc, maybe_next_epoch_qc, None)
355                },
356                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
357                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
358                },
359                _ => continue,
360            };
361            if validate_qc_and_next_epoch_qc(
362                qc,
363                maybe_next_epoch_qc.as_ref(),
364                &self.consensus,
365                &self.membership.coordinator,
366                &self.upgrade_lock,
367                self.epoch_height,
368            )
369            .await
370            .is_ok()
371            {
372                if qc
373                    .data
374                    .block_number
375                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
376                {
377                    // Validate the state cert
378                    if let Some(state_cert) = &maybe_state_cert {
379                        if validate_light_client_state_update_certificate(
380                            state_cert,
381                            &self.membership.coordinator,
382                            &self.upgrade_lock,
383                        )
384                        .await
385                        .is_err()
386                            || !check_qc_state_cert_correspondence(
387                                qc,
388                                state_cert,
389                                self.epoch_height,
390                            )
391                        {
392                            tracing::error!("Failed to validate state cert");
393                            continue;
394                        }
395                    } else {
396                        tracing::error!(
397                            "Received an epoch root QC but we don't have the corresponding state \
398                             cert."
399                        );
400                        continue;
401                    }
402                } else {
403                    maybe_state_cert = None;
404                }
405                if qc.view_number() > highest_qc.view_number() {
406                    highest_qc = qc.clone();
407                    next_epoch_qc = maybe_next_epoch_qc.clone();
408                    state_cert = maybe_state_cert;
409                }
410            }
411        }
412
413        // TODO configure timeout
414        while self.view_start_time.elapsed() < wait_duration {
415            let time_spent = Instant::now()
416                .checked_duration_since(self.view_start_time)
417                .ok_or(error!(
418                    "Time elapsed since the start of the task is negative. This should never \
419                     happen."
420                ))?;
421            let time_left = wait_duration
422                .checked_sub(time_spent)
423                .ok_or(info!("No time left"))?;
424            let Ok(maybe_qc_state_cert) =
425                tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
426            else {
427                tracing::info!(
428                    "Some nodes did not respond with their HighQc in time. Continuing with the \
429                     highest QC that we received: {highest_qc:?}"
430                );
431                return Ok((highest_qc, next_epoch_qc, state_cert));
432            };
433            let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
434                continue;
435            };
436            if qc.view_number() > highest_qc.view_number() {
437                highest_qc = qc;
438                next_epoch_qc = maybe_next_epoch_qc;
439                state_cert = maybe_state_cert;
440            }
441        }
442        Ok((highest_qc, next_epoch_qc, state_cert))
443    }
444    /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
445    /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
446    /// with optional [`ViewChangeEvidence`].
447    #[allow(clippy::too_many_arguments)]
448    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
449    async fn publish_proposal(
450        &self,
451        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
452        _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
453        view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
454        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
455        parent_qc: QuorumCertificate2<TYPES>,
456        maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
457        maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
458    ) -> Result<()> {
459        let (parent_leaf, state) = parent_leaf_and_state(
460            &self.sender,
461            &self.receiver,
462            self.membership.coordinator.clone(),
463            self.public_key.clone(),
464            self.private_key.clone(),
465            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
466            &self.upgrade_lock,
467            &parent_qc,
468            self.epoch_height,
469        )
470        .await?;
471
472        // In order of priority, we should try to attach:
473        //   - the parent certificate if it exists, or
474        //   - our own certificate that we formed.
475        // In either case, we need to ensure that the certificate is still relevant.
476        //
477        // Note: once we reach a point of potentially propose with our formed upgrade certificate,
478        // we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose
479        // to discard it.
480        //
481        // It is possible that multiple nodes form separate upgrade certificates for the some
482        // upgrade if we are not careful about voting. But this shouldn't bother us: the first
483        // leader to propose is the one whose certificate will be used. And if that fails to reach
484        // a decide for whatever reason, we may lose our own certificate, but something will likely
485        // have gone wrong there anyway.
486        let mut upgrade_certificate = parent_leaf
487            .upgrade_certificate()
488            .or(formed_upgrade_certificate);
489
490        if let Some(cert) = upgrade_certificate.clone()
491            && cert.is_relevant(self.view_number).await.is_err()
492        {
493            upgrade_certificate = None;
494        }
495
496        let proposal_certificate = view_change_evidence
497            .as_ref()
498            .filter(|cert| cert.is_valid_for_view(&self.view_number))
499            .cloned();
500
501        ensure!(
502            commitment_and_metadata.block_view == self.view_number,
503            "Cannot propose because our VID payload commitment and metadata is for an older view."
504        );
505
506        let version = self.upgrade_lock.version(self.view_number)?;
507
508        let builder_commitment = commitment_and_metadata.builder_commitment.clone();
509        let metadata = commitment_and_metadata.metadata.clone();
510
511        if version >= EPOCH_VERSION
512            && parent_qc.view_number()
513                > self
514                    .upgrade_lock
515                    .upgrade_view()
516                    .unwrap_or(ViewNumber::new(0))
517        {
518            let Some(parent_block_number) = parent_qc.data.block_number else {
519                tracing::error!("Parent QC does not have a block number. Do not propose.");
520                return Ok(());
521            };
522            if is_epoch_transition(parent_block_number, self.epoch_height)
523                && !is_last_block(parent_block_number, self.epoch_height)
524            {
525                let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
526                tracing::info!("Reached end of epoch.");
527                ensure!(
528                    builder_commitment == empty_payload.builder_commitment(&metadata)
529                        && metadata == empty_metadata,
530                    "We're trying to propose non empty block in the epoch transition. Do not \
531                     propose. View number: {}. Parent Block number: {}",
532                    self.view_number,
533                    parent_block_number,
534                );
535            }
536            if is_epoch_root(parent_block_number, self.epoch_height) {
537                ensure!(
538                    maybe_state_cert.as_ref().is_some_and(|state_cert| {
539                        check_qc_state_cert_correspondence(
540                            &parent_qc,
541                            state_cert,
542                            self.epoch_height,
543                        )
544                    }),
545                    "We are proposing with parent epoch root QC but we don't have the \
546                     corresponding state cert."
547                );
548            }
549        }
550        let block_header = TYPES::BlockHeader::new(
551            state.as_ref(),
552            self.instance_state.as_ref(),
553            &parent_leaf,
554            commitment_and_metadata.commitment,
555            builder_commitment,
556            metadata,
557            commitment_and_metadata.fees.first().clone(),
558            version,
559            *self.view_number,
560        )
561        .await
562        .wrap()
563        .context(warn!("Failed to construct block header"))?;
564        let epoch = option_epoch_from_block_number(
565            version >= EPOCH_VERSION,
566            block_header.block_number(),
567            self.epoch_height,
568        );
569
570        let epoch_membership = self
571            .membership
572            .coordinator
573            .membership_for_epoch(epoch)
574            .await?;
575        // Make sure we are the leader for the view and epoch.
576        // We might have ended up here because we were in the epoch transition.
577        if epoch_membership.leader(self.view_number).await? != self.public_key {
578            tracing::warn!(
579                "We are not the leader in the epoch for which we are about to propose. Do not \
580                 send the quorum proposal."
581            );
582            return Ok(());
583        }
584        let is_high_qc_for_transition_block = parent_qc
585            .data
586            .block_number
587            .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
588        let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number)
589            && is_high_qc_for_transition_block
590        {
591            ensure!(
592                maybe_next_epoch_qc
593                    .as_ref()
594                    .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
595                "Justify QC on our proposal is for an epoch transition block but we don't have \
596                 the corresponding next epoch QC. Do not propose."
597            );
598            maybe_next_epoch_qc
599        } else {
600            None
601        };
602        let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
603        {
604            if let Some(epoch_val) = &epoch {
605                let drb_result = epoch_membership
606                    .next_epoch()
607                    .await
608                    .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
609                    .get_epoch_drb()
610                    .await
611                    .clone()
612                    .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
613
614                Some(drb_result)
615            } else {
616                None
617            }
618        } else {
619            None
620        };
621
622        let proposal = QuorumProposalWrapper {
623            proposal: QuorumProposal2 {
624                block_header,
625                view_number: self.view_number,
626                epoch,
627                justify_qc: parent_qc,
628                next_epoch_justify_qc: next_epoch_qc,
629                upgrade_certificate,
630                view_change_evidence: proposal_certificate,
631                next_drb_result,
632                state_cert: maybe_state_cert,
633            },
634        };
635
636        let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
637        ensure!(
638            proposed_leaf.parent_commitment() == parent_leaf.commit(),
639            "Proposed leaf parent does not equal high qc"
640        );
641
642        let signature =
643            TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
644                .wrap()
645                .context(error!("Failed to compute proposed_leaf.commit()"))?;
646
647        let message = Proposal {
648            data: proposal,
649            signature,
650            _pd: PhantomData,
651        };
652        tracing::info!(
653            "Sending proposal for view {}, height {}, justify_qc view: {}",
654            proposed_leaf.view_number(),
655            proposed_leaf.height(),
656            proposed_leaf.justify_qc().view_number()
657        );
658
659        broadcast_event(
660            Arc::new(HotShotEvent::QuorumProposalSend(
661                message.clone(),
662                self.public_key.clone(),
663            )),
664            &self.sender,
665        )
666        .await;
667
668        Ok(())
669    }
670
671    fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
672        let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
673        tracing::warn!("Failed to propose, events: {:#?}", events);
674    }
675
676    async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
677        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
678        let mut timeout_certificate = None;
679        let mut view_sync_finalize_cert = None;
680        let mut vid_share = None;
681        let mut parent_qc = None;
682        let mut next_epoch_qc = None;
683        let mut state_cert = None;
684        for event in res.iter().flatten().flatten() {
685            match event.as_ref() {
686                HotShotEvent::SendPayloadCommitmentAndMetadata(
687                    payload_commitment,
688                    builder_commitment,
689                    metadata,
690                    view,
691                    fees,
692                ) => {
693                    commit_and_metadata = Some(CommitmentAndMetadata {
694                        commitment: *payload_commitment,
695                        builder_commitment: builder_commitment.clone(),
696                        metadata: metadata.clone(),
697                        fees: fees.clone(),
698                        block_view: *view,
699                    });
700                },
701                HotShotEvent::Qc2Formed(cert) => match cert {
702                    either::Right(timeout) => {
703                        timeout_certificate = Some(timeout.clone());
704                    },
705                    either::Left(qc) => {
706                        parent_qc = Some(qc.clone());
707                    },
708                },
709                HotShotEvent::EpochRootQcFormed(root_qc) => {
710                    parent_qc = Some(root_qc.qc.clone());
711                    state_cert = Some(root_qc.state_cert.clone());
712                },
713                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
714                    view_sync_finalize_cert = Some(cert.clone());
715                },
716                HotShotEvent::VidDisperseSend(share, _) => {
717                    vid_share = Some(share.clone());
718                },
719                HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
720                    next_epoch_qc = Some(qc.clone());
721                },
722                _ => {},
723            }
724        }
725
726        let Ok(version) = self.upgrade_lock.version(self.view_number) else {
727            bail!(error!(
728                "Failed to get version for view {:?}, not proposing",
729                self.view_number
730            ));
731        };
732
733        let mut maybe_epoch = None;
734        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
735            maybe_epoch = view_sync_cert.data.epoch;
736            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
737        } else {
738            match timeout_certificate {
739                Some(timeout_cert) => {
740                    maybe_epoch = timeout_cert.data.epoch;
741                    Some(ViewChangeEvidence2::Timeout(timeout_cert))
742                },
743                None => None,
744            }
745        };
746
747        let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
748            if qc
749                .data
750                .block_number
751                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
752                && next_epoch_qc
753                    .as_ref()
754                    .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
755            {
756                bail!(error!(
757                    "We've formed a transition QC but we haven't formed the corresponding next \
758                     epoch QC. Do not propose."
759                ));
760            }
761            (qc, next_epoch_qc, state_cert)
762        } else if version < EPOCH_VERSION {
763            (self.consensus.read().await.high_qc().clone(), None, None)
764        } else if proposal_cert.is_some() {
765            // If we have a view change evidence, we need to wait to propose with the transition QC
766            if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
767                let Some(epoch) = maybe_epoch else {
768                    bail!(error!(
769                        "No epoch found on view change evidence, but we are in epoch mode"
770                    ));
771                };
772                if qc
773                    .data
774                    .block_number
775                    .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
776                {
777                    (qc, Some(next_epoch_qc), None)
778                } else {
779                    match self.wait_for_highest_qc().await {
780                        Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
781                            (qc, maybe_next_epoch_qc, maybe_state_cert)
782                        },
783                        Err(e) => {
784                            bail!(error!("Error while waiting for highest QC: {e:?}"));
785                        },
786                    }
787                }
788            } else {
789                let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
790                    self.wait_for_highest_qc().await
791                else {
792                    bail!(error!("Error while waiting for highest QC"));
793                };
794                if qc.data.block_number.is_some_and(|bn| {
795                    is_epoch_transition(bn, self.epoch_height)
796                        && !is_last_block(bn, self.epoch_height)
797                }) {
798                    bail!(error!(
799                        "High is in transition but we need to propose with transition QC, do \
800                         nothing"
801                    ));
802                }
803                (qc, maybe_next_epoch_qc, maybe_state_cert)
804            }
805        } else {
806            match self.wait_for_highest_qc().await {
807                Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
808                    (qc, maybe_next_epoch_qc, maybe_state_cert)
809                },
810                Err(e) => {
811                    bail!(error!("Error while waiting for highest QC: {e:?}"));
812                },
813            }
814        };
815
816        ensure!(
817            commit_and_metadata.is_some(),
818            error!(
819                "Somehow completed the proposal dependency task without a commitment and metadata"
820            )
821        );
822
823        ensure!(
824            vid_share.is_some(),
825            error!("Somehow completed the proposal dependency task without a VID share")
826        );
827
828        self.publish_proposal(
829            commit_and_metadata.unwrap(),
830            vid_share.unwrap(),
831            proposal_cert,
832            self.formed_upgrade_certificate.clone(),
833            parent_qc,
834            maybe_next_epoch_qc,
835            maybe_state_cert,
836        )
837        .await
838        .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
839    }
840}
841
842impl<TYPES: NodeType> HandleDepOutput for ProposalDependencyHandle<TYPES> {
843    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
844
845    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
846    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
847    async fn handle_dep_result(self, res: Self::Output) {
848        let mut cancel_receiver = self.cancel_receiver.clone();
849        let result = tokio::select! {
850            result = self.handle_proposal_deps(&res) => {
851                result
852            }
853            _ = cancel_receiver.recv() => {
854                tracing::warn!("Proposal dependency task cancelled");
855                return;
856            }
857        };
858        if result.is_err() {
859            log!(result);
860            self.print_proposal_events(&res)
861        }
862    }
863}
864
865pub(super) async fn handle_eqc_formed<TYPES: NodeType, I: NodeImplementation<TYPES>>(
866    cert_view: ViewNumber,
867    leaf_commit: Commitment<Leaf2<TYPES>>,
868    block_number: Option<u64>,
869    task_state: &mut QuorumProposalTaskState<TYPES, I>,
870    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
871) {
872    if !task_state.upgrade_lock.epochs_enabled(cert_view) {
873        tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
874        return;
875    }
876    if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
877        tracing::debug!("We formed QC but not eQC. Do nothing");
878        return;
879    }
880
881    let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
882        tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
883        return;
884    };
885    if current_epoch_qc.view_number() != cert_view
886        || current_epoch_qc.data.leaf_commit != leaf_commit
887    {
888        tracing::debug!("We haven't yet formed the eQC. Do nothing");
889        return;
890    }
891    let Some(next_epoch_qc) = task_state
892        .formed_next_epoch_quorum_certificates
893        .get(&cert_view)
894    else {
895        tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
896        return;
897    };
898    if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
899        tracing::debug!(
900            "We formed the eQC but the current and next epoch QCs do not correspond to each other."
901        );
902        return;
903    }
904    let current_epoch_qc_clone = current_epoch_qc.clone();
905
906    let mut consensus_writer = task_state.consensus.write().await;
907    let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
908    let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
909    drop(consensus_writer);
910
911    if let Err(e) = task_state
912        .storage
913        .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
914        .await
915    {
916        tracing::error!("Failed to store EQC: {}", e);
917    }
918
919    task_state.formed_quorum_certificates =
920        task_state.formed_quorum_certificates.split_off(&cert_view);
921    task_state.formed_next_epoch_quorum_certificates = task_state
922        .formed_next_epoch_quorum_certificates
923        .split_off(&cert_view);
924
925    broadcast_event(
926        Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
927        event_sender,
928    )
929    .await;
930}