1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{convert_proposal, Proposal},
21 simple_certificate::{
22 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
23 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::HSStakeTable,
26 traits::{
27 metrics::Metrics,
28 node_implementation::{ConsensusTime, NodeType, Versions},
29 storage::Storage,
30 ValidatedState as HotShotState,
31 },
32 utils::genesis_epoch_from_version,
33 vote::HasViewNumber,
34};
35use indexmap::IndexMap;
36use serde::{de::DeserializeOwned, Serialize};
37
38use super::{
39 impls::NodeState,
40 utils::BackoffParams,
41 v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44 v0::impls::{StakeTableHash, ValidatedState},
45 v0_3::{
46 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
47 Validator,
48 },
49 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
50 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
51 PubKey, SeqTypes, ValidatorMap,
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56 async fn try_fetch_leaf(
58 &self,
59 retry: usize,
60 height: u64,
61 stake_table: HSStakeTable<SeqTypes>,
62 success_threshold: U256,
63 ) -> anyhow::Result<Leaf2>;
64
65 async fn fetch_leaf(
67 &self,
68 height: u64,
69 stake_table: HSStakeTable<SeqTypes>,
70 success_threshold: U256,
71 ) -> anyhow::Result<Leaf2> {
72 self.backoff()
73 .retry(self, |provider, retry| {
74 let stake_table_clone = stake_table.clone();
75 async move {
76 provider
77 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78 .await
79 }
80 .boxed()
81 })
82 .await
83 }
84
85 async fn try_fetch_accounts(
87 &self,
88 retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 fee_merkle_tree_root: FeeMerkleCommitment,
93 accounts: &[FeeAccount],
94 ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96 async fn fetch_accounts(
98 &self,
99 instance: &NodeState,
100 height: u64,
101 view: ViewNumber,
102 fee_merkle_tree_root: FeeMerkleCommitment,
103 accounts: Vec<FeeAccount>,
104 ) -> anyhow::Result<Vec<FeeAccountProof>> {
105 self.backoff()
106 .retry(self, |provider, retry| {
107 let accounts = &accounts;
108 async move {
109 provider
110 .try_fetch_accounts(
111 retry,
112 instance,
113 height,
114 view,
115 fee_merkle_tree_root,
116 accounts,
117 )
118 .await
119 .map_err(|err| {
120 err.context(format!(
121 "fetching accounts {accounts:?}, height {height}, view {view}"
122 ))
123 })
124 }
125 .boxed()
126 })
127 .await
128 }
129
130 async fn try_remember_blocks_merkle_tree(
132 &self,
133 retry: usize,
134 instance: &NodeState,
135 height: u64,
136 view: ViewNumber,
137 mt: &mut BlockMerkleTree,
138 ) -> anyhow::Result<()>;
139
140 async fn remember_blocks_merkle_tree(
142 &self,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 mt: &mut BlockMerkleTree,
147 ) -> anyhow::Result<()> {
148 self.backoff()
149 .retry(mt, |mt, retry| {
150 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152 .boxed()
153 })
154 .await
155 }
156
157 async fn try_fetch_chain_config(
159 &self,
160 retry: usize,
161 commitment: Commitment<ChainConfig>,
162 ) -> anyhow::Result<ChainConfig>;
163
164 async fn fetch_chain_config(
166 &self,
167 commitment: Commitment<ChainConfig>,
168 ) -> anyhow::Result<ChainConfig> {
169 self.backoff()
170 .retry(self, |provider, retry| {
171 provider
172 .try_fetch_chain_config(retry, commitment)
173 .map_err(|err| err.context("fetching chain config"))
174 .boxed()
175 })
176 .await
177 }
178
179 async fn try_fetch_reward_accounts_v2(
181 &self,
182 retry: usize,
183 instance: &NodeState,
184 height: u64,
185 view: ViewNumber,
186 reward_merkle_tree_root: RewardMerkleCommitmentV2,
187 accounts: &[RewardAccountV2],
188 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
189
190 async fn fetch_reward_accounts_v2(
192 &self,
193 instance: &NodeState,
194 height: u64,
195 view: ViewNumber,
196 reward_merkle_tree_root: RewardMerkleCommitmentV2,
197 accounts: Vec<RewardAccountV2>,
198 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
199 self.backoff()
200 .retry(self, |provider, retry| {
201 let accounts = &accounts;
202 async move {
203 provider
204 .try_fetch_reward_accounts_v2(
205 retry,
206 instance,
207 height,
208 view,
209 reward_merkle_tree_root,
210 accounts,
211 )
212 .await
213 .map_err(|err| {
214 err.context(format!(
215 "fetching reward accounts {accounts:?}, height {height}, view \
216 {view}"
217 ))
218 })
219 }
220 .boxed()
221 })
222 .await
223 }
224
225 async fn try_fetch_reward_accounts_v1(
227 &self,
228 retry: usize,
229 instance: &NodeState,
230 height: u64,
231 view: ViewNumber,
232 reward_merkle_tree_root: RewardMerkleCommitmentV1,
233 accounts: &[RewardAccountV1],
234 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
235
236 async fn fetch_reward_accounts_v1(
238 &self,
239 instance: &NodeState,
240 height: u64,
241 view: ViewNumber,
242 reward_merkle_tree_root: RewardMerkleCommitmentV1,
243 accounts: Vec<RewardAccountV1>,
244 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
245 self.backoff()
246 .retry(self, |provider, retry| {
247 let accounts = &accounts;
248 async move {
249 provider
250 .try_fetch_reward_accounts_v1(
251 retry,
252 instance,
253 height,
254 view,
255 reward_merkle_tree_root,
256 accounts,
257 )
258 .await
259 .map_err(|err| {
260 err.context(format!(
261 "fetching v1 reward accounts {accounts:?}, height {height}, view \
262 {view}"
263 ))
264 })
265 }
266 .boxed()
267 })
268 .await
269 }
270
271 async fn try_fetch_state_cert(
273 &self,
274 retry: usize,
275 epoch: u64,
276 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
277
278 async fn fetch_state_cert(
280 &self,
281 epoch: u64,
282 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
283 self.backoff()
284 .retry(self, |provider, retry| {
285 provider
286 .try_fetch_state_cert(retry, epoch)
287 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
288 .boxed()
289 })
290 .await
291 }
292
293 fn is_local(&self) -> bool;
295
296 fn backoff(&self) -> &BackoffParams;
298
299 fn name(&self) -> String;
301}
302
303#[async_trait]
304impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
305 async fn try_fetch_leaf(
306 &self,
307 retry: usize,
308 height: u64,
309 stake_table: HSStakeTable<SeqTypes>,
310 success_threshold: U256,
311 ) -> anyhow::Result<Leaf2> {
312 (**self)
313 .try_fetch_leaf(retry, height, stake_table, success_threshold)
314 .await
315 }
316
317 async fn fetch_leaf(
318 &self,
319 height: u64,
320 stake_table: HSStakeTable<SeqTypes>,
321 success_threshold: U256,
322 ) -> anyhow::Result<Leaf2> {
323 (**self)
324 .fetch_leaf(height, stake_table, success_threshold)
325 .await
326 }
327 async fn try_fetch_accounts(
328 &self,
329 retry: usize,
330 instance: &NodeState,
331 height: u64,
332 view: ViewNumber,
333 fee_merkle_tree_root: FeeMerkleCommitment,
334 accounts: &[FeeAccount],
335 ) -> anyhow::Result<Vec<FeeAccountProof>> {
336 (**self)
337 .try_fetch_accounts(
338 retry,
339 instance,
340 height,
341 view,
342 fee_merkle_tree_root,
343 accounts,
344 )
345 .await
346 }
347
348 async fn fetch_accounts(
349 &self,
350 instance: &NodeState,
351 height: u64,
352 view: ViewNumber,
353 fee_merkle_tree_root: FeeMerkleCommitment,
354 accounts: Vec<FeeAccount>,
355 ) -> anyhow::Result<Vec<FeeAccountProof>> {
356 (**self)
357 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
358 .await
359 }
360
361 async fn try_remember_blocks_merkle_tree(
362 &self,
363 retry: usize,
364 instance: &NodeState,
365 height: u64,
366 view: ViewNumber,
367 mt: &mut BlockMerkleTree,
368 ) -> anyhow::Result<()> {
369 (**self)
370 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
371 .await
372 }
373
374 async fn remember_blocks_merkle_tree(
375 &self,
376 instance: &NodeState,
377 height: u64,
378 view: ViewNumber,
379 mt: &mut BlockMerkleTree,
380 ) -> anyhow::Result<()> {
381 (**self)
382 .remember_blocks_merkle_tree(instance, height, view, mt)
383 .await
384 }
385
386 async fn try_fetch_chain_config(
387 &self,
388 retry: usize,
389 commitment: Commitment<ChainConfig>,
390 ) -> anyhow::Result<ChainConfig> {
391 (**self).try_fetch_chain_config(retry, commitment).await
392 }
393
394 async fn fetch_chain_config(
395 &self,
396 commitment: Commitment<ChainConfig>,
397 ) -> anyhow::Result<ChainConfig> {
398 (**self).fetch_chain_config(commitment).await
399 }
400
401 async fn try_fetch_reward_accounts_v2(
402 &self,
403 retry: usize,
404 instance: &NodeState,
405 height: u64,
406 view: ViewNumber,
407 reward_merkle_tree_root: RewardMerkleCommitmentV2,
408 accounts: &[RewardAccountV2],
409 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
410 (**self)
411 .try_fetch_reward_accounts_v2(
412 retry,
413 instance,
414 height,
415 view,
416 reward_merkle_tree_root,
417 accounts,
418 )
419 .await
420 }
421
422 async fn fetch_reward_accounts_v2(
423 &self,
424 instance: &NodeState,
425 height: u64,
426 view: ViewNumber,
427 reward_merkle_tree_root: RewardMerkleCommitmentV2,
428 accounts: Vec<RewardAccountV2>,
429 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
430 (**self)
431 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
432 .await
433 }
434
435 async fn try_fetch_reward_accounts_v1(
436 &self,
437 retry: usize,
438 instance: &NodeState,
439 height: u64,
440 view: ViewNumber,
441 reward_merkle_tree_root: RewardMerkleCommitmentV1,
442 accounts: &[RewardAccountV1],
443 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
444 (**self)
445 .try_fetch_reward_accounts_v1(
446 retry,
447 instance,
448 height,
449 view,
450 reward_merkle_tree_root,
451 accounts,
452 )
453 .await
454 }
455
456 async fn fetch_reward_accounts_v1(
457 &self,
458 instance: &NodeState,
459 height: u64,
460 view: ViewNumber,
461 reward_merkle_tree_root: RewardMerkleCommitmentV1,
462 accounts: Vec<RewardAccountV1>,
463 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
464 (**self)
465 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
466 .await
467 }
468
469 async fn try_fetch_state_cert(
470 &self,
471 retry: usize,
472 epoch: u64,
473 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
474 (**self).try_fetch_state_cert(retry, epoch).await
475 }
476
477 async fn fetch_state_cert(
478 &self,
479 epoch: u64,
480 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
481 (**self).fetch_state_cert(epoch).await
482 }
483
484 fn backoff(&self) -> &BackoffParams {
485 (**self).backoff()
486 }
487
488 fn name(&self) -> String {
489 (**self).name()
490 }
491
492 fn is_local(&self) -> bool {
493 (**self).is_local()
494 }
495}
496
497#[async_trait]
498pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
499 type Persistence: SequencerPersistence + MembershipPersistence;
500
501 fn set_view_retention(&mut self, view_retention: u64);
502 async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
503 async fn reset(self) -> anyhow::Result<()>;
504}
505
506pub enum EventsPersistenceRead {
510 Complete,
511 UntilL1Block(u64),
512}
513
514#[async_trait]
515pub trait MembershipPersistence: Send + Sync + 'static {
517 async fn load_stake(
519 &self,
520 epoch: EpochNumber,
521 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
522
523 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
525
526 async fn store_stake(
528 &self,
529 epoch: EpochNumber,
530 stake: ValidatorMap,
531 block_reward: Option<RewardAmount>,
532 stake_table_hash: Option<StakeTableHash>,
533 ) -> anyhow::Result<()>;
534
535 async fn store_events(
536 &self,
537 l1_finalized: u64,
538 events: Vec<(EventKey, StakeTableEvent)>,
539 ) -> anyhow::Result<()>;
540 async fn load_events(
541 &self,
542 l1_finalized: u64,
543 ) -> anyhow::Result<(
544 Option<EventsPersistenceRead>,
545 Vec<(EventKey, StakeTableEvent)>,
546 )>;
547
548 async fn store_all_validators(
549 &self,
550 epoch: EpochNumber,
551 all_validators: IndexMap<Address, Validator<PubKey>>,
552 ) -> anyhow::Result<()>;
553
554 async fn load_all_validators(
555 &self,
556 epoch: EpochNumber,
557 offset: u64,
558 limit: u64,
559 ) -> anyhow::Result<Vec<Validator<PubKey>>>;
560}
561
562#[async_trait]
563pub trait SequencerPersistence:
564 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
565{
566 fn into_catchup_provider(
568 self,
569 _backoff: BackoffParams,
570 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
571 bail!("state catchup is not implemented for this persistence type");
572 }
573
574 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
579
580 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
582
583 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
585
586 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
588
589 async fn load_quorum_proposals(
591 &self,
592 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
593
594 async fn load_quorum_proposal(
595 &self,
596 view: ViewNumber,
597 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
598
599 async fn load_vid_share(
600 &self,
601 view: ViewNumber,
602 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
603 async fn load_da_proposal(
604 &self,
605 view: ViewNumber,
606 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
607 async fn load_upgrade_certificate(
608 &self,
609 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
610 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
611 async fn load_state_cert(
612 &self,
613 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
614
615 async fn get_state_cert_by_epoch(
617 &self,
618 epoch: u64,
619 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
620
621 async fn insert_state_cert(
623 &self,
624 epoch: u64,
625 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
626 ) -> anyhow::Result<()>;
627
628 async fn load_consensus_state<V: Versions>(
635 &self,
636 state: NodeState,
637 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
638 let genesis_validated_state = ValidatedState::genesis(&state).0;
639 let highest_voted_view = match self
640 .load_latest_acted_view()
641 .await
642 .context("loading last voted view")?
643 {
644 Some(view) => {
645 tracing::info!(?view, "starting with last actioned view");
646 view
647 },
648 None => {
649 tracing::info!("no saved view, starting from genesis");
650 ViewNumber::genesis()
651 },
652 };
653
654 let restart_view = match self
655 .load_restart_view()
656 .await
657 .context("loading restart view")?
658 {
659 Some(view) => {
660 tracing::info!(?view, "starting from saved view");
661 view
662 },
663 None => {
664 tracing::info!("no saved view, starting from genesis");
665 ViewNumber::genesis()
666 },
667 };
668 let next_epoch_high_qc = self
669 .load_next_epoch_quorum_certificate()
670 .await
671 .context("loading next epoch qc")?;
672 let (leaf, mut high_qc, anchor_view) = match self
673 .load_anchor_leaf()
674 .await
675 .context("loading anchor leaf")?
676 {
677 Some((leaf, high_qc)) => {
678 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
679 ensure!(
680 leaf.view_number() == high_qc.view_number,
681 format!(
682 "loaded anchor leaf from view {}, but high QC is from view {}",
683 leaf.view_number(),
684 high_qc.view_number
685 )
686 );
687
688 let anchor_view = leaf.view_number();
689 (leaf, high_qc, Some(anchor_view))
690 },
691 None => {
692 tracing::info!("no saved leaf, starting from genesis leaf");
693 (
694 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
695 .await,
696 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
697 None,
698 )
699 },
700 };
701
702 if let Some((extended_high_qc, _)) = self.load_eqc().await {
703 if extended_high_qc.view_number() > high_qc.view_number() {
704 high_qc = extended_high_qc
705 }
706 }
707
708 let validated_state = if leaf.block_header().height() == 0 {
709 genesis_validated_state
711 } else {
712 ValidatedState::from_header(leaf.block_header())
715 };
716
717 let restart_view = max(restart_view, leaf.view_number());
722 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
724
725 let config = self.load_config().await.context("loading config")?;
726 let epoch_height = config
727 .as_ref()
728 .map(|c| c.config.epoch_height)
729 .unwrap_or_default();
730 let epoch_start_block = config
731 .as_ref()
732 .map(|c| c.config.epoch_start_block)
733 .unwrap_or_default();
734
735 let saved_proposals = self
736 .load_quorum_proposals()
737 .await
738 .context("loading saved proposals")?;
739
740 let upgrade_certificate = self
741 .load_upgrade_certificate()
742 .await
743 .context("loading upgrade certificate")?;
744
745 let start_epoch_info = self
746 .load_start_epoch_info()
747 .await
748 .context("loading start epoch info")?;
749
750 let state_cert = self
751 .load_state_cert()
752 .await
753 .context("loading light client state update certificate")?;
754
755 tracing::warn!(
756 ?leaf,
757 ?restart_view,
758 ?epoch,
759 ?high_qc,
760 ?validated_state,
761 ?state_cert,
762 "loaded consensus state"
763 );
764
765 Ok((
766 HotShotInitializer {
767 instance_state: state,
768 epoch_height,
769 epoch_start_block,
770 anchor_leaf: leaf,
771 anchor_state: Arc::new(validated_state),
772 anchor_state_delta: None,
773 start_view: restart_view,
774 start_epoch: epoch,
775 last_actioned_view: highest_voted_view,
776 saved_proposals,
777 high_qc,
778 next_epoch_high_qc,
779 decided_upgrade_certificate: upgrade_certificate,
780 undecided_leaves: Default::default(),
781 undecided_state: Default::default(),
782 saved_vid_shares: Default::default(), start_epoch_info,
784 state_cert,
785 },
786 anchor_view,
787 ))
788 }
789
790 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
792 if let EventType::Decide {
793 leaf_chain,
794 committing_qc,
795 deciding_qc,
796 ..
797 } = &event.event
798 {
799 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
800 return;
802 };
803
804 let chain = leaf_chain.iter().zip(
806 std::iter::once((**committing_qc).clone())
808 .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
811 );
812
813 if let Err(err) = self
814 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
815 .await
816 {
817 tracing::error!(
818 "failed to save decided leaves, chain may not be up to date: {err:#}"
819 );
820 return;
821 }
822 }
823 }
824
825 async fn append_decided_leaves(
851 &self,
852 decided_view: ViewNumber,
853 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
854 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
855 consumer: &(impl EventConsumer + 'static),
856 ) -> anyhow::Result<()>;
857
858 async fn load_anchor_leaf(
859 &self,
860 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
861 async fn append_vid(
862 &self,
863 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
864 ) -> anyhow::Result<()>;
865 async fn append_vid2(
867 &self,
868 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
869 ) -> anyhow::Result<()>;
870 async fn append_da(
871 &self,
872 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
873 vid_commit: VidCommitment,
874 ) -> anyhow::Result<()>;
875 async fn record_action(
876 &self,
877 view: ViewNumber,
878 epoch: Option<EpochNumber>,
879 action: HotShotAction,
880 ) -> anyhow::Result<()>;
881
882 async fn append_quorum_proposal2(
883 &self,
884 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
885 ) -> anyhow::Result<()>;
886
887 async fn store_eqc(
889 &self,
890 _high_qc: QuorumCertificate2<SeqTypes>,
891 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
892 ) -> anyhow::Result<()>;
893
894 async fn load_eqc(
896 &self,
897 ) -> Option<(
898 QuorumCertificate2<SeqTypes>,
899 NextEpochQuorumCertificate2<SeqTypes>,
900 )>;
901
902 async fn store_upgrade_certificate(
903 &self,
904 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
905 ) -> anyhow::Result<()>;
906
907 async fn migrate_storage(&self) -> anyhow::Result<()> {
908 tracing::warn!("migrating consensus data...");
909
910 self.migrate_anchor_leaf().await?;
911 self.migrate_da_proposals().await?;
912 self.migrate_vid_shares().await?;
913 self.migrate_quorum_proposals().await?;
914 self.migrate_quorum_certificates().await?;
915
916 tracing::warn!("consensus storage has been migrated to new types");
917
918 Ok(())
919 }
920
921 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
922 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
923 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
924 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
925 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
926
927 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
928 match self.load_anchor_leaf().await? {
929 Some((leaf, _)) => Ok(leaf.view_number()),
930 None => Ok(ViewNumber::genesis()),
931 }
932 }
933
934 async fn store_next_epoch_quorum_certificate(
935 &self,
936 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
937 ) -> anyhow::Result<()>;
938
939 async fn load_next_epoch_quorum_certificate(
940 &self,
941 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
942
943 async fn append_da2(
944 &self,
945 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
946 vid_commit: VidCommitment,
947 ) -> anyhow::Result<()>;
948
949 async fn append_proposal2(
950 &self,
951 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
952 ) -> anyhow::Result<()> {
953 self.append_quorum_proposal2(proposal).await
954 }
955
956 async fn store_drb_result(
957 &self,
958 epoch: <SeqTypes as NodeType>::Epoch,
959 drb_result: DrbResult,
960 ) -> anyhow::Result<()>;
961 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
962 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
963 async fn store_epoch_root(
964 &self,
965 epoch: <SeqTypes as NodeType>::Epoch,
966 block_header: <SeqTypes as NodeType>::BlockHeader,
967 ) -> anyhow::Result<()>;
968 async fn add_state_cert(
969 &self,
970 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
971 ) -> anyhow::Result<()>;
972
973 fn enable_metrics(&mut self, metrics: &dyn Metrics);
974}
975
976#[async_trait]
977pub trait EventConsumer: Debug + Send + Sync {
978 async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
979}
980
981#[async_trait]
982impl<T> EventConsumer for Box<T>
983where
984 T: EventConsumer + ?Sized,
985{
986 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
987 (**self).handle_event(event).await
988 }
989}
990
991#[derive(Clone, Copy, Debug)]
992pub struct NullEventConsumer;
993
994#[async_trait]
995impl EventConsumer for NullEventConsumer {
996 async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
997 Ok(())
998 }
999}
1000
1001#[async_trait]
1002impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1003 async fn append_vid(
1004 &self,
1005 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
1006 ) -> anyhow::Result<()> {
1007 (**self).append_vid(proposal).await
1008 }
1009
1010 async fn append_vid2(
1011 &self,
1012 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
1013 ) -> anyhow::Result<()> {
1014 (**self).append_vid2(proposal).await
1015 }
1016
1017 async fn append_da(
1018 &self,
1019 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1020 vid_commit: VidCommitment,
1021 ) -> anyhow::Result<()> {
1022 (**self).append_da(proposal, vid_commit).await
1023 }
1024
1025 async fn append_da2(
1026 &self,
1027 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1028 vid_commit: VidCommitment,
1029 ) -> anyhow::Result<()> {
1030 (**self).append_da2(proposal, vid_commit).await
1031 }
1032
1033 async fn record_action(
1034 &self,
1035 view: ViewNumber,
1036 epoch: Option<EpochNumber>,
1037 action: HotShotAction,
1038 ) -> anyhow::Result<()> {
1039 (**self).record_action(view, epoch, action).await
1040 }
1041
1042 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1043 Ok(())
1044 }
1045
1046 async fn append_proposal(
1047 &self,
1048 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1049 ) -> anyhow::Result<()> {
1050 (**self)
1051 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1052 .await
1053 }
1054
1055 async fn append_proposal2(
1056 &self,
1057 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1058 ) -> anyhow::Result<()> {
1059 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1060 convert_proposal(proposal.clone());
1061 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1062 }
1063
1064 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1065 Ok(())
1066 }
1067
1068 async fn update_eqc(
1070 &self,
1071 high_qc: QuorumCertificate2<SeqTypes>,
1072 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1073 ) -> anyhow::Result<()> {
1074 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1075 if high_qc.view_number() < existing_high_qc.view_number() {
1076 return Ok(());
1077 }
1078 }
1079
1080 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1081 }
1082
1083 async fn update_next_epoch_high_qc2(
1084 &self,
1085 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1086 ) -> anyhow::Result<()> {
1087 Ok(())
1088 }
1089
1090 async fn update_decided_upgrade_certificate(
1091 &self,
1092 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1093 ) -> anyhow::Result<()> {
1094 (**self)
1095 .store_upgrade_certificate(decided_upgrade_certificate)
1096 .await
1097 }
1098
1099 async fn store_drb_result(
1100 &self,
1101 epoch: <SeqTypes as NodeType>::Epoch,
1102 drb_result: DrbResult,
1103 ) -> anyhow::Result<()> {
1104 (**self).store_drb_result(epoch, drb_result).await
1105 }
1106
1107 async fn store_epoch_root(
1108 &self,
1109 epoch: <SeqTypes as NodeType>::Epoch,
1110 block_header: <SeqTypes as NodeType>::BlockHeader,
1111 ) -> anyhow::Result<()> {
1112 (**self).store_epoch_root(epoch, block_header).await
1113 }
1114
1115 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1116 (**self).store_drb_input(drb_input).await
1117 }
1118
1119 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1120 (**self).load_drb_input(epoch).await
1121 }
1122
1123 async fn update_state_cert(
1124 &self,
1125 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1126 ) -> anyhow::Result<()> {
1127 (**self).add_state_cert(state_cert).await
1128 }
1129}
1130
1131pub trait FromNsPayloadBytes<'a> {
1136 fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1138}
1139
1140pub trait NsPayloadBytesRange<'a> {
1145 type Output: FromNsPayloadBytes<'a>;
1146
1147 fn ns_payload_range(&self) -> Range<usize>;
1149}
1150
1151pub trait FromStringOrInteger: Sized {
1162 type Binary: Serialize + DeserializeOwned;
1163 type Integer: Serialize + DeserializeOwned;
1164
1165 fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1166 fn from_string(s: String) -> anyhow::Result<Self>;
1167 fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1168
1169 fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1170 fn to_string(&self) -> anyhow::Result<String>;
1171}