sequencer/
persistence.rs

1//! Sequencer node persistence.
2//!
3//! This module implements the persistence required for a sequencer node to rejoin the network and
4//! resume participating in consensus, in the event that its process crashes or is killed and loses
5//! all in-memory state.
6//!
7//! This is distinct from the query service persistent storage found in the `api` module, which is
8//! an extension that node operators can opt into. This module defines the minimum level of
9//! persistence which is _required_ to run a node.
10
11use async_trait::async_trait;
12use espresso_types::v0_3::ChainConfig;
13
14pub mod fs;
15pub mod no_storage;
16mod persistence_metrics;
17pub mod sql;
18
/// Persistence of [`ChainConfig`] values.
///
/// Implemented by each persistence backend so that chain configs seen by the
/// node can be stored durably and survive a restart.
#[async_trait]
pub trait ChainConfigPersistence: Sized + Send + Sync {
    /// Durably store the given chain config.
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
}
23
24#[cfg(test)]
25mod tests {
26    use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
27
28    use alloy::{
29        network::EthereumWallet,
30        primitives::{Address, U256},
31        providers::{ext::AnvilApi, Provider, ProviderBuilder},
32    };
33    use anyhow::bail;
34    use async_lock::{Mutex, RwLock};
35    use async_trait::async_trait;
36    use committable::{Commitment, Committable};
37    use espresso_contract_deployer::{
38        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
39        Contract, Contracts,
40    };
41    use espresso_types::{
42        traits::{
43            EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
44            PersistenceOptions, SequencerPersistence,
45        },
46        v0_3::{Fetcher, Validator},
47        Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes,
48        SequencerVersions, ValidatedState,
49    };
50    use futures::{future::join_all, StreamExt, TryStreamExt};
51    use hotshot::{
52        types::{BLSPubKey, SignatureKey},
53        InitializerEpochInfo,
54    };
55    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
56    use hotshot_example_types::node_types::TestVersions;
57    use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MockVersions};
58    use hotshot_types::{
59        data::{
60            ns_table::parse_ns_table, vid_commitment, vid_disperse::VidDisperseShare2, DaProposal2,
61            EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment, VidDisperseShare,
62            ViewNumber,
63        },
64        event::{EventType, HotShotAction, LeafInfo},
65        light_client::StateKeyPair,
66        message::{convert_proposal, Proposal, UpgradeLock},
67        simple_certificate::{
68            NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
69        },
70        simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
71        traits::{
72            block_contents::BlockHeader,
73            node_implementation::{ConsensusTime, Versions},
74            EncodeBytes,
75        },
76        utils::EpochTransitionIndicator,
77        vid::avidm::{init_avidm_param, AvidMScheme},
78        vote::HasViewNumber,
79    };
80    use indexmap::IndexMap;
81    use portpicker::pick_unused_port;
82    use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
83    use surf_disco::Client;
84    use tide_disco::error::ServerError;
85    use tokio::{spawn, time::sleep};
86    use vbs::version::{StaticVersion, StaticVersionType, Version};
87
88    use crate::{
89        api::{
90            test_helpers::{TestNetwork, TestNetworkConfigBuilder, STAKE_TABLE_CAPACITY_FOR_TEST},
91            Options,
92        },
93        catchup::NullStateCatchup,
94        testing::{staking_priv_keys, TestConfigBuilder},
95        SequencerApiVersion, RECENT_STAKE_TABLES_LIMIT,
96    };
97
    /// Test-only extension of [`SequencerPersistence`]: a persistence
    /// implementation that can set up its own temporary backing storage.
    #[async_trait]
    pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
        /// Handle to the temporary backing storage. Tests keep this alive for
        /// their whole duration so the storage is not torn down underneath the
        /// persistence client.
        type Storage: Sync;

        /// Create fresh, empty temporary storage for a test.
        async fn tmp_storage() -> Self::Storage;
        /// Build persistence options pointing at the given test storage.
        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;

        /// Open a persistence client backed by the given test storage.
        async fn connect(storage: &Self::Storage) -> Self {
            Self::options(storage).create().await.unwrap()
        }
    }
109
    /// rstest template enumerating every persistence implementation under test
    /// (SQL and file-system backed). Tests annotated with
    /// `#[rstest_reuse::apply(persistence_types)]` are instantiated once per
    /// `#[case]` below, running on a multi-threaded tokio runtime.
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::persistence::sql::Persistence>)]
    #[case(PhantomData::<crate::persistence::fs::Persistence>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
116
    /// Event consumer that simply records every event it is handed, so tests
    /// can inspect what was emitted after the fact.
    #[derive(Clone, Debug, Default)]
    struct EventCollector {
        // All events received so far, in arrival order.
        events: Arc<RwLock<Vec<Event>>>,
    }
121
122    impl EventCollector {
123        async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
124            self.events
125                .read()
126                .await
127                .iter()
128                .flat_map(|event| {
129                    let EventType::Decide { leaf_chain, .. } = &event.event else {
130                        panic!("expected decide event, got {event:?}");
131                    };
132                    leaf_chain.iter().cloned().rev()
133                })
134                .collect::<Vec<_>>()
135        }
136    }
137
138    #[async_trait]
139    impl EventConsumer for EventCollector {
140        async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
141            self.events.write().await.push(event.clone());
142            Ok(())
143        }
144    }
145
146    #[rstest_reuse::apply(persistence_types)]
147    pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
148        let tmp = P::tmp_storage().await;
149        let storage = P::connect(&tmp).await;
150
151        // Initially, there is no saved view.
152        assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
153
154        // Store a view.
155        let view1 = ViewNumber::genesis();
156        storage
157            .record_action(view1, None, HotShotAction::Vote)
158            .await
159            .unwrap();
160        assert_eq!(
161            storage.load_latest_acted_view().await.unwrap().unwrap(),
162            view1
163        );
164
165        // Store a newer view, make sure storage gets updated.
166        let view2 = view1 + 1;
167        storage
168            .record_action(view2, None, HotShotAction::Vote)
169            .await
170            .unwrap();
171        assert_eq!(
172            storage.load_latest_acted_view().await.unwrap().unwrap(),
173            view2
174        );
175
176        // Store an old view, make sure storage is unchanged.
177        storage
178            .record_action(view1, None, HotShotAction::Vote)
179            .await
180            .unwrap();
181        assert_eq!(
182            storage.load_latest_acted_view().await.unwrap().unwrap(),
183            view2
184        );
185    }
186
187    #[rstest_reuse::apply(persistence_types)]
188    pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
189        let tmp = P::tmp_storage().await;
190        let storage = P::connect(&tmp).await;
191
192        // Initially, there is no saved view.
193        assert_eq!(storage.load_restart_view().await.unwrap(), None);
194
195        // Store a view.
196        let view1 = ViewNumber::genesis();
197        storage
198            .record_action(view1, None, HotShotAction::Vote)
199            .await
200            .unwrap();
201        assert_eq!(
202            storage.load_restart_view().await.unwrap().unwrap(),
203            view1 + 1
204        );
205
206        // Store a newer view, make sure storage gets updated.
207        let view2 = view1 + 1;
208        storage
209            .record_action(view2, None, HotShotAction::Vote)
210            .await
211            .unwrap();
212        assert_eq!(
213            storage.load_restart_view().await.unwrap().unwrap(),
214            view2 + 1
215        );
216
217        // Store an old view, make sure storage is unchanged.
218        storage
219            .record_action(view1, None, HotShotAction::Vote)
220            .await
221            .unwrap();
222        assert_eq!(
223            storage.load_restart_view().await.unwrap().unwrap(),
224            view2 + 1
225        );
226
227        // store a higher proposed view, make sure storage is unchanged.
228        storage
229            .record_action(view2 + 1, None, HotShotAction::Propose)
230            .await
231            .unwrap();
232        assert_eq!(
233            storage.load_restart_view().await.unwrap().unwrap(),
234            view2 + 1
235        );
236
237        // store a higher timeout vote view, make sure storage is unchanged.
238        storage
239            .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
240            .await
241            .unwrap();
242        assert_eq!(
243            storage.load_restart_view().await.unwrap().unwrap(),
244            view2 + 1
245        );
246    }
247
248    #[rstest_reuse::apply(persistence_types)]
249    pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
250        use hotshot_types::drb::DrbInput;
251
252        let tmp = P::tmp_storage().await;
253        let storage = P::connect(&tmp).await;
254        let difficulty_level = 10;
255
256        // Initially, there is no saved info.
257        if storage.load_drb_input(10).await.is_ok() {
258            panic!("unexpected nonempty drb_input");
259        }
260
261        let drb_input_1 = DrbInput {
262            epoch: 10,
263            iteration: 10,
264            value: [0u8; 32],
265            difficulty_level,
266        };
267
268        let drb_input_2 = DrbInput {
269            epoch: 10,
270            iteration: 20,
271            value: [0u8; 32],
272            difficulty_level,
273        };
274
275        let drb_input_3 = DrbInput {
276            epoch: 10,
277            iteration: 30,
278            value: [0u8; 32],
279            difficulty_level,
280        };
281
282        let _ = storage.store_drb_input(drb_input_1.clone()).await;
283
284        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
285
286        let _ = storage.store_drb_input(drb_input_3.clone()).await;
287
288        // check that the drb input is overwritten
289        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
290
291        let _ = storage.store_drb_input(drb_input_2.clone()).await;
292
293        // check that the drb input is not overwritten by the older value
294        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
295    }
296
    /// Check persistence of per-epoch startup info: DRB results, epoch-root
    /// block headers, and pruning to the most recent
    /// `RECENT_STAKE_TABLES_LIMIT` epochs.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved info.
        assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());

        // Store a drb result; it should be returned with no block header yet.
        storage
            .store_drb_result(EpochNumber::new(1), [1; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![InitializerEpochInfo::<SeqTypes> {
                epoch: EpochNumber::new(1),
                drb_result: [1; 32],
                block_header: None,
            }]
        );

        // Store a second DRB result; both epochs are returned, in epoch order.
        storage
            .store_drb_result(EpochNumber::new(2), [3; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: None,
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Make a header from the genesis leaf of a mock node state.
        let instance_state = NodeState::mock();
        let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
        let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&validated_state, &instance_state)
            .await
            .into();
        let header = leaf.block_header().clone();

        // Test storing the header: only epoch 1's entry gains the header.
        storage
            .store_epoch_root(EpochNumber::new(1), header.clone())
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: Some(header.clone()),
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Store more than the limit to exercise pruning. Note the loop range
        // includes epochs 1 and 2, so it re-stores results for them as well.
        let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
        for i in 0..total_epochs {
            let epoch = EpochNumber::new(i);
            let drb = [i as u8; 32];
            storage
                .store_drb_result(epoch, drb)
                .await
                .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
        }

        let results = storage.load_start_epoch_info().await.unwrap();

        // Check that only the most recent RECENT_STAKE_TABLES_LIMIT epochs are returned
        assert_eq!(
            results.len(),
            RECENT_STAKE_TABLES_LIMIT as usize,
            "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
        );

        // The i-th returned entry should be epoch
        // (total_epochs - RECENT_STAKE_TABLES_LIMIT + i) with the DRB value
        // stored for it. No entry has a header: epoch 1 (the only epoch given
        // one above) falls outside the recent window, since
        // total_epochs - RECENT_STAKE_TABLES_LIMIT = 10.
        for (i, info) in results.iter().enumerate() {
            let expected_epoch =
                EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
            let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
            assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
            assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
            assert!(info.block_header.is_none(), "Expected no block header");
        }
    }
398
    /// Wrap a bare leaf in a [`LeafInfo`] with no VID share, no delta, no state
    /// cert, and a default validated state — the minimal shape passed to
    /// `append_decided_leaves` in the tests below.
    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
        LeafInfo {
            leaf,
            vid_share: None,
            state: Default::default(),
            delta: None,
            state_cert: None,
        }
    }
408
    /// End-to-end test of proposal persistence and garbage collection: append
    /// VID shares, DA proposals, and quorum proposals for views 0-3, decide
    /// through view 2 (GC'ing views 0-2 and replaying them to the event
    /// consumer), then decide view 3 and verify everything is GC'd.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Test append VID: fresh storage has no share for view 0.
        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            None
        );

        // Build a genesis leaf and disperse its payload into VID shares for a
        // 2-node committee with equal weights.
        let leaf: Leaf2 =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        // Deterministic test key pair; the empty-message signature below is a
        // placeholder, its content is irrelevant to this test.
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        // Template VID share; its view number is bumped for each append below.
        let mut vid = VidDisperseShare2::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        };
        // Template quorum proposal; view numbers are rewritten per append below.
        let mut quorum_proposal = Proposal {
            data: QuorumProposalWrapper::<SeqTypes> {
                proposal: QuorumProposal2::<SeqTypes> {
                    epoch: None,
                    block_header: leaf.block_header().clone(),
                    view_number: ViewNumber::genesis(),
                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                        &ValidatedState::default(),
                        &NodeState::mock(),
                    )
                    .await,
                    upgrade_certificate: None,
                    view_change_evidence: None,
                    next_drb_result: None,
                    next_epoch_justify_qc: None,
                    state_cert: None,
                },
            },
            signature,
            _pd: Default::default(),
        };

        // Append and read back VID shares for views 0 through 3.
        let vid_share0 = vid.clone().to_proposal(&privkey).unwrap().clone();

        storage.append_vid2(&vid_share0).await.unwrap();

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            Some(convert_proposal(vid_share0.clone()))
        );

        vid.view_number = ViewNumber::new(1);

        let vid_share1 = vid.clone().to_proposal(&privkey).unwrap().clone();
        storage.append_vid2(&vid_share1).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(convert_proposal(vid_share1.clone()))
        );

        vid.view_number = ViewNumber::new(2);

        let vid_share2 = vid.clone().to_proposal(&privkey).unwrap().clone();
        storage.append_vid2(&vid_share2).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(convert_proposal(vid_share2.clone()))
        );

        vid.view_number = ViewNumber::new(3);

        let vid_share3 = vid.clone().to_proposal(&privkey).unwrap().clone();
        storage.append_vid2(&vid_share3).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(convert_proposal(vid_share3.clone()))
        );

        // Build a DA proposal over the same payload and sign it.
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");

        let da_proposal_inner = DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc.clone(),
            metadata: leaf_payload.ns_table().clone(),
            view_number: ViewNumber::new(0),
            epoch: None,
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        };

        let da_proposal = Proposal {
            data: da_proposal_inner,
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let vid_commitment = vid_commitment::<TestVersions>(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            <TestVersions as Versions>::Base::VERSION,
        );

        // Append and read back DA proposals for views 0 through 3.
        storage
            .append_da2(&da_proposal, vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
            Some(da_proposal.clone())
        );

        let mut da_proposal1 = da_proposal.clone();
        da_proposal1.data.view_number = ViewNumber::new(1);
        storage
            .append_da2(&da_proposal1.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal1.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal1.clone())
        );

        let mut da_proposal2 = da_proposal1.clone();
        da_proposal2.data.view_number = ViewNumber::new(2);
        storage
            .append_da2(&da_proposal2.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal2.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal2.clone())
        );

        let mut da_proposal3 = da_proposal2.clone();
        da_proposal3.data.view_number = ViewNumber::new(3);
        storage
            .append_da2(&da_proposal3.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal3.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal3.clone())
        );

        // Append quorum proposals for views 0 through 3, checking after each
        // append that load_quorum_proposals returns all of them.
        let quorum_proposal1 = quorum_proposal.clone();

        storage
            .append_quorum_proposal2(&quorum_proposal1)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
        let quorum_proposal2 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal2)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone())
            ])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
        let quorum_proposal3 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal3)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone())
            ])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);

        // This one should stick around after GC runs.
        let quorum_proposal4 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal4)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone()),
                (ViewNumber::new(3), quorum_proposal4.clone())
            ])
        );

        // Test decide and garbage collection. Pass in a leaf chain with no VID shares or payloads,
        // so we have to fetch the missing data from storage.
        let leaves = [
            Leaf2::from_quorum_proposal(&quorum_proposal1.data),
            Leaf2::from_quorum_proposal(&quorum_proposal2.data),
            Leaf2::from_quorum_proposal(&quorum_proposal3.data),
            Leaf2::from_quorum_proposal(&quorum_proposal4.data),
        ];
        // Each leaf is paired with the QC from the *following* leaf (the QC
        // that certifies it); the final leaf gets a synthesized QC.
        let mut final_qc = leaves[3].justify_qc();
        final_qc.view_number += 1;
        final_qc.data.leaf_commit = Committable::commit(&leaf);
        let qcs = [
            leaves[1].justify_qc(),
            leaves[2].justify_qc(),
            leaves[3].justify_qc(),
            final_qc,
        ];

        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::genesis()
        );

        // Decide the first three leaves (through view 2).
        let consumer = EventCollector::default();
        let leaf_chain = leaves
            .iter()
            .take(3)
            .map(|leaf| leaf_info(leaf.clone()))
            .zip(&qcs)
            .collect::<Vec<_>>();
        tracing::info!(?leaf_chain, "decide view 2");
        storage
            .append_decided_leaves(
                ViewNumber::new(2),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, (*qc).clone())),
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(2)
        );

        // DA proposals and VID shares for decided views 0-2 should be GC'd.
        for i in 0..=2 {
            assert_eq!(
                storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
                None
            );

            assert_eq!(
                storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
                None
            );
        }

        // View 3 has not been decided yet, so its data must survive GC.
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            Some(da_proposal3)
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            Some(convert_proposal(vid_share3.clone()))
        );

        let proposals = storage.load_quorum_proposals().await.unwrap();
        assert_eq!(
            proposals,
            BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
        );

        // A decide event should have been processed. Even though we passed in
        // leaf info with no VID shares, the consumer should see shares fetched
        // from storage, with matching view numbers.
        for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            let view_number = match decided_vid_share {
                VidDisperseShare::V0(share) => share.view_number,
                VidDisperseShare::V1(share) => share.view_number,
            };
            assert_eq!(view_number, leaf.view_number());
        }

        // The decided leaf should not have been garbage collected.
        assert_eq!(
            storage.load_anchor_leaf().await.unwrap(),
            Some((leaves[2].clone(), qcs[2].clone()))
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            leaves[2].view_number()
        );

        // Process a second decide event.
        let consumer = EventCollector::default();
        tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                vec![(&leaf_info(leaves[3].clone()), qcs[3].clone())],
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // A decide event should have been processed, carrying exactly the one
        // newly decided leaf and its QC.
        let events = consumer.events.read().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].view_number, ViewNumber::new(3));
        let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else {
            panic!("expected decide event, got {:?}", events[0]);
        };
        assert_eq!(**qc, qcs[3]);
        assert_eq!(leaf_chain.len(), 1);
        let info = &leaf_chain[0];
        assert_eq!(info.leaf, leaves[3]);

        // The remaining data should have been GCed.
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            None
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            None
        );
        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::new()
        );
    }
787
788    #[rstest_reuse::apply(persistence_types)]
789    pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
790        let tmp = P::tmp_storage().await;
791        let storage = P::connect(&tmp).await;
792
793        // Test get upgrade certificate
794        assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
795
796        let upgrade_data = UpgradeProposalData {
797            old_version: Version { major: 0, minor: 1 },
798            new_version: Version { major: 1, minor: 0 },
799            decide_by: ViewNumber::genesis(),
800            new_version_hash: Default::default(),
801            old_version_last_view: ViewNumber::genesis(),
802            new_version_first_view: ViewNumber::genesis(),
803        };
804
805        let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
806            upgrade_data.clone(),
807            upgrade_data.commit(),
808            ViewNumber::genesis(),
809            Default::default(),
810            Default::default(),
811        );
812        let res = storage
813            .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
814            .await;
815        assert!(res.is_ok());
816
817        let res = storage.load_upgrade_certificate().await.unwrap();
818        let view_number = res.unwrap().view_number;
819        assert_eq!(view_number, ViewNumber::genesis());
820
821        let new_view_number_for_certificate = ViewNumber::new(50);
822        let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
823        new_upgrade_certificate.view_number = new_view_number_for_certificate;
824
825        let res = storage
826            .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
827            .await;
828        assert!(res.is_ok());
829
830        let res = storage.load_upgrade_certificate().await.unwrap();
831        let view_number = res.unwrap().view_number;
832        assert_eq!(view_number, new_view_number_for_certificate);
833    }
834
835    #[rstest_reuse::apply(persistence_types)]
836    pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
837        let tmp = P::tmp_storage().await;
838        let storage = P::connect(&tmp).await;
839
840        //  test that next epoch qc2 does not exist
841        assert_eq!(
842            storage.load_next_epoch_quorum_certificate().await.unwrap(),
843            None
844        );
845
846        let upgrade_lock = UpgradeLock::<SeqTypes, TestVersions>::new();
847
848        let genesis_view = ViewNumber::genesis();
849
850        let leaf =
851            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::default()).await;
852        let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
853            leaf_commit: leaf.commit(),
854            epoch: Some(EpochNumber::new(1)),
855            block_number: Some(leaf.height()),
856        }
857        .into();
858
859        let versioned_data =
860            VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock).await;
861
862        let bytes: [u8; 32] = versioned_data.commit().into();
863
864        let next_epoch_qc = NextEpochQuorumCertificate2::new(
865            data,
866            Commitment::from_raw(bytes),
867            genesis_view,
868            None,
869            PhantomData,
870        );
871
872        let res = storage
873            .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
874            .await;
875        assert!(res.is_ok());
876
877        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
878        let view_number = res.unwrap().view_number;
879        assert_eq!(view_number, ViewNumber::genesis());
880
881        let new_view_number_for_qc = ViewNumber::new(50);
882        let mut new_qc = next_epoch_qc.clone();
883        new_qc.view_number = new_view_number_for_qc;
884
885        let res = storage
886            .store_next_epoch_quorum_certificate(new_qc.clone())
887            .await;
888        assert!(res.is_ok());
889
890        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
891        let view_number = res.unwrap().view_number;
892        assert_eq!(view_number, new_view_number_for_qc);
893    }
894
    // Verify decide-event delivery semantics when the event consumer fails:
    // the anchor leaf/view must still be persisted, but garbage collection of
    // proposals must be deferred, and a later successful decide must replay the
    // decide event for the *entire* chain (including the previously-failed
    // leaves) and then run GC.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
        _p: PhantomData<P>,
    ) {
        // Consumer that rejects every event, simulating a downstream failure.
        #[derive(Clone, Copy, Debug)]
        struct FailConsumer;

        #[async_trait]
        impl EventConsumer for FailConsumer {
            async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
                bail!("mock error injection");
            }
        }

        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Create a short blockchain.
        let mut chain = vec![];

        let leaf: Leaf2 =
            Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock())
                .await
                .into();
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        // Disperse the genesis payload across 2 equally-weighted nodes so we have
        // a VID share to persist alongside each leaf.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];
        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // Template VID share; `mut` because the view number is rewritten per leaf
        // in the chain-building loop below.
        let mut vid = VidDisperseShare2::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();
        // Template quorum proposal; the view number is rewritten per iteration.
        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let mut qc = QuorumCertificate2::genesis::<TestVersions>(
            &ValidatedState::default(),
            &NodeState::mock(),
        )
        .await;

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        // Template DA proposal, also rewritten per view below.
        let mut da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let vid_commitment = vid_commitment::<TestVersions>(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            <TestVersions as Versions>::Base::VERSION,
        );

        // Build a 4-leaf chain (views 0..=3) that all share the same payload;
        // only view numbers and leaf commitments differ per entry.
        for i in 0..4 {
            quorum_proposal.proposal.view_number = ViewNumber::new(i);
            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
            qc.view_number = leaf.view_number();
            qc.data.leaf_commit = Committable::commit(&leaf);
            vid.data.view_number = leaf.view_number();
            da_proposal.data.view_number = leaf.view_number();
            chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
        }

        // Add proposals.
        for (_, _, vid, da) in &chain {
            tracing::info!(?da, ?vid, "insert proposal");
            storage.append_da2(da, vid_commitment).await.unwrap();
            storage.append_vid2(vid).await.unwrap();
        }

        // Decide 2 leaves, but fail in event processing. Note that the call
        // itself still succeeds (`unwrap`): consumer failure is not a decide
        // failure.
        let leaf_chain = chain
            .iter()
            .take(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide with event handling failure");
        storage
            .append_decided_leaves(
                ViewNumber::new(1),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                &FailConsumer,
            )
            .await
            .unwrap();
        // No garbage collection should have run: all 4 views' proposals remain.
        for i in 0..4 {
            tracing::info!(i, "check proposal availability");
            assert!(storage
                .load_vid_share(ViewNumber::new(i))
                .await
                .unwrap()
                .is_some());
            assert!(storage
                .load_da_proposal(ViewNumber::new(i))
                .await
                .unwrap()
                .is_some());
        }
        // The anchor must advance even though event delivery failed.
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(1)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(1)
        );

        // Now decide remaining leaves successfully. We should now garbage collect and process a
        // decide event for all the leaves.
        let consumer = EventCollector::default();
        let leaf_chain = chain
            .iter()
            .skip(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide successfully");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                &consumer,
            )
            .await
            .unwrap();
        // Garbage collection should have run: all 4 views' proposals are gone.
        for i in 0..4 {
            tracing::info!(i, "check proposal garbage collected");
            assert!(storage
                .load_vid_share(ViewNumber::new(i))
                .await
                .unwrap()
                .is_none());
            assert!(storage
                .load_da_proposal(ViewNumber::new(i))
                .await
                .unwrap()
                .is_none());
        }
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(3)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // Check decide event: all 4 leaves (including the 2 whose event delivery
        // originally failed) must be replayed to the successful consumer, with
        // matching VID view numbers and payloads attached.
        tracing::info!("check decide event");
        let leaf_chain = consumer.leaf_chain().await;
        assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
        for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            let view_number = match decided_vid_share {
                VidDisperseShare::V0(share) => share.view_number,
                VidDisperseShare::V1(share) => share.view_number,
            };
            assert_eq!(view_number, leaf.view_number());
            assert!(info.leaf.block_payload().is_some());
        }
    }
1113
    // Verify that the view-retention setting drives garbage collection: data
    // within the retention window (1 view here) survives a decide, while data
    // that falls outside the window is pruned by the next decide.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;

        // Retain only 1 view of data behind the decided view.
        let mut options = P::options(&tmp);
        options.set_view_retention(1);
        let storage = options.create().await.unwrap();

        // Add some "old" data, from view 0.
        let leaf =
            Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        // Disperse the payload across 2 equally-weighted nodes to obtain a VID
        // share for view 0.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid_share = VidDisperseShare2::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: None,
            target_epoch: None,
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();

        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // Persist all three artifacts (DA, VID, quorum proposal) for view 0.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid2(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Decide a newer view, view 1.
        storage
            .append_decided_leaves(ViewNumber::new(1), [], &NullEventConsumer)
            .await
            .unwrap();

        // The old data is not more than the retention period (1 view) old, so it should not be
        // GCed.
        assert_eq!(
            storage
                .load_da_proposal(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            da_proposal
        );
        assert_eq!(
            storage
                .load_vid_share(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            convert_proposal(vid_share)
        );
        assert_eq!(
            storage
                .load_quorum_proposal(ViewNumber::new(0))
                .await
                .unwrap(),
            quorum_proposal
        );

        // Decide an even newer view, triggering GC of the old data.
        storage
            .append_decided_leaves(ViewNumber::new(2), [], &NullEventConsumer)
            .await
            .unwrap();
        assert!(storage
            .load_da_proposal(ViewNumber::new(0))
            .await
            .unwrap()
            .is_none());
        assert!(storage
            .load_vid_share(ViewNumber::new(0))
            .await
            .unwrap()
            .is_none());
        // Unlike the DA/VID loaders (which return `Ok(None)` when missing), a
        // pruned quorum proposal surfaces as an `Err` from this loader.
        assert!(storage
            .load_quorum_proposal(ViewNumber::new(0))
            .await
            .is_err());
    }
1255
1256    async fn assert_events_eq<P: TestablePersistence>(
1257        persistence: &P,
1258        block: u64,
1259        stake_table_fetcher: &Fetcher,
1260        l1_client: &L1Client,
1261        stake_table_contract: Address,
1262    ) -> anyhow::Result<()> {
1263        // Load persisted events
1264        let (stored_l1, events) = persistence.load_events(block).await?;
1265        assert!(!events.is_empty());
1266        assert!(stored_l1.is_some());
1267        assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1268        // Fetch events directly from the contract and compare with persisted data
1269        let contract_events = Fetcher::fetch_events_from_contract(
1270            l1_client.clone(),
1271            stake_table_contract,
1272            None,
1273            block,
1274        )
1275        .await
1276        .sort_events()?;
1277        assert_eq!(
1278            contract_events, events,
1279            "Events from contract and persistence do not match"
1280        );
1281
1282        // Fetch events from stake table fetcher and compare with persisted data
1283        let fetched_events = stake_table_fetcher
1284            .fetch_events(stake_table_contract, block)
1285            .await?;
1286        assert_eq!(fetched_events, events);
1287
1288        Ok(())
1289    }
1290
    // Test validating stake table event fetching from persistence, ensuring that
    // persisted data matches the on-chain events and that the event fetcher works
    // correctly.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        let epoch_height = 20;
        type PosVersion = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        let query_service_port = pick_unused_port().expect("No ports free for query service");
        let query_api_options = Options::with_port(query_service_port);

        const NODE_COUNT: usize = 2;

        // One temporary storage + persistence option per node.
        let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
        let persistence_options: [_; NODE_COUNT] = storage
            .iter()
            .map(P::options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Handle to node 0's persistence, used below to check stored events.
        let persistence = persistence_options[0].clone().create().await.unwrap();

        // Build the config with PoS hook
        let l1_url = network_config.l1_url();

        let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(query_api_options)
            .network_config(network_config.clone())
            .persistences(persistence_options.clone())
            .pos_hook::<PosVersion>(DelegationConfig::MultipleDelegators, stake_table_version)
            .await
            .expect("Pos deployment failed")
            .build();

        //start the network
        let test_network = TestNetwork::new(testnet_config, PosVersion::new()).await;

        let client: Client<ServerError, SequencerApiVersion> = Client::new(
            format!("http://localhost:{query_service_port}")
                .parse()
                .unwrap(),
        );
        client.connect(None).await;
        tracing::info!(query_service_port, "server running");

        // Stream 40 blocks (epoch height is 20) before checking persistence,
        // so the network has progressed past epoch boundaries and stake table
        // events have been fetched and stored.
        let _initial_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(40)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        // Load initial persisted events and validate they exist.
        let membership_coordinator = test_network
            .server
            .consensus()
            .read()
            .await
            .membership_coordinator
            .clone();

        let l1_client = L1Client::new(vec![l1_url]).unwrap();
        let node_state = test_network.server.node_state();
        let chain_config = node_state.chain_config;
        let stake_table_contract = chain_config.stake_table_contract.unwrap();

        let current_membership = membership_coordinator.membership();
        // Scope the read lock so it is released before streaming more blocks.
        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            let block1 = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            // Persistence, contract, and fetcher must all agree at this point.
            assert_events_eq(
                &persistence,
                block1,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        // Let the chain progress further, then re-validate against the newer
        // L1 block to confirm persistence keeps up.
        let _epoch_4_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(65)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let block2 = anvil_provider
            .get_block_number()
            .await
            .expect("latest l1 block");

        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            assert_events_eq(
                &persistence,
                block2,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        Ok(())
    }
1418
    // Verify the background stake-table update loop: while a spawned task keeps
    // registering validators/delegators on L1, the fetcher's update loop must
    // keep appending events to persistence, and the persisted events must always
    // match those read directly from the contract.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        // NOTE(review): `StakeTableContractVersion` already appears in this
        // function's signature, so the second `use` below is likely redundant —
        // confirm against the module imports and remove if so.
        use espresso_types::v0_3::ChainConfig;
        use hotshot_contract_adapter::stake_table::StakeTableContractVersion;

        let blocks_per_epoch = 10;

        let network_config = TestConfigBuilder::<1>::default()
            .epoch_height(blocks_per_epoch)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
            &network_config.hotshot_config().hotshot_stake_table(),
            STAKE_TABLE_CAPACITY_FOR_TEST,
        )
        .unwrap();

        // Generate 200 validator key pairs (consensus + state keys) from fixed
        // seeds so the test is deterministic.
        let (_, priv_keys): (Vec<_>, Vec<_>) = (0..200)
            .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
            .unzip();
        let state_key_pairs = (0..200)
            .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
            .collect::<Vec<_>>();

        let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 1000);

        let deployer = ProviderBuilder::new()
            .wallet(EthereumWallet::from(network_config.signer().clone()))
            .on_http(network_config.l1_url().clone());

        // Deploy the full contract stack to the local anvil node.
        let mut contracts = Contracts::new();
        let args = DeployerArgsBuilder::default()
            .deployer(deployer.clone())
            .mock_light_client(true)
            .genesis_lc_state(genesis_state)
            .genesis_st_state(genesis_stake)
            .blocks_per_epoch(blocks_per_epoch)
            .epoch_start_block(1)
            .multisig_pauser(network_config.signer().address())
            .token_name("Espresso".to_string())
            .token_symbol("ESP".to_string())
            .initial_token_supply(U256::from(3590000000u64))
            .ops_timelock_delay(U256::from(0))
            .ops_timelock_admin(network_config.signer().address())
            .ops_timelock_proposers(vec![network_config.signer().address()])
            .ops_timelock_executors(vec![network_config.signer().address()])
            .safe_exit_timelock_delay(U256::from(10))
            .safe_exit_timelock_admin(network_config.signer().address())
            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
            .safe_exit_timelock_executors(vec![network_config.signer().address()])
            .build()
            .unwrap();

        // Deploy either the V1-only stake table or the full (V2) stack,
        // depending on the parametrized contract version.
        match stake_table_version {
            StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
            StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
        }
        .expect("contracts deployed");

        let st_addr = contracts
            .address(Contract::StakeTableProxy)
            .expect("StakeTableProxy deployed");
        let l1_url = network_config.l1_url().clone();

        // new block every 1s
        anvil_provider
            .anvil_set_interval_mining(1)
            .await
            .expect("interval mining");

        // spawn a separate task
        // this is going to keep registering validators and multiple delegators
        // the interval mining is set to 1s so each transaction finalization would take at least 1s
        spawn({
            let l1_url = l1_url.clone();
            async move {
                {
                    setup_stake_table_contract_for_test(
                        l1_url,
                        &deployer,
                        st_addr,
                        validators,
                        DelegationConfig::MultipleDelegators,
                    )
                    .await
                    .expect("stake table setup failed");
                }
            }
        });

        let storage = P::tmp_storage().await;
        let persistence = P::options(&storage).create().await.unwrap();

        // L1 client with a 7s stake table update interval; the polling loop
        // below sleeps 8s per iteration so at least one update lands in between.
        let l1_client = L1ClientOptions {
            stake_table_update_interval: Duration::from_secs(7),
            l1_retry_delay: Duration::from_millis(10),
            l1_events_max_block_range: 10000,
            ..Default::default()
        }
        .connect(vec![l1_url])
        .unwrap();
        l1_client.spawn_tasks().await;

        let fetcher = Fetcher::new(
            Arc::new(NullStateCatchup::default()),
            Arc::new(Mutex::new(persistence.clone())),
            l1_client.clone(),
            ChainConfig {
                stake_table_contract: Some(st_addr),
                base_fee: 0.into(),
                ..Default::default()
            },
        );

        // sleep so that we have enough events
        sleep(Duration::from_secs(20)).await;

        fetcher.spawn_update_loop().await;
        let mut prev_l1_block = 0;
        let mut prev_events_len = 0;
        for _i in 0..10 {
            // Wait for more than update interval to assert that persistence was updated
            // L1 update interval is 7s in this test
            tokio::time::sleep(std::time::Duration::from_secs(8)).await;

            let block = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            let (read_offset, persisted_events) = persistence.load_events(block).await?;
            let read_offset = read_offset.unwrap();
            // Resolve the L1 block up to which events are known to be complete.
            let l1_block = match read_offset {
                EventsPersistenceRead::Complete => block,
                EventsPersistenceRead::UntilL1Block(block) => block,
            };

            tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
            // Each iteration must see strictly more events at a strictly newer
            // L1 block, proving the background loop made progress.
            assert!(persisted_events.len() > prev_events_len);

            assert!(l1_block > prev_l1_block, "events not updated");

            // Persisted events must exactly match the contract's event log.
            let contract_events =
                Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
                    .await
                    .sort_events()?;
            assert_eq!(persisted_events, contract_events);

            prev_l1_block = l1_block;
            prev_events_len = persisted_events.len();
        }

        Ok(())
    }
1579
1580    #[rstest_reuse::apply(persistence_types)]
1581    pub async fn test_membership_persistence<P: TestablePersistence>(
1582        _p: PhantomData<P>,
1583    ) -> anyhow::Result<()> {
1584        let tmp = P::tmp_storage().await;
1585        let mut opt = P::options(&tmp);
1586
1587        let storage = opt.create().await.unwrap();
1588
1589        let validator = Validator::mock();
1590        let mut st = IndexMap::new();
1591        st.insert(validator.account, validator);
1592
1593        storage
1594            .store_stake(EpochNumber::new(10), st.clone(), None, None)
1595            .await?;
1596
1597        let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1598        assert_eq!(st, table);
1599
1600        let val2 = Validator::mock();
1601        let mut st2 = IndexMap::new();
1602        st2.insert(val2.account, val2);
1603        storage
1604            .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1605            .await?;
1606
1607        let tables = storage.load_latest_stake(4).await?.unwrap();
1608        let mut iter = tables.iter();
1609        assert_eq!(
1610            Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1611            iter.next()
1612        );
1613        assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1614        assert_eq!(None, iter.next());
1615
1616        for i in 0..=20 {
1617            storage
1618                .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1619                .await?;
1620        }
1621
1622        let tables = storage.load_latest_stake(5).await?.unwrap();
1623        let mut iter = tables.iter();
1624        assert_eq!(
1625            Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1626            iter.next()
1627        );
1628        assert_eq!(
1629            Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1630            iter.next()
1631        );
1632        assert_eq!(
1633            Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1634            iter.next()
1635        );
1636        assert_eq!(
1637            Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1638            iter.next()
1639        );
1640        assert_eq!(
1641            Some(&(EpochNumber::new(16), (st2, None), None)),
1642            iter.next()
1643        );
1644        assert_eq!(None, iter.next());
1645
1646        Ok(())
1647    }
1648}