1use anyhow::Context;
12use async_trait::async_trait;
13use espresso_types::v0_3::ChainConfig;
14
15pub mod fs;
16pub mod no_storage;
17mod persistence_metrics;
18pub mod sql;
19
20fn migrate_network_config(
22 mut network_config: serde_json::Value,
23) -> anyhow::Result<serde_json::Value> {
24 let config = network_config
25 .get_mut("config")
26 .context("missing field `config`")?
27 .as_object_mut()
28 .context("`config` must be an object")?;
29
30 if !config.contains_key("builder_urls") {
31 let url = config
36 .remove("builder_url")
37 .context("missing field `builder_url`")?;
38 config.insert("builder_urls".into(), vec![url].into());
39 }
40
41 if !config.contains_key("start_proposing_view") {
47 config.insert("start_proposing_view".into(), 9007199254740991u64.into());
48 }
49 if !config.contains_key("stop_proposing_view") {
50 config.insert("stop_proposing_view".into(), 0.into());
51 }
52 if !config.contains_key("start_voting_view") {
53 config.insert("start_voting_view".into(), 9007199254740991u64.into());
54 }
55 if !config.contains_key("stop_voting_view") {
56 config.insert("stop_voting_view".into(), 0.into());
57 }
58 if !config.contains_key("start_proposing_time") {
59 config.insert("start_proposing_time".into(), 9007199254740991u64.into());
60 }
61 if !config.contains_key("stop_proposing_time") {
62 config.insert("stop_proposing_time".into(), 0.into());
63 }
64 if !config.contains_key("start_voting_time") {
65 config.insert("start_voting_time".into(), 9007199254740991u64.into());
66 }
67 if !config.contains_key("stop_voting_time") {
68 config.insert("stop_voting_time".into(), 0.into());
69 }
70
71 if !config.contains_key("epoch_height") {
74 config.insert("epoch_height".into(), 0.into());
75 }
76
77 if !config.contains_key("drb_difficulty") {
80 config.insert("drb_difficulty".into(), 0.into());
81 }
82 if !config.contains_key("drb_upgrade_difficulty") {
83 config.insert("drb_upgrade_difficulty".into(), 0.into());
84 }
85
86 if !config.contains_key("da_committees") {
88 config.insert("da_committees".into(), serde_json::json!([]));
89 }
90
91 Ok(network_config)
92}
93
/// Storage backends that can persist a [`ChainConfig`].
///
/// Implementors must be `Sized + Send + Sync`.
#[async_trait]
pub trait ChainConfigPersistence: Sized + Send + Sync {
    /// Insert `chain_config` into this store.
    ///
    /// Any failure is reported through the returned `anyhow::Result`.
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
}
98
99#[cfg(test)]
100mod tests {
101 use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
102
103 use alloy::{
104 network::EthereumWallet,
105 primitives::{Address, U256},
106 providers::{ext::AnvilApi, Provider, ProviderBuilder},
107 };
108 use anyhow::bail;
109 use async_lock::{Mutex, RwLock};
110 use async_trait::async_trait;
111 use committable::{Commitment, Committable};
112 use espresso_contract_deployer::{
113 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
114 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
115 };
116 use espresso_types::{
117 traits::{
118 EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
119 PersistenceOptions, SequencerPersistence,
120 },
121 v0_3::{Fetcher, Validator},
122 Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes,
123 SequencerVersions, ValidatedState,
124 };
125 use futures::{future::join_all, StreamExt, TryStreamExt};
126 use hotshot::{
127 types::{BLSPubKey, SignatureKey},
128 InitializerEpochInfo,
129 };
130 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
131 use hotshot_example_types::node_types::TestVersions;
132 use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MockVersions};
133 use hotshot_types::{
134 data::{
135 ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
136 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
137 ViewNumber,
138 },
139 event::{EventType, HotShotAction, LeafInfo},
140 light_client::StateKeyPair,
141 message::{convert_proposal, Proposal, UpgradeLock},
142 simple_certificate::{
143 CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
144 UpgradeCertificate,
145 },
146 simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
147 traits::{
148 block_contents::BlockHeader,
149 node_implementation::{ConsensusTime, Versions},
150 EncodeBytes,
151 },
152 utils::EpochTransitionIndicator,
153 vid::avidm::{init_avidm_param, AvidMScheme},
154 vote::HasViewNumber,
155 };
156 use indexmap::IndexMap;
157 use portpicker::pick_unused_port;
158 use staking_cli::demo::{DelegationConfig, StakingTransactions};
159 use surf_disco::Client;
160 use tide_disco::error::ServerError;
161 use tokio::{spawn, time::sleep};
162 use vbs::version::{StaticVersion, StaticVersionType, Version};
163
164 use crate::{
165 api::{
166 test_helpers::{TestNetwork, TestNetworkConfigBuilder, STAKE_TABLE_CAPACITY_FOR_TEST},
167 Options,
168 },
169 catchup::NullStateCatchup,
170 testing::{staking_priv_keys, TestConfigBuilder},
171 SequencerApiVersion, RECENT_STAKE_TABLES_LIMIT,
172 };
173
/// Test-only extension of the persistence traits that lets one generic test body run
/// against any backend implementing it.
#[async_trait]
pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
    /// Backing resource produced by [`Self::tmp_storage`] and borrowed by
    /// [`Self::options`] and [`Self::connect`].
    type Storage: Sync;

    /// Create a storage resource for a test to use.
    async fn tmp_storage() -> Self::Storage;
    /// Build persistence options that refer to `storage`.
    fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;

    /// Create a persistence instance from the options for `storage`.
    ///
    /// Panics if creation fails.
    async fn connect(storage: &Self::Storage) -> Self {
        Self::options(storage).create().await.unwrap()
    }
}
185
// Reusable rstest template. A test that applies it with
// `#[rstest_reuse::apply(persistence_types)]` gets one case per `#[case]` line below
// (the SQL persistence type and the file-system persistence type), and each case is
// declared as a multi-threaded tokio test wrapped by `test_log`.
// The `PhantomData` argument only carries the type parameter `P`; it holds no data.
#[rstest_reuse::template]
#[rstest::rstest]
#[case(PhantomData::<crate::persistence::sql::Persistence>)]
#[case(PhantomData::<crate::persistence::fs::Persistence>)]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
192
/// Event consumer used by the tests to record every event it is handed.
///
/// The event list sits behind `Arc<RwLock<..>>`, so clones of a collector share one list.
#[derive(Clone, Debug, Default)]
struct EventCollector {
    /// Events in the order they were received.
    events: Arc<RwLock<Vec<Event>>>,
}
197
198 impl EventCollector {
199 async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
200 self.events
201 .read()
202 .await
203 .iter()
204 .flat_map(|event| {
205 let EventType::Decide { leaf_chain, .. } = &event.event else {
206 panic!("expected decide event, got {event:?}");
207 };
208 leaf_chain.iter().cloned().rev()
209 })
210 .collect::<Vec<_>>()
211 }
212 }
213
214 #[async_trait]
215 impl EventConsumer for EventCollector {
216 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
217 self.events.write().await.push(event.clone());
218 Ok(())
219 }
220 }
221
/// Stateless event consumer used by the tests to inject event-handling failures.
#[derive(Clone, Copy, Debug)]
struct FailConsumer;
224
225 #[async_trait]
226 impl EventConsumer for FailConsumer {
227 async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
228 bail!("mock error injection");
229 }
230 }
231
232 #[rstest_reuse::apply(persistence_types)]
233 pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
234 let tmp = P::tmp_storage().await;
235 let storage = P::connect(&tmp).await;
236
237 assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
239
240 let view1 = ViewNumber::genesis();
242 storage
243 .record_action(view1, None, HotShotAction::Vote)
244 .await
245 .unwrap();
246 assert_eq!(
247 storage.load_latest_acted_view().await.unwrap().unwrap(),
248 view1
249 );
250
251 let view2 = view1 + 1;
253 storage
254 .record_action(view2, None, HotShotAction::Vote)
255 .await
256 .unwrap();
257 assert_eq!(
258 storage.load_latest_acted_view().await.unwrap().unwrap(),
259 view2
260 );
261
262 storage
264 .record_action(view1, None, HotShotAction::Vote)
265 .await
266 .unwrap();
267 assert_eq!(
268 storage.load_latest_acted_view().await.unwrap().unwrap(),
269 view2
270 );
271 }
272
273 #[rstest_reuse::apply(persistence_types)]
274 pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
275 let tmp = P::tmp_storage().await;
276 let storage = P::connect(&tmp).await;
277
278 assert_eq!(storage.load_restart_view().await.unwrap(), None);
280
281 let view1 = ViewNumber::genesis();
283 storage
284 .record_action(view1, None, HotShotAction::Vote)
285 .await
286 .unwrap();
287 assert_eq!(
288 storage.load_restart_view().await.unwrap().unwrap(),
289 view1 + 1
290 );
291
292 let view2 = view1 + 1;
294 storage
295 .record_action(view2, None, HotShotAction::Vote)
296 .await
297 .unwrap();
298 assert_eq!(
299 storage.load_restart_view().await.unwrap().unwrap(),
300 view2 + 1
301 );
302
303 storage
305 .record_action(view1, None, HotShotAction::Vote)
306 .await
307 .unwrap();
308 assert_eq!(
309 storage.load_restart_view().await.unwrap().unwrap(),
310 view2 + 1
311 );
312
313 storage
315 .record_action(view2 + 1, None, HotShotAction::Propose)
316 .await
317 .unwrap();
318 assert_eq!(
319 storage.load_restart_view().await.unwrap().unwrap(),
320 view2 + 1
321 );
322
323 storage
325 .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
326 .await
327 .unwrap();
328 assert_eq!(
329 storage.load_restart_view().await.unwrap().unwrap(),
330 view2 + 1
331 );
332 }
333
334 #[rstest_reuse::apply(persistence_types)]
335 pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
336 use hotshot_types::drb::DrbInput;
337
338 let tmp = P::tmp_storage().await;
339 let storage = P::connect(&tmp).await;
340 let difficulty_level = 10;
341
342 if storage.load_drb_input(10).await.is_ok() {
344 panic!("unexpected nonempty drb_input");
345 }
346
347 let drb_input_1 = DrbInput {
348 epoch: 10,
349 iteration: 10,
350 value: [0u8; 32],
351 difficulty_level,
352 };
353
354 let drb_input_2 = DrbInput {
355 epoch: 10,
356 iteration: 20,
357 value: [0u8; 32],
358 difficulty_level,
359 };
360
361 let drb_input_3 = DrbInput {
362 epoch: 10,
363 iteration: 30,
364 value: [0u8; 32],
365 difficulty_level,
366 };
367
368 let _ = storage.store_drb_input(drb_input_1.clone()).await;
369
370 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
371
372 let _ = storage.store_drb_input(drb_input_3.clone()).await;
373
374 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
376
377 let _ = storage.store_drb_input(drb_input_2.clone()).await;
378
379 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
381 }
382
383 #[rstest_reuse::apply(persistence_types)]
384 pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
385 let tmp = P::tmp_storage().await;
386 let storage = P::connect(&tmp).await;
387
388 assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
390
391 storage
393 .store_drb_result(EpochNumber::new(1), [1; 32])
394 .await
395 .unwrap();
396 assert_eq!(
397 storage.load_start_epoch_info().await.unwrap(),
398 vec![InitializerEpochInfo::<SeqTypes> {
399 epoch: EpochNumber::new(1),
400 drb_result: [1; 32],
401 block_header: None,
402 }]
403 );
404
405 storage
407 .store_drb_result(EpochNumber::new(2), [3; 32])
408 .await
409 .unwrap();
410 assert_eq!(
411 storage.load_start_epoch_info().await.unwrap(),
412 vec![
413 InitializerEpochInfo::<SeqTypes> {
414 epoch: EpochNumber::new(1),
415 drb_result: [1; 32],
416 block_header: None,
417 },
418 InitializerEpochInfo::<SeqTypes> {
419 epoch: EpochNumber::new(2),
420 drb_result: [3; 32],
421 block_header: None,
422 }
423 ]
424 );
425
426 let instance_state = NodeState::mock();
428 let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
429 let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&validated_state, &instance_state)
430 .await
431 .into();
432 let header = leaf.block_header().clone();
433
434 storage
436 .store_epoch_root(EpochNumber::new(1), header.clone())
437 .await
438 .unwrap();
439 assert_eq!(
440 storage.load_start_epoch_info().await.unwrap(),
441 vec![
442 InitializerEpochInfo::<SeqTypes> {
443 epoch: EpochNumber::new(1),
444 drb_result: [1; 32],
445 block_header: Some(header.clone()),
446 },
447 InitializerEpochInfo::<SeqTypes> {
448 epoch: EpochNumber::new(2),
449 drb_result: [3; 32],
450 block_header: None,
451 }
452 ]
453 );
454
455 let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
457 for i in 0..total_epochs {
458 let epoch = EpochNumber::new(i);
459 let drb = [i as u8; 32];
460 storage
461 .store_drb_result(epoch, drb)
462 .await
463 .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
464 }
465
466 let results = storage.load_start_epoch_info().await.unwrap();
467
468 assert_eq!(
470 results.len(),
471 RECENT_STAKE_TABLES_LIMIT as usize,
472 "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
473 );
474
475 for (i, info) in results.iter().enumerate() {
476 let expected_epoch =
477 EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
478 let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
479 assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
480 assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
481 assert!(info.block_header.is_none(), "Expected no block header");
482 }
483 }
484
485 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
486 LeafInfo {
487 leaf,
488 vid_share: None,
489 state: Default::default(),
490 delta: None,
491 state_cert: None,
492 }
493 }
494
/// End-to-end check of appending consensus artifacts and then deciding them.
///
/// The test appends VID shares, DA proposals, and quorum proposals for views 0 through 3,
/// decides views 0..=2 and then view 3, and after each decide asserts which artifacts can
/// still be loaded, where the anchor is, and what the event consumer received.
#[rstest_reuse::apply(persistence_types)]
pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
    let tmp = P::tmp_storage().await;
    let storage = P::connect(&tmp).await;

    // Nothing has been appended yet, so no VID share is stored for view 0.
    assert_eq!(
        storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
        None
    );

    // Use the genesis leaf's payload as the data for every artifact below.
    let leaf: Leaf2 =
        Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
    let leaf_payload = leaf.block_payload().unwrap();
    let leaf_payload_bytes_arc = leaf_payload.encode();

    let avidm_param = init_avidm_param(2).unwrap();
    let weights = vec![1u32; 2];

    let ns_table = parse_ns_table(
        leaf_payload.byte_len().as_usize(),
        &leaf_payload.ns_table().encode(),
    );
    let (payload_commitment, shares) =
        AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
            .unwrap();

    let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
    let signature = PubKey::sign(&privkey, &[]).unwrap();
    // Template VID share; its view number is bumped below to produce shares for views 1..=3.
    let mut vid = AvidMDisperseShare::<SeqTypes> {
        view_number: ViewNumber::new(0),
        payload_commitment,
        share: shares[0].clone(),
        recipient_key: pubkey,
        epoch: Some(EpochNumber::new(0)),
        target_epoch: Some(EpochNumber::new(0)),
        common: avidm_param,
    };
    // Template quorum proposal; its view number (and later its justify QC view) is bumped
    // below to produce proposals for views 1..=3.
    let mut quorum_proposal = Proposal {
        data: QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        },
        signature,
        _pd: Default::default(),
    };

    // Append a VID share for each of views 0..=3, checking after each append that it can be
    // loaded back.
    let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());

    storage.append_vid(&vid_share0).await.unwrap();

    assert_eq!(
        storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
        Some(vid_share0.clone())
    );

    vid.view_number = ViewNumber::new(1);

    let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
    storage.append_vid(&vid_share1).await.unwrap();

    assert_eq!(
        storage.load_vid_share(vid.view_number()).await.unwrap(),
        Some(vid_share1.clone())
    );

    vid.view_number = ViewNumber::new(2);

    let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
    storage.append_vid(&vid_share2).await.unwrap();

    assert_eq!(
        storage.load_vid_share(vid.view_number()).await.unwrap(),
        Some(vid_share2.clone())
    );

    vid.view_number = ViewNumber::new(3);

    let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
    storage.append_vid(&vid_share3).await.unwrap();

    assert_eq!(
        storage.load_vid_share(vid.view_number()).await.unwrap(),
        Some(vid_share3.clone())
    );

    // Same pattern for DA proposals: one per view 0..=3, each loaded back after the append.
    let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
        .expect("Failed to sign block payload");

    let da_proposal_inner = DaProposal2::<SeqTypes> {
        encoded_transactions: leaf_payload_bytes_arc.clone(),
        metadata: leaf_payload.ns_table().clone(),
        view_number: ViewNumber::new(0),
        epoch: None,
        epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
    };

    let da_proposal = Proposal {
        data: da_proposal_inner,
        signature: block_payload_signature,
        _pd: Default::default(),
    };

    let vid_commitment = vid_commitment::<TestVersions>(
        &leaf_payload_bytes_arc,
        &leaf.block_header().metadata().encode(),
        2,
        <TestVersions as Versions>::Base::VERSION,
    );

    storage
        .append_da2(&da_proposal, vid_commitment)
        .await
        .unwrap();

    assert_eq!(
        storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
        Some(da_proposal.clone())
    );

    let mut da_proposal1 = da_proposal.clone();
    da_proposal1.data.view_number = ViewNumber::new(1);
    storage
        .append_da2(&da_proposal1.clone(), vid_commitment)
        .await
        .unwrap();

    assert_eq!(
        storage
            .load_da_proposal(da_proposal1.data.view_number)
            .await
            .unwrap(),
        Some(da_proposal1.clone())
    );

    let mut da_proposal2 = da_proposal1.clone();
    da_proposal2.data.view_number = ViewNumber::new(2);
    storage
        .append_da2(&da_proposal2.clone(), vid_commitment)
        .await
        .unwrap();

    assert_eq!(
        storage
            .load_da_proposal(da_proposal2.data.view_number)
            .await
            .unwrap(),
        Some(da_proposal2.clone())
    );

    let mut da_proposal3 = da_proposal2.clone();
    da_proposal3.data.view_number = ViewNumber::new(3);
    storage
        .append_da2(&da_proposal3.clone(), vid_commitment)
        .await
        .unwrap();

    assert_eq!(
        storage
            .load_da_proposal(da_proposal3.data.view_number)
            .await
            .unwrap(),
        Some(da_proposal3.clone())
    );

    // Quorum proposals: after each append, the full map of stored proposals is compared, so
    // earlier proposals are shown to still be there.
    let quorum_proposal1 = quorum_proposal.clone();

    storage
        .append_quorum_proposal2(&quorum_proposal1)
        .await
        .unwrap();

    assert_eq!(
        storage.load_quorum_proposals().await.unwrap(),
        BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
    );

    quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
    let quorum_proposal2 = quorum_proposal.clone();
    storage
        .append_quorum_proposal2(&quorum_proposal2)
        .await
        .unwrap();

    assert_eq!(
        storage.load_quorum_proposals().await.unwrap(),
        BTreeMap::from_iter([
            (ViewNumber::genesis(), quorum_proposal1.clone()),
            (ViewNumber::new(1), quorum_proposal2.clone())
        ])
    );

    quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
    quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
    let quorum_proposal3 = quorum_proposal.clone();
    storage
        .append_quorum_proposal2(&quorum_proposal3)
        .await
        .unwrap();

    assert_eq!(
        storage.load_quorum_proposals().await.unwrap(),
        BTreeMap::from_iter([
            (ViewNumber::genesis(), quorum_proposal1.clone()),
            (ViewNumber::new(1), quorum_proposal2.clone()),
            (ViewNumber::new(2), quorum_proposal3.clone())
        ])
    );

    quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
    quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);

    let quorum_proposal4 = quorum_proposal.clone();
    storage
        .append_quorum_proposal2(&quorum_proposal4)
        .await
        .unwrap();

    assert_eq!(
        storage.load_quorum_proposals().await.unwrap(),
        BTreeMap::from_iter([
            (ViewNumber::genesis(), quorum_proposal1.clone()),
            (ViewNumber::new(1), quorum_proposal2.clone()),
            (ViewNumber::new(2), quorum_proposal3.clone()),
            (ViewNumber::new(3), quorum_proposal4.clone())
        ])
    );

    // Build the leaves for views 0..=3 from the proposals. `qcs[i]` is paired with
    // `leaves[i]` in the decides below: for the first three leaves it is the justify QC
    // carried by the next leaf, and for the last leaf it is a copy of that leaf's own
    // justify QC with the view number incremented and the leaf commitment replaced by the
    // commitment of the genesis `leaf`.
    let leaves = [
        Leaf2::from_quorum_proposal(&quorum_proposal1.data),
        Leaf2::from_quorum_proposal(&quorum_proposal2.data),
        Leaf2::from_quorum_proposal(&quorum_proposal3.data),
        Leaf2::from_quorum_proposal(&quorum_proposal4.data),
    ];
    let mut final_qc = leaves[3].justify_qc();
    final_qc.view_number += 1;
    final_qc.data.leaf_commit = Committable::commit(&leaf);
    let qcs = [
        leaves[1].justify_qc(),
        leaves[2].justify_qc(),
        leaves[3].justify_qc(),
        final_qc,
    ];

    // Before any decide, the anchor view is genesis.
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        ViewNumber::genesis()
    );

    // First decide: the first three leaves (views 0..=2), delivered to a fresh collector.
    let consumer = EventCollector::default();
    let leaf_chain = leaves
        .iter()
        .take(3)
        .map(|leaf| leaf_info(leaf.clone()))
        .zip(&qcs)
        .collect::<Vec<_>>();
    tracing::info!(?leaf_chain, "decide view 2");
    storage
        .append_decided_leaves(
            ViewNumber::new(2),
            leaf_chain
                .iter()
                .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
            None,
            &consumer,
        )
        .await
        .unwrap();
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        ViewNumber::new(2)
    );

    // The DA proposals and VID shares of the decided views can no longer be loaded...
    for i in 0..=2 {
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
            None
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
            None
        );
    }

    // ...while everything stored for the undecided view 3 is still there.
    assert_eq!(
        storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
        Some(da_proposal3)
    );

    assert_eq!(
        storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
        Some(convert_proposal(vid_share3.clone()))
    );

    let proposals = storage.load_quorum_proposals().await.unwrap();
    assert_eq!(
        proposals,
        BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
    );

    // Compare the collected leaf chain against `leaves`, and check that each collected leaf
    // comes with a VID share for its own view. `zip` stops at the shorter of the two
    // sequences, so only leaves the consumer actually received are compared.
    for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
        assert_eq!(info.leaf, *leaf);
        let decided_vid_share = info.vid_share.as_ref().unwrap();
        assert_eq!(decided_vid_share.view_number(), leaf.view_number());
    }

    // The anchor leaf is the last decided leaf together with the QC it was decided with.
    assert_eq!(
        storage.load_anchor_leaf().await.unwrap(),
        Some((leaves[2].clone(), qcs[2].clone()))
    );
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        leaves[2].view_number()
    );

    // Second decide: the remaining leaf (view 3), delivered to a new collector so that only
    // this decide's events are observed.
    let consumer = EventCollector::default();
    tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
    storage
        .append_decided_leaves(
            ViewNumber::new(3),
            vec![(
                &leaf_info(leaves[3].clone()),
                CertificatePair::non_epoch_change(qcs[3].clone()),
            )],
            None,
            &consumer,
        )
        .await
        .unwrap();
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        ViewNumber::new(3)
    );

    // Exactly one decide event was delivered, for view 3, carrying the QC passed in above
    // and a single leaf.
    let events = consumer.events.read().await;
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].view_number, ViewNumber::new(3));
    let EventType::Decide {
        committing_qc,
        leaf_chain,
        ..
    } = &events[0].event
    else {
        panic!("expected decide event, got {:?}", events[0]);
    };
    assert_eq!(*committing_qc.qc(), qcs[3]);
    assert_eq!(leaf_chain.len(), 1);
    let info = &leaf_chain[0];
    assert_eq!(info.leaf, leaves[3]);

    // With view 3 decided, no DA proposal, VID share, or quorum proposal can be loaded.
    assert_eq!(
        storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
        None
    );

    assert_eq!(
        storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
        None
    );
    assert_eq!(
        storage.load_quorum_proposals().await.unwrap(),
        BTreeMap::new()
    );
}
881
882 #[rstest_reuse::apply(persistence_types)]
883 pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
884 let tmp = P::tmp_storage().await;
885 let storage = P::connect(&tmp).await;
886
887 assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
889
890 let upgrade_data = UpgradeProposalData {
891 old_version: Version { major: 0, minor: 1 },
892 new_version: Version { major: 1, minor: 0 },
893 decide_by: ViewNumber::genesis(),
894 new_version_hash: Default::default(),
895 old_version_last_view: ViewNumber::genesis(),
896 new_version_first_view: ViewNumber::genesis(),
897 };
898
899 let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
900 upgrade_data.clone(),
901 upgrade_data.commit(),
902 ViewNumber::genesis(),
903 Default::default(),
904 Default::default(),
905 );
906 let res = storage
907 .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
908 .await;
909 assert!(res.is_ok());
910
911 let res = storage.load_upgrade_certificate().await.unwrap();
912 let view_number = res.unwrap().view_number;
913 assert_eq!(view_number, ViewNumber::genesis());
914
915 let new_view_number_for_certificate = ViewNumber::new(50);
916 let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
917 new_upgrade_certificate.view_number = new_view_number_for_certificate;
918
919 let res = storage
920 .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
921 .await;
922 assert!(res.is_ok());
923
924 let res = storage.load_upgrade_certificate().await.unwrap();
925 let view_number = res.unwrap().view_number;
926 assert_eq!(view_number, new_view_number_for_certificate);
927 }
928
929 #[rstest_reuse::apply(persistence_types)]
930 pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
931 let tmp = P::tmp_storage().await;
932 let storage = P::connect(&tmp).await;
933
934 assert_eq!(
936 storage.load_next_epoch_quorum_certificate().await.unwrap(),
937 None
938 );
939
940 let upgrade_lock = UpgradeLock::<SeqTypes, TestVersions>::new();
941
942 let genesis_view = ViewNumber::genesis();
943
944 let leaf =
945 Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::default()).await;
946 let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
947 leaf_commit: leaf.commit(),
948 epoch: Some(EpochNumber::new(1)),
949 block_number: Some(leaf.height()),
950 }
951 .into();
952
953 let versioned_data =
954 VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock).await;
955
956 let bytes: [u8; 32] = versioned_data.commit().into();
957
958 let next_epoch_qc = NextEpochQuorumCertificate2::new(
959 data,
960 Commitment::from_raw(bytes),
961 genesis_view,
962 None,
963 PhantomData,
964 );
965
966 let res = storage
967 .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
968 .await;
969 assert!(res.is_ok());
970
971 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
972 let view_number = res.unwrap().view_number;
973 assert_eq!(view_number, ViewNumber::genesis());
974
975 let new_view_number_for_qc = ViewNumber::new(50);
976 let mut new_qc = next_epoch_qc.clone();
977 new_qc.view_number = new_view_number_for_qc;
978
979 let res = storage
980 .store_next_epoch_quorum_certificate(new_qc.clone())
981 .await;
982 assert!(res.is_ok());
983
984 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
985 let view_number = res.unwrap().view_number;
986 assert_eq!(view_number, new_view_number_for_qc);
987 }
988
/// Checks how a decide behaves when the event consumer fails.
///
/// The assertions below establish that a decide handed a failing consumer still returns
/// `Ok` and moves the anchor forward, that the stored VID shares and DA proposals remain
/// loadable afterwards, and that a later decide with a working consumer delivers all four
/// leaves, including those from the earlier decide.
#[rstest_reuse::apply(persistence_types)]
pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
    _p: PhantomData<P>,
) {
    let tmp = P::tmp_storage().await;
    let storage = P::connect(&tmp).await;

    // One (leaf, QC, VID share, DA proposal) tuple per view, filled in below.
    let mut chain = vec![];

    // Use the genesis leaf's payload as the data for every artifact below.
    let leaf: Leaf2 =
        Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock())
            .await
            .into();
    let leaf_payload = leaf.block_payload().unwrap();
    let leaf_payload_bytes_arc = leaf_payload.encode();
    let avidm_param = init_avidm_param(2).unwrap();
    let weights = vec![1u32; 2];
    let ns_table = parse_ns_table(
        leaf_payload.byte_len().as_usize(),
        &leaf_payload.ns_table().encode(),
    );
    let (payload_commitment, shares) =
        AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
            .unwrap();

    let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
    // Templates whose view numbers are overwritten for each view in the loop below.
    let mut vid = AvidMDisperseShare::<SeqTypes> {
        view_number: ViewNumber::new(0),
        payload_commitment,
        share: shares[0].clone(),
        recipient_key: pubkey,
        epoch: Some(EpochNumber::new(0)),
        target_epoch: Some(EpochNumber::new(0)),
        common: avidm_param,
    }
    .to_proposal(&privkey)
    .unwrap()
    .clone();
    let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
        proposal: QuorumProposal2::<SeqTypes> {
            block_header: leaf.block_header().clone(),
            view_number: ViewNumber::genesis(),
            justify_qc: QuorumCertificate::genesis::<TestVersions>(
                &ValidatedState::default(),
                &NodeState::mock(),
            )
            .await
            .to_qc2(),
            upgrade_certificate: None,
            view_change_evidence: None,
            next_drb_result: None,
            next_epoch_justify_qc: None,
            epoch: None,
            state_cert: None,
        },
    };
    let mut qc = QuorumCertificate2::genesis::<TestVersions>(
        &ValidatedState::default(),
        &NodeState::mock(),
    )
    .await;

    let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
        .expect("Failed to sign block payload");
    let mut da_proposal = Proposal {
        data: DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc.clone(),
            metadata: leaf_payload.ns_table().clone(),
            view_number: ViewNumber::new(0),
            epoch: Some(EpochNumber::new(0)),
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        },
        signature: block_payload_signature,
        _pd: Default::default(),
    };

    let vid_commitment = vid_commitment::<TestVersions>(
        &leaf_payload_bytes_arc,
        &leaf.block_header().metadata().encode(),
        2,
        <TestVersions as Versions>::Base::VERSION,
    );

    // Build views 0..4: each leaf comes from the proposal at that view, and the QC, VID
    // share, and DA proposal are stamped with the same view number.
    for i in 0..4 {
        quorum_proposal.proposal.view_number = ViewNumber::new(i);
        let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
        qc.view_number = leaf.view_number();
        qc.data.leaf_commit = Committable::commit(&leaf);
        vid.data.view_number = leaf.view_number();
        da_proposal.data.view_number = leaf.view_number();
        chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
    }

    // Store the DA proposal and VID share of every view up front.
    for (_, _, vid, da) in &chain {
        tracing::info!(?da, ?vid, "insert proposal");
        storage.append_da2(da, vid_commitment).await.unwrap();
        storage
            .append_vid(&convert_proposal(vid.clone()))
            .await
            .unwrap();
    }

    // First decide: views 0 and 1, with a consumer that fails on every event. The
    // `unwrap` asserts that the decide itself still reports success.
    let leaf_chain = chain
        .iter()
        .take(2)
        .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
        .collect::<Vec<_>>();
    tracing::info!("decide with event handling failure");
    storage
        .append_decided_leaves(
            ViewNumber::new(1),
            leaf_chain
                .iter()
                .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
            None,
            &FailConsumer,
        )
        .await
        .unwrap();
    // Every stored VID share and DA proposal, including those of the decided views 0 and 1,
    // must still be loadable after the failed event delivery.
    for i in 0..4 {
        tracing::info!(i, "check proposal availability");
        assert!(storage
            .load_vid_share(ViewNumber::new(i))
            .await
            .unwrap()
            .is_some());
        assert!(storage
            .load_da_proposal(ViewNumber::new(i))
            .await
            .unwrap()
            .is_some());
    }
    // The anchor has nevertheless moved to view 1.
    tracing::info!("check anchor leaf updated");
    assert_eq!(
        storage
            .load_anchor_leaf()
            .await
            .unwrap()
            .unwrap()
            .0
            .view_number(),
        ViewNumber::new(1)
    );
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        ViewNumber::new(1)
    );

    // Second decide: the remaining views 2 and 3, this time with a consumer that records
    // what it receives.
    let consumer = EventCollector::default();
    let leaf_chain = chain
        .iter()
        .skip(2)
        .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
        .collect::<Vec<_>>();
    tracing::info!("decide successfully");
    storage
        .append_decided_leaves(
            ViewNumber::new(3),
            leaf_chain
                .iter()
                .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
            None,
            &consumer,
        )
        .await
        .unwrap();
    // Now none of the VID shares or DA proposals can be loaded any more.
    for i in 0..4 {
        tracing::info!(i, "check proposal garbage collected");
        assert!(storage
            .load_vid_share(ViewNumber::new(i))
            .await
            .unwrap()
            .is_none());
        assert!(storage
            .load_da_proposal(ViewNumber::new(i))
            .await
            .unwrap()
            .is_none());
    }
    tracing::info!("check anchor leaf updated");
    assert_eq!(
        storage
            .load_anchor_leaf()
            .await
            .unwrap()
            .unwrap()
            .0
            .view_number(),
        ViewNumber::new(3)
    );
    assert_eq!(
        storage.load_anchor_view().await.unwrap(),
        ViewNumber::new(3)
    );

    // The working consumer must have received all four leaves, not only the two passed to
    // the second decide, each with its VID share and block payload attached.
    tracing::info!("check decide event");
    let leaf_chain = consumer.leaf_chain().await;
    assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
    for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
        assert_eq!(info.leaf, *leaf);
        let decided_vid_share = info.vid_share.as_ref().unwrap();
        assert_eq!(decided_vid_share.view_number(), leaf.view_number());
        assert!(info.leaf.block_payload().is_some());
    }
}
1202
1203 #[rstest_reuse::apply(persistence_types)]
1204 pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
1205 let tmp = P::tmp_storage().await;
1206
1207 let mut options = P::options(&tmp);
1208 options.set_view_retention(1);
1209 let storage = options.create().await.unwrap();
1210
1211 let leaf =
1213 Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock()).await;
1214 let leaf_payload = leaf.block_payload().unwrap();
1215 let leaf_payload_bytes_arc = leaf_payload.encode();
1216 let avidm_param = init_avidm_param(2).unwrap();
1217 let weights = vec![1u32; 2];
1218
1219 let ns_table = parse_ns_table(
1220 leaf_payload.byte_len().as_usize(),
1221 &leaf_payload.ns_table().encode(),
1222 );
1223 let (payload_commitment, shares) =
1224 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1225 .unwrap();
1226
1227 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1228 let vid_share = convert_proposal(
1229 AvidMDisperseShare::<SeqTypes> {
1230 view_number: ViewNumber::new(0),
1231 payload_commitment,
1232 share: shares[0].clone(),
1233 recipient_key: pubkey,
1234 epoch: None,
1235 target_epoch: None,
1236 common: avidm_param,
1237 }
1238 .to_proposal(&privkey)
1239 .unwrap()
1240 .clone(),
1241 );
1242
1243 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1244 proposal: QuorumProposal2::<SeqTypes> {
1245 block_header: leaf.block_header().clone(),
1246 view_number: ViewNumber::genesis(),
1247 justify_qc: QuorumCertificate::genesis::<TestVersions>(
1248 &ValidatedState::default(),
1249 &NodeState::mock(),
1250 )
1251 .await
1252 .to_qc2(),
1253 upgrade_certificate: None,
1254 view_change_evidence: None,
1255 next_drb_result: None,
1256 next_epoch_justify_qc: None,
1257 epoch: None,
1258 state_cert: None,
1259 },
1260 };
1261 let quorum_proposal_signature =
1262 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1263 .expect("Failed to sign quorum proposal");
1264 let quorum_proposal = Proposal {
1265 data: quorum_proposal,
1266 signature: quorum_proposal_signature,
1267 _pd: Default::default(),
1268 };
1269
1270 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1271 .expect("Failed to sign block payload");
1272 let da_proposal = Proposal {
1273 data: DaProposal2::<SeqTypes> {
1274 encoded_transactions: leaf_payload_bytes_arc,
1275 metadata: leaf_payload.ns_table().clone(),
1276 view_number: ViewNumber::new(0),
1277 epoch: None,
1278 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1279 },
1280 signature: block_payload_signature,
1281 _pd: Default::default(),
1282 };
1283
1284 storage
1285 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1286 .await
1287 .unwrap();
1288 storage.append_vid(&vid_share).await.unwrap();
1289 storage
1290 .append_quorum_proposal2(&quorum_proposal)
1291 .await
1292 .unwrap();
1293
1294 storage
1296 .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
1297 .await
1298 .unwrap();
1299
1300 assert_eq!(
1303 storage
1304 .load_da_proposal(ViewNumber::new(0))
1305 .await
1306 .unwrap()
1307 .unwrap(),
1308 da_proposal
1309 );
1310 assert_eq!(
1311 storage
1312 .load_vid_share(ViewNumber::new(0))
1313 .await
1314 .unwrap()
1315 .unwrap(),
1316 vid_share
1317 );
1318 assert_eq!(
1319 storage
1320 .load_quorum_proposal(ViewNumber::new(0))
1321 .await
1322 .unwrap(),
1323 quorum_proposal
1324 );
1325
1326 storage
1328 .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
1329 .await
1330 .unwrap();
1331 assert!(storage
1332 .load_da_proposal(ViewNumber::new(0))
1333 .await
1334 .unwrap()
1335 .is_none());
1336 assert!(storage
1337 .load_vid_share(ViewNumber::new(0))
1338 .await
1339 .unwrap()
1340 .is_none());
1341 assert!(storage
1342 .load_quorum_proposal(ViewNumber::new(0))
1343 .await
1344 .is_err());
1345 }
1346
1347 async fn assert_events_eq<P: TestablePersistence>(
1348 persistence: &P,
1349 block: u64,
1350 stake_table_fetcher: &Fetcher,
1351 l1_client: &L1Client,
1352 stake_table_contract: Address,
1353 ) -> anyhow::Result<()> {
1354 let (stored_l1, events) = persistence.load_events(0, block).await?;
1356 assert!(!events.is_empty());
1357 assert!(stored_l1.is_some());
1358 assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1359 let contract_events = Fetcher::fetch_events_from_contract(
1361 l1_client.clone(),
1362 stake_table_contract,
1363 None,
1364 block,
1365 )
1366 .await?;
1367 assert_eq!(
1368 contract_events, events,
1369 "Events from contract and persistence do not match"
1370 );
1371
1372 let fetched_events = stake_table_fetcher
1374 .fetch_and_store_stake_table_events(stake_table_contract, block)
1375 .await?;
1376 assert_eq!(fetched_events, events);
1377
1378 Ok(())
1379 }
1380
1381 #[rstest_reuse::apply(persistence_types)]
1384 pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
1385 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
1386 stake_table_version: StakeTableContractVersion,
1387 _p: PhantomData<P>,
1388 ) -> anyhow::Result<()> {
1389 let epoch_height = 20;
1390 type PosVersion = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
1391
1392 let network_config = TestConfigBuilder::default()
1393 .epoch_height(epoch_height)
1394 .build();
1395
1396 let anvil_provider = network_config.anvil().unwrap();
1397
1398 let query_service_port = pick_unused_port().expect("No ports free for query service");
1399 let query_api_options = Options::with_port(query_service_port);
1400
1401 const NODE_COUNT: usize = 2;
1402
1403 let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
1404 let persistence_options: [_; NODE_COUNT] = storage
1405 .iter()
1406 .map(P::options)
1407 .collect::<Vec<_>>()
1408 .try_into()
1409 .unwrap();
1410
1411 let persistence = persistence_options[0].clone().create().await.unwrap();
1412
1413 let l1_url = network_config.l1_url();
1415
1416 let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
1417 .api_config(query_api_options)
1418 .network_config(network_config.clone())
1419 .persistences(persistence_options.clone())
1420 .pos_hook::<PosVersion>(DelegationConfig::MultipleDelegators, stake_table_version)
1421 .await
1422 .expect("Pos deployment failed")
1423 .build();
1424
1425 let test_network = TestNetwork::new(testnet_config, PosVersion::new()).await;
1427
1428 let client: Client<ServerError, SequencerApiVersion> = Client::new(
1429 format!("http://localhost:{query_service_port}")
1430 .parse()
1431 .unwrap(),
1432 );
1433 client.connect(None).await;
1434 tracing::info!(query_service_port, "server running");
1435
1436 let _initial_blocks = client
1438 .socket("availability/stream/blocks/0")
1439 .subscribe::<BlockQueryData<SeqTypes>>()
1440 .await
1441 .unwrap()
1442 .take(40)
1443 .try_collect::<Vec<_>>()
1444 .await
1445 .unwrap();
1446 let membership_coordinator = test_network
1448 .server
1449 .consensus()
1450 .read()
1451 .await
1452 .membership_coordinator
1453 .clone();
1454
1455 let l1_client = L1Client::new(vec![l1_url]).unwrap();
1456 let node_state = test_network.server.node_state();
1457 let chain_config = node_state.chain_config;
1458 let stake_table_contract = chain_config.stake_table_contract.unwrap();
1459
1460 let current_membership = membership_coordinator.membership();
1461 {
1462 let membership_state = current_membership.read().await;
1463 let stake_table_fetcher = membership_state.fetcher();
1464
1465 let block1 = anvil_provider
1466 .get_block_number()
1467 .await
1468 .expect("latest l1 block");
1469
1470 assert_events_eq(
1471 &persistence,
1472 block1,
1473 stake_table_fetcher,
1474 &l1_client,
1475 stake_table_contract,
1476 )
1477 .await?;
1478 }
1479 let _epoch_4_blocks = client
1480 .socket("availability/stream/blocks/0")
1481 .subscribe::<BlockQueryData<SeqTypes>>()
1482 .await
1483 .unwrap()
1484 .take(65)
1485 .try_collect::<Vec<_>>()
1486 .await
1487 .unwrap();
1488 let block2 = anvil_provider
1489 .get_block_number()
1490 .await
1491 .expect("latest l1 block");
1492
1493 {
1494 let membership_state = current_membership.read().await;
1495 let stake_table_fetcher = membership_state.fetcher();
1496
1497 assert_events_eq(
1498 &persistence,
1499 block2,
1500 stake_table_fetcher,
1501 &l1_client,
1502 stake_table_contract,
1503 )
1504 .await?;
1505 }
1506 Ok(())
1507 }
1508
1509 #[rstest_reuse::apply(persistence_types)]
1510 pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
1511 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
1512 stake_table_version: StakeTableContractVersion,
1513 _p: PhantomData<P>,
1514 ) -> anyhow::Result<()> {
1515 use espresso_types::v0_3::ChainConfig;
1516 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1517
1518 let blocks_per_epoch = 10;
1519
1520 let network_config = TestConfigBuilder::<1>::default()
1521 .epoch_height(blocks_per_epoch)
1522 .build();
1523
1524 let anvil_provider = network_config.anvil().unwrap();
1525
1526 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1527 &network_config.hotshot_config().hotshot_stake_table(),
1528 STAKE_TABLE_CAPACITY_FOR_TEST,
1529 )
1530 .unwrap();
1531
1532 let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
1533 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
1534 .unzip();
1535 let state_key_pairs = (0..20)
1536 .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
1537 .collect::<Vec<_>>();
1538
1539 let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);
1540
1541 let deployer = ProviderBuilder::new()
1542 .wallet(EthereumWallet::from(network_config.signer().clone()))
1543 .connect_http(network_config.l1_url().clone());
1544
1545 let mut contracts = Contracts::new();
1546 let args = DeployerArgsBuilder::default()
1547 .deployer(deployer.clone())
1548 .rpc_url(network_config.l1_url().clone())
1549 .mock_light_client(true)
1550 .genesis_lc_state(genesis_state)
1551 .genesis_st_state(genesis_stake)
1552 .blocks_per_epoch(blocks_per_epoch)
1553 .epoch_start_block(1)
1554 .exit_escrow_period(U256::from(max(
1555 blocks_per_epoch * 15 + 100,
1556 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1557 )))
1558 .multisig_pauser(network_config.signer().address())
1559 .token_name("Espresso".to_string())
1560 .token_symbol("ESP".to_string())
1561 .initial_token_supply(U256::from(3590000000u64))
1562 .ops_timelock_delay(U256::from(0))
1563 .ops_timelock_admin(network_config.signer().address())
1564 .ops_timelock_proposers(vec![network_config.signer().address()])
1565 .ops_timelock_executors(vec![network_config.signer().address()])
1566 .safe_exit_timelock_delay(U256::from(10))
1567 .safe_exit_timelock_admin(network_config.signer().address())
1568 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
1569 .safe_exit_timelock_executors(vec![network_config.signer().address()])
1570 .build()
1571 .unwrap();
1572
1573 match stake_table_version {
1574 StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
1575 StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1576 }
1577 .expect("contracts deployed");
1578
1579 let st_addr = contracts
1580 .address(Contract::StakeTableProxy)
1581 .expect("StakeTableProxy deployed");
1582 let l1_url = network_config.l1_url().clone();
1583
1584 let mut planned_txns = StakingTransactions::create(
1585 l1_url.clone(),
1586 &deployer,
1587 st_addr,
1588 validators,
1589 None,
1590 DelegationConfig::MultipleDelegators,
1591 )
1592 .await
1593 .expect("stake table setup failed");
1594
1595 planned_txns
1596 .apply_prerequisites()
1597 .await
1598 .expect("prerequisites failed");
1599
1600 planned_txns.apply_one().await.expect("send tx failed");
1602
1603 anvil_provider
1605 .anvil_set_interval_mining(1)
1606 .await
1607 .expect("interval mining");
1608
1609 spawn({
1613 async move {
1614 {
1615 while let Some(receipt) =
1616 planned_txns.apply_one().await.expect("send tx failed")
1617 {
1618 tracing::debug!(?receipt, "transaction finalized");
1619 }
1620 }
1621 }
1622 });
1623
1624 let storage = P::tmp_storage().await;
1625 let persistence = P::options(&storage).create().await.unwrap();
1626
1627 let l1_client = L1ClientOptions {
1628 stake_table_update_interval: Duration::from_secs(7),
1629 l1_retry_delay: Duration::from_millis(10),
1630 l1_events_max_block_range: 10000,
1631 ..Default::default()
1632 }
1633 .connect(vec![l1_url])
1634 .unwrap();
1635 l1_client.spawn_tasks().await;
1636
1637 let fetcher = Fetcher::new(
1638 Arc::new(NullStateCatchup::default()),
1639 Arc::new(Mutex::new(persistence.clone())),
1640 l1_client.clone(),
1641 ChainConfig {
1642 stake_table_contract: Some(st_addr),
1643 base_fee: 0.into(),
1644 ..Default::default()
1645 },
1646 );
1647
1648 sleep(Duration::from_secs(20)).await;
1650
1651 fetcher.spawn_update_loop().await;
1652 let mut prev_l1_block = 0;
1653 let mut prev_events_len = 0;
1654 for _i in 0..10 {
1655 tokio::time::sleep(std::time::Duration::from_secs(8)).await;
1659
1660 let block = anvil_provider
1661 .get_block_number()
1662 .await
1663 .expect("latest l1 block");
1664
1665 let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
1666 let read_offset = read_offset.unwrap();
1667 let l1_block = match read_offset {
1668 EventsPersistenceRead::Complete => block,
1669 EventsPersistenceRead::UntilL1Block(block) => block,
1670 };
1671
1672 tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
1673 assert!(persisted_events.len() > prev_events_len);
1674
1675 assert!(l1_block > prev_l1_block, "events not updated");
1676
1677 let contract_events =
1678 Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
1679 .await?;
1680 assert_eq!(persisted_events, contract_events);
1681
1682 prev_l1_block = l1_block;
1683 prev_events_len = persisted_events.len();
1684 }
1685
1686 Ok(())
1687 }
1688
1689 #[rstest_reuse::apply(persistence_types)]
1690 pub async fn test_membership_persistence<P: TestablePersistence>(
1691 _p: PhantomData<P>,
1692 ) -> anyhow::Result<()> {
1693 let tmp = P::tmp_storage().await;
1694 let mut opt = P::options(&tmp);
1695
1696 let storage = opt.create().await.unwrap();
1697
1698 let validator = Validator::mock();
1699 let mut st = IndexMap::new();
1700 st.insert(validator.account, validator);
1701
1702 storage
1703 .store_stake(EpochNumber::new(10), st.clone(), None, None)
1704 .await?;
1705
1706 let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1707 assert_eq!(st, table);
1708
1709 let val2 = Validator::mock();
1710 let mut st2 = IndexMap::new();
1711 st2.insert(val2.account, val2);
1712 storage
1713 .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1714 .await?;
1715
1716 let tables = storage.load_latest_stake(4).await?.unwrap();
1717 let mut iter = tables.iter();
1718 assert_eq!(
1719 Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1720 iter.next()
1721 );
1722 assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1723 assert_eq!(None, iter.next());
1724
1725 for i in 0..=20 {
1726 storage
1727 .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1728 .await?;
1729 }
1730
1731 let tables = storage.load_latest_stake(5).await?.unwrap();
1732 let mut iter = tables.iter();
1733 assert_eq!(
1734 Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1735 iter.next()
1736 );
1737 assert_eq!(
1738 Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1739 iter.next()
1740 );
1741 assert_eq!(
1742 Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1743 iter.next()
1744 );
1745 assert_eq!(
1746 Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1747 iter.next()
1748 );
1749 assert_eq!(
1750 Some(&(EpochNumber::new(16), (st2, None), None)),
1751 iter.next()
1752 );
1753 assert_eq!(None, iter.next());
1754
1755 Ok(())
1756 }
1757
1758 #[rstest_reuse::apply(persistence_types)]
1759 pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
1760 _p: PhantomData<P>,
1761 ) -> anyhow::Result<()> {
1762 let tmp = P::tmp_storage().await;
1763 let mut opt = P::options(&tmp);
1764 let storage = opt.create().await.unwrap();
1765
1766 let mut vmap1 = IndexMap::new();
1767 for _i in 0..25 {
1768 let v = Validator::mock();
1769 vmap1.insert(v.account, v);
1770 }
1771 storage
1772 .store_all_validators(EpochNumber::new(10), vmap1.clone())
1773 .await?;
1774
1775 let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
1776 expected_all.sort_by_key(|v| v.account);
1777
1778 let loaded_all = storage
1780 .load_all_validators(EpochNumber::new(10), 0, 100)
1781 .await?;
1782 assert_eq!(expected_all, loaded_all);
1784
1785 let loaded_first_10 = storage
1787 .load_all_validators(EpochNumber::new(10), 0, 10)
1788 .await?;
1789
1790 assert_eq!(expected_all[..10], loaded_first_10);
1791
1792 let loaded_next_10 = storage
1794 .load_all_validators(EpochNumber::new(10), 10, 10)
1795 .await?;
1796
1797 assert_eq!(expected_all[10..20], loaded_next_10);
1798
1799 let loaded_last_5 = storage
1801 .load_all_validators(EpochNumber::new(10), 20, 10)
1802 .await?;
1803
1804 assert_eq!(expected_all[20..], loaded_last_5);
1805
1806 let loaded_empty = storage
1808 .load_all_validators(EpochNumber::new(10), 100, 10)
1809 .await?;
1810 assert!(loaded_empty.is_empty());
1811
1812 let validator2 = Validator::mock();
1814 let mut vmap2 = IndexMap::new();
1815 vmap2.insert(validator2.account, validator2.clone());
1816
1817 storage
1818 .store_all_validators(EpochNumber::new(11), vmap2.clone())
1819 .await?;
1820
1821 let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
1822 expected_epoch11.sort_by_key(|v| v.account);
1823
1824 let loaded2 = storage
1825 .load_all_validators(EpochNumber::new(11), 0, 100)
1826 .await?;
1827
1828 assert_eq!(expected_epoch11, loaded2);
1829
1830 let loaded1_again = storage
1832 .load_all_validators(EpochNumber::new(10), 0, 100)
1833 .await?;
1834
1835 assert_eq!(expected_all, loaded1_again);
1836
1837 Ok(())
1838 }
1839
1840 #[rstest_reuse::apply(persistence_types)]
1841 pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
1842 let tmp = P::tmp_storage().await;
1843 let storage = P::connect(&tmp).await;
1844
1845 let genesis_leaf: Leaf2 =
1846 Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
1847 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1848 proposal: QuorumProposal2::<SeqTypes> {
1849 epoch: None,
1850 block_header: genesis_leaf.block_header().clone(),
1851 view_number: genesis_leaf.view_number(),
1852 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
1853 &ValidatedState::default(),
1854 &NodeState::mock(),
1855 )
1856 .await,
1857 upgrade_certificate: None,
1858 view_change_evidence: None,
1859 next_drb_result: None,
1860 next_epoch_justify_qc: None,
1861 state_cert: None,
1862 },
1863 };
1864
1865 let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);
1866
1867 quorum_proposal.proposal.view_number = ViewNumber::new(2);
1868 *quorum_proposal.proposal.block_header.height_mut() = 2;
1869 quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
1870 let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);
1871
1872 let mut qc0 = leaf0.justify_qc();
1873 qc0.data.leaf_commit = Committable::commit(&leaf0);
1874
1875 let mut qc2 = leaf2.justify_qc();
1876 qc2.view_number += 1;
1877 qc2.data.leaf_commit = Committable::commit(&leaf2);
1878
1879 let mut deciding_qc = qc2.clone();
1880 deciding_qc.view_number += 1;
1881
1882 storage
1884 .append_decided_leaves(
1885 ViewNumber::new(0),
1886 [(
1887 &leaf_info(leaf0.clone()),
1888 CertificatePair::non_epoch_change(qc0),
1889 )],
1890 None,
1891 &FailConsumer,
1892 )
1893 .await
1894 .unwrap();
1895
1896 let consumer = EventCollector::default();
1900 storage
1901 .append_decided_leaves(
1902 ViewNumber::new(2),
1903 [(
1904 &leaf_info(leaf2.clone()),
1905 CertificatePair::non_epoch_change(qc2),
1906 )],
1907 Some(Arc::new(CertificatePair::non_epoch_change(
1908 deciding_qc.clone(),
1909 ))),
1910 &consumer,
1911 )
1912 .await
1913 .unwrap();
1914
1915 let events = consumer.events.read().await;
1916 assert_eq!(events.len(), 2);
1917
1918 let EventType::Decide {
1919 leaf_chain: leaf_chain0,
1920 deciding_qc: deciding_qc0,
1921 ..
1922 } = &events[0].event
1923 else {
1924 panic!("expected decide event, got {:?}", events[0].event);
1925 };
1926 assert_eq!(leaf_chain0.len(), 1);
1927 assert_eq!(leaf_chain0[0].leaf, leaf0);
1928 assert_eq!(*deciding_qc0, None);
1929
1930 let EventType::Decide {
1931 leaf_chain: leaf_chain2,
1932 deciding_qc: deciding_qc2,
1933 ..
1934 } = &events[1].event
1935 else {
1936 panic!("expected decide event, got {:?}", events[1].event);
1937 };
1938 assert_eq!(leaf_chain2.len(), 1);
1939 assert_eq!(leaf_chain2[0].leaf, leaf2);
1940 assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
1941 }
1942}