espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33};
34use serde::{de::DeserializeOwned, Serialize};
35
36use super::{
37    impls::NodeState,
38    utils::BackoffParams,
39    v0_3::{EventKey, IndexedStake, StakeTableEvent},
40};
41use crate::{
42    v0::impls::{StakeTableHash, ValidatedState},
43    v0_3::{
44        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
45    },
46    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
47    BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
48    SeqTypes, ValidatorMap,
49};
50
51#[async_trait]
52pub trait StateCatchup: Send + Sync {
53    /// Fetch the leaf at the given height without retrying on transient errors.
54    async fn try_fetch_leaf(
55        &self,
56        retry: usize,
57        height: u64,
58        stake_table: HSStakeTable<SeqTypes>,
59        success_threshold: U256,
60    ) -> anyhow::Result<Leaf2>;
61
62    /// Fetch the leaf at the given height, retrying on transient errors.
63    async fn fetch_leaf(
64        &self,
65        height: u64,
66        stake_table: HSStakeTable<SeqTypes>,
67        success_threshold: U256,
68    ) -> anyhow::Result<Leaf2> {
69        self.backoff()
70            .retry(self, |provider, retry| {
71                let stake_table_clone = stake_table.clone();
72                async move {
73                    provider
74                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
75                        .await
76                }
77                .boxed()
78            })
79            .await
80    }
81
82    /// Fetch the given list of accounts without retrying on transient errors.
83    async fn try_fetch_accounts(
84        &self,
85        retry: usize,
86        instance: &NodeState,
87        height: u64,
88        view: ViewNumber,
89        fee_merkle_tree_root: FeeMerkleCommitment,
90        accounts: &[FeeAccount],
91    ) -> anyhow::Result<Vec<FeeAccountProof>>;
92
93    /// Fetch the given list of accounts, retrying on transient errors.
94    async fn fetch_accounts(
95        &self,
96        instance: &NodeState,
97        height: u64,
98        view: ViewNumber,
99        fee_merkle_tree_root: FeeMerkleCommitment,
100        accounts: Vec<FeeAccount>,
101    ) -> anyhow::Result<Vec<FeeAccountProof>> {
102        self.backoff()
103            .retry(self, |provider, retry| {
104                let accounts = &accounts;
105                async move {
106                    provider
107                        .try_fetch_accounts(
108                            retry,
109                            instance,
110                            height,
111                            view,
112                            fee_merkle_tree_root,
113                            accounts,
114                        )
115                        .await
116                        .map_err(|err| {
117                            err.context(format!(
118                                "fetching accounts {accounts:?}, height {height}, view {view}"
119                            ))
120                        })
121                }
122                .boxed()
123            })
124            .await
125    }
126
127    /// Fetch and remember the blocks frontier without retrying on transient errors.
128    async fn try_remember_blocks_merkle_tree(
129        &self,
130        retry: usize,
131        instance: &NodeState,
132        height: u64,
133        view: ViewNumber,
134        mt: &mut BlockMerkleTree,
135    ) -> anyhow::Result<()>;
136
137    /// Fetch and remember the blocks frontier, retrying on transient errors.
138    async fn remember_blocks_merkle_tree(
139        &self,
140        instance: &NodeState,
141        height: u64,
142        view: ViewNumber,
143        mt: &mut BlockMerkleTree,
144    ) -> anyhow::Result<()> {
145        self.backoff()
146            .retry(mt, |mt, retry| {
147                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
148                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
149                    .boxed()
150            })
151            .await
152    }
153
154    /// Fetch the chain config without retrying on transient errors.
155    async fn try_fetch_chain_config(
156        &self,
157        retry: usize,
158        commitment: Commitment<ChainConfig>,
159    ) -> anyhow::Result<ChainConfig>;
160
161    /// Fetch the chain config, retrying on transient errors.
162    async fn fetch_chain_config(
163        &self,
164        commitment: Commitment<ChainConfig>,
165    ) -> anyhow::Result<ChainConfig> {
166        self.backoff()
167            .retry(self, |provider, retry| {
168                provider
169                    .try_fetch_chain_config(retry, commitment)
170                    .map_err(|err| err.context("fetching chain config"))
171                    .boxed()
172            })
173            .await
174    }
175
176    /// Fetch the given list of reward accounts without retrying on transient errors.
177    async fn try_fetch_reward_accounts_v2(
178        &self,
179        retry: usize,
180        instance: &NodeState,
181        height: u64,
182        view: ViewNumber,
183        reward_merkle_tree_root: RewardMerkleCommitmentV2,
184        accounts: &[RewardAccountV2],
185    ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
186
187    /// Fetch the given list of reward accounts, retrying on transient errors.
188    async fn fetch_reward_accounts_v2(
189        &self,
190        instance: &NodeState,
191        height: u64,
192        view: ViewNumber,
193        reward_merkle_tree_root: RewardMerkleCommitmentV2,
194        accounts: Vec<RewardAccountV2>,
195    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
196        self.backoff()
197            .retry(self, |provider, retry| {
198                let accounts = &accounts;
199                async move {
200                    provider
201                        .try_fetch_reward_accounts_v2(
202                            retry,
203                            instance,
204                            height,
205                            view,
206                            reward_merkle_tree_root,
207                            accounts,
208                        )
209                        .await
210                        .map_err(|err| {
211                            err.context(format!(
212                                "fetching reward accounts {accounts:?}, height {height}, view \
213                                 {view}"
214                            ))
215                        })
216                }
217                .boxed()
218            })
219            .await
220    }
221
222    /// Fetch the given list of reward accounts without retrying on transient errors.
223    async fn try_fetch_reward_accounts_v1(
224        &self,
225        retry: usize,
226        instance: &NodeState,
227        height: u64,
228        view: ViewNumber,
229        reward_merkle_tree_root: RewardMerkleCommitmentV1,
230        accounts: &[RewardAccountV1],
231    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
232
233    /// Fetch the given list of reward accounts, retrying on transient errors.
234    async fn fetch_reward_accounts_v1(
235        &self,
236        instance: &NodeState,
237        height: u64,
238        view: ViewNumber,
239        reward_merkle_tree_root: RewardMerkleCommitmentV1,
240        accounts: Vec<RewardAccountV1>,
241    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
242        self.backoff()
243            .retry(self, |provider, retry| {
244                let accounts = &accounts;
245                async move {
246                    provider
247                        .try_fetch_reward_accounts_v1(
248                            retry,
249                            instance,
250                            height,
251                            view,
252                            reward_merkle_tree_root,
253                            accounts,
254                        )
255                        .await
256                        .map_err(|err| {
257                            err.context(format!(
258                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
259                                 {view}"
260                            ))
261                        })
262                }
263                .boxed()
264            })
265            .await
266    }
267
268    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
269    fn is_local(&self) -> bool;
270
271    /// Returns the backoff parameters for the catchup provider.
272    fn backoff(&self) -> &BackoffParams;
273
274    /// Returns the name of the catchup provider.
275    fn name(&self) -> String;
276}
277
278#[async_trait]
279impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
280    async fn try_fetch_leaf(
281        &self,
282        retry: usize,
283        height: u64,
284        stake_table: HSStakeTable<SeqTypes>,
285        success_threshold: U256,
286    ) -> anyhow::Result<Leaf2> {
287        (**self)
288            .try_fetch_leaf(retry, height, stake_table, success_threshold)
289            .await
290    }
291
292    async fn fetch_leaf(
293        &self,
294        height: u64,
295        stake_table: HSStakeTable<SeqTypes>,
296        success_threshold: U256,
297    ) -> anyhow::Result<Leaf2> {
298        (**self)
299            .fetch_leaf(height, stake_table, success_threshold)
300            .await
301    }
302    async fn try_fetch_accounts(
303        &self,
304        retry: usize,
305        instance: &NodeState,
306        height: u64,
307        view: ViewNumber,
308        fee_merkle_tree_root: FeeMerkleCommitment,
309        accounts: &[FeeAccount],
310    ) -> anyhow::Result<Vec<FeeAccountProof>> {
311        (**self)
312            .try_fetch_accounts(
313                retry,
314                instance,
315                height,
316                view,
317                fee_merkle_tree_root,
318                accounts,
319            )
320            .await
321    }
322
323    async fn fetch_accounts(
324        &self,
325        instance: &NodeState,
326        height: u64,
327        view: ViewNumber,
328        fee_merkle_tree_root: FeeMerkleCommitment,
329        accounts: Vec<FeeAccount>,
330    ) -> anyhow::Result<Vec<FeeAccountProof>> {
331        (**self)
332            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
333            .await
334    }
335
336    async fn try_remember_blocks_merkle_tree(
337        &self,
338        retry: usize,
339        instance: &NodeState,
340        height: u64,
341        view: ViewNumber,
342        mt: &mut BlockMerkleTree,
343    ) -> anyhow::Result<()> {
344        (**self)
345            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
346            .await
347    }
348
349    async fn remember_blocks_merkle_tree(
350        &self,
351        instance: &NodeState,
352        height: u64,
353        view: ViewNumber,
354        mt: &mut BlockMerkleTree,
355    ) -> anyhow::Result<()> {
356        (**self)
357            .remember_blocks_merkle_tree(instance, height, view, mt)
358            .await
359    }
360
361    async fn try_fetch_chain_config(
362        &self,
363        retry: usize,
364        commitment: Commitment<ChainConfig>,
365    ) -> anyhow::Result<ChainConfig> {
366        (**self).try_fetch_chain_config(retry, commitment).await
367    }
368
369    async fn fetch_chain_config(
370        &self,
371        commitment: Commitment<ChainConfig>,
372    ) -> anyhow::Result<ChainConfig> {
373        (**self).fetch_chain_config(commitment).await
374    }
375
376    async fn try_fetch_reward_accounts_v2(
377        &self,
378        retry: usize,
379        instance: &NodeState,
380        height: u64,
381        view: ViewNumber,
382        reward_merkle_tree_root: RewardMerkleCommitmentV2,
383        accounts: &[RewardAccountV2],
384    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
385        (**self)
386            .try_fetch_reward_accounts_v2(
387                retry,
388                instance,
389                height,
390                view,
391                reward_merkle_tree_root,
392                accounts,
393            )
394            .await
395    }
396
397    async fn fetch_reward_accounts_v2(
398        &self,
399        instance: &NodeState,
400        height: u64,
401        view: ViewNumber,
402        reward_merkle_tree_root: RewardMerkleCommitmentV2,
403        accounts: Vec<RewardAccountV2>,
404    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
405        (**self)
406            .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
407            .await
408    }
409
410    async fn try_fetch_reward_accounts_v1(
411        &self,
412        retry: usize,
413        instance: &NodeState,
414        height: u64,
415        view: ViewNumber,
416        reward_merkle_tree_root: RewardMerkleCommitmentV1,
417        accounts: &[RewardAccountV1],
418    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
419        (**self)
420            .try_fetch_reward_accounts_v1(
421                retry,
422                instance,
423                height,
424                view,
425                reward_merkle_tree_root,
426                accounts,
427            )
428            .await
429    }
430
431    async fn fetch_reward_accounts_v1(
432        &self,
433        instance: &NodeState,
434        height: u64,
435        view: ViewNumber,
436        reward_merkle_tree_root: RewardMerkleCommitmentV1,
437        accounts: Vec<RewardAccountV1>,
438    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
439        (**self)
440            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
441            .await
442    }
443
444    fn backoff(&self) -> &BackoffParams {
445        (**self).backoff()
446    }
447
448    fn name(&self) -> String {
449        (**self).name()
450    }
451
452    fn is_local(&self) -> bool {
453        (**self).is_local()
454    }
455}
456
/// Options from which a persistence backend can be created.
///
/// The created backend must implement both [`SequencerPersistence`] and
/// [`MembershipPersistence`].
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`create`](Self::create).
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention.
    ///
    /// NOTE(review): presumably the number of views of data to keep before garbage collection;
    /// confirm against the implementations.
    fn set_view_retention(&mut self, view_retention: u64);

    /// Create a persistence backend from these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;

    /// Consume the options and reset.
    ///
    /// NOTE(review): presumably clears the storage these options point at; confirm against the
    /// implementations.
    async fn reset(self) -> anyhow::Result<()>;
}
465
/// Determine the read state based on the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// The persistence returned events up to the requested block.
    Complete,
    /// The read only reached the last processed block (presumably the carried value is that L1
    /// block number — confirm against the implementations).
    UntilL1Block(u64),
}
473
#[async_trait]
/// Trait used by `Memberships` implementations to interact with persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage.
    ///
    /// On success the result is optional; when present it carries the validator map together
    /// with an optional block reward and an optional stake table hash.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;

    /// Load stake tables from storage for the latest known epochs.
    ///
    /// NOTE(review): `limit` presumably caps how many epochs are returned; confirm against the
    /// implementations.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store stake table at `epoch` in the persistence layer
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table events.
    ///
    /// NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which `events`
    /// were fetched; confirm against the callers.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;

    /// Load stored stake table events.
    ///
    /// Returns an optional [`EventsPersistenceRead`] describing how far the read got, along
    /// with the loaded events.
    ///
    /// NOTE(review): `l1_finalized` is presumably the L1 block up to which events are
    /// requested; confirm against the implementations.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;
}
508
509#[async_trait]
510pub trait SequencerPersistence:
511    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
512{
513    /// Use this storage as a state catchup backend, if supported.
514    fn into_catchup_provider(
515        self,
516        _backoff: BackoffParams,
517    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
518        bail!("state catchup is not implemented for this persistence type");
519    }
520
521    /// Load the orchestrator config from storage.
522    ///
523    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
524    /// `Err` if it could not be determined whether a config exists or not.
525    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
526
527    /// Save the orchestrator config to storage.
528    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
529
530    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
531    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
532
533    /// Load the view to restart from.
534    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
535
536    /// Load the proposals saved by consensus
537    async fn load_quorum_proposals(
538        &self,
539    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
540
541    async fn load_quorum_proposal(
542        &self,
543        view: ViewNumber,
544    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
545
546    async fn load_vid_share(
547        &self,
548        view: ViewNumber,
549    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
550    async fn load_da_proposal(
551        &self,
552        view: ViewNumber,
553    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
554    async fn load_upgrade_certificate(
555        &self,
556    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
557    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
558    async fn load_state_cert(
559        &self,
560    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
561
562    /// Load the latest known consensus state.
563    ///
564    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
565    /// if there is no saved state). Also returns the anchor view number, which can be used as a
566    /// reference point to process any events which were not processed before a previous shutdown,
567    /// if applicable,.
568    async fn load_consensus_state<V: Versions>(
569        &self,
570        state: NodeState,
571    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
572        let genesis_validated_state = ValidatedState::genesis(&state).0;
573        let highest_voted_view = match self
574            .load_latest_acted_view()
575            .await
576            .context("loading last voted view")?
577        {
578            Some(view) => {
579                tracing::info!(?view, "starting with last actioned view");
580                view
581            },
582            None => {
583                tracing::info!("no saved view, starting from genesis");
584                ViewNumber::genesis()
585            },
586        };
587
588        let restart_view = match self
589            .load_restart_view()
590            .await
591            .context("loading restart view")?
592        {
593            Some(view) => {
594                tracing::info!(?view, "starting from saved view");
595                view
596            },
597            None => {
598                tracing::info!("no saved view, starting from genesis");
599                ViewNumber::genesis()
600            },
601        };
602        let next_epoch_high_qc = self
603            .load_next_epoch_quorum_certificate()
604            .await
605            .context("loading next epoch qc")?;
606        let (leaf, high_qc, anchor_view) = match self
607            .load_anchor_leaf()
608            .await
609            .context("loading anchor leaf")?
610        {
611            Some((leaf, high_qc)) => {
612                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
613                ensure!(
614                    leaf.view_number() == high_qc.view_number,
615                    format!(
616                        "loaded anchor leaf from view {}, but high QC is from view {}",
617                        leaf.view_number(),
618                        high_qc.view_number
619                    )
620                );
621
622                let anchor_view = leaf.view_number();
623                (leaf, high_qc, Some(anchor_view))
624            },
625            None => {
626                tracing::info!("no saved leaf, starting from genesis leaf");
627                (
628                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
629                        .await,
630                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
631                    None,
632                )
633            },
634        };
635        let validated_state = if leaf.block_header().height() == 0 {
636            // If we are starting from genesis, we can provide the full state.
637            genesis_validated_state
638        } else {
639            // Otherwise, we will have to construct a sparse state and fetch missing data during
640            // catchup.
641            ValidatedState::from_header(leaf.block_header())
642        };
643
644        // If we are not starting from genesis, we start from the view following the maximum view
645        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
646        // starting in a view in which we had already voted before the restart, and prevents
647        // unnecessary catchup from starting in a view earlier than the anchor leaf.
648        let restart_view = max(restart_view, leaf.view_number());
649        // TODO:
650        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
651
652        let config = self.load_config().await.context("loading config")?;
653        let epoch_height = config
654            .as_ref()
655            .map(|c| c.config.epoch_height)
656            .unwrap_or_default();
657        let epoch_start_block = config
658            .as_ref()
659            .map(|c| c.config.epoch_start_block)
660            .unwrap_or_default();
661
662        let saved_proposals = self
663            .load_quorum_proposals()
664            .await
665            .context("loading saved proposals")?;
666
667        let upgrade_certificate = self
668            .load_upgrade_certificate()
669            .await
670            .context("loading upgrade certificate")?;
671
672        let start_epoch_info = self
673            .load_start_epoch_info()
674            .await
675            .context("loading start epoch info")?;
676
677        let state_cert = self
678            .load_state_cert()
679            .await
680            .context("loading light client state update certificate")?;
681
682        tracing::info!(
683            ?leaf,
684            ?restart_view,
685            ?epoch,
686            ?high_qc,
687            ?validated_state,
688            ?state_cert,
689            "loaded consensus state"
690        );
691
692        Ok((
693            HotShotInitializer {
694                instance_state: state,
695                epoch_height,
696                epoch_start_block,
697                anchor_leaf: leaf,
698                anchor_state: Arc::new(validated_state),
699                anchor_state_delta: None,
700                start_view: restart_view,
701                start_epoch: epoch,
702                last_actioned_view: highest_voted_view,
703                saved_proposals,
704                high_qc,
705                next_epoch_high_qc,
706                decided_upgrade_certificate: upgrade_certificate,
707                undecided_leaves: Default::default(),
708                undecided_state: Default::default(),
709                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
710                start_epoch_info,
711                state_cert,
712            },
713            anchor_view,
714        ))
715    }
716
717    /// Update storage based on an event from consensus.
718    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
719        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
720            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
721                // No new leaves.
722                return;
723            };
724
725            // Associate each decided leaf with a QC.
726            let chain = leaf_chain.iter().zip(
727                // The first (most recent) leaf corresponds to the QC triggering the decide event.
728                std::iter::once((**qc).clone())
729                    // Moving backwards in the chain, each leaf corresponds with the subsequent
730                    // leaf's justify QC.
731                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
732            );
733
734            if let Err(err) = self
735                .append_decided_leaves(leaf.view_number(), chain, consumer)
736                .await
737            {
738                tracing::error!(
739                    "failed to save decided leaves, chain may not be up to date: {err:#}"
740                );
741                return;
742            }
743        }
744    }
745
746    /// Append decided leaves to persistent storage and emit a corresponding event.
747    ///
748    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
749    /// up to and including `view`. If available in persistent storage, full block payloads and VID
750    /// info will also be included for each leaf.
751    ///
752    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
753    /// collected The consumer's handling of this event is a prerequisite for the completion of
754    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
755    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
756    /// will eventually be passed to the consumer.
757    ///
758    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
759    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
760    /// processed twice, or the consumer may get two events which share a subset of their data.
761    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
762    /// idempotent.
763    ///
764    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
765    /// until the next decide, at which point all persisted leaves from the failed GC run will be
766    /// included in the event along with subsequently decided leaves.
767    ///
768    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
769    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
770    /// storage to long-term archival storage.
771    async fn append_decided_leaves(
772        &self,
773        decided_view: ViewNumber,
774        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
775        consumer: &(impl EventConsumer + 'static),
776    ) -> anyhow::Result<()>;
777
778    async fn load_anchor_leaf(
779        &self,
780    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
781    async fn append_vid(
782        &self,
783        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
784    ) -> anyhow::Result<()>;
785    // TODO: merge these two `append_vid`s
786    async fn append_vid2(
787        &self,
788        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
789    ) -> anyhow::Result<()>;
790    async fn append_da(
791        &self,
792        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
793        vid_commit: VidCommitment,
794    ) -> anyhow::Result<()>;
795    async fn record_action(
796        &self,
797        view: ViewNumber,
798        epoch: Option<EpochNumber>,
799        action: HotShotAction,
800    ) -> anyhow::Result<()>;
801
802    async fn append_quorum_proposal2(
803        &self,
804        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
805    ) -> anyhow::Result<()>;
806    async fn store_upgrade_certificate(
807        &self,
808        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
809    ) -> anyhow::Result<()>;
810    async fn migrate_consensus(&self) -> anyhow::Result<()> {
811        tracing::warn!("migrating consensus data...");
812
813        self.migrate_anchor_leaf().await?;
814        self.migrate_da_proposals().await?;
815        self.migrate_vid_shares().await?;
816        self.migrate_quorum_proposals().await?;
817        self.migrate_quorum_certificates().await?;
818
819        tracing::warn!("consensus storage has been migrated to new types");
820
821        Ok(())
822    }
823
824    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
825    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
826    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
827    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
828    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
829
830    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
831        match self.load_anchor_leaf().await? {
832            Some((leaf, _)) => Ok(leaf.view_number()),
833            None => Ok(ViewNumber::genesis()),
834        }
835    }
836
    /// Takes a [`NextEpochQuorumCertificate2`] (the parameter is named `high_qc`).
    ///
    /// NOTE(review): presumably the stored value is what
    /// `load_next_epoch_quorum_certificate` later returns -- confirm against the
    /// implementations.
    async fn store_next_epoch_quorum_certificate(
        &self,
        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
    ) -> anyhow::Result<()>;
841
    /// Returns a [`NextEpochQuorumCertificate2`], or `Ok(None)` when the implementor has no
    /// certificate to return.
    ///
    /// NOTE(review): presumably the counterpart of `store_next_epoch_quorum_certificate` --
    /// confirm against the implementations.
    async fn load_next_epoch_quorum_certificate(
        &self,
    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
845
    /// Takes a [`DaProposal2`] together with a [`VidCommitment`].
    ///
    /// Same shape as `append_da`, but for the [`DaProposal2`] proposal type.
    ///
    /// NOTE(review): presumably implementors persist the proposal alongside the commitment;
    /// the storage contract is not visible from this declaration -- confirm.
    async fn append_da2(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()>;
851
    /// Default implementation that forwards the proposal unchanged to
    /// `append_quorum_proposal2` and returns its result.
    async fn append_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        self.append_quorum_proposal2(proposal).await
    }
858
    /// Takes a [`DrbResult`] and the epoch it is supplied for.
    ///
    /// NOTE(review): presumably implementors persist the result keyed by `epoch` -- confirm.
    async fn store_drb_result(
        &self,
        epoch: <SeqTypes as NodeType>::Epoch,
        drb_result: DrbResult,
    ) -> anyhow::Result<()>;
    /// Takes a [`DrbInput`] by value.
    ///
    /// NOTE(review): presumably the stored input is what `load_drb_input` later returns --
    /// confirm against the implementations.
    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
    /// Returns the [`DrbInput`] for `epoch`, or an error.
    ///
    /// The epoch is passed as a raw `u64` here, unlike `store_drb_result`, which takes
    /// `<SeqTypes as NodeType>::Epoch`. There is no `Option` in the return type.
    ///
    /// NOTE(review): presumably a missing input is reported as an error -- confirm.
    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
    /// Takes a block header and the epoch it is supplied for.
    ///
    /// NOTE(review): presumably implementors persist the header as the root for `epoch` --
    /// confirm against the implementations.
    async fn store_epoch_root(
        &self,
        epoch: <SeqTypes as NodeType>::Epoch,
        block_header: <SeqTypes as NodeType>::BlockHeader,
    ) -> anyhow::Result<()>;
    /// Takes a [`LightClientStateUpdateCertificateV2`] by value.
    ///
    /// NOTE(review): presumably implementors persist the certificate -- confirm against the
    /// implementations.
    async fn add_state_cert(
        &self,
        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()>;
875
    /// Takes a [`Metrics`] trait object; synchronous, infallible, and requires `&mut self`.
    ///
    /// NOTE(review): presumably implementors register their metrics with `metrics` -- confirm.
    fn enable_metrics(&mut self, metrics: &dyn Metrics);
877}
878
/// An asynchronous handler for [`Event`]s.
///
/// Implementors must also be [`Debug`], [`Send`] and [`Sync`].
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Handles a single event, returning an error if handling fails.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
883
884#[async_trait]
885impl<T> EventConsumer for Box<T>
886where
887    T: EventConsumer + ?Sized,
888{
889    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
890        (**self).handle_event(event).await
891    }
892}
893
/// A unit type whose [`EventConsumer`] implementation ignores every event and returns
/// `Ok(())`.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;
896
/// [`EventConsumer`] implementation that discards events.
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Ignores the event and always succeeds.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
903
904#[async_trait]
905impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
906    async fn append_vid(
907        &self,
908        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
909    ) -> anyhow::Result<()> {
910        (**self).append_vid(proposal).await
911    }
912
913    async fn append_vid2(
914        &self,
915        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
916    ) -> anyhow::Result<()> {
917        (**self).append_vid2(proposal).await
918    }
919
920    async fn append_da(
921        &self,
922        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
923        vid_commit: VidCommitment,
924    ) -> anyhow::Result<()> {
925        (**self).append_da(proposal, vid_commit).await
926    }
927
928    async fn append_da2(
929        &self,
930        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
931        vid_commit: VidCommitment,
932    ) -> anyhow::Result<()> {
933        (**self).append_da2(proposal, vid_commit).await
934    }
935
936    async fn record_action(
937        &self,
938        view: ViewNumber,
939        epoch: Option<EpochNumber>,
940        action: HotShotAction,
941    ) -> anyhow::Result<()> {
942        (**self).record_action(view, epoch, action).await
943    }
944
945    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
946        Ok(())
947    }
948
949    async fn append_proposal(
950        &self,
951        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
952    ) -> anyhow::Result<()> {
953        (**self)
954            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
955            .await
956    }
957
958    async fn append_proposal2(
959        &self,
960        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
961    ) -> anyhow::Result<()> {
962        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
963            convert_proposal(proposal.clone());
964        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
965    }
966
967    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
968        Ok(())
969    }
970
971    async fn update_decided_upgrade_certificate(
972        &self,
973        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
974    ) -> anyhow::Result<()> {
975        (**self)
976            .store_upgrade_certificate(decided_upgrade_certificate)
977            .await
978    }
979
980    async fn store_drb_result(
981        &self,
982        epoch: <SeqTypes as NodeType>::Epoch,
983        drb_result: DrbResult,
984    ) -> anyhow::Result<()> {
985        (**self).store_drb_result(epoch, drb_result).await
986    }
987
988    async fn store_epoch_root(
989        &self,
990        epoch: <SeqTypes as NodeType>::Epoch,
991        block_header: <SeqTypes as NodeType>::BlockHeader,
992    ) -> anyhow::Result<()> {
993        (**self).store_epoch_root(epoch, block_header).await
994    }
995
996    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
997        (**self).store_drb_input(drb_input).await
998    }
999
1000    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1001        (**self).load_drb_input(epoch).await
1002    }
1003
1004    async fn update_state_cert(
1005        &self,
1006        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1007    ) -> anyhow::Result<()> {
1008        (**self).add_state_cert(state_cert).await
1009    }
1010}
1011
/// Data that can be deserialized from a subslice of namespace payload bytes.
///
/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
/// namespace payload bytes to read.
///
/// The lifetime `'a` is the lifetime of the input byte slice.
pub trait FromNsPayloadBytes<'a> {
    /// Deserialize `Self` from namespace payload bytes.
    ///
    /// The signature is infallible: it returns `Self`, not a `Result`.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1020
/// Specifies a subslice of namespace payload bytes to read.
///
/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
/// deserialized from that subslice of bytes.
pub trait NsPayloadBytesRange<'a> {
    /// The type that can be deserialized from the bytes in this range.
    type Output: FromNsPayloadBytes<'a>;

    /// Range relative to this ns payload
    fn ns_payload_range(&self) -> Range<usize>;
}
1031
/// Types which can be deserialized from either integers or strings.
///
/// Some types can be represented as an integer or a string in human-readable formats like JSON or
/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
/// to derive this user-friendly serialization.
///
/// These types are assumed to have an efficient representation as an integral type in Rust --
/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
/// encoding. With human-readable encodings, serialization is always to a string.
pub trait FromStringOrInteger: Sized {
    /// Representation used with non-human-readable encodings.
    type Binary: Serialize + DeserializeOwned;
    /// Integer type accepted by [`Self::from_integer`].
    type Integer: Serialize + DeserializeOwned;

    /// Builds `Self` from its [`Self::Binary`] representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Builds `Self` from a string.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Builds `Self` from an integer.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Converts `self` to its [`Self::Binary`] representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Converts `self` to a string; the signature is fallible, unlike
    /// [`std::string::ToString::to_string`].
    fn to_string(&self) -> anyhow::Result<String>;
}