1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::U256;
6use anyhow::{bail, ensure, Context};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{types::EventType, HotShotInitializer, InitializerEpochInfo};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{convert_proposal, Proposal},
21 simple_certificate::{
22 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
23 QuorumCertificate2, UpgradeCertificate,
24 },
25 stake_table::HSStakeTable,
26 traits::{
27 metrics::Metrics,
28 node_implementation::{ConsensusTime, NodeType, Versions},
29 storage::Storage,
30 ValidatedState as HotShotState,
31 },
32 utils::genesis_epoch_from_version,
33 vote::HasViewNumber,
34};
35use serde::{de::DeserializeOwned, Serialize};
36
37use super::{
38 impls::NodeState,
39 utils::BackoffParams,
40 v0_3::{EventKey, IndexedStake, StakeTableEvent},
41};
42use crate::{
43 v0::impls::{StakeTableHash, ValidatedState},
44 v0_3::{
45 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
46 },
47 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
48 BlockMerkleTree, Event, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2, NetworkConfig,
49 SeqTypes, ValidatorMap,
50};
51
52#[async_trait]
53pub trait StateCatchup: Send + Sync {
54 async fn try_fetch_leaf(
56 &self,
57 retry: usize,
58 height: u64,
59 stake_table: HSStakeTable<SeqTypes>,
60 success_threshold: U256,
61 ) -> anyhow::Result<Leaf2>;
62
63 async fn fetch_leaf(
65 &self,
66 height: u64,
67 stake_table: HSStakeTable<SeqTypes>,
68 success_threshold: U256,
69 ) -> anyhow::Result<Leaf2> {
70 self.backoff()
71 .retry(self, |provider, retry| {
72 let stake_table_clone = stake_table.clone();
73 async move {
74 provider
75 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
76 .await
77 }
78 .boxed()
79 })
80 .await
81 }
82
83 async fn try_fetch_accounts(
85 &self,
86 retry: usize,
87 instance: &NodeState,
88 height: u64,
89 view: ViewNumber,
90 fee_merkle_tree_root: FeeMerkleCommitment,
91 accounts: &[FeeAccount],
92 ) -> anyhow::Result<Vec<FeeAccountProof>>;
93
94 async fn fetch_accounts(
96 &self,
97 instance: &NodeState,
98 height: u64,
99 view: ViewNumber,
100 fee_merkle_tree_root: FeeMerkleCommitment,
101 accounts: Vec<FeeAccount>,
102 ) -> anyhow::Result<Vec<FeeAccountProof>> {
103 self.backoff()
104 .retry(self, |provider, retry| {
105 let accounts = &accounts;
106 async move {
107 provider
108 .try_fetch_accounts(
109 retry,
110 instance,
111 height,
112 view,
113 fee_merkle_tree_root,
114 accounts,
115 )
116 .await
117 .map_err(|err| {
118 err.context(format!(
119 "fetching accounts {accounts:?}, height {height}, view {view}"
120 ))
121 })
122 }
123 .boxed()
124 })
125 .await
126 }
127
128 async fn try_remember_blocks_merkle_tree(
130 &self,
131 retry: usize,
132 instance: &NodeState,
133 height: u64,
134 view: ViewNumber,
135 mt: &mut BlockMerkleTree,
136 ) -> anyhow::Result<()>;
137
138 async fn remember_blocks_merkle_tree(
140 &self,
141 instance: &NodeState,
142 height: u64,
143 view: ViewNumber,
144 mt: &mut BlockMerkleTree,
145 ) -> anyhow::Result<()> {
146 self.backoff()
147 .retry(mt, |mt, retry| {
148 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
149 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
150 .boxed()
151 })
152 .await
153 }
154
155 async fn try_fetch_chain_config(
157 &self,
158 retry: usize,
159 commitment: Commitment<ChainConfig>,
160 ) -> anyhow::Result<ChainConfig>;
161
162 async fn fetch_chain_config(
164 &self,
165 commitment: Commitment<ChainConfig>,
166 ) -> anyhow::Result<ChainConfig> {
167 self.backoff()
168 .retry(self, |provider, retry| {
169 provider
170 .try_fetch_chain_config(retry, commitment)
171 .map_err(|err| err.context("fetching chain config"))
172 .boxed()
173 })
174 .await
175 }
176
177 async fn try_fetch_reward_accounts_v2(
179 &self,
180 retry: usize,
181 instance: &NodeState,
182 height: u64,
183 view: ViewNumber,
184 reward_merkle_tree_root: RewardMerkleCommitmentV2,
185 accounts: &[RewardAccountV2],
186 ) -> anyhow::Result<Vec<RewardAccountProofV2>>;
187
188 async fn fetch_reward_accounts_v2(
190 &self,
191 instance: &NodeState,
192 height: u64,
193 view: ViewNumber,
194 reward_merkle_tree_root: RewardMerkleCommitmentV2,
195 accounts: Vec<RewardAccountV2>,
196 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
197 self.backoff()
198 .retry(self, |provider, retry| {
199 let accounts = &accounts;
200 async move {
201 provider
202 .try_fetch_reward_accounts_v2(
203 retry,
204 instance,
205 height,
206 view,
207 reward_merkle_tree_root,
208 accounts,
209 )
210 .await
211 .map_err(|err| {
212 err.context(format!(
213 "fetching reward accounts {accounts:?}, height {height}, view \
214 {view}"
215 ))
216 })
217 }
218 .boxed()
219 })
220 .await
221 }
222
223 async fn try_fetch_reward_accounts_v1(
225 &self,
226 retry: usize,
227 instance: &NodeState,
228 height: u64,
229 view: ViewNumber,
230 reward_merkle_tree_root: RewardMerkleCommitmentV1,
231 accounts: &[RewardAccountV1],
232 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
233
234 async fn fetch_reward_accounts_v1(
236 &self,
237 instance: &NodeState,
238 height: u64,
239 view: ViewNumber,
240 reward_merkle_tree_root: RewardMerkleCommitmentV1,
241 accounts: Vec<RewardAccountV1>,
242 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
243 self.backoff()
244 .retry(self, |provider, retry| {
245 let accounts = &accounts;
246 async move {
247 provider
248 .try_fetch_reward_accounts_v1(
249 retry,
250 instance,
251 height,
252 view,
253 reward_merkle_tree_root,
254 accounts,
255 )
256 .await
257 .map_err(|err| {
258 err.context(format!(
259 "fetching v1 reward accounts {accounts:?}, height {height}, view \
260 {view}"
261 ))
262 })
263 }
264 .boxed()
265 })
266 .await
267 }
268
269 fn is_local(&self) -> bool;
271
272 fn backoff(&self) -> &BackoffParams;
274
275 fn name(&self) -> String;
277}
278
279#[async_trait]
280impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
281 async fn try_fetch_leaf(
282 &self,
283 retry: usize,
284 height: u64,
285 stake_table: HSStakeTable<SeqTypes>,
286 success_threshold: U256,
287 ) -> anyhow::Result<Leaf2> {
288 (**self)
289 .try_fetch_leaf(retry, height, stake_table, success_threshold)
290 .await
291 }
292
293 async fn fetch_leaf(
294 &self,
295 height: u64,
296 stake_table: HSStakeTable<SeqTypes>,
297 success_threshold: U256,
298 ) -> anyhow::Result<Leaf2> {
299 (**self)
300 .fetch_leaf(height, stake_table, success_threshold)
301 .await
302 }
303 async fn try_fetch_accounts(
304 &self,
305 retry: usize,
306 instance: &NodeState,
307 height: u64,
308 view: ViewNumber,
309 fee_merkle_tree_root: FeeMerkleCommitment,
310 accounts: &[FeeAccount],
311 ) -> anyhow::Result<Vec<FeeAccountProof>> {
312 (**self)
313 .try_fetch_accounts(
314 retry,
315 instance,
316 height,
317 view,
318 fee_merkle_tree_root,
319 accounts,
320 )
321 .await
322 }
323
324 async fn fetch_accounts(
325 &self,
326 instance: &NodeState,
327 height: u64,
328 view: ViewNumber,
329 fee_merkle_tree_root: FeeMerkleCommitment,
330 accounts: Vec<FeeAccount>,
331 ) -> anyhow::Result<Vec<FeeAccountProof>> {
332 (**self)
333 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
334 .await
335 }
336
337 async fn try_remember_blocks_merkle_tree(
338 &self,
339 retry: usize,
340 instance: &NodeState,
341 height: u64,
342 view: ViewNumber,
343 mt: &mut BlockMerkleTree,
344 ) -> anyhow::Result<()> {
345 (**self)
346 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
347 .await
348 }
349
350 async fn remember_blocks_merkle_tree(
351 &self,
352 instance: &NodeState,
353 height: u64,
354 view: ViewNumber,
355 mt: &mut BlockMerkleTree,
356 ) -> anyhow::Result<()> {
357 (**self)
358 .remember_blocks_merkle_tree(instance, height, view, mt)
359 .await
360 }
361
362 async fn try_fetch_chain_config(
363 &self,
364 retry: usize,
365 commitment: Commitment<ChainConfig>,
366 ) -> anyhow::Result<ChainConfig> {
367 (**self).try_fetch_chain_config(retry, commitment).await
368 }
369
370 async fn fetch_chain_config(
371 &self,
372 commitment: Commitment<ChainConfig>,
373 ) -> anyhow::Result<ChainConfig> {
374 (**self).fetch_chain_config(commitment).await
375 }
376
377 async fn try_fetch_reward_accounts_v2(
378 &self,
379 retry: usize,
380 instance: &NodeState,
381 height: u64,
382 view: ViewNumber,
383 reward_merkle_tree_root: RewardMerkleCommitmentV2,
384 accounts: &[RewardAccountV2],
385 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
386 (**self)
387 .try_fetch_reward_accounts_v2(
388 retry,
389 instance,
390 height,
391 view,
392 reward_merkle_tree_root,
393 accounts,
394 )
395 .await
396 }
397
398 async fn fetch_reward_accounts_v2(
399 &self,
400 instance: &NodeState,
401 height: u64,
402 view: ViewNumber,
403 reward_merkle_tree_root: RewardMerkleCommitmentV2,
404 accounts: Vec<RewardAccountV2>,
405 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
406 (**self)
407 .fetch_reward_accounts_v2(instance, height, view, reward_merkle_tree_root, accounts)
408 .await
409 }
410
411 async fn try_fetch_reward_accounts_v1(
412 &self,
413 retry: usize,
414 instance: &NodeState,
415 height: u64,
416 view: ViewNumber,
417 reward_merkle_tree_root: RewardMerkleCommitmentV1,
418 accounts: &[RewardAccountV1],
419 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
420 (**self)
421 .try_fetch_reward_accounts_v1(
422 retry,
423 instance,
424 height,
425 view,
426 reward_merkle_tree_root,
427 accounts,
428 )
429 .await
430 }
431
432 async fn fetch_reward_accounts_v1(
433 &self,
434 instance: &NodeState,
435 height: u64,
436 view: ViewNumber,
437 reward_merkle_tree_root: RewardMerkleCommitmentV1,
438 accounts: Vec<RewardAccountV1>,
439 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
440 (**self)
441 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
442 .await
443 }
444
445 fn backoff(&self) -> &BackoffParams {
446 (**self).backoff()
447 }
448
449 fn name(&self) -> String {
450 (**self).name()
451 }
452
453 fn is_local(&self) -> bool {
454 (**self).is_local()
455 }
456}
457
458#[async_trait]
459pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
460 type Persistence: SequencerPersistence + MembershipPersistence;
461
462 fn set_view_retention(&mut self, view_retention: u64);
463 async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
464 async fn reset(self) -> anyhow::Result<()>;
465}
466
/// Read status returned alongside the events by [`MembershipPersistence::load_events`].
///
/// NOTE(review): the exact meaning of each variant is decided by the implementors of
/// `load_events`; the descriptions below are inferred from the names and should be confirmed.
pub enum EventsPersistenceRead {
    /// Presumably: the stored events cover the whole requested range.
    Complete,
    /// Presumably: the stored events only cover the range up to this L1 block number.
    UntilL1Block(u64),
}
474
475#[async_trait]
476pub trait MembershipPersistence: Send + Sync + 'static {
478 async fn load_stake(
480 &self,
481 epoch: EpochNumber,
482 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>>;
483
484 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;
486
487 async fn store_stake(
489 &self,
490 epoch: EpochNumber,
491 stake: ValidatorMap,
492 block_reward: Option<RewardAmount>,
493 stake_table_hash: Option<StakeTableHash>,
494 ) -> anyhow::Result<()>;
495
496 async fn store_events(
497 &self,
498 l1_finalized: u64,
499 events: Vec<(EventKey, StakeTableEvent)>,
500 ) -> anyhow::Result<()>;
501 async fn load_events(
502 &self,
503 l1_finalized: u64,
504 ) -> anyhow::Result<(
505 Option<EventsPersistenceRead>,
506 Vec<(EventKey, StakeTableEvent)>,
507 )>;
508}
509
510#[async_trait]
511pub trait SequencerPersistence:
512 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
513{
514 fn into_catchup_provider(
516 self,
517 _backoff: BackoffParams,
518 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
519 bail!("state catchup is not implemented for this persistence type");
520 }
521
522 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
527
528 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
530
531 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
533
534 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
536
537 async fn load_quorum_proposals(
539 &self,
540 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
541
542 async fn load_quorum_proposal(
543 &self,
544 view: ViewNumber,
545 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
546
547 async fn load_vid_share(
548 &self,
549 view: ViewNumber,
550 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
551 async fn load_da_proposal(
552 &self,
553 view: ViewNumber,
554 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
555 async fn load_upgrade_certificate(
556 &self,
557 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
558 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
559 async fn load_state_cert(
560 &self,
561 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
562
563 async fn load_consensus_state<V: Versions>(
570 &self,
571 state: NodeState,
572 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
573 let genesis_validated_state = ValidatedState::genesis(&state).0;
574 let highest_voted_view = match self
575 .load_latest_acted_view()
576 .await
577 .context("loading last voted view")?
578 {
579 Some(view) => {
580 tracing::info!(?view, "starting with last actioned view");
581 view
582 },
583 None => {
584 tracing::info!("no saved view, starting from genesis");
585 ViewNumber::genesis()
586 },
587 };
588
589 let restart_view = match self
590 .load_restart_view()
591 .await
592 .context("loading restart view")?
593 {
594 Some(view) => {
595 tracing::info!(?view, "starting from saved view");
596 view
597 },
598 None => {
599 tracing::info!("no saved view, starting from genesis");
600 ViewNumber::genesis()
601 },
602 };
603 let next_epoch_high_qc = self
604 .load_next_epoch_quorum_certificate()
605 .await
606 .context("loading next epoch qc")?;
607 let (leaf, mut high_qc, anchor_view) = match self
608 .load_anchor_leaf()
609 .await
610 .context("loading anchor leaf")?
611 {
612 Some((leaf, high_qc)) => {
613 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
614 ensure!(
615 leaf.view_number() == high_qc.view_number,
616 format!(
617 "loaded anchor leaf from view {}, but high QC is from view {}",
618 leaf.view_number(),
619 high_qc.view_number
620 )
621 );
622
623 let anchor_view = leaf.view_number();
624 (leaf, high_qc, Some(anchor_view))
625 },
626 None => {
627 tracing::info!("no saved leaf, starting from genesis leaf");
628 (
629 hotshot_types::data::Leaf2::genesis::<V>(&genesis_validated_state, &state)
630 .await,
631 QuorumCertificate2::genesis::<V>(&genesis_validated_state, &state).await,
632 None,
633 )
634 },
635 };
636
637 if let Some((extended_high_qc, _)) = self.load_eqc().await {
638 if extended_high_qc.view_number() > high_qc.view_number() {
639 high_qc = extended_high_qc
640 }
641 }
642
643 let validated_state = if leaf.block_header().height() == 0 {
644 genesis_validated_state
646 } else {
647 ValidatedState::from_header(leaf.block_header())
650 };
651
652 let restart_view = max(restart_view, leaf.view_number());
657 let epoch = genesis_epoch_from_version::<V, SeqTypes>();
659
660 let config = self.load_config().await.context("loading config")?;
661 let epoch_height = config
662 .as_ref()
663 .map(|c| c.config.epoch_height)
664 .unwrap_or_default();
665 let epoch_start_block = config
666 .as_ref()
667 .map(|c| c.config.epoch_start_block)
668 .unwrap_or_default();
669
670 let saved_proposals = self
671 .load_quorum_proposals()
672 .await
673 .context("loading saved proposals")?;
674
675 let upgrade_certificate = self
676 .load_upgrade_certificate()
677 .await
678 .context("loading upgrade certificate")?;
679
680 let start_epoch_info = self
681 .load_start_epoch_info()
682 .await
683 .context("loading start epoch info")?;
684
685 let state_cert = self
686 .load_state_cert()
687 .await
688 .context("loading light client state update certificate")?;
689
690 tracing::warn!(
691 ?leaf,
692 ?restart_view,
693 ?epoch,
694 ?high_qc,
695 ?validated_state,
696 ?state_cert,
697 "loaded consensus state"
698 );
699
700 Ok((
701 HotShotInitializer {
702 instance_state: state,
703 epoch_height,
704 epoch_start_block,
705 anchor_leaf: leaf,
706 anchor_state: Arc::new(validated_state),
707 anchor_state_delta: None,
708 start_view: restart_view,
709 start_epoch: epoch,
710 last_actioned_view: highest_voted_view,
711 saved_proposals,
712 high_qc,
713 next_epoch_high_qc,
714 decided_upgrade_certificate: upgrade_certificate,
715 undecided_leaves: Default::default(),
716 undecided_state: Default::default(),
717 saved_vid_shares: Default::default(), start_epoch_info,
719 state_cert,
720 },
721 anchor_view,
722 ))
723 }
724
725 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
727 if let EventType::Decide { leaf_chain, qc, .. } = &event.event {
728 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
729 return;
731 };
732
733 let chain = leaf_chain.iter().zip(
735 std::iter::once((**qc).clone())
737 .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())),
740 );
741
742 if let Err(err) = self
743 .append_decided_leaves(leaf.view_number(), chain, consumer)
744 .await
745 {
746 tracing::error!(
747 "failed to save decided leaves, chain may not be up to date: {err:#}"
748 );
749 return;
750 }
751 }
752 }
753
754 async fn append_decided_leaves(
780 &self,
781 decided_view: ViewNumber,
782 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
783 consumer: &(impl EventConsumer + 'static),
784 ) -> anyhow::Result<()>;
785
786 async fn load_anchor_leaf(
787 &self,
788 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
789 async fn append_vid(
790 &self,
791 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
792 ) -> anyhow::Result<()>;
793 async fn append_vid2(
795 &self,
796 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
797 ) -> anyhow::Result<()>;
798 async fn append_da(
799 &self,
800 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
801 vid_commit: VidCommitment,
802 ) -> anyhow::Result<()>;
803 async fn record_action(
804 &self,
805 view: ViewNumber,
806 epoch: Option<EpochNumber>,
807 action: HotShotAction,
808 ) -> anyhow::Result<()>;
809
810 async fn append_quorum_proposal2(
811 &self,
812 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
813 ) -> anyhow::Result<()>;
814
815 async fn store_eqc(
817 &self,
818 _high_qc: QuorumCertificate2<SeqTypes>,
819 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
820 ) -> anyhow::Result<()>;
821
822 async fn load_eqc(
824 &self,
825 ) -> Option<(
826 QuorumCertificate2<SeqTypes>,
827 NextEpochQuorumCertificate2<SeqTypes>,
828 )>;
829
830 async fn store_upgrade_certificate(
831 &self,
832 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
833 ) -> anyhow::Result<()>;
834 async fn migrate_consensus(&self) -> anyhow::Result<()> {
835 tracing::warn!("migrating consensus data...");
836
837 self.migrate_anchor_leaf().await?;
838 self.migrate_da_proposals().await?;
839 self.migrate_vid_shares().await?;
840 self.migrate_quorum_proposals().await?;
841 self.migrate_quorum_certificates().await?;
842
843 tracing::warn!("consensus storage has been migrated to new types");
844
845 Ok(())
846 }
847
848 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
849 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
850 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
851 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
852 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
853
854 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
855 match self.load_anchor_leaf().await? {
856 Some((leaf, _)) => Ok(leaf.view_number()),
857 None => Ok(ViewNumber::genesis()),
858 }
859 }
860
861 async fn store_next_epoch_quorum_certificate(
862 &self,
863 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
864 ) -> anyhow::Result<()>;
865
866 async fn load_next_epoch_quorum_certificate(
867 &self,
868 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
869
870 async fn append_da2(
871 &self,
872 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
873 vid_commit: VidCommitment,
874 ) -> anyhow::Result<()>;
875
876 async fn append_proposal2(
877 &self,
878 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
879 ) -> anyhow::Result<()> {
880 self.append_quorum_proposal2(proposal).await
881 }
882
883 async fn store_drb_result(
884 &self,
885 epoch: <SeqTypes as NodeType>::Epoch,
886 drb_result: DrbResult,
887 ) -> anyhow::Result<()>;
888 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
889 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
890 async fn store_epoch_root(
891 &self,
892 epoch: <SeqTypes as NodeType>::Epoch,
893 block_header: <SeqTypes as NodeType>::BlockHeader,
894 ) -> anyhow::Result<()>;
895 async fn add_state_cert(
896 &self,
897 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
898 ) -> anyhow::Result<()>;
899
900 fn enable_metrics(&mut self, metrics: &dyn Metrics);
901}
902
903#[async_trait]
904pub trait EventConsumer: Debug + Send + Sync {
905 async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
906}
907
908#[async_trait]
909impl<T> EventConsumer for Box<T>
910where
911 T: EventConsumer + ?Sized,
912{
913 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
914 (**self).handle_event(event).await
915 }
916}
917
/// An [`EventConsumer`] which accepts every event and does nothing with it.
#[derive(Debug, Clone, Copy)]
pub struct NullEventConsumer;
920
921#[async_trait]
922impl EventConsumer for NullEventConsumer {
923 async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
924 Ok(())
925 }
926}
927
928#[async_trait]
929impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
930 async fn append_vid(
931 &self,
932 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
933 ) -> anyhow::Result<()> {
934 (**self).append_vid(proposal).await
935 }
936
937 async fn append_vid2(
938 &self,
939 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
940 ) -> anyhow::Result<()> {
941 (**self).append_vid2(proposal).await
942 }
943
944 async fn append_da(
945 &self,
946 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
947 vid_commit: VidCommitment,
948 ) -> anyhow::Result<()> {
949 (**self).append_da(proposal, vid_commit).await
950 }
951
952 async fn append_da2(
953 &self,
954 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
955 vid_commit: VidCommitment,
956 ) -> anyhow::Result<()> {
957 (**self).append_da2(proposal, vid_commit).await
958 }
959
960 async fn record_action(
961 &self,
962 view: ViewNumber,
963 epoch: Option<EpochNumber>,
964 action: HotShotAction,
965 ) -> anyhow::Result<()> {
966 (**self).record_action(view, epoch, action).await
967 }
968
969 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
970 Ok(())
971 }
972
973 async fn append_proposal(
974 &self,
975 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
976 ) -> anyhow::Result<()> {
977 (**self)
978 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
979 .await
980 }
981
982 async fn append_proposal2(
983 &self,
984 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
985 ) -> anyhow::Result<()> {
986 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
987 convert_proposal(proposal.clone());
988 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
989 }
990
991 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
992 Ok(())
993 }
994
995 async fn update_eqc(
997 &self,
998 high_qc: QuorumCertificate2<SeqTypes>,
999 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1000 ) -> anyhow::Result<()> {
1001 if let Some((existing_high_qc, _)) = (**self).load_eqc().await {
1002 if high_qc.view_number() < existing_high_qc.view_number() {
1003 return Ok(());
1004 }
1005 }
1006
1007 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1008 }
1009
1010 async fn update_next_epoch_high_qc2(
1011 &self,
1012 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1013 ) -> anyhow::Result<()> {
1014 Ok(())
1015 }
1016
1017 async fn update_decided_upgrade_certificate(
1018 &self,
1019 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1020 ) -> anyhow::Result<()> {
1021 (**self)
1022 .store_upgrade_certificate(decided_upgrade_certificate)
1023 .await
1024 }
1025
1026 async fn store_drb_result(
1027 &self,
1028 epoch: <SeqTypes as NodeType>::Epoch,
1029 drb_result: DrbResult,
1030 ) -> anyhow::Result<()> {
1031 (**self).store_drb_result(epoch, drb_result).await
1032 }
1033
1034 async fn store_epoch_root(
1035 &self,
1036 epoch: <SeqTypes as NodeType>::Epoch,
1037 block_header: <SeqTypes as NodeType>::BlockHeader,
1038 ) -> anyhow::Result<()> {
1039 (**self).store_epoch_root(epoch, block_header).await
1040 }
1041
1042 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1043 (**self).store_drb_input(drb_input).await
1044 }
1045
1046 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1047 (**self).load_drb_input(epoch).await
1048 }
1049
1050 async fn update_state_cert(
1051 &self,
1052 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1053 ) -> anyhow::Result<()> {
1054 (**self).add_state_cert(state_cert).await
1055 }
1056}
1057
/// A value that can be built from a borrowed slice of namespace payload bytes.
///
/// The lifetime `'a` lets the constructed value borrow from the input slice.
/// [`NsPayloadBytesRange`] names an implementor of this trait as its `Output`.
pub trait FromNsPayloadBytes<'a> {
    /// Builds `Self` from `bytes`. The signature has no way to report failure.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1066
1067pub trait NsPayloadBytesRange<'a> {
1072 type Output: FromNsPayloadBytes<'a>;
1073
1074 fn ns_payload_range(&self) -> Range<usize>;
1076}
1077
1078pub trait FromStringOrInteger: Sized {
1089 type Binary: Serialize + DeserializeOwned;
1090 type Integer: Serialize + DeserializeOwned;
1091
1092 fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
1093 fn from_string(s: String) -> anyhow::Result<Self>;
1094 fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1095
1096 fn to_binary(&self) -> anyhow::Result<Self::Binary>;
1097 fn to_string(&self) -> anyhow::Result<String>;
1098}