hotshot_task_impls/
helpers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10    time::Instant,
11};
12
13use alloy::{
14    primitives::{FixedBytes, U256},
15    sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24    consensus::OuterConsensus,
25    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
26    drb::DrbResult,
27    epoch_membership::EpochMembershipCoordinator,
28    event::{Event, EventType, LeafInfo},
29    light_client::{CircuitField, LightClientState, StakeTableState},
30    message::{Proposal, UpgradeLock},
31    request_response::ProposalRequestPayload,
32    simple_certificate::{
33        DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34        QuorumCertificate2, UpgradeCertificate,
35    },
36    simple_vote::HasEpoch,
37    stake_table::StakeTableEntries,
38    traits::{
39        block_contents::BlockHeader,
40        election::Membership,
41        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
42        signature_key::{
43            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
44        },
45        storage::Storage,
46        BlockPayload, ValidatedState,
47    },
48    utils::{
49        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
50        option_epoch_from_block_number, Terminator, View, ViewInner,
51    },
52    vote::{Certificate, HasViewNumber},
53};
54use hotshot_utils::anytrace::*;
55use time::OffsetDateTime;
56use tokio::time::timeout;
57use tracing::instrument;
58use vbs::version::StaticVersionType;
59
60use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
61
/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
///
/// Broadcasts a signed [`HotShotEvent::QuorumProposalRequestSend`] for `qc`'s view, then
/// listens on `event_receiver` (bounded by `REQUEST_TIMEOUT`) for a
/// `QuorumProposalResponseRecv` whose reconstructed leaf matches both the requested view
/// number and `qc`'s leaf commitment. On receipt, the proposal's `justify_qc` is validated
/// against the stake table for its epoch, the new leaf/state pair is inserted into the
/// consensus state map, and `(leaf, view)` is returned to the caller.
///
/// # Errors
/// Fails if signing the request fails, the response does not arrive before the timeout,
/// the stake table for the justify QC's epoch cannot be obtained, or the justify QC is
/// not a valid certificate.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // We need to be able to sign this request before submitting it to the network. Compute the
    // payload first.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    // Finally, compute the signature for the payload.
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    // First, broadcast that we need a proposal to the current leader
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    let mut rx = event_receiver.clone();
    // Make a background task to await the arrival of the event data.
    let Ok(Some(proposal)) =
        // We want to explicitly timeout here so we aren't waiting around for the data.
        timeout(REQUEST_TIMEOUT, async move {
            // We want to iterate until the proposal is not None, or until we reach the timeout.
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    // Only accept a response that matches both the view we asked for and the
                    // leaf commitment carried by the QC; anything else keeps us waiting.
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    // From here on, `view_number` refers to the fetched proposal's view.
    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    // Validate the justify QC against the stake table of its own epoch.
    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    // Record the fetched leaf and its (header-derived) validated state in consensus.
    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // A failure here means the leaf was already known or stale; that's fine for our purposes.
    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
162pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
163    membership: &Arc<RwLock<TYPES::Membership>>,
164    epoch: TYPES::Epoch,
165    storage: &I::Storage,
166    drb_result: DrbResult,
167) {
168    tracing::debug!("Calling store_drb_result for epoch {epoch}");
169    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
170        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
171    }
172
173    membership.write().await.add_drb_result(epoch, drb_result)
174}
175
/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
/// current epoch.
///
/// Uses the result from `start_drb_task`.
///
/// Returns an error if we should not vote.
///
/// # Errors
/// - The proposal is missing the next epoch's DRB result.
/// - No stake table exists for the proposal's epoch or the next epoch.
/// - Our locally computed DRB result disagrees with the proposed one.
/// - Epochs are not available for this proposal.
pub(crate) async fn verify_drb_result<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    // Skip if this is not the expected block: verification only applies when epochs are
    // configured (non-zero height) and this block is at an epoch transition.
    if validation_info.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            validation_info.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
    // that epochs are definitely happening by virtue of getting here?
    let epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(proposal.view_number())
            .await,
        proposal.block_header().block_number(),
        validation_info.epoch_height,
    );

    // The proposal must carry the next epoch's DRB result at a transition block.
    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the next epoch's DRB result."))?;

    if let Some(epoch_val) = epoch {
        let current_epoch_membership = validation_info
            .membership
            .coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {}", epoch_val))?;

        let has_stake_current_epoch = current_epoch_membership
            .has_stake(&validation_info.public_key)
            .await;

        // Only nodes with stake in the current epoch compare the DRB result; non-staked
        // nodes accept the proposal's value.
        if has_stake_current_epoch {
            let computed_result = current_epoch_membership
                .next_epoch()
                .await
                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
                .get_epoch_drb()
                .await
                .clone()
                .context(warn!("DRB result not found"))?;

            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}
253
254/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
255async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
256    decided_leaf: &Leaf2<TYPES>,
257    epoch_height: u64,
258    membership: &EpochMembershipCoordinator<TYPES>,
259    storage: &I::Storage,
260    consensus: &OuterConsensus<TYPES>,
261) {
262    let decided_leaf = decided_leaf.clone();
263    let decided_block_number = decided_leaf.block_header().block_number();
264
265    // Skip if this is not the expected block.
266    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
267        let next_epoch_number =
268            TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
269
270        let start = Instant::now();
271        if let Err(e) = storage
272            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
273            .await
274        {
275            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
276        }
277        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
278
279        let membership = membership.clone();
280        let decided_block_header = decided_leaf.block_header().clone();
281        let storage = storage.clone();
282        let consensus = consensus.clone();
283
284        let consensus_reader = consensus.read().await;
285
286        drop(consensus_reader);
287
288        tokio::spawn(async move {
289            let membership_clone = membership.clone();
290            let epoch_root_future = tokio::spawn(async move {
291                let start = Instant::now();
292                if let Err(e) = Membership::add_epoch_root(
293                    Arc::clone(membership_clone.membership()),
294                    next_epoch_number,
295                    decided_block_header,
296                )
297                .await
298                {
299                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
300                }
301                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
302            });
303
304            let membership_clone = membership.clone();
305
306            let drb_result_future = tokio::spawn(async move {
307                membership_clone
308                    .compute_drb_result(next_epoch_number, decided_leaf.clone())
309                    .await
310            });
311
312            let (_, drb_result) = tokio::join!(epoch_root_future, drb_result_future);
313
314            let drb_result = match drb_result {
315                Ok(Ok(result)) => result,
316                Err(e) => {
317                    tracing::error!("Failed to compute DRB result from decide: {e}");
318                    return;
319                },
320                Ok(Err(e)) => {
321                    tracing::error!("Failed to compute DRB result from decide: {e}");
322                    return;
323                },
324            };
325
326            let start = Instant::now();
327            handle_drb_result::<TYPES, I>(
328                membership.membership(),
329                next_epoch_number,
330                &storage,
331                drb_result,
332            )
333            .await;
334            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
335        });
336    }
337}
338
/// Helper type giving names to the output values of the leaf chain traversal operation.
///
/// Any subset of these fields may be populated: the traversal returns whatever
/// intermediate state it reached even when a full decide is not achieved.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The qc for the decided chain.
    pub new_decide_qc: Option<QuorumCertificate2<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
360
361/// We need Default to be implemented because the leaf ascension has very few failure branches,
362/// and when they *do* happen, we still return intermediate states. Default makes the burden
363/// of filling values easier.
364impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
365    /// The default method for this type is to set all of the returned values to `None`.
366    fn default() -> Self {
367        Self {
368            new_locked_view_number: None,
369            new_decided_view_number: None,
370            new_decide_qc: None,
371            leaf_views: Vec::new(),
372            included_txns: None,
373            decided_upgrade_cert: None,
374        }
375    }
376}
377
378async fn update_metrics<TYPES: NodeType>(
379    consensus: &OuterConsensus<TYPES>,
380    leaf_views: &[LeafInfo<TYPES>],
381) {
382    let consensus_reader = consensus.read().await;
383    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
384
385    for leaf_view in leaf_views {
386        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
387
388        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
389            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
390            continue;
391        };
392        consensus_reader
393            .metrics
394            .proposal_to_decide_time
395            .add_point(proposal_to_decide_time as f64);
396        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
397            consensus_reader
398                .metrics
399                .finalized_bytes
400                .add_point(txn_bytes as f64);
401        }
402    }
403}
404
/// calculate the new decided leaf chain based on the rules of HotStuff 2
///
/// Walks parent links from the proposed leaf: the proposal's justify QC view becomes the
/// new locked view, and if the parent and grandparent are consecutive in view, the
/// grandparent's view is decided. The decided chain is then collected back to (but not
/// including) the previous anchor view, gathering payloads, transaction commitments, and
/// any newly decided upgrade certificate along the way.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // If we don't have the proposals parent return early
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    // Get the parent's parent and check if it's consecutive in view to the parent; if so we
    // can decide the grandparent's view. If not we're done.
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone());
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe, we just checked that the option is some
        let info = &mut current_leaf_info.unwrap();
        // Check if there's a new upgrade certificate available.
        if let Some(cert) = info.leaf.upgrade_certificate() {
            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                // A certificate whose decide_by deadline has already passed cannot be applied.
                if cert.data.decide_by < decided_view_number {
                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
                } else {
                    tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                    res.decided_upgrade_cert = Some(cert.clone());
                }
            }
        }

        // If the block payload is available for this leaf, include it in
        // the leaf chain that we send to the client.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        // Collect transaction commitments for every decided payload we have.
        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        // Step to the parent; the loop condition stops us at the old anchor view.
        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        // Release the read lock before epoch-root handling, which re-acquires consensus locks.
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}
521
/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
/// one view newer), then we begin attempting to form the chain. This is a direct impl from
/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
///
/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
/// > It forms a Three-Chain, if b'' forms a Two-Chain.
///
/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
/// is reached when we have a two chain, and a decide is reached when we have a three chain.
///
/// # Example
/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
/// 2-3-5.
///
/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
/// and our new locked view will be 6.
///
/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
/// the anchor view will be set to view 6, with the locked view as view 7.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    // Walk ancestors from the proposal's parent down to (but excluding) the old anchor view.
    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // This is the core paper logic. We're implementing the chain in chained hotstuff.
            if res.new_decided_view_number.is_none() {
                // If the last view number is the child of the leaf we've moved to...
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    // The chain grows by one
                    current_chain_length += 1;

                    // We emit a locked view when the chain length is 2
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf in the chain, if there is one, is decided, so this
                        // leaf's justify_qc would become the QC for the decided chain.
                        res.new_decide_qc = Some(leaf.justify_qc().clone());
                    } else if current_chain_length == 3 {
                        // And we decide when the chain length is 3.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // There isn't a new chain extension available, so we signal to the callback
                    // owner that we can exit for now.
                    return false;
                }
            }

            // Now, if we *have* reached a decide, we need to do some state updates.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // First, get a mutable reference to the provided leaf.
                let mut leaf = leaf.clone();

                // Update the metrics
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Check if there's a new upgrade certificate available.
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        // A certificate whose decide_by deadline has passed cannot be applied.
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                // If the block payload is available for this leaf, include it in
                // the leaf chain that we send to the client.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Get the VID share at the leaf's view number, corresponding to our key
                // (if one exists)
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Attach the light-client state certificate when this leaf is an epoch root.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        // Sanity check that the state cert is for the same view as the decided leaf
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                // Add our data into a new `LeafInfo`
                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // NOTE: this shadows the `epoch_height` parameter with the value held by consensus.
    let epoch_height = consensus_reader.epoch_height;
    // Release the read lock before epoch-root handling, which re-acquires consensus locks.
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
706
/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not.
///
/// If the parent view is not yet in the validated state map, first attempts to fetch the
/// missing proposal from the network via [`fetch_proposal`], then looks the parent leaf
/// and its validated state up from consensus.
///
/// # Errors
/// Fails if the fetch times out, the parent view is still absent from the state map, the
/// parent view carries no leaf/state, or the leaf itself is missing from saved leaves.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    // Hold the read lock only long enough to check for the parent view; fetch_proposal
    // below needs to take the write lock itself.
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    drop(consensus_reader);

    if !vsm_contains_parent_view {
        // The fetched proposal is inserted into consensus state by fetch_proposal; we
        // re-read it from the state map below rather than using the return value.
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader
        .validated_state_map()
        .get(&parent_qc.view_number())
        .context(debug!(
            "Couldn't find parent view in state map, waiting for replica to see proposal; \
             parent_view_number: {}",
            *parent_qc.view_number()
        ))?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
         parent_view {:?}",
        *parent_qc.view_number(),
        parent_view
    ))?;

    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        // NOTE: This happens on the genesis block
        tracing::debug!(
            "They don't equal: {:?}   {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}
776
/// Persists and applies the justify QC carried by `proposal` as our high QC.
///
/// Also stores the next-epoch high QC when present, and — for epoch-root QCs — the
/// light client state update certificate. For the special transition block, resets
/// the high QC and records the transition QC pair instead of the normal update path.
///
/// # Errors
/// If persisting any certificate fails, or if updating the in-memory consensus
/// state fails.
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    // "In transition epoch" here means: the justify QC's block is in the epoch
    // transition range but is neither the transition block itself nor an epoch
    // boundary block. In that window we skip persisting the high QC below.
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        // Persist the high QC first; failure here means we must not vote.
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        // An epoch-root QC must carry a light client state certificate; persist it
        // and mirror it into the in-memory consensus state.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        // Persist the next-epoch high QC alongside, when the proposal carries one.
        if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
            if let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
            {
                bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
            }
        }
    }
    // Now update the in-memory consensus state (single write lock for all updates).
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        // For the transition block itself, reset the high QC to the (QC, next-epoch QC)
        // pair and record it as the transition QC; skip the normal update path.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}
859
860async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
861    validation_info: &ValidationInfo<TYPES, I, V>,
862) -> Option<(
863    QuorumCertificate2<TYPES>,
864    NextEpochQuorumCertificate2<TYPES>,
865)> {
866    validation_info
867        .consensus
868        .read()
869        .await
870        .transition_qc()
871        .cloned()
872}
873
/// Validates a proposal whose justify QC falls in the epoch-transition range.
///
/// Ensures the next-epoch justify QC is present and matches the justify QC, and
/// enforces the view-number ordering rules relative to any previously recorded
/// transition QC. For the transition block itself, records the new transition QC
/// and resets the high QC via [`update_high_qc`].
///
/// # Errors
/// If the QC has no block number, the next-epoch QC is missing or inconsistent,
/// or any of the transition ordering/extension rules are violated.
pub(crate) async fn validate_epoch_transition_qc<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    // Only the transition range (excluding exact epoch boundaries) is handled here;
    // everything else passes through untouched.
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    // Within the transition range the proposal must also carry the next epoch's QC,
    // and both QCs must certify the same leaf.
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Height is epoch height - 2
        // A newly proposed transition QC may only replace an older (or equal-view) one.
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition qc must have view number greater than or equal to previous \
             transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        // reset the high qc to the transition qc
        update_high_qc(proposal, validation_info).await?;
    } else {
        // Height is either epoch height - 1 or epoch height
        // These blocks must strictly advance past any recorded transition QC's view.
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        // No timeout/view-sync evidence allowed: these blocks must extend directly.
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, Qc \
             Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        // Either the proposal extends the immediately preceding view, or its justify QC
        // is exactly the recorded transition QC.
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}
941
/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
/// Performs (in order): epoch-transition QC validation, parent-extension check,
/// leaf/state bookkeeping, upgrade certificate validation, the epoch safety check,
/// and the combined safety-or-liveness check, before notifying the application
/// layer and other tasks.
///
/// # Errors
/// If any validation or state update fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // If epochs are active for the justify QC's view, transition-range proposals get
    // extra validation; a passing transition check also satisfies liveness below.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .await
        .is_ok_and(|v| v >= V::Epochs::VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    // The proposed leaf must point at the parent leaf we resolved.
    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(view_number)
            .await,
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // Record the leaf/state and the proposed view; both failures are non-fatal
    // (the proposal is valid regardless), so we only log them.
    {
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // Update our internal storage of the proposal. The proposal is valid, so
        // we swallow this error and just log if it occurs.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;

    let justify_qc = proposal.data.justify_qc().clone();
    // Create a positive vote if either liveness or safety check
    // passes.

    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety check:
        // The proposal is safe if
        // 1. the proposed block and the justify QC block belong to the same epoch or
        // 2. the justify QC is the eQC for the previous block
        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
            validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await,
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // Make sure that the epoch transition proposal includes the next epoch QC
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness check.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety check.
        // Check if proposal extends from the locked leaf.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // if leaf view no == locked view no then we're done, report success by
                // returning true
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Either check passing is enough to proceed; on total failure, surface the
        // traversal error to the application layer before bailing.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // We accept the proposal, notify the application layer
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify other tasks
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1134
/// Validates, from a given `proposal` that the view that it is being submitted for is valid when
/// compared to `cur_view` which is the highest proposed view (so far) for the caller. If the proposal
/// is for a view that's later than expected, that the proposal includes a timeout or view sync certificate.
///
/// # Errors
/// If any validation or view number check fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    // Reject proposals for views we have already moved past.
    ensure!(
        view_number >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // Verify a timeout certificate OR a view sync certificate exists and is valid.
    // (Only needed when the proposal does not extend the immediately preceding view.)
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                // A timeout cert must be for the view directly before this proposal.
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                // Re-resolve membership for the epoch the certificate belongs to before
                // checking its signatures against that epoch's stake table.
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                // A view sync cert must be for exactly this proposal's view.
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                // Re-resolve membership for the certificate's epoch, as above.
                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                // View sync certs must also be valid.
                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    // Validate the upgrade certificate -- this is just a signature validation.
    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
    {
        let epoch = option_epoch_from_block_number::<TYPES>(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}
1242
1243/// Helper function to send events and log errors
1244pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1245    match sender.broadcast_direct(event).await {
1246        Ok(None) => (),
1247        Ok(Some(overflowed)) => {
1248            tracing::error!(
1249                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1250            );
1251        },
1252        Err(SendError(e)) => {
1253            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1254        },
1255    }
1256}
1257
/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
/// corresponds to the provided high_qc.
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<()> {
    // Resolve the stake table for the QC's epoch; mutable because we later swap to
    // the next epoch's stake table when a next-epoch QC is present.
    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(qc.data.epoch)
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    // Verify the QC's aggregate signature against this epoch's stake table,
    // recording an invalid-QC metric on failure.
    if let Err(e) = qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    // After the epoch upgrade, every QC must carry a block number.
    if upgrade_lock.epochs_enabled(qc.view_number()).await {
        ensure!(
            qc.data.block_number.is_some(),
            "QC for epoch {:?} has no block number",
            qc.data.epoch
        );
    }

    // Transition-range QCs must be accompanied by the next epoch's QC.
    if qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_transition(b, epoch_height))
    {
        ensure!(
            maybe_next_epoch_qc.is_some(),
            error!("Received High QC for the transition block but not the next epoch QC")
        );
    }

    if let Some(next_epoch_qc) = maybe_next_epoch_qc {
        // If the next epoch qc exists, make sure it's equal to the qc
        if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
            bail!("Next epoch qc exists but it's not equal with qc.");
        }
        // Switch to the next epoch's stake table for validating the next-epoch QC.
        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
        let membership_next_stake_table = epoch_membership.stake_table().await;
        let membership_next_success_threshold = epoch_membership.success_threshold().await;

        // Validate the next epoch qc as well
        next_epoch_qc
            .is_valid_cert(
                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                membership_next_success_threshold,
                upgrade_lock,
            )
            .await
            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
    }
    Ok(())
}
1327
1328/// Validates the light client state update certificate
1329pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
1330    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1331    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1332) -> Result<()> {
1333    tracing::debug!("Validating light client state update certificate");
1334
1335    let epoch_membership = membership_coordinator
1336        .membership_for_epoch(state_cert.epoch())
1337        .await?;
1338
1339    let membership_stake_table = epoch_membership.stake_table().await;
1340    let membership_success_threshold = epoch_membership.success_threshold().await;
1341
1342    let mut state_key_map = HashMap::new();
1343    membership_stake_table.into_iter().for_each(|config| {
1344        state_key_map.insert(
1345            config.state_ver_key.clone(),
1346            config.stake_table_entry.stake(),
1347        );
1348    });
1349
1350    let mut accumulated_stake = U256::from(0);
1351    let signed_state_digest = derive_signed_state_digest(
1352        &state_cert.light_client_state,
1353        &state_cert.next_stake_table_state,
1354        &state_cert.auth_root,
1355    );
1356    for (key, sig, sig_v2) in state_cert.signatures.iter() {
1357        if let Some(stake) = state_key_map.get(key) {
1358            accumulated_stake += *stake;
1359            if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1360                key,
1361                sig,
1362                signed_state_digest,
1363            ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1364                key,
1365                sig_v2,
1366                &state_cert.light_client_state,
1367                &state_cert.next_stake_table_state,
1368            ) {
1369                bail!("Invalid light client state update certificate signature");
1370            }
1371        } else {
1372            bail!("Invalid light client state update certificate signature");
1373        }
1374    }
1375    if accumulated_stake < membership_success_threshold {
1376        bail!("Light client state update certificate does not meet the success threshold");
1377    }
1378
1379    Ok(())
1380}
1381
1382pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1383    qc: &QuorumCertificate2<TYPES>,
1384    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1385    epoch_height: u64,
1386) -> bool {
1387    qc.data
1388        .block_number
1389        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1390        && Some(state_cert.epoch) == qc.data.epoch()
1391        && qc.view_number().u64() == state_cert.light_client_state.view_number
1392}
1393
/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
/// makes sure it corresponds to the given DA certificate;
/// if it's not yet available, waits for it with the given timeout.
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<TYPES::Epoch>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    // Fast path: look for the share already in the shared state, keyed by
    // (view, recipient key, target epoch).
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    if let Some(second_vid_share) = maybe_second_vid_share {
        // Accept the cached share only if its payload commitment matches the DA cert:
        // same-epoch shares compare against `payload_commit`, cross-epoch ones against
        // `next_epoch_payload_commit`.
        if (target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit)
        {
            return Ok(second_vid_share);
        }
    }

    // Slow path: wait on the event stream for a `VidShareValidated` event whose
    // payload commitment matches (same epoch-dependent rule as above). The wait is
    // aborted via `cancel_receiver`.
    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        Box::new(move |event| {
            let event = event.as_ref();
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        return Err(warn!("Error while waiting for the second VID share."));
    };
    // The dependency predicate above only matches `VidShareValidated`, so any other
    // variant here indicates a bug.
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        // this shouldn't happen
        return Err(warn!(
            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
             possible."
        ));
    };
    Ok(second_vid_share.clone())
}
1465
1466pub async fn broadcast_view_change<TYPES: NodeType>(
1467    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1468    new_view_number: TYPES::View,
1469    epoch: Option<TYPES::Epoch>,
1470    first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1471) {
1472    let mut broadcast_epoch = epoch;
1473    if let Some((first_epoch_view, first_epoch)) = first_epoch {
1474        if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1475            broadcast_epoch = Some(first_epoch);
1476        }
1477    }
1478    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1479    broadcast_event(
1480        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1481        sender,
1482    )
1483    .await
1484}
1485
1486pub fn derive_signed_state_digest(
1487    lc_state: &LightClientState,
1488    next_stake_state: &StakeTableState,
1489    auth_root: &FixedBytes<32>,
1490) -> CircuitField {
1491    let lc_state_sol: LightClientStateSol = (*lc_state).into();
1492    let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1493
1494    let res = alloy::primitives::keccak256(
1495        (
1496            lc_state_sol.abi_encode(),
1497            stake_st_sol.abi_encode(),
1498            auth_root.abi_encode(),
1499        )
1500            .abi_encode_packed(),
1501    );
1502    CircuitField::from_be_bytes_mod_order(res.as_ref())
1503}