hotshot_types/
consensus.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides the core consensus types
8
9use std::{
10    collections::{BTreeMap, HashMap, HashSet},
11    mem::ManuallyDrop,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use alloy::primitives::U256;
17use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
18use committable::{Commitment, Committable};
19use hotshot_utils::anytrace::*;
20use tracing::instrument;
21use vec1::Vec1;
22
23pub use crate::utils::{View, ViewInner};
24use crate::{
25    data::{
26        EpochNumber, Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse,
27        VidDisperseAndDuration, VidDisperseShare, ViewNumber,
28    },
29    epoch_membership::EpochMembershipCoordinator,
30    error::HotShotError,
31    event::{HotShotAction, LeafInfo},
32    message::{Proposal, UpgradeLock},
33    simple_certificate::{
34        DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
35        QuorumCertificate2,
36    },
37    simple_vote::HasEpoch,
38    stake_table::{HSStakeTable, StakeTableEntries},
39    traits::{
40        block_contents::{BlockHeader, BuilderFee},
41        metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
42        node_implementation::NodeType,
43        signature_key::{SignatureKey, StakeTableEntryType},
44        BlockPayload, ValidatedState,
45    },
46    utils::{
47        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
48        is_transition_block, option_epoch_from_block_number, BuilderCommitment, LeafCommitment,
49        StateAndDelta, Terminator,
50    },
51    vote::{Certificate, HasViewNumber},
52};
53
/// A type alias for `HashMap<Commitment<T>, T>`, i.e. a map from a `Commitment<T>` to a
/// value of type `T`.
pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
56
/// A type alias for
/// `BTreeMap<ViewNumber, HashMap<TYPES::SignatureKey, BTreeMap<Option<EpochNumber>, Proposal<TYPES, VidDisperseShare<TYPES>>>>>`.
///
/// The nesting is: view number -> signature key -> (optional) epoch number -> VID share
/// proposal.
pub type VidShares<TYPES> = BTreeMap<
    ViewNumber,
    HashMap<
        <TYPES as NodeType>::SignatureKey,
        BTreeMap<Option<EpochNumber>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
    >,
>;
65
/// Type alias for consensus state wrapped in a lock: an `Arc` around an async `RwLock`
/// holding the [`Consensus`] value.
pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
68
/// A thin wrapper around `LockedConsensusState` that helps debugging locks.
///
/// Its locking methods emit `tracing` events before and after each lock acquisition and
/// hand back guard wrappers that emit an event when dropped.
#[derive(Clone, Debug)]
pub struct OuterConsensus<TYPES: NodeType> {
    /// Inner `LockedConsensusState`
    pub inner_consensus: LockedConsensusState<TYPES>,
}
75
76impl<TYPES: NodeType> OuterConsensus<TYPES> {
77    /// Create a new instance of `OuterConsensus`, hopefully uniquely named
78    pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
79        Self {
80            inner_consensus: consensus,
81        }
82    }
83
84    /// Locks inner consensus for reading and leaves debug traces
85    #[instrument(skip_all, target = "OuterConsensus")]
86    pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
87        tracing::trace!("Trying to acquire read lock on consensus");
88        let ret = self.inner_consensus.read().await;
89        tracing::trace!("Acquired read lock on consensus");
90        ConsensusReadLockGuard::new(ret)
91    }
92
93    /// Locks inner consensus for writing and leaves debug traces
94    #[instrument(skip_all, target = "OuterConsensus")]
95    pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
96        tracing::trace!("Trying to acquire write lock on consensus");
97        let ret = self.inner_consensus.write().await;
98        tracing::trace!("Acquired write lock on consensus");
99        ConsensusWriteLockGuard::new(ret)
100    }
101
102    /// Tries to acquire write lock on inner consensus and leaves debug traces
103    #[instrument(skip_all, target = "OuterConsensus")]
104    pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
105        tracing::trace!("Trying to acquire write lock on consensus");
106        let ret = self.inner_consensus.try_write();
107        if let Some(guard) = ret {
108            tracing::trace!("Acquired write lock on consensus");
109            Some(ConsensusWriteLockGuard::new(guard))
110        } else {
111            tracing::trace!("Failed to acquire write lock");
112            None
113        }
114    }
115
116    /// Acquires upgradable read lock on inner consensus and leaves debug traces
117    #[instrument(skip_all, target = "OuterConsensus")]
118    pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
119        tracing::trace!("Trying to acquire upgradable read lock on consensus");
120        let ret = self.inner_consensus.upgradable_read().await;
121        tracing::trace!("Acquired upgradable read lock on consensus");
122        ConsensusUpgradableReadLockGuard::new(ret)
123    }
124
125    /// Tries to acquire read lock on inner consensus and leaves debug traces
126    #[instrument(skip_all, target = "OuterConsensus")]
127    pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
128        tracing::trace!("Trying to acquire read lock on consensus");
129        let ret = self.inner_consensus.try_read();
130        if let Some(guard) = ret {
131            tracing::trace!("Acquired read lock on consensus");
132            Some(ConsensusReadLockGuard::new(guard))
133        } else {
134            tracing::trace!("Failed to acquire read lock");
135            None
136        }
137    }
138}
139
/// A thin wrapper around `RwLockReadGuard` for `Consensus` that leaves debug traces when the lock is freed.
///
/// Read access to the guarded `Consensus` is provided through its `Deref` impl.
pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
    /// Inner `RwLockReadGuard`
    lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
}
145
146impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
147    /// Creates a new instance of `ConsensusReadLockGuard` with the same name as parent `OuterConsensus`
148    #[must_use]
149    pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
150        Self { lock_guard }
151    }
152}
153
154impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
155    type Target = Consensus<TYPES>;
156    fn deref(&self) -> &Self::Target {
157        &self.lock_guard
158    }
159}
160
impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
    /// Emits a trace event when the wrapper is dropped; the inner guard field is dropped
    /// afterwards by the normal field drop glue.
    #[instrument(skip_all, target = "ConsensusReadLockGuard")]
    fn drop(&mut self) {
        tracing::trace!("Read lock on consensus dropped");
    }
}
167
/// A thin wrapper around `RwLockWriteGuard` for `Consensus` that leaves debug traces when the lock is freed.
///
/// Shared and mutable access to the guarded `Consensus` is provided through its `Deref`
/// and `DerefMut` impls.
pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
    /// Inner `RwLockWriteGuard`
    lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
}
173
174impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
175    /// Creates a new instance of `ConsensusWriteLockGuard` with the same name as parent `OuterConsensus`
176    #[must_use]
177    pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
178        Self { lock_guard }
179    }
180}
181
182impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
183    type Target = Consensus<TYPES>;
184    fn deref(&self) -> &Self::Target {
185        &self.lock_guard
186    }
187}
188
189impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
190    fn deref_mut(&mut self) -> &mut Self::Target {
191        &mut self.lock_guard
192    }
193}
194
impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
    /// Emits a debug event when the wrapper is dropped; the inner guard field is dropped
    /// afterwards by the normal field drop glue.
    #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
    fn drop(&mut self) {
        tracing::debug!("Write lock on consensus dropped");
    }
}
201
/// A thin wrapper around `RwLockUpgradableReadGuard` for `Consensus` that leaves debug traces when the lock is freed or upgraded
///
/// The inner guard is held in a `ManuallyDrop` so that `upgrade` can move it out of the
/// wrapper; the `taken` flag tells the `Drop` impl whether the inner guard still needs to
/// be dropped.
pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
    /// Inner `RwLockUpgradableReadGuard`
    lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
    /// A helper bool to indicate whether inner lock has been unsafely taken or not
    taken: bool,
}
209
210impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
211    /// Creates a new instance of `ConsensusUpgradableReadLockGuard` with the same name as parent `OuterConsensus`
212    #[must_use]
213    pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
214        Self {
215            lock_guard: ManuallyDrop::new(lock_guard),
216            taken: false,
217        }
218    }
219
220    /// Upgrades the inner `RwLockUpgradableReadGuard` and leaves debug traces
221    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
222    #[allow(unused_assignments)] // `taken` is read in Drop impl
223    pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
224        let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
225        guard.taken = true;
226        tracing::debug!("Trying to upgrade upgradable read lock on consensus");
227        let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
228        tracing::debug!("Upgraded upgradable read lock on consensus");
229        ConsensusWriteLockGuard::new(ret)
230    }
231}
232
233impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
234    type Target = Consensus<TYPES>;
235
236    fn deref(&self) -> &Self::Target {
237        &self.lock_guard
238    }
239}
240
241impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
242    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
243    fn drop(&mut self) {
244        if !self.taken {
245            unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
246            tracing::debug!("Upgradable read lock on consensus dropped");
247        }
248    }
249}
250
/// A bundle of the views in which we most recently performed each kind of action
#[derive(Debug, Clone, Copy)]
struct HotShotActionViews {
    /// View we last proposed in to the Quorum
    proposed: ViewNumber,
    /// View we last voted in for a QuorumProposal
    voted: ViewNumber,
    /// View we last proposed to the DA committee
    da_proposed: ViewNumber,
    /// View we last voted in for a DA proposal
    da_vote: ViewNumber,
}
263
264impl Default for HotShotActionViews {
265    fn default() -> Self {
266        let genesis = ViewNumber::genesis();
267        Self {
268            proposed: genesis,
269            voted: genesis,
270            da_proposed: genesis,
271            da_vote: genesis,
272        }
273    }
274}
275impl HotShotActionViews {
276    /// Create HotShotActionViews from a view number
277    fn from_view(view: ViewNumber) -> Self {
278        Self {
279            proposed: view,
280            voted: view,
281            da_proposed: view,
282            da_vote: view,
283        }
284    }
285}
286
/// Per-key proposal participation counters for the tracked epoch and the one recorded
/// before it.
#[derive(Debug, Clone)]
struct ValidatorParticipation<TYPES: NodeType> {
    /// Epoch the `current_epoch_participation` counters belong to
    epoch: EpochNumber,
    /// Current epoch participation by key maps key -> (num leader, num times proposed)
    current_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,

    /// Last epoch participation by key maps key -> (num leader, num times proposed)
    last_epoch_participation: HashMap<TYPES::SignatureKey, (u64, u64)>,
}
296
297impl<TYPES: NodeType> ValidatorParticipation<TYPES> {
298    fn new() -> Self {
299        Self {
300            epoch: EpochNumber::genesis(),
301            current_epoch_participation: HashMap::new(),
302            last_epoch_participation: HashMap::new(),
303        }
304    }
305
306    fn update_participation(
307        &mut self,
308        key: TYPES::SignatureKey,
309        epoch: EpochNumber,
310        proposed: bool,
311    ) {
312        if epoch != self.epoch {
313            return;
314        }
315        let entry = self
316            .current_epoch_participation
317            .entry(key)
318            .or_insert((0, 0));
319        if proposed {
320            entry.1 += 1;
321        }
322        entry.0 += 1;
323    }
324
325    fn update_participation_epoch(&mut self, epoch: EpochNumber) {
326        if epoch <= self.epoch {
327            return;
328        }
329        self.epoch = epoch;
330        self.last_epoch_participation = self.current_epoch_participation.clone();
331        self.current_epoch_participation = HashMap::new();
332    }
333
334    fn get_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
335        let current_epoch_participation = self
336            .current_epoch_participation
337            .get(&key)
338            .unwrap_or(&(0, 0));
339        let num_leader = current_epoch_participation.0;
340        let num_proposed = current_epoch_participation.1;
341
342        let current_epoch_participation_ratio = if num_leader == 0 {
343            0.0
344        } else {
345            num_proposed as f64 / num_leader as f64
346        };
347        let last_epoch_participation = self.last_epoch_participation.get(&key);
348        let last_epoch_participation_ratio = last_epoch_participation.map(|(leader, proposed)| {
349            if *leader == 0 {
350                0.0
351            } else {
352                *proposed as f64 / *leader as f64
353            }
354        });
355        (
356            current_epoch_participation_ratio,
357            last_epoch_participation_ratio,
358        )
359    }
360
361    fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
362        self.current_epoch_participation
363            .iter()
364            .map(|(key, (leader, proposed))| {
365                (
366                    key.clone(),
367                    if *leader == 0 {
368                        0.0
369                    } else {
370                        *proposed as f64 / *leader as f64
371                    },
372                )
373            })
374            .collect()
375    }
376    fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
377        self.last_epoch_participation
378            .iter()
379            .map(|(key, (leader, proposed))| {
380                (
381                    key.clone(),
382                    if *leader == 0 {
383                        0.0
384                    } else {
385                        *proposed as f64 / *leader as f64
386                    },
387                )
388            })
389            .collect()
390    }
391}
392
/// Per-key vote counters, derived from quorum certificates, for the tracked epoch and the
/// one recorded before it.
#[derive(Clone, Debug)]
struct VoteParticipation<TYPES: NodeType> {
    /// Current epoch
    epoch: Option<EpochNumber>,

    /// Current stake_table
    stake_table: HSStakeTable<TYPES>,

    /// Success threshold
    success_threshold: U256,

    /// Set of views in the current epoch that have already been counted
    view_set: HashSet<ViewNumber>,

    /// Number of views counted in the current epoch
    current_epoch_num_views: u64,

    /// Current epoch participation by key maps key -> num times voted
    current_epoch_participation:
        HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, u64>,

    /// Number of views counted in the last epoch
    last_epoch_num_views: u64,

    /// Last epoch participation by key maps key -> num times voted
    last_epoch_participation:
        HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, u64>,
}
421
422impl<TYPES: NodeType> VoteParticipation<TYPES> {
423    fn new(
424        stake_table: HSStakeTable<TYPES>,
425        success_threshold: U256,
426        epoch: Option<EpochNumber>,
427    ) -> Self {
428        let current_epoch_participation: HashMap<_, _> = stake_table
429            .iter()
430            .map({
431                |peer_config| {
432                    (
433                        peer_config
434                            .stake_table_entry
435                            .public_key()
436                            .to_verification_key(),
437                        0u64,
438                    )
439                }
440            })
441            .collect();
442        Self {
443            epoch,
444            stake_table,
445            success_threshold,
446            view_set: HashSet::new(),
447            current_epoch_num_views: 0u64,
448            current_epoch_participation,
449            last_epoch_num_views: 0u64,
450            last_epoch_participation: HashMap::new(),
451        }
452    }
453
454    fn update_participation(&mut self, qc: QuorumCertificate2<TYPES>) -> Result<()> {
455        ensure!(
456            qc.epoch() == self.epoch,
457            warn!(
458                "Incorrect epoch while updating vote participation, current epoch: {:?}, QC epoch \
459                 {:?}",
460                self.epoch,
461                qc.epoch()
462            )
463        );
464        ensure!(
465            !self.view_set.contains(&qc.view_number()),
466            info!(
467                "Participation for view {} already updated",
468                qc.view_number()
469            )
470        );
471        let signers = qc
472            .signers(
473                &StakeTableEntries::<TYPES>::from(self.stake_table.clone()).0,
474                self.success_threshold,
475            )
476            .context(|e| warn!("Tracing signers: {e}"))?;
477        for vk in signers {
478            let Some(votes) = self.current_epoch_participation.get_mut(&vk) else {
479                bail!(warn!(
480                    "Trying to update vote participation for unknown key: {:?}",
481                    vk
482                ));
483            };
484            *votes += 1;
485        }
486        self.view_set.insert(qc.view_number());
487        self.current_epoch_num_views += 1;
488        Ok(())
489    }
490
491    fn update_participation_epoch(
492        &mut self,
493        stake_table: HSStakeTable<TYPES>,
494        success_threshold: U256,
495        epoch: Option<EpochNumber>,
496    ) -> Result<()> {
497        ensure!(
498            epoch > self.epoch,
499            info!(
500                "New epoch not greater than current epoch while updating vote participation \
501                 epoch, current epoch: {:?}, new epoch {:?}",
502                self.epoch, epoch
503            )
504        );
505
506        self.epoch = epoch;
507        self.last_epoch_participation = self.current_epoch_participation.clone();
508        self.last_epoch_num_views = self.current_epoch_num_views;
509        self.current_epoch_num_views = 0;
510        self.view_set = HashSet::new();
511        let current_epoch_participation: HashMap<_, _> = stake_table
512            .iter()
513            .map({
514                |peer_config| {
515                    (
516                        peer_config
517                            .stake_table_entry
518                            .public_key()
519                            .to_verification_key(),
520                        0u64,
521                    )
522                }
523            })
524            .collect();
525        self.current_epoch_participation = current_epoch_participation;
526        self.stake_table = stake_table;
527        self.success_threshold = success_threshold;
528        Ok(())
529    }
530
531    fn get_participation(&self, key: TYPES::SignatureKey) -> (Option<f64>, Option<f64>) {
532        let maybe_current_num_votes = self
533            .current_epoch_participation
534            .get(&key.to_verification_key());
535
536        let current_epoch_vote_ratio = maybe_current_num_votes
537            .map(|num_votes| Self::calculate_ratio(num_votes, self.current_epoch_num_views));
538
539        let maybe_last_num_votes = self
540            .last_epoch_participation
541            .get(&key.to_verification_key());
542
543        let last_epoch_vote_ratio = maybe_last_num_votes
544            .map(|num_votes| Self::calculate_ratio(num_votes, self.last_epoch_num_views));
545
546        (current_epoch_vote_ratio, last_epoch_vote_ratio)
547    }
548
549    fn current_vote_participation(
550        &self,
551    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
552        self.current_epoch_participation
553            .iter()
554            .map(|(key, votes)| {
555                (
556                    key.clone(),
557                    Self::calculate_ratio(votes, self.current_epoch_num_views),
558                )
559            })
560            .collect()
561    }
562    fn previous_vote_participation(
563        &self,
564    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
565        self.last_epoch_participation
566            .iter()
567            .map(|(key, votes)| {
568                (
569                    key.clone(),
570                    Self::calculate_ratio(votes, self.last_epoch_num_views),
571                )
572            })
573            .collect()
574    }
575
576    fn calculate_ratio(num_votes: &u64, total_views: u64) -> f64 {
577        if total_views == 0 {
578            0.0
579        } else {
580            *num_votes as f64 / total_views as f64
581        }
582    }
583}
584
/// A reference to the consensus algorithm
///
/// This will contain the state of all rounds.
#[derive(derive_more::Debug, Clone)]
pub struct Consensus<TYPES: NodeType> {
    /// The validated states that are currently loaded in memory.
    validated_state_map: BTreeMap<ViewNumber, View<TYPES>>,

    /// All the VID shares we've received for current and future views.
    vid_shares: VidShares<TYPES>,

    /// All the DA certs we've received for current and future views.
    /// view -> DA cert
    saved_da_certs: HashMap<ViewNumber, DaCertificate2<TYPES>>,

    /// The view number we are currently on.
    cur_view: ViewNumber,

    /// The epoch number we are currently on.
    cur_epoch: Option<EpochNumber>,

    /// Last proposals we sent out, empty if we haven't proposed yet.
    /// Prevents duplicate proposals, and can be served to those trying to catchup
    last_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// The last view that had a successful decide event
    last_decided_view: ViewNumber,

    /// The `locked_qc` view number
    locked_view: ViewNumber,

    /// Map of leaf hash -> leaf
    /// - contains undecided leaves
    /// - includes the MOST RECENT decided leaf
    saved_leaves: CommitmentMap<Leaf2<TYPES>>,

    /// Bundle of views in which we performed the most recent action
    /// visible to the network.  Actions are votes and proposals
    /// for DA and Quorum
    last_actions: HotShotActionViews,

    /// Saved payloads.
    ///
    /// Encoded transactions for every view if we got a payload for that view.
    saved_payloads: BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>>,

    /// the highqc per spec
    high_qc: QuorumCertificate2<TYPES>,

    /// The high QC for the next epoch
    next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Track the participation of each validator in the current epoch and previous epoch
    validator_participation: ValidatorParticipation<TYPES>,

    /// Track the vote participation of each node in the current epoch and previous epoch
    vote_participation: VoteParticipation<TYPES>,

    /// A reference to the metrics trait
    pub metrics: Arc<ConsensusMetricsValue>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Number of iterations for the DRB calculation, taken from HotShotConfig
    pub drb_difficulty: u64,

    /// Number of iterations for the DRB calculation post-difficulty upgrade, taken from HotShotConfig
    pub drb_upgrade_difficulty: u64,

    /// The transition QC for the current epoch: a quorum certificate paired with the
    /// matching next-epoch quorum certificate
    transition_qc: Option<(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    )>,

    /// The highest block number that we have seen
    pub highest_block: u64,
    /// The light client state update certificate
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
}
666
/// This struct holds a payload and its metadata
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct PayloadWithMetadata<TYPES: NodeType> {
    /// The block payload
    pub payload: TYPES::BlockPayload,
    /// The metadata type associated with the block payload
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
673
/// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces
#[derive(Clone, Debug)]
pub struct ConsensusMetricsValue {
    /// Height of the last synced block
    pub last_synced_block_height: Box<dyn Gauge>,
    /// View number of the last decided view
    pub last_decided_view: Box<dyn Gauge>,
    /// View number of the last voted view
    pub last_voted_view: Box<dyn Gauge>,
    /// Timestamp of the last decide
    pub last_decided_time: Box<dyn Gauge>,
    /// The current view
    pub current_view: Box<dyn Gauge>,
    /// Number of views that are in-flight since the last decided view
    pub number_of_views_since_last_decide: Box<dyn Gauge>,
    /// Number of views per decide event
    pub number_of_views_per_decide_event: Box<dyn Histogram>,
    /// Duration of views as leader
    pub view_duration_as_leader: Box<dyn Histogram>,
    /// Number of invalid QCs we've seen since the last commit.
    pub invalid_qc: Box<dyn Gauge>,
    /// Number of outstanding transactions
    pub outstanding_transactions: Box<dyn Gauge>,
    /// Memory size in bytes of the serialized transactions still outstanding
    pub outstanding_transactions_memory_size: Box<dyn Gauge>,
    /// Number of views that timed out
    pub number_of_timeouts: Box<dyn Counter>,
    /// Number of views that timed out as leader
    pub number_of_timeouts_as_leader: Box<dyn Counter>,
    /// The number of empty blocks that have been proposed
    pub number_of_empty_blocks_proposed: Box<dyn Counter>,
    /// Number of events in the hotshot event queue
    pub internal_event_queue_len: Box<dyn Gauge>,
    /// Time from proposal creation to decide time
    pub proposal_to_decide_time: Box<dyn Histogram>,
    /// Time from proposal received to proposal creation
    pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
    /// Finalized bytes per view
    pub finalized_bytes: Box<dyn Histogram>,
    /// The duration of the validate and apply header (created with a "seconds" unit label)
    pub validate_and_apply_header_duration: Box<dyn Histogram>,
    /// The duration of update leaf (created with a "seconds" unit label)
    pub update_leaf_duration: Box<dyn Histogram>,
    /// The time it took to calculate the disperse (created with a "seconds" unit label)
    pub vid_disperse_duration: Box<dyn Histogram>,
}
720
721impl ConsensusMetricsValue {
722    /// Create a new instance of this [`ConsensusMetricsValue`] struct, setting all the counters and gauges
723    #[must_use]
724    pub fn new(metrics: &dyn Metrics) -> Self {
725        Self {
726            last_synced_block_height: metrics
727                .create_gauge(String::from("last_synced_block_height"), None),
728            last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
729            last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
730            last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
731            current_view: metrics.create_gauge(String::from("current_view"), None),
732            number_of_views_since_last_decide: metrics
733                .create_gauge(String::from("number_of_views_since_last_decide"), None),
734            number_of_views_per_decide_event: metrics
735                .create_histogram(String::from("number_of_views_per_decide_event"), None),
736            view_duration_as_leader: metrics
737                .create_histogram(String::from("view_duration_as_leader"), None),
738            invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
739            outstanding_transactions: metrics
740                .create_gauge(String::from("outstanding_transactions"), None),
741            outstanding_transactions_memory_size: metrics
742                .create_gauge(String::from("outstanding_transactions_memory_size"), None),
743            number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
744            number_of_timeouts_as_leader: metrics
745                .create_counter(String::from("number_of_timeouts_as_leader"), None),
746            number_of_empty_blocks_proposed: metrics
747                .create_counter(String::from("number_of_empty_blocks_proposed"), None),
748            internal_event_queue_len: metrics
749                .create_gauge(String::from("internal_event_queue_len"), None),
750            proposal_to_decide_time: metrics
751                .create_histogram(String::from("proposal_to_decide_time"), None),
752            previous_proposal_to_proposal_time: metrics
753                .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
754            finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
755            validate_and_apply_header_duration: metrics.create_histogram(
756                String::from("validate_and_apply_header_duration"),
757                Some("seconds".to_string()),
758            ),
759            update_leaf_duration: metrics.create_histogram(
760                String::from("update_leaf_duration"),
761                Some("seconds".to_string()),
762            ),
763            vid_disperse_duration: metrics.create_histogram(
764                String::from("vid_disperse_duration"),
765                Some("seconds".to_string()),
766            ),
767        }
768    }
769}
770
771impl Default for ConsensusMetricsValue {
772    fn default() -> Self {
773        Self::new(&*NoMetrics::boxed())
774    }
775}
776
777impl<TYPES: NodeType> Consensus<TYPES> {
778    /// Constructor.
779    #[allow(clippy::too_many_arguments)]
780    pub fn new(
781        validated_state_map: BTreeMap<ViewNumber, View<TYPES>>,
782        vid_shares: Option<VidShares<TYPES>>,
783        cur_view: ViewNumber,
784        cur_epoch: Option<EpochNumber>,
785        locked_view: ViewNumber,
786        last_decided_view: ViewNumber,
787        last_actioned_view: ViewNumber,
788        last_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
789        saved_leaves: CommitmentMap<Leaf2<TYPES>>,
790        saved_payloads: BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>>,
791        high_qc: QuorumCertificate2<TYPES>,
792        next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
793        metrics: Arc<ConsensusMetricsValue>,
794        epoch_height: u64,
795        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
796        drb_difficulty: u64,
797        drb_upgrade_difficulty: u64,
798        stake_table: HSStakeTable<TYPES>,
799        success_threshold: U256,
800    ) -> Self {
801        let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
802            if high_qc
803                .data
804                .block_number
805                .is_some_and(|bn| is_transition_block(bn, epoch_height))
806            {
807                if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
808                    Some((high_qc.clone(), next_epoch_high_qc.clone()))
809                } else {
810                    tracing::error!("Next epoch high QC has different leaf commit to high QC");
811                    None
812                }
813            } else {
814                None
815            }
816        } else {
817            None
818        };
819        Consensus {
820            validated_state_map,
821            vid_shares: vid_shares.unwrap_or_default(),
822            saved_da_certs: HashMap::new(),
823            cur_view,
824            cur_epoch,
825            last_decided_view,
826            last_proposals,
827            last_actions: HotShotActionViews::from_view(last_actioned_view),
828            locked_view,
829            saved_leaves,
830            saved_payloads,
831            high_qc,
832            next_epoch_high_qc,
833            metrics,
834            epoch_height,
835            transition_qc,
836            highest_block: 0,
837            state_cert,
838            drb_difficulty,
839            validator_participation: ValidatorParticipation::new(),
840            vote_participation: VoteParticipation::new(stake_table, success_threshold, cur_epoch),
841            drb_upgrade_difficulty,
842        }
843    }
844
    /// Returns the stored current view number (`cur_view`) by value.
    pub fn cur_view(&self) -> ViewNumber {
        self.cur_view
    }
849
    /// Returns the stored current epoch (`cur_epoch`), which is `None` when no epoch is set.
    pub fn cur_epoch(&self) -> Option<EpochNumber> {
        self.cur_epoch
    }
854
    /// Returns the stored last decided view number (`last_decided_view`) by value.
    pub fn last_decided_view(&self) -> ViewNumber {
        self.last_decided_view
    }
859
    /// Returns the stored locked view number (`locked_view`) by value.
    pub fn locked_view(&self) -> ViewNumber {
        self.locked_view
    }
864
    /// Returns a reference to the stored high QC.
    pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
        &self.high_qc
    }
869
    /// Returns a reference to the stored transition QC pair, if any.
    ///
    /// The pair consists of a quorum certificate and the matching next epoch quorum
    /// certificate; `None` is returned when no transition QC has been recorded.
    pub fn transition_qc(
        &self,
    ) -> Option<&(
        QuorumCertificate2<TYPES>,
        NextEpochQuorumCertificate2<TYPES>,
    )> {
        self.transition_qc.as_ref()
    }
879
880    ///Update the highest block number
881    pub fn update_highest_block(&mut self, block_number: u64) {
882        if block_number > self.highest_block {
883            self.highest_block = block_number;
884            return;
885        }
886
887        if is_epoch_transition(block_number, self.epoch_height) {
888            let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
889            let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
890            if new_epoch >= high_epoch {
891                self.highest_block = block_number;
892            }
893        }
894    }
895
896    /// Update the transition QC.
897    pub fn update_transition_qc(
898        &mut self,
899        qc: QuorumCertificate2<TYPES>,
900        next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
901    ) {
902        if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
903            tracing::error!(
904                "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
905                qc.view_number(),
906                next_epoch_qc.data.leaf_commit,
907                qc.data().leaf_commit
908            );
909            return;
910        }
911        if let Some((transition_qc, _)) = &self.transition_qc {
912            if transition_qc.view_number() >= qc.view_number() {
913                return;
914            }
915        }
916        self.transition_qc = Some((qc, next_epoch_qc));
917    }
918
    /// Returns a reference to the stored light client state update certificate, or `None`
    /// when none is stored.
    pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificateV2<TYPES>> {
        self.state_cert.as_ref()
    }
923
    /// Returns a reference to the stored next epoch high QC, or `None` when none is stored.
    pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
        self.next_epoch_high_qc.as_ref()
    }
928
    /// Returns a reference to the validated state map, keyed by view number.
    pub fn validated_state_map(&self) -> &BTreeMap<ViewNumber, View<TYPES>> {
        &self.validated_state_map
    }
933
    /// Returns a reference to the saved leaves, keyed by leaf commitment.
    pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
        &self.saved_leaves
    }
938
    /// Returns a reference to the saved payloads (with metadata), keyed by view number.
    pub fn saved_payloads(&self) -> &BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>> {
        &self.saved_payloads
    }
943
    /// Returns a reference to the stored VID shares.
    pub fn vid_shares(&self) -> &VidShares<TYPES> {
        &self.vid_shares
    }
948
    /// Returns a reference to the saved DA certificates, keyed by view number.
    pub fn saved_da_certs(&self) -> &HashMap<ViewNumber, DaCertificate2<TYPES>> {
        &self.saved_da_certs
    }
953
    /// Returns a reference to the map of our recent quorum proposals, keyed by view number.
    pub fn last_proposals(
        &self,
    ) -> &BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
        &self.last_proposals
    }
960
961    /// Update the current view.
962    /// # Errors
963    /// Can return an error when the new view_number is not higher than the existing view number.
964    pub fn update_view(&mut self, view_number: ViewNumber) -> Result<()> {
965        ensure!(
966            view_number > self.cur_view,
967            debug!("New view isn't newer than the current view.")
968        );
969        self.cur_view = view_number;
970        Ok(())
971    }
972
    /// Update the validator participation.
    ///
    /// Forwards `key`, `epoch` and `proposed` unchanged to
    /// `self.validator_participation.update_participation`.
    // NOTE(review): `proposed` presumably says whether the validator actually proposed —
    // confirm against `ValidatorParticipation::update_participation`.
    pub fn update_validator_participation(
        &mut self,
        key: TYPES::SignatureKey,
        epoch: EpochNumber,
        proposed: bool,
    ) {
        self.validator_participation
            .update_participation(key, epoch, proposed);
    }
983
    /// Update the validator participation epoch.
    ///
    /// Forwards `epoch` to `self.validator_participation.update_participation_epoch`.
    pub fn update_validator_participation_epoch(&mut self, epoch: EpochNumber) {
        self.validator_participation
            .update_participation_epoch(epoch);
    }
989
    /// Get the validator participation for `key`.
    ///
    /// Returns whatever `self.validator_participation.get_participation` reports for `key`.
    // NOTE(review): the tuple presumably holds the current-epoch value and, when available,
    // the previous-epoch value — confirm against `ValidatorParticipation::get_participation`.
    pub fn get_validator_participation(&self, key: TYPES::SignatureKey) -> (f64, Option<f64>) {
        self.validator_participation.get_participation(key)
    }
994
    /// Get the current proposal participation, keyed by signature key.
    ///
    /// Delegates to `self.validator_participation.current_proposal_participation`.
    pub fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
        self.validator_participation
            .current_proposal_participation()
    }
1000
    /// Get the previous proposal participation, keyed by signature key.
    ///
    /// Delegates to `self.validator_participation.previous_proposal_participation`.
    pub fn previous_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
        self.validator_participation
            .previous_proposal_participation()
    }
1006
    /// Update the vote participation from a quorum certificate.
    ///
    /// # Errors
    /// Propagates any error returned by `self.vote_participation.update_participation`.
    pub fn update_vote_participation(&mut self, qc: QuorumCertificate2<TYPES>) -> Result<()> {
        self.vote_participation.update_participation(qc)
    }
1011
    /// Update the vote participation epoch.
    ///
    /// Forwards `stake_table`, `success_threshold` and `epoch` unchanged to
    /// `self.vote_participation.update_participation_epoch`.
    ///
    /// # Errors
    /// Propagates any error returned by that call.
    pub fn update_vote_participation_epoch(
        &mut self,
        stake_table: HSStakeTable<TYPES>,
        success_threshold: U256,
        epoch: Option<EpochNumber>,
    ) -> Result<()> {
        self.vote_participation
            .update_participation_epoch(stake_table, success_threshold, epoch)
    }
1022
    /// Get the vote participation for `key`.
    ///
    /// Returns whatever `self.vote_participation.get_participation` reports for `key`.
    // NOTE(review): the tuple presumably holds the current-epoch and previous-epoch values,
    // each `None` when unavailable — confirm against `VoteParticipation::get_participation`.
    pub fn get_participation(&self, key: TYPES::SignatureKey) -> (Option<f64>, Option<f64>) {
        self.vote_participation.get_participation(key)
    }
1027
    /// Get the current vote participation, keyed by verification key.
    ///
    /// Delegates to `self.vote_participation.current_vote_participation`.
    pub fn current_vote_participation(
        &self,
    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
        self.vote_participation.current_vote_participation()
    }
1034
    /// Get the previous vote participation, keyed by verification key.
    ///
    /// Delegates to `self.vote_participation.previous_vote_participation`.
    pub fn previous_vote_participation(
        &self,
    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
        self.vote_participation.previous_vote_participation()
    }
1041
1042    /// Get the parent Leaf Info from a given leaf and our public key.
1043    /// Returns None if we don't have the data in out state
1044    pub async fn parent_leaf_info(
1045        &self,
1046        leaf: &Leaf2<TYPES>,
1047        public_key: &TYPES::SignatureKey,
1048    ) -> Option<LeafInfo<TYPES>> {
1049        let parent_view_number = leaf.justify_qc().view_number();
1050        let parent_epoch = leaf.justify_qc().epoch();
1051        let parent_leaf = self
1052            .saved_leaves
1053            .get(&leaf.justify_qc().data().leaf_commit)?;
1054        let parent_state_and_delta = self.state_and_delta(parent_view_number);
1055        let (Some(state), delta) = parent_state_and_delta else {
1056            return None;
1057        };
1058
1059        let parent_vid = self
1060            .vid_shares()
1061            .get(&parent_view_number)
1062            .and_then(|key_map| key_map.get(public_key))
1063            .and_then(|epoch_map| epoch_map.get(&parent_epoch))
1064            .map(|prop| prop.data.clone());
1065
1066        let state_cert = if parent_leaf.with_epoch
1067            && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
1068        {
1069            match self.state_cert() {
1070                // Sanity check that the state cert is for the same view as the parent leaf
1071                Some(state_cert)
1072                    if state_cert.light_client_state.view_number == parent_view_number.u64() =>
1073                {
1074                    Some(state_cert.clone())
1075                },
1076                _ => None,
1077            }
1078        } else {
1079            None
1080        };
1081
1082        Some(LeafInfo {
1083            leaf: parent_leaf.clone(),
1084            state,
1085            delta,
1086            vid_share: parent_vid,
1087            state_cert,
1088        })
1089    }
1090
1091    /// Update the current epoch.
1092    /// # Errors
1093    /// Can return an error when the new epoch_number is not higher than the existing epoch number.
1094    pub fn update_epoch(&mut self, epoch_number: EpochNumber) -> Result<()> {
1095        ensure!(
1096            self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
1097            debug!("New epoch isn't newer than the current epoch.")
1098        );
1099        tracing::trace!(
1100            "Updating epoch from {:?} to {}",
1101            self.cur_epoch,
1102            epoch_number
1103        );
1104        self.cur_epoch = Some(epoch_number);
1105        Ok(())
1106    }
1107
1108    /// Update the last actioned view internally for votes and proposals
1109    ///
1110    /// Returns true if the action is for a newer view than the last action of that type
1111    pub fn update_action(&mut self, action: HotShotAction, view: ViewNumber) -> bool {
1112        let old_view = match action {
1113            HotShotAction::Vote => &mut self.last_actions.voted,
1114            HotShotAction::Propose => &mut self.last_actions.proposed,
1115            HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
1116            HotShotAction::DaVote => {
1117                if view > self.last_actions.da_vote {
1118                    self.last_actions.da_vote = view;
1119                }
1120                // TODO Add logic to prevent double voting.  For now the simple check if
1121                // the last voted view is less than the view we are trying to vote doesn't work
1122                // because the leader of view n + 1 may propose to the DA (and we would vote)
1123                // before the leader of view n.
1124                return true;
1125            },
1126            _ => return true,
1127        };
1128        if view > *old_view {
1129            *old_view = view;
1130            return true;
1131        }
1132        false
1133    }
1134
    /// Reset the last actions to their `Default` value so we can resend events in tests.
    pub fn reset_actions(&mut self) {
        self.last_actions = HotShotActionViews::default();
    }
1139
1140    /// Update the last proposal.
1141    ///
1142    /// # Errors
1143    /// Can return an error when the new view_number is not higher than the existing proposed view number.
1144    pub fn update_proposed_view(
1145        &mut self,
1146        proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1147    ) -> Result<()> {
1148        ensure!(
1149            proposal.data.view_number()
1150                > self
1151                    .last_proposals
1152                    .last_key_value()
1153                    .map_or(ViewNumber::genesis(), |(k, _)| { *k }),
1154            debug!("New view isn't newer than the previously proposed view.")
1155        );
1156        self.last_proposals
1157            .insert(proposal.data.view_number(), proposal);
1158        Ok(())
1159    }
1160
1161    /// Update the last decided view.
1162    ///
1163    /// # Errors
1164    /// Can return an error when the new view_number is not higher than the existing decided view number.
1165    pub fn update_last_decided_view(&mut self, view_number: ViewNumber) -> Result<()> {
1166        ensure!(
1167            view_number > self.last_decided_view,
1168            debug!("New view isn't newer than the previously decided view.")
1169        );
1170        self.last_decided_view = view_number;
1171        Ok(())
1172    }
1173
1174    /// Update the locked view.
1175    ///
1176    /// # Errors
1177    /// Can return an error when the new view_number is not higher than the existing locked view number.
1178    pub fn update_locked_view(&mut self, view_number: ViewNumber) -> Result<()> {
1179        ensure!(
1180            view_number > self.locked_view,
1181            debug!("New view isn't newer than the previously locked view.")
1182        );
1183        self.locked_view = view_number;
1184        Ok(())
1185    }
1186
1187    /// Update the validated state map with a new view_number/view combo.
1188    ///
1189    /// # Errors
1190    /// Can return an error when the new view contains less information than the existing view
1191    /// with the same view number.
1192    pub fn update_da_view(
1193        &mut self,
1194        view_number: ViewNumber,
1195        epoch: Option<EpochNumber>,
1196        payload_commitment: VidCommitment,
1197    ) -> Result<()> {
1198        let view = View {
1199            view_inner: ViewInner::Da {
1200                payload_commitment,
1201                epoch,
1202            },
1203        };
1204        self.update_validated_state_map(view_number, view)
1205    }
1206
1207    /// Update the validated state map with a new view_number/view combo.
1208    ///
1209    /// # Errors
1210    /// Can return an error when the new view contains less information than the existing view
1211    /// with the same view number.
1212    pub fn update_leaf(
1213        &mut self,
1214        leaf: Leaf2<TYPES>,
1215        state: Arc<TYPES::ValidatedState>,
1216        delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1217    ) -> Result<()> {
1218        let view_number = leaf.view_number();
1219        let epoch =
1220            option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);
1221        let view = View {
1222            view_inner: ViewInner::Leaf {
1223                leaf: leaf.commit(),
1224                state,
1225                delta,
1226                epoch,
1227            },
1228        };
1229        self.update_validated_state_map(view_number, view)?;
1230        self.update_saved_leaves(leaf);
1231        Ok(())
1232    }
1233
1234    /// Update the validated state map with a new view_number/view combo.
1235    ///
1236    /// # Errors
1237    /// Can return an error when the new view contains less information than the existing view
1238    /// with the same view number.
1239    fn update_validated_state_map(
1240        &mut self,
1241        view_number: ViewNumber,
1242        new_view: View<TYPES>,
1243    ) -> Result<()> {
1244        if let Some(existing_view) = self.validated_state_map().get(&view_number) {
1245            if let ViewInner::Leaf {
1246                delta: ref existing_delta,
1247                ..
1248            } = existing_view.view_inner
1249            {
1250                if let ViewInner::Leaf {
1251                    delta: ref new_delta,
1252                    ..
1253                } = new_view.view_inner
1254                {
1255                    ensure!(
1256                        new_delta.is_some() || existing_delta.is_none(),
1257                        debug!(
1258                            "Skipping the state update to not override a `Leaf` view with `Some` \
1259                             state delta."
1260                        )
1261                    );
1262                } else {
1263                    bail!(
1264                        "Skipping the state update to not override a `Leaf` view with a \
1265                         non-`Leaf` view."
1266                    );
1267                }
1268            }
1269        }
1270        self.validated_state_map.insert(view_number, new_view);
1271        Ok(())
1272    }
1273
1274    /// Update the saved leaves with a new leaf.
1275    fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
1276        self.saved_leaves.insert(leaf.commit(), leaf);
1277    }
1278
1279    /// Update the saved payloads with a new encoded transaction.
1280    ///
1281    /// # Errors
1282    /// Can return an error when there's an existing payload corresponding to the same view number.
1283    pub fn update_saved_payloads(
1284        &mut self,
1285        view_number: ViewNumber,
1286        payload: Arc<PayloadWithMetadata<TYPES>>,
1287    ) -> Result<()> {
1288        ensure!(
1289            !self.saved_payloads.contains_key(&view_number),
1290            "Payload with the same view already exists."
1291        );
1292        self.saved_payloads.insert(view_number, payload);
1293        Ok(())
1294    }
1295
1296    /// Update the high QC if given a newer one.
1297    /// # Errors
1298    /// Can return an error when the provided high_qc is not newer than the existing entry.
1299    pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
1300        if self.high_qc == high_qc {
1301            return Ok(());
1302        }
1303        // make sure the we don't update the high QC unless is't a higher view
1304        ensure!(
1305            high_qc.view_number > self.high_qc.view_number,
1306            debug!("High QC with an equal or higher view exists.")
1307        );
1308        tracing::debug!("Updating high QC");
1309        self.high_qc = high_qc;
1310
1311        Ok(())
1312    }
1313
1314    /// Update the next epoch high QC if given a newer one.
1315    /// # Errors
1316    /// Can return an error when the provided high_qc is not newer than the existing entry.
1317    /// # Panics
1318    /// It can't actually panic. If the option is None, we will not call unwrap on it.
1319    pub fn update_next_epoch_high_qc(
1320        &mut self,
1321        high_qc: NextEpochQuorumCertificate2<TYPES>,
1322    ) -> Result<()> {
1323        if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
1324            return Ok(());
1325        }
1326        if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
1327            ensure!(
1328                high_qc.view_number > next_epoch_high_qc.view_number,
1329                debug!("Next epoch high QC with an equal or higher view exists.")
1330            );
1331        }
1332        tracing::debug!("Updating next epoch high QC");
1333        self.next_epoch_high_qc = Some(high_qc);
1334
1335        Ok(())
1336    }
1337
1338    /// Resets high qc and next epoch qc to the provided transition qc.
1339    /// # Errors
1340    /// Can return an error when the provided high_qc is not newer than the existing entry.
1341    pub fn reset_high_qc(
1342        &mut self,
1343        high_qc: QuorumCertificate2<TYPES>,
1344        next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
1345    ) -> Result<()> {
1346        ensure!(
1347            high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
1348            error!("High QC's and next epoch QC's leaf commits do not match.")
1349        );
1350        if self.high_qc == high_qc {
1351            return Ok(());
1352        }
1353        let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
1354            let current_qc = self.high_qc();
1355            let Some(high_bn) = current_qc.data.block_number else {
1356                return false;
1357            };
1358            epoch_from_block_number(bn + 1, self.epoch_height)
1359                == epoch_from_block_number(high_bn + 1, self.epoch_height)
1360        });
1361        ensure!(
1362            high_qc
1363                .data
1364                .block_number
1365                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
1366                && same_epoch,
1367            error!("Provided QC is not a transition QC.")
1368        );
1369        tracing::debug!("Resetting high QC and next epoch high QC");
1370        self.high_qc = high_qc;
1371        self.next_epoch_high_qc = Some(next_epoch_qc);
1372
1373        Ok(())
1374    }
1375
1376    /// Update the light client state update certificate if given a newer one.
1377    /// # Errors
1378    /// Can return an error when the provided state_cert is not newer than the existing entry.
1379    pub fn update_state_cert(
1380        &mut self,
1381        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
1382    ) -> Result<()> {
1383        if let Some(existing_state_cert) = &self.state_cert {
1384            ensure!(
1385                state_cert.epoch > existing_state_cert.epoch,
1386                debug!(
1387                    "Light client state update certification with an equal or higher epoch exists."
1388                )
1389            );
1390        }
1391        tracing::debug!("Updating light client state update certification");
1392        self.state_cert = Some(state_cert);
1393
1394        Ok(())
1395    }
1396
1397    /// Add a new entry to the vid_shares map.
1398    pub fn update_vid_shares(
1399        &mut self,
1400        view_number: ViewNumber,
1401        disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1402    ) {
1403        self.vid_shares
1404            .entry(view_number)
1405            .or_default()
1406            .entry(disperse.data.recipient_key().clone())
1407            .or_default()
1408            .insert(disperse.data.target_epoch(), disperse);
1409    }
1410
    /// Add a new entry to the `saved_da_certs` map.
    ///
    /// Any certificate already stored for `view_number` is replaced.
    pub fn update_saved_da_certs(&mut self, view_number: ViewNumber, cert: DaCertificate2<TYPES>) {
        self.saved_da_certs.insert(view_number, cert);
    }
1415
1416    /// gather information from the parent chain of leaves
1417    /// # Errors
1418    /// If the leaf or its ancestors are not found in storage
1419    pub fn visit_leaf_ancestors<F>(
1420        &self,
1421        start_from: ViewNumber,
1422        terminator: Terminator<ViewNumber>,
1423        ok_when_finished: bool,
1424        mut f: F,
1425    ) -> std::result::Result<(), HotShotError<TYPES>>
1426    where
1427        F: FnMut(
1428            &Leaf2<TYPES>,
1429            Arc<<TYPES as NodeType>::ValidatedState>,
1430            Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1431        ) -> bool,
1432    {
1433        let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1434            view.leaf_commitment().ok_or_else(|| {
1435                HotShotError::InvalidState(format!(
1436                    "Visited failed view {start_from} leaf. Expected successful leaf"
1437                ))
1438            })?
1439        } else {
1440            return Err(HotShotError::InvalidState(format!(
1441                "View {start_from} leaf does not exist in state map "
1442            )));
1443        };
1444
1445        while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1446            let view = leaf.view_number();
1447            if let (Some(state), delta) = self.state_and_delta(view) {
1448                if let Terminator::Exclusive(stop_before) = terminator {
1449                    if stop_before == view {
1450                        if ok_when_finished {
1451                            return Ok(());
1452                        }
1453                        break;
1454                    }
1455                }
1456                next_leaf = leaf.parent_commitment();
1457                if !f(leaf, state, delta) {
1458                    return Ok(());
1459                }
1460                if let Terminator::Inclusive(stop_after) = terminator {
1461                    if stop_after == view {
1462                        if ok_when_finished {
1463                            return Ok(());
1464                        }
1465                        break;
1466                    }
1467                }
1468            } else {
1469                return Err(HotShotError::InvalidState(format!(
1470                    "View {view} state does not exist in state map"
1471                )));
1472            }
1473        }
1474        Err(HotShotError::MissingLeaf(next_leaf))
1475    }
1476
1477    /// Garbage collects based on state change right now, this removes from both the
1478    /// `saved_payloads` and `validated_state_map` fields of `Consensus`.
1479    /// # Panics
1480    /// On inconsistent stored entries
1481    pub fn collect_garbage(&mut self, old_anchor_view: ViewNumber, new_anchor_view: ViewNumber) {
1482        // Nothing to collect
1483        if new_anchor_view <= old_anchor_view {
1484            return;
1485        }
1486        let gc_view = ViewNumber::new(new_anchor_view.saturating_sub(1));
1487        // state check
1488        let anchor_entry = self
1489            .validated_state_map
1490            .iter()
1491            .next()
1492            .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1493        if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1494            tracing::info!(
1495                "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1496            );
1497        }
1498        // perform gc
1499        self.saved_da_certs
1500            .retain(|view_number, _| *view_number >= old_anchor_view);
1501        self.validated_state_map
1502            .range(..gc_view)
1503            .filter_map(|(_view_number, view)| view.leaf_commitment())
1504            .for_each(|leaf| {
1505                self.saved_leaves.remove(&leaf);
1506            });
1507        self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1508        self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1509        self.vid_shares = self.vid_shares.split_off(&gc_view);
1510        self.last_proposals = self.last_proposals.split_off(&gc_view);
1511    }
1512
1513    /// Gets the last decided leaf.
1514    ///
1515    /// # Panics
1516    /// if the last decided view's leaf does not exist in the state map or saved leaves, which
1517    /// should never happen.
1518    #[must_use]
1519    pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1520        let decided_view_num = self.last_decided_view;
1521        let view = self.validated_state_map.get(&decided_view_num).unwrap();
1522        let leaf = view
1523            .leaf_commitment()
1524            .expect("Decided leaf not found! Consensus internally inconsistent");
1525        self.saved_leaves.get(&leaf).unwrap().clone()
1526    }
1527
1528    pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1529        self.saved_leaves.values().cloned().collect::<Vec<_>>()
1530    }
1531
1532    /// Gets the validated state with the given view number, if in the state map.
1533    #[must_use]
1534    pub fn state(&self, view_number: ViewNumber) -> Option<&Arc<TYPES::ValidatedState>> {
1535        match self.validated_state_map.get(&view_number) {
1536            Some(view) => view.state(),
1537            None => None,
1538        }
1539    }
1540
1541    /// Gets the validated state and state delta with the given view number, if in the state map.
1542    #[must_use]
1543    pub fn state_and_delta(&self, view_number: ViewNumber) -> StateAndDelta<TYPES> {
1544        match self.validated_state_map.get(&view_number) {
1545            Some(view) => view.state_and_delta(),
1546            None => (None, None),
1547        }
1548    }
1549
1550    /// Gets the last decided validated state.
1551    ///
1552    /// # Panics
1553    /// If the last decided view's state does not exist in the state map, which should never
1554    /// happen.
1555    #[must_use]
1556    pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1557        let decided_view_num = self.last_decided_view;
1558        self.state_and_delta(decided_view_num)
1559            .0
1560            .expect("Decided state not found! Consensus internally inconsistent")
1561    }
1562
    /// Associated helper function:
    /// Takes the shared `OuterConsensus` handle, locking it for read and then for write as
    /// needed. Calculates `VidDisperse` for `view` from the saved payload and the
    /// membership, and updates the `vid_shares` map with the signed `VidDisperseShare`
    /// proposals.
    ///
    /// Returns `Some(())` when the shares were calculated and stored. Returns `None`,
    /// without modifying the consensus state, when
    /// - no payload is saved for `view`,
    /// - `view` is not in the validated state map or its view has no epoch, or
    /// - the VID disperse calculation fails.
    #[instrument(skip_all, target = "Consensus", fields(view = *view))]
    pub async fn calculate_and_update_vid(
        consensus: OuterConsensus<TYPES>,
        view: ViewNumber,
        target_epoch: Option<EpochNumber>,
        membership_coordinator: EpochMembershipCoordinator<TYPES>,
        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
        upgrade_lock: &UpgradeLock<TYPES>,
    ) -> Option<()> {
        // Clone the `Arc` so the payload stays available after this statement's read guard is gone.
        let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
        // Separate read lock acquisition: look up the epoch recorded for this view.
        let epoch = consensus
            .read()
            .await
            .validated_state_map()
            .get(&view)?
            .view_inner
            .epoch()?;

        // No lock is held while the disperse is calculated.
        let VidDisperseAndDuration {
            disperse: vid,
            duration: disperse_duration,
        } = VidDisperse::calculate_vid_disperse(
            &payload_with_metadata.payload,
            &membership_coordinator,
            view,
            target_epoch,
            epoch,
            &payload_with_metadata.metadata,
            upgrade_lock,
        )
        .await
        .ok()?;

        // Take the write lock once to record the metric and store all shares.
        let mut consensus_writer = consensus.write().await;
        consensus_writer
            .metrics
            .vid_disperse_duration
            .add_point(disperse_duration.as_secs_f64());
        for share in vid.to_shares() {
            // Shares for which `to_proposal` yields `None` are skipped.
            if let Some(prop) = share.to_proposal(private_key) {
                consensus_writer.update_vid_shares(view, prop);
            }
        }

        Some(())
    }
1614    /// Returns true if a given leaf is for the epoch transition
1615    pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1616        let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1617            tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1618            return false;
1619        };
1620        let block_height = leaf.height();
1621        is_epoch_transition(block_height, self.epoch_height)
1622    }
1623
1624    /// Returns true if our high QC is for one of the epoch transition blocks
1625    pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1626        let Some(block_height) = self.high_qc().data.block_number else {
1627            return false;
1628        };
1629        is_epoch_transition(block_height, self.epoch_height)
1630    }
1631
1632    /// Returns true if the `parent_leaf` formed an eQC for the previous epoch to the `proposed_leaf`
1633    pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1634        if parent_leaf.view_number() == ViewNumber::genesis() {
1635            return true;
1636        }
1637        let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1638        let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1639
1640        new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1641    }
1642}
1643
/// Bundles the block payload commitment with the associated metadata. The primary data
/// needed in order to submit a proposal.
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct CommitmentAndMetadata<TYPES: NodeType> {
    /// VID commitment to the block payload
    pub commitment: VidCommitment,
    /// Builder commitment
    pub builder_commitment: BuilderCommitment,
    /// Metadata for the block payload
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
    /// Builder fee data; `Vec1` guarantees at least one fee entry
    pub fees: Vec1<BuilderFee<TYPES>>,
    /// View number this block is for
    pub block_view: ViewNumber,
}