espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
23        QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33    vote::HasViewNumber,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39    impls::NodeState,
40    utils::BackoffParams,
41    v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44    v0::impls::{StakeTableHash, ValidatedState},
45    v0_3::{
46        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
47        Validator,
48    },
49    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
50    BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
51    PubKey, SeqTypes, ValidatorMap,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56    /// Fetch the leaf at the given height without retrying on transient errors.
57    async fn try_fetch_leaf(
58        &self,
59        retry: usize,
60        height: u64,
61        stake_table: HSStakeTable<SeqTypes>,
62        success_threshold: U256,
63    ) -> anyhow::Result<Leaf2>;
64
65    /// Fetch the leaf at the given height, retrying on transient errors.
66    async fn fetch_leaf(
67        &self,
68        height: u64,
69        stake_table: HSStakeTable<SeqTypes>,
70        success_threshold: U256,
71    ) -> anyhow::Result<Leaf2> {
72        self.backoff()
73            .retry(self, |provider, retry| {
74                let stake_table_clone = stake_table.clone();
75                async move {
76                    provider
77                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78                        .await
79                }
80                .boxed()
81            })
82            .await
83    }
84
85    /// Fetch the given list of accounts without retrying on transient errors.
86    async fn try_fetch_accounts(
87        &self,
88        retry: usize,
89        instance: &NodeState,
90        height: u64,
91        view: ViewNumber,
92        fee_merkle_tree_root: FeeMerkleCommitment,
93        accounts: &[FeeAccount],
94    ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96    /// Fetch the given list of accounts, retrying on transient errors.
97    async fn fetch_accounts(
98        &self,
99        instance: &NodeState,
100        height: u64,
101        view: ViewNumber,
102        fee_merkle_tree_root: FeeMerkleCommitment,
103        accounts: Vec<FeeAccount>,
104    ) -> anyhow::Result<Vec<FeeAccountProof>> {
105        self.backoff()
106            .retry(self, |provider, retry| {
107                let accounts = &accounts;
108                async move {
109                    provider
110                        .try_fetch_accounts(
111                            retry,
112                            instance,
113                            height,
114                            view,
115                            fee_merkle_tree_root,
116                            accounts,
117                        )
118                        .await
119                        .map_err(|err| {
120                            err.context(format!(
121                                "fetching accounts {accounts:?}, height {height}, view {view}"
122                            ))
123                        })
124                }
125                .boxed()
126            })
127            .await
128    }
129
130    /// Fetch and remember the blocks frontier without retrying on transient errors.
131    async fn try_remember_blocks_merkle_tree(
132        &self,
133        retry: usize,
134        instance: &NodeState,
135        height: u64,
136        view: ViewNumber,
137        mt: &mut BlockMerkleTree,
138    ) -> anyhow::Result<()>;
139
140    /// Fetch and remember the blocks frontier, retrying on transient errors.
141    async fn remember_blocks_merkle_tree(
142        &self,
143        instance: &NodeState,
144        height: u64,
145        view: ViewNumber,
146        mt: &mut BlockMerkleTree,
147    ) -> anyhow::Result<()> {
148        self.backoff()
149            .retry(mt, |mt, retry| {
150                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152                    .boxed()
153            })
154            .await
155    }
156
157    /// Fetch the chain config without retrying on transient errors.
158    async fn try_fetch_chain_config(
159        &self,
160        retry: usize,
161        commitment: Commitment<ChainConfig>,
162    ) -> anyhow::Result<ChainConfig>;
163
164    /// Fetch the chain config, retrying on transient errors.
165    async fn fetch_chain_config(
166        &self,
167        commitment: Commitment<ChainConfig>,
168    ) -> anyhow::Result<ChainConfig> {
169        self.backoff()
170            .retry(self, |provider, retry| {
171                provider
172                    .try_fetch_chain_config(retry, commitment)
173                    .map_err(|err| err.context("fetching chain config"))
174                    .boxed()
175            })
176            .await
177    }
178
179    /// Fetch the given list of reward accounts without retrying on transient errors.
180    async fn try_fetch_reward_accounts_v2(
181        &self,
182        retry: usize,
183        instance: &NodeState,
184        height: u64,
185        view: ViewNumber,
186        reward_merkle_tree_root: RewardMerkleCommitmentV2,
187        accounts: &[RewardAccountV2],
188    ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
189
190    /// Fetch the given list of reward accounts, retrying on transient errors.
191    async fn fetch_reward_accounts_v2(
192        &self,
193        instance: &NodeState,
194        height: u64,
195        view: ViewNumber,
196        reward_merkle_tree_root: RewardMerkleCommitmentV2,
197        accounts: Vec<RewardAccountV2>,
198    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
199        self.backoff()
200            .retry(self, |provider, retry| {
201                let accounts = &accounts;
202                async move {
203                    provider
204                        .try_fetch_reward_accounts_v2(
205                            retry,
206                            instance,
207                            height,
208                            view,
209                            reward_merkle_tree_root,
210                            accounts,
211                        )
212                        .await
213                        .map_err(|err| {
214                            err.context(format!(
215                                "fetching reward accounts {accounts:?}, height {height}, view \
216                                 {view}"
217                            ))
218                        })
219                }
220                .boxed()
221            })
222            .await
223    }
224
225    /// Fetch the given list of reward accounts without retrying on transient errors.
226    async fn try_fetch_reward_accounts_v1(
227        &self,
228        retry: usize,
229        instance: &NodeState,
230        height: u64,
231        view: ViewNumber,
232        reward_merkle_tree_root: RewardMerkleCommitmentV1,
233        accounts: &[RewardAccountV1],
234    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
235
236    /// Fetch the given list of reward accounts, retrying on transient errors.
237    async fn fetch_reward_accounts_v1(
238        &self,
239        instance: &NodeState,
240        height: u64,
241        view: ViewNumber,
242        reward_merkle_tree_root: RewardMerkleCommitmentV1,
243        accounts: Vec<RewardAccountV1>,
244    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
245        self.backoff()
246            .retry(self, |provider, retry| {
247                let accounts = &accounts;
248                async move {
249                    provider
250                        .try_fetch_reward_accounts_v1(
251                            retry,
252                            instance,
253                            height,
254                            view,
255                            reward_merkle_tree_root,
256                            accounts,
257                        )
258                        .await
259                        .map_err(|err| {
260                            err.context(format!(
261                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
262                                 {view}"
263                            ))
264                        })
265                }
266                .boxed()
267            })
268            .await
269    }
270
271    /// Fetch the state certificate for a given epoch without retrying on transient errors.
272    async fn try_fetch_state_cert(
273        &self,
274        retry: usize,
275        epoch: u64,
276    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
277
278    /// Fetch the state certificate for a given epoch, retrying on transient errors.
279    async fn fetch_state_cert(
280        &self,
281        epoch: u64,
282    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
283        self.backoff()
284            .retry(self, |provider, retry| {
285                provider
286                    .try_fetch_state_cert(retry, epoch)
287                    .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
288                    .boxed()
289            })
290            .await
291    }
292
293    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
294    fn is_local(&self) -> bool;
295
296    /// Returns the backoff parameters for the catchup provider.
297    fn backoff(&self) -> &BackoffParams;
298
299    /// Returns the name of the catchup provider.
300    fn name(&self) -> String;
301}
302
303#[async_trait]
304impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
305    async fn try_fetch_leaf(
306        &self,
307        retry: usize,
308        height: u64,
309        stake_table: HSStakeTable<SeqTypes>,
310        success_threshold: U256,
311    ) -> anyhow::Result<Leaf2> {
312        (**self)
313            .try_fetch_leaf(retry, height, stake_table, success_threshold)
314            .await
315    }
316
317    async fn fetch_leaf(
318        &self,
319        height: u64,
320        stake_table: HSStakeTable<SeqTypes>,
321        success_threshold: U256,
322    ) -> anyhow::Result<Leaf2> {
323        (**self)
324            .fetch_leaf(height, stake_table, success_threshold)
325            .await
326    }
327    async fn try_fetch_accounts(
328        &self,
329        retry: usize,
330        instance: &NodeState,
331        height: u64,
332        view: ViewNumber,
333        fee_merkle_tree_root: FeeMerkleCommitment,
334        accounts: &[FeeAccount],
335    ) -> anyhow::Result<Vec<FeeAccountProof>> {
336        (**self)
337            .try_fetch_accounts(
338                retry,
339                instance,
340                height,
341                view,
342                fee_merkle_tree_root,
343                accounts,
344            )
345            .await
346    }
347
348    async fn fetch_accounts(
349        &self,
350        instance: &NodeState,
351        height: u64,
352        view: ViewNumber,
353        fee_merkle_tree_root: FeeMerkleCommitment,
354        accounts: Vec<FeeAccount>,
355    ) -> anyhow::Result<Vec<FeeAccountProof>> {
356        (**self)
357            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
358            .await
359    }
360
361    async fn try_remember_blocks_merkle_tree(
362        &self,
363        retry: usize,
364        instance: &NodeState,
365        height: u64,
366        view: ViewNumber,
367        mt: &mut BlockMerkleTree,
368    ) -> anyhow::Result<()> {
369        (**self)
370            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
371            .await
372    }
373
374    async fn remember_blocks_merkle_tree(
375        &self,
376        instance: &NodeState,
377        height: u64,
378        view: ViewNumber,
379        mt: &mut BlockMerkleTree,
380    ) -> anyhow::Result<()> {
381        (**self)
382            .remember_blocks_merkle_tree(instance, height, view, mt)
383            .await
384    }
385
386    async fn try_fetch_chain_config(
387        &self,
388        retry: usize,
389        commitment: Commitment<ChainConfig>,
390    ) -> anyhow::Result<ChainConfig> {
391        (**self).try_fetch_chain_config(retry, commitment).await
392    }
393
394    async fn fetch_chain_config(
395        &self,
396        commitment: Commitment<ChainConfig>,
397    ) -> anyhow::Result<ChainConfig> {
398        (**self).fetch_chain_config(commitment).await
399    }
400
401    async fn try_fetch_reward_accounts_v2(
402        &self,
403        retry: usize,
404        instance: &NodeState,
405        height: u64,
406        view: ViewNumber,
407        reward_merkle_tree_root: RewardMerkleCommitmentV2,
408        accounts: &[RewardAccountV2],
409    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
410        (**self)
411            .try_fetch_reward_accounts_v2(
412                retry,
413                instance,
414                height,
415                view,
416                reward_merkle_tree_root,
417                accounts,
418            )
419            .await
420    }
421
422    async fn fetch_reward_accounts_v2(
423        &self,
424        instance: &NodeState,
425        height: u64,
426        view: ViewNumber,
427        reward_merkle_tree_root: RewardMerkleCommitmentV2,
428        accounts: Vec<RewardAccountV2>,
429    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
430        (**self)
431            .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
432            .await
433    }
434
435    async fn try_fetch_reward_accounts_v1(
436        &self,
437        retry: usize,
438        instance: &NodeState,
439        height: u64,
440        view: ViewNumber,
441        reward_merkle_tree_root: RewardMerkleCommitmentV1,
442        accounts: &[RewardAccountV1],
443    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
444        (**self)
445            .try_fetch_reward_accounts_v1(
446                retry,
447                instance,
448                height,
449                view,
450                reward_merkle_tree_root,
451                accounts,
452            )
453            .await
454    }
455
456    async fn fetch_reward_accounts_v1(
457        &self,
458        instance: &NodeState,
459        height: u64,
460        view: ViewNumber,
461        reward_merkle_tree_root: RewardMerkleCommitmentV1,
462        accounts: Vec<RewardAccountV1>,
463    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
464        (**self)
465            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
466            .await
467    }
468
469    async fn try_fetch_state_cert(
470        &self,
471        retry: usize,
472        epoch: u64,
473    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
474        (**self).try_fetch_state_cert(retry, epoch).await
475    }
476
477    async fn fetch_state_cert(
478        &self,
479        epoch: u64,
480    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
481        (**self).fetch_state_cert(epoch).await
482    }
483
484    fn backoff(&self) -> &BackoffParams {
485        (**self).backoff()
486    }
487
488    fn name(&self) -> String {
489        (**self).name()
490    }
491
492    fn is_local(&self) -> bool {
493        (**self).is_local()
494    }
495}
496
#[async_trait]
/// Options from which a persistence backend can be created or reset.
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`create`](Self::create).
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention to `view_retention`.
    ///
    /// NOTE(review): presumably the number of views of data kept before garbage collection --
    /// confirm against the implementors.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create a [`Self::Persistence`] from these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Consume the options and reset the storage they describe.
    ///
    /// NOTE(review): presumably this deletes all persisted data -- confirm against the
    /// implementors.
    async fn reset(self) -> anyhow::Result<()>;
}
505
/// Describes how much of a queried block range was covered by a read of persisted events.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
pub enum EventsPersistenceRead {
    /// The persistence returned events up to the requested block.
    Complete,
    /// The persistence only returned events up to the contained (last processed) L1 block.
    UntilL1Block(u64),
}
513
#[async_trait]
/// Trait used by `Memberships` implementations to interact with persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load stake table for epoch from storage
    ///
    /// On success yields the validator map together with the optional block reward and the
    /// optional stake table hash, or `None`.
    /// NOTE(review): `None` presumably means nothing is stored for `epoch` -- confirm.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;

    /// Load stake tables from storage for the latest `limit` known epochs
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store stake table at `epoch` in the persistence layer
    ///
    /// `block_reward` and `stake_table_hash` are optional and stored alongside the stake table.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table events in the persistence layer.
    ///
    /// NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which `events`
    /// were collected -- confirm against the callers.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    /// Load stake table events from the persistence layer.
    ///
    /// Returns the loaded events together with an optional [`EventsPersistenceRead`] describing
    /// how much of the queried range they cover.
    /// NOTE(review): presumably `l1_finalized` is the upper bound of the query and a `None`
    /// read state means no events are stored -- confirm against the implementors.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Store the full set of validators for `epoch`, keyed by address.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, Validator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load validators stored for `epoch`.
    ///
    /// NOTE(review): `offset` and `limit` presumably page through the stored validators (skip
    /// `offset`, return at most `limit`) -- confirm against the implementors.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<Validator<PubKey>>>;
}
561
562#[async_trait]
563pub trait SequencerPersistence:
564    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
565{
566    /// Use this storage as a state catchup backend, if supported.
567    fn into_catchup_provider(
568        self,
569        _backoff: BackoffParams,
570    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
571        bail!("state catchup is not implemented for this persistence type");
572    }
573
    /// Load the orchestrator config from storage.
    ///
    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
    /// `Err` if it could not be determined whether a config exists or not.
    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;

    /// Save the orchestrator config to storage.
    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;

    /// Load the highest view for which an action has been persisted, if any.
    ///
    /// NOTE(review): presumably these views are recorded through
    /// [`record_action`](Self::record_action) -- confirm.
    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;

    /// Load the view to restart from.
    ///
    /// `None` means no restart view has been saved.
    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
588
    /// Load the proposals saved by consensus, keyed by view number.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;

    /// Load the quorum proposal saved for `view`.
    ///
    /// NOTE(review): since the return type is not optional, a missing proposal is presumably
    /// reported as an error -- confirm against the implementors.
    async fn load_quorum_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;

    /// Load the VID share saved for `view`, if any.
    async fn load_vid_share(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
    /// Load the DA proposal saved for `view`, if any.
    async fn load_da_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
    /// Load the saved upgrade certificate, if any.
    async fn load_upgrade_certificate(
        &self,
    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
    /// Load the saved epoch information handed to the HotShot initializer as `start_epoch_info`.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
    /// Load the saved light client state update certificate, if any.
    ///
    /// NOTE(review): presumably the most recent one -- confirm against the implementors.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Get a state certificate for an epoch.
    ///
    /// Returns `None` when there is no certificate for `epoch`.
    async fn get_state_cert_by_epoch(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Insert a state certificate for a given epoch.
    async fn insert_state_cert(
        &self,
        epoch: u64,
        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()>;
627
628    /// Load the latest known consensus state.
629    ///
630    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
631    /// if there is no saved state). Also returns the anchor view number, which can be used as a
632    /// reference point to process any events which were not processed before a previous shutdown,
633    /// if applicable,.
634    async fn load_consensus_state<V: Versions>(
635        &self,
636        state: NodeState,
637    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
638        let genesis_validated_state = ValidatedState::genesis(&state).0;
639        let highest_voted_view = match self
640            .load_latest_acted_view()
641            .await
642            .context("loading last voted view")?
643        {
644            Some(view) => {
645                tracing::info!(?view, "starting with last actioned view");
646                view
647            },
648            None => {
649                tracing::info!("no saved view, starting from genesis");
650                ViewNumber::genesis()
651            },
652        };
653
654        let restart_view = match self
655            .load_restart_view()
656            .await
657            .context("loading restart view")?
658        {
659            Some(view) => {
660                tracing::info!(?view, "starting from saved view");
661                view
662            },
663            None => {
664                tracing::info!("no saved view, starting from genesis");
665                ViewNumber::genesis()
666            },
667        };
668        let next_epoch_high_qc = self
669            .load_next_epoch_quorum_certificate()
670            .await
671            .context("loading next epoch qc")?;
672        let (leaf, mut high_qc, anchor_view) = match self
673            .load_anchor_leaf()
674            .await
675            .context("loading anchor leaf")?
676        {
677            Some((leaf, high_qc)) => {
678                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
679                ensure!(
680                    leaf.view_number() == high_qc.view_number,
681                    format!(
682                        "loaded anchor leaf from view {}, but high QC is from view {}",
683                        leaf.view_number(),
684                        high_qc.view_number
685                    )
686                );
687
688                let anchor_view = leaf.view_number();
689                (leaf, high_qc, Some(anchor_view))
690            },
691            None => {
692                tracing::info!("no saved leaf, starting from genesis leaf");
693                (
694                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
695                        .await,
696                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
697                    None,
698                )
699            },
700        };
701
702        if let Some((extended_high_qc, _)) = self.load_eqc().await {
703            if extended_high_qc.view_number() > high_qc.view_number() {
704                high_qc = extended_high_qc
705            }
706        }
707
708        let validated_state = if leaf.block_header().height() == 0 {
709            // If we are starting from genesis, we can provide the full state.
710            genesis_validated_state
711        } else {
712            // Otherwise, we will have to construct a sparse state and fetch missing data during
713            // catchup.
714            ValidatedState::from_header(leaf.block_header())
715        };
716
717        // If we are not starting from genesis, we start from the view following the maximum view
718        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
719        // starting in a view in which we had already voted before the restart, and prevents
720        // unnecessary catchup from starting in a view earlier than the anchor leaf.
721        let restart_view = max(restart_view, leaf.view_number());
722        // TODO:
723        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
724
725        let config = self.load_config().await.context("loading config")?;
726        let epoch_height = config
727            .as_ref()
728            .map(|c| c.config.epoch_height)
729            .unwrap_or_default();
730        let epoch_start_block = config
731            .as_ref()
732            .map(|c| c.config.epoch_start_block)
733            .unwrap_or_default();
734
735        let saved_proposals = self
736            .load_quorum_proposals()
737            .await
738            .context("loading saved proposals")?;
739
740        let upgrade_certificate = self
741            .load_upgrade_certificate()
742            .await
743            .context("loading upgrade certificate")?;
744
745        let start_epoch_info = self
746            .load_start_epoch_info()
747            .await
748            .context("loading start epoch info")?;
749
750        let state_cert = self
751            .load_state_cert()
752            .await
753            .context("loading light client state update certificate")?;
754
755        tracing::warn!(
756            ?leaf,
757            ?restart_view,
758            ?epoch,
759            ?high_qc,
760            ?validated_state,
761            ?state_cert,
762            "loaded consensus state"
763        );
764
765        Ok((
766            HotShotInitializer {
767                instance_state: state,
768                epoch_height,
769                epoch_start_block,
770                anchor_leaf: leaf,
771                anchor_state: Arc::new(validated_state),
772                anchor_state_delta: None,
773                start_view: restart_view,
774                start_epoch: epoch,
775                last_actioned_view: highest_voted_view,
776                saved_proposals,
777                high_qc,
778                next_epoch_high_qc,
779                decided_upgrade_certificate: upgrade_certificate,
780                undecided_leaves: Default::default(),
781                undecided_state: Default::default(),
782                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
783                start_epoch_info,
784                state_cert,
785            },
786            anchor_view,
787        ))
788    }
789
790    /// Update storage based on an event from consensus.
791    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
792        if let EventType::Decide {
793            leaf_chain,
794            committing_qc,
795            deciding_qc,
796            ..
797        } = &event.event
798        {
799            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
800                // No new leaves.
801                return;
802            };
803
804            // Associate each decided leaf with a QC.
805            let chain = leaf_chain.iter().zip(
806                // The first (most recent) leaf corresponds to the QC triggering the decide event.
807                std::iter::once((**committing_qc).clone())
808                    // Moving backwards in the chain, each leaf corresponds with the subsequent
809                    // leaf's justify QC.
810                    .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
811            );
812
813            if let Err(err) = self
814                .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
815                .await
816            {
817                tracing::error!(
818                    "failed to save decided leaves, chain may not be up to date: {err:#}"
819                );
820                return;
821            }
822        }
823    }
824
    /// Append decided leaves to persistent storage and emit a corresponding event.
    ///
    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
    /// up to and including `view`. If available in persistent storage, full block payloads and VID
    /// info will also be included for each leaf.
    ///
    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
    /// collected. The consumer's handling of this event is a prerequisite for the completion of
    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
    /// will eventually be passed to the consumer.
    ///
    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
    /// processed twice, or the consumer may get two events which share a subset of their data.
    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
    /// idempotent.
    ///
    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
    /// until the next decide, at which point all persisted leaves from the failed GC run will be
    /// included in the event along with subsequently decided leaves.
    ///
    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
    /// storage to long-term archival storage.
    ///
    /// NOTE(review): `view` above presumably refers to the `decided_view` parameter -- confirm.
    async fn append_decided_leaves(
        &self,
        decided_view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()>;
857
858    async fn load_anchor_leaf(
859        &self,
860    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
861    async fn append_vid(
862        &self,
863        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
864    ) -> anyhow::Result<()>;
865    // TODO: merge these two `append_vid`s
866    async fn append_vid2(
867        &self,
868        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
869    ) -> anyhow::Result<()>;
870    async fn append_da(
871        &self,
872        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
873        vid_commit: VidCommitment,
874    ) -> anyhow::Result<()>;
875    async fn record_action(
876        &self,
877        view: ViewNumber,
878        epoch: Option<EpochNumber>,
879        action: HotShotAction,
880    ) -> anyhow::Result<()>;
881
882    async fn append_quorum_proposal2(
883        &self,
884        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
885    ) -> anyhow::Result<()>;
886
887    /// Update the current eQC in storage.
888    async fn store_eqc(
889        &self,
890        _high_qc: QuorumCertificate2<SeqTypes>,
891        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
892    ) -> anyhow::Result<()>;
893
894    /// Load the current eQC from storage.
895    async fn load_eqc(
896        &self,
897    ) -> Option<(
898        QuorumCertificate2<SeqTypes>,
899        NextEpochQuorumCertificate2<SeqTypes>,
900    )>;
901
902    async fn store_upgrade_certificate(
903        &self,
904        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
905    ) -> anyhow::Result<()>;
906
907    async fn migrate_stake_table_events(&self) -> anyhow::Result<()>;
908
909    async fn migrate_storage(&self) -> anyhow::Result<()> {
910        tracing::warn!("migrating consensus data...");
911
912        self.migrate_anchor_leaf().await?;
913        self.migrate_da_proposals().await?;
914        self.migrate_vid_shares().await?;
915        self.migrate_quorum_proposals().await?;
916        self.migrate_quorum_certificates().await?;
917
918        tracing::warn!("consensus storage has been migrated to new types");
919
920        tracing::warn!("migrating stake table events");
921        self.migrate_stake_table_events().await?;
922        tracing::warn!("stake table events have been migrated");
923
924        Ok(())
925    }
926
927    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
928    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
929    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
930    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
931    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
932
933    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
934        match self.load_anchor_leaf().await? {
935            Some((leaf, _)) => Ok(leaf.view_number()),
936            None => Ok(ViewNumber::genesis()),
937        }
938    }
939
940    async fn store_next_epoch_quorum_certificate(
941        &self,
942        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
943    ) -> anyhow::Result<()>;
944
945    async fn load_next_epoch_quorum_certificate(
946        &self,
947    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
948
949    async fn append_da2(
950        &self,
951        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
952        vid_commit: VidCommitment,
953    ) -> anyhow::Result<()>;
954
955    async fn append_proposal2(
956        &self,
957        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
958    ) -> anyhow::Result<()> {
959        self.append_quorum_proposal2(proposal).await
960    }
961
962    async fn store_drb_result(
963        &self,
964        epoch: <SeqTypes as NodeType>::Epoch,
965        drb_result: DrbResult,
966    ) -> anyhow::Result<()>;
967    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
968    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
969    async fn store_epoch_root(
970        &self,
971        epoch: <SeqTypes as NodeType>::Epoch,
972        block_header: <SeqTypes as NodeType>::BlockHeader,
973    ) -> anyhow::Result<()>;
974    async fn add_state_cert(
975        &self,
976        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
977    ) -> anyhow::Result<()>;
978
979    fn enable_metrics(&mut self, metrics: &dyn Metrics);
980}
981
982#[async_trait]
983pub trait EventConsumer: Debug + Send + Sync {
984    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
985}
986
987#[async_trait]
988impl<T> EventConsumer for Box<T>
989where
990    T: EventConsumer + ?Sized,
991{
992    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
993        (**self).handle_event(event).await
994    }
995}
996
/// A unit type that serves as an event consumer which ignores every event.
#[derive(Debug, Clone, Copy)]
pub struct NullEventConsumer;
999
1000#[async_trait]
1001impl EventConsumer for NullEventConsumer {
1002    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
1003        Ok(())
1004    }
1005}
1006
1007#[async_trait]
1008impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1009    async fn append_vid(
1010        &self,
1011        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
1012    ) -> anyhow::Result<()> {
1013        (**self).append_vid(proposal).await
1014    }
1015
1016    async fn append_vid2(
1017        &self,
1018        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
1019    ) -> anyhow::Result<()> {
1020        (**self).append_vid2(proposal).await
1021    }
1022
1023    async fn append_da(
1024        &self,
1025        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1026        vid_commit: VidCommitment,
1027    ) -> anyhow::Result<()> {
1028        (**self).append_da(proposal, vid_commit).await
1029    }
1030
1031    async fn append_da2(
1032        &self,
1033        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1034        vid_commit: VidCommitment,
1035    ) -> anyhow::Result<()> {
1036        (**self).append_da2(proposal, vid_commit).await
1037    }
1038
1039    async fn record_action(
1040        &self,
1041        view: ViewNumber,
1042        epoch: Option<EpochNumber>,
1043        action: HotShotAction,
1044    ) -> anyhow::Result<()> {
1045        (**self).record_action(view, epoch, action).await
1046    }
1047
1048    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1049        Ok(())
1050    }
1051
1052    async fn append_proposal(
1053        &self,
1054        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1055    ) -> anyhow::Result<()> {
1056        (**self)
1057            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1058            .await
1059    }
1060
1061    async fn append_proposal2(
1062        &self,
1063        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1064    ) -> anyhow::Result<()> {
1065        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1066            convert_proposal(proposal.clone());
1067        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1068    }
1069
1070    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1071        Ok(())
1072    }
1073
1074    /// Update the current eQC in storage.
1075    async fn update_eqc(
1076        &self,
1077        high_qc: QuorumCertificate2<SeqTypes>,
1078        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1079    ) -> anyhow::Result<()> {
1080        if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1081            if high_qc.view_number() < existing_high_qc.view_number() {
1082                return Ok(());
1083            }
1084        }
1085
1086        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1087    }
1088
1089    async fn update_next_epoch_high_qc2(
1090        &self,
1091        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1092    ) -> anyhow::Result<()> {
1093        Ok(())
1094    }
1095
1096    async fn update_decided_upgrade_certificate(
1097        &self,
1098        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1099    ) -> anyhow::Result<()> {
1100        (**self)
1101            .store_upgrade_certificate(decided_upgrade_certificate)
1102            .await
1103    }
1104
1105    async fn store_drb_result(
1106        &self,
1107        epoch: <SeqTypes as NodeType>::Epoch,
1108        drb_result: DrbResult,
1109    ) -> anyhow::Result<()> {
1110        (**self).store_drb_result(epoch, drb_result).await
1111    }
1112
1113    async fn store_epoch_root(
1114        &self,
1115        epoch: <SeqTypes as NodeType>::Epoch,
1116        block_header: <SeqTypes as NodeType>::BlockHeader,
1117    ) -> anyhow::Result<()> {
1118        (**self).store_epoch_root(epoch, block_header).await
1119    }
1120
1121    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1122        (**self).store_drb_input(drb_input).await
1123    }
1124
1125    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1126        (**self).load_drb_input(epoch).await
1127    }
1128
1129    async fn update_state_cert(
1130        &self,
1131        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1132    ) -> anyhow::Result<()> {
1133        (**self).add_state_cert(state_cert).await
1134    }
1135}
1136
/// A value that can be read out of a subslice of namespace payload bytes.
///
/// This is one half of a pair: [`NsPayloadBytesRange`] says *which* bytes of the namespace payload
/// to read, and this trait turns those bytes into a value. The lifetime `'a` allows the value to
/// borrow from the input slice.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from the given namespace payload bytes.
    ///
    /// The signature is infallible: there is no `Result` or `Option` in the return type.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1145
1146/// Specifies a subslice of namespace payload bytes to read.
1147///
1148/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1149/// deserialized from that subslice of bytes.
1150pub trait NsPayloadBytesRange<'a> {
1151    type Output: FromNsPayloadBytes<'a>;
1152
1153    /// Range relative to this ns payload
1154    fn ns_payload_range(&self) -> Range<usize>;
1155}
1156
1157/// Types which can be deserialized from either integers or strings.
1158///
1159/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1160/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1161/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1162/// to derive this user-friendly serialization.
1163///
1164/// These types are assumed to have an efficient representation as an integral type in Rust --
1165/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1166/// encoding. With human readable encodings, serialization is always to a string.
1167pub trait FromStringOrInteger: Sized {
1168    type Binary: Serialize + DeserializeOwned;
1169    type Integer: Serialize + DeserializeOwned;
1170
1171    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1172    fn from_string(s: String) -> anyhow::Result<Self>;
1173    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1174
1175    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1176    fn to_string(&self) -> anyhow::Result<String>;
1177}