1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{convert_proposal, Proposal},
21 simple_certificate::{
22 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23 QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::HSStakeTable,
26 traits::{
27 metrics::Metrics,
28 node_implementation::{ConsensusTime, NodeType, Versions},
29 storage::Storage,
30 ValidatedState as HotShotState,
31 },
32 utils::genesis_epoch_from_version,
33 vote::HasViewNumber,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39 impls::NodeState,
40 utils::BackoffParams,
41 v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44 v0::impls::{StakeTableHash, ValidatedState},
45 v0_3::{
46 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
47 Validator,
48 },
49 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
50 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
51 PubKey, SeqTypes, ValidatorMap,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56 async fn try_fetch_leaf(
58 &self,
59 retry: usize,
60 height: u64,
61 stake_table: HSStakeTable<SeqTypes>,
62 success_threshold: U256,
63 ) -> anyhow::Result<Leaf2>;
64
65 async fn fetch_leaf(
67 &self,
68 height: u64,
69 stake_table: HSStakeTable<SeqTypes>,
70 success_threshold: U256,
71 ) -> anyhow::Result<Leaf2> {
72 self.backoff()
73 .retry(self, |provider, retry| {
74 let stake_table_clone = stake_table.clone();
75 async move {
76 provider
77 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78 .await
79 }
80 .boxed()
81 })
82 .await
83 }
84
85 async fn try_fetch_accounts(
87 &self,
88 retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 fee_merkle_tree_root: FeeMerkleCommitment,
93 accounts: &[FeeAccount],
94 ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96 async fn fetch_accounts(
98 &self,
99 instance: &NodeState,
100 height: u64,
101 view: ViewNumber,
102 fee_merkle_tree_root: FeeMerkleCommitment,
103 accounts: Vec<FeeAccount>,
104 ) -> anyhow::Result<Vec<FeeAccountProof>> {
105 self.backoff()
106 .retry(self, |provider, retry| {
107 let accounts = &accounts;
108 async move {
109 provider
110 .try_fetch_accounts(
111 retry,
112 instance,
113 height,
114 view,
115 fee_merkle_tree_root,
116 accounts,
117 )
118 .await
119 .map_err(|err| {
120 err.context(format!(
121 "fetching accounts {accounts:?}, height {height}, view {view}"
122 ))
123 })
124 }
125 .boxed()
126 })
127 .await
128 }
129
130 async fn try_remember_blocks_merkle_tree(
132 &self,
133 retry: usize,
134 instance: &NodeState,
135 height: u64,
136 view: ViewNumber,
137 mt: &mut BlockMerkleTree,
138 ) -> anyhow::Result<()>;
139
140 async fn remember_blocks_merkle_tree(
142 &self,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 mt: &mut BlockMerkleTree,
147 ) -> anyhow::Result<()> {
148 self.backoff()
149 .retry(mt, |mt, retry| {
150 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152 .boxed()
153 })
154 .await
155 }
156
157 async fn try_fetch_chain_config(
159 &self,
160 retry: usize,
161 commitment: Commitment<ChainConfig>,
162 ) -> anyhow::Result<ChainConfig>;
163
164 async fn fetch_chain_config(
166 &self,
167 commitment: Commitment<ChainConfig>,
168 ) -> anyhow::Result<ChainConfig> {
169 self.backoff()
170 .retry(self, |provider, retry| {
171 provider
172 .try_fetch_chain_config(retry, commitment)
173 .map_err(|err| err.context("fetching chain config"))
174 .boxed()
175 })
176 .await
177 }
178
179 async fn try_fetch_reward_accounts_v2(
181 &self,
182 retry: usize,
183 instance: &NodeState,
184 height: u64,
185 view: ViewNumber,
186 reward_merkle_tree_root: RewardMerkleCommitmentV2,
187 accounts: &[RewardAccountV2],
188 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
189
190 async fn fetch_reward_accounts_v2(
192 &self,
193 instance: &NodeState,
194 height: u64,
195 view: ViewNumber,
196 reward_merkle_tree_root: RewardMerkleCommitmentV2,
197 accounts: Vec<RewardAccountV2>,
198 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
199 self.backoff()
200 .retry(self, |provider, retry| {
201 let accounts = &accounts;
202 async move {
203 provider
204 .try_fetch_reward_accounts_v2(
205 retry,
206 instance,
207 height,
208 view,
209 reward_merkle_tree_root,
210 accounts,
211 )
212 .await
213 .map_err(|err| {
214 err.context(format!(
215 "fetching reward accounts {accounts:?}, height {height}, view \
216 {view}"
217 ))
218 })
219 }
220 .boxed()
221 })
222 .await
223 }
224
225 async fn try_fetch_reward_accounts_v1(
227 &self,
228 retry: usize,
229 instance: &NodeState,
230 height: u64,
231 view: ViewNumber,
232 reward_merkle_tree_root: RewardMerkleCommitmentV1,
233 accounts: &[RewardAccountV1],
234 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
235
236 async fn fetch_reward_accounts_v1(
238 &self,
239 instance: &NodeState,
240 height: u64,
241 view: ViewNumber,
242 reward_merkle_tree_root: RewardMerkleCommitmentV1,
243 accounts: Vec<RewardAccountV1>,
244 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
245 self.backoff()
246 .retry(self, |provider, retry| {
247 let accounts = &accounts;
248 async move {
249 provider
250 .try_fetch_reward_accounts_v1(
251 retry,
252 instance,
253 height,
254 view,
255 reward_merkle_tree_root,
256 accounts,
257 )
258 .await
259 .map_err(|err| {
260 err.context(format!(
261 "fetching v1 reward accounts {accounts:?}, height {height}, view \
262 {view}"
263 ))
264 })
265 }
266 .boxed()
267 })
268 .await
269 }
270
271 fn is_local(&self) -> bool;
273
274 fn backoff(&self) -> &BackoffParams;
276
277 fn name(&self) -> String;
279}
280
281#[async_trait]
282impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
283 async fn try_fetch_leaf(
284 &self,
285 retry: usize,
286 height: u64,
287 stake_table: HSStakeTable<SeqTypes>,
288 success_threshold: U256,
289 ) -> anyhow::Result<Leaf2> {
290 (**self)
291 .try_fetch_leaf(retry, height, stake_table, success_threshold)
292 .await
293 }
294
295 async fn fetch_leaf(
296 &self,
297 height: u64,
298 stake_table: HSStakeTable<SeqTypes>,
299 success_threshold: U256,
300 ) -> anyhow::Result<Leaf2> {
301 (**self)
302 .fetch_leaf(height, stake_table, success_threshold)
303 .await
304 }
305 async fn try_fetch_accounts(
306 &self,
307 retry: usize,
308 instance: &NodeState,
309 height: u64,
310 view: ViewNumber,
311 fee_merkle_tree_root: FeeMerkleCommitment,
312 accounts: &[FeeAccount],
313 ) -> anyhow::Result<Vec<FeeAccountProof>> {
314 (**self)
315 .try_fetch_accounts(
316 retry,
317 instance,
318 height,
319 view,
320 fee_merkle_tree_root,
321 accounts,
322 )
323 .await
324 }
325
326 async fn fetch_accounts(
327 &self,
328 instance: &NodeState,
329 height: u64,
330 view: ViewNumber,
331 fee_merkle_tree_root: FeeMerkleCommitment,
332 accounts: Vec<FeeAccount>,
333 ) -> anyhow::Result<Vec<FeeAccountProof>> {
334 (**self)
335 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
336 .await
337 }
338
339 async fn try_remember_blocks_merkle_tree(
340 &self,
341 retry: usize,
342 instance: &NodeState,
343 height: u64,
344 view: ViewNumber,
345 mt: &mut BlockMerkleTree,
346 ) -> anyhow::Result<()> {
347 (**self)
348 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
349 .await
350 }
351
352 async fn remember_blocks_merkle_tree(
353 &self,
354 instance: &NodeState,
355 height: u64,
356 view: ViewNumber,
357 mt: &mut BlockMerkleTree,
358 ) -> anyhow::Result<()> {
359 (**self)
360 .remember_blocks_merkle_tree(instance, height, view, mt)
361 .await
362 }
363
364 async fn try_fetch_chain_config(
365 &self,
366 retry: usize,
367 commitment: Commitment<ChainConfig>,
368 ) -> anyhow::Result<ChainConfig> {
369 (**self).try_fetch_chain_config(retry, commitment).await
370 }
371
372 async fn fetch_chain_config(
373 &self,
374 commitment: Commitment<ChainConfig>,
375 ) -> anyhow::Result<ChainConfig> {
376 (**self).fetch_chain_config(commitment).await
377 }
378
379 async fn try_fetch_reward_accounts_v2(
380 &self,
381 retry: usize,
382 instance: &NodeState,
383 height: u64,
384 view: ViewNumber,
385 reward_merkle_tree_root: RewardMerkleCommitmentV2,
386 accounts: &[RewardAccountV2],
387 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
388 (**self)
389 .try_fetch_reward_accounts_v2(
390 retry,
391 instance,
392 height,
393 view,
394 reward_merkle_tree_root,
395 accounts,
396 )
397 .await
398 }
399
400 async fn fetch_reward_accounts_v2(
401 &self,
402 instance: &NodeState,
403 height: u64,
404 view: ViewNumber,
405 reward_merkle_tree_root: RewardMerkleCommitmentV2,
406 accounts: Vec<RewardAccountV2>,
407 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
408 (**self)
409 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
410 .await
411 }
412
413 async fn try_fetch_reward_accounts_v1(
414 &self,
415 retry: usize,
416 instance: &NodeState,
417 height: u64,
418 view: ViewNumber,
419 reward_merkle_tree_root: RewardMerkleCommitmentV1,
420 accounts: &[RewardAccountV1],
421 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
422 (**self)
423 .try_fetch_reward_accounts_v1(
424 retry,
425 instance,
426 height,
427 view,
428 reward_merkle_tree_root,
429 accounts,
430 )
431 .await
432 }
433
434 async fn fetch_reward_accounts_v1(
435 &self,
436 instance: &NodeState,
437 height: u64,
438 view: ViewNumber,
439 reward_merkle_tree_root: RewardMerkleCommitmentV1,
440 accounts: Vec<RewardAccountV1>,
441 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
442 (**self)
443 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
444 .await
445 }
446
447 fn backoff(&self) -> &BackoffParams {
448 (**self).backoff()
449 }
450
451 fn name(&self) -> String {
452 (**self).name()
453 }
454
455 fn is_local(&self) -> bool {
456 (**self).is_local()
457 }
458}
459
/// Options from which a persistence backend can be created or reset.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence type produced by [`Self::create`].
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention to `view_retention`.
    ///
    /// NOTE(review): presumably the number of views of data the backend keeps — confirm against
    /// the implementations.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create a persistence instance from these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Consume these options and reset the storage they describe.
    ///
    /// NOTE(review): what exactly is cleared is backend specific — confirm against the
    /// implementations.
    async fn reset(self) -> anyhow::Result<()>;
}
468
/// Describes how far a read of persisted stake table events got; returned alongside the events
/// by `MembershipPersistence::load_events`.
///
/// NOTE(review): the exact meaning of each variant is not visible in this file — confirm against
/// the `load_events` implementations.
pub enum EventsPersistenceRead {
    /// Presumably: everything up to the requested L1 block was read.
    Complete,
    /// Presumably: events were only read up to the contained L1 block number.
    UntilL1Block(u64),
}
476
/// Storage for stake tables, stake table events and validator sets.
#[async_trait]
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake stored for `epoch`.
    ///
    /// Returns `None` when nothing is available; otherwise the validator map together with an
    /// optional block reward and an optional stake table hash.
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;

    /// Load the most recently stored stakes.
    ///
    /// NOTE(review): `limit` presumably caps the number of epochs returned — confirm against the
    /// implementations.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store `stake` for `epoch`, along with the optional `block_reward` and
    /// `stake_table_hash`.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: ValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table `events`.
    ///
    /// NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which `events` are
    /// complete — confirm against the implementations.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    /// Load stored stake table events for the L1 block `l1_finalized`.
    ///
    /// Returns the events together with an optional [`EventsPersistenceRead`] describing how far
    /// the read got.
    async fn load_events(
        &self,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Store the full validator set `all_validators` for `epoch`.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, Validator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load validators stored for `epoch`.
    ///
    /// NOTE(review): `offset` and `limit` presumably page through the stored set — confirm
    /// against the implementations.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<Validator<PubKey>>>;
}
524
525#[async_trait]
526pub trait SequencerPersistence:
527 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
528{
529 fn into_catchup_provider(
531 self,
532 _backoff: BackoffParams,
533 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
534 bail!("state catchup is not implemented for this persistence type");
535 }
536
537 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
542
543 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
545
546 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
548
549 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
551
552 async fn load_quorum_proposals(
554 &self,
555 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
556
557 async fn load_quorum_proposal(
558 &self,
559 view: ViewNumber,
560 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
561
562 async fn load_vid_share(
563 &self,
564 view: ViewNumber,
565 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
566 async fn load_da_proposal(
567 &self,
568 view: ViewNumber,
569 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
570 async fn load_upgrade_certificate(
571 &self,
572 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
573 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
574 async fn load_state_cert(
575 &self,
576 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
577
578 async fn load_consensus_state<V: Versions>(
585 &self,
586 state: NodeState,
587 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
588 let genesis_validated_state = ValidatedState::genesis(&state).0;
589 let highest_voted_view = match self
590 .load_latest_acted_view()
591 .await
592 .context("loading last voted view")?
593 {
594 Some(view) => {
595 tracing::info!(?view, "starting with last actioned view");
596 view
597 },
598 None => {
599 tracing::info!("no saved view, starting from genesis");
600 ViewNumber::genesis()
601 },
602 };
603
604 let restart_view = match self
605 .load_restart_view()
606 .await
607 .context("loading restart view")?
608 {
609 Some(view) => {
610 tracing::info!(?view, "starting from saved view");
611 view
612 },
613 None => {
614 tracing::info!("no saved view, starting from genesis");
615 ViewNumber::genesis()
616 },
617 };
618 let next_epoch_high_qc = self
619 .load_next_epoch_quorum_certificate()
620 .await
621 .context("loading next epoch qc")?;
622 let (leaf, mut high_qc, anchor_view) = match self
623 .load_anchor_leaf()
624 .await
625 .context("loading anchor leaf")?
626 {
627 Some((leaf, high_qc)) => {
628 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
629 ensure!(
630 leaf.view_number() == high_qc.view_number,
631 format!(
632 "loaded anchor leaf from view {}, but high QC is from view {}",
633 leaf.view_number(),
634 high_qc.view_number
635 )
636 );
637
638 let anchor_view = leaf.view_number();
639 (leaf, high_qc, Some(anchor_view))
640 },
641 None => {
642 tracing::info!("no saved leaf, starting from genesis leaf");
643 (
644 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
645 .await,
646 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
647 None,
648 )
649 },
650 };
651
652 if let Some((extended_high_qc, _)) = self.load_eqc().await {
653 if extended_high_qc.view_number() > high_qc.view_number() {
654 high_qc = extended_high_qc
655 }
656 }
657
658 let validated_state = if leaf.block_header().height() == 0 {
659 genesis_validated_state
661 } else {
662 ValidatedState::from_header(leaf.block_header())
665 };
666
667 let restart_view = max(restart_view, leaf.view_number());
672 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
674
675 let config = self.load_config().await.context("loading config")?;
676 let epoch_height = config
677 .as_ref()
678 .map(|c| c.config.epoch_height)
679 .unwrap_or_default();
680 let epoch_start_block = config
681 .as_ref()
682 .map(|c| c.config.epoch_start_block)
683 .unwrap_or_default();
684
685 let saved_proposals = self
686 .load_quorum_proposals()
687 .await
688 .context("loading saved proposals")?;
689
690 let upgrade_certificate = self
691 .load_upgrade_certificate()
692 .await
693 .context("loading upgrade certificate")?;
694
695 let start_epoch_info = self
696 .load_start_epoch_info()
697 .await
698 .context("loading start epoch info")?;
699
700 let state_cert = self
701 .load_state_cert()
702 .await
703 .context("loading light client state update certificate")?;
704
705 tracing::warn!(
706 ?leaf,
707 ?restart_view,
708 ?epoch,
709 ?high_qc,
710 ?validated_state,
711 ?state_cert,
712 "loaded consensus state"
713 );
714
715 Ok((
716 HotShotInitializer {
717 instance_state: state,
718 epoch_height,
719 epoch_start_block,
720 anchor_leaf: leaf,
721 anchor_state: Arc::new(validated_state),
722 anchor_state_delta: None,
723 start_view: restart_view,
724 start_epoch: epoch,
725 last_actioned_view: highest_voted_view,
726 saved_proposals,
727 high_qc,
728 next_epoch_high_qc,
729 decided_upgrade_certificate: upgrade_certificate,
730 undecided_leaves: Default::default(),
731 undecided_state: Default::default(),
732 saved_vid_shares: Default::default(), start_epoch_info,
734 state_cert,
735 },
736 anchor_view,
737 ))
738 }
739
740 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
742 if let EventType::Decide {
743 leaf_chain,
744 committing_qc,
745 deciding_qc,
746 ..
747 } = &event.event
748 {
749 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
750 return;
752 };
753
754 let chain = leaf_chain.iter().zip(
756 std::iter::once((**committing_qc).clone())
758 .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
761 );
762
763 if let Err(err) = self
764 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
765 .await
766 {
767 tracing::error!(
768 "failed to save decided leaves, chain may not be up to date: {err:#}"
769 );
770 return;
771 }
772 }
773 }
774
775 async fn append_decided_leaves(
801 &self,
802 decided_view: ViewNumber,
803 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
804 deciding_qc: Option<Arc<QuorumCertificate2<SeqTypes>>>,
805 consumer: &(impl EventConsumer + 'static),
806 ) -> anyhow::Result<()>;
807
808 async fn load_anchor_leaf(
809 &self,
810 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
811 async fn append_vid(
812 &self,
813 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
814 ) -> anyhow::Result<()>;
815 async fn append_vid2(
817 &self,
818 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
819 ) -> anyhow::Result<()>;
820 async fn append_da(
821 &self,
822 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
823 vid_commit: VidCommitment,
824 ) -> anyhow::Result<()>;
825 async fn record_action(
826 &self,
827 view: ViewNumber,
828 epoch: Option<EpochNumber>,
829 action: HotShotAction,
830 ) -> anyhow::Result<()>;
831
832 async fn append_quorum_proposal2(
833 &self,
834 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
835 ) -> anyhow::Result<()>;
836
837 async fn store_eqc(
839 &self,
840 _high_qc: QuorumCertificate2<SeqTypes>,
841 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
842 ) -> anyhow::Result<()>;
843
844 async fn load_eqc(
846 &self,
847 ) -> Option<(
848 QuorumCertificate2<SeqTypes>,
849 NextEpochQuorumCertificate2<SeqTypes>,
850 )>;
851
852 async fn store_upgrade_certificate(
853 &self,
854 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
855 ) -> anyhow::Result<()>;
856
857 async fn migrate_stake_table_events(&self) -> anyhow::Result<()>;
858
859 async fn migrate_storage(&self) -> anyhow::Result<()> {
860 tracing::warn!("migrating consensus data...");
861
862 self.migrate_anchor_leaf().await?;
863 self.migrate_da_proposals().await?;
864 self.migrate_vid_shares().await?;
865 self.migrate_quorum_proposals().await?;
866 self.migrate_quorum_certificates().await?;
867
868 tracing::warn!("consensus storage has been migrated to new types");
869
870 tracing::warn!("migrating stake table events");
871 self.migrate_stake_table_events().await?;
872 tracing::warn!("stake table events have been migrated");
873
874 Ok(())
875 }
876
877 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
878 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
879 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
880 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
881 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
882
883 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
884 match self.load_anchor_leaf().await? {
885 Some((leaf, _)) => Ok(leaf.view_number()),
886 None => Ok(ViewNumber::genesis()),
887 }
888 }
889
890 async fn store_next_epoch_quorum_certificate(
891 &self,
892 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
893 ) -> anyhow::Result<()>;
894
895 async fn load_next_epoch_quorum_certificate(
896 &self,
897 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
898
899 async fn append_da2(
900 &self,
901 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
902 vid_commit: VidCommitment,
903 ) -> anyhow::Result<()>;
904
905 async fn append_proposal2(
906 &self,
907 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
908 ) -> anyhow::Result<()> {
909 self.append_quorum_proposal2(proposal).await
910 }
911
912 async fn store_drb_result(
913 &self,
914 epoch: <SeqTypes as NodeType>::Epoch,
915 drb_result: DrbResult,
916 ) -> anyhow::Result<()>;
917 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
918 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
919 async fn store_epoch_root(
920 &self,
921 epoch: <SeqTypes as NodeType>::Epoch,
922 block_header: <SeqTypes as NodeType>::BlockHeader,
923 ) -> anyhow::Result<()>;
924 async fn add_state_cert(
925 &self,
926 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
927 ) -> anyhow::Result<()>;
928
929 fn enable_metrics(&mut self, metrics: &dyn Metrics);
930}
931
/// A receiver of consensus events, as passed to `SequencerPersistence::handle_event` and
/// `SequencerPersistence::append_decided_leaves`.
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Process `event`, reporting failure through the returned result.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
936
937#[async_trait]
938impl<T> EventConsumer for Box<T>
939where
940 T: EventConsumer + ?Sized,
941{
942 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
943 (**self).handle_event(event).await
944 }
945}
946
/// An [`EventConsumer`] that ignores every event it is given.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;
949
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Does nothing with `_event` and always succeeds.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
956
957#[async_trait]
958impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
959 async fn append_vid(
960 &self,
961 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
962 ) -> anyhow::Result<()> {
963 (**self).append_vid(proposal).await
964 }
965
966 async fn append_vid2(
967 &self,
968 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
969 ) -> anyhow::Result<()> {
970 (**self).append_vid2(proposal).await
971 }
972
973 async fn append_da(
974 &self,
975 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
976 vid_commit: VidCommitment,
977 ) -> anyhow::Result<()> {
978 (**self).append_da(proposal, vid_commit).await
979 }
980
981 async fn append_da2(
982 &self,
983 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
984 vid_commit: VidCommitment,
985 ) -> anyhow::Result<()> {
986 (**self).append_da2(proposal, vid_commit).await
987 }
988
989 async fn record_action(
990 &self,
991 view: ViewNumber,
992 epoch: Option<EpochNumber>,
993 action: HotShotAction,
994 ) -> anyhow::Result<()> {
995 (**self).record_action(view, epoch, action).await
996 }
997
998 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
999 Ok(())
1000 }
1001
1002 async fn append_proposal(
1003 &self,
1004 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1005 ) -> anyhow::Result<()> {
1006 (**self)
1007 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1008 .await
1009 }
1010
1011 async fn append_proposal2(
1012 &self,
1013 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1014 ) -> anyhow::Result<()> {
1015 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1016 convert_proposal(proposal.clone());
1017 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1018 }
1019
1020 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1021 Ok(())
1022 }
1023
1024 async fn update_eqc(
1026 &self,
1027 high_qc: QuorumCertificate2<SeqTypes>,
1028 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1029 ) -> anyhow::Result<()> {
1030 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1031 if high_qc.view_number() < existing_high_qc.view_number() {
1032 return Ok(());
1033 }
1034 }
1035
1036 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1037 }
1038
1039 async fn update_next_epoch_high_qc2(
1040 &self,
1041 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1042 ) -> anyhow::Result<()> {
1043 Ok(())
1044 }
1045
1046 async fn update_decided_upgrade_certificate(
1047 &self,
1048 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1049 ) -> anyhow::Result<()> {
1050 (**self)
1051 .store_upgrade_certificate(decided_upgrade_certificate)
1052 .await
1053 }
1054
1055 async fn store_drb_result(
1056 &self,
1057 epoch: <SeqTypes as NodeType>::Epoch,
1058 drb_result: DrbResult,
1059 ) -> anyhow::Result<()> {
1060 (**self).store_drb_result(epoch, drb_result).await
1061 }
1062
1063 async fn store_epoch_root(
1064 &self,
1065 epoch: <SeqTypes as NodeType>::Epoch,
1066 block_header: <SeqTypes as NodeType>::BlockHeader,
1067 ) -> anyhow::Result<()> {
1068 (**self).store_epoch_root(epoch, block_header).await
1069 }
1070
1071 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1072 (**self).store_drb_input(drb_input).await
1073 }
1074
1075 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1076 (**self).load_drb_input(epoch).await
1077 }
1078
1079 async fn update_state_cert(
1080 &self,
1081 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1082 ) -> anyhow::Result<()> {
1083 (**self).add_state_cert(state_cert).await
1084 }
1085}
1086
/// A value that can be built from a borrowed slice of payload bytes.
///
/// This is the bound on `NsPayloadBytesRange::Output`.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from `bytes`. The conversion is infallible by signature.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1095
/// A description of a byte range within a payload, tied to the type that range decodes to.
///
/// NOTE(review): presumably the range is relative to a single namespace's payload bytes —
/// confirm against the implementations.
pub trait NsPayloadBytesRange<'a> {
    /// The type that can be built from the bytes in this range.
    type Output: FromNsPayloadBytes<'a>;

    /// The byte range described by `self`.
    fn ns_payload_range(&self) -> Range<usize>;
}
1106
/// A type that can be constructed from a binary form, a string or an integer, and converted
/// back to its binary and string forms.
///
/// NOTE(review): presumably this backs serde implementations that accept either strings or
/// integers in human-readable formats — confirm at the use sites.
pub trait FromStringOrInteger: Sized {
    /// The serializable binary representation.
    type Binary: Serialize + DeserializeOwned;
    /// The serializable integer representation.
    type Integer: Serialize + DeserializeOwned;

    /// Construct `Self` from its binary representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Construct `Self` from a string.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Construct `Self` from its integer representation.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Convert `self` to its binary representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to a string.
    fn to_string(&self) -> anyhow::Result<String>;
}