espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16    },
17    drb::{DrbInput, DrbResult},
18    event::{HotShotAction, LeafInfo},
19    message::{convert_proposal, Proposal},
20    simple_certificate::{
21        CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22        QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23    },
24    stake_table::HSStakeTable,
25    traits::{
26        metrics::Metrics,
27        node_implementation::{ConsensusTime, NodeType},
28        storage::Storage,
29        ValidatedState as HotShotState,
30    },
31    utils::genesis_epoch_from_version,
32    vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{de::DeserializeOwned, Serialize};
36use versions::Upgrade;
37
38use super::{
39    impls::NodeState,
40    utils::BackoffParams,
41    v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44    v0::impls::{StakeTableHash, ValidatedState},
45    v0_3::{
46        ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
47        RewardMerkleCommitmentV1,
48    },
49    v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
50    AuthenticatedValidatorMap, BlockMerkleTree, Event, FeeAccount, FeeAccountProof,
51    FeeMerkleCommitment, Leaf2, NetworkConfig, PubKey, SeqTypes,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56    /// Fetch the leaf at the given height without retrying on transient errors.
57    async fn try_fetch_leaf(
58        &self,
59        retry: usize,
60        height: u64,
61        stake_table: HSStakeTable<SeqTypes>,
62        success_threshold: U256,
63    ) -> anyhow::Result<Leaf2>;
64
65    /// Fetch the leaf at the given height, retrying on transient errors.
66    async fn fetch_leaf(
67        &self,
68        height: u64,
69        stake_table: HSStakeTable<SeqTypes>,
70        success_threshold: U256,
71    ) -> anyhow::Result<Leaf2> {
72        self.backoff()
73            .retry(self, |provider, retry| {
74                let stake_table_clone = stake_table.clone();
75                async move {
76                    provider
77                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78                        .await
79                }
80                .boxed()
81            })
82            .await
83    }
84
85    /// Fetch the given list of accounts without retrying on transient errors.
86    async fn try_fetch_accounts(
87        &self,
88        retry: usize,
89        instance: &NodeState,
90        height: u64,
91        view: ViewNumber,
92        fee_merkle_tree_root: FeeMerkleCommitment,
93        accounts: &[FeeAccount],
94    ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96    /// Fetch the given list of accounts, retrying on transient errors.
97    async fn fetch_accounts(
98        &self,
99        instance: &NodeState,
100        height: u64,
101        view: ViewNumber,
102        fee_merkle_tree_root: FeeMerkleCommitment,
103        accounts: Vec<FeeAccount>,
104    ) -> anyhow::Result<Vec<FeeAccountProof>> {
105        self.backoff()
106            .retry(self, |provider, retry| {
107                let accounts = &accounts;
108                async move {
109                    provider
110                        .try_fetch_accounts(
111                            retry,
112                            instance,
113                            height,
114                            view,
115                            fee_merkle_tree_root,
116                            accounts,
117                        )
118                        .await
119                        .map_err(|err| {
120                            err.context(format!(
121                                "fetching accounts {accounts:?}, height {height}, view {view}"
122                            ))
123                        })
124                }
125                .boxed()
126            })
127            .await
128    }
129
130    /// Fetch and remember the blocks frontier without retrying on transient errors.
131    async fn try_remember_blocks_merkle_tree(
132        &self,
133        retry: usize,
134        instance: &NodeState,
135        height: u64,
136        view: ViewNumber,
137        mt: &mut BlockMerkleTree,
138    ) -> anyhow::Result<()>;
139
140    /// Fetch and remember the blocks frontier, retrying on transient errors.
141    async fn remember_blocks_merkle_tree(
142        &self,
143        instance: &NodeState,
144        height: u64,
145        view: ViewNumber,
146        mt: &mut BlockMerkleTree,
147    ) -> anyhow::Result<()> {
148        self.backoff()
149            .retry(mt, |mt, retry| {
150                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152                    .boxed()
153            })
154            .await
155    }
156
157    /// Fetch the chain config without retrying on transient errors.
158    async fn try_fetch_chain_config(
159        &self,
160        retry: usize,
161        commitment: Commitment<ChainConfig>,
162    ) -> anyhow::Result<ChainConfig>;
163
164    /// Fetch the chain config, retrying on transient errors.
165    async fn fetch_chain_config(
166        &self,
167        commitment: Commitment<ChainConfig>,
168    ) -> anyhow::Result<ChainConfig> {
169        self.backoff()
170            .retry(self, |provider, retry| {
171                provider
172                    .try_fetch_chain_config(retry, commitment)
173                    .map_err(|err| err.context("fetching chain config"))
174                    .boxed()
175            })
176            .await
177    }
178
179    /// Fetch the given reward merkle tree without retrying on transient errors.
180    async fn try_fetch_reward_merkle_tree_v2(
181        &self,
182        retry: usize,
183        height: u64,
184        view: ViewNumber,
185        reward_merkle_tree_root: RewardMerkleCommitmentV2,
186        accounts: Arc<Vec<RewardAccountV2>>,
187    ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
188
189    async fn fetch_reward_merkle_tree_v2(
190        &self,
191        height: u64,
192        view: ViewNumber,
193        reward_merkle_tree_root: RewardMerkleCommitmentV2,
194        accounts: Arc<Vec<RewardAccountV2>>,
195    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
196        self.backoff()
197            .retry(self, |provider, retry| {
198                let accounts = accounts.clone();
199                async move {
200                    provider
201                        .try_fetch_reward_merkle_tree_v2(
202                            retry,
203                            height,
204                            view,
205                            reward_merkle_tree_root,
206                            accounts,
207                        )
208                        .await
209                        .map_err(|err| {
210                            err.context(format!("fetching reward merkle tree for height {height}"))
211                        })
212                }
213                .boxed()
214            })
215            .await
216    }
217
218    /// Fetch the given list of reward accounts without retrying on transient errors.
219    async fn try_fetch_reward_accounts_v1(
220        &self,
221        retry: usize,
222        instance: &NodeState,
223        height: u64,
224        view: ViewNumber,
225        reward_merkle_tree_root: RewardMerkleCommitmentV1,
226        accounts: &[RewardAccountV1],
227    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
228
229    /// Fetch the given list of reward accounts, retrying on transient errors.
230    async fn fetch_reward_accounts_v1(
231        &self,
232        instance: &NodeState,
233        height: u64,
234        view: ViewNumber,
235        reward_merkle_tree_root: RewardMerkleCommitmentV1,
236        accounts: Vec<RewardAccountV1>,
237    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
238        self.backoff()
239            .retry(self, |provider, retry| {
240                let accounts = &accounts;
241                async move {
242                    provider
243                        .try_fetch_reward_accounts_v1(
244                            retry,
245                            instance,
246                            height,
247                            view,
248                            reward_merkle_tree_root,
249                            accounts,
250                        )
251                        .await
252                        .map_err(|err| {
253                            err.context(format!(
254                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
255                                 {view}"
256                            ))
257                        })
258                }
259                .boxed()
260            })
261            .await
262    }
263
264    /// Fetch the state certificate for a given epoch without retrying on transient errors.
265    async fn try_fetch_state_cert(
266        &self,
267        retry: usize,
268        epoch: u64,
269    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
270
271    /// Fetch the state certificate for a given epoch, retrying on transient errors.
272    async fn fetch_state_cert(
273        &self,
274        epoch: u64,
275    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
276        self.backoff()
277            .retry(self, |provider, retry| {
278                provider
279                    .try_fetch_state_cert(retry, epoch)
280                    .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
281                    .boxed()
282            })
283            .await
284    }
285
286    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
287    fn is_local(&self) -> bool;
288
289    /// Returns the backoff parameters for the catchup provider.
290    fn backoff(&self) -> &BackoffParams;
291
292    /// Returns the name of the catchup provider.
293    fn name(&self) -> String;
294}
295
296#[async_trait]
297impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
298    async fn try_fetch_leaf(
299        &self,
300        retry: usize,
301        height: u64,
302        stake_table: HSStakeTable<SeqTypes>,
303        success_threshold: U256,
304    ) -> anyhow::Result<Leaf2> {
305        (**self)
306            .try_fetch_leaf(retry, height, stake_table, success_threshold)
307            .await
308    }
309
310    async fn fetch_leaf(
311        &self,
312        height: u64,
313        stake_table: HSStakeTable<SeqTypes>,
314        success_threshold: U256,
315    ) -> anyhow::Result<Leaf2> {
316        (**self)
317            .fetch_leaf(height, stake_table, success_threshold)
318            .await
319    }
320    async fn try_fetch_accounts(
321        &self,
322        retry: usize,
323        instance: &NodeState,
324        height: u64,
325        view: ViewNumber,
326        fee_merkle_tree_root: FeeMerkleCommitment,
327        accounts: &[FeeAccount],
328    ) -> anyhow::Result<Vec<FeeAccountProof>> {
329        (**self)
330            .try_fetch_accounts(
331                retry,
332                instance,
333                height,
334                view,
335                fee_merkle_tree_root,
336                accounts,
337            )
338            .await
339    }
340
341    async fn fetch_accounts(
342        &self,
343        instance: &NodeState,
344        height: u64,
345        view: ViewNumber,
346        fee_merkle_tree_root: FeeMerkleCommitment,
347        accounts: Vec<FeeAccount>,
348    ) -> anyhow::Result<Vec<FeeAccountProof>> {
349        (**self)
350            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
351            .await
352    }
353
354    async fn try_remember_blocks_merkle_tree(
355        &self,
356        retry: usize,
357        instance: &NodeState,
358        height: u64,
359        view: ViewNumber,
360        mt: &mut BlockMerkleTree,
361    ) -> anyhow::Result<()> {
362        (**self)
363            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
364            .await
365    }
366
367    async fn remember_blocks_merkle_tree(
368        &self,
369        instance: &NodeState,
370        height: u64,
371        view: ViewNumber,
372        mt: &mut BlockMerkleTree,
373    ) -> anyhow::Result<()> {
374        (**self)
375            .remember_blocks_merkle_tree(instance, height, view, mt)
376            .await
377    }
378
379    async fn try_fetch_chain_config(
380        &self,
381        retry: usize,
382        commitment: Commitment<ChainConfig>,
383    ) -> anyhow::Result<ChainConfig> {
384        (**self).try_fetch_chain_config(retry, commitment).await
385    }
386
387    async fn fetch_chain_config(
388        &self,
389        commitment: Commitment<ChainConfig>,
390    ) -> anyhow::Result<ChainConfig> {
391        (**self).fetch_chain_config(commitment).await
392    }
393
394    async fn try_fetch_reward_merkle_tree_v2(
395        &self,
396        retry: usize,
397        height: u64,
398        view: ViewNumber,
399        reward_merkle_tree_root: RewardMerkleCommitmentV2,
400        accounts: Arc<Vec<RewardAccountV2>>,
401    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
402        (**self)
403            .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
404            .await
405    }
406
407    async fn fetch_reward_merkle_tree_v2(
408        &self,
409        height: u64,
410        view: ViewNumber,
411        reward_merkle_tree_root: RewardMerkleCommitmentV2,
412        accounts: Arc<Vec<RewardAccountV2>>,
413    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
414        (**self)
415            .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
416            .await
417    }
418
419    async fn try_fetch_reward_accounts_v1(
420        &self,
421        retry: usize,
422        instance: &NodeState,
423        height: u64,
424        view: ViewNumber,
425        reward_merkle_tree_root: RewardMerkleCommitmentV1,
426        accounts: &[RewardAccountV1],
427    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
428        (**self)
429            .try_fetch_reward_accounts_v1(
430                retry,
431                instance,
432                height,
433                view,
434                reward_merkle_tree_root,
435                accounts,
436            )
437            .await
438    }
439
440    async fn fetch_reward_accounts_v1(
441        &self,
442        instance: &NodeState,
443        height: u64,
444        view: ViewNumber,
445        reward_merkle_tree_root: RewardMerkleCommitmentV1,
446        accounts: Vec<RewardAccountV1>,
447    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
448        (**self)
449            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
450            .await
451    }
452
453    async fn try_fetch_state_cert(
454        &self,
455        retry: usize,
456        epoch: u64,
457    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
458        (**self).try_fetch_state_cert(retry, epoch).await
459    }
460
461    async fn fetch_state_cert(
462        &self,
463        epoch: u64,
464    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
465        (**self).fetch_state_cert(epoch).await
466    }
467
468    fn backoff(&self) -> &BackoffParams {
469        (**self).backoff()
470    }
471
472    fn name(&self) -> String {
473        (**self).name()
474    }
475
476    fn is_local(&self) -> bool {
477        (**self).is_local()
478    }
479}
480
481#[async_trait]
482pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
483    type Persistence: SequencerPersistence + MembershipPersistence;
484
485    fn set_view_retention(&mut self, view_retention: u64);
486    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
487    async fn reset(self) -> anyhow::Result<()>;
488}
489
490/// Determine the read state based on the queried block range.
491// - If the persistence returned events up to the requested block, the read is complete.
492/// - Otherwise, indicate that the read is up to the last processed block.
493#[derive(Clone, Copy, Debug, PartialEq, Eq)]
494pub enum EventsPersistenceRead {
495    Complete,
496    UntilL1Block(u64),
497}
498
499/// Tuple type for stake table data: (validators, block_reward, stake_table_hash)
500pub type StakeTuple = (
501    AuthenticatedValidatorMap,
502    Option<RewardAmount>,
503    Option<StakeTableHash>,
504);
505
506#[async_trait]
507/// Trait used by `Memberships` implementations to interact with persistence layer.
508pub trait MembershipPersistence: Send + Sync + 'static {
509    /// Load stake table for epoch from storage
510    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;
511
512    /// Load stake tables for storage for latest `n` known epochs
513    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
514
515    /// Store stake table at `epoch` in the persistence layer
516    async fn store_stake(
517        &self,
518        epoch: EpochNumber,
519        stake: AuthenticatedValidatorMap,
520        block_reward: Option<RewardAmount>,
521        stake_table_hash: Option<StakeTableHash>,
522    ) -> anyhow::Result<()>;
523
524    async fn store_events(
525        &self,
526        l1_finalized: u64,
527        events: Vec<(EventKey, StakeTableEvent)>,
528    ) -> anyhow::Result<()>;
529    async fn load_events(
530        &self,
531        from_l1_block: u64,
532        l1_finalized: u64,
533    ) -> anyhow::Result<(
534        Option<EventsPersistenceRead>,
535        Vec<(EventKey, StakeTableEvent)>,
536    )>;
537
538    /// Delete all stake table events, the L1 block tracker, and the epoch DRB and root data.
539    async fn delete_stake_tables(&self) -> anyhow::Result<()>;
540
541    async fn store_all_validators(
542        &self,
543        epoch: EpochNumber,
544        all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
545    ) -> anyhow::Result<()>;
546
547    async fn load_all_validators(
548        &self,
549        epoch: EpochNumber,
550        offset: u64,
551        limit: u64,
552    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
553}
554
555#[async_trait]
556pub trait SequencerPersistence:
557    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
558{
559    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
560
561    /// Use this storage as a state catchup backend, if supported.
562    fn into_catchup_provider(
563        self,
564        _backoff: BackoffParams,
565    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
566        bail!("state catchup is not implemented for this persistence type");
567    }
568
569    /// Load the orchestrator config from storage.
570    ///
571    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
572    /// `Err` if it could not be determined whether a config exists or not.
573    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
574
575    /// Save the orchestrator config to storage.
576    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
577
578    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
579    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
580
581    /// Load the view to restart from.
582    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
583
584    /// Load the proposals saved by consensus
585    async fn load_quorum_proposals(
586        &self,
587    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
588
589    async fn load_quorum_proposal(
590        &self,
591        view: ViewNumber,
592    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
593
594    async fn load_vid_share(
595        &self,
596        view: ViewNumber,
597    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
598    async fn load_da_proposal(
599        &self,
600        view: ViewNumber,
601    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
602    async fn load_upgrade_certificate(
603        &self,
604    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
605    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
606    async fn load_state_cert(
607        &self,
608    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
609
610    /// Get a state certificate for an epoch.
611    async fn get_state_cert_by_epoch(
612        &self,
613        epoch: u64,
614    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
615
616    /// Insert a state certificate for a given epoch.
617    async fn insert_state_cert(
618        &self,
619        epoch: u64,
620        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
621    ) -> anyhow::Result<()>;
622
623    /// Load the latest known consensus state.
624    ///
625    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
626    /// if there is no saved state). Also returns the anchor view number, which can be used as a
627    /// reference point to process any events which were not processed before a previous shutdown,
628    /// if applicable,.
629    async fn load_consensus_state(
630        &self,
631        state: NodeState,
632        upgrade: Upgrade,
633    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
634        let genesis_validated_state = ValidatedState::genesis(&state).0;
635        let highest_voted_view = match self
636            .load_latest_acted_view()
637            .await
638            .context("loading last voted view")?
639        {
640            Some(view) => {
641                tracing::info!(?view, "starting with last actioned view");
642                view
643            },
644            None => {
645                tracing::info!("no saved view, starting from genesis");
646                ViewNumber::genesis()
647            },
648        };
649
650        let restart_view = match self
651            .load_restart_view()
652            .await
653            .context("loading restart view")?
654        {
655            Some(view) => {
656                tracing::info!(?view, "starting from saved view");
657                view
658            },
659            None => {
660                tracing::info!("no saved view, starting from genesis");
661                ViewNumber::genesis()
662            },
663        };
664        let next_epoch_high_qc = self
665            .load_next_epoch_quorum_certificate()
666            .await
667            .context("loading next epoch qc")?;
668        let (leaf, mut high_qc, anchor_view) = match self
669            .load_anchor_leaf()
670            .await
671            .context("loading anchor leaf")?
672        {
673            Some((leaf, high_qc)) => {
674                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
675                ensure!(
676                    leaf.view_number() == high_qc.view_number,
677                    format!(
678                        "loaded anchor leaf from view {}, but high QC is from view {}",
679                        leaf.view_number(),
680                        high_qc.view_number
681                    )
682                );
683
684                let anchor_view = leaf.view_number();
685                (leaf, high_qc, Some(anchor_view))
686            },
687            None => {
688                tracing::info!("no saved leaf, starting from genesis leaf");
689                (
690                    hotshot_types::data::Leaf2::genesis(
691                        &genesis_validated_state,
692                        &state,
693                        upgrade.base,
694                    )
695                    .await,
696                    QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
697                    None,
698                )
699            },
700        };
701
702        if let Some((extended_high_qc, _)) = self.load_eqc().await {
703            if extended_high_qc.view_number() > high_qc.view_number() {
704                high_qc = extended_high_qc
705            }
706        }
707
708        let validated_state = if leaf.block_header().height() == 0 {
709            // If we are starting from genesis, we can provide the full state.
710            genesis_validated_state
711        } else {
712            // Otherwise, we will have to construct a sparse state and fetch missing data during
713            // catchup.
714            ValidatedState::from_header(leaf.block_header())
715        };
716
717        // If we are not starting from genesis, we start from the view following the maximum view
718        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
719        // starting in a view in which we had already voted before the restart, and prevents
720        // unnecessary catchup from starting in a view earlier than the anchor leaf.
721        let restart_view = max(restart_view, leaf.view_number());
722        // TODO:
723        let epoch = genesis_epoch_from_version::<SeqTypes>(upgrade.base);
724
725        let config = self.load_config().await.context("loading config")?;
726        let epoch_height = config
727            .as_ref()
728            .map(|c| c.config.epoch_height)
729            .unwrap_or_default();
730        let epoch_start_block = config
731            .as_ref()
732            .map(|c| c.config.epoch_start_block)
733            .unwrap_or_default();
734
735        let saved_proposals = self
736            .load_quorum_proposals()
737            .await
738            .context("loading saved proposals")?;
739
740        let upgrade_certificate = self
741            .load_upgrade_certificate()
742            .await
743            .context("loading upgrade certificate")?;
744
745        let start_epoch_info = self
746            .load_start_epoch_info()
747            .await
748            .context("loading start epoch info")?;
749
750        let state_cert = self
751            .load_state_cert()
752            .await
753            .context("loading light client state update certificate")?;
754
755        tracing::warn!(
756            ?leaf,
757            ?restart_view,
758            ?epoch,
759            ?high_qc,
760            ?validated_state,
761            ?state_cert,
762            "loaded consensus state"
763        );
764
765        Ok((
766            HotShotInitializer {
767                instance_state: state,
768                epoch_height,
769                epoch_start_block,
770                anchor_leaf: leaf,
771                anchor_state: Arc::new(validated_state),
772                anchor_state_delta: None,
773                start_view: restart_view,
774                start_epoch: epoch,
775                last_actioned_view: highest_voted_view,
776                saved_proposals,
777                high_qc,
778                next_epoch_high_qc,
779                decided_upgrade_certificate: upgrade_certificate,
780                undecided_leaves: Default::default(),
781                undecided_state: Default::default(),
782                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
783                start_epoch_info,
784                state_cert,
785            },
786            anchor_view,
787        ))
788    }
789
790    /// Update storage based on an event from consensus.
791    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
792        if let EventType::Decide {
793            leaf_chain,
794            committing_qc,
795            deciding_qc,
796            ..
797        } = &event.event
798        {
799            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
800                // No new leaves.
801                return;
802            };
803
804            // Associate each decided leaf with a QC.
805            let chain = leaf_chain.iter().zip(
806                // The first (most recent) leaf corresponds to the QC triggering the decide event.
807                std::iter::once((**committing_qc).clone())
808                    // Moving backwards in the chain, each leaf corresponds with the subsequent
809                    // leaf's justify QC.
810                    .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
811            );
812
813            if let Err(err) = self
814                .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
815                .await
816            {
817                tracing::error!(
818                    "failed to save decided leaves, chain may not be up to date: {err:#}"
819                );
820                return;
821            }
822        }
823    }
824
825    /// Append decided leaves to persistent storage and emit a corresponding event.
826    ///
827    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
828    /// up to and including `view`. If available in persistent storage, full block payloads and VID
829    /// info will also be included for each leaf.
830    ///
831    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
832    /// collected The consumer's handling of this event is a prerequisite for the completion of
833    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
834    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
835    /// will eventually be passed to the consumer.
836    ///
837    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
838    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
839    /// processed twice, or the consumer may get two events which share a subset of their data.
840    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
841    /// idempotent.
842    ///
843    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
844    /// until the next decide, at which point all persisted leaves from the failed GC run will be
845    /// included in the event along with subsequently decided leaves.
846    ///
847    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
848    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
849    /// storage to long-term archival storage.
850    async fn append_decided_leaves(
851        &self,
852        decided_view: ViewNumber,
853        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
854        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
855        consumer: &(impl EventConsumer + 'static),
856    ) -> anyhow::Result<()>;
857
858    async fn load_anchor_leaf(
859        &self,
860    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
861    async fn append_vid(
862        &self,
863        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
864    ) -> anyhow::Result<()>;
865    async fn append_da(
866        &self,
867        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
868        vid_commit: VidCommitment,
869    ) -> anyhow::Result<()>;
870    async fn record_action(
871        &self,
872        view: ViewNumber,
873        epoch: Option<EpochNumber>,
874        action: HotShotAction,
875    ) -> anyhow::Result<()>;
876
877    async fn append_quorum_proposal2(
878        &self,
879        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
880    ) -> anyhow::Result<()>;
881
882    /// Update the current eQC in storage.
883    async fn store_eqc(
884        &self,
885        _high_qc: QuorumCertificate2<SeqTypes>,
886        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
887    ) -> anyhow::Result<()>;
888
889    /// Load the current eQC from storage.
890    async fn load_eqc(
891        &self,
892    ) -> Option<(
893        QuorumCertificate2<SeqTypes>,
894        NextEpochQuorumCertificate2<SeqTypes>,
895    )>;
896
897    async fn store_upgrade_certificate(
898        &self,
899        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
900    ) -> anyhow::Result<()>;
901
902    async fn migrate_storage(&self) -> anyhow::Result<()> {
903        tracing::warn!("migrating consensus data...");
904
905        self.migrate_anchor_leaf().await?;
906        self.migrate_da_proposals().await?;
907        self.migrate_vid_shares().await?;
908        self.migrate_quorum_proposals().await?;
909        self.migrate_quorum_certificates().await?;
910        self.migrate_reward_merkle_tree_v2()
911            .await
912            .context("failed to migrate reward merkle tree v2")?;
913        self.migrate_validator_authenticated().await?;
914
915        tracing::warn!("consensus storage has been migrated to new types");
916
917        Ok(())
918    }
919
920    async fn migrate_validator_authenticated(&self) -> anyhow::Result<()>;
921
922    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
923    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
924    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
925    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
926    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
927
928    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
929        match self.load_anchor_leaf().await? {
930            Some((leaf, _)) => Ok(leaf.view_number()),
931            None => Ok(ViewNumber::genesis()),
932        }
933    }
934
935    async fn store_next_epoch_quorum_certificate(
936        &self,
937        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
938    ) -> anyhow::Result<()>;
939
940    async fn load_next_epoch_quorum_certificate(
941        &self,
942    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
943
944    async fn append_da2(
945        &self,
946        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
947        vid_commit: VidCommitment,
948    ) -> anyhow::Result<()>;
949
950    async fn append_proposal2(
951        &self,
952        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
953    ) -> anyhow::Result<()> {
954        self.append_quorum_proposal2(proposal).await
955    }
956
957    async fn store_drb_result(
958        &self,
959        epoch: <SeqTypes as NodeType>::Epoch,
960        drb_result: DrbResult,
961    ) -> anyhow::Result<()>;
962    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
963    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
964    async fn store_epoch_root(
965        &self,
966        epoch: <SeqTypes as NodeType>::Epoch,
967        block_header: <SeqTypes as NodeType>::BlockHeader,
968    ) -> anyhow::Result<()>;
969    async fn add_state_cert(
970        &self,
971        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
972    ) -> anyhow::Result<()>;
973
974    fn enable_metrics(&mut self, metrics: &dyn Metrics);
975}
976
977#[async_trait]
978pub trait EventConsumer: Debug + Send + Sync {
979    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
980}
981
982#[async_trait]
983impl<T> EventConsumer for Box<T>
984where
985    T: EventConsumer + ?Sized,
986{
987    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
988        (**self).handle_event(event).await
989    }
990}
991
992#[derive(Clone, Copy, Debug)]
993pub struct NullEventConsumer;
994
995#[async_trait]
996impl EventConsumer for NullEventConsumer {
997    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
998        Ok(())
999    }
1000}
1001
1002#[async_trait]
1003impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1004    async fn append_vid(
1005        &self,
1006        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1007    ) -> anyhow::Result<()> {
1008        (**self).append_vid(proposal).await
1009    }
1010
1011    async fn append_da(
1012        &self,
1013        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1014        vid_commit: VidCommitment,
1015    ) -> anyhow::Result<()> {
1016        (**self).append_da(proposal, vid_commit).await
1017    }
1018
1019    async fn append_da2(
1020        &self,
1021        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1022        vid_commit: VidCommitment,
1023    ) -> anyhow::Result<()> {
1024        (**self).append_da2(proposal, vid_commit).await
1025    }
1026
1027    async fn record_action(
1028        &self,
1029        view: ViewNumber,
1030        epoch: Option<EpochNumber>,
1031        action: HotShotAction,
1032    ) -> anyhow::Result<()> {
1033        (**self).record_action(view, epoch, action).await
1034    }
1035
1036    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1037        Ok(())
1038    }
1039
1040    async fn append_proposal(
1041        &self,
1042        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1043    ) -> anyhow::Result<()> {
1044        (**self)
1045            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1046            .await
1047    }
1048
1049    async fn append_proposal2(
1050        &self,
1051        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1052    ) -> anyhow::Result<()> {
1053        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1054            convert_proposal(proposal.clone());
1055        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1056    }
1057
1058    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1059        Ok(())
1060    }
1061
1062    /// Update the current eQC in storage.
1063    async fn update_eqc(
1064        &self,
1065        high_qc: QuorumCertificate2<SeqTypes>,
1066        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1067    ) -> anyhow::Result<()> {
1068        if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1069            if high_qc.view_number() < existing_high_qc.view_number() {
1070                return Ok(());
1071            }
1072        }
1073
1074        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1075    }
1076
1077    async fn update_next_epoch_high_qc2(
1078        &self,
1079        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1080    ) -> anyhow::Result<()> {
1081        Ok(())
1082    }
1083
1084    async fn update_decided_upgrade_certificate(
1085        &self,
1086        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1087    ) -> anyhow::Result<()> {
1088        (**self)
1089            .store_upgrade_certificate(decided_upgrade_certificate)
1090            .await
1091    }
1092
1093    async fn store_drb_result(
1094        &self,
1095        epoch: <SeqTypes as NodeType>::Epoch,
1096        drb_result: DrbResult,
1097    ) -> anyhow::Result<()> {
1098        (**self).store_drb_result(epoch, drb_result).await
1099    }
1100
1101    async fn store_epoch_root(
1102        &self,
1103        epoch: <SeqTypes as NodeType>::Epoch,
1104        block_header: <SeqTypes as NodeType>::BlockHeader,
1105    ) -> anyhow::Result<()> {
1106        (**self).store_epoch_root(epoch, block_header).await
1107    }
1108
1109    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1110        (**self).store_drb_input(drb_input).await
1111    }
1112
1113    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1114        (**self).load_drb_input(epoch).await
1115    }
1116
1117    async fn update_state_cert(
1118        &self,
1119        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1120    ) -> anyhow::Result<()> {
1121        (**self).add_state_cert(state_cert).await
1122    }
1123}
1124
1125/// Data that can be deserialized from a subslice of namespace payload bytes.
1126///
1127/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
1128/// namespace payload bytes to read.
1129pub trait FromNsPayloadBytes<'a> {
1130    /// Deserialize `Self` from namespace payload bytes.
1131    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1132}
1133
1134/// Specifies a subslice of namespace payload bytes to read.
1135///
1136/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1137/// deserialized from that subslice of bytes.
1138pub trait NsPayloadBytesRange<'a> {
1139    type Output: FromNsPayloadBytes<'a>;
1140
1141    /// Range relative to this ns payload
1142    fn ns_payload_range(&self) -> Range<usize>;
1143}
1144
1145/// Types which can be deserialized from either integers or strings.
1146///
1147/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1148/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1149/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1150/// to derive this user-friendly serialization.
1151///
1152/// These types are assumed to have an efficient representation as an integral type in Rust --
1153/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1154/// encoding. With human readable encodings, serialization is always to a string.
1155pub trait FromStringOrInteger: Sized {
1156    type Binary: Serialize + DeserializeOwned;
1157    type Integer: Serialize + DeserializeOwned;
1158
1159    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1160    fn from_string(s: String) -> anyhow::Result<Self>;
1161    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1162
1163    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1164    fn to_string(&self) -> anyhow::Result<String>;
1165}