espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16    },
17    drb::{DrbInput, DrbResult},
18    event::{HotShotAction, LeafInfo},
19    message::{convert_proposal, Proposal},
20    simple_certificate::{
21        CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22        QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23    },
24    stake_table::HSStakeTable,
25    traits::{
26        metrics::Metrics,
27        node_implementation::{ConsensusTime, NodeType, Versions},
28        storage::Storage,
29        ValidatedState as HotShotState,
30    },
31    utils::genesis_epoch_from_version,
32    vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{de::DeserializeOwned, Serialize};
36
37use super::{
38    impls::NodeState,
39    utils::BackoffParams,
40    v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43    v0::impls::{StakeTableHash, ValidatedState},
44    v0_3::{
45        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
46        Validator,
47    },
48    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
49    BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
50    PubKey, SeqTypes, ValidatorMap,
51};
52
53#[async_trait]
54pub trait StateCatchup: Send + Sync {
55    /// Fetch the leaf at the given height without retrying on transient errors.
56    async fn try_fetch_leaf(
57        &self,
58        retry: usize,
59        height: u64,
60        stake_table: HSStakeTable<SeqTypes>,
61        success_threshold: U256,
62    ) -> anyhow::Result<Leaf2>;
63
64    /// Fetch the leaf at the given height, retrying on transient errors.
65    async fn fetch_leaf(
66        &self,
67        height: u64,
68        stake_table: HSStakeTable<SeqTypes>,
69        success_threshold: U256,
70    ) -> anyhow::Result<Leaf2> {
71        self.backoff()
72            .retry(self, |provider, retry| {
73                let stake_table_clone = stake_table.clone();
74                async move {
75                    provider
76                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
77                        .await
78                }
79                .boxed()
80            })
81            .await
82    }
83
84    /// Fetch the given list of accounts without retrying on transient errors.
85    async fn try_fetch_accounts(
86        &self,
87        retry: usize,
88        instance: &NodeState,
89        height: u64,
90        view: ViewNumber,
91        fee_merkle_tree_root: FeeMerkleCommitment,
92        accounts: &[FeeAccount],
93    ) -> anyhow::Result<Vec<FeeAccountProof>>;
94
95    /// Fetch the given list of accounts, retrying on transient errors.
96    async fn fetch_accounts(
97        &self,
98        instance: &NodeState,
99        height: u64,
100        view: ViewNumber,
101        fee_merkle_tree_root: FeeMerkleCommitment,
102        accounts: Vec<FeeAccount>,
103    ) -> anyhow::Result<Vec<FeeAccountProof>> {
104        self.backoff()
105            .retry(self, |provider, retry| {
106                let accounts = &accounts;
107                async move {
108                    provider
109                        .try_fetch_accounts(
110                            retry,
111                            instance,
112                            height,
113                            view,
114                            fee_merkle_tree_root,
115                            accounts,
116                        )
117                        .await
118                        .map_err(|err| {
119                            err.context(format!(
120                                "fetching accounts {accounts:?}, height {height}, view {view}"
121                            ))
122                        })
123                }
124                .boxed()
125            })
126            .await
127    }
128
129    /// Fetch and remember the blocks frontier without retrying on transient errors.
130    async fn try_remember_blocks_merkle_tree(
131        &self,
132        retry: usize,
133        instance: &NodeState,
134        height: u64,
135        view: ViewNumber,
136        mt: &mut BlockMerkleTree,
137    ) -> anyhow::Result<()>;
138
139    /// Fetch and remember the blocks frontier, retrying on transient errors.
140    async fn remember_blocks_merkle_tree(
141        &self,
142        instance: &NodeState,
143        height: u64,
144        view: ViewNumber,
145        mt: &mut BlockMerkleTree,
146    ) -> anyhow::Result<()> {
147        self.backoff()
148            .retry(mt, |mt, retry| {
149                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
150                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
151                    .boxed()
152            })
153            .await
154    }
155
156    /// Fetch the chain config without retrying on transient errors.
157    async fn try_fetch_chain_config(
158        &self,
159        retry: usize,
160        commitment: Commitment<ChainConfig>,
161    ) -> anyhow::Result<ChainConfig>;
162
163    /// Fetch the chain config, retrying on transient errors.
164    async fn fetch_chain_config(
165        &self,
166        commitment: Commitment<ChainConfig>,
167    ) -> anyhow::Result<ChainConfig> {
168        self.backoff()
169            .retry(self, |provider, retry| {
170                provider
171                    .try_fetch_chain_config(retry, commitment)
172                    .map_err(|err| err.context("fetching chain config"))
173                    .boxed()
174            })
175            .await
176    }
177
178    /// Fetch the given list of reward accounts without retrying on transient errors.
179    async fn try_fetch_reward_accounts_v2(
180        &self,
181        retry: usize,
182        instance: &NodeState,
183        height: u64,
184        view: ViewNumber,
185        reward_merkle_tree_root: RewardMerkleCommitmentV2,
186        accounts: &[RewardAccountV2],
187    ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
188
189    /// Fetch the given list of reward accounts, retrying on transient errors.
190    async fn fetch_reward_accounts_v2(
191        &self,
192        instance: &NodeState,
193        height: u64,
194        view: ViewNumber,
195        reward_merkle_tree_root: RewardMerkleCommitmentV2,
196        accounts: Vec<RewardAccountV2>,
197    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
198        self.backoff()
199            .retry(self, |provider, retry| {
200                let accounts = &accounts;
201                async move {
202                    provider
203                        .try_fetch_reward_accounts_v2(
204                            retry,
205                            instance,
206                            height,
207                            view,
208                            reward_merkle_tree_root,
209                            accounts,
210                        )
211                        .await
212                        .map_err(|err| {
213                            err.context(format!(
214                                "fetching reward accounts {accounts:?}, height {height}, view \
215                                 {view}"
216                            ))
217                        })
218                }
219                .boxed()
220            })
221            .await
222    }
223
224    /// Fetch the given list of reward accounts without retrying on transient errors.
225    async fn try_fetch_reward_accounts_v1(
226        &self,
227        retry: usize,
228        instance: &NodeState,
229        height: u64,
230        view: ViewNumber,
231        reward_merkle_tree_root: RewardMerkleCommitmentV1,
232        accounts: &[RewardAccountV1],
233    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
234
235    /// Fetch the given list of reward accounts, retrying on transient errors.
236    async fn fetch_reward_accounts_v1(
237        &self,
238        instance: &NodeState,
239        height: u64,
240        view: ViewNumber,
241        reward_merkle_tree_root: RewardMerkleCommitmentV1,
242        accounts: Vec<RewardAccountV1>,
243    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
244        self.backoff()
245            .retry(self, |provider, retry| {
246                let accounts = &accounts;
247                async move {
248                    provider
249                        .try_fetch_reward_accounts_v1(
250                            retry,
251                            instance,
252                            height,
253                            view,
254                            reward_merkle_tree_root,
255                            accounts,
256                        )
257                        .await
258                        .map_err(|err| {
259                            err.context(format!(
260                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
261                                 {view}"
262                            ))
263                        })
264                }
265                .boxed()
266            })
267            .await
268    }
269
270    /// Fetch the state certificate for a given epoch without retrying on transient errors.
271    async fn try_fetch_state_cert(
272        &self,
273        retry: usize,
274        epoch: u64,
275    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
276
277    /// Fetch the state certificate for a given epoch, retrying on transient errors.
278    async fn fetch_state_cert(
279        &self,
280        epoch: u64,
281    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
282        self.backoff()
283            .retry(self, |provider, retry| {
284                provider
285                    .try_fetch_state_cert(retry, epoch)
286                    .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
287                    .boxed()
288            })
289            .await
290    }
291
292    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
293    fn is_local(&self) -> bool;
294
295    /// Returns the backoff parameters for the catchup provider.
296    fn backoff(&self) -> &BackoffParams;
297
298    /// Returns the name of the catchup provider.
299    fn name(&self) -> String;
300}
301
302#[async_trait]
303impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
304    async fn try_fetch_leaf(
305        &self,
306        retry: usize,
307        height: u64,
308        stake_table: HSStakeTable<SeqTypes>,
309        success_threshold: U256,
310    ) -> anyhow::Result<Leaf2> {
311        (**self)
312            .try_fetch_leaf(retry, height, stake_table, success_threshold)
313            .await
314    }
315
316    async fn fetch_leaf(
317        &self,
318        height: u64,
319        stake_table: HSStakeTable<SeqTypes>,
320        success_threshold: U256,
321    ) -> anyhow::Result<Leaf2> {
322        (**self)
323            .fetch_leaf(height, stake_table, success_threshold)
324            .await
325    }
326    async fn try_fetch_accounts(
327        &self,
328        retry: usize,
329        instance: &NodeState,
330        height: u64,
331        view: ViewNumber,
332        fee_merkle_tree_root: FeeMerkleCommitment,
333        accounts: &[FeeAccount],
334    ) -> anyhow::Result<Vec<FeeAccountProof>> {
335        (**self)
336            .try_fetch_accounts(
337                retry,
338                instance,
339                height,
340                view,
341                fee_merkle_tree_root,
342                accounts,
343            )
344            .await
345    }
346
347    async fn fetch_accounts(
348        &self,
349        instance: &NodeState,
350        height: u64,
351        view: ViewNumber,
352        fee_merkle_tree_root: FeeMerkleCommitment,
353        accounts: Vec<FeeAccount>,
354    ) -> anyhow::Result<Vec<FeeAccountProof>> {
355        (**self)
356            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
357            .await
358    }
359
360    async fn try_remember_blocks_merkle_tree(
361        &self,
362        retry: usize,
363        instance: &NodeState,
364        height: u64,
365        view: ViewNumber,
366        mt: &mut BlockMerkleTree,
367    ) -> anyhow::Result<()> {
368        (**self)
369            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
370            .await
371    }
372
373    async fn remember_blocks_merkle_tree(
374        &self,
375        instance: &NodeState,
376        height: u64,
377        view: ViewNumber,
378        mt: &mut BlockMerkleTree,
379    ) -> anyhow::Result<()> {
380        (**self)
381            .remember_blocks_merkle_tree(instance, height, view, mt)
382            .await
383    }
384
385    async fn try_fetch_chain_config(
386        &self,
387        retry: usize,
388        commitment: Commitment<ChainConfig>,
389    ) -> anyhow::Result<ChainConfig> {
390        (**self).try_fetch_chain_config(retry, commitment).await
391    }
392
393    async fn fetch_chain_config(
394        &self,
395        commitment: Commitment<ChainConfig>,
396    ) -> anyhow::Result<ChainConfig> {
397        (**self).fetch_chain_config(commitment).await
398    }
399
400    async fn try_fetch_reward_accounts_v2(
401        &self,
402        retry: usize,
403        instance: &NodeState,
404        height: u64,
405        view: ViewNumber,
406        reward_merkle_tree_root: RewardMerkleCommitmentV2,
407        accounts: &[RewardAccountV2],
408    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
409        (**self)
410            .try_fetch_reward_accounts_v2(
411                retry,
412                instance,
413                height,
414                view,
415                reward_merkle_tree_root,
416                accounts,
417            )
418            .await
419    }
420
421    async fn fetch_reward_accounts_v2(
422        &self,
423        instance: &NodeState,
424        height: u64,
425        view: ViewNumber,
426        reward_merkle_tree_root: RewardMerkleCommitmentV2,
427        accounts: Vec<RewardAccountV2>,
428    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
429        (**self)
430            .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
431            .await
432    }
433
434    async fn try_fetch_reward_accounts_v1(
435        &self,
436        retry: usize,
437        instance: &NodeState,
438        height: u64,
439        view: ViewNumber,
440        reward_merkle_tree_root: RewardMerkleCommitmentV1,
441        accounts: &[RewardAccountV1],
442    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
443        (**self)
444            .try_fetch_reward_accounts_v1(
445                retry,
446                instance,
447                height,
448                view,
449                reward_merkle_tree_root,
450                accounts,
451            )
452            .await
453    }
454
455    async fn fetch_reward_accounts_v1(
456        &self,
457        instance: &NodeState,
458        height: u64,
459        view: ViewNumber,
460        reward_merkle_tree_root: RewardMerkleCommitmentV1,
461        accounts: Vec<RewardAccountV1>,
462    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
463        (**self)
464            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
465            .await
466    }
467
468    async fn try_fetch_state_cert(
469        &self,
470        retry: usize,
471        epoch: u64,
472    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
473        (**self).try_fetch_state_cert(retry, epoch).await
474    }
475
476    async fn fetch_state_cert(
477        &self,
478        epoch: u64,
479    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
480        (**self).fetch_state_cert(epoch).await
481    }
482
483    fn backoff(&self) -> &BackoffParams {
484        (**self).backoff()
485    }
486
487    fn name(&self) -> String {
488        (**self).name()
489    }
490
491    fn is_local(&self) -> bool {
492        (**self).is_local()
493    }
494}
495
496#[async_trait]
497pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
498    type Persistence: SequencerPersistence + MembershipPersistence;
499
500    fn set_view_retention(&mut self, view_retention: u64);
501    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
502    async fn reset(self) -> anyhow::Result<()>;
503}
504
505/// Determine the read state based on the queried block range.
506// - If the persistence returned events up to the requested block, the read is complete.
507/// - Otherwise, indicate that the read is up to the last processed block.
508#[derive(Clone, Copy, Debug, PartialEq, Eq)]
509pub enum EventsPersistenceRead {
510    Complete,
511    UntilL1Block(u64),
512}
513
514#[async_trait]
515/// Trait used by `Memberships` implementations to interact with persistence layer.
516pub trait MembershipPersistence: Send + Sync + 'static {
517    /// Load stake table for epoch from storage
518    async fn load_stake(
519        &self,
520        epoch: EpochNumber,
521    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
522
523    /// Load stake tables for storage for latest `n` known epochs
524    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
525
526    /// Store stake table at `epoch` in the persistence layer
527    async fn store_stake(
528        &self,
529        epoch: EpochNumber,
530        stake: ValidatorMap,
531        block_reward: Option<RewardAmount>,
532        stake_table_hash: Option<StakeTableHash>,
533    ) -> anyhow::Result<()>;
534
535    async fn store_events(
536        &self,
537        l1_finalized: u64,
538        events: Vec<(EventKey, StakeTableEvent)>,
539    ) -> anyhow::Result<()>;
540    async fn load_events(
541        &self,
542        from_l1_block: u64,
543        l1_finalized: u64,
544    ) -> anyhow::Result<(
545        Option<EventsPersistenceRead>,
546        Vec<(EventKey, StakeTableEvent)>,
547    )>;
548
549    async fn store_all_validators(
550        &self,
551        epoch: EpochNumber,
552        all_validators: IndexMap<Address, Validator<PubKey>>,
553    ) -> anyhow::Result<()>;
554
555    async fn load_all_validators(
556        &self,
557        epoch: EpochNumber,
558        offset: u64,
559        limit: u64,
560    ) -> anyhow::Result<Vec<Validator<PubKey>>>;
561}
562
563#[async_trait]
564pub trait SequencerPersistence:
565    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
566{
567    /// Use this storage as a state catchup backend, if supported.
568    fn into_catchup_provider(
569        self,
570        _backoff: BackoffParams,
571    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
572        bail!("state catchup is not implemented for this persistence type");
573    }
574
575    /// Load the orchestrator config from storage.
576    ///
577    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
578    /// `Err` if it could not be determined whether a config exists or not.
579    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
580
581    /// Save the orchestrator config to storage.
582    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
583
584    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
585    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
586
587    /// Load the view to restart from.
588    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
589
590    /// Load the proposals saved by consensus
591    async fn load_quorum_proposals(
592        &self,
593    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
594
595    async fn load_quorum_proposal(
596        &self,
597        view: ViewNumber,
598    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
599
600    async fn load_vid_share(
601        &self,
602        view: ViewNumber,
603    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
604    async fn load_da_proposal(
605        &self,
606        view: ViewNumber,
607    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
608    async fn load_upgrade_certificate(
609        &self,
610    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
611    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
612    async fn load_state_cert(
613        &self,
614    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
615
616    /// Get a state certificate for an epoch.
617    async fn get_state_cert_by_epoch(
618        &self,
619        epoch: u64,
620    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
621
622    /// Insert a state certificate for a given epoch.
623    async fn insert_state_cert(
624        &self,
625        epoch: u64,
626        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
627    ) -> anyhow::Result<()>;
628
629    /// Load the latest known consensus state.
630    ///
631    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
632    /// if there is no saved state). Also returns the anchor view number, which can be used as a
633    /// reference point to process any events which were not processed before a previous shutdown,
634    /// if applicable,.
635    async fn load_consensus_state<V: Versions>(
636        &self,
637        state: NodeState,
638    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
639        let genesis_validated_state = ValidatedState::genesis(&state).0;
640        let highest_voted_view = match self
641            .load_latest_acted_view()
642            .await
643            .context("loading last voted view")?
644        {
645            Some(view) => {
646                tracing::info!(?view, "starting with last actioned view");
647                view
648            },
649            None => {
650                tracing::info!("no saved view, starting from genesis");
651                ViewNumber::genesis()
652            },
653        };
654
655        let restart_view = match self
656            .load_restart_view()
657            .await
658            .context("loading restart view")?
659        {
660            Some(view) => {
661                tracing::info!(?view, "starting from saved view");
662                view
663            },
664            None => {
665                tracing::info!("no saved view, starting from genesis");
666                ViewNumber::genesis()
667            },
668        };
669        let next_epoch_high_qc = self
670            .load_next_epoch_quorum_certificate()
671            .await
672            .context("loading next epoch qc")?;
673        let (leaf, mut high_qc, anchor_view) = match self
674            .load_anchor_leaf()
675            .await
676            .context("loading anchor leaf")?
677        {
678            Some((leaf, high_qc)) => {
679                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
680                ensure!(
681                    leaf.view_number() == high_qc.view_number,
682                    format!(
683                        "loaded anchor leaf from view {}, but high QC is from view {}",
684                        leaf.view_number(),
685                        high_qc.view_number
686                    )
687                );
688
689                let anchor_view = leaf.view_number();
690                (leaf, high_qc, Some(anchor_view))
691            },
692            None => {
693                tracing::info!("no saved leaf, starting from genesis leaf");
694                (
695                    hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
696                        .await,
697                    QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
698                    None,
699                )
700            },
701        };
702
703        if let Some((extended_high_qc, _)) = self.load_eqc().await {
704            if extended_high_qc.view_number() > high_qc.view_number() {
705                high_qc = extended_high_qc
706            }
707        }
708
709        let validated_state = if leaf.block_header().height() == 0 {
710            // If we are starting from genesis, we can provide the full state.
711            genesis_validated_state
712        } else {
713            // Otherwise, we will have to construct a sparse state and fetch missing data during
714            // catchup.
715            ValidatedState::from_header(leaf.block_header())
716        };
717
718        // If we are not starting from genesis, we start from the view following the maximum view
719        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
720        // starting in a view in which we had already voted before the restart, and prevents
721        // unnecessary catchup from starting in a view earlier than the anchor leaf.
722        let restart_view = max(restart_view, leaf.view_number());
723        // TODO:
724        let epoch = genesis_epoch_from_version::<V, SeqTypes>();
725
726        let config = self.load_config().await.context("loading config")?;
727        let epoch_height = config
728            .as_ref()
729            .map(|c| c.config.epoch_height)
730            .unwrap_or_default();
731        let epoch_start_block = config
732            .as_ref()
733            .map(|c| c.config.epoch_start_block)
734            .unwrap_or_default();
735
736        let saved_proposals = self
737            .load_quorum_proposals()
738            .await
739            .context("loading saved proposals")?;
740
741        let upgrade_certificate = self
742            .load_upgrade_certificate()
743            .await
744            .context("loading upgrade certificate")?;
745
746        let start_epoch_info = self
747            .load_start_epoch_info()
748            .await
749            .context("loading start epoch info")?;
750
751        let state_cert = self
752            .load_state_cert()
753            .await
754            .context("loading light client state update certificate")?;
755
756        tracing::warn!(
757            ?leaf,
758            ?restart_view,
759            ?epoch,
760            ?high_qc,
761            ?validated_state,
762            ?state_cert,
763            "loaded consensus state"
764        );
765
766        Ok((
767            HotShotInitializer {
768                instance_state: state,
769                epoch_height,
770                epoch_start_block,
771                anchor_leaf: leaf,
772                anchor_state: Arc::new(validated_state),
773                anchor_state_delta: None,
774                start_view: restart_view,
775                start_epoch: epoch,
776                last_actioned_view: highest_voted_view,
777                saved_proposals,
778                high_qc,
779                next_epoch_high_qc,
780                decided_upgrade_certificate: upgrade_certificate,
781                undecided_leaves: Default::default(),
782                undecided_state: Default::default(),
783                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
784                start_epoch_info,
785                state_cert,
786            },
787            anchor_view,
788        ))
789    }
790
791    /// Update storage based on an event from consensus.
792    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
793        if let EventType::Decide {
794            leaf_chain,
795            committing_qc,
796            deciding_qc,
797            ..
798        } = &event.event
799        {
800            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
801                // No new leaves.
802                return;
803            };
804
805            // Associate each decided leaf with a QC.
806            let chain = leaf_chain.iter().zip(
807                // The first (most recent) leaf corresponds to the QC triggering the decide event.
808                std::iter::once((**committing_qc).clone())
809                    // Moving backwards in the chain, each leaf corresponds with the subsequent
810                    // leaf's justify QC.
811                    .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
812            );
813
814            if let Err(err) = self
815                .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
816                .await
817            {
818                tracing::error!(
819                    "failed to save decided leaves, chain may not be up to date: {err:#}"
820                );
821                return;
822            }
823        }
824    }
825
826    /// Append decided leaves to persistent storage and emit a corresponding event.
827    ///
828    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
829    /// up to and including `view`. If available in persistent storage, full block payloads and VID
830    /// info will also be included for each leaf.
831    ///
832    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
833    /// collected The consumer's handling of this event is a prerequisite for the completion of
834    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
835    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
836    /// will eventually be passed to the consumer.
837    ///
838    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
839    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
840    /// processed twice, or the consumer may get two events which share a subset of their data.
841    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
842    /// idempotent.
843    ///
844    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
845    /// until the next decide, at which point all persisted leaves from the failed GC run will be
846    /// included in the event along with subsequently decided leaves.
847    ///
848    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
849    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
850    /// storage to long-term archival storage.
851    async fn append_decided_leaves(
852        &self,
853        decided_view: ViewNumber,
854        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
855        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
856        consumer: &(impl EventConsumer + 'static),
857    ) -> anyhow::Result<()>;
858
859    async fn load_anchor_leaf(
860        &self,
861    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
862    async fn append_vid(
863        &self,
864        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
865    ) -> anyhow::Result<()>;
866    async fn append_da(
867        &self,
868        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
869        vid_commit: VidCommitment,
870    ) -> anyhow::Result<()>;
871    async fn record_action(
872        &self,
873        view: ViewNumber,
874        epoch: Option<EpochNumber>,
875        action: HotShotAction,
876    ) -> anyhow::Result<()>;
877
878    async fn append_quorum_proposal2(
879        &self,
880        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
881    ) -> anyhow::Result<()>;
882
883    /// Update the current eQC in storage.
884    async fn store_eqc(
885        &self,
886        _high_qc: QuorumCertificate2<SeqTypes>,
887        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
888    ) -> anyhow::Result<()>;
889
890    /// Load the current eQC from storage.
891    async fn load_eqc(
892        &self,
893    ) -> Option<(
894        QuorumCertificate2<SeqTypes>,
895        NextEpochQuorumCertificate2<SeqTypes>,
896    )>;
897
898    async fn store_upgrade_certificate(
899        &self,
900        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
901    ) -> anyhow::Result<()>;
902
903    async fn migrate_storage(&self) -> anyhow::Result<()> {
904        tracing::warn!("migrating consensus data...");
905
906        self.migrate_anchor_leaf().await?;
907        self.migrate_da_proposals().await?;
908        self.migrate_vid_shares().await?;
909        self.migrate_quorum_proposals().await?;
910        self.migrate_quorum_certificates().await?;
911
912        tracing::warn!("consensus storage has been migrated to new types");
913
914        Ok(())
915    }
916
917    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
918    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
919    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
920    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
921    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
922
923    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
924        match self.load_anchor_leaf().await? {
925            Some((leaf, _)) => Ok(leaf.view_number()),
926            None => Ok(ViewNumber::genesis()),
927        }
928    }
929
930    async fn store_next_epoch_quorum_certificate(
931        &self,
932        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
933    ) -> anyhow::Result<()>;
934
935    async fn load_next_epoch_quorum_certificate(
936        &self,
937    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
938
939    async fn append_da2(
940        &self,
941        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
942        vid_commit: VidCommitment,
943    ) -> anyhow::Result<()>;
944
945    async fn append_proposal2(
946        &self,
947        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
948    ) -> anyhow::Result<()> {
949        self.append_quorum_proposal2(proposal).await
950    }
951
952    async fn store_drb_result(
953        &self,
954        epoch: <SeqTypes as NodeType>::Epoch,
955        drb_result: DrbResult,
956    ) -> anyhow::Result<()>;
957    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
958    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
959    async fn store_epoch_root(
960        &self,
961        epoch: <SeqTypes as NodeType>::Epoch,
962        block_header: <SeqTypes as NodeType>::BlockHeader,
963    ) -> anyhow::Result<()>;
964    async fn add_state_cert(
965        &self,
966        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
967    ) -> anyhow::Result<()>;
968
969    fn enable_metrics(&mut self, metrics: &dyn Metrics);
970}
971
972#[async_trait]
973pub trait EventConsumer: Debug + Send + Sync {
974    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
975}
976
977#[async_trait]
978impl<T> EventConsumer for Box<T>
979where
980    T: EventConsumer + ?Sized,
981{
982    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
983        (**self).handle_event(event).await
984    }
985}
986
987#[derive(Clone, Copy, Debug)]
988pub struct NullEventConsumer;
989
990#[async_trait]
991impl EventConsumer for NullEventConsumer {
992    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
993        Ok(())
994    }
995}
996
997#[async_trait]
998impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
999    async fn append_vid(
1000        &self,
1001        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1002    ) -> anyhow::Result<()> {
1003        (**self).append_vid(proposal).await
1004    }
1005
1006    async fn append_da(
1007        &self,
1008        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1009        vid_commit: VidCommitment,
1010    ) -> anyhow::Result<()> {
1011        (**self).append_da(proposal, vid_commit).await
1012    }
1013
1014    async fn append_da2(
1015        &self,
1016        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1017        vid_commit: VidCommitment,
1018    ) -> anyhow::Result<()> {
1019        (**self).append_da2(proposal, vid_commit).await
1020    }
1021
1022    async fn record_action(
1023        &self,
1024        view: ViewNumber,
1025        epoch: Option<EpochNumber>,
1026        action: HotShotAction,
1027    ) -> anyhow::Result<()> {
1028        (**self).record_action(view, epoch, action).await
1029    }
1030
1031    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1032        Ok(())
1033    }
1034
1035    async fn append_proposal(
1036        &self,
1037        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1038    ) -> anyhow::Result<()> {
1039        (**self)
1040            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1041            .await
1042    }
1043
1044    async fn append_proposal2(
1045        &self,
1046        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1047    ) -> anyhow::Result<()> {
1048        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1049            convert_proposal(proposal.clone());
1050        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1051    }
1052
1053    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1054        Ok(())
1055    }
1056
1057    /// Update the current eQC in storage.
1058    async fn update_eqc(
1059        &self,
1060        high_qc: QuorumCertificate2<SeqTypes>,
1061        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1062    ) -> anyhow::Result<()> {
1063        if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1064            if high_qc.view_number() < existing_high_qc.view_number() {
1065                return Ok(());
1066            }
1067        }
1068
1069        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1070    }
1071
1072    async fn update_next_epoch_high_qc2(
1073        &self,
1074        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1075    ) -> anyhow::Result<()> {
1076        Ok(())
1077    }
1078
1079    async fn update_decided_upgrade_certificate(
1080        &self,
1081        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1082    ) -> anyhow::Result<()> {
1083        (**self)
1084            .store_upgrade_certificate(decided_upgrade_certificate)
1085            .await
1086    }
1087
1088    async fn store_drb_result(
1089        &self,
1090        epoch: <SeqTypes as NodeType>::Epoch,
1091        drb_result: DrbResult,
1092    ) -> anyhow::Result<()> {
1093        (**self).store_drb_result(epoch, drb_result).await
1094    }
1095
1096    async fn store_epoch_root(
1097        &self,
1098        epoch: <SeqTypes as NodeType>::Epoch,
1099        block_header: <SeqTypes as NodeType>::BlockHeader,
1100    ) -> anyhow::Result<()> {
1101        (**self).store_epoch_root(epoch, block_header).await
1102    }
1103
1104    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1105        (**self).store_drb_input(drb_input).await
1106    }
1107
1108    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1109        (**self).load_drb_input(epoch).await
1110    }
1111
1112    async fn update_state_cert(
1113        &self,
1114        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1115    ) -> anyhow::Result<()> {
1116        (**self).add_state_cert(state_cert).await
1117    }
1118}
1119
1120/// Data that can be deserialized from a subslice of namespace payload bytes.
1121///
1122/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
1123/// namespace payload bytes to read.
1124pub trait FromNsPayloadBytes<'a> {
1125    /// Deserialize `Self` from namespace payload bytes.
1126    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1127}
1128
1129/// Specifies a subslice of namespace payload bytes to read.
1130///
1131/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1132/// deserialized from that subslice of bytes.
1133pub trait NsPayloadBytesRange<'a> {
1134    type Output: FromNsPayloadBytes<'a>;
1135
1136    /// Range relative to this ns payload
1137    fn ns_payload_range(&self) -> Range<usize>;
1138}
1139
1140/// Types which can be deserialized from either integers or strings.
1141///
1142/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1143/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1144/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1145/// to derive this user-friendly serialization.
1146///
1147/// These types are assumed to have an efficient representation as an integral type in Rust --
1148/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1149/// encoding. With human readable encodings, serialization is always to a string.
1150pub trait FromStringOrInteger: Sized {
1151    type Binary: Serialize + DeserializeOwned;
1152    type Integer: Serialize + DeserializeOwned;
1153
1154    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1155    fn from_string(s: String) -> anyhow::Result<Self>;
1156    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1157
1158    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1159    fn to_string(&self) -> anyhow::Result<String>;
1160}