1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16 },
17 drb::{DrbInput, DrbResult},
18 event::{HotShotAction, LeafInfo},
19 message::{convert_proposal, Proposal},
20 simple_certificate::{
21 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23 },
24 stake_table::HSStakeTable,
25 traits::{
26 metrics::Metrics,
27 node_implementation::{ConsensusTime, NodeType},
28 storage::Storage,
29 ValidatedState as HotShotState,
30 },
31 utils::genesis_epoch_from_version,
32 vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{de::DeserializeOwned, Serialize};
36use versions::Upgrade;
37
38use super::{
39 impls::NodeState,
40 utils::BackoffParams,
41 v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44 v0::impls::{StakeTableHash, ValidatedState},
45 v0_3::{
46 ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
47 RewardMerkleCommitmentV1,
48 },
49 v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
50 AuthenticatedValidatorMap, BlockMerkleTree, Event, FeeAccount, FeeAccountProof,
51 FeeMerkleCommitment, Leaf2, NetworkConfig, PubKey, SeqTypes,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56 async fn try_fetch_leaf(
58 &self,
59 retry: usize,
60 height: u64,
61 stake_table: HSStakeTable<SeqTypes>,
62 success_threshold: U256,
63 ) -> anyhow::Result<Leaf2>;
64
65 async fn fetch_leaf(
67 &self,
68 height: u64,
69 stake_table: HSStakeTable<SeqTypes>,
70 success_threshold: U256,
71 ) -> anyhow::Result<Leaf2> {
72 self.backoff()
73 .retry(self, |provider, retry| {
74 let stake_table_clone = stake_table.clone();
75 async move {
76 provider
77 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78 .await
79 }
80 .boxed()
81 })
82 .await
83 }
84
85 async fn try_fetch_accounts(
87 &self,
88 retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 fee_merkle_tree_root: FeeMerkleCommitment,
93 accounts: &[FeeAccount],
94 ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96 async fn fetch_accounts(
98 &self,
99 instance: &NodeState,
100 height: u64,
101 view: ViewNumber,
102 fee_merkle_tree_root: FeeMerkleCommitment,
103 accounts: Vec<FeeAccount>,
104 ) -> anyhow::Result<Vec<FeeAccountProof>> {
105 self.backoff()
106 .retry(self, |provider, retry| {
107 let accounts = &accounts;
108 async move {
109 provider
110 .try_fetch_accounts(
111 retry,
112 instance,
113 height,
114 view,
115 fee_merkle_tree_root,
116 accounts,
117 )
118 .await
119 .map_err(|err| {
120 err.context(format!(
121 "fetching accounts {accounts:?}, height {height}, view {view}"
122 ))
123 })
124 }
125 .boxed()
126 })
127 .await
128 }
129
130 async fn try_remember_blocks_merkle_tree(
132 &self,
133 retry: usize,
134 instance: &NodeState,
135 height: u64,
136 view: ViewNumber,
137 mt: &mut BlockMerkleTree,
138 ) -> anyhow::Result<()>;
139
140 async fn remember_blocks_merkle_tree(
142 &self,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 mt: &mut BlockMerkleTree,
147 ) -> anyhow::Result<()> {
148 self.backoff()
149 .retry(mt, |mt, retry| {
150 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152 .boxed()
153 })
154 .await
155 }
156
157 async fn try_fetch_chain_config(
159 &self,
160 retry: usize,
161 commitment: Commitment<ChainConfig>,
162 ) -> anyhow::Result<ChainConfig>;
163
164 async fn fetch_chain_config(
166 &self,
167 commitment: Commitment<ChainConfig>,
168 ) -> anyhow::Result<ChainConfig> {
169 self.backoff()
170 .retry(self, |provider, retry| {
171 provider
172 .try_fetch_chain_config(retry, commitment)
173 .map_err(|err| err.context("fetching chain config"))
174 .boxed()
175 })
176 .await
177 }
178
179 async fn try_fetch_reward_merkle_tree_v2(
181 &self,
182 retry: usize,
183 height: u64,
184 view: ViewNumber,
185 reward_merkle_tree_root: RewardMerkleCommitmentV2,
186 accounts: Arc<Vec<RewardAccountV2>>,
187 ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
188
189 async fn fetch_reward_merkle_tree_v2(
190 &self,
191 height: u64,
192 view: ViewNumber,
193 reward_merkle_tree_root: RewardMerkleCommitmentV2,
194 accounts: Arc<Vec<RewardAccountV2>>,
195 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
196 self.backoff()
197 .retry(self, |provider, retry| {
198 let accounts = accounts.clone();
199 async move {
200 provider
201 .try_fetch_reward_merkle_tree_v2(
202 retry,
203 height,
204 view,
205 reward_merkle_tree_root,
206 accounts,
207 )
208 .await
209 .map_err(|err| {
210 err.context(format!("fetching reward merkle tree for height {height}"))
211 })
212 }
213 .boxed()
214 })
215 .await
216 }
217
218 async fn try_fetch_reward_accounts_v1(
220 &self,
221 retry: usize,
222 instance: &NodeState,
223 height: u64,
224 view: ViewNumber,
225 reward_merkle_tree_root: RewardMerkleCommitmentV1,
226 accounts: &[RewardAccountV1],
227 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
228
229 async fn fetch_reward_accounts_v1(
231 &self,
232 instance: &NodeState,
233 height: u64,
234 view: ViewNumber,
235 reward_merkle_tree_root: RewardMerkleCommitmentV1,
236 accounts: Vec<RewardAccountV1>,
237 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
238 self.backoff()
239 .retry(self, |provider, retry| {
240 let accounts = &accounts;
241 async move {
242 provider
243 .try_fetch_reward_accounts_v1(
244 retry,
245 instance,
246 height,
247 view,
248 reward_merkle_tree_root,
249 accounts,
250 )
251 .await
252 .map_err(|err| {
253 err.context(format!(
254 "fetching v1 reward accounts {accounts:?}, height {height}, view \
255 {view}"
256 ))
257 })
258 }
259 .boxed()
260 })
261 .await
262 }
263
264 async fn try_fetch_state_cert(
266 &self,
267 retry: usize,
268 epoch: u64,
269 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
270
271 async fn fetch_state_cert(
273 &self,
274 epoch: u64,
275 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
276 self.backoff()
277 .retry(self, |provider, retry| {
278 provider
279 .try_fetch_state_cert(retry, epoch)
280 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
281 .boxed()
282 })
283 .await
284 }
285
286 fn is_local(&self) -> bool;
288
289 fn backoff(&self) -> &BackoffParams;
291
292 fn name(&self) -> String;
294}
295
296#[async_trait]
297impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
298 async fn try_fetch_leaf(
299 &self,
300 retry: usize,
301 height: u64,
302 stake_table: HSStakeTable<SeqTypes>,
303 success_threshold: U256,
304 ) -> anyhow::Result<Leaf2> {
305 (**self)
306 .try_fetch_leaf(retry, height, stake_table, success_threshold)
307 .await
308 }
309
310 async fn fetch_leaf(
311 &self,
312 height: u64,
313 stake_table: HSStakeTable<SeqTypes>,
314 success_threshold: U256,
315 ) -> anyhow::Result<Leaf2> {
316 (**self)
317 .fetch_leaf(height, stake_table, success_threshold)
318 .await
319 }
320 async fn try_fetch_accounts(
321 &self,
322 retry: usize,
323 instance: &NodeState,
324 height: u64,
325 view: ViewNumber,
326 fee_merkle_tree_root: FeeMerkleCommitment,
327 accounts: &[FeeAccount],
328 ) -> anyhow::Result<Vec<FeeAccountProof>> {
329 (**self)
330 .try_fetch_accounts(
331 retry,
332 instance,
333 height,
334 view,
335 fee_merkle_tree_root,
336 accounts,
337 )
338 .await
339 }
340
341 async fn fetch_accounts(
342 &self,
343 instance: &NodeState,
344 height: u64,
345 view: ViewNumber,
346 fee_merkle_tree_root: FeeMerkleCommitment,
347 accounts: Vec<FeeAccount>,
348 ) -> anyhow::Result<Vec<FeeAccountProof>> {
349 (**self)
350 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
351 .await
352 }
353
354 async fn try_remember_blocks_merkle_tree(
355 &self,
356 retry: usize,
357 instance: &NodeState,
358 height: u64,
359 view: ViewNumber,
360 mt: &mut BlockMerkleTree,
361 ) -> anyhow::Result<()> {
362 (**self)
363 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
364 .await
365 }
366
367 async fn remember_blocks_merkle_tree(
368 &self,
369 instance: &NodeState,
370 height: u64,
371 view: ViewNumber,
372 mt: &mut BlockMerkleTree,
373 ) -> anyhow::Result<()> {
374 (**self)
375 .remember_blocks_merkle_tree(instance, height, view, mt)
376 .await
377 }
378
379 async fn try_fetch_chain_config(
380 &self,
381 retry: usize,
382 commitment: Commitment<ChainConfig>,
383 ) -> anyhow::Result<ChainConfig> {
384 (**self).try_fetch_chain_config(retry, commitment).await
385 }
386
387 async fn fetch_chain_config(
388 &self,
389 commitment: Commitment<ChainConfig>,
390 ) -> anyhow::Result<ChainConfig> {
391 (**self).fetch_chain_config(commitment).await
392 }
393
394 async fn try_fetch_reward_merkle_tree_v2(
395 &self,
396 retry: usize,
397 height: u64,
398 view: ViewNumber,
399 reward_merkle_tree_root: RewardMerkleCommitmentV2,
400 accounts: Arc<Vec<RewardAccountV2>>,
401 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
402 (**self)
403 .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
404 .await
405 }
406
407 async fn fetch_reward_merkle_tree_v2(
408 &self,
409 height: u64,
410 view: ViewNumber,
411 reward_merkle_tree_root: RewardMerkleCommitmentV2,
412 accounts: Arc<Vec<RewardAccountV2>>,
413 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
414 (**self)
415 .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
416 .await
417 }
418
419 async fn try_fetch_reward_accounts_v1(
420 &self,
421 retry: usize,
422 instance: &NodeState,
423 height: u64,
424 view: ViewNumber,
425 reward_merkle_tree_root: RewardMerkleCommitmentV1,
426 accounts: &[RewardAccountV1],
427 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
428 (**self)
429 .try_fetch_reward_accounts_v1(
430 retry,
431 instance,
432 height,
433 view,
434 reward_merkle_tree_root,
435 accounts,
436 )
437 .await
438 }
439
440 async fn fetch_reward_accounts_v1(
441 &self,
442 instance: &NodeState,
443 height: u64,
444 view: ViewNumber,
445 reward_merkle_tree_root: RewardMerkleCommitmentV1,
446 accounts: Vec<RewardAccountV1>,
447 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
448 (**self)
449 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
450 .await
451 }
452
453 async fn try_fetch_state_cert(
454 &self,
455 retry: usize,
456 epoch: u64,
457 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
458 (**self).try_fetch_state_cert(retry, epoch).await
459 }
460
461 async fn fetch_state_cert(
462 &self,
463 epoch: u64,
464 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
465 (**self).fetch_state_cert(epoch).await
466 }
467
468 fn backoff(&self) -> &BackoffParams {
469 (**self).backoff()
470 }
471
472 fn name(&self) -> String {
473 (**self).name()
474 }
475
476 fn is_local(&self) -> bool {
477 (**self).is_local()
478 }
479}
480
481#[async_trait]
482pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
483 type Persistence: SequencerPersistence + MembershipPersistence;
484
485 fn set_view_retention(&mut self, view_retention: u64);
486 async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
487 async fn reset(self) -> anyhow::Result<()>;
488}
489
490#[derive(Clone, Copy, Debug, PartialEq, Eq)]
494pub enum EventsPersistenceRead {
495 Complete,
496 UntilL1Block(u64),
497}
498
499pub type StakeTuple = (
501 AuthenticatedValidatorMap,
502 Option<RewardAmount>,
503 Option<StakeTableHash>,
504);
505
506#[async_trait]
507pub trait MembershipPersistence: Send + Sync + 'static {
509 async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;
511
512 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
514
515 async fn store_stake(
517 &self,
518 epoch: EpochNumber,
519 stake: AuthenticatedValidatorMap,
520 block_reward: Option<RewardAmount>,
521 stake_table_hash: Option<StakeTableHash>,
522 ) -> anyhow::Result<()>;
523
524 async fn store_events(
525 &self,
526 l1_finalized: u64,
527 events: Vec<(EventKey, StakeTableEvent)>,
528 ) -> anyhow::Result<()>;
529 async fn load_events(
530 &self,
531 from_l1_block: u64,
532 l1_finalized: u64,
533 ) -> anyhow::Result<(
534 Option<EventsPersistenceRead>,
535 Vec<(EventKey, StakeTableEvent)>,
536 )>;
537
538 async fn delete_stake_tables(&self) -> anyhow::Result<()>;
540
541 async fn store_all_validators(
542 &self,
543 epoch: EpochNumber,
544 all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
545 ) -> anyhow::Result<()>;
546
547 async fn load_all_validators(
548 &self,
549 epoch: EpochNumber,
550 offset: u64,
551 limit: u64,
552 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
553}
554
555#[async_trait]
556pub trait SequencerPersistence:
557 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
558{
559 async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
560
561 fn into_catchup_provider(
563 self,
564 _backoff: BackoffParams,
565 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
566 bail!("state catchup is not implemented for this persistence type");
567 }
568
569 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
574
575 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
577
578 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
580
581 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
583
584 async fn load_quorum_proposals(
586 &self,
587 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
588
589 async fn load_quorum_proposal(
590 &self,
591 view: ViewNumber,
592 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
593
594 async fn load_vid_share(
595 &self,
596 view: ViewNumber,
597 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
598 async fn load_da_proposal(
599 &self,
600 view: ViewNumber,
601 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
602 async fn load_upgrade_certificate(
603 &self,
604 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
605 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
606 async fn load_state_cert(
607 &self,
608 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
609
610 async fn get_state_cert_by_epoch(
612 &self,
613 epoch: u64,
614 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
615
616 async fn insert_state_cert(
618 &self,
619 epoch: u64,
620 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
621 ) -> anyhow::Result<()>;
622
623 async fn load_consensus_state(
630 &self,
631 state: NodeState,
632 upgrade: Upgrade,
633 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
634 let genesis_validated_state = ValidatedState::genesis(&state).0;
635 let highest_voted_view = match self
636 .load_latest_acted_view()
637 .await
638 .context("loading last voted view")?
639 {
640 Some(view) => {
641 tracing::info!(?view, "starting with last actioned view");
642 view
643 },
644 None => {
645 tracing::info!("no saved view, starting from genesis");
646 ViewNumber::genesis()
647 },
648 };
649
650 let restart_view = match self
651 .load_restart_view()
652 .await
653 .context("loading restart view")?
654 {
655 Some(view) => {
656 tracing::info!(?view, "starting from saved view");
657 view
658 },
659 None => {
660 tracing::info!("no saved view, starting from genesis");
661 ViewNumber::genesis()
662 },
663 };
664 let next_epoch_high_qc = self
665 .load_next_epoch_quorum_certificate()
666 .await
667 .context("loading next epoch qc")?;
668 let (leaf, mut high_qc, anchor_view) = match self
669 .load_anchor_leaf()
670 .await
671 .context("loading anchor leaf")?
672 {
673 Some((leaf, high_qc)) => {
674 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
675 ensure!(
676 leaf.view_number() == high_qc.view_number,
677 format!(
678 "loaded anchor leaf from view {}, but high QC is from view {}",
679 leaf.view_number(),
680 high_qc.view_number
681 )
682 );
683
684 let anchor_view = leaf.view_number();
685 (leaf, high_qc, Some(anchor_view))
686 },
687 None => {
688 tracing::info!("no saved leaf, starting from genesis leaf");
689 (
690 hotshot_types::data::Leaf2::genesis(
691 &genesis_validated_state,
692 &state,
693 upgrade.base,
694 )
695 .await,
696 QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
697 None,
698 )
699 },
700 };
701
702 if let Some((extended_high_qc, _)) = self.load_eqc().await {
703 if extended_high_qc.view_number() > high_qc.view_number() {
704 high_qc = extended_high_qc
705 }
706 }
707
708 let validated_state = if leaf.block_header().height() == 0 {
709 genesis_validated_state
711 } else {
712 ValidatedState::from_header(leaf.block_header())
715 };
716
717 let restart_view = max(restart_view, leaf.view_number());
722 let epoch = genesis_epoch_from_version::<SeqTypes>(upgrade.base);
724
725 let config = self.load_config().await.context("loading config")?;
726 let epoch_height = config
727 .as_ref()
728 .map(|c| c.config.epoch_height)
729 .unwrap_or_default();
730 let epoch_start_block = config
731 .as_ref()
732 .map(|c| c.config.epoch_start_block)
733 .unwrap_or_default();
734
735 let saved_proposals = self
736 .load_quorum_proposals()
737 .await
738 .context("loading saved proposals")?;
739
740 let upgrade_certificate = self
741 .load_upgrade_certificate()
742 .await
743 .context("loading upgrade certificate")?;
744
745 let start_epoch_info = self
746 .load_start_epoch_info()
747 .await
748 .context("loading start epoch info")?;
749
750 let state_cert = self
751 .load_state_cert()
752 .await
753 .context("loading light client state update certificate")?;
754
755 tracing::warn!(
756 ?leaf,
757 ?restart_view,
758 ?epoch,
759 ?high_qc,
760 ?validated_state,
761 ?state_cert,
762 "loaded consensus state"
763 );
764
765 Ok((
766 HotShotInitializer {
767 instance_state: state,
768 epoch_height,
769 epoch_start_block,
770 anchor_leaf: leaf,
771 anchor_state: Arc::new(validated_state),
772 anchor_state_delta: None,
773 start_view: restart_view,
774 start_epoch: epoch,
775 last_actioned_view: highest_voted_view,
776 saved_proposals,
777 high_qc,
778 next_epoch_high_qc,
779 decided_upgrade_certificate: upgrade_certificate,
780 undecided_leaves: Default::default(),
781 undecided_state: Default::default(),
782 saved_vid_shares: Default::default(), start_epoch_info,
784 state_cert,
785 },
786 anchor_view,
787 ))
788 }
789
790 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
792 if let EventType::Decide {
793 leaf_chain,
794 committing_qc,
795 deciding_qc,
796 ..
797 } = &event.event
798 {
799 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
800 return;
802 };
803
804 let chain = leaf_chain.iter().zip(
806 std::iter::once((**committing_qc).clone())
808 .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
811 );
812
813 if let Err(err) = self
814 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
815 .await
816 {
817 tracing::error!(
818 "failed to save decided leaves, chain may not be up to date: {err:#}"
819 );
820 return;
821 }
822 }
823 }
824
825 async fn append_decided_leaves(
851 &self,
852 decided_view: ViewNumber,
853 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
854 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
855 consumer: &(impl EventConsumer + 'static),
856 ) -> anyhow::Result<()>;
857
858 async fn load_anchor_leaf(
859 &self,
860 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
861 async fn append_vid(
862 &self,
863 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
864 ) -> anyhow::Result<()>;
865 async fn append_da(
866 &self,
867 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
868 vid_commit: VidCommitment,
869 ) -> anyhow::Result<()>;
870 async fn record_action(
871 &self,
872 view: ViewNumber,
873 epoch: Option<EpochNumber>,
874 action: HotShotAction,
875 ) -> anyhow::Result<()>;
876
877 async fn append_quorum_proposal2(
878 &self,
879 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
880 ) -> anyhow::Result<()>;
881
882 async fn store_eqc(
884 &self,
885 _high_qc: QuorumCertificate2<SeqTypes>,
886 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
887 ) -> anyhow::Result<()>;
888
889 async fn load_eqc(
891 &self,
892 ) -> Option<(
893 QuorumCertificate2<SeqTypes>,
894 NextEpochQuorumCertificate2<SeqTypes>,
895 )>;
896
897 async fn store_upgrade_certificate(
898 &self,
899 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
900 ) -> anyhow::Result<()>;
901
902 async fn migrate_storage(&self) -> anyhow::Result<()> {
903 tracing::warn!("migrating consensus data...");
904
905 self.migrate_anchor_leaf().await?;
906 self.migrate_da_proposals().await?;
907 self.migrate_vid_shares().await?;
908 self.migrate_quorum_proposals().await?;
909 self.migrate_quorum_certificates().await?;
910 self.migrate_reward_merkle_tree_v2()
911 .await
912 .context("failed to migrate reward merkle tree v2")?;
913 self.migrate_validator_authenticated().await?;
914
915 tracing::warn!("consensus storage has been migrated to new types");
916
917 Ok(())
918 }
919
920 async fn migrate_validator_authenticated(&self) -> anyhow::Result<()>;
921
922 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
923 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
924 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
925 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
926 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
927
928 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
929 match self.load_anchor_leaf().await? {
930 Some((leaf, _)) => Ok(leaf.view_number()),
931 None => Ok(ViewNumber::genesis()),
932 }
933 }
934
935 async fn store_next_epoch_quorum_certificate(
936 &self,
937 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
938 ) -> anyhow::Result<()>;
939
940 async fn load_next_epoch_quorum_certificate(
941 &self,
942 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
943
944 async fn append_da2(
945 &self,
946 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
947 vid_commit: VidCommitment,
948 ) -> anyhow::Result<()>;
949
950 async fn append_proposal2(
951 &self,
952 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
953 ) -> anyhow::Result<()> {
954 self.append_quorum_proposal2(proposal).await
955 }
956
957 async fn store_drb_result(
958 &self,
959 epoch: <SeqTypes as NodeType>::Epoch,
960 drb_result: DrbResult,
961 ) -> anyhow::Result<()>;
962 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
963 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
964 async fn store_epoch_root(
965 &self,
966 epoch: <SeqTypes as NodeType>::Epoch,
967 block_header: <SeqTypes as NodeType>::BlockHeader,
968 ) -> anyhow::Result<()>;
969 async fn add_state_cert(
970 &self,
971 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
972 ) -> anyhow::Result<()>;
973
974 fn enable_metrics(&mut self, metrics: &dyn Metrics);
975}
976
977#[async_trait]
978pub trait EventConsumer: Debug + Send + Sync {
979 async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
980}
981
982#[async_trait]
983impl<T> EventConsumer for Box<T>
984where
985 T: EventConsumer + ?Sized,
986{
987 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
988 (**self).handle_event(event).await
989 }
990}
991
992#[derive(Clone, Copy, Debug)]
993pub struct NullEventConsumer;
994
995#[async_trait]
996impl EventConsumer for NullEventConsumer {
997 async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
998 Ok(())
999 }
1000}
1001
1002#[async_trait]
1003impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1004 async fn append_vid(
1005 &self,
1006 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1007 ) -> anyhow::Result<()> {
1008 (**self).append_vid(proposal).await
1009 }
1010
1011 async fn append_da(
1012 &self,
1013 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1014 vid_commit: VidCommitment,
1015 ) -> anyhow::Result<()> {
1016 (**self).append_da(proposal, vid_commit).await
1017 }
1018
1019 async fn append_da2(
1020 &self,
1021 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1022 vid_commit: VidCommitment,
1023 ) -> anyhow::Result<()> {
1024 (**self).append_da2(proposal, vid_commit).await
1025 }
1026
1027 async fn record_action(
1028 &self,
1029 view: ViewNumber,
1030 epoch: Option<EpochNumber>,
1031 action: HotShotAction,
1032 ) -> anyhow::Result<()> {
1033 (**self).record_action(view, epoch, action).await
1034 }
1035
1036 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1037 Ok(())
1038 }
1039
1040 async fn append_proposal(
1041 &self,
1042 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1043 ) -> anyhow::Result<()> {
1044 (**self)
1045 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1046 .await
1047 }
1048
1049 async fn append_proposal2(
1050 &self,
1051 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1052 ) -> anyhow::Result<()> {
1053 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1054 convert_proposal(proposal.clone());
1055 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1056 }
1057
1058 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1059 Ok(())
1060 }
1061
1062 async fn update_eqc(
1064 &self,
1065 high_qc: QuorumCertificate2<SeqTypes>,
1066 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1067 ) -> anyhow::Result<()> {
1068 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1069 if high_qc.view_number() < existing_high_qc.view_number() {
1070 return Ok(());
1071 }
1072 }
1073
1074 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1075 }
1076
1077 async fn update_next_epoch_high_qc2(
1078 &self,
1079 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1080 ) -> anyhow::Result<()> {
1081 Ok(())
1082 }
1083
1084 async fn update_decided_upgrade_certificate(
1085 &self,
1086 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1087 ) -> anyhow::Result<()> {
1088 (**self)
1089 .store_upgrade_certificate(decided_upgrade_certificate)
1090 .await
1091 }
1092
1093 async fn store_drb_result(
1094 &self,
1095 epoch: <SeqTypes as NodeType>::Epoch,
1096 drb_result: DrbResult,
1097 ) -> anyhow::Result<()> {
1098 (**self).store_drb_result(epoch, drb_result).await
1099 }
1100
1101 async fn store_epoch_root(
1102 &self,
1103 epoch: <SeqTypes as NodeType>::Epoch,
1104 block_header: <SeqTypes as NodeType>::BlockHeader,
1105 ) -> anyhow::Result<()> {
1106 (**self).store_epoch_root(epoch, block_header).await
1107 }
1108
1109 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1110 (**self).store_drb_input(drb_input).await
1111 }
1112
1113 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1114 (**self).load_drb_input(epoch).await
1115 }
1116
1117 async fn update_state_cert(
1118 &self,
1119 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1120 ) -> anyhow::Result<()> {
1121 (**self).add_state_cert(state_cert).await
1122 }
1123}
1124
1125pub trait FromNsPayloadBytes<'a> {
1130 fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1132}
1133
1134pub trait NsPayloadBytesRange<'a> {
1139 type Output: FromNsPayloadBytes<'a>;
1140
1141 fn ns_payload_range(&self) -> Range<usize>;
1143}
1144
1145pub trait FromStringOrInteger: Sized {
1156 type Binary: Serialize + DeserializeOwned;
1157 type Integer: Serialize + DeserializeOwned;
1158
1159 fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1160 fn from_string(s: String) -> anyhow::Result<Self>;
1161 fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1162
1163 fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1164 fn to_string(&self) -> anyhow::Result<String>;
1165}