espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{convert_proposal, Proposal},
21    simple_certificate::{
22        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23        QuorumCertificate2, UpgradeCertificate,
24    },
25    stake_table::HSStakeTable,
26    traits::{
27        metrics::Metrics,
28        node_implementation::{ConsensusTime, NodeType, Versions},
29        storage::Storage,
30        ValidatedState as HotShotState,
31    },
32    utils::genesis_epoch_from_version,
33    vote::HasViewNumber,
34};
35use serde::{de::DeserializeOwned, Serialize};
36
37use super::{
38    impls::NodeState,
39    utils::BackoffParams,
40    v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43    v0::impls::{StakeTableHash, ValidatedState},
44    v0_3::{
45        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
46    },
47    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
48    BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
49    SeqTypes, ValidatorMap,
50};
51
52#[async_trait]
53pub trait StateCatchup: Send + Sync {
54    /// Fetch the leaf at the given height without retrying on transient errors.
55    async fn try_fetch_leaf(
56        &self,
57        retry: usize,
58        height: u64,
59        stake_table: HSStakeTable<SeqTypes>,
60        success_threshold: U256,
61    ) -> anyhow::Result<Leaf2>;
62
63    /// Fetch the leaf at the given height, retrying on transient errors.
64    async fn fetch_leaf(
65        &self,
66        height: u64,
67        stake_table: HSStakeTable<SeqTypes>,
68        success_threshold: U256,
69    ) -> anyhow::Result<Leaf2> {
70        self.backoff()
71            .retry(self, |provider, retry| {
72                let stake_table_clone = stake_table.clone();
73                async move {
74                    provider
75                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
76                        .await
77                }
78                .boxed()
79            })
80            .await
81    }
82
83    /// Fetch the given list of accounts without retrying on transient errors.
84    async fn try_fetch_accounts(
85        &self,
86        retry: usize,
87        instance: &NodeState,
88        height: u64,
89        view: ViewNumber,
90        fee_merkle_tree_root: FeeMerkleCommitment,
91        accounts: &[FeeAccount],
92    ) -> anyhow::Result<Vec<FeeAccountProof>>;
93
94    /// Fetch the given list of accounts, retrying on transient errors.
95    async fn fetch_accounts(
96        &self,
97        instance: &NodeState,
98        height: u64,
99        view: ViewNumber,
100        fee_merkle_tree_root: FeeMerkleCommitment,
101        accounts: Vec<FeeAccount>,
102    ) -> anyhow::Result<Vec<FeeAccountProof>> {
103        self.backoff()
104            .retry(self, |provider, retry| {
105                let accounts = &accounts;
106                async move {
107                    provider
108                        .try_fetch_accounts(
109                            retry,
110                            instance,
111                            height,
112                            view,
113                            fee_merkle_tree_root,
114                            accounts,
115                        )
116                        .await
117                        .map_err(|err| {
118                            err.context(format!(
119                                "fetching accounts {accounts:?}, height {height}, view {view}"
120                            ))
121                        })
122                }
123                .boxed()
124            })
125            .await
126    }
127
128    /// Fetch and remember the blocks frontier without retrying on transient errors.
129    async fn try_remember_blocks_merkle_tree(
130        &self,
131        retry: usize,
132        instance: &NodeState,
133        height: u64,
134        view: ViewNumber,
135        mt: &mut BlockMerkleTree,
136    ) -> anyhow::Result<()>;
137
138    /// Fetch and remember the blocks frontier, retrying on transient errors.
139    async fn remember_blocks_merkle_tree(
140        &self,
141        instance: &NodeState,
142        height: u64,
143        view: ViewNumber,
144        mt: &mut BlockMerkleTree,
145    ) -> anyhow::Result<()> {
146        self.backoff()
147            .retry(mt, |mt, retry| {
148                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
149                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
150                    .boxed()
151            })
152            .await
153    }
154
155    /// Fetch the chain config without retrying on transient errors.
156    async fn try_fetch_chain_config(
157        &self,
158        retry: usize,
159        commitment: Commitment<ChainConfig>,
160    ) -> anyhow::Result<ChainConfig>;
161
162    /// Fetch the chain config, retrying on transient errors.
163    async fn fetch_chain_config(
164        &self,
165        commitment: Commitment<ChainConfig>,
166    ) -> anyhow::Result<ChainConfig> {
167        self.backoff()
168            .retry(self, |provider, retry| {
169                provider
170                    .try_fetch_chain_config(retry, commitment)
171                    .map_err(|err| err.context("fetching chain config"))
172                    .boxed()
173            })
174            .await
175    }
176
177    /// Fetch the given list of reward accounts without retrying on transient errors.
178    async fn try_fetch_reward_accounts_v2(
179        &self,
180        retry: usize,
181        instance: &NodeState,
182        height: u64,
183        view: ViewNumber,
184        reward_merkle_tree_root: RewardMerkleCommitmentV2,
185        accounts: &[RewardAccountV2],
186    ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
187
188    /// Fetch the given list of reward accounts, retrying on transient errors.
189    async fn fetch_reward_accounts_v2(
190        &self,
191        instance: &NodeState,
192        height: u64,
193        view: ViewNumber,
194        reward_merkle_tree_root: RewardMerkleCommitmentV2,
195        accounts: Vec<RewardAccountV2>,
196    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
197        self.backoff()
198            .retry(self, |provider, retry| {
199                let accounts = &accounts;
200                async move {
201                    provider
202                        .try_fetch_reward_accounts_v2(
203                            retry,
204                            instance,
205                            height,
206                            view,
207                            reward_merkle_tree_root,
208                            accounts,
209                        )
210                        .await
211                        .map_err(|err| {
212                            err.context(format!(
213                                "fetching reward accounts {accounts:?}, height {height}, view \
214                                 {view}"
215                            ))
216                        })
217                }
218                .boxed()
219            })
220            .await
221    }
222
223    /// Fetch the given list of reward accounts without retrying on transient errors.
224    async fn try_fetch_reward_accounts_v1(
225        &self,
226        retry: usize,
227        instance: &NodeState,
228        height: u64,
229        view: ViewNumber,
230        reward_merkle_tree_root: RewardMerkleCommitmentV1,
231        accounts: &[RewardAccountV1],
232    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
233
234    /// Fetch the given list of reward accounts, retrying on transient errors.
235    async fn fetch_reward_accounts_v1(
236        &self,
237        instance: &NodeState,
238        height: u64,
239        view: ViewNumber,
240        reward_merkle_tree_root: RewardMerkleCommitmentV1,
241        accounts: Vec<RewardAccountV1>,
242    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
243        self.backoff()
244            .retry(self, |provider, retry| {
245                let accounts = &accounts;
246                async move {
247                    provider
248                        .try_fetch_reward_accounts_v1(
249                            retry,
250                            instance,
251                            height,
252                            view,
253                            reward_merkle_tree_root,
254                            accounts,
255                        )
256                        .await
257                        .map_err(|err| {
258                            err.context(format!(
259                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
260                                 {view}"
261                            ))
262                        })
263                }
264                .boxed()
265            })
266            .await
267    }
268
269    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
270    fn is_local(&self) -> bool;
271
272    /// Returns the backoff parameters for the catchup provider.
273    fn backoff(&self) -> &BackoffParams;
274
275    /// Returns the name of the catchup provider.
276    fn name(&self) -> String;
277}
278
279#[async_trait]
280impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
281    async fn try_fetch_leaf(
282        &self,
283        retry: usize,
284        height: u64,
285        stake_table: HSStakeTable<SeqTypes>,
286        success_threshold: U256,
287    ) -> anyhow::Result<Leaf2> {
288        (**self)
289            .try_fetch_leaf(retry, height, stake_table, success_threshold)
290            .await
291    }
292
293    async fn fetch_leaf(
294        &self,
295        height: u64,
296        stake_table: HSStakeTable<SeqTypes>,
297        success_threshold: U256,
298    ) -> anyhow::Result<Leaf2> {
299        (**self)
300            .fetch_leaf(height, stake_table, success_threshold)
301            .await
302    }
303    async fn try_fetch_accounts(
304        &self,
305        retry: usize,
306        instance: &NodeState,
307        height: u64,
308        view: ViewNumber,
309        fee_merkle_tree_root: FeeMerkleCommitment,
310        accounts: &[FeeAccount],
311    ) -> anyhow::Result<Vec<FeeAccountProof>> {
312        (**self)
313            .try_fetch_accounts(
314                retry,
315                instance,
316                height,
317                view,
318                fee_merkle_tree_root,
319                accounts,
320            )
321            .await
322    }
323
324    async fn fetch_accounts(
325        &self,
326        instance: &NodeState,
327        height: u64,
328        view: ViewNumber,
329        fee_merkle_tree_root: FeeMerkleCommitment,
330        accounts: Vec<FeeAccount>,
331    ) -> anyhow::Result<Vec<FeeAccountProof>> {
332        (**self)
333            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
334            .await
335    }
336
337    async fn try_remember_blocks_merkle_tree(
338        &self,
339        retry: usize,
340        instance: &NodeState,
341        height: u64,
342        view: ViewNumber,
343        mt: &mut BlockMerkleTree,
344    ) -> anyhow::Result<()> {
345        (**self)
346            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
347            .await
348    }
349
350    async fn remember_blocks_merkle_tree(
351        &self,
352        instance: &NodeState,
353        height: u64,
354        view: ViewNumber,
355        mt: &mut BlockMerkleTree,
356    ) -> anyhow::Result<()> {
357        (**self)
358            .remember_blocks_merkle_tree(instance, height, view, mt)
359            .await
360    }
361
362    async fn try_fetch_chain_config(
363        &self,
364        retry: usize,
365        commitment: Commitment<ChainConfig>,
366    ) -> anyhow::Result<ChainConfig> {
367        (**self).try_fetch_chain_config(retry, commitment).await
368    }
369
370    async fn fetch_chain_config(
371        &self,
372        commitment: Commitment<ChainConfig>,
373    ) -> anyhow::Result<ChainConfig> {
374        (**self).fetch_chain_config(commitment).await
375    }
376
377    async fn try_fetch_reward_accounts_v2(
378        &self,
379        retry: usize,
380        instance: &NodeState,
381        height: u64,
382        view: ViewNumber,
383        reward_merkle_tree_root: RewardMerkleCommitmentV2,
384        accounts: &[RewardAccountV2],
385    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
386        (**self)
387            .try_fetch_reward_accounts_v2(
388                retry,
389                instance,
390                height,
391                view,
392                reward_merkle_tree_root,
393                accounts,
394            )
395            .await
396    }
397
398    async fn fetch_reward_accounts_v2(
399        &self,
400        instance: &NodeState,
401        height: u64,
402        view: ViewNumber,
403        reward_merkle_tree_root: RewardMerkleCommitmentV2,
404        accounts: Vec<RewardAccountV2>,
405    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
406        (**self)
407            .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
408            .await
409    }
410
411    async fn try_fetch_reward_accounts_v1(
412        &self,
413        retry: usize,
414        instance: &NodeState,
415        height: u64,
416        view: ViewNumber,
417        reward_merkle_tree_root: RewardMerkleCommitmentV1,
418        accounts: &[RewardAccountV1],
419    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
420        (**self)
421            .try_fetch_reward_accounts_v1(
422                retry,
423                instance,
424                height,
425                view,
426                reward_merkle_tree_root,
427                accounts,
428            )
429            .await
430    }
431
432    async fn fetch_reward_accounts_v1(
433        &self,
434        instance: &NodeState,
435        height: u64,
436        view: ViewNumber,
437        reward_merkle_tree_root: RewardMerkleCommitmentV1,
438        accounts: Vec<RewardAccountV1>,
439    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
440        (**self)
441            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
442            .await
443    }
444
445    fn backoff(&self) -> &BackoffParams {
446        (**self).backoff()
447    }
448
449    fn name(&self) -> String {
450        (**self).name()
451    }
452
453    fn is_local(&self) -> bool {
454        (**self).is_local()
455    }
456}
457
/// Options from which a persistence backend can be created.
///
/// The associated [`Persistence`](Self::Persistence) type must implement both
/// [`SequencerPersistence`] and [`MembershipPersistence`].
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The storage type produced by [`create`](Self::create).
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention on these options.
    // NOTE(review): presumably the number of views of consensus data kept before garbage
    // collection -- confirm against the implementations.
    fn set_view_retention(&mut self, view_retention: u64);

    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;

    /// Reset the storage described by these options, consuming the options.
    // NOTE(review): presumably deletes all persisted data -- confirm against the implementations.
    async fn reset(self) -> anyhow::Result<()>;
}
466
/// Determine the read state based on the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// Events were returned up to the requested block.
    Complete,
    /// Events were returned only up to the contained (last processed) L1 block.
    UntilL1Block(u64),
}
474
#[async_trait]
/// Trait used by `Memberships` implementations to interact with persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage.
    ///
    /// On success the result is optional; when present it carries the validator map together
    /// with an optional block reward and an optional stake table hash (the same values accepted
    /// by [`store_stake`](Self::store_stake)).
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;

    /// Load stake tables from storage for the latest `limit` known epochs.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store stake table at `epoch` in the persistence layer
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store a batch of stake table events, keyed by [`EventKey`], together with `l1_finalized`.
    // NOTE(review): `l1_finalized` is presumably the finalized L1 block number up to which
    // `events` were fetched -- confirm against the implementations.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;

    /// Load stored stake table events for the given `l1_finalized` block.
    ///
    /// Returns the events along with an optional [`EventsPersistenceRead`] describing whether
    /// the read covered the requested block or only reached the last processed block.
    // NOTE(review): the meaning of a `None` read state is not visible here -- presumably no
    // events have been persisted yet; confirm against the implementations.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;
}
509
510#[async_trait]
511pub trait SequencerPersistence:
512    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
513{
    /// Use this storage as a state catchup backend, if supported.
    ///
    /// The default implementation does not support catchup: it ignores `_backoff` and always
    /// returns an error. Persistence types that can serve catchup requests override it.
    fn into_catchup_provider(
        self,
        _backoff: BackoffParams,
    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
        bail!("state catchup is not implemented for this persistence type");
    }
521
    /// Load the orchestrator config from storage.
    ///
    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
    /// `Err` if it could not be determined whether a config exists or not.
    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;

    /// Save the orchestrator config to storage.
    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;

    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
    // NOTE(review): no `save_voted_view` method is visible in this part of the trait; this link
    // may be stale and should perhaps point at `record_action` -- confirm.
    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;

    /// Load the view to restart from.
    ///
    /// `load_consensus_state` falls back to the genesis view when this returns `None`.
    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
536
    /// Load the proposals saved by consensus
    ///
    /// The proposals are returned keyed by view number.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;

    /// Load the quorum proposal for a single `view`.
    ///
    /// Unlike the other single-view loaders, the result is not optional.
    // NOTE(review): presumably a missing proposal is reported as an error -- confirm.
    async fn load_quorum_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;

    /// Load the VID share proposal for `view`, if there is one.
    async fn load_vid_share(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
    /// Load the DA proposal for `view`, if there is one.
    async fn load_da_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
    /// Load the upgrade certificate, if there is one.
    ///
    /// `load_consensus_state` uses the result as the decided upgrade certificate.
    async fn load_upgrade_certificate(
        &self,
    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
    /// Load the epoch info that is handed to the HotShot initializer as `start_epoch_info`.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
    /// Load the light client state update certificate, if there is one.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
562
563    /// Load the latest known consensus state.
564    ///
565    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
566    /// if there is no saved state). Also returns the anchor view number, which can be used as a
567    /// reference point to process any events which were not processed before a previous shutdown,
568    /// if applicable,.
569    async fn load_consensus_state<V: Versions>(
570        &self,
571        state: NodeState,
572    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
573        let genesis_validated_state = ValidatedState::genesis(&state).0;
574        let highest_voted_view = match self
575            .load_latest_acted_view()
576            .await
577            .context("loading last voted view")?
578        {
579            Some(view) => {
580                tracing::info!(?view, "starting with last actioned view");
581                view
582            },
583            None => {
584                tracing::info!("no saved view, starting from genesis");
585                ViewNumber::genesis()
586            },
587        };
588
589        let restart_view = match self
590            .load_restart_view()
591            .await
592            .context("loading restart view")?
593        {
594            Some(view) => {
595                tracing::info!(?view, "starting from saved view");
596                view
597            },
598            None => {
599                tracing::info!("no saved view, starting from genesis");
600                ViewNumber::genesis()
601            },
602        };
603        let next_epoch_high_qc = self
604            .load_next_epoch_quorum_certificate()
605            .await
606            .context("loading next epoch qc")?;
607        let (leaf, mut high_qc, anchor_view) = match self
608            .load_anchor_leaf()
609            .await
610            .context("loading anchor leaf")?
611        {
612            Some((leaf, high_qc)) => {
613                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
614                ensure!(
615                    leaf.view_number() == high_qc.view_number,
616                    format!(
617                        "loaded anchor leaf from view {}, but high QC is from view {}",
618                        leaf.view_number(),
619                        high_qc.view_number
620                    )
621                );
622
623                let anchor_view = leaf.view_number();
624                (leaf, high_qc, Some(anchor_view))
625            },
626            None => {
627                tracing::info!("no saved leaf, starting from genesis leaf");
628                (
629                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
630                        .await,
631                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
632                    None,
633                )
634            },
635        };
636
637        if let Some((extended_high_qc, _)) = self.load_eqc().await {
638            if extended_high_qc.view_number() > high_qc.view_number() {
639                high_qc = extended_high_qc
640            }
641        }
642
643        let validated_state = if leaf.block_header().height() == 0 {
644            // If we are starting from genesis, we can provide the full state.
645            genesis_validated_state
646        } else {
647            // Otherwise, we will have to construct a sparse state and fetch missing data during
648            // catchup.
649            ValidatedState::from_header(leaf.block_header())
650        };
651
652        // If we are not starting from genesis, we start from the view following the maximum view
653        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
654        // starting in a view in which we had already voted before the restart, and prevents
655        // unnecessary catchup from starting in a view earlier than the anchor leaf.
656        let restart_view = max(restart_view, leaf.view_number());
657        // TODO:
658        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
659
660        let config = self.load_config().await.context("loading config")?;
661        let epoch_height = config
662            .as_ref()
663            .map(|c| c.config.epoch_height)
664            .unwrap_or_default();
665        let epoch_start_block = config
666            .as_ref()
667            .map(|c| c.config.epoch_start_block)
668            .unwrap_or_default();
669
670        let saved_proposals = self
671            .load_quorum_proposals()
672            .await
673            .context("loading saved proposals")?;
674
675        let upgrade_certificate = self
676            .load_upgrade_certificate()
677            .await
678            .context("loading upgrade certificate")?;
679
680        let start_epoch_info = self
681            .load_start_epoch_info()
682            .await
683            .context("loading start epoch info")?;
684
685        let state_cert = self
686            .load_state_cert()
687            .await
688            .context("loading light client state update certificate")?;
689
690        tracing::warn!(
691            ?leaf,
692            ?restart_view,
693            ?epoch,
694            ?high_qc,
695            ?validated_state,
696            ?state_cert,
697            "loaded consensus state"
698        );
699
700        Ok((
701            HotShotInitializer {
702                instance_state: state,
703                epoch_height,
704                epoch_start_block,
705                anchor_leaf: leaf,
706                anchor_state: Arc::new(validated_state),
707                anchor_state_delta: None,
708                start_view: restart_view,
709                start_epoch: epoch,
710                last_actioned_view: highest_voted_view,
711                saved_proposals,
712                high_qc,
713                next_epoch_high_qc,
714                decided_upgrade_certificate: upgrade_certificate,
715                undecided_leaves: Default::default(),
716                undecided_state: Default::default(),
717                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
718                start_epoch_info,
719                state_cert,
720            },
721            anchor_view,
722        ))
723    }
724
725    /// Update storage based on an event from consensus.
726    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
727        if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
728            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
729                // No new leaves.
730                return;
731            };
732
733            // Associate each decided leaf with a QC.
734            let chain = leaf_chain.iter().zip(
735                // The first (most recent) leaf corresponds to the QC triggering the decide event.
736                std::iter::once((**qc).clone())
737                    // Moving backwards in the chain, each leaf corresponds with the subsequent
738                    // leaf's justify QC.
739                    .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
740            );
741
742            if let Err(err) = self
743                .append_decided_leaves(leaf.view_number(), chain, consumer)
744                .await
745            {
746                tracing::error!(
747                    "failed to save decided leaves, chain may not be up to date: {err:#}"
748                );
749                return;
750            }
751        }
752    }
753
    /// Append decided leaves to persistent storage and emit a corresponding event.
    ///
    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
    /// up to and including `view`. If available in persistent storage, full block payloads and VID
    /// info will also be included for each leaf.
    ///
    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
    /// collected. The consumer's handling of this event is a prerequisite for the completion of
    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
    /// will eventually be passed to the consumer.
    ///
    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
    /// processed twice, or the consumer may get two events which share a subset of their data.
    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
    /// idempotent.
    ///
    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
    /// until the next decide, at which point all persisted leaves from the failed GC run will be
    /// included in the event along with subsequently decided leaves.
    ///
    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
    /// storage to long-term archival storage.
    ///
    /// In the text above, `view` refers to the `decided_view` parameter. Each item of `leaf_chain`
    /// pairs a decided leaf with its QC.
    async fn append_decided_leaves(
        &self,
        decided_view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()>;
785
    /// Load the saved anchor leaf together with the QC saved alongside it.
    ///
    /// `load_consensus_state` treats `None` as "no saved leaf" and starts from the genesis leaf;
    /// it also requires the leaf and the QC to be from the same view.
    async fn load_anchor_leaf(
        &self,
    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
    /// Append a VID share proposal carrying an [`ADVZDisperseShare`].
    async fn append_vid(
        &self,
        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
    ) -> anyhow::Result<()>;
    // TODO: merge these two `append_vid`s
    /// Append a VID share proposal carrying a [`VidDisperseShare2`].
    async fn append_vid2(
        &self,
        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
    ) -> anyhow::Result<()>;
    /// Append a [`DaProposal`] along with the given VID commitment.
    async fn append_da(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()>;
    /// Record that `action` was taken in `view` (and, optionally, `epoch`).
    // NOTE(review): presumably this is what feeds `load_latest_acted_view` and
    // `load_restart_view` -- confirm against the implementations.
    async fn record_action(
        &self,
        view: ViewNumber,
        epoch: Option<EpochNumber>,
        action: HotShotAction,
    ) -> anyhow::Result<()>;

    /// Append a quorum proposal (wrapped in [`QuorumProposalWrapper`]) to storage.
    async fn append_quorum_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()>;

    /// Update the current eQC in storage.
    async fn store_eqc(
        &self,
        _high_qc: QuorumCertificate2<SeqTypes>,
        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
    ) -> anyhow::Result<()>;

    /// Load the current eQC from storage.
    ///
    /// Note that this returns a plain `Option` rather than a `Result`, so a failed load cannot
    /// be told apart from an absent eQC by the caller.
    async fn load_eqc(
        &self,
    ) -> Option<(
        QuorumCertificate2<SeqTypes>,
        NextEpochQuorumCertificate2<SeqTypes>,
    )>;

    /// Store the decided upgrade certificate, or `None`.
    // NOTE(review): presumably `None` clears any stored certificate -- confirm.
    async fn store_upgrade_certificate(
        &self,
        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
    ) -> anyhow::Result<()>;
    /// Run every consensus storage migration, one after another.
    ///
    /// The order is: anchor leaf, DA proposals, VID shares, quorum proposals, quorum
    /// certificates. The first migration that fails aborts the sequence and its error is
    /// returned; the remaining migrations are not attempted.
    async fn migrate_consensus(&self) -> anyhow::Result<()> {
        tracing::warn!("migrating consensus data...");

        self.migrate_anchor_leaf().await?;
        self.migrate_da_proposals().await?;
        self.migrate_vid_shares().await?;
        self.migrate_quorum_proposals().await?;
        self.migrate_quorum_certificates().await?;

        tracing::warn!("consensus storage has been migrated to new types");

        Ok(())
    }
847
/// Migration step 1 of `migrate_consensus`.
/// NOTE(review): presumably converts the stored anchor leaf to the new types -- confirm
/// with implementors.
848    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
/// Migration step 2 of `migrate_consensus`.
/// NOTE(review): presumably converts stored DA proposals -- confirm with implementors.
849    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
/// Migration step 3 of `migrate_consensus`.
/// NOTE(review): presumably converts stored VID shares -- confirm with implementors.
850    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
/// Migration step 4 of `migrate_consensus`.
/// NOTE(review): presumably converts stored quorum proposals -- confirm with implementors.
851    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
/// Migration step 5 (last) of `migrate_consensus`.
/// NOTE(review): presumably converts stored quorum certificates -- confirm with implementors.
852    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
853
/// Return the view number of the anchor leaf.
///
/// Calls `load_anchor_leaf` and returns the view number of the leaf it yields. When no
/// anchor leaf is available, the genesis view number is returned instead.
///
/// # Errors
/// Propagates any error from `load_anchor_leaf`.
854    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
855        match self.load_anchor_leaf().await? {
            // Only the leaf is needed; the second element of the pair is ignored.
856            Some((leaf, _)) => Ok(leaf.view_number()),
857            None => Ok(ViewNumber::genesis()),
858        }
859    }
860
/// Store a next-epoch quorum certificate.
///
/// Counterpart of `load_next_epoch_quorum_certificate`.
/// NOTE(review): whether this overwrites unconditionally or keeps the highest certificate is
/// implementor-defined -- confirm before relying on either.
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
861    async fn store_next_epoch_quorum_certificate(
862        &self,
863        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
864    ) -> anyhow::Result<()>;
865
/// Load the stored next-epoch quorum certificate, if there is one.
///
/// Counterpart of `store_next_epoch_quorum_certificate`. Unlike `load_eqc`, this signature
/// distinguishes "nothing available" (`Ok(None)`) from a failed load (`Err`).
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
866    async fn load_next_epoch_quorum_certificate(
867        &self,
868    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
869
/// Append a [`DaProposal2`] together with a caller-supplied VID commitment.
///
/// Same shape as `append_da`, differing only in the proposal type it accepts. The
/// `Storage<SeqTypes>` implementation for `Arc<P>` later in this file forwards its own
/// `append_da2` call here unchanged.
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
870    async fn append_da2(
871        &self,
872        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
873        vid_commit: VidCommitment,
874    ) -> anyhow::Result<()>;
875
/// Append a quorum proposal in wrapper form.
///
/// Default implementation: forwards `proposal` to `append_quorum_proposal2` and returns its
/// result unchanged. Implementors may override it.
///
/// # Errors
/// Propagates any error from `append_quorum_proposal2`.
876    async fn append_proposal2(
877        &self,
878        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
879    ) -> anyhow::Result<()> {
880        self.append_quorum_proposal2(proposal).await
881    }
882
/// Store the DRB result associated with `epoch`.
///
/// The `Storage<SeqTypes>` implementation for `Arc<P>` later in this file forwards its own
/// `store_drb_result` call here unchanged.
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
883    async fn store_drb_result(
884        &self,
885        epoch: <SeqTypes as NodeType>::Epoch,
886        drb_result: DrbResult,
887    ) -> anyhow::Result<()>;
/// Store a DRB input record.
///
/// NOTE(review): the key under which it is stored is not visible here; `load_drb_input`
/// looks records up by a `u64` epoch, so presumably the epoch is taken from `drb_input`
/// itself -- confirm against `DrbInput`.
888    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
/// Load the DRB input record for `epoch`.
///
/// The return type has no `Option`, so a missing record must be reported as an `Err`.
889    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
/// Store `block_header` as the epoch root for `epoch`.
///
/// The `Storage<SeqTypes>` implementation for `Arc<P>` later in this file forwards its own
/// `store_epoch_root` call here unchanged.
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
890    async fn store_epoch_root(
891        &self,
892        epoch: <SeqTypes as NodeType>::Epoch,
893        block_header: <SeqTypes as NodeType>::BlockHeader,
894    ) -> anyhow::Result<()>;
/// Add a light-client state-update certificate to storage.
///
/// The `Storage::update_state_cert` implementation for `Arc<P>` later in this file forwards
/// its argument here unchanged.
///
/// # Errors
/// Returns whatever error the implementor reports; the failure modes are implementor-defined.
895    async fn add_state_cert(
896        &self,
897        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
898    ) -> anyhow::Result<()>;
899
/// Hand the persistence layer a [`Metrics`] handle.
///
/// This is the only method in this part of the trait that is synchronous and takes
/// `&mut self`.
/// NOTE(review): which metrics get registered or reported through the handle is
/// implementor-defined.
900    fn enable_metrics(&mut self, metrics: &dyn Metrics);
901}
902
/// A sink for [`Event`]s.
///
/// Implementors must be `Debug + Send + Sync`. Implementations in this file: a forwarding
/// one for `Box<T>` and the do-nothing [`NullEventConsumer`].
903#[async_trait]
904pub trait EventConsumer: Debug + Send + Sync {
    /// Process a single event.
    ///
    /// # Errors
    /// Returns an error if the consumer fails to handle `event`; what callers do with that
    /// error is not visible from this trait.
905    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
906}
907
908#[async_trait]
909impl<T> EventConsumer for Box<T>
910where
911    T: EventConsumer + ?Sized,
912{
913    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
914        (**self).handle_event(event).await
915    }
916}
917
/// An [`EventConsumer`] that ignores every event.
///
/// Unit struct with no state; its `handle_event` (implemented just below) always returns
/// `Ok(())`.
918#[derive(Clone, Copy, Debug)]
919pub struct NullEventConsumer;
920
/// No-op consumer implementation: events are accepted and discarded.
921#[async_trait]
922impl EventConsumer for NullEventConsumer {
    /// Ignore `_event` and report success. Never returns an error.
923    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
924        Ok(())
925    }
926}
927
928#[async_trait]
929impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
930    async fn append_vid(
931        &self,
932        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
933    ) -> anyhow::Result<()> {
934        (**self).append_vid(proposal).await
935    }
936
937    async fn append_vid2(
938        &self,
939        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
940    ) -> anyhow::Result<()> {
941        (**self).append_vid2(proposal).await
942    }
943
944    async fn append_da(
945        &self,
946        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
947        vid_commit: VidCommitment,
948    ) -> anyhow::Result<()> {
949        (**self).append_da(proposal, vid_commit).await
950    }
951
952    async fn append_da2(
953        &self,
954        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
955        vid_commit: VidCommitment,
956    ) -> anyhow::Result<()> {
957        (**self).append_da2(proposal, vid_commit).await
958    }
959
960    async fn record_action(
961        &self,
962        view: ViewNumber,
963        epoch: Option<EpochNumber>,
964        action: HotShotAction,
965    ) -> anyhow::Result<()> {
966        (**self).record_action(view, epoch, action).await
967    }
968
969    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
970        Ok(())
971    }
972
973    async fn append_proposal(
974        &self,
975        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
976    ) -> anyhow::Result<()> {
977        (**self)
978            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
979            .await
980    }
981
982    async fn append_proposal2(
983        &self,
984        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
985    ) -> anyhow::Result<()> {
986        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
987            convert_proposal(proposal.clone());
988        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
989    }
990
991    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
992        Ok(())
993    }
994
995    /// Update the current eQC in storage.
996    async fn update_eqc(
997        &self,
998        high_qc: QuorumCertificate2<SeqTypes>,
999        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1000    ) -> anyhow::Result<()> {
1001        if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1002            if high_qc.view_number() < existing_high_qc.view_number() {
1003                return Ok(());
1004            }
1005        }
1006
1007        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1008    }
1009
1010    async fn update_next_epoch_high_qc2(
1011        &self,
1012        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1013    ) -> anyhow::Result<()> {
1014        Ok(())
1015    }
1016
1017    async fn update_decided_upgrade_certificate(
1018        &self,
1019        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1020    ) -> anyhow::Result<()> {
1021        (**self)
1022            .store_upgrade_certificate(decided_upgrade_certificate)
1023            .await
1024    }
1025
1026    async fn store_drb_result(
1027        &self,
1028        epoch: <SeqTypes as NodeType>::Epoch,
1029        drb_result: DrbResult,
1030    ) -> anyhow::Result<()> {
1031        (**self).store_drb_result(epoch, drb_result).await
1032    }
1033
1034    async fn store_epoch_root(
1035        &self,
1036        epoch: <SeqTypes as NodeType>::Epoch,
1037        block_header: <SeqTypes as NodeType>::BlockHeader,
1038    ) -> anyhow::Result<()> {
1039        (**self).store_epoch_root(epoch, block_header).await
1040    }
1041
1042    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1043        (**self).store_drb_input(drb_input).await
1044    }
1045
1046    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1047        (**self).load_drb_input(epoch).await
1048    }
1049
1050    async fn update_state_cert(
1051        &self,
1052        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1053    ) -> anyhow::Result<()> {
1054        (**self).add_state_cert(state_cert).await
1055    }
1056}
1057
1058/// Data that can be deserialized from a subslice of namespace payload bytes.
1059///
1060/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
1061/// namespace payload bytes to read.
///
/// The lifetime `'a` is that of the input slice, so an implementor may borrow from the
/// bytes it is given instead of copying them.
1062pub trait FromNsPayloadBytes<'a> {
1063    /// Deserialize `Self` from namespace payload bytes.
    ///
    /// The signature is infallible (it returns `Self`, not a `Result`), so implementors
    /// have no way to report malformed input to the caller through this method.
1064    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1065}
1066
1067/// Specifies a subslice of namespace payload bytes to read.
1068///
1069/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1070/// deserialized from that subslice of bytes.
1071pub trait NsPayloadBytesRange<'a> {
    /// The type to deserialize from the bytes selected by [`Self::ns_payload_range`].
1072    type Output: FromNsPayloadBytes<'a>;
1073
1074    /// Range relative to this ns payload
    ///
    /// Returned as a half-open `Range<usize>` of byte offsets.
    /// NOTE(review): whether the range is guaranteed to lie within the payload is not
    /// stated here -- callers should confirm before slicing with it.
1075    fn ns_payload_range(&self) -> Range<usize>;
1076}
1077
1078/// Types which can be deserialized from either integers or strings.
1079///
1080/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1081/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1082/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1083/// to derive this user-friendly serialization.
1084///
1085/// These types are assumed to have an efficient representation as an integral type in Rust --
1086/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1087/// encoding. With human readable encodings, serialization is always to a string.
1088pub trait FromStringOrInteger: Sized {
    /// Representation used with non-human-readable encodings.
1089    type Binary: Serialize + DeserializeOwned;
    /// Integer representation accepted when deserializing from a human-readable format.
1090    type Integer: Serialize + DeserializeOwned;
1091
    /// Build `Self` from its binary representation; fails if `b` is not a valid value.
1092    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Build `Self` from a string such as `"1 gwei"`; fails if `s` cannot be interpreted.
1093    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Build `Self` from an integer such as `1000000000`; fails if `i` is not a valid value.
1094    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1095
    /// Convert to the binary representation; fallible because not every value is
    /// guaranteed by this signature to fit in [`Self::Binary`].
1096    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert to the string form used by human-readable encodings.
    ///
    /// Note that this is a fallible trait method, distinct from `std::string::ToString`.
1097    fn to_string(&self) -> anyhow::Result<String>;
1098}