espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{
11    types::{BLSPubKey, EventType},
12    HotShotInitializer, InitializerEpochInfo,
13};
14use hotshot_types::{
15    data::{
16        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
17        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
18        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
19    },
20    drb::DrbResult,
21    event::{HotShotAction, LeafInfo},
22    message::{convert_proposal, Proposal},
23    simple_certificate::{
24        LightClientStateUpdateCertificate, NextEpochQuorumCertificate2, QuorumCertificate,
25        QuorumCertificate2, UpgradeCertificate,
26    },
27    stake_table::HSStakeTable,
28    traits::{
29        node_implementation::{ConsensusTime, NodeType, Versions},
30        storage::Storage,
31        ValidatedState as HotShotState,
32    },
33    utils::genesis_epoch_from_version,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39    impls::NodeState,
40    utils::BackoffParams,
41    v0_1::{RewardAccount, RewardAccountProof, RewardMerkleCommitment},
42    v0_3::{EventKey, IndexedStake, StakeTableEvent, Validator},
43};
44use crate::{
45    v0::impls::ValidatedState, v0_99::ChainConfig, BlockMerkleTree, Event, FeeAccount,
46    FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig, SeqTypes,
47};
48
49#[async_trait]
50pub trait StateCatchup: Send + Sync {
51    /// Fetch the leaf at the given height without retrying on transient errors.
52    async fn try_fetch_leaf(
53        &self,
54        retry: usize,
55        height: u64,
56        stake_table: HSStakeTable<SeqTypes>,
57        success_threshold: U256,
58    ) -> anyhow::Result<Leaf2>;
59
60    /// Fetch the leaf at the given height, retrying on transient errors.
61    async fn fetch_leaf(
62        &self,
63        height: u64,
64        stake_table: HSStakeTable<SeqTypes>,
65        success_threshold: U256,
66    ) -> anyhow::Result<Leaf2> {
67        self.backoff()
68            .retry(self, |provider, retry| {
69                let stake_table_clone = stake_table.clone();
70                async move {
71                    provider
72                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
73                        .await
74                }
75                .boxed()
76            })
77            .await
78    }
79
80    /// Fetch the given list of accounts without retrying on transient errors.
81    async fn try_fetch_accounts(
82        &self,
83        retry: usize,
84        instance: &NodeState,
85        height: u64,
86        view: ViewNumber,
87        fee_merkle_tree_root: FeeMerkleCommitment,
88        accounts: &[FeeAccount],
89    ) -> anyhow::Result<Vec<FeeAccountProof>>;
90
91    /// Fetch the given list of accounts, retrying on transient errors.
92    async fn fetch_accounts(
93        &self,
94        instance: &NodeState,
95        height: u64,
96        view: ViewNumber,
97        fee_merkle_tree_root: FeeMerkleCommitment,
98        accounts: Vec<FeeAccount>,
99    ) -> anyhow::Result<Vec<FeeAccountProof>> {
100        self.backoff()
101            .retry(self, |provider, retry| {
102                let accounts = &accounts;
103                async move {
104                    provider
105                        .try_fetch_accounts(
106                            retry,
107                            instance,
108                            height,
109                            view,
110                            fee_merkle_tree_root,
111                            accounts,
112                        )
113                        .await
114                        .map_err(|err| {
115                            err.context(format!(
116                                "fetching accounts {accounts:?}, height {height}, view {view:?}"
117                            ))
118                        })
119                }
120                .boxed()
121            })
122            .await
123    }
124
125    /// Fetch and remember the blocks frontier without retrying on transient errors.
126    async fn try_remember_blocks_merkle_tree(
127        &self,
128        retry: usize,
129        instance: &NodeState,
130        height: u64,
131        view: ViewNumber,
132        mt: &mut BlockMerkleTree,
133    ) -> anyhow::Result<()>;
134
135    /// Fetch and remember the blocks frontier, retrying on transient errors.
136    async fn remember_blocks_merkle_tree(
137        &self,
138        instance: &NodeState,
139        height: u64,
140        view: ViewNumber,
141        mt: &mut BlockMerkleTree,
142    ) -> anyhow::Result<()> {
143        self.backoff()
144            .retry(mt, |mt, retry| {
145                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
146                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
147                    .boxed()
148            })
149            .await
150    }
151
152    /// Fetch the chain config without retrying on transient errors.
153    async fn try_fetch_chain_config(
154        &self,
155        retry: usize,
156        commitment: Commitment<ChainConfig>,
157    ) -> anyhow::Result<ChainConfig>;
158
159    /// Fetch the chain config, retrying on transient errors.
160    async fn fetch_chain_config(
161        &self,
162        commitment: Commitment<ChainConfig>,
163    ) -> anyhow::Result<ChainConfig> {
164        self.backoff()
165            .retry(self, |provider, retry| {
166                provider
167                    .try_fetch_chain_config(retry, commitment)
168                    .map_err(|err| err.context("fetching chain config"))
169                    .boxed()
170            })
171            .await
172    }
173
174    /// Fetch the given list of reward accounts without retrying on transient errors.
175    async fn try_fetch_reward_accounts(
176        &self,
177        retry: usize,
178        instance: &NodeState,
179        height: u64,
180        view: ViewNumber,
181        reward_merkle_tree_root: RewardMerkleCommitment,
182        accounts: &[RewardAccount],
183    ) -> anyhow::Result<Vec<RewardAccountProof>>;
184
185    /// Fetch the given list of reward accounts, retrying on transient errors.
186    async fn fetch_reward_accounts(
187        &self,
188        instance: &NodeState,
189        height: u64,
190        view: ViewNumber,
191        reward_merkle_tree_root: RewardMerkleCommitment,
192        accounts: Vec<RewardAccount>,
193    ) -> anyhow::Result<Vec<RewardAccountProof>> {
194        self.backoff()
195            .retry(self, |provider, retry| {
196                let accounts = &accounts;
197                async move {
198                    provider
199                        .try_fetch_reward_accounts(
200                            retry,
201                            instance,
202                            height,
203                            view,
204                            reward_merkle_tree_root,
205                            accounts,
206                        )
207                        .await
208                        .map_err(|err| {
209                            err.context(format!(
210                                "fetching reward accounts {accounts:?}, height {height}, view {view:?}"
211                            ))
212                        })
213                }
214                .boxed()
215            })
216            .await
217    }
218
219    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
220    fn is_local(&self) -> bool;
221
222    /// Returns the backoff parameters for the catchup provider.
223    fn backoff(&self) -> &BackoffParams;
224
225    /// Returns the name of the catchup provider.
226    fn name(&self) -> String;
227}
228
229#[async_trait]
230impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
231    async fn try_fetch_leaf(
232        &self,
233        retry: usize,
234        height: u64,
235        stake_table: HSStakeTable<SeqTypes>,
236        success_threshold: U256,
237    ) -> anyhow::Result<Leaf2> {
238        (**self)
239            .try_fetch_leaf(retry, height, stake_table, success_threshold)
240            .await
241    }
242
243    async fn fetch_leaf(
244        &self,
245        height: u64,
246        stake_table: HSStakeTable<SeqTypes>,
247        success_threshold: U256,
248    ) -> anyhow::Result<Leaf2> {
249        (**self)
250            .fetch_leaf(height, stake_table, success_threshold)
251            .await
252    }
253    async fn try_fetch_accounts(
254        &self,
255        retry: usize,
256        instance: &NodeState,
257        height: u64,
258        view: ViewNumber,
259        fee_merkle_tree_root: FeeMerkleCommitment,
260        accounts: &[FeeAccount],
261    ) -> anyhow::Result<Vec<FeeAccountProof>> {
262        (**self)
263            .try_fetch_accounts(
264                retry,
265                instance,
266                height,
267                view,
268                fee_merkle_tree_root,
269                accounts,
270            )
271            .await
272    }
273
274    async fn fetch_accounts(
275        &self,
276        instance: &NodeState,
277        height: u64,
278        view: ViewNumber,
279        fee_merkle_tree_root: FeeMerkleCommitment,
280        accounts: Vec<FeeAccount>,
281    ) -> anyhow::Result<Vec<FeeAccountProof>> {
282        (**self)
283            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
284            .await
285    }
286
287    async fn try_remember_blocks_merkle_tree(
288        &self,
289        retry: usize,
290        instance: &NodeState,
291        height: u64,
292        view: ViewNumber,
293        mt: &mut BlockMerkleTree,
294    ) -> anyhow::Result<()> {
295        (**self)
296            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
297            .await
298    }
299
300    async fn remember_blocks_merkle_tree(
301        &self,
302        instance: &NodeState,
303        height: u64,
304        view: ViewNumber,
305        mt: &mut BlockMerkleTree,
306    ) -> anyhow::Result<()> {
307        (**self)
308            .remember_blocks_merkle_tree(instance, height, view, mt)
309            .await
310    }
311
312    async fn try_fetch_chain_config(
313        &self,
314        retry: usize,
315        commitment: Commitment<ChainConfig>,
316    ) -> anyhow::Result<ChainConfig> {
317        (**self).try_fetch_chain_config(retry, commitment).await
318    }
319
320    async fn fetch_chain_config(
321        &self,
322        commitment: Commitment<ChainConfig>,
323    ) -> anyhow::Result<ChainConfig> {
324        (**self).fetch_chain_config(commitment).await
325    }
326
327    async fn try_fetch_reward_accounts(
328        &self,
329        retry: usize,
330        instance: &NodeState,
331        height: u64,
332        view: ViewNumber,
333        reward_merkle_tree_root: RewardMerkleCommitment,
334        accounts: &[RewardAccount],
335    ) -> anyhow::Result<Vec<RewardAccountProof>> {
336        (**self)
337            .try_fetch_reward_accounts(
338                retry,
339                instance,
340                height,
341                view,
342                reward_merkle_tree_root,
343                accounts,
344            )
345            .await
346    }
347
348    async fn fetch_reward_accounts(
349        &self,
350        instance: &NodeState,
351        height: u64,
352        view: ViewNumber,
353        reward_merkle_tree_root: RewardMerkleCommitment,
354        accounts: Vec<RewardAccount>,
355    ) -> anyhow::Result<Vec<RewardAccountProof>> {
356        (**self)
357            .fetch_reward_accounts(instance, height, view, reward_merkle_tree_root, accounts)
358            .await
359    }
360
361    fn backoff(&self) -> &BackoffParams {
362        (**self).backoff()
363    }
364
365    fn name(&self) -> String {
366        (**self).name()
367    }
368
369    fn is_local(&self) -> bool {
370        (**self).is_local()
371    }
372}
373
374#[async_trait]
375pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
376    type Persistence: SequencerPersistence + MembershipPersistence;
377
378    fn set_view_retention(&mut self, view_retention: u64);
379    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
380    async fn reset(self) -> anyhow::Result<()>;
381}
382
383#[async_trait]
384/// Trait used by `Memberships` implementations to interact with persistence layer.
385pub trait MembershipPersistence: Send + Sync + 'static {
386    /// Load stake table for epoch from storage
387    async fn load_stake(
388        &self,
389        epoch: EpochNumber,
390    ) -> anyhow::Result<Option<IndexMap<alloy::primitives::Address, Validator<BLSPubKey>>>>;
391
392    /// Load stake tables for storage for latest `n` known epochs
393    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
394
395    /// Store stake table at `epoch` in the persistence layer
396    async fn store_stake(
397        &self,
398        epoch: EpochNumber,
399        stake: IndexMap<alloy::primitives::Address, Validator<BLSPubKey>>,
400    ) -> anyhow::Result<()>;
401
402    async fn store_events(
403        &self,
404        l1_block: u64,
405        events: Vec<(EventKey, StakeTableEvent)>,
406    ) -> anyhow::Result<()>;
407    async fn load_events(&self) -> anyhow::Result<Option<(u64, Vec<(EventKey, StakeTableEvent)>)>>;
408}
409
410#[async_trait]
411pub trait SequencerPersistence: Sized + Send + Sync + Clone + 'static {
412    /// Use this storage as a state catchup backend, if supported.
413    fn into_catchup_provider(
414        self,
415        _backoff: BackoffParams,
416    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
417        bail!("state catchup is not implemented for this persistence type");
418    }
419
420    /// Load the orchestrator config from storage.
421    ///
422    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
423    /// `Err` if it could not be determined whether a config exists or not.
424    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
425
426    /// Save the orchestrator config to storage.
427    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
428
429    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
430    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
431
432    /// Load the proposals saved by consensus
433    async fn load_quorum_proposals(
434        &self,
435    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
436
437    async fn load_quorum_proposal(
438        &self,
439        view: ViewNumber,
440    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
441
442    async fn load_vid_share(
443        &self,
444        view: ViewNumber,
445    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
446    async fn load_da_proposal(
447        &self,
448        view: ViewNumber,
449    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
450    async fn load_upgrade_certificate(
451        &self,
452    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
453    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
454    async fn load_state_cert(
455        &self,
456    ) -> anyhow::Result<Option<LightClientStateUpdateCertificate<SeqTypes>>>;
457
458    /// Load the latest known consensus state.
459    ///
460    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
461    /// if there is no saved state). Also returns the anchor view number, which can be used as a
462    /// reference point to process any events which were not processed before a previous shutdown,
463    /// if applicable,.
464    async fn load_consensus_state<V: Versions>(
465        &self,
466        state: NodeState,
467    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
468        let genesis_validated_state = ValidatedState::genesis(&state).0;
469        let highest_voted_view = match self
470            .load_latest_acted_view()
471            .await
472            .context("loading last voted view")?
473        {
474            Some(view) => {
475                tracing::info!(?view, "starting from saved view");
476                view
477            },
478            None => {
479                tracing::info!("no saved view, starting from genesis");
480                ViewNumber::genesis()
481            },
482        };
483
484        let next_epoch_high_qc = self
485            .load_next_epoch_quorum_certificate()
486            .await
487            .context("loading next epoch qc")?;
488        let (leaf, high_qc, anchor_view) = match self
489            .load_anchor_leaf()
490            .await
491            .context("loading anchor leaf")?
492        {
493            Some((leaf, high_qc)) => {
494                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
495                ensure!(
496                    leaf.view_number() == high_qc.view_number,
497                    format!(
498                        "loaded anchor leaf from view {:?}, but high QC is from view {:?}",
499                        leaf.view_number(),
500                        high_qc.view_number
501                    )
502                );
503
504                let anchor_view = leaf.view_number();
505                (leaf, high_qc, Some(anchor_view))
506            },
507            None => {
508                tracing::info!("no saved leaf, starting from genesis leaf");
509                (
510                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
511                        .await,
512                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
513                    None,
514                )
515            },
516        };
517        let validated_state = if leaf.block_header().height() == 0 {
518            // If we are starting from genesis, we can provide the full state.
519            genesis_validated_state
520        } else {
521            // Otherwise, we will have to construct a sparse state and fetch missing data during
522            // catchup.
523            ValidatedState::from_header(leaf.block_header())
524        };
525
526        // If we are not starting from genesis, we start from the view following the maximum view
527        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
528        // starting in a view in which we had already voted before the restart, and prevents
529        // unnecessary catchup from starting in a view earlier than the anchor leaf.
530        let view = max(highest_voted_view, leaf.view_number());
531        // TODO:
532        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
533
534        let config = self.load_config().await.context("loading config")?;
535        let epoch_height = config
536            .as_ref()
537            .map(|c| c.config.epoch_height)
538            .unwrap_or_default();
539        let epoch_start_block = config
540            .as_ref()
541            .map(|c| c.config.epoch_start_block)
542            .unwrap_or_default();
543
544        let saved_proposals = self
545            .load_quorum_proposals()
546            .await
547            .context("loading saved proposals")?;
548
549        let upgrade_certificate = self
550            .load_upgrade_certificate()
551            .await
552            .context("loading upgrade certificate")?;
553
554        let start_epoch_info = self
555            .load_start_epoch_info()
556            .await
557            .context("loading start epoch info")?;
558
559        let state_cert = self
560            .load_state_cert()
561            .await
562            .context("loading light client state update certificate")?;
563
564        tracing::info!(
565            ?leaf,
566            ?view,
567            ?epoch,
568            ?high_qc,
569            ?validated_state,
570            ?state_cert,
571            "loaded consensus state"
572        );
573
574        Ok((
575            HotShotInitializer {
576                instance_state: state,
577                epoch_height,
578                epoch_start_block,
579                anchor_leaf: leaf,
580                anchor_state: Arc::new(validated_state),
581                anchor_state_delta: None,
582                start_view: view,
583                start_epoch: epoch,
584                last_actioned_view: highest_voted_view,
585                saved_proposals,
586                high_qc,
587                next_epoch_high_qc,
588                decided_upgrade_certificate: upgrade_certificate,
589                undecided_leaves: Default::default(),
590                undecided_state: Default::default(),
591                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
592                start_epoch_info,
593                state_cert,
594            },
595            anchor_view,
596        ))
597    }
598
599    /// Update storage based on an event from consensus.
600    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
601        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
602            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
603                // No new leaves.
604                return;
605            };
606
607            // Associate each decided leaf with a QC.
608            let chain = leaf_chain.iter().zip(
609                // The first (most recent) leaf corresponds to the QC triggering the decide event.
610                std::iter::once((**qc).clone())
611                    // Moving backwards in the chain, each leaf corresponds with the subsequent
612                    // leaf's justify QC.
613                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
614            );
615
616            if let Err(err) = self
617                .append_decided_leaves(leaf.view_number(), chain, consumer)
618                .await
619            {
620                tracing::error!(
621                    "failed to save decided leaves, chain may not be up to date: {err:#}"
622                );
623                return;
624            }
625        }
626    }
627
628    /// Append decided leaves to persistent storage and emit a corresponding event.
629    ///
630    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
631    /// up to and including `view`. If available in persistent storage, full block payloads and VID
632    /// info will also be included for each leaf.
633    ///
634    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
635    /// collected The consumer's handling of this event is a prerequisite for the completion of
636    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
637    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
638    /// will eventually be passed to the consumer.
639    ///
640    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
641    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
642    /// processed twice, or the consumer may get two events which share a subset of their data.
643    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
644    /// idempotent.
645    ///
646    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
647    /// until the next decide, at which point all persisted leaves from the failed GC run will be
648    /// included in the event along with subsequently decided leaves.
649    ///
650    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
651    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
652    /// storage to long-term archival storage.
653    async fn append_decided_leaves(
654        &self,
655        decided_view: ViewNumber,
656        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
657        consumer: &(impl EventConsumer + 'static),
658    ) -> anyhow::Result<()>;
659
660    async fn load_anchor_leaf(
661        &self,
662    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
663    async fn append_vid(
664        &self,
665        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
666    ) -> anyhow::Result<()>;
667    // TODO: merge these two `append_vid`s
668    async fn append_vid2(
669        &self,
670        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
671    ) -> anyhow::Result<()>;
672    async fn append_da(
673        &self,
674        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
675        vid_commit: VidCommitment,
676    ) -> anyhow::Result<()>;
677    async fn record_action(
678        &self,
679        view: ViewNumber,
680        epoch: Option<EpochNumber>,
681        action: HotShotAction,
682    ) -> anyhow::Result<()>;
683
684    async fn append_quorum_proposal2(
685        &self,
686        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
687    ) -> anyhow::Result<()>;
688    async fn store_upgrade_certificate(
689        &self,
690        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
691    ) -> anyhow::Result<()>;
692    async fn migrate_consensus(&self) -> anyhow::Result<()> {
693        tracing::warn!("migrating consensus data...");
694
695        self.migrate_anchor_leaf().await?;
696        self.migrate_da_proposals().await?;
697        self.migrate_vid_shares().await?;
698        self.migrate_quorum_proposals().await?;
699        self.migrate_quorum_certificates().await?;
700
701        tracing::warn!("consensus storage has been migrated to new types");
702
703        Ok(())
704    }
705
706    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
707    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
708    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
709    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
710    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
711
712    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
713        match self.load_anchor_leaf().await? {
714            Some((leaf, _)) => Ok(leaf.view_number()),
715            None => Ok(ViewNumber::genesis()),
716        }
717    }
718
719    async fn store_next_epoch_quorum_certificate(
720        &self,
721        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
722    ) -> anyhow::Result<()>;
723
724    async fn load_next_epoch_quorum_certificate(
725        &self,
726    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
727
728    async fn append_da2(
729        &self,
730        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
731        vid_commit: VidCommitment,
732    ) -> anyhow::Result<()>;
733
734    async fn append_proposal2(
735        &self,
736        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
737    ) -> anyhow::Result<()> {
738        self.append_quorum_proposal2(proposal).await
739    }
740
741    async fn add_drb_result(
742        &self,
743        epoch: <SeqTypes as NodeType>::Epoch,
744        drb_result: DrbResult,
745    ) -> anyhow::Result<()>;
746    async fn add_epoch_root(
747        &self,
748        epoch: <SeqTypes as NodeType>::Epoch,
749        block_header: <SeqTypes as NodeType>::BlockHeader,
750    ) -> anyhow::Result<()>;
751    async fn add_state_cert(
752        &self,
753        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
754    ) -> anyhow::Result<()>;
755}
756
757#[async_trait]
758pub trait EventConsumer: Debug + Send + Sync {
759    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
760}
761
762#[async_trait]
763impl<T> EventConsumer for Box<T>
764where
765    T: EventConsumer + ?Sized,
766{
767    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
768        (**self).handle_event(event).await
769    }
770}
771
/// An event consumer that carries no state.
#[derive(Debug, Clone, Copy)]
pub struct NullEventConsumer;
774
775#[async_trait]
776impl EventConsumer for NullEventConsumer {
777    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
778        Ok(())
779    }
780}
781
782#[async_trait]
783impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
784    async fn append_vid(
785        &self,
786        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
787    ) -> anyhow::Result<()> {
788        (**self).append_vid(proposal).await
789    }
790
791    async fn append_vid2(
792        &self,
793        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
794    ) -> anyhow::Result<()> {
795        (**self).append_vid2(proposal).await
796    }
797
798    async fn append_da(
799        &self,
800        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
801        vid_commit: VidCommitment,
802    ) -> anyhow::Result<()> {
803        (**self).append_da(proposal, vid_commit).await
804    }
805
806    async fn append_da2(
807        &self,
808        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
809        vid_commit: VidCommitment,
810    ) -> anyhow::Result<()> {
811        (**self).append_da2(proposal, vid_commit).await
812    }
813
814    async fn record_action(
815        &self,
816        view: ViewNumber,
817        epoch: Option<EpochNumber>,
818        action: HotShotAction,
819    ) -> anyhow::Result<()> {
820        (**self).record_action(view, epoch, action).await
821    }
822
823    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
824        Ok(())
825    }
826
827    async fn append_proposal(
828        &self,
829        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
830    ) -> anyhow::Result<()> {
831        (**self)
832            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
833            .await
834    }
835
836    async fn append_proposal2(
837        &self,
838        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
839    ) -> anyhow::Result<()> {
840        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
841            convert_proposal(proposal.clone());
842        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
843    }
844
845    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
846        Ok(())
847    }
848
849    async fn update_decided_upgrade_certificate(
850        &self,
851        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
852    ) -> anyhow::Result<()> {
853        (**self)
854            .store_upgrade_certificate(decided_upgrade_certificate)
855            .await
856    }
857
858    async fn add_drb_result(
859        &self,
860        epoch: <SeqTypes as NodeType>::Epoch,
861        drb_result: DrbResult,
862    ) -> anyhow::Result<()> {
863        (**self).add_drb_result(epoch, drb_result).await
864    }
865
866    async fn add_epoch_root(
867        &self,
868        epoch: <SeqTypes as NodeType>::Epoch,
869        block_header: <SeqTypes as NodeType>::BlockHeader,
870    ) -> anyhow::Result<()> {
871        (**self).add_epoch_root(epoch, block_header).await
872    }
873
874    async fn update_state_cert(
875        &self,
876        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
877    ) -> anyhow::Result<()> {
878        (**self).add_state_cert(state_cert).await
879    }
880}
881
/// A value that can be read out of a subslice of namespace payload bytes.
///
/// This is the companion of [`NsPayloadBytesRange`]: that trait says which subslice of the
/// namespace payload to read, this one turns the subslice into a value.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from the given namespace payload bytes.
    ///
    /// The conversion is infallible by signature.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
890
891/// Specifies a subslice of namespace payload bytes to read.
892///
893/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
894/// deserialized from that subslice of bytes.
895pub trait NsPayloadBytesRange<'a> {
896    type Output: FromNsPayloadBytes<'a>;
897
898    /// Range relative to this ns payload
899    fn ns_payload_range(&self) -> Range<usize>;
900}
901
902/// Types which can be deserialized from either integers or strings.
903///
904/// Some types can be represented as an integer or a string in human-readable formats like JSON or
905/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
906/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
907/// to derive this user-friendly serialization.
908///
909/// These types are assumed to have an efficient representation as an integral type in Rust --
910/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
911/// encoding. With human readable encodings, serialization is always to a string.
912pub trait FromStringOrInteger: Sized {
913    type Binary: Serialize + DeserializeOwned;
914    type Integer: Serialize + DeserializeOwned;
915
916    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
917    fn from_string(s: String) -> anyhow::Result<Self>;
918    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
919
920    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
921    fn to_string(&self) -> anyhow::Result<String>;
922}