sequencer/
api.rs

1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use anyhow::{bail, Context};
4use async_lock::RwLock;
5use async_once_cell::Lazy;
6use async_trait::async_trait;
7use committable::Commitment;
8use data_source::{
9    CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
10    SubmitDataSource,
11};
12use derivative::Derivative;
13use espresso_types::{
14    config::PublicNetworkConfig,
15    retain_accounts,
16    v0::traits::SequencerPersistence,
17    v0_3::{ChainConfig, RewardAccountV1, RewardAmount, RewardMerkleTreeV1},
18    v0_4::{RewardAccountV2, RewardMerkleTreeV2},
19    AccountQueryData, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2, NodeState, PubKey,
20    Transaction, ValidatorMap,
21};
22use futures::{
23    future::{BoxFuture, Future, FutureExt},
24    stream::BoxStream,
25};
26use hotshot_events_service::events_source::{
27    EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
28};
29use hotshot_query_service::{
30    availability::VidCommonQueryData, data_source::ExtensibleDataSource, VidCommon,
31};
32use hotshot_types::{
33    data::{EpochNumber, VidCommitment, VidShare, ViewNumber},
34    event::{Event, LegacyEvent},
35    light_client::LCV3StateSignatureRequestBody,
36    network::NetworkConfig,
37    traits::{
38        network::ConnectedNetwork,
39        node_implementation::{NodeType, Versions},
40    },
41    vid::avidm::{init_avidm_param, AvidMScheme},
42    vote::HasViewNumber,
43    PeerConfig,
44};
45use itertools::Itertools;
46use jf_merkle_tree::MerkleTreeScheme;
47use rand::Rng;
48use request_response::RequestType;
49use tokio::time::timeout;
50
51use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
52use crate::{
53    catchup::{
54        add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
55        add_v2_reward_accounts_to_state, CatchupStorage,
56    },
57    context::Consensus,
58    request_response::{
59        data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
60        request::{Request, Response},
61    },
62    state_signature::StateSigner,
63    SeqTypes, SequencerApiVersion, SequencerContext,
64};
65
66pub mod data_source;
67pub mod endpoints;
68pub mod fs;
69pub mod options;
70pub mod sql;
71mod update;
72
73pub use options::Options;
74
75pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;
76
77type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
78
79#[derive(Derivative)]
80#[derivative(Clone(bound = ""), Debug(bound = ""))]
81struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
82    // The consensus state is initialized lazily so we can start the API (and healthcheck endpoints)
83    // before consensus has started. Any endpoint that uses consensus state will wait for
84    // initialization to finish, but endpoints that do not require a consensus handle can proceed
85    // without waiting.
86    #[derivative(Debug = "ignore")]
87    sequencer_context: BoxLazy<SequencerContext<N, P, V>>,
88}
89
90impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState<N, P, V> {
91    fn new(context_init: impl Future<Output = SequencerContext<N, P, V>> + Send + 'static) -> Self {
92        Self {
93            sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
94        }
95    }
96
97    async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
98        self.sequencer_context
99            .as_ref()
100            .get()
101            .await
102            .get_ref()
103            .state_signer()
104    }
105
106    async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
107        self.sequencer_context
108            .as_ref()
109            .get()
110            .await
111            .get_ref()
112            .event_streamer()
113    }
114
115    async fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
116        self.sequencer_context
117            .as_ref()
118            .get()
119            .await
120            .get_ref()
121            .consensus()
122    }
123
124    async fn network_config(&self) -> NetworkConfig<SeqTypes> {
125        self.sequencer_context
126            .as_ref()
127            .get()
128            .await
129            .get_ref()
130            .network_config()
131    }
132}
133
134type StorageState<N, P, D, V> = ExtensibleDataSource<D, ApiState<N, P, V>>;
135
136#[async_trait]
137impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> EventsSource<SeqTypes>
138    for ApiState<N, P, V>
139{
140    type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
141    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
142
143    async fn get_event_stream(
144        &self,
145        _filter: Option<EventFilterSet<SeqTypes>>,
146    ) -> Self::EventStream {
147        self.event_streamer()
148            .await
149            .read()
150            .await
151            .get_event_stream(None)
152            .await
153    }
154
155    async fn get_legacy_event_stream(
156        &self,
157        _filter: Option<EventFilterSet<SeqTypes>>,
158    ) -> Self::LegacyEventStream {
159        self.event_streamer()
160            .await
161            .read()
162            .await
163            .get_legacy_event_stream(None)
164            .await
165    }
166
167    async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
168        self.event_streamer()
169            .await
170            .read()
171            .await
172            .get_startup_info()
173            .await
174    }
175}
176
177impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
178    SubmitDataSource<N, P> for StorageState<N, P, D, V>
179{
180    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
181        self.as_ref().submit(tx).await
182    }
183}
184
185impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
186    StakeTableDataSource<SeqTypes> for StorageState<N, P, D, V>
187{
188    /// Get the stake table for a given epoch
189    async fn get_stake_table(
190        &self,
191        epoch: Option<<SeqTypes as NodeType>::Epoch>,
192    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
193        self.as_ref().get_stake_table(epoch).await
194    }
195
196    /// Get the stake table for the current epoch if not provided
197    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
198        self.as_ref().get_stake_table_current().await
199    }
200
201    /// Get all the validators
202    async fn get_validators(
203        &self,
204        epoch: <SeqTypes as NodeType>::Epoch,
205    ) -> anyhow::Result<ValidatorMap> {
206        self.as_ref().get_validators(epoch).await
207    }
208
209    async fn get_block_reward(
210        &self,
211        epoch: Option<EpochNumber>,
212    ) -> anyhow::Result<Option<RewardAmount>> {
213        self.as_ref().get_block_reward(epoch).await
214    }
215    /// Get all the validator participation for the current epoch
216    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
217        self.as_ref().current_proposal_participation().await
218    }
219    async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
220        self.as_ref().previous_proposal_participation().await
221    }
222}
223
224impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
225    StakeTableDataSource<SeqTypes> for ApiState<N, P, V>
226{
227    /// Get the stake table for a given epoch
228    async fn get_stake_table(
229        &self,
230        epoch: Option<<SeqTypes as NodeType>::Epoch>,
231    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
232        let highest_epoch = self
233            .consensus()
234            .await
235            .read()
236            .await
237            .cur_epoch()
238            .await
239            .map(|e| e + 1);
240        if epoch > highest_epoch {
241            return Err(anyhow::anyhow!(
242                "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
243                 {highest_epoch:?}"
244            ));
245        }
246        let mem = self
247            .consensus()
248            .await
249            .read()
250            .await
251            .membership_coordinator
252            .stake_table_for_epoch(epoch)
253            .await?;
254
255        Ok(mem.stake_table().await.0)
256    }
257
258    /// Get the stake table for the current epoch and return it along with the epoch number
259    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
260        let epoch = self.consensus().await.read().await.cur_epoch().await;
261
262        Ok(StakeTableWithEpochNumber {
263            epoch,
264            stake_table: self.get_stake_table(epoch).await?,
265        })
266    }
267
268    async fn get_block_reward(
269        &self,
270        epoch: Option<EpochNumber>,
271    ) -> anyhow::Result<Option<RewardAmount>> {
272        let coordinator = self
273            .consensus()
274            .await
275            .read()
276            .await
277            .membership_coordinator
278            .clone();
279
280        let membership = coordinator.membership().read().await;
281
282        Ok(membership.block_reward(epoch))
283    }
284
285    /// Get the whole validators map
286    async fn get_validators(
287        &self,
288        epoch: <SeqTypes as NodeType>::Epoch,
289    ) -> anyhow::Result<ValidatorMap> {
290        let mem = self
291            .consensus()
292            .await
293            .read()
294            .await
295            .membership_coordinator
296            .membership_for_epoch(Some(epoch))
297            .await
298            .context("membership not found")?;
299
300        let r = mem.coordinator.membership().read().await;
301        r.active_validators(&epoch)
302    }
303
304    /// Get the current proposal participation.
305    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
306        self.consensus()
307            .await
308            .read()
309            .await
310            .consensus()
311            .read()
312            .await
313            .current_proposal_participation()
314    }
315
316    /// Get the previous proposal participation.
317    async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
318        self.consensus()
319            .await
320            .read()
321            .await
322            .consensus()
323            .read()
324            .await
325            .previous_proposal_participation()
326    }
327}
328
329#[async_trait]
330impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
331    RequestResponseDataSource<SeqTypes> for StorageState<N, P, D, V>
332{
333    async fn request_vid_shares(
334        &self,
335        block_number: u64,
336        vid_common_data: VidCommonQueryData<SeqTypes>,
337        timeout_duration: Duration,
338    ) -> anyhow::Result<Vec<VidShare>> {
339        self.as_ref()
340            .request_vid_shares(block_number, vid_common_data, timeout_duration)
341            .await
342    }
343}
344
345#[async_trait]
346impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
347    RequestResponseDataSource<SeqTypes> for ApiState<N, P, V>
348{
349    async fn request_vid_shares(
350        &self,
351        block_number: u64,
352        vid_common_data: VidCommonQueryData<SeqTypes>,
353        duration: Duration,
354    ) -> anyhow::Result<Vec<VidShare>> {
355        // Get a handle to the request response protocol
356        let request_response_protocol = self
357            .sequencer_context
358            .as_ref()
359            .get()
360            .await
361            .request_response_protocol
362            .clone();
363
364        // Get the total VID weight based on the VID common data
365        let total_weight = match vid_common_data.common() {
366            VidCommon::V0(_) => {
367                // TODO: This needs to be done via the stake table
368                return Err(anyhow::anyhow!(
369                    "V0 total weight calculation not supported yet"
370                ));
371            },
372            VidCommon::V1(v1) => v1.total_weights,
373        };
374
375        // Create the AvidM parameters from the total weight
376        let avidm_param =
377            init_avidm_param(total_weight).with_context(|| "failed to initialize avidm param")?;
378
379        // Get the payload hash for verification
380        let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
381            bail!("V0 share verification not supported yet");
382        };
383
384        // Create a random request id
385        let request_id = rand::thread_rng().gen();
386
387        // Request and verify the shares from all other nodes, timing out after `duration` seconds
388        let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
389        let received_shares_clone = received_shares.clone();
390        let request_result: anyhow::Result<_, _> = timeout(
391            duration,
392            request_response_protocol.request_indefinitely::<_, _, _>(
393                Request::VidShare(block_number, request_id),
394                RequestType::Broadcast,
395                move |_request, response| {
396                    let avidm_param = avidm_param.clone();
397                    let received_shares = received_shares_clone.clone();
398                    async move {
399                        // Make sure the response was a V1 share
400                        let Response::VidShare(VidShare::V1(received_share)) = response else {
401                            bail!("V0 share verification not supported yet");
402                        };
403
404                        // Verify the share
405                        let Ok(Ok(_)) = AvidMScheme::verify_share(
406                            &avidm_param,
407                            &local_payload_hash,
408                            &received_share,
409                        ) else {
410                            bail!("share verification failed");
411                        };
412
413                        // Add the share to the list of received shares
414                        received_shares.lock().push(received_share);
415
416                        bail!("waiting for more shares");
417
418                        #[allow(unreachable_code)]
419                        Ok(())
420                    }
421                },
422            ),
423        )
424        .await;
425
426        // If the request timed out, return the shares we have collected so far
427        match request_result {
428            Err(_) => {
429                // If it timed out, this was successful. Return the shares we have collected so far
430                Ok(received_shares
431                    .lock()
432                    .clone()
433                    .into_iter()
434                    .map(VidShare::V1)
435                    .collect())
436            },
437
438            // If it was an error from the inner request, return that error
439            Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),
440
441            // If it was successful, this was unexpected.
442            Ok(Ok(_)) => bail!("this should not be possible"),
443        }
444    }
445}
446
447impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDataSource<N, P>
448    for ApiState<N, P, V>
449{
450    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
451        let handle = self.consensus().await;
452
453        let consensus_read_lock = handle.read().await;
454
455        // Fetch full chain config from the validated state, if present.
456        // This is necessary because we support chain config upgrades,
457        // so the updated chain config is found in the validated state.
458        let cf = consensus_read_lock
459            .decided_state()
460            .await
461            .chain_config
462            .resolve();
463
464        // Use the chain config from the validated state if available,
465        // otherwise, use the node state's chain config
466        // The node state's chain config is the node's base version chain config
467        let cf = match cf {
468            Some(cf) => cf,
469            None => self.node_state().await.chain_config,
470        };
471
472        let max_block_size: u64 = cf.max_block_size.into();
473        let txn_size = tx.payload().len() as u64;
474
475        // reject transaction bigger than block size
476        if txn_size > max_block_size {
477            bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
478        }
479
480        consensus_read_lock.submit_transaction(tx).await?;
481        Ok(())
482    }
483}
484
485impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
486where
487    N: ConnectedNetwork<PubKey>,
488    V: Versions,
489    P: SequencerPersistence,
490    D: Sync,
491{
492    async fn node_state(&self) -> NodeState {
493        self.as_ref().node_state().await
494    }
495}
496
497impl<
498        N: ConnectedNetwork<PubKey>,
499        V: Versions,
500        P: SequencerPersistence,
501        D: CatchupStorage + Send + Sync,
502    > CatchupDataSource for StorageState<N, P, D, V>
503{
504    #[tracing::instrument(skip(self, instance))]
505    async fn get_accounts(
506        &self,
507        instance: &NodeState,
508        height: u64,
509        view: ViewNumber,
510        accounts: &[FeeAccount],
511    ) -> anyhow::Result<FeeMerkleTree> {
512        // Check if we have the desired state in memory.
513        match self
514            .as_ref()
515            .get_accounts(instance, height, view, accounts)
516            .await
517        {
518            Ok(accounts) => return Ok(accounts),
519            Err(err) => {
520                tracing::info!("accounts not in memory, trying storage: {err:#}");
521            },
522        }
523
524        // Try storage.
525        let (tree, leaf) = self
526            .inner()
527            .get_accounts(instance, height, view, accounts)
528            .await
529            .context("accounts not in memory, and could not fetch from storage")?;
530        // If we successfully fetched accounts from storage, try to add them back into the in-memory
531        // state.
532
533        let consensus = self
534            .as_ref()
535            .consensus()
536            .await
537            .read()
538            .await
539            .consensus()
540            .clone();
541        if let Err(err) =
542            add_fee_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf).await
543        {
544            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
545        }
546        tracing::info!(?view, "updated with fetched account state");
547
548        Ok(tree)
549    }
550
551    #[tracing::instrument(skip(self, instance))]
552    async fn get_frontier(
553        &self,
554        instance: &NodeState,
555        height: u64,
556        view: ViewNumber,
557    ) -> anyhow::Result<BlocksFrontier> {
558        // Check if we have the desired state in memory.
559        match self.as_ref().get_frontier(instance, height, view).await {
560            Ok(frontier) => return Ok(frontier),
561            Err(err) => {
562                tracing::info!("frontier is not in memory, trying storage: {err:#}");
563            },
564        }
565
566        // Try storage.
567        self.inner().get_frontier(instance, height, view).await
568    }
569
570    async fn get_chain_config(
571        &self,
572        commitment: Commitment<ChainConfig>,
573    ) -> anyhow::Result<ChainConfig> {
574        // Check if we have the desired state in memory.
575        match self.as_ref().get_chain_config(commitment).await {
576            Ok(cf) => return Ok(cf),
577            Err(err) => {
578                tracing::info!("chain config is not in memory, trying storage: {err:#}");
579            },
580        }
581
582        // Try storage.
583        self.inner().get_chain_config(commitment).await
584    }
585    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
586        // Check if we have the desired state in memory.
587        match self.as_ref().get_leaf_chain(height).await {
588            Ok(cf) => return Ok(cf),
589            Err(err) => {
590                tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
591            },
592        }
593
594        // Try storage.
595        self.inner().get_leaf_chain(height).await
596    }
597
598    #[tracing::instrument(skip(self, instance))]
599    async fn get_reward_accounts_v2(
600        &self,
601        instance: &NodeState,
602        height: u64,
603        view: ViewNumber,
604        accounts: &[RewardAccountV2],
605    ) -> anyhow::Result<RewardMerkleTreeV2> {
606        // Check if we have the desired state in memory.
607        match self
608            .as_ref()
609            .get_reward_accounts_v2(instance, height, view, accounts)
610            .await
611        {
612            Ok(accounts) => return Ok(accounts),
613            Err(err) => {
614                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
615            },
616        }
617
618        // Try storage.
619        let (tree, leaf) = self
620            .inner()
621            .get_reward_accounts_v2(instance, height, view, accounts)
622            .await
623            .context("accounts not in memory, and could not fetch from storage")?;
624
625        // If we successfully fetched accounts from storage, try to add them back into the in-memory
626        // state.
627        let consensus = self
628            .as_ref()
629            .consensus()
630            .await
631            .read()
632            .await
633            .consensus()
634            .clone();
635        if let Err(err) =
636            add_v2_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
637                .await
638        {
639            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
640        }
641        tracing::info!(?view, "updated with fetched account state");
642
643        Ok(tree)
644    }
645
646    #[tracing::instrument(skip(self, instance))]
647    async fn get_reward_accounts_v1(
648        &self,
649        instance: &NodeState,
650        height: u64,
651        view: ViewNumber,
652        accounts: &[RewardAccountV1],
653    ) -> anyhow::Result<RewardMerkleTreeV1> {
654        // Check if we have the desired state in memory.
655        match self
656            .as_ref()
657            .get_reward_accounts_v1(instance, height, view, accounts)
658            .await
659        {
660            Ok(accounts) => return Ok(accounts),
661            Err(err) => {
662                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
663            },
664        }
665
666        // Try storage.
667        let (tree, leaf) = self
668            .inner()
669            .get_reward_accounts_v1(instance, height, view, accounts)
670            .await
671            .context("accounts not in memory, and could not fetch from storage")?;
672
673        // If we successfully fetched accounts from storage, try to add them back into the in-memory
674        // state.
675        let consensus = self
676            .as_ref()
677            .consensus()
678            .await
679            .read()
680            .await
681            .consensus()
682            .clone();
683        if let Err(err) =
684            add_v1_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
685                .await
686        {
687            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
688        }
689        tracing::info!(?view, "updated with fetched account state");
690
691        Ok(tree)
692    }
693}
694
695impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
696where
697    N: ConnectedNetwork<PubKey>,
698    V: Versions,
699    P: SequencerPersistence,
700{
701    async fn node_state(&self) -> NodeState {
702        self.sequencer_context.as_ref().get().await.node_state()
703    }
704}
705
706impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
707    for ApiState<N, P, V>
708{
709    #[tracing::instrument(skip(self, _instance))]
710    async fn get_accounts(
711        &self,
712        _instance: &NodeState,
713        height: u64,
714        view: ViewNumber,
715        accounts: &[FeeAccount],
716    ) -> anyhow::Result<FeeMerkleTree> {
717        let state = self
718            .consensus()
719            .await
720            .read()
721            .await
722            .state(view)
723            .await
724            .context(format!(
725                "state not available for height {height}, view {view}"
726            ))?;
727        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
728    }
729
730    #[tracing::instrument(skip(self, _instance))]
731    async fn get_frontier(
732        &self,
733        _instance: &NodeState,
734        height: u64,
735        view: ViewNumber,
736    ) -> anyhow::Result<BlocksFrontier> {
737        let state = self
738            .consensus()
739            .await
740            .read()
741            .await
742            .state(view)
743            .await
744            .context(format!(
745                "state not available for height {height}, view {view}"
746            ))?;
747        let tree = &state.block_merkle_tree;
748        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
749        Ok(frontier)
750    }
751
752    async fn get_chain_config(
753        &self,
754        commitment: Commitment<ChainConfig>,
755    ) -> anyhow::Result<ChainConfig> {
756        let state = self.consensus().await.read().await.decided_state().await;
757        let chain_config = state.chain_config;
758
759        if chain_config.commit() == commitment {
760            chain_config.resolve().context("chain config found")
761        } else {
762            bail!("chain config not found")
763        }
764    }
765
766    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
767        let mut leaves = self
768            .consensus()
769            .await
770            .read()
771            .await
772            .consensus()
773            .read()
774            .await
775            .undecided_leaves();
776        leaves.sort_by_key(|l| l.view_number());
777        let (position, mut last_leaf) = leaves
778            .iter()
779            .find_position(|l| l.height() == height)
780            .context(format!("leaf chain not available for {height}"))?;
781        let mut chain = vec![last_leaf.clone()];
782        for leaf in leaves.iter().skip(position + 1) {
783            if leaf.justify_qc().view_number() == last_leaf.view_number() {
784                chain.push(leaf.clone());
785            } else {
786                continue;
787            }
788            if leaf.view_number() == last_leaf.view_number() + 1 {
789                // one away from decide
790                last_leaf = leaf;
791                break;
792            }
793            last_leaf = leaf;
794        }
795        // Make sure we got one more leaf to confirm the decide
796        for leaf in leaves
797            .iter()
798            .skip_while(|l| l.view_number() <= last_leaf.view_number())
799        {
800            if leaf.justify_qc().view_number() == last_leaf.view_number() {
801                chain.push(leaf.clone());
802                return Ok(chain);
803            }
804        }
805        bail!(format!("leaf chain not available for {height}"))
806    }
807
808    #[tracing::instrument(skip(self, _instance))]
809    async fn get_reward_accounts_v2(
810        &self,
811        _instance: &NodeState,
812        height: u64,
813        view: ViewNumber,
814        accounts: &[RewardAccountV2],
815    ) -> anyhow::Result<RewardMerkleTreeV2> {
816        let state = self
817            .consensus()
818            .await
819            .read()
820            .await
821            .state(view)
822            .await
823            .context(format!(
824                "state not available for height {height}, view {view}"
825            ))?;
826
827        retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
828    }
829
830    #[tracing::instrument(skip(self, _instance))]
831    async fn get_reward_accounts_v1(
832        &self,
833        _instance: &NodeState,
834        height: u64,
835        view: ViewNumber,
836        accounts: &[RewardAccountV1],
837    ) -> anyhow::Result<RewardMerkleTreeV1> {
838        let state = self
839            .consensus()
840            .await
841            .read()
842            .await
843            .state(view)
844            .await
845            .context(format!(
846                "state not available for height {height}, view {view}"
847            ))?;
848
849        retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
850    }
851}
852
853impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
854    HotShotConfigDataSource for StorageState<N, P, D, V>
855{
856    async fn get_config(&self) -> PublicNetworkConfig {
857        self.as_ref().network_config().await.into()
858    }
859}
860
861impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> HotShotConfigDataSource
862    for ApiState<N, P, V>
863{
864    async fn get_config(&self) -> PublicNetworkConfig {
865        self.network_config().await.into()
866    }
867}
868
869#[async_trait]
870impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
871    StateSignatureDataSource<N> for StorageState<N, P, D, V>
872{
873    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
874        self.as_ref().get_state_signature(height).await
875    }
876}
877
878#[async_trait]
879impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateSignatureDataSource<N>
880    for ApiState<N, P, V>
881{
882    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
883        self.state_signer()
884            .await
885            .read()
886            .await
887            .get_state_signature(height)
888            .await
889    }
890}
891
892#[cfg(any(test, feature = "testing"))]
893pub mod test_helpers {
894    use std::time::Duration;
895
896    use alloy::{
897        network::EthereumWallet,
898        primitives::{Address, U256},
899        providers::{ext::AnvilApi, ProviderBuilder},
900    };
901    use committable::Committable;
902    use espresso_contract_deployer::{
903        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
904        Contract, Contracts,
905    };
906    use espresso_types::{
907        v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
908        DrbAndHeaderUpgradeVersion, EpochVersion, FeeVersion, MockSequencerVersions, NamespaceId,
909        SequencerVersions, ValidatedState, V0_1,
910    };
911    use futures::{
912        future::{join_all, FutureExt},
913        stream::StreamExt,
914    };
915    use hotshot::types::{Event, EventType};
916    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
917    use hotshot_types::{
918        event::LeafInfo,
919        light_client::LCV3StateSignatureRequestBody,
920        traits::{metrics::NoMetrics, node_implementation::ConsensusTime},
921        HotShotConfig,
922    };
923    use itertools::izip;
924    use jf_merkle_tree::{MerkleCommitment, MerkleTreeScheme};
925    use portpicker::pick_unused_port;
926    use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
927    use surf_disco::Client;
928    use tempfile::TempDir;
929    use tide_disco::{error::ServerError, Api, App, Error, StatusCode};
930    use tokio::{spawn, task::JoinHandle, time::sleep};
931    use url::Url;
932    use vbs::version::{StaticVersion, StaticVersionType};
933
934    use super::*;
935    use crate::{
936        catchup::NullStateCatchup,
937        network,
938        persistence::no_storage,
939        testing::{run_legacy_builder, wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
940    };
941
942    pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
943
944    pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> {
945        pub server: SequencerContext<network::Memory, P::Persistence, V>,
946        pub peers: Vec<SequencerContext<network::Memory, P::Persistence, V>>,
947        pub cfg: TestConfig<{ NUM_NODES }>,
948        // todo (abdul): remove this when fs storage is removed
949        pub temp_dir: Option<TempDir>,
950    }
951
952    pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
953    where
954        P: PersistenceOptions,
955        C: StateCatchup + 'static,
956    {
957        state: [ValidatedState; NUM_NODES],
958        persistence: [P; NUM_NODES],
959        catchup: [C; NUM_NODES],
960        network_config: TestConfig<{ NUM_NODES }>,
961        api_config: Options,
962    }
963
964    impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
965    where
966        P: PersistenceOptions,
967        C: StateCatchup + 'static,
968    {
969        pub fn states(&self) -> [ValidatedState; NUM_NODES] {
970            self.state.clone()
971        }
972    }
973    #[derive(Clone)]
974    pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
975    where
976        P: PersistenceOptions,
977        C: StateCatchup + 'static,
978    {
979        state: [ValidatedState; NUM_NODES],
980        persistence: Option<[P; NUM_NODES]>,
981        catchup: Option<[C; NUM_NODES]>,
982        api_config: Option<Options>,
983        network_config: Option<TestConfig<{ NUM_NODES }>>,
984    }
985
986    impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
987        fn default() -> Self {
988            TestNetworkConfigBuilder {
989                state: std::array::from_fn(|_| ValidatedState::default()),
990                persistence: Some([no_storage::Options; 5]),
991                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
992                network_config: None,
993                api_config: None,
994            }
995        }
996    }
997
998    pub enum AnyTestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
999        V0_1(TestNetwork<P, NUM_NODES, SequencerVersions<V0_1, V0_1>>),
1000        V0_2(TestNetwork<P, NUM_NODES, SequencerVersions<FeeVersion, FeeVersion>>),
1001        V0_3(TestNetwork<P, NUM_NODES, SequencerVersions<EpochVersion, EpochVersion>>),
1002        V0_4(
1003            TestNetwork<
1004                P,
1005                NUM_NODES,
1006                SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
1007            >,
1008        ),
1009    }
1010
1011    impl<P: PersistenceOptions, const NUM_NODES: usize> AnyTestNetwork<P, NUM_NODES> {
1012        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1013            match self {
1014                AnyTestNetwork::V0_1(network) => network.cfg.hotshot_config(),
1015                AnyTestNetwork::V0_2(network) => network.cfg.hotshot_config(),
1016                AnyTestNetwork::V0_3(network) => network.cfg.hotshot_config(),
1017                AnyTestNetwork::V0_4(network) => network.cfg.hotshot_config(),
1018            }
1019        }
1020    }
1021
1022    impl<const NUM_NODES: usize>
1023        TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1024    {
1025        pub fn with_num_nodes(
1026        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1027        {
1028            TestNetworkConfigBuilder {
1029                state: std::array::from_fn(|_| ValidatedState::default()),
1030                persistence: Some([no_storage::Options; { NUM_NODES }]),
1031                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1032                network_config: None,
1033                api_config: None,
1034            }
1035        }
1036    }
1037
1038    impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1039    where
1040        P: PersistenceOptions,
1041        C: StateCatchup + 'static,
1042    {
1043        pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1044            self.state = state;
1045            self
1046        }
1047
1048        pub fn persistences<NP: PersistenceOptions>(
1049            self,
1050            persistence: [NP; NUM_NODES],
1051        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
1052            TestNetworkConfigBuilder {
1053                state: self.state,
1054                catchup: self.catchup,
1055                network_config: self.network_config,
1056                api_config: self.api_config,
1057                persistence: Some(persistence),
1058            }
1059        }
1060
1061        pub fn api_config(mut self, api_config: Options) -> Self {
1062            self.api_config = Some(api_config);
1063            self
1064        }
1065
1066        pub fn catchups<NC: StateCatchup + 'static>(
1067            self,
1068            catchup: [NC; NUM_NODES],
1069        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
1070            TestNetworkConfigBuilder {
1071                state: self.state,
1072                catchup: Some(catchup),
1073                network_config: self.network_config,
1074                api_config: self.api_config,
1075                persistence: self.persistence,
1076            }
1077        }
1078
1079        pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
1080            self.network_config = Some(network_config);
1081            self
1082        }
1083
1084        /// Setup for POS testing. Deploys contracts and adds the
1085        /// stake table address to state. Must be called before `build()`.
1086        pub async fn pos_hook<V: Versions>(
1087            self,
1088            delegation_config: DelegationConfig,
1089            stake_table_version: StakeTableContractVersion,
1090        ) -> anyhow::Result<Self> {
1091            if <V as Versions>::Upgrade::VERSION < EpochVersion::VERSION
1092                && <V as Versions>::Base::VERSION < EpochVersion::VERSION
1093            {
1094                panic!("given version does not require pos deployment");
1095            };
1096
1097            let network_config = self
1098                .network_config
1099                .as_ref()
1100                .expect("network_config is required");
1101
1102            let l1_url = network_config.l1_url();
1103            let signer = network_config.signer();
1104            let deployer = ProviderBuilder::new()
1105                .wallet(EthereumWallet::from(signer.clone()))
1106                .on_http(l1_url.clone());
1107
1108            let blocks_per_epoch = network_config.hotshot_config().epoch_height;
1109            let epoch_start_block = network_config.hotshot_config().epoch_start_block;
1110            let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1111                &network_config.hotshot_config().hotshot_stake_table(),
1112                STAKE_TABLE_CAPACITY_FOR_TEST,
1113            )
1114            .unwrap();
1115
1116            let mut contracts = Contracts::new();
1117            let args = DeployerArgsBuilder::default()
1118                .deployer(deployer.clone())
1119                .mock_light_client(true)
1120                .genesis_lc_state(genesis_state)
1121                .genesis_st_state(genesis_stake)
1122                .blocks_per_epoch(blocks_per_epoch)
1123                .epoch_start_block(epoch_start_block)
1124                .multisig_pauser(signer.address())
1125                .token_name("Espresso".to_string())
1126                .token_symbol("ESP".to_string())
1127                .initial_token_supply(U256::from(100000u64))
1128                .ops_timelock_delay(U256::from(0))
1129                .ops_timelock_admin(signer.address())
1130                .ops_timelock_proposers(vec![signer.address()])
1131                .ops_timelock_executors(vec![signer.address()])
1132                .safe_exit_timelock_delay(U256::from(10))
1133                .safe_exit_timelock_admin(signer.address())
1134                .safe_exit_timelock_proposers(vec![signer.address()])
1135                .safe_exit_timelock_executors(vec![signer.address()])
1136                .build()
1137                .unwrap();
1138
1139            match stake_table_version {
1140                StakeTableContractVersion::V1 => {
1141                    args.deploy_to_stake_table_v1(&mut contracts).await
1142                },
1143                StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1144            }
1145            .context("failed to deploy contracts")?;
1146
1147            let stake_table_address = contracts
1148                .address(Contract::StakeTableProxy)
1149                .expect("StakeTableProxy address not found");
1150            setup_stake_table_contract_for_test(
1151                l1_url.clone(),
1152                &deployer,
1153                stake_table_address,
1154                network_config.staking_priv_keys(),
1155                delegation_config,
1156            )
1157            .await
1158            .expect("stake table setup failed");
1159
1160            // enable interval mining with a 1s interval.
1161            // This ensures that blocks are finalized every second, even when there are no transactions.
1162            // It's useful for testing stake table updates,
1163            // which rely on the finalized L1 block number.
1164            if let Some(anvil) = network_config.anvil() {
1165                anvil
1166                    .anvil_set_interval_mining(1)
1167                    .await
1168                    .expect("interval mining");
1169            }
1170
1171            // Add stake table address to `ChainConfig` (held in state),
1172            // avoiding overwrite other values. Base fee is set to `0` to avoid
1173            // unnecessary catchup of `FeeState`.
1174            let state = self.state[0].clone();
1175            let chain_config = if let Some(cf) = state.chain_config.resolve() {
1176                ChainConfig {
1177                    base_fee: 0.into(),
1178                    stake_table_contract: Some(stake_table_address),
1179                    ..cf
1180                }
1181            } else {
1182                ChainConfig {
1183                    base_fee: 0.into(),
1184                    stake_table_contract: Some(stake_table_address),
1185                    ..Default::default()
1186                }
1187            };
1188
1189            let state = ValidatedState {
1190                chain_config: chain_config.into(),
1191                ..state
1192            };
1193            Ok(self.states(std::array::from_fn(|_| state.clone())))
1194        }
1195
1196        pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
1197            TestNetworkConfig {
1198                state: self.state,
1199                persistence: self.persistence.unwrap(),
1200                catchup: self.catchup.unwrap(),
1201                network_config: self.network_config.unwrap(),
1202                api_config: self.api_config.unwrap(),
1203            }
1204        }
1205    }
1206
1207    impl<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> TestNetwork<P, { NUM_NODES }, V> {
1208        pub async fn new<C: StateCatchup + 'static>(
1209            cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
1210            bind_version: V,
1211        ) -> Self {
1212            let mut cfg = cfg;
1213            let mut builder_tasks = Vec::new();
1214
1215            let chain_config = cfg.state[0].chain_config.resolve();
1216            if chain_config.is_none() {
1217                tracing::warn!("Chain config is not set, using default max_block_size");
1218            }
1219            let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
1220                cfg.network_config.builder_port(),
1221                chain_config.map(|c| *c.max_block_size),
1222            )
1223            .await;
1224            builder_tasks.push(task);
1225            cfg.network_config
1226                .set_builder_urls(vec1::vec1![builder_url.clone()]);
1227
1228            // add default storage if none is provided as query module is now required
1229            let mut opt = cfg.api_config.clone();
1230            let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
1231                let temp_dir = tempfile::tempdir().unwrap();
1232                opt = opt.query_fs(
1233                    Default::default(),
1234                    crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
1235                );
1236                Some(temp_dir)
1237            } else {
1238                None
1239            };
1240
1241            let mut nodes = join_all(
1242                izip!(cfg.state, cfg.persistence, cfg.catchup)
1243                    .enumerate()
1244                    .map(|(i, (state, persistence, state_peers))| {
1245                        let opt = opt.clone();
1246                        let cfg = &cfg.network_config;
1247                        let upgrades_map = cfg.upgrades();
1248                        async move {
1249                            if i == 0 {
1250                                opt.serve(|metrics, consumer, storage| {
1251                                    let cfg = cfg.clone();
1252                                    async move {
1253                                        Ok(cfg
1254                                            .init_node(
1255                                                0,
1256                                                state,
1257                                                persistence,
1258                                                Some(state_peers),
1259                                                storage,
1260                                                &*metrics,
1261                                                STAKE_TABLE_CAPACITY_FOR_TEST,
1262                                                consumer,
1263                                                bind_version,
1264                                                upgrades_map,
1265                                            )
1266                                            .await)
1267                                    }
1268                                    .boxed()
1269                                })
1270                                .await
1271                                .unwrap()
1272                            } else {
1273                                cfg.init_node(
1274                                    i,
1275                                    state,
1276                                    persistence,
1277                                    Some(state_peers),
1278                                    None,
1279                                    &NoMetrics,
1280                                    STAKE_TABLE_CAPACITY_FOR_TEST,
1281                                    NullEventConsumer,
1282                                    bind_version,
1283                                    upgrades_map,
1284                                )
1285                                .await
1286                            }
1287                        }
1288                    }),
1289            )
1290            .await;
1291
1292            let handle_0 = &nodes[0];
1293
1294            // Hook the builder(s) up to the event stream from the first node
1295            for builder_task in builder_tasks {
1296                builder_task.start(Box::new(handle_0.event_stream().await));
1297            }
1298
1299            for ctx in &nodes {
1300                ctx.start_consensus().await;
1301            }
1302
1303            let server = nodes.remove(0);
1304            let peers = nodes;
1305
1306            Self {
1307                server,
1308                peers,
1309                cfg: cfg.network_config,
1310                temp_dir,
1311            }
1312        }
1313
1314        pub async fn stop_consensus(&mut self) {
1315            self.server.shutdown_consensus().await;
1316
1317            for ctx in &mut self.peers {
1318                ctx.shutdown_consensus().await;
1319            }
1320        }
1321    }
1322
1323    /// Test the status API with custom options.
1324    ///
1325    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1326    /// By default, the options are the minimal required to run this test (configuring a port and
1327    /// enabling the status API). `opt` may add additional functionality (e.g. adding a query module
1328    /// to test a different initialization path) but should not remove or modify the existing
1329    /// functionality (e.g. removing the status module or changing the port).
1330    pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
1331        let port = pick_unused_port().expect("No ports free");
1332        let url = format!("http://localhost:{port}").parse().unwrap();
1333        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1334
1335        let options = opt(Options::with_port(port));
1336        let network_config = TestConfigBuilder::default().build();
1337        let config = TestNetworkConfigBuilder::default()
1338            .api_config(options)
1339            .network_config(network_config)
1340            .build();
1341        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1342        client.connect(None).await;
1343
1344        // The status API is well tested in the query service repo. Here we are just smoke testing
1345        // that we set it up correctly. Wait for a (non-genesis) block to be sequenced and then
1346        // check the success rate metrics.
1347        while client
1348            .get::<u64>("status/block-height")
1349            .send()
1350            .await
1351            .unwrap()
1352            <= 1
1353        {
1354            sleep(Duration::from_secs(1)).await;
1355        }
1356        let success_rate = client
1357            .get::<f64>("status/success-rate")
1358            .send()
1359            .await
1360            .unwrap();
1361        // If metrics are populating correctly, we should get a finite number. If not, we might get
1362        // NaN or infinity due to division by 0.
1363        assert!(success_rate.is_finite(), "{success_rate}");
1364        // We know at least some views have been successful, since we finalized a block.
1365        assert!(success_rate > 0.0, "{success_rate}");
1366    }
1367
1368    /// Test the submit API with custom options.
1369    ///
1370    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1371    /// By default, the options are the minimal required to run this test (configuring a port and
1372    /// enabling the submit API). `opt` may add additional functionality (e.g. adding a query module
1373    /// to test a different initialization path) but should not remove or modify the existing
1374    /// functionality (e.g. removing the submit module or changing the port).
1375    pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
1376        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
1377
1378        let port = pick_unused_port().expect("No ports free");
1379
1380        let url = format!("http://localhost:{port}").parse().unwrap();
1381        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1382
1383        let options = opt(Options::with_port(port).submit(Default::default()));
1384        let network_config = TestConfigBuilder::default().build();
1385        let config = TestNetworkConfigBuilder::default()
1386            .api_config(options)
1387            .network_config(network_config)
1388            .build();
1389        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1390        let mut events = network.server.event_stream().await;
1391
1392        client.connect(None).await;
1393
1394        let hash = client
1395            .post("submit/submit")
1396            .body_json(&txn)
1397            .unwrap()
1398            .send()
1399            .await
1400            .unwrap();
1401        assert_eq!(txn.commit(), hash);
1402
1403        // Wait for a Decide event containing transaction matching the one we sent
1404        wait_for_decide_on_handle(&mut events, &txn).await;
1405    }
1406
1407    /// Test the state signature API.
1408    pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
1409        let port = pick_unused_port().expect("No ports free");
1410
1411        let url = format!("http://localhost:{port}").parse().unwrap();
1412
1413        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1414
1415        let options = opt(Options::with_port(port));
1416        let network_config = TestConfigBuilder::default().build();
1417        let config = TestNetworkConfigBuilder::default()
1418            .api_config(options)
1419            .network_config(network_config)
1420            .build();
1421        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1422
1423        let mut height: u64;
1424        // Wait for block >=2 appears
1425        // It's waiting for an extra second to make sure that the signature is generated
1426        loop {
1427            height = network.server.decided_leaf().await.height();
1428            sleep(std::time::Duration::from_secs(1)).await;
1429            if height >= 2 {
1430                break;
1431            }
1432        }
1433        // we cannot verify the signature now, because we don't know the stake table
1434        client
1435            .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
1436            .send()
1437            .await
1438            .unwrap();
1439    }
1440
1441    /// Test the catchup API with custom options.
1442    ///
1443    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1444    /// By default, the options are the minimal required to run this test (configuring a port and
1445    /// enabling the catchup API). `opt` may add additional functionality (e.g. adding a query module
1446    /// to test a different initialization path) but should not remove or modify the existing
1447    /// functionality (e.g. removing the catchup module or changing the port).
1448    pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
1449        let port = pick_unused_port().expect("No ports free");
1450        let url = format!("http://localhost:{port}").parse().unwrap();
1451        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1452
1453        let options = opt(Options::with_port(port));
1454        let network_config = TestConfigBuilder::default().build();
1455        let config = TestNetworkConfigBuilder::default()
1456            .api_config(options)
1457            .network_config(network_config)
1458            .build();
1459        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1460        client.connect(None).await;
1461
1462        // Wait for a few blocks to be decided.
1463        let mut events = network.server.event_stream().await;
1464        loop {
1465            if let Event {
1466                event: EventType::Decide { leaf_chain, .. },
1467                ..
1468            } = events.next().await.unwrap()
1469            {
1470                if leaf_chain
1471                    .iter()
1472                    .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
1473                {
1474                    break;
1475                }
1476            }
1477        }
1478
1479        // Stop consensus running on the node so we freeze the decided and undecided states.
1480        // We'll let it go out of scope here since it's a write lock.
1481        {
1482            network.server.shutdown_consensus().await;
1483        }
1484
1485        // Undecided fee state: absent account.
1486        let leaf = network.server.decided_leaf().await;
1487        let height = leaf.height() + 1;
1488        let view = leaf.view_number() + 1;
1489        let res = client
1490            .get::<AccountQueryData>(&format!(
1491                "catchup/{height}/{}/account/{:x}",
1492                view.u64(),
1493                Address::default()
1494            ))
1495            .send()
1496            .await
1497            .unwrap();
1498        assert_eq!(res.balance, U256::ZERO);
1499        assert_eq!(
1500            res.proof
1501                .verify(
1502                    &network
1503                        .server
1504                        .state(view)
1505                        .await
1506                        .unwrap()
1507                        .fee_merkle_tree
1508                        .commitment()
1509                )
1510                .unwrap(),
1511            U256::ZERO,
1512        );
1513
1514        // Undecided block state.
1515        let res = client
1516            .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
1517            .send()
1518            .await
1519            .unwrap();
1520        let root = &network
1521            .server
1522            .state(view)
1523            .await
1524            .unwrap()
1525            .block_merkle_tree
1526            .commitment();
1527        BlockMerkleTree::verify(root, root.size() - 1, res)
1528            .unwrap()
1529            .unwrap();
1530    }
1531
1532    pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
1533        let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
1534        let mut api =
1535            Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
1536
1537        api.get("account", |_req, _state: &()| {
1538            async move {
1539                Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
1540                    StatusCode::BAD_REQUEST,
1541                    "no account found".to_string(),
1542                ))
1543            }
1544            .boxed()
1545        })?
1546        .get("blocks", |_req, _state| {
1547            async move {
1548                Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
1549                    StatusCode::BAD_REQUEST,
1550                    "no block found".to_string(),
1551                ))
1552            }
1553            .boxed()
1554        })?
1555        .get("chainconfig", |_req, _state| {
1556            async move {
1557                Result::<ChainConfig, _>::Ok(ChainConfig {
1558                    max_block_size: 300.into(),
1559                    base_fee: 1.into(),
1560                    fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
1561                        .parse()
1562                        .unwrap(),
1563                    ..Default::default()
1564                })
1565            }
1566            .boxed()
1567        })?
1568        .get("leafchain", |_req, _state| {
1569            async move {
1570                Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
1571                    StatusCode::BAD_REQUEST,
1572                    "No leafchain found".to_string(),
1573                ))
1574            }
1575            .boxed()
1576        })?;
1577
1578        let mut app = App::<_, hotshot_query_service::Error>::with_state(());
1579        app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
1580
1581        app.register_module::<_, _>("catchup", api).unwrap();
1582
1583        let port = pick_unused_port().expect("no free port");
1584        let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
1585
1586        let handle = spawn({
1587            let url = url.clone();
1588            async move {
1589                let _ = app.serve(url, SequencerApiVersion::instance()).await;
1590            }
1591        });
1592
1593        Ok((url, handle))
1594    }
1595}
1596
1597#[cfg(test)]
1598mod api_tests {
1599    use std::{fmt::Debug, marker::PhantomData};
1600
1601    use committable::Committable;
1602    use data_source::testing::TestableSequencerDataSource;
1603    use espresso_types::{
1604        traits::{EventConsumer, PersistenceOptions},
1605        Header, Leaf2, MockSequencerVersions, NamespaceId, NamespaceProofQueryData, ValidatedState,
1606    };
1607    use futures::{future, stream::StreamExt};
1608    use hotshot_example_types::node_types::TestVersions;
1609    use hotshot_query_service::availability::{
1610        AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
1611    };
1612    use hotshot_types::{
1613        data::{
1614            ns_table::parse_ns_table, vid_disperse::VidDisperseShare2, DaProposal2, EpochNumber,
1615            QuorumProposal2, QuorumProposalWrapper, VidCommitment,
1616        },
1617        event::LeafInfo,
1618        message::Proposal,
1619        simple_certificate::QuorumCertificate2,
1620        traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
1621        utils::EpochTransitionIndicator,
1622        vid::avidm::{init_avidm_param, AvidMScheme},
1623    };
1624    use portpicker::pick_unused_port;
1625    use surf_disco::Client;
1626    use test_helpers::{
1627        catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
1628        TestNetwork, TestNetworkConfigBuilder,
1629    };
1630    use tide_disco::error::ServerError;
1631    use vbs::version::StaticVersion;
1632
1633    use super::{update::ApiEventConsumer, *};
1634    use crate::{
1635        network,
1636        persistence::no_storage::NoStorage,
1637        testing::{wait_for_decide_on_handle, TestConfigBuilder},
1638    };
1639
1640    #[rstest_reuse::template]
1641    #[rstest::rstest]
1642    #[case(PhantomData::<crate::api::sql::DataSource>)]
1643    #[case(PhantomData::<crate::api::fs::DataSource>)]
1644    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1645    pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
1646        #[case] _d: PhantomData<D>,
1647    ) {
1648    }
1649
1650    #[rstest_reuse::apply(testable_sequencer_data_source)]
1651    pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
1652        _d: PhantomData<D>,
1653    ) {
1654        let storage = D::create_storage().await;
1655        submit_test_helper(|opt| D::options(&storage, opt)).await
1656    }
1657
1658    #[rstest_reuse::apply(testable_sequencer_data_source)]
1659    pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
1660        _d: PhantomData<D>,
1661    ) {
1662        let storage = D::create_storage().await;
1663        status_test_helper(|opt| D::options(&storage, opt)).await
1664    }
1665
1666    #[rstest_reuse::apply(testable_sequencer_data_source)]
1667    pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
1668        _d: PhantomData<D>,
1669    ) {
1670        let storage = D::create_storage().await;
1671        state_signature_test_helper(|opt| D::options(&storage, opt)).await
1672    }
1673
1674    #[rstest_reuse::apply(testable_sequencer_data_source)]
1675    pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
1676        // Arbitrary transaction, arbitrary namespace ID
1677        let ns_id = NamespaceId::from(42_u32);
1678        let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);
1679
1680        // Start query service.
1681        let port = pick_unused_port().expect("No ports free");
1682        let storage = D::create_storage().await;
1683        let network_config = TestConfigBuilder::default().build();
1684        let config = TestNetworkConfigBuilder::default()
1685            .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
1686            .network_config(network_config)
1687            .build();
1688        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1689        let mut events = network.server.event_stream().await;
1690
1691        // Connect client.
1692        let client: Client<ServerError, StaticVersion<0, 1>> =
1693            Client::new(format!("http://localhost:{port}").parse().unwrap());
1694        client.connect(None).await;
1695
1696        let hash = client
1697            .post("submit/submit")
1698            .body_json(&txn)
1699            .unwrap()
1700            .send()
1701            .await
1702            .unwrap();
1703        assert_eq!(txn.commit(), hash);
1704
1705        // Wait for a Decide event containing transaction matching the one we sent
1706        let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
1707        tracing::info!(block_height, "transaction sequenced");
1708
1709        // Wait for the query service to update to this block height.
1710        client
1711            .socket(&format!("availability/stream/blocks/{block_height}"))
1712            .subscribe::<BlockQueryData<SeqTypes>>()
1713            .await
1714            .unwrap()
1715            .next()
1716            .await
1717            .unwrap()
1718            .unwrap();
1719
1720        let mut found_txn = false;
1721        let mut found_empty_block = false;
1722        for block_num in 0..=block_height {
1723            let header: Header = client
1724                .get(&format!("availability/header/{block_num}"))
1725                .send()
1726                .await
1727                .unwrap();
1728            let ns_query_res: NamespaceProofQueryData = client
1729                .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
1730                .send()
1731                .await
1732                .unwrap();
1733
1734            // Verify namespace proof if present
1735            if let Some(ns_proof) = ns_query_res.proof {
1736                let vid_common: VidCommonQueryData<SeqTypes> = client
1737                    .get(&format!("availability/vid/common/{block_num}"))
1738                    .send()
1739                    .await
1740                    .unwrap();
1741                ns_proof
1742                    .verify(
1743                        header.ns_table(),
1744                        &header.payload_commitment(),
1745                        vid_common.common(),
1746                    )
1747                    .unwrap();
1748            } else {
1749                // Namespace proof should be present if ns_id exists in ns_table
1750                assert!(header.ns_table().find_ns_id(&ns_id).is_none());
1751                assert!(ns_query_res.transactions.is_empty());
1752            }
1753
1754            found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();
1755
1756            for txn in ns_query_res.transactions {
1757                if txn.commit() == hash {
1758                    // Ensure that we validate an inclusion proof
1759                    found_txn = true;
1760                }
1761            }
1762        }
1763        assert!(found_txn);
1764        assert!(found_empty_block);
1765    }
1766
1767    #[rstest_reuse::apply(testable_sequencer_data_source)]
1768    pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
1769        _d: PhantomData<D>,
1770    ) {
1771        let storage = D::create_storage().await;
1772        catchup_test_helper(|opt| D::options(&storage, opt)).await
1773    }
1774
1775    #[rstest_reuse::apply(testable_sequencer_data_source)]
1776    pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
1777    where
1778        D: TestableSequencerDataSource + Debug + 'static,
1779    {
1780        #[derive(Clone, Copy, Debug)]
1781        struct FailConsumer;
1782
1783        #[async_trait]
1784        impl EventConsumer for FailConsumer {
1785            async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
1786                bail!("mock error injection");
1787            }
1788        }
1789
1790        let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);
1791
1792        let storage = D::create_storage().await;
1793        let persistence = D::persistence_options(&storage).create().await.unwrap();
1794        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
1795            Arc::new(StorageState::new(
1796                D::create(D::persistence_options(&storage), Default::default(), false)
1797                    .await
1798                    .unwrap(),
1799                ApiState::new(future::pending()),
1800            ));
1801
1802        // Create two non-consecutive leaf chains.
1803        let mut chain1 = vec![];
1804
1805        let genesis = Leaf2::genesis::<TestVersions>(&Default::default(), &NodeState::mock()).await;
1806        let payload = genesis.block_payload().unwrap();
1807        let payload_bytes_arc = payload.encode();
1808
1809        let avidm_param = init_avidm_param(2).unwrap();
1810        let weights = vec![1u32; 2];
1811
1812        let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
1813        let (payload_commitment, shares) =
1814            AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();
1815
1816        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1817            proposal: QuorumProposal2::<SeqTypes> {
1818                block_header: genesis.block_header().clone(),
1819                view_number: ViewNumber::genesis(),
1820                justify_qc: QuorumCertificate2::genesis::<MockSequencerVersions>(
1821                    &ValidatedState::default(),
1822                    &NodeState::mock(),
1823                )
1824                .await,
1825                upgrade_certificate: None,
1826                view_change_evidence: None,
1827                next_drb_result: None,
1828                next_epoch_justify_qc: None,
1829                epoch: None,
1830                state_cert: None,
1831            },
1832        };
1833        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
1834            &ValidatedState::default(),
1835            &NodeState::mock(),
1836        )
1837        .await;
1838
1839        let mut justify_qc = qc.clone();
1840        for i in 0..5 {
1841            *quorum_proposal.proposal.block_header.height_mut() = i;
1842            quorum_proposal.proposal.view_number = ViewNumber::new(i);
1843            quorum_proposal.proposal.justify_qc = justify_qc;
1844            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
1845            qc.view_number = leaf.view_number();
1846            qc.data.leaf_commit = Committable::commit(&leaf);
1847            justify_qc = qc.clone();
1848            chain1.push((leaf.clone(), qc.clone()));
1849
1850            // Include a quorum proposal for each leaf.
1851            let quorum_proposal_signature =
1852                PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1853                    .expect("Failed to sign quorum_proposal");
1854            persistence
1855                .append_quorum_proposal2(&Proposal {
1856                    data: quorum_proposal.clone(),
1857                    signature: quorum_proposal_signature,
1858                    _pd: Default::default(),
1859                })
1860                .await
1861                .unwrap();
1862
1863            // Include VID information for each leaf.
1864            let share = VidDisperseShare2::<SeqTypes> {
1865                view_number: leaf.view_number(),
1866                payload_commitment,
1867                share: shares[0].clone(),
1868                recipient_key: pubkey,
1869                epoch: Some(EpochNumber::new(0)),
1870                target_epoch: Some(EpochNumber::new(0)),
1871                common: avidm_param.clone(),
1872            };
1873            persistence
1874                .append_vid2(&share.to_proposal(&privkey).unwrap())
1875                .await
1876                .unwrap();
1877
1878            // Include payload information for each leaf.
1879            let block_payload_signature =
1880                PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
1881            let da_proposal_inner = DaProposal2::<SeqTypes> {
1882                encoded_transactions: payload_bytes_arc.clone(),
1883                metadata: payload.ns_table().clone(),
1884                view_number: leaf.view_number(),
1885                epoch: Some(EpochNumber::new(0)),
1886                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1887            };
1888            let da_proposal = Proposal {
1889                data: da_proposal_inner,
1890                signature: block_payload_signature,
1891                _pd: Default::default(),
1892            };
1893            persistence
1894                .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1895                .await
1896                .unwrap();
1897        }
1898        // Split into two chains.
1899        let mut chain2 = chain1.split_off(2);
1900        // Make non-consecutive (i.e. we skip a leaf).
1901        chain2.remove(0);
1902
1903        // Decide 2 leaves, but fail in event processing.
1904        let leaf_chain = chain1
1905            .iter()
1906            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
1907            .collect::<Vec<_>>();
1908        tracing::info!("decide with event handling failure");
1909        persistence
1910            .append_decided_leaves(
1911                ViewNumber::new(1),
1912                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
1913                &FailConsumer,
1914            )
1915            .await
1916            .unwrap();
1917
1918        // Now decide remaining leaves successfully. We should now process a decide event for all
1919        // the leaves.
1920        let consumer = ApiEventConsumer::from(data_source.clone());
1921        let leaf_chain = chain2
1922            .iter()
1923            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
1924            .collect::<Vec<_>>();
1925        tracing::info!("decide successfully");
1926        persistence
1927            .append_decided_leaves(
1928                ViewNumber::new(4),
1929                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
1930                &consumer,
1931            )
1932            .await
1933            .unwrap();
1934
1935        // Check that the leaves were moved to archive storage, along with payload and VID
1936        // information.
1937        for (leaf, qc) in chain1.iter().chain(&chain2) {
1938            tracing::info!(height = leaf.height(), "check archive");
1939            let qd = data_source.get_leaf(leaf.height() as usize).await.await;
1940            let stored_leaf: Leaf2 = qd.leaf().clone();
1941            let stored_qc = qd.qc().clone();
1942            assert_eq!(&stored_leaf, leaf);
1943            assert_eq!(&stored_qc, qc);
1944
1945            data_source
1946                .get_block(leaf.height() as usize)
1947                .await
1948                .try_resolve()
1949                .ok()
1950                .unwrap();
1951            data_source
1952                .get_vid_common(leaf.height() as usize)
1953                .await
1954                .try_resolve()
1955                .ok()
1956                .unwrap();
1957
1958            // Check that all data has been garbage collected for the decided views.
1959            assert!(persistence
1960                .load_da_proposal(leaf.view_number())
1961                .await
1962                .unwrap()
1963                .is_none());
1964            assert!(persistence
1965                .load_vid_share(leaf.view_number())
1966                .await
1967                .unwrap()
1968                .is_none());
1969            assert!(persistence
1970                .load_quorum_proposal(leaf.view_number())
1971                .await
1972                .is_err());
1973        }
1974
1975        // Check that data has _not_ been garbage collected for the missing view.
1976        assert!(persistence
1977            .load_da_proposal(ViewNumber::new(2))
1978            .await
1979            .unwrap()
1980            .is_some());
1981        assert!(persistence
1982            .load_vid_share(ViewNumber::new(2))
1983            .await
1984            .unwrap()
1985            .is_some());
1986        persistence
1987            .load_quorum_proposal(ViewNumber::new(2))
1988            .await
1989            .unwrap();
1990    }
1991
1992    #[rstest_reuse::apply(testable_sequencer_data_source)]
1993    pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
1994    where
1995        D: TestableSequencerDataSource + Debug + 'static,
1996    {
1997        let storage = D::create_storage().await;
1998        let persistence = D::persistence_options(&storage).create().await.unwrap();
1999        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
2000            Arc::new(StorageState::new(
2001                D::create(D::persistence_options(&storage), Default::default(), false)
2002                    .await
2003                    .unwrap(),
2004                ApiState::new(future::pending()),
2005            ));
2006        let consumer = ApiEventConsumer::from(data_source.clone());
2007
2008        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
2009            &ValidatedState::default(),
2010            &NodeState::mock(),
2011        )
2012        .await;
2013        let leaf =
2014            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
2015
2016        // Append the genesis leaf. We don't use this for the test, because the update function will
2017        // automatically fill in the missing data for genesis. We just append this to get into a
2018        // consistent state to then append the leaf from view 1, which will have missing data.
2019        tracing::info!(?leaf, ?qc, "decide genesis leaf");
2020        persistence
2021            .append_decided_leaves(
2022                leaf.view_number(),
2023                [(&leaf_info(leaf.clone()), qc.clone())],
2024                &consumer,
2025            )
2026            .await
2027            .unwrap();
2028
2029        // Create another leaf, with missing data.
2030        let mut block_header = leaf.block_header().clone();
2031        *block_header.height_mut() += 1;
2032        let qp = QuorumProposalWrapper {
2033            proposal: QuorumProposal2 {
2034                block_header,
2035                view_number: leaf.view_number() + 1,
2036                justify_qc: qc.clone(),
2037                upgrade_certificate: None,
2038                view_change_evidence: None,
2039                next_drb_result: None,
2040                next_epoch_justify_qc: None,
2041                epoch: None,
2042                state_cert: None,
2043            },
2044        };
2045
2046        let leaf = Leaf2::from_quorum_proposal(&qp);
2047        qc.view_number = leaf.view_number();
2048        qc.data.leaf_commit = Committable::commit(&leaf);
2049
2050        // Decide a leaf without the corresponding payload or VID.
2051        tracing::info!(?leaf, ?qc, "append leaf 1");
2052        persistence
2053            .append_decided_leaves(
2054                leaf.view_number(),
2055                [(&leaf_info(leaf.clone()), qc)],
2056                &consumer,
2057            )
2058            .await
2059            .unwrap();
2060
2061        // Check that we still processed the leaf.
2062        assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
2063        assert!(data_source.get_vid_common(1).await.is_pending());
2064        assert!(data_source.get_block(1).await.is_pending());
2065    }
2066
2067    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2068        LeafInfo {
2069            leaf,
2070            vid_share: None,
2071            state: Default::default(),
2072            delta: None,
2073            state_cert: None,
2074        }
2075    }
2076}
2077
2078#[cfg(test)]
2079mod test {
2080    use std::{
2081        collections::{HashMap, HashSet},
2082        time::Duration,
2083    };
2084
2085    use alloy::{
2086        eips::BlockId,
2087        network::EthereumWallet,
2088        primitives::U256,
2089        providers::{Provider, ProviderBuilder},
2090    };
2091    use async_lock::Mutex;
2092    use committable::{Commitment, Committable};
2093    use espresso_contract_deployer::{
2094        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
2095        Contract, Contracts,
2096    };
2097    use espresso_types::{
2098        config::PublicHotShotConfig,
2099        traits::{NullEventConsumer, PersistenceOptions},
2100        v0_3::{Fetcher, RewardAmount, COMMISSION_BASIS_POINTS},
2101        validators_from_l1_events, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAmount, FeeVersion,
2102        Header, L1ClientOptions, MockSequencerVersions, NamespaceId, RewardDistributor,
2103        SequencerVersions, ValidatedState,
2104    };
2105    use futures::{
2106        future::{self, join_all},
2107        stream::{StreamExt, TryStreamExt},
2108    };
2109    use hotshot::types::EventType;
2110    use hotshot_contract_adapter::sol_types::{EspToken, StakeTableV2};
2111    use hotshot_example_types::node_types::EpochsTestVersions;
2112    use hotshot_query_service::{
2113        availability::{
2114            BlockQueryData, BlockSummaryQueryData, LeafQueryData, StateCertQueryDataV1,
2115            StateCertQueryDataV2, TransactionQueryData, VidCommonQueryData,
2116        },
2117        data_source::{sql::Config, storage::SqlStorage, VersionedDataSource},
2118        explorer::TransactionSummariesResponse,
2119        types::HeightIndexed,
2120    };
2121    use hotshot_types::{
2122        data::EpochNumber,
2123        event::LeafInfo,
2124        traits::{
2125            block_contents::BlockHeader, election::Membership, metrics::NoMetrics,
2126            node_implementation::ConsensusTime,
2127        },
2128        utils::epoch_from_block_number,
2129        ValidatorConfig,
2130    };
2131    use jf_merkle_tree::prelude::{MerkleProof, Sha3Node};
2132    use portpicker::pick_unused_port;
2133    use rand::seq::SliceRandom;
2134    use rstest::rstest;
2135    use staking_cli::demo::DelegationConfig;
2136    use surf_disco::Client;
2137    use test_helpers::{
2138        catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2139        TestNetwork, TestNetworkConfigBuilder,
2140    };
2141    use tide_disco::{app::AppHealth, error::ServerError, healthcheck::HealthStatus};
2142    use tokio::time::sleep;
2143    use vbs::version::{StaticVersion, StaticVersionType};
2144
2145    use self::{
2146        data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
2147        sql::DataSource as SqlDataSource,
2148    };
2149    use super::*;
2150    use crate::{
2151        api::{
2152            options::Query,
2153            sql::{impl_testable_data_source::tmp_options, reconstruct_state},
2154            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2155        },
2156        catchup::{NullStateCatchup, StatePeers},
2157        persistence::no_storage,
2158        testing::{wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
2159    };
2160
2161    type PosVersionV3 = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
2162    type PosVersionV4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
2163
2164    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2165    async fn test_healthcheck() {
2166        let port = pick_unused_port().expect("No ports free");
2167        let url = format!("http://localhost:{port}").parse().unwrap();
2168        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2169        let options = Options::with_port(port);
2170        let network_config = TestConfigBuilder::default().build();
2171        let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
2172            .api_config(options)
2173            .network_config(network_config)
2174            .build();
2175        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2176
2177        client.connect(None).await;
2178        let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
2179        assert_eq!(health.status, HealthStatus::Available);
2180    }
2181
2182    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2183    async fn status_test_without_query_module() {
2184        status_test_helper(|opt| opt).await
2185    }
2186
2187    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2188    async fn submit_test_without_query_module() {
2189        submit_test_helper(|opt| opt).await
2190    }
2191
2192    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2193    async fn state_signature_test_without_query_module() {
2194        state_signature_test_helper(|opt| opt).await
2195    }
2196
2197    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2198    async fn catchup_test_without_query_module() {
2199        catchup_test_helper(|opt| opt).await
2200    }
2201
2202    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2203    async fn slow_test_merklized_state_api() {
2204        let port = pick_unused_port().expect("No ports free");
2205
2206        let storage = SqlDataSource::create_storage().await;
2207
2208        let options = SqlDataSource::options(&storage, Options::with_port(port));
2209
2210        let network_config = TestConfigBuilder::default().build();
2211        let config = TestNetworkConfigBuilder::default()
2212            .api_config(options)
2213            .network_config(network_config)
2214            .build();
2215        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2216        let url = format!("http://localhost:{port}").parse().unwrap();
2217        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
2218
2219        client.connect(Some(Duration::from_secs(15))).await;
2220
2221        // Wait until some blocks have been decided.
2222        tracing::info!("waiting for blocks");
2223        let blocks = client
2224            .socket("availability/stream/blocks/0")
2225            .subscribe::<BlockQueryData<SeqTypes>>()
2226            .await
2227            .unwrap()
2228            .take(4)
2229            .try_collect::<Vec<_>>()
2230            .await
2231            .unwrap();
2232
2233        // sleep for few seconds so that state data is upserted
2234        tracing::info!("waiting for state to be inserted");
2235        sleep(Duration::from_secs(5)).await;
2236        network.stop_consensus().await;
2237
2238        for block in blocks {
2239            let i = block.height();
2240            tracing::info!(i, "get block state");
2241            let path = client
2242                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
2243                    "block-state/{}/{i}",
2244                    i + 1
2245                ))
2246                .send()
2247                .await
2248                .unwrap();
2249            assert_eq!(*path.elem().unwrap(), block.hash());
2250
2251            tracing::info!(i, "get fee state");
2252            let account = TestConfig::<5>::builder_key().fee_account();
2253            let path = client
2254                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
2255                    "fee-state/{}/{}",
2256                    i + 1,
2257                    account
2258                ))
2259                .send()
2260                .await
2261                .unwrap();
2262            assert_eq!(*path.index(), account);
2263            assert!(*path.elem().unwrap() > 0.into(), "{:?}", path.elem());
2264        }
2265
2266        // testing fee_balance api
2267        let account = TestConfig::<5>::builder_key().fee_account();
2268        let amount = client
2269            .get::<Option<FeeAmount>>(&format!("fee-state/fee-balance/latest/{account}"))
2270            .send()
2271            .await
2272            .unwrap()
2273            .unwrap();
2274        let expected = U256::MAX;
2275        assert_eq!(expected, amount.0);
2276    }
2277
2278    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2279    async fn test_leaf_only_data_source() {
2280        let port = pick_unused_port().expect("No ports free");
2281
2282        let storage = SqlDataSource::create_storage().await;
2283        let options =
2284            SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();
2285
2286        let network_config = TestConfigBuilder::default().build();
2287        let config = TestNetworkConfigBuilder::default()
2288            .api_config(options)
2289            .network_config(network_config)
2290            .build();
2291        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2292        let url = format!("http://localhost:{port}").parse().unwrap();
2293        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
2294
2295        tracing::info!("waiting for blocks");
2296        client.connect(Some(Duration::from_secs(15))).await;
2297        // Wait until some blocks have been decided.
2298
2299        let account = TestConfig::<5>::builder_key().fee_account();
2300
2301        let _headers = client
2302            .socket("availability/stream/headers/0")
2303            .subscribe::<Header>()
2304            .await
2305            .unwrap()
2306            .take(10)
2307            .try_collect::<Vec<_>>()
2308            .await
2309            .unwrap();
2310
2311        for i in 1..5 {
2312            let leaf = client
2313                .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
2314                .send()
2315                .await
2316                .unwrap();
2317
2318            assert_eq!(leaf.height(), i);
2319
2320            let header = client
2321                .get::<Header>(&format!("availability/header/{i}"))
2322                .send()
2323                .await
2324                .unwrap();
2325
2326            assert_eq!(header.height(), i);
2327
2328            let vid = client
2329                .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
2330                .send()
2331                .await
2332                .unwrap();
2333
2334            assert_eq!(vid.height(), i);
2335
2336            client
2337                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
2338                    "block-state/{i}/{}",
2339                    i - 1
2340                ))
2341                .send()
2342                .await
2343                .unwrap();
2344
2345            client
2346                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
2347                    "fee-state/{}/{}",
2348                    i + 1,
2349                    account
2350                ))
2351                .send()
2352                .await
2353                .unwrap();
2354        }
2355
2356        // This would fail even though we have processed atleast 10 leaves
2357        // this is because light weight nodes only support leaves, headers and VID
2358        client
2359            .get::<BlockQueryData<SeqTypes>>("availability/block/1")
2360            .send()
2361            .await
2362            .unwrap_err();
2363    }
2364
2365    async fn run_catchup_test(url_suffix: &str) {
2366        // Start a sequencer network, using the query service for catchup.
2367        let port = pick_unused_port().expect("No ports free");
2368        const NUM_NODES: usize = 5;
2369
2370        let url: url::Url = format!("http://localhost:{port}{url_suffix}")
2371            .parse()
2372            .unwrap();
2373
2374        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2375            .api_config(Options::with_port(port))
2376            .network_config(TestConfigBuilder::default().build())
2377            .catchups(std::array::from_fn(|_| {
2378                StatePeers::<StaticVersion<0, 1>>::from_urls(
2379                    vec![url.clone()],
2380                    Default::default(),
2381                    &NoMetrics,
2382                )
2383            }))
2384            .build();
2385        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2386
2387        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
2388        let mut events = network.peers[0].event_stream().await;
2389        loop {
2390            let event = events.next().await.unwrap();
2391            let EventType::Decide { leaf_chain, .. } = event.event else {
2392                continue;
2393            };
2394            if leaf_chain[0].leaf.height() > 0 {
2395                break;
2396            }
2397        }
2398
2399        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
2400        // drop the node and recreate it so it loses all of its temporary state and starts off from
2401        // genesis. It should be able to catch up by listening to proposals and then rebuild its
2402        // state from its peers.
2403        tracing::info!("shutting down node");
2404        network.peers.remove(0);
2405
2406        // Wait for a few blocks to pass while the node is down, so it falls behind.
2407        network
2408            .server
2409            .event_stream()
2410            .await
2411            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2412            .take(3)
2413            .collect::<Vec<_>>()
2414            .await;
2415
2416        tracing::info!("restarting node");
2417        let node = network
2418            .cfg
2419            .init_node(
2420                1,
2421                ValidatedState::default(),
2422                no_storage::Options,
2423                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
2424                    vec![url],
2425                    Default::default(),
2426                    &NoMetrics,
2427                )),
2428                None,
2429                &NoMetrics,
2430                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2431                NullEventConsumer,
2432                MockSequencerVersions::new(),
2433                Default::default(),
2434            )
2435            .await;
2436        let mut events = node.event_stream().await;
2437
2438        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
2439        // caught up and all nodes are in sync.
2440        let mut proposers = [false; NUM_NODES];
2441        loop {
2442            let event = events.next().await.unwrap();
2443            let EventType::Decide { leaf_chain, .. } = event.event else {
2444                continue;
2445            };
2446            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
2447                let height = leaf.height();
2448                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
2449                if height == 0 {
2450                    continue;
2451                }
2452
2453                tracing::info!(
2454                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
2455                );
2456                proposers[leaf_builder] = true;
2457            }
2458
2459            if proposers.iter().all(|has_proposed| *has_proposed) {
2460                break;
2461            }
2462        }
2463    }
2464
2465    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2466    async fn test_catchup() {
2467        run_catchup_test("").await;
2468    }
2469
2470    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2471    async fn test_catchup_v0() {
2472        run_catchup_test("/v0").await;
2473    }
2474
2475    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2476    async fn test_catchup_v1() {
2477        run_catchup_test("/v1").await;
2478    }
2479
2480    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2481    async fn test_catchup_no_state_peers() {
2482        // Start a sequencer network, using the query service for catchup.
2483        let port = pick_unused_port().expect("No ports free");
2484        const NUM_NODES: usize = 5;
2485        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2486            .api_config(Options::with_port(port))
2487            .network_config(TestConfigBuilder::default().build())
2488            .build();
2489        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2490
2491        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
2492        let mut events = network.peers[0].event_stream().await;
2493        loop {
2494            let event = events.next().await.unwrap();
2495            let EventType::Decide { leaf_chain, .. } = event.event else {
2496                continue;
2497            };
2498            if leaf_chain[0].leaf.height() > 0 {
2499                break;
2500            }
2501        }
2502
2503        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
2504        // drop the node and recreate it so it loses all of its temporary state and starts off from
2505        // genesis. It should be able to catch up by listening to proposals and then rebuild its
2506        // state from its peers.
2507        tracing::info!("shutting down node");
2508        network.peers.remove(0);
2509
2510        // Wait for a few blocks to pass while the node is down, so it falls behind.
2511        network
2512            .server
2513            .event_stream()
2514            .await
2515            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2516            .take(3)
2517            .collect::<Vec<_>>()
2518            .await;
2519
2520        tracing::info!("restarting node");
2521        let node = network
2522            .cfg
2523            .init_node(
2524                1,
2525                ValidatedState::default(),
2526                no_storage::Options,
2527                None::<NullStateCatchup>,
2528                None,
2529                &NoMetrics,
2530                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2531                NullEventConsumer,
2532                MockSequencerVersions::new(),
2533                Default::default(),
2534            )
2535            .await;
2536        let mut events = node.event_stream().await;
2537
2538        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
2539        // caught up and all nodes are in sync.
2540        let mut proposers = [false; NUM_NODES];
2541        loop {
2542            let event = events.next().await.unwrap();
2543            let EventType::Decide { leaf_chain, .. } = event.event else {
2544                continue;
2545            };
2546            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
2547                let height = leaf.height();
2548                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
2549                if height == 0 {
2550                    continue;
2551                }
2552
2553                tracing::info!(
2554                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
2555                );
2556                proposers[leaf_builder] = true;
2557            }
2558
2559            if proposers.iter().all(|has_proposed| *has_proposed) {
2560                break;
2561            }
2562        }
2563    }
2564
2565    #[ignore]
2566    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2567    async fn test_catchup_epochs_no_state_peers() {
2568        // Start a sequencer network, using the query service for catchup.
2569        let port = pick_unused_port().expect("No ports free");
2570        const EPOCH_HEIGHT: u64 = 5;
2571        let network_config = TestConfigBuilder::default()
2572            .epoch_height(EPOCH_HEIGHT)
2573            .build();
2574        const NUM_NODES: usize = 5;
2575        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2576            .api_config(Options::with_port(port))
2577            .network_config(network_config)
2578            .build();
2579        let mut network = TestNetwork::new(config, EpochsTestVersions {}).await;
2580
2581        // Wait for replica 0 to decide in the third epoch.
2582        let mut events = network.peers[0].event_stream().await;
2583        loop {
2584            let event = events.next().await.unwrap();
2585            let EventType::Decide { leaf_chain, .. } = event.event else {
2586                continue;
2587            };
2588            tracing::error!("got decide height {}", leaf_chain[0].leaf.height());
2589
2590            if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
2591                tracing::error!("decided past one epoch");
2592                break;
2593            }
2594        }
2595
2596        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
2597        // drop the node and recreate it so it loses all of its temporary state and starts off from
2598        // genesis. It should be able to catch up by listening to proposals and then rebuild its
2599        // state from its peers.
2600        tracing::info!("shutting down node");
2601        network.peers.remove(0);
2602
2603        // Wait for a few blocks to pass while the node is down, so it falls behind.
2604        network
2605            .server
2606            .event_stream()
2607            .await
2608            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2609            .take(3)
2610            .collect::<Vec<_>>()
2611            .await;
2612
2613        tracing::error!("restarting node");
2614        let node = network
2615            .cfg
2616            .init_node(
2617                1,
2618                ValidatedState::default(),
2619                no_storage::Options,
2620                None::<NullStateCatchup>,
2621                None,
2622                &NoMetrics,
2623                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2624                NullEventConsumer,
2625                MockSequencerVersions::new(),
2626                Default::default(),
2627            )
2628            .await;
2629        let mut events = node.event_stream().await;
2630
2631        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
2632        // caught up and all nodes are in sync.
2633        let mut proposers = [false; NUM_NODES];
2634        loop {
2635            let event = events.next().await.unwrap();
2636            let EventType::Decide { leaf_chain, .. } = event.event else {
2637                continue;
2638            };
2639            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
2640                let height = leaf.height();
2641                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
2642                if height == 0 {
2643                    continue;
2644                }
2645
2646                tracing::info!(
2647                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
2648                );
2649                proposers[leaf_builder] = true;
2650            }
2651
2652            if proposers.iter().all(|has_proposed| *has_proposed) {
2653                break;
2654            }
2655        }
2656    }
2657
2658    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2659    async fn test_chain_config_from_instance() {
2660        // This test uses a ValidatedState which only has the default chain config commitment.
2661        // The NodeState has the full chain config.
2662        // Both chain config commitments will match, so the ValidatedState should have the full chain config after a non-genesis block is decided.
2663
2664        let port = pick_unused_port().expect("No ports free");
2665
2666        let chain_config: ChainConfig = ChainConfig::default();
2667
2668        let state = ValidatedState {
2669            chain_config: chain_config.commit().into(),
2670            ..Default::default()
2671        };
2672
2673        let states = std::array::from_fn(|_| state.clone());
2674
2675        let config = TestNetworkConfigBuilder::default()
2676            .api_config(Options::with_port(port))
2677            .states(states)
2678            .catchups(std::array::from_fn(|_| {
2679                StatePeers::<StaticVersion<0, 1>>::from_urls(
2680                    vec![format!("http://localhost:{port}").parse().unwrap()],
2681                    Default::default(),
2682                    &NoMetrics,
2683                )
2684            }))
2685            .network_config(TestConfigBuilder::default().build())
2686            .build();
2687
2688        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2689
2690        // Wait for few blocks to be decided.
2691        network
2692            .server
2693            .event_stream()
2694            .await
2695            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2696            .take(3)
2697            .collect::<Vec<_>>()
2698            .await;
2699
2700        for peer in &network.peers {
2701            let state = peer.consensus().read().await.decided_state().await;
2702
2703            assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
2704        }
2705
2706        network.server.shut_down().await;
2707        drop(network);
2708    }
2709
2710    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2711    async fn test_chain_config_catchup() {
2712        // This test uses a ValidatedState with a non-default chain config
2713        // so it will be different from the NodeState chain config used by the TestNetwork.
2714        // However, for this test to work, at least one node should have a full chain config
2715        // to allow other nodes to catch up.
2716
2717        let port = pick_unused_port().expect("No ports free");
2718
2719        let cf = ChainConfig {
2720            max_block_size: 300.into(),
2721            base_fee: 1.into(),
2722            ..Default::default()
2723        };
2724
2725        // State1 contains only the chain config commitment
2726        let state1 = ValidatedState {
2727            chain_config: cf.commit().into(),
2728            ..Default::default()
2729        };
2730
2731        //state 2 contains the full chain config
2732        let state2 = ValidatedState {
2733            chain_config: cf.into(),
2734            ..Default::default()
2735        };
2736
2737        let mut states = std::array::from_fn(|_| state1.clone());
2738        // only one node has the full chain config
2739        // all the other nodes should do a catchup to get the full chain config from peer 0
2740        states[0] = state2;
2741
2742        const NUM_NODES: usize = 5;
2743        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2744            .api_config(Options::from(options::Http {
2745                port,
2746                max_connections: None,
2747            }))
2748            .states(states)
2749            .catchups(std::array::from_fn(|_| {
2750                StatePeers::<StaticVersion<0, 1>>::from_urls(
2751                    vec![format!("http://localhost:{port}").parse().unwrap()],
2752                    Default::default(),
2753                    &NoMetrics,
2754                )
2755            }))
2756            .network_config(TestConfigBuilder::default().build())
2757            .build();
2758
2759        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2760
2761        // Wait for a few blocks to be decided.
2762        network
2763            .server
2764            .event_stream()
2765            .await
2766            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2767            .take(3)
2768            .collect::<Vec<_>>()
2769            .await;
2770
2771        for peer in &network.peers {
2772            let state = peer.consensus().read().await.decided_state().await;
2773
2774            assert_eq!(state.chain_config.resolve().unwrap(), cf)
2775        }
2776
2777        network.server.shut_down().await;
2778        drop(network);
2779    }
2780
2781    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2782    async fn test_pos_upgrade_view_based() {
2783        type PosUpgrade = SequencerVersions<FeeVersion, EpochVersion>;
2784        test_upgrade_helper::<PosUpgrade>(PosUpgrade::new()).await;
2785    }
2786
2787    async fn test_upgrade_helper<V: Versions>(version: V) {
2788        // wait this number of views beyond the configured first view
2789        // before asserting anything.
2790        let wait_extra_views = 10;
2791        // Number of nodes running in the test network.
2792        const NUM_NODES: usize = 5;
2793        let upgrade_version = <V as Versions>::Upgrade::VERSION;
2794        let port = pick_unused_port().expect("No ports free");
2795
2796        let test_config = TestConfigBuilder::default()
2797            .epoch_height(200)
2798            .epoch_start_block(321)
2799            .set_upgrades(upgrade_version)
2800            .await
2801            .build();
2802
2803        let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade_version);
2804        tracing::debug!(?chain_config_upgrade);
2805
2806        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2807            .api_config(Options::from(options::Http {
2808                port,
2809                max_connections: None,
2810            }))
2811            .catchups(std::array::from_fn(|_| {
2812                StatePeers::<SequencerApiVersion>::from_urls(
2813                    vec![format!("http://localhost:{port}").parse().unwrap()],
2814                    Default::default(),
2815                    &NoMetrics,
2816                )
2817            }))
2818            .network_config(test_config)
2819            .build();
2820
2821        let mut network = TestNetwork::new(config, version).await;
2822        let mut events = network.server.event_stream().await;
2823
2824        // First loop to get an `UpgradeProposal`. Note that the
2825        // actual upgrade will take several to many subsequent views for
2826        // voting and finally the actual upgrade.
2827        let upgrade = loop {
2828            let event = events.next().await.unwrap();
2829            match event.event {
2830                EventType::UpgradeProposal { proposal, .. } => {
2831                    tracing::info!(?proposal, "proposal");
2832                    let upgrade = proposal.data.upgrade_proposal;
2833                    let new_version = upgrade.new_version;
2834                    tracing::info!(?new_version, "upgrade proposal new version");
2835                    assert_eq!(new_version, <V as Versions>::Upgrade::VERSION);
2836                    break upgrade;
2837                },
2838                _ => continue,
2839            }
2840        };
2841
2842        let wanted_view = upgrade.new_version_first_view + wait_extra_views;
2843        // Loop until we get the `new_version_first_view`, then test the upgrade.
2844        loop {
2845            let event = events.next().await.unwrap();
2846            let view_number = event.view_number;
2847
2848            tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
2849            if view_number > wanted_view {
2850                let states: Vec<_> = network
2851                    .peers
2852                    .iter()
2853                    .map(|peer| async { peer.consensus().read().await.decided_state().await })
2854                    .collect();
2855
2856                let configs: Option<Vec<ChainConfig>> = join_all(states)
2857                    .await
2858                    .iter()
2859                    .map(|state| state.chain_config.resolve())
2860                    .collect();
2861
2862                tracing::debug!(?configs, "`ChainConfig`s for nodes");
2863                if let Some(configs) = configs {
2864                    for config in configs {
2865                        assert_eq!(config, chain_config_upgrade);
2866                    }
2867                    break; // if assertion did not panic, the test was successful, so we exit the loop
2868                }
2869            }
2870            sleep(Duration::from_millis(200)).await;
2871        }
2872
2873        network.server.shut_down().await;
2874    }
2875
2876    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2877    pub(crate) async fn test_restart() {
2878        const NUM_NODES: usize = 5;
2879        // Initialize nodes.
2880        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
2881        let persistence: [_; NUM_NODES] = storage
2882            .iter()
2883            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
2884            .collect::<Vec<_>>()
2885            .try_into()
2886            .unwrap();
2887        let port = pick_unused_port().unwrap();
2888        let config = TestNetworkConfigBuilder::default()
2889            .api_config(SqlDataSource::options(
2890                &storage[0],
2891                Options::with_port(port),
2892            ))
2893            .persistences(persistence.clone())
2894            .network_config(TestConfigBuilder::default().build())
2895            .build();
2896        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2897
2898        // Connect client.
2899        let client: Client<ServerError, SequencerApiVersion> =
2900            Client::new(format!("http://localhost:{port}").parse().unwrap());
2901        client.connect(None).await;
2902        tracing::info!(port, "server running");
2903
2904        // Wait until some blocks have been decided.
2905        client
2906            .socket("availability/stream/blocks/0")
2907            .subscribe::<BlockQueryData<SeqTypes>>()
2908            .await
2909            .unwrap()
2910            .take(3)
2911            .collect::<Vec<_>>()
2912            .await;
2913
2914        // Shut down the consensus nodes.
2915        tracing::info!("shutting down nodes");
2916        network.stop_consensus().await;
2917
2918        // Get the block height we reached.
2919        let height = client
2920            .get::<usize>("status/block-height")
2921            .send()
2922            .await
2923            .unwrap();
2924        tracing::info!("decided {height} blocks before shutting down");
2925
2926        // Get the decided chain, so we can check consistency after the restart.
2927        let chain: Vec<LeafQueryData<SeqTypes>> = client
2928            .socket("availability/stream/leaves/0")
2929            .subscribe()
2930            .await
2931            .unwrap()
2932            .take(height)
2933            .try_collect()
2934            .await
2935            .unwrap();
2936        let decided_view = chain.last().unwrap().leaf().view_number();
2937
2938        // Get the most recent state, for catchup.
2939
2940        let state = network.server.decided_state().await;
2941        tracing::info!(?decided_view, ?state, "consensus state");
2942
2943        // Fully shut down the API servers.
2944        drop(network);
2945
2946        // Start up again, resuming from the last decided leaf.
2947        let port = pick_unused_port().expect("No ports free");
2948
2949        let config = TestNetworkConfigBuilder::default()
2950            .api_config(SqlDataSource::options(
2951                &storage[0],
2952                Options::with_port(port),
2953            ))
2954            .persistences(persistence)
2955            .catchups(std::array::from_fn(|_| {
2956                // Catchup using node 0 as a peer. Node 0 was running the archival state service
2957                // before the restart, so it should be able to resume without catching up by loading
2958                // state from storage.
2959                StatePeers::<StaticVersion<0, 1>>::from_urls(
2960                    vec![format!("http://localhost:{port}").parse().unwrap()],
2961                    Default::default(),
2962                    &NoMetrics,
2963                )
2964            }))
2965            .network_config(TestConfigBuilder::default().build())
2966            .build();
2967        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2968        let client: Client<ServerError, StaticVersion<0, 1>> =
2969            Client::new(format!("http://localhost:{port}").parse().unwrap());
2970        client.connect(None).await;
2971        tracing::info!(port, "server running");
2972
2973        // Make sure we can decide new blocks after the restart.
2974        tracing::info!("waiting for decide, height {height}");
2975        let new_leaf: LeafQueryData<SeqTypes> = client
2976            .socket(&format!("availability/stream/leaves/{height}"))
2977            .subscribe()
2978            .await
2979            .unwrap()
2980            .next()
2981            .await
2982            .unwrap()
2983            .unwrap();
2984        assert_eq!(new_leaf.height(), height as u64);
2985        assert_eq!(
2986            new_leaf.leaf().parent_commitment(),
2987            chain[height - 1].hash()
2988        );
2989
2990        // Ensure the new chain is consistent with the old chain.
2991        let new_chain: Vec<LeafQueryData<SeqTypes>> = client
2992            .socket("availability/stream/leaves/0")
2993            .subscribe()
2994            .await
2995            .unwrap()
2996            .take(height)
2997            .try_collect()
2998            .await
2999            .unwrap();
3000        assert_eq!(chain, new_chain);
3001    }
3002
3003    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3004    async fn test_fetch_config() {
3005        let port = pick_unused_port().expect("No ports free");
3006        let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3007        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3008
3009        let options = Options::with_port(port).config(Default::default());
3010        let network_config = TestConfigBuilder::default().build();
3011        let config = TestNetworkConfigBuilder::default()
3012            .api_config(options)
3013            .network_config(network_config)
3014            .build();
3015        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3016        client.connect(None).await;
3017
3018        // Fetch a network config from the API server. The first peer URL is bogus, to test the
3019        // failure/retry case.
3020        let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3021            vec!["https://notarealnode.network".parse().unwrap(), url],
3022            Default::default(),
3023            &NoMetrics,
3024        );
3025
3026        // Fetch the config from node 1, a different node than the one running the service.
3027        let validator =
3028            ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3029        let config = peers.fetch_config(validator.clone()).await.unwrap();
3030
3031        // Check the node-specific information in the recovered config is correct.
3032        assert_eq!(config.node_index, 1);
3033
3034        // Check the public information is also correct (with respect to the node that actually
3035        // served the config, for public keys).
3036        pretty_assertions::assert_eq!(
3037            serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3038            serde_json::to_value(PublicHotShotConfig::from(
3039                network.cfg.hotshot_config().clone()
3040            ))
3041            .unwrap()
3042        );
3043    }
3044
3045    async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3046        let query_service_port = pick_unused_port().expect("No ports free for query service");
3047
3048        let url = format!("http://localhost:{query_service_port}{url_suffix}")
3049            .parse()
3050            .unwrap();
3051
3052        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3053
3054        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3055
3056        let network_config = TestConfigBuilder::default().build();
3057        let config = TestNetworkConfigBuilder::default()
3058            .api_config(options)
3059            .network_config(network_config)
3060            .build();
3061        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3062
3063        let mut subscribed_events = client
3064            .socket("hotshot-events/events")
3065            .subscribe::<Event<SeqTypes>>()
3066            .await
3067            .unwrap();
3068
3069        let total_count = 5;
3070        // wait for these events to receive on client 1
3071        let mut receive_count = 0;
3072        loop {
3073            let event = subscribed_events.next().await.unwrap();
3074            tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3075            receive_count += 1;
3076            if receive_count > total_count {
3077                tracing::info!("Client Received at least desired events, exiting loop");
3078                break;
3079            }
3080        }
3081        assert_eq!(receive_count, total_count + 1);
3082    }
3083
3084    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3085    async fn test_hotshot_event_streaming_v0() {
3086        run_hotshot_event_streaming_test("/v0").await;
3087    }
3088
3089    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3090    async fn test_hotshot_event_streaming_v1() {
3091        run_hotshot_event_streaming_test("/v1").await;
3092    }
3093
3094    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3095    async fn test_hotshot_event_streaming() {
3096        run_hotshot_event_streaming_test("").await;
3097    }
3098
3099    // TODO when `EpochVersion` becomes base version we can merge this
3100    // w/ above test.
3101    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3102    async fn test_hotshot_event_streaming_epoch_progression() {
3103        let epoch_height = 35;
3104        let wanted_epochs = 4;
3105
3106        let network_config = TestConfigBuilder::default()
3107            .epoch_height(epoch_height)
3108            .build();
3109
3110        let query_service_port = pick_unused_port().expect("No ports free for query service");
3111
3112        let hotshot_url = format!("http://localhost:{query_service_port}")
3113            .parse()
3114            .unwrap();
3115
3116        let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
3117        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3118
3119        let config = TestNetworkConfigBuilder::default()
3120            .api_config(options)
3121            .network_config(network_config.clone())
3122            .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3123            .await
3124            .expect("Pos Deployment")
3125            .build();
3126
3127        let _network = TestNetwork::new(config, PosVersionV3::new()).await;
3128
3129        let mut subscribed_events = client
3130            .socket("hotshot-events/events")
3131            .subscribe::<Event<SeqTypes>>()
3132            .await
3133            .unwrap();
3134
3135        let wanted_views = epoch_height * wanted_epochs;
3136
3137        let mut views = HashSet::new();
3138        let mut epochs = HashSet::new();
3139        for _ in 0..=600 {
3140            let event = subscribed_events.next().await.unwrap();
3141            let event = event.unwrap();
3142            let view_number = event.view_number;
3143            views.insert(view_number.u64());
3144
3145            if let hotshot::types::EventType::Decide { qc, .. } = event.event {
3146                assert!(qc.data.epoch.is_some(), "epochs are live");
3147                assert!(qc.data.block_number.is_some());
3148
3149                let epoch = qc.data.epoch.unwrap().u64();
3150                epochs.insert(epoch);
3151
3152                tracing::debug!(
3153                    "Got decide: epoch: {:?}, block: {:?} ",
3154                    epoch,
3155                    qc.data.block_number
3156                );
3157
3158                let expected_epoch =
3159                    epoch_from_block_number(qc.data.block_number.unwrap(), epoch_height);
3160                tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");
3161
3162                assert_eq!(expected_epoch, epoch);
3163            }
3164            if views.contains(&wanted_views) {
3165                tracing::info!("Client Received at least desired views, exiting loop");
3166                break;
3167            }
3168        }
3169
3170        // prevent false positive when we overflow the range
3171        assert!(views.contains(&wanted_views), "Views are not progressing");
3172        assert!(
3173            epochs.contains(&wanted_epochs),
3174            "Epochs are not progressing"
3175        );
3176    }
3177
3178    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3179    async fn test_pos_rewards_basic() -> anyhow::Result<()> {
3180        // Basic PoS rewards test:
3181        // - Sets up a single validator and a single delegator (the node itself).
3182        // - Sets the number of blocks in each epoch to 20.
3183        // - Rewards begin applying from block 41 (i.e., the start of the 3rd epoch).
3184        // - Since the validator is also the delegator, it receives the full reward.
3185        // - Verifies that the reward at block height 60 matches the expected amount.
3186        let epoch_height = 20;
3187
3188        let network_config = TestConfigBuilder::default()
3189            .epoch_height(epoch_height)
3190            .build();
3191
3192        let api_port = pick_unused_port().expect("No ports free for query service");
3193
3194        const NUM_NODES: usize = 1;
3195        // Initialize nodes.
3196        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3197        let persistence: [_; NUM_NODES] = storage
3198            .iter()
3199            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3200            .collect::<Vec<_>>()
3201            .try_into()
3202            .unwrap();
3203
3204        let config = TestNetworkConfigBuilder::with_num_nodes()
3205            .api_config(SqlDataSource::options(
3206                &storage[0],
3207                Options::with_port(api_port),
3208            ))
3209            .network_config(network_config.clone())
3210            .persistences(persistence.clone())
3211            .catchups(std::array::from_fn(|_| {
3212                StatePeers::<StaticVersion<0, 1>>::from_urls(
3213                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3214                    Default::default(),
3215                    &NoMetrics,
3216                )
3217            }))
3218            .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3219            .await
3220            .unwrap()
3221            .build();
3222
3223        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3224        let client: Client<ServerError, SequencerApiVersion> =
3225            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3226
3227        // first two epochs will be 1 and 2
3228        // rewards are distributed starting third epoch
3229        // third epoch starts from block 40 as epoch height is 20
3230        // wait for atleast 65 blocks
3231        let _blocks = client
3232            .socket("availability/stream/blocks/0")
3233            .subscribe::<BlockQueryData<SeqTypes>>()
3234            .await
3235            .unwrap()
3236            .take(65)
3237            .try_collect::<Vec<_>>()
3238            .await
3239            .unwrap();
3240
3241        let staking_priv_keys = network_config.staking_priv_keys();
3242        let account = staking_priv_keys[0].0.clone();
3243        let address = account.address();
3244
3245        let block_height = 60;
3246
3247        // get the validator address balance at block height 60
3248        let amount = client
3249            .get::<Option<RewardAmount>>(&format!(
3250                "reward-state/reward-balance/{block_height}/{address}"
3251            ))
3252            .send()
3253            .await
3254            .unwrap()
3255            .unwrap();
3256
3257        tracing::info!("amount={amount:?}");
3258
3259        let epoch_start_block = 40;
3260
3261        let node_state = network.server.node_state();
3262        let membership = node_state.coordinator.membership().read().await;
3263        let block_reward = membership
3264            .block_reward(None)
3265            .expect("block reward is not None");
3266        drop(membership);
3267
3268        // The validator gets all the block reward so we can calculate the expected amount
3269        let expected_amount = block_reward.0 * (U256::from(block_height - epoch_start_block));
3270
3271        assert_eq!(amount.0, expected_amount, "reward amount don't match");
3272
3273        Ok(())
3274    }
3275
3276    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3277    async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
3278        // This test registers 5 validators and multiple delegators for each validator.
3279        // One of the delegators is also a validator.
3280        // The test verifies that the cumulative reward at each block height equals the total block reward,
3281        // which is a constant.
3282
3283        let epoch_height = 20;
3284
3285        let network_config = TestConfigBuilder::default()
3286            .epoch_height(epoch_height)
3287            .build();
3288
3289        let api_port = pick_unused_port().expect("No ports free for query service");
3290
3291        const NUM_NODES: usize = 5;
3292        // Initialize nodes.
3293        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3294        let persistence: [_; NUM_NODES] = storage
3295            .iter()
3296            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3297            .collect::<Vec<_>>()
3298            .try_into()
3299            .unwrap();
3300
3301        let config = TestNetworkConfigBuilder::with_num_nodes()
3302            .api_config(SqlDataSource::options(
3303                &storage[0],
3304                Options::with_port(api_port),
3305            ))
3306            .network_config(network_config)
3307            .persistences(persistence.clone())
3308            .catchups(std::array::from_fn(|_| {
3309                StatePeers::<StaticVersion<0, 1>>::from_urls(
3310                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3311                    Default::default(),
3312                    &NoMetrics,
3313                )
3314            }))
3315            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3316            .await
3317            .unwrap()
3318            .build();
3319
3320        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3321        let node_state = network.server.node_state();
3322        let membership = node_state.coordinator.membership().read().await;
3323        let block_reward = membership
3324            .block_reward(None)
3325            .expect("block reward is not None");
3326        drop(membership);
3327        let client: Client<ServerError, SequencerApiVersion> =
3328            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3329
3330        // wait for atleast 75 blocks
3331        let _blocks = client
3332            .socket("availability/stream/blocks/0")
3333            .subscribe::<BlockQueryData<SeqTypes>>()
3334            .await
3335            .unwrap()
3336            .take(75)
3337            .try_collect::<Vec<_>>()
3338            .await
3339            .unwrap();
3340
3341        // We are going to check cumulative blocks from block height 40 to 67
3342        // Basically epoch 3 and epoch 4 as epoch height is 20
3343        // get all the validators
3344        let validators = client
3345            .get::<ValidatorMap>("node/validators/3")
3346            .send()
3347            .await
3348            .expect("failed to get validator");
3349
3350        // insert all the address in a map
3351        // We will query the reward-balance at each block height for all the addresses
3352        // We don't know which validator was the leader because we don't have access to Membership
3353        let mut addresses = HashSet::new();
3354        for v in validators.values() {
3355            addresses.insert(v.account);
3356            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3357        }
3358        // get all the validators
3359        let validators = client
3360            .get::<ValidatorMap>("node/validators/4")
3361            .send()
3362            .await
3363            .expect("failed to get validator");
3364        for v in validators.values() {
3365            addresses.insert(v.account);
3366            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3367        }
3368
3369        let mut prev_cumulative_amount = U256::ZERO;
3370        // Check Cumulative rewards for epoch 3
3371        // i.e block height 41 to 59
3372        for block in 41..=67 {
3373            let mut cumulative_amount = U256::ZERO;
3374            for address in addresses.clone() {
3375                let amount = client
3376                    .get::<Option<RewardAmount>>(&format!(
3377                        "reward-state/reward-balance/{block}/{address}"
3378                    ))
3379                    .send()
3380                    .await
3381                    .ok()
3382                    .flatten();
3383
3384                if let Some(amount) = amount {
3385                    tracing::info!("address={address}, amount={amount}");
3386                    cumulative_amount += amount.0;
3387                };
3388            }
3389
3390            // assert cumulative reward is equal to block reward
3391            assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
3392            tracing::info!("cumulative_amount is correct for block={block}");
3393            prev_cumulative_amount = cumulative_amount;
3394        }
3395
3396        Ok(())
3397    }
3398
3399    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3400    async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
3401        // TODO(abdul): This test currently uses TestNetwork only for contract deployment and for L1 block number.
3402        // Once the stake table deployment logic is refactored and isolated, TestNetwork here will be unnecessary
3403
3404        let epoch_height = 20;
3405
3406        let network_config = TestConfigBuilder::default()
3407            .epoch_height(epoch_height)
3408            .build();
3409
3410        let api_port = pick_unused_port().expect("No ports free for query service");
3411
3412        const NUM_NODES: usize = 5;
3413        // Initialize nodes.
3414        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3415        let persistence: [_; NUM_NODES] = storage
3416            .iter()
3417            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3418            .collect::<Vec<_>>()
3419            .try_into()
3420            .unwrap();
3421
3422        let l1_url = network_config.l1_url();
3423        let config = TestNetworkConfigBuilder::with_num_nodes()
3424            .api_config(SqlDataSource::options(
3425                &storage[0],
3426                Options::with_port(api_port),
3427            ))
3428            .network_config(network_config)
3429            .persistences(persistence.clone())
3430            .catchups(std::array::from_fn(|_| {
3431                StatePeers::<StaticVersion<0, 1>>::from_urls(
3432                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3433                    Default::default(),
3434                    &NoMetrics,
3435                )
3436            }))
3437            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3438            .await
3439            .unwrap()
3440            .build();
3441
3442        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3443
3444        let mut prev_st = None;
3445        let state = network.server.decided_state().await;
3446        let chain_config = state.chain_config.resolve().expect("resolve chain config");
3447        let stake_table = chain_config.stake_table_contract.unwrap();
3448
3449        let l1_client = L1ClientOptions::default()
3450            .connect(vec![l1_url])
3451            .expect("failed to connect to l1");
3452
3453        let client: Client<ServerError, SequencerApiVersion> =
3454            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3455
3456        let mut headers = client
3457            .socket("availability/stream/headers/0")
3458            .subscribe::<Header>()
3459            .await
3460            .unwrap();
3461
3462        let mut target_bh = 0;
3463        while let Some(header) = headers.next().await {
3464            let header = header.unwrap();
3465            if header.height() == 0 {
3466                continue;
3467            }
3468            let l1_block = header.l1_finalized().expect("l1 block not found");
3469
3470            let events = Fetcher::fetch_events_from_contract(
3471                l1_client.clone(),
3472                stake_table,
3473                None,
3474                l1_block.number(),
3475            )
3476            .await;
3477            let sorted_events = events.sort_events().expect("failed to sort");
3478
3479            let mut sorted_dedup_removed = sorted_events.clone();
3480            sorted_dedup_removed.dedup();
3481
3482            assert_eq!(
3483                sorted_events.len(),
3484                sorted_dedup_removed.len(),
3485                "duplicates found"
3486            );
3487
3488            // This also checks if there is a duplicate registration
3489            let stake_table =
3490                validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
3491            if let Some(prev_st) = prev_st {
3492                assert_eq!(stake_table, prev_st);
3493            }
3494
3495            prev_st = Some(stake_table);
3496
3497            if target_bh == 100 {
3498                break;
3499            }
3500
3501            target_bh = header.height();
3502        }
3503
3504        Ok(())
3505    }
3506
3507    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3508    async fn test_rewards_v3() -> anyhow::Result<()> {
3509        // The test registers multiple delegators for each validator
3510        // It verifies that no rewards are distributed in the first two epochs
3511        // and that rewards are correctly allocated starting from the third epoch.
3512        // also checks that the total stake of delegators matches the stake of the validator
3513        // and that the calculated rewards match those obtained via the merklized state api
3514        const EPOCH_HEIGHT: u64 = 20;
3515
3516        let network_config = TestConfigBuilder::default()
3517            .epoch_height(EPOCH_HEIGHT)
3518            .build();
3519
3520        let api_port = pick_unused_port().expect("No ports free for query service");
3521
3522        const NUM_NODES: usize = 7;
3523
3524        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3525        let persistence: [_; NUM_NODES] = storage
3526            .iter()
3527            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3528            .collect::<Vec<_>>()
3529            .try_into()
3530            .unwrap();
3531
3532        let config = TestNetworkConfigBuilder::with_num_nodes()
3533            .api_config(SqlDataSource::options(
3534                &storage[0],
3535                Options::with_port(api_port),
3536            ))
3537            .network_config(network_config)
3538            .persistences(persistence.clone())
3539            .catchups(std::array::from_fn(|_| {
3540                StatePeers::<StaticVersion<0, 1>>::from_urls(
3541                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3542                    Default::default(),
3543                    &NoMetrics,
3544                )
3545            }))
3546            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3547            .await
3548            .unwrap()
3549            .build();
3550
3551        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3552        let client: Client<ServerError, SequencerApiVersion> =
3553            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3554
3555        // Wait for 3 epochs to allow rewards distribution to take effect.
3556        let mut events = network.peers[0].event_stream().await;
3557        while let Some(event) = events.next().await {
3558            if let EventType::Decide { leaf_chain, .. } = event.event {
3559                let height = leaf_chain[0].leaf.height();
3560                tracing::info!("Node 0 decided at height: {height}");
3561                if height > EPOCH_HEIGHT * 3 {
3562                    break;
3563                }
3564            }
3565        }
3566
3567        // Verify that there are no validators for epoch # 1 and epoch # 2
3568        {
3569            client
3570                .get::<ValidatorMap>("node/validators/1")
3571                .send()
3572                .await
3573                .unwrap()
3574                .is_empty();
3575
3576            client
3577                .get::<ValidatorMap>("node/validators/2")
3578                .send()
3579                .await
3580                .unwrap()
3581                .is_empty();
3582        }
3583
3584        // Get the epoch # 3 validators
3585        let validators = client
3586            .get::<ValidatorMap>("node/validators/3")
3587            .send()
3588            .await
3589            .expect("validators");
3590
3591        assert!(!validators.is_empty());
3592
3593        // Collect addresses to track rewards for all participants.
3594        let mut addresses = HashSet::new();
3595        for v in validators.values() {
3596            addresses.insert(v.account);
3597            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3598        }
3599
3600        // Verify no rewards are distributed in the first two epochs.
3601        for block in 0..=EPOCH_HEIGHT * 2 {
3602            for address in addresses.clone() {
3603                let amount = client
3604                    .get::<Option<RewardAmount>>(&format!(
3605                        "reward-state/reward-balance/{block}/{address}"
3606                    ))
3607                    .send()
3608                    .await
3609                    .ok()
3610                    .flatten();
3611                assert!(amount.is_none(), "amount is not none for block {block}")
3612            }
3613        }
3614
3615        // Collect leaves for epoch 3 to 5 to verify reward calculations.
3616        let leaves = client
3617            .socket("availability/stream/leaves/41")
3618            .subscribe::<LeafQueryData<SeqTypes>>()
3619            .await
3620            .unwrap()
3621            .take((EPOCH_HEIGHT * 3).try_into().unwrap())
3622            .try_collect::<Vec<_>>()
3623            .await
3624            .unwrap();
3625
3626        let node_state = network.server.node_state();
3627        let coordinator = node_state.coordinator;
3628
3629        let membership = coordinator.membership().read().await;
3630        let block_reward = membership
3631            .block_reward(None)
3632            .expect("block reward is not None");
3633
3634        drop(membership);
3635
3636        let mut rewards_map = HashMap::new();
3637
3638        for leaf in leaves {
3639            let block = leaf.height();
3640            tracing::info!("verify rewards for block={block:?}");
3641            let membership = coordinator.membership().read().await;
3642            let epoch = epoch_from_block_number(block, EPOCH_HEIGHT);
3643            let epoch_number = EpochNumber::new(epoch);
3644            let leader = membership
3645                .leader(leaf.leaf().view_number(), Some(epoch_number))
3646                .expect("leader");
3647            let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
3648
3649            drop(membership);
3650
3651            let validators = client
3652                .get::<ValidatorMap>(&format!("node/validators/{epoch}"))
3653                .send()
3654                .await
3655                .expect("validators");
3656
3657            let leader_validator = validators
3658                .get(&leader_eth_address)
3659                .expect("leader not found");
3660
3661            let distributor =
3662                RewardDistributor::new(leader_validator.clone(), block_reward, U256::ZERO.into());
3663            // Verify that the sum of delegator stakes equals the validator's total stake.
3664            for validator in validators.values() {
3665                let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
3666
3667                assert_eq!(delegator_stake_sum, validator.stake);
3668            }
3669
3670            let computed_rewards = distributor.compute_rewards().expect("reward computation");
3671
3672            // Verify that the leader commission amount is within the tolerated range.
3673            // Due to potential rounding errors in decimal calculations for delegator rewards,
3674            // the actual distributed commission
3675            // amount may differ very slightly from the calculated value.
3676            // this asserts that it is within 10wei tolerance level.
3677            // 10 wei is 10* 10E-18
3678            let total_reward = block_reward.0;
3679            let leader_commission_basis_points = U256::from(leader_validator.commission);
3680            let calculated_leader_commission_reward = leader_commission_basis_points
3681                .checked_mul(total_reward)
3682                .context("overflow")?
3683                .checked_div(U256::from(COMMISSION_BASIS_POINTS))
3684                .context("overflow")?;
3685
3686            assert!(
3687                computed_rewards.leader_commission().0 - calculated_leader_commission_reward
3688                    <= U256::from(10_u64)
3689            );
3690
3691            // Aggregate reward amounts by address in the map.
3692            // This is necessary because there can be two entries for a leader address:
3693            // - One entry for commission rewards.
3694            // - Another for delegator rewards when the leader is delegating.
3695            // Also, rewards are accumulated for the same addresses
3696            let leader_commission = *computed_rewards.leader_commission();
3697            for (address, amount) in computed_rewards.delegators().clone() {
3698                rewards_map
3699                    .entry(address)
3700                    .and_modify(|entry| *entry += amount)
3701                    .or_insert(amount);
3702            }
3703
3704            // add leader commission reward
3705            rewards_map
3706                .entry(leader_eth_address)
3707                .and_modify(|entry| *entry += leader_commission)
3708                .or_insert(leader_commission);
3709
3710            // assert that the reward matches to what is in the reward merkle tree
3711            for (address, calculated_amount) in rewards_map.iter() {
3712                let amount_from_api = client
3713                    .get::<Option<RewardAmount>>(&format!(
3714                        "reward-state/reward-balance/{block}/{address}"
3715                    ))
3716                    .send()
3717                    .await
3718                    .ok()
3719                    .flatten()
3720                    .expect("amount");
3721                assert_eq!(amount_from_api, *calculated_amount)
3722            }
3723        }
3724
3725        Ok(())
3726    }
3727
3728    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3729    async fn test_rewards_v4() -> anyhow::Result<()> {
3730        // This test verifies PoS reward distribution logic for multiple delegators per validator.
3731        //
3732        //  assertions:
3733        // - No rewards are distributed during the first 2 epochs.
3734        // - Rewards begin from epoch 3 onward.
3735        // - Delegator stake sums match the corresponding validator stake.
3736        // - Reward values match those returned by the reward state API.
3737        // - Commission calculations are within a small acceptable rounding tolerance.
3738        // - Ensure that the `total_reward_distributed` field in the block header matches the total block reward distributed
3739        const EPOCH_HEIGHT: u64 = 20;
3740
3741        type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
3742
3743        let network_config = TestConfigBuilder::default()
3744            .epoch_height(EPOCH_HEIGHT)
3745            .build();
3746
3747        let api_port = pick_unused_port().expect("No ports free for query service");
3748
3749        const NUM_NODES: usize = 5;
3750
3751        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3752        let persistence: [_; NUM_NODES] = storage
3753            .iter()
3754            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3755            .collect::<Vec<_>>()
3756            .try_into()
3757            .unwrap();
3758
3759        let config = TestNetworkConfigBuilder::with_num_nodes()
3760            .api_config(SqlDataSource::options(
3761                &storage[0],
3762                Options::with_port(api_port),
3763            ))
3764            .network_config(network_config)
3765            .persistences(persistence.clone())
3766            .catchups(std::array::from_fn(|_| {
3767                StatePeers::<StaticVersion<0, 1>>::from_urls(
3768                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3769                    Default::default(),
3770                    &NoMetrics,
3771                )
3772            }))
3773            .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
3774            .await
3775            .unwrap()
3776            .build();
3777
3778        let network = TestNetwork::new(config, V4::new()).await;
3779        let client: Client<ServerError, SequencerApiVersion> =
3780            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3781
3782        // Wait for the chain to progress beyond epoch 3 so rewards start being distributed.
3783        let mut events = network.peers[0].event_stream().await;
3784        while let Some(event) = events.next().await {
3785            if let EventType::Decide { leaf_chain, .. } = event.event {
3786                let height = leaf_chain[0].leaf.height();
3787                tracing::info!("Node 0 decided at height: {height}");
3788                if height > EPOCH_HEIGHT * 3 {
3789                    break;
3790                }
3791            }
3792        }
3793
3794        // Verify that there are no validators for epoch # 1 and epoch # 2
3795        {
3796            client
3797                .get::<ValidatorMap>("node/validators/1")
3798                .send()
3799                .await
3800                .unwrap()
3801                .is_empty();
3802
3803            client
3804                .get::<ValidatorMap>("node/validators/2")
3805                .send()
3806                .await
3807                .unwrap()
3808                .is_empty();
3809        }
3810
3811        // Get the epoch # 3 validators
3812        let validators = client
3813            .get::<ValidatorMap>("node/validators/3")
3814            .send()
3815            .await
3816            .expect("validators");
3817
3818        assert!(!validators.is_empty());
3819
3820        // Collect addresses to track rewards for all participants.
3821        let mut addresses = HashSet::new();
3822        for v in validators.values() {
3823            addresses.insert(v.account);
3824            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3825        }
3826
3827        let mut leaves = client
3828            .socket("availability/stream/leaves/0")
3829            .subscribe::<LeafQueryData<SeqTypes>>()
3830            .await
3831            .unwrap();
3832
3833        let node_state = network.server.node_state();
3834        let coordinator = node_state.coordinator;
3835
3836        let membership = coordinator.membership().read().await;
3837
3838        // Ensure rewards remain zero up for the first two epochs
3839        while let Some(leaf) = leaves.next().await {
3840            let leaf = leaf.unwrap();
3841            let header = leaf.header();
3842            assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
3843
3844            let epoch_number =
3845                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
3846
3847            assert!(membership.block_reward(Some(epoch_number)).is_none());
3848
3849            let height = header.height();
3850            for address in addresses.clone() {
3851                let amount = client
3852                    .get::<Option<RewardAmount>>(&format!(
3853                        "reward-state-v2/reward-balance/{height}/{address}"
3854                    ))
3855                    .send()
3856                    .await
3857                    .ok()
3858                    .flatten();
3859                assert!(amount.is_none(), "amount is not none for block {height}")
3860            }
3861
3862            if leaf.height() == EPOCH_HEIGHT * 2 {
3863                break;
3864            }
3865        }
3866
3867        drop(membership);
3868
3869        let mut rewards_map = HashMap::new();
3870        let mut total_distributed = U256::ZERO;
3871        let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
3872
3873        while let Some(leaf) = leaves.next().await {
3874            let leaf = leaf.unwrap();
3875
3876            let header = leaf.header();
3877            let distributed = header
3878                .total_reward_distributed()
3879                .expect("rewards distributed is none");
3880
3881            let block = leaf.height();
3882            tracing::info!("verify rewards for block={block:?}");
3883            let membership = coordinator.membership().read().await;
3884            let epoch_number =
3885                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
3886
3887            let block_reward = membership.block_reward(Some(epoch_number)).unwrap();
3888            let leader = membership
3889                .leader(leaf.leaf().view_number(), Some(epoch_number))
3890                .expect("leader");
3891            let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
3892
3893            drop(membership);
3894
3895            let validators = client
3896                .get::<ValidatorMap>(&format!("node/validators/{epoch_number}"))
3897                .send()
3898                .await
3899                .expect("validators");
3900
3901            let leader_validator = validators
3902                .get(&leader_eth_address)
3903                .expect("leader not found");
3904
3905            let distributor =
3906                RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
3907            // Verify that the sum of delegator stakes equals the validator's total stake.
3908            for validator in validators.values() {
3909                let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
3910
3911                assert_eq!(delegator_stake_sum, validator.stake);
3912            }
3913
3914            let computed_rewards = distributor.compute_rewards().expect("reward computation");
3915
3916            // Validate that the leader's commission is within a 10 wei tolerance of the expected value.
3917            let total_reward = block_reward.0;
3918            let leader_commission_basis_points = U256::from(leader_validator.commission);
3919            let calculated_leader_commission_reward = leader_commission_basis_points
3920                .checked_mul(total_reward)
3921                .context("overflow")?
3922                .checked_div(U256::from(COMMISSION_BASIS_POINTS))
3923                .context("overflow")?;
3924
3925            assert!(
3926                computed_rewards.leader_commission().0 - calculated_leader_commission_reward
3927                    <= U256::from(10_u64)
3928            );
3929
3930            // Aggregate rewards by address (both delegator and leader).
3931            let leader_commission = *computed_rewards.leader_commission();
3932            for (address, amount) in computed_rewards.delegators().clone() {
3933                rewards_map
3934                    .entry(address)
3935                    .and_modify(|entry| *entry += amount)
3936                    .or_insert(amount);
3937            }
3938
3939            // add leader commission reward
3940            rewards_map
3941                .entry(leader_eth_address)
3942                .and_modify(|entry| *entry += leader_commission)
3943                .or_insert(leader_commission);
3944
3945            // assert that the reward matches to what is in the reward merkle tree
3946            for (address, calculated_amount) in rewards_map.iter() {
3947                let mut attempt = 0;
3948                let amount_from_api = loop {
3949                    let result = client
3950                        .get::<Option<RewardAmount>>(&format!(
3951                            "reward-state-v2/reward-balance/{block}/{address}"
3952                        ))
3953                        .send()
3954                        .await
3955                        .ok()
3956                        .flatten();
3957
3958                    if let Some(amount) = result {
3959                        break amount;
3960                    }
3961
3962                    attempt += 1;
3963                    if attempt >= 3 {
3964                        panic!(
3965                            "Failed to fetch reward amount for address {address} after 3 retries"
3966                        );
3967                    }
3968
3969                    sleep(Duration::from_secs(2)).await;
3970                };
3971
3972                assert_eq!(amount_from_api, *calculated_amount);
3973            }
3974
3975            // Confirm the header's total distributed field matches the cumulative expected amount.
3976            total_distributed += block_reward.0;
3977            assert_eq!(
3978                header.total_reward_distributed().unwrap().0,
3979                total_distributed
3980            );
3981
3982            // Block reward shouldn't change for the same epoch
3983            epoch_rewards
3984                .entry(epoch_number)
3985                .and_modify(|r| assert_eq!(*r, block_reward.0))
3986                .or_insert(block_reward.0);
3987
3988            // Stop the test after verifying 5 full epochs.
3989            if leaf.height() == EPOCH_HEIGHT * 5 {
3990                break;
3991            }
3992        }
3993
3994        Ok(())
3995    }
3996
3997    #[rstest]
3998    #[case(PosVersionV3::new())]
3999    #[case(PosVersionV4::new())]
4000    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4001
4002    async fn test_node_stake_table_api<Ver: Versions>(#[case] ver: Ver) {
4003        let epoch_height = 20;
4004
4005        let network_config = TestConfigBuilder::default()
4006            .epoch_height(epoch_height)
4007            .build();
4008
4009        let api_port = pick_unused_port().expect("No ports free for query service");
4010
4011        const NUM_NODES: usize = 2;
4012        // Initialize nodes.
4013        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4014        let persistence: [_; NUM_NODES] = storage
4015            .iter()
4016            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4017            .collect::<Vec<_>>()
4018            .try_into()
4019            .unwrap();
4020
4021        let config = TestNetworkConfigBuilder::with_num_nodes()
4022            .api_config(SqlDataSource::options(
4023                &storage[0],
4024                Options::with_port(api_port),
4025            ))
4026            .network_config(network_config)
4027            .persistences(persistence.clone())
4028            .catchups(std::array::from_fn(|_| {
4029                StatePeers::<StaticVersion<0, 1>>::from_urls(
4030                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4031                    Default::default(),
4032                    &NoMetrics,
4033                )
4034            }))
4035            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4036            .await
4037            .unwrap()
4038            .build();
4039
4040        let _network = TestNetwork::new(config, ver).await;
4041
4042        let client: Client<ServerError, SequencerApiVersion> =
4043            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4044
4045        // wait for atleast 2 epochs
4046        let _blocks = client
4047            .socket("availability/stream/blocks/0")
4048            .subscribe::<BlockQueryData<SeqTypes>>()
4049            .await
4050            .unwrap()
4051            .take(40)
4052            .try_collect::<Vec<_>>()
4053            .await
4054            .unwrap();
4055
4056        for i in 1..=3 {
4057            let _st = client
4058                .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
4059                .send()
4060                .await
4061                .expect("failed to get stake table");
4062        }
4063
4064        let _st = client
4065            .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
4066            .send()
4067            .await
4068            .expect("failed to get stake table");
4069    }
4070
4071    #[rstest]
4072    #[case(PosVersionV3::new())]
4073    #[case(PosVersionV4::new())]
4074    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4075
4076    async fn test_epoch_stake_table_catchup<Ver: Versions>(#[case] ver: Ver) {
4077        const EPOCH_HEIGHT: u64 = 10;
4078        const NUM_NODES: usize = 6;
4079
4080        let port = pick_unused_port().expect("No ports free");
4081
4082        let network_config = TestConfigBuilder::default()
4083            .epoch_height(EPOCH_HEIGHT)
4084            .build();
4085
4086        // Initialize storage for each node
4087        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4088
4089        let persistence_options: [_; NUM_NODES] = storage
4090            .iter()
4091            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4092            .collect::<Vec<_>>()
4093            .try_into()
4094            .unwrap();
4095
4096        // setup catchup peers
4097        let catchup_peers = std::array::from_fn(|_| {
4098            StatePeers::<StaticVersion<0, 1>>::from_urls(
4099                vec![format!("http://localhost:{port}").parse().unwrap()],
4100                Default::default(),
4101                &NoMetrics,
4102            )
4103        });
4104        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4105            .api_config(SqlDataSource::options(
4106                &storage[0],
4107                Options::with_port(port),
4108            ))
4109            .network_config(network_config)
4110            .persistences(persistence_options.clone())
4111            .catchups(catchup_peers)
4112            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4113            .await
4114            .unwrap()
4115            .build();
4116
4117        let state = config.states()[0].clone();
4118        let mut network = TestNetwork::new(config, ver).await;
4119
4120        // Wait for the peer 0 (node 1) to advance past three epochs
4121        let mut events = network.peers[0].event_stream().await;
4122        while let Some(event) = events.next().await {
4123            if let EventType::Decide { leaf_chain, .. } = event.event {
4124                let height = leaf_chain[0].leaf.height();
4125                tracing::info!("Node 0 decided at height: {height}");
4126                if height > EPOCH_HEIGHT * 3 {
4127                    break;
4128                }
4129            }
4130        }
4131
4132        // Shutdown and remove node 1 to simulate falling behind
4133        tracing::info!("Shutting down peer 0");
4134        network.peers.remove(0);
4135
4136        // Wait for epochs to progress with node 1 offline
4137        let mut events = network.server.event_stream().await;
4138        while let Some(event) = events.next().await {
4139            if let EventType::Decide { leaf_chain, .. } = event.event {
4140                let height = leaf_chain[0].leaf.height();
4141                if height > EPOCH_HEIGHT * 7 {
4142                    break;
4143                }
4144            }
4145        }
4146
4147        // add node 1 to the network with fresh storage
4148        let storage = SqlDataSource::create_storage().await;
4149        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4150        tracing::info!("Restarting peer 0");
4151        let node = network
4152            .cfg
4153            .init_node(
4154                1,
4155                state,
4156                options,
4157                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4158                    vec![format!("http://localhost:{port}").parse().unwrap()],
4159                    Default::default(),
4160                    &NoMetrics,
4161                )),
4162                None,
4163                &NoMetrics,
4164                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4165                NullEventConsumer,
4166                ver,
4167                Default::default(),
4168            )
4169            .await;
4170
4171        let coordinator = node.node_state().coordinator;
4172        let server_node_state = network.server.node_state();
4173        let server_coordinator = server_node_state.coordinator;
4174        // Verify that the restarted node catches up for each epoch
4175        for epoch_num in 1..=7 {
4176            let epoch = EpochNumber::new(epoch_num);
4177            let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
4178            if membership_for_epoch.is_err() {
4179                coordinator.wait_for_catchup(epoch).await.unwrap();
4180            }
4181
4182            println!("have stake table for epoch = {epoch_num}");
4183
4184            let node_stake_table = coordinator
4185                .membership()
4186                .read()
4187                .await
4188                .stake_table(Some(epoch));
4189            let stake_table = server_coordinator
4190                .membership()
4191                .read()
4192                .await
4193                .stake_table(Some(epoch));
4194            println!("asserting stake table for epoch = {epoch_num}");
4195
4196            assert_eq!(
4197                node_stake_table, stake_table,
4198                "Stake table mismatch for epoch {epoch_num}",
4199            );
4200        }
4201    }
4202
4203    #[rstest]
4204    #[case(PosVersionV3::new())]
4205    #[case(PosVersionV4::new())]
4206    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4207
4208    async fn test_epoch_stake_table_catchup_stress<Ver: Versions>(#[case] versions: Ver) {
4209        const EPOCH_HEIGHT: u64 = 10;
4210        const NUM_NODES: usize = 6;
4211
4212        let port = pick_unused_port().expect("No ports free");
4213
4214        let network_config = TestConfigBuilder::default()
4215            .epoch_height(EPOCH_HEIGHT)
4216            .build();
4217
4218        // Initialize storage for each node
4219        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4220
4221        let persistence_options: [_; NUM_NODES] = storage
4222            .iter()
4223            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4224            .collect::<Vec<_>>()
4225            .try_into()
4226            .unwrap();
4227
4228        // setup catchup peers
4229        let catchup_peers = std::array::from_fn(|_| {
4230            StatePeers::<StaticVersion<0, 1>>::from_urls(
4231                vec![format!("http://localhost:{port}").parse().unwrap()],
4232                Default::default(),
4233                &NoMetrics,
4234            )
4235        });
4236        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4237            .api_config(SqlDataSource::options(
4238                &storage[0],
4239                Options::with_port(port),
4240            ))
4241            .network_config(network_config)
4242            .persistences(persistence_options.clone())
4243            .catchups(catchup_peers)
4244            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4245            .await
4246            .unwrap()
4247            .build();
4248
4249        let state = config.states()[0].clone();
4250        let mut network = TestNetwork::new(config, versions).await;
4251
4252        // Wait for the peer 0 (node 1) to advance past three epochs
4253        let mut events = network.peers[0].event_stream().await;
4254        while let Some(event) = events.next().await {
4255            if let EventType::Decide { leaf_chain, .. } = event.event {
4256                let height = leaf_chain[0].leaf.height();
4257                tracing::info!("Node 0 decided at height: {height}");
4258                if height > EPOCH_HEIGHT * 3 {
4259                    break;
4260                }
4261            }
4262        }
4263
4264        // Shutdown and remove node 1 to simulate falling behind
4265        tracing::info!("Shutting down peer 0");
4266        network.peers.remove(0);
4267
4268        // Wait for epochs to progress with node 1 offline
4269        let mut events = network.server.event_stream().await;
4270        while let Some(event) = events.next().await {
4271            if let EventType::Decide { leaf_chain, .. } = event.event {
4272                let height = leaf_chain[0].leaf.height();
4273                tracing::info!("Server decided at height: {height}");
4274                //  until 7 epochs
4275                if height > EPOCH_HEIGHT * 7 {
4276                    break;
4277                }
4278            }
4279        }
4280
4281        // add node 1 to the network with fresh storage
4282        let storage = SqlDataSource::create_storage().await;
4283        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4284
4285        tracing::info!("Restarting peer 0");
4286        let node = network
4287            .cfg
4288            .init_node(
4289                1,
4290                state,
4291                options,
4292                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4293                    vec![format!("http://localhost:{port}").parse().unwrap()],
4294                    Default::default(),
4295                    &NoMetrics,
4296                )),
4297                None,
4298                &NoMetrics,
4299                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4300                NullEventConsumer,
4301                versions,
4302                Default::default(),
4303            )
4304            .await;
4305
4306        let coordinator = node.node_state().coordinator;
4307
4308        let server_node_state = network.server.node_state();
4309        let server_coordinator = server_node_state.coordinator;
4310
4311        // Trigger catchup for all epochs in quick succession and in random order
4312        let mut rand_epochs: Vec<_> = (1..=7).collect();
4313        rand_epochs.shuffle(&mut rand::thread_rng());
4314        println!("trigger catchup in this order: {rand_epochs:?}");
4315        for epoch_num in rand_epochs {
4316            let epoch = EpochNumber::new(epoch_num);
4317            let _ = coordinator.membership_for_epoch(Some(epoch)).await;
4318        }
4319
4320        // Verify that the restarted node catches up for each epoch
4321        for epoch_num in 1..=7 {
4322            println!("getting stake table for epoch = {epoch_num}");
4323            let epoch = EpochNumber::new(epoch_num);
4324            let _ = coordinator.wait_for_catchup(epoch).await.unwrap();
4325
4326            println!("have stake table for epoch = {epoch_num}");
4327
4328            let node_stake_table = coordinator
4329                .membership()
4330                .read()
4331                .await
4332                .stake_table(Some(epoch));
4333            let stake_table = server_coordinator
4334                .membership()
4335                .read()
4336                .await
4337                .stake_table(Some(epoch));
4338
4339            println!("asserting stake table for epoch = {epoch_num}");
4340
4341            assert_eq!(
4342                node_stake_table, stake_table,
4343                "Stake table mismatch for epoch {epoch_num}",
4344            );
4345        }
4346    }
4347
4348    #[rstest]
4349    #[case(PosVersionV3::new())]
4350    #[case(PosVersionV4::new())]
4351    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4352    async fn test_merklized_state_catchup_on_restart<Ver: Versions>(
4353        #[case] versions: Ver,
4354    ) -> anyhow::Result<()> {
4355        // This test verifies that a query node can catch up on
4356        // merklized state after being offline for multiple epochs.
4357        //
4358        // Steps:
4359        // 1. Start a test network with 5 sequencer nodes.
4360        // 2. Start a separate node with the query module enabled, connected to the network.
4361        //    - This node stores merklized state
4362        // 3. Shut down the query node after 1 epoch.
4363        // 4. Allow the network to progress 3 more epochs (query node remains offline).
4364        // 5. Restart the query node.
4365        //    - The node is expected to reconstruct or catch up on its own
4366        const EPOCH_HEIGHT: u64 = 10;
4367
4368        let network_config = TestConfigBuilder::default()
4369            .epoch_height(EPOCH_HEIGHT)
4370            .build();
4371
4372        let api_port = pick_unused_port().expect("No ports free for query service");
4373
4374        tracing::info!("API PORT = {api_port}");
4375        const NUM_NODES: usize = 5;
4376
4377        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4378        let persistence: [_; NUM_NODES] = storage
4379            .iter()
4380            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4381            .collect::<Vec<_>>()
4382            .try_into()
4383            .unwrap();
4384
4385        let config = TestNetworkConfigBuilder::with_num_nodes()
4386            .api_config(SqlDataSource::options(
4387                &storage[0],
4388                Options::with_port(api_port).catchup(Default::default()),
4389            ))
4390            .network_config(network_config)
4391            .persistences(persistence.clone())
4392            .catchups(std::array::from_fn(|_| {
4393                StatePeers::<StaticVersion<0, 1>>::from_urls(
4394                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4395                    Default::default(),
4396                    &NoMetrics,
4397                )
4398            }))
4399            .pos_hook::<Ver>(
4400                DelegationConfig::MultipleDelegators,
4401                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
4402            )
4403            .await
4404            .unwrap()
4405            .build();
4406        let state = config.states()[0].clone();
4407        let mut network = TestNetwork::new(config, versions).await;
4408
4409        // Remove peer 0 and restart it with the query module enabled.
4410        // Adding an additional node to the test network is not straight forward,
4411        // as the keys have already been initialized in the config above.
4412        // So, we remove this node and re-add it using the same index.
4413        network.peers[0].shut_down().await;
4414        network.peers.remove(0);
4415        let node_0_storage = &storage[1];
4416        let node_0_persistence = persistence[1].clone();
4417        let node_0_port = pick_unused_port().expect("No ports free for query service");
4418        tracing::info!("node_0_port {node_0_port}");
4419        // enable query module with api peers
4420        let opt = Options::with_port(node_0_port).query_sql(
4421            Query {
4422                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
4423            },
4424            tmp_options(node_0_storage),
4425        );
4426
4427        // start the query node so that it builds the merklized state
4428        let node_0 = opt
4429            .clone()
4430            .serve(|metrics, consumer, storage| {
4431                let cfg = network.cfg.clone();
4432                let node_0_persistence = node_0_persistence.clone();
4433                let state = state.clone();
4434                async move {
4435                    Ok(cfg
4436                        .init_node(
4437                            1,
4438                            state,
4439                            node_0_persistence.clone(),
4440                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4441                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
4442                                Default::default(),
4443                                &NoMetrics,
4444                            )),
4445                            storage,
4446                            &*metrics,
4447                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4448                            consumer,
4449                            versions,
4450                            Default::default(),
4451                        )
4452                        .await)
4453                }
4454                .boxed()
4455            })
4456            .await
4457            .unwrap();
4458
4459        let mut events = network.peers[2].event_stream().await;
4460        // wait for 1 epoch
4461        wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
4462
4463        // shutdown the node for 3 epochs
4464        drop(node_0);
4465
4466        // wait for 4 epochs
4467        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
4468
4469        // start the node again.
4470        let node_0 = opt
4471            .serve(|metrics, consumer, storage| {
4472                let cfg = network.cfg.clone();
4473                async move {
4474                    Ok(cfg
4475                        .init_node(
4476                            1,
4477                            state,
4478                            node_0_persistence,
4479                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4480                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
4481                                Default::default(),
4482                                &NoMetrics,
4483                            )),
4484                            storage,
4485                            &*metrics,
4486                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4487                            consumer,
4488                            versions,
4489                            Default::default(),
4490                        )
4491                        .await)
4492                }
4493                .boxed()
4494            })
4495            .await
4496            .unwrap();
4497
4498        let client: Client<ServerError, SequencerApiVersion> =
4499            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
4500        client.connect(None).await;
4501
4502        wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
4503
4504        let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
4505
4506        // check that the node's state has reward accounts
4507        let mut retries = 0;
4508        loop {
4509            sleep(Duration::from_secs(1)).await;
4510            let state = node_0.decided_state().await;
4511
4512            let leaves = if Ver::Base::VERSION == EpochVersion::VERSION {
4513                // Use legacy tree for V3
4514                state.reward_merkle_tree_v1.num_leaves()
4515            } else {
4516                // Use new tree for V4 and above
4517                state.reward_merkle_tree_v2.num_leaves()
4518            };
4519
4520            if leaves > 0 {
4521                tracing::info!("Node's state has reward accounts");
4522                break;
4523            }
4524
4525            retries += 1;
4526            if retries > 120 {
4527                panic!("max retries reached. failed to catchup reward state");
4528            }
4529        }
4530
4531        retries = 0;
4532        // check that the node has stored atleast 6 epochs merklized state in persistence
4533        loop {
4534            sleep(Duration::from_secs(3)).await;
4535
4536            let bh = client
4537                .get::<u64>("block-state/block-height")
4538                .send()
4539                .await
4540                .expect("block height not found");
4541
4542            tracing::info!("block state: block height={bh}");
4543            if bh > epoch_7_block {
4544                break;
4545            }
4546
4547            retries += 1;
4548            if retries > 30 {
4549                panic!(
4550                    "max retries reached. block state block height is less than epoch 7 start \
4551                     block"
4552                );
4553            }
4554        }
4555
4556        // shutdown consensus to freeze the state
4557        node_0.shutdown_consensus().await;
4558        let decided_leaf = node_0.decided_leaf().await;
4559        let state = node_0.decided_state().await;
4560
4561        state
4562            .block_merkle_tree
4563            .lookup(decided_leaf.height() - 1)
4564            .expect_ok()
4565            .expect("block state not found");
4566
4567        Ok(())
4568    }
4569
4570    #[rstest]
4571    #[case(PosVersionV3::new())]
4572    #[case(PosVersionV4::new())]
4573    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4574    async fn test_state_reconstruction<Ver: Versions>(
4575        #[case] pos_version: Ver,
4576    ) -> anyhow::Result<()> {
4577        // This test verifies that a query node can successfully reconstruct its state
4578        // after being shut down from the database
4579        //
4580        // Steps:
4581        // 1. Start a test network with 5 nodes.
4582        // 2. Add a query node connected to the network.
4583        // 3. Let the network run until 3 epochs have passed.
4584        // 4. Shut down the query node.
4585        // 5. Attempt to reconstruct its state from storage using:
4586        //    - No fee/reward accounts
4587        //    - Only fee accounts
4588        //    - Only reward accounts
4589        //    - Both fee and reward accounts
4590        // 6. Assert that the reconstructed state is correct in all scenarios.
4591
4592        const EPOCH_HEIGHT: u64 = 10;
4593
4594        let network_config = TestConfigBuilder::default()
4595            .epoch_height(EPOCH_HEIGHT)
4596            .build();
4597
4598        let api_port = pick_unused_port().expect("No ports free for query service");
4599
4600        tracing::info!("API PORT = {api_port}");
4601        const NUM_NODES: usize = 5;
4602
4603        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4604        let persistence: [_; NUM_NODES] = storage
4605            .iter()
4606            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4607            .collect::<Vec<_>>()
4608            .try_into()
4609            .unwrap();
4610
4611        let config = TestNetworkConfigBuilder::with_num_nodes()
4612            .api_config(SqlDataSource::options(
4613                &storage[0],
4614                Options::with_port(api_port),
4615            ))
4616            .network_config(network_config)
4617            .persistences(persistence.clone())
4618            .catchups(std::array::from_fn(|_| {
4619                StatePeers::<StaticVersion<0, 1>>::from_urls(
4620                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4621                    Default::default(),
4622                    &NoMetrics,
4623                )
4624            }))
4625            .pos_hook::<Ver>(
4626                DelegationConfig::MultipleDelegators,
4627                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
4628            )
4629            .await
4630            .unwrap()
4631            .build();
4632        let state = config.states()[0].clone();
4633        let mut network = TestNetwork::new(config, pos_version).await;
4634        // Remove peer 0 and restart it with the query module enabled.
4635        // Adding an additional node to the test network is not straight forward,
4636        // as the keys have already been initialized in the config above.
4637        // So, we remove this node and re-add it using the same index.
4638        network.peers.remove(0);
4639
4640        let node_0_storage = &storage[1];
4641        let node_0_persistence = persistence[1].clone();
4642        let node_0_port = pick_unused_port().expect("No ports free for query service");
4643        tracing::info!("node_0_port {node_0_port}");
4644        let opt = Options::with_port(node_0_port).query_sql(
4645            Query {
4646                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
4647            },
4648            tmp_options(node_0_storage),
4649        );
4650        let node_0 = opt
4651            .clone()
4652            .serve(|metrics, consumer, storage| {
4653                let cfg = network.cfg.clone();
4654                let node_0_persistence = node_0_persistence.clone();
4655                let state = state.clone();
4656                async move {
4657                    Ok(cfg
4658                        .init_node(
4659                            1,
4660                            state,
4661                            node_0_persistence.clone(),
4662                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4663                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
4664                                Default::default(),
4665                                &NoMetrics,
4666                            )),
4667                            storage,
4668                            &*metrics,
4669                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4670                            consumer,
4671                            pos_version,
4672                            Default::default(),
4673                        )
4674                        .await)
4675                }
4676                .boxed()
4677            })
4678            .await
4679            .unwrap();
4680
4681        let mut events = network.peers[2].event_stream().await;
4682        // Wait until at least 3 epochs have passed
4683        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
4684
4685        tracing::warn!("shutting down node 0");
4686
4687        node_0.shutdown_consensus().await;
4688
4689        let instance = node_0.node_state();
4690        let state = node_0.decided_state().await;
4691        let fee_accounts = state
4692            .fee_merkle_tree
4693            .clone()
4694            .into_iter()
4695            .map(|(acct, _)| acct)
4696            .collect::<Vec<_>>();
4697        let reward_accounts = match Ver::Base::VERSION {
4698            EpochVersion::VERSION => state
4699                .reward_merkle_tree_v1
4700                .clone()
4701                .into_iter()
4702                .map(|(acct, _)| RewardAccountV2::from(acct))
4703                .collect::<Vec<_>>(),
4704            DrbAndHeaderUpgradeVersion::VERSION => state
4705                .reward_merkle_tree_v2
4706                .clone()
4707                .into_iter()
4708                .map(|(acct, _)| acct)
4709                .collect::<Vec<_>>(),
4710            _ => panic!("invalid version"),
4711        };
4712
4713        let client: Client<ServerError, SequencerApiVersion> =
4714            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
4715        client.connect(Some(Duration::from_secs(10))).await;
4716
4717        // wait 3s to be sure that all the
4718        // transactions have been committed
4719        sleep(Duration::from_secs(3)).await;
4720
4721        tracing::info!("getting node block height");
4722        let node_block_height = client
4723            .get::<u64>("node/block-height")
4724            .send()
4725            .await
4726            .context("getting Espresso block height")
4727            .unwrap();
4728
4729        tracing::info!("node block height={node_block_height}");
4730
4731        let leaf_query_data = client
4732            .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
4733            .send()
4734            .await
4735            .context("error getting leaf")
4736            .unwrap();
4737
4738        tracing::info!("leaf={leaf_query_data:?}");
4739        let leaf = leaf_query_data.leaf();
4740        let to_view = leaf.view_number() + 1;
4741
4742        let ds = SqlStorage::connect(Config::try_from(&node_0_persistence).unwrap())
4743            .await
4744            .unwrap();
4745        let mut tx = ds.write().await?;
4746
4747        let (state, leaf) =
4748            reconstruct_state(&instance, &mut tx, node_block_height - 1, to_view, &[], &[])
4749                .await
4750                .unwrap();
4751        assert_eq!(leaf.view_number(), to_view);
4752        assert!(
4753            state
4754                .block_merkle_tree
4755                .lookup(node_block_height - 1)
4756                .expect_ok()
4757                .is_ok(),
4758            "inconsistent block merkle tree"
4759        );
4760
4761        // Reconstruct fee state
4762        let (state, leaf) = reconstruct_state(
4763            &instance,
4764            &mut tx,
4765            node_block_height - 1,
4766            to_view,
4767            &fee_accounts,
4768            &[],
4769        )
4770        .await
4771        .unwrap();
4772
4773        assert_eq!(leaf.view_number(), to_view);
4774        assert!(
4775            state
4776                .block_merkle_tree
4777                .lookup(node_block_height - 1)
4778                .expect_ok()
4779                .is_ok(),
4780            "inconsistent block merkle tree"
4781        );
4782
4783        for account in &fee_accounts {
4784            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
4785        }
4786
4787        // Reconstruct reward state
4788
4789        let (state, leaf) = reconstruct_state(
4790            &instance,
4791            &mut tx,
4792            node_block_height - 1,
4793            to_view,
4794            &[],
4795            &reward_accounts,
4796        )
4797        .await
4798        .unwrap();
4799
4800        match Ver::Base::VERSION {
4801            EpochVersion::VERSION => {
4802                for account in reward_accounts.clone() {
4803                    state
4804                        .reward_merkle_tree_v1
4805                        .lookup(RewardAccountV1::from(account))
4806                        .expect_ok()
4807                        .unwrap();
4808                }
4809            },
4810            DrbAndHeaderUpgradeVersion::VERSION => {
4811                for account in &reward_accounts {
4812                    state
4813                        .reward_merkle_tree_v2
4814                        .lookup(account)
4815                        .expect_ok()
4816                        .unwrap();
4817                }
4818            },
4819            _ => panic!("invalid version"),
4820        };
4821
4822        assert_eq!(leaf.view_number(), to_view);
4823        assert!(
4824            state
4825                .block_merkle_tree
4826                .lookup(node_block_height - 1)
4827                .expect_ok()
4828                .is_ok(),
4829            "inconsistent block merkle tree"
4830        );
4831        // Reconstruct reward and fee state
4832
4833        let (state, leaf) = reconstruct_state(
4834            &instance,
4835            &mut tx,
4836            node_block_height - 1,
4837            to_view,
4838            &fee_accounts,
4839            &reward_accounts,
4840        )
4841        .await
4842        .unwrap();
4843
4844        assert!(
4845            state
4846                .block_merkle_tree
4847                .lookup(node_block_height - 1)
4848                .expect_ok()
4849                .is_ok(),
4850            "inconsistent block merkle tree"
4851        );
4852        assert_eq!(leaf.view_number(), to_view);
4853
4854        match Ver::Base::VERSION {
4855            EpochVersion::VERSION => {
4856                for account in reward_accounts.clone() {
4857                    state
4858                        .reward_merkle_tree_v1
4859                        .lookup(RewardAccountV1::from(account))
4860                        .expect_ok()
4861                        .unwrap();
4862                }
4863            },
4864            DrbAndHeaderUpgradeVersion::VERSION => {
4865                for account in &reward_accounts {
4866                    state
4867                        .reward_merkle_tree_v2
4868                        .lookup(account)
4869                        .expect_ok()
4870                        .unwrap();
4871                }
4872            },
4873            _ => panic!("invalid version"),
4874        };
4875
4876        for account in &fee_accounts {
4877            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
4878        }
4879
4880        Ok(())
4881    }
4882
4883    /// Waits until a node has reached the given target epoch (exclusive).
4884    /// The function returns once the first event indicates an epoch higher than `target_epoch`.
4885    async fn wait_for_epochs(
4886        events: &mut (impl futures::Stream<Item = Event<SeqTypes>> + std::marker::Unpin),
4887        epoch_height: u64,
4888        target_epoch: u64,
4889    ) {
4890        while let Some(event) = events.next().await {
4891            if let EventType::Decide { leaf_chain, .. } = event.event {
4892                let leaf = leaf_chain[0].leaf.clone();
4893                let epoch = leaf.epoch(epoch_height);
4894                tracing::info!(
4895                    "Node decided at height: {}, epoch: {:?}",
4896                    leaf.height(),
4897                    epoch
4898                );
4899
4900                if epoch > Some(EpochNumber::new(target_epoch)) {
4901                    break;
4902                }
4903            }
4904        }
4905    }
4906
4907    #[rstest]
4908    #[case(PosVersionV3::new())]
4909    #[case(PosVersionV4::new())]
4910    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4911    async fn test_block_reward_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
4912        let epoch_height = 10;
4913
4914        let network_config = TestConfigBuilder::default()
4915            .epoch_height(epoch_height)
4916            .build();
4917
4918        let api_port = pick_unused_port().expect("No ports free for query service");
4919
4920        const NUM_NODES: usize = 1;
4921        // Initialize nodes.
4922        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4923        let persistence: [_; NUM_NODES] = storage
4924            .iter()
4925            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4926            .collect::<Vec<_>>()
4927            .try_into()
4928            .unwrap();
4929
4930        let config = TestNetworkConfigBuilder::with_num_nodes()
4931            .api_config(SqlDataSource::options(
4932                &storage[0],
4933                Options::with_port(api_port),
4934            ))
4935            .network_config(network_config.clone())
4936            .persistences(persistence.clone())
4937            .catchups(std::array::from_fn(|_| {
4938                StatePeers::<StaticVersion<0, 1>>::from_urls(
4939                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4940                    Default::default(),
4941                    &NoMetrics,
4942                )
4943            }))
4944            .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
4945            .await
4946            .unwrap()
4947            .build();
4948
4949        let _network = TestNetwork::new(config, versions).await;
4950        let client: Client<ServerError, SequencerApiVersion> =
4951            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4952
4953        let _blocks = client
4954            .socket("availability/stream/blocks/0")
4955            .subscribe::<BlockQueryData<SeqTypes>>()
4956            .await
4957            .unwrap()
4958            .take(3)
4959            .try_collect::<Vec<_>>()
4960            .await
4961            .unwrap();
4962
4963        let block_reward = client
4964            .get::<Option<RewardAmount>>("node/block-reward")
4965            .send()
4966            .await
4967            .expect("failed to get block reward")
4968            .expect("block reward is None");
4969        tracing::info!("block_reward={block_reward:?}");
4970
4971        assert!(block_reward.0 > U256::ZERO);
4972
4973        Ok(())
4974    }
4975
4976    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4977    async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
4978        use espresso_types::v0_3::ChainConfig;
4979
4980        let blocks_per_epoch = 10;
4981
4982        let network_config = TestConfigBuilder::<1>::default()
4983            .epoch_height(blocks_per_epoch)
4984            .build();
4985
4986        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
4987            &network_config.hotshot_config().hotshot_stake_table(),
4988            STAKE_TABLE_CAPACITY_FOR_TEST,
4989        )
4990        .unwrap();
4991
4992        let deployer = ProviderBuilder::new()
4993            .wallet(EthereumWallet::from(network_config.signer().clone()))
4994            .on_http(network_config.l1_url().clone());
4995
4996        let mut contracts = Contracts::new();
4997        let args = DeployerArgsBuilder::default()
4998            .deployer(deployer.clone())
4999            .mock_light_client(true)
5000            .genesis_lc_state(genesis_state)
5001            .genesis_st_state(genesis_stake)
5002            .blocks_per_epoch(blocks_per_epoch)
5003            .epoch_start_block(1)
5004            .multisig_pauser(network_config.signer().address())
5005            .token_name("Espresso".to_string())
5006            .token_symbol("ESP".to_string())
5007            .initial_token_supply(U256::from(3590000000u64))
5008            .ops_timelock_delay(U256::from(0))
5009            .ops_timelock_admin(network_config.signer().address())
5010            .ops_timelock_proposers(vec![network_config.signer().address()])
5011            .ops_timelock_executors(vec![network_config.signer().address()])
5012            .safe_exit_timelock_delay(U256::from(0))
5013            .safe_exit_timelock_admin(network_config.signer().address())
5014            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
5015            .safe_exit_timelock_executors(vec![network_config.signer().address()])
5016            .build()
5017            .unwrap();
5018
5019        args.deploy_all(&mut contracts).await.unwrap();
5020
5021        let st_addr = contracts
5022            .address(Contract::StakeTableProxy)
5023            .expect("StakeTableProxy deployed");
5024
5025        let l1_url = network_config.l1_url().clone();
5026
5027        let storage = SqlDataSource::create_storage().await;
5028        let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5029        let persistence = opt.create().await.unwrap();
5030
5031        let l1_client = L1ClientOptions {
5032            stake_table_update_interval: Duration::from_secs(7),
5033            l1_retry_delay: Duration::from_millis(10),
5034            l1_events_max_block_range: 10000,
5035            ..Default::default()
5036        }
5037        .connect(vec![l1_url])
5038        .unwrap();
5039        l1_client.spawn_tasks().await;
5040
5041        let fetcher = Fetcher::new(
5042            Arc::new(NullStateCatchup::default()),
5043            Arc::new(Mutex::new(persistence.clone())),
5044            l1_client.clone(),
5045            ChainConfig {
5046                stake_table_contract: Some(st_addr),
5047                base_fee: 0.into(),
5048                ..Default::default()
5049            },
5050        );
5051
5052        let provider = l1_client.provider;
5053        let stake_table = StakeTableV2::new(st_addr, provider.clone());
5054
5055        let stake_table_init_block = stake_table
5056            .initializedAtBlock()
5057            .block(BlockId::finalized())
5058            .call()
5059            .await?
5060            ._0
5061            .to::<u64>();
5062
5063        tracing::info!("stake table init block = {stake_table_init_block}");
5064
5065        let token_address = stake_table
5066            .token()
5067            .block(BlockId::finalized())
5068            .call()
5069            .await
5070            .context("Failed to get token address")?
5071            ._0;
5072
5073        let token = EspToken::new(token_address, provider.clone());
5074
5075        let init_log = fetcher
5076            .scan_token_contract_initialized_event_log(stake_table_init_block, token)
5077            .await
5078            .unwrap();
5079
5080        let init_tx = provider
5081            .get_transaction_receipt(
5082                init_log
5083                    .transaction_hash
5084                    .context(format!("transaction hash not found. init_log={init_log:?}"))?,
5085            )
5086            .await
5087            .unwrap()
5088            .unwrap();
5089
5090        let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().unwrap();
5091
5092        assert!(mint_transfer.value > U256::ZERO);
5093
5094        Ok(())
5095    }
5096
5097    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5098    async fn test_tx_metadata() {
5099        let port = pick_unused_port().expect("No ports free");
5100
5101        let url = format!("http://localhost:{port}").parse().unwrap();
5102        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5103
5104        let storage = SqlDataSource::create_storage().await;
5105        let network_config = TestConfigBuilder::default().build();
5106        let config = TestNetworkConfigBuilder::default()
5107            .api_config(
5108                SqlDataSource::options(&storage, Options::with_port(port))
5109                    .submit(Default::default())
5110                    .explorer(Default::default()),
5111            )
5112            .network_config(network_config)
5113            .build();
5114        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5115        let mut events = network.server.event_stream().await;
5116
5117        client.connect(None).await;
5118
5119        // Submit a few transactions in different namespaces.
5120        let namespace_counts = [(101, 1), (102, 2), (103, 3)];
5121        for (ns, count) in &namespace_counts {
5122            for i in 0..*count {
5123                let ns_id = NamespaceId::from(*ns as u64);
5124                let txn = Transaction::new(ns_id, vec![*ns, i]);
5125                client
5126                    .post::<()>("submit/submit")
5127                    .body_json(&txn)
5128                    .unwrap()
5129                    .send()
5130                    .await
5131                    .unwrap();
5132                let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;
5133
5134                // Block summary should contain information about the namespace.
5135                let summary: BlockSummaryQueryData<SeqTypes> = client
5136                    .get(&format!("availability/block/summary/{block}"))
5137                    .send()
5138                    .await
5139                    .unwrap();
5140                let ns_info = summary.namespaces();
5141                assert_eq!(ns_info.len(), 1);
5142                assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
5143                assert_eq!(ns_info[&ns_id].num_transactions, 1);
5144                assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
5145            }
5146        }
5147
5148        // List transactions in each namespace.
5149        for (ns, count) in &namespace_counts {
5150            tracing::info!(ns, "list transactions in namespace");
5151
5152            let ns_id = NamespaceId::from(*ns as u64);
5153            let summaries: TransactionSummariesResponse<SeqTypes> = client
5154                .get(&format!(
5155                    "explorer/transactions/latest/{count}/namespace/{ns_id}"
5156                ))
5157                .send()
5158                .await
5159                .unwrap();
5160            let txs = summaries.transaction_summaries;
5161            assert_eq!(txs.len(), *count as usize);
5162
5163            // Check that transactions are listed in descending order.
5164            for i in 0..*count {
5165                let summary = &txs[i as usize];
5166                let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
5167                assert_eq!(summary.rollups, vec![ns_id]);
5168                assert_eq!(summary.hash, expected.commit());
5169            }
5170        }
5171    }
5172
5173    use std::time::Instant;
5174
5175    use rand::thread_rng;
5176
5177    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5178    async fn test_aggregator_namespace_endpoints() {
5179        let mut rng = thread_rng();
5180
5181        let port = pick_unused_port().expect("No ports free");
5182
5183        let url = format!("http://localhost:{port}").parse().unwrap();
5184        tracing::info!("Sequencer URL = {url}");
5185        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5186
5187        let options = Options::with_port(port).submit(Default::default());
5188        const NUM_NODES: usize = 2;
5189        // Initialize storage for each node
5190        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5191
5192        let persistence_options: [_; NUM_NODES] = storage
5193            .iter()
5194            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5195            .collect::<Vec<_>>()
5196            .try_into()
5197            .unwrap();
5198
5199        let network_config = TestConfigBuilder::default().build();
5200
5201        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5202            .api_config(SqlDataSource::options(&storage[0], options))
5203            .network_config(network_config)
5204            .persistences(persistence_options.clone())
5205            .build();
5206        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5207        let mut events = network.server.event_stream().await;
5208        let start = Instant::now();
5209        let mut total_transactions = 0;
5210        let mut tx_heights = Vec::new();
5211        let mut sizes = HashMap::new();
5212        // inserting transactions for some namespaces
5213        // the number of transactions inserted is equal to namespace number.
5214        for namespace in 1..=4 {
5215            for _count in 0..namespace {
5216                // Generate a random payload length between 4 and 10 bytes
5217                let payload_len = rng.gen_range(4..=10);
5218                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5219
5220                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5221
5222                client.connect(None).await;
5223
5224                let hash = client
5225                    .post("submit/submit")
5226                    .body_json(&txn)
5227                    .unwrap()
5228                    .send()
5229                    .await
5230                    .unwrap();
5231                assert_eq!(txn.commit(), hash);
5232
5233                // Wait for a Decide event containing transaction matching the one we sent
5234                let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
5235                tx_heights.push(height);
5236                total_transactions += 1;
5237                *sizes.entry(namespace).or_insert(0) += size;
5238            }
5239        }
5240
5241        let duration = start.elapsed();
5242
5243        println!("Time elapsed to submit transactions: {duration:?}");
5244
5245        let last_tx_height = tx_heights.last().unwrap();
5246        for namespace in 1..=4 {
5247            let count = client
5248                .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
5249                .send()
5250                .await
5251                .unwrap();
5252            assert_eq!(
5253                count, namespace as u64,
5254                "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
5255                 {count}"
5256            );
5257
5258            // check the range endpoint
5259            let to_endpoint_count = client
5260                .get::<u64>(&format!(
5261                    "node/transactions/count/namespace/{namespace}/{last_tx_height}"
5262                ))
5263                .send()
5264                .await
5265                .unwrap();
5266            assert_eq!(
5267                to_endpoint_count, namespace as u64,
5268                "Incorrect transaction count for range endpoint (to only) for namespace \
5269                 {namespace}: expected {namespace}, got {to_endpoint_count}"
5270            );
5271
5272            // check the range endpoint
5273            let from_to_endpoint_count = client
5274                .get::<u64>(&format!(
5275                    "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
5276                ))
5277                .send()
5278                .await
5279                .unwrap();
5280            assert_eq!(
5281                from_to_endpoint_count, namespace as u64,
5282                "Incorrect transaction count for range endpoint (from-to) for namespace \
5283                 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
5284            );
5285
5286            let ns_size = client
5287                .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
5288                .send()
5289                .await
5290                .unwrap();
5291
5292            let expected_ns_size = *sizes.get(&namespace).unwrap();
5293            assert_eq!(
5294                ns_size, expected_ns_size,
5295                "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
5296                 got {ns_size}"
5297            );
5298
5299            let ns_size_to = client
5300                .get::<usize>(&format!(
5301                    "node/payloads/size/namespace/{namespace}/{last_tx_height}"
5302                ))
5303                .send()
5304                .await
5305                .unwrap();
5306            assert_eq!(
5307                ns_size_to, expected_ns_size,
5308                "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
5309                 expected {expected_ns_size}, got {ns_size_to}"
5310            );
5311
5312            let ns_size_from_to = client
5313                .get::<usize>(&format!(
5314                    "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
5315                ))
5316                .send()
5317                .await
5318                .unwrap();
5319            assert_eq!(
5320                ns_size_from_to, expected_ns_size,
5321                "Incorrect payload size for namespace {namespace} from 0 to height \
5322                 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
5323            );
5324        }
5325
5326        let total_tx_count = client
5327            .get::<u64>("node/transactions/count")
5328            .send()
5329            .await
5330            .unwrap();
5331        assert_eq!(
5332            total_tx_count, total_transactions,
5333            "Incorrect total transaction count: expected {total_transactions}, got \
5334             {total_tx_count}"
5335        );
5336
5337        let total_payload_size = client
5338            .get::<usize>("node/payloads/size")
5339            .send()
5340            .await
5341            .unwrap();
5342
5343        let expected_total_size: usize = sizes.values().copied().sum();
5344        assert_eq!(
5345            total_payload_size, expected_total_size,
5346            "Incorrect total payload size: expected {expected_total_size}, got \
5347             {total_payload_size}"
5348        );
5349    }
5350
5351    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5352    async fn test_stream_transactions_endpoint() {
5353        // This test submits transactions to a sequencer for multiple namespaces,
5354        // waits for them to be decided, and then verifies that:
5355        // 1. All transactions appear in the transaction stream.
5356        // 2. Each namespace-specific transaction stream only includes the transactions of that namespace.
5357
5358        let mut rng = thread_rng();
5359
5360        let port = pick_unused_port().expect("No ports free");
5361
5362        let url = format!("http://localhost:{port}").parse().unwrap();
5363        tracing::info!("Sequencer URL = {url}");
5364        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5365
5366        let options = Options::with_port(port).submit(Default::default());
5367        const NUM_NODES: usize = 2;
5368        // Initialize storage for each node
5369        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5370
5371        let persistence_options: [_; NUM_NODES] = storage
5372            .iter()
5373            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5374            .collect::<Vec<_>>()
5375            .try_into()
5376            .unwrap();
5377
5378        let network_config = TestConfigBuilder::default().build();
5379
5380        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5381            .api_config(SqlDataSource::options(&storage[0], options))
5382            .network_config(network_config)
5383            .persistences(persistence_options.clone())
5384            .build();
5385        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5386        let mut events = network.server.event_stream().await;
5387        let mut all_transactions = HashMap::new();
5388        let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
5389
5390        // Submit transactions to namespaces 1 through 4
5391
5392        for namespace in 1..=4 {
5393            for _count in 0..namespace {
5394                let payload_len = rng.gen_range(4..=10);
5395                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5396
5397                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5398
5399                client.connect(None).await;
5400
5401                let hash = client
5402                    .post("submit/submit")
5403                    .body_json(&txn)
5404                    .unwrap()
5405                    .send()
5406                    .await
5407                    .unwrap();
5408                assert_eq!(txn.commit(), hash);
5409
5410                // Wait for a Decide event containing transaction matching the one we sent
5411                wait_for_decide_on_handle(&mut events, &txn).await;
5412                // Store transaction for later validation
5413
5414                all_transactions.insert(txn.commit(), txn.clone());
5415                namespace_tx.entry(namespace).or_default().insert(txn);
5416            }
5417        }
5418
5419        let mut transactions = client
5420            .socket("availability/stream/transactions/0")
5421            .subscribe::<TransactionQueryData<SeqTypes>>()
5422            .await
5423            .expect("failed to subscribe to transactions endpoint");
5424
5425        let mut count = 0;
5426        while let Some(tx) = transactions.next().await {
5427            let tx = tx.unwrap();
5428            let expected = all_transactions
5429                .get(&tx.transaction().commit())
5430                .expect("txn not found ");
5431            assert_eq!(tx.transaction(), expected, "invalid transaction");
5432            count += 1;
5433
5434            if count == all_transactions.len() {
5435                break;
5436            }
5437        }
5438
5439        // Validate namespace-specific stream endpoint
5440
5441        for (namespace, expected_ns_txns) in &namespace_tx {
5442            let mut api_namespace_txns = client
5443                .socket(&format!(
5444                    "availability/stream/transactions/0/namespace/{namespace}",
5445                ))
5446                .subscribe::<TransactionQueryData<SeqTypes>>()
5447                .await
5448                .unwrap_or_else(|_| {
5449                    panic!("failed to subscribe to transactions namespace {namespace}")
5450                });
5451
5452            let mut received = HashSet::new();
5453
5454            while let Some(res) = api_namespace_txns.next().await {
5455                let tx = res.expect("stream error");
5456                received.insert(tx.transaction().clone());
5457
5458                if received.len() == expected_ns_txns.len() {
5459                    break;
5460                }
5461            }
5462
5463            assert_eq!(
5464                received, *expected_ns_txns,
5465                "Mismatched transactions for namespace {namespace}"
5466            );
5467        }
5468    }
5469
5470    #[rstest]
5471    #[case(PosVersionV3::new())]
5472    #[case(PosVersionV4::new())]
5473    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5474    async fn test_v3_and_v4_reward_tree_updates<Ver: Versions>(
5475        #[case] versions: Ver,
5476    ) -> anyhow::Result<()> {
5477        // This test checks that the correct merkle tree is updated based on version
5478        //
5479        // When the protocol version is v3:
5480        // - The v3 Merkle tree is updated
5481        // - The v4 Merkle tree must be empty.
5482        //
5483        // When the protocol version is v4:
5484        // - The v4 Merkle tree is updated
5485        // - The v3 Merkle tree must be empty.
5486        const EPOCH_HEIGHT: u64 = 10;
5487
5488        let network_config = TestConfigBuilder::default()
5489            .epoch_height(EPOCH_HEIGHT)
5490            .build();
5491
5492        let api_port = pick_unused_port().expect("No ports free for query service");
5493
5494        tracing::info!("API PORT = {api_port}");
5495        const NUM_NODES: usize = 5;
5496
5497        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5498        let persistence: [_; NUM_NODES] = storage
5499            .iter()
5500            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5501            .collect::<Vec<_>>()
5502            .try_into()
5503            .unwrap();
5504
5505        let config = TestNetworkConfigBuilder::with_num_nodes()
5506            .api_config(SqlDataSource::options(
5507                &storage[0],
5508                Options::with_port(api_port).catchup(Default::default()),
5509            ))
5510            .network_config(network_config)
5511            .persistences(persistence.clone())
5512            .catchups(std::array::from_fn(|_| {
5513                StatePeers::<StaticVersion<0, 1>>::from_urls(
5514                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5515                    Default::default(),
5516                    &NoMetrics,
5517                )
5518            }))
5519            .pos_hook::<Ver>(
5520                DelegationConfig::MultipleDelegators,
5521                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5522            )
5523            .await
5524            .unwrap()
5525            .build();
5526        let mut network = TestNetwork::new(config, versions).await;
5527
5528        let mut events = network.peers[2].event_stream().await;
5529        // wait for 4 epochs
5530        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
5531
5532        let validated_state = network.server.decided_state().await;
5533        let version = Ver::Base::VERSION;
5534        if version == EpochVersion::VERSION {
5535            let v1_tree = &validated_state.reward_merkle_tree_v1;
5536            assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
5537            let v2_tree = &validated_state.reward_merkle_tree_v2;
5538            assert!(
5539                v2_tree.num_leaves() == 0,
5540                "v2 reward tree tree is not empty"
5541            );
5542        } else {
5543            let v1_tree = &validated_state.reward_merkle_tree_v1;
5544            assert!(
5545                v1_tree.num_leaves() == 0,
5546                "v1 reward tree tree is not empty"
5547            );
5548            let v2_tree = &validated_state.reward_merkle_tree_v2;
5549            assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
5550        }
5551
5552        network.stop_consensus().await;
5553        Ok(())
5554    }
5555
5556    #[rstest]
5557    #[case(PosVersionV3::new())]
5558    #[case(PosVersionV4::new())]
5559    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5560
5561    pub(crate) async fn test_state_cert_query<Ver: Versions>(#[case] versions: Ver) {
5562        const TEST_EPOCH_HEIGHT: u64 = 10;
5563        const TEST_EPOCHS: u64 = 5;
5564
5565        let network_config = TestConfigBuilder::default()
5566            .epoch_height(TEST_EPOCH_HEIGHT)
5567            .build();
5568
5569        let api_port = pick_unused_port().expect("No ports free for query service");
5570
5571        tracing::info!("API PORT = {api_port}");
5572        const NUM_NODES: usize = 2;
5573
5574        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5575        let persistence: [_; NUM_NODES] = storage
5576            .iter()
5577            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5578            .collect::<Vec<_>>()
5579            .try_into()
5580            .unwrap();
5581
5582        let config = TestNetworkConfigBuilder::with_num_nodes()
5583            .api_config(SqlDataSource::options(
5584                &storage[0],
5585                Options::with_port(api_port).catchup(Default::default()),
5586            ))
5587            .network_config(network_config)
5588            .persistences(persistence.clone())
5589            .catchups(std::array::from_fn(|_| {
5590                StatePeers::<StaticVersion<0, 1>>::from_urls(
5591                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5592                    Default::default(),
5593                    &NoMetrics,
5594                )
5595            }))
5596            .pos_hook::<Ver>(
5597                DelegationConfig::MultipleDelegators,
5598                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5599            )
5600            .await
5601            .unwrap()
5602            .build();
5603
5604        let network = TestNetwork::new(config, versions).await;
5605        let mut events = network.server.event_stream().await;
5606
5607        // Wait until 5 epochs have passed.
5608        loop {
5609            let event = events.next().await.unwrap();
5610            tracing::info!("Received event from handle: {event:?}");
5611
5612            if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
5613                println!(
5614                    "Decide event received: {:?}",
5615                    leaf_chain.first().unwrap().leaf.height()
5616                );
5617                if let Some(first_leaf) = leaf_chain.first() {
5618                    let height = first_leaf.leaf.height();
5619                    tracing::info!("Decide event received at height: {height}");
5620
5621                    if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
5622                        break;
5623                    }
5624                }
5625            }
5626        }
5627
5628        // Connect client.
5629        let client: Client<ServerError, StaticVersion<0, 1>> =
5630            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5631        client.connect(Some(Duration::from_secs(10))).await;
5632
5633        // Get the state cert for the epoch 3 to 5
5634        for i in 3..=TEST_EPOCHS {
5635            // v2
5636            let state_query_data_v2 = client
5637                .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
5638                .send()
5639                .await
5640                .unwrap();
5641            let state_cert_v2 = state_query_data_v2.0.clone();
5642            tracing::info!("state_cert_v2: {state_cert_v2:?}");
5643            assert_eq!(state_cert_v2.epoch.u64(), i);
5644            assert_eq!(
5645                state_cert_v2.light_client_state.block_height,
5646                i * TEST_EPOCH_HEIGHT - 5
5647            );
5648            let block_height = state_cert_v2.light_client_state.block_height;
5649
5650            let header: Header = client
5651                .get(&format!("availability/header/{block_height}"))
5652                .send()
5653                .await
5654                .unwrap();
5655
5656            // verify auth root if the consensus version is v4
5657            if header.version() == DrbAndHeaderUpgradeVersion::VERSION {
5658                let auth_root = state_cert_v2.auth_root;
5659                let header_auth_root = header.auth_root().unwrap();
5660                if auth_root.is_zero() || header_auth_root.is_zero() {
5661                    panic!("auth root shouldn't be zero");
5662                }
5663
5664                assert_eq!(auth_root, header_auth_root, "auth root mismatch");
5665            }
5666
5667            // v1
5668            let state_query_data_v1 = client
5669                .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
5670                .send()
5671                .await
5672                .unwrap();
5673
5674            let state_cert_v1 = state_query_data_v1.0.clone();
5675            tracing::info!("state_cert_v1: {state_cert_v1:?}");
5676            assert_eq!(state_query_data_v1, state_query_data_v2.into());
5677        }
5678    }
5679}