hotshot_task_impls/quorum_proposal/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could
8//! initiate a proposal occurs.
9
10use std::{
11    marker::PhantomData,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_task::dependency_task::HandleDepOutput;
19use hotshot_types::{
20    consensus::{CommitmentAndMetadata, OuterConsensus},
21    data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
22    epoch_membership::EpochMembership,
23    message::Proposal,
24    simple_certificate::{
25        LightClientStateUpdateCertificate, NextEpochQuorumCertificate2, QuorumCertificate2,
26        UpgradeCertificate,
27    },
28    traits::{
29        block_contents::BlockHeader,
30        node_implementation::{ConsensusTime, NodeImplementation, NodeType},
31        signature_key::SignatureKey,
32        BlockPayload,
33    },
34    utils::{
35        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
36        is_transition_block, option_epoch_from_block_number,
37    },
38    vote::HasViewNumber,
39};
40use hotshot_utils::anytrace::*;
41use tracing::instrument;
42use vbs::version::StaticVersionType;
43
44use crate::{
45    events::HotShotEvent,
46    helpers::{
47        broadcast_event, check_qc_state_cert_correspondence, parent_leaf_and_state,
48        validate_light_client_state_update_certificate, validate_qc_and_next_epoch_qc,
49    },
50    quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
51};
52
/// The kinds of dependency a proposal can be waiting on. Each variant names the event (or
/// event branch) that satisfies it.
#[derive(Debug, PartialEq)]
pub(crate) enum ProposalDependency {
    /// Satisfied by the `SendPayloadCommitmentAndMetadata` event.
    PayloadAndMetadata,

    /// Satisfied by the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` events.
    Qc,

    /// Satisfied by the `ViewSyncFinalizeCertificateRecv` event.
    ViewSyncCert,

    /// Satisfied by the timeout branch of the `Qc2Formed`, `ExtendedQc2Formed`, and
    /// `EpochRootQcFormed` events.
    TimeoutCert,

    /// Satisfied by the `QuorumProposalRecv` event.
    Proposal,

    /// Satisfied by the `VidShareValidated` event.
    VidShare,
}
74
75/// Handler for the proposal dependency
76pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
77    /// Latest view number that has been proposed for (proxy for cur_view).
78    pub latest_proposed_view: TYPES::View,
79
80    /// The view number to propose for.
81    pub view_number: TYPES::View,
82
83    /// The event sender.
84    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
85
86    /// The event receiver.
87    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
88
89    /// Immutable instance state
90    pub instance_state: Arc<TYPES::InstanceState>,
91
92    /// Membership for Quorum Certs/votes
93    pub membership: EpochMembership<TYPES>,
94
95    /// Our public key
96    pub public_key: TYPES::SignatureKey,
97
98    /// Our Private Key
99    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
100
101    /// Shared consensus task state
102    pub consensus: OuterConsensus<TYPES>,
103
104    /// View timeout from config.
105    pub timeout: u64,
106
107    /// The most recent upgrade certificate this node formed.
108    /// Note: this is ONLY for certificates that have been formed internally,
109    /// so that we can propose with them.
110    ///
111    /// Certificates received from other nodes will get reattached regardless of this fields,
112    /// since they will be present in the leaf we propose off of.
113    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
114
115    /// Lock for a decided upgrade
116    pub upgrade_lock: UpgradeLock<TYPES, V>,
117
118    /// The node's id
119    pub id: u64,
120
121    /// The time this view started
122    pub view_start_time: Instant,
123
124    /// Number of blocks in an epoch, zero means there are no epochs
125    pub epoch_height: u64,
126}
127
128impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
129    /// Return the next HighQc we get from the event stream
130    async fn wait_for_qc_event(
131        &self,
132        mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
133    ) -> Option<(
134        QuorumCertificate2<TYPES>,
135        Option<NextEpochQuorumCertificate2<TYPES>>,
136        Option<LightClientStateUpdateCertificate<TYPES>>,
137    )> {
138        while let Ok(event) = rx.recv_direct().await {
139            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
140                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
141                    (qc, maybe_next_epoch_qc, None)
142                },
143                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
144                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
145                },
146                _ => continue,
147            };
148            if validate_qc_and_next_epoch_qc(
149                qc,
150                maybe_next_epoch_qc.as_ref(),
151                &self.consensus,
152                &self.membership.coordinator,
153                &self.upgrade_lock,
154                self.epoch_height,
155            )
156            .await
157            .is_ok()
158            {
159                if qc
160                    .data
161                    .block_number
162                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
163                {
164                    // Validate the state cert
165                    if let Some(state_cert) = &maybe_state_cert {
166                        if validate_light_client_state_update_certificate(
167                            state_cert,
168                            &self.membership.coordinator,
169                        )
170                        .await
171                        .is_err()
172                            || !check_qc_state_cert_correspondence(
173                                qc,
174                                state_cert,
175                                self.epoch_height,
176                            )
177                        {
178                            tracing::error!("Failed to validate state cert");
179                            return None;
180                        }
181                    } else {
182                        tracing::error!(
183                            "Received an epoch root QC but we don't have the corresponding state \
184                             cert."
185                        );
186                        return None;
187                    }
188                } else {
189                    maybe_state_cert = None;
190                }
191                return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
192            }
193        }
194        None
195    }
196
197    async fn wait_for_transition_qc(
198        &self,
199    ) -> Result<
200        Option<(
201            QuorumCertificate2<TYPES>,
202            NextEpochQuorumCertificate2<TYPES>,
203        )>,
204    > {
205        ensure!(
206            self.upgrade_lock.epochs_enabled(self.view_number).await,
207            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
208        );
209
210        let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
211
212        let wait_duration = Duration::from_millis(self.timeout / 2);
213
214        let mut rx = self.receiver.clone();
215
216        // drain any qc off the queue
217        // We don't watch for EpochRootQcRecv events here because it's not in transition.
218        while let Ok(event) = rx.try_recv() {
219            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
220                if let Some(block_number) = qc.data.block_number {
221                    if !is_transition_block(block_number, self.epoch_height) {
222                        continue;
223                    }
224                } else {
225                    continue;
226                }
227                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
228                    continue;
229                };
230                if validate_qc_and_next_epoch_qc(
231                    qc,
232                    Some(next_epoch_qc),
233                    &self.consensus,
234                    &self.membership.coordinator,
235                    &self.upgrade_lock,
236                    self.epoch_height,
237                )
238                .await
239                .is_ok()
240                    && transition_qc
241                        .as_ref()
242                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
243                {
244                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
245                }
246            }
247        }
248        // TODO configure timeout
249        while self.view_start_time.elapsed() < wait_duration {
250            let time_spent = Instant::now()
251                .checked_duration_since(self.view_start_time)
252                .ok_or(error!(
253                    "Time elapsed since the start of the task is negative. This should never \
254                     happen."
255                ))?;
256            let time_left = wait_duration
257                .checked_sub(time_spent)
258                .ok_or(info!("No time left"))?;
259            let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
260                return Ok(transition_qc);
261            };
262            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
263                if let Some(block_number) = qc.data.block_number {
264                    if !is_transition_block(block_number, self.epoch_height) {
265                        continue;
266                    }
267                } else {
268                    continue;
269                }
270                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
271                    continue;
272                };
273                if validate_qc_and_next_epoch_qc(
274                    qc,
275                    Some(next_epoch_qc),
276                    &self.consensus,
277                    &self.membership.coordinator,
278                    &self.upgrade_lock,
279                    self.epoch_height,
280                )
281                .await
282                .is_ok()
283                    && transition_qc
284                        .as_ref()
285                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
286                {
287                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
288                }
289            }
290        }
291        Ok(transition_qc)
292    }
293    /// Waits for the configured timeout for nodes to send HighQc messages to us.  We'll
294    /// then propose with the highest QC from among these proposals. A light client state
295    /// update certificate is also returned if the highest QC is an epoch root QC.
296    async fn wait_for_highest_qc(
297        &self,
298    ) -> Result<(
299        QuorumCertificate2<TYPES>,
300        Option<NextEpochQuorumCertificate2<TYPES>>,
301        Option<LightClientStateUpdateCertificate<TYPES>>,
302    )> {
303        tracing::debug!("waiting for QC");
304        // If we haven't upgraded to Hotstuff 2 just return the high qc right away
305        ensure!(
306            self.upgrade_lock.epochs_enabled(self.view_number).await,
307            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
308        );
309
310        let consensus_reader = self.consensus.read().await;
311        let mut highest_qc = consensus_reader.high_qc().clone();
312        let mut state_cert = if highest_qc
313            .data
314            .block_number
315            .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
316        {
317            consensus_reader.state_cert().cloned()
318        } else {
319            None
320        };
321        let mut next_epoch_qc = if highest_qc
322            .data
323            .block_number
324            .is_some_and(|bn| is_last_block(bn, self.epoch_height))
325        {
326            let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
327            if maybe_neqc
328                .as_ref()
329                .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
330            {
331                maybe_neqc
332            } else {
333                None
334            }
335        } else {
336            None
337        };
338        drop(consensus_reader);
339
340        let wait_duration = Duration::from_millis(self.timeout / 2);
341
342        let mut rx = self.receiver.clone();
343
344        // drain any qc off the queue
345        while let Ok(event) = rx.try_recv() {
346            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
347                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
348                    (qc, maybe_next_epoch_qc, None)
349                },
350                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
351                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
352                },
353                _ => continue,
354            };
355            if validate_qc_and_next_epoch_qc(
356                qc,
357                maybe_next_epoch_qc.as_ref(),
358                &self.consensus,
359                &self.membership.coordinator,
360                &self.upgrade_lock,
361                self.epoch_height,
362            )
363            .await
364            .is_ok()
365            {
366                if qc
367                    .data
368                    .block_number
369                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
370                {
371                    // Validate the state cert
372                    if let Some(state_cert) = &maybe_state_cert {
373                        if validate_light_client_state_update_certificate(
374                            state_cert,
375                            &self.membership.coordinator,
376                        )
377                        .await
378                        .is_err()
379                            || !check_qc_state_cert_correspondence(
380                                qc,
381                                state_cert,
382                                self.epoch_height,
383                            )
384                        {
385                            tracing::error!("Failed to validate state cert");
386                            continue;
387                        }
388                    } else {
389                        tracing::error!(
390                            "Received an epoch root QC but we don't have the corresponding state \
391                             cert."
392                        );
393                        continue;
394                    }
395                } else {
396                    maybe_state_cert = None;
397                }
398                if qc.view_number() > highest_qc.view_number() {
399                    highest_qc = qc.clone();
400                    next_epoch_qc = maybe_next_epoch_qc.clone();
401                    state_cert = maybe_state_cert;
402                }
403            }
404        }
405
406        // TODO configure timeout
407        while self.view_start_time.elapsed() < wait_duration {
408            let time_spent = Instant::now()
409                .checked_duration_since(self.view_start_time)
410                .ok_or(error!(
411                    "Time elapsed since the start of the task is negative. This should never \
412                     happen."
413                ))?;
414            let time_left = wait_duration
415                .checked_sub(time_spent)
416                .ok_or(info!("No time left"))?;
417            let Ok(maybe_qc_state_cert) =
418                tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
419            else {
420                tracing::info!(
421                    "Some nodes did not respond with their HighQc in time. Continuing with the \
422                     highest QC that we received: {highest_qc:?}"
423                );
424                return Ok((highest_qc, next_epoch_qc, state_cert));
425            };
426            let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
427                continue;
428            };
429            if qc.view_number() > highest_qc.view_number() {
430                highest_qc = qc;
431                next_epoch_qc = maybe_next_epoch_qc;
432                state_cert = maybe_state_cert;
433            }
434        }
435        Ok((highest_qc, next_epoch_qc, state_cert))
436    }
437    /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
438    /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
439    /// with optional [`ViewChangeEvidence`].
440    #[allow(clippy::too_many_arguments)]
441    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
442    async fn publish_proposal(
443        &self,
444        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
445        _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
446        view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
447        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
448        parent_qc: QuorumCertificate2<TYPES>,
449        maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
450        maybe_state_cert: Option<LightClientStateUpdateCertificate<TYPES>>,
451    ) -> Result<()> {
452        let (parent_leaf, state) = parent_leaf_and_state(
453            &self.sender,
454            &self.receiver,
455            self.membership.coordinator.clone(),
456            self.public_key.clone(),
457            self.private_key.clone(),
458            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
459            &self.upgrade_lock,
460            &parent_qc,
461            self.epoch_height,
462        )
463        .await?;
464
465        // In order of priority, we should try to attach:
466        //   - the parent certificate if it exists, or
467        //   - our own certificate that we formed.
468        // In either case, we need to ensure that the certificate is still relevant.
469        //
470        // Note: once we reach a point of potentially propose with our formed upgrade certificate,
471        // we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose
472        // to discard it.
473        //
474        // It is possible that multiple nodes form separate upgrade certificates for the some
475        // upgrade if we are not careful about voting. But this shouldn't bother us: the first
476        // leader to propose is the one whose certificate will be used. And if that fails to reach
477        // a decide for whatever reason, we may lose our own certificate, but something will likely
478        // have gone wrong there anyway.
479        let mut upgrade_certificate = parent_leaf
480            .upgrade_certificate()
481            .or(formed_upgrade_certificate);
482
483        if let Some(cert) = upgrade_certificate.clone() {
484            if cert.is_relevant(self.view_number).await.is_err() {
485                upgrade_certificate = None;
486            }
487        }
488
489        let proposal_certificate = view_change_evidence
490            .as_ref()
491            .filter(|cert| cert.is_valid_for_view(&self.view_number))
492            .cloned();
493
494        ensure!(
495            commitment_and_metadata.block_view == self.view_number,
496            "Cannot propose because our VID payload commitment and metadata is for an older view."
497        );
498
499        let version = self.upgrade_lock.version(self.view_number).await?;
500
501        let builder_commitment = commitment_and_metadata.builder_commitment.clone();
502        let metadata = commitment_and_metadata.metadata.clone();
503
504        if version >= V::Epochs::VERSION
505            && parent_qc.view_number()
506                > self
507                    .upgrade_lock
508                    .upgrade_view()
509                    .await
510                    .unwrap_or(TYPES::View::new(0))
511        {
512            let Some(parent_block_number) = parent_qc.data.block_number else {
513                tracing::error!("Parent QC does not have a block number. Do not propose.");
514                return Ok(());
515            };
516            if is_epoch_transition(parent_block_number, self.epoch_height)
517                && !is_last_block(parent_block_number, self.epoch_height)
518            {
519                let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
520                tracing::info!("Reached end of epoch.");
521                ensure!(
522                    builder_commitment == empty_payload.builder_commitment(&metadata)
523                        && metadata == empty_metadata,
524                    "We're trying to propose non empty block in the epoch transition. Do not \
525                     propose. View number: {}. Parent Block number: {}",
526                    self.view_number,
527                    parent_block_number,
528                );
529            }
530            if is_epoch_root(parent_block_number, self.epoch_height) {
531                ensure!(
532                    maybe_state_cert.as_ref().is_some_and(|state_cert| {
533                        check_qc_state_cert_correspondence(
534                            &parent_qc,
535                            state_cert,
536                            self.epoch_height,
537                        )
538                    }),
539                    "We are proposing with parent epoch root QC but we don't have the \
540                     corresponding state cert."
541                );
542            }
543        }
544        let block_header = TYPES::BlockHeader::new(
545            state.as_ref(),
546            self.instance_state.as_ref(),
547            &parent_leaf,
548            commitment_and_metadata.commitment,
549            builder_commitment,
550            metadata,
551            commitment_and_metadata.fees.first().clone(),
552            version,
553            *self.view_number,
554        )
555        .await
556        .wrap()
557        .context(warn!("Failed to construct block header"))?;
558        let epoch = option_epoch_from_block_number::<TYPES>(
559            version >= V::Epochs::VERSION,
560            block_header.block_number(),
561            self.epoch_height,
562        );
563
564        let epoch_membership = self
565            .membership
566            .coordinator
567            .membership_for_epoch(epoch)
568            .await?;
569        // Make sure we are the leader for the view and epoch.
570        // We might have ended up here because we were in the epoch transition.
571        if epoch_membership.leader(self.view_number).await? != self.public_key {
572            tracing::warn!(
573                "We are not the leader in the epoch for which we are about to propose. Do not \
574                 send the quorum proposal."
575            );
576            return Ok(());
577        }
578        let is_high_qc_for_transition_block = parent_qc
579            .data
580            .block_number
581            .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
582        let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
583            && is_high_qc_for_transition_block
584        {
585            ensure!(
586                maybe_next_epoch_qc
587                    .as_ref()
588                    .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
589                "Jusify QC on our proposal is for an epoch transition block but we don't have the \
590                 corresponding next epoch QC. Do not propose."
591            );
592            maybe_next_epoch_qc
593        } else {
594            None
595        };
596        let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
597        {
598            if let Some(epoch_val) = &epoch {
599                self.consensus
600                    .read()
601                    .await
602                    .drb_results
603                    .results
604                    .get(&(*epoch_val + 1))
605                    .copied()
606            } else {
607                None
608            }
609        } else {
610            None
611        };
612
613        let proposal = QuorumProposalWrapper {
614            proposal: QuorumProposal2 {
615                block_header,
616                view_number: self.view_number,
617                epoch,
618                justify_qc: parent_qc,
619                next_epoch_justify_qc: next_epoch_qc,
620                upgrade_certificate,
621                view_change_evidence: proposal_certificate,
622                next_drb_result,
623                state_cert: maybe_state_cert,
624            },
625        };
626
627        let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
628        ensure!(
629            proposed_leaf.parent_commitment() == parent_leaf.commit(),
630            "Proposed leaf parent does not equal high qc"
631        );
632
633        let signature =
634            TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
635                .wrap()
636                .context(error!("Failed to compute proposed_leaf.commit()"))?;
637
638        let message = Proposal {
639            data: proposal,
640            signature,
641            _pd: PhantomData,
642        };
643        tracing::info!(
644            "Sending proposal for view {}, height {}, justify_qc view: {}",
645            proposed_leaf.view_number(),
646            proposed_leaf.height(),
647            proposed_leaf.justify_qc().view_number()
648        );
649
650        broadcast_event(
651            Arc::new(HotShotEvent::QuorumProposalSend(
652                message.clone(),
653                self.public_key.clone(),
654            )),
655            &self.sender,
656        )
657        .await;
658
659        Ok(())
660    }
661
662    fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
663        let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
664        tracing::warn!("Failed to propose, events: {:#?}", events);
665    }
666
667    async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
668        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
669        let mut timeout_certificate = None;
670        let mut view_sync_finalize_cert = None;
671        let mut vid_share = None;
672        let mut parent_qc = None;
673        let mut next_epoch_qc = None;
674        let mut state_cert = None;
675        for event in res.iter().flatten().flatten() {
676            match event.as_ref() {
677                HotShotEvent::SendPayloadCommitmentAndMetadata(
678                    payload_commitment,
679                    builder_commitment,
680                    metadata,
681                    view,
682                    fees,
683                ) => {
684                    commit_and_metadata = Some(CommitmentAndMetadata {
685                        commitment: *payload_commitment,
686                        builder_commitment: builder_commitment.clone(),
687                        metadata: metadata.clone(),
688                        fees: fees.clone(),
689                        block_view: *view,
690                    });
691                },
692                HotShotEvent::Qc2Formed(cert) => match cert {
693                    either::Right(timeout) => {
694                        timeout_certificate = Some(timeout.clone());
695                    },
696                    either::Left(qc) => {
697                        parent_qc = Some(qc.clone());
698                    },
699                },
700                HotShotEvent::EpochRootQcFormed(root_qc) => {
701                    parent_qc = Some(root_qc.qc.clone());
702                    state_cert = Some(root_qc.state_cert.clone());
703                },
704                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
705                    view_sync_finalize_cert = Some(cert.clone());
706                },
707                HotShotEvent::VidDisperseSend(share, _) => {
708                    vid_share = Some(share.clone());
709                },
710                HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
711                    next_epoch_qc = Some(qc.clone());
712                },
713                _ => {},
714            }
715        }
716
717        let Ok(version) = self.upgrade_lock.version(self.view_number).await else {
718            bail!(error!(
719                "Failed to get version for view {:?}, not proposing",
720                self.view_number
721            ));
722        };
723
724        let mut maybe_epoch = None;
725        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
726            maybe_epoch = view_sync_cert.data.epoch;
727            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
728        } else {
729            match timeout_certificate {
730                Some(timeout_cert) => {
731                    maybe_epoch = timeout_cert.data.epoch;
732                    Some(ViewChangeEvidence2::Timeout(timeout_cert))
733                },
734                None => None,
735            }
736        };
737
738        let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
739            if qc
740                .data
741                .block_number
742                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
743                && next_epoch_qc
744                    .as_ref()
745                    .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
746            {
747                bail!(error!(
748                    "We've formed a transition QC but we haven't formed the corresponding next \
749                     epoch QC. Do not propose."
750                ));
751            }
752            (qc, next_epoch_qc, state_cert)
753        } else if version < V::Epochs::VERSION {
754            (self.consensus.read().await.high_qc().clone(), None, None)
755        } else if proposal_cert.is_some() {
756            // If we have a view change evidence, we need to wait to propose with the transition QC
757            if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
758                let Some(epoch) = maybe_epoch else {
759                    bail!(error!(
760                        "No epoch found on view change evidence, but we are in epoch mode"
761                    ));
762                };
763                if qc
764                    .data
765                    .block_number
766                    .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
767                {
768                    (qc, Some(next_epoch_qc), None)
769                } else {
770                    match self.wait_for_highest_qc().await {
771                        Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
772                            (qc, maybe_next_epoch_qc, maybe_state_cert)
773                        },
774                        Err(e) => {
775                            bail!(error!("Error while waiting for highest QC: {e:?}"));
776                        },
777                    }
778                }
779            } else {
780                let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
781                    self.wait_for_highest_qc().await
782                else {
783                    bail!(error!("Error while waiting for highest QC"));
784                };
785                if qc.data.block_number.is_some_and(|bn| {
786                    is_epoch_transition(bn, self.epoch_height)
787                        && !is_last_block(bn, self.epoch_height)
788                }) {
789                    bail!(error!(
790                        "High is in transition but we need to propose with transition QC, do \
791                         nothing"
792                    ));
793                }
794                (qc, maybe_next_epoch_qc, maybe_state_cert)
795            }
796        } else {
797            match self.wait_for_highest_qc().await {
798                Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
799                    (qc, maybe_next_epoch_qc, maybe_state_cert)
800                },
801                Err(e) => {
802                    bail!(error!("Error while waiting for highest QC: {e:?}"));
803                },
804            }
805        };
806
807        ensure!(
808            commit_and_metadata.is_some(),
809            error!(
810                "Somehow completed the proposal dependency task without a commitment and metadata"
811            )
812        );
813
814        ensure!(
815            vid_share.is_some(),
816            error!("Somehow completed the proposal dependency task without a VID share")
817        );
818
819        self.publish_proposal(
820            commit_and_metadata.unwrap(),
821            vid_share.unwrap(),
822            proposal_cert,
823            self.formed_upgrade_certificate.clone(),
824            parent_qc,
825            maybe_next_epoch_qc,
826            maybe_state_cert,
827        )
828        .await
829    }
830}
831
832impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<TYPES, V> {
833    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
834
835    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
836    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
837    async fn handle_dep_result(self, res: Self::Output) {
838        let result = self.handle_proposal_deps(&res).await;
839        if result.is_err() {
840            log!(result);
841            self.print_proposal_events(&res)
842        }
843    }
844}
845
846pub(super) async fn handle_eqc_formed<
847    TYPES: NodeType,
848    I: NodeImplementation<TYPES>,
849    V: Versions,
850>(
851    cert_view: TYPES::View,
852    leaf_commit: Commitment<Leaf2<TYPES>>,
853    block_number: Option<u64>,
854    task_state: &mut QuorumProposalTaskState<TYPES, I, V>,
855    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
856) {
857    if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
858        tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
859        return;
860    }
861    if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
862        tracing::debug!("We formed QC but not eQC. Do nothing");
863        return;
864    }
865
866    let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
867        tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
868        return;
869    };
870    if current_epoch_qc.view_number() != cert_view
871        || current_epoch_qc.data.leaf_commit != leaf_commit
872    {
873        tracing::debug!("We haven't yet formed the eQC. Do nothing");
874        return;
875    }
876    let Some(next_epoch_qc) = task_state
877        .formed_next_epoch_quorum_certificates
878        .get(&cert_view)
879    else {
880        tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
881        return;
882    };
883    if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
884        tracing::debug!(
885            "We formed the eQC but the current and next epoch QCs do not correspond to each other."
886        );
887        return;
888    }
889    let current_epoch_qc_clone = current_epoch_qc.clone();
890
891    let mut consensus_writer = task_state.consensus.write().await;
892    let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
893    let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
894    drop(consensus_writer);
895
896    task_state.formed_quorum_certificates =
897        task_state.formed_quorum_certificates.split_off(&cert_view);
898    task_state.formed_next_epoch_quorum_certificates = task_state
899        .formed_next_epoch_quorum_certificates
900        .split_off(&cert_view);
901
902    broadcast_event(
903        Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
904        event_sender,
905    )
906    .await;
907}