hotshot_task_impls/helpers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10    time::Instant,
11};
12
13use alloy::{
14    primitives::{FixedBytes, U256},
15    sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24    consensus::OuterConsensus,
25    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
26    drb::DrbResult,
27    epoch_membership::EpochMembershipCoordinator,
28    event::{Event, EventType, LeafInfo},
29    light_client::{CircuitField, LightClientState, StakeTableState},
30    message::{Proposal, UpgradeLock},
31    request_response::ProposalRequestPayload,
32    simple_certificate::{
33        DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
34        QuorumCertificate2, UpgradeCertificate,
35    },
36    simple_vote::HasEpoch,
37    stake_table::StakeTableEntries,
38    traits::{
39        block_contents::BlockHeader,
40        election::Membership,
41        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
42        signature_key::{
43            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
44        },
45        storage::Storage,
46        BlockPayload, ValidatedState,
47    },
48    utils::{
49        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
50        option_epoch_from_block_number, Terminator, View, ViewInner,
51    },
52    vote::{Certificate, HasViewNumber},
53};
54use hotshot_utils::anytrace::*;
55use time::OffsetDateTime;
56use tokio::time::timeout;
57use tracing::instrument;
58use vbs::version::StaticVersionType;
59
60use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
61
62/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
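/// # Flow (sketch)
///
/// A rough outline of what this helper does; the event variants named below are the ones handled
/// in the body, everything else is illustrative:
/// ```ignore
/// // 1. Sign a ProposalRequestPayload for the QC's view and broadcast it to the network.
/// broadcast_event(
///     HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
///     &event_sender,
/// )
/// .await;
/// // 2. Wait (bounded by REQUEST_TIMEOUT) for a matching QuorumProposalResponseRecv event.
/// // 3. Validate the fetched proposal's justify QC against the stake table, update the shared
/// //    consensus state with the new leaf, and return the leaf together with its View entry.
/// ```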
63#[instrument(skip_all)]
64#[allow(clippy::too_many_arguments)]
65pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
66    qc: &QuorumCertificate2<TYPES>,
67    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
68    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
69    membership_coordinator: EpochMembershipCoordinator<TYPES>,
70    consensus: OuterConsensus<TYPES>,
71    sender_public_key: TYPES::SignatureKey,
72    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
73    upgrade_lock: &UpgradeLock<TYPES, V>,
74    epoch_height: u64,
75) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
76    let view_number = qc.view_number();
77    let leaf_commit = qc.data.leaf_commit;
78    // We need to be able to sign this request before submitting it to the network. Compute the
79    // payload first.
80    let signed_proposal_request = ProposalRequestPayload {
81        view_number,
82        key: sender_public_key,
83    };
84
85    // Finally, compute the signature for the payload.
86    let signature = TYPES::SignatureKey::sign(
87        &sender_private_key,
88        signed_proposal_request.commit().as_ref(),
89    )
90    .wrap()
91    .context(error!("Failed to sign proposal. This should never happen."))?;
92
93    tracing::info!("Sending proposal request for view {view_number}");
94
95    // First, broadcast that we need a proposal to the current leader
96    broadcast_event(
97        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
98        &event_sender,
99    )
100    .await;
101
102    let mut rx = event_receiver.clone();
103    // Wait for the arrival of the event data.
104    let Ok(Some(proposal)) =
105        // We want to explicitly timeout here so we aren't waiting around for the data.
106        timeout(REQUEST_TIMEOUT, async move {
107            // We want to iterate until the proposal is not None, or until we reach the timeout.
108            while let Ok(event) = rx.recv_direct().await {
109                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
110                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
111                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
112                        return Some(quorum_proposal.clone());
113                    }
114                }
115            }
116            None
117        })
118        .await
119    else {
120        bail!("Request for proposal failed");
121    };
122
123    let view_number = proposal.data.view_number();
124    let justify_qc = proposal.data.justify_qc().clone();
125
126    let justify_qc_epoch = justify_qc.data.epoch();
127
128    let epoch_membership = membership_coordinator
129        .stake_table_for_epoch(justify_qc_epoch)
130        .await?;
131    let membership_stake_table = epoch_membership.stake_table().await;
132    let membership_success_threshold = epoch_membership.success_threshold().await;
133
134    justify_qc
135        .is_valid_cert(
136            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
137            membership_success_threshold,
138            upgrade_lock,
139        )
140        .await
141        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
142
143    let mut consensus_writer = consensus.write().await;
144    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
145    let state = Arc::new(
146        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
147    );
148
149    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
150        tracing::trace!("{e:?}");
151    }
152    let view = View {
153        view_inner: ViewInner::Leaf {
154            leaf: leaf.commit(),
155            state,
156            delta: None,
157            epoch: leaf.epoch(epoch_height),
158        },
159    };
160    Ok((leaf, view))
161}
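
/// Store the DRB result for `epoch` in `storage` and add it to the in-memory `membership`.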
162pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
163    membership: &Arc<RwLock<TYPES::Membership>>,
164    epoch: TYPES::Epoch,
165    storage: &I::Storage,
166    drb_result: DrbResult,
167) {
168    tracing::debug!("Calling store_drb_result for epoch {epoch}");
169    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
170        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
171    }
172
173    membership.write().await.add_drb_result(epoch, drb_result)
174}
175
176/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
177/// current epoch.
178///
179/// Uses the result from `start_drb_task`.
180///
181/// Returns an error if we should not vote.
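///
/// # Check (sketch)
///
/// Roughly, the verification compares the DRB result carried in the proposal with the one we
/// computed ourselves; the snippet below is an illustration rather than the exact code:
/// ```ignore
/// let proposed = proposal.next_drb_result();            // result carried in the proposal
/// let computed = next_epoch_membership.get_epoch_drb(); // result from our own DRB computation
/// ensure!(proposed == Some(computed), "Mismatched DRB result, do not vote");
/// ```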
182pub(crate) async fn verify_drb_result<
183    TYPES: NodeType,
184    I: NodeImplementation<TYPES>,
185    V: Versions,
186>(
187    proposal: &QuorumProposalWrapper<TYPES>,
188    validation_info: &ValidationInfo<TYPES, I, V>,
189) -> Result<()> {
190    // Skip if this is not the expected block.
191    if validation_info.epoch_height == 0
192        || !is_epoch_transition(
193            proposal.block_header().block_number(),
194            validation_info.epoch_height,
195        )
196    {
197        tracing::debug!("Skipping DRB result verification");
198        return Ok(());
199    }
200
201    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
202    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
203    // that epochs are definitely happening by virtue of getting here?
204    let epoch = option_epoch_from_block_number::<TYPES>(
205        validation_info
206            .upgrade_lock
207            .epochs_enabled(proposal.view_number())
208            .await,
209        proposal.block_header().block_number(),
210        validation_info.epoch_height,
211    );
212
213    let proposal_result = proposal
214        .next_drb_result()
215        .context(info!("Proposal is missing the next epoch's DRB result."))?;
216
217    if let Some(epoch_val) = epoch {
218        let current_epoch_membership = validation_info
219            .membership
220            .coordinator
221            .stake_table_for_epoch(epoch)
222            .await
223            .context(warn!("No stake table for epoch {}", epoch_val))?;
224
225        let has_stake_current_epoch = current_epoch_membership
226            .has_stake(&validation_info.public_key)
227            .await;
228
229        if has_stake_current_epoch {
230            let computed_result = current_epoch_membership
231                .next_epoch()
232                .await
233                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
234                .get_epoch_drb()
235                .await
236                .clone()
237                .context(warn!("DRB result not found"))?;
238
239            ensure!(
240                proposal_result == computed_result,
241                warn!(
242                    "Our calculated DRB result is {computed_result:?}, which does not match the \
243                     proposed DRB result of {proposal_result:?}"
244                )
245            );
246        }
247
248        Ok(())
249    } else {
250        Err(error!("Epochs are not available"))
251    }
252}
253
254/// If the decided block is an epoch root, stores the epoch root, calls `add_epoch_root` on the
255/// Membership, and computes and handles the DRB result for the epoch after next.
255async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
256    decided_leaf: &Leaf2<TYPES>,
257    epoch_height: u64,
258    membership: &EpochMembershipCoordinator<TYPES>,
259    storage: &I::Storage,
260    consensus: &OuterConsensus<TYPES>,
261) {
262    let decided_leaf = decided_leaf.clone();
263    let decided_block_number = decided_leaf.block_header().block_number();
264
265    // Skip if this is not the expected block.
266    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
267        let next_epoch_number =
268            TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
269
270        let start = Instant::now();
271        if let Err(e) = storage
272            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
273            .await
274        {
275            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
276        }
277        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
278
279        let membership = membership.clone();
280        let decided_block_header = decided_leaf.block_header().clone();
281        let storage = storage.clone();
282        let consensus = consensus.clone();
283
284        let consensus_reader = consensus.read().await;
285
286        drop(consensus_reader);
287
288        tokio::spawn(async move {
289            let membership_clone = membership.clone();
290
291            // First add the epoch root to `membership`
292            {
293                let start = Instant::now();
294                if let Err(e) = Membership::add_epoch_root(
295                    Arc::clone(membership_clone.membership()),
296                    next_epoch_number,
297                    decided_block_header,
298                )
299                .await
300                {
301                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
302                }
303                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
304            }
305
306            let membership_clone = membership.clone();
307
308            let drb_result = membership_clone
309                .compute_drb_result(next_epoch_number, decided_leaf.clone())
310                .await;
311
312            let drb_result = match drb_result {
313                Ok(result) => result,
314                Err(e) => {
315                    tracing::error!("Failed to compute DRB result from decide: {e}");
316                    return;
317                },
318            };
319
320            let start = Instant::now();
321            handle_drb_result::<TYPES, I>(
322                membership.membership(),
323                next_epoch_number,
324                &storage,
325                drb_result,
326            )
327            .await;
328            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
329        });
330    }
331}
332
333/// Helper type to give names to the output values of the leaf chain traversal operation.
334#[derive(Debug)]
335pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
336    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
337    pub new_locked_view_number: Option<TYPES::View>,
338
339    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
340    pub new_decided_view_number: Option<TYPES::View>,
341
342    /// The QC signing the new leaf, causing it to become committed.
343    pub committing_qc: Option<QuorumCertificate2<TYPES>>,
344
345    /// A second QC extending the committing QC, causing the new leaf chain to become decided.
346    ///
347    /// This is only applicable in HotStuff2, and will be [`None`] prior to HotShot version 0.3.
348    /// HotStuff1 (HotShot < 0.3) uses a different commit rule, which is not captured in this type.
349    pub deciding_qc: Option<QuorumCertificate2<TYPES>>,
350
351    /// The decided leaves with corresponding validated state and VID info.
352    pub leaf_views: Vec<LeafInfo<TYPES>>,
353
354    /// The transactions in the block payload for each leaf.
355    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,
356
357    /// The most recent upgrade certificate from one of the leaves.
358    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
359}
360
361/// We need Default to be implemented because the leaf ascension has very few failure branches,
362/// and when they *do* happen, we still return intermediate states. Default makes the burden
363/// of filling values easier.
364impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
365    /// The default method for this type is to set all of the returned values to `None`.
366    fn default() -> Self {
367        Self {
368            new_locked_view_number: None,
369            new_decided_view_number: None,
370            committing_qc: None,
371            deciding_qc: None,
372            leaf_views: Vec::new(),
373            included_txns: None,
374            decided_upgrade_cert: None,
375        }
376    }
377}
378
379async fn update_metrics<TYPES: NodeType>(
380    consensus: &OuterConsensus<TYPES>,
381    leaf_views: &[LeafInfo<TYPES>],
382) {
383    let consensus_reader = consensus.read().await;
384    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
385
386    for leaf_view in leaf_views {
387        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
388
389        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
390            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
391            continue;
392        };
393        consensus_reader
394            .metrics
395            .proposal_to_decide_time
396            .add_point(proposal_to_decide_time as f64);
397        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
398            consensus_reader
399                .metrics
400                .finalized_bytes
401                .add_point(txn_bytes as f64);
402        }
403    }
404}
405
406/// Calculate the new decided leaf chain based on the rules of HotStuff 2.
407///
408/// # Panics
409/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
410/// impossible.
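///
/// # Commit rule (sketch)
///
/// A minimal, self-contained illustration of the two-chain check, using plain integers as
/// hypothetical stand-ins for view numbers:
/// ```
/// let (grandparent_view, parent_view) = (7u64, 8u64);
/// // The grandparent's view is decided only if the parent directly extends it.
/// assert!(grandparent_view + 1 == parent_view);
/// ```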
411#[allow(clippy::too_many_arguments)]
412pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
413    proposal: &QuorumProposalWrapper<TYPES>,
414    consensus: OuterConsensus<TYPES>,
415    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
416    public_key: &TYPES::SignatureKey,
417    with_epochs: bool,
418    membership: &EpochMembershipCoordinator<TYPES>,
419    storage: &I::Storage,
420) -> LeafChainTraversalOutcome<TYPES> {
421    let mut res = LeafChainTraversalOutcome::default();
422    let consensus_reader = consensus.read().await;
423    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
424    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
425
426    // If we don't have the proposal's parent, return early
427    let Some(parent_info) = consensus_reader
428        .parent_leaf_info(&proposed_leaf, public_key)
429        .await
430    else {
431        return res;
432    };
433    // Get the parent's parent and check whether it is consecutive in view with the parent; if so, we can
434    // decide the grandparent's view. If not, we're done.
435    let Some(grand_parent_info) = consensus_reader
436        .parent_leaf_info(&parent_info.leaf, public_key)
437        .await
438    else {
439        return res;
440    };
441    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
442        return res;
443    }
444    res.committing_qc = Some(parent_info.leaf.justify_qc().clone());
445    res.deciding_qc = Some(proposed_leaf.justify_qc().clone());
446    let decided_view_number = grand_parent_info.leaf.view_number();
447    res.new_decided_view_number = Some(decided_view_number);
448    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
449    let old_anchor_view = consensus_reader.last_decided_view();
450    let mut current_leaf_info = Some(grand_parent_info);
451    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
452    let mut txns = HashSet::new();
453    while current_leaf_info
454        .as_ref()
455        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
456    {
457        // unwrap is safe, we just checked that the option is some
458        let info = &mut current_leaf_info.unwrap();
459        // Check if there's a new upgrade certificate available.
460        if let Some(cert) = info.leaf.upgrade_certificate() {
461            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
462                if cert.data.decide_by < decided_view_number {
463                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
464                } else {
465                    tracing::info!("Reached decide on upgrade certificate: {cert:?}");
466                    res.decided_upgrade_cert = Some(cert.clone());
467                }
468            }
469        }
470
471        // If the block payload is available for this leaf, include it in
472        // the leaf chain that we send to the client.
473        if let Some(payload) = consensus_reader
474            .saved_payloads()
475            .get(&info.leaf.view_number())
476        {
477            info.leaf
478                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
479        }
480
481        if let Some(ref payload) = info.leaf.block_payload() {
482            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
483                txns.insert(txn);
484            }
485        }
486
487        current_leaf_info = consensus_reader
488            .parent_leaf_info(&info.leaf, public_key)
489            .await;
490        res.leaf_views.push(info.clone());
491    }
492
493    if !txns.is_empty() {
494        res.included_txns = Some(txns);
495    }
496
497    if with_epochs && res.new_decided_view_number.is_some() {
498        let Some(first_leaf) = res.leaf_views.first() else {
499            return res;
500        };
501        let epoch_height = consensus_reader.epoch_height;
502        consensus_reader
503            .metrics
504            .last_synced_block_height
505            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
506        drop(consensus_reader);
507
508        for decided_leaf_info in &res.leaf_views {
509            decide_epoch_root::<TYPES, I>(
510                &decided_leaf_info.leaf,
511                epoch_height,
512                membership,
513                storage,
514                &consensus,
515            )
516            .await;
517        }
518        update_metrics(&consensus, &res.leaf_views).await;
519    }
520
521    res
522}
523
524/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
525/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
526/// one view newer), then we begin attempting to form the chain. This is a direct impl from
527/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
528///
529/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
530/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
531/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
532/// > It forms a Three-Chain, if b'' forms a Two-Chain.
533///
534/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
535/// is reached when we have a two chain, and a decide is reached when we have a three chain.
536///
537/// # Example
538/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
539/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
540/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
541/// 2-3-5.
542///
543/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
544/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
545/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
546/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
547/// and our new locked view will be 6.
548///
549/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
550/// the anchor view will be set to view 6, with the locked view as view 7.
551///
552/// # Panics
553/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
554/// impossible.
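///
/// # Chain counting (sketch)
///
/// A self-contained illustration of the chain-length bookkeeping below, using the hypothetical
/// views from the example above (a proposal for view 8 with ancestors 7, 6, and 5):
/// ```
/// let mut last_view = 8u64;
/// let mut chain_length = 0usize;
/// for view in [7u64, 6, 5] {
///     if last_view == view + 1 {
///         last_view = view;
///         chain_length += 1;
///     } else {
///         break;
///     }
/// }
/// // A 2-chain locks view 6 and a 3-chain decides view 5.
/// assert_eq!(chain_length, 3);
/// ```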
555#[allow(clippy::too_many_arguments)]
556pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
557    proposal: &QuorumProposalWrapper<TYPES>,
558    consensus: OuterConsensus<TYPES>,
559    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
560    public_key: &TYPES::SignatureKey,
561    with_epochs: bool,
562    membership: &EpochMembershipCoordinator<TYPES>,
563    storage: &I::Storage,
564    epoch_height: u64,
565) -> LeafChainTraversalOutcome<TYPES> {
566    let consensus_reader = consensus.read().await;
567    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
568    let view_number = proposal.view_number();
569    let parent_view_number = proposal.justify_qc().view_number();
570    let old_anchor_view = consensus_reader.last_decided_view();
571
572    let mut last_view_number_visited = view_number;
573    let mut current_chain_length = 0usize;
574    let mut res = LeafChainTraversalOutcome::default();
575
576    if let Err(e) = consensus_reader.visit_leaf_ancestors(
577        parent_view_number,
578        Terminator::Exclusive(old_anchor_view),
579        true,
580        |leaf, state, delta| {
581            // This is the core paper logic. We're implementing the chain in chained hotstuff.
582            if res.new_decided_view_number.is_none() {
583                // If the last view number is the child of the leaf we've moved to...
584                if last_view_number_visited == leaf.view_number() + 1 {
585                    last_view_number_visited = leaf.view_number();
586
587                    // The chain grows by one
588                    current_chain_length += 1;
589
590                    // We emit a locked view when the chain length is 2
591                    if current_chain_length == 2 {
592                        res.new_locked_view_number = Some(leaf.view_number());
593                        // The next leaf in the chain, if there is one, is decided, so this
594                        // leaf's justify_qc would become the QC for the decided chain.
595                        res.committing_qc = Some(leaf.justify_qc().clone());
596                    } else if current_chain_length == 3 {
597                        // And we decide when the chain length is 3.
598                        res.new_decided_view_number = Some(leaf.view_number());
599                    }
600                } else {
601                    // There isn't a new chain extension available, so we signal to the callback
602                    // owner that we can exit for now.
603                    return false;
604                }
605            }
606
607            // Now, if we *have* reached a decide, we need to do some state updates.
608            if let Some(new_decided_view) = res.new_decided_view_number {
609                // First, get a mutable reference to the provided leaf.
610                let mut leaf = leaf.clone();
611
612                // Update the metrics
613                if leaf.view_number() == new_decided_view {
614                    consensus_reader
615                        .metrics
616                        .last_synced_block_height
617                        .set(usize::try_from(leaf.height()).unwrap_or(0));
618                }
619
620                // Check if there's a new upgrade certificate available.
621                if let Some(cert) = leaf.upgrade_certificate() {
622                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
623                        if cert.data.decide_by < view_number {
624                            tracing::warn!(
625                                "Failed to decide an upgrade certificate in time. Ignoring."
626                            );
627                        } else {
628                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
629                            res.decided_upgrade_cert = Some(cert.clone());
630                        }
631                    }
632                }
633                // If the block payload is available for this leaf, include it in
634                // the leaf chain that we send to the client.
635                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
636                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
637                }
638
639                // Get the VID share at the leaf's view number, corresponding to our key
640                // (if one exists)
641                let vid_share = consensus_reader
642                    .vid_shares()
643                    .get(&leaf.view_number())
644                    .and_then(|key_map| key_map.get(public_key))
645                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
646                    .map(|prop| prop.data.clone());
647
648                let state_cert = if leaf.with_epoch
649                    && is_epoch_root(
650                        leaf.block_header().block_number(),
651                        consensus_reader.epoch_height,
652                    ) {
653                    match consensus_reader.state_cert() {
654                        // Sanity check that the state cert is for the same view as the decided leaf
655                        Some(state_cert)
656                            if state_cert.light_client_state.view_number
657                                == leaf.view_number().u64() =>
658                        {
659                            Some(state_cert.clone())
660                        },
661                        _ => None,
662                    }
663                } else {
664                    None
665                };
666
667                // Add our data into a new `LeafInfo`
668                res.leaf_views.push(LeafInfo::new(
669                    leaf.clone(),
670                    Arc::clone(&state),
671                    delta.clone(),
672                    vid_share,
673                    state_cert,
674                ));
675                if let Some(ref payload) = leaf.block_payload() {
676                    res.included_txns = Some(
677                        payload
678                            .transaction_commitments(leaf.block_header().metadata())
679                            .into_iter()
680                            .collect::<HashSet<_>>(),
681                    );
682                }
683            }
684            true
685        },
686    ) {
687        tracing::debug!("Leaf ascension failed; error={e}");
688    }
689
690    let epoch_height = consensus_reader.epoch_height;
691    drop(consensus_reader);
692
693    if with_epochs && res.new_decided_view_number.is_some() {
694        for decided_leaf_info in &res.leaf_views {
695            decide_epoch_root::<TYPES, I>(
696                &decided_leaf_info.leaf,
697                epoch_height,
698                membership,
699                storage,
700                &consensus,
701            )
702            .await;
703        }
704    }
705
706    res
707}
708
709/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if they cannot be found.
710#[instrument(skip_all)]
711#[allow(clippy::too_many_arguments)]
712pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
713    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
714    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
715    membership: EpochMembershipCoordinator<TYPES>,
716    public_key: TYPES::SignatureKey,
717    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
718    consensus: OuterConsensus<TYPES>,
719    upgrade_lock: &UpgradeLock<TYPES, V>,
720    parent_qc: &QuorumCertificate2<TYPES>,
721    epoch_height: u64,
722) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
723    let consensus_reader = consensus.read().await;
724    let vsm_contains_parent_view = consensus_reader
725        .validated_state_map()
726        .contains_key(&parent_qc.view_number());
727    drop(consensus_reader);
728
729    if !vsm_contains_parent_view {
730        let _ = fetch_proposal(
731            parent_qc,
732            event_sender.clone(),
733            event_receiver.clone(),
734            membership,
735            consensus.clone(),
736            public_key.clone(),
737            private_key.clone(),
738            upgrade_lock,
739            epoch_height,
740        )
741        .await
742        .context(info!("Failed to fetch proposal"))?;
743    }
744
745    let consensus_reader = consensus.read().await;
746    let parent_view = consensus_reader
747        .validated_state_map()
748        .get(&parent_qc.view_number())
749        .context(debug!(
750            "Couldn't find parent view in state map, waiting for replica to see proposal; \
751             parent_view_number: {}",
752            *parent_qc.view_number()
753        ))?;
754
755    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
756        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
757         parent_view {:?}",
758        *parent_qc.view_number(),
759        parent_view
760    ))?;
761
762    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
763        // NOTE: This happens on the genesis block
764        tracing::debug!(
765            "They don't equal: {:?}   {:?}",
766            leaf_commitment,
767            consensus_reader.high_qc().data().leaf_commit
768        );
769    }
770
771    let leaf = consensus_reader
772        .saved_leaves()
773        .get(&leaf_commitment)
774        .context(info!("Failed to find high QC of parent"))?;
775
776    Ok((leaf.clone(), Arc::clone(state)))
777}
778
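/// Update the stored and in-memory high QC (and, when present, the next-epoch high QC and the
/// light client state update certificate) from the proposal's justify QC. Persisting is skipped
/// when the justify QC is for a block inside the epoch transition, other than the transition
/// block itself or the final block of the epoch.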
779pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
780    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
781    validation_info: &ValidationInfo<TYPES, I, V>,
782) -> Result<()> {
783    let in_transition_epoch = proposal
784        .data
785        .justify_qc()
786        .data
787        .block_number
788        .is_some_and(|bn| {
789            !is_transition_block(bn, validation_info.epoch_height)
790                && is_epoch_transition(bn, validation_info.epoch_height)
791                && bn % validation_info.epoch_height != 0
792        });
793    let justify_qc = proposal.data.justify_qc();
794    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
795    if !in_transition_epoch {
796        tracing::debug!(
797            "Storing high QC for view {:?} and height {:?}",
798            justify_qc.view_number(),
799            justify_qc.data.block_number
800        );
801        if let Err(e) = validation_info
802            .storage
803            .update_high_qc2(justify_qc.clone())
804            .await
805        {
806            bail!("Failed to store High QC, not voting; error = {e:?}");
807        }
808        if justify_qc
809            .data
810            .block_number
811            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
812        {
813            let Some(state_cert) = proposal.data.state_cert() else {
814                bail!("Epoch root QC has no state cert, not voting!");
815            };
816            if let Err(e) = validation_info
817                .storage
818                .update_state_cert(state_cert.clone())
819                .await
820            {
821                bail!(
822                    "Failed to store the light client state update certificate, not voting; error \
823                     = {:?}",
824                    e
825                );
826            }
827            validation_info
828                .consensus
829                .write()
830                .await
831                .update_state_cert(state_cert.clone())?;
832        }
833        if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
834            if let Err(e) = validation_info
835                .storage
836                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
837                .await
838            {
839                bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
840            }
841        }
842    }
843    let mut consensus_writer = validation_info.consensus.write().await;
844    if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
845        if justify_qc
846            .data
847            .block_number
848            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
849        {
850            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
851            consensus_writer
852                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
853            return Ok(());
854        }
855        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
856    }
857    consensus_writer.update_high_qc(justify_qc.clone())?;
858
859    Ok(())
860}
861
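/// Read the current transition QC (paired with its next-epoch counterpart) from the shared
/// consensus state, if one has been recorded.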
862async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
863    validation_info: &ValidationInfo<TYPES, I, V>,
864) -> Option<(
865    QuorumCertificate2<TYPES>,
866    NextEpochQuorumCertificate2<TYPES>,
867)> {
868    validation_info
869        .consensus
870        .read()
871        .await
872        .transition_qc()
873        .cloned()
874}
875
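/// Validate the justify QC (and its required next-epoch counterpart) of a proposal whose QC is
/// for a block in the epoch transition, updating the stored transition QC when the justify QC is
/// for the transition block itself.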
876pub(crate) async fn validate_epoch_transition_qc<
877    TYPES: NodeType,
878    I: NodeImplementation<TYPES>,
879    V: Versions,
880>(
881    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
882    validation_info: &ValidationInfo<TYPES, I, V>,
883) -> Result<()> {
884    let proposed_qc = proposal.data.justify_qc();
885    let Some(qc_block_number) = proposed_qc.data().block_number else {
886        bail!("Justify QC has no block number");
887    };
888    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
889        || qc_block_number % validation_info.epoch_height == 0
890    {
891        return Ok(());
892    }
893    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
894        bail!("Next epoch justify QC is not present");
895    };
896    ensure!(
897        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
898        "Next epoch QC has different leaf commit to justify QC"
899    );
900
901    if is_transition_block(qc_block_number, validation_info.epoch_height) {
902        // Height is epoch height - 2
903        ensure!(
904            transition_qc(validation_info)
905                .await
906                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
907            "Proposed transition qc must have view number greater than or equal to previous \
908             transition QC"
909        );
910
911        validation_info
912            .consensus
913            .write()
914            .await
915            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
916        // reset the high qc to the transition qc
917        update_high_qc(proposal, validation_info).await?;
918    } else {
919        // Height is either epoch height - 1 or epoch height
920        ensure!(
921            transition_qc(validation_info)
922                .await
923                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
924            "Transition block must have view number greater than previous transition QC"
925        );
926        ensure!(
927            proposal.data.view_change_evidence().is_none(),
928            "Second to last block and last block of epoch must directly extend previous block, Qc \
929             Block number: {qc_block_number}, Proposal Block number: {}",
930            proposal.data.block_header().block_number()
931        );
932        ensure!(
933            proposed_qc.view_number() + 1 == proposal.data.view_number()
934                || transition_qc(validation_info)
935                    .await
936                    .is_some_and(|(qc, _)| &qc == proposed_qc),
937            "Transition proposals must extend the previous view directly, or extend the previous \
938             transition block"
939        );
940    }
941    Ok(())
942}
943
944/// Validate the state, safety, and liveness of a proposal, then emit
945/// a `QuorumProposalValidated` event.
946///
947///
948/// # Errors
949/// If any validation or state update fails.
950#[allow(clippy::too_many_lines)]
951#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
952pub(crate) async fn validate_proposal_safety_and_liveness<
953    TYPES: NodeType,
954    I: NodeImplementation<TYPES>,
955    V: Versions,
956>(
957    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
958    parent_leaf: Leaf2<TYPES>,
959    validation_info: &ValidationInfo<TYPES, I, V>,
960    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
961    sender: TYPES::SignatureKey,
962) -> Result<()> {
963    let view_number = proposal.data.view_number();
964
965    let mut valid_epoch_transition = false;
966    if validation_info
967        .upgrade_lock
968        .version(proposal.data.justify_qc().view_number())
969        .await
970        .is_ok_and(|v| v >= V::Epochs::VERSION)
971    {
972        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
973            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
974        };
975        if is_epoch_transition(block_number, validation_info.epoch_height) {
976            validate_epoch_transition_qc(&proposal, validation_info).await?;
977            valid_epoch_transition = true;
978        }
979    }
980
981    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
982    ensure!(
983        proposed_leaf.parent_commitment() == parent_leaf.commit(),
984        "Proposed leaf does not extend the parent leaf."
985    );
986    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
987        validation_info
988            .upgrade_lock
989            .epochs_enabled(view_number)
990            .await,
991        proposed_leaf.height(),
992        validation_info.epoch_height,
993    );
994
995    let state = Arc::new(
996        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
997    );
998
999    {
1000        let mut consensus_writer = validation_info.consensus.write().await;
1001        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
1002            tracing::trace!("{e:?}");
1003        }
1004
1005        // Update our internal storage of the proposal. The proposal is valid, so
1006        // we swallow this error and just log if it occurs.
1007        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
1008            tracing::debug!("Internal proposal update failed; error = {e:#}");
1009        };
1010    }
1011
1012    UpgradeCertificate::validate(
1013        proposal.data.upgrade_certificate(),
1014        &validation_info.membership,
1015        proposal_epoch,
1016        &validation_info.upgrade_lock,
1017    )
1018    .await?;
1019
1020    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
1021    proposed_leaf
1022        .extends_upgrade(
1023            &parent_leaf,
1024            &validation_info.upgrade_lock.decided_upgrade_certificate,
1025        )
1026        .await?;
1027
1028    let justify_qc = proposal.data.justify_qc().clone();
1029    // Create a positive vote if either liveness or safety check
1030    // passes.
1031
1032    {
1033        let consensus_reader = validation_info.consensus.read().await;
1034        // Epoch safety check:
1035        // The proposal is safe if
1036        // 1. the proposed block and the justify QC block belong to the same epoch or
1037        // 2. the justify QC is the eQC for the previous block
1038        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
1039            validation_info
1040                .upgrade_lock
1041                .epochs_enabled(view_number)
1042                .await,
1043            parent_leaf.height(),
1044            validation_info.epoch_height,
1045        );
1046        ensure!(
1047            proposal_epoch == justify_qc_epoch
1048                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
1049            {
1050                error!(
1051                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
1052                     QC leaf is {parent_leaf:?}"
1053                )
1054            }
1055        );
1056
1057        // Make sure that the epoch transition proposal includes the next epoch QC
1058        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
1059            && validation_info
1060                .upgrade_lock
1061                .epochs_enabled(view_number)
1062                .await
1063        {
1064            ensure!(
1065                proposal.data.next_epoch_justify_qc().is_some(),
1066                "Epoch transition proposal does not include the next epoch justify QC. Do not \
1067                 vote!"
1068            );
1069        }
1070
1071        // Liveness check.
1072        let liveness_check =
1073            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;
1074
1075        // Safety check.
1076        // Check if proposal extends from the locked leaf.
1077        let outcome = consensus_reader.visit_leaf_ancestors(
1078            justify_qc.view_number(),
1079            Terminator::Inclusive(consensus_reader.locked_view()),
1080            false,
1081            |leaf, _, _| {
1082                // if the leaf view number equals the locked view number then we're done: stop the
1083                // traversal by returning false; otherwise keep walking back by returning true
1084                leaf.view_number() != consensus_reader.locked_view()
1085            },
1086        );
1087        let safety_check = outcome.is_ok();
1088
1089        ensure!(safety_check || liveness_check, {
1090            if let Err(e) = outcome {
1091                broadcast_event(
1092                    Event {
1093                        view_number,
1094                        event: EventType::Error { error: Arc::new(e) },
1095                    },
1096                    &validation_info.output_event_stream,
1097                )
1098                .await;
1099            }
1100
1101            error!(
1102                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
1103                 view is {:?}",
1104                consensus_reader.high_qc(),
1105                proposal.data,
1106                consensus_reader.locked_view()
1107            )
1108        });
1109    }
1110
1111    // We accept the proposal, notify the application layer
1112    broadcast_event(
1113        Event {
1114            view_number,
1115            event: EventType::QuorumProposal {
1116                proposal: proposal.clone(),
1117                sender,
1118            },
1119        },
1120        &validation_info.output_event_stream,
1121    )
1122    .await;
1123
1124    // Notify other tasks
1125    broadcast_event(
1126        Arc::new(HotShotEvent::QuorumProposalValidated(
1127            proposal.clone(),
1128            parent_leaf,
1129        )),
1130        &event_stream,
1131    )
1132    .await;
1133
1134    Ok(())
1135}
1136
1137/// Validates, from a given `proposal`, that the view it is being submitted for is valid when
1138/// compared to `cur_view`, the highest proposed view (so far) for the caller, and that, if the proposal
1139/// is for a view later than expected, it includes a timeout or view sync certificate.
1140///
1141/// # Errors
1142/// If any validation or view number check fails.
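///
/// # View gap (sketch)
///
/// A self-contained illustration of when view change evidence is required, using plain integers
/// as hypothetical stand-ins for view numbers:
/// ```
/// let (proposal_view, justify_qc_view) = (10u64, 8u64);
/// // A gap between the proposal's view and its justify QC's view means the proposal must carry
/// // a timeout or view sync certificate covering the skipped view.
/// let needs_view_change_evidence = justify_qc_view != proposal_view - 1;
/// assert!(needs_view_change_evidence);
/// ```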
1143pub(crate) async fn validate_proposal_view_and_certs<
1144    TYPES: NodeType,
1145    I: NodeImplementation<TYPES>,
1146    V: Versions,
1147>(
1148    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1149    validation_info: &ValidationInfo<TYPES, I, V>,
1150) -> Result<()> {
1151    let view_number = proposal.data.view_number();
1152    ensure!(
1153        view_number >= validation_info.consensus.read().await.cur_view(),
1154        "Proposal is from an older view {:?}",
1155        proposal.data
1156    );
1157
1158    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
1159    let mut membership = validation_info.membership.clone();
1160    proposal.validate_signature(&membership).await?;
1161
1162    // Verify a timeout certificate OR a view sync certificate exists and is valid.
1163    if proposal.data.justify_qc().view_number() != view_number - 1 {
1164        let received_proposal_cert =
1165            proposal
1166                .data
1167                .view_change_evidence()
1168                .clone()
1169                .context(debug!(
1170                    "Quorum proposal for view {view_number} needed a timeout or view sync \
1171                     certificate, but did not have one",
1172                ))?;
1173
1174        match received_proposal_cert {
1175            ViewChangeEvidence2::Timeout(timeout_cert) => {
1176                ensure!(
1177                    timeout_cert.data().view == view_number - 1,
1178                    "Timeout certificate for view {view_number} was not for the immediately \
1179                     preceding view"
1180                );
1181                let timeout_cert_epoch = timeout_cert.data().epoch();
1182                membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1183
1184                let membership_stake_table = membership.stake_table().await;
1185                let membership_success_threshold = membership.success_threshold().await;
1186
1187                timeout_cert
1188                    .is_valid_cert(
1189                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1190                        membership_success_threshold,
1191                        &validation_info.upgrade_lock,
1192                    )
1193                    .await
1194                    .context(|e| {
1195                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
1196                    })?;
1197            },
1198            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1199                ensure!(
1200                    view_sync_cert.view_number == view_number,
1201                    "View sync cert view number {:?} does not match proposal view number {:?}",
1202                    view_sync_cert.view_number,
1203                    view_number
1204                );
1205
1206                let view_sync_cert_epoch = view_sync_cert.data().epoch();
1207                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1208
1209                let membership_stake_table = membership.stake_table().await;
1210                let membership_success_threshold = membership.success_threshold().await;
1211
1212                // View sync certs must also be valid.
1213                view_sync_cert
1214                    .is_valid_cert(
1215                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1216                        membership_success_threshold,
1217                        &validation_info.upgrade_lock,
1218                    )
1219                    .await
1220                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1221            },
1222        }
1223    }
1224
1225    // Validate the upgrade certificate -- this is just a signature validation.
1226    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
1227    {
1228        let epoch = option_epoch_from_block_number::<TYPES>(
1229            proposal.data.epoch().is_some(),
1230            proposal.data.block_header().block_number(),
1231            validation_info.epoch_height,
1232        );
1233        UpgradeCertificate::validate(
1234            proposal.data.upgrade_certificate(),
1235            &validation_info.membership,
1236            epoch,
1237            &validation_info.upgrade_lock,
1238        )
1239        .await?;
1240    }
1241
1242    Ok(())
1243}
1244
1245/// Helper function to send events and log errors
1246pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1247    match sender.broadcast_direct(event).await {
1248        Ok(None) => (),
1249        Ok(Some(overflowed)) => {
1250            tracing::error!(
1251                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1252            );
1253        },
1254        Err(SendError(e)) => {
1255            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1256        },
1257    }
1258}
1259
1260/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
1261/// corresponds to the provided `qc`.
1262pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
1263    qc: &QuorumCertificate2<TYPES>,
1264    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1265    consensus: &OuterConsensus<TYPES>,
1266    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1267    upgrade_lock: &UpgradeLock<TYPES, V>,
1268    epoch_height: u64,
1269) -> Result<()> {
1270    let mut epoch_membership = membership_coordinator
1271        .stake_table_for_epoch(qc.data.epoch)
1272        .await?;
1273
1274    let membership_stake_table = epoch_membership.stake_table().await;
1275    let membership_success_threshold = epoch_membership.success_threshold().await;
1276
1277    if let Err(e) = qc
1278        .is_valid_cert(
1279            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1280            membership_success_threshold,
1281            upgrade_lock,
1282        )
1283        .await
1284    {
1285        consensus.read().await.metrics.invalid_qc.update(1);
1286        return Err(warn!("Invalid certificate: {e}"));
1287    }
1288
1289    if upgrade_lock.epochs_enabled(qc.view_number()).await {
1290        ensure!(
1291            qc.data.block_number.is_some(),
1292            "QC for epoch {:?} has no block number",
1293            qc.data.epoch
1294        );
1295    }
1296
1297    if qc
1298        .data
1299        .block_number
1300        .is_some_and(|b| is_epoch_transition(b, epoch_height))
1301    {
1302        ensure!(
1303            maybe_next_epoch_qc.is_some(),
1304            error!("Received High QC for the transition block but not the next epoch QC")
1305        );
1306    }
1307
1308    if let Some(next_epoch_qc) = maybe_next_epoch_qc {
1309        // If the next epoch qc exists, make sure it's equal to the qc
1310        if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
1311            bail!("Next epoch qc exists but it's not equal with qc.");
1312        }
1313        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1314        let membership_next_stake_table = epoch_membership.stake_table().await;
1315        let membership_next_success_threshold = epoch_membership.success_threshold().await;
1316
1317        // Validate the next epoch qc as well
1318        next_epoch_qc
1319            .is_valid_cert(
1320                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1321                membership_next_success_threshold,
1322                upgrade_lock,
1323            )
1324            .await
1325            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1326    }
1327    Ok(())
1328}
1329
1330/// Validates the light client state update certificate
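///
/// # Threshold check (sketch)
///
/// A self-contained illustration of the stake accumulation performed below, using small integers
/// as hypothetical stand-ins for the `U256` stake amounts:
/// ```
/// // Stakes of the signers whose signatures verified, and the success threshold.
/// let verified_stakes = [30u64, 25, 20];
/// let success_threshold = 67u64;
/// let accumulated: u64 = verified_stakes.iter().sum();
/// assert!(accumulated >= success_threshold);
/// ```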
1331pub async fn validate_light_client_state_update_certificate<TYPES: NodeType, V: Versions>(
1332    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1333    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1334    upgrade_lock: &UpgradeLock<TYPES, V>,
1335) -> Result<()> {
1336    tracing::debug!("Validating light client state update certificate");
1337
1338    let epoch_membership = membership_coordinator
1339        .membership_for_epoch(state_cert.epoch())
1340        .await?;
1341
1342    let membership_stake_table = epoch_membership.stake_table().await;
1343    let membership_success_threshold = epoch_membership.success_threshold().await;
1344
1345    let mut state_key_map = HashMap::new();
1346    membership_stake_table.into_iter().for_each(|config| {
1347        state_key_map.insert(
1348            config.state_ver_key.clone(),
1349            config.stake_table_entry.stake(),
1350        );
1351    });
1352
1353    let mut accumulated_stake = U256::from(0);
1354    let signed_state_digest = derive_signed_state_digest(
1355        &state_cert.light_client_state,
1356        &state_cert.next_stake_table_state,
1357        &state_cert.auth_root,
1358    );
1359    for (key, sig, sig_v2) in state_cert.signatures.iter() {
1360        if let Some(stake) = state_key_map.get(key) {
1361            accumulated_stake += *stake;
1362            #[allow(clippy::collapsible_else_if)]
1363            // We only perform the second signature check prior to the DrbAndHeaderUpgrade
1364            if !upgrade_lock
1365                .proposal2_version(TYPES::View::new(state_cert.light_client_state.view_number))
1366                .await
1367            {
1368                if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1369                    key,
1370                    sig_v2,
1371                    &state_cert.light_client_state,
1372                    &state_cert.next_stake_table_state,
1373                ) {
1374                    bail!("Invalid light client state update certificate signature");
1375                }
1376            } else {
1377                if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1378                    key,
1379                    sig,
1380                    signed_state_digest,
1381                ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1382                    key,
1383                    sig_v2,
1384                    &state_cert.light_client_state,
1385                    &state_cert.next_stake_table_state,
1386                ) {
1387                    bail!("Invalid light client state update certificate signature");
1388                }
1389            }
1390        } else {
1391            bail!("Invalid light client state update certificate signature");
1392        }
1393    }
1394    if accumulated_stake < membership_success_threshold {
1395        bail!("Light client state update certificate does not meet the success threshold");
1396    }
1397
1398    Ok(())
1399}
1400
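/// Check that the QC is for an epoch root block and that the state certificate's epoch and light
/// client view number match those of the QC.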
1401pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1402    qc: &QuorumCertificate2<TYPES>,
1403    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1404    epoch_height: u64,
1405) -> bool {
1406    qc.data
1407        .block_number
1408        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1409        && Some(state_cert.epoch) == qc.data.epoch()
1410        && qc.view_number().u64() == state_cert.light_client_state.view_number
1411}
1412
1413/// Gets the second VID share, for either the current or the next epoch as appropriate, from the shared consensus state;
1414/// makes sure it corresponds to the given DA certificate;
1415/// if it's not yet available, waits for it with the given timeout.
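///
/// # Matching rule (sketch)
///
/// A self-contained illustration of how the share is matched against the DA certificate, using
/// small integers as hypothetical stand-ins for the payload commitments:
/// ```
/// let (share_commit, da_commit, da_next_epoch_commit) = (1u8, 1u8, 2u8);
/// let share_is_for_da_cert_epoch = true; // the target epoch equals the DA certificate's epoch
/// let matches = if share_is_for_da_cert_epoch {
///     share_commit == da_commit
/// } else {
///     share_commit == da_next_epoch_commit
/// };
/// assert!(matches);
/// ```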
1416pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1417    target_epoch: Option<TYPES::Epoch>,
1418    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1419    da_cert: &DaCertificate2<TYPES>,
1420    consensus: &OuterConsensus<TYPES>,
1421    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1422    cancel_receiver: Receiver<()>,
1423    id: u64,
1424) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1425    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1426    let maybe_second_vid_share = consensus
1427        .read()
1428        .await
1429        .vid_shares()
1430        .get(&vid_share.data.view_number())
1431        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1432        .and_then(|epoch_map| epoch_map.get(&target_epoch))
1433        .cloned();
1434    if let Some(second_vid_share) = maybe_second_vid_share {
1435        if (target_epoch == da_cert.epoch()
1436            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1437            || (target_epoch != da_cert.epoch()
1438                && Some(second_vid_share.data.payload_commitment())
1439                    == da_cert.data().next_epoch_payload_commit)
1440        {
1441            return Ok(second_vid_share);
1442        }
1443    }
1444
1445    let receiver = receiver.clone();
1446    let da_cert_clone = da_cert.clone();
1447    let Some(event) = EventDependency::new(
1448        receiver,
1449        cancel_receiver,
1450        format!(
1451            "VoteDependency Second VID share for view {:?}, my id {:?}",
1452            vid_share.data.view_number(),
1453            id
1454        ),
1455        Box::new(move |event| {
1456            let event = event.as_ref();
1457            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1458                if target_epoch == da_cert_clone.epoch() {
1459                    second_vid_share.data.payload_commitment()
1460                        == da_cert_clone.data().payload_commit
1461                } else {
1462                    Some(second_vid_share.data.payload_commitment())
1463                        == da_cert_clone.data().next_epoch_payload_commit
1464                }
1465            } else {
1466                false
1467            }
1468        }),
1469    )
1470    .completed()
1471    .await
1472    else {
1473        return Err(warn!("Error while waiting for the second VID share."));
1474    };
1475    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1476        // this shouldn't happen
1477        return Err(warn!(
1478            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1479             possible."
1480        ));
1481    };
1482    Ok(second_vid_share.clone())
1483}
1484
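/// Broadcast a `ViewChange` event for `new_view_number`, substituting the epoch from
/// `first_epoch` for the provided one when the new view is the first view of that epoch.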
1485pub async fn broadcast_view_change<TYPES: NodeType>(
1486    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1487    new_view_number: TYPES::View,
1488    epoch: Option<TYPES::Epoch>,
1489    first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
1490) {
1491    let mut broadcast_epoch = epoch;
1492    if let Some((first_epoch_view, first_epoch)) = first_epoch {
1493        if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
1494            broadcast_epoch = Some(first_epoch);
1495        }
1496    }
1497    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1498    broadcast_event(
1499        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1500        sender,
1501    )
1502    .await
1503}
1504
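/// Derive the field element that light client state signers sign over: the keccak256 hash of the
/// ABI-encoded light client state, next stake table state, and auth root, reduced into the
/// circuit field.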
1505pub fn derive_signed_state_digest(
1506    lc_state: &LightClientState,
1507    next_stake_state: &StakeTableState,
1508    auth_root: &FixedBytes<32>,
1509) -> CircuitField {
1510    let lc_state_sol: LightClientStateSol = (*lc_state).into();
1511    let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1512
1513    let res = alloy::primitives::keccak256(
1514        (
1515            lc_state_sol.abi_encode(),
1516            stake_st_sol.abi_encode(),
1517            auth_root.abi_encode(),
1518        )
1519            .abi_encode_packed(),
1520    );
1521    CircuitField::from_be_bytes_mod_order(res.as_ref())
1522}