espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33    vote::HasViewNumber,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39    impls::NodeState,
40    utils::BackoffParams,
41    v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44    v0::impls::{StakeTableHash, ValidatedState},
45    v0_3::{
46        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
47        Validator,
48    },
49    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
50    BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
51    PubKey, SeqTypes, ValidatorMap,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56    /// Fetch the leaf at the given height without retrying on transient errors.
57    async fn try_fetch_leaf(
58        &self,
59        retry: usize,
60        height: u64,
61        stake_table: HSStakeTable<SeqTypes>,
62        success_threshold: U256,
63    ) -> anyhow::Result<Leaf2>;
64
65    /// Fetch the leaf at the given height, retrying on transient errors.
66    async fn fetch_leaf(
67        &self,
68        height: u64,
69        stake_table: HSStakeTable<SeqTypes>,
70        success_threshold: U256,
71    ) -> anyhow::Result<Leaf2> {
72        self.backoff()
73            .retry(self, |provider, retry| {
74                let stake_table_clone = stake_table.clone();
75                async move {
76                    provider
77                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78                        .await
79                }
80                .boxed()
81            })
82            .await
83    }
84
85    /// Fetch the given list of accounts without retrying on transient errors.
86    async fn try_fetch_accounts(
87        &self,
88        retry: usize,
89        instance: &NodeState,
90        height: u64,
91        view: ViewNumber,
92        fee_merkle_tree_root: FeeMerkleCommitment,
93        accounts: &[FeeAccount],
94    ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96    /// Fetch the given list of accounts, retrying on transient errors.
97    async fn fetch_accounts(
98        &self,
99        instance: &NodeState,
100        height: u64,
101        view: ViewNumber,
102        fee_merkle_tree_root: FeeMerkleCommitment,
103        accounts: Vec<FeeAccount>,
104    ) -> anyhow::Result<Vec<FeeAccountProof>> {
105        self.backoff()
106            .retry(self, |provider, retry| {
107                let accounts = &accounts;
108                async move {
109                    provider
110                        .try_fetch_accounts(
111                            retry,
112                            instance,
113                            height,
114                            view,
115                            fee_merkle_tree_root,
116                            accounts,
117                        )
118                        .await
119                        .map_err(|err| {
120                            err.context(format!(
121                                "fetching accounts {accounts:?}, height {height}, view {view}"
122                            ))
123                        })
124                }
125                .boxed()
126            })
127            .await
128    }
129
130    /// Fetch and remember the blocks frontier without retrying on transient errors.
131    async fn try_remember_blocks_merkle_tree(
132        &self,
133        retry: usize,
134        instance: &NodeState,
135        height: u64,
136        view: ViewNumber,
137        mt: &mut BlockMerkleTree,
138    ) -> anyhow::Result<()>;
139
140    /// Fetch and remember the blocks frontier, retrying on transient errors.
141    async fn remember_blocks_merkle_tree(
142        &self,
143        instance: &NodeState,
144        height: u64,
145        view: ViewNumber,
146        mt: &mut BlockMerkleTree,
147    ) -> anyhow::Result<()> {
148        self.backoff()
149            .retry(mt, |mt, retry| {
150                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152                    .boxed()
153            })
154            .await
155    }
156
157    /// Fetch the chain config without retrying on transient errors.
158    async fn try_fetch_chain_config(
159        &self,
160        retry: usize,
161        commitment: Commitment<ChainConfig>,
162    ) -> anyhow::Result<ChainConfig>;
163
164    /// Fetch the chain config, retrying on transient errors.
165    async fn fetch_chain_config(
166        &self,
167        commitment: Commitment<ChainConfig>,
168    ) -> anyhow::Result<ChainConfig> {
169        self.backoff()
170            .retry(self, |provider, retry| {
171                provider
172                    .try_fetch_chain_config(retry, commitment)
173                    .map_err(|err| err.context("fetching chain config"))
174                    .boxed()
175            })
176            .await
177    }
178
179    /// Fetch the given list of reward accounts without retrying on transient errors.
180    async fn try_fetch_reward_accounts_v2(
181        &self,
182        retry: usize,
183        instance: &NodeState,
184        height: u64,
185        view: ViewNumber,
186        reward_merkle_tree_root: RewardMerkleCommitmentV2,
187        accounts: &[RewardAccountV2],
188    ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
189
190    /// Fetch the given list of reward accounts, retrying on transient errors.
191    async fn fetch_reward_accounts_v2(
192        &self,
193        instance: &NodeState,
194        height: u64,
195        view: ViewNumber,
196        reward_merkle_tree_root: RewardMerkleCommitmentV2,
197        accounts: Vec<RewardAccountV2>,
198    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
199        self.backoff()
200            .retry(self, |provider, retry| {
201                let accounts = &accounts;
202                async move {
203                    provider
204                        .try_fetch_reward_accounts_v2(
205                            retry,
206                            instance,
207                            height,
208                            view,
209                            reward_merkle_tree_root,
210                            accounts,
211                        )
212                        .await
213                        .map_err(|err| {
214                            err.context(format!(
215                                "fetching reward accounts {accounts:?}, height {height}, view \
216                                 {view}"
217                            ))
218                        })
219                }
220                .boxed()
221            })
222            .await
223    }
224
225    /// Fetch the given list of reward accounts without retrying on transient errors.
226    async fn try_fetch_reward_accounts_v1(
227        &self,
228        retry: usize,
229        instance: &NodeState,
230        height: u64,
231        view: ViewNumber,
232        reward_merkle_tree_root: RewardMerkleCommitmentV1,
233        accounts: &[RewardAccountV1],
234    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
235
236    /// Fetch the given list of reward accounts, retrying on transient errors.
237    async fn fetch_reward_accounts_v1(
238        &self,
239        instance: &NodeState,
240        height: u64,
241        view: ViewNumber,
242        reward_merkle_tree_root: RewardMerkleCommitmentV1,
243        accounts: Vec<RewardAccountV1>,
244    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
245        self.backoff()
246            .retry(self, |provider, retry| {
247                let accounts = &accounts;
248                async move {
249                    provider
250                        .try_fetch_reward_accounts_v1(
251                            retry,
252                            instance,
253                            height,
254                            view,
255                            reward_merkle_tree_root,
256                            accounts,
257                        )
258                        .await
259                        .map_err(|err| {
260                            err.context(format!(
261                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
262                                 {view}"
263                            ))
264                        })
265                }
266                .boxed()
267            })
268            .await
269    }
270
271    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
272    fn is_local(&self) -> bool;
273
274    /// Returns the backoff parameters for the catchup provider.
275    fn backoff(&self) -> &BackoffParams;
276
277    /// Returns the name of the catchup provider.
278    fn name(&self) -> String;
279}
280
281#[async_trait]
282impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
283    async fn try_fetch_leaf(
284        &self,
285        retry: usize,
286        height: u64,
287        stake_table: HSStakeTable<SeqTypes>,
288        success_threshold: U256,
289    ) -> anyhow::Result<Leaf2> {
290        (**self)
291            .try_fetch_leaf(retry, height, stake_table, success_threshold)
292            .await
293    }
294
295    async fn fetch_leaf(
296        &self,
297        height: u64,
298        stake_table: HSStakeTable<SeqTypes>,
299        success_threshold: U256,
300    ) -> anyhow::Result<Leaf2> {
301        (**self)
302            .fetch_leaf(height, stake_table, success_threshold)
303            .await
304    }
305    async fn try_fetch_accounts(
306        &self,
307        retry: usize,
308        instance: &NodeState,
309        height: u64,
310        view: ViewNumber,
311        fee_merkle_tree_root: FeeMerkleCommitment,
312        accounts: &[FeeAccount],
313    ) -> anyhow::Result<Vec<FeeAccountProof>> {
314        (**self)
315            .try_fetch_accounts(
316                retry,
317                instance,
318                height,
319                view,
320                fee_merkle_tree_root,
321                accounts,
322            )
323            .await
324    }
325
326    async fn fetch_accounts(
327        &self,
328        instance: &NodeState,
329        height: u64,
330        view: ViewNumber,
331        fee_merkle_tree_root: FeeMerkleCommitment,
332        accounts: Vec<FeeAccount>,
333    ) -> anyhow::Result<Vec<FeeAccountProof>> {
334        (**self)
335            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
336            .await
337    }
338
339    async fn try_remember_blocks_merkle_tree(
340        &self,
341        retry: usize,
342        instance: &NodeState,
343        height: u64,
344        view: ViewNumber,
345        mt: &mut BlockMerkleTree,
346    ) -> anyhow::Result<()> {
347        (**self)
348            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
349            .await
350    }
351
352    async fn remember_blocks_merkle_tree(
353        &self,
354        instance: &NodeState,
355        height: u64,
356        view: ViewNumber,
357        mt: &mut BlockMerkleTree,
358    ) -> anyhow::Result<()> {
359        (**self)
360            .remember_blocks_merkle_tree(instance, height, view, mt)
361            .await
362    }
363
364    async fn try_fetch_chain_config(
365        &self,
366        retry: usize,
367        commitment: Commitment<ChainConfig>,
368    ) -> anyhow::Result<ChainConfig> {
369        (**self).try_fetch_chain_config(retry, commitment).await
370    }
371
372    async fn fetch_chain_config(
373        &self,
374        commitment: Commitment<ChainConfig>,
375    ) -> anyhow::Result<ChainConfig> {
376        (**self).fetch_chain_config(commitment).await
377    }
378
379    async fn try_fetch_reward_accounts_v2(
380        &self,
381        retry: usize,
382        instance: &NodeState,
383        height: u64,
384        view: ViewNumber,
385        reward_merkle_tree_root: RewardMerkleCommitmentV2,
386        accounts: &[RewardAccountV2],
387    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
388        (**self)
389            .try_fetch_reward_accounts_v2(
390                retry,
391                instance,
392                height,
393                view,
394                reward_merkle_tree_root,
395                accounts,
396            )
397            .await
398    }
399
400    async fn fetch_reward_accounts_v2(
401        &self,
402        instance: &NodeState,
403        height: u64,
404        view: ViewNumber,
405        reward_merkle_tree_root: RewardMerkleCommitmentV2,
406        accounts: Vec<RewardAccountV2>,
407    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
408        (**self)
409            .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
410            .await
411    }
412
413    async fn try_fetch_reward_accounts_v1(
414        &self,
415        retry: usize,
416        instance: &NodeState,
417        height: u64,
418        view: ViewNumber,
419        reward_merkle_tree_root: RewardMerkleCommitmentV1,
420        accounts: &[RewardAccountV1],
421    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
422        (**self)
423            .try_fetch_reward_accounts_v1(
424                retry,
425                instance,
426                height,
427                view,
428                reward_merkle_tree_root,
429                accounts,
430            )
431            .await
432    }
433
434    async fn fetch_reward_accounts_v1(
435        &self,
436        instance: &NodeState,
437        height: u64,
438        view: ViewNumber,
439        reward_merkle_tree_root: RewardMerkleCommitmentV1,
440        accounts: Vec<RewardAccountV1>,
441    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
442        (**self)
443            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
444            .await
445    }
446
447    fn backoff(&self) -> &BackoffParams {
448        (**self).backoff()
449    }
450
451    fn name(&self) -> String {
452        (**self).name()
453    }
454
455    fn is_local(&self) -> bool {
456        (**self).is_local()
457    }
458}
459
460#[async_trait]
461pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
462    type Persistence: SequencerPersistence + MembershipPersistence;
463
464    fn set_view_retention(&mut self, view_retention: u64);
465    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
466    async fn reset(self) -> anyhow::Result<()>;
467}
468
469/// Determine the read state based on the queried block range.
470// - If the persistence returned events up to the requested block, the read is complete.
471/// - Otherwise, indicate that the read is up to the last processed block.
472pub enum EventsPersistenceRead {
473    Complete,
474    UntilL1Block(u64),
475}
476
477#[async_trait]
478/// Trait used by `Memberships` implementations to interact with persistence layer.
479pub trait MembershipPersistence: Send + Sync + 'static {
480    /// Load stake table for epoch from storage
481    async fn load_stake(
482        &self,
483        epoch: EpochNumber,
484    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
485
486    /// Load stake tables for storage for latest `n` known epochs
487    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
488
489    /// Store stake table at `epoch` in the persistence layer
490    async fn store_stake(
491        &self,
492        epoch: EpochNumber,
493        stake: ValidatorMap,
494        block_reward: Option<RewardAmount>,
495        stake_table_hash: Option<StakeTableHash>,
496    ) -> anyhow::Result<()>;
497
498    async fn store_events(
499        &self,
500        l1_finalized: u64,
501        events: Vec<(EventKey, StakeTableEvent)>,
502    ) -> anyhow::Result<()>;
503    async fn load_events(
504        &self,
505        l1_finalized: u64,
506    ) -> anyhow::Result<(
507        Option<EventsPersistenceRead>,
508        Vec<(EventKey, StakeTableEvent)>,
509    )>;
510
511    async fn store_all_validators(
512        &self,
513        epoch: EpochNumber,
514        all_validators: IndexMap<Address, Validator<PubKey>>,
515    ) -> anyhow::Result<()>;
516
517    async fn load_all_validators(
518        &self,
519        epoch: EpochNumber,
520        offset: u64,
521        limit: u64,
522    ) -> anyhow::Result<Vec<Validator<PubKey>>>;
523}
524
525#[async_trait]
526pub trait SequencerPersistence:
527    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
528{
529    /// Use this storage as a state catchup backend, if supported.
530    fn into_catchup_provider(
531        self,
532        _backoff: BackoffParams,
533    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
534        bail!("state catchup is not implemented for this persistence type");
535    }
536
537    /// Load the orchestrator config from storage.
538    ///
539    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
540    /// `Err` if it could not be determined whether a config exists or not.
541    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
542
543    /// Save the orchestrator config to storage.
544    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
545
546    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
547    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
548
549    /// Load the view to restart from.
550    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
551
552    /// Load the proposals saved by consensus
553    async fn load_quorum_proposals(
554        &self,
555    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
556
557    async fn load_quorum_proposal(
558        &self,
559        view: ViewNumber,
560    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
561
562    async fn load_vid_share(
563        &self,
564        view: ViewNumber,
565    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
566    async fn load_da_proposal(
567        &self,
568        view: ViewNumber,
569    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
570    async fn load_upgrade_certificate(
571        &self,
572    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
573    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
574    async fn load_state_cert(
575        &self,
576    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
577
578    /// Load the latest known consensus state.
579    ///
580    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
581    /// if there is no saved state). Also returns the anchor view number, which can be used as a
582    /// reference point to process any events which were not processed before a previous shutdown,
583    /// if applicable,.
584    async fn load_consensus_state<V: Versions>(
585        &self,
586        state: NodeState,
587    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
588        let genesis_validated_state = ValidatedState::genesis(&state).0;
589        let highest_voted_view = match self
590            .load_latest_acted_view()
591            .await
592            .context("loading last voted view")?
593        {
594            Some(view) => {
595                tracing::info!(?view, "starting with last actioned view");
596                view
597            },
598            None => {
599                tracing::info!("no saved view, starting from genesis");
600                ViewNumber::genesis()
601            },
602        };
603
604        let restart_view = match self
605            .load_restart_view()
606            .await
607            .context("loading restart view")?
608        {
609            Some(view) => {
610                tracing::info!(?view, "starting from saved view");
611                view
612            },
613            None => {
614                tracing::info!("no saved view, starting from genesis");
615                ViewNumber::genesis()
616            },
617        };
618        let next_epoch_high_qc = self
619            .load_next_epoch_quorum_certificate()
620            .await
621            .context("loading next epoch qc")?;
622        let (leaf, mut high_qc, anchor_view) = match self
623            .load_anchor_leaf()
624            .await
625            .context("loading anchor leaf")?
626        {
627            Some((leaf, high_qc)) => {
628                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
629                ensure!(
630                    leaf.view_number() == high_qc.view_number,
631                    format!(
632                        "loaded anchor leaf from view {}, but high QC is from view {}",
633                        leaf.view_number(),
634                        high_qc.view_number
635                    )
636                );
637
638                let anchor_view = leaf.view_number();
639                (leaf, high_qc, Some(anchor_view))
640            },
641            None => {
642                tracing::info!("no saved leaf, starting from genesis leaf");
643                (
644                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
645                        .await,
646                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
647                    None,
648                )
649            },
650        };
651
652        if let Some((extended_high_qc, _)) = self.load_eqc().await {
653            if extended_high_qc.view_number() > high_qc.view_number() {
654                high_qc = extended_high_qc
655            }
656        }
657
658        let validated_state = if leaf.block_header().height() == 0 {
659            // If we are starting from genesis, we can provide the full state.
660            genesis_validated_state
661        } else {
662            // Otherwise, we will have to construct a sparse state and fetch missing data during
663            // catchup.
664            ValidatedState::from_header(leaf.block_header())
665        };
666
667        // If we are not starting from genesis, we start from the view following the maximum view
668        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
669        // starting in a view in which we had already voted before the restart, and prevents
670        // unnecessary catchup from starting in a view earlier than the anchor leaf.
671        let restart_view = max(restart_view, leaf.view_number());
672        // TODO:
673        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
674
675        let config = self.load_config().await.context("loading config")?;
676        let epoch_height = config
677            .as_ref()
678            .map(|c| c.config.epoch_height)
679            .unwrap_or_default();
680        let epoch_start_block = config
681            .as_ref()
682            .map(|c| c.config.epoch_start_block)
683            .unwrap_or_default();
684
685        let saved_proposals = self
686            .load_quorum_proposals()
687            .await
688            .context("loading saved proposals")?;
689
690        let upgrade_certificate = self
691            .load_upgrade_certificate()
692            .await
693            .context("loading upgrade certificate")?;
694
695        let start_epoch_info = self
696            .load_start_epoch_info()
697            .await
698            .context("loading start epoch info")?;
699
700        let state_cert = self
701            .load_state_cert()
702            .await
703            .context("loading light client state update certificate")?;
704
705        tracing::warn!(
706            ?leaf,
707            ?restart_view,
708            ?epoch,
709            ?high_qc,
710            ?validated_state,
711            ?state_cert,
712            "loaded consensus state"
713        );
714
715        Ok((
716            HotShotInitializer {
717                instance_state: state,
718                epoch_height,
719                epoch_start_block,
720                anchor_leaf: leaf,
721                anchor_state: Arc::new(validated_state),
722                anchor_state_delta: None,
723                start_view: restart_view,
724                start_epoch: epoch,
725                last_actioned_view: highest_voted_view,
726                saved_proposals,
727                high_qc,
728                next_epoch_high_qc,
729                decided_upgrade_certificate: upgrade_certificate,
730                undecided_leaves: Default::default(),
731                undecided_state: Default::default(),
732                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
733                start_epoch_info,
734                state_cert,
735            },
736            anchor_view,
737        ))
738    }
739
740    /// Update storage based on an event from consensus.
741    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
742        if let EventType::Decide {
743            leaf_chain,
744            committing_qc,
745            deciding_qc,
746            ..
747        } = &event.event
748        {
749            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
750                // No new leaves.
751                return;
752            };
753
754            // Associate each decided leaf with a QC.
755            let chain = leaf_chain.iter().zip(
756                // The first (most recent) leaf corresponds to the QC triggering the decide event.
757                std::iter::once((**committing_qc).clone())
758                    // Moving backwards in the chain, each leaf corresponds with the subsequent
759                    // leaf's justify QC.
760                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
761            );
762
763            if let Err(err) = self
764                .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
765                .await
766            {
767                tracing::error!(
768                    "failed to save decided leaves, chain may not be up to date: {err:#}"
769                );
770                return;
771            }
772        }
773    }
774
775    /// Append decided leaves to persistent storage and emit a corresponding event.
776    ///
777    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
778    /// up to and including `view`. If available in persistent storage, full block payloads and VID
779    /// info will also be included for each leaf.
780    ///
781    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
782    /// collected The consumer's handling of this event is a prerequisite for the completion of
783    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
784    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
785    /// will eventually be passed to the consumer.
786    ///
787    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
788    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
789    /// processed twice, or the consumer may get two events which share a subset of their data.
790    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
791    /// idempotent.
792    ///
793    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
794    /// until the next decide, at which point all persisted leaves from the failed GC run will be
795    /// included in the event along with subsequently decided leaves.
796    ///
797    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
798    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
799    /// storage to long-term archival storage.
800    async fn append_decided_leaves(
801        &self,
802        decided_view: ViewNumber,
803        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
804        deciding_qc: Option<Arc<QuorumCertificate2<SeqTypes>>>,
805        consumer: &(impl EventConsumer + 'static),
806    ) -> anyhow::Result<()>;
807
808    async fn load_anchor_leaf(
809        &self,
810    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
811    async fn append_vid(
812        &self,
813        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
814    ) -> anyhow::Result<()>;
815    // TODO: merge these two `append_vid`s
816    async fn append_vid2(
817        &self,
818        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
819    ) -> anyhow::Result<()>;
820    async fn append_da(
821        &self,
822        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
823        vid_commit: VidCommitment,
824    ) -> anyhow::Result<()>;
825    async fn record_action(
826        &self,
827        view: ViewNumber,
828        epoch: Option<EpochNumber>,
829        action: HotShotAction,
830    ) -> anyhow::Result<()>;
831
832    async fn append_quorum_proposal2(
833        &self,
834        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
835    ) -> anyhow::Result<()>;
836
837    /// Update the current eQC in storage.
838    async fn store_eqc(
839        &self,
840        _high_qc: QuorumCertificate2<SeqTypes>,
841        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
842    ) -> anyhow::Result<()>;
843
844    /// Load the current eQC from storage.
845    async fn load_eqc(
846        &self,
847    ) -> Option<(
848        QuorumCertificate2<SeqTypes>,
849        NextEpochQuorumCertificate2<SeqTypes>,
850    )>;
851
852    async fn store_upgrade_certificate(
853        &self,
854        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
855    ) -> anyhow::Result<()>;
856
857    async fn migrate_stake_table_events(&self) -> anyhow::Result<()>;
858
859    async fn migrate_storage(&self) -> anyhow::Result<()> {
860        tracing::warn!("migrating consensus data...");
861
862        self.migrate_anchor_leaf().await?;
863        self.migrate_da_proposals().await?;
864        self.migrate_vid_shares().await?;
865        self.migrate_quorum_proposals().await?;
866        self.migrate_quorum_certificates().await?;
867
868        tracing::warn!("consensus storage has been migrated to new types");
869
870        tracing::warn!("migrating stake table events");
871        self.migrate_stake_table_events().await?;
872        tracing::warn!("stake table events have been migrated");
873
874        Ok(())
875    }
876
877    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
878    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
879    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
880    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
881    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
882
883    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
884        match self.load_anchor_leaf().await? {
885            Some((leaf, _)) => Ok(leaf.view_number()),
886            None => Ok(ViewNumber::genesis()),
887        }
888    }
889
890    async fn store_next_epoch_quorum_certificate(
891        &self,
892        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
893    ) -> anyhow::Result<()>;
894
895    async fn load_next_epoch_quorum_certificate(
896        &self,
897    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
898
899    async fn append_da2(
900        &self,
901        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
902        vid_commit: VidCommitment,
903    ) -> anyhow::Result<()>;
904
905    async fn append_proposal2(
906        &self,
907        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
908    ) -> anyhow::Result<()> {
909        self.append_quorum_proposal2(proposal).await
910    }
911
912    async fn store_drb_result(
913        &self,
914        epoch: <SeqTypes as NodeType>::Epoch,
915        drb_result: DrbResult,
916    ) -> anyhow::Result<()>;
917    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
918    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
919    async fn store_epoch_root(
920        &self,
921        epoch: <SeqTypes as NodeType>::Epoch,
922        block_header: <SeqTypes as NodeType>::BlockHeader,
923    ) -> anyhow::Result<()>;
924    async fn add_state_cert(
925        &self,
926        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
927    ) -> anyhow::Result<()>;
928
929    fn enable_metrics(&mut self, metrics: &dyn Metrics);
930}
931
932#[async_trait]
933pub trait EventConsumer: Debug + Send + Sync {
934    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
935}
936
937#[async_trait]
938impl<T> EventConsumer for Box<T>
939where
940    T: EventConsumer + ?Sized,
941{
942    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
943        (**self).handle_event(event).await
944    }
945}
946
947#[derive(Clone, Copy, Debug)]
948pub struct NullEventConsumer;
949
950#[async_trait]
951impl EventConsumer for NullEventConsumer {
952    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
953        Ok(())
954    }
955}
956
957#[async_trait]
958impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
959    async fn append_vid(
960        &self,
961        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
962    ) -> anyhow::Result<()> {
963        (**self).append_vid(proposal).await
964    }
965
966    async fn append_vid2(
967        &self,
968        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
969    ) -> anyhow::Result<()> {
970        (**self).append_vid2(proposal).await
971    }
972
973    async fn append_da(
974        &self,
975        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
976        vid_commit: VidCommitment,
977    ) -> anyhow::Result<()> {
978        (**self).append_da(proposal, vid_commit).await
979    }
980
981    async fn append_da2(
982        &self,
983        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
984        vid_commit: VidCommitment,
985    ) -> anyhow::Result<()> {
986        (**self).append_da2(proposal, vid_commit).await
987    }
988
989    async fn record_action(
990        &self,
991        view: ViewNumber,
992        epoch: Option<EpochNumber>,
993        action: HotShotAction,
994    ) -> anyhow::Result<()> {
995        (**self).record_action(view, epoch, action).await
996    }
997
998    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
999        Ok(())
1000    }
1001
1002    async fn append_proposal(
1003        &self,
1004        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1005    ) -> anyhow::Result<()> {
1006        (**self)
1007            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1008            .await
1009    }
1010
1011    async fn append_proposal2(
1012        &self,
1013        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1014    ) -> anyhow::Result<()> {
1015        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1016            convert_proposal(proposal.clone());
1017        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1018    }
1019
1020    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1021        Ok(())
1022    }
1023
1024    /// Update the current eQC in storage.
1025    async fn update_eqc(
1026        &self,
1027        high_qc: QuorumCertificate2<SeqTypes>,
1028        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1029    ) -> anyhow::Result<()> {
1030        if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1031            if high_qc.view_number() < existing_high_qc.view_number() {
1032                return Ok(());
1033            }
1034        }
1035
1036        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1037    }
1038
1039    async fn update_next_epoch_high_qc2(
1040        &self,
1041        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1042    ) -> anyhow::Result<()> {
1043        Ok(())
1044    }
1045
1046    async fn update_decided_upgrade_certificate(
1047        &self,
1048        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1049    ) -> anyhow::Result<()> {
1050        (**self)
1051            .store_upgrade_certificate(decided_upgrade_certificate)
1052            .await
1053    }
1054
1055    async fn store_drb_result(
1056        &self,
1057        epoch: <SeqTypes as NodeType>::Epoch,
1058        drb_result: DrbResult,
1059    ) -> anyhow::Result<()> {
1060        (**self).store_drb_result(epoch, drb_result).await
1061    }
1062
1063    async fn store_epoch_root(
1064        &self,
1065        epoch: <SeqTypes as NodeType>::Epoch,
1066        block_header: <SeqTypes as NodeType>::BlockHeader,
1067    ) -> anyhow::Result<()> {
1068        (**self).store_epoch_root(epoch, block_header).await
1069    }
1070
1071    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1072        (**self).store_drb_input(drb_input).await
1073    }
1074
1075    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1076        (**self).load_drb_input(epoch).await
1077    }
1078
1079    async fn update_state_cert(
1080        &self,
1081        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1082    ) -> anyhow::Result<()> {
1083        (**self).add_state_cert(state_cert).await
1084    }
1085}
1086
1087/// Data that can be deserialized from a subslice of namespace payload bytes.
1088///
1089/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
1090/// namespace payload bytes to read.
1091pub trait FromNsPayloadBytes<'a> {
1092    /// Deserialize `Self` from namespace payload bytes.
1093    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1094}
1095
1096/// Specifies a subslice of namespace payload bytes to read.
1097///
1098/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1099/// deserialized from that subslice of bytes.
1100pub trait NsPayloadBytesRange<'a> {
1101    type Output: FromNsPayloadBytes<'a>;
1102
1103    /// Range relative to this ns payload
1104    fn ns_payload_range(&self) -> Range<usize>;
1105}
1106
1107/// Types which can be deserialized from either integers or strings.
1108///
1109/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1110/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1111/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1112/// to derive this user-friendly serialization.
1113///
1114/// These types are assumed to have an efficient representation as an integral type in Rust --
1115/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1116/// encoding. With human readable encodings, serialization is always to a string.
1117pub trait FromStringOrInteger: Sized {
1118    type Binary: Serialize + DeserializeOwned;
1119    type Integer: Serialize + DeserializeOwned;
1120
1121    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1122    fn from_string(s: String) -> anyhow::Result<Self>;
1123    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1124
1125    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1126    fn to_string(&self) -> anyhow::Result<String>;
1127}