espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        LightClientStateUpdateCertificate, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33};
34use serde::{de::DeserializeOwned, Serialize};
35
36use super::{
37    impls::NodeState,
38    utils::BackoffParams,
39    v0_1::{RewardAccount, RewardAccountProof, RewardMerkleCommitment},
40    v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43    v0::impls::ValidatedState, v0_3::ChainConfig, BlockMerkleTree, Event, FeeAccount,
44    FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig, SeqTypes, ValidatorMap,
45};
46
47#[async_trait]
48pub trait StateCatchup: Send + Sync {
49    /// Fetch the leaf at the given height without retrying on transient errors.
50    async fn try_fetch_leaf(
51        &self,
52        retry: usize,
53        height: u64,
54        stake_table: HSStakeTable<SeqTypes>,
55        success_threshold: U256,
56    ) -> anyhow::Result<Leaf2>;
57
58    /// Fetch the leaf at the given height, retrying on transient errors.
59    async fn fetch_leaf(
60        &self,
61        height: u64,
62        stake_table: HSStakeTable<SeqTypes>,
63        success_threshold: U256,
64    ) -> anyhow::Result<Leaf2> {
65        self.backoff()
66            .retry(self, |provider, retry| {
67                let stake_table_clone = stake_table.clone();
68                async move {
69                    provider
70                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
71                        .await
72                }
73                .boxed()
74            })
75            .await
76    }
77
78    /// Fetch the given list of accounts without retrying on transient errors.
79    async fn try_fetch_accounts(
80        &self,
81        retry: usize,
82        instance: &NodeState,
83        height: u64,
84        view: ViewNumber,
85        fee_merkle_tree_root: FeeMerkleCommitment,
86        accounts: &[FeeAccount],
87    ) -> anyhow::Result<Vec<FeeAccountProof>>;
88
89    /// Fetch the given list of accounts, retrying on transient errors.
90    async fn fetch_accounts(
91        &self,
92        instance: &NodeState,
93        height: u64,
94        view: ViewNumber,
95        fee_merkle_tree_root: FeeMerkleCommitment,
96        accounts: Vec<FeeAccount>,
97    ) -> anyhow::Result<Vec<FeeAccountProof>> {
98        self.backoff()
99            .retry(self, |provider, retry| {
100                let accounts = &accounts;
101                async move {
102                    provider
103                        .try_fetch_accounts(
104                            retry,
105                            instance,
106                            height,
107                            view,
108                            fee_merkle_tree_root,
109                            accounts,
110                        )
111                        .await
112                        .map_err(|err| {
113                            err.context(format!(
114                                "fetching accounts {accounts:?}, height {height}, view {view}"
115                            ))
116                        })
117                }
118                .boxed()
119            })
120            .await
121    }
122
123    /// Fetch and remember the blocks frontier without retrying on transient errors.
124    async fn try_remember_blocks_merkle_tree(
125        &self,
126        retry: usize,
127        instance: &NodeState,
128        height: u64,
129        view: ViewNumber,
130        mt: &mut BlockMerkleTree,
131    ) -> anyhow::Result<()>;
132
133    /// Fetch and remember the blocks frontier, retrying on transient errors.
134    async fn remember_blocks_merkle_tree(
135        &self,
136        instance: &NodeState,
137        height: u64,
138        view: ViewNumber,
139        mt: &mut BlockMerkleTree,
140    ) -> anyhow::Result<()> {
141        self.backoff()
142            .retry(mt, |mt, retry| {
143                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
144                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
145                    .boxed()
146            })
147            .await
148    }
149
150    /// Fetch the chain config without retrying on transient errors.
151    async fn try_fetch_chain_config(
152        &self,
153        retry: usize,
154        commitment: Commitment<ChainConfig>,
155    ) -> anyhow::Result<ChainConfig>;
156
157    /// Fetch the chain config, retrying on transient errors.
158    async fn fetch_chain_config(
159        &self,
160        commitment: Commitment<ChainConfig>,
161    ) -> anyhow::Result<ChainConfig> {
162        self.backoff()
163            .retry(self, |provider, retry| {
164                provider
165                    .try_fetch_chain_config(retry, commitment)
166                    .map_err(|err| err.context("fetching chain config"))
167                    .boxed()
168            })
169            .await
170    }
171
172    /// Fetch the given list of reward accounts without retrying on transient errors.
173    async fn try_fetch_reward_accounts(
174        &self,
175        retry: usize,
176        instance: &NodeState,
177        height: u64,
178        view: ViewNumber,
179        reward_merkle_tree_root: RewardMerkleCommitment,
180        accounts: &[RewardAccount],
181    ) -> anyhow::Result<Vec<RewardAccountProof>>;
182
183    /// Fetch the given list of reward accounts, retrying on transient errors.
184    async fn fetch_reward_accounts(
185        &self,
186        instance: &NodeState,
187        height: u64,
188        view: ViewNumber,
189        reward_merkle_tree_root: RewardMerkleCommitment,
190        accounts: Vec<RewardAccount>,
191    ) -> anyhow::Result<Vec<RewardAccountProof>> {
192        self.backoff()
193            .retry(self, |provider, retry| {
194                let accounts = &accounts;
195                async move {
196                    provider
197                        .try_fetch_reward_accounts(
198                            retry,
199                            instance,
200                            height,
201                            view,
202                            reward_merkle_tree_root,
203                            accounts,
204                        )
205                        .await
206                        .map_err(|err| {
207                            err.context(format!(
208                                "fetching reward accounts {accounts:?}, height {height}, view \
209                                 {view}"
210                            ))
211                        })
212                }
213                .boxed()
214            })
215            .await
216    }
217
218    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
219    fn is_local(&self) -> bool;
220
221    /// Returns the backoff parameters for the catchup provider.
222    fn backoff(&self) -> &BackoffParams;
223
224    /// Returns the name of the catchup provider.
225    fn name(&self) -> String;
226}
227
228#[async_trait]
229impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
230    async fn try_fetch_leaf(
231        &self,
232        retry: usize,
233        height: u64,
234        stake_table: HSStakeTable<SeqTypes>,
235        success_threshold: U256,
236    ) -> anyhow::Result<Leaf2> {
237        (**self)
238            .try_fetch_leaf(retry, height, stake_table, success_threshold)
239            .await
240    }
241
242    async fn fetch_leaf(
243        &self,
244        height: u64,
245        stake_table: HSStakeTable<SeqTypes>,
246        success_threshold: U256,
247    ) -> anyhow::Result<Leaf2> {
248        (**self)
249            .fetch_leaf(height, stake_table, success_threshold)
250            .await
251    }
252    async fn try_fetch_accounts(
253        &self,
254        retry: usize,
255        instance: &NodeState,
256        height: u64,
257        view: ViewNumber,
258        fee_merkle_tree_root: FeeMerkleCommitment,
259        accounts: &[FeeAccount],
260    ) -> anyhow::Result<Vec<FeeAccountProof>> {
261        (**self)
262            .try_fetch_accounts(
263                retry,
264                instance,
265                height,
266                view,
267                fee_merkle_tree_root,
268                accounts,
269            )
270            .await
271    }
272
273    async fn fetch_accounts(
274        &self,
275        instance: &NodeState,
276        height: u64,
277        view: ViewNumber,
278        fee_merkle_tree_root: FeeMerkleCommitment,
279        accounts: Vec<FeeAccount>,
280    ) -> anyhow::Result<Vec<FeeAccountProof>> {
281        (**self)
282            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
283            .await
284    }
285
286    async fn try_remember_blocks_merkle_tree(
287        &self,
288        retry: usize,
289        instance: &NodeState,
290        height: u64,
291        view: ViewNumber,
292        mt: &mut BlockMerkleTree,
293    ) -> anyhow::Result<()> {
294        (**self)
295            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
296            .await
297    }
298
299    async fn remember_blocks_merkle_tree(
300        &self,
301        instance: &NodeState,
302        height: u64,
303        view: ViewNumber,
304        mt: &mut BlockMerkleTree,
305    ) -> anyhow::Result<()> {
306        (**self)
307            .remember_blocks_merkle_tree(instance, height, view, mt)
308            .await
309    }
310
311    async fn try_fetch_chain_config(
312        &self,
313        retry: usize,
314        commitment: Commitment<ChainConfig>,
315    ) -> anyhow::Result<ChainConfig> {
316        (**self).try_fetch_chain_config(retry, commitment).await
317    }
318
319    async fn fetch_chain_config(
320        &self,
321        commitment: Commitment<ChainConfig>,
322    ) -> anyhow::Result<ChainConfig> {
323        (**self).fetch_chain_config(commitment).await
324    }
325
326    async fn try_fetch_reward_accounts(
327        &self,
328        retry: usize,
329        instance: &NodeState,
330        height: u64,
331        view: ViewNumber,
332        reward_merkle_tree_root: RewardMerkleCommitment,
333        accounts: &[RewardAccount],
334    ) -> anyhow::Result<Vec<RewardAccountProof>> {
335        (**self)
336            .try_fetch_reward_accounts(
337                retry,
338                instance,
339                height,
340                view,
341                reward_merkle_tree_root,
342                accounts,
343            )
344            .await
345    }
346
347    async fn fetch_reward_accounts(
348        &self,
349        instance: &NodeState,
350        height: u64,
351        view: ViewNumber,
352        reward_merkle_tree_root: RewardMerkleCommitment,
353        accounts: Vec<RewardAccount>,
354    ) -> anyhow::Result<Vec<RewardAccountProof>> {
355        (**self)
356            .fetch_reward_accounts(instance, height, view, reward_merkle_tree_root, accounts)
357            .await
358    }
359
360    fn backoff(&self) -> &BackoffParams {
361        (**self).backoff()
362    }
363
364    fn name(&self) -> String {
365        (**self).name()
366    }
367
368    fn is_local(&self) -> bool {
369        (**self).is_local()
370    }
371}
372
/// Options from which a persistence backend can be created or reset.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`create`](Self::create). It must support both
    /// sequencer (consensus) storage and membership (stake table) storage.
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention on these options.
    ///
    /// NOTE(review): presumably the number of views of data the created backend keeps before
    /// pruning -- confirm against implementations.
    fn set_view_retention(&mut self, view_retention: u64);

    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;

    /// Reset the storage described by these options, consuming the options.
    ///
    /// NOTE(review): presumably this discards all persisted data -- confirm against
    /// implementations.
    async fn reset(self) -> anyhow::Result<()>;
}
381
/// The read state of a stake table event query, determined from the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// The persistence returned events up to the requested block.
    Complete,
    /// The persistence returned events only up to the contained (last processed) L1 block.
    UntilL1Block(u64),
}
389
#[async_trait]
/// Trait used by `Memberships` implementations to interact with the persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage.
    ///
    /// The `Option` in the return type allows for an epoch with no stored stake table.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<ValidatorMap>>;

    /// Load stake tables from storage for the latest known epochs, bounded by `limit`.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store the stake table for `epoch` in the persistence layer.
    async fn store_stake(&self, epoch: EpochNumber, stake: ValidatorMap) -> anyhow::Result<()>;

    /// Store stake table events together with an L1 block number.
    ///
    /// NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which `events` were
    /// collected -- confirm against implementations.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;

    /// Load stored stake table events for a query bounded by `l1_finalized`.
    ///
    /// Returns the events along with an optional [`EventsPersistenceRead`] describing whether the
    /// read covered the requested block or only reached an earlier L1 block.
    ///
    /// NOTE(review): the meaning of a `None` read state is not visible here (presumably nothing
    /// has been stored yet) -- confirm against implementations.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;
}
415
416#[async_trait]
417pub trait SequencerPersistence:
418    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
419{
420    /// Use this storage as a state catchup backend, if supported.
421    fn into_catchup_provider(
422        self,
423        _backoff: BackoffParams,
424    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
425        bail!("state catchup is not implemented for this persistence type");
426    }
427
428    /// Load the orchestrator config from storage.
429    ///
430    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
431    /// `Err` if it could not be determined whether a config exists or not.
432    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
433
434    /// Save the orchestrator config to storage.
435    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
436
437    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
438    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
439
440    /// Load the view to restart from.
441    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
442
443    /// Load the proposals saved by consensus
444    async fn load_quorum_proposals(
445        &self,
446    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
447
448    async fn load_quorum_proposal(
449        &self,
450        view: ViewNumber,
451    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
452
453    async fn load_vid_share(
454        &self,
455        view: ViewNumber,
456    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
457    async fn load_da_proposal(
458        &self,
459        view: ViewNumber,
460    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
461    async fn load_upgrade_certificate(
462        &self,
463    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
464    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
465    async fn load_state_cert(
466        &self,
467    ) -> anyhow::Result<Option<LightClientStateUpdateCertificate<SeqTypes>>>;
468
469    /// Load the latest known consensus state.
470    ///
471    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
472    /// if there is no saved state). Also returns the anchor view number, which can be used as a
473    /// reference point to process any events which were not processed before a previous shutdown,
474    /// if applicable,.
475    async fn load_consensus_state<V: Versions>(
476        &self,
477        state: NodeState,
478    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
479        let genesis_validated_state = ValidatedState::genesis(&state).0;
480        let highest_voted_view = match self
481            .load_latest_acted_view()
482            .await
483            .context("loading last voted view")?
484        {
485            Some(view) => {
486                tracing::info!(?view, "starting with last actioned view");
487                view
488            },
489            None => {
490                tracing::info!("no saved view, starting from genesis");
491                ViewNumber::genesis()
492            },
493        };
494
495        let restart_view = match self
496            .load_restart_view()
497            .await
498            .context("loading restart view")?
499        {
500            Some(view) => {
501                tracing::info!(?view, "starting from saved view");
502                view
503            },
504            None => {
505                tracing::info!("no saved view, starting from genesis");
506                ViewNumber::genesis()
507            },
508        };
509        let next_epoch_high_qc = self
510            .load_next_epoch_quorum_certificate()
511            .await
512            .context("loading next epoch qc")?;
513        let (leaf, high_qc, anchor_view) = match self
514            .load_anchor_leaf()
515            .await
516            .context("loading anchor leaf")?
517        {
518            Some((leaf, high_qc)) => {
519                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
520                ensure!(
521                    leaf.view_number() == high_qc.view_number,
522                    format!(
523                        "loaded anchor leaf from view {}, but high QC is from view {}",
524                        leaf.view_number(),
525                        high_qc.view_number
526                    )
527                );
528
529                let anchor_view = leaf.view_number();
530                (leaf, high_qc, Some(anchor_view))
531            },
532            None => {
533                tracing::info!("no saved leaf, starting from genesis leaf");
534                (
535                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
536                        .await,
537                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
538                    None,
539                )
540            },
541        };
542        let validated_state = if leaf.block_header().height() == 0 {
543            // If we are starting from genesis, we can provide the full state.
544            genesis_validated_state
545        } else {
546            // Otherwise, we will have to construct a sparse state and fetch missing data during
547            // catchup.
548            ValidatedState::from_header(leaf.block_header())
549        };
550
551        // If we are not starting from genesis, we start from the view following the maximum view
552        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
553        // starting in a view in which we had already voted before the restart, and prevents
554        // unnecessary catchup from starting in a view earlier than the anchor leaf.
555        let restart_view = max(restart_view, leaf.view_number());
556        // TODO:
557        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
558
559        let config = self.load_config().await.context("loading config")?;
560        let epoch_height = config
561            .as_ref()
562            .map(|c| c.config.epoch_height)
563            .unwrap_or_default();
564        let epoch_start_block = config
565            .as_ref()
566            .map(|c| c.config.epoch_start_block)
567            .unwrap_or_default();
568
569        let saved_proposals = self
570            .load_quorum_proposals()
571            .await
572            .context("loading saved proposals")?;
573
574        let upgrade_certificate = self
575            .load_upgrade_certificate()
576            .await
577            .context("loading upgrade certificate")?;
578
579        let start_epoch_info = self
580            .load_start_epoch_info()
581            .await
582            .context("loading start epoch info")?;
583
584        let state_cert = self
585            .load_state_cert()
586            .await
587            .context("loading light client state update certificate")?;
588
589        tracing::info!(
590            ?leaf,
591            ?restart_view,
592            ?epoch,
593            ?high_qc,
594            ?validated_state,
595            ?state_cert,
596            "loaded consensus state"
597        );
598
599        Ok((
600            HotShotInitializer {
601                instance_state: state,
602                epoch_height,
603                epoch_start_block,
604                anchor_leaf: leaf,
605                anchor_state: Arc::new(validated_state),
606                anchor_state_delta: None,
607                start_view: restart_view,
608                start_epoch: epoch,
609                last_actioned_view: highest_voted_view,
610                saved_proposals,
611                high_qc,
612                next_epoch_high_qc,
613                decided_upgrade_certificate: upgrade_certificate,
614                undecided_leaves: Default::default(),
615                undecided_state: Default::default(),
616                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
617                start_epoch_info,
618                state_cert,
619            },
620            anchor_view,
621        ))
622    }
623
624    /// Update storage based on an event from consensus.
625    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
626        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
627            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
628                // No new leaves.
629                return;
630            };
631
632            // Associate each decided leaf with a QC.
633            let chain = leaf_chain.iter().zip(
634                // The first (most recent) leaf corresponds to the QC triggering the decide event.
635                std::iter::once((**qc).clone())
636                    // Moving backwards in the chain, each leaf corresponds with the subsequent
637                    // leaf's justify QC.
638                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
639            );
640
641            if let Err(err) = self
642                .append_decided_leaves(leaf.view_number(), chain, consumer)
643                .await
644            {
645                tracing::error!(
646                    "failed to save decided leaves, chain may not be up to date: {err:#}"
647                );
648                return;
649            }
650        }
651    }
652
653    /// Append decided leaves to persistent storage and emit a corresponding event.
654    ///
655    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
656    /// up to and including `view`. If available in persistent storage, full block payloads and VID
657    /// info will also be included for each leaf.
658    ///
659    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
660    /// collected The consumer's handling of this event is a prerequisite for the completion of
661    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
662    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
663    /// will eventually be passed to the consumer.
664    ///
665    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
666    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
667    /// processed twice, or the consumer may get two events which share a subset of their data.
668    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
669    /// idempotent.
670    ///
671    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
672    /// until the next decide, at which point all persisted leaves from the failed GC run will be
673    /// included in the event along with subsequently decided leaves.
674    ///
675    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
676    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
677    /// storage to long-term archival storage.
678    async fn append_decided_leaves(
679        &self,
680        decided_view: ViewNumber,
681        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
682        consumer: &(impl EventConsumer + 'static),
683    ) -> anyhow::Result<()>;
684
685    async fn load_anchor_leaf(
686        &self,
687    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
688    async fn append_vid(
689        &self,
690        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
691    ) -> anyhow::Result<()>;
692    // TODO: merge these two `append_vid`s
693    async fn append_vid2(
694        &self,
695        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
696    ) -> anyhow::Result<()>;
697    async fn append_da(
698        &self,
699        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
700        vid_commit: VidCommitment,
701    ) -> anyhow::Result<()>;
702    async fn record_action(
703        &self,
704        view: ViewNumber,
705        epoch: Option<EpochNumber>,
706        action: HotShotAction,
707    ) -> anyhow::Result<()>;
708
709    async fn append_quorum_proposal2(
710        &self,
711        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
712    ) -> anyhow::Result<()>;
713    async fn store_upgrade_certificate(
714        &self,
715        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
716    ) -> anyhow::Result<()>;
717    async fn migrate_consensus(&self) -> anyhow::Result<()> {
718        tracing::warn!("migrating consensus data...");
719
720        self.migrate_anchor_leaf().await?;
721        self.migrate_da_proposals().await?;
722        self.migrate_vid_shares().await?;
723        self.migrate_quorum_proposals().await?;
724        self.migrate_quorum_certificates().await?;
725
726        tracing::warn!("consensus storage has been migrated to new types");
727
728        Ok(())
729    }
730
731    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
732    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
733    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
734    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
735    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
736
737    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
738        match self.load_anchor_leaf().await? {
739            Some((leaf, _)) => Ok(leaf.view_number()),
740            None => Ok(ViewNumber::genesis()),
741        }
742    }
743
744    async fn store_next_epoch_quorum_certificate(
745        &self,
746        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
747    ) -> anyhow::Result<()>;
748
749    async fn load_next_epoch_quorum_certificate(
750        &self,
751    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
752
753    async fn append_da2(
754        &self,
755        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
756        vid_commit: VidCommitment,
757    ) -> anyhow::Result<()>;
758
759    async fn append_proposal2(
760        &self,
761        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
762    ) -> anyhow::Result<()> {
763        self.append_quorum_proposal2(proposal).await
764    }
765
766    async fn store_drb_result(
767        &self,
768        epoch: <SeqTypes as NodeType>::Epoch,
769        drb_result: DrbResult,
770    ) -> anyhow::Result<()>;
771    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
772    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
773    async fn store_epoch_root(
774        &self,
775        epoch: <SeqTypes as NodeType>::Epoch,
776        block_header: <SeqTypes as NodeType>::BlockHeader,
777    ) -> anyhow::Result<()>;
778    async fn add_state_cert(
779        &self,
780        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
781    ) -> anyhow::Result<()>;
782
783    fn enable_metrics(&mut self, metrics: &dyn Metrics);
784}
785
/// A consumer of consensus [`Event`]s.
///
/// In this file, consumers are handed to [`SequencerPersistence::handle_event`] and
/// [`SequencerPersistence::append_decided_leaves`].
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Handle one event, returning an error if it could not be processed.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
790
791#[async_trait]
792impl<T> EventConsumer for Box<T>
793where
794    T: EventConsumer + ?Sized,
795{
796    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
797        (**self).handle_event(event).await
798    }
799}
800
/// An event consumer that does nothing with the events it is given.
#[derive(Debug, Copy, Clone)]
pub struct NullEventConsumer;
803
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Ignores the event and always reports success.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
810
811#[async_trait]
812impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
813    async fn append_vid(
814        &self,
815        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
816    ) -> anyhow::Result<()> {
817        (**self).append_vid(proposal).await
818    }
819
820    async fn append_vid2(
821        &self,
822        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
823    ) -> anyhow::Result<()> {
824        (**self).append_vid2(proposal).await
825    }
826
827    async fn append_da(
828        &self,
829        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
830        vid_commit: VidCommitment,
831    ) -> anyhow::Result<()> {
832        (**self).append_da(proposal, vid_commit).await
833    }
834
835    async fn append_da2(
836        &self,
837        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
838        vid_commit: VidCommitment,
839    ) -> anyhow::Result<()> {
840        (**self).append_da2(proposal, vid_commit).await
841    }
842
843    async fn record_action(
844        &self,
845        view: ViewNumber,
846        epoch: Option<EpochNumber>,
847        action: HotShotAction,
848    ) -> anyhow::Result<()> {
849        (**self).record_action(view, epoch, action).await
850    }
851
852    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
853        Ok(())
854    }
855
856    async fn append_proposal(
857        &self,
858        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
859    ) -> anyhow::Result<()> {
860        (**self)
861            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
862            .await
863    }
864
865    async fn append_proposal2(
866        &self,
867        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
868    ) -> anyhow::Result<()> {
869        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
870            convert_proposal(proposal.clone());
871        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
872    }
873
874    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
875        Ok(())
876    }
877
878    async fn update_decided_upgrade_certificate(
879        &self,
880        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
881    ) -> anyhow::Result<()> {
882        (**self)
883            .store_upgrade_certificate(decided_upgrade_certificate)
884            .await
885    }
886
887    async fn store_drb_result(
888        &self,
889        epoch: <SeqTypes as NodeType>::Epoch,
890        drb_result: DrbResult,
891    ) -> anyhow::Result<()> {
892        (**self).store_drb_result(epoch, drb_result).await
893    }
894
895    async fn store_epoch_root(
896        &self,
897        epoch: <SeqTypes as NodeType>::Epoch,
898        block_header: <SeqTypes as NodeType>::BlockHeader,
899    ) -> anyhow::Result<()> {
900        (**self).store_epoch_root(epoch, block_header).await
901    }
902
903    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
904        (**self).store_drb_input(drb_input).await
905    }
906
907    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
908        (**self).load_drb_input(epoch).await
909    }
910
911    async fn update_state_cert(
912        &self,
913        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
914    ) -> anyhow::Result<()> {
915        (**self).add_state_cert(state_cert).await
916    }
917}
918
/// A value that can be decoded from a subslice of a namespace's payload bytes.
///
/// This is the companion of [`NsPayloadBytesRange`], which identifies the subslice of namespace
/// payload bytes to read.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from `bytes`, a slice of namespace payload bytes.
    ///
    /// The return type is `Self` rather than a `Result`, so the signature offers no way to report
    /// a decoding error. The result may borrow from `bytes` for the lifetime `'a`.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
927
928/// Specifies a subslice of namespace payload bytes to read.
929///
930/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
931/// deserialized from that subslice of bytes.
932pub trait NsPayloadBytesRange<'a> {
933    type Output: FromNsPayloadBytes<'a>;
934
935    /// Range relative to this ns payload
936    fn ns_payload_range(&self) -> Range<usize>;
937}
938
939/// Types which can be deserialized from either integers or strings.
940///
941/// Some types can be represented as an integer or a string in human-readable formats like JSON or
942/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
943/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
944/// to derive this user-friendly serialization.
945///
946/// These types are assumed to have an efficient representation as an integral type in Rust --
947/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
948/// encoding. With human readable encodings, serialization is always to a string.
949pub trait FromStringOrInteger: Sized {
950    type Binary: Serialize + DeserializeOwned;
951    type Integer: Serialize + DeserializeOwned;
952
953    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
954    fn from_string(s: String) -> anyhow::Result<Self>;
955    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
956
957    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
958    fn to_string(&self) -> anyhow::Result<String>;
959}