1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16 },
17 drb::{DrbInput, DrbResult},
18 event::{HotShotAction, LeafInfo},
19 message::{convert_proposal, Proposal},
20 simple_certificate::{
21 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23 },
24 stake_table::HSStakeTable,
25 traits::{
26 metrics::Metrics,
27 node_implementation::{ConsensusTime, NodeType, Versions},
28 storage::Storage,
29 ValidatedState as HotShotState,
30 },
31 utils::genesis_epoch_from_version,
32 vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{de::DeserializeOwned, Serialize};
36
37use super::{
38 impls::NodeState,
39 utils::BackoffParams,
40 v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43 v0::impls::{StakeTableHash, ValidatedState},
44 v0_3::{
45 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
46 Validator,
47 },
48 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
49 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
50 PubKey, SeqTypes, ValidatorMap,
51};
52
53#[async_trait]
54pub trait StateCatchup: Send + Sync {
55 async fn try_fetch_leaf(
57 &self,
58 retry: usize,
59 height: u64,
60 stake_table: HSStakeTable<SeqTypes>,
61 success_threshold: U256,
62 ) -> anyhow::Result<Leaf2>;
63
64 async fn fetch_leaf(
66 &self,
67 height: u64,
68 stake_table: HSStakeTable<SeqTypes>,
69 success_threshold: U256,
70 ) -> anyhow::Result<Leaf2> {
71 self.backoff()
72 .retry(self, |provider, retry| {
73 let stake_table_clone = stake_table.clone();
74 async move {
75 provider
76 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
77 .await
78 }
79 .boxed()
80 })
81 .await
82 }
83
84 async fn try_fetch_accounts(
86 &self,
87 retry: usize,
88 instance: &NodeState,
89 height: u64,
90 view: ViewNumber,
91 fee_merkle_tree_root: FeeMerkleCommitment,
92 accounts: &[FeeAccount],
93 ) -> anyhow::Result<Vec<FeeAccountProof>>;
94
95 async fn fetch_accounts(
97 &self,
98 instance: &NodeState,
99 height: u64,
100 view: ViewNumber,
101 fee_merkle_tree_root: FeeMerkleCommitment,
102 accounts: Vec<FeeAccount>,
103 ) -> anyhow::Result<Vec<FeeAccountProof>> {
104 self.backoff()
105 .retry(self, |provider, retry| {
106 let accounts = &accounts;
107 async move {
108 provider
109 .try_fetch_accounts(
110 retry,
111 instance,
112 height,
113 view,
114 fee_merkle_tree_root,
115 accounts,
116 )
117 .await
118 .map_err(|err| {
119 err.context(format!(
120 "fetching accounts {accounts:?}, height {height}, view {view}"
121 ))
122 })
123 }
124 .boxed()
125 })
126 .await
127 }
128
129 async fn try_remember_blocks_merkle_tree(
131 &self,
132 retry: usize,
133 instance: &NodeState,
134 height: u64,
135 view: ViewNumber,
136 mt: &mut BlockMerkleTree,
137 ) -> anyhow::Result<()>;
138
139 async fn remember_blocks_merkle_tree(
141 &self,
142 instance: &NodeState,
143 height: u64,
144 view: ViewNumber,
145 mt: &mut BlockMerkleTree,
146 ) -> anyhow::Result<()> {
147 self.backoff()
148 .retry(mt, |mt, retry| {
149 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
150 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
151 .boxed()
152 })
153 .await
154 }
155
156 async fn try_fetch_chain_config(
158 &self,
159 retry: usize,
160 commitment: Commitment<ChainConfig>,
161 ) -> anyhow::Result<ChainConfig>;
162
163 async fn fetch_chain_config(
165 &self,
166 commitment: Commitment<ChainConfig>,
167 ) -> anyhow::Result<ChainConfig> {
168 self.backoff()
169 .retry(self, |provider, retry| {
170 provider
171 .try_fetch_chain_config(retry, commitment)
172 .map_err(|err| err.context("fetching chain config"))
173 .boxed()
174 })
175 .await
176 }
177
178 async fn try_fetch_reward_accounts_v2(
180 &self,
181 retry: usize,
182 instance: &NodeState,
183 height: u64,
184 view: ViewNumber,
185 reward_merkle_tree_root: RewardMerkleCommitmentV2,
186 accounts: &[RewardAccountV2],
187 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
188
189 async fn fetch_reward_accounts_v2(
191 &self,
192 instance: &NodeState,
193 height: u64,
194 view: ViewNumber,
195 reward_merkle_tree_root: RewardMerkleCommitmentV2,
196 accounts: Vec<RewardAccountV2>,
197 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
198 self.backoff()
199 .retry(self, |provider, retry| {
200 let accounts = &accounts;
201 async move {
202 provider
203 .try_fetch_reward_accounts_v2(
204 retry,
205 instance,
206 height,
207 view,
208 reward_merkle_tree_root,
209 accounts,
210 )
211 .await
212 .map_err(|err| {
213 err.context(format!(
214 "fetching reward accounts {accounts:?}, height {height}, view \
215 {view}"
216 ))
217 })
218 }
219 .boxed()
220 })
221 .await
222 }
223
224 async fn try_fetch_reward_accounts_v1(
226 &self,
227 retry: usize,
228 instance: &NodeState,
229 height: u64,
230 view: ViewNumber,
231 reward_merkle_tree_root: RewardMerkleCommitmentV1,
232 accounts: &[RewardAccountV1],
233 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
234
235 async fn fetch_reward_accounts_v1(
237 &self,
238 instance: &NodeState,
239 height: u64,
240 view: ViewNumber,
241 reward_merkle_tree_root: RewardMerkleCommitmentV1,
242 accounts: Vec<RewardAccountV1>,
243 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
244 self.backoff()
245 .retry(self, |provider, retry| {
246 let accounts = &accounts;
247 async move {
248 provider
249 .try_fetch_reward_accounts_v1(
250 retry,
251 instance,
252 height,
253 view,
254 reward_merkle_tree_root,
255 accounts,
256 )
257 .await
258 .map_err(|err| {
259 err.context(format!(
260 "fetching v1 reward accounts {accounts:?}, height {height}, view \
261 {view}"
262 ))
263 })
264 }
265 .boxed()
266 })
267 .await
268 }
269
270 async fn try_fetch_state_cert(
272 &self,
273 retry: usize,
274 epoch: u64,
275 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
276
277 async fn fetch_state_cert(
279 &self,
280 epoch: u64,
281 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
282 self.backoff()
283 .retry(self, |provider, retry| {
284 provider
285 .try_fetch_state_cert(retry, epoch)
286 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
287 .boxed()
288 })
289 .await
290 }
291
292 fn is_local(&self) -> bool;
294
295 fn backoff(&self) -> &BackoffParams;
297
298 fn name(&self) -> String;
300}
301
302#[async_trait]
303impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
304 async fn try_fetch_leaf(
305 &self,
306 retry: usize,
307 height: u64,
308 stake_table: HSStakeTable<SeqTypes>,
309 success_threshold: U256,
310 ) -> anyhow::Result<Leaf2> {
311 (**self)
312 .try_fetch_leaf(retry, height, stake_table, success_threshold)
313 .await
314 }
315
316 async fn fetch_leaf(
317 &self,
318 height: u64,
319 stake_table: HSStakeTable<SeqTypes>,
320 success_threshold: U256,
321 ) -> anyhow::Result<Leaf2> {
322 (**self)
323 .fetch_leaf(height, stake_table, success_threshold)
324 .await
325 }
326 async fn try_fetch_accounts(
327 &self,
328 retry: usize,
329 instance: &NodeState,
330 height: u64,
331 view: ViewNumber,
332 fee_merkle_tree_root: FeeMerkleCommitment,
333 accounts: &[FeeAccount],
334 ) -> anyhow::Result<Vec<FeeAccountProof>> {
335 (**self)
336 .try_fetch_accounts(
337 retry,
338 instance,
339 height,
340 view,
341 fee_merkle_tree_root,
342 accounts,
343 )
344 .await
345 }
346
347 async fn fetch_accounts(
348 &self,
349 instance: &NodeState,
350 height: u64,
351 view: ViewNumber,
352 fee_merkle_tree_root: FeeMerkleCommitment,
353 accounts: Vec<FeeAccount>,
354 ) -> anyhow::Result<Vec<FeeAccountProof>> {
355 (**self)
356 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
357 .await
358 }
359
360 async fn try_remember_blocks_merkle_tree(
361 &self,
362 retry: usize,
363 instance: &NodeState,
364 height: u64,
365 view: ViewNumber,
366 mt: &mut BlockMerkleTree,
367 ) -> anyhow::Result<()> {
368 (**self)
369 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
370 .await
371 }
372
373 async fn remember_blocks_merkle_tree(
374 &self,
375 instance: &NodeState,
376 height: u64,
377 view: ViewNumber,
378 mt: &mut BlockMerkleTree,
379 ) -> anyhow::Result<()> {
380 (**self)
381 .remember_blocks_merkle_tree(instance, height, view, mt)
382 .await
383 }
384
385 async fn try_fetch_chain_config(
386 &self,
387 retry: usize,
388 commitment: Commitment<ChainConfig>,
389 ) -> anyhow::Result<ChainConfig> {
390 (**self).try_fetch_chain_config(retry, commitment).await
391 }
392
393 async fn fetch_chain_config(
394 &self,
395 commitment: Commitment<ChainConfig>,
396 ) -> anyhow::Result<ChainConfig> {
397 (**self).fetch_chain_config(commitment).await
398 }
399
400 async fn try_fetch_reward_accounts_v2(
401 &self,
402 retry: usize,
403 instance: &NodeState,
404 height: u64,
405 view: ViewNumber,
406 reward_merkle_tree_root: RewardMerkleCommitmentV2,
407 accounts: &[RewardAccountV2],
408 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
409 (**self)
410 .try_fetch_reward_accounts_v2(
411 retry,
412 instance,
413 height,
414 view,
415 reward_merkle_tree_root,
416 accounts,
417 )
418 .await
419 }
420
421 async fn fetch_reward_accounts_v2(
422 &self,
423 instance: &NodeState,
424 height: u64,
425 view: ViewNumber,
426 reward_merkle_tree_root: RewardMerkleCommitmentV2,
427 accounts: Vec<RewardAccountV2>,
428 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
429 (**self)
430 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
431 .await
432 }
433
434 async fn try_fetch_reward_accounts_v1(
435 &self,
436 retry: usize,
437 instance: &NodeState,
438 height: u64,
439 view: ViewNumber,
440 reward_merkle_tree_root: RewardMerkleCommitmentV1,
441 accounts: &[RewardAccountV1],
442 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
443 (**self)
444 .try_fetch_reward_accounts_v1(
445 retry,
446 instance,
447 height,
448 view,
449 reward_merkle_tree_root,
450 accounts,
451 )
452 .await
453 }
454
455 async fn fetch_reward_accounts_v1(
456 &self,
457 instance: &NodeState,
458 height: u64,
459 view: ViewNumber,
460 reward_merkle_tree_root: RewardMerkleCommitmentV1,
461 accounts: Vec<RewardAccountV1>,
462 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
463 (**self)
464 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
465 .await
466 }
467
468 async fn try_fetch_state_cert(
469 &self,
470 retry: usize,
471 epoch: u64,
472 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
473 (**self).try_fetch_state_cert(retry, epoch).await
474 }
475
476 async fn fetch_state_cert(
477 &self,
478 epoch: u64,
479 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
480 (**self).fetch_state_cert(epoch).await
481 }
482
483 fn backoff(&self) -> &BackoffParams {
484 (**self).backoff()
485 }
486
487 fn name(&self) -> String {
488 (**self).name()
489 }
490
491 fn is_local(&self) -> bool {
492 (**self).is_local()
493 }
494}
495
496#[async_trait]
497pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
498 type Persistence: SequencerPersistence + MembershipPersistence;
499
500 fn set_view_retention(&mut self, view_retention: u64);
501 async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
502 async fn reset(self) -> anyhow::Result<()>;
503}
504
505#[derive(Clone, Copy, Debug, PartialEq, Eq)]
509pub enum EventsPersistenceRead {
510 Complete,
511 UntilL1Block(u64),
512}
513
514#[async_trait]
515pub trait MembershipPersistence: Send + Sync + 'static {
517 async fn load_stake(
519 &self,
520 epoch: EpochNumber,
521 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
522
523 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
525
526 async fn store_stake(
528 &self,
529 epoch: EpochNumber,
530 stake: ValidatorMap,
531 block_reward: Option<RewardAmount>,
532 stake_table_hash: Option<StakeTableHash>,
533 ) -> anyhow::Result<()>;
534
535 async fn store_events(
536 &self,
537 l1_finalized: u64,
538 events: Vec<(EventKey, StakeTableEvent)>,
539 ) -> anyhow::Result<()>;
540 async fn load_events(
541 &self,
542 from_l1_block: u64,
543 l1_finalized: u64,
544 ) -> anyhow::Result<(
545 Option<EventsPersistenceRead>,
546 Vec<(EventKey, StakeTableEvent)>,
547 )>;
548
549 async fn store_all_validators(
550 &self,
551 epoch: EpochNumber,
552 all_validators: IndexMap<Address, Validator<PubKey>>,
553 ) -> anyhow::Result<()>;
554
555 async fn load_all_validators(
556 &self,
557 epoch: EpochNumber,
558 offset: u64,
559 limit: u64,
560 ) -> anyhow::Result<Vec<Validator<PubKey>>>;
561}
562
563#[async_trait]
564pub trait SequencerPersistence:
565 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
566{
567 fn into_catchup_provider(
569 self,
570 _backoff: BackoffParams,
571 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
572 bail!("state catchup is not implemented for this persistence type");
573 }
574
575 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
580
581 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
583
584 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
586
587 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
589
590 async fn load_quorum_proposals(
592 &self,
593 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
594
595 async fn load_quorum_proposal(
596 &self,
597 view: ViewNumber,
598 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
599
600 async fn load_vid_share(
601 &self,
602 view: ViewNumber,
603 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
604 async fn load_da_proposal(
605 &self,
606 view: ViewNumber,
607 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
608 async fn load_upgrade_certificate(
609 &self,
610 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
611 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
612 async fn load_state_cert(
613 &self,
614 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
615
616 async fn get_state_cert_by_epoch(
618 &self,
619 epoch: u64,
620 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
621
622 async fn insert_state_cert(
624 &self,
625 epoch: u64,
626 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
627 ) -> anyhow::Result<()>;
628
629 async fn load_consensus_state<V: Versions>(
636 &self,
637 state: NodeState,
638 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
639 let genesis_validated_state = ValidatedState::genesis(&state).0;
640 let highest_voted_view = match self
641 .load_latest_acted_view()
642 .await
643 .context("loading last voted view")?
644 {
645 Some(view) => {
646 tracing::info!(?view, "starting with last actioned view");
647 view
648 },
649 None => {
650 tracing::info!("no saved view, starting from genesis");
651 ViewNumber::genesis()
652 },
653 };
654
655 let restart_view = match self
656 .load_restart_view()
657 .await
658 .context("loading restart view")?
659 {
660 Some(view) => {
661 tracing::info!(?view, "starting from saved view");
662 view
663 },
664 None => {
665 tracing::info!("no saved view, starting from genesis");
666 ViewNumber::genesis()
667 },
668 };
669 let next_epoch_high_qc = self
670 .load_next_epoch_quorum_certificate()
671 .await
672 .context("loading next epoch qc")?;
673 let (leaf, mut high_qc, anchor_view) = match self
674 .load_anchor_leaf()
675 .await
676 .context("loading anchor leaf")?
677 {
678 Some((leaf, high_qc)) => {
679 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
680 ensure!(
681 leaf.view_number() == high_qc.view_number,
682 format!(
683 "loaded anchor leaf from view {}, but high QC is from view {}",
684 leaf.view_number(),
685 high_qc.view_number
686 )
687 );
688
689 let anchor_view = leaf.view_number();
690 (leaf, high_qc, Some(anchor_view))
691 },
692 None => {
693 tracing::info!("no saved leaf, starting from genesis leaf");
694 (
695 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
696 .await,
697 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
698 None,
699 )
700 },
701 };
702
703 if let Some((extended_high_qc, _)) = self.load_eqc().await {
704 if extended_high_qc.view_number() > high_qc.view_number() {
705 high_qc = extended_high_qc
706 }
707 }
708
709 let validated_state = if leaf.block_header().height() == 0 {
710 genesis_validated_state
712 } else {
713 ValidatedState::from_header(leaf.block_header())
716 };
717
718 let restart_view = max(restart_view, leaf.view_number());
723 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
725
726 let config = self.load_config().await.context("loading config")?;
727 let epoch_height = config
728 .as_ref()
729 .map(|c| c.config.epoch_height)
730 .unwrap_or_default();
731 let epoch_start_block = config
732 .as_ref()
733 .map(|c| c.config.epoch_start_block)
734 .unwrap_or_default();
735
736 let saved_proposals = self
737 .load_quorum_proposals()
738 .await
739 .context("loading saved proposals")?;
740
741 let upgrade_certificate = self
742 .load_upgrade_certificate()
743 .await
744 .context("loading upgrade certificate")?;
745
746 let start_epoch_info = self
747 .load_start_epoch_info()
748 .await
749 .context("loading start epoch info")?;
750
751 let state_cert = self
752 .load_state_cert()
753 .await
754 .context("loading light client state update certificate")?;
755
756 tracing::warn!(
757 ?leaf,
758 ?restart_view,
759 ?epoch,
760 ?high_qc,
761 ?validated_state,
762 ?state_cert,
763 "loaded consensus state"
764 );
765
766 Ok((
767 HotShotInitializer {
768 instance_state: state,
769 epoch_height,
770 epoch_start_block,
771 anchor_leaf: leaf,
772 anchor_state: Arc::new(validated_state),
773 anchor_state_delta: None,
774 start_view: restart_view,
775 start_epoch: epoch,
776 last_actioned_view: highest_voted_view,
777 saved_proposals,
778 high_qc,
779 next_epoch_high_qc,
780 decided_upgrade_certificate: upgrade_certificate,
781 undecided_leaves: Default::default(),
782 undecided_state: Default::default(),
783 saved_vid_shares: Default::default(), start_epoch_info,
785 state_cert,
786 },
787 anchor_view,
788 ))
789 }
790
791 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
793 if let EventType::Decide {
794 leaf_chain,
795 committing_qc,
796 deciding_qc,
797 ..
798 } = &event.event
799 {
800 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
801 return;
803 };
804
805 let chain = leaf_chain.iter().zip(
807 std::iter::once((**committing_qc).clone())
809 .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
812 );
813
814 if let Err(err) = self
815 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
816 .await
817 {
818 tracing::error!(
819 "failed to save decided leaves, chain may not be up to date: {err:#}"
820 );
821 return;
822 }
823 }
824 }
825
826 async fn append_decided_leaves(
852 &self,
853 decided_view: ViewNumber,
854 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
855 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
856 consumer: &(impl EventConsumer + 'static),
857 ) -> anyhow::Result<()>;
858
859 async fn load_anchor_leaf(
860 &self,
861 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
862 async fn append_vid(
863 &self,
864 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
865 ) -> anyhow::Result<()>;
866 async fn append_da(
867 &self,
868 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
869 vid_commit: VidCommitment,
870 ) -> anyhow::Result<()>;
871 async fn record_action(
872 &self,
873 view: ViewNumber,
874 epoch: Option<EpochNumber>,
875 action: HotShotAction,
876 ) -> anyhow::Result<()>;
877
878 async fn append_quorum_proposal2(
879 &self,
880 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
881 ) -> anyhow::Result<()>;
882
883 async fn store_eqc(
885 &self,
886 _high_qc: QuorumCertificate2<SeqTypes>,
887 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
888 ) -> anyhow::Result<()>;
889
890 async fn load_eqc(
892 &self,
893 ) -> Option<(
894 QuorumCertificate2<SeqTypes>,
895 NextEpochQuorumCertificate2<SeqTypes>,
896 )>;
897
898 async fn store_upgrade_certificate(
899 &self,
900 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
901 ) -> anyhow::Result<()>;
902
903 async fn migrate_storage(&self) -> anyhow::Result<()> {
904 tracing::warn!("migrating consensus data...");
905
906 self.migrate_anchor_leaf().await?;
907 self.migrate_da_proposals().await?;
908 self.migrate_vid_shares().await?;
909 self.migrate_quorum_proposals().await?;
910 self.migrate_quorum_certificates().await?;
911
912 tracing::warn!("consensus storage has been migrated to new types");
913
914 Ok(())
915 }
916
917 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
918 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
919 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
920 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
921 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
922
923 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
924 match self.load_anchor_leaf().await? {
925 Some((leaf, _)) => Ok(leaf.view_number()),
926 None => Ok(ViewNumber::genesis()),
927 }
928 }
929
930 async fn store_next_epoch_quorum_certificate(
931 &self,
932 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
933 ) -> anyhow::Result<()>;
934
935 async fn load_next_epoch_quorum_certificate(
936 &self,
937 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
938
939 async fn append_da2(
940 &self,
941 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
942 vid_commit: VidCommitment,
943 ) -> anyhow::Result<()>;
944
945 async fn append_proposal2(
946 &self,
947 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
948 ) -> anyhow::Result<()> {
949 self.append_quorum_proposal2(proposal).await
950 }
951
952 async fn store_drb_result(
953 &self,
954 epoch: <SeqTypes as NodeType>::Epoch,
955 drb_result: DrbResult,
956 ) -> anyhow::Result<()>;
957 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
958 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
959 async fn store_epoch_root(
960 &self,
961 epoch: <SeqTypes as NodeType>::Epoch,
962 block_header: <SeqTypes as NodeType>::BlockHeader,
963 ) -> anyhow::Result<()>;
964 async fn add_state_cert(
965 &self,
966 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
967 ) -> anyhow::Result<()>;
968
969 fn enable_metrics(&mut self, metrics: &dyn Metrics);
970}
971
972#[async_trait]
973pub trait EventConsumer: Debug + Send + Sync {
974 async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
975}
976
977#[async_trait]
978impl<T> EventConsumer for Box<T>
979where
980 T: EventConsumer + ?Sized,
981{
982 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
983 (**self).handle_event(event).await
984 }
985}
986
987#[derive(Clone, Copy, Debug)]
988pub struct NullEventConsumer;
989
990#[async_trait]
991impl EventConsumer for NullEventConsumer {
992 async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
993 Ok(())
994 }
995}
996
997#[async_trait]
998impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
999 async fn append_vid(
1000 &self,
1001 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1002 ) -> anyhow::Result<()> {
1003 (**self).append_vid(proposal).await
1004 }
1005
1006 async fn append_da(
1007 &self,
1008 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1009 vid_commit: VidCommitment,
1010 ) -> anyhow::Result<()> {
1011 (**self).append_da(proposal, vid_commit).await
1012 }
1013
1014 async fn append_da2(
1015 &self,
1016 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1017 vid_commit: VidCommitment,
1018 ) -> anyhow::Result<()> {
1019 (**self).append_da2(proposal, vid_commit).await
1020 }
1021
1022 async fn record_action(
1023 &self,
1024 view: ViewNumber,
1025 epoch: Option<EpochNumber>,
1026 action: HotShotAction,
1027 ) -> anyhow::Result<()> {
1028 (**self).record_action(view, epoch, action).await
1029 }
1030
1031 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1032 Ok(())
1033 }
1034
1035 async fn append_proposal(
1036 &self,
1037 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1038 ) -> anyhow::Result<()> {
1039 (**self)
1040 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1041 .await
1042 }
1043
1044 async fn append_proposal2(
1045 &self,
1046 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1047 ) -> anyhow::Result<()> {
1048 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1049 convert_proposal(proposal.clone());
1050 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1051 }
1052
1053 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1054 Ok(())
1055 }
1056
1057 async fn update_eqc(
1059 &self,
1060 high_qc: QuorumCertificate2<SeqTypes>,
1061 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1062 ) -> anyhow::Result<()> {
1063 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1064 if high_qc.view_number() < existing_high_qc.view_number() {
1065 return Ok(());
1066 }
1067 }
1068
1069 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1070 }
1071
1072 async fn update_next_epoch_high_qc2(
1073 &self,
1074 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1075 ) -> anyhow::Result<()> {
1076 Ok(())
1077 }
1078
1079 async fn update_decided_upgrade_certificate(
1080 &self,
1081 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1082 ) -> anyhow::Result<()> {
1083 (**self)
1084 .store_upgrade_certificate(decided_upgrade_certificate)
1085 .await
1086 }
1087
1088 async fn store_drb_result(
1089 &self,
1090 epoch: <SeqTypes as NodeType>::Epoch,
1091 drb_result: DrbResult,
1092 ) -> anyhow::Result<()> {
1093 (**self).store_drb_result(epoch, drb_result).await
1094 }
1095
1096 async fn store_epoch_root(
1097 &self,
1098 epoch: <SeqTypes as NodeType>::Epoch,
1099 block_header: <SeqTypes as NodeType>::BlockHeader,
1100 ) -> anyhow::Result<()> {
1101 (**self).store_epoch_root(epoch, block_header).await
1102 }
1103
1104 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1105 (**self).store_drb_input(drb_input).await
1106 }
1107
1108 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1109 (**self).load_drb_input(epoch).await
1110 }
1111
1112 async fn update_state_cert(
1113 &self,
1114 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1115 ) -> anyhow::Result<()> {
1116 (**self).add_state_cert(state_cert).await
1117 }
1118}
1119
1120pub trait FromNsPayloadBytes<'a> {
1125 fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1127}
1128
1129pub trait NsPayloadBytesRange<'a> {
1134 type Output: FromNsPayloadBytes<'a>;
1135
1136 fn ns_payload_range(&self) -> Range<usize>;
1138}
1139
1140pub trait FromStringOrInteger: Sized {
1151 type Binary: Serialize + DeserializeOwned;
1152 type Integer: Serialize + DeserializeOwned;
1153
1154 fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1155 fn from_string(s: String) -> anyhow::Result<Self>;
1156 fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1157
1158 fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1159 fn to_string(&self) -> anyhow::Result<String>;
1160}