espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        LightClientStateUpdateCertificate, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33};
34use serde::{de::DeserializeOwned, Serialize};
35
36use super::{
37    impls::NodeState,
38    utils::BackoffParams,
39    v0_1::{RewardAccount, RewardAccountProof, RewardMerkleCommitment},
40    v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43    v0::impls::ValidatedState, v0_3::ChainConfig, BlockMerkleTree, Event, FeeAccount,
44    FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig, SeqTypes, ValidatorMap,
45};
46
47#[async_trait]
48pub trait StateCatchup: Send + Sync {
49    /// Fetch the leaf at the given height without retrying on transient errors.
50    async fn try_fetch_leaf(
51        &self,
52        retry: usize,
53        height: u64,
54        stake_table: HSStakeTable<SeqTypes>,
55        success_threshold: U256,
56    ) -> anyhow::Result<Leaf2>;
57
58    /// Fetch the leaf at the given height, retrying on transient errors.
59    async fn fetch_leaf(
60        &self,
61        height: u64,
62        stake_table: HSStakeTable<SeqTypes>,
63        success_threshold: U256,
64    ) -> anyhow::Result<Leaf2> {
65        self.backoff()
66            .retry(self, |provider, retry| {
67                let stake_table_clone = stake_table.clone();
68                async move {
69                    provider
70                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
71                        .await
72                }
73                .boxed()
74            })
75            .await
76    }
77
78    /// Fetch the given list of accounts without retrying on transient errors.
79    async fn try_fetch_accounts(
80        &self,
81        retry: usize,
82        instance: &NodeState,
83        height: u64,
84        view: ViewNumber,
85        fee_merkle_tree_root: FeeMerkleCommitment,
86        accounts: &[FeeAccount],
87    ) -> anyhow::Result<Vec<FeeAccountProof>>;
88
89    /// Fetch the given list of accounts, retrying on transient errors.
90    async fn fetch_accounts(
91        &self,
92        instance: &NodeState,
93        height: u64,
94        view: ViewNumber,
95        fee_merkle_tree_root: FeeMerkleCommitment,
96        accounts: Vec<FeeAccount>,
97    ) -> anyhow::Result<Vec<FeeAccountProof>> {
98        self.backoff()
99            .retry(self, |provider, retry| {
100                let accounts = &accounts;
101                async move {
102                    provider
103                        .try_fetch_accounts(
104                            retry,
105                            instance,
106                            height,
107                            view,
108                            fee_merkle_tree_root,
109                            accounts,
110                        )
111                        .await
112                        .map_err(|err| {
113                            err.context(format!(
114                                "fetching accounts {accounts:?}, height {height}, view {view}"
115                            ))
116                        })
117                }
118                .boxed()
119            })
120            .await
121    }
122
123    /// Fetch and remember the blocks frontier without retrying on transient errors.
124    async fn try_remember_blocks_merkle_tree(
125        &self,
126        retry: usize,
127        instance: &NodeState,
128        height: u64,
129        view: ViewNumber,
130        mt: &mut BlockMerkleTree,
131    ) -> anyhow::Result<()>;
132
133    /// Fetch and remember the blocks frontier, retrying on transient errors.
134    async fn remember_blocks_merkle_tree(
135        &self,
136        instance: &NodeState,
137        height: u64,
138        view: ViewNumber,
139        mt: &mut BlockMerkleTree,
140    ) -> anyhow::Result<()> {
141        self.backoff()
142            .retry(mt, |mt, retry| {
143                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
144                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
145                    .boxed()
146            })
147            .await
148    }
149
150    /// Fetch the chain config without retrying on transient errors.
151    async fn try_fetch_chain_config(
152        &self,
153        retry: usize,
154        commitment: Commitment<ChainConfig>,
155    ) -> anyhow::Result<ChainConfig>;
156
157    /// Fetch the chain config, retrying on transient errors.
158    async fn fetch_chain_config(
159        &self,
160        commitment: Commitment<ChainConfig>,
161    ) -> anyhow::Result<ChainConfig> {
162        self.backoff()
163            .retry(self, |provider, retry| {
164                provider
165                    .try_fetch_chain_config(retry, commitment)
166                    .map_err(|err| err.context("fetching chain config"))
167                    .boxed()
168            })
169            .await
170    }
171
172    /// Fetch the given list of reward accounts without retrying on transient errors.
173    async fn try_fetch_reward_accounts(
174        &self,
175        retry: usize,
176        instance: &NodeState,
177        height: u64,
178        view: ViewNumber,
179        reward_merkle_tree_root: RewardMerkleCommitment,
180        accounts: &[RewardAccount],
181    ) -> anyhow::Result<Vec<RewardAccountProof>>;
182
183    /// Fetch the given list of reward accounts, retrying on transient errors.
184    async fn fetch_reward_accounts(
185        &self,
186        instance: &NodeState,
187        height: u64,
188        view: ViewNumber,
189        reward_merkle_tree_root: RewardMerkleCommitment,
190        accounts: Vec<RewardAccount>,
191    ) -> anyhow::Result<Vec<RewardAccountProof>> {
192        self.backoff()
193            .retry(self, |provider, retry| {
194                let accounts = &accounts;
195                async move {
196                    provider
197                        .try_fetch_reward_accounts(
198                            retry,
199                            instance,
200                            height,
201                            view,
202                            reward_merkle_tree_root,
203                            accounts,
204                        )
205                        .await
206                        .map_err(|err| {
207                            err.context(format!(
208                                "fetching reward accounts {accounts:?}, height {height}, view {view}"
209                            ))
210                        })
211                }
212                .boxed()
213            })
214            .await
215    }
216
217    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
218    fn is_local(&self) -> bool;
219
220    /// Returns the backoff parameters for the catchup provider.
221    fn backoff(&self) -> &BackoffParams;
222
223    /// Returns the name of the catchup provider.
224    fn name(&self) -> String;
225}
226
227#[async_trait]
228impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
229    async fn try_fetch_leaf(
230        &self,
231        retry: usize,
232        height: u64,
233        stake_table: HSStakeTable<SeqTypes>,
234        success_threshold: U256,
235    ) -> anyhow::Result<Leaf2> {
236        (**self)
237            .try_fetch_leaf(retry, height, stake_table, success_threshold)
238            .await
239    }
240
241    async fn fetch_leaf(
242        &self,
243        height: u64,
244        stake_table: HSStakeTable<SeqTypes>,
245        success_threshold: U256,
246    ) -> anyhow::Result<Leaf2> {
247        (**self)
248            .fetch_leaf(height, stake_table, success_threshold)
249            .await
250    }
251    async fn try_fetch_accounts(
252        &self,
253        retry: usize,
254        instance: &NodeState,
255        height: u64,
256        view: ViewNumber,
257        fee_merkle_tree_root: FeeMerkleCommitment,
258        accounts: &[FeeAccount],
259    ) -> anyhow::Result<Vec<FeeAccountProof>> {
260        (**self)
261            .try_fetch_accounts(
262                retry,
263                instance,
264                height,
265                view,
266                fee_merkle_tree_root,
267                accounts,
268            )
269            .await
270    }
271
272    async fn fetch_accounts(
273        &self,
274        instance: &NodeState,
275        height: u64,
276        view: ViewNumber,
277        fee_merkle_tree_root: FeeMerkleCommitment,
278        accounts: Vec<FeeAccount>,
279    ) -> anyhow::Result<Vec<FeeAccountProof>> {
280        (**self)
281            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
282            .await
283    }
284
285    async fn try_remember_blocks_merkle_tree(
286        &self,
287        retry: usize,
288        instance: &NodeState,
289        height: u64,
290        view: ViewNumber,
291        mt: &mut BlockMerkleTree,
292    ) -> anyhow::Result<()> {
293        (**self)
294            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
295            .await
296    }
297
298    async fn remember_blocks_merkle_tree(
299        &self,
300        instance: &NodeState,
301        height: u64,
302        view: ViewNumber,
303        mt: &mut BlockMerkleTree,
304    ) -> anyhow::Result<()> {
305        (**self)
306            .remember_blocks_merkle_tree(instance, height, view, mt)
307            .await
308    }
309
310    async fn try_fetch_chain_config(
311        &self,
312        retry: usize,
313        commitment: Commitment<ChainConfig>,
314    ) -> anyhow::Result<ChainConfig> {
315        (**self).try_fetch_chain_config(retry, commitment).await
316    }
317
318    async fn fetch_chain_config(
319        &self,
320        commitment: Commitment<ChainConfig>,
321    ) -> anyhow::Result<ChainConfig> {
322        (**self).fetch_chain_config(commitment).await
323    }
324
325    async fn try_fetch_reward_accounts(
326        &self,
327        retry: usize,
328        instance: &NodeState,
329        height: u64,
330        view: ViewNumber,
331        reward_merkle_tree_root: RewardMerkleCommitment,
332        accounts: &[RewardAccount],
333    ) -> anyhow::Result<Vec<RewardAccountProof>> {
334        (**self)
335            .try_fetch_reward_accounts(
336                retry,
337                instance,
338                height,
339                view,
340                reward_merkle_tree_root,
341                accounts,
342            )
343            .await
344    }
345
346    async fn fetch_reward_accounts(
347        &self,
348        instance: &NodeState,
349        height: u64,
350        view: ViewNumber,
351        reward_merkle_tree_root: RewardMerkleCommitment,
352        accounts: Vec<RewardAccount>,
353    ) -> anyhow::Result<Vec<RewardAccountProof>> {
354        (**self)
355            .fetch_reward_accounts(instance, height, view, reward_merkle_tree_root, accounts)
356            .await
357    }
358
359    fn backoff(&self) -> &BackoffParams {
360        (**self).backoff()
361    }
362
363    fn name(&self) -> String {
364        (**self).name()
365    }
366
367    fn is_local(&self) -> bool {
368        (**self).is_local()
369    }
370}
371
/// Options from which a persistence backend ([`Self::Persistence`]) can be created.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`create`](Self::create). It must serve both as
    /// sequencer persistence and as membership persistence.
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention on these options.
    // NOTE(review): the unit and exact meaning of `view_retention` are not visible here;
    // presumably a number of views to keep before pruning — confirm against implementors.
    fn set_view_retention(&mut self, view_retention: u64);

    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;

    /// Reset the storage described by these options, consuming the options.
    // NOTE(review): presumably this wipes any previously persisted data — confirm.
    async fn reset(self) -> anyhow::Result<()>;
}
380
/// Determine the read state based on the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// Persistence returned events up to the requested block.
    Complete,
    /// Persistence only returned events up to the contained (last processed) L1 block.
    UntilL1Block(u64),
}
388
#[async_trait]
/// Trait used by `Memberships` implementations to interact with persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage, if one is stored.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<ValidatorMap>>;

    /// Load the stake tables for the latest known epochs from storage, returning at most
    /// `limit` of them.
    // NOTE(review): the ordering of the returned entries and the condition under which `None`
    // is returned are not visible here — confirm against implementors.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store stake table at `epoch` in the persistence layer
    async fn store_stake(&self, epoch: EpochNumber, stake: ValidatorMap) -> anyhow::Result<()>;

    /// Store stake table `events` together with the L1 block `l1_finalized`.
    // NOTE(review): presumably `l1_finalized` is the finalized L1 block up to which `events`
    // were collected — confirm against callers.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;

    /// Load stored stake table events for the L1 block `l1_finalized`.
    ///
    /// Returns the events along with an optional [`EventsPersistenceRead`] describing whether
    /// the read covered the requested block or stopped at an earlier L1 block.
    // NOTE(review): the meaning of a `None` read state is not visible here; presumably nothing
    // has been persisted yet — confirm.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;
}
414
415#[async_trait]
416pub trait SequencerPersistence:
417    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
418{
419    /// Use this storage as a state catchup backend, if supported.
420    fn into_catchup_provider(
421        self,
422        _backoff: BackoffParams,
423    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
424        bail!("state catchup is not implemented for this persistence type");
425    }
426
427    /// Load the orchestrator config from storage.
428    ///
429    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
430    /// `Err` if it could not be determined whether a config exists or not.
431    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
432
433    /// Save the orchestrator config to storage.
434    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
435
436    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
437    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
438
439    /// Load the view to restart from.
440    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
441
442    /// Load the proposals saved by consensus
443    async fn load_quorum_proposals(
444        &self,
445    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
446
447    async fn load_quorum_proposal(
448        &self,
449        view: ViewNumber,
450    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
451
452    async fn load_vid_share(
453        &self,
454        view: ViewNumber,
455    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
456    async fn load_da_proposal(
457        &self,
458        view: ViewNumber,
459    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
460    async fn load_upgrade_certificate(
461        &self,
462    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
463    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
464    async fn load_state_cert(
465        &self,
466    ) -> anyhow::Result<Option<LightClientStateUpdateCertificate<SeqTypes>>>;
467
468    /// Load the latest known consensus state.
469    ///
470    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
471    /// if there is no saved state). Also returns the anchor view number, which can be used as a
472    /// reference point to process any events which were not processed before a previous shutdown,
473    /// if applicable,.
474    async fn load_consensus_state<V: Versions>(
475        &self,
476        state: NodeState,
477    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
478        let genesis_validated_state = ValidatedState::genesis(&state).0;
479        let highest_voted_view = match self
480            .load_latest_acted_view()
481            .await
482            .context("loading last voted view")?
483        {
484            Some(view) => {
485                tracing::info!(?view, "starting with last actioned view");
486                view
487            },
488            None => {
489                tracing::info!("no saved view, starting from genesis");
490                ViewNumber::genesis()
491            },
492        };
493
494        let restart_view = match self
495            .load_restart_view()
496            .await
497            .context("loading restart view")?
498        {
499            Some(view) => {
500                tracing::info!(?view, "starting from saved view");
501                view
502            },
503            None => {
504                tracing::info!("no saved view, starting from genesis");
505                ViewNumber::genesis()
506            },
507        };
508        let next_epoch_high_qc = self
509            .load_next_epoch_quorum_certificate()
510            .await
511            .context("loading next epoch qc")?;
512        let (leaf, high_qc, anchor_view) = match self
513            .load_anchor_leaf()
514            .await
515            .context("loading anchor leaf")?
516        {
517            Some((leaf, high_qc)) => {
518                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
519                ensure!(
520                    leaf.view_number() == high_qc.view_number,
521                    format!(
522                        "loaded anchor leaf from view {}, but high QC is from view {}",
523                        leaf.view_number(),
524                        high_qc.view_number
525                    )
526                );
527
528                let anchor_view = leaf.view_number();
529                (leaf, high_qc, Some(anchor_view))
530            },
531            None => {
532                tracing::info!("no saved leaf, starting from genesis leaf");
533                (
534                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
535                        .await,
536                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
537                    None,
538                )
539            },
540        };
541        let validated_state = if leaf.block_header().height() == 0 {
542            // If we are starting from genesis, we can provide the full state.
543            genesis_validated_state
544        } else {
545            // Otherwise, we will have to construct a sparse state and fetch missing data during
546            // catchup.
547            ValidatedState::from_header(leaf.block_header())
548        };
549
550        // If we are not starting from genesis, we start from the view following the maximum view
551        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
552        // starting in a view in which we had already voted before the restart, and prevents
553        // unnecessary catchup from starting in a view earlier than the anchor leaf.
554        let restart_view = max(restart_view, leaf.view_number());
555        // TODO:
556        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
557
558        let config = self.load_config().await.context("loading config")?;
559        let epoch_height = config
560            .as_ref()
561            .map(|c| c.config.epoch_height)
562            .unwrap_or_default();
563        let epoch_start_block = config
564            .as_ref()
565            .map(|c| c.config.epoch_start_block)
566            .unwrap_or_default();
567
568        let saved_proposals = self
569            .load_quorum_proposals()
570            .await
571            .context("loading saved proposals")?;
572
573        let upgrade_certificate = self
574            .load_upgrade_certificate()
575            .await
576            .context("loading upgrade certificate")?;
577
578        let start_epoch_info = self
579            .load_start_epoch_info()
580            .await
581            .context("loading start epoch info")?;
582
583        let state_cert = self
584            .load_state_cert()
585            .await
586            .context("loading light client state update certificate")?;
587
588        tracing::info!(
589            ?leaf,
590            ?restart_view,
591            ?epoch,
592            ?high_qc,
593            ?validated_state,
594            ?state_cert,
595            "loaded consensus state"
596        );
597
598        Ok((
599            HotShotInitializer {
600                instance_state: state,
601                epoch_height,
602                epoch_start_block,
603                anchor_leaf: leaf,
604                anchor_state: Arc::new(validated_state),
605                anchor_state_delta: None,
606                start_view: restart_view,
607                start_epoch: epoch,
608                last_actioned_view: highest_voted_view,
609                saved_proposals,
610                high_qc,
611                next_epoch_high_qc,
612                decided_upgrade_certificate: upgrade_certificate,
613                undecided_leaves: Default::default(),
614                undecided_state: Default::default(),
615                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
616                start_epoch_info,
617                state_cert,
618            },
619            anchor_view,
620        ))
621    }
622
623    /// Update storage based on an event from consensus.
624    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
625        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
626            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
627                // No new leaves.
628                return;
629            };
630
631            // Associate each decided leaf with a QC.
632            let chain = leaf_chain.iter().zip(
633                // The first (most recent) leaf corresponds to the QC triggering the decide event.
634                std::iter::once((**qc).clone())
635                    // Moving backwards in the chain, each leaf corresponds with the subsequent
636                    // leaf's justify QC.
637                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
638            );
639
640            if let Err(err) = self
641                .append_decided_leaves(leaf.view_number(), chain, consumer)
642                .await
643            {
644                tracing::error!(
645                    "failed to save decided leaves, chain may not be up to date: {err:#}"
646                );
647                return;
648            }
649        }
650    }
651
652    /// Append decided leaves to persistent storage and emit a corresponding event.
653    ///
654    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
655    /// up to and including `view`. If available in persistent storage, full block payloads and VID
656    /// info will also be included for each leaf.
657    ///
658    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
659    /// collected The consumer's handling of this event is a prerequisite for the completion of
660    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
661    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
662    /// will eventually be passed to the consumer.
663    ///
664    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
665    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
666    /// processed twice, or the consumer may get two events which share a subset of their data.
667    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
668    /// idempotent.
669    ///
670    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
671    /// until the next decide, at which point all persisted leaves from the failed GC run will be
672    /// included in the event along with subsequently decided leaves.
673    ///
674    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
675    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
676    /// storage to long-term archival storage.
677    async fn append_decided_leaves(
678        &self,
679        decided_view: ViewNumber,
680        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
681        consumer: &(impl EventConsumer + 'static),
682    ) -> anyhow::Result<()>;
683
684    async fn load_anchor_leaf(
685        &self,
686    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
687    async fn append_vid(
688        &self,
689        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
690    ) -> anyhow::Result<()>;
691    // TODO: merge these two `append_vid`s
692    async fn append_vid2(
693        &self,
694        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
695    ) -> anyhow::Result<()>;
696    async fn append_da(
697        &self,
698        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
699        vid_commit: VidCommitment,
700    ) -> anyhow::Result<()>;
701    async fn record_action(
702        &self,
703        view: ViewNumber,
704        epoch: Option<EpochNumber>,
705        action: HotShotAction,
706    ) -> anyhow::Result<()>;
707
708    async fn append_quorum_proposal2(
709        &self,
710        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
711    ) -> anyhow::Result<()>;
712    async fn store_upgrade_certificate(
713        &self,
714        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
715    ) -> anyhow::Result<()>;
716    async fn migrate_consensus(&self) -> anyhow::Result<()> {
717        tracing::warn!("migrating consensus data...");
718
719        self.migrate_anchor_leaf().await?;
720        self.migrate_da_proposals().await?;
721        self.migrate_vid_shares().await?;
722        self.migrate_quorum_proposals().await?;
723        self.migrate_quorum_certificates().await?;
724
725        tracing::warn!("consensus storage has been migrated to new types");
726
727        Ok(())
728    }
729
730    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
731    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
732    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
733    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
734    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
735
736    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
737        match self.load_anchor_leaf().await? {
738            Some((leaf, _)) => Ok(leaf.view_number()),
739            None => Ok(ViewNumber::genesis()),
740        }
741    }
742
743    async fn store_next_epoch_quorum_certificate(
744        &self,
745        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
746    ) -> anyhow::Result<()>;
747
748    async fn load_next_epoch_quorum_certificate(
749        &self,
750    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
751
752    async fn append_da2(
753        &self,
754        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
755        vid_commit: VidCommitment,
756    ) -> anyhow::Result<()>;
757
758    async fn append_proposal2(
759        &self,
760        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
761    ) -> anyhow::Result<()> {
762        self.append_quorum_proposal2(proposal).await
763    }
764
765    async fn store_drb_result(
766        &self,
767        epoch: <SeqTypes as NodeType>::Epoch,
768        drb_result: DrbResult,
769    ) -> anyhow::Result<()>;
770    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
771    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
772    async fn store_epoch_root(
773        &self,
774        epoch: <SeqTypes as NodeType>::Epoch,
775        block_header: <SeqTypes as NodeType>::BlockHeader,
776    ) -> anyhow::Result<()>;
777    async fn add_state_cert(
778        &self,
779        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
780    ) -> anyhow::Result<()>;
781
782    fn enable_metrics(&mut self, metrics: &dyn Metrics);
783}
784
/// A sink for consensus events, e.g. the `consumer` handed to
/// `SequencerPersistence::append_decided_leaves`.
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Process a single event, returning an error if it could not be handled.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
789
790#[async_trait]
791impl<T> EventConsumer for Box<T>
792where
793    T: EventConsumer + ?Sized,
794{
795    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
796        (**self).handle_event(event).await
797    }
798}
799
/// An [`EventConsumer`] that accepts every event and does nothing with it.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;
802
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Ignore the event and report success unconditionally.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
809
810#[async_trait]
811impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
812    async fn append_vid(
813        &self,
814        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
815    ) -> anyhow::Result<()> {
816        (**self).append_vid(proposal).await
817    }
818
819    async fn append_vid2(
820        &self,
821        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
822    ) -> anyhow::Result<()> {
823        (**self).append_vid2(proposal).await
824    }
825
826    async fn append_da(
827        &self,
828        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
829        vid_commit: VidCommitment,
830    ) -> anyhow::Result<()> {
831        (**self).append_da(proposal, vid_commit).await
832    }
833
834    async fn append_da2(
835        &self,
836        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
837        vid_commit: VidCommitment,
838    ) -> anyhow::Result<()> {
839        (**self).append_da2(proposal, vid_commit).await
840    }
841
842    async fn record_action(
843        &self,
844        view: ViewNumber,
845        epoch: Option<EpochNumber>,
846        action: HotShotAction,
847    ) -> anyhow::Result<()> {
848        (**self).record_action(view, epoch, action).await
849    }
850
    /// No-op: the given high QC is ignored and success is reported unconditionally.
    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
        Ok(())
    }
854
855    async fn append_proposal(
856        &self,
857        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
858    ) -> anyhow::Result<()> {
859        (**self)
860            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
861            .await
862    }
863
864    async fn append_proposal2(
865        &self,
866        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
867    ) -> anyhow::Result<()> {
868        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
869            convert_proposal(proposal.clone());
870        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
871    }
872
873    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
874        Ok(())
875    }
876
877    async fn update_decided_upgrade_certificate(
878        &self,
879        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
880    ) -> anyhow::Result<()> {
881        (**self)
882            .store_upgrade_certificate(decided_upgrade_certificate)
883            .await
884    }
885
886    async fn store_drb_result(
887        &self,
888        epoch: <SeqTypes as NodeType>::Epoch,
889        drb_result: DrbResult,
890    ) -> anyhow::Result<()> {
891        (**self).store_drb_result(epoch, drb_result).await
892    }
893
894    async fn store_epoch_root(
895        &self,
896        epoch: <SeqTypes as NodeType>::Epoch,
897        block_header: <SeqTypes as NodeType>::BlockHeader,
898    ) -> anyhow::Result<()> {
899        (**self).store_epoch_root(epoch, block_header).await
900    }
901
902    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
903        (**self).store_drb_input(drb_input).await
904    }
905
906    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
907        (**self).load_drb_input(epoch).await
908    }
909
910    async fn update_state_cert(
911        &self,
912        state_cert: LightClientStateUpdateCertificate<SeqTypes>,
913    ) -> anyhow::Result<()> {
914        (**self).add_state_cert(state_cert).await
915    }
916}
917
/// A value that can be decoded from a subslice of a namespace's payload bytes.
///
/// This is the companion of [`NsPayloadBytesRange`]: that trait says *which*
/// bytes to read, while this one says how to turn those bytes into data.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from the given namespace payload `bytes`.
    ///
    /// The lifetime `'a` allows the result to borrow from `bytes`.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
926
927/// Specifies a subslice of namespace payload bytes to read.
928///
929/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
930/// deserialized from that subslice of bytes.
931pub trait NsPayloadBytesRange<'a> {
932    type Output: FromNsPayloadBytes<'a>;
933
934    /// Range relative to this ns payload
935    fn ns_payload_range(&self) -> Range<usize>;
936}
937
938/// Types which can be deserialized from either integers or strings.
939///
940/// Some types can be represented as an integer or a string in human-readable formats like JSON or
941/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
942/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
943/// to derive this user-friendly serialization.
944///
945/// These types are assumed to have an efficient representation as an integral type in Rust --
946/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
947/// encoding. With human readable encodings, serialization is always to a string.
948pub trait FromStringOrInteger: Sized {
949    type Binary: Serialize + DeserializeOwned;
950    type Integer: Serialize + DeserializeOwned;
951
952    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
953    fn from_string(s: String) -> anyhow::Result<Self>;
954    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
955
956    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
957    fn to_string(&self) -> anyhow::Result<String>;
958}