sequencer/
api.rs

1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use alloy::primitives::{Address, U256};
4use anyhow::{bail, ensure, Context};
5use async_lock::RwLock;
6use async_once_cell::Lazy;
7use async_trait::async_trait;
8use committable::Commitment;
9use data_source::{
10    CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
11    StateCertDataSource, StateCertFetchingDataSource, SubmitDataSource,
12};
13use derivative::Derivative;
14use espresso_types::{
15    config::PublicNetworkConfig,
16    retain_accounts,
17    traits::EventsPersistenceRead,
18    v0::traits::{SequencerPersistence, StateCatchup},
19    v0_3::{
20        ChainConfig, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount, RewardMerkleTreeV1,
21        StakeTableEvent, Validator,
22    },
23    v0_4::{RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2},
24    AccountQueryData, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2, NodeState, PubKey,
25    Transaction, ValidatorMap,
26};
27use futures::{
28    future::{BoxFuture, Future, FutureExt},
29    stream::BoxStream,
30};
31use hotshot_contract_adapter::sol_types::{EspToken, StakeTableV2};
32use hotshot_events_service::events_source::{
33    EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
34};
35use hotshot_query_service::{availability::VidCommonQueryData, data_source::ExtensibleDataSource};
36use hotshot_types::{
37    data::{EpochNumber, VidCommitment, VidCommon, VidShare, ViewNumber},
38    event::{Event, LegacyEvent},
39    light_client::LCV3StateSignatureRequestBody,
40    network::NetworkConfig,
41    simple_certificate::LightClientStateUpdateCertificateV2,
42    traits::{
43        election::Membership,
44        network::ConnectedNetwork,
45        node_implementation::{ConsensusTime, NodeType, Versions},
46    },
47    vid::avidm::{init_avidm_param, AvidMScheme},
48    vote::HasViewNumber,
49    PeerConfig,
50};
51use itertools::Itertools;
52use jf_merkle_tree_compat::MerkleTreeScheme;
53use moka::future::Cache;
54use rand::Rng;
55use request_response::RequestType;
56use tokio::time::timeout;
57
58use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
59use crate::{
60    api::data_source::TokenDataSource,
61    catchup::{
62        add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
63        add_v2_reward_accounts_to_state, CatchupStorage,
64    },
65    context::Consensus,
66    request_response::{
67        data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
68        request::{Request, Response},
69    },
70    state_cert::{validate_state_cert, StateCertFetchError},
71    state_signature::StateSigner,
72    SeqTypes, SequencerApiVersion, SequencerContext,
73};
74
75pub mod data_source;
76pub mod endpoints;
77pub mod fs;
78pub mod light_client;
79pub mod options;
80pub mod sql;
81mod update;
82
83pub use options::Options;
84
85pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;
86
87type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
88
89#[derive(Derivative)]
90#[derivative(Clone(bound = ""), Debug(bound = ""))]
91struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
92    // The consensus state is initialized lazily so we can start the API (and healthcheck endpoints)
93    // before consensus has started. Any endpoint that uses consensus state will wait for
94    // initialization to finish, but endpoints that do not require a consensus handle can proceed
95    // without waiting.
96    #[derivative(Debug = "ignore")]
97    sequencer_context: BoxLazy<SequencerContext<N, P, V>>,
98
99    // we cache `token_contract_address` for the lifetime of the program, since we do not expect this to ever change
100    token_contract_address: Cache<(), Address>,
101
102    // we cache `token_supply` for up to an hour, to avoid repeatedly querying the contract for information that rarely changes
103    token_supply: Cache<(), U256>,
104}
105
106impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState<N, P, V> {
107    fn new(context_init: impl Future<Output = SequencerContext<N, P, V>> + Send + 'static) -> Self {
108        Self {
109            sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
110            token_contract_address: Cache::builder().max_capacity(1).build(),
111            token_supply: Cache::builder()
112                .max_capacity(1)
113                .time_to_live(Duration::from_secs(3600))
114                .build(),
115        }
116    }
117
118    async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
119        self.sequencer_context
120            .as_ref()
121            .get()
122            .await
123            .get_ref()
124            .state_signer()
125    }
126
127    async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
128        self.sequencer_context
129            .as_ref()
130            .get()
131            .await
132            .get_ref()
133            .event_streamer()
134    }
135
136    async fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
137        self.sequencer_context
138            .as_ref()
139            .get()
140            .await
141            .get_ref()
142            .consensus()
143    }
144
145    async fn network_config(&self) -> NetworkConfig<SeqTypes> {
146        self.sequencer_context
147            .as_ref()
148            .get()
149            .await
150            .get_ref()
151            .network_config()
152    }
153}
154
155type StorageState<N, P, D, V> = ExtensibleDataSource<D, ApiState<N, P, V>>;
156
157#[async_trait]
158impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> EventsSource<SeqTypes>
159    for ApiState<N, P, V>
160{
161    type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
162    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
163
164    async fn get_event_stream(
165        &self,
166        _filter: Option<EventFilterSet<SeqTypes>>,
167    ) -> Self::EventStream {
168        self.event_streamer()
169            .await
170            .read()
171            .await
172            .get_event_stream(None)
173            .await
174    }
175
176    async fn get_legacy_event_stream(
177        &self,
178        _filter: Option<EventFilterSet<SeqTypes>>,
179    ) -> Self::LegacyEventStream {
180        self.event_streamer()
181            .await
182            .read()
183            .await
184            .get_legacy_event_stream(None)
185            .await
186    }
187
188    async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
189        self.event_streamer()
190            .await
191            .read()
192            .await
193            .get_startup_info()
194            .await
195    }
196}
197
198impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
199    TokenDataSource<SeqTypes> for StorageState<N, P, D, V>
200{
201    async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
202        self.as_ref().get_total_supply_l1().await
203    }
204}
205
206impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
207    SubmitDataSource<N, P> for StorageState<N, P, D, V>
208{
209    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
210        self.as_ref().submit(tx).await
211    }
212}
213
214impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
215    StakeTableDataSource<SeqTypes> for StorageState<N, P, D, V>
216{
217    /// Get the stake table for a given epoch
218    async fn get_stake_table(
219        &self,
220        epoch: Option<<SeqTypes as NodeType>::Epoch>,
221    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
222        self.as_ref().get_stake_table(epoch).await
223    }
224
225    /// Get the stake table for the current epoch if not provided
226    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
227        self.as_ref().get_stake_table_current().await
228    }
229
230    /// Get the DA stake table for a given epoch
231    async fn get_da_stake_table(
232        &self,
233        epoch: Option<<SeqTypes as NodeType>::Epoch>,
234    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
235        self.as_ref().get_da_stake_table(epoch).await
236    }
237
238    /// Get the DA stake table for the current epoch if not provided
239    async fn get_da_stake_table_current(
240        &self,
241    ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
242        self.as_ref().get_da_stake_table_current().await
243    }
244
245    /// Get all the validators
246    async fn get_validators(
247        &self,
248        epoch: <SeqTypes as NodeType>::Epoch,
249    ) -> anyhow::Result<ValidatorMap> {
250        self.as_ref().get_validators(epoch).await
251    }
252
253    async fn get_block_reward(
254        &self,
255        epoch: Option<EpochNumber>,
256    ) -> anyhow::Result<Option<RewardAmount>> {
257        self.as_ref().get_block_reward(epoch).await
258    }
259    /// Get all the validator participation for the current epoch
260    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
261        self.as_ref().current_proposal_participation().await
262    }
263    async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
264        self.as_ref().previous_proposal_participation().await
265    }
266
267    async fn get_all_validators(
268        &self,
269        epoch: <SeqTypes as NodeType>::Epoch,
270        offset: u64,
271        limit: u64,
272    ) -> anyhow::Result<Vec<Validator<PubKey>>> {
273        self.as_ref().get_all_validators(epoch, offset, limit).await
274    }
275
276    async fn stake_table_events(
277        &self,
278        from_l1_block: u64,
279        to_l1_block: u64,
280    ) -> anyhow::Result<Vec<StakeTableEvent>> {
281        self.as_ref()
282            .stake_table_events(from_l1_block, to_l1_block)
283            .await
284    }
285}
286
287impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> TokenDataSource<SeqTypes>
288    for ApiState<N, P, V>
289{
290    async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
291        match self.token_supply.get(&()).await {
292            Some(supply) => Ok(supply),
293            None => match self.token_contract_address.get(&()).await {
294                Some(token_contract_address) => {
295                    let node_state = self.sequencer_context.as_ref().get().await.node_state();
296                    let provider = node_state.l1_client.provider;
297
298                    let token = EspToken::new(token_contract_address, provider.clone());
299
300                    let supply = token
301                        .totalSupply()
302                        .call()
303                        .await
304                        .context("Failed to retrieve totalSupply from the contract")?;
305
306                    self.token_supply.insert((), supply).await;
307
308                    Ok(supply)
309                },
310                None => {
311                    let node_state = self.sequencer_context.as_ref().get().await.node_state();
312                    let stake_table_address = node_state
313                        .chain_config
314                        .stake_table_contract
315                        .context("No stake table contract in chain config")?;
316
317                    let provider = node_state.l1_client.provider;
318
319                    let stake_table = StakeTableV2::new(stake_table_address, provider.clone());
320                    let token_contract_address = stake_table.token().call().await?;
321
322                    self.token_contract_address
323                        .insert((), token_contract_address)
324                        .await;
325
326                    let token = EspToken::new(token_contract_address, provider.clone());
327
328                    let supply = token
329                        .totalSupply()
330                        .call()
331                        .await
332                        .context("Failed to retrieve totalSupply from the contract")?;
333
334                    self.token_supply.insert((), supply).await;
335
336                    Ok(supply)
337                },
338            },
339        }
340    }
341}
342
343impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
344    StakeTableDataSource<SeqTypes> for ApiState<N, P, V>
345{
346    /// Get the stake table for a given epoch
347    async fn get_stake_table(
348        &self,
349        epoch: Option<<SeqTypes as NodeType>::Epoch>,
350    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
351        let highest_epoch = self
352            .consensus()
353            .await
354            .read()
355            .await
356            .cur_epoch()
357            .await
358            .map(|e| e + 1);
359        if epoch > highest_epoch {
360            return Err(anyhow::anyhow!(
361                "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
362                 {highest_epoch:?}"
363            ));
364        }
365        let mem = self
366            .consensus()
367            .await
368            .read()
369            .await
370            .membership_coordinator
371            .stake_table_for_epoch(epoch)
372            .await?;
373
374        Ok(mem.stake_table().await.0)
375    }
376
377    /// Get the stake table for the current epoch and return it along with the epoch number
378    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
379        let epoch = self.consensus().await.read().await.cur_epoch().await;
380
381        Ok(StakeTableWithEpochNumber {
382            epoch,
383            stake_table: self.get_stake_table(epoch).await?,
384        })
385    }
386
387    /// Get the DA stake table for a given epoch
388    async fn get_da_stake_table(
389        &self,
390        epoch: Option<<SeqTypes as NodeType>::Epoch>,
391    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
392        Ok(self
393            .consensus()
394            .await
395            .read()
396            .await
397            .membership_coordinator
398            .membership()
399            .read()
400            .await
401            .da_stake_table(epoch)
402            .0)
403    }
404
405    /// Get the DA stake table for the current epoch and return it along with the epoch number
406    async fn get_da_stake_table_current(
407        &self,
408    ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
409        let epoch = self.consensus().await.read().await.cur_epoch().await;
410
411        Ok(StakeTableWithEpochNumber {
412            epoch,
413            stake_table: self.get_da_stake_table(epoch).await?,
414        })
415    }
416
417    async fn get_block_reward(
418        &self,
419        epoch: Option<EpochNumber>,
420    ) -> anyhow::Result<Option<RewardAmount>> {
421        let coordinator = self
422            .consensus()
423            .await
424            .read()
425            .await
426            .membership_coordinator
427            .clone();
428
429        let membership = coordinator.membership().read().await;
430        let block_reward = match epoch {
431            None => membership.fixed_block_reward(),
432            Some(e) => membership.epoch_block_reward(e),
433        };
434
435        Ok(block_reward)
436    }
437
438    /// Get the whole validators map
439    async fn get_validators(
440        &self,
441        epoch: <SeqTypes as NodeType>::Epoch,
442    ) -> anyhow::Result<ValidatorMap> {
443        let mem = self
444            .consensus()
445            .await
446            .read()
447            .await
448            .membership_coordinator
449            .membership_for_epoch(Some(epoch))
450            .await
451            .context("membership not found")?;
452
453        let r = mem.coordinator.membership().read().await;
454        r.active_validators(&epoch)
455    }
456
457    /// Get the current proposal participation.
458    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
459        self.consensus()
460            .await
461            .read()
462            .await
463            .consensus()
464            .read()
465            .await
466            .current_proposal_participation()
467    }
468
469    /// Get the previous proposal participation.
470    async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
471        self.consensus()
472            .await
473            .read()
474            .await
475            .consensus()
476            .read()
477            .await
478            .previous_proposal_participation()
479    }
480
481    async fn get_all_validators(
482        &self,
483        epoch: <SeqTypes as NodeType>::Epoch,
484        offset: u64,
485        limit: u64,
486    ) -> anyhow::Result<Vec<Validator<PubKey>>> {
487        let handle = self.consensus().await;
488        let handle_read = handle.read().await;
489        let storage = handle_read.storage();
490        storage.load_all_validators(epoch, offset, limit).await
491    }
492
493    async fn stake_table_events(
494        &self,
495        from_l1_block: u64,
496        to_l1_block: u64,
497    ) -> anyhow::Result<Vec<StakeTableEvent>> {
498        let handle = self.consensus().await;
499        let handle_read = handle.read().await;
500        let storage = handle_read.storage();
501        let (status, events) = storage.load_events(from_l1_block, to_l1_block).await?;
502        ensure!(
503            status == Some(EventsPersistenceRead::Complete),
504            "some events in range [{from_l1_block}, {to_l1_block}] are not available ({status:?})"
505        );
506        Ok(events.into_iter().map(|(_, event)| event).collect())
507    }
508}
509
510impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
511    RequestResponseDataSource<SeqTypes> for StorageState<N, P, D, V>
512{
513    async fn request_vid_shares(
514        &self,
515        block_number: u64,
516        vid_common_data: VidCommonQueryData<SeqTypes>,
517        timeout_duration: Duration,
518    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
519        self.as_ref()
520            .request_vid_shares(block_number, vid_common_data, timeout_duration)
521            .await
522    }
523}
524
525#[async_trait]
526impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
527    StateCertFetchingDataSource<SeqTypes> for StorageState<N, P, D, V>
528{
529    async fn request_state_cert(
530        &self,
531        epoch: u64,
532        timeout: Duration,
533    ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
534        self.as_ref().request_state_cert(epoch, timeout).await
535    }
536}
537
538impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
539    RequestResponseDataSource<SeqTypes> for ApiState<N, P, V>
540{
541    async fn request_vid_shares(
542        &self,
543        block_number: u64,
544        vid_common_data: VidCommonQueryData<SeqTypes>,
545        duration: Duration,
546    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
547        // Get a handle to the request response protocol
548        let request_response_protocol = self
549            .sequencer_context
550            .as_ref()
551            .get()
552            .await
553            .request_response_protocol
554            .clone();
555
556        async move {
557            // Get the total VID weight based on the VID common data
558            let total_weight = match vid_common_data.common() {
559                VidCommon::V0(_) => {
560                    // TODO: This needs to be done via the stake table
561                    return Err(anyhow::anyhow!(
562                        "V0 total weight calculation not supported yet"
563                    ));
564                },
565                VidCommon::V1(v1) => v1.total_weights,
566                VidCommon::V2(v2) => v2.param.total_weights,
567            };
568
569            // Create the AvidM parameters from the total weight
570            let avidm_param = init_avidm_param(total_weight)
571                .with_context(|| "failed to initialize avidm param")?;
572
573            // Get the payload hash for verification
574            let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
575                bail!("V0 share verification not supported yet");
576            };
577
578            // Create a random request id
579            let request_id = rand::thread_rng().gen();
580
581            // Request and verify the shares from all other nodes, timing out after `duration` seconds
582            let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
583            let received_shares_clone = received_shares.clone();
584            let request_result: anyhow::Result<_, _> = timeout(
585                duration,
586                request_response_protocol.request_indefinitely::<_, _, _>(
587                    Request::VidShare(block_number, request_id),
588                    RequestType::Broadcast,
589                    move |_request, response| {
590                        let avidm_param = avidm_param.clone();
591                        let received_shares = received_shares_clone.clone();
592                        async move {
593                            // Make sure the response was a V1 share
594                            let Response::VidShare(VidShare::V1(received_share)) = response else {
595                                bail!("V0 share verification not supported yet");
596                            };
597
598                            // Verify the share
599                            let Ok(Ok(_)) = AvidMScheme::verify_share(
600                                &avidm_param,
601                                &local_payload_hash,
602                                &received_share,
603                            ) else {
604                                bail!("share verification failed");
605                            };
606
607                            // Add the share to the list of received shares
608                            received_shares.lock().push(received_share);
609
610                            bail!("waiting for more shares");
611
612                            #[allow(unreachable_code)]
613                            Ok(())
614                        }
615                    },
616                ),
617            )
618            .await;
619
620            // If the request timed out, return the shares we have collected so far
621            match request_result {
622                Err(_) => {
623                    // If it timed out, this was successful. Return the shares we have collected so far
624                    Ok(received_shares
625                        .lock()
626                        .clone()
627                        .into_iter()
628                        .map(VidShare::V1)
629                        .collect())
630                },
631
632                // If it was an error from the inner request, return that error
633                Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),
634
635                // If it was successful, this was unexpected.
636                Ok(Ok(_)) => bail!("this should not be possible"),
637            }
638        }
639        .boxed()
640    }
641}
642
643#[async_trait]
644impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
645    StateCertFetchingDataSource<SeqTypes> for ApiState<N, P, V>
646{
647    async fn request_state_cert(
648        &self,
649        epoch: u64,
650        timeout: Duration,
651    ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
652        tracing::info!("fetching state certificate for epoch={epoch}");
653        let consensus = self.consensus().await;
654        let consensus_read = consensus.read().await;
655
656        let current_epoch = consensus_read.cur_epoch().await;
657
658        // // The highest epoch we can have a state certificate for is current_epoch + 1
659        // // Check if requested epoch is beyond the highest possible epoch
660        let highest_epoch = current_epoch.map(|e| e.u64() + 1);
661
662        if Some(epoch) > highest_epoch {
663            return Err(StateCertFetchError::Other(anyhow::anyhow!(
664                "requested state certificate for epoch {epoch} is beyond the highest possible \
665                 epoch {highest_epoch:?}"
666            )));
667        }
668
669        // Get the stake table for validation
670        let coordinator = consensus_read.membership_coordinator.clone();
671        if let Err(err) = coordinator
672            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
673            .await
674        {
675            tracing::warn!(
676                "Failed to get membership for epoch {epoch}: {err:#}. Waiting for catchup"
677            );
678
679            coordinator
680                .wait_for_catchup(EpochNumber::new(epoch))
681                .await
682                .map_err(|e| {
683                    StateCertFetchError::Other(
684                        anyhow::Error::new(e)
685                            .context(format!("failed to catch up for stake table epoch={epoch}")),
686                    )
687                })?;
688        }
689
690        let membership = coordinator
691            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
692            .await
693            .map_err(|e| {
694                StateCertFetchError::Other(
695                    anyhow::Error::new(e)
696                        .context(format!("failed to get stake table for epoch={epoch}")),
697                )
698            })?;
699
700        let stake_table = membership.stake_table().await;
701
702        drop(consensus_read);
703        drop(consensus);
704
705        let state_catchup = self
706            .sequencer_context
707            .as_ref()
708            .get()
709            .await
710            .node_state()
711            .state_catchup
712            .clone();
713
714        let result = tokio::time::timeout(timeout, state_catchup.fetch_state_cert(epoch)).await;
715
716        match result {
717            Err(_) => Err(StateCertFetchError::FetchError(anyhow::anyhow!(
718                "timeout while fetching state cert for epoch {epoch}"
719            ))),
720            Ok(Ok(cert)) => {
721                // Validation errors should be mapped to ValidationError
722                validate_state_cert(&cert, &stake_table).map_err(|e| {
723                    StateCertFetchError::ValidationError(e.context(format!(
724                        "state certificate validation failed for epoch={epoch}"
725                    )))
726                })?;
727
728                tracing::info!("fetched and validated state certificate for epoch {epoch}");
729                Ok(cert)
730            },
731            Ok(Err(e)) => Err(StateCertFetchError::FetchError(
732                e.context(format!("failed to fetch state cert for epoch {epoch}")),
733            )),
734        }
735    }
736}
737
738// Thin wrapper implementations that delegate to persistence
739#[async_trait]
740impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence> StateCertDataSource
741    for StorageState<N, P, D, V>
742{
743    async fn get_state_cert_by_epoch(
744        &self,
745        epoch: u64,
746    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
747        self.as_ref().get_state_cert_by_epoch(epoch).await
748    }
749
750    async fn insert_state_cert(
751        &self,
752        epoch: u64,
753        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
754    ) -> anyhow::Result<()> {
755        self.as_ref().insert_state_cert(epoch, cert).await
756    }
757}
758
759#[async_trait]
760impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateCertDataSource
761    for ApiState<N, P, V>
762{
763    async fn get_state_cert_by_epoch(
764        &self,
765        epoch: u64,
766    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
767        let consensus = self.consensus().await;
768        let consensus_lock = consensus.read().await;
769        let persistence = consensus_lock.storage();
770
771        persistence.get_state_cert_by_epoch(epoch).await
772    }
773
774    async fn insert_state_cert(
775        &self,
776        epoch: u64,
777        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
778    ) -> anyhow::Result<()> {
779        let consensus = self.consensus().await;
780        let consensus_lock = consensus.read().await;
781        let persistence = consensus_lock.storage();
782
783        persistence.insert_state_cert(epoch, cert).await
784    }
785}
786
787impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDataSource<N, P>
788    for ApiState<N, P, V>
789{
790    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
791        let handle = self.consensus().await;
792
793        let consensus_read_lock = handle.read().await;
794
795        // Fetch full chain config from the validated state, if present.
796        // This is necessary because we support chain config upgrades,
797        // so the updated chain config is found in the validated state.
798        let cf = consensus_read_lock
799            .decided_state()
800            .await
801            .chain_config
802            .resolve();
803
804        // Use the chain config from the validated state if available,
805        // otherwise, use the node state's chain config
806        // The node state's chain config is the node's base version chain config
807        let cf = match cf {
808            Some(cf) => cf,
809            None => self.node_state().await.chain_config,
810        };
811
812        let max_block_size: u64 = cf.max_block_size.into();
813        let txn_size = tx.payload().len() as u64;
814
815        // reject transaction bigger than block size
816        if txn_size > max_block_size {
817            bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
818        }
819
820        consensus_read_lock.submit_transaction(tx).await?;
821        Ok(())
822    }
823}
824
825impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
826where
827    N: ConnectedNetwork<PubKey>,
828    V: Versions,
829    P: SequencerPersistence,
830    D: Sync,
831{
832    async fn node_state(&self) -> NodeState {
833        self.as_ref().node_state().await
834    }
835}
836
837impl<
838        N: ConnectedNetwork<PubKey>,
839        V: Versions,
840        P: SequencerPersistence,
841        D: CatchupStorage + Send + Sync,
842    > CatchupDataSource for StorageState<N, P, D, V>
843{
844    #[tracing::instrument(skip(self, instance))]
845    async fn get_accounts(
846        &self,
847        instance: &NodeState,
848        height: u64,
849        view: ViewNumber,
850        accounts: &[FeeAccount],
851    ) -> anyhow::Result<FeeMerkleTree> {
852        // Check if we have the desired state in memory.
853        match self
854            .as_ref()
855            .get_accounts(instance, height, view, accounts)
856            .await
857        {
858            Ok(accounts) => return Ok(accounts),
859            Err(err) => {
860                tracing::info!("accounts not in memory, trying storage: {err:#}");
861            },
862        }
863
864        // Try storage.
865        let (tree, leaf) = self
866            .inner()
867            .get_accounts(instance, height, view, accounts)
868            .await
869            .context("accounts not in memory, and could not fetch from storage")?;
870        // If we successfully fetched accounts from storage, try to add them back into the in-memory
871        // state.
872
873        let consensus = self
874            .as_ref()
875            .consensus()
876            .await
877            .read()
878            .await
879            .consensus()
880            .clone();
881        if let Err(err) =
882            add_fee_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf).await
883        {
884            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
885        }
886        tracing::info!(?view, "updated with fetched account state");
887
888        Ok(tree)
889    }
890
891    #[tracing::instrument(skip(self, instance))]
892    async fn get_frontier(
893        &self,
894        instance: &NodeState,
895        height: u64,
896        view: ViewNumber,
897    ) -> anyhow::Result<BlocksFrontier> {
898        // Check if we have the desired state in memory.
899        match self.as_ref().get_frontier(instance, height, view).await {
900            Ok(frontier) => return Ok(frontier),
901            Err(err) => {
902                tracing::info!("frontier is not in memory, trying storage: {err:#}");
903            },
904        }
905
906        // Try storage.
907        self.inner().get_frontier(instance, height, view).await
908    }
909
910    async fn get_chain_config(
911        &self,
912        commitment: Commitment<ChainConfig>,
913    ) -> anyhow::Result<ChainConfig> {
914        // Check if we have the desired state in memory.
915        match self.as_ref().get_chain_config(commitment).await {
916            Ok(cf) => return Ok(cf),
917            Err(err) => {
918                tracing::info!("chain config is not in memory, trying storage: {err:#}");
919            },
920        }
921
922        // Try storage.
923        self.inner().get_chain_config(commitment).await
924    }
925    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
926        // Check if we have the desired state in memory.
927        match self.as_ref().get_leaf_chain(height).await {
928            Ok(cf) => return Ok(cf),
929            Err(err) => {
930                tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
931            },
932        }
933
934        // Try storage.
935        self.inner().get_leaf_chain(height).await
936    }
937
938    #[tracing::instrument(skip(self, instance))]
939    async fn get_reward_accounts_v2(
940        &self,
941        instance: &NodeState,
942        height: u64,
943        view: ViewNumber,
944        accounts: &[RewardAccountV2],
945    ) -> anyhow::Result<RewardMerkleTreeV2> {
946        // Check if we have the desired state in memory.
947        match self
948            .as_ref()
949            .get_reward_accounts_v2(instance, height, view, accounts)
950            .await
951        {
952            Ok(accounts) => return Ok(accounts),
953            Err(err) => {
954                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
955            },
956        }
957
958        // Try storage.
959        let (tree, leaf) = self
960            .inner()
961            .get_reward_accounts_v2(instance, height, view, accounts)
962            .await
963            .context("accounts not in memory, and could not fetch from storage")?;
964
965        // If we successfully fetched accounts from storage, try to add them back into the in-memory
966        // state.
967        let consensus = self
968            .as_ref()
969            .consensus()
970            .await
971            .read()
972            .await
973            .consensus()
974            .clone();
975        if let Err(err) =
976            add_v2_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
977                .await
978        {
979            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
980        }
981        tracing::info!(?view, "updated with fetched account state");
982
983        Ok(tree)
984    }
985
986    async fn get_all_reward_accounts(
987        &self,
988        height: u64,
989        offset: u64,
990        limit: u64,
991    ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
992        self.inner()
993            .get_all_reward_accounts(height, offset, limit)
994            .await
995    }
996
997    #[tracing::instrument(skip(self, instance))]
998    async fn get_reward_accounts_v1(
999        &self,
1000        instance: &NodeState,
1001        height: u64,
1002        view: ViewNumber,
1003        accounts: &[RewardAccountV1],
1004    ) -> anyhow::Result<RewardMerkleTreeV1> {
1005        // Check if we have the desired state in memory.
1006        match self
1007            .as_ref()
1008            .get_reward_accounts_v1(instance, height, view, accounts)
1009            .await
1010        {
1011            Ok(accounts) => return Ok(accounts),
1012            Err(err) => {
1013                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
1014            },
1015        }
1016
1017        // Try storage.
1018        let (tree, leaf) = self
1019            .inner()
1020            .get_reward_accounts_v1(instance, height, view, accounts)
1021            .await
1022            .context("accounts not in memory, and could not fetch from storage")?;
1023
1024        // If we successfully fetched accounts from storage, try to add them back into the in-memory
1025        // state.
1026        let consensus = self
1027            .as_ref()
1028            .consensus()
1029            .await
1030            .read()
1031            .await
1032            .consensus()
1033            .clone();
1034        if let Err(err) =
1035            add_v1_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
1036                .await
1037        {
1038            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1039        }
1040        tracing::info!(?view, "updated with fetched account state");
1041
1042        Ok(tree)
1043    }
1044
1045    #[tracing::instrument(skip(self))]
1046    async fn get_state_cert(
1047        &self,
1048        epoch: u64,
1049    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1050        let consensus = self.as_ref().consensus().await;
1051        let consensus_lock = consensus.read().await;
1052        let persistence = consensus_lock.storage();
1053
1054        persistence
1055            .get_state_cert_by_epoch(epoch)
1056            .await?
1057            .context(format!("state cert for epoch {epoch} not found"))
1058    }
1059}
1060
1061impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
1062where
1063    N: ConnectedNetwork<PubKey>,
1064    V: Versions,
1065    P: SequencerPersistence,
1066{
1067    async fn node_state(&self) -> NodeState {
1068        self.sequencer_context.as_ref().get().await.node_state()
1069    }
1070}
1071
1072impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
1073    for ApiState<N, P, V>
1074{
1075    #[tracing::instrument(skip(self, _instance))]
1076    async fn get_accounts(
1077        &self,
1078        _instance: &NodeState,
1079        height: u64,
1080        view: ViewNumber,
1081        accounts: &[FeeAccount],
1082    ) -> anyhow::Result<FeeMerkleTree> {
1083        let state = self
1084            .consensus()
1085            .await
1086            .read()
1087            .await
1088            .state(view)
1089            .await
1090            .context(format!(
1091                "state not available for height {height}, view {view}"
1092            ))?;
1093        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
1094    }
1095
1096    #[tracing::instrument(skip(self, _instance))]
1097    async fn get_frontier(
1098        &self,
1099        _instance: &NodeState,
1100        height: u64,
1101        view: ViewNumber,
1102    ) -> anyhow::Result<BlocksFrontier> {
1103        let state = self
1104            .consensus()
1105            .await
1106            .read()
1107            .await
1108            .state(view)
1109            .await
1110            .context(format!(
1111                "state not available for height {height}, view {view}"
1112            ))?;
1113        let tree = &state.block_merkle_tree;
1114        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
1115        Ok(frontier)
1116    }
1117
1118    async fn get_chain_config(
1119        &self,
1120        commitment: Commitment<ChainConfig>,
1121    ) -> anyhow::Result<ChainConfig> {
1122        let state = self.consensus().await.read().await.decided_state().await;
1123        let chain_config = state.chain_config;
1124
1125        if chain_config.commit() == commitment {
1126            chain_config.resolve().context("chain config found")
1127        } else {
1128            bail!("chain config not found")
1129        }
1130    }
1131
1132    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1133        let mut leaves = self
1134            .consensus()
1135            .await
1136            .read()
1137            .await
1138            .consensus()
1139            .read()
1140            .await
1141            .undecided_leaves();
1142        leaves.sort_by_key(|l| l.view_number());
1143        let (position, mut last_leaf) = leaves
1144            .iter()
1145            .find_position(|l| l.height() == height)
1146            .context(format!("leaf chain not available for {height}"))?;
1147        let mut chain = vec![last_leaf.clone()];
1148        for leaf in leaves.iter().skip(position + 1) {
1149            if leaf.justify_qc().view_number() == last_leaf.view_number() {
1150                chain.push(leaf.clone());
1151            } else {
1152                continue;
1153            }
1154            if leaf.view_number() == last_leaf.view_number() + 1 {
1155                // one away from decide
1156                last_leaf = leaf;
1157                break;
1158            }
1159            last_leaf = leaf;
1160        }
1161        // Make sure we got one more leaf to confirm the decide
1162        for leaf in leaves
1163            .iter()
1164            .skip_while(|l| l.view_number() <= last_leaf.view_number())
1165        {
1166            if leaf.justify_qc().view_number() == last_leaf.view_number() {
1167                chain.push(leaf.clone());
1168                return Ok(chain);
1169            }
1170        }
1171        bail!(format!("leaf chain not available for {height}"))
1172    }
1173
1174    #[tracing::instrument(skip(self, _instance))]
1175    async fn get_reward_accounts_v2(
1176        &self,
1177        _instance: &NodeState,
1178        height: u64,
1179        view: ViewNumber,
1180        accounts: &[RewardAccountV2],
1181    ) -> anyhow::Result<RewardMerkleTreeV2> {
1182        let state = self
1183            .consensus()
1184            .await
1185            .read()
1186            .await
1187            .state(view)
1188            .await
1189            .context(format!(
1190                "state not available for height {height}, view {view}"
1191            ))?;
1192
1193        retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
1194    }
1195
1196    // We can iterate over the in-memory reward merkle tree
1197    // however, there is no guarantee that we have all the accounts in
1198    // in-memory reward tree
1199    // So, we only query the state table in database for the reward accounts
1200    // We never hit this because we only query the storage in `StorageState``
1201    // trait implementation
1202    async fn get_all_reward_accounts(
1203        &self,
1204        _height: u64,
1205        _offset: u64,
1206        _limit: u64,
1207    ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
1208        bail!("get_all_reward_accounts is not implemented for ApiState")
1209    }
1210
1211    #[tracing::instrument(skip(self, _instance))]
1212    async fn get_reward_accounts_v1(
1213        &self,
1214        _instance: &NodeState,
1215        height: u64,
1216        view: ViewNumber,
1217        accounts: &[RewardAccountV1],
1218    ) -> anyhow::Result<RewardMerkleTreeV1> {
1219        let state = self
1220            .consensus()
1221            .await
1222            .read()
1223            .await
1224            .state(view)
1225            .await
1226            .context(format!(
1227                "state not available for height {height}, view {view}"
1228            ))?;
1229
1230        retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
1231    }
1232
1233    async fn get_state_cert(
1234        &self,
1235        epoch: u64,
1236    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1237        self.get_state_cert_by_epoch(epoch)
1238            .await?
1239            .context(format!("state cert not found for epoch {epoch}"))
1240    }
1241}
1242
1243impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
1244    HotShotConfigDataSource for StorageState<N, P, D, V>
1245{
1246    async fn get_config(&self) -> PublicNetworkConfig {
1247        self.as_ref().network_config().await.into()
1248    }
1249}
1250
1251impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> HotShotConfigDataSource
1252    for ApiState<N, P, V>
1253{
1254    async fn get_config(&self) -> PublicNetworkConfig {
1255        self.network_config().await.into()
1256    }
1257}
1258
1259#[async_trait]
1260impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
1261    StateSignatureDataSource<N> for StorageState<N, P, D, V>
1262{
1263    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1264        self.as_ref().get_state_signature(height).await
1265    }
1266}
1267
1268#[async_trait]
1269impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateSignatureDataSource<N>
1270    for ApiState<N, P, V>
1271{
1272    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1273        self.state_signer()
1274            .await
1275            .read()
1276            .await
1277            .get_state_signature(height)
1278            .await
1279    }
1280}
1281
1282pub(crate) trait RewardAccountProofDataSource: Sync {
1283    fn load_v1_reward_account_proof(
1284        &self,
1285        _height: u64,
1286        _account: RewardAccountV1,
1287    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
1288        async {
1289            bail!("reward merklized state is not supported for this data source");
1290        }
1291    }
1292
1293    fn load_v2_reward_account_proof(
1294        &self,
1295        _height: u64,
1296        _account: RewardAccountV2,
1297    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1298        async {
1299            bail!("reward merklized state is not supported for this data source");
1300        }
1301    }
1302}
1303
// `MetricsDataSource` overrides nothing, so it uses the trait's default methods, which fail
// with "reward merklized state is not supported for this data source".
impl RewardAccountProofDataSource for hotshot_query_service::data_source::MetricsDataSource {}
1305
1306impl<T, S> RewardAccountProofDataSource
1307    for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
1308where
1309    T: RewardAccountProofDataSource,
1310    S: Sync,
1311{
1312    async fn load_v1_reward_account_proof(
1313        &self,
1314        height: u64,
1315        account: RewardAccountV1,
1316    ) -> anyhow::Result<RewardAccountQueryDataV1> {
1317        self.inner()
1318            .load_v1_reward_account_proof(height, account)
1319            .await
1320    }
1321
1322    async fn load_v2_reward_account_proof(
1323        &self,
1324        height: u64,
1325        account: RewardAccountV2,
1326    ) -> anyhow::Result<RewardAccountQueryDataV2> {
1327        self.inner()
1328            .load_v2_reward_account_proof(height, account)
1329            .await
1330    }
1331}
1332
1333#[cfg(any(test, feature = "testing"))]
1334pub mod test_helpers {
1335    use std::{cmp::max, time::Duration};
1336
1337    use alloy::{
1338        network::EthereumWallet,
1339        primitives::{Address, U256},
1340        providers::{ext::AnvilApi, ProviderBuilder},
1341    };
1342    use committable::Committable;
1343    use espresso_contract_deployer::{
1344        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
1345        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1346    };
1347    use espresso_types::{
1348        v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
1349        DrbAndHeaderUpgradeVersion, EpochVersion, FeeVersion, MockSequencerVersions, NamespaceId,
1350        SequencerVersions, ValidatedState, V0_1,
1351    };
1352    use futures::{
1353        future::{join_all, FutureExt},
1354        stream::StreamExt,
1355    };
1356    use hotshot::types::{Event, EventType};
1357    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1358    use hotshot_types::{
1359        event::LeafInfo,
1360        light_client::LCV3StateSignatureRequestBody,
1361        traits::{metrics::NoMetrics, node_implementation::ConsensusTime},
1362        HotShotConfig,
1363    };
1364    use itertools::izip;
1365    use jf_merkle_tree_compat::{MerkleCommitment, MerkleTreeScheme};
1366    use portpicker::pick_unused_port;
1367    use staking_cli::demo::{DelegationConfig, StakingTransactions};
1368    use surf_disco::Client;
1369    use tempfile::TempDir;
1370    use tide_disco::{error::ServerError, Api, App, Error, StatusCode};
1371    use tokio::{spawn, task::JoinHandle, time::sleep};
1372    use url::Url;
1373    use vbs::version::{StaticVersion, StaticVersionType};
1374
1375    use super::*;
1376    use crate::{
1377        catchup::NullStateCatchup,
1378        network,
1379        persistence::no_storage,
1380        testing::{run_legacy_builder, wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
1381    };
1382
    /// Stake table capacity used by the test helpers: it is passed to
    /// `light_client_genesis_from_stake_table` and to `init_node`.
    pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
1384
1385    pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> {
1386        pub server: SequencerContext<network::Memory, P::Persistence, V>,
1387        pub peers: Vec<SequencerContext<network::Memory, P::Persistence, V>>,
1388        pub cfg: TestConfig<{ NUM_NODES }>,
1389        // todo (abdul): remove this when fs storage is removed
1390        pub temp_dir: Option<TempDir>,
1391        pub contracts: Option<Contracts>,
1392    }
1393
1394    pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
1395    where
1396        P: PersistenceOptions,
1397        C: StateCatchup + 'static,
1398    {
1399        state: [ValidatedState; NUM_NODES],
1400        persistence: [P; NUM_NODES],
1401        catchup: [C; NUM_NODES],
1402        network_config: TestConfig<{ NUM_NODES }>,
1403        api_config: Options,
1404        contracts: Option<Contracts>,
1405    }
1406
1407    impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
1408    where
1409        P: PersistenceOptions,
1410        C: StateCatchup + 'static,
1411    {
1412        pub fn states(&self) -> [ValidatedState; NUM_NODES] {
1413            self.state.clone()
1414        }
1415    }
1416    #[derive(Clone)]
1417    pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
1418    where
1419        P: PersistenceOptions,
1420        C: StateCatchup + 'static,
1421    {
1422        state: [ValidatedState; NUM_NODES],
1423        persistence: Option<[P; NUM_NODES]>,
1424        catchup: Option<[C; NUM_NODES]>,
1425        api_config: Option<Options>,
1426        network_config: Option<TestConfig<{ NUM_NODES }>>,
1427        contracts: Option<Contracts>,
1428    }
1429
1430    impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
1431        fn default() -> Self {
1432            TestNetworkConfigBuilder {
1433                state: std::array::from_fn(|_| ValidatedState::default()),
1434                persistence: Some([no_storage::Options; 5]),
1435                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1436                network_config: None,
1437                api_config: None,
1438                contracts: None,
1439            }
1440        }
1441    }
1442
1443    pub enum AnyTestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
1444        V0_1(TestNetwork<P, NUM_NODES, SequencerVersions<V0_1, V0_1>>),
1445        V0_2(TestNetwork<P, NUM_NODES, SequencerVersions<FeeVersion, FeeVersion>>),
1446        V0_3(TestNetwork<P, NUM_NODES, SequencerVersions<EpochVersion, EpochVersion>>),
1447        V0_4(
1448            TestNetwork<
1449                P,
1450                NUM_NODES,
1451                SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
1452            >,
1453        ),
1454    }
1455
1456    impl<P: PersistenceOptions, const NUM_NODES: usize> AnyTestNetwork<P, NUM_NODES> {
1457        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1458            match self {
1459                AnyTestNetwork::V0_1(network) => network.cfg.hotshot_config(),
1460                AnyTestNetwork::V0_2(network) => network.cfg.hotshot_config(),
1461                AnyTestNetwork::V0_3(network) => network.cfg.hotshot_config(),
1462                AnyTestNetwork::V0_4(network) => network.cfg.hotshot_config(),
1463            }
1464        }
1465    }
1466
1467    impl<const NUM_NODES: usize>
1468        TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1469    {
1470        pub fn with_num_nodes(
1471        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1472        {
1473            TestNetworkConfigBuilder {
1474                state: std::array::from_fn(|_| ValidatedState::default()),
1475                persistence: Some([no_storage::Options; { NUM_NODES }]),
1476                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1477                network_config: None,
1478                api_config: None,
1479                contracts: None,
1480            }
1481        }
1482    }
1483
1484    impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1485    where
1486        P: PersistenceOptions,
1487        C: StateCatchup + 'static,
1488    {
1489        pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1490            self.state = state;
1491            self
1492        }
1493
1494        pub fn persistences<NP: PersistenceOptions>(
1495            self,
1496            persistence: [NP; NUM_NODES],
1497        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
1498            TestNetworkConfigBuilder {
1499                state: self.state,
1500                catchup: self.catchup,
1501                network_config: self.network_config,
1502                api_config: self.api_config,
1503                persistence: Some(persistence),
1504                contracts: self.contracts,
1505            }
1506        }
1507
1508        pub fn api_config(mut self, api_config: Options) -> Self {
1509            self.api_config = Some(api_config);
1510            self
1511        }
1512
1513        pub fn catchups<NC: StateCatchup + 'static>(
1514            self,
1515            catchup: [NC; NUM_NODES],
1516        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
1517            TestNetworkConfigBuilder {
1518                state: self.state,
1519                catchup: Some(catchup),
1520                network_config: self.network_config,
1521                api_config: self.api_config,
1522                persistence: self.persistence,
1523                contracts: self.contracts,
1524            }
1525        }
1526
1527        pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
1528            self.network_config = Some(network_config);
1529            self
1530        }
1531
1532        pub fn contracts(mut self, contracts: Contracts) -> Self {
1533            self.contracts = Some(contracts);
1534            self
1535        }
1536
1537        /// Setup for POS testing. Deploys contracts and adds the
1538        /// stake table address to state. Must be called before `build()`.
1539        pub async fn pos_hook<V: Versions>(
1540            self,
1541            delegation_config: DelegationConfig,
1542            stake_table_version: StakeTableContractVersion,
1543        ) -> anyhow::Result<Self> {
1544            if <V as Versions>::Upgrade::VERSION < EpochVersion::VERSION
1545                && <V as Versions>::Base::VERSION < EpochVersion::VERSION
1546            {
1547                panic!("given version does not require pos deployment");
1548            };
1549
1550            let network_config = self
1551                .network_config
1552                .as_ref()
1553                .expect("network_config is required");
1554
1555            let l1_url = network_config.l1_url();
1556            let signer = network_config.signer();
1557            let deployer = ProviderBuilder::new()
1558                .wallet(EthereumWallet::from(signer.clone()))
1559                .connect_http(l1_url.clone());
1560
1561            let blocks_per_epoch = network_config.hotshot_config().epoch_height;
1562            let epoch_start_block = network_config.hotshot_config().epoch_start_block;
1563            let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1564                &network_config.hotshot_config().hotshot_stake_table(),
1565                STAKE_TABLE_CAPACITY_FOR_TEST,
1566            )
1567            .unwrap();
1568
1569            let mut contracts = Contracts::new();
1570            let args = DeployerArgsBuilder::default()
1571                .deployer(deployer.clone())
1572                .rpc_url(l1_url.clone())
1573                .mock_light_client(true)
1574                .genesis_lc_state(genesis_state)
1575                .genesis_st_state(genesis_stake)
1576                .blocks_per_epoch(blocks_per_epoch)
1577                .epoch_start_block(epoch_start_block)
1578                .exit_escrow_period(U256::from(max(
1579                    blocks_per_epoch * 15 + 100,
1580                    DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1581                )))
1582                .multisig_pauser(signer.address())
1583                .token_name("Espresso".to_string())
1584                .token_symbol("ESP".to_string())
1585                .initial_token_supply(U256::from(100000u64))
1586                .ops_timelock_delay(U256::from(0))
1587                .ops_timelock_admin(signer.address())
1588                .ops_timelock_proposers(vec![signer.address()])
1589                .ops_timelock_executors(vec![signer.address()])
1590                .safe_exit_timelock_delay(U256::from(10))
1591                .safe_exit_timelock_admin(signer.address())
1592                .safe_exit_timelock_proposers(vec![signer.address()])
1593                .safe_exit_timelock_executors(vec![signer.address()])
1594                .build()
1595                .unwrap();
1596
1597            match stake_table_version {
1598                StakeTableContractVersion::V1 => {
1599                    args.deploy_to_stake_table_v1(&mut contracts).await
1600                },
1601                StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1602            }
1603            .context("failed to deploy contracts")?;
1604
1605            let stake_table_address = contracts
1606                .address(Contract::StakeTableProxy)
1607                .expect("StakeTableProxy address not found");
1608
1609            StakingTransactions::create(
1610                l1_url.clone(),
1611                &deployer,
1612                stake_table_address,
1613                network_config.staking_priv_keys(),
1614                None,
1615                delegation_config,
1616            )
1617            .await
1618            .expect("stake table setup failed")
1619            .apply_all()
1620            .await
1621            .expect("send all txns failed");
1622
1623            // enable interval mining with a 1s interval.
1624            // This ensures that blocks are finalized every second, even when there are no transactions.
1625            // It's useful for testing stake table updates,
1626            // which rely on the finalized L1 block number.
1627            if let Some(anvil) = network_config.anvil() {
1628                anvil
1629                    .anvil_set_interval_mining(1)
1630                    .await
1631                    .expect("interval mining");
1632            }
1633
1634            // Add stake table address to `ChainConfig` (held in state),
1635            // avoiding overwrite other values. Base fee is set to `0` to avoid
1636            // unnecessary catchup of `FeeState`.
1637            let state = self.state[0].clone();
1638            let chain_config = if let Some(cf) = state.chain_config.resolve() {
1639                ChainConfig {
1640                    base_fee: 0.into(),
1641                    stake_table_contract: Some(stake_table_address),
1642                    ..cf
1643                }
1644            } else {
1645                ChainConfig {
1646                    base_fee: 0.into(),
1647                    stake_table_contract: Some(stake_table_address),
1648                    ..Default::default()
1649                }
1650            };
1651
1652            let state = ValidatedState {
1653                chain_config: chain_config.into(),
1654                ..state
1655            };
1656            Ok(self
1657                .states(std::array::from_fn(|_| state.clone()))
1658                .contracts(contracts))
1659        }
1660
1661        pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
1662            TestNetworkConfig {
1663                state: self.state,
1664                persistence: self.persistence.unwrap(),
1665                catchup: self.catchup.unwrap(),
1666                network_config: self.network_config.unwrap(),
1667                api_config: self.api_config.unwrap(),
1668                contracts: self.contracts,
1669            }
1670        }
1671    }
1672
1673    impl<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> TestNetwork<P, { NUM_NODES }, V> {
1674        pub async fn new<C: StateCatchup + 'static>(
1675            cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
1676            bind_version: V,
1677        ) -> Self {
1678            let mut cfg = cfg;
1679            let mut builder_tasks = Vec::new();
1680
1681            let chain_config = cfg.state[0].chain_config.resolve();
1682            if chain_config.is_none() {
1683                tracing::warn!("Chain config is not set, using default max_block_size");
1684            }
1685            let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
1686                cfg.network_config.builder_port(),
1687                chain_config.map(|c| *c.max_block_size),
1688            )
1689            .await;
1690            builder_tasks.push(task);
1691            cfg.network_config
1692                .set_builder_urls(vec1::vec1![builder_url.clone()]);
1693
1694            // add default storage if none is provided as query module is now required
1695            let mut opt = cfg.api_config.clone();
1696            let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
1697                let temp_dir = tempfile::tempdir().unwrap();
1698                opt = opt.query_fs(
1699                    Default::default(),
1700                    crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
1701                );
1702                Some(temp_dir)
1703            } else {
1704                None
1705            };
1706
1707            let mut nodes = join_all(
1708                izip!(cfg.state, cfg.persistence, cfg.catchup)
1709                    .enumerate()
1710                    .map(|(i, (state, persistence, state_peers))| {
1711                        let opt = opt.clone();
1712                        let cfg = &cfg.network_config;
1713                        let upgrades_map = cfg.upgrades();
1714                        async move {
1715                            if i == 0 {
1716                                opt.serve(|metrics, consumer, storage| {
1717                                    let cfg = cfg.clone();
1718                                    async move {
1719                                        Ok(cfg
1720                                            .init_node(
1721                                                0,
1722                                                state,
1723                                                persistence,
1724                                                Some(state_peers),
1725                                                storage,
1726                                                &*metrics,
1727                                                STAKE_TABLE_CAPACITY_FOR_TEST,
1728                                                consumer,
1729                                                bind_version,
1730                                                upgrades_map,
1731                                            )
1732                                            .await)
1733                                    }
1734                                    .boxed()
1735                                })
1736                                .await
1737                                .unwrap()
1738                            } else {
1739                                cfg.init_node(
1740                                    i,
1741                                    state,
1742                                    persistence,
1743                                    Some(state_peers),
1744                                    None,
1745                                    &NoMetrics,
1746                                    STAKE_TABLE_CAPACITY_FOR_TEST,
1747                                    NullEventConsumer,
1748                                    bind_version,
1749                                    upgrades_map,
1750                                )
1751                                .await
1752                            }
1753                        }
1754                    }),
1755            )
1756            .await;
1757
1758            let handle_0 = &nodes[0];
1759
1760            // Hook the builder(s) up to the event stream from the first node
1761            for builder_task in builder_tasks {
1762                builder_task.start(Box::new(handle_0.event_stream().await));
1763            }
1764
1765            for ctx in &nodes {
1766                ctx.start_consensus().await;
1767            }
1768
1769            let server = nodes.remove(0);
1770            let peers = nodes;
1771
1772            Self {
1773                server,
1774                peers,
1775                cfg: cfg.network_config,
1776                temp_dir,
1777                contracts: cfg.contracts,
1778            }
1779        }
1780
1781        pub async fn stop_consensus(&mut self) {
1782            self.server.shutdown_consensus().await;
1783
1784            for ctx in &mut self.peers {
1785                ctx.shutdown_consensus().await;
1786            }
1787        }
1788    }
1789
1790    /// Test the status API with custom options.
1791    ///
1792    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1793    /// By default, the options are the minimal required to run this test (configuring a port and
1794    /// enabling the status API). `opt` may add additional functionality (e.g. adding a query module
1795    /// to test a different initialization path) but should not remove or modify the existing
1796    /// functionality (e.g. removing the status module or changing the port).
1797    pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
1798        let port = pick_unused_port().expect("No ports free");
1799        let url = format!("http://localhost:{port}").parse().unwrap();
1800        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1801
1802        let options = opt(Options::with_port(port));
1803        let network_config = TestConfigBuilder::default().build();
1804        let config = TestNetworkConfigBuilder::default()
1805            .api_config(options)
1806            .network_config(network_config)
1807            .build();
1808        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1809        client.connect(None).await;
1810
1811        // The status API is well tested in the query service repo. Here we are just smoke testing
1812        // that we set it up correctly. Wait for a (non-genesis) block to be sequenced and then
1813        // check the success rate metrics.
1814        while client
1815            .get::<u64>("status/block-height")
1816            .send()
1817            .await
1818            .unwrap()
1819            <= 1
1820        {
1821            sleep(Duration::from_secs(1)).await;
1822        }
1823        let success_rate = client
1824            .get::<f64>("status/success-rate")
1825            .send()
1826            .await
1827            .unwrap();
1828        // If metrics are populating correctly, we should get a finite number. If not, we might get
1829        // NaN or infinity due to division by 0.
1830        assert!(success_rate.is_finite(), "{success_rate}");
1831        // We know at least some views have been successful, since we finalized a block.
1832        assert!(success_rate > 0.0, "{success_rate}");
1833    }
1834
1835    /// Test the submit API with custom options.
1836    ///
1837    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1838    /// By default, the options are the minimal required to run this test (configuring a port and
1839    /// enabling the submit API). `opt` may add additional functionality (e.g. adding a query module
1840    /// to test a different initialization path) but should not remove or modify the existing
1841    /// functionality (e.g. removing the submit module or changing the port).
1842    pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
1843        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
1844
1845        let port = pick_unused_port().expect("No ports free");
1846
1847        let url = format!("http://localhost:{port}").parse().unwrap();
1848        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1849
1850        let options = opt(Options::with_port(port).submit(Default::default()));
1851        let network_config = TestConfigBuilder::default().build();
1852        let config = TestNetworkConfigBuilder::default()
1853            .api_config(options)
1854            .network_config(network_config)
1855            .build();
1856        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1857        let mut events = network.server.event_stream().await;
1858
1859        client.connect(None).await;
1860
1861        let hash = client
1862            .post("submit/submit")
1863            .body_json(&txn)
1864            .unwrap()
1865            .send()
1866            .await
1867            .unwrap();
1868        assert_eq!(txn.commit(), hash);
1869
1870        // Wait for a Decide event containing transaction matching the one we sent
1871        wait_for_decide_on_handle(&mut events, &txn).await;
1872    }
1873
1874    /// Test the state signature API.
1875    pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
1876        let port = pick_unused_port().expect("No ports free");
1877
1878        let url = format!("http://localhost:{port}").parse().unwrap();
1879
1880        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1881
1882        let options = opt(Options::with_port(port));
1883        let network_config = TestConfigBuilder::default().build();
1884        let config = TestNetworkConfigBuilder::default()
1885            .api_config(options)
1886            .network_config(network_config)
1887            .build();
1888        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1889
1890        let mut height: u64;
1891        // Wait for block >=2 appears
1892        // It's waiting for an extra second to make sure that the signature is generated
1893        loop {
1894            height = network.server.decided_leaf().await.height();
1895            sleep(std::time::Duration::from_secs(1)).await;
1896            if height >= 2 {
1897                break;
1898            }
1899        }
1900        // we cannot verify the signature now, because we don't know the stake table
1901        client
1902            .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
1903            .send()
1904            .await
1905            .unwrap();
1906    }
1907
1908    /// Test the catchup API with custom options.
1909    ///
1910    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
1911    /// By default, the options are the minimal required to run this test (configuring a port and
1912    /// enabling the catchup API). `opt` may add additional functionality (e.g. adding a query module
1913    /// to test a different initialization path) but should not remove or modify the existing
1914    /// functionality (e.g. removing the catchup module or changing the port).
1915    pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
1916        let port = pick_unused_port().expect("No ports free");
1917        let url = format!("http://localhost:{port}").parse().unwrap();
1918        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1919
1920        let options = opt(Options::with_port(port));
1921        let network_config = TestConfigBuilder::default().build();
1922        let config = TestNetworkConfigBuilder::default()
1923            .api_config(options)
1924            .network_config(network_config)
1925            .build();
1926        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1927        client.connect(None).await;
1928
1929        // Wait for a few blocks to be decided.
1930        let mut events = network.server.event_stream().await;
1931        loop {
1932            if let Event {
1933                event: EventType::Decide { leaf_chain, .. },
1934                ..
1935            } = events.next().await.unwrap()
1936            {
1937                if leaf_chain
1938                    .iter()
1939                    .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
1940                {
1941                    break;
1942                }
1943            }
1944        }
1945
1946        // Stop consensus running on the node so we freeze the decided and undecided states.
1947        // We'll let it go out of scope here since it's a write lock.
1948        {
1949            network.server.shutdown_consensus().await;
1950        }
1951
1952        // Undecided fee state: absent account.
1953        let leaf = network.server.decided_leaf().await;
1954        let height = leaf.height() + 1;
1955        let view = leaf.view_number() + 1;
1956        let res = client
1957            .get::<AccountQueryData>(&format!(
1958                "catchup/{height}/{}/account/{:x}",
1959                view.u64(),
1960                Address::default()
1961            ))
1962            .send()
1963            .await
1964            .unwrap();
1965        assert_eq!(res.balance, U256::ZERO);
1966        assert_eq!(
1967            res.proof
1968                .verify(
1969                    &network
1970                        .server
1971                        .state(view)
1972                        .await
1973                        .unwrap()
1974                        .fee_merkle_tree
1975                        .commitment()
1976                )
1977                .unwrap(),
1978            U256::ZERO,
1979        );
1980
1981        // Undecided block state.
1982        let res = client
1983            .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
1984            .send()
1985            .await
1986            .unwrap();
1987        let root = &network
1988            .server
1989            .state(view)
1990            .await
1991            .unwrap()
1992            .block_merkle_tree
1993            .commitment();
1994        BlockMerkleTree::verify(root, root.size() - 1, res)
1995            .unwrap()
1996            .unwrap();
1997    }
1998
1999    pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
2000        let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
2001        let mut api =
2002            Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
2003
2004        api.get("account", |_req, _state: &()| {
2005            async move {
2006                Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
2007                    StatusCode::BAD_REQUEST,
2008                    "no account found".to_string(),
2009                ))
2010            }
2011            .boxed()
2012        })?
2013        .get("blocks", |_req, _state| {
2014            async move {
2015                Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
2016                    StatusCode::BAD_REQUEST,
2017                    "no block found".to_string(),
2018                ))
2019            }
2020            .boxed()
2021        })?
2022        .get("chainconfig", |_req, _state| {
2023            async move {
2024                Result::<ChainConfig, _>::Ok(ChainConfig {
2025                    max_block_size: 300.into(),
2026                    base_fee: 1.into(),
2027                    fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
2028                        .parse()
2029                        .unwrap(),
2030                    ..Default::default()
2031                })
2032            }
2033            .boxed()
2034        })?
2035        .get("leafchain", |_req, _state| {
2036            async move {
2037                Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
2038                    StatusCode::BAD_REQUEST,
2039                    "No leafchain found".to_string(),
2040                ))
2041            }
2042            .boxed()
2043        })?;
2044
2045        let mut app = App::<_, hotshot_query_service::Error>::with_state(());
2046        app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
2047
2048        app.register_module::<_, _>("catchup", api).unwrap();
2049
2050        let port = pick_unused_port().expect("no free port");
2051        let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
2052
2053        let handle = spawn({
2054            let url = url.clone();
2055            async move {
2056                let _ = app.serve(url, SequencerApiVersion::instance()).await;
2057            }
2058        });
2059
2060        Ok((url, handle))
2061    }
2062}
2063
2064#[cfg(test)]
2065mod api_tests {
2066    use std::{fmt::Debug, marker::PhantomData};
2067
2068    use committable::Committable;
2069    use data_source::testing::TestableSequencerDataSource;
2070    use espresso_types::{
2071        traits::{EventConsumer, PersistenceOptions},
2072        Header, Leaf2, MockSequencerVersions, NamespaceId, NamespaceProofQueryData, ValidatedState,
2073    };
2074    use futures::{future, stream::StreamExt};
2075    use hotshot_example_types::node_types::TestVersions;
2076    use hotshot_query_service::availability::{
2077        AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
2078    };
2079    use hotshot_types::{
2080        data::{
2081            ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare, DaProposal2, EpochNumber,
2082            QuorumProposal2, QuorumProposalWrapper, VidCommitment, VidDisperseShare,
2083        },
2084        event::LeafInfo,
2085        message::Proposal,
2086        simple_certificate::{CertificatePair, QuorumCertificate2},
2087        traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
2088        utils::EpochTransitionIndicator,
2089        vid::avidm::{init_avidm_param, AvidMScheme},
2090    };
2091    use portpicker::pick_unused_port;
2092    use surf_disco::Client;
2093    use test_helpers::{
2094        catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2095        TestNetwork, TestNetworkConfigBuilder,
2096    };
2097    use tide_disco::error::ServerError;
2098    use vbs::version::StaticVersion;
2099
2100    use super::{update::ApiEventConsumer, *};
2101    use crate::{
2102        network,
2103        persistence::no_storage::NoStorage,
2104        testing::{wait_for_decide_on_handle, TestConfigBuilder},
2105    };
2106
2107    #[rstest_reuse::template]
2108    #[rstest::rstest]
2109    #[case(PhantomData::<crate::api::sql::DataSource>)]
2110    #[case(PhantomData::<crate::api::fs::DataSource>)]
2111    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2112    pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
2113        #[case] _d: PhantomData<D>,
2114    ) {
2115    }
2116
2117    #[rstest_reuse::apply(testable_sequencer_data_source)]
2118    pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
2119        _d: PhantomData<D>,
2120    ) {
2121        let storage = D::create_storage().await;
2122        submit_test_helper(|opt| D::options(&storage, opt)).await
2123    }
2124
2125    #[rstest_reuse::apply(testable_sequencer_data_source)]
2126    pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
2127        _d: PhantomData<D>,
2128    ) {
2129        let storage = D::create_storage().await;
2130        status_test_helper(|opt| D::options(&storage, opt)).await
2131    }
2132
2133    #[rstest_reuse::apply(testable_sequencer_data_source)]
2134    pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
2135        _d: PhantomData<D>,
2136    ) {
2137        let storage = D::create_storage().await;
2138        state_signature_test_helper(|opt| D::options(&storage, opt)).await
2139    }
2140
2141    #[rstest_reuse::apply(testable_sequencer_data_source)]
2142    pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
2143        // Arbitrary transaction, arbitrary namespace ID
2144        let ns_id = NamespaceId::from(42_u32);
2145        let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);
2146
2147        // Start query service.
2148        let port = pick_unused_port().expect("No ports free");
2149        let storage = D::create_storage().await;
2150        let network_config = TestConfigBuilder::default().build();
2151        let config = TestNetworkConfigBuilder::default()
2152            .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
2153            .network_config(network_config)
2154            .build();
2155        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2156        let mut events = network.server.event_stream().await;
2157
2158        // Connect client.
2159        let client: Client<ServerError, StaticVersion<0, 1>> =
2160            Client::new(format!("http://localhost:{port}").parse().unwrap());
2161        client.connect(None).await;
2162
2163        let hash = client
2164            .post("submit/submit")
2165            .body_json(&txn)
2166            .unwrap()
2167            .send()
2168            .await
2169            .unwrap();
2170        assert_eq!(txn.commit(), hash);
2171
2172        // Wait for a Decide event containing transaction matching the one we sent
2173        let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
2174        tracing::info!(block_height, "transaction sequenced");
2175
2176        // Submit a second transaction for range queries.
2177        let txn2 = Transaction::new(ns_id, vec![5, 6, 7, 8]);
2178        client
2179            .post::<Commitment<Transaction>>("submit/submit")
2180            .body_json(&txn2)
2181            .unwrap()
2182            .send()
2183            .await
2184            .unwrap();
2185        let block_height2 = wait_for_decide_on_handle(&mut events, &txn2).await.0 as usize;
2186        tracing::info!(block_height2, "transaction sequenced");
2187
2188        // Wait for the query service to update to this block height.
2189        client
2190            .socket(&format!("availability/stream/blocks/{block_height2}"))
2191            .subscribe::<BlockQueryData<SeqTypes>>()
2192            .await
2193            .unwrap()
2194            .next()
2195            .await
2196            .unwrap()
2197            .unwrap();
2198
2199        let mut found_txn = false;
2200        let mut found_empty_block = false;
2201        for block_num in 0..=block_height {
2202            let header: Header = client
2203                .get(&format!("availability/header/{block_num}"))
2204                .send()
2205                .await
2206                .unwrap();
2207            let ns_query_res: NamespaceProofQueryData = client
2208                .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
2209                .send()
2210                .await
2211                .unwrap();
2212
2213            // Check other means of querying the same proof.
2214            assert_eq!(
2215                ns_query_res,
2216                client
2217                    .get(&format!(
2218                        "availability/block/hash/{}/namespace/{ns_id}",
2219                        header.commit()
2220                    ))
2221                    .send()
2222                    .await
2223                    .unwrap()
2224            );
2225            assert_eq!(
2226                ns_query_res,
2227                client
2228                    .get(&format!(
2229                        "availability/block/payload-hash/{}/namespace/{ns_id}",
2230                        header.payload_commitment()
2231                    ))
2232                    .send()
2233                    .await
2234                    .unwrap()
2235            );
2236
2237            // Verify namespace proof if present
2238            if let Some(ns_proof) = ns_query_res.proof {
2239                let vid_common: VidCommonQueryData<SeqTypes> = client
2240                    .get(&format!("availability/vid/common/{block_num}"))
2241                    .send()
2242                    .await
2243                    .unwrap();
2244                ns_proof
2245                    .verify(
2246                        header.ns_table(),
2247                        &header.payload_commitment(),
2248                        vid_common.common(),
2249                    )
2250                    .unwrap();
2251            } else {
2252                // Namespace proof should be present if ns_id exists in ns_table
2253                assert!(header.ns_table().find_ns_id(&ns_id).is_none());
2254                assert!(ns_query_res.transactions.is_empty());
2255            }
2256
2257            found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();
2258
2259            for txn in ns_query_res.transactions {
2260                if txn.commit() == hash {
2261                    // Ensure that we validate an inclusion proof
2262                    found_txn = true;
2263                }
2264            }
2265        }
2266        assert!(found_txn);
2267        assert!(found_empty_block);
2268
2269        // Test range query.
2270        let ns_proofs: Vec<NamespaceProofQueryData> = client
2271            .get(&format!(
2272                "availability/block/{block_height}/{}/namespace/{ns_id}",
2273                block_height2 + 1
2274            ))
2275            .send()
2276            .await
2277            .unwrap();
2278        assert_eq!(ns_proofs.len(), block_height2 + 1 - block_height);
2279        assert_eq!(&ns_proofs[0].transactions, std::slice::from_ref(&txn));
2280        assert_eq!(
2281            &ns_proofs[ns_proofs.len() - 1].transactions,
2282            std::slice::from_ref(&txn2)
2283        );
2284        for proof in &ns_proofs[1..ns_proofs.len() - 1] {
2285            assert_eq!(proof.transactions, &[]);
2286        }
2287    }
2288
2289    #[rstest_reuse::apply(testable_sequencer_data_source)]
2290    pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
2291        _d: PhantomData<D>,
2292    ) {
2293        let storage = D::create_storage().await;
2294        catchup_test_helper(|opt| D::options(&storage, opt)).await
2295    }
2296
2297    #[rstest_reuse::apply(testable_sequencer_data_source)]
2298    pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
2299    where
2300        D: TestableSequencerDataSource + Debug + 'static,
2301    {
2302        #[derive(Clone, Copy, Debug)]
2303        struct FailConsumer;
2304
2305        #[async_trait]
2306        impl EventConsumer for FailConsumer {
2307            async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
2308                bail!("mock error injection");
2309            }
2310        }
2311
2312        let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);
2313
2314        let storage = D::create_storage().await;
2315        let persistence = D::persistence_options(&storage).create().await.unwrap();
2316        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
2317            Arc::new(StorageState::new(
2318                D::create(D::persistence_options(&storage), Default::default(), false)
2319                    .await
2320                    .unwrap(),
2321                ApiState::new(future::pending()),
2322            ));
2323
2324        // Create two non-consecutive leaf chains.
2325        let mut chain1 = vec![];
2326
2327        let genesis = Leaf2::genesis::<TestVersions>(&Default::default(), &NodeState::mock()).await;
2328        let payload = genesis.block_payload().unwrap();
2329        let payload_bytes_arc = payload.encode();
2330
2331        let avidm_param = init_avidm_param(2).unwrap();
2332        let weights = vec![1u32; 2];
2333
2334        let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
2335        let (payload_commitment, shares) =
2336            AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();
2337
2338        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
2339            proposal: QuorumProposal2::<SeqTypes> {
2340                block_header: genesis.block_header().clone(),
2341                view_number: ViewNumber::genesis(),
2342                justify_qc: QuorumCertificate2::genesis::<MockSequencerVersions>(
2343                    &ValidatedState::default(),
2344                    &NodeState::mock(),
2345                )
2346                .await,
2347                upgrade_certificate: None,
2348                view_change_evidence: None,
2349                next_drb_result: None,
2350                next_epoch_justify_qc: None,
2351                epoch: None,
2352                state_cert: None,
2353            },
2354        };
2355        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
2356            &ValidatedState::default(),
2357            &NodeState::mock(),
2358        )
2359        .await;
2360
2361        let mut justify_qc = qc.clone();
2362        for i in 0..5 {
2363            *quorum_proposal.proposal.block_header.height_mut() = i;
2364            quorum_proposal.proposal.view_number = ViewNumber::new(i);
2365            quorum_proposal.proposal.justify_qc = justify_qc;
2366            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
2367            qc.view_number = leaf.view_number();
2368            qc.data.leaf_commit = Committable::commit(&leaf);
2369            justify_qc = qc.clone();
2370            chain1.push((leaf.clone(), CertificatePair::non_epoch_change(qc.clone())));
2371
2372            // Include a quorum proposal for each leaf.
2373            let quorum_proposal_signature =
2374                PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2375                    .expect("Failed to sign quorum_proposal");
2376            persistence
2377                .append_quorum_proposal2(&Proposal {
2378                    data: quorum_proposal.clone(),
2379                    signature: quorum_proposal_signature,
2380                    _pd: Default::default(),
2381                })
2382                .await
2383                .unwrap();
2384
2385            // Include VID information for each leaf.
2386            let share: VidDisperseShare<SeqTypes> = AvidMDisperseShare {
2387                view_number: leaf.view_number(),
2388                payload_commitment,
2389                share: shares[0].clone(),
2390                recipient_key: pubkey,
2391                epoch: Some(EpochNumber::new(0)),
2392                target_epoch: Some(EpochNumber::new(0)),
2393                common: avidm_param.clone(),
2394            }
2395            .into();
2396
2397            persistence
2398                .append_vid(&share.to_proposal(&privkey).unwrap())
2399                .await
2400                .unwrap();
2401
2402            // Include payload information for each leaf.
2403            let block_payload_signature =
2404                PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
2405            let da_proposal_inner = DaProposal2::<SeqTypes> {
2406                encoded_transactions: payload_bytes_arc.clone(),
2407                metadata: payload.ns_table().clone(),
2408                view_number: leaf.view_number(),
2409                epoch: Some(EpochNumber::new(0)),
2410                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
2411            };
2412            let da_proposal = Proposal {
2413                data: da_proposal_inner,
2414                signature: block_payload_signature,
2415                _pd: Default::default(),
2416            };
2417            persistence
2418                .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
2419                .await
2420                .unwrap();
2421        }
2422        // Split into two chains.
2423        let mut chain2 = chain1.split_off(2);
2424        // Make non-consecutive (i.e. we skip a leaf).
2425        chain2.remove(0);
2426
2427        // Decide 2 leaves, but fail in event processing.
2428        let leaf_chain = chain1
2429            .iter()
2430            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2431            .collect::<Vec<_>>();
2432        tracing::info!("decide with event handling failure");
2433        persistence
2434            .append_decided_leaves(
2435                ViewNumber::new(1),
2436                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2437                None,
2438                &FailConsumer,
2439            )
2440            .await
2441            .unwrap();
2442
2443        // Now decide remaining leaves successfully. We should now process a decide event for all
2444        // the leaves.
2445        let consumer = ApiEventConsumer::from(data_source.clone());
2446        let leaf_chain = chain2
2447            .iter()
2448            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2449            .collect::<Vec<_>>();
2450        tracing::info!("decide successfully");
2451        persistence
2452            .append_decided_leaves(
2453                ViewNumber::new(4),
2454                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2455                None,
2456                &consumer,
2457            )
2458            .await
2459            .unwrap();
2460
2461        // Check that the leaves were moved to archive storage, along with payload and VID
2462        // information.
2463        for (leaf, cert) in chain1.iter().chain(&chain2) {
2464            tracing::info!(height = leaf.height(), "check archive");
2465            let qd = data_source.get_leaf(leaf.height() as usize).await.await;
2466            let stored_leaf: Leaf2 = qd.leaf().clone();
2467            let stored_qc = qd.qc().clone();
2468            assert_eq!(&stored_leaf, leaf);
2469            assert_eq!(&stored_qc, cert.qc());
2470
2471            data_source
2472                .get_block(leaf.height() as usize)
2473                .await
2474                .try_resolve()
2475                .ok()
2476                .unwrap();
2477            data_source
2478                .get_vid_common(leaf.height() as usize)
2479                .await
2480                .try_resolve()
2481                .ok()
2482                .unwrap();
2483
2484            // Check that all data has been garbage collected for the decided views.
2485            assert!(persistence
2486                .load_da_proposal(leaf.view_number())
2487                .await
2488                .unwrap()
2489                .is_none());
2490            assert!(persistence
2491                .load_vid_share(leaf.view_number())
2492                .await
2493                .unwrap()
2494                .is_none());
2495            assert!(persistence
2496                .load_quorum_proposal(leaf.view_number())
2497                .await
2498                .is_err());
2499        }
2500
2501        // Check that data has _not_ been garbage collected for the missing view.
2502        assert!(persistence
2503            .load_da_proposal(ViewNumber::new(2))
2504            .await
2505            .unwrap()
2506            .is_some());
2507        assert!(persistence
2508            .load_vid_share(ViewNumber::new(2))
2509            .await
2510            .unwrap()
2511            .is_some());
2512        persistence
2513            .load_quorum_proposal(ViewNumber::new(2))
2514            .await
2515            .unwrap();
2516    }
2517
2518    #[rstest_reuse::apply(testable_sequencer_data_source)]
2519    pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
2520    where
2521        D: TestableSequencerDataSource + Debug + 'static,
2522    {
2523        let storage = D::create_storage().await;
2524        let persistence = D::persistence_options(&storage).create().await.unwrap();
2525        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
2526            Arc::new(StorageState::new(
2527                D::create(D::persistence_options(&storage), Default::default(), false)
2528                    .await
2529                    .unwrap(),
2530                ApiState::new(future::pending()),
2531            ));
2532        let consumer = ApiEventConsumer::from(data_source.clone());
2533
2534        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
2535            &ValidatedState::default(),
2536            &NodeState::mock(),
2537        )
2538        .await;
2539        let leaf =
2540            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;
2541
2542        // Append the genesis leaf. We don't use this for the test, because the update function will
2543        // automatically fill in the missing data for genesis. We just append this to get into a
2544        // consistent state to then append the leaf from view 1, which will have missing data.
2545        tracing::info!(?leaf, ?qc, "decide genesis leaf");
2546        persistence
2547            .append_decided_leaves(
2548                leaf.view_number(),
2549                [(
2550                    &leaf_info(leaf.clone()),
2551                    CertificatePair::non_epoch_change(qc.clone()),
2552                )],
2553                None,
2554                &consumer,
2555            )
2556            .await
2557            .unwrap();
2558
2559        // Create another leaf, with missing data.
2560        let mut block_header = leaf.block_header().clone();
2561        *block_header.height_mut() += 1;
2562        let qp = QuorumProposalWrapper {
2563            proposal: QuorumProposal2 {
2564                block_header,
2565                view_number: leaf.view_number() + 1,
2566                justify_qc: qc.clone(),
2567                upgrade_certificate: None,
2568                view_change_evidence: None,
2569                next_drb_result: None,
2570                next_epoch_justify_qc: None,
2571                epoch: None,
2572                state_cert: None,
2573            },
2574        };
2575
2576        let leaf = Leaf2::from_quorum_proposal(&qp);
2577        qc.view_number = leaf.view_number();
2578        qc.data.leaf_commit = Committable::commit(&leaf);
2579
2580        // Decide a leaf without the corresponding payload or VID.
2581        tracing::info!(?leaf, ?qc, "append leaf 1");
2582        persistence
2583            .append_decided_leaves(
2584                leaf.view_number(),
2585                [(
2586                    &leaf_info(leaf.clone()),
2587                    CertificatePair::non_epoch_change(qc),
2588                )],
2589                None,
2590                &consumer,
2591            )
2592            .await
2593            .unwrap();
2594
2595        // Check that we still processed the leaf.
2596        assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
2597        assert!(data_source.get_vid_common(1).await.is_pending());
2598        assert!(data_source.get_block(1).await.is_pending());
2599    }
2600
2601    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2602        LeafInfo {
2603            leaf,
2604            vid_share: None,
2605            state: Default::default(),
2606            delta: None,
2607            state_cert: None,
2608        }
2609    }
2610}
2611
2612#[cfg(test)]
2613mod test {
2614    use std::{
2615        collections::{HashMap, HashSet},
2616        str::FromStr,
2617        time::Duration,
2618    };
2619
2620    use ::light_client::{
2621        consensus::{
2622            header::HeaderProof,
2623            leaf::{LeafProof, LeafProofHint},
2624            payload::PayloadProof,
2625        },
2626        testing::{EpochChangeQuorum, LegacyVersion},
2627    };
2628    use alloy::{
2629        eips::BlockId,
2630        network::EthereumWallet,
2631        primitives::U256,
2632        providers::{Provider, ProviderBuilder},
2633    };
2634    use async_lock::Mutex;
2635    use committable::{Commitment, Committable};
2636    use espresso_contract_deployer::{
2637        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
2638        upgrade_stake_table_v2, Contract, Contracts,
2639    };
2640    use espresso_types::{
2641        config::PublicHotShotConfig,
2642        traits::{MembershipPersistence, NullEventConsumer, PersistenceOptions},
2643        v0_3::{Fetcher, RewardAmount, RewardMerkleProofV1, COMMISSION_BASIS_POINTS},
2644        v0_4::{
2645            RewardAccountV2, RewardMerkleProofV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT,
2646        },
2647        validators_from_l1_events, ADVZNamespaceProofQueryData, DrbAndHeaderUpgradeVersion,
2648        EpochVersion, FeeAmount, FeeVersion, Header, L1Client, L1ClientOptions,
2649        MockSequencerVersions, NamespaceId, NamespaceProofQueryData, NsProof, RewardDistributor,
2650        SequencerVersions, StakeTableState, StateCertQueryDataV1, StateCertQueryDataV2,
2651        ValidatedState,
2652    };
2653    use futures::{
2654        future::{self, join_all, try_join_all},
2655        stream::{StreamExt, TryStreamExt},
2656        try_join,
2657    };
2658    use hotshot::types::EventType;
2659    use hotshot_contract_adapter::{
2660        reward::RewardClaimInput,
2661        sol_types::{EspToken, StakeTableV2},
2662        stake_table::StakeTableContractVersion,
2663    };
2664    use hotshot_example_types::node_types::EpochsTestVersions;
2665    use hotshot_query_service::{
2666        availability::{
2667            BlockQueryData, BlockSummaryQueryData, LeafQueryData, TransactionQueryData,
2668            VidCommonQueryData,
2669        },
2670        data_source::{
2671            sql::Config,
2672            storage::{sql::query, SqlStorage, StorageConnectionType},
2673            Transaction as _, VersionedDataSource,
2674        },
2675        explorer::TransactionSummariesResponse,
2676        merklized_state::UpdateStateData,
2677        types::HeightIndexed,
2678    };
2679    use hotshot_types::{
2680        data::EpochNumber,
2681        event::LeafInfo,
2682        traits::{
2683            block_contents::BlockHeader, election::Membership, metrics::NoMetrics,
2684            node_implementation::ConsensusTime,
2685        },
2686        utils::epoch_from_block_number,
2687        ValidatorConfig,
2688    };
2689    use jf_merkle_tree_compat::{
2690        prelude::{MerkleProof, Sha3Node},
2691        MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
2692    };
2693    use portpicker::pick_unused_port;
2694    use pretty_assertions::assert_matches;
2695    use rand::seq::SliceRandom;
2696    use rstest::rstest;
2697    use staking_cli::{
2698        demo::DelegationConfig,
2699        registration::{fetch_commission, update_commission},
2700    };
2701    use surf_disco::Client;
2702    use test_helpers::{
2703        catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2704        TestNetwork, TestNetworkConfigBuilder,
2705    };
2706    use tide_disco::{
2707        app::AppHealth, error::ServerError, healthcheck::HealthStatus, Error, StatusCode, Url,
2708    };
2709    use tokio::time::sleep;
2710    use vbs::version::{StaticVersion, StaticVersionType};
2711
2712    use self::{
2713        data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
2714        sql::DataSource as SqlDataSource,
2715    };
2716    use super::*;
2717
2718    async fn wait_until_block_height(
2719        client: &Client<ServerError, StaticVersion<0, 1>>,
2720        endpoint: &str,
2721        height: u64,
2722    ) {
2723        const MAX_RETRIES: usize = 30;
2724
2725        for _retry in 0..=MAX_RETRIES {
2726            let bh = client
2727                .get::<u64>(endpoint)
2728                .send()
2729                .await
2730                .expect("block height not found");
2731
2732            if bh >= height {
2733                return;
2734            }
2735            sleep(Duration::from_secs(3)).await;
2736        }
2737
2738        panic!("Max retries reached. {endpoint} block height did not exceed {height}");
2739    }
2740    use crate::{
2741        api::{
2742            options::Query,
2743            sql::{impl_testable_data_source::tmp_options, reconstruct_state},
2744            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2745        },
2746        catchup::{NullStateCatchup, StatePeers},
2747        persistence,
2748        persistence::no_storage,
2749        testing::{wait_for_decide_on_handle, wait_for_epochs, TestConfig, TestConfigBuilder},
2750    };
2751
    // `SequencerVersions` instantiations whose first parameter is static version 0.3 and 0.4
    // respectively; the second parameter is static version 0.0 in both.
    // NOTE(review): `Pos` presumably stands for proof-of-stake, and the two parameters are
    // presumably the base and upgrade versions — confirm against `SequencerVersions`.
    type PosVersionV3 = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
    type PosVersionV4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
2754
2755    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2756    async fn test_healthcheck() {
2757        let port = pick_unused_port().expect("No ports free");
2758        let url = format!("http://localhost:{port}").parse().unwrap();
2759        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2760        let options = Options::with_port(port);
2761        let network_config = TestConfigBuilder::default().build();
2762        let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
2763            .api_config(options)
2764            .network_config(network_config)
2765            .build();
2766        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2767
2768        client.connect(None).await;
2769        let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
2770        assert_eq!(health.status, HealthStatus::Available);
2771    }
2772
2773    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2774    async fn status_test_without_query_module() {
2775        status_test_helper(|opt| opt).await
2776    }
2777
2778    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2779    async fn submit_test_without_query_module() {
2780        submit_test_helper(|opt| opt).await
2781    }
2782
2783    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2784    async fn state_signature_test_without_query_module() {
2785        state_signature_test_helper(|opt| opt).await
2786    }
2787
2788    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2789    async fn catchup_test_without_query_module() {
2790        catchup_test_helper(|opt| opt).await
2791    }
2792
2793    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2794    async fn test_leaf_only_data_source() {
2795        let port = pick_unused_port().expect("No ports free");
2796
2797        let storage = SqlDataSource::create_storage().await;
2798        let options =
2799            SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();
2800
2801        let network_config = TestConfigBuilder::default().build();
2802        let config = TestNetworkConfigBuilder::default()
2803            .api_config(options)
2804            .network_config(network_config)
2805            .build();
2806        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2807        let url = format!("http://localhost:{port}").parse().unwrap();
2808        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
2809
2810        tracing::info!("waiting for blocks");
2811        client.connect(Some(Duration::from_secs(15))).await;
2812        // Wait until some blocks have been decided.
2813
2814        let account = TestConfig::<5>::builder_key().fee_account();
2815
2816        let _headers = client
2817            .socket("availability/stream/headers/0")
2818            .subscribe::<Header>()
2819            .await
2820            .unwrap()
2821            .take(10)
2822            .try_collect::<Vec<_>>()
2823            .await
2824            .unwrap();
2825
2826        for i in 1..5 {
2827            let leaf = client
2828                .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
2829                .send()
2830                .await
2831                .unwrap();
2832
2833            assert_eq!(leaf.height(), i);
2834
2835            let header = client
2836                .get::<Header>(&format!("availability/header/{i}"))
2837                .send()
2838                .await
2839                .unwrap();
2840
2841            assert_eq!(header.height(), i);
2842
2843            let vid = client
2844                .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
2845                .send()
2846                .await
2847                .unwrap();
2848
2849            assert_eq!(vid.height(), i);
2850
2851            client
2852                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
2853                    "block-state/{i}/{}",
2854                    i - 1
2855                ))
2856                .send()
2857                .await
2858                .unwrap();
2859
2860            client
2861                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
2862                    "fee-state/{}/{}",
2863                    i + 1,
2864                    account
2865                ))
2866                .send()
2867                .await
2868                .unwrap();
2869        }
2870
2871        // This would fail even though we have processed atleast 10 leaves
2872        // this is because light weight nodes only support leaves, headers and VID
2873        client
2874            .get::<BlockQueryData<SeqTypes>>("availability/block/1")
2875            .send()
2876            .await
2877            .unwrap_err();
2878    }
2879
2880    async fn run_catchup_test(url_suffix: &str) {
2881        // Start a sequencer network, using the query service for catchup.
2882        let port = pick_unused_port().expect("No ports free");
2883        const NUM_NODES: usize = 5;
2884
2885        let url: url::Url = format!("http://localhost:{port}{url_suffix}")
2886            .parse()
2887            .unwrap();
2888
2889        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
2890            .api_config(Options::with_port(port))
2891            .network_config(TestConfigBuilder::default().build())
2892            .catchups(std::array::from_fn(|_| {
2893                StatePeers::<StaticVersion<0, 1>>::from_urls(
2894                    vec![url.clone()],
2895                    Default::default(),
2896                    &NoMetrics,
2897                )
2898            }))
2899            .build();
2900        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2901
2902        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
2903        let mut events = network.peers[0].event_stream().await;
2904        loop {
2905            let event = events.next().await.unwrap();
2906            let EventType::Decide { leaf_chain, .. } = event.event else {
2907                continue;
2908            };
2909            if leaf_chain[0].leaf.height() > 0 {
2910                break;
2911            }
2912        }
2913
2914        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
2915        // drop the node and recreate it so it loses all of its temporary state and starts off from
2916        // genesis. It should be able to catch up by listening to proposals and then rebuild its
2917        // state from its peers.
2918        tracing::info!("shutting down node");
2919        network.peers.remove(0);
2920
2921        // Wait for a few blocks to pass while the node is down, so it falls behind.
2922        network
2923            .server
2924            .event_stream()
2925            .await
2926            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
2927            .take(3)
2928            .collect::<Vec<_>>()
2929            .await;
2930
2931        tracing::info!("restarting node");
2932        let node = network
2933            .cfg
2934            .init_node(
2935                1,
2936                ValidatedState::default(),
2937                no_storage::Options,
2938                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
2939                    vec![url],
2940                    Default::default(),
2941                    &NoMetrics,
2942                )),
2943                None,
2944                &NoMetrics,
2945                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2946                NullEventConsumer,
2947                MockSequencerVersions::new(),
2948                Default::default(),
2949            )
2950            .await;
2951        let mut events = node.event_stream().await;
2952
2953        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
2954        // caught up and all nodes are in sync.
2955        let mut proposers = [false; NUM_NODES];
2956        loop {
2957            let event = events.next().await.unwrap();
2958            let EventType::Decide { leaf_chain, .. } = event.event else {
2959                continue;
2960            };
2961            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
2962                let height = leaf.height();
2963                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
2964                if height == 0 {
2965                    continue;
2966                }
2967
2968                tracing::info!(
2969                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
2970                );
2971                proposers[leaf_builder] = true;
2972            }
2973
2974            if proposers.iter().all(|has_proposed| *has_proposed) {
2975                break;
2976            }
2977        }
2978    }
2979
2980    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2981    async fn test_catchup() {
2982        run_catchup_test("").await;
2983    }
2984
2985    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2986    async fn test_catchup_v0() {
2987        run_catchup_test("/v0").await;
2988    }
2989
2990    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2991    async fn test_catchup_v1() {
2992        run_catchup_test("/v1").await;
2993    }
2994
2995    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2996    async fn test_catchup_no_state_peers() {
2997        // Start a sequencer network, using the query service for catchup.
2998        let port = pick_unused_port().expect("No ports free");
2999        const NUM_NODES: usize = 5;
3000        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3001            .api_config(Options::with_port(port))
3002            .network_config(TestConfigBuilder::default().build())
3003            .build();
3004        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3005
3006        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
3007        let mut events = network.peers[0].event_stream().await;
3008        loop {
3009            let event = events.next().await.unwrap();
3010            let EventType::Decide { leaf_chain, .. } = event.event else {
3011                continue;
3012            };
3013            if leaf_chain[0].leaf.height() > 0 {
3014                break;
3015            }
3016        }
3017
3018        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
3019        // drop the node and recreate it so it loses all of its temporary state and starts off from
3020        // genesis. It should be able to catch up by listening to proposals and then rebuild its
3021        // state from its peers.
3022        tracing::info!("shutting down node");
3023        network.peers.remove(0);
3024
3025        // Wait for a few blocks to pass while the node is down, so it falls behind.
3026        network
3027            .server
3028            .event_stream()
3029            .await
3030            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3031            .take(3)
3032            .collect::<Vec<_>>()
3033            .await;
3034
3035        tracing::info!("restarting node");
3036        let node = network
3037            .cfg
3038            .init_node(
3039                1,
3040                ValidatedState::default(),
3041                no_storage::Options,
3042                None::<NullStateCatchup>,
3043                None,
3044                &NoMetrics,
3045                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3046                NullEventConsumer,
3047                MockSequencerVersions::new(),
3048                Default::default(),
3049            )
3050            .await;
3051        let mut events = node.event_stream().await;
3052
3053        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
3054        // caught up and all nodes are in sync.
3055        let mut proposers = [false; NUM_NODES];
3056        loop {
3057            let event = events.next().await.unwrap();
3058            let EventType::Decide { leaf_chain, .. } = event.event else {
3059                continue;
3060            };
3061            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3062                let height = leaf.height();
3063                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3064                if height == 0 {
3065                    continue;
3066                }
3067
3068                tracing::info!(
3069                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3070                );
3071                proposers[leaf_builder] = true;
3072            }
3073
3074            if proposers.iter().all(|has_proposed| *has_proposed) {
3075                break;
3076            }
3077        }
3078    }
3079
3080    #[ignore]
3081    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3082    async fn test_catchup_epochs_no_state_peers() {
3083        // Start a sequencer network, using the query service for catchup.
3084        let port = pick_unused_port().expect("No ports free");
3085        const EPOCH_HEIGHT: u64 = 5;
3086        let network_config = TestConfigBuilder::default()
3087            .epoch_height(EPOCH_HEIGHT)
3088            .build();
3089        const NUM_NODES: usize = 5;
3090        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3091            .api_config(Options::with_port(port))
3092            .network_config(network_config)
3093            .build();
3094        let mut network = TestNetwork::new(config, EpochsTestVersions {}).await;
3095
3096        // Wait for replica 0 to decide in the third epoch.
3097        let mut events = network.peers[0].event_stream().await;
3098        loop {
3099            let event = events.next().await.unwrap();
3100            let EventType::Decide { leaf_chain, .. } = event.event else {
3101                continue;
3102            };
3103            tracing::error!("got decide height {}", leaf_chain[0].leaf.height());
3104
3105            if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
3106                tracing::error!("decided past one epoch");
3107                break;
3108            }
3109        }
3110
3111        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
3112        // drop the node and recreate it so it loses all of its temporary state and starts off from
3113        // genesis. It should be able to catch up by listening to proposals and then rebuild its
3114        // state from its peers.
3115        tracing::info!("shutting down node");
3116        network.peers.remove(0);
3117
3118        // Wait for a few blocks to pass while the node is down, so it falls behind.
3119        network
3120            .server
3121            .event_stream()
3122            .await
3123            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3124            .take(3)
3125            .collect::<Vec<_>>()
3126            .await;
3127
3128        tracing::error!("restarting node");
3129        let node = network
3130            .cfg
3131            .init_node(
3132                1,
3133                ValidatedState::default(),
3134                no_storage::Options,
3135                None::<NullStateCatchup>,
3136                None,
3137                &NoMetrics,
3138                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3139                NullEventConsumer,
3140                MockSequencerVersions::new(),
3141                Default::default(),
3142            )
3143            .await;
3144        let mut events = node.event_stream().await;
3145
3146        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
3147        // caught up and all nodes are in sync.
3148        let mut proposers = [false; NUM_NODES];
3149        loop {
3150            let event = events.next().await.unwrap();
3151            let EventType::Decide { leaf_chain, .. } = event.event else {
3152                continue;
3153            };
3154            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3155                let height = leaf.height();
3156                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3157                if height == 0 {
3158                    continue;
3159                }
3160
3161                tracing::info!(
3162                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3163                );
3164                proposers[leaf_builder] = true;
3165            }
3166
3167            if proposers.iter().all(|has_proposed| *has_proposed) {
3168                break;
3169            }
3170        }
3171    }
3172
3173    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3174    async fn test_chain_config_from_instance() {
3175        // This test uses a ValidatedState which only has the default chain config commitment.
3176        // The NodeState has the full chain config.
3177        // Both chain config commitments will match, so the ValidatedState should have the
3178        // full chain config after a non-genesis block is decided.
3179
3180        let port = pick_unused_port().expect("No ports free");
3181
3182        let chain_config: ChainConfig = ChainConfig::default();
3183
3184        let state = ValidatedState {
3185            chain_config: chain_config.commit().into(),
3186            ..Default::default()
3187        };
3188
3189        let states = std::array::from_fn(|_| state.clone());
3190
3191        let config = TestNetworkConfigBuilder::default()
3192            .api_config(Options::with_port(port))
3193            .states(states)
3194            .catchups(std::array::from_fn(|_| {
3195                StatePeers::<StaticVersion<0, 1>>::from_urls(
3196                    vec![format!("http://localhost:{port}").parse().unwrap()],
3197                    Default::default(),
3198                    &NoMetrics,
3199                )
3200            }))
3201            .network_config(TestConfigBuilder::default().build())
3202            .build();
3203
3204        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3205
3206        // Wait for few blocks to be decided.
3207        network
3208            .server
3209            .event_stream()
3210            .await
3211            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3212            .take(3)
3213            .collect::<Vec<_>>()
3214            .await;
3215
3216        for peer in &network.peers {
3217            let state = peer.consensus().read().await.decided_state().await;
3218
3219            assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
3220        }
3221
3222        network.server.shut_down().await;
3223        drop(network);
3224    }
3225
3226    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3227    async fn test_chain_config_catchup() {
3228        // This test uses a ValidatedState with a non-default chain config
3229        // so it will be different from the NodeState chain config used by the TestNetwork.
3230        // However, for this test to work, at least one node should have a full chain config
3231        // to allow other nodes to catch up.
3232
3233        let port = pick_unused_port().expect("No ports free");
3234
3235        let cf = ChainConfig {
3236            max_block_size: 300.into(),
3237            base_fee: 1.into(),
3238            ..Default::default()
3239        };
3240
3241        // State1 contains only the chain config commitment
3242        let state1 = ValidatedState {
3243            chain_config: cf.commit().into(),
3244            ..Default::default()
3245        };
3246
3247        //state 2 contains the full chain config
3248        let state2 = ValidatedState {
3249            chain_config: cf.into(),
3250            ..Default::default()
3251        };
3252
3253        let mut states = std::array::from_fn(|_| state1.clone());
3254        // only one node has the full chain config
3255        // all the other nodes should do a catchup to get the full chain config from peer 0
3256        states[0] = state2;
3257
3258        const NUM_NODES: usize = 5;
3259        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3260            .api_config(Options::from(options::Http {
3261                port,
3262                max_connections: None,
3263            }))
3264            .states(states)
3265            .catchups(std::array::from_fn(|_| {
3266                StatePeers::<StaticVersion<0, 1>>::from_urls(
3267                    vec![format!("http://localhost:{port}").parse().unwrap()],
3268                    Default::default(),
3269                    &NoMetrics,
3270                )
3271            }))
3272            .network_config(TestConfigBuilder::default().build())
3273            .build();
3274
3275        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3276
3277        // Wait for a few blocks to be decided.
3278        network
3279            .server
3280            .event_stream()
3281            .await
3282            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3283            .take(3)
3284            .collect::<Vec<_>>()
3285            .await;
3286
3287        for peer in &network.peers {
3288            let state = peer.consensus().read().await.decided_state().await;
3289
3290            assert_eq!(state.chain_config.resolve().unwrap(), cf)
3291        }
3292
3293        network.server.shut_down().await;
3294        drop(network);
3295    }
3296
3297    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3298    async fn test_pos_upgrade_view_based() {
3299        type PosUpgrade = SequencerVersions<FeeVersion, EpochVersion>;
3300        test_upgrade_helper::<PosUpgrade>(PosUpgrade::new()).await;
3301    }
3302
3303    async fn test_upgrade_helper<V: Versions>(version: V) {
3304        // wait this number of views beyond the configured first view
3305        // before asserting anything.
3306        let wait_extra_views = 10;
3307        // Number of nodes running in the test network.
3308        const NUM_NODES: usize = 5;
3309        let upgrade_version = <V as Versions>::Upgrade::VERSION;
3310        let port = pick_unused_port().expect("No ports free");
3311
3312        let test_config = TestConfigBuilder::default()
3313            .epoch_height(200)
3314            .epoch_start_block(321)
3315            .set_upgrades(upgrade_version)
3316            .await
3317            .build();
3318
3319        let chain_config_genesis = ValidatedState::default().chain_config.resolve().unwrap();
3320        let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade_version);
3321        assert_ne!(chain_config_genesis, chain_config_upgrade);
3322        tracing::debug!(?chain_config_genesis, ?chain_config_upgrade);
3323
3324        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3325            .api_config(Options::from(options::Http {
3326                port,
3327                max_connections: None,
3328            }))
3329            .catchups(std::array::from_fn(|_| {
3330                StatePeers::<SequencerApiVersion>::from_urls(
3331                    vec![format!("http://localhost:{port}").parse().unwrap()],
3332                    Default::default(),
3333                    &NoMetrics,
3334                )
3335            }))
3336            .network_config(test_config)
3337            .build();
3338
3339        let mut network = TestNetwork::new(config, version).await;
3340        let mut events = network.server.event_stream().await;
3341
3342        // First loop to get an `UpgradeProposal`. Note that the
3343        // actual upgrade will take several to many subsequent views for
3344        // voting and finally the actual upgrade.
3345        let upgrade = loop {
3346            let event = events.next().await.unwrap();
3347            match event.event {
3348                EventType::UpgradeProposal { proposal, .. } => {
3349                    tracing::info!(?proposal, "proposal");
3350                    let upgrade = proposal.data.upgrade_proposal;
3351                    let new_version = upgrade.new_version;
3352                    tracing::info!(?new_version, "upgrade proposal new version");
3353                    assert_eq!(new_version, <V as Versions>::Upgrade::VERSION);
3354                    break upgrade;
3355                },
3356                _ => continue,
3357            }
3358        };
3359
3360        let wanted_view = upgrade.new_version_first_view + wait_extra_views;
3361        // Loop until we get the `new_version_first_view`, then test the upgrade.
3362        loop {
3363            let event = events.next().await.unwrap();
3364            let view_number = event.view_number;
3365
3366            tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
3367            if view_number > wanted_view {
3368                tracing::info!(?view_number, ?upgrade.new_version_first_view, "passed upgrade view");
3369                let states = join_all(
3370                    network
3371                        .peers
3372                        .iter()
3373                        .map(|peer| async { peer.consensus().read().await.decided_state().await }),
3374                )
3375                .await;
3376                let leaves = join_all(
3377                    network
3378                        .peers
3379                        .iter()
3380                        .map(|peer| async { peer.consensus().read().await.decided_leaf().await }),
3381                )
3382                .await;
3383                let configs: Vec<ChainConfig> = states
3384                    .iter()
3385                    .map(|state| state.chain_config.resolve().unwrap())
3386                    .collect();
3387
3388                tracing::info!(?leaves, ?configs, "post upgrade state");
3389                for config in configs {
3390                    assert_eq!(config, chain_config_upgrade);
3391                }
3392                for leaf in leaves {
3393                    assert_eq!(leaf.block_header().version(), upgrade_version);
3394                }
3395                break;
3396            }
3397            sleep(Duration::from_millis(200)).await;
3398        }
3399
3400        network.server.shut_down().await;
3401    }
3402
3403    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3404    pub(crate) async fn test_restart() {
3405        const NUM_NODES: usize = 5;
3406        // Initialize nodes.
3407        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3408        let persistence: [_; NUM_NODES] = storage
3409            .iter()
3410            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3411            .collect::<Vec<_>>()
3412            .try_into()
3413            .unwrap();
3414        let port = pick_unused_port().unwrap();
3415        let config = TestNetworkConfigBuilder::default()
3416            .api_config(SqlDataSource::options(
3417                &storage[0],
3418                Options::with_port(port),
3419            ))
3420            .persistences(persistence.clone())
3421            .network_config(TestConfigBuilder::default().build())
3422            .build();
3423        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3424
3425        // Connect client.
3426        let client: Client<ServerError, SequencerApiVersion> =
3427            Client::new(format!("http://localhost:{port}").parse().unwrap());
3428        client.connect(None).await;
3429        tracing::info!(port, "server running");
3430
3431        // Wait until some blocks have been decided.
3432        client
3433            .socket("availability/stream/blocks/0")
3434            .subscribe::<BlockQueryData<SeqTypes>>()
3435            .await
3436            .unwrap()
3437            .take(3)
3438            .collect::<Vec<_>>()
3439            .await;
3440
3441        // Shut down the consensus nodes.
3442        tracing::info!("shutting down nodes");
3443        network.stop_consensus().await;
3444
3445        // Get the block height we reached.
3446        let height = client
3447            .get::<usize>("status/block-height")
3448            .send()
3449            .await
3450            .unwrap();
3451        tracing::info!("decided {height} blocks before shutting down");
3452
3453        // Get the decided chain, so we can check consistency after the restart.
3454        let chain: Vec<LeafQueryData<SeqTypes>> = client
3455            .socket("availability/stream/leaves/0")
3456            .subscribe()
3457            .await
3458            .unwrap()
3459            .take(height)
3460            .try_collect()
3461            .await
3462            .unwrap();
3463        let decided_view = chain.last().unwrap().leaf().view_number();
3464
3465        // Get the most recent state, for catchup.
3466
3467        let state = network.server.decided_state().await;
3468        tracing::info!(?decided_view, ?state, "consensus state");
3469
3470        // Fully shut down the API servers.
3471        drop(network);
3472
3473        // Start up again, resuming from the last decided leaf.
3474        let port = pick_unused_port().expect("No ports free");
3475
3476        let config = TestNetworkConfigBuilder::default()
3477            .api_config(SqlDataSource::options(
3478                &storage[0],
3479                Options::with_port(port),
3480            ))
3481            .persistences(persistence)
3482            .catchups(std::array::from_fn(|_| {
3483                // Catchup using node 0 as a peer. Node 0 was running the archival state service
3484                // before the restart, so it should be able to resume without catching up by loading
3485                // state from storage.
3486                StatePeers::<StaticVersion<0, 1>>::from_urls(
3487                    vec![format!("http://localhost:{port}").parse().unwrap()],
3488                    Default::default(),
3489                    &NoMetrics,
3490                )
3491            }))
3492            .network_config(TestConfigBuilder::default().build())
3493            .build();
3494        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3495        let client: Client<ServerError, StaticVersion<0, 1>> =
3496            Client::new(format!("http://localhost:{port}").parse().unwrap());
3497        client.connect(None).await;
3498        tracing::info!(port, "server running");
3499
3500        // Make sure we can decide new blocks after the restart.
3501        tracing::info!("waiting for decide, height {height}");
3502        let new_leaf: LeafQueryData<SeqTypes> = client
3503            .socket(&format!("availability/stream/leaves/{height}"))
3504            .subscribe()
3505            .await
3506            .unwrap()
3507            .next()
3508            .await
3509            .unwrap()
3510            .unwrap();
3511        assert_eq!(new_leaf.height(), height as u64);
3512        assert_eq!(
3513            new_leaf.leaf().parent_commitment(),
3514            chain[height - 1].hash()
3515        );
3516
3517        // Ensure the new chain is consistent with the old chain.
3518        let new_chain: Vec<LeafQueryData<SeqTypes>> = client
3519            .socket("availability/stream/leaves/0")
3520            .subscribe()
3521            .await
3522            .unwrap()
3523            .take(height)
3524            .try_collect()
3525            .await
3526            .unwrap();
3527        assert_eq!(chain, new_chain);
3528    }
3529
3530    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3531    async fn test_fetch_config() {
3532        let port = pick_unused_port().expect("No ports free");
3533        let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3534        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3535
3536        let options = Options::with_port(port).config(Default::default());
3537        let network_config = TestConfigBuilder::default().build();
3538        let config = TestNetworkConfigBuilder::default()
3539            .api_config(options)
3540            .network_config(network_config)
3541            .build();
3542        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3543        client.connect(None).await;
3544
3545        // Fetch a network config from the API server. The first peer URL is bogus, to test the
3546        // failure/retry case.
3547        let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3548            vec!["https://notarealnode.network".parse().unwrap(), url],
3549            Default::default(),
3550            &NoMetrics,
3551        );
3552
3553        // Fetch the config from node 1, a different node than the one running the service.
3554        let validator =
3555            ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3556        let config = peers.fetch_config(validator.clone()).await.unwrap();
3557
3558        // Check the node-specific information in the recovered config is correct.
3559        assert_eq!(config.node_index, 1);
3560
3561        // Check the public information is also correct (with respect to the node that actually
3562        // served the config, for public keys).
3563        pretty_assertions::assert_eq!(
3564            serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3565            serde_json::to_value(PublicHotShotConfig::from(
3566                network.cfg.hotshot_config().clone()
3567            ))
3568            .unwrap()
3569        );
3570    }
3571
3572    async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3573        let query_service_port = pick_unused_port().expect("No ports free for query service");
3574
3575        let url = format!("http://localhost:{query_service_port}{url_suffix}")
3576            .parse()
3577            .unwrap();
3578
3579        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3580
3581        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3582
3583        let network_config = TestConfigBuilder::default().build();
3584        let config = TestNetworkConfigBuilder::default()
3585            .api_config(options)
3586            .network_config(network_config)
3587            .build();
3588        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3589
3590        let mut subscribed_events = client
3591            .socket("hotshot-events/events")
3592            .subscribe::<Event<SeqTypes>>()
3593            .await
3594            .unwrap();
3595
3596        let total_count = 5;
3597        // wait for these events to receive on client 1
3598        let mut receive_count = 0;
3599        loop {
3600            let event = subscribed_events.next().await.unwrap();
3601            tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3602            receive_count += 1;
3603            if receive_count > total_count {
3604                tracing::info!("Client Received at least desired events, exiting loop");
3605                break;
3606            }
3607        }
3608        assert_eq!(receive_count, total_count + 1);
3609    }
3610
3611    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3612    async fn test_hotshot_event_streaming_v0() {
3613        run_hotshot_event_streaming_test("/v0").await;
3614    }
3615
3616    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3617    async fn test_hotshot_event_streaming_v1() {
3618        run_hotshot_event_streaming_test("/v1").await;
3619    }
3620
3621    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3622    async fn test_hotshot_event_streaming() {
3623        run_hotshot_event_streaming_test("").await;
3624    }
3625
3626    // TODO when `EpochVersion` becomes base version we can merge this
3627    // w/ above test.
3628    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3629    async fn test_hotshot_event_streaming_epoch_progression() {
3630        let epoch_height = 35;
3631        let wanted_epochs = 4;
3632
3633        let network_config = TestConfigBuilder::default()
3634            .epoch_height(epoch_height)
3635            .build();
3636
3637        let query_service_port = pick_unused_port().expect("No ports free for query service");
3638
3639        let hotshot_url = format!("http://localhost:{query_service_port}")
3640            .parse()
3641            .unwrap();
3642
3643        let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
3644        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3645
3646        let config = TestNetworkConfigBuilder::default()
3647            .api_config(options)
3648            .network_config(network_config.clone())
3649            .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3650            .await
3651            .expect("Pos Deployment")
3652            .build();
3653
3654        let _network = TestNetwork::new(config, PosVersionV3::new()).await;
3655
3656        let mut subscribed_events = client
3657            .socket("hotshot-events/events")
3658            .subscribe::<Event<SeqTypes>>()
3659            .await
3660            .unwrap();
3661
3662        let wanted_views = epoch_height * wanted_epochs;
3663
3664        let mut views = HashSet::new();
3665        let mut epochs = HashSet::new();
3666        for _ in 0..=600 {
3667            let event = subscribed_events.next().await.unwrap();
3668            let event = event.unwrap();
3669            let view_number = event.view_number;
3670            views.insert(view_number.u64());
3671
3672            if let hotshot::types::EventType::Decide { committing_qc, .. } = event.event {
3673                assert!(committing_qc.epoch().is_some(), "epochs are live");
3674                assert!(committing_qc.block_number().is_some());
3675
3676                let epoch = committing_qc.epoch().unwrap().u64();
3677                epochs.insert(epoch);
3678
3679                tracing::debug!(
3680                    "Got decide: epoch: {:?}, block: {:?} ",
3681                    epoch,
3682                    committing_qc.block_number()
3683                );
3684
3685                let expected_epoch =
3686                    epoch_from_block_number(committing_qc.block_number().unwrap(), epoch_height);
3687                tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");
3688
3689                assert_eq!(expected_epoch, epoch);
3690            }
3691            if views.contains(&wanted_views) {
3692                tracing::info!("Client Received at least desired views, exiting loop");
3693                break;
3694            }
3695        }
3696
3697        // prevent false positive when we overflow the range
3698        assert!(views.contains(&wanted_views), "Views are not progressing");
3699        assert!(
3700            epochs.contains(&wanted_epochs),
3701            "Epochs are not progressing"
3702        );
3703    }
3704
3705    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3706    async fn test_pos_rewards_basic() -> anyhow::Result<()> {
3707        // Basic PoS rewards test:
3708        // - Sets up a single validator and a single delegator (the node itself).
3709        // - Sets the number of blocks in each epoch to 20.
3710        // - Rewards begin applying from block 41 (i.e., the start of the 3rd epoch).
3711        // - Since the validator is also the delegator, it receives the full reward.
3712        // - Verifies that the reward at block height 60 matches the expected amount.
3713        let epoch_height = 20;
3714
3715        let network_config = TestConfigBuilder::default()
3716            .epoch_height(epoch_height)
3717            .build();
3718
3719        let api_port = pick_unused_port().expect("No ports free for query service");
3720
3721        const NUM_NODES: usize = 1;
3722        // Initialize nodes.
3723        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3724        let persistence: [_; NUM_NODES] = storage
3725            .iter()
3726            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3727            .collect::<Vec<_>>()
3728            .try_into()
3729            .unwrap();
3730
3731        let config = TestNetworkConfigBuilder::with_num_nodes()
3732            .api_config(SqlDataSource::options(
3733                &storage[0],
3734                Options::with_port(api_port),
3735            ))
3736            .network_config(network_config.clone())
3737            .persistences(persistence.clone())
3738            .catchups(std::array::from_fn(|_| {
3739                StatePeers::<StaticVersion<0, 1>>::from_urls(
3740                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3741                    Default::default(),
3742                    &NoMetrics,
3743                )
3744            }))
3745            .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3746            .await
3747            .unwrap()
3748            .build();
3749
3750        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3751        let client: Client<ServerError, SequencerApiVersion> =
3752            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3753
3754        // first two epochs will be 1 and 2
3755        // rewards are distributed starting third epoch
3756        // third epoch starts from block 40 as epoch height is 20
3757        // wait for atleast 65 blocks
3758        let _blocks = client
3759            .socket("availability/stream/blocks/0")
3760            .subscribe::<BlockQueryData<SeqTypes>>()
3761            .await
3762            .unwrap()
3763            .take(65)
3764            .try_collect::<Vec<_>>()
3765            .await
3766            .unwrap();
3767
3768        let staking_priv_keys = network_config.staking_priv_keys();
3769        let account = staking_priv_keys[0].0.clone();
3770        let address = account.address();
3771
3772        let block_height = 60;
3773
3774        // get the validator address balance at block height 60
3775        let amount = client
3776            .get::<Option<RewardAmount>>(&format!(
3777                "reward-state/reward-balance/{block_height}/{address}"
3778            ))
3779            .send()
3780            .await
3781            .unwrap()
3782            .unwrap();
3783
3784        tracing::info!("amount={amount:?}");
3785
3786        let epoch_start_block = 40;
3787
3788        let node_state = network.server.node_state();
3789        let membership = node_state.coordinator.membership().read().await;
3790        let block_reward = membership
3791            .fixed_block_reward()
3792            .expect("block reward is not None");
3793        drop(membership);
3794
3795        // The validator gets all the block reward so we can calculate the expected amount
3796        let expected_amount = block_reward.0 * (U256::from(block_height - epoch_start_block));
3797
3798        assert_eq!(amount.0, expected_amount, "reward amount don't match");
3799
3800        Ok(())
3801    }
3802
3803    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3804    async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
3805        // This test registers 5 validators and multiple delegators for each validator.
3806        // One of the delegators is also a validator.
3807        // The test verifies that the cumulative reward at each block height equals
3808        // the total block reward, which is a constant.
3809
3810        let epoch_height = 20;
3811
3812        let network_config = TestConfigBuilder::default()
3813            .epoch_height(epoch_height)
3814            .build();
3815
3816        let api_port = pick_unused_port().expect("No ports free for query service");
3817
3818        const NUM_NODES: usize = 5;
3819        // Initialize nodes.
3820        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3821        let persistence: [_; NUM_NODES] = storage
3822            .iter()
3823            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3824            .collect::<Vec<_>>()
3825            .try_into()
3826            .unwrap();
3827
3828        let config = TestNetworkConfigBuilder::with_num_nodes()
3829            .api_config(SqlDataSource::options(
3830                &storage[0],
3831                Options::with_port(api_port),
3832            ))
3833            .network_config(network_config)
3834            .persistences(persistence.clone())
3835            .catchups(std::array::from_fn(|_| {
3836                StatePeers::<StaticVersion<0, 1>>::from_urls(
3837                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3838                    Default::default(),
3839                    &NoMetrics,
3840                )
3841            }))
3842            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3843            .await
3844            .unwrap()
3845            .build();
3846
3847        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3848        let node_state = network.server.node_state();
3849        let membership = node_state.coordinator.membership().read().await;
3850        let block_reward = membership
3851            .fixed_block_reward()
3852            .expect("block reward is not None");
3853        drop(membership);
3854        let client: Client<ServerError, SequencerApiVersion> =
3855            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3856
3857        // wait for atleast 75 blocks
3858        let _blocks = client
3859            .socket("availability/stream/blocks/0")
3860            .subscribe::<BlockQueryData<SeqTypes>>()
3861            .await
3862            .unwrap()
3863            .take(75)
3864            .try_collect::<Vec<_>>()
3865            .await
3866            .unwrap();
3867
3868        // We are going to check cumulative blocks from block height 40 to 67
3869        // Basically epoch 3 and epoch 4 as epoch height is 20
3870        // get all the validators
3871        let validators = client
3872            .get::<ValidatorMap>("node/validators/3")
3873            .send()
3874            .await
3875            .expect("failed to get validator");
3876
3877        // insert all the address in a map
3878        // We will query the reward-balance at each block height for all the addresses
3879        // We don't know which validator was the leader because we don't have access to Membership
3880        let mut addresses = HashSet::new();
3881        for v in validators.values() {
3882            addresses.insert(v.account);
3883            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3884        }
3885        // get all the validators
3886        let validators = client
3887            .get::<ValidatorMap>("node/validators/4")
3888            .send()
3889            .await
3890            .expect("failed to get validator");
3891        for v in validators.values() {
3892            addresses.insert(v.account);
3893            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3894        }
3895
3896        let mut prev_cumulative_amount = U256::ZERO;
3897        // Check Cumulative rewards for epoch 3
3898        // i.e block height 41 to 59
3899        for block in 41..=67 {
3900            let mut cumulative_amount = U256::ZERO;
3901            for address in addresses.clone() {
3902                let amount = client
3903                    .get::<Option<RewardAmount>>(&format!(
3904                        "reward-state/reward-balance/{block}/{address}"
3905                    ))
3906                    .send()
3907                    .await
3908                    .ok()
3909                    .flatten();
3910
3911                if let Some(amount) = amount {
3912                    tracing::info!("address={address}, amount={amount}");
3913                    cumulative_amount += amount.0;
3914                };
3915            }
3916
3917            // assert cumulative reward is equal to block reward
3918            assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
3919            tracing::info!("cumulative_amount is correct for block={block}");
3920            prev_cumulative_amount = cumulative_amount;
3921        }
3922
3923        Ok(())
3924    }
3925
3926    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3927    async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
3928        // TODO(abdul): This test currently uses TestNetwork only for contract deployment and for L1 block number.
3929        // Once the stake table deployment logic is refactored and isolated, TestNetwork here will be unnecessary
3930
3931        let epoch_height = 20;
3932
3933        let network_config = TestConfigBuilder::default()
3934            .epoch_height(epoch_height)
3935            .build();
3936
3937        let api_port = pick_unused_port().expect("No ports free for query service");
3938
3939        const NUM_NODES: usize = 5;
3940        // Initialize nodes.
3941        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3942        let persistence: [_; NUM_NODES] = storage
3943            .iter()
3944            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3945            .collect::<Vec<_>>()
3946            .try_into()
3947            .unwrap();
3948
3949        let l1_url = network_config.l1_url();
3950        let config = TestNetworkConfigBuilder::with_num_nodes()
3951            .api_config(SqlDataSource::options(
3952                &storage[0],
3953                Options::with_port(api_port),
3954            ))
3955            .network_config(network_config)
3956            .persistences(persistence.clone())
3957            .catchups(std::array::from_fn(|_| {
3958                StatePeers::<StaticVersion<0, 1>>::from_urls(
3959                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
3960                    Default::default(),
3961                    &NoMetrics,
3962                )
3963            }))
3964            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3965            .await
3966            .unwrap()
3967            .build();
3968
3969        let network = TestNetwork::new(config, PosVersionV3::new()).await;
3970
3971        let mut prev_st = None;
3972        let state = network.server.decided_state().await;
3973        let chain_config = state.chain_config.resolve().expect("resolve chain config");
3974        let stake_table = chain_config.stake_table_contract.unwrap();
3975
3976        let l1_client = L1ClientOptions::default()
3977            .connect(vec![l1_url])
3978            .expect("failed to connect to l1");
3979
3980        let client: Client<ServerError, SequencerApiVersion> =
3981            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3982
3983        let mut headers = client
3984            .socket("availability/stream/headers/0")
3985            .subscribe::<Header>()
3986            .await
3987            .unwrap();
3988
3989        let mut target_bh = 0;
3990        while let Some(header) = headers.next().await {
3991            let header = header.unwrap();
3992            println!("got header with height {}", header.height());
3993            if header.height() == 0 {
3994                continue;
3995            }
3996            let l1_block = header.l1_finalized().expect("l1 block not found");
3997
3998            let sorted_events = Fetcher::fetch_events_from_contract(
3999                l1_client.clone(),
4000                stake_table,
4001                None,
4002                l1_block.number(),
4003            )
4004            .await?;
4005
4006            let mut sorted_dedup_removed = sorted_events.clone();
4007            sorted_dedup_removed.dedup();
4008
4009            assert_eq!(
4010                sorted_events.len(),
4011                sorted_dedup_removed.len(),
4012                "duplicates found"
4013            );
4014
4015            // This also checks if there is a duplicate registration
4016            let stake_table =
4017                validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
4018            if let Some(prev_st) = prev_st {
4019                assert_eq!(stake_table, prev_st);
4020            }
4021
4022            prev_st = Some(stake_table);
4023
4024            if target_bh == 100 {
4025                break;
4026            }
4027
4028            target_bh = header.height();
4029        }
4030
4031        Ok(())
4032    }
4033
4034    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4035    async fn test_rewards_v3() -> anyhow::Result<()> {
4036        // The test registers multiple delegators for each validator
4037        // It verifies that no rewards are distributed in the first two epochs
4038        // and that rewards are correctly allocated starting from the third epoch.
4039        // also checks that the total stake of delegators matches the stake of the validator
4040        // and that the calculated rewards match those obtained via the merklized state api
4041        const EPOCH_HEIGHT: u64 = 20;
4042
4043        let network_config = TestConfigBuilder::default()
4044            .epoch_height(EPOCH_HEIGHT)
4045            .build();
4046
4047        let api_port = pick_unused_port().expect("No ports free for query service");
4048
4049        const NUM_NODES: usize = 7;
4050
4051        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4052        let persistence: [_; NUM_NODES] = storage
4053            .iter()
4054            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4055            .collect::<Vec<_>>()
4056            .try_into()
4057            .unwrap();
4058
4059        let config = TestNetworkConfigBuilder::with_num_nodes()
4060            .api_config(SqlDataSource::options(
4061                &storage[0],
4062                Options::with_port(api_port),
4063            ))
4064            .network_config(network_config)
4065            .persistences(persistence.clone())
4066            .catchups(std::array::from_fn(|_| {
4067                StatePeers::<StaticVersion<0, 1>>::from_urls(
4068                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4069                    Default::default(),
4070                    &NoMetrics,
4071                )
4072            }))
4073            .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
4074            .await
4075            .unwrap()
4076            .build();
4077
4078        let network = TestNetwork::new(config, PosVersionV3::new()).await;
4079        let client: Client<ServerError, SequencerApiVersion> =
4080            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4081
4082        // Wait for 3 epochs to allow rewards distribution to take effect.
4083        let mut events = network.peers[0].event_stream().await;
4084        while let Some(event) = events.next().await {
4085            if let EventType::Decide { leaf_chain, .. } = event.event {
4086                let height = leaf_chain[0].leaf.height();
4087                tracing::info!("Node 0 decided at height: {height}");
4088                if height > EPOCH_HEIGHT * 3 {
4089                    break;
4090                }
4091            }
4092        }
4093
4094        // Verify that there are no validators for epoch # 1 and epoch # 2
4095        {
4096            client
4097                .get::<ValidatorMap>("node/validators/1")
4098                .send()
4099                .await
4100                .unwrap()
4101                .is_empty();
4102
4103            client
4104                .get::<ValidatorMap>("node/validators/2")
4105                .send()
4106                .await
4107                .unwrap()
4108                .is_empty();
4109        }
4110
4111        // Get the epoch # 3 validators
4112        let validators = client
4113            .get::<ValidatorMap>("node/validators/3")
4114            .send()
4115            .await
4116            .expect("validators");
4117
4118        assert!(!validators.is_empty());
4119
4120        // Collect addresses to track rewards for all participants.
4121        let mut addresses = HashSet::new();
4122        for v in validators.values() {
4123            addresses.insert(v.account);
4124            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4125        }
4126
4127        // Verify no rewards are distributed in the first two epochs.
4128        for block in 0..=EPOCH_HEIGHT * 2 {
4129            for address in addresses.clone() {
4130                let amount = client
4131                    .get::<Option<RewardAmount>>(&format!(
4132                        "reward-state/reward-balance/{block}/{address}"
4133                    ))
4134                    .send()
4135                    .await
4136                    .ok()
4137                    .flatten();
4138                assert!(amount.is_none(), "amount is not none for block {block}")
4139            }
4140        }
4141
4142        // Collect leaves for epoch 3 to 5 to verify reward calculations.
4143        let leaves = client
4144            .socket("availability/stream/leaves/41")
4145            .subscribe::<LeafQueryData<SeqTypes>>()
4146            .await
4147            .unwrap()
4148            .take((EPOCH_HEIGHT * 3).try_into().unwrap())
4149            .try_collect::<Vec<_>>()
4150            .await
4151            .unwrap();
4152
4153        let node_state = network.server.node_state();
4154        let coordinator = node_state.coordinator;
4155
4156        let membership = coordinator.membership().read().await;
4157        let block_reward = membership
4158            .fixed_block_reward()
4159            .expect("block reward is not None");
4160
4161        drop(membership);
4162
4163        let mut rewards_map = HashMap::new();
4164
4165        for leaf in leaves {
4166            let block = leaf.height();
4167            tracing::info!("verify rewards for block={block:?}");
4168            let membership = coordinator.membership().read().await;
4169            let epoch = epoch_from_block_number(block, EPOCH_HEIGHT);
4170            let epoch_number = EpochNumber::new(epoch);
4171            let leader = membership
4172                .leader(leaf.leaf().view_number(), Some(epoch_number))
4173                .expect("leader");
4174            let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
4175
4176            drop(membership);
4177
4178            let validators = client
4179                .get::<ValidatorMap>(&format!("node/validators/{epoch}"))
4180                .send()
4181                .await
4182                .expect("validators");
4183
4184            let leader_validator = validators
4185                .get(&leader_eth_address)
4186                .expect("leader not found");
4187
4188            let distributor =
4189                RewardDistributor::new(leader_validator.clone(), block_reward, U256::ZERO.into());
4190            // Verify that the sum of delegator stakes equals the validator's total stake.
4191            for validator in validators.values() {
4192                let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4193
4194                assert_eq!(delegator_stake_sum, validator.stake);
4195            }
4196
4197            let computed_rewards = distributor.compute_rewards().expect("reward computation");
4198
4199            // Verify that the leader commission amount is within the tolerated range.
4200            // Due to potential rounding errors in decimal calculations for delegator rewards,
4201            // the actual distributed commission
4202            // amount may differ very slightly from the calculated value.
4203            // this asserts that it is within 10wei tolerance level.
4204            // 10 wei is 10* 10E-18
4205            let total_reward = block_reward.0;
4206            let leader_commission_basis_points = U256::from(leader_validator.commission);
4207            let calculated_leader_commission_reward = leader_commission_basis_points
4208                .checked_mul(total_reward)
4209                .context("overflow")?
4210                .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4211                .context("overflow")?;
4212
4213            assert!(
4214                computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4215                    <= U256::from(10_u64)
4216            );
4217
4218            // Aggregate reward amounts by address in the map.
4219            // This is necessary because there can be two entries for a leader address:
4220            // - One entry for commission rewards.
4221            // - Another for delegator rewards when the leader is delegating.
4222            // Also, rewards are accumulated for the same addresses
4223            let leader_commission = *computed_rewards.leader_commission();
4224            for (address, amount) in computed_rewards.delegators().clone() {
4225                rewards_map
4226                    .entry(address)
4227                    .and_modify(|entry| *entry += amount)
4228                    .or_insert(amount);
4229            }
4230
4231            // add leader commission reward
4232            rewards_map
4233                .entry(leader_eth_address)
4234                .and_modify(|entry| *entry += leader_commission)
4235                .or_insert(leader_commission);
4236
4237            // assert that the reward matches to what is in the reward merkle tree
4238            for (address, calculated_amount) in rewards_map.iter() {
4239                let amount_from_api = client
4240                    .get::<Option<RewardAmount>>(&format!(
4241                        "reward-state/reward-balance/{block}/{address}"
4242                    ))
4243                    .send()
4244                    .await
4245                    .ok()
4246                    .flatten()
4247                    .expect("amount");
4248                assert_eq!(amount_from_api, *calculated_amount)
4249            }
4250        }
4251
4252        Ok(())
4253    }
4254
4255    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4256    async fn test_rewards_v4() -> anyhow::Result<()> {
4257        // This test verifies PoS reward distribution logic for multiple delegators per validator.
4258        //
4259        //  assertions:
4260        // - No rewards are distributed during the first 2 epochs.
4261        // - Rewards begin from epoch 3 onward.
4262        // - Delegator stake sums match the corresponding validator stake.
4263        // - Reward values match those returned by the reward state API.
4264        // - Commission calculations are within a small acceptable rounding tolerance.
4265        // - Ensure that the `total_reward_distributed` field in the block header matches the total block reward distributed
4266        const EPOCH_HEIGHT: u64 = 20;
4267
4268        type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
4269
4270        let network_config = TestConfigBuilder::default()
4271            .epoch_height(EPOCH_HEIGHT)
4272            .build();
4273
4274        let api_port = pick_unused_port().expect("No ports free for query service");
4275
4276        const NUM_NODES: usize = 5;
4277
4278        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4279        let persistence: [_; NUM_NODES] = storage
4280            .iter()
4281            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4282            .collect::<Vec<_>>()
4283            .try_into()
4284            .unwrap();
4285
4286        let config = TestNetworkConfigBuilder::with_num_nodes()
4287            .api_config(SqlDataSource::options(
4288                &storage[0],
4289                Options::with_port(api_port),
4290            ))
4291            .network_config(network_config)
4292            .persistences(persistence.clone())
4293            .catchups(std::array::from_fn(|_| {
4294                StatePeers::<StaticVersion<0, 1>>::from_urls(
4295                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4296                    Default::default(),
4297                    &NoMetrics,
4298                )
4299            }))
4300            .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
4301            .await
4302            .unwrap()
4303            .build();
4304
4305        let network = TestNetwork::new(config, V4::new()).await;
4306        let client: Client<ServerError, SequencerApiVersion> =
4307            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4308
4309        // Wait for the chain to progress beyond epoch 3 so rewards start being distributed.
4310        let mut events = network.peers[0].event_stream().await;
4311        while let Some(event) = events.next().await {
4312            if let EventType::Decide { leaf_chain, .. } = event.event {
4313                let height = leaf_chain[0].leaf.height();
4314                tracing::info!("Node 0 decided at height: {height}");
4315                if height > EPOCH_HEIGHT * 3 {
4316                    break;
4317                }
4318            }
4319        }
4320
4321        // Verify that there are no validators for epoch # 1 and epoch # 2
4322        {
4323            client
4324                .get::<ValidatorMap>("node/validators/1")
4325                .send()
4326                .await
4327                .unwrap()
4328                .is_empty();
4329
4330            client
4331                .get::<ValidatorMap>("node/validators/2")
4332                .send()
4333                .await
4334                .unwrap()
4335                .is_empty();
4336        }
4337
4338        // Get the epoch # 3 validators
4339        let validators = client
4340            .get::<ValidatorMap>("node/validators/3")
4341            .send()
4342            .await
4343            .expect("validators");
4344
4345        assert!(!validators.is_empty());
4346
4347        // Collect addresses to track rewards for all participants.
4348        let mut addresses = HashSet::new();
4349        for v in validators.values() {
4350            addresses.insert(v.account);
4351            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4352        }
4353
4354        let mut leaves = client
4355            .socket("availability/stream/leaves/0")
4356            .subscribe::<LeafQueryData<SeqTypes>>()
4357            .await
4358            .unwrap();
4359
4360        let node_state = network.server.node_state();
4361        let coordinator = node_state.coordinator;
4362
4363        let membership = coordinator.membership().read().await;
4364
4365        // Ensure rewards remain zero up for the first two epochs
4366        while let Some(leaf) = leaves.next().await {
4367            let leaf = leaf.unwrap();
4368            let header = leaf.header();
4369            assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
4370
4371            let epoch_number =
4372                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4373
4374            assert!(membership.epoch_block_reward(epoch_number).is_none());
4375
4376            let height = header.height();
4377            for address in addresses.clone() {
4378                let amount = client
4379                    .get::<Option<RewardAmount>>(&format!(
4380                        "reward-state-v2/reward-balance/{height}/{address}"
4381                    ))
4382                    .send()
4383                    .await
4384                    .ok()
4385                    .flatten();
4386                assert!(amount.is_none(), "amount is not none for block {height}")
4387            }
4388
4389            if leaf.height() == EPOCH_HEIGHT * 2 {
4390                break;
4391            }
4392        }
4393
4394        drop(membership);
4395
4396        let mut rewards_map = HashMap::new();
4397        let mut total_distributed = U256::ZERO;
4398        let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
4399
4400        while let Some(leaf) = leaves.next().await {
4401            let leaf = leaf.unwrap();
4402
4403            let header = leaf.header();
4404            let distributed = header
4405                .total_reward_distributed()
4406                .expect("rewards distributed is none");
4407
4408            let block = leaf.height();
4409            tracing::info!("verify rewards for block={block:?}");
4410            let membership = coordinator.membership().read().await;
4411            let epoch_number =
4412                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4413
4414            let block_reward = membership.epoch_block_reward(epoch_number).unwrap();
4415            let leader = membership
4416                .leader(leaf.leaf().view_number(), Some(epoch_number))
4417                .expect("leader");
4418            let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
4419
4420            drop(membership);
4421
4422            let validators = client
4423                .get::<ValidatorMap>(&format!("node/validators/{epoch_number}"))
4424                .send()
4425                .await
4426                .expect("validators");
4427
4428            let leader_validator = validators
4429                .get(&leader_eth_address)
4430                .expect("leader not found");
4431
4432            let distributor =
4433                RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
4434            // Verify that the sum of delegator stakes equals the validator's total stake.
4435            for validator in validators.values() {
4436                let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4437
4438                assert_eq!(delegator_stake_sum, validator.stake);
4439            }
4440
4441            let computed_rewards = distributor.compute_rewards().expect("reward computation");
4442
4443            // Validate that the leader's commission is within a 10 wei tolerance of the expected value.
4444            let total_reward = block_reward.0;
4445            let leader_commission_basis_points = U256::from(leader_validator.commission);
4446            let calculated_leader_commission_reward = leader_commission_basis_points
4447                .checked_mul(total_reward)
4448                .context("overflow")?
4449                .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4450                .context("overflow")?;
4451
4452            assert!(
4453                computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4454                    <= U256::from(10_u64)
4455            );
4456
4457            // Aggregate rewards by address (both delegator and leader).
4458            let leader_commission = *computed_rewards.leader_commission();
4459            for (address, amount) in computed_rewards.delegators().clone() {
4460                rewards_map
4461                    .entry(address)
4462                    .and_modify(|entry| *entry += amount)
4463                    .or_insert(amount);
4464            }
4465
4466            // add leader commission reward
4467            rewards_map
4468                .entry(leader_eth_address)
4469                .and_modify(|entry| *entry += leader_commission)
4470                .or_insert(leader_commission);
4471
4472            // assert that the reward matches to what is in the reward merkle tree
4473            for (address, calculated_amount) in rewards_map.iter() {
4474                let mut attempt = 0;
4475                let amount_from_api = loop {
4476                    let result = client
4477                        .get::<Option<RewardAmount>>(&format!(
4478                            "reward-state-v2/reward-balance/{block}/{address}"
4479                        ))
4480                        .send()
4481                        .await
4482                        .ok()
4483                        .flatten();
4484
4485                    if let Some(amount) = result {
4486                        break amount;
4487                    }
4488
4489                    attempt += 1;
4490                    if attempt >= 3 {
4491                        panic!(
4492                            "Failed to fetch reward amount for address {address} after 3 retries"
4493                        );
4494                    }
4495
4496                    sleep(Duration::from_secs(2)).await;
4497                };
4498
4499                assert_eq!(amount_from_api, *calculated_amount);
4500            }
4501
4502            // Confirm the header's total distributed field matches the cumulative expected amount.
4503            total_distributed += block_reward.0;
4504            assert_eq!(
4505                header.total_reward_distributed().unwrap().0,
4506                total_distributed
4507            );
4508
4509            // Block reward shouldn't change for the same epoch
4510            epoch_rewards
4511                .entry(epoch_number)
4512                .and_modify(|r| assert_eq!(*r, block_reward.0))
4513                .or_insert(block_reward.0);
4514
4515            // Stop the test after verifying 5 full epochs.
4516            if leaf.height() == EPOCH_HEIGHT * 5 {
4517                break;
4518            }
4519        }
4520
4521        Ok(())
4522    }
4523
4524    #[rstest]
4525    #[case(PosVersionV3::new())]
4526    #[case(PosVersionV4::new())]
4527    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4528
4529    async fn test_node_stake_table_api<Ver: Versions>(#[case] ver: Ver) {
4530        let epoch_height = 20;
4531
4532        let network_config = TestConfigBuilder::default()
4533            .epoch_height(epoch_height)
4534            .build();
4535
4536        let api_port = pick_unused_port().expect("No ports free for query service");
4537
4538        const NUM_NODES: usize = 2;
4539        // Initialize nodes.
4540        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4541        let persistence: [_; NUM_NODES] = storage
4542            .iter()
4543            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4544            .collect::<Vec<_>>()
4545            .try_into()
4546            .unwrap();
4547
4548        let config = TestNetworkConfigBuilder::with_num_nodes()
4549            .api_config(SqlDataSource::options(
4550                &storage[0],
4551                Options::with_port(api_port),
4552            ))
4553            .network_config(network_config)
4554            .persistences(persistence.clone())
4555            .catchups(std::array::from_fn(|_| {
4556                StatePeers::<StaticVersion<0, 1>>::from_urls(
4557                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4558                    Default::default(),
4559                    &NoMetrics,
4560                )
4561            }))
4562            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4563            .await
4564            .unwrap()
4565            .build();
4566
4567        let _network = TestNetwork::new(config, ver).await;
4568
4569        let client: Client<ServerError, SequencerApiVersion> =
4570            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4571
4572        // wait for atleast 2 epochs
4573        let _blocks = client
4574            .socket("availability/stream/blocks/0")
4575            .subscribe::<BlockQueryData<SeqTypes>>()
4576            .await
4577            .unwrap()
4578            .take(40)
4579            .try_collect::<Vec<_>>()
4580            .await
4581            .unwrap();
4582
4583        for i in 1..=3 {
4584            let _st = client
4585                .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
4586                .send()
4587                .await
4588                .expect("failed to get stake table");
4589        }
4590
4591        let _st = client
4592            .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
4593            .send()
4594            .await
4595            .expect("failed to get stake table");
4596    }
4597
4598    #[rstest]
4599    #[case(PosVersionV3::new())]
4600    #[case(PosVersionV4::new())]
4601    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4602
4603    async fn test_epoch_stake_table_catchup<Ver: Versions>(#[case] ver: Ver) {
4604        const EPOCH_HEIGHT: u64 = 10;
4605        const NUM_NODES: usize = 6;
4606
4607        let port = pick_unused_port().expect("No ports free");
4608
4609        let network_config = TestConfigBuilder::default()
4610            .epoch_height(EPOCH_HEIGHT)
4611            .build();
4612
4613        // Initialize storage for each node
4614        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4615
4616        let persistence_options: [_; NUM_NODES] = storage
4617            .iter()
4618            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4619            .collect::<Vec<_>>()
4620            .try_into()
4621            .unwrap();
4622
4623        // setup catchup peers
4624        let catchup_peers = std::array::from_fn(|_| {
4625            StatePeers::<StaticVersion<0, 1>>::from_urls(
4626                vec![format!("http://localhost:{port}").parse().unwrap()],
4627                Default::default(),
4628                &NoMetrics,
4629            )
4630        });
4631        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4632            .api_config(SqlDataSource::options(
4633                &storage[0],
4634                Options::with_port(port),
4635            ))
4636            .network_config(network_config)
4637            .persistences(persistence_options.clone())
4638            .catchups(catchup_peers)
4639            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4640            .await
4641            .unwrap()
4642            .build();
4643
4644        let state = config.states()[0].clone();
4645        let mut network = TestNetwork::new(config, ver).await;
4646
4647        // Wait for the peer 0 (node 1) to advance past three epochs
4648        let mut events = network.peers[0].event_stream().await;
4649        while let Some(event) = events.next().await {
4650            if let EventType::Decide { leaf_chain, .. } = event.event {
4651                let height = leaf_chain[0].leaf.height();
4652                tracing::info!("Node 0 decided at height: {height}");
4653                if height > EPOCH_HEIGHT * 3 {
4654                    break;
4655                }
4656            }
4657        }
4658
4659        // Shutdown and remove node 1 to simulate falling behind
4660        tracing::info!("Shutting down peer 0");
4661        network.peers.remove(0);
4662
4663        // Wait for epochs to progress with node 1 offline
4664        let mut events = network.server.event_stream().await;
4665        while let Some(event) = events.next().await {
4666            if let EventType::Decide { leaf_chain, .. } = event.event {
4667                let height = leaf_chain[0].leaf.height();
4668                if height > EPOCH_HEIGHT * 7 {
4669                    break;
4670                }
4671            }
4672        }
4673
4674        // add node 1 to the network with fresh storage
4675        let storage = SqlDataSource::create_storage().await;
4676        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4677        tracing::info!("Restarting peer 0");
4678        let node = network
4679            .cfg
4680            .init_node(
4681                1,
4682                state,
4683                options,
4684                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4685                    vec![format!("http://localhost:{port}").parse().unwrap()],
4686                    Default::default(),
4687                    &NoMetrics,
4688                )),
4689                None,
4690                &NoMetrics,
4691                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4692                NullEventConsumer,
4693                ver,
4694                Default::default(),
4695            )
4696            .await;
4697
4698        let coordinator = node.node_state().coordinator;
4699        let server_node_state = network.server.node_state();
4700        let server_coordinator = server_node_state.coordinator;
4701        // Verify that the restarted node catches up for each epoch
4702        for epoch_num in 1..=7 {
4703            let epoch = EpochNumber::new(epoch_num);
4704            let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
4705            if membership_for_epoch.is_err() {
4706                coordinator.wait_for_catchup(epoch).await.unwrap();
4707            }
4708
4709            println!("have stake table for epoch = {epoch_num}");
4710
4711            let node_stake_table = coordinator
4712                .membership()
4713                .read()
4714                .await
4715                .stake_table(Some(epoch));
4716            let stake_table = server_coordinator
4717                .membership()
4718                .read()
4719                .await
4720                .stake_table(Some(epoch));
4721            println!("asserting stake table for epoch = {epoch_num}");
4722
4723            assert_eq!(
4724                node_stake_table, stake_table,
4725                "Stake table mismatch for epoch {epoch_num}",
4726            );
4727        }
4728    }
4729
4730    #[rstest]
4731    #[case(PosVersionV3::new())]
4732    #[case(PosVersionV4::new())]
4733    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4734
4735    async fn test_epoch_stake_table_catchup_stress<Ver: Versions>(#[case] versions: Ver) {
4736        const EPOCH_HEIGHT: u64 = 10;
4737        const NUM_NODES: usize = 6;
4738
4739        let port = pick_unused_port().expect("No ports free");
4740
4741        let network_config = TestConfigBuilder::default()
4742            .epoch_height(EPOCH_HEIGHT)
4743            .build();
4744
4745        // Initialize storage for each node
4746        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4747
4748        let persistence_options: [_; NUM_NODES] = storage
4749            .iter()
4750            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4751            .collect::<Vec<_>>()
4752            .try_into()
4753            .unwrap();
4754
4755        // setup catchup peers
4756        let catchup_peers = std::array::from_fn(|_| {
4757            StatePeers::<StaticVersion<0, 1>>::from_urls(
4758                vec![format!("http://localhost:{port}").parse().unwrap()],
4759                Default::default(),
4760                &NoMetrics,
4761            )
4762        });
4763        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4764            .api_config(SqlDataSource::options(
4765                &storage[0],
4766                Options::with_port(port),
4767            ))
4768            .network_config(network_config)
4769            .persistences(persistence_options.clone())
4770            .catchups(catchup_peers)
4771            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4772            .await
4773            .unwrap()
4774            .build();
4775
4776        let state = config.states()[0].clone();
4777        let mut network = TestNetwork::new(config, versions).await;
4778
4779        // Wait for the peer 0 (node 1) to advance past three epochs
4780        let mut events = network.peers[0].event_stream().await;
4781        while let Some(event) = events.next().await {
4782            if let EventType::Decide { leaf_chain, .. } = event.event {
4783                let height = leaf_chain[0].leaf.height();
4784                tracing::info!("Node 0 decided at height: {height}");
4785                if height > EPOCH_HEIGHT * 3 {
4786                    break;
4787                }
4788            }
4789        }
4790
4791        // Shutdown and remove node 1 to simulate falling behind
4792        tracing::info!("Shutting down peer 0");
4793        network.peers.remove(0);
4794
4795        // Wait for epochs to progress with node 1 offline
4796        let mut events = network.server.event_stream().await;
4797        while let Some(event) = events.next().await {
4798            if let EventType::Decide { leaf_chain, .. } = event.event {
4799                let height = leaf_chain[0].leaf.height();
4800                tracing::info!("Server decided at height: {height}");
4801                //  until 7 epochs
4802                if height > EPOCH_HEIGHT * 7 {
4803                    break;
4804                }
4805            }
4806        }
4807
4808        // add node 1 to the network with fresh storage
4809        let storage = SqlDataSource::create_storage().await;
4810        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4811
4812        tracing::info!("Restarting peer 0");
4813        let node = network
4814            .cfg
4815            .init_node(
4816                1,
4817                state,
4818                options,
4819                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4820                    vec![format!("http://localhost:{port}").parse().unwrap()],
4821                    Default::default(),
4822                    &NoMetrics,
4823                )),
4824                None,
4825                &NoMetrics,
4826                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4827                NullEventConsumer,
4828                versions,
4829                Default::default(),
4830            )
4831            .await;
4832
4833        let coordinator = node.node_state().coordinator;
4834
4835        let server_node_state = network.server.node_state();
4836        let server_coordinator = server_node_state.coordinator;
4837
4838        // Trigger catchup for all epochs in quick succession and in random order
4839        let mut rand_epochs: Vec<_> = (1..=7).collect();
4840        rand_epochs.shuffle(&mut rand::thread_rng());
4841        println!("trigger catchup in this order: {rand_epochs:?}");
4842        for epoch_num in rand_epochs {
4843            let epoch = EpochNumber::new(epoch_num);
4844            let _ = coordinator.membership_for_epoch(Some(epoch)).await;
4845        }
4846
4847        // Verify that the restarted node catches up for each epoch
4848        for epoch_num in 1..=7 {
4849            println!("getting stake table for epoch = {epoch_num}");
4850            let epoch = EpochNumber::new(epoch_num);
4851            let _ = coordinator.wait_for_catchup(epoch).await.unwrap();
4852
4853            println!("have stake table for epoch = {epoch_num}");
4854
4855            let node_stake_table = coordinator
4856                .membership()
4857                .read()
4858                .await
4859                .stake_table(Some(epoch));
4860            let stake_table = server_coordinator
4861                .membership()
4862                .read()
4863                .await
4864                .stake_table(Some(epoch));
4865
4866            println!("asserting stake table for epoch = {epoch_num}");
4867
4868            assert_eq!(
4869                node_stake_table, stake_table,
4870                "Stake table mismatch for epoch {epoch_num}",
4871            );
4872        }
4873    }
4874
4875    #[rstest]
4876    #[case(PosVersionV3::new())]
4877    #[case(PosVersionV4::new())]
4878    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4879    async fn test_merklized_state_catchup_on_restart<Ver: Versions>(
4880        #[case] versions: Ver,
4881    ) -> anyhow::Result<()> {
4882        // This test verifies that a query node can catch up on
4883        // merklized state after being offline for multiple epochs.
4884        //
4885        // Steps:
4886        // 1. Start a test network with 5 sequencer nodes.
4887        // 2. Start a separate node with the query module enabled, connected to the network.
4888        //    - This node stores merklized state
4889        // 3. Shut down the query node after 1 epoch.
4890        // 4. Allow the network to progress 3 more epochs (query node remains offline).
4891        // 5. Restart the query node.
4892        //    - The node is expected to reconstruct or catch up on its own
4893        const EPOCH_HEIGHT: u64 = 10;
4894
4895        let network_config = TestConfigBuilder::default()
4896            .epoch_height(EPOCH_HEIGHT)
4897            .build();
4898
4899        let api_port = pick_unused_port().expect("No ports free for query service");
4900
4901        tracing::info!("API PORT = {api_port}");
4902        const NUM_NODES: usize = 5;
4903
4904        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4905        let persistence: [_; NUM_NODES] = storage
4906            .iter()
4907            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4908            .collect::<Vec<_>>()
4909            .try_into()
4910            .unwrap();
4911
4912        let config = TestNetworkConfigBuilder::with_num_nodes()
4913            .api_config(SqlDataSource::options(
4914                &storage[0],
4915                Options::with_port(api_port).catchup(Default::default()),
4916            ))
4917            .network_config(network_config)
4918            .persistences(persistence.clone())
4919            .catchups(std::array::from_fn(|_| {
4920                StatePeers::<StaticVersion<0, 1>>::from_urls(
4921                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4922                    Default::default(),
4923                    &NoMetrics,
4924                )
4925            }))
4926            .pos_hook::<Ver>(
4927                DelegationConfig::MultipleDelegators,
4928                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
4929            )
4930            .await
4931            .unwrap()
4932            .build();
4933        let state = config.states()[0].clone();
4934        let mut network = TestNetwork::new(config, versions).await;
4935
4936        // Remove peer 0 and restart it with the query module enabled.
4937        // Adding an additional node to the test network is not straight forward,
4938        // as the keys have already been initialized in the config above.
4939        // So, we remove this node and re-add it using the same index.
4940        network.peers[0].shut_down().await;
4941        network.peers.remove(0);
4942        let node_0_storage = &storage[1];
4943        let node_0_persistence = persistence[1].clone();
4944        let node_0_port = pick_unused_port().expect("No ports free for query service");
4945        tracing::info!("node_0_port {node_0_port}");
4946        // enable query module with api peers
4947        let opt = Options::with_port(node_0_port).query_sql(
4948            Query {
4949                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
4950            },
4951            tmp_options(node_0_storage),
4952        );
4953
4954        // start the query node so that it builds the merklized state
4955        let node_0 = opt
4956            .clone()
4957            .serve(|metrics, consumer, storage| {
4958                let cfg = network.cfg.clone();
4959                let node_0_persistence = node_0_persistence.clone();
4960                let state = state.clone();
4961                async move {
4962                    Ok(cfg
4963                        .init_node(
4964                            1,
4965                            state,
4966                            node_0_persistence.clone(),
4967                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4968                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
4969                                Default::default(),
4970                                &NoMetrics,
4971                            )),
4972                            storage,
4973                            &*metrics,
4974                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4975                            consumer,
4976                            versions,
4977                            Default::default(),
4978                        )
4979                        .await)
4980                }
4981                .boxed()
4982            })
4983            .await
4984            .unwrap();
4985
4986        let mut events = network.peers[2].event_stream().await;
4987        // wait for 1 epoch
4988        wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
4989
4990        // shutdown the node for 3 epochs
4991        drop(node_0);
4992
4993        // wait for 4 epochs
4994        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
4995
4996        // start the node again.
4997        let node_0 = opt
4998            .serve(|metrics, consumer, storage| {
4999                let cfg = network.cfg.clone();
5000                async move {
5001                    Ok(cfg
5002                        .init_node(
5003                            1,
5004                            state,
5005                            node_0_persistence,
5006                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5007                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
5008                                Default::default(),
5009                                &NoMetrics,
5010                            )),
5011                            storage,
5012                            &*metrics,
5013                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5014                            consumer,
5015                            versions,
5016                            Default::default(),
5017                        )
5018                        .await)
5019                }
5020                .boxed()
5021            })
5022            .await
5023            .unwrap();
5024
5025        let client: Client<ServerError, SequencerApiVersion> =
5026            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5027        client.connect(None).await;
5028
5029        wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
5030
5031        let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
5032
5033        // check that the node's state has reward accounts
5034        let mut retries = 0;
5035        loop {
5036            sleep(Duration::from_secs(1)).await;
5037            let state = node_0.decided_state().await;
5038
5039            let leaves = if Ver::Base::VERSION == EpochVersion::VERSION {
5040                // Use legacy tree for V3
5041                state.reward_merkle_tree_v1.num_leaves()
5042            } else {
5043                // Use new tree for V4 and above
5044                state.reward_merkle_tree_v2.num_leaves()
5045            };
5046
5047            if leaves > 0 {
5048                tracing::info!("Node's state has reward accounts");
5049                break;
5050            }
5051
5052            retries += 1;
5053            if retries > 120 {
5054                panic!("max retries reached. failed to catchup reward state");
5055            }
5056        }
5057
5058        retries = 0;
5059        // check that the node has stored atleast 6 epochs merklized state in persistence
5060        loop {
5061            sleep(Duration::from_secs(3)).await;
5062
5063            let bh = client
5064                .get::<u64>("block-state/block-height")
5065                .send()
5066                .await
5067                .expect("block height not found");
5068
5069            tracing::info!("block state: block height={bh}");
5070            if bh > epoch_7_block {
5071                break;
5072            }
5073
5074            retries += 1;
5075            if retries > 30 {
5076                panic!(
5077                    "max retries reached. block state block height is less than epoch 7 start \
5078                     block"
5079                );
5080            }
5081        }
5082
5083        // shutdown consensus to freeze the state
5084        node_0.shutdown_consensus().await;
5085        let decided_leaf = node_0.decided_leaf().await;
5086        let state = node_0.decided_state().await;
5087
5088        state
5089            .block_merkle_tree
5090            .lookup(decided_leaf.height() - 1)
5091            .expect_ok()
5092            .expect("block state not found");
5093
5094        Ok(())
5095    }
5096
5097    #[rstest]
5098    #[case(PosVersionV3::new())]
5099    #[case(PosVersionV4::new())]
5100    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5101    async fn test_state_reconstruction<Ver: Versions>(
5102        #[case] pos_version: Ver,
5103    ) -> anyhow::Result<()> {
5104        // This test verifies that a query node can successfully reconstruct its state
5105        // after being shut down from the database
5106        //
5107        // Steps:
5108        // 1. Start a test network with 5 nodes.
5109        // 2. Add a query node connected to the network.
5110        // 3. Let the network run until 3 epochs have passed.
5111        // 4. Shut down the query node.
5112        // 5. Attempt to reconstruct its state from storage using:
5113        //    - No fee/reward accounts
5114        //    - Only fee accounts
5115        //    - Only reward accounts
5116        //    - Both fee and reward accounts
5117        // 6. Assert that the reconstructed state is correct in all scenarios.
5118
5119        const EPOCH_HEIGHT: u64 = 10;
5120
5121        let network_config = TestConfigBuilder::default()
5122            .epoch_height(EPOCH_HEIGHT)
5123            .build();
5124
5125        let api_port = pick_unused_port().expect("No ports free for query service");
5126
5127        tracing::info!("API PORT = {api_port}");
5128        const NUM_NODES: usize = 5;
5129
5130        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5131        let persistence: [_; NUM_NODES] = storage
5132            .iter()
5133            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5134            .collect::<Vec<_>>()
5135            .try_into()
5136            .unwrap();
5137
5138        let config = TestNetworkConfigBuilder::with_num_nodes()
5139            .api_config(SqlDataSource::options(
5140                &storage[0],
5141                Options::with_port(api_port),
5142            ))
5143            .network_config(network_config)
5144            .persistences(persistence.clone())
5145            .catchups(std::array::from_fn(|_| {
5146                StatePeers::<StaticVersion<0, 1>>::from_urls(
5147                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5148                    Default::default(),
5149                    &NoMetrics,
5150                )
5151            }))
5152            .pos_hook::<Ver>(
5153                DelegationConfig::MultipleDelegators,
5154                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5155            )
5156            .await
5157            .unwrap()
5158            .build();
5159        let state = config.states()[0].clone();
5160        let mut network = TestNetwork::new(config, pos_version).await;
5161        // Remove peer 0 and restart it with the query module enabled.
5162        // Adding an additional node to the test network is not straight forward,
5163        // as the keys have already been initialized in the config above.
5164        // So, we remove this node and re-add it using the same index.
5165        network.peers.remove(0);
5166
5167        let node_0_storage = &storage[1];
5168        let node_0_persistence = persistence[1].clone();
5169        let node_0_port = pick_unused_port().expect("No ports free for query service");
5170        tracing::info!("node_0_port {node_0_port}");
5171        let opt = Options::with_port(node_0_port).query_sql(
5172            Query {
5173                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5174            },
5175            tmp_options(node_0_storage),
5176        );
5177        let node_0 = opt
5178            .clone()
5179            .serve(|metrics, consumer, storage| {
5180                let cfg = network.cfg.clone();
5181                let node_0_persistence = node_0_persistence.clone();
5182                let state = state.clone();
5183                async move {
5184                    Ok(cfg
5185                        .init_node(
5186                            1,
5187                            state,
5188                            node_0_persistence.clone(),
5189                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5190                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
5191                                Default::default(),
5192                                &NoMetrics,
5193                            )),
5194                            storage,
5195                            &*metrics,
5196                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5197                            consumer,
5198                            pos_version,
5199                            Default::default(),
5200                        )
5201                        .await)
5202                }
5203                .boxed()
5204            })
5205            .await
5206            .unwrap();
5207
5208        let mut events = network.peers[2].event_stream().await;
5209        // Wait until at least 3 epochs have passed
5210        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
5211
5212        tracing::warn!("shutting down node 0");
5213
5214        node_0.shutdown_consensus().await;
5215
5216        let instance = node_0.node_state();
5217        let state = node_0.decided_state().await;
5218        let fee_accounts = state
5219            .fee_merkle_tree
5220            .clone()
5221            .into_iter()
5222            .map(|(acct, _)| acct)
5223            .collect::<Vec<_>>();
5224        let reward_accounts = match Ver::Base::VERSION {
5225            EpochVersion::VERSION => state
5226                .reward_merkle_tree_v1
5227                .clone()
5228                .into_iter()
5229                .map(|(acct, _)| RewardAccountV2::from(acct))
5230                .collect::<Vec<_>>(),
5231            DrbAndHeaderUpgradeVersion::VERSION => state
5232                .reward_merkle_tree_v2
5233                .clone()
5234                .into_iter()
5235                .map(|(acct, _)| acct)
5236                .collect::<Vec<_>>(),
5237            _ => panic!("invalid version"),
5238        };
5239
5240        let client: Client<ServerError, SequencerApiVersion> =
5241            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5242        client.connect(Some(Duration::from_secs(10))).await;
5243
5244        // wait 3s to be sure that all the
5245        // transactions have been committed
5246        sleep(Duration::from_secs(3)).await;
5247
5248        tracing::info!("getting node block height");
5249        let node_block_height = client
5250            .get::<u64>("node/block-height")
5251            .send()
5252            .await
5253            .context("getting Espresso block height")
5254            .unwrap();
5255
5256        tracing::info!("node block height={node_block_height}");
5257
5258        let leaf_query_data = client
5259            .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
5260            .send()
5261            .await
5262            .context("error getting leaf")
5263            .unwrap();
5264
5265        tracing::info!("leaf={leaf_query_data:?}");
5266        let leaf = leaf_query_data.leaf();
5267        let to_view = leaf.view_number() + 1;
5268
5269        let ds = SqlStorage::connect(
5270            Config::try_from(&node_0_persistence).unwrap(),
5271            StorageConnectionType::Sequencer,
5272        )
5273        .await
5274        .unwrap();
5275        let mut tx = ds.read().await?;
5276
5277        let (state, leaf) = reconstruct_state(
5278            &instance,
5279            &ds,
5280            &mut tx,
5281            node_block_height - 1,
5282            to_view,
5283            &[],
5284            &[],
5285        )
5286        .await
5287        .unwrap();
5288        assert_eq!(leaf.view_number(), to_view);
5289        assert!(
5290            state
5291                .block_merkle_tree
5292                .lookup(node_block_height - 1)
5293                .expect_ok()
5294                .is_ok(),
5295            "inconsistent block merkle tree"
5296        );
5297
5298        // Reconstruct fee state
5299        let (state, leaf) = reconstruct_state(
5300            &instance,
5301            &ds,
5302            &mut tx,
5303            node_block_height - 1,
5304            to_view,
5305            &fee_accounts,
5306            &[],
5307        )
5308        .await
5309        .unwrap();
5310
5311        assert_eq!(leaf.view_number(), to_view);
5312        assert!(
5313            state
5314                .block_merkle_tree
5315                .lookup(node_block_height - 1)
5316                .expect_ok()
5317                .is_ok(),
5318            "inconsistent block merkle tree"
5319        );
5320
5321        for account in &fee_accounts {
5322            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
5323        }
5324
5325        // Reconstruct reward state
5326
5327        let (state, leaf) = reconstruct_state(
5328            &instance,
5329            &ds,
5330            &mut tx,
5331            node_block_height - 1,
5332            to_view,
5333            &[],
5334            &reward_accounts,
5335        )
5336        .await
5337        .unwrap();
5338
5339        match Ver::Base::VERSION {
5340            EpochVersion::VERSION => {
5341                for account in reward_accounts.clone() {
5342                    state
5343                        .reward_merkle_tree_v1
5344                        .lookup(RewardAccountV1::from(account))
5345                        .expect_ok()
5346                        .unwrap();
5347                }
5348            },
5349            DrbAndHeaderUpgradeVersion::VERSION => {
5350                for account in &reward_accounts {
5351                    state
5352                        .reward_merkle_tree_v2
5353                        .lookup(account)
5354                        .expect_ok()
5355                        .unwrap();
5356                }
5357            },
5358            _ => panic!("invalid version"),
5359        };
5360
5361        assert_eq!(leaf.view_number(), to_view);
5362        assert!(
5363            state
5364                .block_merkle_tree
5365                .lookup(node_block_height - 1)
5366                .expect_ok()
5367                .is_ok(),
5368            "inconsistent block merkle tree"
5369        );
5370        // Reconstruct reward and fee state
5371
5372        let (state, leaf) = reconstruct_state(
5373            &instance,
5374            &ds,
5375            &mut tx,
5376            node_block_height - 1,
5377            to_view,
5378            &fee_accounts,
5379            &reward_accounts,
5380        )
5381        .await
5382        .unwrap();
5383
5384        assert!(
5385            state
5386                .block_merkle_tree
5387                .lookup(node_block_height - 1)
5388                .expect_ok()
5389                .is_ok(),
5390            "inconsistent block merkle tree"
5391        );
5392        assert_eq!(leaf.view_number(), to_view);
5393
5394        match Ver::Base::VERSION {
5395            EpochVersion::VERSION => {
5396                for account in reward_accounts.clone() {
5397                    state
5398                        .reward_merkle_tree_v1
5399                        .lookup(RewardAccountV1::from(account))
5400                        .expect_ok()
5401                        .unwrap();
5402                }
5403            },
5404            DrbAndHeaderUpgradeVersion::VERSION => {
5405                for account in &reward_accounts {
5406                    state
5407                        .reward_merkle_tree_v2
5408                        .lookup(account)
5409                        .expect_ok()
5410                        .unwrap();
5411                }
5412            },
5413            _ => panic!("invalid version"),
5414        };
5415
5416        for account in &fee_accounts {
5417            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
5418        }
5419
5420        Ok(())
5421    }
5422
5423    #[rstest]
5424    #[case(PosVersionV3::new())]
5425    #[case(PosVersionV4::new())]
5426    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5427    async fn test_block_reward_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
5428        let epoch_height = 10;
5429
5430        let network_config = TestConfigBuilder::default()
5431            .epoch_height(epoch_height)
5432            .build();
5433
5434        let api_port = pick_unused_port().expect("No ports free for query service");
5435
5436        const NUM_NODES: usize = 1;
5437        // Initialize nodes.
5438        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5439        let persistence: [_; NUM_NODES] = storage
5440            .iter()
5441            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5442            .collect::<Vec<_>>()
5443            .try_into()
5444            .unwrap();
5445
5446        let config = TestNetworkConfigBuilder::with_num_nodes()
5447            .api_config(SqlDataSource::options(
5448                &storage[0],
5449                Options::with_port(api_port),
5450            ))
5451            .network_config(network_config.clone())
5452            .persistences(persistence.clone())
5453            .catchups(std::array::from_fn(|_| {
5454                StatePeers::<StaticVersion<0, 1>>::from_urls(
5455                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5456                    Default::default(),
5457                    &NoMetrics,
5458                )
5459            }))
5460            .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
5461            .await
5462            .unwrap()
5463            .build();
5464
5465        let _network = TestNetwork::new(config, versions).await;
5466        let client: Client<ServerError, SequencerApiVersion> =
5467            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5468
5469        let _blocks = client
5470            .socket("availability/stream/blocks/0")
5471            .subscribe::<BlockQueryData<SeqTypes>>()
5472            .await
5473            .unwrap()
5474            .take(3)
5475            .try_collect::<Vec<_>>()
5476            .await
5477            .unwrap();
5478
5479        let block_reward = client
5480            .get::<Option<RewardAmount>>("node/block-reward")
5481            .send()
5482            .await
5483            .expect("failed to get block reward")
5484            .expect("block reward is None");
5485        tracing::info!("block_reward={block_reward:?}");
5486
5487        assert!(block_reward.0 > U256::ZERO);
5488
5489        Ok(())
5490    }
5491
5492    #[rstest]
5493    #[case(PosVersionV4::new())]
5494    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5495    async fn test_token_supply_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
5496        let epoch_height = 10;
5497
5498        let network_config = TestConfigBuilder::default()
5499            .epoch_height(epoch_height)
5500            .build();
5501
5502        let api_port = pick_unused_port().expect("No ports free for query service");
5503
5504        const NUM_NODES: usize = 1;
5505        // Initialize nodes.
5506        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5507        let persistence: [_; NUM_NODES] = storage
5508            .iter()
5509            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5510            .collect::<Vec<_>>()
5511            .try_into()
5512            .unwrap();
5513
5514        let config = TestNetworkConfigBuilder::with_num_nodes()
5515            .api_config(SqlDataSource::options(
5516                &storage[0],
5517                Options::with_port(api_port),
5518            ))
5519            .network_config(network_config.clone())
5520            .persistences(persistence.clone())
5521            .catchups(std::array::from_fn(|_| {
5522                StatePeers::<StaticVersion<0, 1>>::from_urls(
5523                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5524                    Default::default(),
5525                    &NoMetrics,
5526                )
5527            }))
5528            .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
5529            .await
5530            .unwrap()
5531            .build();
5532
5533        let _network = TestNetwork::new(config, versions).await;
5534        let client: Client<ServerError, SequencerApiVersion> =
5535            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5536
5537        let _blocks = client
5538            .socket("availability/stream/blocks/0")
5539            .subscribe::<BlockQueryData<SeqTypes>>()
5540            .await
5541            .unwrap()
5542            .take(3)
5543            .try_collect::<Vec<_>>()
5544            .await
5545            .unwrap();
5546
5547        let total_minted_supply = client
5548            .get::<String>("token/total-minted-supply")
5549            .send()
5550            .await
5551            .expect("failed to get total_minted_supply");
5552        tracing::info!("total_minted_supply={total_minted_supply:?}");
5553
5554        assert_eq!(total_minted_supply, "100000.0");
5555
5556        Ok(())
5557    }
5558
5559    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5560    async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
5561        use espresso_types::v0_3::ChainConfig;
5562
5563        let blocks_per_epoch = 10;
5564
5565        let network_config = TestConfigBuilder::<1>::default()
5566            .epoch_height(blocks_per_epoch)
5567            .build();
5568
5569        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
5570            &network_config.hotshot_config().hotshot_stake_table(),
5571            STAKE_TABLE_CAPACITY_FOR_TEST,
5572        )
5573        .unwrap();
5574
5575        let deployer = ProviderBuilder::new()
5576            .wallet(EthereumWallet::from(network_config.signer().clone()))
5577            .connect_http(network_config.l1_url().clone());
5578
5579        let mut contracts = Contracts::new();
5580        let args = DeployerArgsBuilder::default()
5581            .deployer(deployer.clone())
5582            .rpc_url(network_config.l1_url().clone())
5583            .mock_light_client(true)
5584            .genesis_lc_state(genesis_state)
5585            .genesis_st_state(genesis_stake)
5586            .blocks_per_epoch(blocks_per_epoch)
5587            .epoch_start_block(1)
5588            .multisig_pauser(network_config.signer().address())
5589            .token_name("Espresso".to_string())
5590            .token_symbol("ESP".to_string())
5591            .initial_token_supply(U256::from(3590000000u64))
5592            .ops_timelock_delay(U256::from(0))
5593            .ops_timelock_admin(network_config.signer().address())
5594            .ops_timelock_proposers(vec![network_config.signer().address()])
5595            .ops_timelock_executors(vec![network_config.signer().address()])
5596            .safe_exit_timelock_delay(U256::from(0))
5597            .safe_exit_timelock_admin(network_config.signer().address())
5598            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
5599            .safe_exit_timelock_executors(vec![network_config.signer().address()])
5600            .build()
5601            .unwrap();
5602
5603        args.deploy_all(&mut contracts).await.unwrap();
5604
5605        let st_addr = contracts
5606            .address(Contract::StakeTableProxy)
5607            .expect("StakeTableProxy deployed");
5608
5609        let l1_url = network_config.l1_url().clone();
5610
5611        let storage = SqlDataSource::create_storage().await;
5612        let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5613        let persistence = opt.create().await.unwrap();
5614
5615        let l1_client = L1ClientOptions {
5616            stake_table_update_interval: Duration::from_secs(7),
5617            l1_retry_delay: Duration::from_millis(10),
5618            l1_events_max_block_range: 10000,
5619            ..Default::default()
5620        }
5621        .connect(vec![l1_url])
5622        .unwrap();
5623        l1_client.spawn_tasks().await;
5624
5625        let fetcher = Fetcher::new(
5626            Arc::new(NullStateCatchup::default()),
5627            Arc::new(Mutex::new(persistence.clone())),
5628            l1_client.clone(),
5629            ChainConfig {
5630                stake_table_contract: Some(st_addr),
5631                base_fee: 0.into(),
5632                ..Default::default()
5633            },
5634        );
5635
5636        let provider = l1_client.provider;
5637        let stake_table = StakeTableV2::new(st_addr, provider.clone());
5638
5639        let stake_table_init_block = stake_table
5640            .initializedAtBlock()
5641            .block(BlockId::finalized())
5642            .call()
5643            .await?
5644            .to::<u64>();
5645
5646        tracing::info!("stake table init block = {stake_table_init_block}");
5647
5648        let token_address = stake_table
5649            .token()
5650            .block(BlockId::finalized())
5651            .call()
5652            .await
5653            .context("Failed to get token address")?;
5654
5655        let token = EspToken::new(token_address, provider.clone());
5656
5657        let init_log = fetcher
5658            .scan_token_contract_initialized_event_log(stake_table_init_block, token)
5659            .await
5660            .unwrap();
5661
5662        let init_tx = provider
5663            .get_transaction_receipt(
5664                init_log
5665                    .transaction_hash
5666                    .context(format!("transaction hash not found. init_log={init_log:?}"))?,
5667            )
5668            .await
5669            .unwrap()
5670            .unwrap();
5671
5672        let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().unwrap();
5673
5674        assert!(mint_transfer.value > U256::ZERO);
5675
5676        Ok(())
5677    }
5678
5679    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5680    async fn test_tx_metadata() {
5681        let port = pick_unused_port().expect("No ports free");
5682
5683        let url = format!("http://localhost:{port}").parse().unwrap();
5684        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5685
5686        let storage = SqlDataSource::create_storage().await;
5687        let network_config = TestConfigBuilder::default().build();
5688        let config = TestNetworkConfigBuilder::default()
5689            .api_config(
5690                SqlDataSource::options(&storage, Options::with_port(port))
5691                    .submit(Default::default())
5692                    .explorer(Default::default()),
5693            )
5694            .network_config(network_config)
5695            .build();
5696        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5697        let mut events = network.server.event_stream().await;
5698
5699        client.connect(None).await;
5700
5701        // Submit a few transactions in different namespaces.
5702        let namespace_counts = [(101, 1), (102, 2), (103, 3)];
5703        for (ns, count) in &namespace_counts {
5704            for i in 0..*count {
5705                let ns_id = NamespaceId::from(*ns as u64);
5706                let txn = Transaction::new(ns_id, vec![*ns, i]);
5707                client
5708                    .post::<()>("submit/submit")
5709                    .body_json(&txn)
5710                    .unwrap()
5711                    .send()
5712                    .await
5713                    .unwrap();
5714                let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;
5715
5716                // Block summary should contain information about the namespace.
5717                let summary: BlockSummaryQueryData<SeqTypes> = client
5718                    .get(&format!("availability/block/summary/{block}"))
5719                    .send()
5720                    .await
5721                    .unwrap();
5722                let ns_info = summary.namespaces();
5723                assert_eq!(ns_info.len(), 1);
5724                assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
5725                assert_eq!(ns_info[&ns_id].num_transactions, 1);
5726                assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
5727            }
5728        }
5729
5730        // List transactions in each namespace.
5731        for (ns, count) in &namespace_counts {
5732            tracing::info!(ns, "list transactions in namespace");
5733
5734            let ns_id = NamespaceId::from(*ns as u64);
5735            let summaries: TransactionSummariesResponse<SeqTypes> = client
5736                .get(&format!(
5737                    "explorer/transactions/latest/{count}/namespace/{ns_id}"
5738                ))
5739                .send()
5740                .await
5741                .unwrap();
5742            let txs = summaries.transaction_summaries;
5743            assert_eq!(txs.len(), *count as usize);
5744
5745            // Check that transactions are listed in descending order.
5746            for i in 0..*count {
5747                let summary = &txs[i as usize];
5748                let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
5749                assert_eq!(summary.rollups, vec![ns_id]);
5750                assert_eq!(summary.hash, expected.commit());
5751            }
5752        }
5753    }
5754
5755    use std::time::Instant;
5756
5757    use rand::thread_rng;
5758
5759    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5760    async fn test_aggregator_namespace_endpoints() {
5761        let mut rng = thread_rng();
5762
5763        let port = pick_unused_port().expect("No ports free");
5764
5765        let url = format!("http://localhost:{port}").parse().unwrap();
5766        tracing::info!("Sequencer URL = {url}");
5767        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5768
5769        let options = Options::with_port(port).submit(Default::default());
5770        const NUM_NODES: usize = 2;
5771        // Initialize storage for each node
5772        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5773
5774        let persistence_options: [_; NUM_NODES] = storage
5775            .iter()
5776            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5777            .collect::<Vec<_>>()
5778            .try_into()
5779            .unwrap();
5780
5781        let network_config = TestConfigBuilder::default().build();
5782
5783        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5784            .api_config(SqlDataSource::options(&storage[0], options))
5785            .network_config(network_config)
5786            .persistences(persistence_options.clone())
5787            .build();
5788        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5789        let mut events = network.server.event_stream().await;
5790        let start = Instant::now();
5791        let mut total_transactions = 0;
5792        let mut tx_heights = Vec::new();
5793        let mut sizes = HashMap::new();
5794        // inserting transactions for some namespaces
5795        // the number of transactions inserted is equal to namespace number.
5796        for namespace in 1..=4 {
5797            for _count in 0..namespace {
5798                // Generate a random payload length between 4 and 10 bytes
5799                let payload_len = rng.gen_range(4..=10);
5800                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5801
5802                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5803
5804                client.connect(None).await;
5805
5806                let hash = client
5807                    .post("submit/submit")
5808                    .body_json(&txn)
5809                    .unwrap()
5810                    .send()
5811                    .await
5812                    .unwrap();
5813                assert_eq!(txn.commit(), hash);
5814
5815                // Wait for a Decide event containing transaction matching the one we sent
5816                let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
5817                tx_heights.push(height);
5818                total_transactions += 1;
5819                *sizes.entry(namespace).or_insert(0) += size;
5820            }
5821        }
5822
5823        let duration = start.elapsed();
5824
5825        println!("Time elapsed to submit transactions: {duration:?}");
5826
5827        let last_tx_height = tx_heights.last().unwrap();
5828        for namespace in 1..=4 {
5829            let count = client
5830                .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
5831                .send()
5832                .await
5833                .unwrap();
5834            assert_eq!(
5835                count, namespace as u64,
5836                "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
5837                 {count}"
5838            );
5839
5840            // check the range endpoint
5841            let to_endpoint_count = client
5842                .get::<u64>(&format!(
5843                    "node/transactions/count/namespace/{namespace}/{last_tx_height}"
5844                ))
5845                .send()
5846                .await
5847                .unwrap();
5848            assert_eq!(
5849                to_endpoint_count, namespace as u64,
5850                "Incorrect transaction count for range endpoint (to only) for namespace \
5851                 {namespace}: expected {namespace}, got {to_endpoint_count}"
5852            );
5853
5854            // check the range endpoint
5855            let from_to_endpoint_count = client
5856                .get::<u64>(&format!(
5857                    "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
5858                ))
5859                .send()
5860                .await
5861                .unwrap();
5862            assert_eq!(
5863                from_to_endpoint_count, namespace as u64,
5864                "Incorrect transaction count for range endpoint (from-to) for namespace \
5865                 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
5866            );
5867
5868            let ns_size = client
5869                .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
5870                .send()
5871                .await
5872                .unwrap();
5873
5874            let expected_ns_size = *sizes.get(&namespace).unwrap();
5875            assert_eq!(
5876                ns_size, expected_ns_size,
5877                "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
5878                 got {ns_size}"
5879            );
5880
5881            let ns_size_to = client
5882                .get::<usize>(&format!(
5883                    "node/payloads/size/namespace/{namespace}/{last_tx_height}"
5884                ))
5885                .send()
5886                .await
5887                .unwrap();
5888            assert_eq!(
5889                ns_size_to, expected_ns_size,
5890                "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
5891                 expected {expected_ns_size}, got {ns_size_to}"
5892            );
5893
5894            let ns_size_from_to = client
5895                .get::<usize>(&format!(
5896                    "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
5897                ))
5898                .send()
5899                .await
5900                .unwrap();
5901            assert_eq!(
5902                ns_size_from_to, expected_ns_size,
5903                "Incorrect payload size for namespace {namespace} from 0 to height \
5904                 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
5905            );
5906        }
5907
5908        let total_tx_count = client
5909            .get::<u64>("node/transactions/count")
5910            .send()
5911            .await
5912            .unwrap();
5913        assert_eq!(
5914            total_tx_count, total_transactions,
5915            "Incorrect total transaction count: expected {total_transactions}, got \
5916             {total_tx_count}"
5917        );
5918
5919        let total_payload_size = client
5920            .get::<usize>("node/payloads/size")
5921            .send()
5922            .await
5923            .unwrap();
5924
5925        let expected_total_size: usize = sizes.values().copied().sum();
5926        assert_eq!(
5927            total_payload_size, expected_total_size,
5928            "Incorrect total payload size: expected {expected_total_size}, got \
5929             {total_payload_size}"
5930        );
5931    }
5932
5933    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5934    async fn test_stream_transactions_endpoint() {
5935        // This test submits transactions to a sequencer for multiple namespaces,
5936        // waits for them to be decided, and then verifies that:
5937        // 1. All transactions appear in the transaction stream.
5938        // 2. Each namespace-specific transaction stream only includes the transactions of that namespace.
5939
5940        let mut rng = thread_rng();
5941
5942        let port = pick_unused_port().expect("No ports free");
5943
5944        let url = format!("http://localhost:{port}").parse().unwrap();
5945        tracing::info!("Sequencer URL = {url}");
5946        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5947
5948        let options = Options::with_port(port).submit(Default::default());
5949        const NUM_NODES: usize = 2;
5950        // Initialize storage for each node
5951        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5952
5953        let persistence_options: [_; NUM_NODES] = storage
5954            .iter()
5955            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5956            .collect::<Vec<_>>()
5957            .try_into()
5958            .unwrap();
5959
5960        let network_config = TestConfigBuilder::default().build();
5961
5962        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5963            .api_config(SqlDataSource::options(&storage[0], options))
5964            .network_config(network_config)
5965            .persistences(persistence_options.clone())
5966            .build();
5967        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5968        let mut events = network.server.event_stream().await;
5969        let mut all_transactions = HashMap::new();
5970        let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
5971
5972        // Submit transactions to namespaces 1 through 4
5973
5974        for namespace in 1..=4 {
5975            for _count in 0..namespace {
5976                let payload_len = rng.gen_range(4..=10);
5977                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5978
5979                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5980
5981                client.connect(None).await;
5982
5983                let hash = client
5984                    .post("submit/submit")
5985                    .body_json(&txn)
5986                    .unwrap()
5987                    .send()
5988                    .await
5989                    .unwrap();
5990                assert_eq!(txn.commit(), hash);
5991
5992                // Wait for a Decide event containing transaction matching the one we sent
5993                wait_for_decide_on_handle(&mut events, &txn).await;
5994                // Store transaction for later validation
5995
5996                all_transactions.insert(txn.commit(), txn.clone());
5997                namespace_tx.entry(namespace).or_default().insert(txn);
5998            }
5999        }
6000
6001        let mut transactions = client
6002            .socket("availability/stream/transactions/0")
6003            .subscribe::<TransactionQueryData<SeqTypes>>()
6004            .await
6005            .expect("failed to subscribe to transactions endpoint");
6006
6007        let mut count = 0;
6008        while let Some(tx) = transactions.next().await {
6009            let tx = tx.unwrap();
6010            let expected = all_transactions
6011                .get(&tx.transaction().commit())
6012                .expect("txn not found ");
6013            assert_eq!(tx.transaction(), expected, "invalid transaction");
6014            count += 1;
6015
6016            if count == all_transactions.len() {
6017                break;
6018            }
6019        }
6020
6021        // Validate namespace-specific stream endpoint
6022
6023        for (namespace, expected_ns_txns) in &namespace_tx {
6024            let mut api_namespace_txns = client
6025                .socket(&format!(
6026                    "availability/stream/transactions/0/namespace/{namespace}",
6027                ))
6028                .subscribe::<TransactionQueryData<SeqTypes>>()
6029                .await
6030                .unwrap_or_else(|_| {
6031                    panic!("failed to subscribe to transactions namespace {namespace}")
6032                });
6033
6034            let mut received = HashSet::new();
6035
6036            while let Some(res) = api_namespace_txns.next().await {
6037                let tx = res.expect("stream error");
6038                received.insert(tx.transaction().clone());
6039
6040                if received.len() == expected_ns_txns.len() {
6041                    break;
6042                }
6043            }
6044
6045            assert_eq!(
6046                received, *expected_ns_txns,
6047                "Mismatched transactions for namespace {namespace}"
6048            );
6049        }
6050    }
6051
6052    #[rstest]
6053    #[case(PosVersionV3::new())]
6054    #[case(PosVersionV4::new())]
6055    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6056    async fn test_v3_and_v4_reward_tree_updates<Ver: Versions>(
6057        #[case] versions: Ver,
6058    ) -> anyhow::Result<()> {
6059        // This test checks that the correct merkle tree is updated based on version
6060        //
6061        // When the protocol version is v3:
6062        // - The v3 Merkle tree is updated
6063        // - The v4 Merkle tree must be empty.
6064        //
6065        // When the protocol version is v4:
6066        // - The v4 Merkle tree is updated
6067        // - The v3 Merkle tree must be empty.
6068        const EPOCH_HEIGHT: u64 = 10;
6069
6070        let network_config = TestConfigBuilder::default()
6071            .epoch_height(EPOCH_HEIGHT)
6072            .build();
6073
6074        let api_port = pick_unused_port().expect("No ports free for query service");
6075
6076        tracing::info!("API PORT = {api_port}");
6077        const NUM_NODES: usize = 5;
6078
6079        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6080        let persistence: [_; NUM_NODES] = storage
6081            .iter()
6082            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6083            .collect::<Vec<_>>()
6084            .try_into()
6085            .unwrap();
6086
6087        let config = TestNetworkConfigBuilder::with_num_nodes()
6088            .api_config(SqlDataSource::options(
6089                &storage[0],
6090                Options::with_port(api_port).catchup(Default::default()),
6091            ))
6092            .network_config(network_config)
6093            .persistences(persistence.clone())
6094            .catchups(std::array::from_fn(|_| {
6095                StatePeers::<StaticVersion<0, 1>>::from_urls(
6096                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6097                    Default::default(),
6098                    &NoMetrics,
6099                )
6100            }))
6101            .pos_hook::<Ver>(
6102                DelegationConfig::MultipleDelegators,
6103                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6104            )
6105            .await
6106            .unwrap()
6107            .build();
6108        let mut network = TestNetwork::new(config, versions).await;
6109
6110        let mut events = network.peers[2].event_stream().await;
6111        // wait for 4 epochs
6112        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6113
6114        let validated_state = network.server.decided_state().await;
6115        let version = Ver::Base::VERSION;
6116        if version == EpochVersion::VERSION {
6117            let v1_tree = &validated_state.reward_merkle_tree_v1;
6118            assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
6119            let v2_tree = &validated_state.reward_merkle_tree_v2;
6120            assert!(
6121                v2_tree.num_leaves() == 0,
6122                "v2 reward tree tree is not empty"
6123            );
6124        } else {
6125            let v1_tree = &validated_state.reward_merkle_tree_v1;
6126            assert!(
6127                v1_tree.num_leaves() == 0,
6128                "v1 reward tree tree is not empty"
6129            );
6130            let v2_tree = &validated_state.reward_merkle_tree_v2;
6131            assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
6132        }
6133
6134        network.stop_consensus().await;
6135        Ok(())
6136    }
6137
6138    #[rstest]
6139    #[case(PosVersionV3::new())]
6140    #[case(PosVersionV4::new())]
6141    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6142
6143    pub(crate) async fn test_state_cert_query<Ver: Versions>(#[case] versions: Ver) {
6144        const TEST_EPOCH_HEIGHT: u64 = 10;
6145        const TEST_EPOCHS: u64 = 5;
6146
6147        let network_config = TestConfigBuilder::default()
6148            .epoch_height(TEST_EPOCH_HEIGHT)
6149            .build();
6150
6151        let api_port = pick_unused_port().expect("No ports free for query service");
6152
6153        tracing::info!("API PORT = {api_port}");
6154        const NUM_NODES: usize = 2;
6155
6156        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6157        let persistence: [_; NUM_NODES] = storage
6158            .iter()
6159            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6160            .collect::<Vec<_>>()
6161            .try_into()
6162            .unwrap();
6163
6164        let config = TestNetworkConfigBuilder::with_num_nodes()
6165            .api_config(SqlDataSource::options(
6166                &storage[0],
6167                Options::with_port(api_port).catchup(Default::default()),
6168            ))
6169            .network_config(network_config)
6170            .persistences(persistence.clone())
6171            .catchups(std::array::from_fn(|_| {
6172                StatePeers::<StaticVersion<0, 1>>::from_urls(
6173                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6174                    Default::default(),
6175                    &NoMetrics,
6176                )
6177            }))
6178            .pos_hook::<Ver>(
6179                DelegationConfig::MultipleDelegators,
6180                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6181            )
6182            .await
6183            .unwrap()
6184            .build();
6185
6186        let network = TestNetwork::new(config, versions).await;
6187        let mut events = network.server.event_stream().await;
6188
6189        // Wait until 5 epochs have passed.
6190        loop {
6191            let event = events.next().await.unwrap();
6192            tracing::info!("Received event from handle: {event:?}");
6193
6194            if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
6195                println!(
6196                    "Decide event received: {:?}",
6197                    leaf_chain.first().unwrap().leaf.height()
6198                );
6199                if let Some(first_leaf) = leaf_chain.first() {
6200                    let height = first_leaf.leaf.height();
6201                    tracing::info!("Decide event received at height: {height}");
6202
6203                    if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
6204                        break;
6205                    }
6206                }
6207            }
6208        }
6209
6210        // Connect client.
6211        let client: Client<ServerError, StaticVersion<0, 1>> =
6212            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6213        client.connect(Some(Duration::from_secs(10))).await;
6214
6215        // Get the state cert for the epoch 3 to 5
6216        for i in 3..=TEST_EPOCHS {
6217            // v2
6218
6219            let state_query_data_v2 = client
6220                .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
6221                .send()
6222                .await
6223                .unwrap();
6224            let state_cert_v2 = state_query_data_v2.0.clone();
6225            tracing::info!("state_cert_v2: {state_cert_v2:?}");
6226            assert_eq!(state_cert_v2.epoch.u64(), i);
6227            assert_eq!(
6228                state_cert_v2.light_client_state.block_height,
6229                i * TEST_EPOCH_HEIGHT - 5
6230            );
6231            let block_height = state_cert_v2.light_client_state.block_height;
6232
6233            let header: Header = client
6234                .get(&format!("availability/header/{block_height}"))
6235                .send()
6236                .await
6237                .unwrap();
6238
6239            // verify auth root if the consensus version is v4
6240            if header.version() == DrbAndHeaderUpgradeVersion::VERSION {
6241                let auth_root = state_cert_v2.auth_root;
6242                let header_auth_root = header.auth_root().unwrap();
6243                if auth_root.is_zero() || header_auth_root.is_zero() {
6244                    panic!("auth root shouldn't be zero");
6245                }
6246
6247                assert_eq!(auth_root, header_auth_root, "auth root mismatch");
6248            }
6249
6250            // v1
6251            let state_query_data_v1 = client
6252                .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
6253                .send()
6254                .await
6255                .unwrap();
6256
6257            let state_cert_v1 = state_query_data_v1.0.clone();
6258            tracing::info!("state_cert_v1: {state_cert_v1:?}");
6259            assert_eq!(state_query_data_v1, state_query_data_v2.into());
6260        }
6261    }
6262
6263    /// Test state certificate catchup functionality by simulating a node that falls behind and needs
6264    /// to catch up. This test starts a 5-node network with epoch height 10, waits for 3 epochs to
6265    /// pass, then removes and restarts node 0 with a fresh storage. The
6266    /// restarted node catches up for the missing state certificates.
6267
6268    #[rstest]
6269    #[case(PosVersionV3::new())]
6270    #[case(PosVersionV4::new())]
6271    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6272    pub(crate) async fn test_state_cert_catchup<Ver: Versions>(#[case] versions: Ver) {
6273        const EPOCH_HEIGHT: u64 = 10;
6274
6275        let network_config = TestConfigBuilder::default()
6276            .epoch_height(EPOCH_HEIGHT)
6277            .build();
6278
6279        let api_port = pick_unused_port().expect("No ports free for query service");
6280
6281        tracing::info!("API PORT = {api_port}");
6282        const NUM_NODES: usize = 5;
6283
6284        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6285        let persistence: [_; NUM_NODES] = storage
6286            .iter()
6287            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6288            .collect::<Vec<_>>()
6289            .try_into()
6290            .unwrap();
6291
6292        let config = TestNetworkConfigBuilder::with_num_nodes()
6293            .api_config(SqlDataSource::options(
6294                &storage[0],
6295                Options::with_port(api_port),
6296            ))
6297            .network_config(network_config)
6298            .persistences(persistence.clone())
6299            .catchups(std::array::from_fn(|_| {
6300                StatePeers::<StaticVersion<0, 1>>::from_urls(
6301                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6302                    Default::default(),
6303                    &NoMetrics,
6304                )
6305            }))
6306            .pos_hook::<Ver>(
6307                DelegationConfig::MultipleDelegators,
6308                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6309            )
6310            .await
6311            .unwrap()
6312            .build();
6313        let state = config.states()[0].clone();
6314        let mut network = TestNetwork::new(config, versions).await;
6315
6316        let mut events = network.peers[2].event_stream().await;
6317        // Wait until at least 5 epochs have passed
6318        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6319
6320        // Remove peer 0 and restart it with the query module enabled.
6321        // Adding an additional node to the test network is not straight forward,
6322        // as the keys have already been initialized in the config above.
6323        // So, we remove this node and re-add it using the same index.
6324        network.peers.remove(0);
6325
6326        let new_storage: hotshot_query_service::data_source::sql::testing::TmpDb =
6327            SqlDataSource::create_storage().await;
6328        let new_persistence: persistence::sql::Options =
6329            <SqlDataSource as TestableSequencerDataSource>::persistence_options(&new_storage);
6330
6331        let node_0_port = pick_unused_port().expect("No ports free for query service");
6332        tracing::info!("node_0_port {node_0_port}");
6333        let opt = Options::with_port(node_0_port).query_sql(
6334            Query {
6335                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
6336            },
6337            tmp_options(&new_storage),
6338        );
6339        let node_0 = opt
6340            .clone()
6341            .serve(|metrics, consumer, storage| {
6342                let cfg = network.cfg.clone();
6343                let new_persistence = new_persistence.clone();
6344                let state = state.clone();
6345                async move {
6346                    Ok(cfg
6347                        .init_node(
6348                            1,
6349                            state,
6350                            new_persistence.clone(),
6351                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
6352                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
6353                                Default::default(),
6354                                &NoMetrics,
6355                            )),
6356                            storage,
6357                            &*metrics,
6358                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
6359                            consumer,
6360                            versions,
6361                            Default::default(),
6362                        )
6363                        .await)
6364                }
6365                .boxed()
6366            })
6367            .await
6368            .unwrap();
6369
6370        let mut events = node_0.event_stream().await;
6371        // Wait until at least 5 epochs have passed
6372        wait_for_epochs(&mut events, EPOCH_HEIGHT, 5).await;
6373
6374        let client: Client<ServerError, StaticVersion<0, 1>> =
6375            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
6376        client.connect(Some(Duration::from_secs(60))).await;
6377
6378        for epoch in 3..=5 {
6379            let state_cert = client
6380                .get::<StateCertQueryDataV2<SeqTypes>>(&format!(
6381                    "availability/state-cert-v2/{epoch}"
6382                ))
6383                .send()
6384                .await
6385                .unwrap();
6386            assert_eq!(state_cert.0.epoch.u64(), epoch);
6387        }
6388    }
6389
6390    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6391    async fn test_integration_commission_updates() -> anyhow::Result<()> {
6392        const NUM_NODES: usize = 3;
6393        const EPOCH_HEIGHT: u64 = 10;
6394
6395        // Use version that supports epochs (V3 or V4)
6396        let versions = PosVersionV4::new();
6397
6398        let api_port = pick_unused_port().expect("No ports free for query service");
6399
6400        // Initialize storage for nodes
6401        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6402        let persistence: [_; NUM_NODES] = storage
6403            .iter()
6404            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6405            .collect::<Vec<_>>()
6406            .try_into()
6407            .unwrap();
6408
6409        // Configure test network with epochs
6410        let network_config = TestConfigBuilder::default()
6411            .epoch_height(EPOCH_HEIGHT)
6412            .build();
6413
6414        // Build test network configuration starting with V1 stake table
6415        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6416            .api_config(SqlDataSource::options(
6417                &storage[0],
6418                Options::with_port(api_port),
6419            ))
6420            .network_config(network_config.clone())
6421            .persistences(persistence.clone())
6422            .catchups(std::array::from_fn(|_| {
6423                StatePeers::<SequencerApiVersion>::from_urls(
6424                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6425                    Default::default(),
6426                    &NoMetrics,
6427                )
6428            }))
6429            .pos_hook::<PosVersionV4>(
6430                // We want no new rewards after setting the commission to zero.
6431                DelegationConfig::NoSelfDelegation,
6432                StakeTableContractVersion::V1, // upgraded later
6433            )
6434            .await
6435            .unwrap()
6436            .build();
6437
6438        let network = TestNetwork::new(config, versions).await;
6439        let provider = network.cfg.anvil().unwrap();
6440        let deployer_addr = network.cfg.signer().address();
6441        let mut contracts = network.contracts.unwrap();
6442        let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
6443        upgrade_stake_table_v2(
6444            provider,
6445            L1Client::new(vec![network.cfg.l1_url()])?,
6446            &mut contracts,
6447            deployer_addr,
6448            deployer_addr,
6449        )
6450        .await?;
6451
6452        let mut commissions = vec![];
6453        for (i, (validator, provider)) in
6454            network_config.validator_providers().into_iter().enumerate()
6455        {
6456            let commission = fetch_commission(provider.clone(), st_addr, validator).await?;
6457            let new_commission = match i {
6458                0 => 0u16,
6459                1 => commission.to_evm() + 500u16,
6460                2 => commission.to_evm() - 100u16,
6461                _ => unreachable!(),
6462            }
6463            .try_into()?;
6464            commissions.push((validator, commission, new_commission));
6465            tracing::info!(%validator, %commission, %new_commission, "Update commission");
6466            update_commission(provider, st_addr, new_commission)
6467                .await?
6468                .get_receipt()
6469                .await?;
6470        }
6471
6472        // wait until new stake table takes effect
6473        let current_epoch = network.peers[0]
6474            .decided_leaf()
6475            .await
6476            .epoch(EPOCH_HEIGHT)
6477            .unwrap();
6478        let target_epoch = current_epoch.u64() + 3;
6479        println!("target epoch for new stake table: {target_epoch}");
6480        let mut events = network.peers[0].event_stream().await;
6481        wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;
6482
6483        // the last epoch with the old commissions
6484        let client: Client<ServerError, SequencerApiVersion> =
6485            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6486        let validators = client
6487            .get::<ValidatorMap>(&format!("node/validators/{}", target_epoch - 1))
6488            .send()
6489            .await
6490            .expect("validators");
6491        assert!(!validators.is_empty());
6492        for (val, old_comm, _) in commissions.clone() {
6493            assert_eq!(validators.get(&val).unwrap().commission, old_comm.to_evm());
6494        }
6495
6496        // the first epoch with the new commissions
6497        let client: Client<ServerError, SequencerApiVersion> =
6498            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6499        let validators = client
6500            .get::<ValidatorMap>(&format!("node/validators/{target_epoch}"))
6501            .send()
6502            .await
6503            .expect("validators");
6504        assert!(!validators.is_empty());
6505        for (val, _, new_comm) in commissions.clone() {
6506            assert_eq!(validators.get(&val).unwrap().commission, new_comm.to_evm());
6507        }
6508
6509        let last_block_with_old_commissions = EPOCH_HEIGHT * (target_epoch - 1);
6510        let block_with_new_commissions = EPOCH_HEIGHT * target_epoch;
6511        let mut new_amounts = vec![];
6512        for (val, ..) in commissions {
6513            let before = client
6514                .get::<Option<RewardAmount>>(&format!(
6515                    "reward-state-v2/reward-balance/{last_block_with_old_commissions}/{val}"
6516                ))
6517                .send()
6518                .await?
6519                .unwrap();
6520            let after = client
6521                .get::<Option<RewardAmount>>(&format!(
6522                    "reward-state-v2/reward-balance/{block_with_new_commissions}/{val}"
6523                ))
6524                .send()
6525                .await?
6526                .unwrap();
6527            new_amounts.push(after - before);
6528        }
6529
6530        let tolerance = U256::from(10 * EPOCH_HEIGHT).into();
6531        // validator zero got new new rewards except remainders
6532        assert!(new_amounts[0] < tolerance);
6533
6534        // other validators are still receiving rewards
6535        assert!(new_amounts[1] + new_amounts[2] > tolerance);
6536
6537        Ok(())
6538    }
6539
6540    #[rstest]
6541    #[case(PosVersionV3::new())]
6542    #[case(PosVersionV4::new())]
6543    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6544    async fn test_reward_proof_endpoint<Ver: Versions>(
6545        #[case] versions: Ver,
6546    ) -> anyhow::Result<()> {
6547        const EPOCH_HEIGHT: u64 = 10;
6548        const NUM_NODES: usize = 5;
6549
6550        let network_config = TestConfigBuilder::default()
6551            .epoch_height(EPOCH_HEIGHT)
6552            .build();
6553
6554        let api_port = pick_unused_port().expect("No ports free for query service");
6555        println!("API PORT = {api_port}");
6556
6557        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6558        let persistence: [_; NUM_NODES] = storage
6559            .iter()
6560            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6561            .collect::<Vec<_>>()
6562            .try_into()
6563            .unwrap();
6564
6565        let config = TestNetworkConfigBuilder::with_num_nodes()
6566            .api_config(SqlDataSource::options(
6567                &storage[0],
6568                Options::with_port(api_port).catchup(Default::default()),
6569            ))
6570            .network_config(network_config)
6571            .persistences(persistence.clone())
6572            .catchups(std::array::from_fn(|_| {
6573                StatePeers::<StaticVersion<0, 1>>::from_urls(
6574                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6575                    Default::default(),
6576                    &NoMetrics,
6577                )
6578            }))
6579            .pos_hook::<Ver>(
6580                DelegationConfig::MultipleDelegators,
6581                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6582            )
6583            .await
6584            .unwrap()
6585            .build();
6586
6587        let mut network = TestNetwork::new(config, versions).await;
6588
6589        // wait for 4 epochs
6590        let mut events = network.server.event_stream().await;
6591        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6592
6593        let url = format!("http://localhost:{api_port}").parse().unwrap();
6594        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6595
6596        let validated_state = network.server.decided_state().await;
6597        let decided_leaf = network.server.decided_leaf().await;
6598        let height = decided_leaf.height();
6599
6600        // validate proof returned from the api
6601        if Ver::Base::VERSION == EpochVersion::VERSION {
6602            // V1 case
6603            wait_until_block_height(&client, "reward-state/block-height", height).await;
6604
6605            network.stop_consensus().await;
6606
6607            for (address, _) in validated_state.reward_merkle_tree_v1.iter() {
6608                let (_, expected_proof) = validated_state
6609                    .reward_merkle_tree_v1
6610                    .lookup(*address)
6611                    .expect_ok()
6612                    .unwrap();
6613
6614                let res = client
6615                    .get::<RewardAccountQueryDataV1>(&format!(
6616                        "reward-state/proof/{height}/{address}"
6617                    ))
6618                    .send()
6619                    .await
6620                    .unwrap();
6621
6622                match res.proof.proof {
6623                    RewardMerkleProofV1::Presence(p) => {
6624                        assert_eq!(
6625                            p, expected_proof,
6626                            "Proof mismatch for V1 at {height}, addr={address}"
6627                        );
6628                    },
6629                    other => panic!(
6630                        "Expected Present proof for V1 at {height}, addr={address}, got {other:?}"
6631                    ),
6632                }
6633            }
6634        } else {
6635            // V2 case
6636            wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
6637
6638            network.stop_consensus().await;
6639
6640            for (address, _) in validated_state.reward_merkle_tree_v2.iter() {
6641                let (_, expected_proof) = validated_state
6642                    .reward_merkle_tree_v2
6643                    .lookup(*address)
6644                    .expect_ok()
6645                    .unwrap();
6646
6647                let res = client
6648                    .get::<RewardAccountQueryDataV2>(&format!(
6649                        "reward-state-v2/proof/{height}/{address}"
6650                    ))
6651                    .send()
6652                    .await
6653                    .unwrap();
6654
6655                match res.proof.proof.clone() {
6656                    RewardMerkleProofV2::Presence(p) => {
6657                        assert_eq!(
6658                            p, expected_proof,
6659                            "Proof mismatch for V2 at {height}, addr={address}"
6660                        );
6661                    },
6662                    other => panic!(
6663                        "Expected Present proof for V2 at {height}, addr={address}, got {other:?}"
6664                    ),
6665                }
6666
6667                let reward_claim_input = client
6668                    .get::<RewardClaimInput>(&format!(
6669                        "reward-state-v2/reward-claim-input/{height}/{address}"
6670                    ))
6671                    .send()
6672                    .await
6673                    .unwrap();
6674
6675                assert_eq!(reward_claim_input, res.to_reward_claim_input()?);
6676            }
6677        }
6678
6679        Ok(())
6680    }
6681
6682    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6683    async fn test_all_validators_endpoint() -> anyhow::Result<()> {
6684        const EPOCH_HEIGHT: u64 = 20;
6685
6686        type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
6687
6688        let network_config = TestConfigBuilder::default()
6689            .epoch_height(EPOCH_HEIGHT)
6690            .build();
6691
6692        let api_port = pick_unused_port().expect("No ports free for query service");
6693
6694        const NUM_NODES: usize = 5;
6695
6696        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6697        let persistence: [_; NUM_NODES] = storage
6698            .iter()
6699            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6700            .collect::<Vec<_>>()
6701            .try_into()
6702            .unwrap();
6703
6704        let config = TestNetworkConfigBuilder::with_num_nodes()
6705            .api_config(SqlDataSource::options(
6706                &storage[0],
6707                Options::with_port(api_port),
6708            ))
6709            .network_config(network_config)
6710            .persistences(persistence.clone())
6711            .catchups(std::array::from_fn(|_| {
6712                StatePeers::<StaticVersion<0, 1>>::from_urls(
6713                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6714                    Default::default(),
6715                    &NoMetrics,
6716                )
6717            }))
6718            .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
6719            .await
6720            .unwrap()
6721            .build();
6722
6723        let network = TestNetwork::new(config, V4::new()).await;
6724        let client: Client<ServerError, SequencerApiVersion> =
6725            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6726
6727        let err = client
6728            .get::<Vec<Validator<PubKey>>>("node/all-validators/1/0/1001")
6729            .header("Accept", "application/json")
6730            .send()
6731            .await
6732            .unwrap_err();
6733
6734        assert_matches!(err, ServerError { status, message} if
6735                status == StatusCode::BAD_REQUEST
6736                && message.contains("Limit cannot be greater than 1000")
6737        );
6738
6739        // Wait for the chain to progress beyond epoch 3
6740        let mut events = network.peers[0].event_stream().await;
6741        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6742
6743        // Verify that there are no validators for epoch # 1 and epoch # 2
6744        {
6745            client
6746                .get::<Vec<Validator<PubKey>>>("node/all-validators/1/0/100")
6747                .send()
6748                .await
6749                .unwrap()
6750                .is_empty();
6751
6752            client
6753                .get::<Vec<Validator<PubKey>>>("node/all-validators/2/0/100")
6754                .send()
6755                .await
6756                .unwrap()
6757                .is_empty();
6758        }
6759
6760        // Get the epoch # 3 validators
6761        let validators = client
6762            .get::<Vec<Validator<PubKey>>>("node/all-validators/3/0/100")
6763            .send()
6764            .await
6765            .expect("validators");
6766
6767        assert!(!validators.is_empty());
6768
6769        Ok(())
6770    }
6771
6772    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6773    async fn test_reward_accounts_catchup_endpoint() -> anyhow::Result<()> {
6774        const EPOCH_HEIGHT: u64 = 10;
6775        const NUM_NODES: usize = 3;
6776
6777        let network_config = TestConfigBuilder::default()
6778            .epoch_height(EPOCH_HEIGHT)
6779            .build();
6780
6781        let api_port = pick_unused_port().expect("No ports free for query service");
6782        println!("API PORT = {api_port}");
6783
6784        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6785        let persistence: [_; NUM_NODES] = storage
6786            .iter()
6787            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6788            .collect::<Vec<_>>()
6789            .try_into()
6790            .unwrap();
6791
6792        let config = TestNetworkConfigBuilder::with_num_nodes()
6793            .api_config(SqlDataSource::options(
6794                &storage[0],
6795                Options::with_port(api_port).catchup(Default::default()),
6796            ))
6797            .network_config(network_config)
6798            .persistences(persistence.clone())
6799            .catchups(std::array::from_fn(|_| {
6800                StatePeers::<StaticVersion<0, 1>>::from_urls(
6801                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6802                    Default::default(),
6803                    &NoMetrics,
6804                )
6805            }))
6806            .pos_hook::<PosVersionV4>(
6807                DelegationConfig::MultipleDelegators,
6808                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6809            )
6810            .await
6811            .unwrap()
6812            .build();
6813
6814        let mut network = TestNetwork::new(config, PosVersionV4::new()).await;
6815
6816        let client: Client<ServerError, StaticVersion<0, 1>> =
6817            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6818
6819        client.connect(None).await;
6820
6821        let mut events = network.server.event_stream().await;
6822        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6823
6824        network.stop_consensus().await;
6825        let height = network.server.decided_leaf().await.height();
6826        wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
6827
6828        let err = client
6829            .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
6830                "catchup/{height}/reward-amounts/10001/0"
6831            ))
6832            .send()
6833            .await
6834            .unwrap_err();
6835
6836        assert_matches!(err, ServerError { status, .. } if
6837            status == StatusCode::BAD_REQUEST
6838
6839        );
6840
6841        let mut expected: Vec<_> = network
6842            .server
6843            .decided_state()
6844            .await
6845            .reward_merkle_tree_v2
6846            .iter()
6847            .map(|(addr, amt)| (*addr, *amt))
6848            .collect();
6849        // Results are sorted by account address descending
6850        expected.sort_by_key(|(acct, _)| std::cmp::Reverse(*acct));
6851
6852        tracing::info!("expected accounts = {expected:?}");
6853        let limit = expected.len().min(10_000) as u64;
6854        let offset = 0u64;
6855        let expected: Vec<_> = expected.into_iter().take(limit as usize).collect();
6856
6857        let res = client
6858            .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
6859                "catchup/{height}/reward-amounts/{limit}/{offset}"
6860            ))
6861            .send()
6862            .await
6863            .unwrap();
6864
6865        assert_eq!(res, expected);
6866
6867        Ok(())
6868    }
6869
6870    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6871    async fn test_get_all_reward_accounts_multiple_cases() -> anyhow::Result<()> {
6872        let storage = SqlDataSource::create_storage().await;
6873        let sql_options = tmp_options(&storage);
6874        let db = SqlStorage::connect(
6875            Config::try_from(&sql_options)?,
6876            StorageConnectionType::Sequencer,
6877        )
6878        .await?;
6879
6880        let validated_state = ValidatedState::default();
6881        let instance_state =
6882            NodeState::mock().with_genesis_version(DrbAndHeaderUpgradeVersion::version());
6883        let genesis_leaf = LeafQueryData::<SeqTypes>::genesis::<
6884            SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
6885        >(&validated_state, &instance_state)
6886        .await;
6887
6888        let mut reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
6889
6890        let account1 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000001")?;
6891        let account2 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000002")?;
6892        let account3 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000003")?;
6893        let account4 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000004")?;
6894
6895        // Insert account1 with balance 1000, account2 with balance 2000
6896        let accounts_height_5 = vec![
6897            (account1, RewardAmount::from(1000u64)),
6898            (account2, RewardAmount::from(2000u64)),
6899        ];
6900
6901        let accounts_height_10 = vec![
6902            (account1, RewardAmount::from(1500u64)),
6903            (account3, RewardAmount::from(3000u64)),
6904        ];
6905
6906        let accounts_height_15 = vec![
6907            (account2, RewardAmount::from(2500u64)),
6908            (account4, RewardAmount::from(4000u64)),
6909        ];
6910
6911        let mut tx = db.write().await?;
6912
6913        let header_json = serde_json::to_value(genesis_leaf.header())?;
6914
6915        for height in [5i64, 10, 15, 16] {
6916            query(
6917                "INSERT INTO header (height, hash, payload_hash, timestamp, data)
6918                     VALUES ($1, $2, $3, $4, $5)",
6919            )
6920            .bind(height)
6921            .bind(format!("hash_{height}"))
6922            .bind("payload_hash")
6923            .bind(0i64)
6924            .bind(&header_json)
6925            .execute(tx.as_mut())
6926            .await?;
6927        }
6928
6929        for (height, accounts) in [
6930            (5u64, &accounts_height_5),
6931            (10, &accounts_height_10),
6932            (15, &accounts_height_15),
6933        ] {
6934            for (account, balance) in accounts {
6935                reward_tree.update(*account, *balance)?;
6936
6937                let (_, proof) = reward_tree.lookup(*account).expect_ok().unwrap();
6938
6939                let traversal_path = <RewardAccountV2 as ToTraversalPath<
6940                    { RewardMerkleTreeV2::ARITY },
6941                >>::to_traversal_path(
6942                    account, reward_tree.height()
6943                );
6944
6945                UpdateStateData::<
6946                    SeqTypes,
6947                    RewardMerkleTreeV2,
6948                    { RewardMerkleTreeV2::ARITY },
6949                >::insert_merkle_nodes(&mut tx, proof, traversal_path, height)
6950                .await?;
6951            }
6952        }
6953
6954        UpdateStateData::<
6955            SeqTypes,
6956            RewardMerkleTreeV2,
6957            { RewardMerkleTreeV2::ARITY },
6958        >::set_last_state_height(&mut tx, 15)
6959        .await?;
6960
6961        tx.commit().await?;
6962
6963        let result_height_5 = db.get_all_reward_accounts(5, 0, 100).await?;
6964        assert_eq!(result_height_5.len(), 2,);
6965        for (account, balance) in &accounts_height_5 {
6966            assert!(result_height_5
6967                .iter()
6968                .any(|(acc, bal)| acc == account && bal == balance),);
6969        }
6970
6971        let result_height_10 = db.get_all_reward_accounts(10, 0, 100).await?;
6972        assert_eq!(result_height_10.len(), 3,);
6973
6974        // Verify account1 has the updated balance from height 10
6975        //  not the old balance from height 5
6976        let expected_at_height_10 = vec![
6977            (account1, RewardAmount::from(1500u64)),
6978            (account2, RewardAmount::from(2000u64)),
6979            (account3, RewardAmount::from(3000u64)),
6980        ];
6981        for (account, balance) in &expected_at_height_10 {
6982            assert!(result_height_10
6983                .iter()
6984                .any(|(acc, bal)| acc == account && bal == balance),);
6985        }
6986
6987        let result_height_15 = db.get_all_reward_accounts(15, 0, 100).await?;
6988        assert_eq!(result_height_15.len(), 4,);
6989
6990        // Verify account2 has the updated balance from height 15, and account4 is new
6991        let expected_at_height_15 = vec![
6992            (account1, RewardAmount::from(1500u64)),
6993            (account2, RewardAmount::from(2500u64)),
6994            (account3, RewardAmount::from(3000u64)),
6995            (account4, RewardAmount::from(4000u64)),
6996        ];
6997        for (account, balance) in &expected_at_height_15 {
6998            assert!(result_height_15
6999                .iter()
7000                .any(|(acc, bal)| acc == account && bal == balance),);
7001        }
7002
7003        // Test pagination
7004        // results are sorted by account address descending
7005        let result_limit_2 = db.get_all_reward_accounts(15, 0, 2).await?;
7006        assert_eq!(result_limit_2.len(), 2);
7007        assert_eq!(result_limit_2[0], (account4, RewardAmount::from(4000u64)));
7008        assert_eq!(result_limit_2[1], (account3, RewardAmount::from(3000u64)));
7009
7010        let result_offset_2 = db.get_all_reward_accounts(15, 2, 2).await?;
7011        assert_eq!(result_offset_2.len(), 2);
7012        assert_eq!(result_offset_2[0], (account2, RewardAmount::from(2500u64)));
7013        assert_eq!(result_offset_2[1], (account1, RewardAmount::from(1500u64)));
7014
7015        Ok(())
7016    }
7017
7018    ///  ensure get_all_reward_accounts fails when merklized state height
7019    /// is behind the requested height
7020    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7021    async fn test_get_all_reward_accounts_check_state_height() -> anyhow::Result<()> {
7022        let storage = SqlDataSource::create_storage().await;
7023        let sql_options = tmp_options(&storage);
7024        let db = SqlStorage::connect(
7025            Config::try_from(&sql_options)?,
7026            StorageConnectionType::Sequencer,
7027        )
7028        .await?;
7029
7030        let validated_state = ValidatedState::default();
7031        let instance_state =
7032            NodeState::mock().with_genesis_version(DrbAndHeaderUpgradeVersion::version());
7033        let genesis_leaf = LeafQueryData::<SeqTypes>::genesis::<
7034            SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
7035        >(&validated_state, &instance_state)
7036        .await;
7037
7038        let mut reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
7039        let account1 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000001")?;
7040
7041        let mut tx = db.write().await?;
7042
7043        let header_json = serde_json::to_value(genesis_leaf.header())?;
7044
7045        for height in [5i64, 10, 15, 20] {
7046            query(
7047                "INSERT INTO header (height, hash, payload_hash, timestamp, data)
7048                     VALUES ($1, $2, $3, $4, $5)",
7049            )
7050            .bind(height)
7051            .bind(format!("hash_{height}"))
7052            .bind("payload_hash")
7053            .bind(0i64)
7054            .bind(&header_json)
7055            .execute(tx.as_mut())
7056            .await?;
7057        }
7058
7059        // Insert merkle data at height 5
7060        reward_tree.update(account1, RewardAmount::from(1000u64))?;
7061        let (_, proof) = reward_tree.lookup(account1).expect_ok().unwrap();
7062        let traversal_path =
7063            <RewardAccountV2 as ToTraversalPath<{ RewardMerkleTreeV2::ARITY }>>::to_traversal_path(
7064                &account1,
7065                reward_tree.height(),
7066            );
7067        UpdateStateData::<
7068            SeqTypes,
7069            RewardMerkleTreeV2,
7070            { RewardMerkleTreeV2::ARITY },
7071        >::insert_merkle_nodes(&mut tx, proof, traversal_path, 5)
7072        .await?;
7073
7074        // Set the merklized state height to 10
7075        // less than max block height of 20
7076        UpdateStateData::<
7077            SeqTypes,
7078            RewardMerkleTreeV2,
7079            { RewardMerkleTreeV2::ARITY },
7080        >::set_last_state_height(&mut tx, 10)
7081        .await?;
7082
7083        tx.commit().await?;
7084
7085        // Query at height 5 should succeed
7086        let result = db.get_all_reward_accounts(5, 0, 100).await;
7087        assert!(result.is_ok());
7088        assert_eq!(result.unwrap().len(), 1);
7089
7090        // Query at height 10 should succeed
7091        let result = db.get_all_reward_accounts(10, 0, 100).await;
7092        assert!(result.is_ok());
7093
7094        // Query at height 15 should fail
7095        // state not yet processed
7096        let result = db.get_all_reward_accounts(15, 0, 100).await;
7097        assert!(result.is_err());
7098        let err = result.unwrap_err().to_string();
7099        assert!(err.contains("not yet available"));
7100
7101        Ok(())
7102    }
7103
7104    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7105    async fn test_namespace_query_compat_v0_2() {
7106        test_namespace_query_compat_helper(SequencerVersions::<FeeVersion, FeeVersion>::new())
7107            .await;
7108    }
7109
7110    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7111    async fn test_namespace_query_compat_v0_3() {
7112        test_namespace_query_compat_helper(SequencerVersions::<EpochVersion, EpochVersion>::new())
7113            .await;
7114    }
7115
7116    async fn test_namespace_query_compat_helper<V: Versions>(v: V) {
7117        // Number of nodes running in the test network.
7118        const NUM_NODES: usize = 5;
7119
7120        let port = pick_unused_port().expect("No ports free");
7121        let url: Url = format!("http://localhost:{port}").parse().unwrap();
7122
7123        let test_config = TestConfigBuilder::default().build();
7124        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7125            .api_config(Options::from(options::Http {
7126                port,
7127                max_connections: None,
7128            }))
7129            .catchups(std::array::from_fn(|_| {
7130                StatePeers::<SequencerApiVersion>::from_urls(
7131                    vec![url.clone()],
7132                    Default::default(),
7133                    &NoMetrics,
7134                )
7135            }))
7136            .network_config(test_config)
7137            .build();
7138
7139        let mut network = TestNetwork::new(config, v).await;
7140        let mut events = network.server.event_stream().await;
7141
7142        // Submit a transaction.
7143        let ns = NamespaceId::from(10_000u64);
7144        let tx = Transaction::new(ns, vec![1, 2, 3]);
7145        network.server.submit_transaction(tx.clone()).await.unwrap();
7146        let block = wait_for_decide_on_handle(&mut events, &tx).await.0;
7147
7148        // Check namespace proof queries.
7149        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7150        client.connect(None).await;
7151
7152        let (header, common): (Header, VidCommonQueryData<SeqTypes>) = try_join!(
7153            client.get(&format!("availability/header/{block}")).send(),
7154            client
7155                .get(&format!("availability/vid/common/{block}"))
7156                .send()
7157        )
7158        .unwrap();
7159        let version = header.version();
7160
7161        // The latest version of the API (whether we specifically ask for v1 or let the redirect
7162        // occur) will give us a namespace proof no matter which VID version is in use.
7163        for api_ver in ["/v1", ""] {
7164            tracing::info!("test namespace API version: {api_ver}");
7165
7166            let ns_proof: NamespaceProofQueryData = client
7167                .get(&format!(
7168                    "{api_ver}/availability/block/{block}/namespace/{ns}"
7169                ))
7170                .send()
7171                .await
7172                .unwrap();
7173            let proof = ns_proof.proof.as_ref().unwrap();
7174            if version < EpochVersion::version() {
7175                assert!(matches!(proof, NsProof::V0(..)));
7176            } else {
7177                assert!(matches!(proof, NsProof::V1(..)));
7178            }
7179            let (txs, ns_from_proof) = proof
7180                .verify(
7181                    header.ns_table(),
7182                    &header.payload_commitment(),
7183                    common.common(),
7184                )
7185                .unwrap();
7186            assert_eq!(ns_from_proof, ns);
7187            assert_eq!(txs, ns_proof.transactions);
7188            assert_eq!(txs, std::slice::from_ref(&tx));
7189
7190            // Test range endpoint.
7191            let ns_proofs: Vec<NamespaceProofQueryData> = client
7192                .get(&format!(
7193                    "{api_ver}/availability/block/{}/{}/namespace/{ns}",
7194                    block,
7195                    block + 1
7196                ))
7197                .send()
7198                .await
7199                .unwrap();
7200            assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7201
7202            // Any API version can correctly tell us that the namespace does not exist.
7203            let ns_proof: NamespaceProofQueryData = client
7204                .get(&format!(
7205                    "{api_ver}/availability/block/{}/namespace/{ns}",
7206                    block - 1
7207                ))
7208                .send()
7209                .await
7210                .unwrap();
7211            assert_eq!(ns_proof.proof, None);
7212            assert_eq!(ns_proof.transactions, vec![]);
7213
7214            // Test streaming.
7215            let mut proofs = client
7216                .socket(&format!(
7217                    "{api_ver}/availability/stream/blocks/0/namespace/{ns}"
7218                ))
7219                .subscribe()
7220                .await
7221                .unwrap();
7222            for i in 0.. {
7223                tracing::info!(i, "stream proof");
7224                let proof: NamespaceProofQueryData = proofs.next().await.unwrap().unwrap();
7225                if proof.proof.is_none() {
7226                    tracing::info!("waiting for non-trivial proof from stream");
7227                    continue;
7228                }
7229                assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7230                break;
7231            }
7232        }
7233
7234        // The legacy version of the API only works for old VID.
7235        tracing::info!("test namespace API version: v0");
7236        if version < EpochVersion::version() {
7237            let ns_proof: ADVZNamespaceProofQueryData = client
7238                .get(&format!("v0/availability/block/{block}/namespace/{ns}"))
7239                .send()
7240                .await
7241                .unwrap();
7242            let proof = ns_proof.proof.as_ref().unwrap();
7243            let VidCommon::V0(common) = common.common() else {
7244                panic!("wrong VID common version");
7245            };
7246            let (txs, ns_from_proof) = proof
7247                .verify(header.ns_table(), &header.payload_commitment(), common)
7248                .unwrap();
7249            assert_eq!(ns_from_proof, ns);
7250            assert_eq!(txs, ns_proof.transactions);
7251            assert_eq!(&txs, std::slice::from_ref(&tx));
7252
7253            // Test range endpoint.
7254            let ns_proofs: Vec<ADVZNamespaceProofQueryData> = client
7255                .get(&format!(
7256                    "v0/availability/block/{}/{}/namespace/{ns}",
7257                    block,
7258                    block + 1
7259                ))
7260                .send()
7261                .await
7262                .unwrap();
7263            assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7264        } else {
7265            // It will fail if we ask for a proof for a block using new VID.
7266            client
7267                .get::<ADVZNamespaceProofQueryData>(&format!(
7268                    "v0/availability/block/{block}/namespace/{ns}"
7269                ))
7270                .send()
7271                .await
7272                .unwrap_err();
7273        }
7274
7275        // Any API version can correctly tell us that the namespace does not exist.
7276        let ns_proof: ADVZNamespaceProofQueryData = client
7277            .get(&format!(
7278                "v0/availability/block/{}/namespace/{ns}",
7279                block - 1
7280            ))
7281            .send()
7282            .await
7283            .unwrap();
7284        assert_eq!(ns_proof.proof, None);
7285        assert_eq!(ns_proof.transactions, vec![]);
7286
7287        // Use the legacy API to stream namespace proofs until we get to a non-trivial proof or a
7288        // VID version we can't deal with.
7289        let mut proofs = client
7290            .socket(&format!("v0/availability/stream/blocks/0/namespace/{ns}"))
7291            .subscribe()
7292            .await
7293            .unwrap();
7294        for i in 0.. {
7295            tracing::info!(i, "stream proof");
7296            let proof: ADVZNamespaceProofQueryData = match proofs.next().await {
7297                Some(proof) => proof.unwrap(),
7298                None => {
7299                    // Steam not expected to end on legacy consensus version.
7300                    assert!(
7301                        version >= EpochVersion::version(),
7302                        "legacy steam ended while still on legacy consensus"
7303                    );
7304                    break;
7305                },
7306            };
7307            if proof.proof.is_none() {
7308                tracing::info!("waiting for non-trivial proof from stream");
7309                continue;
7310            }
7311            assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7312            break;
7313        }
7314
7315        network.server.shut_down().await;
7316    }
7317
7318    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7319    async fn test_light_client_completeness() {
7320        // Run the through a protocol upgrade and epoch change, then check that we are able to get a
7321        // correct light client proof for every finalized leaf.
7322
7323        const NUM_NODES: usize = 1;
7324        const EPOCH_HEIGHT: u64 = 200;
7325
7326        let upgrade_version = EpochVersion::version();
7327        let port = pick_unused_port().expect("No ports free");
7328        let url: Url = format!("http://localhost:{port}").parse().unwrap();
7329
7330        let test_config = TestConfigBuilder::default()
7331            .epoch_height(EPOCH_HEIGHT)
7332            .epoch_start_block(321)
7333            .set_upgrades(upgrade_version)
7334            .await
7335            .build();
7336
7337        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7338        let persistence: [_; NUM_NODES] = storage
7339            .iter()
7340            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7341            .collect::<Vec<_>>()
7342            .try_into()
7343            .unwrap();
7344
7345        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7346            .api_config(
7347                SqlDataSource::options(&storage[0], Options::with_port(port))
7348                    .light_client(Default::default()),
7349            )
7350            .persistences(persistence.clone())
7351            .catchups(std::array::from_fn(|_| {
7352                StatePeers::<SequencerApiVersion>::from_urls(
7353                    vec![url.clone()],
7354                    Default::default(),
7355                    &NoMetrics,
7356                )
7357            }))
7358            .network_config(test_config)
7359            .build();
7360
7361        let mut network = TestNetwork::new(
7362            config,
7363            SequencerVersions::<LegacyVersion, EpochVersion>::new(),
7364        )
7365        .await;
7366        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7367        client.connect(None).await;
7368
7369        // Get a leaf stream so that we can wait for various events. Also keep track of each leaf
7370        // yielded, which we can use as ground truth later in the test.
7371        let mut actual_leaves = vec![];
7372        let mut actual_blocks = vec![];
7373        let mut leaves = client
7374            .socket("availability/stream/leaves/0")
7375            .subscribe::<LeafQueryData<SeqTypes>>()
7376            .await
7377            .unwrap()
7378            .zip(
7379                client
7380                    .socket("availability/stream/blocks/0")
7381                    .subscribe::<BlockQueryData<SeqTypes>>()
7382                    .await
7383                    .unwrap(),
7384            )
7385            .map(|(leaf, block)| {
7386                let leaf = leaf.unwrap();
7387                let block = block.unwrap();
7388                actual_leaves.push(leaf.clone());
7389                actual_blocks.push(block);
7390                leaf
7391            });
7392
7393        // Wait for the upgrade to take effect.
7394        let (upgrade_height, first_epoch) = loop {
7395            let leaf: LeafQueryData<SeqTypes> = leaves.next().await.unwrap();
7396            if leaf.header().version() < EpochVersion::version() {
7397                tracing::info!(version = %leaf.header().version(), height = leaf.header().height(), view = ?leaf.leaf().view_number(), "waiting for epoch upgrade");
7398                continue;
7399            }
7400            break (leaf.height(), leaf.leaf().epoch(EPOCH_HEIGHT).unwrap());
7401        };
7402        tracing::info!(upgrade_height, ?first_epoch, "epochs enabled");
7403
7404        // Wait for two epoch changes (so we get to the first epoch that actually uses the stake
7405        // table).
7406        let mut epoch_heights = [0; 2];
7407        for (i, epoch_height) in epoch_heights.iter_mut().enumerate() {
7408            let desired_epoch = first_epoch + (i as u64) + 1;
7409            *epoch_height = loop {
7410                let leaf = leaves.next().await.unwrap();
7411                let epoch = leaf.leaf().epoch(EPOCH_HEIGHT).unwrap();
7412                if epoch > desired_epoch {
7413                    tracing::info!(
7414                        height = leaf.height(),
7415                        ?desired_epoch,
7416                        ?epoch,
7417                        "changed epoch"
7418                    );
7419                    break leaf.height();
7420                }
7421                tracing::info!(
7422                    ?desired_epoch,
7423                    height = leaf.header().height(),
7424                    view = ?leaf.leaf().view_number(),
7425                    "waiting for epoch change"
7426                );
7427            };
7428        }
7429
7430        // Wait a few more blocks.
7431        let max_block = epoch_heights[1] + 1;
7432        loop {
7433            let leaf = leaves.next().await.unwrap();
7434            if leaf.height() > max_block {
7435                break;
7436            }
7437            tracing::info!(max_block, height = leaf.height(), "waiting for block");
7438        }
7439
7440        // Stop consensus. All the blocks we are going to query have already been produced.
7441        // Continuing to run consensus would just waste resources while we check stuff.
7442        network.stop_consensus().await;
7443
7444        // Check light client. Querying every single block is too slow, so we'll check a few blocks
7445        // around various critical points:
7446        let heights =
7447        // * The first few blocks, including genesis
7448            (0..=1)
7449        // * A few blocks just before and after the upgrade
7450            .chain(upgrade_height-1..=upgrade_height+1)
7451        // * A few blocks just before and after the first epoch change
7452            .chain(epoch_heights[0]-1..=epoch_heights[0] + 1)
7453        // * A few blocks just before and after the stake table comes into effect
7454            .chain(epoch_heights[1]-1..=max_block);
7455
7456        let quorum = EpochChangeQuorum::new(EPOCH_HEIGHT);
7457        for i in heights {
7458            let leaf = &actual_leaves[i as usize];
7459            let block = &actual_blocks[i as usize];
7460            tracing::info!(i, ?leaf, ?block, "check leaf");
7461
7462            // Get the same leaf proof by various IDs.
7463            let client = &client;
7464            let proofs = try_join_all(
7465                [
7466                    format!("light-client/leaf/{i}"),
7467                    format!("light-client/leaf/hash/{}", leaf.hash()),
7468                    format!("light-client/leaf/block-hash/{}", leaf.block_hash()),
7469                ]
7470                .into_iter()
7471                .map(|path| async move {
7472                    tracing::info!(i, path, "fetch leaf proof");
7473                    let proof = client.get::<LeafProof>(&path).send().await?;
7474                    Ok::<_, anyhow::Error>((path, proof))
7475                }),
7476            )
7477            .await
7478            .unwrap();
7479
7480            // Check proofs against expected leaf.
7481            for (path, proof) in proofs {
7482                tracing::info!(i, path, ?proof, "check leaf proof");
7483                assert_eq!(
7484                    proof.verify(LeafProofHint::Quorum(&quorum)).await.unwrap(),
7485                    *leaf
7486                );
7487            }
7488
7489            // Get the corresponding header.
7490            let root_height = i + 1;
7491            let root = actual_leaves[root_height as usize].header();
7492            let proofs = try_join_all(
7493                [
7494                    format!("light-client/header/{root_height}/{i}"),
7495                    format!(
7496                        "light-client/header/{root_height}/hash/{}",
7497                        leaf.block_hash()
7498                    ),
7499                ]
7500                .into_iter()
7501                .map(|path| async move {
7502                    tracing::info!(i, path, "get header proof");
7503                    let proof = client.get::<HeaderProof>(&path).send().await?;
7504                    Ok::<_, anyhow::Error>((path, proof))
7505                }),
7506            )
7507            .await
7508            .unwrap();
7509            for (path, proof) in proofs {
7510                tracing::info!(i, path, ?proof, "check header proof");
7511                assert_eq!(
7512                    proof.verify_ref(root.block_merkle_tree_root()).unwrap(),
7513                    leaf.header()
7514                );
7515            }
7516
7517            // Get the corresponding payload.
7518            let proof = client
7519                .get::<PayloadProof>(&format!("light-client/payload/{i}"))
7520                .send()
7521                .await
7522                .unwrap();
7523            assert_eq!(proof.verify(leaf.header()).unwrap(), *block.payload());
7524        }
7525
7526        // Check light client stake table.
7527        let events: Vec<StakeTableEvent> = client
7528            .get(&format!("light-client/stake-table/{}", first_epoch + 2))
7529            .send()
7530            .await
7531            .unwrap();
7532        let mut state_from_events = StakeTableState::default();
7533        for event in events {
7534            state_from_events.apply_event(event).unwrap().unwrap();
7535        }
7536
7537        assert_eq!(
7538            state_from_events.into_validators(),
7539            network
7540                .server
7541                .consensus()
7542                .read()
7543                .await
7544                .storage()
7545                .load_all_validators(first_epoch + 2, 0, 1_000_000)
7546                .await
7547                .unwrap()
7548                .into_iter()
7549                .map(|v| (v.account, v))
7550                .collect::<ValidatorMap>()
7551        );
7552
7553        // Querying for a stake table before the first real epoch is an error.
7554        let err = client
7555            .get::<Vec<StakeTableEvent>>(&format!("light-client/stake-table/{}", first_epoch + 1))
7556            .send()
7557            .await
7558            .unwrap_err();
7559        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
7560    }
7561}