1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{convert_proposal, Proposal},
21 simple_certificate::{
22 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23 QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::HSStakeTable,
26 traits::{
27 metrics::Metrics,
28 node_implementation::{ConsensusTime, NodeType, Versions},
29 storage::Storage,
30 ValidatedState as HotShotState,
31 },
32 utils::genesis_epoch_from_version,
33};
34use serde::{de::DeserializeOwned, Serialize};
35
36use super::{
37 impls::NodeState,
38 utils::BackoffParams,
39 v0_3::{EventKey, IndexedStake, StakeTableEvent},
40};
41use crate::{
42 v0::impls::{StakeTableHash, ValidatedState},
43 v0_3::{
44 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
45 },
46 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
47 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
48 SeqTypes, ValidatorMap,
49};
50
51#[async_trait]
52pub trait StateCatchup: Send + Sync {
53 async fn try_fetch_leaf(
55 &self,
56 retry: usize,
57 height: u64,
58 stake_table: HSStakeTable<SeqTypes>,
59 success_threshold: U256,
60 ) -> anyhow::Result<Leaf2>;
61
62 async fn fetch_leaf(
64 &self,
65 height: u64,
66 stake_table: HSStakeTable<SeqTypes>,
67 success_threshold: U256,
68 ) -> anyhow::Result<Leaf2> {
69 self.backoff()
70 .retry(self, |provider, retry| {
71 let stake_table_clone = stake_table.clone();
72 async move {
73 provider
74 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
75 .await
76 }
77 .boxed()
78 })
79 .await
80 }
81
82 async fn try_fetch_accounts(
84 &self,
85 retry: usize,
86 instance: &NodeState,
87 height: u64,
88 view: ViewNumber,
89 fee_merkle_tree_root: FeeMerkleCommitment,
90 accounts: &[FeeAccount],
91 ) -> anyhow::Result<Vec<FeeAccountProof>>;
92
93 async fn fetch_accounts(
95 &self,
96 instance: &NodeState,
97 height: u64,
98 view: ViewNumber,
99 fee_merkle_tree_root: FeeMerkleCommitment,
100 accounts: Vec<FeeAccount>,
101 ) -> anyhow::Result<Vec<FeeAccountProof>> {
102 self.backoff()
103 .retry(self, |provider, retry| {
104 let accounts = &accounts;
105 async move {
106 provider
107 .try_fetch_accounts(
108 retry,
109 instance,
110 height,
111 view,
112 fee_merkle_tree_root,
113 accounts,
114 )
115 .await
116 .map_err(|err| {
117 err.context(format!(
118 "fetching accounts {accounts:?}, height {height}, view {view}"
119 ))
120 })
121 }
122 .boxed()
123 })
124 .await
125 }
126
127 async fn try_remember_blocks_merkle_tree(
129 &self,
130 retry: usize,
131 instance: &NodeState,
132 height: u64,
133 view: ViewNumber,
134 mt: &mut BlockMerkleTree,
135 ) -> anyhow::Result<()>;
136
137 async fn remember_blocks_merkle_tree(
139 &self,
140 instance: &NodeState,
141 height: u64,
142 view: ViewNumber,
143 mt: &mut BlockMerkleTree,
144 ) -> anyhow::Result<()> {
145 self.backoff()
146 .retry(mt, |mt, retry| {
147 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
148 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
149 .boxed()
150 })
151 .await
152 }
153
154 async fn try_fetch_chain_config(
156 &self,
157 retry: usize,
158 commitment: Commitment<ChainConfig>,
159 ) -> anyhow::Result<ChainConfig>;
160
161 async fn fetch_chain_config(
163 &self,
164 commitment: Commitment<ChainConfig>,
165 ) -> anyhow::Result<ChainConfig> {
166 self.backoff()
167 .retry(self, |provider, retry| {
168 provider
169 .try_fetch_chain_config(retry, commitment)
170 .map_err(|err| err.context("fetching chain config"))
171 .boxed()
172 })
173 .await
174 }
175
176 async fn try_fetch_reward_accounts_v2(
178 &self,
179 retry: usize,
180 instance: &NodeState,
181 height: u64,
182 view: ViewNumber,
183 reward_merkle_tree_root: RewardMerkleCommitmentV2,
184 accounts: &[RewardAccountV2],
185 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
186
187 async fn fetch_reward_accounts_v2(
189 &self,
190 instance: &NodeState,
191 height: u64,
192 view: ViewNumber,
193 reward_merkle_tree_root: RewardMerkleCommitmentV2,
194 accounts: Vec<RewardAccountV2>,
195 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
196 self.backoff()
197 .retry(self, |provider, retry| {
198 let accounts = &accounts;
199 async move {
200 provider
201 .try_fetch_reward_accounts_v2(
202 retry,
203 instance,
204 height,
205 view,
206 reward_merkle_tree_root,
207 accounts,
208 )
209 .await
210 .map_err(|err| {
211 err.context(format!(
212 "fetching reward accounts {accounts:?}, height {height}, view \
213 {view}"
214 ))
215 })
216 }
217 .boxed()
218 })
219 .await
220 }
221
222 async fn try_fetch_reward_accounts_v1(
224 &self,
225 retry: usize,
226 instance: &NodeState,
227 height: u64,
228 view: ViewNumber,
229 reward_merkle_tree_root: RewardMerkleCommitmentV1,
230 accounts: &[RewardAccountV1],
231 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
232
233 async fn fetch_reward_accounts_v1(
235 &self,
236 instance: &NodeState,
237 height: u64,
238 view: ViewNumber,
239 reward_merkle_tree_root: RewardMerkleCommitmentV1,
240 accounts: Vec<RewardAccountV1>,
241 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
242 self.backoff()
243 .retry(self, |provider, retry| {
244 let accounts = &accounts;
245 async move {
246 provider
247 .try_fetch_reward_accounts_v1(
248 retry,
249 instance,
250 height,
251 view,
252 reward_merkle_tree_root,
253 accounts,
254 )
255 .await
256 .map_err(|err| {
257 err.context(format!(
258 "fetching v1 reward accounts {accounts:?}, height {height}, view \
259 {view}"
260 ))
261 })
262 }
263 .boxed()
264 })
265 .await
266 }
267
268 fn is_local(&self) -> bool;
270
271 fn backoff(&self) -> &BackoffParams;
273
274 fn name(&self) -> String;
276}
277
278#[async_trait]
279impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
280 async fn try_fetch_leaf(
281 &self,
282 retry: usize,
283 height: u64,
284 stake_table: HSStakeTable<SeqTypes>,
285 success_threshold: U256,
286 ) -> anyhow::Result<Leaf2> {
287 (**self)
288 .try_fetch_leaf(retry, height, stake_table, success_threshold)
289 .await
290 }
291
292 async fn fetch_leaf(
293 &self,
294 height: u64,
295 stake_table: HSStakeTable<SeqTypes>,
296 success_threshold: U256,
297 ) -> anyhow::Result<Leaf2> {
298 (**self)
299 .fetch_leaf(height, stake_table, success_threshold)
300 .await
301 }
302 async fn try_fetch_accounts(
303 &self,
304 retry: usize,
305 instance: &NodeState,
306 height: u64,
307 view: ViewNumber,
308 fee_merkle_tree_root: FeeMerkleCommitment,
309 accounts: &[FeeAccount],
310 ) -> anyhow::Result<Vec<FeeAccountProof>> {
311 (**self)
312 .try_fetch_accounts(
313 retry,
314 instance,
315 height,
316 view,
317 fee_merkle_tree_root,
318 accounts,
319 )
320 .await
321 }
322
323 async fn fetch_accounts(
324 &self,
325 instance: &NodeState,
326 height: u64,
327 view: ViewNumber,
328 fee_merkle_tree_root: FeeMerkleCommitment,
329 accounts: Vec<FeeAccount>,
330 ) -> anyhow::Result<Vec<FeeAccountProof>> {
331 (**self)
332 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
333 .await
334 }
335
336 async fn try_remember_blocks_merkle_tree(
337 &self,
338 retry: usize,
339 instance: &NodeState,
340 height: u64,
341 view: ViewNumber,
342 mt: &mut BlockMerkleTree,
343 ) -> anyhow::Result<()> {
344 (**self)
345 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
346 .await
347 }
348
349 async fn remember_blocks_merkle_tree(
350 &self,
351 instance: &NodeState,
352 height: u64,
353 view: ViewNumber,
354 mt: &mut BlockMerkleTree,
355 ) -> anyhow::Result<()> {
356 (**self)
357 .remember_blocks_merkle_tree(instance, height, view, mt)
358 .await
359 }
360
361 async fn try_fetch_chain_config(
362 &self,
363 retry: usize,
364 commitment: Commitment<ChainConfig>,
365 ) -> anyhow::Result<ChainConfig> {
366 (**self).try_fetch_chain_config(retry, commitment).await
367 }
368
369 async fn fetch_chain_config(
370 &self,
371 commitment: Commitment<ChainConfig>,
372 ) -> anyhow::Result<ChainConfig> {
373 (**self).fetch_chain_config(commitment).await
374 }
375
376 async fn try_fetch_reward_accounts_v2(
377 &self,
378 retry: usize,
379 instance: &NodeState,
380 height: u64,
381 view: ViewNumber,
382 reward_merkle_tree_root: RewardMerkleCommitmentV2,
383 accounts: &[RewardAccountV2],
384 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
385 (**self)
386 .try_fetch_reward_accounts_v2(
387 retry,
388 instance,
389 height,
390 view,
391 reward_merkle_tree_root,
392 accounts,
393 )
394 .await
395 }
396
397 async fn fetch_reward_accounts_v2(
398 &self,
399 instance: &NodeState,
400 height: u64,
401 view: ViewNumber,
402 reward_merkle_tree_root: RewardMerkleCommitmentV2,
403 accounts: Vec<RewardAccountV2>,
404 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
405 (**self)
406 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
407 .await
408 }
409
410 async fn try_fetch_reward_accounts_v1(
411 &self,
412 retry: usize,
413 instance: &NodeState,
414 height: u64,
415 view: ViewNumber,
416 reward_merkle_tree_root: RewardMerkleCommitmentV1,
417 accounts: &[RewardAccountV1],
418 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
419 (**self)
420 .try_fetch_reward_accounts_v1(
421 retry,
422 instance,
423 height,
424 view,
425 reward_merkle_tree_root,
426 accounts,
427 )
428 .await
429 }
430
431 async fn fetch_reward_accounts_v1(
432 &self,
433 instance: &NodeState,
434 height: u64,
435 view: ViewNumber,
436 reward_merkle_tree_root: RewardMerkleCommitmentV1,
437 accounts: Vec<RewardAccountV1>,
438 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
439 (**self)
440 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
441 .await
442 }
443
444 fn backoff(&self) -> &BackoffParams {
445 (**self).backoff()
446 }
447
448 fn name(&self) -> String {
449 (**self).name()
450 }
451
452 fn is_local(&self) -> bool {
453 (**self).is_local()
454 }
455}
456
457#[async_trait]
458pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
459 type Persistence: SequencerPersistence + MembershipPersistence;
460
461 fn set_view_retention(&mut self, view_retention: u64);
462 async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
463 async fn reset(self) -> anyhow::Result<()>;
464}
465
466pub enum EventsPersistenceRead {
470 Complete,
471 UntilL1Block(u64),
472}
473
474#[async_trait]
475pub trait MembershipPersistence: Send + Sync + 'static {
477 async fn load_stake(
479 &self,
480 epoch: EpochNumber,
481 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
482
483 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
485
486 async fn store_stake(
488 &self,
489 epoch: EpochNumber,
490 stake: ValidatorMap,
491 block_reward: Option<RewardAmount>,
492 stake_table_hash: Option<StakeTableHash>,
493 ) -> anyhow::Result<()>;
494
495 async fn store_events(
496 &self,
497 l1_finalized: u64,
498 events: Vec<(EventKey, StakeTableEvent)>,
499 ) -> anyhow::Result<()>;
500 async fn load_events(
501 &self,
502 l1_finalized: u64,
503 ) -> anyhow::Result<(
504 Option<EventsPersistenceRead>,
505 Vec<(EventKey, StakeTableEvent)>,
506 )>;
507}
508
509#[async_trait]
510pub trait SequencerPersistence:
511 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
512{
513 fn into_catchup_provider(
515 self,
516 _backoff: BackoffParams,
517 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
518 bail!("state catchup is not implemented for this persistence type");
519 }
520
521 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
526
527 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
529
530 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
532
533 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
535
536 async fn load_quorum_proposals(
538 &self,
539 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
540
541 async fn load_quorum_proposal(
542 &self,
543 view: ViewNumber,
544 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
545
546 async fn load_vid_share(
547 &self,
548 view: ViewNumber,
549 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
550 async fn load_da_proposal(
551 &self,
552 view: ViewNumber,
553 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
554 async fn load_upgrade_certificate(
555 &self,
556 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
557 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
558 async fn load_state_cert(
559 &self,
560 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
561
562 async fn load_consensus_state<V: Versions>(
569 &self,
570 state: NodeState,
571 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
572 let genesis_validated_state = ValidatedState::genesis(&state).0;
573 let highest_voted_view = match self
574 .load_latest_acted_view()
575 .await
576 .context("loading last voted view")?
577 {
578 Some(view) => {
579 tracing::info!(?view, "starting with last actioned view");
580 view
581 },
582 None => {
583 tracing::info!("no saved view, starting from genesis");
584 ViewNumber::genesis()
585 },
586 };
587
588 let restart_view = match self
589 .load_restart_view()
590 .await
591 .context("loading restart view")?
592 {
593 Some(view) => {
594 tracing::info!(?view, "starting from saved view");
595 view
596 },
597 None => {
598 tracing::info!("no saved view, starting from genesis");
599 ViewNumber::genesis()
600 },
601 };
602 let next_epoch_high_qc = self
603 .load_next_epoch_quorum_certificate()
604 .await
605 .context("loading next epoch qc")?;
606 let (leaf, high_qc, anchor_view) = match self
607 .load_anchor_leaf()
608 .await
609 .context("loading anchor leaf")?
610 {
611 Some((leaf, high_qc)) => {
612 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
613 ensure!(
614 leaf.view_number() == high_qc.view_number,
615 format!(
616 "loaded anchor leaf from view {}, but high QC is from view {}",
617 leaf.view_number(),
618 high_qc.view_number
619 )
620 );
621
622 let anchor_view = leaf.view_number();
623 (leaf, high_qc, Some(anchor_view))
624 },
625 None => {
626 tracing::info!("no saved leaf, starting from genesis leaf");
627 (
628 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
629 .await,
630 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
631 None,
632 )
633 },
634 };
635 let validated_state = if leaf.block_header().height() == 0 {
636 genesis_validated_state
638 } else {
639 ValidatedState::from_header(leaf.block_header())
642 };
643
644 let restart_view = max(restart_view, leaf.view_number());
649 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
651
652 let config = self.load_config().await.context("loading config")?;
653 let epoch_height = config
654 .as_ref()
655 .map(|c| c.config.epoch_height)
656 .unwrap_or_default();
657 let epoch_start_block = config
658 .as_ref()
659 .map(|c| c.config.epoch_start_block)
660 .unwrap_or_default();
661
662 let saved_proposals = self
663 .load_quorum_proposals()
664 .await
665 .context("loading saved proposals")?;
666
667 let upgrade_certificate = self
668 .load_upgrade_certificate()
669 .await
670 .context("loading upgrade certificate")?;
671
672 let start_epoch_info = self
673 .load_start_epoch_info()
674 .await
675 .context("loading start epoch info")?;
676
677 let state_cert = self
678 .load_state_cert()
679 .await
680 .context("loading light client state update certificate")?;
681
682 tracing::info!(
683 ?leaf,
684 ?restart_view,
685 ?epoch,
686 ?high_qc,
687 ?validated_state,
688 ?state_cert,
689 "loaded consensus state"
690 );
691
692 Ok((
693 HotShotInitializer {
694 instance_state: state,
695 epoch_height,
696 epoch_start_block,
697 anchor_leaf: leaf,
698 anchor_state: Arc::new(validated_state),
699 anchor_state_delta: None,
700 start_view: restart_view,
701 start_epoch: epoch,
702 last_actioned_view: highest_voted_view,
703 saved_proposals,
704 high_qc,
705 next_epoch_high_qc,
706 decided_upgrade_certificate: upgrade_certificate,
707 undecided_leaves: Default::default(),
708 undecided_state: Default::default(),
709 saved_vid_shares: Default::default(), start_epoch_info,
711 state_cert,
712 },
713 anchor_view,
714 ))
715 }
716
717 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
719 if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
720 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
721 return;
723 };
724
725 let chain = leaf_chain.iter().zip(
727 std::iter::once((**qc).clone())
729 .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
732 );
733
734 if let Err(err) = self
735 .append_decided_leaves(leaf.view_number(), chain, consumer)
736 .await
737 {
738 tracing::error!(
739 "failed to save decided leaves, chain may not be up to date: {err:#}"
740 );
741 return;
742 }
743 }
744 }
745
746 async fn append_decided_leaves(
772 &self,
773 decided_view: ViewNumber,
774 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
775 consumer: &(impl EventConsumer + 'static),
776 ) -> anyhow::Result<()>;
777
778 async fn load_anchor_leaf(
779 &self,
780 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
781 async fn append_vid(
782 &self,
783 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
784 ) -> anyhow::Result<()>;
785 async fn append_vid2(
787 &self,
788 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
789 ) -> anyhow::Result<()>;
790 async fn append_da(
791 &self,
792 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
793 vid_commit: VidCommitment,
794 ) -> anyhow::Result<()>;
795 async fn record_action(
796 &self,
797 view: ViewNumber,
798 epoch: Option<EpochNumber>,
799 action: HotShotAction,
800 ) -> anyhow::Result<()>;
801
802 async fn append_quorum_proposal2(
803 &self,
804 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
805 ) -> anyhow::Result<()>;
806 async fn store_upgrade_certificate(
807 &self,
808 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
809 ) -> anyhow::Result<()>;
810 async fn migrate_consensus(&self) -> anyhow::Result<()> {
811 tracing::warn!("migrating consensus data...");
812
813 self.migrate_anchor_leaf().await?;
814 self.migrate_da_proposals().await?;
815 self.migrate_vid_shares().await?;
816 self.migrate_quorum_proposals().await?;
817 self.migrate_quorum_certificates().await?;
818
819 tracing::warn!("consensus storage has been migrated to new types");
820
821 Ok(())
822 }
823
824 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
825 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
826 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
827 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
828 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
829
830 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
831 match self.load_anchor_leaf().await? {
832 Some((leaf, _)) => Ok(leaf.view_number()),
833 None => Ok(ViewNumber::genesis()),
834 }
835 }
836
837 async fn store_next_epoch_quorum_certificate(
838 &self,
839 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
840 ) -> anyhow::Result<()>;
841
842 async fn load_next_epoch_quorum_certificate(
843 &self,
844 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
845
846 async fn append_da2(
847 &self,
848 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
849 vid_commit: VidCommitment,
850 ) -> anyhow::Result<()>;
851
852 async fn append_proposal2(
853 &self,
854 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
855 ) -> anyhow::Result<()> {
856 self.append_quorum_proposal2(proposal).await
857 }
858
859 async fn store_drb_result(
860 &self,
861 epoch: <SeqTypes as NodeType>::Epoch,
862 drb_result: DrbResult,
863 ) -> anyhow::Result<()>;
864 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
865 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
866 async fn store_epoch_root(
867 &self,
868 epoch: <SeqTypes as NodeType>::Epoch,
869 block_header: <SeqTypes as NodeType>::BlockHeader,
870 ) -> anyhow::Result<()>;
871 async fn add_state_cert(
872 &self,
873 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
874 ) -> anyhow::Result<()>;
875
876 fn enable_metrics(&mut self, metrics: &dyn Metrics);
877}
878
879#[async_trait]
880pub trait EventConsumer: Debug + Send + Sync {
881 async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
882}
883
884#[async_trait]
885impl<T> EventConsumer for Box<T>
886where
887 T: EventConsumer + ?Sized,
888{
889 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
890 (**self).handle_event(event).await
891 }
892}
893
894#[derive(Clone, Copy, Debug)]
895pub struct NullEventConsumer;
896
897#[async_trait]
898impl EventConsumer for NullEventConsumer {
899 async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
900 Ok(())
901 }
902}
903
904#[async_trait]
905impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
906 async fn append_vid(
907 &self,
908 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
909 ) -> anyhow::Result<()> {
910 (**self).append_vid(proposal).await
911 }
912
913 async fn append_vid2(
914 &self,
915 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
916 ) -> anyhow::Result<()> {
917 (**self).append_vid2(proposal).await
918 }
919
920 async fn append_da(
921 &self,
922 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
923 vid_commit: VidCommitment,
924 ) -> anyhow::Result<()> {
925 (**self).append_da(proposal, vid_commit).await
926 }
927
928 async fn append_da2(
929 &self,
930 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
931 vid_commit: VidCommitment,
932 ) -> anyhow::Result<()> {
933 (**self).append_da2(proposal, vid_commit).await
934 }
935
936 async fn record_action(
937 &self,
938 view: ViewNumber,
939 epoch: Option<EpochNumber>,
940 action: HotShotAction,
941 ) -> anyhow::Result<()> {
942 (**self).record_action(view, epoch, action).await
943 }
944
945 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
946 Ok(())
947 }
948
949 async fn append_proposal(
950 &self,
951 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
952 ) -> anyhow::Result<()> {
953 (**self)
954 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
955 .await
956 }
957
958 async fn append_proposal2(
959 &self,
960 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
961 ) -> anyhow::Result<()> {
962 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
963 convert_proposal(proposal.clone());
964 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
965 }
966
967 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
968 Ok(())
969 }
970
971 async fn update_decided_upgrade_certificate(
972 &self,
973 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
974 ) -> anyhow::Result<()> {
975 (**self)
976 .store_upgrade_certificate(decided_upgrade_certificate)
977 .await
978 }
979
980 async fn store_drb_result(
981 &self,
982 epoch: <SeqTypes as NodeType>::Epoch,
983 drb_result: DrbResult,
984 ) -> anyhow::Result<()> {
985 (**self).store_drb_result(epoch, drb_result).await
986 }
987
988 async fn store_epoch_root(
989 &self,
990 epoch: <SeqTypes as NodeType>::Epoch,
991 block_header: <SeqTypes as NodeType>::BlockHeader,
992 ) -> anyhow::Result<()> {
993 (**self).store_epoch_root(epoch, block_header).await
994 }
995
996 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
997 (**self).store_drb_input(drb_input).await
998 }
999
1000 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1001 (**self).load_drb_input(epoch).await
1002 }
1003
1004 async fn update_state_cert(
1005 &self,
1006 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1007 ) -> anyhow::Result<()> {
1008 (**self).add_state_cert(state_cert).await
1009 }
1010}
1011
1012pub trait FromNsPayloadBytes<'a> {
1017 fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1019}
1020
1021pub trait NsPayloadBytesRange<'a> {
1026 type Output: FromNsPayloadBytes<'a>;
1027
1028 fn ns_payload_range(&self) -> Range<usize>;
1030}
1031
1032pub trait FromStringOrInteger: Sized {
1043 type Binary: Serialize + DeserializeOwned;
1044 type Integer: Serialize + DeserializeOwned;
1045
1046 fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1047 fn from_string(s: String) -> anyhow::Result<Self>;
1048 fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1049
1050 fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1051 fn to_string(&self) -> anyhow::Result<String>;
1052}