1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use alloy::primitives::{Address, U256};
4use anyhow::{bail, ensure, Context};
5use async_lock::RwLock;
6use async_once_cell::Lazy;
7use async_trait::async_trait;
8use committable::Commitment;
9use data_source::{
10 CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
11 StateCertDataSource, StateCertFetchingDataSource, SubmitDataSource,
12};
13use derivative::Derivative;
14use espresso_types::{
15 config::PublicNetworkConfig,
16 retain_accounts,
17 traits::EventsPersistenceRead,
18 v0::traits::{SequencerPersistence, StateCatchup},
19 v0_3::{
20 ChainConfig, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount, RewardMerkleTreeV1,
21 StakeTableEvent, Validator,
22 },
23 v0_4::{RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2},
24 AccountQueryData, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2, NodeState, PubKey,
25 Transaction, ValidatorMap,
26};
27use futures::{
28 future::{BoxFuture, Future, FutureExt},
29 stream::BoxStream,
30};
31use hotshot_contract_adapter::sol_types::{EspToken, StakeTableV2};
32use hotshot_events_service::events_source::{
33 EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
34};
35use hotshot_query_service::{availability::VidCommonQueryData, data_source::ExtensibleDataSource};
36use hotshot_types::{
37 data::{EpochNumber, VidCommitment, VidCommon, VidShare, ViewNumber},
38 event::{Event, LegacyEvent},
39 light_client::LCV3StateSignatureRequestBody,
40 network::NetworkConfig,
41 simple_certificate::LightClientStateUpdateCertificateV2,
42 traits::{
43 election::Membership,
44 network::ConnectedNetwork,
45 node_implementation::{ConsensusTime, NodeType, Versions},
46 },
47 vid::avidm::{init_avidm_param, AvidMScheme},
48 vote::HasViewNumber,
49 PeerConfig,
50};
51use itertools::Itertools;
52use jf_merkle_tree_compat::MerkleTreeScheme;
53use moka::future::Cache;
54use rand::Rng;
55use request_response::RequestType;
56use tokio::time::timeout;
57
58use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
59use crate::{
60 api::data_source::TokenDataSource,
61 catchup::{
62 add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
63 add_v2_reward_accounts_to_state, CatchupStorage,
64 },
65 context::Consensus,
66 request_response::{
67 data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
68 request::{Request, Response},
69 },
70 state_cert::{validate_state_cert, StateCertFetchError},
71 state_signature::StateSigner,
72 SeqTypes, SequencerApiVersion, SequencerContext,
73};
74
75pub mod data_source;
76pub mod endpoints;
77pub mod fs;
78pub mod light_client;
79pub mod options;
80pub mod sql;
81mod update;
82
83pub use options::Options;
84
/// Membership proof type of the [`BlockMerkleTree`]; used as the return type of the
/// `get_frontier` catchup queries in this module.
pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;
86
/// A pinned, `Arc`-shared [`Lazy`] value of type `T` whose initializer is a boxed `'static`
/// future.
type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
88
/// State shared by the API handlers: the lazily resolved sequencer context plus two small
/// caches used when querying the token supply.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
    /// Sequencer context, produced by the future handed to `ApiState::new` and awaited on
    /// access. Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    sequencer_context: BoxLazy<SequencerContext<N, P, V>>,

    /// Cache for the token contract address, keyed by `()` (built with capacity 1 in `new`).
    token_contract_address: Cache<(), Address>,

    /// Cache for the token total supply, keyed by `()` (built with capacity 1 and a
    /// 3600-second time-to-live in `new`).
    token_supply: Cache<(), U256>,
}
105
106impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState<N, P, V> {
107 fn new(context_init: impl Future<Output = SequencerContext<N, P, V>> + Send + 'static) -> Self {
108 Self {
109 sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
110 token_contract_address: Cache::builder().max_capacity(1).build(),
111 token_supply: Cache::builder()
112 .max_capacity(1)
113 .time_to_live(Duration::from_secs(3600))
114 .build(),
115 }
116 }
117
118 async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
119 self.sequencer_context
120 .as_ref()
121 .get()
122 .await
123 .get_ref()
124 .state_signer()
125 }
126
127 async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
128 self.sequencer_context
129 .as_ref()
130 .get()
131 .await
132 .get_ref()
133 .event_streamer()
134 }
135
136 async fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
137 self.sequencer_context
138 .as_ref()
139 .get()
140 .await
141 .get_ref()
142 .consensus()
143 }
144
145 async fn network_config(&self) -> NetworkConfig<SeqTypes> {
146 self.sequencer_context
147 .as_ref()
148 .get()
149 .await
150 .get_ref()
151 .network_config()
152 }
153}
154
/// A data source `D` wrapped in an [`ExtensibleDataSource`] together with the [`ApiState`].
type StorageState<N, P, D, V> = ExtensibleDataSource<D, ApiState<N, P, V>>;
156
157#[async_trait]
158impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> EventsSource<SeqTypes>
159 for ApiState<N, P, V>
160{
161 type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
162 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
163
164 async fn get_event_stream(
165 &self,
166 _filter: Option<EventFilterSet<SeqTypes>>,
167 ) -> Self::EventStream {
168 self.event_streamer()
169 .await
170 .read()
171 .await
172 .get_event_stream(None)
173 .await
174 }
175
176 async fn get_legacy_event_stream(
177 &self,
178 _filter: Option<EventFilterSet<SeqTypes>>,
179 ) -> Self::LegacyEventStream {
180 self.event_streamer()
181 .await
182 .read()
183 .await
184 .get_legacy_event_stream(None)
185 .await
186 }
187
188 async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
189 self.event_streamer()
190 .await
191 .read()
192 .await
193 .get_startup_info()
194 .await
195 }
196}
197
198impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
199 TokenDataSource<SeqTypes> for StorageState<N, P, D, V>
200{
201 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
202 self.as_ref().get_total_supply_l1().await
203 }
204}
205
206impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
207 SubmitDataSource<N, P> for StorageState<N, P, D, V>
208{
209 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
210 self.as_ref().submit(tx).await
211 }
212}
213
214impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
215 StakeTableDataSource<SeqTypes> for StorageState<N, P, D, V>
216{
217 async fn get_stake_table(
219 &self,
220 epoch: Option<<SeqTypes as NodeType>::Epoch>,
221 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
222 self.as_ref().get_stake_table(epoch).await
223 }
224
225 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
227 self.as_ref().get_stake_table_current().await
228 }
229
230 async fn get_da_stake_table(
232 &self,
233 epoch: Option<<SeqTypes as NodeType>::Epoch>,
234 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
235 self.as_ref().get_da_stake_table(epoch).await
236 }
237
238 async fn get_da_stake_table_current(
240 &self,
241 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
242 self.as_ref().get_da_stake_table_current().await
243 }
244
245 async fn get_validators(
247 &self,
248 epoch: <SeqTypes as NodeType>::Epoch,
249 ) -> anyhow::Result<ValidatorMap> {
250 self.as_ref().get_validators(epoch).await
251 }
252
253 async fn get_block_reward(
254 &self,
255 epoch: Option<EpochNumber>,
256 ) -> anyhow::Result<Option<RewardAmount>> {
257 self.as_ref().get_block_reward(epoch).await
258 }
259 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
261 self.as_ref().current_proposal_participation().await
262 }
263 async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
264 self.as_ref().previous_proposal_participation().await
265 }
266
267 async fn get_all_validators(
268 &self,
269 epoch: <SeqTypes as NodeType>::Epoch,
270 offset: u64,
271 limit: u64,
272 ) -> anyhow::Result<Vec<Validator<PubKey>>> {
273 self.as_ref().get_all_validators(epoch, offset, limit).await
274 }
275
276 async fn stake_table_events(
277 &self,
278 from_l1_block: u64,
279 to_l1_block: u64,
280 ) -> anyhow::Result<Vec<StakeTableEvent>> {
281 self.as_ref()
282 .stake_table_events(from_l1_block, to_l1_block)
283 .await
284 }
285}
286
287impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> TokenDataSource<SeqTypes>
288 for ApiState<N, P, V>
289{
290 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
291 match self.token_supply.get(&()).await {
292 Some(supply) => Ok(supply),
293 None => match self.token_contract_address.get(&()).await {
294 Some(token_contract_address) => {
295 let node_state = self.sequencer_context.as_ref().get().await.node_state();
296 let provider = node_state.l1_client.provider;
297
298 let token = EspToken::new(token_contract_address, provider.clone());
299
300 let supply = token
301 .totalSupply()
302 .call()
303 .await
304 .context("Failed to retrieve totalSupply from the contract")?;
305
306 self.token_supply.insert((), supply).await;
307
308 Ok(supply)
309 },
310 None => {
311 let node_state = self.sequencer_context.as_ref().get().await.node_state();
312 let stake_table_address = node_state
313 .chain_config
314 .stake_table_contract
315 .context("No stake table contract in chain config")?;
316
317 let provider = node_state.l1_client.provider;
318
319 let stake_table = StakeTableV2::new(stake_table_address, provider.clone());
320 let token_contract_address = stake_table.token().call().await?;
321
322 self.token_contract_address
323 .insert((), token_contract_address)
324 .await;
325
326 let token = EspToken::new(token_contract_address, provider.clone());
327
328 let supply = token
329 .totalSupply()
330 .call()
331 .await
332 .context("Failed to retrieve totalSupply from the contract")?;
333
334 self.token_supply.insert((), supply).await;
335
336 Ok(supply)
337 },
338 },
339 }
340 }
341}
342
343impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
344 StakeTableDataSource<SeqTypes> for ApiState<N, P, V>
345{
346 async fn get_stake_table(
348 &self,
349 epoch: Option<<SeqTypes as NodeType>::Epoch>,
350 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
351 let highest_epoch = self
352 .consensus()
353 .await
354 .read()
355 .await
356 .cur_epoch()
357 .await
358 .map(|e| e + 1);
359 if epoch > highest_epoch {
360 return Err(anyhow::anyhow!(
361 "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
362 {highest_epoch:?}"
363 ));
364 }
365 let mem = self
366 .consensus()
367 .await
368 .read()
369 .await
370 .membership_coordinator
371 .stake_table_for_epoch(epoch)
372 .await?;
373
374 Ok(mem.stake_table().await.0)
375 }
376
377 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
379 let epoch = self.consensus().await.read().await.cur_epoch().await;
380
381 Ok(StakeTableWithEpochNumber {
382 epoch,
383 stake_table: self.get_stake_table(epoch).await?,
384 })
385 }
386
387 async fn get_da_stake_table(
389 &self,
390 epoch: Option<<SeqTypes as NodeType>::Epoch>,
391 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
392 Ok(self
393 .consensus()
394 .await
395 .read()
396 .await
397 .membership_coordinator
398 .membership()
399 .read()
400 .await
401 .da_stake_table(epoch)
402 .0)
403 }
404
405 async fn get_da_stake_table_current(
407 &self,
408 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
409 let epoch = self.consensus().await.read().await.cur_epoch().await;
410
411 Ok(StakeTableWithEpochNumber {
412 epoch,
413 stake_table: self.get_da_stake_table(epoch).await?,
414 })
415 }
416
417 async fn get_block_reward(
418 &self,
419 epoch: Option<EpochNumber>,
420 ) -> anyhow::Result<Option<RewardAmount>> {
421 let coordinator = self
422 .consensus()
423 .await
424 .read()
425 .await
426 .membership_coordinator
427 .clone();
428
429 let membership = coordinator.membership().read().await;
430 let block_reward = match epoch {
431 None => membership.fixed_block_reward(),
432 Some(e) => membership.epoch_block_reward(e),
433 };
434
435 Ok(block_reward)
436 }
437
438 async fn get_validators(
440 &self,
441 epoch: <SeqTypes as NodeType>::Epoch,
442 ) -> anyhow::Result<ValidatorMap> {
443 let mem = self
444 .consensus()
445 .await
446 .read()
447 .await
448 .membership_coordinator
449 .membership_for_epoch(Some(epoch))
450 .await
451 .context("membership not found")?;
452
453 let r = mem.coordinator.membership().read().await;
454 r.active_validators(&epoch)
455 }
456
457 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
459 self.consensus()
460 .await
461 .read()
462 .await
463 .consensus()
464 .read()
465 .await
466 .current_proposal_participation()
467 }
468
469 async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
471 self.consensus()
472 .await
473 .read()
474 .await
475 .consensus()
476 .read()
477 .await
478 .previous_proposal_participation()
479 }
480
481 async fn get_all_validators(
482 &self,
483 epoch: <SeqTypes as NodeType>::Epoch,
484 offset: u64,
485 limit: u64,
486 ) -> anyhow::Result<Vec<Validator<PubKey>>> {
487 let handle = self.consensus().await;
488 let handle_read = handle.read().await;
489 let storage = handle_read.storage();
490 storage.load_all_validators(epoch, offset, limit).await
491 }
492
493 async fn stake_table_events(
494 &self,
495 from_l1_block: u64,
496 to_l1_block: u64,
497 ) -> anyhow::Result<Vec<StakeTableEvent>> {
498 let handle = self.consensus().await;
499 let handle_read = handle.read().await;
500 let storage = handle_read.storage();
501 let (status, events) = storage.load_events(from_l1_block, to_l1_block).await?;
502 ensure!(
503 status == Some(EventsPersistenceRead::Complete),
504 "some events in range [{from_l1_block}, {to_l1_block}] are not available ({status:?})"
505 );
506 Ok(events.into_iter().map(|(_, event)| event).collect())
507 }
508}
509
510impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
511 RequestResponseDataSource<SeqTypes> for StorageState<N, P, D, V>
512{
513 async fn request_vid_shares(
514 &self,
515 block_number: u64,
516 vid_common_data: VidCommonQueryData<SeqTypes>,
517 timeout_duration: Duration,
518 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
519 self.as_ref()
520 .request_vid_shares(block_number, vid_common_data, timeout_duration)
521 .await
522 }
523}
524
525#[async_trait]
526impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
527 StateCertFetchingDataSource<SeqTypes> for StorageState<N, P, D, V>
528{
529 async fn request_state_cert(
530 &self,
531 epoch: u64,
532 timeout: Duration,
533 ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
534 self.as_ref().request_state_cert(epoch, timeout).await
535 }
536}
537
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
    RequestResponseDataSource<SeqTypes> for ApiState<N, P, V>
{
    /// Builds a future that broadcasts a VID share request for `block_number` and collects
    /// the verified shares received until `duration` elapses.
    ///
    /// This method itself only waits for the sequencer context (to clone the
    /// request-response protocol handle); the network request runs inside the returned
    /// boxed future.
    ///
    /// The returned future fails if `vid_common_data` carries V0 VID common data or a
    /// non-V1 payload commitment, if the AVID-M parameters cannot be initialized, or if the
    /// request finishes with an error before the timeout.
    async fn request_vid_shares(
        &self,
        block_number: u64,
        vid_common_data: VidCommonQueryData<SeqTypes>,
        duration: Duration,
    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
        // Clone the protocol handle so the returned future does not borrow `self`.
        let request_response_protocol = self
            .sequencer_context
            .as_ref()
            .get()
            .await
            .request_response_protocol
            .clone();

        async move {
            // The total weight is needed to initialize the AVID-M parameters below.
            let total_weight = match vid_common_data.common() {
                VidCommon::V0(_) => {
                    return Err(anyhow::anyhow!(
                        "V0 total weight calculation not supported yet"
                    ));
                },
                VidCommon::V1(v1) => v1.total_weights,
                VidCommon::V2(v2) => v2.param.total_weights,
            };

            let avidm_param = init_avidm_param(total_weight)
                .with_context(|| "failed to initialize avidm param")?;

            // Only V1 payload commitments are accepted; received shares are verified
            // against this hash.
            let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
                bail!("V0 share verification not supported yet");
            };

            // Random identifier attached to the request.
            let request_id = rand::thread_rng().gen();

            // Shared buffer: the response handler pushes into it, and it is read back once
            // the timeout fires.
            let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
            let received_shares_clone = received_shares.clone();
            let request_result: anyhow::Result<_, _> = timeout(
                duration,
                request_response_protocol.request_indefinitely::<_, _, _>(
                    Request::VidShare(block_number, request_id),
                    RequestType::Broadcast,
                    move |_request, response| {
                        let avidm_param = avidm_param.clone();
                        let received_shares = received_shares_clone.clone();
                        async move {
                            // Anything other than a V1 share response is rejected.
                            let Response::VidShare(VidShare::V1(received_share)) = response else {
                                bail!("V0 share verification not supported yet");
                            };

                            // Keep only shares that verify against the local payload hash.
                            let Ok(Ok(_)) = AvidMScheme::verify_share(
                                &avidm_param,
                                &local_payload_hash,
                                &received_share,
                            ) else {
                                bail!("share verification failed");
                            };

                            received_shares.lock().push(received_share);

                            // The handler never reports success, even for a valid share —
                            // presumably so that `request_indefinitely` keeps accepting
                            // further responses until the outer timeout fires (TODO confirm
                            // against the request-response protocol).
                            bail!("waiting for more shares");

                            // Unreachable; its only visible effect is to give the async
                            // block a `Result` output type.
                            #[allow(unreachable_code)]
                            Ok(())
                        }
                    },
                ),
            )
            .await;

            match request_result {
                // Timeout elapsed: this is the normal exit; return whatever was collected.
                Err(_) => {
                    Ok(received_shares
                        .lock()
                        .clone()
                        .into_iter()
                        .map(VidShare::V1)
                        .collect())
                },

                // The request itself failed before the timeout.
                Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),

                // The handler above always returns an error, so the request is not
                // expected to complete successfully.
                Ok(Ok(_)) => bail!("this should not be possible"),
            }
        }
        .boxed()
    }
}
642
#[async_trait]
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
    StateCertFetchingDataSource<SeqTypes> for ApiState<N, P, V>
{
    /// Fetches the light client state update certificate for `epoch` through the node's
    /// state catchup provider and validates it against that epoch's stake table.
    ///
    /// # Errors
    ///
    /// - `StateCertFetchError::Other` if `epoch` is beyond the current epoch plus one, or
    ///   if the stake table for `epoch` cannot be obtained (even after waiting for catchup).
    /// - `StateCertFetchError::FetchError` if the fetch fails or does not finish within
    ///   `timeout`.
    /// - `StateCertFetchError::ValidationError` if the fetched certificate does not
    ///   validate against the stake table.
    async fn request_state_cert(
        &self,
        epoch: u64,
        timeout: Duration,
    ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
        tracing::info!("fetching state certificate for epoch={epoch}");
        let consensus = self.consensus().await;
        let consensus_read = consensus.read().await;

        let current_epoch = consensus_read.cur_epoch().await;

        // Highest epoch we accept: the current epoch plus one. `None` when the current
        // epoch is unknown, in which case every request is rejected below.
        let highest_epoch = current_epoch.map(|e| e.u64() + 1);

        if Some(epoch) > highest_epoch {
            return Err(StateCertFetchError::Other(anyhow::anyhow!(
                "requested state certificate for epoch {epoch} is beyond the highest possible \
                 epoch {highest_epoch:?}"
            )));
        }

        let coordinator = consensus_read.membership_coordinator.clone();
        // If the stake table for this epoch is not available yet, wait for the
        // coordinator to catch up before trying again below.
        if let Err(err) = coordinator
            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
            .await
        {
            tracing::warn!(
                "Failed to get membership for epoch {epoch}: {err:#}. Waiting for catchup"
            );

            coordinator
                .wait_for_catchup(EpochNumber::new(epoch))
                .await
                .map_err(|e| {
                    StateCertFetchError::Other(
                        anyhow::Error::new(e)
                            .context(format!("failed to catch up for stake table epoch={epoch}")),
                    )
                })?;
        }

        let membership = coordinator
            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
            .await
            .map_err(|e| {
                StateCertFetchError::Other(
                    anyhow::Error::new(e)
                        .context(format!("failed to get stake table for epoch={epoch}")),
                )
            })?;

        let stake_table = membership.stake_table().await;

        // Release the consensus read lock before the (potentially slow) fetch.
        drop(consensus_read);
        drop(consensus);

        let state_catchup = self
            .sequencer_context
            .as_ref()
            .get()
            .await
            .node_state()
            .state_catchup
            .clone();

        // Fully qualified because the `timeout` parameter shadows the imported function.
        let result = tokio::time::timeout(timeout, state_catchup.fetch_state_cert(epoch)).await;

        match result {
            // The outer `Err` is the timeout elapsing.
            Err(_) => Err(StateCertFetchError::FetchError(anyhow::anyhow!(
                "timeout while fetching state cert for epoch {epoch}"
            ))),
            Ok(Ok(cert)) => {
                // Never return a certificate that does not validate against the epoch's
                // stake table.
                validate_state_cert(&cert, &stake_table).map_err(|e| {
                    StateCertFetchError::ValidationError(e.context(format!(
                        "state certificate validation failed for epoch={epoch}"
                    )))
                })?;

                tracing::info!("fetched and validated state certificate for epoch {epoch}");
                Ok(cert)
            },
            Ok(Err(e)) => Err(StateCertFetchError::FetchError(
                e.context(format!("failed to fetch state cert for epoch {epoch}")),
            )),
        }
    }
}
737
738#[async_trait]
740impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence> StateCertDataSource
741 for StorageState<N, P, D, V>
742{
743 async fn get_state_cert_by_epoch(
744 &self,
745 epoch: u64,
746 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
747 self.as_ref().get_state_cert_by_epoch(epoch).await
748 }
749
750 async fn insert_state_cert(
751 &self,
752 epoch: u64,
753 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
754 ) -> anyhow::Result<()> {
755 self.as_ref().insert_state_cert(epoch, cert).await
756 }
757}
758
759#[async_trait]
760impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateCertDataSource
761 for ApiState<N, P, V>
762{
763 async fn get_state_cert_by_epoch(
764 &self,
765 epoch: u64,
766 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
767 let consensus = self.consensus().await;
768 let consensus_lock = consensus.read().await;
769 let persistence = consensus_lock.storage();
770
771 persistence.get_state_cert_by_epoch(epoch).await
772 }
773
774 async fn insert_state_cert(
775 &self,
776 epoch: u64,
777 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
778 ) -> anyhow::Result<()> {
779 let consensus = self.consensus().await;
780 let consensus_lock = consensus.read().await;
781 let persistence = consensus_lock.storage();
782
783 persistence.insert_state_cert(epoch, cert).await
784 }
785}
786
787impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDataSource<N, P>
788 for ApiState<N, P, V>
789{
790 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
791 let handle = self.consensus().await;
792
793 let consensus_read_lock = handle.read().await;
794
795 let cf = consensus_read_lock
799 .decided_state()
800 .await
801 .chain_config
802 .resolve();
803
804 let cf = match cf {
808 Some(cf) => cf,
809 None => self.node_state().await.chain_config,
810 };
811
812 let max_block_size: u64 = cf.max_block_size.into();
813 let txn_size = tx.payload().len() as u64;
814
815 if txn_size > max_block_size {
817 bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
818 }
819
820 consensus_read_lock.submit_transaction(tx).await?;
821 Ok(())
822 }
823}
824
825impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
826where
827 N: ConnectedNetwork<PubKey>,
828 V: Versions,
829 P: SequencerPersistence,
830 D: Sync,
831{
832 async fn node_state(&self) -> NodeState {
833 self.as_ref().node_state().await
834 }
835}
836
837impl<
838 N: ConnectedNetwork<PubKey>,
839 V: Versions,
840 P: SequencerPersistence,
841 D: CatchupStorage + Send + Sync,
842 > CatchupDataSource for StorageState<N, P, D, V>
843{
844 #[tracing::instrument(skip(self, instance))]
845 async fn get_accounts(
846 &self,
847 instance: &NodeState,
848 height: u64,
849 view: ViewNumber,
850 accounts: &[FeeAccount],
851 ) -> anyhow::Result<FeeMerkleTree> {
852 match self
854 .as_ref()
855 .get_accounts(instance, height, view, accounts)
856 .await
857 {
858 Ok(accounts) => return Ok(accounts),
859 Err(err) => {
860 tracing::info!("accounts not in memory, trying storage: {err:#}");
861 },
862 }
863
864 let (tree, leaf) = self
866 .inner()
867 .get_accounts(instance, height, view, accounts)
868 .await
869 .context("accounts not in memory, and could not fetch from storage")?;
870 let consensus = self
874 .as_ref()
875 .consensus()
876 .await
877 .read()
878 .await
879 .consensus()
880 .clone();
881 if let Err(err) =
882 add_fee_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf).await
883 {
884 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
885 }
886 tracing::info!(?view, "updated with fetched account state");
887
888 Ok(tree)
889 }
890
891 #[tracing::instrument(skip(self, instance))]
892 async fn get_frontier(
893 &self,
894 instance: &NodeState,
895 height: u64,
896 view: ViewNumber,
897 ) -> anyhow::Result<BlocksFrontier> {
898 match self.as_ref().get_frontier(instance, height, view).await {
900 Ok(frontier) => return Ok(frontier),
901 Err(err) => {
902 tracing::info!("frontier is not in memory, trying storage: {err:#}");
903 },
904 }
905
906 self.inner().get_frontier(instance, height, view).await
908 }
909
910 async fn get_chain_config(
911 &self,
912 commitment: Commitment<ChainConfig>,
913 ) -> anyhow::Result<ChainConfig> {
914 match self.as_ref().get_chain_config(commitment).await {
916 Ok(cf) => return Ok(cf),
917 Err(err) => {
918 tracing::info!("chain config is not in memory, trying storage: {err:#}");
919 },
920 }
921
922 self.inner().get_chain_config(commitment).await
924 }
925 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
926 match self.as_ref().get_leaf_chain(height).await {
928 Ok(cf) => return Ok(cf),
929 Err(err) => {
930 tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
931 },
932 }
933
934 self.inner().get_leaf_chain(height).await
936 }
937
938 #[tracing::instrument(skip(self, instance))]
939 async fn get_reward_accounts_v2(
940 &self,
941 instance: &NodeState,
942 height: u64,
943 view: ViewNumber,
944 accounts: &[RewardAccountV2],
945 ) -> anyhow::Result<RewardMerkleTreeV2> {
946 match self
948 .as_ref()
949 .get_reward_accounts_v2(instance, height, view, accounts)
950 .await
951 {
952 Ok(accounts) => return Ok(accounts),
953 Err(err) => {
954 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
955 },
956 }
957
958 let (tree, leaf) = self
960 .inner()
961 .get_reward_accounts_v2(instance, height, view, accounts)
962 .await
963 .context("accounts not in memory, and could not fetch from storage")?;
964
965 let consensus = self
968 .as_ref()
969 .consensus()
970 .await
971 .read()
972 .await
973 .consensus()
974 .clone();
975 if let Err(err) =
976 add_v2_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
977 .await
978 {
979 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
980 }
981 tracing::info!(?view, "updated with fetched account state");
982
983 Ok(tree)
984 }
985
986 async fn get_all_reward_accounts(
987 &self,
988 height: u64,
989 offset: u64,
990 limit: u64,
991 ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
992 self.inner()
993 .get_all_reward_accounts(height, offset, limit)
994 .await
995 }
996
997 #[tracing::instrument(skip(self, instance))]
998 async fn get_reward_accounts_v1(
999 &self,
1000 instance: &NodeState,
1001 height: u64,
1002 view: ViewNumber,
1003 accounts: &[RewardAccountV1],
1004 ) -> anyhow::Result<RewardMerkleTreeV1> {
1005 match self
1007 .as_ref()
1008 .get_reward_accounts_v1(instance, height, view, accounts)
1009 .await
1010 {
1011 Ok(accounts) => return Ok(accounts),
1012 Err(err) => {
1013 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
1014 },
1015 }
1016
1017 let (tree, leaf) = self
1019 .inner()
1020 .get_reward_accounts_v1(instance, height, view, accounts)
1021 .await
1022 .context("accounts not in memory, and could not fetch from storage")?;
1023
1024 let consensus = self
1027 .as_ref()
1028 .consensus()
1029 .await
1030 .read()
1031 .await
1032 .consensus()
1033 .clone();
1034 if let Err(err) =
1035 add_v1_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
1036 .await
1037 {
1038 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1039 }
1040 tracing::info!(?view, "updated with fetched account state");
1041
1042 Ok(tree)
1043 }
1044
1045 #[tracing::instrument(skip(self))]
1046 async fn get_state_cert(
1047 &self,
1048 epoch: u64,
1049 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1050 let consensus = self.as_ref().consensus().await;
1051 let consensus_lock = consensus.read().await;
1052 let persistence = consensus_lock.storage();
1053
1054 persistence
1055 .get_state_cert_by_epoch(epoch)
1056 .await?
1057 .context(format!("state cert for epoch {epoch} not found"))
1058 }
1059}
1060
1061impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
1062where
1063 N: ConnectedNetwork<PubKey>,
1064 V: Versions,
1065 P: SequencerPersistence,
1066{
1067 async fn node_state(&self) -> NodeState {
1068 self.sequencer_context.as_ref().get().await.node_state()
1069 }
1070}
1071
1072impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
1073 for ApiState<N, P, V>
1074{
1075 #[tracing::instrument(skip(self, _instance))]
1076 async fn get_accounts(
1077 &self,
1078 _instance: &NodeState,
1079 height: u64,
1080 view: ViewNumber,
1081 accounts: &[FeeAccount],
1082 ) -> anyhow::Result<FeeMerkleTree> {
1083 let state = self
1084 .consensus()
1085 .await
1086 .read()
1087 .await
1088 .state(view)
1089 .await
1090 .context(format!(
1091 "state not available for height {height}, view {view}"
1092 ))?;
1093 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
1094 }
1095
1096 #[tracing::instrument(skip(self, _instance))]
1097 async fn get_frontier(
1098 &self,
1099 _instance: &NodeState,
1100 height: u64,
1101 view: ViewNumber,
1102 ) -> anyhow::Result<BlocksFrontier> {
1103 let state = self
1104 .consensus()
1105 .await
1106 .read()
1107 .await
1108 .state(view)
1109 .await
1110 .context(format!(
1111 "state not available for height {height}, view {view}"
1112 ))?;
1113 let tree = &state.block_merkle_tree;
1114 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
1115 Ok(frontier)
1116 }
1117
1118 async fn get_chain_config(
1119 &self,
1120 commitment: Commitment<ChainConfig>,
1121 ) -> anyhow::Result<ChainConfig> {
1122 let state = self.consensus().await.read().await.decided_state().await;
1123 let chain_config = state.chain_config;
1124
1125 if chain_config.commit() == commitment {
1126 chain_config.resolve().context("chain config found")
1127 } else {
1128 bail!("chain config not found")
1129 }
1130 }
1131
1132 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1133 let mut leaves = self
1134 .consensus()
1135 .await
1136 .read()
1137 .await
1138 .consensus()
1139 .read()
1140 .await
1141 .undecided_leaves();
1142 leaves.sort_by_key(|l| l.view_number());
1143 let (position, mut last_leaf) = leaves
1144 .iter()
1145 .find_position(|l| l.height() == height)
1146 .context(format!("leaf chain not available for {height}"))?;
1147 let mut chain = vec![last_leaf.clone()];
1148 for leaf in leaves.iter().skip(position + 1) {
1149 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1150 chain.push(leaf.clone());
1151 } else {
1152 continue;
1153 }
1154 if leaf.view_number() == last_leaf.view_number() + 1 {
1155 last_leaf = leaf;
1157 break;
1158 }
1159 last_leaf = leaf;
1160 }
1161 for leaf in leaves
1163 .iter()
1164 .skip_while(|l| l.view_number() <= last_leaf.view_number())
1165 {
1166 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1167 chain.push(leaf.clone());
1168 return Ok(chain);
1169 }
1170 }
1171 bail!(format!("leaf chain not available for {height}"))
1172 }
1173
1174 #[tracing::instrument(skip(self, _instance))]
1175 async fn get_reward_accounts_v2(
1176 &self,
1177 _instance: &NodeState,
1178 height: u64,
1179 view: ViewNumber,
1180 accounts: &[RewardAccountV2],
1181 ) -> anyhow::Result<RewardMerkleTreeV2> {
1182 let state = self
1183 .consensus()
1184 .await
1185 .read()
1186 .await
1187 .state(view)
1188 .await
1189 .context(format!(
1190 "state not available for height {height}, view {view}"
1191 ))?;
1192
1193 retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
1194 }
1195
1196 async fn get_all_reward_accounts(
1203 &self,
1204 _height: u64,
1205 _offset: u64,
1206 _limit: u64,
1207 ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
1208 bail!("get_all_reward_accounts is not implemented for ApiState")
1209 }
1210
1211 #[tracing::instrument(skip(self, _instance))]
1212 async fn get_reward_accounts_v1(
1213 &self,
1214 _instance: &NodeState,
1215 height: u64,
1216 view: ViewNumber,
1217 accounts: &[RewardAccountV1],
1218 ) -> anyhow::Result<RewardMerkleTreeV1> {
1219 let state = self
1220 .consensus()
1221 .await
1222 .read()
1223 .await
1224 .state(view)
1225 .await
1226 .context(format!(
1227 "state not available for height {height}, view {view}"
1228 ))?;
1229
1230 retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
1231 }
1232
1233 async fn get_state_cert(
1234 &self,
1235 epoch: u64,
1236 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1237 self.get_state_cert_by_epoch(epoch)
1238 .await?
1239 .context(format!("state cert not found for epoch {epoch}"))
1240 }
1241}
1242
1243impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
1244 HotShotConfigDataSource for StorageState<N, P, D, V>
1245{
1246 async fn get_config(&self) -> PublicNetworkConfig {
1247 self.as_ref().network_config().await.into()
1248 }
1249}
1250
1251impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> HotShotConfigDataSource
1252 for ApiState<N, P, V>
1253{
1254 async fn get_config(&self) -> PublicNetworkConfig {
1255 self.network_config().await.into()
1256 }
1257}
1258
1259#[async_trait]
1260impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
1261 StateSignatureDataSource<N> for StorageState<N, P, D, V>
1262{
1263 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1264 self.as_ref().get_state_signature(height).await
1265 }
1266}
1267
1268#[async_trait]
1269impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateSignatureDataSource<N>
1270 for ApiState<N, P, V>
1271{
1272 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1273 self.state_signer()
1274 .await
1275 .read()
1276 .await
1277 .get_state_signature(height)
1278 .await
1279 }
1280}
1281
1282pub(crate) trait RewardAccountProofDataSource: Sync {
1283 fn load_v1_reward_account_proof(
1284 &self,
1285 _height: u64,
1286 _account: RewardAccountV1,
1287 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
1288 async {
1289 bail!("reward merklized state is not supported for this data source");
1290 }
1291 }
1292
1293 fn load_v2_reward_account_proof(
1294 &self,
1295 _height: u64,
1296 _account: RewardAccountV2,
1297 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1298 async {
1299 bail!("reward merklized state is not supported for this data source");
1300 }
1301 }
1302}
1303
1304impl RewardAccountProofDataSource for hotshot_query_service::data_source::MetricsDataSource {}
1305
1306impl<T, S> RewardAccountProofDataSource
1307 for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
1308where
1309 T: RewardAccountProofDataSource,
1310 S: Sync,
1311{
1312 async fn load_v1_reward_account_proof(
1313 &self,
1314 height: u64,
1315 account: RewardAccountV1,
1316 ) -> anyhow::Result<RewardAccountQueryDataV1> {
1317 self.inner()
1318 .load_v1_reward_account_proof(height, account)
1319 .await
1320 }
1321
1322 async fn load_v2_reward_account_proof(
1323 &self,
1324 height: u64,
1325 account: RewardAccountV2,
1326 ) -> anyhow::Result<RewardAccountQueryDataV2> {
1327 self.inner()
1328 .load_v2_reward_account_proof(height, account)
1329 .await
1330 }
1331}
1332
1333#[cfg(any(test, feature = "testing"))]
1334pub mod test_helpers {
1335 use std::{cmp::max, time::Duration};
1336
1337 use alloy::{
1338 network::EthereumWallet,
1339 primitives::{Address, U256},
1340 providers::{ext::AnvilApi, ProviderBuilder},
1341 };
1342 use committable::Committable;
1343 use espresso_contract_deployer::{
1344 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
1345 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1346 };
1347 use espresso_types::{
1348 v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
1349 DrbAndHeaderUpgradeVersion, EpochVersion, FeeVersion, MockSequencerVersions, NamespaceId,
1350 SequencerVersions, ValidatedState, V0_1,
1351 };
1352 use futures::{
1353 future::{join_all, FutureExt},
1354 stream::StreamExt,
1355 };
1356 use hotshot::types::{Event, EventType};
1357 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1358 use hotshot_types::{
1359 event::LeafInfo,
1360 light_client::LCV3StateSignatureRequestBody,
1361 traits::{metrics::NoMetrics, node_implementation::ConsensusTime},
1362 HotShotConfig,
1363 };
1364 use itertools::izip;
1365 use jf_merkle_tree_compat::{MerkleCommitment, MerkleTreeScheme};
1366 use portpicker::pick_unused_port;
1367 use staking_cli::demo::{DelegationConfig, StakingTransactions};
1368 use surf_disco::Client;
1369 use tempfile::TempDir;
1370 use tide_disco::{error::ServerError, Api, App, Error, StatusCode};
1371 use tokio::{spawn, task::JoinHandle, time::sleep};
1372 use url::Url;
1373 use vbs::version::{StaticVersion, StaticVersionType};
1374
1375 use super::*;
1376 use crate::{
1377 catchup::NullStateCatchup,
1378 network,
1379 persistence::no_storage,
1380 testing::{run_legacy_builder, wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
1381 };
1382
/// Stake table capacity passed to node initialization and to the light client genesis
/// computation in these test helpers.
pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
1384
1385 pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> {
1386 pub server: SequencerContext<network::Memory, P::Persistence, V>,
1387 pub peers: Vec<SequencerContext<network::Memory, P::Persistence, V>>,
1388 pub cfg: TestConfig<{ NUM_NODES }>,
1389 pub temp_dir: Option<TempDir>,
1391 pub contracts: Option<Contracts>,
1392 }
1393
1394 pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
1395 where
1396 P: PersistenceOptions,
1397 C: StateCatchup + 'static,
1398 {
1399 state: [ValidatedState; NUM_NODES],
1400 persistence: [P; NUM_NODES],
1401 catchup: [C; NUM_NODES],
1402 network_config: TestConfig<{ NUM_NODES }>,
1403 api_config: Options,
1404 contracts: Option<Contracts>,
1405 }
1406
1407 impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
1408 where
1409 P: PersistenceOptions,
1410 C: StateCatchup + 'static,
1411 {
1412 pub fn states(&self) -> [ValidatedState; NUM_NODES] {
1413 self.state.clone()
1414 }
1415 }
1416 #[derive(Clone)]
1417 pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
1418 where
1419 P: PersistenceOptions,
1420 C: StateCatchup + 'static,
1421 {
1422 state: [ValidatedState; NUM_NODES],
1423 persistence: Option<[P; NUM_NODES]>,
1424 catchup: Option<[C; NUM_NODES]>,
1425 api_config: Option<Options>,
1426 network_config: Option<TestConfig<{ NUM_NODES }>>,
1427 contracts: Option<Contracts>,
1428 }
1429
1430 impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
1431 fn default() -> Self {
1432 TestNetworkConfigBuilder {
1433 state: std::array::from_fn(|_| ValidatedState::default()),
1434 persistence: Some([no_storage::Options; 5]),
1435 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1436 network_config: None,
1437 api_config: None,
1438 contracts: None,
1439 }
1440 }
1441 }
1442
1443 pub enum AnyTestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
1444 V0_1(TestNetwork<P, NUM_NODES, SequencerVersions<V0_1, V0_1>>),
1445 V0_2(TestNetwork<P, NUM_NODES, SequencerVersions<FeeVersion, FeeVersion>>),
1446 V0_3(TestNetwork<P, NUM_NODES, SequencerVersions<EpochVersion, EpochVersion>>),
1447 V0_4(
1448 TestNetwork<
1449 P,
1450 NUM_NODES,
1451 SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
1452 >,
1453 ),
1454 }
1455
1456 impl<P: PersistenceOptions, const NUM_NODES: usize> AnyTestNetwork<P, NUM_NODES> {
1457 pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1458 match self {
1459 AnyTestNetwork::V0_1(network) => network.cfg.hotshot_config(),
1460 AnyTestNetwork::V0_2(network) => network.cfg.hotshot_config(),
1461 AnyTestNetwork::V0_3(network) => network.cfg.hotshot_config(),
1462 AnyTestNetwork::V0_4(network) => network.cfg.hotshot_config(),
1463 }
1464 }
1465 }
1466
1467 impl<const NUM_NODES: usize>
1468 TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1469 {
1470 pub fn with_num_nodes(
1471 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1472 {
1473 TestNetworkConfigBuilder {
1474 state: std::array::from_fn(|_| ValidatedState::default()),
1475 persistence: Some([no_storage::Options; { NUM_NODES }]),
1476 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1477 network_config: None,
1478 api_config: None,
1479 contracts: None,
1480 }
1481 }
1482 }
1483
1484 impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1485 where
1486 P: PersistenceOptions,
1487 C: StateCatchup + 'static,
1488 {
1489 pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1490 self.state = state;
1491 self
1492 }
1493
1494 pub fn persistences<NP: PersistenceOptions>(
1495 self,
1496 persistence: [NP; NUM_NODES],
1497 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
1498 TestNetworkConfigBuilder {
1499 state: self.state,
1500 catchup: self.catchup,
1501 network_config: self.network_config,
1502 api_config: self.api_config,
1503 persistence: Some(persistence),
1504 contracts: self.contracts,
1505 }
1506 }
1507
1508 pub fn api_config(mut self, api_config: Options) -> Self {
1509 self.api_config = Some(api_config);
1510 self
1511 }
1512
1513 pub fn catchups<NC: StateCatchup + 'static>(
1514 self,
1515 catchup: [NC; NUM_NODES],
1516 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
1517 TestNetworkConfigBuilder {
1518 state: self.state,
1519 catchup: Some(catchup),
1520 network_config: self.network_config,
1521 api_config: self.api_config,
1522 persistence: self.persistence,
1523 contracts: self.contracts,
1524 }
1525 }
1526
1527 pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
1528 self.network_config = Some(network_config);
1529 self
1530 }
1531
1532 pub fn contracts(mut self, contracts: Contracts) -> Self {
1533 self.contracts = Some(contracts);
1534 self
1535 }
1536
1537 pub async fn pos_hook<V: Versions>(
1540 self,
1541 delegation_config: DelegationConfig,
1542 stake_table_version: StakeTableContractVersion,
1543 ) -> anyhow::Result<Self> {
1544 if <V as Versions>::Upgrade::VERSION < EpochVersion::VERSION
1545 && <V as Versions>::Base::VERSION < EpochVersion::VERSION
1546 {
1547 panic!("given version does not require pos deployment");
1548 };
1549
1550 let network_config = self
1551 .network_config
1552 .as_ref()
1553 .expect("network_config is required");
1554
1555 let l1_url = network_config.l1_url();
1556 let signer = network_config.signer();
1557 let deployer = ProviderBuilder::new()
1558 .wallet(EthereumWallet::from(signer.clone()))
1559 .connect_http(l1_url.clone());
1560
1561 let blocks_per_epoch = network_config.hotshot_config().epoch_height;
1562 let epoch_start_block = network_config.hotshot_config().epoch_start_block;
1563 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1564 &network_config.hotshot_config().hotshot_stake_table(),
1565 STAKE_TABLE_CAPACITY_FOR_TEST,
1566 )
1567 .unwrap();
1568
1569 let mut contracts = Contracts::new();
1570 let args = DeployerArgsBuilder::default()
1571 .deployer(deployer.clone())
1572 .rpc_url(l1_url.clone())
1573 .mock_light_client(true)
1574 .genesis_lc_state(genesis_state)
1575 .genesis_st_state(genesis_stake)
1576 .blocks_per_epoch(blocks_per_epoch)
1577 .epoch_start_block(epoch_start_block)
1578 .exit_escrow_period(U256::from(max(
1579 blocks_per_epoch * 15 + 100,
1580 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1581 )))
1582 .multisig_pauser(signer.address())
1583 .token_name("Espresso".to_string())
1584 .token_symbol("ESP".to_string())
1585 .initial_token_supply(U256::from(100000u64))
1586 .ops_timelock_delay(U256::from(0))
1587 .ops_timelock_admin(signer.address())
1588 .ops_timelock_proposers(vec![signer.address()])
1589 .ops_timelock_executors(vec![signer.address()])
1590 .safe_exit_timelock_delay(U256::from(10))
1591 .safe_exit_timelock_admin(signer.address())
1592 .safe_exit_timelock_proposers(vec![signer.address()])
1593 .safe_exit_timelock_executors(vec![signer.address()])
1594 .build()
1595 .unwrap();
1596
1597 match stake_table_version {
1598 StakeTableContractVersion::V1 => {
1599 args.deploy_to_stake_table_v1(&mut contracts).await
1600 },
1601 StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1602 }
1603 .context("failed to deploy contracts")?;
1604
1605 let stake_table_address = contracts
1606 .address(Contract::StakeTableProxy)
1607 .expect("StakeTableProxy address not found");
1608
1609 StakingTransactions::create(
1610 l1_url.clone(),
1611 &deployer,
1612 stake_table_address,
1613 network_config.staking_priv_keys(),
1614 None,
1615 delegation_config,
1616 )
1617 .await
1618 .expect("stake table setup failed")
1619 .apply_all()
1620 .await
1621 .expect("send all txns failed");
1622
1623 if let Some(anvil) = network_config.anvil() {
1628 anvil
1629 .anvil_set_interval_mining(1)
1630 .await
1631 .expect("interval mining");
1632 }
1633
1634 let state = self.state[0].clone();
1638 let chain_config = if let Some(cf) = state.chain_config.resolve() {
1639 ChainConfig {
1640 base_fee: 0.into(),
1641 stake_table_contract: Some(stake_table_address),
1642 ..cf
1643 }
1644 } else {
1645 ChainConfig {
1646 base_fee: 0.into(),
1647 stake_table_contract: Some(stake_table_address),
1648 ..Default::default()
1649 }
1650 };
1651
1652 let state = ValidatedState {
1653 chain_config: chain_config.into(),
1654 ..state
1655 };
1656 Ok(self
1657 .states(std::array::from_fn(|_| state.clone()))
1658 .contracts(contracts))
1659 }
1660
1661 pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
1662 TestNetworkConfig {
1663 state: self.state,
1664 persistence: self.persistence.unwrap(),
1665 catchup: self.catchup.unwrap(),
1666 network_config: self.network_config.unwrap(),
1667 api_config: self.api_config.unwrap(),
1668 contracts: self.contracts,
1669 }
1670 }
1671 }
1672
1673 impl<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> TestNetwork<P, { NUM_NODES }, V> {
1674 pub async fn new<C: StateCatchup + 'static>(
1675 cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
1676 bind_version: V,
1677 ) -> Self {
1678 let mut cfg = cfg;
1679 let mut builder_tasks = Vec::new();
1680
1681 let chain_config = cfg.state[0].chain_config.resolve();
1682 if chain_config.is_none() {
1683 tracing::warn!("Chain config is not set, using default max_block_size");
1684 }
1685 let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
1686 cfg.network_config.builder_port(),
1687 chain_config.map(|c| *c.max_block_size),
1688 )
1689 .await;
1690 builder_tasks.push(task);
1691 cfg.network_config
1692 .set_builder_urls(vec1::vec1![builder_url.clone()]);
1693
1694 let mut opt = cfg.api_config.clone();
1696 let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
1697 let temp_dir = tempfile::tempdir().unwrap();
1698 opt = opt.query_fs(
1699 Default::default(),
1700 crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
1701 );
1702 Some(temp_dir)
1703 } else {
1704 None
1705 };
1706
1707 let mut nodes = join_all(
1708 izip!(cfg.state, cfg.persistence, cfg.catchup)
1709 .enumerate()
1710 .map(|(i, (state, persistence, state_peers))| {
1711 let opt = opt.clone();
1712 let cfg = &cfg.network_config;
1713 let upgrades_map = cfg.upgrades();
1714 async move {
1715 if i == 0 {
1716 opt.serve(|metrics, consumer, storage| {
1717 let cfg = cfg.clone();
1718 async move {
1719 Ok(cfg
1720 .init_node(
1721 0,
1722 state,
1723 persistence,
1724 Some(state_peers),
1725 storage,
1726 &*metrics,
1727 STAKE_TABLE_CAPACITY_FOR_TEST,
1728 consumer,
1729 bind_version,
1730 upgrades_map,
1731 )
1732 .await)
1733 }
1734 .boxed()
1735 })
1736 .await
1737 .unwrap()
1738 } else {
1739 cfg.init_node(
1740 i,
1741 state,
1742 persistence,
1743 Some(state_peers),
1744 None,
1745 &NoMetrics,
1746 STAKE_TABLE_CAPACITY_FOR_TEST,
1747 NullEventConsumer,
1748 bind_version,
1749 upgrades_map,
1750 )
1751 .await
1752 }
1753 }
1754 }),
1755 )
1756 .await;
1757
1758 let handle_0 = &nodes[0];
1759
1760 for builder_task in builder_tasks {
1762 builder_task.start(Box::new(handle_0.event_stream().await));
1763 }
1764
1765 for ctx in &nodes {
1766 ctx.start_consensus().await;
1767 }
1768
1769 let server = nodes.remove(0);
1770 let peers = nodes;
1771
1772 Self {
1773 server,
1774 peers,
1775 cfg: cfg.network_config,
1776 temp_dir,
1777 contracts: cfg.contracts,
1778 }
1779 }
1780
1781 pub async fn stop_consensus(&mut self) {
1782 self.server.shutdown_consensus().await;
1783
1784 for ctx in &mut self.peers {
1785 ctx.shutdown_consensus().await;
1786 }
1787 }
1788 }
1789
1790 pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
1798 let port = pick_unused_port().expect("No ports free");
1799 let url = format!("http://localhost:{port}").parse().unwrap();
1800 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1801
1802 let options = opt(Options::with_port(port));
1803 let network_config = TestConfigBuilder::default().build();
1804 let config = TestNetworkConfigBuilder::default()
1805 .api_config(options)
1806 .network_config(network_config)
1807 .build();
1808 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1809 client.connect(None).await;
1810
1811 while client
1815 .get::<u64>("status/block-height")
1816 .send()
1817 .await
1818 .unwrap()
1819 <= 1
1820 {
1821 sleep(Duration::from_secs(1)).await;
1822 }
1823 let success_rate = client
1824 .get::<f64>("status/success-rate")
1825 .send()
1826 .await
1827 .unwrap();
1828 assert!(success_rate.is_finite(), "{success_rate}");
1831 assert!(success_rate > 0.0, "{success_rate}");
1833 }
1834
1835 pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
1843 let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
1844
1845 let port = pick_unused_port().expect("No ports free");
1846
1847 let url = format!("http://localhost:{port}").parse().unwrap();
1848 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1849
1850 let options = opt(Options::with_port(port).submit(Default::default()));
1851 let network_config = TestConfigBuilder::default().build();
1852 let config = TestNetworkConfigBuilder::default()
1853 .api_config(options)
1854 .network_config(network_config)
1855 .build();
1856 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1857 let mut events = network.server.event_stream().await;
1858
1859 client.connect(None).await;
1860
1861 let hash = client
1862 .post("submit/submit")
1863 .body_json(&txn)
1864 .unwrap()
1865 .send()
1866 .await
1867 .unwrap();
1868 assert_eq!(txn.commit(), hash);
1869
1870 wait_for_decide_on_handle(&mut events, &txn).await;
1872 }
1873
1874 pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
1876 let port = pick_unused_port().expect("No ports free");
1877
1878 let url = format!("http://localhost:{port}").parse().unwrap();
1879
1880 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1881
1882 let options = opt(Options::with_port(port));
1883 let network_config = TestConfigBuilder::default().build();
1884 let config = TestNetworkConfigBuilder::default()
1885 .api_config(options)
1886 .network_config(network_config)
1887 .build();
1888 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1889
1890 let mut height: u64;
1891 loop {
1894 height = network.server.decided_leaf().await.height();
1895 sleep(std::time::Duration::from_secs(1)).await;
1896 if height >= 2 {
1897 break;
1898 }
1899 }
1900 client
1902 .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
1903 .send()
1904 .await
1905 .unwrap();
1906 }
1907
1908 pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
1916 let port = pick_unused_port().expect("No ports free");
1917 let url = format!("http://localhost:{port}").parse().unwrap();
1918 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1919
1920 let options = opt(Options::with_port(port));
1921 let network_config = TestConfigBuilder::default().build();
1922 let config = TestNetworkConfigBuilder::default()
1923 .api_config(options)
1924 .network_config(network_config)
1925 .build();
1926 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1927 client.connect(None).await;
1928
1929 let mut events = network.server.event_stream().await;
1931 loop {
1932 if let Event {
1933 event: EventType::Decide { leaf_chain, .. },
1934 ..
1935 } = events.next().await.unwrap()
1936 {
1937 if leaf_chain
1938 .iter()
1939 .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
1940 {
1941 break;
1942 }
1943 }
1944 }
1945
1946 {
1949 network.server.shutdown_consensus().await;
1950 }
1951
1952 let leaf = network.server.decided_leaf().await;
1954 let height = leaf.height() + 1;
1955 let view = leaf.view_number() + 1;
1956 let res = client
1957 .get::<AccountQueryData>(&format!(
1958 "catchup/{height}/{}/account/{:x}",
1959 view.u64(),
1960 Address::default()
1961 ))
1962 .send()
1963 .await
1964 .unwrap();
1965 assert_eq!(res.balance, U256::ZERO);
1966 assert_eq!(
1967 res.proof
1968 .verify(
1969 &network
1970 .server
1971 .state(view)
1972 .await
1973 .unwrap()
1974 .fee_merkle_tree
1975 .commitment()
1976 )
1977 .unwrap(),
1978 U256::ZERO,
1979 );
1980
1981 let res = client
1983 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
1984 .send()
1985 .await
1986 .unwrap();
1987 let root = &network
1988 .server
1989 .state(view)
1990 .await
1991 .unwrap()
1992 .block_merkle_tree
1993 .commitment();
1994 BlockMerkleTree::verify(root, root.size() - 1, res)
1995 .unwrap()
1996 .unwrap();
1997 }
1998
1999 pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
2000 let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
2001 let mut api =
2002 Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
2003
2004 api.get("account", |_req, _state: &()| {
2005 async move {
2006 Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
2007 StatusCode::BAD_REQUEST,
2008 "no account found".to_string(),
2009 ))
2010 }
2011 .boxed()
2012 })?
2013 .get("blocks", |_req, _state| {
2014 async move {
2015 Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
2016 StatusCode::BAD_REQUEST,
2017 "no block found".to_string(),
2018 ))
2019 }
2020 .boxed()
2021 })?
2022 .get("chainconfig", |_req, _state| {
2023 async move {
2024 Result::<ChainConfig, _>::Ok(ChainConfig {
2025 max_block_size: 300.into(),
2026 base_fee: 1.into(),
2027 fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
2028 .parse()
2029 .unwrap(),
2030 ..Default::default()
2031 })
2032 }
2033 .boxed()
2034 })?
2035 .get("leafchain", |_req, _state| {
2036 async move {
2037 Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
2038 StatusCode::BAD_REQUEST,
2039 "No leafchain found".to_string(),
2040 ))
2041 }
2042 .boxed()
2043 })?;
2044
2045 let mut app = App::<_, hotshot_query_service::Error>::with_state(());
2046 app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
2047
2048 app.register_module::<_, _>("catchup", api).unwrap();
2049
2050 let port = pick_unused_port().expect("no free port");
2051 let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
2052
2053 let handle = spawn({
2054 let url = url.clone();
2055 async move {
2056 let _ = app.serve(url, SequencerApiVersion::instance()).await;
2057 }
2058 });
2059
2060 Ok((url, handle))
2061 }
2062}
2063
2064#[cfg(test)]
2065mod api_tests {
2066 use std::{fmt::Debug, marker::PhantomData};
2067
2068 use committable::Committable;
2069 use data_source::testing::TestableSequencerDataSource;
2070 use espresso_types::{
2071 traits::{EventConsumer, PersistenceOptions},
2072 Header, Leaf2, MockSequencerVersions, NamespaceId, NamespaceProofQueryData, ValidatedState,
2073 };
2074 use futures::{future, stream::StreamExt};
2075 use hotshot_example_types::node_types::TestVersions;
2076 use hotshot_query_service::availability::{
2077 AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
2078 };
2079 use hotshot_types::{
2080 data::{
2081 ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare, DaProposal2, EpochNumber,
2082 QuorumProposal2, QuorumProposalWrapper, VidCommitment, VidDisperseShare,
2083 },
2084 event::LeafInfo,
2085 message::Proposal,
2086 simple_certificate::{CertificatePair, QuorumCertificate2},
2087 traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
2088 utils::EpochTransitionIndicator,
2089 vid::avidm::{init_avidm_param, AvidMScheme},
2090 };
2091 use portpicker::pick_unused_port;
2092 use surf_disco::Client;
2093 use test_helpers::{
2094 catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2095 TestNetwork, TestNetworkConfigBuilder,
2096 };
2097 use tide_disco::error::ServerError;
2098 use vbs::version::StaticVersion;
2099
2100 use super::{update::ApiEventConsumer, *};
2101 use crate::{
2102 network,
2103 persistence::no_storage::NoStorage,
2104 testing::{wait_for_decide_on_handle, TestConfigBuilder},
2105 };
2106
2107 #[rstest_reuse::template]
2108 #[rstest::rstest]
2109 #[case(PhantomData::<crate::api::sql::DataSource>)]
2110 #[case(PhantomData::<crate::api::fs::DataSource>)]
2111 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2112 pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
2113 #[case] _d: PhantomData<D>,
2114 ) {
2115 }
2116
2117 #[rstest_reuse::apply(testable_sequencer_data_source)]
2118 pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
2119 _d: PhantomData<D>,
2120 ) {
2121 let storage = D::create_storage().await;
2122 submit_test_helper(|opt| D::options(&storage, opt)).await
2123 }
2124
2125 #[rstest_reuse::apply(testable_sequencer_data_source)]
2126 pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
2127 _d: PhantomData<D>,
2128 ) {
2129 let storage = D::create_storage().await;
2130 status_test_helper(|opt| D::options(&storage, opt)).await
2131 }
2132
2133 #[rstest_reuse::apply(testable_sequencer_data_source)]
2134 pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
2135 _d: PhantomData<D>,
2136 ) {
2137 let storage = D::create_storage().await;
2138 state_signature_test_helper(|opt| D::options(&storage, opt)).await
2139 }
2140
2141 #[rstest_reuse::apply(testable_sequencer_data_source)]
2142 pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
2143 let ns_id = NamespaceId::from(42_u32);
2145 let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);
2146
2147 let port = pick_unused_port().expect("No ports free");
2149 let storage = D::create_storage().await;
2150 let network_config = TestConfigBuilder::default().build();
2151 let config = TestNetworkConfigBuilder::default()
2152 .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
2153 .network_config(network_config)
2154 .build();
2155 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2156 let mut events = network.server.event_stream().await;
2157
2158 let client: Client<ServerError, StaticVersion<0, 1>> =
2160 Client::new(format!("http://localhost:{port}").parse().unwrap());
2161 client.connect(None).await;
2162
2163 let hash = client
2164 .post("submit/submit")
2165 .body_json(&txn)
2166 .unwrap()
2167 .send()
2168 .await
2169 .unwrap();
2170 assert_eq!(txn.commit(), hash);
2171
2172 let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
2174 tracing::info!(block_height, "transaction sequenced");
2175
2176 let txn2 = Transaction::new(ns_id, vec![5, 6, 7, 8]);
2178 client
2179 .post::<Commitment<Transaction>>("submit/submit")
2180 .body_json(&txn2)
2181 .unwrap()
2182 .send()
2183 .await
2184 .unwrap();
2185 let block_height2 = wait_for_decide_on_handle(&mut events, &txn2).await.0 as usize;
2186 tracing::info!(block_height2, "transaction sequenced");
2187
2188 client
2190 .socket(&format!("availability/stream/blocks/{block_height2}"))
2191 .subscribe::<BlockQueryData<SeqTypes>>()
2192 .await
2193 .unwrap()
2194 .next()
2195 .await
2196 .unwrap()
2197 .unwrap();
2198
2199 let mut found_txn = false;
2200 let mut found_empty_block = false;
2201 for block_num in 0..=block_height {
2202 let header: Header = client
2203 .get(&format!("availability/header/{block_num}"))
2204 .send()
2205 .await
2206 .unwrap();
2207 let ns_query_res: NamespaceProofQueryData = client
2208 .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
2209 .send()
2210 .await
2211 .unwrap();
2212
2213 assert_eq!(
2215 ns_query_res,
2216 client
2217 .get(&format!(
2218 "availability/block/hash/{}/namespace/{ns_id}",
2219 header.commit()
2220 ))
2221 .send()
2222 .await
2223 .unwrap()
2224 );
2225 assert_eq!(
2226 ns_query_res,
2227 client
2228 .get(&format!(
2229 "availability/block/payload-hash/{}/namespace/{ns_id}",
2230 header.payload_commitment()
2231 ))
2232 .send()
2233 .await
2234 .unwrap()
2235 );
2236
2237 if let Some(ns_proof) = ns_query_res.proof {
2239 let vid_common: VidCommonQueryData<SeqTypes> = client
2240 .get(&format!("availability/vid/common/{block_num}"))
2241 .send()
2242 .await
2243 .unwrap();
2244 ns_proof
2245 .verify(
2246 header.ns_table(),
2247 &header.payload_commitment(),
2248 vid_common.common(),
2249 )
2250 .unwrap();
2251 } else {
2252 assert!(header.ns_table().find_ns_id(&ns_id).is_none());
2254 assert!(ns_query_res.transactions.is_empty());
2255 }
2256
2257 found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();
2258
2259 for txn in ns_query_res.transactions {
2260 if txn.commit() == hash {
2261 found_txn = true;
2263 }
2264 }
2265 }
2266 assert!(found_txn);
2267 assert!(found_empty_block);
2268
2269 let ns_proofs: Vec<NamespaceProofQueryData> = client
2271 .get(&format!(
2272 "availability/block/{block_height}/{}/namespace/{ns_id}",
2273 block_height2 + 1
2274 ))
2275 .send()
2276 .await
2277 .unwrap();
2278 assert_eq!(ns_proofs.len(), block_height2 + 1 - block_height);
2279 assert_eq!(&ns_proofs[0].transactions, std::slice::from_ref(&txn));
2280 assert_eq!(
2281 &ns_proofs[ns_proofs.len() - 1].transactions,
2282 std::slice::from_ref(&txn2)
2283 );
2284 for proof in &ns_proofs[1..ns_proofs.len() - 1] {
2285 assert_eq!(proof.transactions, &[]);
2286 }
2287 }
2288
2289 #[rstest_reuse::apply(testable_sequencer_data_source)]
2290 pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
2291 _d: PhantomData<D>,
2292 ) {
2293 let storage = D::create_storage().await;
2294 catchup_test_helper(|opt| D::options(&storage, opt)).await
2295 }
2296
/// Checks that decided data is neither lost nor prematurely garbage collected when one decide's
/// event consumer fails and the next decide starts at a non-consecutive leaf.
///
/// Scenario:
/// 1. Build five leaves (views/heights 0..5) and persist a quorum proposal, VID share and DA
///    proposal for each view.
/// 2. Decide leaves 0-1 with a consumer that always errors.
/// 3. Decide leaves 3-4 (leaf 2 is deliberately skipped) with the real API event consumer.
/// 4. Assert that leaves 0, 1, 3 and 4 are available from `data_source` and that their consensus
///    artifacts are gone from `persistence`, while the artifacts for the skipped view 2 remain.
#[rstest_reuse::apply(testable_sequencer_data_source)]
pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
where
    D: TestableSequencerDataSource + Debug + 'static,
{
    // Event consumer that rejects every event; used to inject an event-processing failure.
    #[derive(Clone, Copy, Debug)]
    struct FailConsumer;

    #[async_trait]
    impl EventConsumer for FailConsumer {
        async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
            bail!("mock error injection");
        }
    }

    let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);

    // The persistence layer and the API data source are both created from the same `storage`.
    let storage = D::create_storage().await;
    let persistence = D::persistence_options(&storage).create().await.unwrap();
    let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
        Arc::new(StorageState::new(
            D::create(D::persistence_options(&storage), Default::default(), false)
                .await
                .unwrap(),
            ApiState::new(future::pending()),
        ));

    // Collects every generated (leaf, certificate) pair; split into two chains further down.
    let mut chain1 = vec![];

    // All generated leaves reuse the genesis header (with an adjusted height) and the genesis
    // payload.
    let genesis = Leaf2::genesis::<TestVersions>(&Default::default(), &NodeState::mock()).await;
    let payload = genesis.block_payload().unwrap();
    let payload_bytes_arc = payload.encode();

    // Disperse the payload once; the resulting commitment and first share are reused for every
    // view in the loop below.
    let avidm_param = init_avidm_param(2).unwrap();
    let weights = vec![1u32; 2];

    let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
    let (payload_commitment, shares) =
        AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();

    // Template proposal; height, view number and justify QC are overwritten on each iteration.
    let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
        proposal: QuorumProposal2::<SeqTypes> {
            block_header: genesis.block_header().clone(),
            view_number: ViewNumber::genesis(),
            justify_qc: QuorumCertificate2::genesis::<MockSequencerVersions>(
                &ValidatedState::default(),
                &NodeState::mock(),
            )
            .await,
            upgrade_certificate: None,
            view_change_evidence: None,
            next_drb_result: None,
            next_epoch_justify_qc: None,
            epoch: None,
            state_cert: None,
        },
    };
    let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
        &ValidatedState::default(),
        &NodeState::mock(),
    )
    .await;

    let mut justify_qc = qc.clone();
    for i in 0..5 {
        // Build leaf `i`, justified by the QC that was formed for the previous leaf.
        *quorum_proposal.proposal.block_header.height_mut() = i;
        quorum_proposal.proposal.view_number = ViewNumber::new(i);
        quorum_proposal.proposal.justify_qc = justify_qc;
        let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
        // Re-point the QC at the new leaf; it becomes the justify QC of the next iteration.
        qc.view_number = leaf.view_number();
        qc.data.leaf_commit = Committable::commit(&leaf);
        justify_qc = qc.clone();
        chain1.push((leaf.clone(), CertificatePair::non_epoch_change(qc.clone())));

        // Persist the signed quorum proposal for this view.
        let quorum_proposal_signature =
            PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum_proposal");
        persistence
            .append_quorum_proposal2(&Proposal {
                data: quorum_proposal.clone(),
                signature: quorum_proposal_signature,
                _pd: Default::default(),
            })
            .await
            .unwrap();

        // Persist a VID share for this view.
        let share: VidDisperseShare<SeqTypes> = AvidMDisperseShare {
            view_number: leaf.view_number(),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param.clone(),
        }
        .into();

        persistence
            .append_vid(&share.to_proposal(&privkey).unwrap())
            .await
            .unwrap();

        // Persist a DA proposal carrying the encoded payload for this view.
        let block_payload_signature =
            PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
        let da_proposal_inner = DaProposal2::<SeqTypes> {
            encoded_transactions: payload_bytes_arc.clone(),
            metadata: payload.ns_table().clone(),
            view_number: leaf.view_number(),
            epoch: Some(EpochNumber::new(0)),
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        };
        let da_proposal = Proposal {
            data: da_proposal_inner,
            signature: block_payload_signature,
            _pd: Default::default(),
        };
        persistence
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
    }
    // Split into two chains: `chain1` keeps leaves 0-1, `chain2` receives leaves 2-4.
    let mut chain2 = chain1.split_off(2);
    // Drop leaf 2 so the second decide is non-consecutive with the first (`chain2` = leaves 3-4).
    chain2.remove(0);

    // Decide the first chain with a consumer that fails. The decide itself is still expected to
    // return `Ok` (hence the `unwrap`).
    let leaf_chain = chain1
        .iter()
        .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
        .collect::<Vec<_>>();
    tracing::info!("decide with event handling failure");
    persistence
        .append_decided_leaves(
            ViewNumber::new(1),
            leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
            None,
            &FailConsumer,
        )
        .await
        .unwrap();

    // Decide the remaining leaves with a working consumer that feeds `data_source`.
    let consumer = ApiEventConsumer::from(data_source.clone());
    let leaf_chain = chain2
        .iter()
        .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
        .collect::<Vec<_>>();
    tracing::info!("decide successfully");
    persistence
        .append_decided_leaves(
            ViewNumber::new(4),
            leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
            None,
            &consumer,
        )
        .await
        .unwrap();

    // Every decided leaf, including those from the failed first decide, must now be retrievable
    // from the data source together with its block and VID common data.
    for (leaf, cert) in chain1.iter().chain(&chain2) {
        tracing::info!(height = leaf.height(), "check archive");
        let qd = data_source.get_leaf(leaf.height() as usize).await.await;
        let stored_leaf: Leaf2 = qd.leaf().clone();
        let stored_qc = qd.qc().clone();
        assert_eq!(&stored_leaf, leaf);
        assert_eq!(&stored_qc, cert.qc());

        // `try_resolve().ok().unwrap()` panics unless the fetch is already resolved, i.e. the
        // data must be available immediately.
        data_source
            .get_block(leaf.height() as usize)
            .await
            .try_resolve()
            .ok()
            .unwrap();
        data_source
            .get_vid_common(leaf.height() as usize)
            .await
            .try_resolve()
            .ok()
            .unwrap();

        // The consensus artifacts for decided views must no longer be present in persistence.
        assert!(persistence
            .load_da_proposal(leaf.view_number())
            .await
            .unwrap()
            .is_none());
        assert!(persistence
            .load_vid_share(leaf.view_number())
            .await
            .unwrap()
            .is_none());
        assert!(persistence
            .load_quorum_proposal(leaf.view_number())
            .await
            .is_err());
    }

    // View 2 was never part of a decided chain, so its artifacts must not have been touched.
    assert!(persistence
        .load_da_proposal(ViewNumber::new(2))
        .await
        .unwrap()
        .is_some());
    assert!(persistence
        .load_vid_share(ViewNumber::new(2))
        .await
        .unwrap()
        .is_some());
    persistence
        .load_quorum_proposal(ViewNumber::new(2))
        .await
        .unwrap();
}
2517
/// Checks that a leaf is still processed when it is decided without any payload or VID data
/// having been persisted for it.
///
/// After the decide, the leaf at height 1 must be available from the data source while the
/// fetches for its block and VID common data are still pending.
#[rstest_reuse::apply(testable_sequencer_data_source)]
pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
where
    D: TestableSequencerDataSource + Debug + 'static,
{
    // The persistence layer and the API data source are both created from the same `storage`.
    let storage = D::create_storage().await;
    let persistence = D::persistence_options(&storage).create().await.unwrap();
    let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
        Arc::new(StorageState::new(
            D::create(D::persistence_options(&storage), Default::default(), false)
                .await
                .unwrap(),
            ApiState::new(future::pending()),
        ));
    let consumer = ApiEventConsumer::from(data_source.clone());

    let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
        &ValidatedState::default(),
        &NodeState::mock(),
    )
    .await;
    let leaf =
        Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;

    // Decide the genesis leaf first. It is not itself checked below; the assertions only
    // concern the leaf at height 1 that is appended afterwards.
    // NOTE(review): presumably needed so the height-1 leaf extends an existing chain — confirm.
    tracing::info!(?leaf, ?qc, "decide genesis leaf");
    persistence
        .append_decided_leaves(
            leaf.view_number(),
            [(
                &leaf_info(leaf.clone()),
                CertificatePair::non_epoch_change(qc.clone()),
            )],
            None,
            &consumer,
        )
        .await
        .unwrap();

    // Build a second leaf at height 1 / view 1 on top of genesis. No quorum proposal, DA
    // proposal or VID share is written to persistence for it.
    let mut block_header = leaf.block_header().clone();
    *block_header.height_mut() += 1;
    let qp = QuorumProposalWrapper {
        proposal: QuorumProposal2 {
            block_header,
            view_number: leaf.view_number() + 1,
            justify_qc: qc.clone(),
            upgrade_certificate: None,
            view_change_evidence: None,
            next_drb_result: None,
            next_epoch_justify_qc: None,
            epoch: None,
            state_cert: None,
        },
    };

    // Re-point the QC at the new leaf.
    let leaf = Leaf2::from_quorum_proposal(&qp);
    qc.view_number = leaf.view_number();
    qc.data.leaf_commit = Committable::commit(&leaf);

    // Decide the new leaf without its payload or VID data.
    tracing::info!(?leaf, ?qc, "append leaf 1");
    persistence
        .append_decided_leaves(
            leaf.view_number(),
            [(
                &leaf_info(leaf.clone()),
                CertificatePair::non_epoch_change(qc),
            )],
            None,
            &consumer,
        )
        .await
        .unwrap();

    // The leaf itself was processed...
    assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
    // ...but its VID common data and block are still outstanding.
    assert!(data_source.get_vid_common(1).await.is_pending());
    assert!(data_source.get_block(1).await.is_pending());
}
2600
2601 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2602 LeafInfo {
2603 leaf,
2604 vid_share: None,
2605 state: Default::default(),
2606 delta: None,
2607 state_cert: None,
2608 }
2609 }
2610}
2611
2612#[cfg(test)]
2613mod test {
2614 use std::{
2615 collections::{HashMap, HashSet},
2616 str::FromStr,
2617 time::Duration,
2618 };
2619
2620 use ::light_client::{
2621 consensus::{
2622 header::HeaderProof,
2623 leaf::{LeafProof, LeafProofHint},
2624 payload::PayloadProof,
2625 },
2626 testing::{EpochChangeQuorum, LegacyVersion},
2627 };
2628 use alloy::{
2629 eips::BlockId,
2630 network::EthereumWallet,
2631 primitives::U256,
2632 providers::{Provider, ProviderBuilder},
2633 };
2634 use async_lock::Mutex;
2635 use committable::{Commitment, Committable};
2636 use espresso_contract_deployer::{
2637 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
2638 upgrade_stake_table_v2, Contract, Contracts,
2639 };
2640 use espresso_types::{
2641 config::PublicHotShotConfig,
2642 traits::{MembershipPersistence, NullEventConsumer, PersistenceOptions},
2643 v0_3::{Fetcher, RewardAmount, RewardMerkleProofV1, COMMISSION_BASIS_POINTS},
2644 v0_4::{
2645 RewardAccountV2, RewardMerkleProofV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT,
2646 },
2647 validators_from_l1_events, ADVZNamespaceProofQueryData, DrbAndHeaderUpgradeVersion,
2648 EpochVersion, FeeAmount, FeeVersion, Header, L1Client, L1ClientOptions,
2649 MockSequencerVersions, NamespaceId, NamespaceProofQueryData, NsProof, RewardDistributor,
2650 SequencerVersions, StakeTableState, StateCertQueryDataV1, StateCertQueryDataV2,
2651 ValidatedState,
2652 };
2653 use futures::{
2654 future::{self, join_all, try_join_all},
2655 stream::{StreamExt, TryStreamExt},
2656 try_join,
2657 };
2658 use hotshot::types::EventType;
2659 use hotshot_contract_adapter::{
2660 reward::RewardClaimInput,
2661 sol_types::{EspToken, StakeTableV2},
2662 stake_table::StakeTableContractVersion,
2663 };
2664 use hotshot_example_types::node_types::EpochsTestVersions;
2665 use hotshot_query_service::{
2666 availability::{
2667 BlockQueryData, BlockSummaryQueryData, LeafQueryData, TransactionQueryData,
2668 VidCommonQueryData,
2669 },
2670 data_source::{
2671 sql::Config,
2672 storage::{sql::query, SqlStorage, StorageConnectionType},
2673 Transaction as _, VersionedDataSource,
2674 },
2675 explorer::TransactionSummariesResponse,
2676 merklized_state::UpdateStateData,
2677 types::HeightIndexed,
2678 };
2679 use hotshot_types::{
2680 data::EpochNumber,
2681 event::LeafInfo,
2682 traits::{
2683 block_contents::BlockHeader, election::Membership, metrics::NoMetrics,
2684 node_implementation::ConsensusTime,
2685 },
2686 utils::epoch_from_block_number,
2687 ValidatorConfig,
2688 };
2689 use jf_merkle_tree_compat::{
2690 prelude::{MerkleProof, Sha3Node},
2691 MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
2692 };
2693 use portpicker::pick_unused_port;
2694 use pretty_assertions::assert_matches;
2695 use rand::seq::SliceRandom;
2696 use rstest::rstest;
2697 use staking_cli::{
2698 demo::DelegationConfig,
2699 registration::{fetch_commission, update_commission},
2700 };
2701 use surf_disco::Client;
2702 use test_helpers::{
2703 catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2704 TestNetwork, TestNetworkConfigBuilder,
2705 };
2706 use tide_disco::{
2707 app::AppHealth, error::ServerError, healthcheck::HealthStatus, Error, StatusCode, Url,
2708 };
2709 use tokio::time::sleep;
2710 use vbs::version::{StaticVersion, StaticVersionType};
2711
2712 use self::{
2713 data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
2714 sql::DataSource as SqlDataSource,
2715 };
2716 use super::*;
2717
2718 async fn wait_until_block_height(
2719 client: &Client<ServerError, StaticVersion<0, 1>>,
2720 endpoint: &str,
2721 height: u64,
2722 ) {
2723 const MAX_RETRIES: usize = 30;
2724
2725 for _retry in 0..=MAX_RETRIES {
2726 let bh = client
2727 .get::<u64>(endpoint)
2728 .send()
2729 .await
2730 .expect("block height not found");
2731
2732 if bh >= height {
2733 return;
2734 }
2735 sleep(Duration::from_secs(3)).await;
2736 }
2737
2738 panic!("Max retries reached. {endpoint} block height did not exceed {height}");
2739 }
2740 use crate::{
2741 api::{
2742 options::Query,
2743 sql::{impl_testable_data_source::tmp_options, reconstruct_state},
2744 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2745 },
2746 catchup::{NullStateCatchup, StatePeers},
2747 persistence,
2748 persistence::no_storage,
2749 testing::{wait_for_decide_on_handle, wait_for_epochs, TestConfig, TestConfigBuilder},
2750 };
2751
// `SequencerVersions` instantiated with `StaticVersion<0, 3>` as its first version parameter and
// `StaticVersion<0, 0>` as its second.
// NOTE(review): presumably the first parameter is the base version and the second the upgrade
// version (compare `SequencerVersions<FeeVersion, EpochVersion>` used for the upgrade test) —
// confirm against the `SequencerVersions` definition.
type PosVersionV3 = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
// Same as `PosVersionV3`, but with `StaticVersion<0, 4>` as the first version parameter.
type PosVersionV4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
2754
2755 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2756 async fn test_healthcheck() {
2757 let port = pick_unused_port().expect("No ports free");
2758 let url = format!("http://localhost:{port}").parse().unwrap();
2759 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2760 let options = Options::with_port(port);
2761 let network_config = TestConfigBuilder::default().build();
2762 let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
2763 .api_config(options)
2764 .network_config(network_config)
2765 .build();
2766 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2767
2768 client.connect(None).await;
2769 let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
2770 assert_eq!(health.status, HealthStatus::Available);
2771 }
2772
2773 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2774 async fn status_test_without_query_module() {
2775 status_test_helper(|opt| opt).await
2776 }
2777
2778 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2779 async fn submit_test_without_query_module() {
2780 submit_test_helper(|opt| opt).await
2781 }
2782
2783 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2784 async fn state_signature_test_without_query_module() {
2785 state_signature_test_helper(|opt| opt).await
2786 }
2787
2788 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2789 async fn catchup_test_without_query_module() {
2790 catchup_test_helper(|opt| opt).await
2791 }
2792
/// Exercises an API server backed by the SQL data source configured through
/// `leaf_only_ds_options`.
///
/// Leaves, headers, VID common data and the block/fee Merkle state proofs must be served for
/// heights 1 to 4, while a query for a full block must fail.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_leaf_only_data_source() {
    let port = pick_unused_port().expect("No ports free");

    let storage = SqlDataSource::create_storage().await;
    let options =
        SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();

    let network_config = TestConfigBuilder::default().build();
    let config = TestNetworkConfigBuilder::default()
        .api_config(options)
        .network_config(network_config)
        .build();
    // Keep the network alive for the whole test.
    let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
    let url = format!("http://localhost:{port}").parse().unwrap();
    let client: Client<ServerError, SequencerApiVersion> = Client::new(url);

    tracing::info!("waiting for blocks");
    client.connect(Some(Duration::from_secs(15))).await;
    // Fee account whose fee-state Merkle proofs are requested below.
    let account = TestConfig::<5>::builder_key().fee_account();

    // Wait until 10 headers have been streamed. The headers themselves are not inspected; this
    // only ensures enough blocks exist before the per-height queries below.
    let _headers = client
        .socket("availability/stream/headers/0")
        .subscribe::<Header>()
        .await
        .unwrap()
        .take(10)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    for i in 1..5 {
        // Leaf, header and VID common data must each be served at height `i`.
        let leaf = client
            .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
            .send()
            .await
            .unwrap();

        assert_eq!(leaf.height(), i);

        let header = client
            .get::<Header>(&format!("availability/header/{i}"))
            .send()
            .await
            .unwrap();

        assert_eq!(header.height(), i);

        let vid = client
            .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
            .send()
            .await
            .unwrap();

        assert_eq!(vid.height(), i);

        // Block Merkle tree proof requested at `block-state/{i}/{i - 1}`; only success of the
        // request is checked.
        client
            .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
                "block-state/{i}/{}",
                i - 1
            ))
            .send()
            .await
            .unwrap();

        // Fee Merkle tree proof for `account`, requested at `fee-state/{i + 1}/{account}`; only
        // success of the request is checked.
        client
            .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
                "fee-state/{}/{}",
                i + 1,
                account
            ))
            .send()
            .await
            .unwrap();
    }

    // Fetching a full block is expected to fail.
    // NOTE(review): presumably because a leaf-only data source does not store payloads — confirm.
    client
        .get::<BlockQueryData<SeqTypes>>("availability/block/1")
        .send()
        .await
        .unwrap_err();
}
2879
/// Runs the catchup scenario with state peers pointed at the API server's URL plus `url_suffix`.
///
/// A peer node is dropped once it has decided a non-genesis leaf, a few more decides are allowed
/// to pass on the server node, and then a node is re-initialized from a default
/// [`ValidatedState`] with no storage. The test finishes once the new node has observed decided
/// leaves from views covering every residue modulo `NUM_NODES`.
async fn run_catchup_test(url_suffix: &str) {
    let port = pick_unused_port().expect("No ports free");
    const NUM_NODES: usize = 5;

    // URL the state peers use for catchup: the API server address followed by `url_suffix`.
    let url: url::Url = format!("http://localhost:{port}{url_suffix}")
        .parse()
        .unwrap();

    // Start a network in which every node uses the API server as its catchup peer.
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::with_port(port))
        .network_config(TestConfigBuilder::default().build())
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![url.clone()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .build();
    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Wait for the first peer to reach a non-genesis decide before disconnecting it.
    let mut events = network.peers[0].event_stream().await;
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        if leaf_chain[0].leaf.height() > 0 {
            break;
        }
    }

    // Removing the node from `peers` drops it entirely instead of merely pausing it.
    tracing::info!("shutting down node");
    network.peers.remove(0);

    // Let three more decides pass on the server node while the peer is down.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    // Re-create the node from a default state and without storage, so it has to rely on its
    // state peers.
    // NOTE(review): node index 1 presumably corresponds to the removed `peers[0]` (index 0 being
    // `network.server`) — confirm against `init_node`.
    tracing::info!("restarting node");
    let node = network
        .cfg
        .init_node(
            1,
            ValidatedState::default(),
            no_storage::Options,
            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![url],
                Default::default(),
                &NoMetrics,
            )),
            None,
            &NoMetrics,
            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
            NullEventConsumer,
            MockSequencerVersions::new(),
            Default::default(),
        )
        .await;
    let mut events = node.event_stream().await;

    // Wait until the restarted node has decided a non-genesis leaf for every value of
    // `view_number % NUM_NODES`.
    // NOTE(review): this treats `view % NUM_NODES` as the proposer of the leaf — confirm that
    // matches the leader election used by the test network.
    let mut proposers = [false; NUM_NODES];
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
            let height = leaf.height();
            let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
            // The genesis leaf does not count.
            if height == 0 {
                continue;
            }

            tracing::info!(
                "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
            );
            proposers[leaf_builder] = true;
        }

        if proposers.iter().all(|has_proposed| *has_proposed) {
            break;
        }
    }
}
2979
2980 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2981 async fn test_catchup() {
2982 run_catchup_test("").await;
2983 }
2984
2985 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2986 async fn test_catchup_v0() {
2987 run_catchup_test("/v0").await;
2988 }
2989
2990 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2991 async fn test_catchup_v1() {
2992 run_catchup_test("/v1").await;
2993 }
2994
2995 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2996 async fn test_catchup_no_state_peers() {
2997 let port = pick_unused_port().expect("No ports free");
2999 const NUM_NODES: usize = 5;
3000 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3001 .api_config(Options::with_port(port))
3002 .network_config(TestConfigBuilder::default().build())
3003 .build();
3004 let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3005
3006 let mut events = network.peers[0].event_stream().await;
3008 loop {
3009 let event = events.next().await.unwrap();
3010 let EventType::Decide { leaf_chain, .. } = event.event else {
3011 continue;
3012 };
3013 if leaf_chain[0].leaf.height() > 0 {
3014 break;
3015 }
3016 }
3017
3018 tracing::info!("shutting down node");
3023 network.peers.remove(0);
3024
3025 network
3027 .server
3028 .event_stream()
3029 .await
3030 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3031 .take(3)
3032 .collect::<Vec<_>>()
3033 .await;
3034
3035 tracing::info!("restarting node");
3036 let node = network
3037 .cfg
3038 .init_node(
3039 1,
3040 ValidatedState::default(),
3041 no_storage::Options,
3042 None::<NullStateCatchup>,
3043 None,
3044 &NoMetrics,
3045 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3046 NullEventConsumer,
3047 MockSequencerVersions::new(),
3048 Default::default(),
3049 )
3050 .await;
3051 let mut events = node.event_stream().await;
3052
3053 let mut proposers = [false; NUM_NODES];
3056 loop {
3057 let event = events.next().await.unwrap();
3058 let EventType::Decide { leaf_chain, .. } = event.event else {
3059 continue;
3060 };
3061 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3062 let height = leaf.height();
3063 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3064 if height == 0 {
3065 continue;
3066 }
3067
3068 tracing::info!(
3069 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3070 );
3071 proposers[leaf_builder] = true;
3072 }
3073
3074 if proposers.iter().all(|has_proposed| *has_proposed) {
3075 break;
3076 }
3077 }
3078 }
3079
/// Epoch-enabled variant of the "no state peers" catchup scenario (currently `#[ignore]`d).
///
/// The network runs `EpochsTestVersions` with an epoch height of `EPOCH_HEIGHT` blocks. A peer is
/// dropped once it has decided past height `EPOCH_HEIGHT * 3`, three further decides pass on the
/// server node, and a node is then re-initialized from a default state with no storage and no
/// catchup provider. The test finishes once that node has decided non-genesis leaves from views
/// covering every residue modulo `NUM_NODES`.
#[ignore]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup_epochs_no_state_peers() {
    let port = pick_unused_port().expect("No ports free");
    const EPOCH_HEIGHT: u64 = 5;
    let network_config = TestConfigBuilder::default()
        .epoch_height(EPOCH_HEIGHT)
        .build();
    const NUM_NODES: usize = 5;
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::with_port(port))
        .network_config(network_config)
        .build();
    let mut network = TestNetwork::new(config, EpochsTestVersions {}).await;

    // Wait for the first peer to decide beyond height `EPOCH_HEIGHT * 3` before disconnecting it.
    // NOTE(review): progress is logged at `error` level here, presumably for visibility while
    // debugging; the message says "one epoch" although the threshold is three epoch heights.
    let mut events = network.peers[0].event_stream().await;
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        tracing::error!("got decide height {}", leaf_chain[0].leaf.height());

        if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
            tracing::error!("decided past one epoch");
            break;
        }
    }

    // Removing the node from `peers` drops it entirely.
    tracing::info!("shutting down node");
    network.peers.remove(0);

    // Let three more decides pass on the server node while the peer is down.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    // Re-create the node from a default state, without storage and without a catchup provider.
    // NOTE(review): the node is restarted with `MockSequencerVersions` although the network was
    // started with `EpochsTestVersions` — confirm this mismatch is intended.
    tracing::error!("restarting node");
    let node = network
        .cfg
        .init_node(
            1,
            ValidatedState::default(),
            no_storage::Options,
            None::<NullStateCatchup>,
            None,
            &NoMetrics,
            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
            NullEventConsumer,
            MockSequencerVersions::new(),
            Default::default(),
        )
        .await;
    let mut events = node.event_stream().await;

    // Wait until the restarted node has decided a non-genesis leaf for every value of
    // `view_number % NUM_NODES`.
    let mut proposers = [false; NUM_NODES];
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
            let height = leaf.height();
            let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
            // The genesis leaf does not count.
            if height == 0 {
                continue;
            }

            tracing::info!(
                "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
            );
            proposers[leaf_builder] = true;
        }

        if proposers.iter().all(|has_proposed| *has_proposed) {
            break;
        }
    }
}
3172
3173 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3174 async fn test_chain_config_from_instance() {
3175 let port = pick_unused_port().expect("No ports free");
3181
3182 let chain_config: ChainConfig = ChainConfig::default();
3183
3184 let state = ValidatedState {
3185 chain_config: chain_config.commit().into(),
3186 ..Default::default()
3187 };
3188
3189 let states = std::array::from_fn(|_| state.clone());
3190
3191 let config = TestNetworkConfigBuilder::default()
3192 .api_config(Options::with_port(port))
3193 .states(states)
3194 .catchups(std::array::from_fn(|_| {
3195 StatePeers::<StaticVersion<0, 1>>::from_urls(
3196 vec![format!("http://localhost:{port}").parse().unwrap()],
3197 Default::default(),
3198 &NoMetrics,
3199 )
3200 }))
3201 .network_config(TestConfigBuilder::default().build())
3202 .build();
3203
3204 let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3205
3206 network
3208 .server
3209 .event_stream()
3210 .await
3211 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3212 .take(3)
3213 .collect::<Vec<_>>()
3214 .await;
3215
3216 for peer in &network.peers {
3217 let state = peer.consensus().read().await.decided_state().await;
3218
3219 assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
3220 }
3221
3222 network.server.shut_down().await;
3223 drop(network);
3224 }
3225
3226 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3227 async fn test_chain_config_catchup() {
3228 let port = pick_unused_port().expect("No ports free");
3234
3235 let cf = ChainConfig {
3236 max_block_size: 300.into(),
3237 base_fee: 1.into(),
3238 ..Default::default()
3239 };
3240
3241 let state1 = ValidatedState {
3243 chain_config: cf.commit().into(),
3244 ..Default::default()
3245 };
3246
3247 let state2 = ValidatedState {
3249 chain_config: cf.into(),
3250 ..Default::default()
3251 };
3252
3253 let mut states = std::array::from_fn(|_| state1.clone());
3254 states[0] = state2;
3257
3258 const NUM_NODES: usize = 5;
3259 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3260 .api_config(Options::from(options::Http {
3261 port,
3262 max_connections: None,
3263 }))
3264 .states(states)
3265 .catchups(std::array::from_fn(|_| {
3266 StatePeers::<StaticVersion<0, 1>>::from_urls(
3267 vec![format!("http://localhost:{port}").parse().unwrap()],
3268 Default::default(),
3269 &NoMetrics,
3270 )
3271 }))
3272 .network_config(TestConfigBuilder::default().build())
3273 .build();
3274
3275 let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3276
3277 network
3279 .server
3280 .event_stream()
3281 .await
3282 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3283 .take(3)
3284 .collect::<Vec<_>>()
3285 .await;
3286
3287 for peer in &network.peers {
3288 let state = peer.consensus().read().await.decided_state().await;
3289
3290 assert_eq!(state.chain_config.resolve().unwrap(), cf)
3291 }
3292
3293 network.server.shut_down().await;
3294 drop(network);
3295 }
3296
3297 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3298 async fn test_pos_upgrade_view_based() {
3299 type PosUpgrade = SequencerVersions<FeeVersion, EpochVersion>;
3300 test_upgrade_helper::<PosUpgrade>(PosUpgrade::new()).await;
3301 }
3302
/// Runs a network with versions `V` through an upgrade and checks the post-upgrade state.
///
/// The test waits for an upgrade proposal, asserts that it targets `V::Upgrade::VERSION`, and
/// then waits until an event arrives whose view is more than `wait_extra_views` past the
/// proposal's `new_version_first_view`. At that point every peer's decided state must resolve to
/// the upgraded chain config and every peer's decided leaf must carry a header of the upgrade
/// version.
async fn test_upgrade_helper<V: Versions>(version: V) {
    // Number of views to wait beyond the first view of the new version before asserting.
    let wait_extra_views = 10;
    // Number of nodes in the test network.
    const NUM_NODES: usize = 5;
    let upgrade_version = <V as Versions>::Upgrade::VERSION;
    let port = pick_unused_port().expect("No ports free");

    let test_config = TestConfigBuilder::default()
        .epoch_height(200)
        .epoch_start_block(321)
        .set_upgrades(upgrade_version)
        .await
        .build();

    // The upgrade must actually change the chain config; otherwise the assertions at the end
    // could not distinguish pre-upgrade from post-upgrade state.
    let chain_config_genesis = ValidatedState::default().chain_config.resolve().unwrap();
    let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade_version);
    assert_ne!(chain_config_genesis, chain_config_upgrade);
    tracing::debug!(?chain_config_genesis, ?chain_config_upgrade);

    // Every node uses the API server on `port` as its catchup peer.
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::from(options::Http {
            port,
            max_connections: None,
        }))
        .catchups(std::array::from_fn(|_| {
            StatePeers::<SequencerApiVersion>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(test_config)
        .build();

    let mut network = TestNetwork::new(config, version).await;
    let mut events = network.server.event_stream().await;

    // Consume events until an upgrade proposal is seen, and keep its contents.
    let upgrade = loop {
        let event = events.next().await.unwrap();
        match event.event {
            EventType::UpgradeProposal { proposal, .. } => {
                tracing::info!(?proposal, "proposal");
                let upgrade = proposal.data.upgrade_proposal;
                let new_version = upgrade.new_version;
                tracing::info!(?new_version, "upgrade proposal new version");
                assert_eq!(new_version, <V as Versions>::Upgrade::VERSION);
                break upgrade;
            },
            _ => continue,
        }
    };

    // Keep consuming events until one arrives from a view beyond `wanted_view`, then verify the
    // upgrade on every peer.
    let wanted_view = upgrade.new_version_first_view + wait_extra_views;
    loop {
        let event = events.next().await.unwrap();
        let view_number = event.view_number;

        tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
        if view_number > wanted_view {
            tracing::info!(?view_number, ?upgrade.new_version_first_view, "passed upgrade view");
            // Snapshot the decided state and decided leaf of every peer.
            let states = join_all(
                network
                    .peers
                    .iter()
                    .map(|peer| async { peer.consensus().read().await.decided_state().await }),
            )
            .await;
            let leaves = join_all(
                network
                    .peers
                    .iter()
                    .map(|peer| async { peer.consensus().read().await.decided_leaf().await }),
            )
            .await;
            let configs: Vec<ChainConfig> = states
                .iter()
                .map(|state| state.chain_config.resolve().unwrap())
                .collect();

            tracing::info!(?leaves, ?configs, "post upgrade state");
            // All peers must have switched to the upgraded chain config...
            for config in configs {
                assert_eq!(config, chain_config_upgrade);
            }
            // ...and their decided leaves must carry headers of the upgrade version.
            for leaf in leaves {
                assert_eq!(leaf.block_header().version(), upgrade_version);
            }
            break;
        }
        sleep(Duration::from_millis(200)).await;
    }

    network.server.shut_down().await;
}
3402
/// Checks that a network can be shut down and restarted from its persisted storage.
///
/// After the restart the network must decide a new leaf that extends the previously decided
/// chain, and the chain served by the API must be identical to the one served before the
/// shutdown.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
pub(crate) async fn test_restart() {
    const NUM_NODES: usize = 5;
    // One SQL storage per node; the same storages are reused after the restart.
    let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
    let persistence: [_; NUM_NODES] = storage
        .iter()
        .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
        .collect::<Vec<_>>()
        .try_into()
        .unwrap();
    let port = pick_unused_port().unwrap();
    // The API server is configured on top of `storage[0]`.
    let config = TestNetworkConfigBuilder::default()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(port),
        ))
        .persistences(persistence.clone())
        .network_config(TestConfigBuilder::default().build())
        .build();
    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Connect a client to the API server.
    let client: Client<ServerError, SequencerApiVersion> =
        Client::new(format!("http://localhost:{port}").parse().unwrap());
    client.connect(None).await;
    tracing::info!(port, "server running");

    // Wait until three blocks have been streamed, i.e. some blocks have been decided.
    client
        .socket("availability/stream/blocks/0")
        .subscribe::<BlockQueryData<SeqTypes>>()
        .await
        .unwrap()
        .take(3)
        .collect::<Vec<_>>()
        .await;

    // Stop consensus; the API server is still queried below.
    tracing::info!("shutting down nodes");
    network.stop_consensus().await;

    // Record the block height reached before the shutdown.
    let height = client
        .get::<usize>("status/block-height")
        .send()
        .await
        .unwrap();
    tracing::info!("decided {height} blocks before shutting down");

    // Fetch the decided chain so it can be compared with the chain served after the restart.
    let chain: Vec<LeafQueryData<SeqTypes>> = client
        .socket("availability/stream/leaves/0")
        .subscribe()
        .await
        .unwrap()
        .take(height)
        .try_collect()
        .await
        .unwrap();
    let decided_view = chain.last().unwrap().leaf().view_number();

    // The decided state is only logged here.
    let state = network.server.decided_state().await;
    tracing::info!(?decided_view, ?state, "consensus state");

    // Drop the whole network, including the API server.
    drop(network);

    // Start up again on a new port, reusing the same storage and persistence.
    let port = pick_unused_port().expect("No ports free");

    let config = TestNetworkConfigBuilder::default()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(port),
        ))
        .persistences(persistence)
        .catchups(std::array::from_fn(|_| {
            // Every node uses the restarted API server (backed by `storage[0]`) as its
            // catchup peer.
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(TestConfigBuilder::default().build())
        .build();
    let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
    let client: Client<ServerError, StaticVersion<0, 1>> =
        Client::new(format!("http://localhost:{port}").parse().unwrap());
    client.connect(None).await;
    tracing::info!(port, "server running");

    // The network must be able to decide a new leaf at the next height after the restart, and
    // that leaf must extend the last leaf decided before the shutdown.
    tracing::info!("waiting for decide, height {height}");
    let new_leaf: LeafQueryData<SeqTypes> = client
        .socket(&format!("availability/stream/leaves/{height}"))
        .subscribe()
        .await
        .unwrap()
        .next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(new_leaf.height(), height as u64);
    assert_eq!(
        new_leaf.leaf().parent_commitment(),
        chain[height - 1].hash()
    );

    // The chain served after the restart must match the chain served before it.
    let new_chain: Vec<LeafQueryData<SeqTypes>> = client
        .socket("availability/stream/leaves/0")
        .subscribe()
        .await
        .unwrap()
        .take(height)
        .try_collect()
        .await
        .unwrap();
    assert_eq!(chain, new_chain);
}
3529
3530 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3531 async fn test_fetch_config() {
3532 let port = pick_unused_port().expect("No ports free");
3533 let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3534 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3535
3536 let options = Options::with_port(port).config(Default::default());
3537 let network_config = TestConfigBuilder::default().build();
3538 let config = TestNetworkConfigBuilder::default()
3539 .api_config(options)
3540 .network_config(network_config)
3541 .build();
3542 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3543 client.connect(None).await;
3544
3545 let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3548 vec!["https://notarealnode.network".parse().unwrap(), url],
3549 Default::default(),
3550 &NoMetrics,
3551 );
3552
3553 let validator =
3555 ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3556 let config = peers.fetch_config(validator.clone()).await.unwrap();
3557
3558 assert_eq!(config.node_index, 1);
3560
3561 pretty_assertions::assert_eq!(
3564 serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3565 serde_json::to_value(PublicHotShotConfig::from(
3566 network.cfg.hotshot_config().clone()
3567 ))
3568 .unwrap()
3569 );
3570 }
3571
3572 async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3573 let query_service_port = pick_unused_port().expect("No ports free for query service");
3574
3575 let url = format!("http://localhost:{query_service_port}{url_suffix}")
3576 .parse()
3577 .unwrap();
3578
3579 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3580
3581 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3582
3583 let network_config = TestConfigBuilder::default().build();
3584 let config = TestNetworkConfigBuilder::default()
3585 .api_config(options)
3586 .network_config(network_config)
3587 .build();
3588 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3589
3590 let mut subscribed_events = client
3591 .socket("hotshot-events/events")
3592 .subscribe::<Event<SeqTypes>>()
3593 .await
3594 .unwrap();
3595
3596 let total_count = 5;
3597 let mut receive_count = 0;
3599 loop {
3600 let event = subscribed_events.next().await.unwrap();
3601 tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3602 receive_count += 1;
3603 if receive_count > total_count {
3604 tracing::info!("Client Received at least desired events, exiting loop");
3605 break;
3606 }
3607 }
3608 assert_eq!(receive_count, total_count + 1);
3609 }
3610
3611 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3612 async fn test_hotshot_event_streaming_v0() {
3613 run_hotshot_event_streaming_test("/v0").await;
3614 }
3615
3616 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3617 async fn test_hotshot_event_streaming_v1() {
3618 run_hotshot_event_streaming_test("/v1").await;
3619 }
3620
3621 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3622 async fn test_hotshot_event_streaming() {
3623 run_hotshot_event_streaming_test("").await;
3624 }
3625
3626 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3629 async fn test_hotshot_event_streaming_epoch_progression() {
3630 let epoch_height = 35;
3631 let wanted_epochs = 4;
3632
3633 let network_config = TestConfigBuilder::default()
3634 .epoch_height(epoch_height)
3635 .build();
3636
3637 let query_service_port = pick_unused_port().expect("No ports free for query service");
3638
3639 let hotshot_url = format!("http://localhost:{query_service_port}")
3640 .parse()
3641 .unwrap();
3642
3643 let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
3644 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3645
3646 let config = TestNetworkConfigBuilder::default()
3647 .api_config(options)
3648 .network_config(network_config.clone())
3649 .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3650 .await
3651 .expect("Pos Deployment")
3652 .build();
3653
3654 let _network = TestNetwork::new(config, PosVersionV3::new()).await;
3655
3656 let mut subscribed_events = client
3657 .socket("hotshot-events/events")
3658 .subscribe::<Event<SeqTypes>>()
3659 .await
3660 .unwrap();
3661
3662 let wanted_views = epoch_height * wanted_epochs;
3663
3664 let mut views = HashSet::new();
3665 let mut epochs = HashSet::new();
3666 for _ in 0..=600 {
3667 let event = subscribed_events.next().await.unwrap();
3668 let event = event.unwrap();
3669 let view_number = event.view_number;
3670 views.insert(view_number.u64());
3671
3672 if let hotshot::types::EventType::Decide { committing_qc, .. } = event.event {
3673 assert!(committing_qc.epoch().is_some(), "epochs are live");
3674 assert!(committing_qc.block_number().is_some());
3675
3676 let epoch = committing_qc.epoch().unwrap().u64();
3677 epochs.insert(epoch);
3678
3679 tracing::debug!(
3680 "Got decide: epoch: {:?}, block: {:?} ",
3681 epoch,
3682 committing_qc.block_number()
3683 );
3684
3685 let expected_epoch =
3686 epoch_from_block_number(committing_qc.block_number().unwrap(), epoch_height);
3687 tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");
3688
3689 assert_eq!(expected_epoch, epoch);
3690 }
3691 if views.contains(&wanted_views) {
3692 tracing::info!("Client Received at least desired views, exiting loop");
3693 break;
3694 }
3695 }
3696
3697 assert!(views.contains(&wanted_views), "Views are not progressing");
3699 assert!(
3700 epochs.contains(&wanted_epochs),
3701 "Epochs are not progressing"
3702 );
3703 }
3704
3705 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3706 async fn test_pos_rewards_basic() -> anyhow::Result<()> {
3707 let epoch_height = 20;
3714
3715 let network_config = TestConfigBuilder::default()
3716 .epoch_height(epoch_height)
3717 .build();
3718
3719 let api_port = pick_unused_port().expect("No ports free for query service");
3720
3721 const NUM_NODES: usize = 1;
3722 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3724 let persistence: [_; NUM_NODES] = storage
3725 .iter()
3726 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3727 .collect::<Vec<_>>()
3728 .try_into()
3729 .unwrap();
3730
3731 let config = TestNetworkConfigBuilder::with_num_nodes()
3732 .api_config(SqlDataSource::options(
3733 &storage[0],
3734 Options::with_port(api_port),
3735 ))
3736 .network_config(network_config.clone())
3737 .persistences(persistence.clone())
3738 .catchups(std::array::from_fn(|_| {
3739 StatePeers::<StaticVersion<0, 1>>::from_urls(
3740 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3741 Default::default(),
3742 &NoMetrics,
3743 )
3744 }))
3745 .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
3746 .await
3747 .unwrap()
3748 .build();
3749
3750 let network = TestNetwork::new(config, PosVersionV3::new()).await;
3751 let client: Client<ServerError, SequencerApiVersion> =
3752 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3753
3754 let _blocks = client
3759 .socket("availability/stream/blocks/0")
3760 .subscribe::<BlockQueryData<SeqTypes>>()
3761 .await
3762 .unwrap()
3763 .take(65)
3764 .try_collect::<Vec<_>>()
3765 .await
3766 .unwrap();
3767
3768 let staking_priv_keys = network_config.staking_priv_keys();
3769 let account = staking_priv_keys[0].0.clone();
3770 let address = account.address();
3771
3772 let block_height = 60;
3773
3774 let amount = client
3776 .get::<Option<RewardAmount>>(&format!(
3777 "reward-state/reward-balance/{block_height}/{address}"
3778 ))
3779 .send()
3780 .await
3781 .unwrap()
3782 .unwrap();
3783
3784 tracing::info!("amount={amount:?}");
3785
3786 let epoch_start_block = 40;
3787
3788 let node_state = network.server.node_state();
3789 let membership = node_state.coordinator.membership().read().await;
3790 let block_reward = membership
3791 .fixed_block_reward()
3792 .expect("block reward is not None");
3793 drop(membership);
3794
3795 let expected_amount = block_reward.0 * (U256::from(block_height - epoch_start_block));
3797
3798 assert_eq!(amount.0, expected_amount, "reward amount don't match");
3799
3800 Ok(())
3801 }
3802
3803 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3804 async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
3805 let epoch_height = 20;
3811
3812 let network_config = TestConfigBuilder::default()
3813 .epoch_height(epoch_height)
3814 .build();
3815
3816 let api_port = pick_unused_port().expect("No ports free for query service");
3817
3818 const NUM_NODES: usize = 5;
3819 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3821 let persistence: [_; NUM_NODES] = storage
3822 .iter()
3823 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3824 .collect::<Vec<_>>()
3825 .try_into()
3826 .unwrap();
3827
3828 let config = TestNetworkConfigBuilder::with_num_nodes()
3829 .api_config(SqlDataSource::options(
3830 &storage[0],
3831 Options::with_port(api_port),
3832 ))
3833 .network_config(network_config)
3834 .persistences(persistence.clone())
3835 .catchups(std::array::from_fn(|_| {
3836 StatePeers::<StaticVersion<0, 1>>::from_urls(
3837 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3838 Default::default(),
3839 &NoMetrics,
3840 )
3841 }))
3842 .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3843 .await
3844 .unwrap()
3845 .build();
3846
3847 let network = TestNetwork::new(config, PosVersionV3::new()).await;
3848 let node_state = network.server.node_state();
3849 let membership = node_state.coordinator.membership().read().await;
3850 let block_reward = membership
3851 .fixed_block_reward()
3852 .expect("block reward is not None");
3853 drop(membership);
3854 let client: Client<ServerError, SequencerApiVersion> =
3855 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3856
3857 let _blocks = client
3859 .socket("availability/stream/blocks/0")
3860 .subscribe::<BlockQueryData<SeqTypes>>()
3861 .await
3862 .unwrap()
3863 .take(75)
3864 .try_collect::<Vec<_>>()
3865 .await
3866 .unwrap();
3867
3868 let validators = client
3872 .get::<ValidatorMap>("node/validators/3")
3873 .send()
3874 .await
3875 .expect("failed to get validator");
3876
3877 let mut addresses = HashSet::new();
3881 for v in validators.values() {
3882 addresses.insert(v.account);
3883 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3884 }
3885 let validators = client
3887 .get::<ValidatorMap>("node/validators/4")
3888 .send()
3889 .await
3890 .expect("failed to get validator");
3891 for v in validators.values() {
3892 addresses.insert(v.account);
3893 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3894 }
3895
3896 let mut prev_cumulative_amount = U256::ZERO;
3897 for block in 41..=67 {
3900 let mut cumulative_amount = U256::ZERO;
3901 for address in addresses.clone() {
3902 let amount = client
3903 .get::<Option<RewardAmount>>(&format!(
3904 "reward-state/reward-balance/{block}/{address}"
3905 ))
3906 .send()
3907 .await
3908 .ok()
3909 .flatten();
3910
3911 if let Some(amount) = amount {
3912 tracing::info!("address={address}, amount={amount}");
3913 cumulative_amount += amount.0;
3914 };
3915 }
3916
3917 assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
3919 tracing::info!("cumulative_amount is correct for block={block}");
3920 prev_cumulative_amount = cumulative_amount;
3921 }
3922
3923 Ok(())
3924 }
3925
3926 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3927 async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
3928 let epoch_height = 20;
3932
3933 let network_config = TestConfigBuilder::default()
3934 .epoch_height(epoch_height)
3935 .build();
3936
3937 let api_port = pick_unused_port().expect("No ports free for query service");
3938
3939 const NUM_NODES: usize = 5;
3940 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3942 let persistence: [_; NUM_NODES] = storage
3943 .iter()
3944 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3945 .collect::<Vec<_>>()
3946 .try_into()
3947 .unwrap();
3948
3949 let l1_url = network_config.l1_url();
3950 let config = TestNetworkConfigBuilder::with_num_nodes()
3951 .api_config(SqlDataSource::options(
3952 &storage[0],
3953 Options::with_port(api_port),
3954 ))
3955 .network_config(network_config)
3956 .persistences(persistence.clone())
3957 .catchups(std::array::from_fn(|_| {
3958 StatePeers::<StaticVersion<0, 1>>::from_urls(
3959 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3960 Default::default(),
3961 &NoMetrics,
3962 )
3963 }))
3964 .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3965 .await
3966 .unwrap()
3967 .build();
3968
3969 let network = TestNetwork::new(config, PosVersionV3::new()).await;
3970
3971 let mut prev_st = None;
3972 let state = network.server.decided_state().await;
3973 let chain_config = state.chain_config.resolve().expect("resolve chain config");
3974 let stake_table = chain_config.stake_table_contract.unwrap();
3975
3976 let l1_client = L1ClientOptions::default()
3977 .connect(vec![l1_url])
3978 .expect("failed to connect to l1");
3979
3980 let client: Client<ServerError, SequencerApiVersion> =
3981 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3982
3983 let mut headers = client
3984 .socket("availability/stream/headers/0")
3985 .subscribe::<Header>()
3986 .await
3987 .unwrap();
3988
3989 let mut target_bh = 0;
3990 while let Some(header) = headers.next().await {
3991 let header = header.unwrap();
3992 println!("got header with height {}", header.height());
3993 if header.height() == 0 {
3994 continue;
3995 }
3996 let l1_block = header.l1_finalized().expect("l1 block not found");
3997
3998 let sorted_events = Fetcher::fetch_events_from_contract(
3999 l1_client.clone(),
4000 stake_table,
4001 None,
4002 l1_block.number(),
4003 )
4004 .await?;
4005
4006 let mut sorted_dedup_removed = sorted_events.clone();
4007 sorted_dedup_removed.dedup();
4008
4009 assert_eq!(
4010 sorted_events.len(),
4011 sorted_dedup_removed.len(),
4012 "duplicates found"
4013 );
4014
4015 let stake_table =
4017 validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
4018 if let Some(prev_st) = prev_st {
4019 assert_eq!(stake_table, prev_st);
4020 }
4021
4022 prev_st = Some(stake_table);
4023
4024 if target_bh == 100 {
4025 break;
4026 }
4027
4028 target_bh = header.height();
4029 }
4030
4031 Ok(())
4032 }
4033
4034 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4035 async fn test_rewards_v3() -> anyhow::Result<()> {
4036 const EPOCH_HEIGHT: u64 = 20;
4042
4043 let network_config = TestConfigBuilder::default()
4044 .epoch_height(EPOCH_HEIGHT)
4045 .build();
4046
4047 let api_port = pick_unused_port().expect("No ports free for query service");
4048
4049 const NUM_NODES: usize = 7;
4050
4051 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4052 let persistence: [_; NUM_NODES] = storage
4053 .iter()
4054 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4055 .collect::<Vec<_>>()
4056 .try_into()
4057 .unwrap();
4058
4059 let config = TestNetworkConfigBuilder::with_num_nodes()
4060 .api_config(SqlDataSource::options(
4061 &storage[0],
4062 Options::with_port(api_port),
4063 ))
4064 .network_config(network_config)
4065 .persistences(persistence.clone())
4066 .catchups(std::array::from_fn(|_| {
4067 StatePeers::<StaticVersion<0, 1>>::from_urls(
4068 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4069 Default::default(),
4070 &NoMetrics,
4071 )
4072 }))
4073 .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
4074 .await
4075 .unwrap()
4076 .build();
4077
4078 let network = TestNetwork::new(config, PosVersionV3::new()).await;
4079 let client: Client<ServerError, SequencerApiVersion> =
4080 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4081
4082 let mut events = network.peers[0].event_stream().await;
4084 while let Some(event) = events.next().await {
4085 if let EventType::Decide { leaf_chain, .. } = event.event {
4086 let height = leaf_chain[0].leaf.height();
4087 tracing::info!("Node 0 decided at height: {height}");
4088 if height > EPOCH_HEIGHT * 3 {
4089 break;
4090 }
4091 }
4092 }
4093
4094 {
4096 client
4097 .get::<ValidatorMap>("node/validators/1")
4098 .send()
4099 .await
4100 .unwrap()
4101 .is_empty();
4102
4103 client
4104 .get::<ValidatorMap>("node/validators/2")
4105 .send()
4106 .await
4107 .unwrap()
4108 .is_empty();
4109 }
4110
4111 let validators = client
4113 .get::<ValidatorMap>("node/validators/3")
4114 .send()
4115 .await
4116 .expect("validators");
4117
4118 assert!(!validators.is_empty());
4119
4120 let mut addresses = HashSet::new();
4122 for v in validators.values() {
4123 addresses.insert(v.account);
4124 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4125 }
4126
4127 for block in 0..=EPOCH_HEIGHT * 2 {
4129 for address in addresses.clone() {
4130 let amount = client
4131 .get::<Option<RewardAmount>>(&format!(
4132 "reward-state/reward-balance/{block}/{address}"
4133 ))
4134 .send()
4135 .await
4136 .ok()
4137 .flatten();
4138 assert!(amount.is_none(), "amount is not none for block {block}")
4139 }
4140 }
4141
4142 let leaves = client
4144 .socket("availability/stream/leaves/41")
4145 .subscribe::<LeafQueryData<SeqTypes>>()
4146 .await
4147 .unwrap()
4148 .take((EPOCH_HEIGHT * 3).try_into().unwrap())
4149 .try_collect::<Vec<_>>()
4150 .await
4151 .unwrap();
4152
4153 let node_state = network.server.node_state();
4154 let coordinator = node_state.coordinator;
4155
4156 let membership = coordinator.membership().read().await;
4157 let block_reward = membership
4158 .fixed_block_reward()
4159 .expect("block reward is not None");
4160
4161 drop(membership);
4162
4163 let mut rewards_map = HashMap::new();
4164
4165 for leaf in leaves {
4166 let block = leaf.height();
4167 tracing::info!("verify rewards for block={block:?}");
4168 let membership = coordinator.membership().read().await;
4169 let epoch = epoch_from_block_number(block, EPOCH_HEIGHT);
4170 let epoch_number = EpochNumber::new(epoch);
4171 let leader = membership
4172 .leader(leaf.leaf().view_number(), Some(epoch_number))
4173 .expect("leader");
4174 let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
4175
4176 drop(membership);
4177
4178 let validators = client
4179 .get::<ValidatorMap>(&format!("node/validators/{epoch}"))
4180 .send()
4181 .await
4182 .expect("validators");
4183
4184 let leader_validator = validators
4185 .get(&leader_eth_address)
4186 .expect("leader not found");
4187
4188 let distributor =
4189 RewardDistributor::new(leader_validator.clone(), block_reward, U256::ZERO.into());
4190 for validator in validators.values() {
4192 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4193
4194 assert_eq!(delegator_stake_sum, validator.stake);
4195 }
4196
4197 let computed_rewards = distributor.compute_rewards().expect("reward computation");
4198
4199 let total_reward = block_reward.0;
4206 let leader_commission_basis_points = U256::from(leader_validator.commission);
4207 let calculated_leader_commission_reward = leader_commission_basis_points
4208 .checked_mul(total_reward)
4209 .context("overflow")?
4210 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4211 .context("overflow")?;
4212
4213 assert!(
4214 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4215 <= U256::from(10_u64)
4216 );
4217
4218 let leader_commission = *computed_rewards.leader_commission();
4224 for (address, amount) in computed_rewards.delegators().clone() {
4225 rewards_map
4226 .entry(address)
4227 .and_modify(|entry| *entry += amount)
4228 .or_insert(amount);
4229 }
4230
4231 rewards_map
4233 .entry(leader_eth_address)
4234 .and_modify(|entry| *entry += leader_commission)
4235 .or_insert(leader_commission);
4236
4237 for (address, calculated_amount) in rewards_map.iter() {
4239 let amount_from_api = client
4240 .get::<Option<RewardAmount>>(&format!(
4241 "reward-state/reward-balance/{block}/{address}"
4242 ))
4243 .send()
4244 .await
4245 .ok()
4246 .flatten()
4247 .expect("amount");
4248 assert_eq!(amount_from_api, *calculated_amount)
4249 }
4250 }
4251
4252 Ok(())
4253 }
4254
4255 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4256 async fn test_rewards_v4() -> anyhow::Result<()> {
4257 const EPOCH_HEIGHT: u64 = 20;
4267
4268 type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
4269
4270 let network_config = TestConfigBuilder::default()
4271 .epoch_height(EPOCH_HEIGHT)
4272 .build();
4273
4274 let api_port = pick_unused_port().expect("No ports free for query service");
4275
4276 const NUM_NODES: usize = 5;
4277
4278 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4279 let persistence: [_; NUM_NODES] = storage
4280 .iter()
4281 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4282 .collect::<Vec<_>>()
4283 .try_into()
4284 .unwrap();
4285
4286 let config = TestNetworkConfigBuilder::with_num_nodes()
4287 .api_config(SqlDataSource::options(
4288 &storage[0],
4289 Options::with_port(api_port),
4290 ))
4291 .network_config(network_config)
4292 .persistences(persistence.clone())
4293 .catchups(std::array::from_fn(|_| {
4294 StatePeers::<StaticVersion<0, 1>>::from_urls(
4295 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4296 Default::default(),
4297 &NoMetrics,
4298 )
4299 }))
4300 .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
4301 .await
4302 .unwrap()
4303 .build();
4304
4305 let network = TestNetwork::new(config, V4::new()).await;
4306 let client: Client<ServerError, SequencerApiVersion> =
4307 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4308
4309 let mut events = network.peers[0].event_stream().await;
4311 while let Some(event) = events.next().await {
4312 if let EventType::Decide { leaf_chain, .. } = event.event {
4313 let height = leaf_chain[0].leaf.height();
4314 tracing::info!("Node 0 decided at height: {height}");
4315 if height > EPOCH_HEIGHT * 3 {
4316 break;
4317 }
4318 }
4319 }
4320
4321 {
4323 client
4324 .get::<ValidatorMap>("node/validators/1")
4325 .send()
4326 .await
4327 .unwrap()
4328 .is_empty();
4329
4330 client
4331 .get::<ValidatorMap>("node/validators/2")
4332 .send()
4333 .await
4334 .unwrap()
4335 .is_empty();
4336 }
4337
4338 let validators = client
4340 .get::<ValidatorMap>("node/validators/3")
4341 .send()
4342 .await
4343 .expect("validators");
4344
4345 assert!(!validators.is_empty());
4346
4347 let mut addresses = HashSet::new();
4349 for v in validators.values() {
4350 addresses.insert(v.account);
4351 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4352 }
4353
4354 let mut leaves = client
4355 .socket("availability/stream/leaves/0")
4356 .subscribe::<LeafQueryData<SeqTypes>>()
4357 .await
4358 .unwrap();
4359
4360 let node_state = network.server.node_state();
4361 let coordinator = node_state.coordinator;
4362
4363 let membership = coordinator.membership().read().await;
4364
4365 while let Some(leaf) = leaves.next().await {
4367 let leaf = leaf.unwrap();
4368 let header = leaf.header();
4369 assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
4370
4371 let epoch_number =
4372 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4373
4374 assert!(membership.epoch_block_reward(epoch_number).is_none());
4375
4376 let height = header.height();
4377 for address in addresses.clone() {
4378 let amount = client
4379 .get::<Option<RewardAmount>>(&format!(
4380 "reward-state-v2/reward-balance/{height}/{address}"
4381 ))
4382 .send()
4383 .await
4384 .ok()
4385 .flatten();
4386 assert!(amount.is_none(), "amount is not none for block {height}")
4387 }
4388
4389 if leaf.height() == EPOCH_HEIGHT * 2 {
4390 break;
4391 }
4392 }
4393
4394 drop(membership);
4395
4396 let mut rewards_map = HashMap::new();
4397 let mut total_distributed = U256::ZERO;
4398 let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
4399
4400 while let Some(leaf) = leaves.next().await {
4401 let leaf = leaf.unwrap();
4402
4403 let header = leaf.header();
4404 let distributed = header
4405 .total_reward_distributed()
4406 .expect("rewards distributed is none");
4407
4408 let block = leaf.height();
4409 tracing::info!("verify rewards for block={block:?}");
4410 let membership = coordinator.membership().read().await;
4411 let epoch_number =
4412 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4413
4414 let block_reward = membership.epoch_block_reward(epoch_number).unwrap();
4415 let leader = membership
4416 .leader(leaf.leaf().view_number(), Some(epoch_number))
4417 .expect("leader");
4418 let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
4419
4420 drop(membership);
4421
4422 let validators = client
4423 .get::<ValidatorMap>(&format!("node/validators/{epoch_number}"))
4424 .send()
4425 .await
4426 .expect("validators");
4427
4428 let leader_validator = validators
4429 .get(&leader_eth_address)
4430 .expect("leader not found");
4431
4432 let distributor =
4433 RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
4434 for validator in validators.values() {
4436 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4437
4438 assert_eq!(delegator_stake_sum, validator.stake);
4439 }
4440
4441 let computed_rewards = distributor.compute_rewards().expect("reward computation");
4442
4443 let total_reward = block_reward.0;
4445 let leader_commission_basis_points = U256::from(leader_validator.commission);
4446 let calculated_leader_commission_reward = leader_commission_basis_points
4447 .checked_mul(total_reward)
4448 .context("overflow")?
4449 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4450 .context("overflow")?;
4451
4452 assert!(
4453 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4454 <= U256::from(10_u64)
4455 );
4456
4457 let leader_commission = *computed_rewards.leader_commission();
4459 for (address, amount) in computed_rewards.delegators().clone() {
4460 rewards_map
4461 .entry(address)
4462 .and_modify(|entry| *entry += amount)
4463 .or_insert(amount);
4464 }
4465
4466 rewards_map
4468 .entry(leader_eth_address)
4469 .and_modify(|entry| *entry += leader_commission)
4470 .or_insert(leader_commission);
4471
4472 for (address, calculated_amount) in rewards_map.iter() {
4474 let mut attempt = 0;
4475 let amount_from_api = loop {
4476 let result = client
4477 .get::<Option<RewardAmount>>(&format!(
4478 "reward-state-v2/reward-balance/{block}/{address}"
4479 ))
4480 .send()
4481 .await
4482 .ok()
4483 .flatten();
4484
4485 if let Some(amount) = result {
4486 break amount;
4487 }
4488
4489 attempt += 1;
4490 if attempt >= 3 {
4491 panic!(
4492 "Failed to fetch reward amount for address {address} after 3 retries"
4493 );
4494 }
4495
4496 sleep(Duration::from_secs(2)).await;
4497 };
4498
4499 assert_eq!(amount_from_api, *calculated_amount);
4500 }
4501
4502 total_distributed += block_reward.0;
4504 assert_eq!(
4505 header.total_reward_distributed().unwrap().0,
4506 total_distributed
4507 );
4508
4509 epoch_rewards
4511 .entry(epoch_number)
4512 .and_modify(|r| assert_eq!(*r, block_reward.0))
4513 .or_insert(block_reward.0);
4514
4515 if leaf.height() == EPOCH_HEIGHT * 5 {
4517 break;
4518 }
4519 }
4520
4521 Ok(())
4522 }
4523
4524 #[rstest]
4525 #[case(PosVersionV3::new())]
4526 #[case(PosVersionV4::new())]
4527 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4528
4529 async fn test_node_stake_table_api<Ver: Versions>(#[case] ver: Ver) {
4530 let epoch_height = 20;
4531
4532 let network_config = TestConfigBuilder::default()
4533 .epoch_height(epoch_height)
4534 .build();
4535
4536 let api_port = pick_unused_port().expect("No ports free for query service");
4537
4538 const NUM_NODES: usize = 2;
4539 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4541 let persistence: [_; NUM_NODES] = storage
4542 .iter()
4543 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4544 .collect::<Vec<_>>()
4545 .try_into()
4546 .unwrap();
4547
4548 let config = TestNetworkConfigBuilder::with_num_nodes()
4549 .api_config(SqlDataSource::options(
4550 &storage[0],
4551 Options::with_port(api_port),
4552 ))
4553 .network_config(network_config)
4554 .persistences(persistence.clone())
4555 .catchups(std::array::from_fn(|_| {
4556 StatePeers::<StaticVersion<0, 1>>::from_urls(
4557 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4558 Default::default(),
4559 &NoMetrics,
4560 )
4561 }))
4562 .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4563 .await
4564 .unwrap()
4565 .build();
4566
4567 let _network = TestNetwork::new(config, ver).await;
4568
4569 let client: Client<ServerError, SequencerApiVersion> =
4570 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4571
4572 let _blocks = client
4574 .socket("availability/stream/blocks/0")
4575 .subscribe::<BlockQueryData<SeqTypes>>()
4576 .await
4577 .unwrap()
4578 .take(40)
4579 .try_collect::<Vec<_>>()
4580 .await
4581 .unwrap();
4582
4583 for i in 1..=3 {
4584 let _st = client
4585 .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
4586 .send()
4587 .await
4588 .expect("failed to get stake table");
4589 }
4590
4591 let _st = client
4592 .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
4593 .send()
4594 .await
4595 .expect("failed to get stake table");
4596 }
4597
4598 #[rstest]
4599 #[case(PosVersionV3::new())]
4600 #[case(PosVersionV4::new())]
4601 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4602
4603 async fn test_epoch_stake_table_catchup<Ver: Versions>(#[case] ver: Ver) {
4604 const EPOCH_HEIGHT: u64 = 10;
4605 const NUM_NODES: usize = 6;
4606
4607 let port = pick_unused_port().expect("No ports free");
4608
4609 let network_config = TestConfigBuilder::default()
4610 .epoch_height(EPOCH_HEIGHT)
4611 .build();
4612
4613 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4615
4616 let persistence_options: [_; NUM_NODES] = storage
4617 .iter()
4618 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4619 .collect::<Vec<_>>()
4620 .try_into()
4621 .unwrap();
4622
4623 let catchup_peers = std::array::from_fn(|_| {
4625 StatePeers::<StaticVersion<0, 1>>::from_urls(
4626 vec![format!("http://localhost:{port}").parse().unwrap()],
4627 Default::default(),
4628 &NoMetrics,
4629 )
4630 });
4631 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4632 .api_config(SqlDataSource::options(
4633 &storage[0],
4634 Options::with_port(port),
4635 ))
4636 .network_config(network_config)
4637 .persistences(persistence_options.clone())
4638 .catchups(catchup_peers)
4639 .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4640 .await
4641 .unwrap()
4642 .build();
4643
4644 let state = config.states()[0].clone();
4645 let mut network = TestNetwork::new(config, ver).await;
4646
4647 let mut events = network.peers[0].event_stream().await;
4649 while let Some(event) = events.next().await {
4650 if let EventType::Decide { leaf_chain, .. } = event.event {
4651 let height = leaf_chain[0].leaf.height();
4652 tracing::info!("Node 0 decided at height: {height}");
4653 if height > EPOCH_HEIGHT * 3 {
4654 break;
4655 }
4656 }
4657 }
4658
4659 tracing::info!("Shutting down peer 0");
4661 network.peers.remove(0);
4662
4663 let mut events = network.server.event_stream().await;
4665 while let Some(event) = events.next().await {
4666 if let EventType::Decide { leaf_chain, .. } = event.event {
4667 let height = leaf_chain[0].leaf.height();
4668 if height > EPOCH_HEIGHT * 7 {
4669 break;
4670 }
4671 }
4672 }
4673
4674 let storage = SqlDataSource::create_storage().await;
4676 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4677 tracing::info!("Restarting peer 0");
4678 let node = network
4679 .cfg
4680 .init_node(
4681 1,
4682 state,
4683 options,
4684 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4685 vec![format!("http://localhost:{port}").parse().unwrap()],
4686 Default::default(),
4687 &NoMetrics,
4688 )),
4689 None,
4690 &NoMetrics,
4691 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4692 NullEventConsumer,
4693 ver,
4694 Default::default(),
4695 )
4696 .await;
4697
4698 let coordinator = node.node_state().coordinator;
4699 let server_node_state = network.server.node_state();
4700 let server_coordinator = server_node_state.coordinator;
4701 for epoch_num in 1..=7 {
4703 let epoch = EpochNumber::new(epoch_num);
4704 let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
4705 if membership_for_epoch.is_err() {
4706 coordinator.wait_for_catchup(epoch).await.unwrap();
4707 }
4708
4709 println!("have stake table for epoch = {epoch_num}");
4710
4711 let node_stake_table = coordinator
4712 .membership()
4713 .read()
4714 .await
4715 .stake_table(Some(epoch));
4716 let stake_table = server_coordinator
4717 .membership()
4718 .read()
4719 .await
4720 .stake_table(Some(epoch));
4721 println!("asserting stake table for epoch = {epoch_num}");
4722
4723 assert_eq!(
4724 node_stake_table, stake_table,
4725 "Stake table mismatch for epoch {epoch_num}",
4726 );
4727 }
4728 }
4729
4730 #[rstest]
4731 #[case(PosVersionV3::new())]
4732 #[case(PosVersionV4::new())]
4733 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4734
4735 async fn test_epoch_stake_table_catchup_stress<Ver: Versions>(#[case] versions: Ver) {
4736 const EPOCH_HEIGHT: u64 = 10;
4737 const NUM_NODES: usize = 6;
4738
4739 let port = pick_unused_port().expect("No ports free");
4740
4741 let network_config = TestConfigBuilder::default()
4742 .epoch_height(EPOCH_HEIGHT)
4743 .build();
4744
4745 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4747
4748 let persistence_options: [_; NUM_NODES] = storage
4749 .iter()
4750 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4751 .collect::<Vec<_>>()
4752 .try_into()
4753 .unwrap();
4754
4755 let catchup_peers = std::array::from_fn(|_| {
4757 StatePeers::<StaticVersion<0, 1>>::from_urls(
4758 vec![format!("http://localhost:{port}").parse().unwrap()],
4759 Default::default(),
4760 &NoMetrics,
4761 )
4762 });
4763 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
4764 .api_config(SqlDataSource::options(
4765 &storage[0],
4766 Options::with_port(port),
4767 ))
4768 .network_config(network_config)
4769 .persistences(persistence_options.clone())
4770 .catchups(catchup_peers)
4771 .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4772 .await
4773 .unwrap()
4774 .build();
4775
4776 let state = config.states()[0].clone();
4777 let mut network = TestNetwork::new(config, versions).await;
4778
4779 let mut events = network.peers[0].event_stream().await;
4781 while let Some(event) = events.next().await {
4782 if let EventType::Decide { leaf_chain, .. } = event.event {
4783 let height = leaf_chain[0].leaf.height();
4784 tracing::info!("Node 0 decided at height: {height}");
4785 if height > EPOCH_HEIGHT * 3 {
4786 break;
4787 }
4788 }
4789 }
4790
4791 tracing::info!("Shutting down peer 0");
4793 network.peers.remove(0);
4794
4795 let mut events = network.server.event_stream().await;
4797 while let Some(event) = events.next().await {
4798 if let EventType::Decide { leaf_chain, .. } = event.event {
4799 let height = leaf_chain[0].leaf.height();
4800 tracing::info!("Server decided at height: {height}");
4801 if height > EPOCH_HEIGHT * 7 {
4803 break;
4804 }
4805 }
4806 }
4807
4808 let storage = SqlDataSource::create_storage().await;
4810 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
4811
4812 tracing::info!("Restarting peer 0");
4813 let node = network
4814 .cfg
4815 .init_node(
4816 1,
4817 state,
4818 options,
4819 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4820 vec![format!("http://localhost:{port}").parse().unwrap()],
4821 Default::default(),
4822 &NoMetrics,
4823 )),
4824 None,
4825 &NoMetrics,
4826 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4827 NullEventConsumer,
4828 versions,
4829 Default::default(),
4830 )
4831 .await;
4832
4833 let coordinator = node.node_state().coordinator;
4834
4835 let server_node_state = network.server.node_state();
4836 let server_coordinator = server_node_state.coordinator;
4837
4838 let mut rand_epochs: Vec<_> = (1..=7).collect();
4840 rand_epochs.shuffle(&mut rand::thread_rng());
4841 println!("trigger catchup in this order: {rand_epochs:?}");
4842 for epoch_num in rand_epochs {
4843 let epoch = EpochNumber::new(epoch_num);
4844 let _ = coordinator.membership_for_epoch(Some(epoch)).await;
4845 }
4846
4847 for epoch_num in 1..=7 {
4849 println!("getting stake table for epoch = {epoch_num}");
4850 let epoch = EpochNumber::new(epoch_num);
4851 let _ = coordinator.wait_for_catchup(epoch).await.unwrap();
4852
4853 println!("have stake table for epoch = {epoch_num}");
4854
4855 let node_stake_table = coordinator
4856 .membership()
4857 .read()
4858 .await
4859 .stake_table(Some(epoch));
4860 let stake_table = server_coordinator
4861 .membership()
4862 .read()
4863 .await
4864 .stake_table(Some(epoch));
4865
4866 println!("asserting stake table for epoch = {epoch_num}");
4867
4868 assert_eq!(
4869 node_stake_table, stake_table,
4870 "Stake table mismatch for epoch {epoch_num}",
4871 );
4872 }
4873 }
4874
4875 #[rstest]
4876 #[case(PosVersionV3::new())]
4877 #[case(PosVersionV4::new())]
4878 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4879 async fn test_merklized_state_catchup_on_restart<Ver: Versions>(
4880 #[case] versions: Ver,
4881 ) -> anyhow::Result<()> {
4882 const EPOCH_HEIGHT: u64 = 10;
4894
4895 let network_config = TestConfigBuilder::default()
4896 .epoch_height(EPOCH_HEIGHT)
4897 .build();
4898
4899 let api_port = pick_unused_port().expect("No ports free for query service");
4900
4901 tracing::info!("API PORT = {api_port}");
4902 const NUM_NODES: usize = 5;
4903
4904 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4905 let persistence: [_; NUM_NODES] = storage
4906 .iter()
4907 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4908 .collect::<Vec<_>>()
4909 .try_into()
4910 .unwrap();
4911
4912 let config = TestNetworkConfigBuilder::with_num_nodes()
4913 .api_config(SqlDataSource::options(
4914 &storage[0],
4915 Options::with_port(api_port).catchup(Default::default()),
4916 ))
4917 .network_config(network_config)
4918 .persistences(persistence.clone())
4919 .catchups(std::array::from_fn(|_| {
4920 StatePeers::<StaticVersion<0, 1>>::from_urls(
4921 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4922 Default::default(),
4923 &NoMetrics,
4924 )
4925 }))
4926 .pos_hook::<Ver>(
4927 DelegationConfig::MultipleDelegators,
4928 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
4929 )
4930 .await
4931 .unwrap()
4932 .build();
4933 let state = config.states()[0].clone();
4934 let mut network = TestNetwork::new(config, versions).await;
4935
4936 network.peers[0].shut_down().await;
4941 network.peers.remove(0);
4942 let node_0_storage = &storage[1];
4943 let node_0_persistence = persistence[1].clone();
4944 let node_0_port = pick_unused_port().expect("No ports free for query service");
4945 tracing::info!("node_0_port {node_0_port}");
4946 let opt = Options::with_port(node_0_port).query_sql(
4948 Query {
4949 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
4950 },
4951 tmp_options(node_0_storage),
4952 );
4953
4954 let node_0 = opt
4956 .clone()
4957 .serve(|metrics, consumer, storage| {
4958 let cfg = network.cfg.clone();
4959 let node_0_persistence = node_0_persistence.clone();
4960 let state = state.clone();
4961 async move {
4962 Ok(cfg
4963 .init_node(
4964 1,
4965 state,
4966 node_0_persistence.clone(),
4967 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4968 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4969 Default::default(),
4970 &NoMetrics,
4971 )),
4972 storage,
4973 &*metrics,
4974 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4975 consumer,
4976 versions,
4977 Default::default(),
4978 )
4979 .await)
4980 }
4981 .boxed()
4982 })
4983 .await
4984 .unwrap();
4985
4986 let mut events = network.peers[2].event_stream().await;
4987 wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
4989
4990 drop(node_0);
4992
4993 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
4995
4996 let node_0 = opt
4998 .serve(|metrics, consumer, storage| {
4999 let cfg = network.cfg.clone();
5000 async move {
5001 Ok(cfg
5002 .init_node(
5003 1,
5004 state,
5005 node_0_persistence,
5006 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5007 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5008 Default::default(),
5009 &NoMetrics,
5010 )),
5011 storage,
5012 &*metrics,
5013 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5014 consumer,
5015 versions,
5016 Default::default(),
5017 )
5018 .await)
5019 }
5020 .boxed()
5021 })
5022 .await
5023 .unwrap();
5024
5025 let client: Client<ServerError, SequencerApiVersion> =
5026 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5027 client.connect(None).await;
5028
5029 wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
5030
5031 let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
5032
5033 let mut retries = 0;
5035 loop {
5036 sleep(Duration::from_secs(1)).await;
5037 let state = node_0.decided_state().await;
5038
5039 let leaves = if Ver::Base::VERSION == EpochVersion::VERSION {
5040 state.reward_merkle_tree_v1.num_leaves()
5042 } else {
5043 state.reward_merkle_tree_v2.num_leaves()
5045 };
5046
5047 if leaves > 0 {
5048 tracing::info!("Node's state has reward accounts");
5049 break;
5050 }
5051
5052 retries += 1;
5053 if retries > 120 {
5054 panic!("max retries reached. failed to catchup reward state");
5055 }
5056 }
5057
5058 retries = 0;
5059 loop {
5061 sleep(Duration::from_secs(3)).await;
5062
5063 let bh = client
5064 .get::<u64>("block-state/block-height")
5065 .send()
5066 .await
5067 .expect("block height not found");
5068
5069 tracing::info!("block state: block height={bh}");
5070 if bh > epoch_7_block {
5071 break;
5072 }
5073
5074 retries += 1;
5075 if retries > 30 {
5076 panic!(
5077 "max retries reached. block state block height is less than epoch 7 start \
5078 block"
5079 );
5080 }
5081 }
5082
5083 node_0.shutdown_consensus().await;
5085 let decided_leaf = node_0.decided_leaf().await;
5086 let state = node_0.decided_state().await;
5087
5088 state
5089 .block_merkle_tree
5090 .lookup(decided_leaf.height() - 1)
5091 .expect_ok()
5092 .expect("block state not found");
5093
5094 Ok(())
5095 }
5096
5097 #[rstest]
5098 #[case(PosVersionV3::new())]
5099 #[case(PosVersionV4::new())]
5100 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5101 async fn test_state_reconstruction<Ver: Versions>(
5102 #[case] pos_version: Ver,
5103 ) -> anyhow::Result<()> {
5104 const EPOCH_HEIGHT: u64 = 10;
5120
5121 let network_config = TestConfigBuilder::default()
5122 .epoch_height(EPOCH_HEIGHT)
5123 .build();
5124
5125 let api_port = pick_unused_port().expect("No ports free for query service");
5126
5127 tracing::info!("API PORT = {api_port}");
5128 const NUM_NODES: usize = 5;
5129
5130 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5131 let persistence: [_; NUM_NODES] = storage
5132 .iter()
5133 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5134 .collect::<Vec<_>>()
5135 .try_into()
5136 .unwrap();
5137
5138 let config = TestNetworkConfigBuilder::with_num_nodes()
5139 .api_config(SqlDataSource::options(
5140 &storage[0],
5141 Options::with_port(api_port),
5142 ))
5143 .network_config(network_config)
5144 .persistences(persistence.clone())
5145 .catchups(std::array::from_fn(|_| {
5146 StatePeers::<StaticVersion<0, 1>>::from_urls(
5147 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5148 Default::default(),
5149 &NoMetrics,
5150 )
5151 }))
5152 .pos_hook::<Ver>(
5153 DelegationConfig::MultipleDelegators,
5154 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5155 )
5156 .await
5157 .unwrap()
5158 .build();
5159 let state = config.states()[0].clone();
5160 let mut network = TestNetwork::new(config, pos_version).await;
5161 network.peers.remove(0);
5166
5167 let node_0_storage = &storage[1];
5168 let node_0_persistence = persistence[1].clone();
5169 let node_0_port = pick_unused_port().expect("No ports free for query service");
5170 tracing::info!("node_0_port {node_0_port}");
5171 let opt = Options::with_port(node_0_port).query_sql(
5172 Query {
5173 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5174 },
5175 tmp_options(node_0_storage),
5176 );
5177 let node_0 = opt
5178 .clone()
5179 .serve(|metrics, consumer, storage| {
5180 let cfg = network.cfg.clone();
5181 let node_0_persistence = node_0_persistence.clone();
5182 let state = state.clone();
5183 async move {
5184 Ok(cfg
5185 .init_node(
5186 1,
5187 state,
5188 node_0_persistence.clone(),
5189 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5190 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5191 Default::default(),
5192 &NoMetrics,
5193 )),
5194 storage,
5195 &*metrics,
5196 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5197 consumer,
5198 pos_version,
5199 Default::default(),
5200 )
5201 .await)
5202 }
5203 .boxed()
5204 })
5205 .await
5206 .unwrap();
5207
5208 let mut events = network.peers[2].event_stream().await;
5209 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
5211
5212 tracing::warn!("shutting down node 0");
5213
5214 node_0.shutdown_consensus().await;
5215
5216 let instance = node_0.node_state();
5217 let state = node_0.decided_state().await;
5218 let fee_accounts = state
5219 .fee_merkle_tree
5220 .clone()
5221 .into_iter()
5222 .map(|(acct, _)| acct)
5223 .collect::<Vec<_>>();
5224 let reward_accounts = match Ver::Base::VERSION {
5225 EpochVersion::VERSION => state
5226 .reward_merkle_tree_v1
5227 .clone()
5228 .into_iter()
5229 .map(|(acct, _)| RewardAccountV2::from(acct))
5230 .collect::<Vec<_>>(),
5231 DrbAndHeaderUpgradeVersion::VERSION => state
5232 .reward_merkle_tree_v2
5233 .clone()
5234 .into_iter()
5235 .map(|(acct, _)| acct)
5236 .collect::<Vec<_>>(),
5237 _ => panic!("invalid version"),
5238 };
5239
5240 let client: Client<ServerError, SequencerApiVersion> =
5241 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5242 client.connect(Some(Duration::from_secs(10))).await;
5243
5244 sleep(Duration::from_secs(3)).await;
5247
5248 tracing::info!("getting node block height");
5249 let node_block_height = client
5250 .get::<u64>("node/block-height")
5251 .send()
5252 .await
5253 .context("getting Espresso block height")
5254 .unwrap();
5255
5256 tracing::info!("node block height={node_block_height}");
5257
5258 let leaf_query_data = client
5259 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
5260 .send()
5261 .await
5262 .context("error getting leaf")
5263 .unwrap();
5264
5265 tracing::info!("leaf={leaf_query_data:?}");
5266 let leaf = leaf_query_data.leaf();
5267 let to_view = leaf.view_number() + 1;
5268
5269 let ds = SqlStorage::connect(
5270 Config::try_from(&node_0_persistence).unwrap(),
5271 StorageConnectionType::Sequencer,
5272 )
5273 .await
5274 .unwrap();
5275 let mut tx = ds.read().await?;
5276
5277 let (state, leaf) = reconstruct_state(
5278 &instance,
5279 &ds,
5280 &mut tx,
5281 node_block_height - 1,
5282 to_view,
5283 &[],
5284 &[],
5285 )
5286 .await
5287 .unwrap();
5288 assert_eq!(leaf.view_number(), to_view);
5289 assert!(
5290 state
5291 .block_merkle_tree
5292 .lookup(node_block_height - 1)
5293 .expect_ok()
5294 .is_ok(),
5295 "inconsistent block merkle tree"
5296 );
5297
5298 let (state, leaf) = reconstruct_state(
5300 &instance,
5301 &ds,
5302 &mut tx,
5303 node_block_height - 1,
5304 to_view,
5305 &fee_accounts,
5306 &[],
5307 )
5308 .await
5309 .unwrap();
5310
5311 assert_eq!(leaf.view_number(), to_view);
5312 assert!(
5313 state
5314 .block_merkle_tree
5315 .lookup(node_block_height - 1)
5316 .expect_ok()
5317 .is_ok(),
5318 "inconsistent block merkle tree"
5319 );
5320
5321 for account in &fee_accounts {
5322 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
5323 }
5324
5325 let (state, leaf) = reconstruct_state(
5328 &instance,
5329 &ds,
5330 &mut tx,
5331 node_block_height - 1,
5332 to_view,
5333 &[],
5334 &reward_accounts,
5335 )
5336 .await
5337 .unwrap();
5338
5339 match Ver::Base::VERSION {
5340 EpochVersion::VERSION => {
5341 for account in reward_accounts.clone() {
5342 state
5343 .reward_merkle_tree_v1
5344 .lookup(RewardAccountV1::from(account))
5345 .expect_ok()
5346 .unwrap();
5347 }
5348 },
5349 DrbAndHeaderUpgradeVersion::VERSION => {
5350 for account in &reward_accounts {
5351 state
5352 .reward_merkle_tree_v2
5353 .lookup(account)
5354 .expect_ok()
5355 .unwrap();
5356 }
5357 },
5358 _ => panic!("invalid version"),
5359 };
5360
5361 assert_eq!(leaf.view_number(), to_view);
5362 assert!(
5363 state
5364 .block_merkle_tree
5365 .lookup(node_block_height - 1)
5366 .expect_ok()
5367 .is_ok(),
5368 "inconsistent block merkle tree"
5369 );
5370 let (state, leaf) = reconstruct_state(
5373 &instance,
5374 &ds,
5375 &mut tx,
5376 node_block_height - 1,
5377 to_view,
5378 &fee_accounts,
5379 &reward_accounts,
5380 )
5381 .await
5382 .unwrap();
5383
5384 assert!(
5385 state
5386 .block_merkle_tree
5387 .lookup(node_block_height - 1)
5388 .expect_ok()
5389 .is_ok(),
5390 "inconsistent block merkle tree"
5391 );
5392 assert_eq!(leaf.view_number(), to_view);
5393
5394 match Ver::Base::VERSION {
5395 EpochVersion::VERSION => {
5396 for account in reward_accounts.clone() {
5397 state
5398 .reward_merkle_tree_v1
5399 .lookup(RewardAccountV1::from(account))
5400 .expect_ok()
5401 .unwrap();
5402 }
5403 },
5404 DrbAndHeaderUpgradeVersion::VERSION => {
5405 for account in &reward_accounts {
5406 state
5407 .reward_merkle_tree_v2
5408 .lookup(account)
5409 .expect_ok()
5410 .unwrap();
5411 }
5412 },
5413 _ => panic!("invalid version"),
5414 };
5415
5416 for account in &fee_accounts {
5417 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
5418 }
5419
5420 Ok(())
5421 }
5422
5423 #[rstest]
5424 #[case(PosVersionV3::new())]
5425 #[case(PosVersionV4::new())]
5426 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5427 async fn test_block_reward_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
5428 let epoch_height = 10;
5429
5430 let network_config = TestConfigBuilder::default()
5431 .epoch_height(epoch_height)
5432 .build();
5433
5434 let api_port = pick_unused_port().expect("No ports free for query service");
5435
5436 const NUM_NODES: usize = 1;
5437 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5439 let persistence: [_; NUM_NODES] = storage
5440 .iter()
5441 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5442 .collect::<Vec<_>>()
5443 .try_into()
5444 .unwrap();
5445
5446 let config = TestNetworkConfigBuilder::with_num_nodes()
5447 .api_config(SqlDataSource::options(
5448 &storage[0],
5449 Options::with_port(api_port),
5450 ))
5451 .network_config(network_config.clone())
5452 .persistences(persistence.clone())
5453 .catchups(std::array::from_fn(|_| {
5454 StatePeers::<StaticVersion<0, 1>>::from_urls(
5455 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5456 Default::default(),
5457 &NoMetrics,
5458 )
5459 }))
5460 .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
5461 .await
5462 .unwrap()
5463 .build();
5464
5465 let _network = TestNetwork::new(config, versions).await;
5466 let client: Client<ServerError, SequencerApiVersion> =
5467 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5468
5469 let _blocks = client
5470 .socket("availability/stream/blocks/0")
5471 .subscribe::<BlockQueryData<SeqTypes>>()
5472 .await
5473 .unwrap()
5474 .take(3)
5475 .try_collect::<Vec<_>>()
5476 .await
5477 .unwrap();
5478
5479 let block_reward = client
5480 .get::<Option<RewardAmount>>("node/block-reward")
5481 .send()
5482 .await
5483 .expect("failed to get block reward")
5484 .expect("block reward is None");
5485 tracing::info!("block_reward={block_reward:?}");
5486
5487 assert!(block_reward.0 > U256::ZERO);
5488
5489 Ok(())
5490 }
5491
5492 #[rstest]
5493 #[case(PosVersionV4::new())]
5494 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5495 async fn test_token_supply_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
5496 let epoch_height = 10;
5497
5498 let network_config = TestConfigBuilder::default()
5499 .epoch_height(epoch_height)
5500 .build();
5501
5502 let api_port = pick_unused_port().expect("No ports free for query service");
5503
5504 const NUM_NODES: usize = 1;
5505 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5507 let persistence: [_; NUM_NODES] = storage
5508 .iter()
5509 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5510 .collect::<Vec<_>>()
5511 .try_into()
5512 .unwrap();
5513
5514 let config = TestNetworkConfigBuilder::with_num_nodes()
5515 .api_config(SqlDataSource::options(
5516 &storage[0],
5517 Options::with_port(api_port),
5518 ))
5519 .network_config(network_config.clone())
5520 .persistences(persistence.clone())
5521 .catchups(std::array::from_fn(|_| {
5522 StatePeers::<StaticVersion<0, 1>>::from_urls(
5523 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5524 Default::default(),
5525 &NoMetrics,
5526 )
5527 }))
5528 .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
5529 .await
5530 .unwrap()
5531 .build();
5532
5533 let _network = TestNetwork::new(config, versions).await;
5534 let client: Client<ServerError, SequencerApiVersion> =
5535 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5536
5537 let _blocks = client
5538 .socket("availability/stream/blocks/0")
5539 .subscribe::<BlockQueryData<SeqTypes>>()
5540 .await
5541 .unwrap()
5542 .take(3)
5543 .try_collect::<Vec<_>>()
5544 .await
5545 .unwrap();
5546
5547 let total_minted_supply = client
5548 .get::<String>("token/total-minted-supply")
5549 .send()
5550 .await
5551 .expect("failed to get total_minted_supply");
5552 tracing::info!("total_minted_supply={total_minted_supply:?}");
5553
5554 assert_eq!(total_minted_supply, "100000.0");
5555
5556 Ok(())
5557 }
5558
5559 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5560 async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
5561 use espresso_types::v0_3::ChainConfig;
5562
5563 let blocks_per_epoch = 10;
5564
5565 let network_config = TestConfigBuilder::<1>::default()
5566 .epoch_height(blocks_per_epoch)
5567 .build();
5568
5569 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
5570 &network_config.hotshot_config().hotshot_stake_table(),
5571 STAKE_TABLE_CAPACITY_FOR_TEST,
5572 )
5573 .unwrap();
5574
5575 let deployer = ProviderBuilder::new()
5576 .wallet(EthereumWallet::from(network_config.signer().clone()))
5577 .connect_http(network_config.l1_url().clone());
5578
5579 let mut contracts = Contracts::new();
5580 let args = DeployerArgsBuilder::default()
5581 .deployer(deployer.clone())
5582 .rpc_url(network_config.l1_url().clone())
5583 .mock_light_client(true)
5584 .genesis_lc_state(genesis_state)
5585 .genesis_st_state(genesis_stake)
5586 .blocks_per_epoch(blocks_per_epoch)
5587 .epoch_start_block(1)
5588 .multisig_pauser(network_config.signer().address())
5589 .token_name("Espresso".to_string())
5590 .token_symbol("ESP".to_string())
5591 .initial_token_supply(U256::from(3590000000u64))
5592 .ops_timelock_delay(U256::from(0))
5593 .ops_timelock_admin(network_config.signer().address())
5594 .ops_timelock_proposers(vec![network_config.signer().address()])
5595 .ops_timelock_executors(vec![network_config.signer().address()])
5596 .safe_exit_timelock_delay(U256::from(0))
5597 .safe_exit_timelock_admin(network_config.signer().address())
5598 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
5599 .safe_exit_timelock_executors(vec![network_config.signer().address()])
5600 .build()
5601 .unwrap();
5602
5603 args.deploy_all(&mut contracts).await.unwrap();
5604
5605 let st_addr = contracts
5606 .address(Contract::StakeTableProxy)
5607 .expect("StakeTableProxy deployed");
5608
5609 let l1_url = network_config.l1_url().clone();
5610
5611 let storage = SqlDataSource::create_storage().await;
5612 let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5613 let persistence = opt.create().await.unwrap();
5614
5615 let l1_client = L1ClientOptions {
5616 stake_table_update_interval: Duration::from_secs(7),
5617 l1_retry_delay: Duration::from_millis(10),
5618 l1_events_max_block_range: 10000,
5619 ..Default::default()
5620 }
5621 .connect(vec![l1_url])
5622 .unwrap();
5623 l1_client.spawn_tasks().await;
5624
5625 let fetcher = Fetcher::new(
5626 Arc::new(NullStateCatchup::default()),
5627 Arc::new(Mutex::new(persistence.clone())),
5628 l1_client.clone(),
5629 ChainConfig {
5630 stake_table_contract: Some(st_addr),
5631 base_fee: 0.into(),
5632 ..Default::default()
5633 },
5634 );
5635
5636 let provider = l1_client.provider;
5637 let stake_table = StakeTableV2::new(st_addr, provider.clone());
5638
5639 let stake_table_init_block = stake_table
5640 .initializedAtBlock()
5641 .block(BlockId::finalized())
5642 .call()
5643 .await?
5644 .to::<u64>();
5645
5646 tracing::info!("stake table init block = {stake_table_init_block}");
5647
5648 let token_address = stake_table
5649 .token()
5650 .block(BlockId::finalized())
5651 .call()
5652 .await
5653 .context("Failed to get token address")?;
5654
5655 let token = EspToken::new(token_address, provider.clone());
5656
5657 let init_log = fetcher
5658 .scan_token_contract_initialized_event_log(stake_table_init_block, token)
5659 .await
5660 .unwrap();
5661
5662 let init_tx = provider
5663 .get_transaction_receipt(
5664 init_log
5665 .transaction_hash
5666 .context(format!("transaction hash not found. init_log={init_log:?}"))?,
5667 )
5668 .await
5669 .unwrap()
5670 .unwrap();
5671
5672 let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().unwrap();
5673
5674 assert!(mint_transfer.value > U256::ZERO);
5675
5676 Ok(())
5677 }
5678
5679 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5680 async fn test_tx_metadata() {
5681 let port = pick_unused_port().expect("No ports free");
5682
5683 let url = format!("http://localhost:{port}").parse().unwrap();
5684 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5685
5686 let storage = SqlDataSource::create_storage().await;
5687 let network_config = TestConfigBuilder::default().build();
5688 let config = TestNetworkConfigBuilder::default()
5689 .api_config(
5690 SqlDataSource::options(&storage, Options::with_port(port))
5691 .submit(Default::default())
5692 .explorer(Default::default()),
5693 )
5694 .network_config(network_config)
5695 .build();
5696 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5697 let mut events = network.server.event_stream().await;
5698
5699 client.connect(None).await;
5700
5701 let namespace_counts = [(101, 1), (102, 2), (103, 3)];
5703 for (ns, count) in &namespace_counts {
5704 for i in 0..*count {
5705 let ns_id = NamespaceId::from(*ns as u64);
5706 let txn = Transaction::new(ns_id, vec![*ns, i]);
5707 client
5708 .post::<()>("submit/submit")
5709 .body_json(&txn)
5710 .unwrap()
5711 .send()
5712 .await
5713 .unwrap();
5714 let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;
5715
5716 let summary: BlockSummaryQueryData<SeqTypes> = client
5718 .get(&format!("availability/block/summary/{block}"))
5719 .send()
5720 .await
5721 .unwrap();
5722 let ns_info = summary.namespaces();
5723 assert_eq!(ns_info.len(), 1);
5724 assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
5725 assert_eq!(ns_info[&ns_id].num_transactions, 1);
5726 assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
5727 }
5728 }
5729
5730 for (ns, count) in &namespace_counts {
5732 tracing::info!(ns, "list transactions in namespace");
5733
5734 let ns_id = NamespaceId::from(*ns as u64);
5735 let summaries: TransactionSummariesResponse<SeqTypes> = client
5736 .get(&format!(
5737 "explorer/transactions/latest/{count}/namespace/{ns_id}"
5738 ))
5739 .send()
5740 .await
5741 .unwrap();
5742 let txs = summaries.transaction_summaries;
5743 assert_eq!(txs.len(), *count as usize);
5744
5745 for i in 0..*count {
5747 let summary = &txs[i as usize];
5748 let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
5749 assert_eq!(summary.rollups, vec![ns_id]);
5750 assert_eq!(summary.hash, expected.commit());
5751 }
5752 }
5753 }
5754
5755 use std::time::Instant;
5756
5757 use rand::thread_rng;
5758
5759 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5760 async fn test_aggregator_namespace_endpoints() {
5761 let mut rng = thread_rng();
5762
5763 let port = pick_unused_port().expect("No ports free");
5764
5765 let url = format!("http://localhost:{port}").parse().unwrap();
5766 tracing::info!("Sequencer URL = {url}");
5767 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5768
5769 let options = Options::with_port(port).submit(Default::default());
5770 const NUM_NODES: usize = 2;
5771 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5773
5774 let persistence_options: [_; NUM_NODES] = storage
5775 .iter()
5776 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5777 .collect::<Vec<_>>()
5778 .try_into()
5779 .unwrap();
5780
5781 let network_config = TestConfigBuilder::default().build();
5782
5783 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5784 .api_config(SqlDataSource::options(&storage[0], options))
5785 .network_config(network_config)
5786 .persistences(persistence_options.clone())
5787 .build();
5788 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5789 let mut events = network.server.event_stream().await;
5790 let start = Instant::now();
5791 let mut total_transactions = 0;
5792 let mut tx_heights = Vec::new();
5793 let mut sizes = HashMap::new();
5794 for namespace in 1..=4 {
5797 for _count in 0..namespace {
5798 let payload_len = rng.gen_range(4..=10);
5800 let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5801
5802 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5803
5804 client.connect(None).await;
5805
5806 let hash = client
5807 .post("submit/submit")
5808 .body_json(&txn)
5809 .unwrap()
5810 .send()
5811 .await
5812 .unwrap();
5813 assert_eq!(txn.commit(), hash);
5814
5815 let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
5817 tx_heights.push(height);
5818 total_transactions += 1;
5819 *sizes.entry(namespace).or_insert(0) += size;
5820 }
5821 }
5822
5823 let duration = start.elapsed();
5824
5825 println!("Time elapsed to submit transactions: {duration:?}");
5826
5827 let last_tx_height = tx_heights.last().unwrap();
5828 for namespace in 1..=4 {
5829 let count = client
5830 .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
5831 .send()
5832 .await
5833 .unwrap();
5834 assert_eq!(
5835 count, namespace as u64,
5836 "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
5837 {count}"
5838 );
5839
5840 let to_endpoint_count = client
5842 .get::<u64>(&format!(
5843 "node/transactions/count/namespace/{namespace}/{last_tx_height}"
5844 ))
5845 .send()
5846 .await
5847 .unwrap();
5848 assert_eq!(
5849 to_endpoint_count, namespace as u64,
5850 "Incorrect transaction count for range endpoint (to only) for namespace \
5851 {namespace}: expected {namespace}, got {to_endpoint_count}"
5852 );
5853
5854 let from_to_endpoint_count = client
5856 .get::<u64>(&format!(
5857 "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
5858 ))
5859 .send()
5860 .await
5861 .unwrap();
5862 assert_eq!(
5863 from_to_endpoint_count, namespace as u64,
5864 "Incorrect transaction count for range endpoint (from-to) for namespace \
5865 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
5866 );
5867
5868 let ns_size = client
5869 .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
5870 .send()
5871 .await
5872 .unwrap();
5873
5874 let expected_ns_size = *sizes.get(&namespace).unwrap();
5875 assert_eq!(
5876 ns_size, expected_ns_size,
5877 "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
5878 got {ns_size}"
5879 );
5880
5881 let ns_size_to = client
5882 .get::<usize>(&format!(
5883 "node/payloads/size/namespace/{namespace}/{last_tx_height}"
5884 ))
5885 .send()
5886 .await
5887 .unwrap();
5888 assert_eq!(
5889 ns_size_to, expected_ns_size,
5890 "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
5891 expected {expected_ns_size}, got {ns_size_to}"
5892 );
5893
5894 let ns_size_from_to = client
5895 .get::<usize>(&format!(
5896 "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
5897 ))
5898 .send()
5899 .await
5900 .unwrap();
5901 assert_eq!(
5902 ns_size_from_to, expected_ns_size,
5903 "Incorrect payload size for namespace {namespace} from 0 to height \
5904 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
5905 );
5906 }
5907
5908 let total_tx_count = client
5909 .get::<u64>("node/transactions/count")
5910 .send()
5911 .await
5912 .unwrap();
5913 assert_eq!(
5914 total_tx_count, total_transactions,
5915 "Incorrect total transaction count: expected {total_transactions}, got \
5916 {total_tx_count}"
5917 );
5918
5919 let total_payload_size = client
5920 .get::<usize>("node/payloads/size")
5921 .send()
5922 .await
5923 .unwrap();
5924
5925 let expected_total_size: usize = sizes.values().copied().sum();
5926 assert_eq!(
5927 total_payload_size, expected_total_size,
5928 "Incorrect total payload size: expected {expected_total_size}, got \
5929 {total_payload_size}"
5930 );
5931 }
5932
5933 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5934 async fn test_stream_transactions_endpoint() {
5935 let mut rng = thread_rng();
5941
5942 let port = pick_unused_port().expect("No ports free");
5943
5944 let url = format!("http://localhost:{port}").parse().unwrap();
5945 tracing::info!("Sequencer URL = {url}");
5946 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
5947
5948 let options = Options::with_port(port).submit(Default::default());
5949 const NUM_NODES: usize = 2;
5950 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5952
5953 let persistence_options: [_; NUM_NODES] = storage
5954 .iter()
5955 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5956 .collect::<Vec<_>>()
5957 .try_into()
5958 .unwrap();
5959
5960 let network_config = TestConfigBuilder::default().build();
5961
5962 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5963 .api_config(SqlDataSource::options(&storage[0], options))
5964 .network_config(network_config)
5965 .persistences(persistence_options.clone())
5966 .build();
5967 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
5968 let mut events = network.server.event_stream().await;
5969 let mut all_transactions = HashMap::new();
5970 let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
5971
5972 for namespace in 1..=4 {
5975 for _count in 0..namespace {
5976 let payload_len = rng.gen_range(4..=10);
5977 let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();
5978
5979 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
5980
5981 client.connect(None).await;
5982
5983 let hash = client
5984 .post("submit/submit")
5985 .body_json(&txn)
5986 .unwrap()
5987 .send()
5988 .await
5989 .unwrap();
5990 assert_eq!(txn.commit(), hash);
5991
5992 wait_for_decide_on_handle(&mut events, &txn).await;
5994 all_transactions.insert(txn.commit(), txn.clone());
5997 namespace_tx.entry(namespace).or_default().insert(txn);
5998 }
5999 }
6000
6001 let mut transactions = client
6002 .socket("availability/stream/transactions/0")
6003 .subscribe::<TransactionQueryData<SeqTypes>>()
6004 .await
6005 .expect("failed to subscribe to transactions endpoint");
6006
6007 let mut count = 0;
6008 while let Some(tx) = transactions.next().await {
6009 let tx = tx.unwrap();
6010 let expected = all_transactions
6011 .get(&tx.transaction().commit())
6012 .expect("txn not found ");
6013 assert_eq!(tx.transaction(), expected, "invalid transaction");
6014 count += 1;
6015
6016 if count == all_transactions.len() {
6017 break;
6018 }
6019 }
6020
6021 for (namespace, expected_ns_txns) in &namespace_tx {
6024 let mut api_namespace_txns = client
6025 .socket(&format!(
6026 "availability/stream/transactions/0/namespace/{namespace}",
6027 ))
6028 .subscribe::<TransactionQueryData<SeqTypes>>()
6029 .await
6030 .unwrap_or_else(|_| {
6031 panic!("failed to subscribe to transactions namespace {namespace}")
6032 });
6033
6034 let mut received = HashSet::new();
6035
6036 while let Some(res) = api_namespace_txns.next().await {
6037 let tx = res.expect("stream error");
6038 received.insert(tx.transaction().clone());
6039
6040 if received.len() == expected_ns_txns.len() {
6041 break;
6042 }
6043 }
6044
6045 assert_eq!(
6046 received, *expected_ns_txns,
6047 "Mismatched transactions for namespace {namespace}"
6048 );
6049 }
6050 }
6051
6052 #[rstest]
6053 #[case(PosVersionV3::new())]
6054 #[case(PosVersionV4::new())]
6055 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6056 async fn test_v3_and_v4_reward_tree_updates<Ver: Versions>(
6057 #[case] versions: Ver,
6058 ) -> anyhow::Result<()> {
6059 const EPOCH_HEIGHT: u64 = 10;
6069
6070 let network_config = TestConfigBuilder::default()
6071 .epoch_height(EPOCH_HEIGHT)
6072 .build();
6073
6074 let api_port = pick_unused_port().expect("No ports free for query service");
6075
6076 tracing::info!("API PORT = {api_port}");
6077 const NUM_NODES: usize = 5;
6078
6079 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6080 let persistence: [_; NUM_NODES] = storage
6081 .iter()
6082 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6083 .collect::<Vec<_>>()
6084 .try_into()
6085 .unwrap();
6086
6087 let config = TestNetworkConfigBuilder::with_num_nodes()
6088 .api_config(SqlDataSource::options(
6089 &storage[0],
6090 Options::with_port(api_port).catchup(Default::default()),
6091 ))
6092 .network_config(network_config)
6093 .persistences(persistence.clone())
6094 .catchups(std::array::from_fn(|_| {
6095 StatePeers::<StaticVersion<0, 1>>::from_urls(
6096 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6097 Default::default(),
6098 &NoMetrics,
6099 )
6100 }))
6101 .pos_hook::<Ver>(
6102 DelegationConfig::MultipleDelegators,
6103 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6104 )
6105 .await
6106 .unwrap()
6107 .build();
6108 let mut network = TestNetwork::new(config, versions).await;
6109
6110 let mut events = network.peers[2].event_stream().await;
6111 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6113
6114 let validated_state = network.server.decided_state().await;
6115 let version = Ver::Base::VERSION;
6116 if version == EpochVersion::VERSION {
6117 let v1_tree = &validated_state.reward_merkle_tree_v1;
6118 assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
6119 let v2_tree = &validated_state.reward_merkle_tree_v2;
6120 assert!(
6121 v2_tree.num_leaves() == 0,
6122 "v2 reward tree tree is not empty"
6123 );
6124 } else {
6125 let v1_tree = &validated_state.reward_merkle_tree_v1;
6126 assert!(
6127 v1_tree.num_leaves() == 0,
6128 "v1 reward tree tree is not empty"
6129 );
6130 let v2_tree = &validated_state.reward_merkle_tree_v2;
6131 assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
6132 }
6133
6134 network.stop_consensus().await;
6135 Ok(())
6136 }
6137
6138 #[rstest]
6139 #[case(PosVersionV3::new())]
6140 #[case(PosVersionV4::new())]
6141 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6142
6143 pub(crate) async fn test_state_cert_query<Ver: Versions>(#[case] versions: Ver) {
6144 const TEST_EPOCH_HEIGHT: u64 = 10;
6145 const TEST_EPOCHS: u64 = 5;
6146
6147 let network_config = TestConfigBuilder::default()
6148 .epoch_height(TEST_EPOCH_HEIGHT)
6149 .build();
6150
6151 let api_port = pick_unused_port().expect("No ports free for query service");
6152
6153 tracing::info!("API PORT = {api_port}");
6154 const NUM_NODES: usize = 2;
6155
6156 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6157 let persistence: [_; NUM_NODES] = storage
6158 .iter()
6159 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6160 .collect::<Vec<_>>()
6161 .try_into()
6162 .unwrap();
6163
6164 let config = TestNetworkConfigBuilder::with_num_nodes()
6165 .api_config(SqlDataSource::options(
6166 &storage[0],
6167 Options::with_port(api_port).catchup(Default::default()),
6168 ))
6169 .network_config(network_config)
6170 .persistences(persistence.clone())
6171 .catchups(std::array::from_fn(|_| {
6172 StatePeers::<StaticVersion<0, 1>>::from_urls(
6173 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6174 Default::default(),
6175 &NoMetrics,
6176 )
6177 }))
6178 .pos_hook::<Ver>(
6179 DelegationConfig::MultipleDelegators,
6180 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6181 )
6182 .await
6183 .unwrap()
6184 .build();
6185
6186 let network = TestNetwork::new(config, versions).await;
6187 let mut events = network.server.event_stream().await;
6188
6189 loop {
6191 let event = events.next().await.unwrap();
6192 tracing::info!("Received event from handle: {event:?}");
6193
6194 if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
6195 println!(
6196 "Decide event received: {:?}",
6197 leaf_chain.first().unwrap().leaf.height()
6198 );
6199 if let Some(first_leaf) = leaf_chain.first() {
6200 let height = first_leaf.leaf.height();
6201 tracing::info!("Decide event received at height: {height}");
6202
6203 if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
6204 break;
6205 }
6206 }
6207 }
6208 }
6209
6210 let client: Client<ServerError, StaticVersion<0, 1>> =
6212 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6213 client.connect(Some(Duration::from_secs(10))).await;
6214
6215 for i in 3..=TEST_EPOCHS {
6217 let state_query_data_v2 = client
6220 .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
6221 .send()
6222 .await
6223 .unwrap();
6224 let state_cert_v2 = state_query_data_v2.0.clone();
6225 tracing::info!("state_cert_v2: {state_cert_v2:?}");
6226 assert_eq!(state_cert_v2.epoch.u64(), i);
6227 assert_eq!(
6228 state_cert_v2.light_client_state.block_height,
6229 i * TEST_EPOCH_HEIGHT - 5
6230 );
6231 let block_height = state_cert_v2.light_client_state.block_height;
6232
6233 let header: Header = client
6234 .get(&format!("availability/header/{block_height}"))
6235 .send()
6236 .await
6237 .unwrap();
6238
6239 if header.version() == DrbAndHeaderUpgradeVersion::VERSION {
6241 let auth_root = state_cert_v2.auth_root;
6242 let header_auth_root = header.auth_root().unwrap();
6243 if auth_root.is_zero() || header_auth_root.is_zero() {
6244 panic!("auth root shouldn't be zero");
6245 }
6246
6247 assert_eq!(auth_root, header_auth_root, "auth root mismatch");
6248 }
6249
6250 let state_query_data_v1 = client
6252 .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
6253 .send()
6254 .await
6255 .unwrap();
6256
6257 let state_cert_v1 = state_query_data_v1.0.clone();
6258 tracing::info!("state_cert_v1: {state_cert_v1:?}");
6259 assert_eq!(state_query_data_v1, state_query_data_v2.into());
6260 }
6261 }
6262
// Checks that a freshly initialised node can serve state certificates for past epochs.
//
// Outline:
//   1. Start a five-node network (epoch height 10) whose API is served on `api_port`.
//   2. Wait on `wait_for_epochs(.., 3)` using the event stream of `network.peers[2]`.
//   3. Remove the first entry of `network.peers`.
//   4. Initialise a node with brand-new storage/persistence through `init_node`, serving its
//      own query API on `node_0_port` and using the original API as its query peer and
//      state-catchup peer.
//   5. Wait on `wait_for_epochs(.., 5)` using the new node's event stream.
//   6. Fetch `availability/state-cert-v2/{epoch}` for epochs 3..=5 from the new node's API
//      and check that each certificate reports the requested epoch.
#[rstest]
#[case(PosVersionV3::new())]
#[case(PosVersionV4::new())]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
pub(crate) async fn test_state_cert_catchup<Ver: Versions>(#[case] versions: Ver) {
    const EPOCH_HEIGHT: u64 = 10;

    let network_config = TestConfigBuilder::default()
        .epoch_height(EPOCH_HEIGHT)
        .build();

    let api_port = pick_unused_port().expect("No ports free for query service");

    tracing::info!("API PORT = {api_port}");
    const NUM_NODES: usize = 5;

    // One temporary database and one persistence configuration per node.
    let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
    let persistence: [_; NUM_NODES] = storage
        .iter()
        .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
        .collect::<Vec<_>>()
        .try_into()
        .unwrap();

    let config = TestNetworkConfigBuilder::with_num_nodes()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(api_port),
        ))
        .network_config(network_config)
        .persistences(persistence.clone())
        // Every node uses the API on `api_port` as its state-catchup peer.
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .pos_hook::<Ver>(
            DelegationConfig::MultipleDelegators,
            hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
        )
        .await
        .unwrap()
        .build();
    // Keep a copy of the first configured state; it is handed to `init_node` below.
    let state = config.states()[0].clone();
    let mut network = TestNetwork::new(config, versions).await;

    let mut events = network.peers[2].event_stream().await;
    wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;

    // NOTE(review): removing the entry drops it; presumably that shuts the corresponding
    // node down so it can be re-initialised from scratch below - confirm.
    network.peers.remove(0);

    // Fresh database and persistence for the node initialised below, so it starts with no
    // locally stored data.
    let new_storage: hotshot_query_service::data_source::sql::testing::TmpDb =
        SqlDataSource::create_storage().await;
    let new_persistence: persistence::sql::Options =
        <SqlDataSource as TestableSequencerDataSource>::persistence_options(&new_storage);

    let node_0_port = pick_unused_port().expect("No ports free for query service");
    tracing::info!("node_0_port {node_0_port}");
    // The new node's query service fetches from the API of the original network.
    let opt = Options::with_port(node_0_port).query_sql(
        Query {
            peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
        },
        tmp_options(&new_storage),
    );
    // NOTE(review): the variable is called `node_0` but `init_node` is called with index 1;
    // verify that this index matches the peer removed above.
    let node_0 = opt
        .clone()
        .serve(|metrics, consumer, storage| {
            // Clone everything the `async move` block needs to own.
            let cfg = network.cfg.clone();
            let new_persistence = new_persistence.clone();
            let state = state.clone();
            async move {
                Ok(cfg
                    .init_node(
                        1,
                        state,
                        new_persistence.clone(),
                        Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                            vec![format!("http://localhost:{api_port}").parse().unwrap()],
                            Default::default(),
                            &NoMetrics,
                        )),
                        storage,
                        &*metrics,
                        test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                        consumer,
                        versions,
                        Default::default(),
                    )
                    .await)
            }
            .boxed()
        })
        .await
        .unwrap();

    let mut events = node_0.event_stream().await;
    wait_for_epochs(&mut events, EPOCH_HEIGHT, 5).await;

    // Query the new node's own API (not the original one on `api_port`).
    let client: Client<ServerError, StaticVersion<0, 1>> =
        Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
    client.connect(Some(Duration::from_secs(60))).await;

    // Epoch 3 was already reached before the new node was initialised; 4 and 5 after.
    for epoch in 3..=5 {
        let state_cert = client
            .get::<StateCertQueryDataV2<SeqTypes>>(&format!(
                "availability/state-cert-v2/{epoch}"
            ))
            .send()
            .await
            .unwrap();
        assert_eq!(state_cert.0.epoch.u64(), epoch);
    }
}
6389
6390 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6391 async fn test_integration_commission_updates() -> anyhow::Result<()> {
6392 const NUM_NODES: usize = 3;
6393 const EPOCH_HEIGHT: u64 = 10;
6394
6395 let versions = PosVersionV4::new();
6397
6398 let api_port = pick_unused_port().expect("No ports free for query service");
6399
6400 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6402 let persistence: [_; NUM_NODES] = storage
6403 .iter()
6404 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6405 .collect::<Vec<_>>()
6406 .try_into()
6407 .unwrap();
6408
6409 let network_config = TestConfigBuilder::default()
6411 .epoch_height(EPOCH_HEIGHT)
6412 .build();
6413
6414 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6416 .api_config(SqlDataSource::options(
6417 &storage[0],
6418 Options::with_port(api_port),
6419 ))
6420 .network_config(network_config.clone())
6421 .persistences(persistence.clone())
6422 .catchups(std::array::from_fn(|_| {
6423 StatePeers::<SequencerApiVersion>::from_urls(
6424 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6425 Default::default(),
6426 &NoMetrics,
6427 )
6428 }))
6429 .pos_hook::<PosVersionV4>(
6430 DelegationConfig::NoSelfDelegation,
6432 StakeTableContractVersion::V1, )
6434 .await
6435 .unwrap()
6436 .build();
6437
6438 let network = TestNetwork::new(config, versions).await;
6439 let provider = network.cfg.anvil().unwrap();
6440 let deployer_addr = network.cfg.signer().address();
6441 let mut contracts = network.contracts.unwrap();
6442 let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
6443 upgrade_stake_table_v2(
6444 provider,
6445 L1Client::new(vec![network.cfg.l1_url()])?,
6446 &mut contracts,
6447 deployer_addr,
6448 deployer_addr,
6449 )
6450 .await?;
6451
6452 let mut commissions = vec![];
6453 for (i, (validator, provider)) in
6454 network_config.validator_providers().into_iter().enumerate()
6455 {
6456 let commission = fetch_commission(provider.clone(), st_addr, validator).await?;
6457 let new_commission = match i {
6458 0 => 0u16,
6459 1 => commission.to_evm() + 500u16,
6460 2 => commission.to_evm() - 100u16,
6461 _ => unreachable!(),
6462 }
6463 .try_into()?;
6464 commissions.push((validator, commission, new_commission));
6465 tracing::info!(%validator, %commission, %new_commission, "Update commission");
6466 update_commission(provider, st_addr, new_commission)
6467 .await?
6468 .get_receipt()
6469 .await?;
6470 }
6471
6472 let current_epoch = network.peers[0]
6474 .decided_leaf()
6475 .await
6476 .epoch(EPOCH_HEIGHT)
6477 .unwrap();
6478 let target_epoch = current_epoch.u64() + 3;
6479 println!("target epoch for new stake table: {target_epoch}");
6480 let mut events = network.peers[0].event_stream().await;
6481 wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;
6482
6483 let client: Client<ServerError, SequencerApiVersion> =
6485 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6486 let validators = client
6487 .get::<ValidatorMap>(&format!("node/validators/{}", target_epoch - 1))
6488 .send()
6489 .await
6490 .expect("validators");
6491 assert!(!validators.is_empty());
6492 for (val, old_comm, _) in commissions.clone() {
6493 assert_eq!(validators.get(&val).unwrap().commission, old_comm.to_evm());
6494 }
6495
6496 let client: Client<ServerError, SequencerApiVersion> =
6498 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6499 let validators = client
6500 .get::<ValidatorMap>(&format!("node/validators/{target_epoch}"))
6501 .send()
6502 .await
6503 .expect("validators");
6504 assert!(!validators.is_empty());
6505 for (val, _, new_comm) in commissions.clone() {
6506 assert_eq!(validators.get(&val).unwrap().commission, new_comm.to_evm());
6507 }
6508
6509 let last_block_with_old_commissions = EPOCH_HEIGHT * (target_epoch - 1);
6510 let block_with_new_commissions = EPOCH_HEIGHT * target_epoch;
6511 let mut new_amounts = vec![];
6512 for (val, ..) in commissions {
6513 let before = client
6514 .get::<Option<RewardAmount>>(&format!(
6515 "reward-state-v2/reward-balance/{last_block_with_old_commissions}/{val}"
6516 ))
6517 .send()
6518 .await?
6519 .unwrap();
6520 let after = client
6521 .get::<Option<RewardAmount>>(&format!(
6522 "reward-state-v2/reward-balance/{block_with_new_commissions}/{val}"
6523 ))
6524 .send()
6525 .await?
6526 .unwrap();
6527 new_amounts.push(after - before);
6528 }
6529
6530 let tolerance = U256::from(10 * EPOCH_HEIGHT).into();
6531 assert!(new_amounts[0] < tolerance);
6533
6534 assert!(new_amounts[1] + new_amounts[2] > tolerance);
6536
6537 Ok(())
6538 }
6539
6540 #[rstest]
6541 #[case(PosVersionV3::new())]
6542 #[case(PosVersionV4::new())]
6543 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6544 async fn test_reward_proof_endpoint<Ver: Versions>(
6545 #[case] versions: Ver,
6546 ) -> anyhow::Result<()> {
6547 const EPOCH_HEIGHT: u64 = 10;
6548 const NUM_NODES: usize = 5;
6549
6550 let network_config = TestConfigBuilder::default()
6551 .epoch_height(EPOCH_HEIGHT)
6552 .build();
6553
6554 let api_port = pick_unused_port().expect("No ports free for query service");
6555 println!("API PORT = {api_port}");
6556
6557 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6558 let persistence: [_; NUM_NODES] = storage
6559 .iter()
6560 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6561 .collect::<Vec<_>>()
6562 .try_into()
6563 .unwrap();
6564
6565 let config = TestNetworkConfigBuilder::with_num_nodes()
6566 .api_config(SqlDataSource::options(
6567 &storage[0],
6568 Options::with_port(api_port).catchup(Default::default()),
6569 ))
6570 .network_config(network_config)
6571 .persistences(persistence.clone())
6572 .catchups(std::array::from_fn(|_| {
6573 StatePeers::<StaticVersion<0, 1>>::from_urls(
6574 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6575 Default::default(),
6576 &NoMetrics,
6577 )
6578 }))
6579 .pos_hook::<Ver>(
6580 DelegationConfig::MultipleDelegators,
6581 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6582 )
6583 .await
6584 .unwrap()
6585 .build();
6586
6587 let mut network = TestNetwork::new(config, versions).await;
6588
6589 let mut events = network.server.event_stream().await;
6591 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6592
6593 let url = format!("http://localhost:{api_port}").parse().unwrap();
6594 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6595
6596 let validated_state = network.server.decided_state().await;
6597 let decided_leaf = network.server.decided_leaf().await;
6598 let height = decided_leaf.height();
6599
6600 if Ver::Base::VERSION == EpochVersion::VERSION {
6602 wait_until_block_height(&client, "reward-state/block-height", height).await;
6604
6605 network.stop_consensus().await;
6606
6607 for (address, _) in validated_state.reward_merkle_tree_v1.iter() {
6608 let (_, expected_proof) = validated_state
6609 .reward_merkle_tree_v1
6610 .lookup(*address)
6611 .expect_ok()
6612 .unwrap();
6613
6614 let res = client
6615 .get::<RewardAccountQueryDataV1>(&format!(
6616 "reward-state/proof/{height}/{address}"
6617 ))
6618 .send()
6619 .await
6620 .unwrap();
6621
6622 match res.proof.proof {
6623 RewardMerkleProofV1::Presence(p) => {
6624 assert_eq!(
6625 p, expected_proof,
6626 "Proof mismatch for V1 at {height}, addr={address}"
6627 );
6628 },
6629 other => panic!(
6630 "Expected Present proof for V1 at {height}, addr={address}, got {other:?}"
6631 ),
6632 }
6633 }
6634 } else {
6635 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
6637
6638 network.stop_consensus().await;
6639
6640 for (address, _) in validated_state.reward_merkle_tree_v2.iter() {
6641 let (_, expected_proof) = validated_state
6642 .reward_merkle_tree_v2
6643 .lookup(*address)
6644 .expect_ok()
6645 .unwrap();
6646
6647 let res = client
6648 .get::<RewardAccountQueryDataV2>(&format!(
6649 "reward-state-v2/proof/{height}/{address}"
6650 ))
6651 .send()
6652 .await
6653 .unwrap();
6654
6655 match res.proof.proof.clone() {
6656 RewardMerkleProofV2::Presence(p) => {
6657 assert_eq!(
6658 p, expected_proof,
6659 "Proof mismatch for V2 at {height}, addr={address}"
6660 );
6661 },
6662 other => panic!(
6663 "Expected Present proof for V2 at {height}, addr={address}, got {other:?}"
6664 ),
6665 }
6666
6667 let reward_claim_input = client
6668 .get::<RewardClaimInput>(&format!(
6669 "reward-state-v2/reward-claim-input/{height}/{address}"
6670 ))
6671 .send()
6672 .await
6673 .unwrap();
6674
6675 assert_eq!(reward_claim_input, res.to_reward_claim_input()?);
6676 }
6677 }
6678
6679 Ok(())
6680 }
6681
6682 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6683 async fn test_all_validators_endpoint() -> anyhow::Result<()> {
6684 const EPOCH_HEIGHT: u64 = 20;
6685
6686 type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
6687
6688 let network_config = TestConfigBuilder::default()
6689 .epoch_height(EPOCH_HEIGHT)
6690 .build();
6691
6692 let api_port = pick_unused_port().expect("No ports free for query service");
6693
6694 const NUM_NODES: usize = 5;
6695
6696 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6697 let persistence: [_; NUM_NODES] = storage
6698 .iter()
6699 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6700 .collect::<Vec<_>>()
6701 .try_into()
6702 .unwrap();
6703
6704 let config = TestNetworkConfigBuilder::with_num_nodes()
6705 .api_config(SqlDataSource::options(
6706 &storage[0],
6707 Options::with_port(api_port),
6708 ))
6709 .network_config(network_config)
6710 .persistences(persistence.clone())
6711 .catchups(std::array::from_fn(|_| {
6712 StatePeers::<StaticVersion<0, 1>>::from_urls(
6713 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6714 Default::default(),
6715 &NoMetrics,
6716 )
6717 }))
6718 .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
6719 .await
6720 .unwrap()
6721 .build();
6722
6723 let network = TestNetwork::new(config, V4::new()).await;
6724 let client: Client<ServerError, SequencerApiVersion> =
6725 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6726
6727 let err = client
6728 .get::<Vec<Validator<PubKey>>>("node/all-validators/1/0/1001")
6729 .header("Accept", "application/json")
6730 .send()
6731 .await
6732 .unwrap_err();
6733
6734 assert_matches!(err, ServerError { status, message} if
6735 status == StatusCode::BAD_REQUEST
6736 && message.contains("Limit cannot be greater than 1000")
6737 );
6738
6739 let mut events = network.peers[0].event_stream().await;
6741 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6742
6743 {
6745 client
6746 .get::<Vec<Validator<PubKey>>>("node/all-validators/1/0/100")
6747 .send()
6748 .await
6749 .unwrap()
6750 .is_empty();
6751
6752 client
6753 .get::<Vec<Validator<PubKey>>>("node/all-validators/2/0/100")
6754 .send()
6755 .await
6756 .unwrap()
6757 .is_empty();
6758 }
6759
6760 let validators = client
6762 .get::<Vec<Validator<PubKey>>>("node/all-validators/3/0/100")
6763 .send()
6764 .await
6765 .expect("validators");
6766
6767 assert!(!validators.is_empty());
6768
6769 Ok(())
6770 }
6771
6772 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6773 async fn test_reward_accounts_catchup_endpoint() -> anyhow::Result<()> {
6774 const EPOCH_HEIGHT: u64 = 10;
6775 const NUM_NODES: usize = 3;
6776
6777 let network_config = TestConfigBuilder::default()
6778 .epoch_height(EPOCH_HEIGHT)
6779 .build();
6780
6781 let api_port = pick_unused_port().expect("No ports free for query service");
6782 println!("API PORT = {api_port}");
6783
6784 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6785 let persistence: [_; NUM_NODES] = storage
6786 .iter()
6787 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6788 .collect::<Vec<_>>()
6789 .try_into()
6790 .unwrap();
6791
6792 let config = TestNetworkConfigBuilder::with_num_nodes()
6793 .api_config(SqlDataSource::options(
6794 &storage[0],
6795 Options::with_port(api_port).catchup(Default::default()),
6796 ))
6797 .network_config(network_config)
6798 .persistences(persistence.clone())
6799 .catchups(std::array::from_fn(|_| {
6800 StatePeers::<StaticVersion<0, 1>>::from_urls(
6801 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6802 Default::default(),
6803 &NoMetrics,
6804 )
6805 }))
6806 .pos_hook::<PosVersionV4>(
6807 DelegationConfig::MultipleDelegators,
6808 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6809 )
6810 .await
6811 .unwrap()
6812 .build();
6813
6814 let mut network = TestNetwork::new(config, PosVersionV4::new()).await;
6815
6816 let client: Client<ServerError, StaticVersion<0, 1>> =
6817 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6818
6819 client.connect(None).await;
6820
6821 let mut events = network.server.event_stream().await;
6822 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6823
6824 network.stop_consensus().await;
6825 let height = network.server.decided_leaf().await.height();
6826 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
6827
6828 let err = client
6829 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
6830 "catchup/{height}/reward-amounts/10001/0"
6831 ))
6832 .send()
6833 .await
6834 .unwrap_err();
6835
6836 assert_matches!(err, ServerError { status, .. } if
6837 status == StatusCode::BAD_REQUEST
6838
6839 );
6840
6841 let mut expected: Vec<_> = network
6842 .server
6843 .decided_state()
6844 .await
6845 .reward_merkle_tree_v2
6846 .iter()
6847 .map(|(addr, amt)| (*addr, *amt))
6848 .collect();
6849 expected.sort_by_key(|(acct, _)| std::cmp::Reverse(*acct));
6851
6852 tracing::info!("expected accounts = {expected:?}");
6853 let limit = expected.len().min(10_000) as u64;
6854 let offset = 0u64;
6855 let expected: Vec<_> = expected.into_iter().take(limit as usize).collect();
6856
6857 let res = client
6858 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
6859 "catchup/{height}/reward-amounts/{limit}/{offset}"
6860 ))
6861 .send()
6862 .await
6863 .unwrap();
6864
6865 assert_eq!(res, expected);
6866
6867 Ok(())
6868 }
6869
6870 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6871 async fn test_get_all_reward_accounts_multiple_cases() -> anyhow::Result<()> {
6872 let storage = SqlDataSource::create_storage().await;
6873 let sql_options = tmp_options(&storage);
6874 let db = SqlStorage::connect(
6875 Config::try_from(&sql_options)?,
6876 StorageConnectionType::Sequencer,
6877 )
6878 .await?;
6879
6880 let validated_state = ValidatedState::default();
6881 let instance_state =
6882 NodeState::mock().with_genesis_version(DrbAndHeaderUpgradeVersion::version());
6883 let genesis_leaf = LeafQueryData::<SeqTypes>::genesis::<
6884 SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
6885 >(&validated_state, &instance_state)
6886 .await;
6887
6888 let mut reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
6889
6890 let account1 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000001")?;
6891 let account2 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000002")?;
6892 let account3 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000003")?;
6893 let account4 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000004")?;
6894
6895 let accounts_height_5 = vec![
6897 (account1, RewardAmount::from(1000u64)),
6898 (account2, RewardAmount::from(2000u64)),
6899 ];
6900
6901 let accounts_height_10 = vec![
6902 (account1, RewardAmount::from(1500u64)),
6903 (account3, RewardAmount::from(3000u64)),
6904 ];
6905
6906 let accounts_height_15 = vec![
6907 (account2, RewardAmount::from(2500u64)),
6908 (account4, RewardAmount::from(4000u64)),
6909 ];
6910
6911 let mut tx = db.write().await?;
6912
6913 let header_json = serde_json::to_value(genesis_leaf.header())?;
6914
6915 for height in [5i64, 10, 15, 16] {
6916 query(
6917 "INSERT INTO header (height, hash, payload_hash, timestamp, data)
6918 VALUES ($1, $2, $3, $4, $5)",
6919 )
6920 .bind(height)
6921 .bind(format!("hash_{height}"))
6922 .bind("payload_hash")
6923 .bind(0i64)
6924 .bind(&header_json)
6925 .execute(tx.as_mut())
6926 .await?;
6927 }
6928
6929 for (height, accounts) in [
6930 (5u64, &accounts_height_5),
6931 (10, &accounts_height_10),
6932 (15, &accounts_height_15),
6933 ] {
6934 for (account, balance) in accounts {
6935 reward_tree.update(*account, *balance)?;
6936
6937 let (_, proof) = reward_tree.lookup(*account).expect_ok().unwrap();
6938
6939 let traversal_path = <RewardAccountV2 as ToTraversalPath<
6940 { RewardMerkleTreeV2::ARITY },
6941 >>::to_traversal_path(
6942 account, reward_tree.height()
6943 );
6944
6945 UpdateStateData::<
6946 SeqTypes,
6947 RewardMerkleTreeV2,
6948 { RewardMerkleTreeV2::ARITY },
6949 >::insert_merkle_nodes(&mut tx, proof, traversal_path, height)
6950 .await?;
6951 }
6952 }
6953
6954 UpdateStateData::<
6955 SeqTypes,
6956 RewardMerkleTreeV2,
6957 { RewardMerkleTreeV2::ARITY },
6958 >::set_last_state_height(&mut tx, 15)
6959 .await?;
6960
6961 tx.commit().await?;
6962
6963 let result_height_5 = db.get_all_reward_accounts(5, 0, 100).await?;
6964 assert_eq!(result_height_5.len(), 2,);
6965 for (account, balance) in &accounts_height_5 {
6966 assert!(result_height_5
6967 .iter()
6968 .any(|(acc, bal)| acc == account && bal == balance),);
6969 }
6970
6971 let result_height_10 = db.get_all_reward_accounts(10, 0, 100).await?;
6972 assert_eq!(result_height_10.len(), 3,);
6973
6974 let expected_at_height_10 = vec![
6977 (account1, RewardAmount::from(1500u64)),
6978 (account2, RewardAmount::from(2000u64)),
6979 (account3, RewardAmount::from(3000u64)),
6980 ];
6981 for (account, balance) in &expected_at_height_10 {
6982 assert!(result_height_10
6983 .iter()
6984 .any(|(acc, bal)| acc == account && bal == balance),);
6985 }
6986
6987 let result_height_15 = db.get_all_reward_accounts(15, 0, 100).await?;
6988 assert_eq!(result_height_15.len(), 4,);
6989
6990 let expected_at_height_15 = vec![
6992 (account1, RewardAmount::from(1500u64)),
6993 (account2, RewardAmount::from(2500u64)),
6994 (account3, RewardAmount::from(3000u64)),
6995 (account4, RewardAmount::from(4000u64)),
6996 ];
6997 for (account, balance) in &expected_at_height_15 {
6998 assert!(result_height_15
6999 .iter()
7000 .any(|(acc, bal)| acc == account && bal == balance),);
7001 }
7002
7003 let result_limit_2 = db.get_all_reward_accounts(15, 0, 2).await?;
7006 assert_eq!(result_limit_2.len(), 2);
7007 assert_eq!(result_limit_2[0], (account4, RewardAmount::from(4000u64)));
7008 assert_eq!(result_limit_2[1], (account3, RewardAmount::from(3000u64)));
7009
7010 let result_offset_2 = db.get_all_reward_accounts(15, 2, 2).await?;
7011 assert_eq!(result_offset_2.len(), 2);
7012 assert_eq!(result_offset_2[0], (account2, RewardAmount::from(2500u64)));
7013 assert_eq!(result_offset_2[1], (account1, RewardAmount::from(1500u64)));
7014
7015 Ok(())
7016 }
7017
7018 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7021 async fn test_get_all_reward_accounts_check_state_height() -> anyhow::Result<()> {
7022 let storage = SqlDataSource::create_storage().await;
7023 let sql_options = tmp_options(&storage);
7024 let db = SqlStorage::connect(
7025 Config::try_from(&sql_options)?,
7026 StorageConnectionType::Sequencer,
7027 )
7028 .await?;
7029
7030 let validated_state = ValidatedState::default();
7031 let instance_state =
7032 NodeState::mock().with_genesis_version(DrbAndHeaderUpgradeVersion::version());
7033 let genesis_leaf = LeafQueryData::<SeqTypes>::genesis::<
7034 SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
7035 >(&validated_state, &instance_state)
7036 .await;
7037
7038 let mut reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
7039 let account1 = RewardAccountV2::from_str("0x0000000000000000000000000000000000000001")?;
7040
7041 let mut tx = db.write().await?;
7042
7043 let header_json = serde_json::to_value(genesis_leaf.header())?;
7044
7045 for height in [5i64, 10, 15, 20] {
7046 query(
7047 "INSERT INTO header (height, hash, payload_hash, timestamp, data)
7048 VALUES ($1, $2, $3, $4, $5)",
7049 )
7050 .bind(height)
7051 .bind(format!("hash_{height}"))
7052 .bind("payload_hash")
7053 .bind(0i64)
7054 .bind(&header_json)
7055 .execute(tx.as_mut())
7056 .await?;
7057 }
7058
7059 reward_tree.update(account1, RewardAmount::from(1000u64))?;
7061 let (_, proof) = reward_tree.lookup(account1).expect_ok().unwrap();
7062 let traversal_path =
7063 <RewardAccountV2 as ToTraversalPath<{ RewardMerkleTreeV2::ARITY }>>::to_traversal_path(
7064 &account1,
7065 reward_tree.height(),
7066 );
7067 UpdateStateData::<
7068 SeqTypes,
7069 RewardMerkleTreeV2,
7070 { RewardMerkleTreeV2::ARITY },
7071 >::insert_merkle_nodes(&mut tx, proof, traversal_path, 5)
7072 .await?;
7073
7074 UpdateStateData::<
7077 SeqTypes,
7078 RewardMerkleTreeV2,
7079 { RewardMerkleTreeV2::ARITY },
7080 >::set_last_state_height(&mut tx, 10)
7081 .await?;
7082
7083 tx.commit().await?;
7084
7085 let result = db.get_all_reward_accounts(5, 0, 100).await;
7087 assert!(result.is_ok());
7088 assert_eq!(result.unwrap().len(), 1);
7089
7090 let result = db.get_all_reward_accounts(10, 0, 100).await;
7092 assert!(result.is_ok());
7093
7094 let result = db.get_all_reward_accounts(15, 0, 100).await;
7097 assert!(result.is_err());
7098 let err = result.unwrap_err().to_string();
7099 assert!(err.contains("not yet available"));
7100
7101 Ok(())
7102 }
7103
7104 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7105 async fn test_namespace_query_compat_v0_2() {
7106 test_namespace_query_compat_helper(SequencerVersions::<FeeVersion, FeeVersion>::new())
7107 .await;
7108 }
7109
7110 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7111 async fn test_namespace_query_compat_v0_3() {
7112 test_namespace_query_compat_helper(SequencerVersions::<EpochVersion, EpochVersion>::new())
7113 .await;
7114 }
7115
7116 async fn test_namespace_query_compat_helper<V: Versions>(v: V) {
7117 const NUM_NODES: usize = 5;
7119
7120 let port = pick_unused_port().expect("No ports free");
7121 let url: Url = format!("http://localhost:{port}").parse().unwrap();
7122
7123 let test_config = TestConfigBuilder::default().build();
7124 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7125 .api_config(Options::from(options::Http {
7126 port,
7127 max_connections: None,
7128 }))
7129 .catchups(std::array::from_fn(|_| {
7130 StatePeers::<SequencerApiVersion>::from_urls(
7131 vec![url.clone()],
7132 Default::default(),
7133 &NoMetrics,
7134 )
7135 }))
7136 .network_config(test_config)
7137 .build();
7138
7139 let mut network = TestNetwork::new(config, v).await;
7140 let mut events = network.server.event_stream().await;
7141
7142 let ns = NamespaceId::from(10_000u64);
7144 let tx = Transaction::new(ns, vec![1, 2, 3]);
7145 network.server.submit_transaction(tx.clone()).await.unwrap();
7146 let block = wait_for_decide_on_handle(&mut events, &tx).await.0;
7147
7148 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7150 client.connect(None).await;
7151
7152 let (header, common): (Header, VidCommonQueryData<SeqTypes>) = try_join!(
7153 client.get(&format!("availability/header/{block}")).send(),
7154 client
7155 .get(&format!("availability/vid/common/{block}"))
7156 .send()
7157 )
7158 .unwrap();
7159 let version = header.version();
7160
7161 for api_ver in ["/v1", ""] {
7164 tracing::info!("test namespace API version: {api_ver}");
7165
7166 let ns_proof: NamespaceProofQueryData = client
7167 .get(&format!(
7168 "{api_ver}/availability/block/{block}/namespace/{ns}"
7169 ))
7170 .send()
7171 .await
7172 .unwrap();
7173 let proof = ns_proof.proof.as_ref().unwrap();
7174 if version < EpochVersion::version() {
7175 assert!(matches!(proof, NsProof::V0(..)));
7176 } else {
7177 assert!(matches!(proof, NsProof::V1(..)));
7178 }
7179 let (txs, ns_from_proof) = proof
7180 .verify(
7181 header.ns_table(),
7182 &header.payload_commitment(),
7183 common.common(),
7184 )
7185 .unwrap();
7186 assert_eq!(ns_from_proof, ns);
7187 assert_eq!(txs, ns_proof.transactions);
7188 assert_eq!(txs, std::slice::from_ref(&tx));
7189
7190 let ns_proofs: Vec<NamespaceProofQueryData> = client
7192 .get(&format!(
7193 "{api_ver}/availability/block/{}/{}/namespace/{ns}",
7194 block,
7195 block + 1
7196 ))
7197 .send()
7198 .await
7199 .unwrap();
7200 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7201
7202 let ns_proof: NamespaceProofQueryData = client
7204 .get(&format!(
7205 "{api_ver}/availability/block/{}/namespace/{ns}",
7206 block - 1
7207 ))
7208 .send()
7209 .await
7210 .unwrap();
7211 assert_eq!(ns_proof.proof, None);
7212 assert_eq!(ns_proof.transactions, vec![]);
7213
7214 let mut proofs = client
7216 .socket(&format!(
7217 "{api_ver}/availability/stream/blocks/0/namespace/{ns}"
7218 ))
7219 .subscribe()
7220 .await
7221 .unwrap();
7222 for i in 0.. {
7223 tracing::info!(i, "stream proof");
7224 let proof: NamespaceProofQueryData = proofs.next().await.unwrap().unwrap();
7225 if proof.proof.is_none() {
7226 tracing::info!("waiting for non-trivial proof from stream");
7227 continue;
7228 }
7229 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7230 break;
7231 }
7232 }
7233
7234 tracing::info!("test namespace API version: v0");
7236 if version < EpochVersion::version() {
7237 let ns_proof: ADVZNamespaceProofQueryData = client
7238 .get(&format!("v0/availability/block/{block}/namespace/{ns}"))
7239 .send()
7240 .await
7241 .unwrap();
7242 let proof = ns_proof.proof.as_ref().unwrap();
7243 let VidCommon::V0(common) = common.common() else {
7244 panic!("wrong VID common version");
7245 };
7246 let (txs, ns_from_proof) = proof
7247 .verify(header.ns_table(), &header.payload_commitment(), common)
7248 .unwrap();
7249 assert_eq!(ns_from_proof, ns);
7250 assert_eq!(txs, ns_proof.transactions);
7251 assert_eq!(&txs, std::slice::from_ref(&tx));
7252
7253 let ns_proofs: Vec<ADVZNamespaceProofQueryData> = client
7255 .get(&format!(
7256 "v0/availability/block/{}/{}/namespace/{ns}",
7257 block,
7258 block + 1
7259 ))
7260 .send()
7261 .await
7262 .unwrap();
7263 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7264 } else {
7265 client
7267 .get::<ADVZNamespaceProofQueryData>(&format!(
7268 "v0/availability/block/{block}/namespace/{ns}"
7269 ))
7270 .send()
7271 .await
7272 .unwrap_err();
7273 }
7274
7275 let ns_proof: ADVZNamespaceProofQueryData = client
7277 .get(&format!(
7278 "v0/availability/block/{}/namespace/{ns}",
7279 block - 1
7280 ))
7281 .send()
7282 .await
7283 .unwrap();
7284 assert_eq!(ns_proof.proof, None);
7285 assert_eq!(ns_proof.transactions, vec![]);
7286
7287 let mut proofs = client
7290 .socket(&format!("v0/availability/stream/blocks/0/namespace/{ns}"))
7291 .subscribe()
7292 .await
7293 .unwrap();
7294 for i in 0.. {
7295 tracing::info!(i, "stream proof");
7296 let proof: ADVZNamespaceProofQueryData = match proofs.next().await {
7297 Some(proof) => proof.unwrap(),
7298 None => {
7299 assert!(
7301 version >= EpochVersion::version(),
7302 "legacy steam ended while still on legacy consensus"
7303 );
7304 break;
7305 },
7306 };
7307 if proof.proof.is_none() {
7308 tracing::info!("waiting for non-trivial proof from stream");
7309 continue;
7310 }
7311 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7312 break;
7313 }
7314
7315 network.server.shut_down().await;
7316 }
7317
7318 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7319 async fn test_light_client_completeness() {
7320 const NUM_NODES: usize = 1;
7324 const EPOCH_HEIGHT: u64 = 200;
7325
7326 let upgrade_version = EpochVersion::version();
7327 let port = pick_unused_port().expect("No ports free");
7328 let url: Url = format!("http://localhost:{port}").parse().unwrap();
7329
7330 let test_config = TestConfigBuilder::default()
7331 .epoch_height(EPOCH_HEIGHT)
7332 .epoch_start_block(321)
7333 .set_upgrades(upgrade_version)
7334 .await
7335 .build();
7336
7337 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7338 let persistence: [_; NUM_NODES] = storage
7339 .iter()
7340 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7341 .collect::<Vec<_>>()
7342 .try_into()
7343 .unwrap();
7344
7345 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7346 .api_config(
7347 SqlDataSource::options(&storage[0], Options::with_port(port))
7348 .light_client(Default::default()),
7349 )
7350 .persistences(persistence.clone())
7351 .catchups(std::array::from_fn(|_| {
7352 StatePeers::<SequencerApiVersion>::from_urls(
7353 vec![url.clone()],
7354 Default::default(),
7355 &NoMetrics,
7356 )
7357 }))
7358 .network_config(test_config)
7359 .build();
7360
7361 let mut network = TestNetwork::new(
7362 config,
7363 SequencerVersions::<LegacyVersion, EpochVersion>::new(),
7364 )
7365 .await;
7366 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7367 client.connect(None).await;
7368
7369 let mut actual_leaves = vec![];
7372 let mut actual_blocks = vec![];
7373 let mut leaves = client
7374 .socket("availability/stream/leaves/0")
7375 .subscribe::<LeafQueryData<SeqTypes>>()
7376 .await
7377 .unwrap()
7378 .zip(
7379 client
7380 .socket("availability/stream/blocks/0")
7381 .subscribe::<BlockQueryData<SeqTypes>>()
7382 .await
7383 .unwrap(),
7384 )
7385 .map(|(leaf, block)| {
7386 let leaf = leaf.unwrap();
7387 let block = block.unwrap();
7388 actual_leaves.push(leaf.clone());
7389 actual_blocks.push(block);
7390 leaf
7391 });
7392
7393 let (upgrade_height, first_epoch) = loop {
7395 let leaf: LeafQueryData<SeqTypes> = leaves.next().await.unwrap();
7396 if leaf.header().version() < EpochVersion::version() {
7397 tracing::info!(version = %leaf.header().version(), height = leaf.header().height(), view = ?leaf.leaf().view_number(), "waiting for epoch upgrade");
7398 continue;
7399 }
7400 break (leaf.height(), leaf.leaf().epoch(EPOCH_HEIGHT).unwrap());
7401 };
7402 tracing::info!(upgrade_height, ?first_epoch, "epochs enabled");
7403
7404 let mut epoch_heights = [0; 2];
7407 for (i, epoch_height) in epoch_heights.iter_mut().enumerate() {
7408 let desired_epoch = first_epoch + (i as u64) + 1;
7409 *epoch_height = loop {
7410 let leaf = leaves.next().await.unwrap();
7411 let epoch = leaf.leaf().epoch(EPOCH_HEIGHT).unwrap();
7412 if epoch > desired_epoch {
7413 tracing::info!(
7414 height = leaf.height(),
7415 ?desired_epoch,
7416 ?epoch,
7417 "changed epoch"
7418 );
7419 break leaf.height();
7420 }
7421 tracing::info!(
7422 ?desired_epoch,
7423 height = leaf.header().height(),
7424 view = ?leaf.leaf().view_number(),
7425 "waiting for epoch change"
7426 );
7427 };
7428 }
7429
7430 let max_block = epoch_heights[1] + 1;
7432 loop {
7433 let leaf = leaves.next().await.unwrap();
7434 if leaf.height() > max_block {
7435 break;
7436 }
7437 tracing::info!(max_block, height = leaf.height(), "waiting for block");
7438 }
7439
7440 network.stop_consensus().await;
7443
7444 let heights =
7447 (0..=1)
7449 .chain(upgrade_height-1..=upgrade_height+1)
7451 .chain(epoch_heights[0]-1..=epoch_heights[0] + 1)
7453 .chain(epoch_heights[1]-1..=max_block);
7455
7456 let quorum = EpochChangeQuorum::new(EPOCH_HEIGHT);
7457 for i in heights {
7458 let leaf = &actual_leaves[i as usize];
7459 let block = &actual_blocks[i as usize];
7460 tracing::info!(i, ?leaf, ?block, "check leaf");
7461
7462 let client = &client;
7464 let proofs = try_join_all(
7465 [
7466 format!("light-client/leaf/{i}"),
7467 format!("light-client/leaf/hash/{}", leaf.hash()),
7468 format!("light-client/leaf/block-hash/{}", leaf.block_hash()),
7469 ]
7470 .into_iter()
7471 .map(|path| async move {
7472 tracing::info!(i, path, "fetch leaf proof");
7473 let proof = client.get::<LeafProof>(&path).send().await?;
7474 Ok::<_, anyhow::Error>((path, proof))
7475 }),
7476 )
7477 .await
7478 .unwrap();
7479
7480 for (path, proof) in proofs {
7482 tracing::info!(i, path, ?proof, "check leaf proof");
7483 assert_eq!(
7484 proof.verify(LeafProofHint::Quorum(&quorum)).await.unwrap(),
7485 *leaf
7486 );
7487 }
7488
7489 let root_height = i + 1;
7491 let root = actual_leaves[root_height as usize].header();
7492 let proofs = try_join_all(
7493 [
7494 format!("light-client/header/{root_height}/{i}"),
7495 format!(
7496 "light-client/header/{root_height}/hash/{}",
7497 leaf.block_hash()
7498 ),
7499 ]
7500 .into_iter()
7501 .map(|path| async move {
7502 tracing::info!(i, path, "get header proof");
7503 let proof = client.get::<HeaderProof>(&path).send().await?;
7504 Ok::<_, anyhow::Error>((path, proof))
7505 }),
7506 )
7507 .await
7508 .unwrap();
7509 for (path, proof) in proofs {
7510 tracing::info!(i, path, ?proof, "check header proof");
7511 assert_eq!(
7512 proof.verify_ref(root.block_merkle_tree_root()).unwrap(),
7513 leaf.header()
7514 );
7515 }
7516
7517 let proof = client
7519 .get::<PayloadProof>(&format!("light-client/payload/{i}"))
7520 .send()
7521 .await
7522 .unwrap();
7523 assert_eq!(proof.verify(leaf.header()).unwrap(), *block.payload());
7524 }
7525
7526 let events: Vec<StakeTableEvent> = client
7528 .get(&format!("light-client/stake-table/{}", first_epoch + 2))
7529 .send()
7530 .await
7531 .unwrap();
7532 let mut state_from_events = StakeTableState::default();
7533 for event in events {
7534 state_from_events.apply_event(event).unwrap().unwrap();
7535 }
7536
7537 assert_eq!(
7538 state_from_events.into_validators(),
7539 network
7540 .server
7541 .consensus()
7542 .read()
7543 .await
7544 .storage()
7545 .load_all_validators(first_epoch + 2, 0, 1_000_000)
7546 .await
7547 .unwrap()
7548 .into_iter()
7549 .map(|v| (v.account, v))
7550 .collect::<ValidatorMap>()
7551 );
7552
7553 let err = client
7555 .get::<Vec<StakeTableEvent>>(&format!("light-client/stake-table/{}", first_epoch + 1))
7556 .send()
7557 .await
7558 .unwrap_err();
7559 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
7560 }
7561}