1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use anyhow::{bail, Context};
4use async_lock::RwLock;
5use async_once_cell::Lazy;
6use async_trait::async_trait;
7use committable::Commitment;
8use data_source::{
9 CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
10 SubmitDataSource,
11};
12use derivative::Derivative;
13use espresso_types::{
14 config::PublicNetworkConfig,
15 retain_accounts,
16 v0::traits::SequencerPersistence,
17 v0_3::{ChainConfig, RewardAccountV1, RewardAmount, RewardMerkleTreeV1},
18 v0_4::{RewardAccountV2, RewardMerkleTreeV2},
19 AccountQueryData, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2, NodeState, PubKey,
20 Transaction, ValidatorMap,
21};
22use futures::{
23 future::{BoxFuture, Future, FutureExt},
24 stream::BoxStream,
25};
26use hotshot_events_service::events_source::{
27 EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
28};
29use hotshot_query_service::{
30 availability::VidCommonQueryData, data_source::ExtensibleDataSource, VidCommon,
31};
32use hotshot_types::{
33 data::{EpochNumber, VidCommitment, VidShare, ViewNumber},
34 event::{Event, LegacyEvent},
35 light_client::LCV3StateSignatureRequestBody,
36 network::NetworkConfig,
37 traits::{
38 network::ConnectedNetwork,
39 node_implementation::{NodeType, Versions},
40 },
41 vid::avidm::{init_avidm_param, AvidMScheme},
42 vote::HasViewNumber,
43 PeerConfig,
44};
45use itertools::Itertools;
46use jf_merkle_tree::MerkleTreeScheme;
47use rand::Rng;
48use request_response::RequestType;
49use tokio::time::timeout;
50
51use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
52use crate::{
53 catchup::{
54 add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
55 add_v2_reward_accounts_to_state, CatchupStorage,
56 },
57 context::Consensus,
58 request_response::{
59 data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
60 request::{Request, Response},
61 },
62 state_signature::StateSigner,
63 SeqTypes, SequencerApiVersion, SequencerContext,
64};
65
66pub mod data_source;
67pub mod endpoints;
68pub mod fs;
69pub mod options;
70pub mod sql;
71mod update;
72
73pub use options::Options;
74
/// Membership proof for the newest leaf of the block Merkle tree, served to
/// peers that are catching up.
pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;

/// A lazily-initialized value behind a pinned `Arc`, allowing the sequencer
/// context to be constructed asynchronously after the API state exists.
type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
78
/// State shared by the sequencer's API endpoint handlers.
///
/// The sequencer context is held lazily: endpoints hit before consensus has
/// finished initializing will await its construction.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> {
    #[derivative(Debug = "ignore")]
    // Initialized on first access from the future passed to `ApiState::new`.
    sequencer_context: BoxLazy<SequencerContext<N, P, V>>,
}
89
90impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState<N, P, V> {
91 fn new(context_init: impl Future<Output = SequencerContext<N, P, V>> + Send + 'static) -> Self {
92 Self {
93 sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
94 }
95 }
96
97 async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
98 self.sequencer_context
99 .as_ref()
100 .get()
101 .await
102 .get_ref()
103 .state_signer()
104 }
105
106 async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
107 self.sequencer_context
108 .as_ref()
109 .get()
110 .await
111 .get_ref()
112 .event_streamer()
113 }
114
115 async fn consensus(&self) -> Arc<RwLock<Consensus<N, P, V>>> {
116 self.sequencer_context
117 .as_ref()
118 .get()
119 .await
120 .get_ref()
121 .consensus()
122 }
123
124 async fn network_config(&self) -> NetworkConfig<SeqTypes> {
125 self.sequencer_context
126 .as_ref()
127 .get()
128 .await
129 .get_ref()
130 .network_config()
131 }
132}
133
/// A query-service data source `D` extended with the sequencer's [`ApiState`].
type StorageState<N, P, D, V> = ExtensibleDataSource<D, ApiState<N, P, V>>;
135
136#[async_trait]
137impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> EventsSource<SeqTypes>
138 for ApiState<N, P, V>
139{
140 type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
141 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
142
143 async fn get_event_stream(
144 &self,
145 _filter: Option<EventFilterSet<SeqTypes>>,
146 ) -> Self::EventStream {
147 self.event_streamer()
148 .await
149 .read()
150 .await
151 .get_event_stream(None)
152 .await
153 }
154
155 async fn get_legacy_event_stream(
156 &self,
157 _filter: Option<EventFilterSet<SeqTypes>>,
158 ) -> Self::LegacyEventStream {
159 self.event_streamer()
160 .await
161 .read()
162 .await
163 .get_legacy_event_stream(None)
164 .await
165 }
166
167 async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
168 self.event_streamer()
169 .await
170 .read()
171 .await
172 .get_startup_info()
173 .await
174 }
175}
176
177impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, V: Versions, P: SequencerPersistence>
178 SubmitDataSource<N, P> for StorageState<N, P, D, V>
179{
180 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
181 self.as_ref().submit(tx).await
182 }
183}
184
185impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
186 StakeTableDataSource<SeqTypes> for StorageState<N, P, D, V>
187{
188 async fn get_stake_table(
190 &self,
191 epoch: Option<<SeqTypes as NodeType>::Epoch>,
192 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
193 self.as_ref().get_stake_table(epoch).await
194 }
195
196 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
198 self.as_ref().get_stake_table_current().await
199 }
200
201 async fn get_validators(
203 &self,
204 epoch: <SeqTypes as NodeType>::Epoch,
205 ) -> anyhow::Result<ValidatorMap> {
206 self.as_ref().get_validators(epoch).await
207 }
208
209 async fn get_block_reward(
210 &self,
211 epoch: Option<EpochNumber>,
212 ) -> anyhow::Result<Option<RewardAmount>> {
213 self.as_ref().get_block_reward(epoch).await
214 }
215 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
217 self.as_ref().current_proposal_participation().await
218 }
219 async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
220 self.as_ref().previous_proposal_participation().await
221 }
222}
223
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
    StakeTableDataSource<SeqTypes> for ApiState<N, P, V>
{
    /// Stake table for `epoch` (or for the current context when `epoch` is
    /// `None`). Only epochs up to the current epoch + 1 can be served.
    async fn get_stake_table(
        &self,
        epoch: Option<<SeqTypes as NodeType>::Epoch>,
    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
        // The highest epoch whose stake table is known: current epoch + 1.
        let highest_epoch = self
            .consensus()
            .await
            .read()
            .await
            .cur_epoch()
            .await
            .map(|e| e + 1);
        // `Option` ordering: `None < Some(_)`. So while consensus has no
        // current epoch yet, any `Some(epoch)` request is rejected here.
        if epoch > highest_epoch {
            return Err(anyhow::anyhow!(
                "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
                 {highest_epoch:?}"
            ));
        }
        let mem = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .stake_table_for_epoch(epoch)
            .await?;

        Ok(mem.stake_table().await.0)
    }

    /// The current epoch number together with its stake table.
    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
        let epoch = self.consensus().await.read().await.cur_epoch().await;

        Ok(StakeTableWithEpochNumber {
            epoch,
            stake_table: self.get_stake_table(epoch).await?,
        })
    }

    /// Block reward for `epoch`, if known by the membership state.
    async fn get_block_reward(
        &self,
        epoch: Option<EpochNumber>,
    ) -> anyhow::Result<Option<RewardAmount>> {
        // Clone the coordinator so the consensus read lock is released before
        // taking the membership lock.
        let coordinator = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .clone();

        let membership = coordinator.membership().read().await;

        Ok(membership.block_reward(epoch))
    }

    /// Active validators for `epoch`, from the epoch's membership.
    async fn get_validators(
        &self,
        epoch: <SeqTypes as NodeType>::Epoch,
    ) -> anyhow::Result<ValidatorMap> {
        let mem = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .membership_for_epoch(Some(epoch))
            .await
            .context("membership not found")?;

        let r = mem.coordinator.membership().read().await;
        r.active_validators(&epoch)
    }

    /// Proposal participation stats for the current period, keyed by node
    /// public key.
    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .current_proposal_participation()
    }

    /// Proposal participation stats for the previous period, keyed by node
    /// public key.
    async fn previous_proposal_participation(&self) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .previous_proposal_participation()
    }
}
328
329#[async_trait]
330impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
331 RequestResponseDataSource<SeqTypes> for StorageState<N, P, D, V>
332{
333 async fn request_vid_shares(
334 &self,
335 block_number: u64,
336 vid_common_data: VidCommonQueryData<SeqTypes>,
337 timeout_duration: Duration,
338 ) -> anyhow::Result<Vec<VidShare>> {
339 self.as_ref()
340 .request_vid_shares(block_number, vid_common_data, timeout_duration)
341 .await
342 }
343}
344
#[async_trait]
impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence>
    RequestResponseDataSource<SeqTypes> for ApiState<N, P, V>
{
    /// Broadcast a request for VID shares of `block_number` and collect as
    /// many verified shares as arrive within `duration`.
    ///
    /// Control flow is intentionally inverted: the response callback always
    /// returns an error so `request_indefinitely` keeps polling, and the
    /// surrounding `timeout` expiring is the *success* path — at that point
    /// whatever shares were verified and accumulated are returned.
    async fn request_vid_shares(
        &self,
        block_number: u64,
        vid_common_data: VidCommonQueryData<SeqTypes>,
        duration: Duration,
    ) -> anyhow::Result<Vec<VidShare>> {
        let request_response_protocol = self
            .sequencer_context
            .as_ref()
            .get()
            .await
            .request_response_protocol
            .clone();

        // Only V1 VID common data carries the total weight needed to build
        // the AVID-M parameters.
        let total_weight = match vid_common_data.common() {
            VidCommon::V0(_) => {
                return Err(anyhow::anyhow!(
                    "V0 total weight calculation not supported yet"
                ));
            },
            VidCommon::V1(v1) => v1.total_weights,
        };

        let avidm_param =
            init_avidm_param(total_weight).with_context(|| "failed to initialize avidm param")?;

        // Shares are verified against the local payload commitment, which
        // must also be V1.
        let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
            bail!("V0 share verification not supported yet");
        };

        let request_id = rand::thread_rng().gen();

        // Verified shares accumulated by the response callback.
        let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
        let received_shares_clone = received_shares.clone();
        let request_result: anyhow::Result<_, _> = timeout(
            duration,
            request_response_protocol.request_indefinitely::<_, _, _>(
                Request::VidShare(block_number, request_id),
                RequestType::Broadcast,
                move |_request, response| {
                    let avidm_param = avidm_param.clone();
                    let received_shares = received_shares_clone.clone();
                    async move {
                        // Ignore anything that is not a V1 share.
                        let Response::VidShare(VidShare::V1(received_share)) = response else {
                            bail!("V0 share verification not supported yet");
                        };

                        // Discard shares that fail verification against the
                        // local payload hash.
                        let Ok(Ok(_)) = AvidMScheme::verify_share(
                            &avidm_param,
                            &local_payload_hash,
                            &received_share,
                        ) else {
                            bail!("share verification failed");
                        };

                        received_shares.lock().push(received_share);

                        // Deliberately bail so `request_indefinitely` keeps
                        // collecting shares until the outer timeout fires.
                        bail!("waiting for more shares");

                        #[allow(unreachable_code)]
                        Ok(())
                    }
                },
            ),
        )
        .await;

        match request_result {
            // Timeout elapsed: return everything collected so far.
            Err(_) => {
                Ok(received_shares
                    .lock()
                    .clone()
                    .into_iter()
                    .map(VidShare::V1)
                    .collect())
            },

            // The protocol itself failed before the timeout.
            Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),

            // The callback never returns `Ok`, so this arm is unreachable by
            // construction.
            Ok(Ok(_)) => bail!("this should not be possible"),
        }
    }
}
446
447impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDataSource<N, P>
448 for ApiState<N, P, V>
449{
450 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
451 let handle = self.consensus().await;
452
453 let consensus_read_lock = handle.read().await;
454
455 let cf = consensus_read_lock
459 .decided_state()
460 .await
461 .chain_config
462 .resolve();
463
464 let cf = match cf {
468 Some(cf) => cf,
469 None => self.node_state().await.chain_config,
470 };
471
472 let max_block_size: u64 = cf.max_block_size.into();
473 let txn_size = tx.payload().len() as u64;
474
475 if txn_size > max_block_size {
477 bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
478 }
479
480 consensus_read_lock.submit_transaction(tx).await?;
481 Ok(())
482 }
483}
484
485impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
486where
487 N: ConnectedNetwork<PubKey>,
488 V: Versions,
489 P: SequencerPersistence,
490 D: Sync,
491{
492 async fn node_state(&self) -> NodeState {
493 self.as_ref().node_state().await
494 }
495}
496
/// Catchup for nodes with storage: each lookup first consults the in-memory
/// consensus state (via the inner [`ApiState`]) and only falls back to
/// persistent storage on a miss. Data fetched from storage is merged back
/// into the in-memory state (best effort) so later requests hit memory.
impl<
        N: ConnectedNetwork<PubKey>,
        V: Versions,
        P: SequencerPersistence,
        D: CatchupStorage + Send + Sync,
    > CatchupDataSource for StorageState<N, P, D, V>
{
    /// Fee account state for the given view, from memory or storage.
    #[tracing::instrument(skip(self, instance))]
    async fn get_accounts(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[FeeAccount],
    ) -> anyhow::Result<FeeMerkleTree> {
        // Fast path: in-memory validated state.
        match self
            .as_ref()
            .get_accounts(instance, height, view, accounts)
            .await
        {
            Ok(accounts) => return Ok(accounts),
            Err(err) => {
                tracing::info!("accounts not in memory, trying storage: {err:#}");
            },
        }

        // Slow path: reconstruct from storage.
        let (tree, leaf) = self
            .inner()
            .get_accounts(instance, height, view, accounts)
            .await
            .context("accounts not in memory, and could not fetch from storage")?;
        // Best effort: merge the fetched accounts back into the in-memory
        // state so subsequent requests can be served from memory.
        let consensus = self
            .as_ref()
            .consensus()
            .await
            .read()
            .await
            .consensus()
            .clone();
        if let Err(err) =
            add_fee_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf).await
        {
            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
        }
        tracing::info!(?view, "updated with fetched account state");

        Ok(tree)
    }

    /// Block Merkle tree frontier for the given view, from memory or storage.
    #[tracing::instrument(skip(self, instance))]
    async fn get_frontier(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
    ) -> anyhow::Result<BlocksFrontier> {
        match self.as_ref().get_frontier(instance, height, view).await {
            Ok(frontier) => return Ok(frontier),
            Err(err) => {
                tracing::info!("frontier is not in memory, trying storage: {err:#}");
            },
        }

        self.inner().get_frontier(instance, height, view).await
    }

    /// Chain config matching `commitment`, from memory or storage.
    async fn get_chain_config(
        &self,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        match self.as_ref().get_chain_config(commitment).await {
            Ok(cf) => return Ok(cf),
            Err(err) => {
                tracing::info!("chain config is not in memory, trying storage: {err:#}");
            },
        }

        self.inner().get_chain_config(commitment).await
    }

    /// Leaf chain for the leaf at `height`, from memory or storage.
    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
        match self.as_ref().get_leaf_chain(height).await {
            Ok(cf) => return Ok(cf),
            Err(err) => {
                tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
            },
        }

        self.inner().get_leaf_chain(height).await
    }

    /// V2 reward account state for the given view, from memory or storage.
    #[tracing::instrument(skip(self, instance))]
    async fn get_reward_accounts_v2(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV2],
    ) -> anyhow::Result<RewardMerkleTreeV2> {
        // Fast path: in-memory validated state.
        match self
            .as_ref()
            .get_reward_accounts_v2(instance, height, view, accounts)
            .await
        {
            Ok(accounts) => return Ok(accounts),
            Err(err) => {
                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
            },
        }

        // Slow path: reconstruct from storage.
        let (tree, leaf) = self
            .inner()
            .get_reward_accounts_v2(instance, height, view, accounts)
            .await
            .context("accounts not in memory, and could not fetch from storage")?;

        // Best effort: merge the fetched accounts back into memory.
        let consensus = self
            .as_ref()
            .consensus()
            .await
            .read()
            .await
            .consensus()
            .clone();
        if let Err(err) =
            add_v2_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
                .await
        {
            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
        }
        tracing::info!(?view, "updated with fetched account state");

        Ok(tree)
    }

    /// V1 reward account state for the given view, from memory or storage.
    #[tracing::instrument(skip(self, instance))]
    async fn get_reward_accounts_v1(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV1],
    ) -> anyhow::Result<RewardMerkleTreeV1> {
        // Fast path: in-memory validated state.
        match self
            .as_ref()
            .get_reward_accounts_v1(instance, height, view, accounts)
            .await
        {
            Ok(accounts) => return Ok(accounts),
            Err(err) => {
                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
            },
        }

        // Slow path: reconstruct from storage.
        let (tree, leaf) = self
            .inner()
            .get_reward_accounts_v1(instance, height, view, accounts)
            .await
            .context("accounts not in memory, and could not fetch from storage")?;

        // Best effort: merge the fetched accounts back into memory.
        let consensus = self
            .as_ref()
            .consensus()
            .await
            .read()
            .await
            .consensus()
            .clone();
        if let Err(err) =
            add_v1_reward_accounts_to_state::<N, V, P>(&consensus, &view, accounts, &tree, leaf)
                .await
        {
            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
        }
        tracing::info!(?view, "updated with fetched account state");

        Ok(tree)
    }
}
694
695impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
696where
697 N: ConnectedNetwork<PubKey>,
698 V: Versions,
699 P: SequencerPersistence,
700{
701 async fn node_state(&self) -> NodeState {
702 self.sequencer_context.as_ref().get().await.node_state()
703 }
704}
705
706impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
707 for ApiState<N, P, V>
708{
709 #[tracing::instrument(skip(self, _instance))]
710 async fn get_accounts(
711 &self,
712 _instance: &NodeState,
713 height: u64,
714 view: ViewNumber,
715 accounts: &[FeeAccount],
716 ) -> anyhow::Result<FeeMerkleTree> {
717 let state = self
718 .consensus()
719 .await
720 .read()
721 .await
722 .state(view)
723 .await
724 .context(format!(
725 "state not available for height {height}, view {view}"
726 ))?;
727 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
728 }
729
730 #[tracing::instrument(skip(self, _instance))]
731 async fn get_frontier(
732 &self,
733 _instance: &NodeState,
734 height: u64,
735 view: ViewNumber,
736 ) -> anyhow::Result<BlocksFrontier> {
737 let state = self
738 .consensus()
739 .await
740 .read()
741 .await
742 .state(view)
743 .await
744 .context(format!(
745 "state not available for height {height}, view {view}"
746 ))?;
747 let tree = &state.block_merkle_tree;
748 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
749 Ok(frontier)
750 }
751
752 async fn get_chain_config(
753 &self,
754 commitment: Commitment<ChainConfig>,
755 ) -> anyhow::Result<ChainConfig> {
756 let state = self.consensus().await.read().await.decided_state().await;
757 let chain_config = state.chain_config;
758
759 if chain_config.commit() == commitment {
760 chain_config.resolve().context("chain config found")
761 } else {
762 bail!("chain config not found")
763 }
764 }
765
766 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
767 let mut leaves = self
768 .consensus()
769 .await
770 .read()
771 .await
772 .consensus()
773 .read()
774 .await
775 .undecided_leaves();
776 leaves.sort_by_key(|l| l.view_number());
777 let (position, mut last_leaf) = leaves
778 .iter()
779 .find_position(|l| l.height() == height)
780 .context(format!("leaf chain not available for {height}"))?;
781 let mut chain = vec![last_leaf.clone()];
782 for leaf in leaves.iter().skip(position + 1) {
783 if leaf.justify_qc().view_number() == last_leaf.view_number() {
784 chain.push(leaf.clone());
785 } else {
786 continue;
787 }
788 if leaf.view_number() == last_leaf.view_number() + 1 {
789 last_leaf = leaf;
791 break;
792 }
793 last_leaf = leaf;
794 }
795 for leaf in leaves
797 .iter()
798 .skip_while(|l| l.view_number() <= last_leaf.view_number())
799 {
800 if leaf.justify_qc().view_number() == last_leaf.view_number() {
801 chain.push(leaf.clone());
802 return Ok(chain);
803 }
804 }
805 bail!(format!("leaf chain not available for {height}"))
806 }
807
808 #[tracing::instrument(skip(self, _instance))]
809 async fn get_reward_accounts_v2(
810 &self,
811 _instance: &NodeState,
812 height: u64,
813 view: ViewNumber,
814 accounts: &[RewardAccountV2],
815 ) -> anyhow::Result<RewardMerkleTreeV2> {
816 let state = self
817 .consensus()
818 .await
819 .read()
820 .await
821 .state(view)
822 .await
823 .context(format!(
824 "state not available for height {height}, view {view}"
825 ))?;
826
827 retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
828 }
829
830 #[tracing::instrument(skip(self, _instance))]
831 async fn get_reward_accounts_v1(
832 &self,
833 _instance: &NodeState,
834 height: u64,
835 view: ViewNumber,
836 accounts: &[RewardAccountV1],
837 ) -> anyhow::Result<RewardMerkleTreeV1> {
838 let state = self
839 .consensus()
840 .await
841 .read()
842 .await
843 .state(view)
844 .await
845 .context(format!(
846 "state not available for height {height}, view {view}"
847 ))?;
848
849 retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
850 }
851}
852
853impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
854 HotShotConfigDataSource for StorageState<N, P, D, V>
855{
856 async fn get_config(&self) -> PublicNetworkConfig {
857 self.as_ref().network_config().await.into()
858 }
859}
860
861impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> HotShotConfigDataSource
862 for ApiState<N, P, V>
863{
864 async fn get_config(&self) -> PublicNetworkConfig {
865 self.network_config().await.into()
866 }
867}
868
869#[async_trait]
870impl<N: ConnectedNetwork<PubKey>, D: Sync, V: Versions, P: SequencerPersistence>
871 StateSignatureDataSource<N> for StorageState<N, P, D, V>
872{
873 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
874 self.as_ref().get_state_signature(height).await
875 }
876}
877
878#[async_trait]
879impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> StateSignatureDataSource<N>
880 for ApiState<N, P, V>
881{
882 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
883 self.state_signer()
884 .await
885 .read()
886 .await
887 .get_state_signature(height)
888 .await
889 }
890}
891
892#[cfg(any(test, feature = "testing"))]
893pub mod test_helpers {
894 use std::time::Duration;
895
896 use alloy::{
897 network::EthereumWallet,
898 primitives::{Address, U256},
899 providers::{ext::AnvilApi, ProviderBuilder},
900 };
901 use committable::Committable;
902 use espresso_contract_deployer::{
903 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
904 Contract, Contracts,
905 };
906 use espresso_types::{
907 v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
908 DrbAndHeaderUpgradeVersion, EpochVersion, FeeVersion, MockSequencerVersions, NamespaceId,
909 SequencerVersions, ValidatedState, V0_1,
910 };
911 use futures::{
912 future::{join_all, FutureExt},
913 stream::StreamExt,
914 };
915 use hotshot::types::{Event, EventType};
916 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
917 use hotshot_types::{
918 event::LeafInfo,
919 light_client::LCV3StateSignatureRequestBody,
920 traits::{metrics::NoMetrics, node_implementation::ConsensusTime},
921 HotShotConfig,
922 };
923 use itertools::izip;
924 use jf_merkle_tree::{MerkleCommitment, MerkleTreeScheme};
925 use portpicker::pick_unused_port;
926 use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
927 use surf_disco::Client;
928 use tempfile::TempDir;
929 use tide_disco::{error::ServerError, Api, App, Error, StatusCode};
930 use tokio::{spawn, task::JoinHandle, time::sleep};
931 use url::Url;
932 use vbs::version::{StaticVersion, StaticVersionType};
933
934 use super::*;
935 use crate::{
936 catchup::NullStateCatchup,
937 network,
938 persistence::no_storage,
939 testing::{run_legacy_builder, wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
940 };
941
    /// Stake table capacity used for test contract deployment and light
    /// client genesis generation.
    pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
943
    /// A running in-memory test network.
    pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> {
        // Node 0, the node that serves the API (see `TestNetwork::new`).
        pub server: SequencerContext<network::Memory, P::Persistence, V>,
        // The remaining nodes in the network.
        pub peers: Vec<SequencerContext<network::Memory, P::Persistence, V>>,
        pub cfg: TestConfig<{ NUM_NODES }>,
        // Kept alive so temporary query storage is not deleted mid-test.
        pub temp_dir: Option<TempDir>,
    }
951
    /// Finalized configuration for constructing a [`TestNetwork`]: per-node
    /// state, persistence, and catchup, plus the shared network and API
    /// options. Built via [`TestNetworkConfigBuilder`].
    pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
    where
        P: PersistenceOptions,
        C: StateCatchup + 'static,
    {
        // Initial validated state for each node.
        state: [ValidatedState; NUM_NODES],
        // Persistence options for each node.
        persistence: [P; NUM_NODES],
        // State catchup implementation for each node.
        catchup: [C; NUM_NODES],
        network_config: TestConfig<{ NUM_NODES }>,
        api_config: Options,
    }
963
964 impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
965 where
966 P: PersistenceOptions,
967 C: StateCatchup + 'static,
968 {
969 pub fn states(&self) -> [ValidatedState; NUM_NODES] {
970 self.state.clone()
971 }
972 }
    /// Builder for [`TestNetworkConfig`]. All `Option` fields must be set
    /// before calling `build`, which unwraps them.
    #[derive(Clone)]
    pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
    where
        P: PersistenceOptions,
        C: StateCatchup + 'static,
    {
        state: [ValidatedState; NUM_NODES],
        persistence: Option<[P; NUM_NODES]>,
        catchup: Option<[C; NUM_NODES]>,
        api_config: Option<Options>,
        network_config: Option<TestConfig<{ NUM_NODES }>>,
    }
985
986 impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
987 fn default() -> Self {
988 TestNetworkConfigBuilder {
989 state: std::array::from_fn(|_| ValidatedState::default()),
990 persistence: Some([no_storage::Options; 5]),
991 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
992 network_config: None,
993 api_config: None,
994 }
995 }
996 }
997
    /// A [`TestNetwork`] running one of the supported protocol version pairs,
    /// letting version-parameterized tests be handled through a single type.
    pub enum AnyTestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
        V0_1(TestNetwork<P, NUM_NODES, SequencerVersions<V0_1, V0_1>>),
        V0_2(TestNetwork<P, NUM_NODES, SequencerVersions<FeeVersion, FeeVersion>>),
        V0_3(TestNetwork<P, NUM_NODES, SequencerVersions<EpochVersion, EpochVersion>>),
        V0_4(
            TestNetwork<
                P,
                NUM_NODES,
                SequencerVersions<DrbAndHeaderUpgradeVersion, DrbAndHeaderUpgradeVersion>,
            >,
        ),
    }
1010
1011 impl<P: PersistenceOptions, const NUM_NODES: usize> AnyTestNetwork<P, NUM_NODES> {
1012 pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1013 match self {
1014 AnyTestNetwork::V0_1(network) => network.cfg.hotshot_config(),
1015 AnyTestNetwork::V0_2(network) => network.cfg.hotshot_config(),
1016 AnyTestNetwork::V0_3(network) => network.cfg.hotshot_config(),
1017 AnyTestNetwork::V0_4(network) => network.cfg.hotshot_config(),
1018 }
1019 }
1020 }
1021
1022 impl<const NUM_NODES: usize>
1023 TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1024 {
1025 pub fn with_num_nodes(
1026 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1027 {
1028 TestNetworkConfigBuilder {
1029 state: std::array::from_fn(|_| ValidatedState::default()),
1030 persistence: Some([no_storage::Options; { NUM_NODES }]),
1031 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1032 network_config: None,
1033 api_config: None,
1034 }
1035 }
1036 }
1037
    impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
    where
        P: PersistenceOptions,
        C: StateCatchup + 'static,
    {
        /// Set the initial validated state for every node.
        pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
            self.state = state;
            self
        }

        /// Swap in per-node persistence options (changes the builder's
        /// persistence type parameter).
        pub fn persistences<NP: PersistenceOptions>(
            self,
            persistence: [NP; NUM_NODES],
        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
            TestNetworkConfigBuilder {
                state: self.state,
                catchup: self.catchup,
                network_config: self.network_config,
                api_config: self.api_config,
                persistence: Some(persistence),
            }
        }

        /// Set the API options (served by node 0; see `TestNetwork::new`).
        pub fn api_config(mut self, api_config: Options) -> Self {
            self.api_config = Some(api_config);
            self
        }

        /// Swap in per-node state catchup implementations (changes the
        /// builder's catchup type parameter).
        pub fn catchups<NC: StateCatchup + 'static>(
            self,
            catchup: [NC; NUM_NODES],
        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
            TestNetworkConfigBuilder {
                state: self.state,
                catchup: Some(catchup),
                network_config: self.network_config,
                api_config: self.api_config,
                persistence: self.persistence,
            }
        }

        /// Set the network-wide test configuration.
        pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
            self.network_config = Some(network_config);
            self
        }

        /// Deploy and initialize the proof-of-stake contracts required by
        /// epoch-enabled versions, then point every node's chain config at
        /// the deployed stake table contract.
        ///
        /// Panics when `V` does not require PoS (neither its base nor its
        /// upgrade version reaches the epoch version) or when
        /// `network_config` has not been set.
        pub async fn pos_hook<V: Versions>(
            self,
            delegation_config: DelegationConfig,
            stake_table_version: StakeTableContractVersion,
        ) -> anyhow::Result<Self> {
            if <V as Versions>::Upgrade::VERSION < EpochVersion::VERSION
                && <V as Versions>::Base::VERSION < EpochVersion::VERSION
            {
                panic!("given version does not require pos deployment");
            };

            let network_config = self
                .network_config
                .as_ref()
                .expect("network_config is required");

            // L1 provider signing with the test deployer key.
            let l1_url = network_config.l1_url();
            let signer = network_config.signer();
            let deployer = ProviderBuilder::new()
                .wallet(EthereumWallet::from(signer.clone()))
                .on_http(l1_url.clone());

            // Derive the light client genesis from the test stake table.
            let blocks_per_epoch = network_config.hotshot_config().epoch_height;
            let epoch_start_block = network_config.hotshot_config().epoch_start_block;
            let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
                &network_config.hotshot_config().hotshot_stake_table(),
                STAKE_TABLE_CAPACITY_FOR_TEST,
            )
            .unwrap();

            let mut contracts = Contracts::new();
            let args = DeployerArgsBuilder::default()
                .deployer(deployer.clone())
                .mock_light_client(true)
                .genesis_lc_state(genesis_state)
                .genesis_st_state(genesis_stake)
                .blocks_per_epoch(blocks_per_epoch)
                .epoch_start_block(epoch_start_block)
                .multisig_pauser(signer.address())
                .token_name("Espresso".to_string())
                .token_symbol("ESP".to_string())
                .initial_token_supply(U256::from(100000u64))
                .ops_timelock_delay(U256::from(0))
                .ops_timelock_admin(signer.address())
                .ops_timelock_proposers(vec![signer.address()])
                .ops_timelock_executors(vec![signer.address()])
                .safe_exit_timelock_delay(U256::from(10))
                .safe_exit_timelock_admin(signer.address())
                .safe_exit_timelock_proposers(vec![signer.address()])
                .safe_exit_timelock_executors(vec![signer.address()])
                .build()
                .unwrap();

            // Deploy either only the v1 stake table or the full contract set.
            match stake_table_version {
                StakeTableContractVersion::V1 => {
                    args.deploy_to_stake_table_v1(&mut contracts).await
                },
                StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
            }
            .context("failed to deploy contracts")?;

            let stake_table_address = contracts
                .address(Contract::StakeTableProxy)
                .expect("StakeTableProxy address not found");
            // Register the test validators and their delegations on-chain.
            setup_stake_table_contract_for_test(
                l1_url.clone(),
                &deployer,
                stake_table_address,
                network_config.staking_priv_keys(),
                delegation_config,
            )
            .await
            .expect("stake table setup failed");

            // Keep L1 blocks being produced so epoch logic can progress.
            if let Some(anvil) = network_config.anvil() {
                anvil
                    .anvil_set_interval_mining(1)
                    .await
                    .expect("interval mining");
            }

            // Rewrite every node's chain config to reference the deployed
            // stake table contract (with a zero base fee for tests).
            let state = self.state[0].clone();
            let chain_config = if let Some(cf) = state.chain_config.resolve() {
                ChainConfig {
                    base_fee: 0.into(),
                    stake_table_contract: Some(stake_table_address),
                    ..cf
                }
            } else {
                ChainConfig {
                    base_fee: 0.into(),
                    stake_table_contract: Some(stake_table_address),
                    ..Default::default()
                }
            };

            let state = ValidatedState {
                chain_config: chain_config.into(),
                ..state
            };
            Ok(self.states(std::array::from_fn(|_| state.clone())))
        }

        /// Finalize the builder. Panics if the persistence, catchup, network,
        /// or API config was never provided.
        pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
            TestNetworkConfig {
                state: self.state,
                persistence: self.persistence.unwrap(),
                catchup: self.catchup.unwrap(),
                network_config: self.network_config.unwrap(),
                api_config: self.api_config.unwrap(),
            }
        }
    }
1206
impl<P: PersistenceOptions, const NUM_NODES: usize, V: Versions> TestNetwork<P, { NUM_NODES }, V> {
    /// Spin up an in-process test network of `NUM_NODES` sequencer nodes plus
    /// a legacy block builder, and start consensus on every node.
    ///
    /// Node 0 is the "server" node: it serves the configured API surface and
    /// feeds its event stream to the builder. The remaining nodes run without
    /// an API, with no metrics and a null event consumer.
    pub async fn new<C: StateCatchup + 'static>(
        cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
        bind_version: V,
    ) -> Self {
        let mut cfg = cfg;
        let mut builder_tasks = Vec::new();

        // Resolve the chain config from node 0's genesis state; if it is
        // unresolved the builder runs with its default max block size.
        let chain_config = cfg.state[0].chain_config.resolve();
        if chain_config.is_none() {
            tracing::warn!("Chain config is not set, using default max_block_size");
        }
        let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
            cfg.network_config.builder_port(),
            chain_config.map(|c| *c.max_block_size),
        )
        .await;
        builder_tasks.push(task);
        // Point every node at the freshly spawned builder.
        cfg.network_config
            .set_builder_urls(vec1::vec1![builder_url.clone()]);

        // If the test did not configure any query storage, fall back to a
        // temporary file-system store. The TempDir handle is kept in `Self`
        // so the directory lives as long as the network.
        let mut opt = cfg.api_config.clone();
        let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
            let temp_dir = tempfile::tempdir().unwrap();
            opt = opt.query_fs(
                Default::default(),
                crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
            );
            Some(temp_dir)
        } else {
            None
        };

        // Initialize all nodes concurrently. Only node 0 goes through
        // `opt.serve`, which wires up the API, metrics, event consumer, and
        // query storage.
        let mut nodes = join_all(
            izip!(cfg.state, cfg.persistence, cfg.catchup)
                .enumerate()
                .map(|(i, (state, persistence, state_peers))| {
                    let opt = opt.clone();
                    let cfg = &cfg.network_config;
                    let upgrades_map = cfg.upgrades();
                    async move {
                        if i == 0 {
                            opt.serve(|metrics, consumer, storage| {
                                let cfg = cfg.clone();
                                async move {
                                    Ok(cfg
                                        .init_node(
                                            0,
                                            state,
                                            persistence,
                                            Some(state_peers),
                                            storage,
                                            &*metrics,
                                            STAKE_TABLE_CAPACITY_FOR_TEST,
                                            consumer,
                                            bind_version,
                                            upgrades_map,
                                        )
                                        .await)
                                }
                                .boxed()
                            })
                            .await
                            .unwrap()
                        } else {
                            cfg.init_node(
                                i,
                                state,
                                persistence,
                                Some(state_peers),
                                None,
                                &NoMetrics,
                                STAKE_TABLE_CAPACITY_FOR_TEST,
                                NullEventConsumer,
                                bind_version,
                                upgrades_map,
                            )
                            .await
                        }
                    }
                }),
        )
        .await;

        let handle_0 = &nodes[0];

        // The builder consumes node 0's event stream to learn about new views.
        for builder_task in builder_tasks {
            builder_task.start(Box::new(handle_0.event_stream().await));
        }

        for ctx in &nodes {
            ctx.start_consensus().await;
        }

        // Node 0 becomes the server; the rest are peers.
        let server = nodes.remove(0);
        let peers = nodes;

        Self {
            server,
            peers,
            cfg: cfg.network_config,
            temp_dir,
        }
    }

    /// Shut down consensus on the server node and then on every peer.
    pub async fn stop_consensus(&mut self) {
        self.server.shutdown_consensus().await;

        for ctx in &mut self.peers {
            ctx.shutdown_consensus().await;
        }
    }
}
1322
1323 pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
1331 let port = pick_unused_port().expect("No ports free");
1332 let url = format!("http://localhost:{port}").parse().unwrap();
1333 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1334
1335 let options = opt(Options::with_port(port));
1336 let network_config = TestConfigBuilder::default().build();
1337 let config = TestNetworkConfigBuilder::default()
1338 .api_config(options)
1339 .network_config(network_config)
1340 .build();
1341 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1342 client.connect(None).await;
1343
1344 while client
1348 .get::<u64>("status/block-height")
1349 .send()
1350 .await
1351 .unwrap()
1352 <= 1
1353 {
1354 sleep(Duration::from_secs(1)).await;
1355 }
1356 let success_rate = client
1357 .get::<f64>("status/success-rate")
1358 .send()
1359 .await
1360 .unwrap();
1361 assert!(success_rate.is_finite(), "{success_rate}");
1364 assert!(success_rate > 0.0, "{success_rate}");
1366 }
1367
1368 pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
1376 let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
1377
1378 let port = pick_unused_port().expect("No ports free");
1379
1380 let url = format!("http://localhost:{port}").parse().unwrap();
1381 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1382
1383 let options = opt(Options::with_port(port).submit(Default::default()));
1384 let network_config = TestConfigBuilder::default().build();
1385 let config = TestNetworkConfigBuilder::default()
1386 .api_config(options)
1387 .network_config(network_config)
1388 .build();
1389 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1390 let mut events = network.server.event_stream().await;
1391
1392 client.connect(None).await;
1393
1394 let hash = client
1395 .post("submit/submit")
1396 .body_json(&txn)
1397 .unwrap()
1398 .send()
1399 .await
1400 .unwrap();
1401 assert_eq!(txn.commit(), hash);
1402
1403 wait_for_decide_on_handle(&mut events, &txn).await;
1405 }
1406
1407 pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
1409 let port = pick_unused_port().expect("No ports free");
1410
1411 let url = format!("http://localhost:{port}").parse().unwrap();
1412
1413 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1414
1415 let options = opt(Options::with_port(port));
1416 let network_config = TestConfigBuilder::default().build();
1417 let config = TestNetworkConfigBuilder::default()
1418 .api_config(options)
1419 .network_config(network_config)
1420 .build();
1421 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1422
1423 let mut height: u64;
1424 loop {
1427 height = network.server.decided_leaf().await.height();
1428 sleep(std::time::Duration::from_secs(1)).await;
1429 if height >= 2 {
1430 break;
1431 }
1432 }
1433 client
1435 .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
1436 .send()
1437 .await
1438 .unwrap();
1439 }
1440
1441 pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
1449 let port = pick_unused_port().expect("No ports free");
1450 let url = format!("http://localhost:{port}").parse().unwrap();
1451 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
1452
1453 let options = opt(Options::with_port(port));
1454 let network_config = TestConfigBuilder::default().build();
1455 let config = TestNetworkConfigBuilder::default()
1456 .api_config(options)
1457 .network_config(network_config)
1458 .build();
1459 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
1460 client.connect(None).await;
1461
1462 let mut events = network.server.event_stream().await;
1464 loop {
1465 if let Event {
1466 event: EventType::Decide { leaf_chain, .. },
1467 ..
1468 } = events.next().await.unwrap()
1469 {
1470 if leaf_chain
1471 .iter()
1472 .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
1473 {
1474 break;
1475 }
1476 }
1477 }
1478
1479 {
1482 network.server.shutdown_consensus().await;
1483 }
1484
1485 let leaf = network.server.decided_leaf().await;
1487 let height = leaf.height() + 1;
1488 let view = leaf.view_number() + 1;
1489 let res = client
1490 .get::<AccountQueryData>(&format!(
1491 "catchup/{height}/{}/account/{:x}",
1492 view.u64(),
1493 Address::default()
1494 ))
1495 .send()
1496 .await
1497 .unwrap();
1498 assert_eq!(res.balance, U256::ZERO);
1499 assert_eq!(
1500 res.proof
1501 .verify(
1502 &network
1503 .server
1504 .state(view)
1505 .await
1506 .unwrap()
1507 .fee_merkle_tree
1508 .commitment()
1509 )
1510 .unwrap(),
1511 U256::ZERO,
1512 );
1513
1514 let res = client
1516 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
1517 .send()
1518 .await
1519 .unwrap();
1520 let root = &network
1521 .server
1522 .state(view)
1523 .await
1524 .unwrap()
1525 .block_merkle_tree
1526 .commitment();
1527 BlockMerkleTree::verify(root, root.size() - 1, res)
1528 .unwrap()
1529 .unwrap();
1530 }
1531
1532 pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
1533 let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
1534 let mut api =
1535 Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
1536
1537 api.get("account", |_req, _state: &()| {
1538 async move {
1539 Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
1540 StatusCode::BAD_REQUEST,
1541 "no account found".to_string(),
1542 ))
1543 }
1544 .boxed()
1545 })?
1546 .get("blocks", |_req, _state| {
1547 async move {
1548 Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
1549 StatusCode::BAD_REQUEST,
1550 "no block found".to_string(),
1551 ))
1552 }
1553 .boxed()
1554 })?
1555 .get("chainconfig", |_req, _state| {
1556 async move {
1557 Result::<ChainConfig, _>::Ok(ChainConfig {
1558 max_block_size: 300.into(),
1559 base_fee: 1.into(),
1560 fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
1561 .parse()
1562 .unwrap(),
1563 ..Default::default()
1564 })
1565 }
1566 .boxed()
1567 })?
1568 .get("leafchain", |_req, _state| {
1569 async move {
1570 Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
1571 StatusCode::BAD_REQUEST,
1572 "No leafchain found".to_string(),
1573 ))
1574 }
1575 .boxed()
1576 })?;
1577
1578 let mut app = App::<_, hotshot_query_service::Error>::with_state(());
1579 app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
1580
1581 app.register_module::<_, _>("catchup", api).unwrap();
1582
1583 let port = pick_unused_port().expect("no free port");
1584 let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
1585
1586 let handle = spawn({
1587 let url = url.clone();
1588 async move {
1589 let _ = app.serve(url, SequencerApiVersion::instance()).await;
1590 }
1591 });
1592
1593 Ok((url, handle))
1594 }
1595}
1596
1597#[cfg(test)]
1598mod api_tests {
1599 use std::{fmt::Debug, marker::PhantomData};
1600
1601 use committable::Committable;
1602 use data_source::testing::TestableSequencerDataSource;
1603 use espresso_types::{
1604 traits::{EventConsumer, PersistenceOptions},
1605 Header, Leaf2, MockSequencerVersions, NamespaceId, NamespaceProofQueryData, ValidatedState,
1606 };
1607 use futures::{future, stream::StreamExt};
1608 use hotshot_example_types::node_types::TestVersions;
1609 use hotshot_query_service::availability::{
1610 AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
1611 };
1612 use hotshot_types::{
1613 data::{
1614 ns_table::parse_ns_table, vid_disperse::VidDisperseShare2, DaProposal2, EpochNumber,
1615 QuorumProposal2, QuorumProposalWrapper, VidCommitment,
1616 },
1617 event::LeafInfo,
1618 message::Proposal,
1619 simple_certificate::QuorumCertificate2,
1620 traits::{node_implementation::ConsensusTime, signature_key::SignatureKey, EncodeBytes},
1621 utils::EpochTransitionIndicator,
1622 vid::avidm::{init_avidm_param, AvidMScheme},
1623 };
1624 use portpicker::pick_unused_port;
1625 use surf_disco::Client;
1626 use test_helpers::{
1627 catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
1628 TestNetwork, TestNetworkConfigBuilder,
1629 };
1630 use tide_disco::error::ServerError;
1631 use vbs::version::StaticVersion;
1632
1633 use super::{update::ApiEventConsumer, *};
1634 use crate::{
1635 network,
1636 persistence::no_storage::NoStorage,
1637 testing::{wait_for_decide_on_handle, TestConfigBuilder},
1638 };
1639
    /// rstest template enumerating the testable data-source backends (SQL and
    /// file-system). The body is intentionally empty: tests apply this
    /// template via `#[rstest_reuse::apply(...)]` to run once per backend.
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::api::sql::DataSource>)]
    #[case(PhantomData::<crate::api::fs::DataSource>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
        #[case] _d: PhantomData<D>,
    ) {
    }
1649
    /// Run the submit test against each testable data-source backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        submit_test_helper(|opt| D::options(&storage, opt)).await
    }
1657
    /// Run the status test against each testable data-source backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        status_test_helper(|opt| D::options(&storage, opt)).await
    }
1665
    /// Run the state-signature test against each testable data-source backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        state_signature_test_helper(|opt| D::options(&storage, opt)).await
    }
1673
    /// Submit a transaction into a specific namespace, then walk every block
    /// up to its inclusion height and verify the namespace query API: blocks
    /// containing the namespace return a verifiable proof; blocks without it
    /// return no proof and no transactions.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
        let ns_id = NamespaceId::from(42_u32);
        let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);

        let port = pick_unused_port().expect("No ports free");
        let storage = D::create_storage().await;
        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
            .network_config(network_config)
            .build();
        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let mut events = network.server.event_stream().await;

        let client: Client<ServerError, StaticVersion<0, 1>> =
            Client::new(format!("http://localhost:{port}").parse().unwrap());
        client.connect(None).await;

        // Submit and check the returned hash is the transaction commitment.
        let hash = client
            .post("submit/submit")
            .body_json(&txn)
            .unwrap()
            .send()
            .await
            .unwrap();
        assert_eq!(txn.commit(), hash);

        let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
        tracing::info!(block_height, "transaction sequenced");

        // Make sure the block at the inclusion height is available from the
        // query service before scanning.
        client
            .socket(&format!("availability/stream/blocks/{block_height}"))
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();

        let mut found_txn = false;
        let mut found_empty_block = false;
        for block_num in 0..=block_height {
            let header: Header = client
                .get(&format!("availability/header/{block_num}"))
                .send()
                .await
                .unwrap();
            let ns_query_res: NamespaceProofQueryData = client
                .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
                .send()
                .await
                .unwrap();

            if let Some(ns_proof) = ns_query_res.proof {
                // A proof is returned: it must verify against the header's
                // namespace table, payload commitment, and VID common data.
                let vid_common: VidCommonQueryData<SeqTypes> = client
                    .get(&format!("availability/vid/common/{block_num}"))
                    .send()
                    .await
                    .unwrap();
                ns_proof
                    .verify(
                        header.ns_table(),
                        &header.payload_commitment(),
                        vid_common.common(),
                    )
                    .unwrap();
            } else {
                // No proof: the namespace must be absent from this block.
                assert!(header.ns_table().find_ns_id(&ns_id).is_none());
                assert!(ns_query_res.transactions.is_empty());
            }

            found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();

            for txn in ns_query_res.transactions {
                if txn.commit() == hash {
                    found_txn = true;
                }
            }
        }
        // We must have seen both the submitted transaction and at least one
        // block with no transactions in the namespace.
        assert!(found_txn);
        assert!(found_empty_block);
    }
1766
    /// Run the catchup test against each testable data-source backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        catchup_test_helper(|opt| D::options(&storage, opt)).await
    }
1774
    /// Build a 5-leaf chain directly in persistence, then decide it in two
    /// non-consecutive batches — the first with an event consumer that always
    /// fails, the second with the real API consumer — and verify that all
    /// decided data still ends up in the archive and that per-view proposals
    /// are garbage-collected up to the decided view.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
    where
        D: TestableSequencerDataSource + Debug + 'static,
    {
        // Consumer that rejects every event, simulating a downstream failure.
        #[derive(Clone, Copy, Debug)]
        struct FailConsumer;

        #[async_trait]
        impl EventConsumer for FailConsumer {
            async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
                bail!("mock error injection");
            }
        }

        let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);

        let storage = D::create_storage().await;
        let persistence = D::persistence_options(&storage).create().await.unwrap();
        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
            Arc::new(StorageState::new(
                D::create(D::persistence_options(&storage), Default::default(), false)
                    .await
                    .unwrap(),
                ApiState::new(future::pending()),
            ));

        let mut chain1 = vec![];

        // Reuse the genesis payload for every leaf; only headers/views differ.
        let genesis = Leaf2::genesis::<TestVersions>(&Default::default(), &NodeState::mock()).await;
        let payload = genesis.block_payload().unwrap();
        let payload_bytes_arc = payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();

        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: genesis.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate2::genesis::<MockSequencerVersions>(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
            &ValidatedState::default(),
            &NodeState::mock(),
        )
        .await;

        // Build leaves 0..5, threading each leaf's QC into the next
        // proposal's justify_qc, and store the quorum proposal, VID share,
        // and DA proposal for every view.
        let mut justify_qc = qc.clone();
        for i in 0..5 {
            *quorum_proposal.proposal.block_header.height_mut() = i;
            quorum_proposal.proposal.view_number = ViewNumber::new(i);
            quorum_proposal.proposal.justify_qc = justify_qc;
            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
            qc.view_number = leaf.view_number();
            qc.data.leaf_commit = Committable::commit(&leaf);
            justify_qc = qc.clone();
            chain1.push((leaf.clone(), qc.clone()));

            let quorum_proposal_signature =
                PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum_proposal");
            persistence
                .append_quorum_proposal2(&Proposal {
                    data: quorum_proposal.clone(),
                    signature: quorum_proposal_signature,
                    _pd: Default::default(),
                })
                .await
                .unwrap();

            let share = VidDisperseShare2::<SeqTypes> {
                view_number: leaf.view_number(),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: Some(EpochNumber::new(0)),
                target_epoch: Some(EpochNumber::new(0)),
                common: avidm_param.clone(),
            };
            persistence
                .append_vid2(&share.to_proposal(&privkey).unwrap())
                .await
                .unwrap();

            let block_payload_signature =
                PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
            let da_proposal_inner = DaProposal2::<SeqTypes> {
                encoded_transactions: payload_bytes_arc.clone(),
                metadata: payload.ns_table().clone(),
                view_number: leaf.view_number(),
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            };
            let da_proposal = Proposal {
                data: da_proposal_inner,
                signature: block_payload_signature,
                _pd: Default::default(),
            };
            persistence
                .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
                .await
                .unwrap();
        }
        // Split the chain and drop one leaf so the two decide batches are
        // non-consecutive: chain1 = leaves 0-1, chain2 = leaves 3-4.
        let mut chain2 = chain1.split_off(2);
        chain2.remove(0);

        // First decide: the consumer fails, but persistence must still record
        // the decide.
        let leaf_chain = chain1
            .iter()
            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide with event handling failure");
        persistence
            .append_decided_leaves(
                ViewNumber::new(1),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                &FailConsumer,
            )
            .await
            .unwrap();

        // Second decide: the real API consumer should backfill everything the
        // failed first decide skipped.
        let consumer = ApiEventConsumer::from(data_source.clone());
        let leaf_chain = chain2
            .iter()
            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide successfully");
        persistence
            .append_decided_leaves(
                ViewNumber::new(4),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                &consumer,
            )
            .await
            .unwrap();

        // Every decided leaf (from both batches) must be archived, with its
        // block and VID data resolvable, and its per-view proposals pruned.
        for (leaf, qc) in chain1.iter().chain(&chain2) {
            tracing::info!(height = leaf.height(), "check archive");
            let qd = data_source.get_leaf(leaf.height() as usize).await.await;
            let stored_leaf: Leaf2 = qd.leaf().clone();
            let stored_qc = qd.qc().clone();
            assert_eq!(&stored_leaf, leaf);
            assert_eq!(&stored_qc, qc);

            data_source
                .get_block(leaf.height() as usize)
                .await
                .try_resolve()
                .ok()
                .unwrap();
            data_source
                .get_vid_common(leaf.height() as usize)
                .await
                .try_resolve()
                .ok()
                .unwrap();

            assert!(persistence
                .load_da_proposal(leaf.view_number())
                .await
                .unwrap()
                .is_none());
            assert!(persistence
                .load_vid_share(leaf.view_number())
                .await
                .unwrap()
                .is_none());
            assert!(persistence
                .load_quorum_proposal(leaf.view_number())
                .await
                .is_err());
        }

        // View 2 was never decided (it sits in the gap), so its proposals
        // must still be present in persistence.
        assert!(persistence
            .load_da_proposal(ViewNumber::new(2))
            .await
            .unwrap()
            .is_some());
        assert!(persistence
            .load_vid_share(ViewNumber::new(2))
            .await
            .unwrap()
            .is_some());
        persistence
            .load_quorum_proposal(ViewNumber::new(2))
            .await
            .unwrap();
    }
1991
    /// Decide leaves whose `LeafInfo` carries no VID share or block payload,
    /// and verify the leaf itself is stored while block and VID-common
    /// fetches remain pending (to be resolved later by other mechanisms).
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
    where
        D: TestableSequencerDataSource + Debug + 'static,
    {
        let storage = D::create_storage().await;
        let persistence = D::persistence_options(&storage).create().await.unwrap();
        let data_source: Arc<StorageState<network::Memory, NoStorage, _, MockSequencerVersions>> =
            Arc::new(StorageState::new(
                D::create(D::persistence_options(&storage), Default::default(), false)
                    .await
                    .unwrap(),
                ApiState::new(future::pending()),
            ));
        let consumer = ApiEventConsumer::from(data_source.clone());

        let mut qc = QuorumCertificate2::genesis::<MockSequencerVersions>(
            &ValidatedState::default(),
            &NodeState::mock(),
        )
        .await;
        let leaf =
            Leaf2::genesis::<TestVersions>(&ValidatedState::default(), &NodeState::mock()).await;

        tracing::info!(?leaf, ?qc, "decide genesis leaf");
        persistence
            .append_decided_leaves(
                leaf.view_number(),
                [(&leaf_info(leaf.clone()), qc.clone())],
                &consumer,
            )
            .await
            .unwrap();

        // Derive leaf 1 from a proposal built on the genesis header with the
        // height bumped; the QC is retargeted at the new leaf.
        let mut block_header = leaf.block_header().clone();
        *block_header.height_mut() += 1;
        let qp = QuorumProposalWrapper {
            proposal: QuorumProposal2 {
                block_header,
                view_number: leaf.view_number() + 1,
                justify_qc: qc.clone(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };

        let leaf = Leaf2::from_quorum_proposal(&qp);
        qc.view_number = leaf.view_number();
        qc.data.leaf_commit = Committable::commit(&leaf);

        tracing::info!(?leaf, ?qc, "append leaf 1");
        persistence
            .append_decided_leaves(
                leaf.view_number(),
                [(&leaf_info(leaf.clone()), qc)],
                &consumer,
            )
            .await
            .unwrap();

        // The leaf is available immediately; block and VID common are not,
        // since the decide carried no payload or share.
        assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
        assert!(data_source.get_vid_common(1).await.is_pending());
        assert!(data_source.get_block(1).await.is_pending());
    }
2066
2067 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2068 LeafInfo {
2069 leaf,
2070 vid_share: None,
2071 state: Default::default(),
2072 delta: None,
2073 state_cert: None,
2074 }
2075 }
2076}
2077
2078#[cfg(test)]
2079mod test {
2080 use std::{
2081 collections::{HashMap, HashSet},
2082 time::Duration,
2083 };
2084
2085 use alloy::{
2086 eips::BlockId,
2087 network::EthereumWallet,
2088 primitives::U256,
2089 providers::{Provider, ProviderBuilder},
2090 };
2091 use async_lock::Mutex;
2092 use committable::{Commitment, Committable};
2093 use espresso_contract_deployer::{
2094 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
2095 Contract, Contracts,
2096 };
2097 use espresso_types::{
2098 config::PublicHotShotConfig,
2099 traits::{NullEventConsumer, PersistenceOptions},
2100 v0_3::{Fetcher, RewardAmount, COMMISSION_BASIS_POINTS},
2101 validators_from_l1_events, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAmount, FeeVersion,
2102 Header, L1ClientOptions, MockSequencerVersions, NamespaceId, RewardDistributor,
2103 SequencerVersions, ValidatedState,
2104 };
2105 use futures::{
2106 future::{self, join_all},
2107 stream::{StreamExt, TryStreamExt},
2108 };
2109 use hotshot::types::EventType;
2110 use hotshot_contract_adapter::sol_types::{EspToken, StakeTableV2};
2111 use hotshot_example_types::node_types::EpochsTestVersions;
2112 use hotshot_query_service::{
2113 availability::{
2114 BlockQueryData, BlockSummaryQueryData, LeafQueryData, StateCertQueryDataV1,
2115 StateCertQueryDataV2, TransactionQueryData, VidCommonQueryData,
2116 },
2117 data_source::{sql::Config, storage::SqlStorage, VersionedDataSource},
2118 explorer::TransactionSummariesResponse,
2119 types::HeightIndexed,
2120 };
2121 use hotshot_types::{
2122 data::EpochNumber,
2123 event::LeafInfo,
2124 traits::{
2125 block_contents::BlockHeader, election::Membership, metrics::NoMetrics,
2126 node_implementation::ConsensusTime,
2127 },
2128 utils::epoch_from_block_number,
2129 ValidatorConfig,
2130 };
2131 use jf_merkle_tree::prelude::{MerkleProof, Sha3Node};
2132 use portpicker::pick_unused_port;
2133 use rand::seq::SliceRandom;
2134 use rstest::rstest;
2135 use staking_cli::demo::DelegationConfig;
2136 use surf_disco::Client;
2137 use test_helpers::{
2138 catchup_test_helper, state_signature_test_helper, status_test_helper, submit_test_helper,
2139 TestNetwork, TestNetworkConfigBuilder,
2140 };
2141 use tide_disco::{app::AppHealth, error::ServerError, healthcheck::HealthStatus};
2142 use tokio::time::sleep;
2143 use vbs::version::{StaticVersion, StaticVersionType};
2144
2145 use self::{
2146 data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
2147 sql::DataSource as SqlDataSource,
2148 };
2149 use super::*;
2150 use crate::{
2151 api::{
2152 options::Query,
2153 sql::{impl_testable_data_source::tmp_options, reconstruct_state},
2154 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
2155 },
2156 catchup::{NullStateCatchup, StatePeers},
2157 persistence::no_storage,
2158 testing::{wait_for_decide_on_handle, TestConfig, TestConfigBuilder},
2159 };
2160
2161 type PosVersionV3 = SequencerVersions<StaticVersion<0, 3>, StaticVersion<0, 0>>;
2162 type PosVersionV4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
2163
2164 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2165 async fn test_healthcheck() {
2166 let port = pick_unused_port().expect("No ports free");
2167 let url = format!("http://localhost:{port}").parse().unwrap();
2168 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2169 let options = Options::with_port(port);
2170 let network_config = TestConfigBuilder::default().build();
2171 let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
2172 .api_config(options)
2173 .network_config(network_config)
2174 .build();
2175 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
2176
2177 client.connect(None).await;
2178 let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
2179 assert_eq!(health.status, HealthStatus::Available);
2180 }
2181
    /// Status API should work even with no query/availability module enabled.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn status_test_without_query_module() {
        status_test_helper(|opt| opt).await
    }
2186
    /// Submit API should work even with no query/availability module enabled.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn submit_test_without_query_module() {
        submit_test_helper(|opt| opt).await
    }
2191
    /// State-signature API should work with no query module enabled.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn state_signature_test_without_query_module() {
        state_signature_test_helper(|opt| opt).await
    }
2196
    /// Catchup API should work even with no query/availability module enabled.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn catchup_test_without_query_module() {
        catchup_test_helper(|opt| opt).await
    }
2201
    /// End-to-end check of the merklized state API on SQL storage: stream a
    /// few blocks, stop consensus, then verify block-state and fee-state
    /// merkle proofs and the latest fee balance of the builder account.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn slow_test_merklized_state_api() {
        let port = pick_unused_port().expect("No ports free");

        let storage = SqlDataSource::create_storage().await;

        let options = SqlDataSource::options(&storage, Options::with_port(port));

        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(options)
            .network_config(network_config)
            .build();
        let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);

        client.connect(Some(Duration::from_secs(15))).await;

        tracing::info!("waiting for blocks");
        let blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(4)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // Give the state updater time to persist merklized state, then halt
        // consensus so the queried snapshots stay stable.
        tracing::info!("waiting for state to be inserted");
        sleep(Duration::from_secs(5)).await;
        network.stop_consensus().await;

        for block in blocks {
            let i = block.height();
            // The block-state proof at snapshot i+1 for index i must resolve
            // to this block's hash.
            tracing::info!(i, "get block state");
            let path = client
                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
                    "block-state/{}/{i}",
                    i + 1
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(*path.elem().unwrap(), block.hash());

            // The builder account accrues fees, so its fee-state entry must
            // exist and be positive at every snapshot.
            tracing::info!(i, "get fee state");
            let account = TestConfig::<5>::builder_key().fee_account();
            let path = client
                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
                    "fee-state/{}/{}",
                    i + 1,
                    account
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(*path.index(), account);
            assert!(*path.elem().unwrap() > 0.into(), "{:?}", path.elem());
        }

        // The latest fee balance endpoint must also resolve for the builder
        // account. NOTE(review): U256::MAX presumably reflects the test
        // builder's pre-funded balance — confirm against the genesis setup.
        let account = TestConfig::<5>::builder_key().fee_account();
        let amount = client
            .get::<Option<FeeAmount>>(&format!("fee-state/fee-balance/latest/{account}"))
            .send()
            .await
            .unwrap()
            .unwrap();
        let expected = U256::MAX;
        assert_eq!(expected, amount.0);
    }
2277
    /// A leaf-only data source stores leaves, headers, VID common, and state
    /// proofs but not full block payloads: all of the former must be served,
    /// while fetching a block must fail.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_leaf_only_data_source() {
        let port = pick_unused_port().expect("No ports free");

        let storage = SqlDataSource::create_storage().await;
        let options =
            SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();

        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(options)
            .network_config(network_config)
            .build();
        let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);

        tracing::info!("waiting for blocks");
        client.connect(Some(Duration::from_secs(15))).await;
        let account = TestConfig::<5>::builder_key().fee_account();

        // Wait for ten headers so heights 1..5 below are guaranteed decided.
        let _headers = client
            .socket("availability/stream/headers/0")
            .subscribe::<Header>()
            .await
            .unwrap()
            .take(10)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        for i in 1..5 {
            // Leaves, headers, and VID common must all be available and
            // report the expected height.
            let leaf = client
                .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(leaf.height(), i);

            let header = client
                .get::<Header>(&format!("availability/header/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(header.height(), i);

            let vid = client
                .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(vid.height(), i);

            // Merklized state proofs are also served by the leaf-only source.
            client
                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
                    "block-state/{i}/{}",
                    i - 1
                ))
                .send()
                .await
                .unwrap();

            client
                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
                    "fee-state/{}/{}",
                    i + 1,
                    account
                ))
                .send()
                .await
                .unwrap();
        }

        // Full block payloads are not stored, so this request must error.
        client
            .get::<BlockQueryData<SeqTypes>>("availability/block/1")
            .send()
            .await
            .unwrap_err();
    }
2364
// Shared body for the catchup tests: take a node down, let consensus advance
// without it, then restart it with a fresh (default) validated state so it
// must rebuild its state via peer catchup. `url_suffix` selects the API base
// path used for catchup ("", "/v0" or "/v1").
async fn run_catchup_test(url_suffix: &str) {
    let port = pick_unused_port().expect("No ports free");
    const NUM_NODES: usize = 5;

    let url: url::Url = format!("http://localhost:{port}{url_suffix}")
        .parse()
        .unwrap();

    // Every node uses the server's API (at `url`) as its state catchup peer.
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::with_port(port))
        .network_config(TestConfigBuilder::default().build())
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![url.clone()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .build();
    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Wait for the first non-genesis decide before taking the node down.
    let mut events = network.peers[0].event_stream().await;
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        if leaf_chain[0].leaf.height() > 0 {
            break;
        }
    }

    tracing::info!("shutting down node");
    network.peers.remove(0);

    // Let consensus decide 3 more blocks while the node is down, so the
    // restarted node is genuinely behind and must catch up.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    tracing::info!("restarting node");
    // Restart node 1 with `ValidatedState::default()` and no storage: it can
    // only participate again by catching up from the state peers.
    let node = network
        .cfg
        .init_node(
            1,
            ValidatedState::default(),
            no_storage::Options,
            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![url],
                Default::default(),
                &NoMetrics,
            )),
            None,
            &NoMetrics,
            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
            NullEventConsumer,
            MockSequencerVersions::new(),
            Default::default(),
        )
        .await;
    let mut events = node.event_stream().await;

    // Wait until a decided block attributed to every node index has been
    // seen; a block from the restarted node proves it caught up and is
    // participating in consensus again.
    let mut proposers = [false; NUM_NODES];
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
            let height = leaf.height();
            // NOTE(review): assumes the proposer for a view is chosen
            // round-robin as `view % NUM_NODES` — confirm against the test
            // network's leader election.
            let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
            if height == 0 {
                continue;
            }

            tracing::info!(
                "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
            );
            proposers[leaf_builder] = true;
        }

        if proposers.iter().all(|has_proposed| *has_proposed) {
            break;
        }
    }
}
2464
// Catchup served from the unversioned (latest) API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup() {
    run_catchup_test("").await;
}
2469
// Catchup served from the explicit `/v0` API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup_v0() {
    run_catchup_test("/v0").await;
}
2474
// Catchup served from the explicit `/v1` API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup_v1() {
    run_catchup_test("/v1").await;
}
2479
// Same restart scenario as `run_catchup_test`, but the restarted node is
// given no state peers (`None::<NullStateCatchup>`): it must still manage to
// rejoin consensus without an explicit catchup provider.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup_no_state_peers() {
    let port = pick_unused_port().expect("No ports free");
    const NUM_NODES: usize = 5;
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::with_port(port))
        .network_config(TestConfigBuilder::default().build())
        .build();
    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Wait for the first non-genesis decide before taking the node down.
    let mut events = network.peers[0].event_stream().await;
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        if leaf_chain[0].leaf.height() > 0 {
            break;
        }
    }

    tracing::info!("shutting down node");
    network.peers.remove(0);

    // Let consensus decide 3 more blocks while the node is down.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    tracing::info!("restarting node");
    // Restart node 1 with default state, no storage, and no state peers.
    let node = network
        .cfg
        .init_node(
            1,
            ValidatedState::default(),
            no_storage::Options,
            None::<NullStateCatchup>,
            None,
            &NoMetrics,
            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
            NullEventConsumer,
            MockSequencerVersions::new(),
            Default::default(),
        )
        .await;
    let mut events = node.event_stream().await;

    // Wait until a decided block from every node index has been seen,
    // proving the restarted node rejoined consensus.
    let mut proposers = [false; NUM_NODES];
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
            let height = leaf.height();
            // NOTE(review): assumes round-robin leader election by
            // `view % NUM_NODES` — confirm against the test network.
            let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
            if height == 0 {
                continue;
            }

            tracing::info!(
                "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
            );
            proposers[leaf_builder] = true;
        }

        if proposers.iter().all(|has_proposed| *has_proposed) {
            break;
        }
    }
}
2564
// Epoch-enabled variant of `test_catchup_no_state_peers`: with a short epoch
// height, wait until consensus has decided past three epochs, take a node
// down, then restart it with no state peers and verify it rejoins.
// Currently ignored (see #[ignore]).
#[ignore]
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_catchup_epochs_no_state_peers() {
    let port = pick_unused_port().expect("No ports free");
    const EPOCH_HEIGHT: u64 = 5;
    let network_config = TestConfigBuilder::default()
        .epoch_height(EPOCH_HEIGHT)
        .build();
    const NUM_NODES: usize = 5;
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::with_port(port))
        .network_config(network_config)
        .build();
    let mut network = TestNetwork::new(config, EpochsTestVersions {}).await;

    // Wait until decides have progressed past three full epochs.
    let mut events = network.peers[0].event_stream().await;
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        tracing::error!("got decide height {}", leaf_chain[0].leaf.height());

        if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
            tracing::error!("decided past one epoch");
            break;
        }
    }

    tracing::info!("shutting down node");
    network.peers.remove(0);

    // Let consensus decide 3 more blocks while the node is down.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    tracing::error!("restarting node");
    // Restart node 1 with default state, no storage, and no state peers.
    let node = network
        .cfg
        .init_node(
            1,
            ValidatedState::default(),
            no_storage::Options,
            None::<NullStateCatchup>,
            None,
            &NoMetrics,
            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
            NullEventConsumer,
            MockSequencerVersions::new(),
            Default::default(),
        )
        .await;
    let mut events = node.event_stream().await;

    // Wait until a decided block from every node index has been seen,
    // proving the restarted node rejoined consensus.
    let mut proposers = [false; NUM_NODES];
    loop {
        let event = events.next().await.unwrap();
        let EventType::Decide { leaf_chain, .. } = event.event else {
            continue;
        };
        for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
            let height = leaf.height();
            // NOTE(review): assumes round-robin leader election by
            // `view % NUM_NODES` — confirm against the test network.
            let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
            if height == 0 {
                continue;
            }

            tracing::info!(
                "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
            );
            proposers[leaf_builder] = true;
        }

        if proposers.iter().all(|has_proposed| *has_proposed) {
            break;
        }
    }
}
2657
// Every node starts with only the *commitment* of the chain config in its
// validated state. After a few decides, each peer must have resolved the full
// `ChainConfig` (obtained via catchup from the state peers configured below).
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_chain_config_from_instance() {
    let port = pick_unused_port().expect("No ports free");

    let chain_config: ChainConfig = ChainConfig::default();

    // State holds only the commitment, not the full config.
    let state = ValidatedState {
        chain_config: chain_config.commit().into(),
        ..Default::default()
    };

    let states = std::array::from_fn(|_| state.clone());

    let config = TestNetworkConfigBuilder::default()
        .api_config(Options::with_port(port))
        .states(states)
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(TestConfigBuilder::default().build())
        .build();

    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Let consensus decide a few blocks so catchup has a chance to run.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    // Every peer must now be able to resolve the full chain config.
    for peer in &network.peers {
        let state = peer.consensus().read().await.decided_state().await;

        assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
    }

    network.server.shut_down().await;
    drop(network);
}
2709
// One node (index 0) starts with the full non-default chain config; all other
// nodes start with only its commitment. The others must fetch the full config
// from their state peers, and every peer must converge on `cf`.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_chain_config_catchup() {
    let port = pick_unused_port().expect("No ports free");

    // A non-default config so a successful catchup is distinguishable from
    // just falling back to `ChainConfig::default()`.
    let cf = ChainConfig {
        max_block_size: 300.into(),
        base_fee: 1.into(),
        ..Default::default()
    };

    // Commitment only — nodes with this state must catch up.
    let state1 = ValidatedState {
        chain_config: cf.commit().into(),
        ..Default::default()
    };

    // Full config — the node with this state can serve it to the others.
    let state2 = ValidatedState {
        chain_config: cf.into(),
        ..Default::default()
    };

    let mut states = std::array::from_fn(|_| state1.clone());
    states[0] = state2;

    const NUM_NODES: usize = 5;
    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::from(options::Http {
            port,
            max_connections: None,
        }))
        .states(states)
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(TestConfigBuilder::default().build())
        .build();

    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    // Let consensus decide a few blocks so catchup has a chance to run.
    network
        .server
        .event_stream()
        .await
        .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
        .take(3)
        .collect::<Vec<_>>()
        .await;

    // Every peer must now resolve the full (non-default) chain config.
    for peer in &network.peers {
        let state = peer.consensus().read().await.decided_state().await;

        assert_eq!(state.chain_config.resolve().unwrap(), cf)
    }

    network.server.shut_down().await;
    drop(network);
}
2780
// View-based protocol upgrade from the fee version to the PoS (epoch)
// version; the heavy lifting is done by `test_upgrade_helper`.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_pos_upgrade_view_based() {
    type PosUpgrade = SequencerVersions<FeeVersion, EpochVersion>;
    test_upgrade_helper::<PosUpgrade>(PosUpgrade::new()).await;
}
2786
// Drives a protocol upgrade to `V::Upgrade` end to end: waits for the upgrade
// proposal to appear in the event stream, then waits a few views past the
// upgrade's first view and asserts every peer has adopted the upgraded chain
// config.
async fn test_upgrade_helper<V: Versions>(version: V) {
    // Extra views to wait past the upgrade's activation view before checking.
    let wait_extra_views = 10;
    const NUM_NODES: usize = 5;
    let upgrade_version = <V as Versions>::Upgrade::VERSION;
    let port = pick_unused_port().expect("No ports free");

    let test_config = TestConfigBuilder::default()
        .epoch_height(200)
        .epoch_start_block(321)
        .set_upgrades(upgrade_version)
        .await
        .build();

    // The chain config that should be in effect after the upgrade.
    let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade_version);
    tracing::debug!(?chain_config_upgrade);

    let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
        .api_config(Options::from(options::Http {
            port,
            max_connections: None,
        }))
        .catchups(std::array::from_fn(|_| {
            StatePeers::<SequencerApiVersion>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(test_config)
        .build();

    let mut network = TestNetwork::new(config, version).await;
    let mut events = network.server.event_stream().await;

    // Wait for the upgrade proposal and check it targets the expected version.
    let upgrade = loop {
        let event = events.next().await.unwrap();
        match event.event {
            EventType::UpgradeProposal { proposal, .. } => {
                tracing::info!(?proposal, "proposal");
                let upgrade = proposal.data.upgrade_proposal;
                let new_version = upgrade.new_version;
                tracing::info!(?new_version, "upgrade proposal new version");
                assert_eq!(new_version, <V as Versions>::Upgrade::VERSION);
                break upgrade;
            },
            _ => continue,
        }
    };

    // Once consensus is comfortably past the upgrade's first view, every
    // peer's decided state must resolve to the upgraded chain config.
    let wanted_view = upgrade.new_version_first_view + wait_extra_views;
    loop {
        let event = events.next().await.unwrap();
        let view_number = event.view_number;

        tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
        if view_number > wanted_view {
            let states: Vec<_> = network
                .peers
                .iter()
                .map(|peer| async { peer.consensus().read().await.decided_state().await })
                .collect();

            // `None` if any peer cannot yet resolve its chain config; in that
            // case keep polling.
            let configs: Option<Vec<ChainConfig>> = join_all(states)
                .await
                .iter()
                .map(|state| state.chain_config.resolve())
                .collect();

            tracing::debug!(?configs, "`ChainConfig`s for nodes");
            if let Some(configs) = configs {
                for config in configs {
                    assert_eq!(config, chain_config_upgrade);
                }
                break;
            }
        }
        sleep(Duration::from_millis(200)).await;
    }

    network.server.shut_down().await;
}
2875
// Restarts the whole network from SQL persistence and verifies consensus
// resumes exactly where it left off: the first new leaf chains onto the last
// decided leaf, and the replayed leaf history is identical.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
pub(crate) async fn test_restart() {
    const NUM_NODES: usize = 5;
    // Persistent SQL storage for each node, reused across the restart.
    let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
    let persistence: [_; NUM_NODES] = storage
        .iter()
        .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
        .collect::<Vec<_>>()
        .try_into()
        .unwrap();
    let port = pick_unused_port().unwrap();
    let config = TestNetworkConfigBuilder::default()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(port),
        ))
        .persistences(persistence.clone())
        .network_config(TestConfigBuilder::default().build())
        .build();
    let mut network = TestNetwork::new(config, MockSequencerVersions::new()).await;

    let client: Client<ServerError, SequencerApiVersion> =
        Client::new(format!("http://localhost:{port}").parse().unwrap());
    client.connect(None).await;
    tracing::info!(port, "server running");

    // Let a few blocks be decided before shutting down.
    client
        .socket("availability/stream/blocks/0")
        .subscribe::<BlockQueryData<SeqTypes>>()
        .await
        .unwrap()
        .take(3)
        .collect::<Vec<_>>()
        .await;

    tracing::info!("shutting down nodes");
    network.stop_consensus().await;

    // Record how far consensus got and capture the decided leaf chain so we
    // can compare against it after the restart.
    let height = client
        .get::<usize>("status/block-height")
        .send()
        .await
        .unwrap();
    tracing::info!("decided {height} blocks before shutting down");

    let chain: Vec<LeafQueryData<SeqTypes>> = client
        .socket("availability/stream/leaves/0")
        .subscribe()
        .await
        .unwrap()
        .take(height)
        .try_collect()
        .await
        .unwrap();
    let decided_view = chain.last().unwrap().leaf().view_number();

    let state = network.server.decided_state().await;
    tracing::info!(?decided_view, ?state, "consensus state");

    // Tear the network down completely; only the SQL persistence survives.
    drop(network);

    let port = pick_unused_port().expect("No ports free");

    // Restart with the same persistence, on a fresh port.
    let config = TestNetworkConfigBuilder::default()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(port),
        ))
        .persistences(persistence)
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .network_config(TestConfigBuilder::default().build())
        .build();
    let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
    let client: Client<ServerError, StaticVersion<0, 1>> =
        Client::new(format!("http://localhost:{port}").parse().unwrap());
    client.connect(None).await;
    tracing::info!(port, "server running");

    // The first leaf decided after restart must extend the pre-restart chain.
    tracing::info!("waiting for decide, height {height}");
    let new_leaf: LeafQueryData<SeqTypes> = client
        .socket(&format!("availability/stream/leaves/{height}"))
        .subscribe()
        .await
        .unwrap()
        .next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(new_leaf.height(), height as u64);
    assert_eq!(
        new_leaf.leaf().parent_commitment(),
        chain[height - 1].hash()
    );

    // Replaying the leaf stream from genesis must reproduce the exact same
    // chain that was decided before the restart.
    let new_chain: Vec<LeafQueryData<SeqTypes>> = client
        .socket("availability/stream/leaves/0")
        .subscribe()
        .await
        .unwrap()
        .take(height)
        .try_collect()
        .await
        .unwrap();
    assert_eq!(chain, new_chain);
}
3002
3003 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3004 async fn test_fetch_config() {
3005 let port = pick_unused_port().expect("No ports free");
3006 let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3007 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3008
3009 let options = Options::with_port(port).config(Default::default());
3010 let network_config = TestConfigBuilder::default().build();
3011 let config = TestNetworkConfigBuilder::default()
3012 .api_config(options)
3013 .network_config(network_config)
3014 .build();
3015 let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3016 client.connect(None).await;
3017
3018 let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3021 vec!["https://notarealnode.network".parse().unwrap(), url],
3022 Default::default(),
3023 &NoMetrics,
3024 );
3025
3026 let validator =
3028 ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3029 let config = peers.fetch_config(validator.clone()).await.unwrap();
3030
3031 assert_eq!(config.node_index, 1);
3033
3034 pretty_assertions::assert_eq!(
3037 serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3038 serde_json::to_value(PublicHotShotConfig::from(
3039 network.cfg.hotshot_config().clone()
3040 ))
3041 .unwrap()
3042 );
3043 }
3044
3045 async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3046 let query_service_port = pick_unused_port().expect("No ports free for query service");
3047
3048 let url = format!("http://localhost:{query_service_port}{url_suffix}")
3049 .parse()
3050 .unwrap();
3051
3052 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3053
3054 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3055
3056 let network_config = TestConfigBuilder::default().build();
3057 let config = TestNetworkConfigBuilder::default()
3058 .api_config(options)
3059 .network_config(network_config)
3060 .build();
3061 let _network = TestNetwork::new(config, MockSequencerVersions::new()).await;
3062
3063 let mut subscribed_events = client
3064 .socket("hotshot-events/events")
3065 .subscribe::<Event<SeqTypes>>()
3066 .await
3067 .unwrap();
3068
3069 let total_count = 5;
3070 let mut receive_count = 0;
3072 loop {
3073 let event = subscribed_events.next().await.unwrap();
3074 tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3075 receive_count += 1;
3076 if receive_count > total_count {
3077 tracing::info!("Client Received at least desired events, exiting loop");
3078 break;
3079 }
3080 }
3081 assert_eq!(receive_count, total_count + 1);
3082 }
3083
// Event streaming served from the explicit `/v0` API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_hotshot_event_streaming_v0() {
    run_hotshot_event_streaming_test("/v0").await;
}
3088
// Event streaming served from the explicit `/v1` API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_hotshot_event_streaming_v1() {
    run_hotshot_event_streaming_test("/v1").await;
}
3093
// Event streaming served from the unversioned (latest) API base path.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_hotshot_event_streaming() {
    run_hotshot_event_streaming_test("").await;
}
3098
// With epochs enabled (PoS), watches the hotshot event stream and checks that
// views and epochs keep progressing, and that the epoch carried by each
// decide QC matches the epoch derived from its block number.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_hotshot_event_streaming_epoch_progression() {
    let epoch_height = 35;
    let wanted_epochs = 4;

    let network_config = TestConfigBuilder::default()
        .epoch_height(epoch_height)
        .build();

    let query_service_port = pick_unused_port().expect("No ports free for query service");

    let hotshot_url = format!("http://localhost:{query_service_port}")
        .parse()
        .unwrap();

    let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
    let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);

    // Deploy the PoS stake table so epochs are actually live.
    let config = TestNetworkConfigBuilder::default()
        .api_config(options)
        .network_config(network_config.clone())
        .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
        .await
        .expect("Pos Deployment")
        .build();

    let _network = TestNetwork::new(config, PosVersionV3::new()).await;

    let mut subscribed_events = client
        .socket("hotshot-events/events")
        .subscribe::<Event<SeqTypes>>()
        .await
        .unwrap();

    let wanted_views = epoch_height * wanted_epochs;

    // Bound the wait at 600 events; the asserts after the loop catch the
    // case where the target view was never reached.
    let mut views = HashSet::new();
    let mut epochs = HashSet::new();
    for _ in 0..=600 {
        let event = subscribed_events.next().await.unwrap();
        let event = event.unwrap();
        let view_number = event.view_number;
        views.insert(view_number.u64());

        if let hotshot::types::EventType::Decide { qc, .. } = event.event {
            // With epochs live, every decide QC must carry an epoch and a
            // block number.
            assert!(qc.data.epoch.is_some(), "epochs are live");
            assert!(qc.data.block_number.is_some());

            let epoch = qc.data.epoch.unwrap().u64();
            epochs.insert(epoch);

            tracing::debug!(
                "Got decide: epoch: {:?}, block: {:?} ",
                epoch,
                qc.data.block_number
            );

            // The QC's epoch must agree with the epoch computed from its
            // block number and the configured epoch height.
            let expected_epoch =
                epoch_from_block_number(qc.data.block_number.unwrap(), epoch_height);
            tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");

            assert_eq!(expected_epoch, epoch);
        }
        if views.contains(&wanted_views) {
            tracing::info!("Client Received at least desired views, exiting loop");
            break;
        }
    }

    assert!(views.contains(&wanted_views), "Views are not progressing");
    assert!(
        epochs.contains(&wanted_epochs),
        "Epochs are not progressing"
    );
}
3177
// Basic PoS reward accrual with a single node: after enough blocks, the
// node's reward balance must equal `block_reward * blocks_since_rewards_began`.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_pos_rewards_basic() -> anyhow::Result<()> {
    let epoch_height = 20;

    let network_config = TestConfigBuilder::default()
        .epoch_height(epoch_height)
        .build();

    let api_port = pick_unused_port().expect("No ports free for query service");

    const NUM_NODES: usize = 1;
    let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
    let persistence: [_; NUM_NODES] = storage
        .iter()
        .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
        .collect::<Vec<_>>()
        .try_into()
        .unwrap();

    let config = TestNetworkConfigBuilder::with_num_nodes()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(api_port),
        ))
        .network_config(network_config.clone())
        .persistences(persistence.clone())
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .pos_hook::<PosVersionV3>(DelegationConfig::VariableAmounts, Default::default())
        .await
        .unwrap()
        .build();

    let network = TestNetwork::new(config, PosVersionV3::new()).await;
    let client: Client<ServerError, SequencerApiVersion> =
        Client::new(format!("http://localhost:{api_port}").parse().unwrap());

    // Wait until 65 blocks are decided, comfortably past the height we query.
    let _blocks = client
        .socket("availability/stream/blocks/0")
        .subscribe::<BlockQueryData<SeqTypes>>()
        .await
        .unwrap()
        .take(65)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    // The single node's staking account accrues every block reward.
    let staking_priv_keys = network_config.staking_priv_keys();
    let account = staking_priv_keys[0].0.clone();
    let address = account.address();

    let block_height = 60;

    let amount = client
        .get::<Option<RewardAmount>>(&format!(
            "reward-state/reward-balance/{block_height}/{address}"
        ))
        .send()
        .await
        .unwrap()
        .unwrap();

    tracing::info!("amount={amount:?}");

    // NOTE(review): rewards are assumed to begin accruing at block 40 (with
    // epoch_height = 20, presumably the start of the first PoS epoch) —
    // confirm against the PoS activation logic.
    let epoch_start_block = 40;

    let node_state = network.server.node_state();
    let membership = node_state.coordinator.membership().read().await;
    let block_reward = membership
        .block_reward(None)
        .expect("block reward is not None");
    drop(membership);

    // One full block reward per block since rewards began.
    let expected_amount = block_reward.0 * (U256::from(block_height - epoch_start_block));

    assert_eq!(amount.0, expected_amount, "reward amount don't match");

    Ok(())
}
3275
// With multiple validators and delegators, checks reward distribution by
// summing every participant's balance: the total minted per block must equal
// exactly one block reward.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
    let epoch_height = 20;

    let network_config = TestConfigBuilder::default()
        .epoch_height(epoch_height)
        .build();

    let api_port = pick_unused_port().expect("No ports free for query service");

    const NUM_NODES: usize = 5;
    let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
    let persistence: [_; NUM_NODES] = storage
        .iter()
        .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
        .collect::<Vec<_>>()
        .try_into()
        .unwrap();

    let config = TestNetworkConfigBuilder::with_num_nodes()
        .api_config(SqlDataSource::options(
            &storage[0],
            Options::with_port(api_port),
        ))
        .network_config(network_config)
        .persistences(persistence.clone())
        .catchups(std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        }))
        .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
        .await
        .unwrap()
        .build();

    let network = TestNetwork::new(config, PosVersionV3::new()).await;
    // The per-block reward that should be split among validators/delegators.
    let node_state = network.server.node_state();
    let membership = node_state.coordinator.membership().read().await;
    let block_reward = membership
        .block_reward(None)
        .expect("block reward is not None");
    drop(membership);
    let client: Client<ServerError, SequencerApiVersion> =
        Client::new(format!("http://localhost:{api_port}").parse().unwrap());

    // Wait until 75 blocks are decided, past the range checked below.
    let _blocks = client
        .socket("availability/stream/blocks/0")
        .subscribe::<BlockQueryData<SeqTypes>>()
        .await
        .unwrap()
        .take(75)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    // Collect every address that can receive rewards (validators and their
    // delegators) from the stake tables of epochs 3 and 4.
    let validators = client
        .get::<ValidatorMap>("node/validators/3")
        .send()
        .await
        .expect("failed to get validator");

    let mut addresses = HashSet::new();
    for v in validators.values() {
        addresses.insert(v.account);
        addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
    }
    let validators = client
        .get::<ValidatorMap>("node/validators/4")
        .send()
        .await
        .expect("failed to get validator");
    for v in validators.values() {
        addresses.insert(v.account);
        addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
    }

    // NOTE(review): the 41..=67 range assumes rewards begin at block 41 with
    // epoch_height = 20 — confirm against the PoS activation logic.
    let mut prev_cumulative_amount = U256::ZERO;
    for block in 41..=67 {
        let mut cumulative_amount = U256::ZERO;
        for address in addresses.clone() {
            // Best-effort query: addresses with no reward account yet simply
            // contribute nothing (errors and `None` are both ignored).
            let amount = client
                .get::<Option<RewardAmount>>(&format!(
                    "reward-state/reward-balance/{block}/{address}"
                ))
                .send()
                .await
                .ok()
                .flatten();

            if let Some(amount) = amount {
                tracing::info!("address={address}, amount={amount}");
                cumulative_amount += amount.0;
            };
        }

        // Exactly one block reward must be distributed per block.
        assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
        tracing::info!("cumulative_amount is correct for block={block}");
        prev_cumulative_amount = cumulative_amount;
    }

    Ok(())
}
3398
3399 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3400 async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
3401 let epoch_height = 20;
3405
3406 let network_config = TestConfigBuilder::default()
3407 .epoch_height(epoch_height)
3408 .build();
3409
3410 let api_port = pick_unused_port().expect("No ports free for query service");
3411
3412 const NUM_NODES: usize = 5;
3413 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3415 let persistence: [_; NUM_NODES] = storage
3416 .iter()
3417 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3418 .collect::<Vec<_>>()
3419 .try_into()
3420 .unwrap();
3421
3422 let l1_url = network_config.l1_url();
3423 let config = TestNetworkConfigBuilder::with_num_nodes()
3424 .api_config(SqlDataSource::options(
3425 &storage[0],
3426 Options::with_port(api_port),
3427 ))
3428 .network_config(network_config)
3429 .persistences(persistence.clone())
3430 .catchups(std::array::from_fn(|_| {
3431 StatePeers::<StaticVersion<0, 1>>::from_urls(
3432 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3433 Default::default(),
3434 &NoMetrics,
3435 )
3436 }))
3437 .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3438 .await
3439 .unwrap()
3440 .build();
3441
3442 let network = TestNetwork::new(config, PosVersionV3::new()).await;
3443
3444 let mut prev_st = None;
3445 let state = network.server.decided_state().await;
3446 let chain_config = state.chain_config.resolve().expect("resolve chain config");
3447 let stake_table = chain_config.stake_table_contract.unwrap();
3448
3449 let l1_client = L1ClientOptions::default()
3450 .connect(vec![l1_url])
3451 .expect("failed to connect to l1");
3452
3453 let client: Client<ServerError, SequencerApiVersion> =
3454 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3455
3456 let mut headers = client
3457 .socket("availability/stream/headers/0")
3458 .subscribe::<Header>()
3459 .await
3460 .unwrap();
3461
3462 let mut target_bh = 0;
3463 while let Some(header) = headers.next().await {
3464 let header = header.unwrap();
3465 if header.height() == 0 {
3466 continue;
3467 }
3468 let l1_block = header.l1_finalized().expect("l1 block not found");
3469
3470 let events = Fetcher::fetch_events_from_contract(
3471 l1_client.clone(),
3472 stake_table,
3473 None,
3474 l1_block.number(),
3475 )
3476 .await;
3477 let sorted_events = events.sort_events().expect("failed to sort");
3478
3479 let mut sorted_dedup_removed = sorted_events.clone();
3480 sorted_dedup_removed.dedup();
3481
3482 assert_eq!(
3483 sorted_events.len(),
3484 sorted_dedup_removed.len(),
3485 "duplicates found"
3486 );
3487
3488 let stake_table =
3490 validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
3491 if let Some(prev_st) = prev_st {
3492 assert_eq!(stake_table, prev_st);
3493 }
3494
3495 prev_st = Some(stake_table);
3496
3497 if target_bh == 100 {
3498 break;
3499 }
3500
3501 target_bh = header.height();
3502 }
3503
3504 Ok(())
3505 }
3506
3507 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3508 async fn test_rewards_v3() -> anyhow::Result<()> {
3509 const EPOCH_HEIGHT: u64 = 20;
3515
3516 let network_config = TestConfigBuilder::default()
3517 .epoch_height(EPOCH_HEIGHT)
3518 .build();
3519
3520 let api_port = pick_unused_port().expect("No ports free for query service");
3521
3522 const NUM_NODES: usize = 7;
3523
3524 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3525 let persistence: [_; NUM_NODES] = storage
3526 .iter()
3527 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3528 .collect::<Vec<_>>()
3529 .try_into()
3530 .unwrap();
3531
3532 let config = TestNetworkConfigBuilder::with_num_nodes()
3533 .api_config(SqlDataSource::options(
3534 &storage[0],
3535 Options::with_port(api_port),
3536 ))
3537 .network_config(network_config)
3538 .persistences(persistence.clone())
3539 .catchups(std::array::from_fn(|_| {
3540 StatePeers::<StaticVersion<0, 1>>::from_urls(
3541 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3542 Default::default(),
3543 &NoMetrics,
3544 )
3545 }))
3546 .pos_hook::<PosVersionV3>(DelegationConfig::MultipleDelegators, Default::default())
3547 .await
3548 .unwrap()
3549 .build();
3550
3551 let network = TestNetwork::new(config, PosVersionV3::new()).await;
3552 let client: Client<ServerError, SequencerApiVersion> =
3553 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3554
3555 let mut events = network.peers[0].event_stream().await;
3557 while let Some(event) = events.next().await {
3558 if let EventType::Decide { leaf_chain, .. } = event.event {
3559 let height = leaf_chain[0].leaf.height();
3560 tracing::info!("Node 0 decided at height: {height}");
3561 if height > EPOCH_HEIGHT * 3 {
3562 break;
3563 }
3564 }
3565 }
3566
3567 {
3569 client
3570 .get::<ValidatorMap>("node/validators/1")
3571 .send()
3572 .await
3573 .unwrap()
3574 .is_empty();
3575
3576 client
3577 .get::<ValidatorMap>("node/validators/2")
3578 .send()
3579 .await
3580 .unwrap()
3581 .is_empty();
3582 }
3583
3584 let validators = client
3586 .get::<ValidatorMap>("node/validators/3")
3587 .send()
3588 .await
3589 .expect("validators");
3590
3591 assert!(!validators.is_empty());
3592
3593 let mut addresses = HashSet::new();
3595 for v in validators.values() {
3596 addresses.insert(v.account);
3597 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3598 }
3599
3600 for block in 0..=EPOCH_HEIGHT * 2 {
3602 for address in addresses.clone() {
3603 let amount = client
3604 .get::<Option<RewardAmount>>(&format!(
3605 "reward-state/reward-balance/{block}/{address}"
3606 ))
3607 .send()
3608 .await
3609 .ok()
3610 .flatten();
3611 assert!(amount.is_none(), "amount is not none for block {block}")
3612 }
3613 }
3614
3615 let leaves = client
3617 .socket("availability/stream/leaves/41")
3618 .subscribe::<LeafQueryData<SeqTypes>>()
3619 .await
3620 .unwrap()
3621 .take((EPOCH_HEIGHT * 3).try_into().unwrap())
3622 .try_collect::<Vec<_>>()
3623 .await
3624 .unwrap();
3625
3626 let node_state = network.server.node_state();
3627 let coordinator = node_state.coordinator;
3628
3629 let membership = coordinator.membership().read().await;
3630 let block_reward = membership
3631 .block_reward(None)
3632 .expect("block reward is not None");
3633
3634 drop(membership);
3635
3636 let mut rewards_map = HashMap::new();
3637
3638 for leaf in leaves {
3639 let block = leaf.height();
3640 tracing::info!("verify rewards for block={block:?}");
3641 let membership = coordinator.membership().read().await;
3642 let epoch = epoch_from_block_number(block, EPOCH_HEIGHT);
3643 let epoch_number = EpochNumber::new(epoch);
3644 let leader = membership
3645 .leader(leaf.leaf().view_number(), Some(epoch_number))
3646 .expect("leader");
3647 let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
3648
3649 drop(membership);
3650
3651 let validators = client
3652 .get::<ValidatorMap>(&format!("node/validators/{epoch}"))
3653 .send()
3654 .await
3655 .expect("validators");
3656
3657 let leader_validator = validators
3658 .get(&leader_eth_address)
3659 .expect("leader not found");
3660
3661 let distributor =
3662 RewardDistributor::new(leader_validator.clone(), block_reward, U256::ZERO.into());
3663 for validator in validators.values() {
3665 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
3666
3667 assert_eq!(delegator_stake_sum, validator.stake);
3668 }
3669
3670 let computed_rewards = distributor.compute_rewards().expect("reward computation");
3671
3672 let total_reward = block_reward.0;
3679 let leader_commission_basis_points = U256::from(leader_validator.commission);
3680 let calculated_leader_commission_reward = leader_commission_basis_points
3681 .checked_mul(total_reward)
3682 .context("overflow")?
3683 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
3684 .context("overflow")?;
3685
3686 assert!(
3687 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
3688 <= U256::from(10_u64)
3689 );
3690
3691 let leader_commission = *computed_rewards.leader_commission();
3697 for (address, amount) in computed_rewards.delegators().clone() {
3698 rewards_map
3699 .entry(address)
3700 .and_modify(|entry| *entry += amount)
3701 .or_insert(amount);
3702 }
3703
3704 rewards_map
3706 .entry(leader_eth_address)
3707 .and_modify(|entry| *entry += leader_commission)
3708 .or_insert(leader_commission);
3709
3710 for (address, calculated_amount) in rewards_map.iter() {
3712 let amount_from_api = client
3713 .get::<Option<RewardAmount>>(&format!(
3714 "reward-state/reward-balance/{block}/{address}"
3715 ))
3716 .send()
3717 .await
3718 .ok()
3719 .flatten()
3720 .expect("amount");
3721 assert_eq!(amount_from_api, *calculated_amount)
3722 }
3723 }
3724
3725 Ok(())
3726 }
3727
3728 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3729 async fn test_rewards_v4() -> anyhow::Result<()> {
3730 const EPOCH_HEIGHT: u64 = 20;
3740
3741 type V4 = SequencerVersions<StaticVersion<0, 4>, StaticVersion<0, 0>>;
3742
3743 let network_config = TestConfigBuilder::default()
3744 .epoch_height(EPOCH_HEIGHT)
3745 .build();
3746
3747 let api_port = pick_unused_port().expect("No ports free for query service");
3748
3749 const NUM_NODES: usize = 5;
3750
3751 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3752 let persistence: [_; NUM_NODES] = storage
3753 .iter()
3754 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3755 .collect::<Vec<_>>()
3756 .try_into()
3757 .unwrap();
3758
3759 let config = TestNetworkConfigBuilder::with_num_nodes()
3760 .api_config(SqlDataSource::options(
3761 &storage[0],
3762 Options::with_port(api_port),
3763 ))
3764 .network_config(network_config)
3765 .persistences(persistence.clone())
3766 .catchups(std::array::from_fn(|_| {
3767 StatePeers::<StaticVersion<0, 1>>::from_urls(
3768 vec![format!("http://localhost:{api_port}").parse().unwrap()],
3769 Default::default(),
3770 &NoMetrics,
3771 )
3772 }))
3773 .pos_hook::<V4>(DelegationConfig::MultipleDelegators, Default::default())
3774 .await
3775 .unwrap()
3776 .build();
3777
3778 let network = TestNetwork::new(config, V4::new()).await;
3779 let client: Client<ServerError, SequencerApiVersion> =
3780 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
3781
3782 let mut events = network.peers[0].event_stream().await;
3784 while let Some(event) = events.next().await {
3785 if let EventType::Decide { leaf_chain, .. } = event.event {
3786 let height = leaf_chain[0].leaf.height();
3787 tracing::info!("Node 0 decided at height: {height}");
3788 if height > EPOCH_HEIGHT * 3 {
3789 break;
3790 }
3791 }
3792 }
3793
3794 {
3796 client
3797 .get::<ValidatorMap>("node/validators/1")
3798 .send()
3799 .await
3800 .unwrap()
3801 .is_empty();
3802
3803 client
3804 .get::<ValidatorMap>("node/validators/2")
3805 .send()
3806 .await
3807 .unwrap()
3808 .is_empty();
3809 }
3810
3811 let validators = client
3813 .get::<ValidatorMap>("node/validators/3")
3814 .send()
3815 .await
3816 .expect("validators");
3817
3818 assert!(!validators.is_empty());
3819
3820 let mut addresses = HashSet::new();
3822 for v in validators.values() {
3823 addresses.insert(v.account);
3824 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
3825 }
3826
3827 let mut leaves = client
3828 .socket("availability/stream/leaves/0")
3829 .subscribe::<LeafQueryData<SeqTypes>>()
3830 .await
3831 .unwrap();
3832
3833 let node_state = network.server.node_state();
3834 let coordinator = node_state.coordinator;
3835
3836 let membership = coordinator.membership().read().await;
3837
3838 while let Some(leaf) = leaves.next().await {
3840 let leaf = leaf.unwrap();
3841 let header = leaf.header();
3842 assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
3843
3844 let epoch_number =
3845 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
3846
3847 assert!(membership.block_reward(Some(epoch_number)).is_none());
3848
3849 let height = header.height();
3850 for address in addresses.clone() {
3851 let amount = client
3852 .get::<Option<RewardAmount>>(&format!(
3853 "reward-state-v2/reward-balance/{height}/{address}"
3854 ))
3855 .send()
3856 .await
3857 .ok()
3858 .flatten();
3859 assert!(amount.is_none(), "amount is not none for block {height}")
3860 }
3861
3862 if leaf.height() == EPOCH_HEIGHT * 2 {
3863 break;
3864 }
3865 }
3866
3867 drop(membership);
3868
3869 let mut rewards_map = HashMap::new();
3870 let mut total_distributed = U256::ZERO;
3871 let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
3872
3873 while let Some(leaf) = leaves.next().await {
3874 let leaf = leaf.unwrap();
3875
3876 let header = leaf.header();
3877 let distributed = header
3878 .total_reward_distributed()
3879 .expect("rewards distributed is none");
3880
3881 let block = leaf.height();
3882 tracing::info!("verify rewards for block={block:?}");
3883 let membership = coordinator.membership().read().await;
3884 let epoch_number =
3885 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
3886
3887 let block_reward = membership.block_reward(Some(epoch_number)).unwrap();
3888 let leader = membership
3889 .leader(leaf.leaf().view_number(), Some(epoch_number))
3890 .expect("leader");
3891 let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
3892
3893 drop(membership);
3894
3895 let validators = client
3896 .get::<ValidatorMap>(&format!("node/validators/{epoch_number}"))
3897 .send()
3898 .await
3899 .expect("validators");
3900
3901 let leader_validator = validators
3902 .get(&leader_eth_address)
3903 .expect("leader not found");
3904
3905 let distributor =
3906 RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
3907 for validator in validators.values() {
3909 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
3910
3911 assert_eq!(delegator_stake_sum, validator.stake);
3912 }
3913
3914 let computed_rewards = distributor.compute_rewards().expect("reward computation");
3915
3916 let total_reward = block_reward.0;
3918 let leader_commission_basis_points = U256::from(leader_validator.commission);
3919 let calculated_leader_commission_reward = leader_commission_basis_points
3920 .checked_mul(total_reward)
3921 .context("overflow")?
3922 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
3923 .context("overflow")?;
3924
3925 assert!(
3926 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
3927 <= U256::from(10_u64)
3928 );
3929
3930 let leader_commission = *computed_rewards.leader_commission();
3932 for (address, amount) in computed_rewards.delegators().clone() {
3933 rewards_map
3934 .entry(address)
3935 .and_modify(|entry| *entry += amount)
3936 .or_insert(amount);
3937 }
3938
3939 rewards_map
3941 .entry(leader_eth_address)
3942 .and_modify(|entry| *entry += leader_commission)
3943 .or_insert(leader_commission);
3944
3945 for (address, calculated_amount) in rewards_map.iter() {
3947 let mut attempt = 0;
3948 let amount_from_api = loop {
3949 let result = client
3950 .get::<Option<RewardAmount>>(&format!(
3951 "reward-state-v2/reward-balance/{block}/{address}"
3952 ))
3953 .send()
3954 .await
3955 .ok()
3956 .flatten();
3957
3958 if let Some(amount) = result {
3959 break amount;
3960 }
3961
3962 attempt += 1;
3963 if attempt >= 3 {
3964 panic!(
3965 "Failed to fetch reward amount for address {address} after 3 retries"
3966 );
3967 }
3968
3969 sleep(Duration::from_secs(2)).await;
3970 };
3971
3972 assert_eq!(amount_from_api, *calculated_amount);
3973 }
3974
3975 total_distributed += block_reward.0;
3977 assert_eq!(
3978 header.total_reward_distributed().unwrap().0,
3979 total_distributed
3980 );
3981
3982 epoch_rewards
3984 .entry(epoch_number)
3985 .and_modify(|r| assert_eq!(*r, block_reward.0))
3986 .or_insert(block_reward.0);
3987
3988 if leaf.height() == EPOCH_HEIGHT * 5 {
3990 break;
3991 }
3992 }
3993
3994 Ok(())
3995 }
3996
3997 #[rstest]
3998 #[case(PosVersionV3::new())]
3999 #[case(PosVersionV4::new())]
4000 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4001
4002 async fn test_node_stake_table_api<Ver: Versions>(#[case] ver: Ver) {
4003 let epoch_height = 20;
4004
4005 let network_config = TestConfigBuilder::default()
4006 .epoch_height(epoch_height)
4007 .build();
4008
4009 let api_port = pick_unused_port().expect("No ports free for query service");
4010
4011 const NUM_NODES: usize = 2;
4012 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4014 let persistence: [_; NUM_NODES] = storage
4015 .iter()
4016 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4017 .collect::<Vec<_>>()
4018 .try_into()
4019 .unwrap();
4020
4021 let config = TestNetworkConfigBuilder::with_num_nodes()
4022 .api_config(SqlDataSource::options(
4023 &storage[0],
4024 Options::with_port(api_port),
4025 ))
4026 .network_config(network_config)
4027 .persistences(persistence.clone())
4028 .catchups(std::array::from_fn(|_| {
4029 StatePeers::<StaticVersion<0, 1>>::from_urls(
4030 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4031 Default::default(),
4032 &NoMetrics,
4033 )
4034 }))
4035 .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
4036 .await
4037 .unwrap()
4038 .build();
4039
4040 let _network = TestNetwork::new(config, ver).await;
4041
4042 let client: Client<ServerError, SequencerApiVersion> =
4043 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4044
4045 let _blocks = client
4047 .socket("availability/stream/blocks/0")
4048 .subscribe::<BlockQueryData<SeqTypes>>()
4049 .await
4050 .unwrap()
4051 .take(40)
4052 .try_collect::<Vec<_>>()
4053 .await
4054 .unwrap();
4055
4056 for i in 1..=3 {
4057 let _st = client
4058 .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
4059 .send()
4060 .await
4061 .expect("failed to get stake table");
4062 }
4063
4064 let _st = client
4065 .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
4066 .send()
4067 .await
4068 .expect("failed to get stake table");
4069 }
4070
    /// Verifies that a node which was offline for several epochs can catch up
    /// the epoch stake tables it missed, and that the caught-up tables match
    /// the server's view for every missed epoch.
    #[rstest]
    #[case(PosVersionV3::new())]
    #[case(PosVersionV4::new())]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_stake_table_catchup<Ver: Versions>(#[case] ver: Ver) {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 6;

        let port = pick_unused_port().expect("No ports free");

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Every node uses the server's API as its state-catchup peer.
        let catchup_peers = std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        });
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .catchups(catchup_peers)
            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
            .await
            .unwrap()
            .build();

        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, ver).await;

        // Run peer 0 for a few epochs before taking it down.
        let mut events = network.peers[0].event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Node 0 decided at height: {height}");
                if height > EPOCH_HEIGHT * 3 {
                    break;
                }
            }
        }

        tracing::info!("Shutting down peer 0");
        network.peers.remove(0);

        // Let the rest of the network advance well past the point where the
        // removed peer stopped (through epoch 7), so it has epochs to miss.
        let mut events = network.server.event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                if height > EPOCH_HEIGHT * 7 {
                    break;
                }
            }
        }

        // Restart the node with *fresh* storage so it has no local epoch
        // state and must catch up everything from its peer.
        let storage = SqlDataSource::create_storage().await;
        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
        tracing::info!("Restarting peer 0");
        // NOTE(review): node index 1 here presumably maps to the removed
        // peer (the server being node 0) — confirm against TestConfig.
        let node = network
            .cfg
            .init_node(
                1,
                state,
                options,
                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    &NoMetrics,
                )),
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                ver,
                Default::default(),
            )
            .await;

        let coordinator = node.node_state().coordinator;
        let server_node_state = network.server.node_state();
        let server_coordinator = server_node_state.coordinator;
        // For each missed epoch, trigger catchup if needed and compare the
        // restarted node's stake table with the server's.
        for epoch_num in 1..=7 {
            let epoch = EpochNumber::new(epoch_num);
            let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
            if membership_for_epoch.is_err() {
                // Membership not yet available locally: block until catchup
                // has fetched it.
                coordinator.wait_for_catchup(epoch).await.unwrap();
            }

            println!("have stake table for epoch = {epoch_num}");

            let node_stake_table = coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            let stake_table = server_coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            println!("asserting stake table for epoch = {epoch_num}");

            assert_eq!(
                node_stake_table, stake_table,
                "Stake table mismatch for epoch {epoch_num}",
            );
        }
    }
4202
    /// Stress variant of `test_epoch_stake_table_catchup`: after restarting a
    /// node that missed several epochs, trigger catchup for all missed epochs
    /// in a *random* order (exercising concurrent/out-of-order catchup), then
    /// verify every epoch's stake table against the server's.
    #[rstest]
    #[case(PosVersionV3::new())]
    #[case(PosVersionV4::new())]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_stake_table_catchup_stress<Ver: Versions>(#[case] versions: Ver) {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 6;

        let port = pick_unused_port().expect("No ports free");

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Every node uses the server's API as its state-catchup peer.
        let catchup_peers = std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                &NoMetrics,
            )
        });
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .catchups(catchup_peers)
            .pos_hook::<Ver>(DelegationConfig::MultipleDelegators, Default::default())
            .await
            .unwrap()
            .build();

        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, versions).await;

        // Run peer 0 for a few epochs before taking it down.
        let mut events = network.peers[0].event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Node 0 decided at height: {height}");
                if height > EPOCH_HEIGHT * 3 {
                    break;
                }
            }
        }

        tracing::info!("Shutting down peer 0");
        network.peers.remove(0);

        // Let the rest of the network advance through epoch 7 so the removed
        // peer has several epochs to catch up on restart.
        let mut events = network.server.event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Server decided at height: {height}");
                if height > EPOCH_HEIGHT * 7 {
                    break;
                }
            }
        }

        // Restart with fresh storage so all epoch state must be fetched.
        let storage = SqlDataSource::create_storage().await;
        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);

        tracing::info!("Restarting peer 0");
        // NOTE(review): node index 1 here presumably maps to the removed
        // peer (the server being node 0) — confirm against TestConfig.
        let node = network
            .cfg
            .init_node(
                1,
                state,
                options,
                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    &NoMetrics,
                )),
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                versions,
                Default::default(),
            )
            .await;

        let coordinator = node.node_state().coordinator;

        let server_node_state = network.server.node_state();
        let server_coordinator = server_node_state.coordinator;

        // Kick off catchup for epochs 1..=7 in a shuffled order; results are
        // deliberately ignored here — completion is awaited below.
        let mut rand_epochs: Vec<_> = (1..=7).collect();
        rand_epochs.shuffle(&mut rand::thread_rng());
        println!("trigger catchup in this order: {rand_epochs:?}");
        for epoch_num in rand_epochs {
            let epoch = EpochNumber::new(epoch_num);
            let _ = coordinator.membership_for_epoch(Some(epoch)).await;
        }

        // Now wait for catchup of every epoch (in order) and compare each
        // stake table with the server's.
        for epoch_num in 1..=7 {
            println!("getting stake table for epoch = {epoch_num}");
            let epoch = EpochNumber::new(epoch_num);
            let _ = coordinator.wait_for_catchup(epoch).await.unwrap();

            println!("have stake table for epoch = {epoch_num}");

            let node_stake_table = coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            let stake_table = server_coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));

            println!("asserting stake table for epoch = {epoch_num}");

            assert_eq!(
                node_stake_table, stake_table,
                "Stake table mismatch for epoch {epoch_num}",
            );
        }
    }
4347
    /// Verifies that a query node which goes down and later restarts can
    /// catch up its merklized state (reward tree and block state) from its
    /// peers: after the restart, the node's decided state must contain reward
    /// accounts and its block-state height must advance past epoch 7.
    #[rstest]
    #[case(PosVersionV3::new())]
    #[case(PosVersionV4::new())]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_catchup_on_restart<Ver: Versions>(
        #[case] versions: Ver,
    ) -> anyhow::Result<()> {
        const EPOCH_HEIGHT: u64 = 10;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = pick_unused_port().expect("No ports free for query service");

        tracing::info!("API PORT = {api_port}");
        const NUM_NODES: usize = 5;

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port).catchup(Default::default()),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    &NoMetrics,
                )
            }))
            .pos_hook::<Ver>(
                DelegationConfig::MultipleDelegators,
                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
            )
            .await
            .unwrap()
            .build();
        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, versions).await;

        // Replace peer 0 with a standalone query node served on its own
        // port, backed by its own SQL query storage.
        network.peers[0].shut_down().await;
        network.peers.remove(0);
        // NOTE(review): the restarted node reuses storage/persistence slot 1
        // while being initialized as node id 1 — confirm the id/slot mapping
        // against TestConfig.
        let node_0_storage = &storage[1];
        let node_0_persistence = persistence[1].clone();
        let node_0_port = pick_unused_port().expect("No ports free for query service");
        tracing::info!("node_0_port {node_0_port}");
        // Query service configured to fetch missing data from the server's
        // API as a peer.
        let opt = Options::with_port(node_0_port).query_sql(
            Query {
                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
            },
            tmp_options(node_0_storage),
        );

        let node_0 = opt
            .clone()
            .serve(|metrics, consumer, storage| {
                let cfg = network.cfg.clone();
                let node_0_persistence = node_0_persistence.clone();
                let state = state.clone();
                async move {
                    Ok(cfg
                        .init_node(
                            1,
                            state,
                            node_0_persistence.clone(),
                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                                Default::default(),
                                &NoMetrics,
                            )),
                            storage,
                            &*metrics,
                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                            consumer,
                            versions,
                            Default::default(),
                        )
                        .await)
                }
                .boxed()
            })
            .await
            .unwrap();

        // Let the node run for one epoch, then kill it (drop stops it) and
        // let the network advance a few more epochs without it.
        let mut events = network.peers[2].event_stream().await;
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;

        drop(node_0);

        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;

        // Restart the node with the same options/persistence; it must now
        // catch up the state it missed. (`state` and `node_0_persistence`
        // are moved into this closure — no clones needed the second time.)
        let node_0 = opt
            .serve(|metrics, consumer, storage| {
                let cfg = network.cfg.clone();
                async move {
                    Ok(cfg
                        .init_node(
                            1,
                            state,
                            node_0_persistence,
                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                                Default::default(),
                                &NoMetrics,
                            )),
                            storage,
                            &*metrics,
                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                            consumer,
                            versions,
                            Default::default(),
                        )
                        .await)
                }
                .boxed()
            })
            .await
            .unwrap();

        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
        client.connect(None).await;

        wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;

        // First block of epoch 7.
        let epoch_7_block = EPOCH_HEIGHT * 6 + 1;

        // Poll until the restarted node's decided state contains reward
        // accounts, proving the reward tree was caught up. The tree version
        // depends on the protocol base version under test.
        let mut retries = 0;
        loop {
            sleep(Duration::from_secs(1)).await;
            let state = node_0.decided_state().await;

            let leaves = if Ver::Base::VERSION == EpochVersion::VERSION {
                state.reward_merkle_tree_v1.num_leaves()
            } else {
                state.reward_merkle_tree_v2.num_leaves()
            };

            if leaves > 0 {
                tracing::info!("Node's state has reward accounts");
                break;
            }

            retries += 1;
            if retries > 120 {
                panic!("max retries reached. failed to catchup reward state");
            }
        }

        // Poll until the node's block-state height passes into epoch 7,
        // proving block state kept advancing after the restart.
        retries = 0;
        loop {
            sleep(Duration::from_secs(3)).await;

            let bh = client
                .get::<u64>("block-state/block-height")
                .send()
                .await
                .expect("block height not found");

            tracing::info!("block state: block height={bh}");
            if bh > epoch_7_block {
                break;
            }

            retries += 1;
            if retries > 30 {
                panic!(
                    "max retries reached. block state block height is less than epoch 7 start \
                     block"
                );
            }
        }

        // Finally, the block merkle tree must contain an entry for the
        // previous decided height.
        node_0.shutdown_consensus().await;
        let decided_leaf = node_0.decided_leaf().await;
        let state = node_0.decided_state().await;

        state
            .block_merkle_tree
            .lookup(decided_leaf.height() - 1)
            .expect_ok()
            .expect("block state not found");

        Ok(())
    }
4569
4570 #[rstest]
4571 #[case(PosVersionV3::new())]
4572 #[case(PosVersionV4::new())]
4573 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4574 async fn test_state_reconstruction<Ver: Versions>(
4575 #[case] pos_version: Ver,
4576 ) -> anyhow::Result<()> {
4577 const EPOCH_HEIGHT: u64 = 10;
4593
4594 let network_config = TestConfigBuilder::default()
4595 .epoch_height(EPOCH_HEIGHT)
4596 .build();
4597
4598 let api_port = pick_unused_port().expect("No ports free for query service");
4599
4600 tracing::info!("API PORT = {api_port}");
4601 const NUM_NODES: usize = 5;
4602
4603 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4604 let persistence: [_; NUM_NODES] = storage
4605 .iter()
4606 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4607 .collect::<Vec<_>>()
4608 .try_into()
4609 .unwrap();
4610
4611 let config = TestNetworkConfigBuilder::with_num_nodes()
4612 .api_config(SqlDataSource::options(
4613 &storage[0],
4614 Options::with_port(api_port),
4615 ))
4616 .network_config(network_config)
4617 .persistences(persistence.clone())
4618 .catchups(std::array::from_fn(|_| {
4619 StatePeers::<StaticVersion<0, 1>>::from_urls(
4620 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4621 Default::default(),
4622 &NoMetrics,
4623 )
4624 }))
4625 .pos_hook::<Ver>(
4626 DelegationConfig::MultipleDelegators,
4627 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
4628 )
4629 .await
4630 .unwrap()
4631 .build();
4632 let state = config.states()[0].clone();
4633 let mut network = TestNetwork::new(config, pos_version).await;
4634 network.peers.remove(0);
4639
4640 let node_0_storage = &storage[1];
4641 let node_0_persistence = persistence[1].clone();
4642 let node_0_port = pick_unused_port().expect("No ports free for query service");
4643 tracing::info!("node_0_port {node_0_port}");
4644 let opt = Options::with_port(node_0_port).query_sql(
4645 Query {
4646 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
4647 },
4648 tmp_options(node_0_storage),
4649 );
4650 let node_0 = opt
4651 .clone()
4652 .serve(|metrics, consumer, storage| {
4653 let cfg = network.cfg.clone();
4654 let node_0_persistence = node_0_persistence.clone();
4655 let state = state.clone();
4656 async move {
4657 Ok(cfg
4658 .init_node(
4659 1,
4660 state,
4661 node_0_persistence.clone(),
4662 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
4663 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4664 Default::default(),
4665 &NoMetrics,
4666 )),
4667 storage,
4668 &*metrics,
4669 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
4670 consumer,
4671 pos_version,
4672 Default::default(),
4673 )
4674 .await)
4675 }
4676 .boxed()
4677 })
4678 .await
4679 .unwrap();
4680
4681 let mut events = network.peers[2].event_stream().await;
4682 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
4684
4685 tracing::warn!("shutting down node 0");
4686
4687 node_0.shutdown_consensus().await;
4688
4689 let instance = node_0.node_state();
4690 let state = node_0.decided_state().await;
4691 let fee_accounts = state
4692 .fee_merkle_tree
4693 .clone()
4694 .into_iter()
4695 .map(|(acct, _)| acct)
4696 .collect::<Vec<_>>();
4697 let reward_accounts = match Ver::Base::VERSION {
4698 EpochVersion::VERSION => state
4699 .reward_merkle_tree_v1
4700 .clone()
4701 .into_iter()
4702 .map(|(acct, _)| RewardAccountV2::from(acct))
4703 .collect::<Vec<_>>(),
4704 DrbAndHeaderUpgradeVersion::VERSION => state
4705 .reward_merkle_tree_v2
4706 .clone()
4707 .into_iter()
4708 .map(|(acct, _)| acct)
4709 .collect::<Vec<_>>(),
4710 _ => panic!("invalid version"),
4711 };
4712
4713 let client: Client<ServerError, SequencerApiVersion> =
4714 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
4715 client.connect(Some(Duration::from_secs(10))).await;
4716
4717 sleep(Duration::from_secs(3)).await;
4720
4721 tracing::info!("getting node block height");
4722 let node_block_height = client
4723 .get::<u64>("node/block-height")
4724 .send()
4725 .await
4726 .context("getting Espresso block height")
4727 .unwrap();
4728
4729 tracing::info!("node block height={node_block_height}");
4730
4731 let leaf_query_data = client
4732 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
4733 .send()
4734 .await
4735 .context("error getting leaf")
4736 .unwrap();
4737
4738 tracing::info!("leaf={leaf_query_data:?}");
4739 let leaf = leaf_query_data.leaf();
4740 let to_view = leaf.view_number() + 1;
4741
4742 let ds = SqlStorage::connect(Config::try_from(&node_0_persistence).unwrap())
4743 .await
4744 .unwrap();
4745 let mut tx = ds.write().await?;
4746
4747 let (state, leaf) =
4748 reconstruct_state(&instance, &mut tx, node_block_height - 1, to_view, &[], &[])
4749 .await
4750 .unwrap();
4751 assert_eq!(leaf.view_number(), to_view);
4752 assert!(
4753 state
4754 .block_merkle_tree
4755 .lookup(node_block_height - 1)
4756 .expect_ok()
4757 .is_ok(),
4758 "inconsistent block merkle tree"
4759 );
4760
4761 let (state, leaf) = reconstruct_state(
4763 &instance,
4764 &mut tx,
4765 node_block_height - 1,
4766 to_view,
4767 &fee_accounts,
4768 &[],
4769 )
4770 .await
4771 .unwrap();
4772
4773 assert_eq!(leaf.view_number(), to_view);
4774 assert!(
4775 state
4776 .block_merkle_tree
4777 .lookup(node_block_height - 1)
4778 .expect_ok()
4779 .is_ok(),
4780 "inconsistent block merkle tree"
4781 );
4782
4783 for account in &fee_accounts {
4784 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
4785 }
4786
4787 let (state, leaf) = reconstruct_state(
4790 &instance,
4791 &mut tx,
4792 node_block_height - 1,
4793 to_view,
4794 &[],
4795 &reward_accounts,
4796 )
4797 .await
4798 .unwrap();
4799
4800 match Ver::Base::VERSION {
4801 EpochVersion::VERSION => {
4802 for account in reward_accounts.clone() {
4803 state
4804 .reward_merkle_tree_v1
4805 .lookup(RewardAccountV1::from(account))
4806 .expect_ok()
4807 .unwrap();
4808 }
4809 },
4810 DrbAndHeaderUpgradeVersion::VERSION => {
4811 for account in &reward_accounts {
4812 state
4813 .reward_merkle_tree_v2
4814 .lookup(account)
4815 .expect_ok()
4816 .unwrap();
4817 }
4818 },
4819 _ => panic!("invalid version"),
4820 };
4821
4822 assert_eq!(leaf.view_number(), to_view);
4823 assert!(
4824 state
4825 .block_merkle_tree
4826 .lookup(node_block_height - 1)
4827 .expect_ok()
4828 .is_ok(),
4829 "inconsistent block merkle tree"
4830 );
4831 let (state, leaf) = reconstruct_state(
4834 &instance,
4835 &mut tx,
4836 node_block_height - 1,
4837 to_view,
4838 &fee_accounts,
4839 &reward_accounts,
4840 )
4841 .await
4842 .unwrap();
4843
4844 assert!(
4845 state
4846 .block_merkle_tree
4847 .lookup(node_block_height - 1)
4848 .expect_ok()
4849 .is_ok(),
4850 "inconsistent block merkle tree"
4851 );
4852 assert_eq!(leaf.view_number(), to_view);
4853
4854 match Ver::Base::VERSION {
4855 EpochVersion::VERSION => {
4856 for account in reward_accounts.clone() {
4857 state
4858 .reward_merkle_tree_v1
4859 .lookup(RewardAccountV1::from(account))
4860 .expect_ok()
4861 .unwrap();
4862 }
4863 },
4864 DrbAndHeaderUpgradeVersion::VERSION => {
4865 for account in &reward_accounts {
4866 state
4867 .reward_merkle_tree_v2
4868 .lookup(account)
4869 .expect_ok()
4870 .unwrap();
4871 }
4872 },
4873 _ => panic!("invalid version"),
4874 };
4875
4876 for account in &fee_accounts {
4877 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
4878 }
4879
4880 Ok(())
4881 }
4882
4883 async fn wait_for_epochs(
4886 events: &mut (impl futures::Stream<Item = Event<SeqTypes>> + std::marker::Unpin),
4887 epoch_height: u64,
4888 target_epoch: u64,
4889 ) {
4890 while let Some(event) = events.next().await {
4891 if let EventType::Decide { leaf_chain, .. } = event.event {
4892 let leaf = leaf_chain[0].leaf.clone();
4893 let epoch = leaf.epoch(epoch_height);
4894 tracing::info!(
4895 "Node decided at height: {}, epoch: {:?}",
4896 leaf.height(),
4897 epoch
4898 );
4899
4900 if epoch > Some(EpochNumber::new(target_epoch)) {
4901 break;
4902 }
4903 }
4904 }
4905 }
4906
    /// Verify that the `node/block-reward` endpoint reports a non-zero reward
    /// on a proof-of-stake network, for both the V3 and V4 protocol versions.
    #[rstest]
    #[case(PosVersionV3::new())]
    #[case(PosVersionV4::new())]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_block_reward_api<Ver: Versions>(#[case] versions: Ver) -> anyhow::Result<()> {
        let epoch_height = 10;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let api_port = pick_unused_port().expect("No ports free for query service");

        // A single node suffices: the test only queries its HTTP API.
        const NUM_NODES: usize = 1;
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config.clone())
            .persistences(persistence.clone())
            // Every node catches up missing state from the API node.
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    &NoMetrics,
                )
            }))
            // Enable proof of stake so a block reward is actually configured.
            .pos_hook::<Ver>(DelegationConfig::VariableAmounts, Default::default())
            .await
            .unwrap()
            .build();

        let _network = TestNetwork::new(config, versions).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // Wait for the chain to produce a few blocks before querying the reward.
        let _blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(3)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // The endpoint returns `Option<RewardAmount>`; on a PoS network it
        // must be present and strictly positive.
        let block_reward = client
            .get::<Option<RewardAmount>>("node/block-reward")
            .send()
            .await
            .expect("failed to get block reward")
            .expect("block reward is None");
        tracing::info!("block_reward={block_reward:?}");

        assert!(block_reward.0 > U256::ZERO);

        Ok(())
    }
4975
    /// Deploy the full contract suite to a local L1, then verify that
    /// `Fetcher::scan_token_contract_initialized_event_log` locates the ESP
    /// token's initialization log and that the corresponding mint `Transfer`
    /// carries a non-zero value.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
        use espresso_types::v0_3::ChainConfig;

        let blocks_per_epoch = 10;

        let network_config = TestConfigBuilder::<1>::default()
            .epoch_height(blocks_per_epoch)
            .build();

        // Genesis light-client and stake-table state derived from the test
        // network's stake table.
        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
            &network_config.hotshot_config().hotshot_stake_table(),
            STAKE_TABLE_CAPACITY_FOR_TEST,
        )
        .unwrap();

        let deployer = ProviderBuilder::new()
            .wallet(EthereumWallet::from(network_config.signer().clone()))
            .on_http(network_config.l1_url().clone());

        // Deploy every contract (token, stake table, timelocks, ...) with the
        // test signer as the sole admin/proposer/executor.
        let mut contracts = Contracts::new();
        let args = DeployerArgsBuilder::default()
            .deployer(deployer.clone())
            .mock_light_client(true)
            .genesis_lc_state(genesis_state)
            .genesis_st_state(genesis_stake)
            .blocks_per_epoch(blocks_per_epoch)
            .epoch_start_block(1)
            .multisig_pauser(network_config.signer().address())
            .token_name("Espresso".to_string())
            .token_symbol("ESP".to_string())
            .initial_token_supply(U256::from(3590000000u64))
            .ops_timelock_delay(U256::from(0))
            .ops_timelock_admin(network_config.signer().address())
            .ops_timelock_proposers(vec![network_config.signer().address()])
            .ops_timelock_executors(vec![network_config.signer().address()])
            .safe_exit_timelock_delay(U256::from(0))
            .safe_exit_timelock_admin(network_config.signer().address())
            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
            .safe_exit_timelock_executors(vec![network_config.signer().address()])
            .build()
            .unwrap();

        args.deploy_all(&mut contracts).await.unwrap();

        let st_addr = contracts
            .address(Contract::StakeTableProxy)
            .expect("StakeTableProxy deployed");

        let l1_url = network_config.l1_url().clone();

        let storage = SqlDataSource::create_storage().await;
        let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
        let persistence = opt.create().await.unwrap();

        // Short retry delay and a large event block range so the scan
        // completes quickly in this test.
        let l1_client = L1ClientOptions {
            stake_table_update_interval: Duration::from_secs(7),
            l1_retry_delay: Duration::from_millis(10),
            l1_events_max_block_range: 10000,
            ..Default::default()
        }
        .connect(vec![l1_url])
        .unwrap();
        l1_client.spawn_tasks().await;

        // The fetcher under test, pointed at the freshly deployed stake table.
        let fetcher = Fetcher::new(
            Arc::new(NullStateCatchup::default()),
            Arc::new(Mutex::new(persistence.clone())),
            l1_client.clone(),
            ChainConfig {
                stake_table_contract: Some(st_addr),
                base_fee: 0.into(),
                ..Default::default()
            },
        );

        let provider = l1_client.provider;
        let stake_table = StakeTableV2::new(st_addr, provider.clone());

        // The scan starts at the block where the stake table was initialized.
        let stake_table_init_block = stake_table
            .initializedAtBlock()
            .block(BlockId::finalized())
            .call()
            .await?
            ._0
            .to::<u64>();

        tracing::info!("stake table init block = {stake_table_init_block}");

        let token_address = stake_table
            .token()
            .block(BlockId::finalized())
            .call()
            .await
            .context("Failed to get token address")?
            ._0;

        let token = EspToken::new(token_address, provider.clone());

        // The call under test: locate the token contract's initialization log.
        let init_log = fetcher
            .scan_token_contract_initialized_event_log(stake_table_init_block, token)
            .await
            .unwrap();

        let init_tx = provider
            .get_transaction_receipt(
                init_log
                    .transaction_hash
                    .context(format!("transaction hash not found. init_log={init_log:?}"))?,
            )
            .await
            .unwrap()
            .unwrap();

        // The initialization transaction must contain the initial supply mint,
        // i.e. a Transfer with a non-zero value.
        let mint_transfer = init_tx.decoded_log::<EspToken::Transfer>().unwrap();

        assert!(mint_transfer.value > U256::ZERO);

        Ok(())
    }
5096
    /// Submit transactions across several namespaces (one block per
    /// transaction) and verify per-namespace metadata in block summaries plus
    /// the explorer's namespace-filtered transaction listing.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_tx_metadata() {
        let port = pick_unused_port().expect("No ports free");

        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        let storage = SqlDataSource::create_storage().await;
        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(
                SqlDataSource::options(&storage, Options::with_port(port))
                    .submit(Default::default())
                    .explorer(Default::default()),
            )
            .network_config(network_config)
            .build();
        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let mut events = network.server.event_stream().await;

        client.connect(None).await;

        // (namespace id, number of transactions to submit in that namespace).
        let namespace_counts = [(101, 1), (102, 2), (103, 3)];
        for (ns, count) in &namespace_counts {
            for i in 0..*count {
                let ns_id = NamespaceId::from(*ns as u64);
                // Payload encodes (namespace, index) so each txn is unique.
                let txn = Transaction::new(ns_id, vec![*ns, i]);
                client
                    .post::<()>("submit/submit")
                    .body_json(&txn)
                    .unwrap()
                    .send()
                    .await
                    .unwrap();
                // Wait until the transaction is sequenced; one txn at a time,
                // so each block contains exactly this one transaction.
                let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;

                let summary: BlockSummaryQueryData<SeqTypes> = client
                    .get(&format!("availability/block/summary/{block}"))
                    .send()
                    .await
                    .unwrap();
                // The block's namespace table must list exactly this namespace
                // with one transaction of the expected in-block size.
                let ns_info = summary.namespaces();
                assert_eq!(ns_info.len(), 1);
                assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
                assert_eq!(ns_info[&ns_id].num_transactions, 1);
                assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
            }
        }

        for (ns, count) in &namespace_counts {
            tracing::info!(ns, "list transactions in namespace");

            let ns_id = NamespaceId::from(*ns as u64);
            let summaries: TransactionSummariesResponse<SeqTypes> = client
                .get(&format!(
                    "explorer/transactions/latest/{count}/namespace/{ns_id}"
                ))
                .send()
                .await
                .unwrap();
            let txs = summaries.transaction_summaries;
            assert_eq!(txs.len(), *count as usize);

            for i in 0..*count {
                // "latest" returns newest first, so index i maps to the
                // (count - i - 1)-th submitted transaction.
                let summary = &txs[i as usize];
                let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
                assert_eq!(summary.rollups, vec![ns_id]);
                assert_eq!(summary.hash, expected.commit());
            }
        }
    }
5172
5173 use std::time::Instant;
5174
5175 use rand::thread_rng;
5176
    /// Submit random transactions to namespaces 1..=4 (namespace N gets N
    /// transactions) and verify the aggregator's per-namespace and global
    /// transaction-count and payload-size endpoints, including their
    /// height-bounded range variants.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_aggregator_namespace_endpoints() {
        let mut rng = thread_rng();

        let port = pick_unused_port().expect("No ports free");

        let url = format!("http://localhost:{port}").parse().unwrap();
        tracing::info!("Sequencer URL = {url}");
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        let options = Options::with_port(port).submit(Default::default());
        const NUM_NODES: usize = 2;
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let network_config = TestConfigBuilder::default().build();

        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(&storage[0], options))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .build();
        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let mut events = network.server.event_stream().await;
        let start = Instant::now();
        // Bookkeeping to compare against the API's answers later.
        let mut total_transactions = 0;
        let mut tx_heights = Vec::new();
        let mut sizes = HashMap::new();
        // Namespace N receives exactly N transactions with random payloads.
        for namespace in 1..=4 {
            for _count in 0..namespace {
                let payload_len = rng.gen_range(4..=10);
                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();

                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);

                client.connect(None).await;

                let hash = client
                    .post("submit/submit")
                    .body_json(&txn)
                    .unwrap()
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.commit(), hash);

                // Wait for sequencing; record the decided height and the
                // in-block size for the expected aggregates.
                let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
                tx_heights.push(height);
                total_transactions += 1;
                *sizes.entry(namespace).or_insert(0) += size;
            }
        }

        let duration = start.elapsed();

        println!("Time elapsed to submit transactions: {duration:?}");

        // Upper bound for the range-limited endpoint variants: the height of
        // the last decided transaction.
        let last_tx_height = tx_heights.last().unwrap();
        for namespace in 1..=4 {
            // Unbounded per-namespace count.
            let count = client
                .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
                .send()
                .await
                .unwrap();
            assert_eq!(
                count, namespace as u64,
                "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
                 {count}"
            );

            // Count bounded by an upper height only.
            let to_endpoint_count = client
                .get::<u64>(&format!(
                    "node/transactions/count/namespace/{namespace}/{last_tx_height}"
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(
                to_endpoint_count, namespace as u64,
                "Incorrect transaction count for range endpoint (to only) for namespace \
                 {namespace}: expected {namespace}, got {to_endpoint_count}"
            );

            // Count bounded by both a lower and an upper height.
            let from_to_endpoint_count = client
                .get::<u64>(&format!(
                    "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(
                from_to_endpoint_count, namespace as u64,
                "Incorrect transaction count for range endpoint (from-to) for namespace \
                 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
            );

            // Per-namespace payload size, unbounded and range-bounded.
            let ns_size = client
                .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
                .send()
                .await
                .unwrap();

            let expected_ns_size = *sizes.get(&namespace).unwrap();
            assert_eq!(
                ns_size, expected_ns_size,
                "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
                 got {ns_size}"
            );

            let ns_size_to = client
                .get::<usize>(&format!(
                    "node/payloads/size/namespace/{namespace}/{last_tx_height}"
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(
                ns_size_to, expected_ns_size,
                "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
                 expected {expected_ns_size}, got {ns_size_to}"
            );

            let ns_size_from_to = client
                .get::<usize>(&format!(
                    "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(
                ns_size_from_to, expected_ns_size,
                "Incorrect payload size for namespace {namespace} from 0 to height \
                 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
            );
        }

        // Global (all-namespace) aggregates.
        let total_tx_count = client
            .get::<u64>("node/transactions/count")
            .send()
            .await
            .unwrap();
        assert_eq!(
            total_tx_count, total_transactions,
            "Incorrect total transaction count: expected {total_transactions}, got \
             {total_tx_count}"
        );

        let total_payload_size = client
            .get::<usize>("node/payloads/size")
            .send()
            .await
            .unwrap();

        let expected_total_size: usize = sizes.values().copied().sum();
        assert_eq!(
            total_payload_size, expected_total_size,
            "Incorrect total payload size: expected {expected_total_size}, got \
             {total_payload_size}"
        );
    }
5350
    /// Submit random transactions to namespaces 1..=4, then verify both the
    /// global `availability/stream/transactions` stream and the per-namespace
    /// filtered stream deliver exactly the submitted transactions.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_transactions_endpoint() {
        let mut rng = thread_rng();

        let port = pick_unused_port().expect("No ports free");

        let url = format!("http://localhost:{port}").parse().unwrap();
        tracing::info!("Sequencer URL = {url}");
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        let options = Options::with_port(port).submit(Default::default());
        const NUM_NODES: usize = 2;
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let network_config = TestConfigBuilder::default().build();

        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(&storage[0], options))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .build();
        let network = TestNetwork::new(config, MockSequencerVersions::new()).await;
        let mut events = network.server.event_stream().await;
        // Everything submitted, keyed by commitment, plus a per-namespace index.
        let mut all_transactions = HashMap::new();
        let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();

        // Namespace N receives exactly N transactions with random payloads.
        for namespace in 1..=4 {
            for _count in 0..namespace {
                let payload_len = rng.gen_range(4..=10);
                let payload: Vec<u8> = (0..payload_len).map(|_| rng.gen()).collect();

                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);

                client.connect(None).await;

                let hash = client
                    .post("submit/submit")
                    .body_json(&txn)
                    .unwrap()
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.commit(), hash);

                // Only stream after the transaction is sequenced, so it is
                // guaranteed to appear on the availability streams below.
                wait_for_decide_on_handle(&mut events, &txn).await;
                all_transactions.insert(txn.commit(), txn.clone());
                namespace_tx.entry(namespace).or_default().insert(txn);
            }
        }

        // Global stream from height 0: must replay every submitted txn.
        let mut transactions = client
            .socket("availability/stream/transactions/0")
            .subscribe::<TransactionQueryData<SeqTypes>>()
            .await
            .expect("failed to subscribe to transactions endpoint");

        let mut count = 0;
        while let Some(tx) = transactions.next().await {
            let tx = tx.unwrap();
            let expected = all_transactions
                .get(&tx.transaction().commit())
                .expect("txn not found ");
            assert_eq!(tx.transaction(), expected, "invalid transaction");
            count += 1;

            // Stop once every submitted transaction has been seen.
            if count == all_transactions.len() {
                break;
            }
        }

        // Namespace-filtered streams: each must deliver exactly the set of
        // transactions submitted to that namespace.
        for (namespace, expected_ns_txns) in &namespace_tx {
            let mut api_namespace_txns = client
                .socket(&format!(
                    "availability/stream/transactions/0/namespace/{namespace}",
                ))
                .subscribe::<TransactionQueryData<SeqTypes>>()
                .await
                .unwrap_or_else(|_| {
                    panic!("failed to subscribe to transactions namespace {namespace}")
                });

            let mut received = HashSet::new();

            while let Some(res) = api_namespace_txns.next().await {
                let tx = res.expect("stream error");
                received.insert(tx.transaction().clone());

                if received.len() == expected_ns_txns.len() {
                    break;
                }
            }

            assert_eq!(
                received, *expected_ns_txns,
                "Mismatched transactions for namespace {namespace}"
            );
        }
    }
5469
5470 #[rstest]
5471 #[case(PosVersionV3::new())]
5472 #[case(PosVersionV4::new())]
5473 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5474 async fn test_v3_and_v4_reward_tree_updates<Ver: Versions>(
5475 #[case] versions: Ver,
5476 ) -> anyhow::Result<()> {
5477 const EPOCH_HEIGHT: u64 = 10;
5487
5488 let network_config = TestConfigBuilder::default()
5489 .epoch_height(EPOCH_HEIGHT)
5490 .build();
5491
5492 let api_port = pick_unused_port().expect("No ports free for query service");
5493
5494 tracing::info!("API PORT = {api_port}");
5495 const NUM_NODES: usize = 5;
5496
5497 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5498 let persistence: [_; NUM_NODES] = storage
5499 .iter()
5500 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5501 .collect::<Vec<_>>()
5502 .try_into()
5503 .unwrap();
5504
5505 let config = TestNetworkConfigBuilder::with_num_nodes()
5506 .api_config(SqlDataSource::options(
5507 &storage[0],
5508 Options::with_port(api_port).catchup(Default::default()),
5509 ))
5510 .network_config(network_config)
5511 .persistences(persistence.clone())
5512 .catchups(std::array::from_fn(|_| {
5513 StatePeers::<StaticVersion<0, 1>>::from_urls(
5514 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5515 Default::default(),
5516 &NoMetrics,
5517 )
5518 }))
5519 .pos_hook::<Ver>(
5520 DelegationConfig::MultipleDelegators,
5521 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5522 )
5523 .await
5524 .unwrap()
5525 .build();
5526 let mut network = TestNetwork::new(config, versions).await;
5527
5528 let mut events = network.peers[2].event_stream().await;
5529 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
5531
5532 let validated_state = network.server.decided_state().await;
5533 let version = Ver::Base::VERSION;
5534 if version == EpochVersion::VERSION {
5535 let v1_tree = &validated_state.reward_merkle_tree_v1;
5536 assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
5537 let v2_tree = &validated_state.reward_merkle_tree_v2;
5538 assert!(
5539 v2_tree.num_leaves() == 0,
5540 "v2 reward tree tree is not empty"
5541 );
5542 } else {
5543 let v1_tree = &validated_state.reward_merkle_tree_v1;
5544 assert!(
5545 v1_tree.num_leaves() == 0,
5546 "v1 reward tree tree is not empty"
5547 );
5548 let v2_tree = &validated_state.reward_merkle_tree_v2;
5549 assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
5550 }
5551
5552 network.stop_consensus().await;
5553 Ok(())
5554 }
5555
5556 #[rstest]
5557 #[case(PosVersionV3::new())]
5558 #[case(PosVersionV4::new())]
5559 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5560
5561 pub(crate) async fn test_state_cert_query<Ver: Versions>(#[case] versions: Ver) {
5562 const TEST_EPOCH_HEIGHT: u64 = 10;
5563 const TEST_EPOCHS: u64 = 5;
5564
5565 let network_config = TestConfigBuilder::default()
5566 .epoch_height(TEST_EPOCH_HEIGHT)
5567 .build();
5568
5569 let api_port = pick_unused_port().expect("No ports free for query service");
5570
5571 tracing::info!("API PORT = {api_port}");
5572 const NUM_NODES: usize = 2;
5573
5574 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5575 let persistence: [_; NUM_NODES] = storage
5576 .iter()
5577 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5578 .collect::<Vec<_>>()
5579 .try_into()
5580 .unwrap();
5581
5582 let config = TestNetworkConfigBuilder::with_num_nodes()
5583 .api_config(SqlDataSource::options(
5584 &storage[0],
5585 Options::with_port(api_port).catchup(Default::default()),
5586 ))
5587 .network_config(network_config)
5588 .persistences(persistence.clone())
5589 .catchups(std::array::from_fn(|_| {
5590 StatePeers::<StaticVersion<0, 1>>::from_urls(
5591 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5592 Default::default(),
5593 &NoMetrics,
5594 )
5595 }))
5596 .pos_hook::<Ver>(
5597 DelegationConfig::MultipleDelegators,
5598 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5599 )
5600 .await
5601 .unwrap()
5602 .build();
5603
5604 let network = TestNetwork::new(config, versions).await;
5605 let mut events = network.server.event_stream().await;
5606
5607 loop {
5609 let event = events.next().await.unwrap();
5610 tracing::info!("Received event from handle: {event:?}");
5611
5612 if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
5613 println!(
5614 "Decide event received: {:?}",
5615 leaf_chain.first().unwrap().leaf.height()
5616 );
5617 if let Some(first_leaf) = leaf_chain.first() {
5618 let height = first_leaf.leaf.height();
5619 tracing::info!("Decide event received at height: {height}");
5620
5621 if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
5622 break;
5623 }
5624 }
5625 }
5626 }
5627
5628 let client: Client<ServerError, StaticVersion<0, 1>> =
5630 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5631 client.connect(Some(Duration::from_secs(10))).await;
5632
5633 for i in 3..=TEST_EPOCHS {
5635 let state_query_data_v2 = client
5637 .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
5638 .send()
5639 .await
5640 .unwrap();
5641 let state_cert_v2 = state_query_data_v2.0.clone();
5642 tracing::info!("state_cert_v2: {state_cert_v2:?}");
5643 assert_eq!(state_cert_v2.epoch.u64(), i);
5644 assert_eq!(
5645 state_cert_v2.light_client_state.block_height,
5646 i * TEST_EPOCH_HEIGHT - 5
5647 );
5648 let block_height = state_cert_v2.light_client_state.block_height;
5649
5650 let header: Header = client
5651 .get(&format!("availability/header/{block_height}"))
5652 .send()
5653 .await
5654 .unwrap();
5655
5656 if header.version() == DrbAndHeaderUpgradeVersion::VERSION {
5658 let auth_root = state_cert_v2.auth_root;
5659 let header_auth_root = header.auth_root().unwrap();
5660 if auth_root.is_zero() || header_auth_root.is_zero() {
5661 panic!("auth root shouldn't be zero");
5662 }
5663
5664 assert_eq!(auth_root, header_auth_root, "auth root mismatch");
5665 }
5666
5667 let state_query_data_v1 = client
5669 .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
5670 .send()
5671 .await
5672 .unwrap();
5673
5674 let state_cert_v1 = state_query_data_v1.0.clone();
5675 tracing::info!("state_cert_v1: {state_cert_v1:?}");
5676 assert_eq!(state_query_data_v1, state_query_data_v2.into());
5677 }
5678 }
5679}