hotshot_task_impls/
helpers.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::Instant,
};

use alloy::{
    primitives::{FixedBytes, U256},
    sol_types::SolValue,
};
use ark_ff::PrimeField;
use async_broadcast::{Receiver, SendError, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::{
    consensus::OuterConsensus,
    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
    drb::DrbResult,
    epoch_membership::EpochMembershipCoordinator,
    event::{Event, EventType, LeafInfo},
    light_client::{CircuitField, LightClientState, StakeTableState},
    message::{Proposal, UpgradeLock},
    request_response::ProposalRequestPayload,
    simple_certificate::{
        CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
    },
    simple_vote::HasEpoch,
    stake_table::StakeTableEntries,
    traits::{
        block_contents::BlockHeader,
        election::Membership,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
        signature_key::{
            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
        },
        storage::Storage,
        BlockPayload, ValidatedState,
    },
    utils::{
        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
        option_epoch_from_block_number, Terminator, View, ViewInner,
    },
    vote::{Certificate, HasViewNumber},
};
use hotshot_utils::anytrace::*;
use time::OffsetDateTime;
use tokio::time::timeout;
use tracing::instrument;
use vbs::version::StaticVersionType;

use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};

/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
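///
/// A minimal calling sketch, assuming the caller already holds the event channels, membership
/// coordinator, and keys (not compiled):
///
/// ```ignore
/// let (leaf, view) = fetch_proposal(
///     &qc,
///     event_sender.clone(),
///     event_receiver.clone(),
///     membership_coordinator.clone(),
///     consensus.clone(),
///     public_key.clone(),
///     private_key.clone(),
///     &upgrade_lock,
///     epoch_height,
/// )
/// .await?;
/// ```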
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // We need to be able to sign this request before submitting it to the network. Compute the
    // payload first.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    // Finally, compute the signature for the payload.
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal request. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    // First, broadcast that we need a proposal to the current leader
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    let mut rx = event_receiver.clone();
    // Await the arrival of the response event.
    let Ok(Some(proposal)) =
        // We explicitly time out here so we aren't waiting around for the data forever.
        timeout(REQUEST_TIMEOUT, async move {
            // Iterate until we receive a matching proposal, or until we reach the timeout.
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
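
/// Stores the given DRB result for `epoch` and adds it to the in-memory membership.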
pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    membership: &Arc<RwLock<TYPES::Membership>>,
    epoch: TYPES::Epoch,
    storage: &I::Storage,
    drb_result: DrbResult,
) {
    tracing::debug!("Calling store_drb_result for epoch {epoch}");
    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
    }

    membership.write().await.add_drb_result(epoch, drb_result)
}

/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
/// current epoch.
///
/// Uses the result from `start_drb_task`.
///
/// Returns an error if we should not vote.
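///
/// A minimal calling sketch from a hypothetical vote path (not compiled):
///
/// ```ignore
/// if let Err(e) = verify_drb_result(&proposal, &validation_info).await {
///     tracing::warn!("Refusing to vote: {e}");
///     return;
/// }
/// ```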
pub(crate) async fn verify_drb_result<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &QuorumProposalWrapper<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    // Skip if this is not the expected block.
    if validation_info.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            validation_info.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
    // that epochs are definitely happening by virtue of getting here?
    let epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(proposal.view_number())
            .await,
        proposal.block_header().block_number(),
        validation_info.epoch_height,
    );

    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the next epoch's DRB result."))?;

    if let Some(epoch_val) = epoch {
        let current_epoch_membership = validation_info
            .membership
            .coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {}", epoch_val))?;

        let has_stake_current_epoch = current_epoch_membership
            .has_stake(&validation_info.public_key)
            .await;

        if has_stake_current_epoch {
            let computed_result = current_epoch_membership
                .next_epoch()
                .await
                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
                .get_epoch_drb()
                .await
                .clone()
                .context(warn!("DRB result not found"))?;

            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}

/// Handles storing the epoch root and calling `add_epoch_root` on the Membership, then computes
/// and handles the DRB result, when the decided block is an epoch root.
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    decided_leaf: &Leaf2<TYPES>,
    epoch_height: u64,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    consensus: &OuterConsensus<TYPES>,
) {
    let decided_leaf = decided_leaf.clone();
    let decided_block_number = decided_leaf.block_header().block_number();

    // Skip if this is not the expected block.
    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
        let next_epoch_number =
            TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);

        let start = Instant::now();
        if let Err(e) = storage
            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
            .await
        {
            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
        }
        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());

        let membership = membership.clone();
        let decided_block_header = decided_leaf.block_header().clone();
        let storage = storage.clone();
        let consensus = consensus.clone();

        let consensus_reader = consensus.read().await;

        drop(consensus_reader);

        tokio::spawn(async move {
            let membership_clone = membership.clone();

            // First add the epoch root to `membership`
            {
                let start = Instant::now();
                if let Err(e) = Membership::add_epoch_root(
                    Arc::clone(membership_clone.membership()),
                    decided_block_header,
                )
                .await
                {
                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
                }
                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
            }

            let membership_clone = membership.clone();

            let drb_result = membership_clone
                .compute_drb_result(next_epoch_number, decided_leaf.clone())
                .await;

            let drb_result = match drb_result {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!("Failed to compute DRB result from decide: {e}");
                    return;
                },
            };

            let start = Instant::now();
            handle_drb_result::<TYPES, I>(
                membership.membership(),
                next_epoch_number,
                &storage,
                drb_result,
            )
            .await;
            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
        });
    }
}

/// Helper type to give names to the output values of the leaf chain traversal operation.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The QC signing the latest new leaf, causing it to become committed.
    ///
    /// Only present if a new leaf chain has been decided.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// A second QC extending the committing QC, causing the new leaf chain to become decided.
    ///
    /// This is only applicable in HotStuff2, and will be [`None`] prior to HotShot version 0.3.
    /// HotStuff1 (HotShot < 0.3) uses a different commit rule, which is not captured in this type.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}

/// We need Default to be implemented because the leaf ascension has very few failure branches,
/// and when they *do* happen, we still return intermediate states. Default eases the burden of
/// filling in those values.
impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
    /// The default method for this type is to set all of the returned values to `None`.
    fn default() -> Self {
        Self {
            new_locked_view_number: None,
            new_decided_view_number: None,
            committing_qc: None,
            deciding_qc: None,
            leaf_views: Vec::new(),
            included_txns: None,
            decided_upgrade_cert: None,
        }
    }
}

async fn update_metrics<TYPES: NodeType>(
    consensus: &OuterConsensus<TYPES>,
    leaf_views: &[LeafInfo<TYPES>],
) {
    let consensus_reader = consensus.read().await;
    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;

    for leaf_view in leaf_views {
        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();

        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
            continue;
        };
        consensus_reader
            .metrics
            .proposal_to_decide_time
            .add_point(proposal_to_decide_time as f64);
        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
            consensus_reader
                .metrics
                .finalized_bytes
                .add_point(txn_bytes as f64);
        }
    }
}

/// Calculate the new decided leaf chain based on the rules of HotStuff 2.
///
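/// A sketch of the commit rule implemented below: the proposal's grandparent becomes decided
/// once the grandparent and parent sit in consecutive views, with the parent's QC committing
/// the grandparent and the proposal's QC finalizing the decision.
///
/// ```text
/// grandparent <- parent <- proposal
///   decided when view(parent) == view(grandparent) + 1
/// ```
///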
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // If we don't have the proposal's parent, return early.
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    // Get the parent's parent and check whether it is consecutive in view with the parent; if so,
    // we can decide the grandparent's view. If not, we're done.
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
    res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    // We've reached a decide; now get the leaf chain all the way back to the last decided view,
    // not including it.
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe, we just checked that the option is some
        let info = &mut current_leaf_info.unwrap();
        // Check if there's a new upgrade certificate available.
        if let Some(cert) = info.leaf.upgrade_certificate() {
            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                if cert.data.decide_by < decided_view_number {
                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
                } else {
                    tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                    res.decided_upgrade_cert = Some(cert.clone());
                }
            }
        }

        // If the block payload is available for this leaf, include it in
        // the leaf chain that we send to the client.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}

/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
/// one view newer), then we begin attempting to form the chain. This is a direct impl from
/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
///
/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
/// > It forms a Three-Chain, if b'' forms a Two-Chain.
///
/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
/// is reached when we have a two chain, and a decide is reached when we have a three chain.
///
/// # Example
/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
/// 2-3-5.
///
/// Assuming we continue with honest leaders, we could then eventually get a chain like: 2-3-5-6-7-8. This
/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
/// and our new locked view will be 6.
///
/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
/// the anchor view will be set to view 6, with the locked view as view 7.
///
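/// A diagram of the worked example above (proposal for view 8 over the chain 2-3-5-6-7):
///
/// ```text
/// 1 (decided) <- 2 <- 3 <- 5 <- 6 <- 7 <- 8 (proposal)
///                          ^    ^
///                 new anchor    new locked view
/// ```
///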
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // This is the core paper logic. We're implementing the commit rule from chained
            // HotStuff.
            if res.new_decided_view_number.is_none() {
                // If the last view number is the child of the leaf we've moved to...
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    // The chain grows by one
                    current_chain_length += 1;

                    // We emit a locked view when the chain length is 2
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf in the chain, if there is one, is decided, so this
                        // leaf's justify_qc would become the QC for the decided chain.
                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
                    } else if current_chain_length == 3 {
                        // And we decide when the chain length is 3.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // There isn't a new chain extension available, so we signal to the callback
                    // owner that we can exit for now.
                    return false;
                }
            }

            // Now, if we *have* reached a decide, we need to do some state updates.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // First, get a mutable reference to the provided leaf.
                let mut leaf = leaf.clone();

                // Update the metrics
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Check if there's a new upgrade certificate available.
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                // If the block payload is available for this leaf, include it in
                // the leaf chain that we send to the client.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Get the VID share at the leaf's view number, corresponding to our key
                // (if one exists)
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        // Sanity check that the state cert is for the same view as the decided leaf
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                // Add our data into a new `LeafInfo`
                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}

/// Gets the parent leaf and state from the parent of a proposal, returning an
/// [`utils::anytrace::Error`] if they are not available.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    drop(consensus_reader);

    if !vsm_contains_parent_view {
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader
        .validated_state_map()
        .get(&parent_qc.view_number())
        .context(debug!(
            "Couldn't find parent view in state map, waiting for replica to see proposal; \
             parent_view_number: {}",
            *parent_qc.view_number()
        ))?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
         parent_view {:?}",
        *parent_qc.view_number(),
        parent_view
    ))?;

    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        // NOTE: This happens on the genesis block
        tracing::debug!(
            "Parent leaf commitment and high QC leaf commitment don't match: {:?}   {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}

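/// Updates the stored and in-memory high QC from the proposal's justify QC, along with the
/// next-epoch high QC, transition QC, and light client state certificate where the justify QC
/// sits at an epoch boundary.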
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
            if let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
            {
                bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
            }
        }
    }
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}

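/// Returns the transition QC pair (current- and next-epoch QCs) from the shared consensus
/// state, if one has been recorded.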
async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Option<(
    QuorumCertificate2<TYPES>,
    NextEpochQuorumCertificate2<TYPES>,
)> {
    validation_info
        .consensus
        .read()
        .await
        .transition_qc()
        .cloned()
}

pub(crate) async fn validate_epoch_transition_qc<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has a different leaf commit to the justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Height is epoch height - 2
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition QC must have a view number greater than or equal to the \
             previous transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        // Reset the high QC to the transition QC
        update_high_qc(proposal, validation_info).await?;
    } else {
        // Height is either epoch height - 1 or epoch height
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Proposed QC must have a view number greater than the previous transition QC"
        );
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, QC \
             block number: {qc_block_number}, proposal block number: {}",
            proposal.data.block_header().block_number()
        );
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}

/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
/// # Errors
/// If any validation or state update fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .await
        .is_ok_and(|v| v >= V::Epochs::VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(view_number)
            .await,
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // Update our internal storage of the proposal. The proposal is valid, so
        // we swallow this error and just log if it occurs.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;

    let justify_qc = proposal.data.justify_qc().clone();
    // Create a positive vote if either liveness or safety check
    // passes.

    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety check:
        // The proposal is safe if
        // 1. the proposed block and the justify QC block belong to the same epoch or
        // 2. the justify QC is the eQC for the previous block
        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
            validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await,
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // Make sure that the epoch transition proposal includes the next epoch QC
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness check.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety check.
        // Check if proposal extends from the locked leaf.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // Stop the walk by returning false once we reach the locked view; reaching it
                // means the proposal extends from the locked leaf, and the traversal reports
                // success.
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // We accept the proposal, notify the application layer
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify other tasks
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}

/// Validates that the view of a given `proposal` is valid when compared to `cur_view`, the
/// highest proposed view (so far) for the caller, and that, if the proposal is for a later view
/// than expected, it includes a timeout or view sync certificate.
///
/// # Errors
/// If any validation or view number check fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    ensure!(
        view_number >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // Verify a timeout certificate OR a view sync certificate exists and is valid.
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                // View sync certs must also be valid.
                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    // Validate the upgrade certificate -- this is just a signature validation.
    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
    {
        let epoch = option_epoch_from_block_number::<TYPES>(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}

/// Helper function to send events and log errors
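///
/// A typical call site, mirroring usage elsewhere in this module:
///
/// ```ignore
/// broadcast_event(
///     Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
///     &sender,
/// )
/// .await;
/// ```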
pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
    match sender.broadcast_direct(event).await {
        Ok(None) => (),
        Ok(Some(overflowed)) => {
            tracing::error!(
                "Event sender queue overflow, oldest event removed from queue: {overflowed:?}"
            );
        },
        Err(SendError(e)) => {
            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
        },
    }
}

/// Validates the qc's signatures and, if provided, validates the next_epoch_qc's signatures and
/// whether it corresponds to the provided qc.
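///
/// A minimal calling sketch, assuming the surrounding task already holds these handles:
///
/// ```ignore
/// validate_qc_and_next_epoch_qc(
///     &qc,
///     maybe_next_epoch_qc.as_ref(),
///     &consensus,
///     &membership_coordinator,
///     &upgrade_lock,
///     epoch_height,
/// )
/// .await?;
/// ```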
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<()> {
    let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());

    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    if let Err(e) = cert
        .qc()
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    // Check the next epoch QC if required.
    if upgrade_lock.epochs_enabled(cert.view_number()).await {
        if let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)? {
            epoch_membership = epoch_membership.next_epoch_stake_table().await?;
            let membership_next_stake_table = epoch_membership.stake_table().await;
            let membership_next_success_threshold = epoch_membership.success_threshold().await;
            next_epoch_qc
                .is_valid_cert(
                    &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                    membership_next_success_threshold,
                    upgrade_lock,
                )
                .await
                .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
        }
    }

    Ok(())
}

/// Validates the light client state update certificate
pub async fn validate_light_client_state_update_certificate<TYPES: NodeType, V: Versions>(
    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
) -> Result<()> {
    tracing::debug!("Validating light client state update certificate");

    let epoch_membership = membership_coordinator
        .membership_for_epoch(state_cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    let mut state_key_map = HashMap::new();
    membership_stake_table.into_iter().for_each(|config| {
        state_key_map.insert(
            config.state_ver_key.clone(),
            config.stake_table_entry.stake(),
        );
    });

    let mut accumulated_stake = U256::from(0);
    let signed_state_digest = derive_signed_state_digest(
        &state_cert.light_client_state,
        &state_cert.next_stake_table_state,
        &state_cert.auth_root,
    );
    for (key, sig, sig_v2) in state_cert.signatures.iter() {
        if let Some(stake) = state_key_map.get(key) {
            accumulated_stake += *stake;
            #[allow(clippy::collapsible_else_if)]
            // Prior to the DrbAndHeaderUpgrade we only verify the V2 signature; after it, we
            // verify both the V3 and V2 signatures.
            if !upgrade_lock
                .proposal2_version(TYPES::View::new(state_cert.light_client_state.view_number))
                .await
            {
                if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            } else {
                if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
                    key,
                    sig,
                    signed_state_digest,
                ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            }
        } else {
            bail!("Invalid light client state update certificate signature");
        }
    }
    if accumulated_stake < membership_success_threshold {
        bail!("Light client state update certificate does not meet the success threshold");
    }

    Ok(())
}

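/// Returns true if the given light client state certificate corresponds to the given QC: the QC
/// must be for an epoch root block, and the certificate's epoch and view must match the QC's.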
pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
    epoch_height: u64,
) -> bool {
    qc.data
        .block_number
        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
        && Some(state_cert.epoch) == qc.data.epoch()
        && qc.view_number().u64() == state_cert.light_client_state.view_number
}

/// Gets the second VID share, for the current or the next epoch accordingly, from the shared
/// consensus state; makes sure it corresponds to the given DA certificate; if it's not yet
/// available, waits for it (until cancelled via `cancel_receiver`).
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<TYPES::Epoch>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    if let Some(second_vid_share) = maybe_second_vid_share {
        if (target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit)
        {
            return Ok(second_vid_share);
        }
    }

    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        Box::new(move |event| {
            let event = event.as_ref();
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        return Err(warn!("Error while waiting for the second VID share."));
    };
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        // this shouldn't happen
        return Err(warn!(
            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
             possible."
        ));
    };
    Ok(second_vid_share.clone())
}

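/// Broadcasts a `ViewChange` event for the given view, adjusting the broadcast epoch to the
/// first epoch if the new view is the first view of that epoch.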
pub async fn broadcast_view_change<TYPES: NodeType>(
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    new_view_number: TYPES::View,
    epoch: Option<TYPES::Epoch>,
    first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
) {
    let mut broadcast_epoch = epoch;
    if let Some((first_epoch_view, first_epoch)) = first_epoch {
        if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
            broadcast_epoch = Some(first_epoch);
        }
    }
    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
    broadcast_event(
        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
        sender,
    )
    .await
}

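/// Derives the light client state digest that is signed by the state signature keys: the
/// Keccak-256 hash of the packed ABI encodings of the light client state, the next stake table
/// state, and the auth root, reduced into the circuit field.
///
/// A minimal usage sketch, mirroring the verification in
/// [`validate_light_client_state_update_certificate`] (`key`/`sig` come from the certificate's
/// signature list; not compiled):
///
/// ```ignore
/// let digest = derive_signed_state_digest(
///     &state_cert.light_client_state,
///     &state_cert.next_stake_table_state,
///     &state_cert.auth_root,
/// );
/// let ok = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
///     key, sig, digest,
/// );
/// ```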
pub fn derive_signed_state_digest(
    lc_state: &LightClientState,
    next_stake_state: &StakeTableState,
    auth_root: &FixedBytes<32>,
) -> CircuitField {
    let lc_state_sol: LightClientStateSol = (*lc_state).into();
    let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();

    let res = alloy::primitives::keccak256(
        (
            lc_state_sol.abi_encode(),
            stake_st_sol.abi_encode(),
            auth_root.abi_encode(),
        )
            .abi_encode_packed(),
    );
    CircuitField::from_be_bytes_mod_order(res.as_ref())
}