1use async_trait::async_trait;
12use espresso_types::v0_3::ChainConfig;
13
14pub mod fs;
15pub mod no_storage;
16mod persistence_metrics;
17pub mod sql;
18
/// Storage backend that can persist a [`ChainConfig`].
///
/// Implementors must be `Sized + Send + Sync`.
19#[async_trait]
20pub trait ChainConfigPersistence: Sized + Send + Sync {
 /// Inserts `chain_config` into the backing store.
 ///
 /// Returns an error if the backend fails to store it.
 // NOTE(review): behaviour for an already-stored config is backend-defined and not visible
 // here — confirm against the implementations.
21 async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
22}
23
24#[cfg(test)]
25mod tests {
26 use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
27
28 use alloy::{
29 network::EthereumWallet,
30 primitives::{Address, U256},
31 providers::{ext::AnvilApi, Provider, ProviderBuilder},
32 };
33 use anyhow::bail;
34 use async_lock::{Mutex, RwLock};
35 use async_trait::async_trait;
36 use committable::{Commitment, Committable};
37 use espresso_contract_deployer::{
38 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
39 Contract, Contracts,
40 };
41 use espresso_types::{
42 traits::{
43 EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
44 PersistenceOptions, SequencerPersistence,
45 },
46 v0_3::{Fetcher, Validator},
47 Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes,
48 SequencerVersions, ValidatedState,
49 };
50 use futures::{future::join_all, StreamExt, TryStreamExt};
51 use hotshot::{
52 types::{BLSPubKey, SignatureKey},
53 InitializerEpochInfo,
54 };
55 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
56 use hotshot_example_types::node_types::TestVersions;
57 use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MockVersions};
58 use hotshot_types::{
59 data::{
60 ns_table::parse_ns_table, vid_commitment, vid_disperse::VidDisperseShare2, DaProposal2,
61 EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment, VidDisperseShare,
62 ViewNumber,
63 },
64 event::{EventType, HotShotAction, LeafInfo},
65 light_client::StateKeyPair,
66 message::{convert_proposal, Proposal, UpgradeLock},
67 simple_certificate::{
68 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
69 },
70 simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
71 traits::{
72 block_contents::BlockHeader,
73 node_implementation::{ConsensusTime, Versions},
74 EncodeBytes,
75 },
76 utils::EpochTransitionIndicator,
77 vid::avidm::{init_avidm_param, AvidMScheme},
78 vote::HasViewNumber,
79 };
80 use indexmap::IndexMap;
81 use portpicker::pick_unused_port;
82 use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
83 use surf_disco::Client;
84 use tide_disco::error::ServerError;
85 use tokio::{spawn, time::sleep};
86 use vbs::version::{StaticVersion, StaticVersionType, Version};
87
88 use crate::{
89 api::{
90 test_helpers::{TestNetwork, TestNetworkConfigBuilder, STAKE_TABLE_CAPACITY_FOR_TEST},
91 Options,
92 },
93 catchup::NullStateCatchup,
94 testing::{staking_priv_keys, TestConfigBuilder},
95 SequencerApiVersion, RECENT_STAKE_TABLES_LIMIT,
96 };
97
98 #[async_trait]
99 pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
100 type Storage: Sync;
101
102 async fn tmp_storage() -> Self::Storage;
103 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;
104
105 async fn connect(storage: &Self::Storage) -> Self {
106 Self::options(storage).create().await.unwrap()
107 }
108 }
109
// Reusable `rstest` template applied (via `rstest_reuse::apply`) to the generic tests below.
// It declares one case per persistence type under test — the `Persistence` types of the `sql`
// and `fs` modules — and wraps each case in `test_log` around a multi-threaded Tokio test.
// The `PhantomData<P>` argument only carries the type parameter; the body is empty.
110 #[rstest_reuse::template]
111 #[rstest::rstest]
112 #[case(PhantomData::<crate::persistence::sql::Persistence>)]
113 #[case(PhantomData::<crate::persistence::fs::Persistence>)]
114 #[test_log::test(tokio::test(flavor = "multi_thread"))]
115 pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
116
117 #[derive(Clone, Debug, Default)]
118 struct EventCollector {
119 events: Arc<RwLock<Vec<Event>>>,
120 }
121
122 impl EventCollector {
123 async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
124 self.events
125 .read()
126 .await
127 .iter()
128 .flat_map(|event| {
129 let EventType::Decide { leaf_chain, .. } = &event.event else {
130 panic!("expected decide event, got {event:?}");
131 };
132 leaf_chain.iter().cloned().rev()
133 })
134 .collect::<Vec<_>>()
135 }
136 }
137
138 #[async_trait]
139 impl EventConsumer for EventCollector {
140 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
141 self.events.write().await.push(event.clone());
142 Ok(())
143 }
144 }
145
146 #[rstest_reuse::apply(persistence_types)]
147 pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
148 let tmp = P::tmp_storage().await;
149 let storage = P::connect(&tmp).await;
150
151 assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
153
154 let view1 = ViewNumber::genesis();
156 storage
157 .record_action(view1, None, HotShotAction::Vote)
158 .await
159 .unwrap();
160 assert_eq!(
161 storage.load_latest_acted_view().await.unwrap().unwrap(),
162 view1
163 );
164
165 let view2 = view1 + 1;
167 storage
168 .record_action(view2, None, HotShotAction::Vote)
169 .await
170 .unwrap();
171 assert_eq!(
172 storage.load_latest_acted_view().await.unwrap().unwrap(),
173 view2
174 );
175
176 storage
178 .record_action(view1, None, HotShotAction::Vote)
179 .await
180 .unwrap();
181 assert_eq!(
182 storage.load_latest_acted_view().await.unwrap().unwrap(),
183 view2
184 );
185 }
186
187 #[rstest_reuse::apply(persistence_types)]
188 pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
189 let tmp = P::tmp_storage().await;
190 let storage = P::connect(&tmp).await;
191
192 assert_eq!(storage.load_restart_view().await.unwrap(), None);
194
195 let view1 = ViewNumber::genesis();
197 storage
198 .record_action(view1, None, HotShotAction::Vote)
199 .await
200 .unwrap();
201 assert_eq!(
202 storage.load_restart_view().await.unwrap().unwrap(),
203 view1 + 1
204 );
205
206 let view2 = view1 + 1;
208 storage
209 .record_action(view2, None, HotShotAction::Vote)
210 .await
211 .unwrap();
212 assert_eq!(
213 storage.load_restart_view().await.unwrap().unwrap(),
214 view2 + 1
215 );
216
217 storage
219 .record_action(view1, None, HotShotAction::Vote)
220 .await
221 .unwrap();
222 assert_eq!(
223 storage.load_restart_view().await.unwrap().unwrap(),
224 view2 + 1
225 );
226
227 storage
229 .record_action(view2 + 1, None, HotShotAction::Propose)
230 .await
231 .unwrap();
232 assert_eq!(
233 storage.load_restart_view().await.unwrap().unwrap(),
234 view2 + 1
235 );
236
237 storage
239 .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
240 .await
241 .unwrap();
242 assert_eq!(
243 storage.load_restart_view().await.unwrap().unwrap(),
244 view2 + 1
245 );
246 }
247
248 #[rstest_reuse::apply(persistence_types)]
249 pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
250 use hotshot_types::drb::DrbInput;
251
252 let tmp = P::tmp_storage().await;
253 let storage = P::connect(&tmp).await;
254 let difficulty_level = 10;
255
256 if storage.load_drb_input(10).await.is_ok() {
258 panic!("unexpected nonempty drb_input");
259 }
260
261 let drb_input_1 = DrbInput {
262 epoch: 10,
263 iteration: 10,
264 value: [0u8; 32],
265 difficulty_level,
266 };
267
268 let drb_input_2 = DrbInput {
269 epoch: 10,
270 iteration: 20,
271 value: [0u8; 32],
272 difficulty_level,
273 };
274
275 let drb_input_3 = DrbInput {
276 epoch: 10,
277 iteration: 30,
278 value: [0u8; 32],
279 difficulty_level,
280 };
281
282 let _ = storage.store_drb_input(drb_input_1.clone()).await;
283
284 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
285
286 let _ = storage.store_drb_input(drb_input_3.clone()).await;
287
288 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
290
291 let _ = storage.store_drb_input(drb_input_2.clone()).await;
292
293 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
295 }
296
297 #[rstest_reuse::apply(persistence_types)]
298 pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
299 let tmp = P::tmp_storage().await;
300 let storage = P::connect(&tmp).await;
301
302 assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
304
305 storage
307 .store_drb_result(EpochNumber::new(1), [1; 32])
308 .await
309 .unwrap();
310 assert_eq!(
311 storage.load_start_epoch_info().await.unwrap(),
312 vec![InitializerEpochInfo::<SeqTypes> {
313 epoch: EpochNumber::new(1),
314 drb_result: [1; 32],
315 block_header: None,
316 }]
317 );
318
319 storage
321 .store_drb_result(EpochNumber::new(2), [3; 32])
322 .await
323 .unwrap();
324 assert_eq!(
325 storage.load_start_epoch_info().await.unwrap(),
326 vec![
327 InitializerEpochInfo::<SeqTypes> {
328 epoch: EpochNumber::new(1),
329 drb_result: [1; 32],
330 block_header: None,
331 },
332 InitializerEpochInfo::<SeqTypes> {
333 epoch: EpochNumber::new(2),
334 drb_result: [3; 32],
335 block_header: None,
336 }
337 ]
338 );
339
340 let instance_state = NodeState::mock();
342 let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
343 let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&validated_state, &instance_state)
344 .await
345 .into();
346 let header = leaf.block_header().clone();
347
348 storage
350 .store_epoch_root(EpochNumber::new(1), header.clone())
351 .await
352 .unwrap();
353 assert_eq!(
354 storage.load_start_epoch_info().await.unwrap(),
355 vec![
356 InitializerEpochInfo::<SeqTypes> {
357 epoch: EpochNumber::new(1),
358 drb_result: [1; 32],
359 block_header: Some(header.clone()),
360 },
361 InitializerEpochInfo::<SeqTypes> {
362 epoch: EpochNumber::new(2),
363 drb_result: [3; 32],
364 block_header: None,
365 }
366 ]
367 );
368
369 let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
371 for i in 0..total_epochs {
372 let epoch = EpochNumber::new(i);
373 let drb = [i as u8; 32];
374 storage
375 .store_drb_result(epoch, drb)
376 .await
377 .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
378 }
379
380 let results = storage.load_start_epoch_info().await.unwrap();
381
382 assert_eq!(
384 results.len(),
385 RECENT_STAKE_TABLES_LIMIT as usize,
386 "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
387 );
388
389 for (i, info) in results.iter().enumerate() {
390 let expected_epoch =
391 EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
392 let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
393 assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
394 assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
395 assert!(info.block_header.is_none(), "Expected no block header");
396 }
397 }
398
399 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
400 LeafInfo {
401 leaf,
402 vid_share: None,
403 state: Default::default(),
404 delta: None,
405 state_cert: None,
406 }
407 }
408
// End-to-end check of appending consensus data and then deciding it.
//
// The test appends VID shares, DA proposals and quorum proposals for views 0..=3, decides the
// leaves for views 0..=2 and asserts that the data for those views can no longer be loaded
// while view 3 is untouched. It then decides view 3 and asserts that nothing remains.
409 #[rstest_reuse::apply(persistence_types)]
410 pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
411 let tmp = P::tmp_storage().await;
412 let storage = P::connect(&tmp).await;
413
 // A freshly connected store has no VID share for view 0.
414 assert_eq!(
416 storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
417 None
418 );
419
 // Genesis leaf and its encoded payload, reused for every share and proposal below.
420 let leaf: Leaf2 =
421 Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
422 let leaf_payload = leaf.block_payload().unwrap();
423 let leaf_payload_bytes_arc = leaf_payload.encode();
424
 // Disperse the payload with AVID-M (parameters initialised with 2, two weights of 1).
425 let avidm_param = init_avidm_param(2).unwrap();
426 let weights = vec![1u32; 2];
427
428 let ns_table = parse_ns_table(
429 leaf_payload.byte_len().as_usize(),
430 &leaf_payload.ns_table().encode(),
431 );
432 let (payload_commitment, shares) =
433 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
434 .unwrap();
435
 // Key pair derived from a fixed seed. `signature` signs an empty message and is only used
 // to populate the quorum proposal below.
436 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
437 let signature = PubKey::sign(&privkey, &[]).unwrap();
438 let mut vid = VidDisperseShare2::<SeqTypes> {
439 view_number: ViewNumber::new(0),
440 payload_commitment,
441 share: shares[0].clone(),
442 recipient_key: pubkey,
443 epoch: Some(EpochNumber::new(0)),
444 target_epoch: Some(EpochNumber::new(0)),
445 common: avidm_param,
446 };
447 let mut quorum_proposal = Proposal {
448 data: QuorumProposalWrapper::<SeqTypes> {
449 proposal: QuorumProposal2::<SeqTypes> {
450 epoch: None,
451 block_header: leaf.block_header().clone(),
452 view_number: ViewNumber::genesis(),
453 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
454 &ValidatedState::default(),
455 &NodeState::mock(),
456 )
457 .await,
458 upgrade_certificate: None,
459 view_change_evidence: None,
460 next_drb_result: None,
461 next_epoch_justify_qc: None,
462 state_cert: None,
463 },
464 },
465 signature,
466 _pd: Default::default(),
467 };
468
 // Append a VID share for each of views 0..=3; after every append the share for that view
 // must load back (compared via `convert_proposal`).
469 let vid_share0 = vid.clone().to_proposal(&privkey).unwrap().clone();
470
471 storage.append_vid2(&vid_share0).await.unwrap();
472
473 assert_eq!(
474 storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
475 Some(convert_proposal(vid_share0.clone()))
476 );
477
478 vid.view_number = ViewNumber::new(1);
479
480 let vid_share1 = vid.clone().to_proposal(&privkey).unwrap().clone();
481 storage.append_vid2(&vid_share1).await.unwrap();
482
483 assert_eq!(
484 storage.load_vid_share(vid.view_number()).await.unwrap(),
485 Some(convert_proposal(vid_share1.clone()))
486 );
487
488 vid.view_number = ViewNumber::new(2);
489
490 let vid_share2 = vid.clone().to_proposal(&privkey).unwrap().clone();
491 storage.append_vid2(&vid_share2).await.unwrap();
492
493 assert_eq!(
494 storage.load_vid_share(vid.view_number()).await.unwrap(),
495 Some(convert_proposal(vid_share2.clone()))
496 );
497
498 vid.view_number = ViewNumber::new(3);
499
500 let vid_share3 = vid.clone().to_proposal(&privkey).unwrap().clone();
501 storage.append_vid2(&vid_share3).await.unwrap();
502
503 assert_eq!(
504 storage.load_vid_share(vid.view_number()).await.unwrap(),
505 Some(convert_proposal(vid_share3.clone()))
506 );
507
 // DA proposal for view 0, signed over the encoded payload.
508 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
509 .expect("Failed to sign block payload");
510
511 let da_proposal_inner = DaProposal2::<SeqTypes> {
512 encoded_transactions: leaf_payload_bytes_arc.clone(),
513 metadata: leaf_payload.ns_table().clone(),
514 view_number: ViewNumber::new(0),
515 epoch: None,
516 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
517 };
518
519 let da_proposal = Proposal {
520 data: da_proposal_inner,
521 signature: block_payload_signature,
522 _pd: Default::default(),
523 };
524
 // Payload commitment passed along with every DA proposal appended below.
525 let vid_commitment = vid_commitment::<TestVersions>(
526 &leaf_payload_bytes_arc,
527 &leaf.block_header().metadata().encode(),
528 2,
529 <TestVersions as Versions>::Base::VERSION,
530 );
531
532 storage
533 .append_da2(&da_proposal, vid_commitment)
534 .await
535 .unwrap();
536
537 assert_eq!(
538 storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
539 Some(da_proposal.clone())
540 );
541
 // The same DA proposal re-stamped for views 1, 2 and 3; each must load back once appended.
542 let mut da_proposal1 = da_proposal.clone();
543 da_proposal1.data.view_number = ViewNumber::new(1);
544 storage
545 .append_da2(&da_proposal1.clone(), vid_commitment)
546 .await
547 .unwrap();
548
549 assert_eq!(
550 storage
551 .load_da_proposal(da_proposal1.data.view_number)
552 .await
553 .unwrap(),
554 Some(da_proposal1.clone())
555 );
556
557 let mut da_proposal2 = da_proposal1.clone();
558 da_proposal2.data.view_number = ViewNumber::new(2);
559 storage
560 .append_da2(&da_proposal2.clone(), vid_commitment)
561 .await
562 .unwrap();
563
564 assert_eq!(
565 storage
566 .load_da_proposal(da_proposal2.data.view_number)
567 .await
568 .unwrap(),
569 Some(da_proposal2.clone())
570 );
571
572 let mut da_proposal3 = da_proposal2.clone();
573 da_proposal3.data.view_number = ViewNumber::new(3);
574 storage
575 .append_da2(&da_proposal3.clone(), vid_commitment)
576 .await
577 .unwrap();
578
579 assert_eq!(
580 storage
581 .load_da_proposal(da_proposal3.data.view_number)
582 .await
583 .unwrap(),
584 Some(da_proposal3.clone())
585 );
586
 // Quorum proposals for views 0..=3. After each append, the map loaded from storage must
 // contain every proposal appended so far, keyed by view.
587 let quorum_proposal1 = quorum_proposal.clone();
588
589 storage
590 .append_quorum_proposal2(&quorum_proposal1)
591 .await
592 .unwrap();
593
594 assert_eq!(
595 storage.load_quorum_proposals().await.unwrap(),
596 BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
597 );
598
599 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
600 let quorum_proposal2 = quorum_proposal.clone();
601 storage
602 .append_quorum_proposal2(&quorum_proposal2)
603 .await
604 .unwrap();
605
606 assert_eq!(
607 storage.load_quorum_proposals().await.unwrap(),
608 BTreeMap::from_iter([
609 (ViewNumber::genesis(), quorum_proposal1.clone()),
610 (ViewNumber::new(1), quorum_proposal2.clone())
611 ])
612 );
613
 // From view 2 onwards the justify QC's view number is set to the previous view.
614 quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
615 quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
616 let quorum_proposal3 = quorum_proposal.clone();
617 storage
618 .append_quorum_proposal2(&quorum_proposal3)
619 .await
620 .unwrap();
621
622 assert_eq!(
623 storage.load_quorum_proposals().await.unwrap(),
624 BTreeMap::from_iter([
625 (ViewNumber::genesis(), quorum_proposal1.clone()),
626 (ViewNumber::new(1), quorum_proposal2.clone()),
627 (ViewNumber::new(2), quorum_proposal3.clone())
628 ])
629 );
630
631 quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
632 quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);
633
634 let quorum_proposal4 = quorum_proposal.clone();
636 storage
637 .append_quorum_proposal2(&quorum_proposal4)
638 .await
639 .unwrap();
640
641 assert_eq!(
642 storage.load_quorum_proposals().await.unwrap(),
643 BTreeMap::from_iter([
644 (ViewNumber::genesis(), quorum_proposal1.clone()),
645 (ViewNumber::new(1), quorum_proposal2.clone()),
646 (ViewNumber::new(2), quorum_proposal3.clone()),
647 (ViewNumber::new(3), quorum_proposal4.clone())
648 ])
649 );
650
 // One leaf per quorum proposal, in view order.
651 let leaves = [
654 Leaf2::from_quorum_proposal(&quorum_proposal1.data),
655 Leaf2::from_quorum_proposal(&quorum_proposal2.data),
656 Leaf2::from_quorum_proposal(&quorum_proposal3.data),
657 Leaf2::from_quorum_proposal(&quorum_proposal4.data),
658 ];
 // `final_qc` starts from leaves[3]'s justify QC, with the view number bumped by one and the
 // leaf commitment replaced by that of the genesis `leaf` built at the top of this test.
659 let mut final_qc = leaves[3].justify_qc();
660 final_qc.view_number += 1;
661 final_qc.data.leaf_commit = Committable::commit(&leaf);
 // qcs[i] is paired with leaves[i] when deciding: the justify QC of the following leaf for
 // the first three leaves, and `final_qc` for the last one.
662 let qcs = [
663 leaves[1].justify_qc(),
664 leaves[2].justify_qc(),
665 leaves[3].justify_qc(),
666 final_qc,
667 ];
668
 // Nothing has been decided yet, so the anchor view is still genesis.
669 assert_eq!(
670 storage.load_anchor_view().await.unwrap(),
671 ViewNumber::genesis()
672 );
673
 // Decide the first three leaves (views 0..=2), collecting the emitted events.
674 let consumer = EventCollector::default();
675 let leaf_chain = leaves
676 .iter()
677 .take(3)
678 .map(|leaf| leaf_info(leaf.clone()))
679 .zip(&qcs)
680 .collect::<Vec<_>>();
681 tracing::info!(?leaf_chain, "decide view 2");
682 storage
683 .append_decided_leaves(
684 ViewNumber::new(2),
685 leaf_chain.iter().map(|(leaf, qc)| (leaf, (*qc).clone())),
686 &consumer,
687 )
688 .await
689 .unwrap();
690 assert_eq!(
691 storage.load_anchor_view().await.unwrap(),
692 ViewNumber::new(2)
693 );
694
 // DA proposals and VID shares for the decided views can no longer be loaded...
695 for i in 0..=2 {
696 assert_eq!(
697 storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
698 None
699 );
700
701 assert_eq!(
702 storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
703 None
704 );
705 }
706
 // ...while the undecided view 3 keeps its DA proposal, VID share and quorum proposal.
707 assert_eq!(
708 storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
709 Some(da_proposal3)
710 );
711
712 assert_eq!(
713 storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
714 Some(convert_proposal(vid_share3.clone()))
715 );
716
717 let proposals = storage.load_quorum_proposals().await.unwrap();
718 assert_eq!(
719 proposals,
720 BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
721 );
722
 // Every leaf reported to the consumer must equal the corresponding input leaf and carry a
 // VID share for the same view. `zip` stops at the shorter side, so only as many leaves as
 // the consumer received are compared.
723 for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
725 assert_eq!(info.leaf, *leaf);
726 let decided_vid_share = info.vid_share.as_ref().unwrap();
727 let view_number = match decided_vid_share {
728 VidDisperseShare::V0(share) => share.view_number,
729 VidDisperseShare::V1(share) => share.view_number,
730 };
731 assert_eq!(view_number, leaf.view_number());
732 }
733
 // The anchor is now the last decided leaf together with its paired QC.
734 assert_eq!(
736 storage.load_anchor_leaf().await.unwrap(),
737 Some((leaves[2].clone(), qcs[2].clone()))
738 );
739 assert_eq!(
740 storage.load_anchor_view().await.unwrap(),
741 leaves[2].view_number()
742 );
743
 // Decide view 3 with a fresh collector so that only the new event is observed.
744 let consumer = EventCollector::default();
746 tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
747 storage
748 .append_decided_leaves(
749 ViewNumber::new(3),
750 vec![(&leaf_info(leaves[3].clone()), qcs[3].clone())],
751 &consumer,
752 )
753 .await
754 .unwrap();
755 assert_eq!(
756 storage.load_anchor_view().await.unwrap(),
757 ViewNumber::new(3)
758 );
759
 // Exactly one decide event, for view 3, carrying qcs[3] and a single-leaf chain.
760 let events = consumer.events.read().await;
762 assert_eq!(events.len(), 1);
763 assert_eq!(events[0].view_number, ViewNumber::new(3));
764 let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else {
765 panic!("expected decide event, got {:?}", events[0]);
766 };
767 assert_eq!(**qc, qcs[3]);
768 assert_eq!(leaf_chain.len(), 1);
769 let info = &leaf_chain[0];
770 assert_eq!(info.leaf, leaves[3]);
771
 // After deciding view 3 none of the appended data remains loadable.
772 assert_eq!(
774 storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
775 None
776 );
777
778 assert_eq!(
779 storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
780 None
781 );
782 assert_eq!(
783 storage.load_quorum_proposals().await.unwrap(),
784 BTreeMap::new()
785 );
786 }
787
788 #[rstest_reuse::apply(persistence_types)]
789 pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
790 let tmp = P::tmp_storage().await;
791 let storage = P::connect(&tmp).await;
792
793 assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
795
796 let upgrade_data = UpgradeProposalData {
797 old_version: Version { major: 0, minor: 1 },
798 new_version: Version { major: 1, minor: 0 },
799 decide_by: ViewNumber::genesis(),
800 new_version_hash: Default::default(),
801 old_version_last_view: ViewNumber::genesis(),
802 new_version_first_view: ViewNumber::genesis(),
803 };
804
805 let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
806 upgrade_data.clone(),
807 upgrade_data.commit(),
808 ViewNumber::genesis(),
809 Default::default(),
810 Default::default(),
811 );
812 let res = storage
813 .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
814 .await;
815 assert!(res.is_ok());
816
817 let res = storage.load_upgrade_certificate().await.unwrap();
818 let view_number = res.unwrap().view_number;
819 assert_eq!(view_number, ViewNumber::genesis());
820
821 let new_view_number_for_certificate = ViewNumber::new(50);
822 let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
823 new_upgrade_certificate.view_number = new_view_number_for_certificate;
824
825 let res = storage
826 .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
827 .await;
828 assert!(res.is_ok());
829
830 let res = storage.load_upgrade_certificate().await.unwrap();
831 let view_number = res.unwrap().view_number;
832 assert_eq!(view_number, new_view_number_for_certificate);
833 }
834
835 #[rstest_reuse::apply(persistence_types)]
836 pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
837 let tmp = P::tmp_storage().await;
838 let storage = P::connect(&tmp).await;
839
840 assert_eq!(
842 storage.load_next_epoch_quorum_certificate().await.unwrap(),
843 None
844 );
845
846 let upgrade_lock = UpgradeLock::<SeqTypes, TestVersions>::new();
847
848 let genesis_view = ViewNumber::genesis();
849
850 let leaf =
851 Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::default()).await;
852 let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
853 leaf_commit: leaf.commit(),
854 epoch: Some(EpochNumber::new(1)),
855 block_number: Some(leaf.height()),
856 }
857 .into();
858
859 let versioned_data =
860 VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock).await;
861
862 let bytes: [u8; 32] = versioned_data.commit().into();
863
864 let next_epoch_qc = NextEpochQuorumCertificate2::new(
865 data,
866 Commitment::from_raw(bytes),
867 genesis_view,
868 None,
869 PhantomData,
870 );
871
872 let res = storage
873 .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
874 .await;
875 assert!(res.is_ok());
876
877 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
878 let view_number = res.unwrap().view_number;
879 assert_eq!(view_number, ViewNumber::genesis());
880
881 let new_view_number_for_qc = ViewNumber::new(50);
882 let mut new_qc = next_epoch_qc.clone();
883 new_qc.view_number = new_view_number_for_qc;
884
885 let res = storage
886 .store_next_epoch_quorum_certificate(new_qc.clone())
887 .await;
888 assert!(res.is_ok());
889
890 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
891 let view_number = res.unwrap().view_number;
892 assert_eq!(view_number, new_view_number_for_qc);
893 }
894
// Checks decide behaviour when the event consumer fails.
//
// A decide whose consumer returns an error must still succeed and move the anchor forward, but
// the stored DA proposals and VID shares must stay loadable. A later decide with a working
// consumer must then deliver all four leaves and leave no DA proposals or VID shares behind.
895 #[rstest_reuse::apply(persistence_types)]
896 pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
897 _p: PhantomData<P>,
898 ) {
 // Event consumer whose handler always returns an error.
899 #[derive(Clone, Copy, Debug)]
900 struct FailConsumer;
901
902 #[async_trait]
903 impl EventConsumer for FailConsumer {
904 async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
905 bail!("mock error injection");
906 }
907 }
908
909 let tmp = P::tmp_storage().await;
910 let storage = P::connect(&tmp).await;
911
 // One entry per view: (leaf, QC for that leaf, VID share proposal, DA proposal).
912 let mut chain = vec![];
914
 // Genesis leaf and its encoded payload, used as the template for every view.
915 let leaf: Leaf2 =
916 Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock())
917 .await
918 .into();
919 let leaf_payload = leaf.block_payload().unwrap();
920 let leaf_payload_bytes_arc = leaf_payload.encode();
921 let avidm_param = init_avidm_param(2).unwrap();
922 let weights = vec![1u32; 2];
923 let ns_table = parse_ns_table(
924 leaf_payload.byte_len().as_usize(),
925 &leaf_payload.ns_table().encode(),
926 );
927 let (payload_commitment, shares) =
928 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
929 .unwrap();
930
 // Template VID share proposal, quorum proposal, QC and DA proposal; the loop below
 // re-stamps them for each view.
931 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
932 let mut vid = VidDisperseShare2::<SeqTypes> {
933 view_number: ViewNumber::new(0),
934 payload_commitment,
935 share: shares[0].clone(),
936 recipient_key: pubkey,
937 epoch: Some(EpochNumber::new(0)),
938 target_epoch: Some(EpochNumber::new(0)),
939 common: avidm_param,
940 }
941 .to_proposal(&privkey)
942 .unwrap()
943 .clone();
944 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
945 proposal: QuorumProposal2::<SeqTypes> {
946 block_header: leaf.block_header().clone(),
947 view_number: ViewNumber::genesis(),
948 justify_qc: QuorumCertificate::genesis::<TestVersions>(
949 &ValidatedState::default(),
950 &NodeState::mock(),
951 )
952 .await
953 .to_qc2(),
954 upgrade_certificate: None,
955 view_change_evidence: None,
956 next_drb_result: None,
957 next_epoch_justify_qc: None,
958 epoch: None,
959 state_cert: None,
960 },
961 };
962 let mut qc = QuorumCertificate2::genesis::<TestVersions>(
963 &ValidatedState::default(),
964 &NodeState::mock(),
965 )
966 .await;
967
968 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
969 .expect("Failed to sign block payload");
970 let mut da_proposal = Proposal {
971 data: DaProposal2::<SeqTypes> {
972 encoded_transactions: leaf_payload_bytes_arc.clone(),
973 metadata: leaf_payload.ns_table().clone(),
974 view_number: ViewNumber::new(0),
975 epoch: Some(EpochNumber::new(0)),
976 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
977 },
978 signature: block_payload_signature,
979 _pd: Default::default(),
980 };
981
982 let vid_commitment = vid_commitment::<TestVersions>(
983 &leaf_payload_bytes_arc,
984 &leaf.block_header().metadata().encode(),
985 2,
986 <TestVersions as Versions>::Base::VERSION,
987 );
988
 // Build entries for views 0..4. The QC, VID share and DA proposal take each leaf's view
 // number, and the QC commits to that leaf.
989 for i in 0..4 {
990 quorum_proposal.proposal.view_number = ViewNumber::new(i);
991 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
992 qc.view_number = leaf.view_number();
993 qc.data.leaf_commit = Committable::commit(&leaf);
994 vid.data.view_number = leaf.view_number();
995 da_proposal.data.view_number = leaf.view_number();
996 chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
997 }
998
 // Store the DA proposal and VID share of every view.
999 for (_, _, vid, da) in &chain {
1001 tracing::info!(?da, ?vid, "insert proposal");
1002 storage.append_da2(da, vid_commitment).await.unwrap();
1003 storage.append_vid2(vid).await.unwrap();
1004 }
1005
 // Decide the first two leaves (up to view 1) with the failing consumer. The call itself
 // must still return `Ok`.
1006 let leaf_chain = chain
1008 .iter()
1009 .take(2)
1010 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1011 .collect::<Vec<_>>();
1012 tracing::info!("decide with event handling failure");
1013 storage
1014 .append_decided_leaves(
1015 ViewNumber::new(1),
1016 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
1017 &FailConsumer,
1018 )
1019 .await
1020 .unwrap();
 // After the failed event handling, the VID shares and DA proposals of all four views must
 // still be loadable...
1021 for i in 0..4 {
1023 tracing::info!(i, "check proposal availability");
1024 assert!(storage
1025 .load_vid_share(ViewNumber::new(i))
1026 .await
1027 .unwrap()
1028 .is_some());
1029 assert!(storage
1030 .load_da_proposal(ViewNumber::new(i))
1031 .await
1032 .unwrap()
1033 .is_some());
1034 }
 // ...but the anchor leaf and anchor view have advanced to view 1.
1035 tracing::info!("check anchor leaf updated");
1036 assert_eq!(
1037 storage
1038 .load_anchor_leaf()
1039 .await
1040 .unwrap()
1041 .unwrap()
1042 .0
1043 .view_number(),
1044 ViewNumber::new(1)
1045 );
1046 assert_eq!(
1047 storage.load_anchor_view().await.unwrap(),
1048 ViewNumber::new(1)
1049 );
1050
 // Decide the remaining leaves (views 2 and 3) with a consumer that succeeds.
1051 let consumer = EventCollector::default();
1054 let leaf_chain = chain
1055 .iter()
1056 .skip(2)
1057 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1058 .collect::<Vec<_>>();
1059 tracing::info!("decide successfully");
1060 storage
1061 .append_decided_leaves(
1062 ViewNumber::new(3),
1063 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
1064 &consumer,
1065 )
1066 .await
1067 .unwrap();
 // Now the VID shares and DA proposals of all four views must be gone.
1068 for i in 0..4 {
1070 tracing::info!(i, "check proposal garbage collected");
1071 assert!(storage
1072 .load_vid_share(ViewNumber::new(i))
1073 .await
1074 .unwrap()
1075 .is_none());
1076 assert!(storage
1077 .load_da_proposal(ViewNumber::new(i))
1078 .await
1079 .unwrap()
1080 .is_none());
1081 }
1082 tracing::info!("check anchor leaf updated");
1083 assert_eq!(
1084 storage
1085 .load_anchor_leaf()
1086 .await
1087 .unwrap()
1088 .unwrap()
1089 .0
1090 .view_number(),
1091 ViewNumber::new(3)
1092 );
1093 assert_eq!(
1094 storage.load_anchor_view().await.unwrap(),
1095 ViewNumber::new(3)
1096 );
1097
 // The collector must have received all four leaves — including the two from the earlier
 // decide whose event handling failed — each with a VID share for its view and a payload.
1098 tracing::info!("check decide event");
1100 let leaf_chain = consumer.leaf_chain().await;
1101 assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
1102 for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
1103 assert_eq!(info.leaf, *leaf);
1104 let decided_vid_share = info.vid_share.as_ref().unwrap();
1105 let view_number = match decided_vid_share {
1106 VidDisperseShare::V0(share) => share.view_number,
1107 VidDisperseShare::V1(share) => share.view_number,
1108 };
1109 assert_eq!(view_number, leaf.view_number());
1110 assert!(info.leaf.block_payload().is_some());
1111 }
1112 }
1113
1114 #[rstest_reuse::apply(persistence_types)]
1115 pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
1116 let tmp = P::tmp_storage().await;
1117
1118 let mut options = P::options(&tmp);
1119 options.set_view_retention(1);
1120 let storage = options.create().await.unwrap();
1121
1122 let leaf =
1124 Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock()).await;
1125 let leaf_payload = leaf.block_payload().unwrap();
1126 let leaf_payload_bytes_arc = leaf_payload.encode();
1127 let avidm_param = init_avidm_param(2).unwrap();
1128 let weights = vec![1u32; 2];
1129
1130 let ns_table = parse_ns_table(
1131 leaf_payload.byte_len().as_usize(),
1132 &leaf_payload.ns_table().encode(),
1133 );
1134 let (payload_commitment, shares) =
1135 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1136 .unwrap();
1137
1138 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1139 let vid_share = VidDisperseShare2::<SeqTypes> {
1140 view_number: ViewNumber::new(0),
1141 payload_commitment,
1142 share: shares[0].clone(),
1143 recipient_key: pubkey,
1144 epoch: None,
1145 target_epoch: None,
1146 common: avidm_param,
1147 }
1148 .to_proposal(&privkey)
1149 .unwrap()
1150 .clone();
1151
1152 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1153 proposal: QuorumProposal2::<SeqTypes> {
1154 block_header: leaf.block_header().clone(),
1155 view_number: ViewNumber::genesis(),
1156 justify_qc: QuorumCertificate::genesis::<TestVersions>(
1157 &ValidatedState::default(),
1158 &NodeState::mock(),
1159 )
1160 .await
1161 .to_qc2(),
1162 upgrade_certificate: None,
1163 view_change_evidence: None,
1164 next_drb_result: None,
1165 next_epoch_justify_qc: None,
1166 epoch: None,
1167 state_cert: None,
1168 },
1169 };
1170 let quorum_proposal_signature =
1171 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1172 .expect("Failed to sign quorum proposal");
1173 let quorum_proposal = Proposal {
1174 data: quorum_proposal,
1175 signature: quorum_proposal_signature,
1176 _pd: Default::default(),
1177 };
1178
1179 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1180 .expect("Failed to sign block payload");
1181 let da_proposal = Proposal {
1182 data: DaProposal2::<SeqTypes> {
1183 encoded_transactions: leaf_payload_bytes_arc,
1184 metadata: leaf_payload.ns_table().clone(),
1185 view_number: ViewNumber::new(0),
1186 epoch: None,
1187 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1188 },
1189 signature: block_payload_signature,
1190 _pd: Default::default(),
1191 };
1192
1193 storage
1194 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1195 .await
1196 .unwrap();
1197 storage.append_vid2(&vid_share).await.unwrap();
1198 storage
1199 .append_quorum_proposal2(&quorum_proposal)
1200 .await
1201 .unwrap();
1202
1203 storage
1205 .append_decided_leaves(ViewNumber::new(1), [], &NullEventConsumer)
1206 .await
1207 .unwrap();
1208
1209 assert_eq!(
1212 storage
1213 .load_da_proposal(ViewNumber::new(0))
1214 .await
1215 .unwrap()
1216 .unwrap(),
1217 da_proposal
1218 );
1219 assert_eq!(
1220 storage
1221 .load_vid_share(ViewNumber::new(0))
1222 .await
1223 .unwrap()
1224 .unwrap(),
1225 convert_proposal(vid_share)
1226 );
1227 assert_eq!(
1228 storage
1229 .load_quorum_proposal(ViewNumber::new(0))
1230 .await
1231 .unwrap(),
1232 quorum_proposal
1233 );
1234
1235 storage
1237 .append_decided_leaves(ViewNumber::new(2), [], &NullEventConsumer)
1238 .await
1239 .unwrap();
1240 assert!(storage
1241 .load_da_proposal(ViewNumber::new(0))
1242 .await
1243 .unwrap()
1244 .is_none());
1245 assert!(storage
1246 .load_vid_share(ViewNumber::new(0))
1247 .await
1248 .unwrap()
1249 .is_none());
1250 assert!(storage
1251 .load_quorum_proposal(ViewNumber::new(0))
1252 .await
1253 .is_err());
1254 }
1255
1256 async fn assert_events_eq<P: TestablePersistence>(
1257 persistence: &P,
1258 block: u64,
1259 stake_table_fetcher: &Fetcher,
1260 l1_client: &L1Client,
1261 stake_table_contract: Address,
1262 ) -> anyhow::Result<()> {
1263 let (stored_l1, events) = persistence.load_events(block).await?;
1265 assert!(!events.is_empty());
1266 assert!(stored_l1.is_some());
1267 assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1268 let contract_events = Fetcher::fetch_events_from_contract(
1270 l1_client.clone(),
1271 stake_table_contract,
1272 None,
1273 block,
1274 )
1275 .await
1276 .sort_events()?;
1277 assert_eq!(
1278 contract_events, events,
1279 "Events from contract and persistence do not match"
1280 );
1281
1282 let fetched_events = stake_table_fetcher
1284 .fetch_events(stake_table_contract, block)
1285 .await?;
1286 assert_eq!(fetched_events, events);
1287
1288 Ok(())
1289 }
1290
1291 #[rstest_reuse::apply(persistence_types)]
1294 pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
1295 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
1296 stake_table_version: StakeTableContractVersion,
1297 _p: PhantomData<P>,
1298 ) -> anyhow::Result<()> {
1299 let epoch_height = 20;
1300 type PosVersion = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
1301
1302 let network_config = TestConfigBuilder::default()
1303 .epoch_height(epoch_height)
1304 .build();
1305
1306 let anvil_provider = network_config.anvil().unwrap();
1307
1308 let query_service_port = pick_unused_port().expect("No ports free for query service");
1309 let query_api_options = Options::with_port(query_service_port);
1310
1311 const NODE_COUNT: usize = 2;
1312
1313 let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
1314 let persistence_options: [_; NODE_COUNT] = storage
1315 .iter()
1316 .map(P::options)
1317 .collect::<Vec<_>>()
1318 .try_into()
1319 .unwrap();
1320
1321 let persistence = persistence_options[0].clone().create().await.unwrap();
1322
1323 let l1_url = network_config.l1_url();
1325
1326 let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
1327 .api_config(query_api_options)
1328 .network_config(network_config.clone())
1329 .persistences(persistence_options.clone())
1330 .pos_hook::<PosVersion>(DelegationConfig::MultipleDelegators, stake_table_version)
1331 .await
1332 .expect("Pos deployment failed")
1333 .build();
1334
1335 let test_network = TestNetwork::new(testnet_config, PosVersion::new()).await;
1337
1338 let client: Client<ServerError, SequencerApiVersion> = Client::new(
1339 format!("http://localhost:{query_service_port}")
1340 .parse()
1341 .unwrap(),
1342 );
1343 client.connect(None).await;
1344 tracing::info!(query_service_port, "server running");
1345
1346 let _initial_blocks = client
1348 .socket("availability/stream/blocks/0")
1349 .subscribe::<BlockQueryData<SeqTypes>>()
1350 .await
1351 .unwrap()
1352 .take(40)
1353 .try_collect::<Vec<_>>()
1354 .await
1355 .unwrap();
1356 let membership_coordinator = test_network
1358 .server
1359 .consensus()
1360 .read()
1361 .await
1362 .membership_coordinator
1363 .clone();
1364
1365 let l1_client = L1Client::new(vec![l1_url]).unwrap();
1366 let node_state = test_network.server.node_state();
1367 let chain_config = node_state.chain_config;
1368 let stake_table_contract = chain_config.stake_table_contract.unwrap();
1369
1370 let current_membership = membership_coordinator.membership();
1371 {
1372 let membership_state = current_membership.read().await;
1373 let stake_table_fetcher = membership_state.fetcher();
1374
1375 let block1 = anvil_provider
1376 .get_block_number()
1377 .await
1378 .expect("latest l1 block");
1379
1380 assert_events_eq(
1381 &persistence,
1382 block1,
1383 stake_table_fetcher,
1384 &l1_client,
1385 stake_table_contract,
1386 )
1387 .await?;
1388 }
1389 let _epoch_4_blocks = client
1390 .socket("availability/stream/blocks/0")
1391 .subscribe::<BlockQueryData<SeqTypes>>()
1392 .await
1393 .unwrap()
1394 .take(65)
1395 .try_collect::<Vec<_>>()
1396 .await
1397 .unwrap();
1398 let block2 = anvil_provider
1399 .get_block_number()
1400 .await
1401 .expect("latest l1 block");
1402
1403 {
1404 let membership_state = current_membership.read().await;
1405 let stake_table_fetcher = membership_state.fetcher();
1406
1407 assert_events_eq(
1408 &persistence,
1409 block2,
1410 stake_table_fetcher,
1411 &l1_client,
1412 stake_table_contract,
1413 )
1414 .await?;
1415 }
1416 Ok(())
1417 }
1418
1419 #[rstest_reuse::apply(persistence_types)]
1420 pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
1421 #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
1422 stake_table_version: StakeTableContractVersion,
1423 _p: PhantomData<P>,
1424 ) -> anyhow::Result<()> {
1425 use espresso_types::v0_3::ChainConfig;
1426 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1427
1428 let blocks_per_epoch = 10;
1429
1430 let network_config = TestConfigBuilder::<1>::default()
1431 .epoch_height(blocks_per_epoch)
1432 .build();
1433
1434 let anvil_provider = network_config.anvil().unwrap();
1435
1436 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1437 &network_config.hotshot_config().hotshot_stake_table(),
1438 STAKE_TABLE_CAPACITY_FOR_TEST,
1439 )
1440 .unwrap();
1441
1442 let (_, priv_keys): (Vec<_>, Vec<_>) = (0..200)
1443 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
1444 .unzip();
1445 let state_key_pairs = (0..200)
1446 .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
1447 .collect::<Vec<_>>();
1448
1449 let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 1000);
1450
1451 let deployer = ProviderBuilder::new()
1452 .wallet(EthereumWallet::from(network_config.signer().clone()))
1453 .on_http(network_config.l1_url().clone());
1454
1455 let mut contracts = Contracts::new();
1456 let args = DeployerArgsBuilder::default()
1457 .deployer(deployer.clone())
1458 .mock_light_client(true)
1459 .genesis_lc_state(genesis_state)
1460 .genesis_st_state(genesis_stake)
1461 .blocks_per_epoch(blocks_per_epoch)
1462 .epoch_start_block(1)
1463 .multisig_pauser(network_config.signer().address())
1464 .token_name("Espresso".to_string())
1465 .token_symbol("ESP".to_string())
1466 .initial_token_supply(U256::from(3590000000u64))
1467 .ops_timelock_delay(U256::from(0))
1468 .ops_timelock_admin(network_config.signer().address())
1469 .ops_timelock_proposers(vec![network_config.signer().address()])
1470 .ops_timelock_executors(vec![network_config.signer().address()])
1471 .safe_exit_timelock_delay(U256::from(10))
1472 .safe_exit_timelock_admin(network_config.signer().address())
1473 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
1474 .safe_exit_timelock_executors(vec![network_config.signer().address()])
1475 .build()
1476 .unwrap();
1477
1478 match stake_table_version {
1479 StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
1480 StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1481 }
1482 .expect("contracts deployed");
1483
1484 let st_addr = contracts
1485 .address(Contract::StakeTableProxy)
1486 .expect("StakeTableProxy deployed");
1487 let l1_url = network_config.l1_url().clone();
1488
1489 anvil_provider
1491 .anvil_set_interval_mining(1)
1492 .await
1493 .expect("interval mining");
1494
1495 spawn({
1499 let l1_url = l1_url.clone();
1500 async move {
1501 {
1502 setup_stake_table_contract_for_test(
1503 l1_url,
1504 &deployer,
1505 st_addr,
1506 validators,
1507 DelegationConfig::MultipleDelegators,
1508 )
1509 .await
1510 .expect("stake table setup failed");
1511 }
1512 }
1513 });
1514
1515 let storage = P::tmp_storage().await;
1516 let persistence = P::options(&storage).create().await.unwrap();
1517
1518 let l1_client = L1ClientOptions {
1519 stake_table_update_interval: Duration::from_secs(7),
1520 l1_retry_delay: Duration::from_millis(10),
1521 l1_events_max_block_range: 10000,
1522 ..Default::default()
1523 }
1524 .connect(vec![l1_url])
1525 .unwrap();
1526 l1_client.spawn_tasks().await;
1527
1528 let fetcher = Fetcher::new(
1529 Arc::new(NullStateCatchup::default()),
1530 Arc::new(Mutex::new(persistence.clone())),
1531 l1_client.clone(),
1532 ChainConfig {
1533 stake_table_contract: Some(st_addr),
1534 base_fee: 0.into(),
1535 ..Default::default()
1536 },
1537 );
1538
1539 sleep(Duration::from_secs(20)).await;
1541
1542 fetcher.spawn_update_loop().await;
1543 let mut prev_l1_block = 0;
1544 let mut prev_events_len = 0;
1545 for _i in 0..10 {
1546 tokio::time::sleep(std::time::Duration::from_secs(8)).await;
1549
1550 let block = anvil_provider
1551 .get_block_number()
1552 .await
1553 .expect("latest l1 block");
1554
1555 let (read_offset, persisted_events) = persistence.load_events(block).await?;
1556 let read_offset = read_offset.unwrap();
1557 let l1_block = match read_offset {
1558 EventsPersistenceRead::Complete => block,
1559 EventsPersistenceRead::UntilL1Block(block) => block,
1560 };
1561
1562 tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
1563 assert!(persisted_events.len() > prev_events_len);
1564
1565 assert!(l1_block > prev_l1_block, "events not updated");
1566
1567 let contract_events =
1568 Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
1569 .await
1570 .sort_events()?;
1571 assert_eq!(persisted_events, contract_events);
1572
1573 prev_l1_block = l1_block;
1574 prev_events_len = persisted_events.len();
1575 }
1576
1577 Ok(())
1578 }
1579
1580 #[rstest_reuse::apply(persistence_types)]
1581 pub async fn test_membership_persistence<P: TestablePersistence>(
1582 _p: PhantomData<P>,
1583 ) -> anyhow::Result<()> {
1584 let tmp = P::tmp_storage().await;
1585 let mut opt = P::options(&tmp);
1586
1587 let storage = opt.create().await.unwrap();
1588
1589 let validator = Validator::mock();
1590 let mut st = IndexMap::new();
1591 st.insert(validator.account, validator);
1592
1593 storage
1594 .store_stake(EpochNumber::new(10), st.clone(), None, None)
1595 .await?;
1596
1597 let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1598 assert_eq!(st, table);
1599
1600 let val2 = Validator::mock();
1601 let mut st2 = IndexMap::new();
1602 st2.insert(val2.account, val2);
1603 storage
1604 .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1605 .await?;
1606
1607 let tables = storage.load_latest_stake(4).await?.unwrap();
1608 let mut iter = tables.iter();
1609 assert_eq!(
1610 Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1611 iter.next()
1612 );
1613 assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1614 assert_eq!(None, iter.next());
1615
1616 for i in 0..=20 {
1617 storage
1618 .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1619 .await?;
1620 }
1621
1622 let tables = storage.load_latest_stake(5).await?.unwrap();
1623 let mut iter = tables.iter();
1624 assert_eq!(
1625 Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1626 iter.next()
1627 );
1628 assert_eq!(
1629 Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1630 iter.next()
1631 );
1632 assert_eq!(
1633 Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1634 iter.next()
1635 );
1636 assert_eq!(
1637 Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1638 iter.next()
1639 );
1640 assert_eq!(
1641 Some(&(EpochNumber::new(16), (st2, None), None)),
1642 iter.next()
1643 );
1644 assert_eq!(None, iter.next());
1645
1646 Ok(())
1647 }
1648}