hotshot_task_impls/
helpers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10    time::Instant,
11};
12
13use alloy::{
14    primitives::{FixedBytes, U256},
15    sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24    consensus::OuterConsensus,
25    data::{
26        EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
27        ViewNumber,
28    },
29    drb::DrbResult,
30    epoch_membership::EpochMembershipCoordinator,
31    event::{Event, EventType, LeafInfo},
32    light_client::{CircuitField, LightClientState, StakeTableState},
33    message::{Proposal, UpgradeLock},
34    request_response::ProposalRequestPayload,
35    simple_certificate::{
36        CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
37        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
38    },
39    simple_vote::HasEpoch,
40    stake_table::StakeTableEntries,
41    traits::{
42        BlockPayload, ValidatedState,
43        block_contents::BlockHeader,
44        election::Membership,
45        node_implementation::{NodeImplementation, NodeType},
46        signature_key::{
47            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
48        },
49        storage::Storage,
50    },
51    utils::{
52        Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
53        is_transition_block, option_epoch_from_block_number,
54    },
55    vote::{Certificate, HasViewNumber},
56};
57use hotshot_utils::anytrace::*;
58use time::OffsetDateTime;
59use tokio::time::timeout;
60use tracing::instrument;
61use versions::EPOCH_VERSION;
62
63use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
64
/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
///
/// Flow:
/// 1. Build and sign a [`ProposalRequestPayload`] for `qc`'s view.
/// 2. Broadcast a `QuorumProposalRequestSend` event so the networking task forwards it.
/// 3. Listen on `event_receiver` for a matching `QuorumProposalResponseRecv`, bounded by
///    `REQUEST_TIMEOUT`.
/// 4. Validate the fetched proposal's `justify_qc` against the stake table for its epoch,
///    insert the resulting leaf into the consensus state, and return the `(leaf, view)` pair.
///
/// # Errors
/// Returns an error if signing the request fails, the request times out, the stake table for
/// the justify QC's epoch cannot be obtained, or the fetched proposal's `justify_qc` is invalid.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // We need to be able to sign this request before submitting it to the network. Compute the
    // payload first.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    // Finally, compute the signature for the payload.
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    // First, broadcast that we need a proposal to the current leader
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    let mut rx = event_receiver.clone();
    // Make a background task to await the arrival of the event data.
    let Ok(Some(proposal)) =
        // We want to explicitly timeout here so we aren't waiting around for the data.
        timeout(REQUEST_TIMEOUT, async move {
            // We want to iterate until the proposal is not None, or until we reach the timeout.
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    // Only accept a response whose leaf matches the exact view and commitment
                    // of the QC we asked about; anything else keeps the loop waiting.
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    // From here on, work with the view number of the *fetched* proposal (shadows the QC's view).
    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    // Validate the justify QC against the stake table of its own epoch.
    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    // Record the fetched leaf and its (header-derived, delta-less) state in consensus.
    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // A failed update here is benign (e.g. the leaf is already known); log at trace level only.
    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
165pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
166    membership: &Arc<RwLock<TYPES::Membership>>,
167    epoch: EpochNumber,
168    storage: &I::Storage,
169    drb_result: DrbResult,
170) {
171    tracing::debug!("Calling store_drb_result for epoch {epoch}");
172    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
173        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
174    }
175
176    membership.write().await.add_drb_result(epoch, drb_result)
177}
178
/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
/// current epoch.
///
/// Uses the result from `start_drb_task`.
///
/// Returns an error if we should not vote.
pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    // Skip if this is not the expected block.
    // An epoch height of 0 disables epochs entirely, so there is nothing to verify.
    if validation_info.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            validation_info.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
    // that epochs are definitely happening by virtue of getting here?
    let epoch = option_epoch_from_block_number(
        validation_info
            .upgrade_lock
            .epochs_enabled(proposal.view_number()),
        proposal.block_header().block_number(),
        validation_info.epoch_height,
    );

    // The proposer is required to include the next epoch's DRB result on transition blocks.
    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the next epoch's DRB result."))?;

    if let Some(epoch_val) = epoch {
        // NOTE(review): `stake_table_for_epoch` takes the full Option `epoch`, which we just
        // confirmed is Some(epoch_val); the two always refer to the same epoch here.
        let current_epoch_membership = validation_info
            .membership
            .coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {}", epoch_val))?;

        let has_stake_current_epoch = current_epoch_membership
            .has_stake(&validation_info.public_key)
            .await;

        // Only nodes that have stake in the current epoch compare their locally computed DRB
        // result against the proposed one; nodes without stake accept the proposal as-is.
        if has_stake_current_epoch {
            let computed_result = current_epoch_membership
                .next_epoch()
                .await
                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
                .get_epoch_drb()
                .await
                .clone()
                .context(warn!("DRB result not found"))?;

            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}
251
252/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
253async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
254    decided_leaf: &Leaf2<TYPES>,
255    epoch_height: u64,
256    membership: &EpochMembershipCoordinator<TYPES>,
257    storage: &I::Storage,
258    consensus: &OuterConsensus<TYPES>,
259) {
260    let decided_leaf = decided_leaf.clone();
261    let decided_block_number = decided_leaf.block_header().block_number();
262
263    // Skip if this is not the expected block.
264    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
265        let next_epoch_number =
266            EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
267
268        let start = Instant::now();
269        if let Err(e) = storage
270            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
271            .await
272        {
273            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
274        }
275        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
276
277        let membership = membership.clone();
278        let decided_block_header = decided_leaf.block_header().clone();
279        let storage = storage.clone();
280        let consensus = consensus.clone();
281
282        let consensus_reader = consensus.read().await;
283
284        drop(consensus_reader);
285
286        tokio::spawn(async move {
287            let membership_clone = membership.clone();
288
289            // First add the epoch root to `membership`
290            {
291                let start = Instant::now();
292                if let Err(e) = Membership::add_epoch_root(
293                    Arc::clone(membership_clone.membership()),
294                    decided_block_header,
295                )
296                .await
297                {
298                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
299                }
300                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
301            }
302
303            let membership_clone = membership.clone();
304
305            let drb_result = membership_clone
306                .compute_drb_result(next_epoch_number, decided_leaf.clone())
307                .await;
308
309            let drb_result = match drb_result {
310                Ok(result) => result,
311                Err(e) => {
312                    tracing::error!("Failed to compute DRB result from decide: {e}");
313                    return;
314                },
315            };
316
317            let start = Instant::now();
318            handle_drb_result::<TYPES, I>(
319                membership.membership(),
320                next_epoch_number,
321                &storage,
322                drb_result,
323            )
324            .await;
325            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
326        });
327    }
328}
329
/// Helper type to give names and types to the output values of the leaf chain traversal operation.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<ViewNumber>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<ViewNumber>,

    /// The QC signing the latest new leaf, causing it to become committed.
    ///
    /// Only present if a new leaf chain has been decided.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// A second QC extending the committing QC, causing the new leaf chain to become decided.
    ///
    /// This is only applicable in HotStuff2, and will be [`None`] prior to HotShot version 0.3.
    /// HotStuff1 (HotShot < 0.3) uses a different commit rule, which is not captured in this type.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
359
360/// We need Default to be implemented because the leaf ascension has very few failure branches,
361/// and when they *do* happen, we still return intermediate states. Default makes the burden
362/// of filling values easier.
363impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
364    /// The default method for this type is to set all of the returned values to `None`.
365    fn default() -> Self {
366        Self {
367            new_locked_view_number: None,
368            new_decided_view_number: None,
369            committing_qc: None,
370            deciding_qc: None,
371            leaf_views: Vec::new(),
372            included_txns: None,
373            decided_upgrade_cert: None,
374        }
375    }
376}
377
378async fn update_metrics<TYPES: NodeType>(
379    consensus: &OuterConsensus<TYPES>,
380    leaf_views: &[LeafInfo<TYPES>],
381) {
382    let consensus_reader = consensus.read().await;
383    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
384
385    for leaf_view in leaf_views {
386        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
387
388        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
389            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
390            continue;
391        };
392        consensus_reader
393            .metrics
394            .proposal_to_decide_time
395            .add_point(proposal_to_decide_time as f64);
396        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
397            consensus_reader
398                .metrics
399                .finalized_bytes
400                .add_point(txn_bytes as f64);
401        }
402    }
403}
404
/// calculate the new decided leaf chain based on the rules of HotStuff 2
///
/// Under the HotStuff 2 commit rule, a proposal decides its grandparent when the parent and
/// grandparent are in consecutive views. On a decide, the leaf chain is walked back to (but
/// not including) the previous anchor view, collecting leaves, payloads, transactions, and
/// any newly decided upgrade certificate along the way.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    // The proposal's justify QC locks its view unconditionally.
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // If we don't have the proposals parent return early
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    // Get the parent's parent and check if it's consecutive in view to the parent; if so we can
    // decide the grandparent's view. If not we're done.
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
    res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe, we just checked that the option is some
        let info = &mut current_leaf_info.unwrap();
        // Check if there's a new upgrade certificate available.
        if let Some(cert) = info.leaf.upgrade_certificate()
            && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
        {
            // A certificate whose decide_by deadline has already passed cannot be honored.
            if cert.data.decide_by < decided_view_number {
                tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
            } else {
                tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                res.decided_upgrade_cert = Some(cert.clone());
            }
        }

        // If the block payload is available for this leaf, include it in
        // the leaf chain that we send to the client.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        // Collect the transaction commitments of every decided payload.
        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        // Step to the parent and record the current leaf as decided.
        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        // Release the read lock before the epoch-root / metrics helpers re-acquire it.
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}
522
/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
/// one view newer), then we begin attempting to form the chain. This is a direct impl from
/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
///
/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
/// > It forms a Three-Chain, if b'' forms a Two-Chain.
///
/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
/// is reached when we have a two chain, and a decide is reached when we have a three chain.
///
/// # Example
/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
/// 2-3-5.
///
/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
/// and our new locked view will be 6.
///
/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
/// the anchor view will be set to view 6, with the locked view as view 7.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    // Walk parent commitments from the proposal's parent back to (exclusive) the old anchor,
    // applying the chained-HotStuff commit rule in the closure below.
    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // This is the core paper logic. We're implementing the chain in chained hotstuff.
            if res.new_decided_view_number.is_none() {
                // If the last view number is the child of the leaf we've moved to...
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    // The chain grows by one
                    current_chain_length += 1;

                    // We emit a locked view when the chain length is 2
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf in the chain, if there is one, is decided, so this
                        // leaf's justify_qc would become the QC for the decided chain.
                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
                    } else if current_chain_length == 3 {
                        // And we decide when the chain length is 3.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // There isn't a new chain extension available, so we signal to the callback
                    // owner that we can exit for now.
                    return false;
                }
            }

            // Now, if we *have* reached a decide, we need to do some state updates.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // First, get a mutable reference to the provided leaf.
                let mut leaf = leaf.clone();

                // Update the metrics
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Check if there's a new upgrade certificate available.
                if let Some(cert) = leaf.upgrade_certificate()
                    && leaf.upgrade_certificate() != existing_upgrade_cert_reader
                {
                    if cert.data.decide_by < view_number {
                        tracing::warn!(
                            "Failed to decide an upgrade certificate in time. Ignoring."
                        );
                    } else {
                        tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                        res.decided_upgrade_cert = Some(cert.clone());
                    }
                }
                // If the block payload is available for this leaf, include it in
                // the leaf chain that we send to the client.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Get the VID share at the leaf's view number, corresponding to our key
                // (if one exists)
                // NOTE: uses the `epoch_height` *parameter* here; the shadowed binding from
                // `consensus_reader.epoch_height` below only affects the post-traversal code.
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Attach the light-client state certificate only on epoch-root leaves.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        // Sanity check that the state cert is for the same view as the decided leaf
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                // Add our data into a new `LeafInfo`
                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // Shadow the parameter with the consensus-tracked epoch height for the epoch-root pass.
    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
707
/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not.
///
/// If the parent view is not yet in the validated state map, this first attempts to fetch the
/// missing proposal from the network via [`fetch_proposal`] before looking the parent up again.
///
/// # Errors
/// Returns an error if the network fetch fails, the parent view is still absent from the state
/// map, the parent view has no associated leaf/state, or the parent leaf is not in `saved_leaves`.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    // Check (under a short-lived read lock) whether we already know the parent view.
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    drop(consensus_reader);

    // If not, try to fetch the missing proposal from the network; the fetch itself inserts the
    // leaf/view into consensus, so the return value can be discarded.
    if !vsm_contains_parent_view {
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader
        .validated_state_map()
        .get(&parent_qc.view_number())
        .context(debug!(
            "Couldn't find parent view in state map, waiting for replica to see proposal; \
             parent_view_number: {}",
            *parent_qc.view_number()
        ))?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
         parent_view {:?}",
        *parent_qc.view_number(),
        parent_view
    ))?;

    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        // NOTE: This happens on the genesis block
        tracing::debug!(
            "They don't equal: {:?}   {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}
777
/// Persist the justify QC carried by `proposal` (and, when present, the
/// next-epoch justify QC and the epoch-root light client state certificate),
/// then update the in-memory high-QC bookkeeping in consensus.
///
/// # Errors
/// If writing any certificate to storage fails, or if an in-memory consensus
/// update is rejected.
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    // The justify QC is "in the transition epoch" when its block is inside the
    // epoch-transition window but is neither the transition block itself nor a
    // multiple of the epoch height. Such QCs skip the persistence step below.
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        // Persist the high QC first; a storage failure means we must not vote.
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        // Epoch-root QCs must come with a light client state update certificate;
        // persist it and cache it in the in-memory consensus state.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        // Also persist the next-epoch QC when the proposal carries one.
        if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
            && let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
        {
            bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
        }
    }
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        // For the transition block itself, reset the high QC to this QC pair and
        // record it as the transition QC; the early return skips the normal
        // high-QC update path below.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}
859
860async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
861    validation_info: &ValidationInfo<TYPES, I>,
862) -> Option<(
863    QuorumCertificate2<TYPES>,
864    NextEpochQuorumCertificate2<TYPES>,
865)> {
866    validation_info
867        .consensus
868        .read()
869        .await
870        .transition_qc()
871        .cloned()
872}
873
/// Validates the justify QC (and its companion next-epoch QC) of a proposal
/// whose QC points at a block in the epoch-transition window, updating the
/// stored transition QC / high QC as a side effect.
///
/// Returns `Ok(())` without further checks when the QC's block is outside the
/// transition window or is a multiple of the epoch height.
///
/// # Errors
/// If the QC has no block number, the next-epoch QC is absent or inconsistent
/// with the justify QC, or any view-number ordering check fails.
pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    // Only QCs inside the transition window (and not at an exact epoch boundary)
    // are subject to these checks.
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    // Both QCs must certify the same leaf.
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Height is epoch height - 2
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition qc must have view number greater than or equal to previous \
             transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        // reset the high qc to the transition qc
        update_high_qc(proposal, validation_info).await?;
    } else {
        // Height is either epoch height - 1 or epoch height
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        // These blocks must extend their predecessor directly, so no timeout or
        // view sync evidence is allowed.
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, Qc \
             Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}
937
/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
/// On success this also updates the leaf/proposal bookkeeping in consensus and
/// notifies both the application layer (`EventType::QuorumProposal`) and other
/// tasks (`HotShotEvent::QuorumProposalValidated`).
///
/// # Errors
/// If any validation or state update fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // Epoch-transition validation only applies once the justify QC's view is at
    // an epoch-enabled protocol version.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .is_ok_and(|v| v >= EPOCH_VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    // Reconstruct the leaf implied by this proposal and check it extends the parent.
    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number(
        validation_info.upgrade_lock.epochs_enabled(view_number),
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        // Record the new leaf/state and the proposed view; failures here are
        // logged but do not invalidate the proposal.
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // Update our internal storage of the proposal. The proposal is valid, so
        // we swallow this error and just log if it occurs.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
    proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;

    let justify_qc = proposal.data.justify_qc().clone();
    // Create a positive vote if either liveness or safety check
    // passes.

    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety check:
        // The proposal is safe if
        // 1. the proposed block and the justify QC block belong to the same epoch or
        // 2. the justify QC is the eQC for the previous block
        let justify_qc_epoch = option_epoch_from_block_number(
            validation_info.upgrade_lock.epochs_enabled(view_number),
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // Make sure that the epoch transition proposal includes the next epoch QC
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info.upgrade_lock.epochs_enabled(view_number)
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness check.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety check.
        // Check if proposal extends from the locked leaf.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // if leaf view no == locked view no then we're done, report success by
                // returning true
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Either check passing is sufficient to vote; on failure we surface the
        // ancestor-walk error to the application layer before erroring out.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // We accept the proposal, notify the application layer
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify other tasks
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1114
/// Validates, from a given `proposal` that the view that it is being submitted for is valid when
/// compared to `cur_view` which is the highest proposed view (so far) for the caller. If the proposal
/// is for a view that's later than expected, that the proposal includes a timeout or view sync certificate.
///
/// Also validates the proposal's signature and (signature-only) any attached
/// upgrade certificate.
///
/// # Errors
/// If any validation or view number check fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    // Reject proposals that are more than one view behind our current view.
    ensure!(
        view_number + 1 >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // Verify a timeout certificate OR a view sync certificate exists and is valid.
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                // Validate against the stake table of the epoch the timeout
                // certificate belongs to.
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                // Validate against the stake table of the epoch the view sync
                // certificate belongs to.
                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                // View sync certs must also be valid.
                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    // Validate the upgrade certificate -- this is just a signature validation.
    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
    {
        let epoch = option_epoch_from_block_number(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}
1221
1222/// Helper function to send events and log errors
1223pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1224    match sender.broadcast_direct(event).await {
1225        Ok(None) => (),
1226        Ok(Some(overflowed)) => {
1227            tracing::error!(
1228                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1229            );
1230        },
1231        Err(SendError(e)) => {
1232            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1233        },
1234    }
1235}
1236
/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
/// corresponds to the provided high_qc.
///
/// # Errors
/// If either certificate fails validation against the relevant epoch's stake
/// table, or if the next-epoch QC does not correspond to `qc`.
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    epoch_height: u64,
) -> Result<()> {
    let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());

    // Resolve the stake table for the epoch the certificate belongs to.
    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    if let Err(e) = cert
        .qc()
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        // Record the invalid QC in metrics before surfacing the error.
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    // Check the next epoch QC if required.
    if upgrade_lock.epochs_enabled(cert.view_number())
        && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
    {
        // The next-epoch QC is validated against the *next* epoch's stake table.
        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
        let membership_next_stake_table = epoch_membership.stake_table().await;
        let membership_next_success_threshold = epoch_membership.success_threshold().await;
        next_epoch_qc
            .is_valid_cert(
                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                membership_next_success_threshold,
                upgrade_lock,
            )
            .await
            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
    }

    Ok(())
}
1288
/// Validates the light client state update certificate
///
/// Verifies every signature in the certificate against the stake table of the
/// certificate's epoch and checks that the signers' accumulated stake meets the
/// success threshold.
///
/// # Errors
/// If any signature is invalid, a signer is not in the stake table, or the
/// accumulated stake falls short of the success threshold.
pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
) -> Result<()> {
    tracing::debug!("Validating light client state update certificate");

    let epoch_membership = membership_coordinator
        .membership_for_epoch(state_cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    // Map each state verification key to the stake it carries.
    let mut state_key_map = HashMap::new();
    membership_stake_table.into_iter().for_each(|config| {
        state_key_map.insert(
            config.state_ver_key.clone(),
            config.stake_table_entry.stake(),
        );
    });

    let mut accumulated_stake = U256::from(0);
    // Digest covering (light client state, next stake table state, auth root);
    // this is what the V3 signatures are checked against.
    let signed_state_digest = derive_signed_state_digest(
        &state_cert.light_client_state,
        &state_cert.next_stake_table_state,
        &state_cert.auth_root,
    );
    for (key, sig, sig_v2) in state_cert.signatures.iter() {
        if let Some(stake) = state_key_map.get(key) {
            accumulated_stake += *stake;
            #[allow(clippy::collapsible_else_if)]
            // We only perform the second signature check prior to the DrbAndHeaderUpgrade
            if !upgrade_lock
                .proposal2_version(ViewNumber::new(state_cert.light_client_state.view_number))
            {
                // This branch checks only the V2 signature.
                if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            } else {
                // This branch requires both the V3 digest signature and the V2
                // signature to verify.
                if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
                    key,
                    sig,
                    signed_state_digest,
                ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            }
        } else {
            // Signer is not present in this epoch's stake table.
            bail!("Invalid light client state update certificate signature");
        }
    }
    if accumulated_stake < membership_success_threshold {
        bail!("Light client state update certificate does not meet the success threshold");
    }

    Ok(())
}
1358
1359pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1360    qc: &QuorumCertificate2<TYPES>,
1361    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1362    epoch_height: u64,
1363) -> bool {
1364    qc.data
1365        .block_number
1366        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1367        && Some(state_cert.epoch) == qc.data.epoch()
1368        && qc.view_number().u64() == state_cert.light_client_state.view_number
1369}
1370
/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
/// makes sure it corresponds to the given DA certificate;
/// if it's not yet available, waits for it with the given timeout.
///
/// # Errors
/// If waiting for the share fails (e.g. the dependency is cancelled) or the
/// received event is not the expected `VidShareValidated`.
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<EpochNumber>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    // First, see whether the target-epoch share is already cached in consensus
    // state for this view and recipient.
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    // Accept the cached share only if its payload commitment matches the DA
    // certificate (current-epoch or next-epoch commitment, as appropriate).
    if let Some(second_vid_share) = maybe_second_vid_share
        && ((target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit))
    {
        return Ok(second_vid_share);
    }

    // Not cached yet: wait for a matching `VidShareValidated` event (or
    // cancellation via `cancel_receiver`).
    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        Box::new(move |event| {
            let event = event.as_ref();
            // Same commitment-matching rule as above, applied to incoming events.
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        return Err(warn!("Error while waiting for the second VID share."));
    };
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        // this shouldn't happen
        return Err(warn!(
            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
             possible."
        ));
    };
    Ok(second_vid_share.clone())
}
1441
1442pub async fn broadcast_view_change<TYPES: NodeType>(
1443    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1444    new_view_number: ViewNumber,
1445    epoch: Option<EpochNumber>,
1446    first_epoch: Option<(ViewNumber, EpochNumber)>,
1447) {
1448    let mut broadcast_epoch = epoch;
1449    if let Some((first_epoch_view, first_epoch)) = first_epoch
1450        && new_view_number == first_epoch_view
1451        && broadcast_epoch != Some(first_epoch)
1452    {
1453        broadcast_epoch = Some(first_epoch);
1454    }
1455    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1456    broadcast_event(
1457        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1458        sender,
1459    )
1460    .await
1461}
1462
1463pub fn derive_signed_state_digest(
1464    lc_state: &LightClientState,
1465    next_stake_state: &StakeTableState,
1466    auth_root: &FixedBytes<32>,
1467) -> CircuitField {
1468    let lc_state_sol: LightClientStateSol = (*lc_state).into();
1469    let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1470
1471    let res = alloy::primitives::keccak256(
1472        (
1473            lc_state_sol.abi_encode(),
1474            stake_st_sol.abi_encode(),
1475            auth_root.abi_encode(),
1476        )
1477            .abi_encode_packed(),
1478    );
1479    CircuitField::from_be_bytes_mod_order(res.as_ref())
1480}