hotshot_task_impls/helpers.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::Instant,
};

use alloy::primitives::U256;
use async_broadcast::{Receiver, SendError, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::{
    consensus::OuterConsensus,
    data::{Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2},
    drb::{DrbInput, DrbResult},
    epoch_membership::EpochMembershipCoordinator,
    event::{Event, EventType, LeafInfo},
    message::{Proposal, UpgradeLock},
    request_response::ProposalRequestPayload,
    simple_certificate::{
        DaCertificate2, LightClientStateUpdateCertificate, NextEpochQuorumCertificate2,
        QuorumCertificate2, UpgradeCertificate,
    },
    simple_vote::HasEpoch,
    stake_table::StakeTableEntries,
    traits::{
        block_contents::BlockHeader,
        election::Membership,
        node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
        signature_key::{SignatureKey, StakeTableEntryType, StateSignatureKey},
        storage::{load_drb_progress_fn, store_drb_progress_fn, Storage},
        BlockPayload, ValidatedState,
    },
    utils::{
        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_transition_block,
        option_epoch_from_block_number, Terminator, View, ViewInner,
    },
    vote::{Certificate, HasViewNumber},
};
use hotshot_utils::anytrace::*;
use time::OffsetDateTime;
use tokio::time::timeout;
use tracing::instrument;
use vbs::version::StaticVersionType;

use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};

/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // We need to be able to sign this request before submitting it to the network. Compute the
    // payload first.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    // Finally, compute the signature for the payload.
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    // First, broadcast that we need a proposal to the current leader
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    let mut rx = event_receiver.clone();
    // Await the arrival of the event data.
    let Ok(Some(proposal)) =
        // We want to explicitly timeout here so we aren't waiting around for the data.
        timeout(REQUEST_TIMEOUT, async move {
            // We want to iterate until the proposal is not None, or until we reach the timeout.
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
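
/// Stores the given DRB result in the shared consensus state and in persistent
/// storage, then hands it to the `Membership`.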
pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    membership: &Arc<RwLock<TYPES::Membership>>,
    epoch: TYPES::Epoch,
    storage: &I::Storage,
    consensus: &OuterConsensus<TYPES>,
    drb_result: DrbResult,
) {
    let mut consensus_writer = consensus.write().await;
    consensus_writer.drb_results.store_result(epoch, drb_result);
    drop(consensus_writer);
    tracing::debug!("Calling store_drb_result for epoch {epoch}");
    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
    }

    membership.write().await.add_drb_result(epoch, drb_result)
}

/// Handles storing the epoch root, calling `add_epoch_root` on the Membership,
/// and kicking off the DRB computation, if the decided block is the epoch root.
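///
/// Note: the root decided near the end of epoch `e` seeds the stake table and
/// DRB result for epoch `e + 2`, hence the `+ 2` when computing
/// `next_epoch_number` below.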
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    decided_leaf: &Leaf2<TYPES>,
    epoch_height: u64,
    membership: &Arc<RwLock<TYPES::Membership>>,
    storage: &I::Storage,
    consensus: &OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
) {
    let decided_block_number = decided_leaf.block_header().block_number();
    let view_number = decided_leaf.view_number();

    // Skip if this is not the expected block.
    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
        let next_epoch_number =
            TYPES::Epoch::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);

        let start = Instant::now();
        if let Err(e) = storage
            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
            .await
        {
            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
        }
        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());

        let Ok(drb_seed_input_vec) = bincode::serialize(&decided_leaf.justify_qc().signatures)
        else {
            tracing::error!("Failed to serialize the QC signature.");
            return;
        };

        let membership = membership.clone();
        let decided_block_header = decided_leaf.block_header().clone();
        let storage = storage.clone();
        let store_drb_progress_fn = store_drb_progress_fn(storage.clone());
        let load_drb_progress_fn = load_drb_progress_fn(storage.clone());
        let consensus = consensus.clone();

        let consensus_reader = consensus.read().await;
        let difficulty_level = if upgrade_lock.upgraded_drb_and_header(view_number).await {
            consensus_reader.drb_upgrade_difficulty
        } else {
            consensus_reader.drb_difficulty
        };

        drop(consensus_reader);

        tokio::spawn(async move {
            let membership_clone = membership.clone();
            let epoch_root_future = tokio::spawn(async move {
                let start = Instant::now();
                if let Err(e) = Membership::add_epoch_root(
                    Arc::clone(&membership_clone),
                    next_epoch_number,
                    decided_block_header,
                )
                .await
                {
                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
                }
                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
            });

            let mut consensus_writer = consensus.write().await;
            consensus_writer
                .drb_results
                .garbage_collect(next_epoch_number);
            drop(consensus_writer);

            let drb_result_future = tokio::spawn(async move {
                let start = Instant::now();
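                // Seed the DRB with (up to) the first 32 bytes of the
                // serialized QC signatures.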
                let mut drb_seed_input = [0u8; 32];
                let len = drb_seed_input_vec.len().min(32);
                drb_seed_input[..len].copy_from_slice(&drb_seed_input_vec[..len]);

                let drb_input = DrbInput {
                    epoch: *next_epoch_number,
                    iteration: 0,
                    value: drb_seed_input,
                    difficulty_level,
                };

                let drb_result = hotshot_types::drb::compute_drb_result(
                    drb_input,
                    store_drb_progress_fn,
                    load_drb_progress_fn,
                )
                .await;

                tracing::info!("Time taken to calculate drb result: {:?}", start.elapsed());

                drb_result
            });

            let (_, drb_result) = tokio::join!(epoch_root_future, drb_result_future);

            let drb_result = match drb_result {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!("Failed to compute DRB result: {e}");
                    return;
                },
            };

            let start = Instant::now();
            handle_drb_result::<TYPES, I>(
                &membership,
                next_epoch_number,
                &storage,
                &consensus,
                drb_result,
            )
            .await;
            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
        });
    }
}

/// Helper type to give names to the output values of the leaf chain traversal operation.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<TYPES::View>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<TYPES::View>,

    /// The qc for the decided chain.
    pub new_decide_qc: Option<QuorumCertificate2<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}

/// We need Default to be implemented because the leaf ascension has very few failure branches,
/// and when they *do* happen, we still return intermediate states. Default makes the burden
/// of filling values easier.
impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
    /// The default method for this type is to set all of the returned values to `None`.
    fn default() -> Self {
        Self {
            new_locked_view_number: None,
            new_decided_view_number: None,
            new_decide_qc: None,
            leaf_views: Vec::new(),
            included_txns: None,
            decided_upgrade_cert: None,
        }
    }
}

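/// Records proposal-to-decide latency and finalized transaction bytes for each
/// newly decided leaf.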
async fn update_metrics<TYPES: NodeType>(
    consensus: &OuterConsensus<TYPES>,
    leaf_views: &[LeafInfo<TYPES>],
) {
    let consensus_reader = consensus.read().await;
    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;

    for leaf_view in leaf_views {
        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();

        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
            continue;
        };
        consensus_reader
            .metrics
            .proposal_to_decide_time
            .add_point(proposal_to_decide_time as f64);
        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
            consensus_reader
                .metrics
                .finalized_bytes
                .add_point(txn_bytes as f64);
        }
    }
}

/// Calculate the new decided leaf chain based on the rules of HotStuff 2.
///
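/// In HotStuff 2, a decide needs only a two-chain: if the proposal's parent and
/// grandparent leaves have consecutive view numbers, the grandparent's view (and
/// everything behind it) becomes decided. A minimal sketch (illustrative only):
///
/// ```ignore
/// // views:  ... 7 -- 8 -- 9 (proposal)
/// // parent = 8, grandparent = 7; since 7 + 1 == 8, view 7 becomes the new
/// // decided view and the parent's justify QC becomes the decide QC.
/// ```
///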
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    upgrade_lock: &UpgradeLock<TYPES, V>,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // If we don't have the proposal's parent, return early
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    // Get the parent's parent and check if it's consecutive in view with the parent; if so, we can
    // decide the grandparent's view. If not, we're done.
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.new_decide_qc = Some(parent_info.leaf.justify_qc().clone());
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe, we just checked that the option is some
        let info = &mut current_leaf_info.unwrap();
        // Check if there's a new upgrade certificate available.
        if let Some(cert) = info.leaf.upgrade_certificate() {
            if info.leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                if cert.data.decide_by < decided_view_number {
                    tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
                } else {
                    tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                    res.decided_upgrade_cert = Some(cert.clone());
                }
            }
        }

        // If the block payload is available for this leaf, include it in
        // the leaf chain that we send to the client.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I, V>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership.membership(),
                storage,
                &consensus,
                upgrade_lock,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}

/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
/// one view newer), then we begin attempting to form the chain. This is a direct impl from
/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
///
/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
/// > It forms a Three-Chain, if b'' forms a Two-Chain.
///
/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
/// is reached when we have a two chain, and a decide is reached when we have a three chain.
///
/// # Example
/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
/// 2-3-5.
///
/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
/// and our new locked view will be 6.
///
/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
/// the anchor view will be set to view 6, with the locked view as view 7.
///
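/// A sketch of the chain-length bookkeeping for that example (illustrative only):
///
/// ```ignore
/// // proposal view = 8, chain 2-3-5-6-7-8; we walk the proposal's ancestors:
/// //   visit 7: 8 == 7 + 1 -> chain length 1
/// //   visit 6: 7 == 6 + 1 -> chain length 2 -> new locked view = 6
/// //   visit 5: 6 == 5 + 1 -> chain length 3 -> new decided (anchor) view = 5
/// ```
///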
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    existing_upgrade_cert: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &Arc<RwLock<TYPES::Membership>>,
    storage: &I::Storage,
    epoch_height: u64,
    upgrade_lock: &UpgradeLock<TYPES, V>,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // This is the core paper logic. We're implementing the chain in chained hotstuff.
            if res.new_decided_view_number.is_none() {
                // If the last view number is the child of the leaf we've moved to...
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    // The chain grows by one
                    current_chain_length += 1;

                    // We emit a locked view when the chain length is 2
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf in the chain, if there is one, is decided, so this
                        // leaf's justify_qc would become the QC for the decided chain.
                        res.new_decide_qc = Some(leaf.justify_qc().clone());
                    } else if current_chain_length == 3 {
                        // And we decide when the chain length is 3.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // There isn't a new chain extension available, so we signal to the callback
                    // owner that we can exit for now.
                    return false;
                }
            }

            // Now, if we *have* reached a decide, we need to do some state updates.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // First, get a mutable reference to the provided leaf.
                let mut leaf = leaf.clone();

                // Update the metrics
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Check if there's a new upgrade certificate available.
                if let Some(cert) = leaf.upgrade_certificate() {
                    if leaf.upgrade_certificate() != *existing_upgrade_cert_reader {
                        if cert.data.decide_by < view_number {
                            tracing::warn!(
                                "Failed to decide an upgrade certificate in time. Ignoring."
                            );
                        } else {
                            tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                            res.decided_upgrade_cert = Some(cert.clone());
                        }
                    }
                }
                // If the block payload is available for this leaf, include it in
                // the leaf chain that we send to the client.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Get the VID share at the leaf's view number, corresponding to our key
                // (if one exists)
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        // Sanity check that the state cert is for the same view as the decided leaf
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                // Add our data into a new `LeafInfo`
                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I, V>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
                upgrade_lock,
            )
            .await;
        }
    }

    res
}

/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if they are not available.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    drop(consensus_reader);

    if !vsm_contains_parent_view {
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader.validated_state_map().get(&parent_qc.view_number()).context(
        debug!("Couldn't find parent view in state map, waiting for replica to see proposal; parent_view_number: {}", *parent_qc.view_number())
    )?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(
        info!("Parent of high QC points to a view without a proposal; parent_view_number: {}, parent_view {:?}", *parent_qc.view_number(), parent_view)
    )?;

    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        // NOTE: This happens on the genesis block
        tracing::debug!(
            "They don't equal: {:?}   {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}

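/// Stores and updates the high QC from a proposal's justify QC, handling the
/// epoch transition special cases (transition QC, next-epoch high QC, and the
/// light client state certificate for the epoch root).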
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
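    // True when the justify QC's block lands inside the epoch transition window
    // but is neither the transition block itself nor the final block of the
    // epoch; in that window we skip persisting the high QC below.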
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!("Failed to store the light client state update certificate, not voting; error = {:?}", e);
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
            if let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
            {
                bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
            }
        }
    }
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(ref next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}

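/// Returns the transition QC (current- and next-epoch pair) from the shared
/// consensus state, if one exists.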
async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Option<(
    QuorumCertificate2<TYPES>,
    NextEpochQuorumCertificate2<TYPES>,
)> {
    validation_info
        .consensus
        .read()
        .await
        .transition_qc()
        .cloned()
}

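/// Validates a proposal whose justify QC lands in the epoch transition window:
/// checks the accompanying next-epoch justify QC and updates the stored
/// transition QC when appropriate.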
pub(crate) async fn validate_epoch_transition_qc<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Height is epoch height - 2
        ensure!(
            transition_qc(validation_info).await.is_none_or(
                |(qc, _)| qc.view_number() <= proposed_qc.view_number()
            ),
            "Proposed transition qc must have view number greater than or equal to previous transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        // reset the high qc to the transition qc
        update_high_qc(proposal, validation_info).await?;
    } else {
        // Height is either epoch height - 1 or epoch height
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        ensure!(
            proposal.data.view_change_evidence().is_none(),
869            "Second to last block and last block of epoch must directly extend previous block, Qc Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
            || transition_qc(validation_info).await.is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous transition block"
        );
    }
    Ok(())
}

/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
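/// A proposal is voteable when either the safety check (it extends the locked
/// leaf) or the liveness check (its justify QC is newer than the locked view)
/// passes.
///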
/// # Errors
/// If any validation or state update fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I, V>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .await
        .is_ok_and(|v| v >= V::Epochs::VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number::<TYPES>(
        validation_info
            .upgrade_lock
            .epochs_enabled(view_number)
            .await,
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // Update our internal storage of the proposal. The proposal is valid, so
        // we swallow this error and just log if it occurs.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
    proposed_leaf
        .extends_upgrade(
            &parent_leaf,
            &validation_info.upgrade_lock.decided_upgrade_certificate,
        )
        .await?;

    let justify_qc = proposal.data.justify_qc().clone();
    // Create a positive vote if either liveness or safety check
    // passes.

    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety check:
        // The proposal is safe if
        // 1. the proposed block and the justify QC block belong to the same epoch or
        // 2. the justify QC is the eQC for the previous block
        let justify_qc_epoch = option_epoch_from_block_number::<TYPES>(
            validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await,
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!("Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify QC leaf is {parent_leaf:?}")
            }
        );

        // Make sure that the epoch transition proposal includes the next epoch QC
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info
                .upgrade_lock
                .epochs_enabled(view_number)
                .await
        {
            ensure!(proposal.data.next_epoch_justify_qc().is_some(),
            "Epoch transition proposal does not include the next epoch justify QC. Do not vote!");
        }

        // Liveness check.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety check.
        // Check if proposal extends from the locked leaf.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // if leaf view no == locked view no then we're done, report success by
                // returning true
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!("Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked view is {:?}", consensus_reader.high_qc(), proposal.data, consensus_reader.locked_view())
        });
    }

    // We accept the proposal, notify the application layer
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify other tasks
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}

/// Validates, for a given `proposal`, that the view it is submitted for is valid when compared
/// to `cur_view`, the highest proposed view (so far) for the caller, and that, if the proposal
/// is for a later view than expected, it includes a timeout or view sync certificate.
///
/// # Errors
/// If any validation or view number check fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
    V: Versions,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I, V>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    ensure!(
        view_number >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // Verify a timeout certificate OR a view sync certificate exists and is valid.
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal.data.view_change_evidence().clone().context(debug!(
                "Quorum proposal for view {view_number} needed a timeout or view sync certificate, but did not have one",
        ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately preceding view"
                );
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                // View sync certs must also be valid.
                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .await
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    // Validate the upgrade certificate -- this is just a signature validation.
    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
    {
        let epoch = option_epoch_from_block_number::<TYPES>(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}

/// Helper function to send events and log errors
pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
    match sender.broadcast_direct(event).await {
        Ok(None) => (),
        Ok(Some(overflowed)) => {
            tracing::error!(
1170                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
            );
        },
        Err(SendError(e)) => {
            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
        },
    }
}

/// Validates the qc's signatures and, if provided, validates the next_epoch_qc's signatures and
/// whether it corresponds to the provided qc.
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES, V>,
    epoch_height: u64,
) -> Result<()> {
    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(qc.data.epoch)
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    if let Err(e) = qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .await
    {
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    if upgrade_lock.epochs_enabled(qc.view_number()).await {
        ensure!(
            qc.data.block_number.is_some(),
            "QC for epoch {:?} has no block number",
            qc.data.epoch
        );
    }

    if qc
        .data
        .block_number
        .is_some_and(|b| is_epoch_transition(b, epoch_height))
    {
        ensure!(
            maybe_next_epoch_qc.is_some(),
            error!("Received High QC for the transition block but not the next epoch QC")
        );
    }

    if let Some(next_epoch_qc) = maybe_next_epoch_qc {
        // If the next epoch qc exists, make sure it's equal to the qc
        if qc.view_number() != next_epoch_qc.view_number() || qc.data != *next_epoch_qc.data {
1230            bail!("Next epoch qc exists but it's not equal with qc.");
        }
        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
        let membership_next_stake_table = epoch_membership.stake_table().await;
        let membership_next_success_threshold = epoch_membership.success_threshold().await;

        // Validate the next epoch qc as well
        next_epoch_qc
            .is_valid_cert(
                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                membership_next_success_threshold,
                upgrade_lock,
            )
            .await
            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
    }
    Ok(())
}

/// Validates the light client state update certificate
pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
    state_cert: &LightClientStateUpdateCertificate<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
) -> Result<()> {
    tracing::debug!("Validating light client state update certificate");

    let epoch_membership = membership_coordinator
        .membership_for_epoch(state_cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    let mut state_key_map = HashMap::new();
    membership_stake_table.into_iter().for_each(|config| {
        state_key_map.insert(
            config.state_ver_key.clone(),
            config.stake_table_entry.stake(),
        );
    });

    let mut accumulated_stake = U256::from(0);
    for (key, sig) in state_cert.signatures.iter() {
        if let Some(stake) = state_key_map.get(key) {
            accumulated_stake += *stake;
            if !key.verify_state_sig(
                sig,
                &state_cert.light_client_state,
                &state_cert.next_stake_table_state,
            ) {
                bail!("Invalid light client state update certificate signature");
            }
        } else {
            bail!("Invalid light client state update certificate signature");
        }
    }
    if accumulated_stake < membership_success_threshold {
        bail!("Light client state update certificate does not meet the success threshold");
    }

    Ok(())
}

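/// Returns `true` if the QC is for an epoch root block and the light client
/// state update certificate matches the QC's epoch and view number.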
pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    state_cert: &LightClientStateUpdateCertificate<TYPES>,
    epoch_height: u64,
) -> bool {
    qc.data
        .block_number
        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
        && Some(state_cert.epoch) == qc.data.epoch()
        && qc.view_number().u64() == state_cert.light_client_state.view_number
}

/// Gets the second VID share (for the current or the next epoch, accordingly) from the shared
/// consensus state; makes sure it corresponds to the given DA certificate;
/// if it's not yet available, waits for it (the wait can be cancelled via `cancel_receiver`).
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<TYPES::Epoch>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    if let Some(second_vid_share) = maybe_second_vid_share {
        if (target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit)
        {
            return Ok(second_vid_share);
        }
    }

    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        Box::new(move |event| {
            let event = event.as_ref();
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        return Err(warn!("Error while waiting for the second VID share."));
    };
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        // this shouldn't happen
        return Err(warn!("Received event is not VidShareValidated but we checked it earlier. Shouldn't be possible."));
    };
    Ok(second_vid_share.clone())
}

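/// Broadcasts a `ViewChange` event for the new view, promoting the epoch to
/// the first epoch when the new view is the first view of that epoch.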
pub async fn broadcast_view_change<TYPES: NodeType>(
    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    new_view_number: TYPES::View,
    epoch: Option<TYPES::Epoch>,
    first_epoch: Option<(TYPES::View, TYPES::Epoch)>,
) {
    let mut broadcast_epoch = epoch;
    if let Some((first_epoch_view, first_epoch)) = first_epoch {
        if new_view_number == first_epoch_view && broadcast_epoch != Some(first_epoch) {
            broadcast_epoch = Some(first_epoch);
        }
    }
    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
    broadcast_event(
        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
        sender,
    )
    .await
}