sequencer/persistence.rs

//! Sequencer node persistence.
//!
//! This module implements the persistence required for a sequencer node to rejoin the network and
//! resume participating in consensus, in the event that its process crashes or is killed and loses
//! all in-memory state.
//!
//! This is distinct from the query service persistent storage found in the `api` module, which is
//! an extension that node operators can opt into. This module defines the minimum level of
//! persistence which is _required_ to run a node.

use anyhow::Context;
use async_trait::async_trait;
use espresso_types::v0_3::ChainConfig;

pub mod fs;
pub mod no_storage;
mod persistence_metrics;
pub mod sql;
/// Update a `NetworkConfig` that may have originally been persisted with an old version.
fn migrate_network_config(
    mut network_config: serde_json::Value,
) -> anyhow::Result<serde_json::Value> {
    let config = network_config
        .get_mut("config")
        .context("missing field `config`")?
        .as_object_mut()
        .context("`config` must be an object")?;

    if !config.contains_key("builder_urls") {
        // When multi-builder support was added, the configuration field `builder_url: Url` was
        // replaced by an array `builder_urls: Vec<Url>`. If the saved config has no `builder_urls`
        // field, it is older than this change. Populate `builder_urls` with a singleton array
        // formed from the old value of `builder_url`, and delete the now-unused `builder_url`.
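        // For example, a legacy fragment like (hypothetical URL)
        //   { "config": { "builder_url": "http://b0" } }
        // becomes
        //   { "config": { "builder_urls": ["http://b0"] } }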
        let url = config
            .remove("builder_url")
            .context("missing field `builder_url`")?;
        config.insert("builder_urls".into(), vec![url].into());
    }

    // HotShotConfig was upgraded to include parameters for proposing and voting on upgrades.
    // Configs that were persisted before this upgrade may be missing these parameters. This
    // migration initializes them with a default. By default, we use JS `MAX_SAFE_INTEGER`
    // (2^53 - 1) for the start parameters so that nodes will never do an upgrade unless
    // explicitly configured otherwise.
    if !config.contains_key("start_proposing_view") {
        config.insert("start_proposing_view".into(), 9007199254740991u64.into());
    }
    if !config.contains_key("stop_proposing_view") {
        config.insert("stop_proposing_view".into(), 0.into());
    }
    if !config.contains_key("start_voting_view") {
        config.insert("start_voting_view".into(), 9007199254740991u64.into());
    }
    if !config.contains_key("stop_voting_view") {
        config.insert("stop_voting_view".into(), 0.into());
    }
    if !config.contains_key("start_proposing_time") {
        config.insert("start_proposing_time".into(), 9007199254740991u64.into());
    }
    if !config.contains_key("stop_proposing_time") {
        config.insert("stop_proposing_time".into(), 0.into());
    }
    if !config.contains_key("start_voting_time") {
        config.insert("start_voting_time".into(), 9007199254740991u64.into());
    }
    if !config.contains_key("stop_voting_time") {
        config.insert("stop_voting_time".into(), 0.into());
    }

    // HotShotConfig was upgraded to include an `epoch_height` parameter. Initialize with a default
    // if missing.
    if !config.contains_key("epoch_height") {
        config.insert("epoch_height".into(), 0.into());
    }

    // HotShotConfig was upgraded to include `drb_difficulty` and `drb_upgrade_difficulty`
    // parameters. Initialize with defaults if missing.
    if !config.contains_key("drb_difficulty") {
        config.insert("drb_difficulty".into(), 0.into());
    }
    if !config.contains_key("drb_upgrade_difficulty") {
        config.insert("drb_upgrade_difficulty".into(), 0.into());
    }

    // HotShotConfig was upgraded to include `da_committees`. Initialize with an empty array if
    // missing.
    if !config.contains_key("da_committees") {
        config.insert("da_committees".into(), serde_json::json!([]));
    }

    Ok(network_config)
}
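
// A minimal sketch exercising the migration above; the config fragment and its
// values are hypothetical. It illustrates how a config persisted before the
// upgrade-parameter fields existed is backfilled with inert defaults.
#[cfg(test)]
mod migration_tests {
    use super::*;

    #[test]
    fn backfills_missing_defaults() {
        // New-style `builder_urls` is already present, so only the later-added
        // parameters should be inserted.
        let old = serde_json::json!({
            "config": { "builder_urls": ["http://localhost:8080"] }
        });
        let migrated = migrate_network_config(old).unwrap();
        let config = migrated["config"].as_object().unwrap();

        // Start parameters default to JS `MAX_SAFE_INTEGER` (2^53 - 1) so
        // nodes never begin an upgrade unless explicitly configured.
        assert_eq!(config["start_proposing_view"], 9007199254740991u64);
        assert_eq!(config["stop_proposing_view"], 0);

        // Later-added fields likewise get inert defaults.
        assert_eq!(config["epoch_height"], 0);
        assert_eq!(config["drb_difficulty"], 0);
        assert!(config["da_committees"].as_array().unwrap().is_empty());
    }
}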

#[async_trait]
pub trait ChainConfigPersistence: Sized + Send + Sync {
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
}
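
// Sketch only: a trivial in-memory implementer showing the shape of the trait.
// This type is hypothetical; the real implementations live in the `fs` and
// `sql` submodules.
#[cfg(test)]
#[derive(Default)]
struct InMemoryChainConfigStore {
    configs: Vec<ChainConfig>,
}

#[cfg(test)]
#[async_trait]
impl ChainConfigPersistence for InMemoryChainConfigStore {
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
        // Durable backends would write to disk or SQL here; we just buffer.
        self.configs.push(chain_config);
        Ok(())
    }
}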

#[cfg(test)]
mod tests {
    use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};

    use alloy::{
        network::EthereumWallet,
        primitives::{Address, U256},
        providers::{ext::AnvilApi, Provider, ProviderBuilder},
    };
    use anyhow::bail;
    use async_lock::{Mutex, RwLock};
    use async_trait::async_trait;
    use committable::{Commitment, Committable};
    use espresso_contract_deployer::{
        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
    };
    use espresso_types::{
        traits::{
            EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
            PersistenceOptions, SequencerPersistence,
        },
        v0_3::{Fetcher, Validator},
        Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes,
        SequencerVersions, ValidatedState,
    };
    use futures::{future::join_all, StreamExt, TryStreamExt};
    use hotshot::{
        types::{BLSPubKey, SignatureKey},
        InitializerEpochInfo,
    };
    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MockVersions};
    use hotshot_types::{
        data::{
            ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
            DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
            ViewNumber,
        },
        event::{EventType, HotShotAction, LeafInfo},
        light_client::StateKeyPair,
        message::{convert_proposal, Proposal, UpgradeLock},
        simple_certificate::{
            CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
            UpgradeCertificate,
        },
        simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
        traits::{
            block_contents::BlockHeader,
            node_implementation::{ConsensusTime, Versions},
            EncodeBytes,
        },
        utils::EpochTransitionIndicator,
        vid::avidm::{init_avidm_param, AvidMScheme},
        vote::HasViewNumber,
    };
    use indexmap::IndexMap;
    use portpicker::pick_unused_port;
    use staking_cli::demo::{DelegationConfig, StakingTransactions};
    use surf_disco::Client;
    use tide_disco::error::ServerError;
    use tokio::{spawn, time::sleep};
    use vbs::version::{StaticVersion, StaticVersionType, Version};

    use crate::{
        api::{
            test_helpers::{TestNetwork, TestNetworkConfigBuilder, STAKE_TABLE_CAPACITY_FOR_TEST},
            Options,
        },
        catchup::NullStateCatchup,
        testing::{staking_priv_keys, TestConfigBuilder},
        SequencerApiVersion, RECENT_STAKE_TABLES_LIMIT,
    };

    #[async_trait]
    pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
        type Storage: Sync;

        async fn tmp_storage() -> Self::Storage;
        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;

        async fn connect(storage: &Self::Storage) -> Self {
            Self::options(storage).create().await.unwrap()
        }
    }

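    // rstest template: each test below tagged `#[rstest_reuse::apply(persistence_types)]`
    // runs once per persistence backend (SQL and file system).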
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::persistence::sql::Persistence>)]
    #[case(PhantomData::<crate::persistence::fs::Persistence>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}

    #[derive(Clone, Debug, Default)]
    struct EventCollector {
        events: Arc<RwLock<Vec<Event>>>,
    }

    impl EventCollector {
        async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
            self.events
                .read()
                .await
                .iter()
                .flat_map(|event| {
                    let EventType::Decide { leaf_chain, .. } = &event.event else {
                        panic!("expected decide event, got {event:?}");
                    };
                    leaf_chain.iter().cloned().rev()
                })
                .collect::<Vec<_>>()
        }
    }

    #[async_trait]
    impl EventConsumer for EventCollector {
        async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
            self.events.write().await.push(event.clone());
            Ok(())
        }
    }

    #[derive(Clone, Copy, Debug)]
    struct FailConsumer;

    #[async_trait]
    impl EventConsumer for FailConsumer {
        async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
            bail!("mock error injection");
        }
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved view.
        assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);

        // Store a view.
        let view1 = ViewNumber::genesis();
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view1
        );

        // Store a newer view, make sure storage gets updated.
        let view2 = view1 + 1;
        storage
            .record_action(view2, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view2
        );

        // Store an old view, make sure storage is unchanged.
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view2
        );
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved view.
        assert_eq!(storage.load_restart_view().await.unwrap(), None);

        // Store a view.
        let view1 = ViewNumber::genesis();
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view1 + 1
        );

        // Store a newer view, make sure storage gets updated.
        let view2 = view1 + 1;
        storage
            .record_action(view2, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // Store an old view, make sure storage is unchanged.
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // Store a higher proposed view, make sure storage is unchanged.
        storage
            .record_action(view2 + 1, None, HotShotAction::Propose)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // Store a higher timeout vote view, make sure storage is unchanged.
        storage
            .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
        use hotshot_types::drb::DrbInput;

        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;
        let difficulty_level = 10;

        // Initially, there is no saved info.
        if storage.load_drb_input(10).await.is_ok() {
            panic!("unexpected nonempty drb_input");
        }

        let drb_input_1 = DrbInput {
            epoch: 10,
            iteration: 10,
            value: [0u8; 32],
            difficulty_level,
        };

        let drb_input_2 = DrbInput {
            epoch: 10,
            iteration: 20,
            value: [0u8; 32],
            difficulty_level,
        };

        let drb_input_3 = DrbInput {
            epoch: 10,
            iteration: 30,
            value: [0u8; 32],
            difficulty_level,
        };

        let _ = storage.store_drb_input(drb_input_1.clone()).await;

        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);

        let _ = storage.store_drb_input(drb_input_3.clone()).await;

        // Check that the drb input is overwritten.
        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);

        let _ = storage.store_drb_input(drb_input_2.clone()).await;

        // Check that the drb input is not overwritten by the older value.
        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved info.
        assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());

        // Store a DRB result.
        storage
            .store_drb_result(EpochNumber::new(1), [1; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![InitializerEpochInfo::<SeqTypes> {
                epoch: EpochNumber::new(1),
                drb_result: [1; 32],
                block_header: None,
            }]
        );

        // Store a second DRB result.
        storage
            .store_drb_result(EpochNumber::new(2), [3; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: None,
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Make a header.
        let instance_state = NodeState::mock();
        let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
        let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&validated_state, &instance_state)
            .await
            .into();
        let header = leaf.block_header().clone();

        // Test storing the header.
        storage
            .store_epoch_root(EpochNumber::new(1), header.clone())
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: Some(header.clone()),
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Store more than the limit.
        let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
        for i in 0..total_epochs {
            let epoch = EpochNumber::new(i);
            let drb = [i as u8; 32];
            storage
                .store_drb_result(epoch, drb)
                .await
                .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
        }

        let results = storage.load_start_epoch_info().await.unwrap();

        // Check that only the most recent RECENT_STAKE_TABLES_LIMIT epochs are returned.
        assert_eq!(
            results.len(),
            RECENT_STAKE_TABLES_LIMIT as usize,
            "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
        );

        for (i, info) in results.iter().enumerate() {
            let expected_epoch =
                EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
            let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
            assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}");
            assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}");
            assert!(info.block_header.is_none(), "Expected no block header");
        }
    }

    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
        LeafInfo {
            leaf,
            vid_share: None,
            state: Default::default(),
            delta: None,
            state_cert: None,
        }
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Test append VID
        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            None
        );

        let leaf: Leaf2 =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut vid = AvidMDisperseShare::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        };
        let mut quorum_proposal = Proposal {
            data: QuorumProposalWrapper::<SeqTypes> {
                proposal: QuorumProposal2::<SeqTypes> {
                    epoch: None,
                    block_header: leaf.block_header().clone(),
                    view_number: ViewNumber::genesis(),
                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                        &ValidatedState::default(),
                        &NodeState::mock(),
                    )
                    .await,
                    upgrade_certificate: None,
                    view_change_evidence: None,
                    next_drb_result: None,
                    next_epoch_justify_qc: None,
                    state_cert: None,
                },
            },
            signature,
            _pd: Default::default(),
        };

        let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());

        storage.append_vid(&vid_share0).await.unwrap();

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            Some(vid_share0.clone())
        );

        vid.view_number = ViewNumber::new(1);

        let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share1).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share1.clone())
        );

        vid.view_number = ViewNumber::new(2);

        let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share2).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share2.clone())
        );

        vid.view_number = ViewNumber::new(3);

        let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share3).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share3.clone())
        );

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");

        let da_proposal_inner = DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc.clone(),
            metadata: leaf_payload.ns_table().clone(),
            view_number: ViewNumber::new(0),
            epoch: None,
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        };

        let da_proposal = Proposal {
            data: da_proposal_inner,
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let vid_commitment = vid_commitment::<TestVersions>(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            <TestVersions as Versions>::Base::VERSION,
        );

        storage
            .append_da2(&da_proposal, vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
            Some(da_proposal.clone())
        );

        let mut da_proposal1 = da_proposal.clone();
        da_proposal1.data.view_number = ViewNumber::new(1);
        storage
            .append_da2(&da_proposal1.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal1.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal1.clone())
        );

        let mut da_proposal2 = da_proposal1.clone();
        da_proposal2.data.view_number = ViewNumber::new(2);
        storage
            .append_da2(&da_proposal2.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal2.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal2.clone())
        );

        let mut da_proposal3 = da_proposal2.clone();
        da_proposal3.data.view_number = ViewNumber::new(3);
        storage
            .append_da2(&da_proposal3.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal3.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal3.clone())
        );

        let quorum_proposal1 = quorum_proposal.clone();

        storage
            .append_quorum_proposal2(&quorum_proposal1)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
        let quorum_proposal2 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal2)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone())
            ])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
        let quorum_proposal3 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal3)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone())
            ])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);

        // This one should stick around after GC runs.
        let quorum_proposal4 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal4)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone()),
                (ViewNumber::new(3), quorum_proposal4.clone())
            ])
        );

        // Test decide and garbage collection. Pass in a leaf chain with no VID shares or payloads,
        // so we have to fetch the missing data from storage.
        let leaves = [
            Leaf2::from_quorum_proposal(&quorum_proposal1.data),
            Leaf2::from_quorum_proposal(&quorum_proposal2.data),
            Leaf2::from_quorum_proposal(&quorum_proposal3.data),
            Leaf2::from_quorum_proposal(&quorum_proposal4.data),
        ];
        let mut final_qc = leaves[3].justify_qc();
        final_qc.view_number += 1;
        final_qc.data.leaf_commit = Committable::commit(&leaf);
        let qcs = [
            leaves[1].justify_qc(),
            leaves[2].justify_qc(),
            leaves[3].justify_qc(),
            final_qc,
        ];

        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::genesis()
        );

        let consumer = EventCollector::default();
        let leaf_chain = leaves
            .iter()
            .take(3)
            .map(|leaf| leaf_info(leaf.clone()))
            .zip(&qcs)
            .collect::<Vec<_>>();
        tracing::info!(?leaf_chain, "decide view 2");
        storage
            .append_decided_leaves(
                ViewNumber::new(2),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
                None,
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(2)
        );

        for i in 0..=2 {
            assert_eq!(
                storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
                None
            );

            assert_eq!(
                storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
                None
            );
        }

        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            Some(da_proposal3)
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            Some(convert_proposal(vid_share3.clone()))
        );

        let proposals = storage.load_quorum_proposals().await.unwrap();
        assert_eq!(
            proposals,
            BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
        );

        // A decide event should have been processed.
        for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
        }

        // The decided leaf should not have been garbage collected.
        assert_eq!(
            storage.load_anchor_leaf().await.unwrap(),
            Some((leaves[2].clone(), qcs[2].clone()))
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            leaves[2].view_number()
        );

        // Process a second decide event.
        let consumer = EventCollector::default();
        tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                vec![(
                    &leaf_info(leaves[3].clone()),
                    CertificatePair::non_epoch_change(qcs[3].clone()),
                )],
                None,
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // A decide event should have been processed.
        let events = consumer.events.read().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].view_number, ViewNumber::new(3));
        let EventType::Decide {
            committing_qc,
            leaf_chain,
            ..
        } = &events[0].event
        else {
            panic!("expected decide event, got {:?}", events[0]);
        };
        assert_eq!(*committing_qc.qc(), qcs[3]);
        assert_eq!(leaf_chain.len(), 1);
        let info = &leaf_chain[0];
        assert_eq!(info.leaf, leaves[3]);

        // The remaining data should have been GCed.
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            None
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            None
        );
        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::new()
        );
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Test get upgrade certificate
        assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);

        let upgrade_data = UpgradeProposalData {
            old_version: Version { major: 0, minor: 1 },
            new_version: Version { major: 1, minor: 0 },
            decide_by: ViewNumber::genesis(),
            new_version_hash: Default::default(),
            old_version_last_view: ViewNumber::genesis(),
            new_version_first_view: ViewNumber::genesis(),
        };

        let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
            upgrade_data.clone(),
            upgrade_data.commit(),
            ViewNumber::genesis(),
            Default::default(),
            Default::default(),
        );
        let res = storage
            .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
            .await;
        assert!(res.is_ok());

        let res = storage.load_upgrade_certificate().await.unwrap();
        let view_number = res.unwrap().view_number;
        assert_eq!(view_number, ViewNumber::genesis());

        let new_view_number_for_certificate = ViewNumber::new(50);
        let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
        new_upgrade_certificate.view_number = new_view_number_for_certificate;

        let res = storage
            .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
            .await;
        assert!(res.is_ok());

        let res = storage.load_upgrade_certificate().await.unwrap();
        let view_number = res.unwrap().view_number;
        assert_eq!(view_number, new_view_number_for_certificate);
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Test that the next epoch qc2 does not exist.
        assert_eq!(
            storage.load_next_epoch_quorum_certificate().await.unwrap(),
            None
        );

        let upgrade_lock = UpgradeLock::<SeqTypes, TestVersions>::new();

        let genesis_view = ViewNumber::genesis();

        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::default()).await;
        let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
            leaf_commit: leaf.commit(),
            epoch: Some(EpochNumber::new(1)),
            block_number: Some(leaf.height()),
        }
        .into();

        let versioned_data =
            VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock).await;

        let bytes: [u8; 32] = versioned_data.commit().into();

        let next_epoch_qc = NextEpochQuorumCertificate2::new(
            data,
            Commitment::from_raw(bytes),
            genesis_view,
            None,
            PhantomData,
        );

        let res = storage
            .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
            .await;
        assert!(res.is_ok());

        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
        let view_number = res.unwrap().view_number;
        assert_eq!(view_number, ViewNumber::genesis());

        let new_view_number_for_qc = ViewNumber::new(50);
        let mut new_qc = next_epoch_qc.clone();
        new_qc.view_number = new_view_number_for_qc;

        let res = storage
            .store_next_epoch_quorum_certificate(new_qc.clone())
            .await;
        assert!(res.is_ok());

        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
        let view_number = res.unwrap().view_number;
        assert_eq!(view_number, new_view_number_for_qc);
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
        _p: PhantomData<P>,
    ) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Create a short blockchain.
        let mut chain = vec![];

        let leaf: Leaf2 =
            Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock())
                .await
                .into();
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];
        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let mut vid = AvidMDisperseShare::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();
        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let mut qc = QuorumCertificate2::genesis::<TestVersions>(
            &ValidatedState::default(),
            &NodeState::mock(),
        )
        .await;

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let mut da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let vid_commitment = vid_commitment::<TestVersions>(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            <TestVersions as Versions>::Base::VERSION,
        );

        for i in 0..4 {
            quorum_proposal.proposal.view_number = ViewNumber::new(i);
            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
            qc.view_number = leaf.view_number();
            qc.data.leaf_commit = Committable::commit(&leaf);
            vid.data.view_number = leaf.view_number();
            da_proposal.data.view_number = leaf.view_number();
            chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
        }

        // Add proposals.
        for (_, _, vid, da) in &chain {
            tracing::info!(?da, ?vid, "insert proposal");
            storage.append_da2(da, vid_commitment).await.unwrap();
            storage
                .append_vid(&convert_proposal(vid.clone()))
                .await
                .unwrap();
        }

        // Decide 2 leaves, but fail in event processing.
        let leaf_chain = chain
            .iter()
            .take(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide with event handling failure");
        storage
            .append_decided_leaves(
                ViewNumber::new(1),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
                None,
                &FailConsumer,
            )
            .await
            .unwrap();
        // No garbage collection should have run.
        for i in 0..4 {
            tracing::info!(i, "check proposal availability");
            assert!(storage
                .load_vid_share(ViewNumber::new(i))
                .await
                .unwrap()
                .is_some());
            assert!(storage
                .load_da_proposal(ViewNumber::new(i))
                .await
                .unwrap()
                .is_some());
        }
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(1)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(1)
        );

        // Now decide remaining leaves successfully. We should now garbage collect and process a
        // decide event for all the leaves.
        let consumer = EventCollector::default();
        let leaf_chain = chain
            .iter()
            .skip(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide successfully");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
                None,
                &consumer,
            )
            .await
            .unwrap();
        // Garbage collection should have run.
        for i in 0..4 {
            tracing::info!(i, "check proposal garbage collected");
            assert!(storage
                .load_vid_share(ViewNumber::new(i))
                .await
                .unwrap()
                .is_none());
            assert!(storage
                .load_da_proposal(ViewNumber::new(i))
                .await
                .unwrap()
                .is_none());
        }
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(3)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // Check decide event.
        tracing::info!("check decide event");
        let leaf_chain = consumer.leaf_chain().await;
        assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
        for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
            assert!(info.leaf.block_payload().is_some());
        }
    }

    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;

        let mut options = P::options(&tmp);
        options.set_view_retention(1);
        let storage = options.create().await.unwrap();

        // Add some "old" data, from view 0.
        let leaf =
            Leaf::genesis::<MockVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid_share = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(0),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param,
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );

        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Decide a newer view, view 1.
        storage
            .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
            .await
            .unwrap();

        // The old data is not more than the retention period (1 view) old, so it should not be
        // GCed.
        assert_eq!(
            storage
                .load_da_proposal(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            da_proposal
        );
        assert_eq!(
            storage
                .load_vid_share(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            vid_share
        );
        assert_eq!(
            storage
                .load_quorum_proposal(ViewNumber::new(0))
                .await
                .unwrap(),
            quorum_proposal
        );

        // Decide an even newer view, triggering GC of the old data.
        storage
            .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert!(storage
            .load_da_proposal(ViewNumber::new(0))
            .await
            .unwrap()
            .is_none());
        assert!(storage
            .load_vid_share(ViewNumber::new(0))
            .await
            .unwrap()
            .is_none());
        assert!(storage
            .load_quorum_proposal(ViewNumber::new(0))
            .await
            .is_err());
    }

    async fn assert_events_eq<P: TestablePersistence>(
        persistence: &P,
        block: u64,
        stake_table_fetcher: &Fetcher,
        l1_client: &L1Client,
        stake_table_contract: Address,
    ) -> anyhow::Result<()> {
        // Load persisted events.
        let (stored_l1, events) = persistence.load_events(0, block).await?;
        assert!(!events.is_empty());
        assert!(stored_l1.is_some());
        assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
        // Fetch events directly from the contract and compare with persisted data.
        let contract_events = Fetcher::fetch_events_from_contract(
            l1_client.clone(),
            stake_table_contract,
            None,
            block,
        )
        .await?;
        assert_eq!(
            contract_events, events,
            "Events from contract and persistence do not match"
        );

        // Fetch events from the stake table fetcher and compare with persisted data.
        let fetched_events = stake_table_fetcher
            .fetch_and_store_stake_table_events(stake_table_contract, block)
            .await?;
        assert_eq!(fetched_events, events);

        Ok(())
    }
    // Test validating stake table event fetching from persistence, ensuring that persisted data
    // matches the on-chain events and that the event fetcher works correctly.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        let epoch_height = 20;
        type PosVersion = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        let query_service_port = pick_unused_port().expect("No ports free for query service");
        let query_api_options = Options::with_port(query_service_port);

        const NODE_COUNT: usize = 2;

        let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
        let persistence_options: [_; NODE_COUNT] = storage
            .iter()
            .map(P::options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let persistence = persistence_options[0].clone().create().await.unwrap();

        // Build the config with the PoS hook.
        let l1_url = network_config.l1_url();

        let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(query_api_options)
            .network_config(network_config.clone())
            .persistences(persistence_options.clone())
            .pos_hook::<PosVersion>(DelegationConfig::MultipleDelegators, stake_table_version)
            .await
            .expect("Pos deployment failed")
            .build();

        // Start the network.
        let test_network = TestNetwork::new(testnet_config, PosVersion::new()).await;

        let client: Client<ServerError, SequencerApiVersion> = Client::new(
            format!("http://localhost:{query_service_port}")
                .parse()
                .unwrap(),
        );
        client.connect(None).await;
        tracing::info!(query_service_port, "server running");

        // Wait until we enter epoch 3.
1437        let _initial_blocks = client
1438            .socket("availability/stream/blocks/0")
1439            .subscribe::<BlockQueryData<SeqTypes>>()
1440            .await
1441            .unwrap()
1442            .take(40)
1443            .try_collect::<Vec<_>>()
1444            .await
1445            .unwrap();
        // Load initial persisted events and validate they exist.
        let membership_coordinator = test_network
            .server
            .consensus()
            .read()
            .await
            .membership_coordinator
            .clone();

        let l1_client = L1Client::new(vec![l1_url]).unwrap();
        let node_state = test_network.server.node_state();
        let chain_config = node_state.chain_config;
        let stake_table_contract = chain_config.stake_table_contract.unwrap();

        let current_membership = membership_coordinator.membership();
        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            let block1 = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            assert_events_eq(
                &persistence,
                block1,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
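        // Stream more blocks until we are in epoch 4 (65 blocks at epoch_height = 20),
        // then re-check that the persisted events still match the contract.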
        let _epoch_4_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(65)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let block2 = anvil_provider
            .get_block_number()
            .await
            .expect("latest l1 block");

        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            assert_events_eq(
                &persistence,
                block2,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        Ok(())
    }

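    // Test that the background fetcher task keeps persisting new stake table events as L1
    // blocks are mined.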
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        use espresso_types::v0_3::ChainConfig;
        use hotshot_contract_adapter::stake_table::StakeTableContractVersion;

        let blocks_per_epoch = 10;

        let network_config = TestConfigBuilder::<1>::default()
            .epoch_height(blocks_per_epoch)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
            &network_config.hotshot_config().hotshot_stake_table(),
            STAKE_TABLE_CAPACITY_FOR_TEST,
        )
        .unwrap();

        let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
            .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
            .unzip();
        let state_key_pairs = (0..20)
            .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
            .collect::<Vec<_>>();

        let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);

        let deployer = ProviderBuilder::new()
            .wallet(EthereumWallet::from(network_config.signer().clone()))
            .connect_http(network_config.l1_url().clone());

        let mut contracts = Contracts::new();
        let args = DeployerArgsBuilder::default()
            .deployer(deployer.clone())
            .rpc_url(network_config.l1_url().clone())
            .mock_light_client(true)
            .genesis_lc_state(genesis_state)
            .genesis_st_state(genesis_stake)
            .blocks_per_epoch(blocks_per_epoch)
            .epoch_start_block(1)
            .exit_escrow_period(U256::from(max(
                blocks_per_epoch * 15 + 100,
                DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
            )))
            .multisig_pauser(network_config.signer().address())
            .token_name("Espresso".to_string())
            .token_symbol("ESP".to_string())
            .initial_token_supply(U256::from(3590000000u64))
            .ops_timelock_delay(U256::from(0))
            .ops_timelock_admin(network_config.signer().address())
            .ops_timelock_proposers(vec![network_config.signer().address()])
            .ops_timelock_executors(vec![network_config.signer().address()])
            .safe_exit_timelock_delay(U256::from(10))
            .safe_exit_timelock_admin(network_config.signer().address())
            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
            .safe_exit_timelock_executors(vec![network_config.signer().address()])
            .build()
            .unwrap();

        match stake_table_version {
            StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
            StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
        }
        .expect("contracts deployed");

        let st_addr = contracts
            .address(Contract::StakeTableProxy)
            .expect("StakeTableProxy deployed");
        let l1_url = network_config.l1_url().clone();

        let mut planned_txns = StakingTransactions::create(
            l1_url.clone(),
            &deployer,
            st_addr,
            validators,
            None,
            DelegationConfig::MultipleDelegators,
        )
        .await
        .expect("stake table setup failed");

        planned_txns
            .apply_prerequisites()
            .await
            .expect("prerequisites failed");

        // Ensure at least one stake-table-affecting transaction has landed.
        planned_txns.apply_one().await.expect("send tx failed");

        // Mine a new block every 1s.
        anvil_provider
            .anvil_set_interval_mining(1)
            .await
            .expect("interval mining");

        // Spawn a separate task that keeps registering validators and multiple delegators.
        // With interval mining set to 1s, each transaction takes at least 1s to finalize.
        spawn(async move {
            while let Some(receipt) = planned_txns.apply_one().await.expect("send tx failed") {
                tracing::debug!(?receipt, "transaction finalized");
            }
        });

        let storage = P::tmp_storage().await;
        let persistence = P::options(&storage).create().await.unwrap();

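        // Use a short (7s) stake table update interval so the background update loop below
        // makes observable progress quickly.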
        let l1_client = L1ClientOptions {
            stake_table_update_interval: Duration::from_secs(7),
            l1_retry_delay: Duration::from_millis(10),
            l1_events_max_block_range: 10000,
            ..Default::default()
        }
        .connect(vec![l1_url])
        .unwrap();
        l1_client.spawn_tasks().await;

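        // Build a fetcher backed by the persistence under test. NullStateCatchup suffices
        // here because only L1 event fetching is exercised.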
        let fetcher = Fetcher::new(
            Arc::new(NullStateCatchup::default()),
            Arc::new(Mutex::new(persistence.clone())),
            l1_client.clone(),
            ChainConfig {
                stake_table_contract: Some(st_addr),
                base_fee: 0.into(),
                ..Default::default()
            },
        );

        // Sleep so that enough events accumulate before starting the update loop.
        sleep(Duration::from_secs(20)).await;

        fetcher.spawn_update_loop().await;
        let mut prev_l1_block = 0;
        let mut prev_events_len = 0;
        for _i in 0..10 {
            // Wait for longer than the update interval (7s in this test) so that
            // persistence has been updated by the time we check it.
            tokio::time::sleep(std::time::Duration::from_secs(8)).await;

            let block = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
            let read_offset = read_offset.unwrap();
            let l1_block = match read_offset {
                EventsPersistenceRead::Complete => block,
                EventsPersistenceRead::UntilL1Block(block) => block,
            };
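            // `Complete` indicates persistence covers events up to the requested block;
            // `UntilL1Block(b)` indicates coverage only up to block `b`.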

            tracing::info!("l1_block = {l1_block:?}, persisted events = {persisted_events:?}");
            assert!(persisted_events.len() > prev_events_len);

            assert!(l1_block > prev_l1_block, "events not updated");

            let contract_events =
                Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
                    .await?;
            assert_eq!(persisted_events, contract_events);

            prev_l1_block = l1_block;
            prev_events_len = persisted_events.len();
        }

        Ok(())
    }

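    // Test that stake tables stored per epoch can be loaded back, both individually and
    // via `load_latest_stake`.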
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_membership_persistence<P: TestablePersistence>(
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        let tmp = P::tmp_storage().await;
        let mut opt = P::options(&tmp);

        let storage = opt.create().await.unwrap();

        let validator = Validator::mock();
        let mut st = IndexMap::new();
        st.insert(validator.account, validator);

        storage
            .store_stake(EpochNumber::new(10), st.clone(), None, None)
            .await?;

        let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
        assert_eq!(st, table);

        let val2 = Validator::mock();
        let mut st2 = IndexMap::new();
        st2.insert(val2.account, val2);
        storage
            .store_stake(EpochNumber::new(11), st2.clone(), None, None)
            .await?;

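        // Only epochs 10 and 11 have been stored, so requesting the latest 4 returns both,
        // newest first.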
        let tables = storage.load_latest_stake(4).await?.unwrap();
        let mut iter = tables.iter();
        assert_eq!(
            Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
            iter.next()
        );
        assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
        assert_eq!(None, iter.next());

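        // Store the same table for epochs 0..=20; the latest 5 should then be epochs 20
        // down to 16.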
        for i in 0..=20 {
            storage
                .store_stake(EpochNumber::new(i), st2.clone(), None, None)
                .await?;
        }

        let tables = storage.load_latest_stake(5).await?.unwrap();
        let mut iter = tables.iter();
        assert_eq!(
            Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
            iter.next()
        );
        assert_eq!(
            Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
            iter.next()
        );
        assert_eq!(
            Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
            iter.next()
        );
        assert_eq!(
            Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
            iter.next()
        );
        assert_eq!(
            Some(&(EpochNumber::new(16), (st2, None), None)),
            iter.next()
        );
        assert_eq!(None, iter.next());

        Ok(())
    }

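    // Test paginated storing and loading of the full validator set for an epoch.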
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        let tmp = P::tmp_storage().await;
        let mut opt = P::options(&tmp);
        let storage = opt.create().await.unwrap();

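        // Store 25 mock validators for epoch 10; the loads below are paginated via
        // (offset, limit).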
        let mut vmap1 = IndexMap::new();
        for _i in 0..25 {
            let v = Validator::mock();
            vmap1.insert(v.account, v);
        }
        storage
            .store_all_validators(EpochNumber::new(10), vmap1.clone())
            .await?;

        let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
        expected_all.sort_by_key(|v| v.account);

        // Load all
        let loaded_all = storage
            .load_all_validators(EpochNumber::new(10), 0, 100)
            .await?;
        // Results come back ordered by address (`ORDER BY address ASC`), which may differ
        // from the IndexMap insertion order; hence `expected_all` is sorted by account above.
        assert_eq!(expected_all, loaded_all);

        // Load the first 10
        let loaded_first_10 = storage
            .load_all_validators(EpochNumber::new(10), 0, 10)
            .await?;

        assert_eq!(expected_all[..10], loaded_first_10);

        // Load the next 10
        let loaded_next_10 = storage
            .load_all_validators(EpochNumber::new(10), 10, 10)
            .await?;

        assert_eq!(expected_all[10..20], loaded_next_10);

        // Load the remaining 5
        let loaded_last_5 = storage
            .load_all_validators(EpochNumber::new(10), 20, 10)
            .await?;

        assert_eq!(expected_all[20..], loaded_last_5);

        // An offset beyond the number of validators should return an empty list.
        let loaded_empty = storage
            .load_all_validators(EpochNumber::new(10), 100, 10)
            .await?;
        assert!(loaded_empty.is_empty());

        // Epoch 11: store a single different validator.
        let validator2 = Validator::mock();
        let mut vmap2 = IndexMap::new();
        vmap2.insert(validator2.account, validator2.clone());

        storage
            .store_all_validators(EpochNumber::new(11), vmap2.clone())
            .await?;

        let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
        expected_epoch11.sort_by_key(|v| v.account);

        let loaded2 = storage
            .load_all_validators(EpochNumber::new(11), 0, 100)
            .await?;

        assert_eq!(expected_epoch11, loaded2);

        // Epoch 10 data is still intact.
        let loaded1_again = storage
            .load_all_validators(EpochNumber::new(10), 0, 100)
            .await?;

        assert_eq!(expected_all, loaded1_again);

        Ok(())
    }

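    // Test that deciding non-consecutive leaves produces separate decide events, with the
    // deciding QC attached only to the last one.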
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        let genesis_leaf: Leaf2 =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: genesis_leaf.block_header().clone(),
                view_number: genesis_leaf.view_number(),
                justify_qc: QuorumCertificate2::genesis::<TestVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };

        let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);

        quorum_proposal.proposal.view_number = ViewNumber::new(2);
        *quorum_proposal.proposal.block_header.height_mut() = 2;
        quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
        let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);

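        // leaf0 sits at view 0 and leaf2 at view 2, leaving a gap at view 1, so the two
        // decides below are non-consecutive.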
        let mut qc0 = leaf0.justify_qc();
        qc0.data.leaf_commit = Committable::commit(&leaf0);

        let mut qc2 = leaf2.justify_qc();
        qc2.view_number += 1;
        qc2.data.leaf_commit = Committable::commit(&leaf2);

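        // The deciding QC sits one view after qc2; it should be attached only to the final
        // decide event.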
        let mut deciding_qc = qc2.clone();
        deciding_qc.view_number += 1;

        // Decide the first leaf, but fail to generate a decide event.
        storage
            .append_decided_leaves(
                ViewNumber::new(0),
                [(
                    &leaf_info(leaf0.clone()),
                    CertificatePair::non_epoch_change(qc0),
                )],
                None,
                &FailConsumer,
            )
            .await
            .unwrap();

        // Later, decide a new leaf, skipping a leaf in the middle. This should generate
        // decide events for both leaves, correctly separated into two events since the
        // leaves are non-consecutive, and with `deciding_qc` applied only to the last event.
        let consumer = EventCollector::default();
        storage
            .append_decided_leaves(
                ViewNumber::new(2),
                [(
                    &leaf_info(leaf2.clone()),
                    CertificatePair::non_epoch_change(qc2),
                )],
                Some(Arc::new(CertificatePair::non_epoch_change(
                    deciding_qc.clone(),
                ))),
                &consumer,
            )
            .await
            .unwrap();

        let events = consumer.events.read().await;
        assert_eq!(events.len(), 2);

        let EventType::Decide {
            leaf_chain: leaf_chain0,
            deciding_qc: deciding_qc0,
            ..
        } = &events[0].event
        else {
            panic!("expected decide event, got {:?}", events[0].event);
        };
        assert_eq!(leaf_chain0.len(), 1);
        assert_eq!(leaf_chain0[0].leaf, leaf0);
        assert_eq!(*deciding_qc0, None);

        let EventType::Decide {
            leaf_chain: leaf_chain2,
            deciding_qc: deciding_qc2,
            ..
        } = &events[1].event
        else {
            panic!("expected decide event, got {:?}", events[1].event);
        };
        assert_eq!(leaf_chain2.len(), 1);
        assert_eq!(leaf_chain2[0].leaf, leaf2);
        assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
    }
}