1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{convert_proposal, Proposal},
21 simple_certificate::{
22 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
23 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::HSStakeTable,
26 traits::{
27 metrics::Metrics,
28 node_implementation::{ConsensusTime, NodeType, Versions},
29 storage::Storage,
30 ValidatedState as HotShotState,
31 },
32 utils::genesis_epoch_from_version,
33 vote::HasViewNumber,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39 impls::NodeState,
40 utils::BackoffParams,
41 v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44 v0::impls::{StakeTableHash, ValidatedState},
45 v0_3::{
46 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
47 Validator,
48 },
49 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
50 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
51 PubKey, SeqTypes, ValidatorMap,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56 async fn try_fetch_leaf(
58 &self,
59 retry: usize,
60 height: u64,
61 stake_table: HSStakeTable<SeqTypes>,
62 success_threshold: U256,
63 ) -> anyhow::Result<Leaf2>;
64
65 async fn fetch_leaf(
67 &self,
68 height: u64,
69 stake_table: HSStakeTable<SeqTypes>,
70 success_threshold: U256,
71 ) -> anyhow::Result<Leaf2> {
72 self.backoff()
73 .retry(self, |provider, retry| {
74 let stake_table_clone = stake_table.clone();
75 async move {
76 provider
77 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78 .await
79 }
80 .boxed()
81 })
82 .await
83 }
84
85 async fn try_fetch_accounts(
87 &self,
88 retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 fee_merkle_tree_root: FeeMerkleCommitment,
93 accounts: &[FeeAccount],
94 ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96 async fn fetch_accounts(
98 &self,
99 instance: &NodeState,
100 height: u64,
101 view: ViewNumber,
102 fee_merkle_tree_root: FeeMerkleCommitment,
103 accounts: Vec<FeeAccount>,
104 ) -> anyhow::Result<Vec<FeeAccountProof>> {
105 self.backoff()
106 .retry(self, |provider, retry| {
107 let accounts = &accounts;
108 async move {
109 provider
110 .try_fetch_accounts(
111 retry,
112 instance,
113 height,
114 view,
115 fee_merkle_tree_root,
116 accounts,
117 )
118 .await
119 .map_err(|err| {
120 err.context(format!(
121 "fetching accounts {accounts:?}, height {height}, view {view}"
122 ))
123 })
124 }
125 .boxed()
126 })
127 .await
128 }
129
130 async fn try_remember_blocks_merkle_tree(
132 &self,
133 retry: usize,
134 instance: &NodeState,
135 height: u64,
136 view: ViewNumber,
137 mt: &mut BlockMerkleTree,
138 ) -> anyhow::Result<()>;
139
140 async fn remember_blocks_merkle_tree(
142 &self,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 mt: &mut BlockMerkleTree,
147 ) -> anyhow::Result<()> {
148 self.backoff()
149 .retry(mt, |mt, retry| {
150 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152 .boxed()
153 })
154 .await
155 }
156
157 async fn try_fetch_chain_config(
159 &self,
160 retry: usize,
161 commitment: Commitment<ChainConfig>,
162 ) -> anyhow::Result<ChainConfig>;
163
164 async fn fetch_chain_config(
166 &self,
167 commitment: Commitment<ChainConfig>,
168 ) -> anyhow::Result<ChainConfig> {
169 self.backoff()
170 .retry(self, |provider, retry| {
171 provider
172 .try_fetch_chain_config(retry, commitment)
173 .map_err(|err| err.context("fetching chain config"))
174 .boxed()
175 })
176 .await
177 }
178
179 async fn try_fetch_reward_accounts_v2(
181 &self,
182 retry: usize,
183 instance: &NodeState,
184 height: u64,
185 view: ViewNumber,
186 reward_merkle_tree_root: RewardMerkleCommitmentV2,
187 accounts: &[RewardAccountV2],
188 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
189
190 async fn fetch_reward_accounts_v2(
192 &self,
193 instance: &NodeState,
194 height: u64,
195 view: ViewNumber,
196 reward_merkle_tree_root: RewardMerkleCommitmentV2,
197 accounts: Vec<RewardAccountV2>,
198 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
199 self.backoff()
200 .retry(self, |provider, retry| {
201 let accounts = &accounts;
202 async move {
203 provider
204 .try_fetch_reward_accounts_v2(
205 retry,
206 instance,
207 height,
208 view,
209 reward_merkle_tree_root,
210 accounts,
211 )
212 .await
213 .map_err(|err| {
214 err.context(format!(
215 "fetching reward accounts {accounts:?}, height {height}, view \
216 {view}"
217 ))
218 })
219 }
220 .boxed()
221 })
222 .await
223 }
224
225 async fn try_fetch_reward_accounts_v1(
227 &self,
228 retry: usize,
229 instance: &NodeState,
230 height: u64,
231 view: ViewNumber,
232 reward_merkle_tree_root: RewardMerkleCommitmentV1,
233 accounts: &[RewardAccountV1],
234 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
235
236 async fn fetch_reward_accounts_v1(
238 &self,
239 instance: &NodeState,
240 height: u64,
241 view: ViewNumber,
242 reward_merkle_tree_root: RewardMerkleCommitmentV1,
243 accounts: Vec<RewardAccountV1>,
244 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
245 self.backoff()
246 .retry(self, |provider, retry| {
247 let accounts = &accounts;
248 async move {
249 provider
250 .try_fetch_reward_accounts_v1(
251 retry,
252 instance,
253 height,
254 view,
255 reward_merkle_tree_root,
256 accounts,
257 )
258 .await
259 .map_err(|err| {
260 err.context(format!(
261 "fetching v1 reward accounts {accounts:?}, height {height}, view \
262 {view}"
263 ))
264 })
265 }
266 .boxed()
267 })
268 .await
269 }
270
271 async fn try_fetch_state_cert(
273 &self,
274 retry: usize,
275 epoch: u64,
276 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
277
278 async fn fetch_state_cert(
280 &self,
281 epoch: u64,
282 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
283 self.backoff()
284 .retry(self, |provider, retry| {
285 provider
286 .try_fetch_state_cert(retry, epoch)
287 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
288 .boxed()
289 })
290 .await
291 }
292
293 fn is_local(&self) -> bool;
295
296 fn backoff(&self) -> &BackoffParams;
298
299 fn name(&self) -> String;
301}
302
303#[async_trait]
304impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
305 async fn try_fetch_leaf(
306 &self,
307 retry: usize,
308 height: u64,
309 stake_table: HSStakeTable<SeqTypes>,
310 success_threshold: U256,
311 ) -> anyhow::Result<Leaf2> {
312 (**self)
313 .try_fetch_leaf(retry, height, stake_table, success_threshold)
314 .await
315 }
316
317 async fn fetch_leaf(
318 &self,
319 height: u64,
320 stake_table: HSStakeTable<SeqTypes>,
321 success_threshold: U256,
322 ) -> anyhow::Result<Leaf2> {
323 (**self)
324 .fetch_leaf(height, stake_table, success_threshold)
325 .await
326 }
327 async fn try_fetch_accounts(
328 &self,
329 retry: usize,
330 instance: &NodeState,
331 height: u64,
332 view: ViewNumber,
333 fee_merkle_tree_root: FeeMerkleCommitment,
334 accounts: &[FeeAccount],
335 ) -> anyhow::Result<Vec<FeeAccountProof>> {
336 (**self)
337 .try_fetch_accounts(
338 retry,
339 instance,
340 height,
341 view,
342 fee_merkle_tree_root,
343 accounts,
344 )
345 .await
346 }
347
348 async fn fetch_accounts(
349 &self,
350 instance: &NodeState,
351 height: u64,
352 view: ViewNumber,
353 fee_merkle_tree_root: FeeMerkleCommitment,
354 accounts: Vec<FeeAccount>,
355 ) -> anyhow::Result<Vec<FeeAccountProof>> {
356 (**self)
357 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
358 .await
359 }
360
361 async fn try_remember_blocks_merkle_tree(
362 &self,
363 retry: usize,
364 instance: &NodeState,
365 height: u64,
366 view: ViewNumber,
367 mt: &mut BlockMerkleTree,
368 ) -> anyhow::Result<()> {
369 (**self)
370 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
371 .await
372 }
373
374 async fn remember_blocks_merkle_tree(
375 &self,
376 instance: &NodeState,
377 height: u64,
378 view: ViewNumber,
379 mt: &mut BlockMerkleTree,
380 ) -> anyhow::Result<()> {
381 (**self)
382 .remember_blocks_merkle_tree(instance, height, view, mt)
383 .await
384 }
385
386 async fn try_fetch_chain_config(
387 &self,
388 retry: usize,
389 commitment: Commitment<ChainConfig>,
390 ) -> anyhow::Result<ChainConfig> {
391 (**self).try_fetch_chain_config(retry, commitment).await
392 }
393
394 async fn fetch_chain_config(
395 &self,
396 commitment: Commitment<ChainConfig>,
397 ) -> anyhow::Result<ChainConfig> {
398 (**self).fetch_chain_config(commitment).await
399 }
400
401 async fn try_fetch_reward_accounts_v2(
402 &self,
403 retry: usize,
404 instance: &NodeState,
405 height: u64,
406 view: ViewNumber,
407 reward_merkle_tree_root: RewardMerkleCommitmentV2,
408 accounts: &[RewardAccountV2],
409 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
410 (**self)
411 .try_fetch_reward_accounts_v2(
412 retry,
413 instance,
414 height,
415 view,
416 reward_merkle_tree_root,
417 accounts,
418 )
419 .await
420 }
421
422 async fn fetch_reward_accounts_v2(
423 &self,
424 instance: &NodeState,
425 height: u64,
426 view: ViewNumber,
427 reward_merkle_tree_root: RewardMerkleCommitmentV2,
428 accounts: Vec<RewardAccountV2>,
429 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
430 (**self)
431 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
432 .await
433 }
434
435 async fn try_fetch_reward_accounts_v1(
436 &self,
437 retry: usize,
438 instance: &NodeState,
439 height: u64,
440 view: ViewNumber,
441 reward_merkle_tree_root: RewardMerkleCommitmentV1,
442 accounts: &[RewardAccountV1],
443 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
444 (**self)
445 .try_fetch_reward_accounts_v1(
446 retry,
447 instance,
448 height,
449 view,
450 reward_merkle_tree_root,
451 accounts,
452 )
453 .await
454 }
455
456 async fn fetch_reward_accounts_v1(
457 &self,
458 instance: &NodeState,
459 height: u64,
460 view: ViewNumber,
461 reward_merkle_tree_root: RewardMerkleCommitmentV1,
462 accounts: Vec<RewardAccountV1>,
463 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
464 (**self)
465 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
466 .await
467 }
468
469 async fn try_fetch_state_cert(
470 &self,
471 retry: usize,
472 epoch: u64,
473 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
474 (**self).try_fetch_state_cert(retry, epoch).await
475 }
476
477 async fn fetch_state_cert(
478 &self,
479 epoch: u64,
480 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
481 (**self).fetch_state_cert(epoch).await
482 }
483
484 fn backoff(&self) -> &BackoffParams {
485 (**self).backoff()
486 }
487
488 fn name(&self) -> String {
489 (**self).name()
490 }
491
492 fn is_local(&self) -> bool {
493 (**self).is_local()
494 }
495}
496
/// Options from which a persistence backend is created.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence type produced by [`PersistenceOptions::create`].
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Record the view retention setting on these options.
    /// NOTE(review): units and exact effect are defined by implementors — confirm.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Consume these options and reset the storage they describe.
    /// NOTE(review): presumably this deletes persisted data — confirm against implementors.
    async fn reset(self) -> anyhow::Result<()>;
}
505
/// Status marker returned together with the events by [`MembershipPersistence::load_events`].
pub enum EventsPersistenceRead {
    /// NOTE(review): presumably the stored events cover everything that was requested — confirm.
    Complete,
    /// NOTE(review): presumably the stored events only reach the contained L1 block number —
    /// confirm.
    UntilL1Block(u64),
}
513
/// Storage for stake tables, stake table events and validator sets.
#[async_trait]
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load what was stored for `epoch`: the validator map plus the optional block reward and
    /// stake table hash. Returns `None` when nothing is available for that epoch.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;

    /// Load the most recent stored stakes.
    /// NOTE(review): `limit` presumably caps the number of entries returned — confirm.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store the validator map for `epoch`, along with the optional block reward and stake table
    /// hash.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table `events` together with the L1 block number `l1_finalized`.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    /// Load stored stake table events for the given `l1_finalized` block number.
    ///
    /// Returns the events together with an optional [`EventsPersistenceRead`] status.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Store the full validator set for `epoch`, keyed by address.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, Validator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load validators stored for `epoch`.
    /// NOTE(review): `offset` and `limit` look like pagination parameters — confirm.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<Validator<PubKey>>>;
}
561
562#[async_trait]
563pub trait SequencerPersistence:
564 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
565{
566 fn into_catchup_provider(
568 self,
569 _backoff: BackoffParams,
570 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
571 bail!("state catchup is not implemented for this persistence type");
572 }
573
574 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
579
580 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
582
583 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
585
586 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
588
589 async fn load_quorum_proposals(
591 &self,
592 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
593
594 async fn load_quorum_proposal(
595 &self,
596 view: ViewNumber,
597 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
598
599 async fn load_vid_share(
600 &self,
601 view: ViewNumber,
602 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
603 async fn load_da_proposal(
604 &self,
605 view: ViewNumber,
606 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
607 async fn load_upgrade_certificate(
608 &self,
609 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
610 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
611 async fn load_state_cert(
612 &self,
613 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
614
615 async fn get_state_cert_by_epoch(
617 &self,
618 epoch: u64,
619 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
620
621 async fn insert_state_cert(
623 &self,
624 epoch: u64,
625 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
626 ) -> anyhow::Result<()>;
627
628 async fn load_consensus_state<V: Versions>(
635 &self,
636 state: NodeState,
637 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
638 let genesis_validated_state = ValidatedState::genesis(&state).0;
639 let highest_voted_view = match self
640 .load_latest_acted_view()
641 .await
642 .context("loading last voted view")?
643 {
644 Some(view) => {
645 tracing::info!(?view, "starting with last actioned view");
646 view
647 },
648 None => {
649 tracing::info!("no saved view, starting from genesis");
650 ViewNumber::genesis()
651 },
652 };
653
654 let restart_view = match self
655 .load_restart_view()
656 .await
657 .context("loading restart view")?
658 {
659 Some(view) => {
660 tracing::info!(?view, "starting from saved view");
661 view
662 },
663 None => {
664 tracing::info!("no saved view, starting from genesis");
665 ViewNumber::genesis()
666 },
667 };
668 let next_epoch_high_qc = self
669 .load_next_epoch_quorum_certificate()
670 .await
671 .context("loading next epoch qc")?;
672 let (leaf, mut high_qc, anchor_view) = match self
673 .load_anchor_leaf()
674 .await
675 .context("loading anchor leaf")?
676 {
677 Some((leaf, high_qc)) => {
678 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
679 ensure!(
680 leaf.view_number() == high_qc.view_number,
681 format!(
682 "loaded anchor leaf from view {}, but high QC is from view {}",
683 leaf.view_number(),
684 high_qc.view_number
685 )
686 );
687
688 let anchor_view = leaf.view_number();
689 (leaf, high_qc, Some(anchor_view))
690 },
691 None => {
692 tracing::info!("no saved leaf, starting from genesis leaf");
693 (
694 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
695 .await,
696 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
697 None,
698 )
699 },
700 };
701
702 if let Some((extended_high_qc, _)) = self.load_eqc().await {
703 if extended_high_qc.view_number() > high_qc.view_number() {
704 high_qc = extended_high_qc
705 }
706 }
707
708 let validated_state = if leaf.block_header().height() == 0 {
709 genesis_validated_state
711 } else {
712 ValidatedState::from_header(leaf.block_header())
715 };
716
717 let restart_view = max(restart_view, leaf.view_number());
722 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
724
725 let config = self.load_config().await.context("loading config")?;
726 let epoch_height = config
727 .as_ref()
728 .map(|c| c.config.epoch_height)
729 .unwrap_or_default();
730 let epoch_start_block = config
731 .as_ref()
732 .map(|c| c.config.epoch_start_block)
733 .unwrap_or_default();
734
735 let saved_proposals = self
736 .load_quorum_proposals()
737 .await
738 .context("loading saved proposals")?;
739
740 let upgrade_certificate = self
741 .load_upgrade_certificate()
742 .await
743 .context("loading upgrade certificate")?;
744
745 let start_epoch_info = self
746 .load_start_epoch_info()
747 .await
748 .context("loading start epoch info")?;
749
750 let state_cert = self
751 .load_state_cert()
752 .await
753 .context("loading light client state update certificate")?;
754
755 tracing::warn!(
756 ?leaf,
757 ?restart_view,
758 ?epoch,
759 ?high_qc,
760 ?validated_state,
761 ?state_cert,
762 "loaded consensus state"
763 );
764
765 Ok((
766 HotShotInitializer {
767 instance_state: state,
768 epoch_height,
769 epoch_start_block,
770 anchor_leaf: leaf,
771 anchor_state: Arc::new(validated_state),
772 anchor_state_delta: None,
773 start_view: restart_view,
774 start_epoch: epoch,
775 last_actioned_view: highest_voted_view,
776 saved_proposals,
777 high_qc,
778 next_epoch_high_qc,
779 decided_upgrade_certificate: upgrade_certificate,
780 undecided_leaves: Default::default(),
781 undecided_state: Default::default(),
782 saved_vid_shares: Default::default(), start_epoch_info,
784 state_cert,
785 },
786 anchor_view,
787 ))
788 }
789
790 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
792 if let EventType::Decide {
793 leaf_chain,
794 committing_qc,
795 deciding_qc,
796 ..
797 } = &event.event
798 {
799 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
800 return;
802 };
803
804 let chain = leaf_chain.iter().zip(
806 std::iter::once((**committing_qc).clone())
808 .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
811 );
812
813 if let Err(err) = self
814 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
815 .await
816 {
817 tracing::error!(
818 "failed to save decided leaves, chain may not be up to date: {err:#}"
819 );
820 return;
821 }
822 }
823 }
824
825 async fn append_decided_leaves(
851 &self,
852 decided_view: ViewNumber,
853 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
854 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
855 consumer: &(impl EventConsumer + 'static),
856 ) -> anyhow::Result<()>;
857
858 async fn load_anchor_leaf(
859 &self,
860 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
861 async fn append_vid(
862 &self,
863 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
864 ) -> anyhow::Result<()>;
865 async fn append_vid2(
867 &self,
868 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
869 ) -> anyhow::Result<()>;
870 async fn append_da(
871 &self,
872 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
873 vid_commit: VidCommitment,
874 ) -> anyhow::Result<()>;
875 async fn record_action(
876 &self,
877 view: ViewNumber,
878 epoch: Option<EpochNumber>,
879 action: HotShotAction,
880 ) -> anyhow::Result<()>;
881
882 async fn append_quorum_proposal2(
883 &self,
884 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
885 ) -> anyhow::Result<()>;
886
887 async fn store_eqc(
889 &self,
890 _high_qc: QuorumCertificate2<SeqTypes>,
891 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
892 ) -> anyhow::Result<()>;
893
894 async fn load_eqc(
896 &self,
897 ) -> Option<(
898 QuorumCertificate2<SeqTypes>,
899 NextEpochQuorumCertificate2<SeqTypes>,
900 )>;
901
902 async fn store_upgrade_certificate(
903 &self,
904 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
905 ) -> anyhow::Result<()>;
906
907 async fn migrate_stake_table_events(&self) -> anyhow::Result<()>;
908
909 async fn migrate_storage(&self) -> anyhow::Result<()> {
910 tracing::warn!("migrating consensus data...");
911
912 self.migrate_anchor_leaf().await?;
913 self.migrate_da_proposals().await?;
914 self.migrate_vid_shares().await?;
915 self.migrate_quorum_proposals().await?;
916 self.migrate_quorum_certificates().await?;
917
918 tracing::warn!("consensus storage has been migrated to new types");
919
920 tracing::warn!("migrating stake table events");
921 self.migrate_stake_table_events().await?;
922 tracing::warn!("stake table events have been migrated");
923
924 Ok(())
925 }
926
927 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
928 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
929 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
930 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
931 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
932
933 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
934 match self.load_anchor_leaf().await? {
935 Some((leaf, _)) => Ok(leaf.view_number()),
936 None => Ok(ViewNumber::genesis()),
937 }
938 }
939
940 async fn store_next_epoch_quorum_certificate(
941 &self,
942 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
943 ) -> anyhow::Result<()>;
944
945 async fn load_next_epoch_quorum_certificate(
946 &self,
947 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
948
949 async fn append_da2(
950 &self,
951 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
952 vid_commit: VidCommitment,
953 ) -> anyhow::Result<()>;
954
955 async fn append_proposal2(
956 &self,
957 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
958 ) -> anyhow::Result<()> {
959 self.append_quorum_proposal2(proposal).await
960 }
961
962 async fn store_drb_result(
963 &self,
964 epoch: <SeqTypes as NodeType>::Epoch,
965 drb_result: DrbResult,
966 ) -> anyhow::Result<()>;
967 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
968 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
969 async fn store_epoch_root(
970 &self,
971 epoch: <SeqTypes as NodeType>::Epoch,
972 block_header: <SeqTypes as NodeType>::BlockHeader,
973 ) -> anyhow::Result<()>;
974 async fn add_state_cert(
975 &self,
976 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
977 ) -> anyhow::Result<()>;
978
979 fn enable_metrics(&mut self, metrics: &dyn Metrics);
980}
981
/// A receiver of [`Event`]s.
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Process one event, reporting failure through the returned result.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
986
987#[async_trait]
988impl<T> EventConsumer for Box<T>
989where
990 T: EventConsumer + ?Sized,
991{
992 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
993 (**self).handle_event(event).await
994 }
995}
996
/// An [`EventConsumer`] that does nothing with the events it receives.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;
999
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Ignore the event and report success.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
1006
1007#[async_trait]
1008impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1009 async fn append_vid(
1010 &self,
1011 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
1012 ) -> anyhow::Result<()> {
1013 (**self).append_vid(proposal).await
1014 }
1015
1016 async fn append_vid2(
1017 &self,
1018 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
1019 ) -> anyhow::Result<()> {
1020 (**self).append_vid2(proposal).await
1021 }
1022
1023 async fn append_da(
1024 &self,
1025 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1026 vid_commit: VidCommitment,
1027 ) -> anyhow::Result<()> {
1028 (**self).append_da(proposal, vid_commit).await
1029 }
1030
1031 async fn append_da2(
1032 &self,
1033 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1034 vid_commit: VidCommitment,
1035 ) -> anyhow::Result<()> {
1036 (**self).append_da2(proposal, vid_commit).await
1037 }
1038
1039 async fn record_action(
1040 &self,
1041 view: ViewNumber,
1042 epoch: Option<EpochNumber>,
1043 action: HotShotAction,
1044 ) -> anyhow::Result<()> {
1045 (**self).record_action(view, epoch, action).await
1046 }
1047
1048 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1049 Ok(())
1050 }
1051
1052 async fn append_proposal(
1053 &self,
1054 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1055 ) -> anyhow::Result<()> {
1056 (**self)
1057 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1058 .await
1059 }
1060
1061 async fn append_proposal2(
1062 &self,
1063 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1064 ) -> anyhow::Result<()> {
1065 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1066 convert_proposal(proposal.clone());
1067 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1068 }
1069
1070 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1071 Ok(())
1072 }
1073
1074 async fn update_eqc(
1076 &self,
1077 high_qc: QuorumCertificate2<SeqTypes>,
1078 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1079 ) -> anyhow::Result<()> {
1080 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1081 if high_qc.view_number() < existing_high_qc.view_number() {
1082 return Ok(());
1083 }
1084 }
1085
1086 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1087 }
1088
1089 async fn update_next_epoch_high_qc2(
1090 &self,
1091 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1092 ) -> anyhow::Result<()> {
1093 Ok(())
1094 }
1095
1096 async fn update_decided_upgrade_certificate(
1097 &self,
1098 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1099 ) -> anyhow::Result<()> {
1100 (**self)
1101 .store_upgrade_certificate(decided_upgrade_certificate)
1102 .await
1103 }
1104
1105 async fn store_drb_result(
1106 &self,
1107 epoch: <SeqTypes as NodeType>::Epoch,
1108 drb_result: DrbResult,
1109 ) -> anyhow::Result<()> {
1110 (**self).store_drb_result(epoch, drb_result).await
1111 }
1112
1113 async fn store_epoch_root(
1114 &self,
1115 epoch: <SeqTypes as NodeType>::Epoch,
1116 block_header: <SeqTypes as NodeType>::BlockHeader,
1117 ) -> anyhow::Result<()> {
1118 (**self).store_epoch_root(epoch, block_header).await
1119 }
1120
1121 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1122 (**self).store_drb_input(drb_input).await
1123 }
1124
1125 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1126 (**self).load_drb_input(epoch).await
1127 }
1128
1129 async fn update_state_cert(
1130 &self,
1131 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1132 ) -> anyhow::Result<()> {
1133 (**self).add_state_cert(state_cert).await
1134 }
1135}
1136
/// Types that can be built from a borrowed slice of payload bytes.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from `bytes`. The conversion is infallible by signature.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1145
/// Types that describe a byte range and name the type to be built from the bytes in it.
pub trait NsPayloadBytesRange<'a> {
    /// The type constructed from the bytes in the range.
    type Output: FromNsPayloadBytes<'a>;

    /// The byte range described by `self`.
    /// NOTE(review): presumably an index range into a namespace payload — confirm with callers.
    fn ns_payload_range(&self) -> Range<usize>;
}
1156
/// Fallible conversions between `Self` and binary, string and integer representations.
pub trait FromStringOrInteger: Sized {
    /// The binary representation; it must be serializable and deserializable.
    type Binary: Serialize + DeserializeOwned;
    /// The integer representation; it must be serializable and deserializable.
    type Integer: Serialize + DeserializeOwned;

    /// Build `Self` from its binary representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Build `Self` from a string.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Build `Self` from its integer representation.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Convert `self` to its binary representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to a string.
    fn to_string(&self) -> anyhow::Result<String>;
}