1pub mod api;
2pub mod catchup;
3pub mod context;
4pub mod genesis;
5mod proposal_fetcher;
6mod request_response;
7
8mod external_event_handler;
9pub mod options;
10pub mod state_signature;
11
12mod restart_tests;
13
14mod message_compat_tests;
15
16use std::sync::Arc;
17
18use alloy::primitives::U256;
19use anyhow::Context;
20use async_lock::{Mutex, RwLock};
21use catchup::{ParallelStateCatchup, StatePeers};
22use context::SequencerContext;
23use espresso_types::{
24 traits::{EventConsumer, MembershipPersistence},
25 v0_3::Fetcher,
26 BackoffParams, EpochCommittees, L1ClientOptions, NodeState, PubKey, SeqTypes, ValidatedState,
27};
28use genesis::L1Finalized;
29use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
30use libp2p::Multiaddr;
31use network::libp2p::split_off_peer_id;
32use options::Identity;
33use proposal_fetcher::ProposalFetcherConfig;
34use tokio::select;
35use tracing::info;
36use url::Url;
37
38use crate::request_response::data_source::Storage as RequestResponseStorage;
39pub mod persistence;
40pub mod state;
41use std::{fmt::Debug, marker::PhantomData, time::Duration};
42
43use derivative::Derivative;
44use espresso_types::v0::traits::SequencerPersistence;
45pub use genesis::Genesis;
46use hotshot::{
47 traits::implementations::{
48 derive_libp2p_multiaddr, derive_libp2p_peer_id, CdnMetricsValue, CdnTopic,
49 CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork,
50 RequestResponseConfig, WrappedSignatureKey,
51 },
52 types::SignatureKey,
53};
54use hotshot_orchestrator::client::{get_complete_config, OrchestratorClient};
55use hotshot_types::{
56 data::ViewNumber,
57 epoch_membership::EpochMembershipCoordinator,
58 light_client::{StateKeyPair, StateSignKey},
59 signature_key::{BLSPrivKey, BLSPubKey},
60 traits::{
61 metrics::{Metrics, NoMetrics},
62 network::ConnectedNetwork,
63 node_implementation::{NodeImplementation, NodeType, Versions},
64 storage::Storage,
65 },
66 utils::BuilderCommitment,
67 ValidatorConfig,
68};
69pub use options::Options;
70use serde::{Deserialize, Serialize};
71use vbs::version::{StaticVersion, StaticVersionType};
72pub mod network;
73
74mod run;
75pub use run::main;
76
77pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
78#[derive(Derivative, Serialize, Deserialize)]
80#[derivative(
81 Copy(bound = ""),
82 Debug(bound = ""),
83 Default(bound = ""),
84 PartialEq(bound = ""),
85 Eq(bound = ""),
86 Hash(bound = "")
87)]
88pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
89
90impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
93 fn clone(&self) -> Self {
94 *self
95 }
96}
97
98pub type SequencerApiVersion = StaticVersion<0, 1>;
99
100impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
101 for Node<N, P>
102{
103 type Network = N;
104 type Storage = Arc<P>;
105}
106
107#[derive(Clone, Debug)]
108pub struct NetworkParams {
109 pub cdn_endpoint: String,
111 pub orchestrator_url: Url,
112 pub state_relay_server_url: Url,
113 pub private_staking_key: BLSPrivKey,
114 pub private_state_key: StateSignKey,
115 pub state_peers: Vec<Url>,
116 pub config_peers: Option<Vec<Url>>,
117 pub catchup_backoff: BackoffParams,
118 pub public_api_url: Option<Url>,
120
121 pub libp2p_advertise_address: String,
123 pub libp2p_bind_address: String,
125 pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,
128
129 pub libp2p_heartbeat_interval: Duration,
131
132 pub libp2p_history_gossip: usize,
134 pub libp2p_history_length: usize,
136
137 pub libp2p_mesh_n: usize,
139 pub libp2p_mesh_n_high: usize,
141 pub libp2p_mesh_n_low: usize,
143 pub libp2p_mesh_outbound_min: usize,
145
146 pub libp2p_max_gossip_transmit_size: usize,
148
149 pub libp2p_max_direct_transmit_size: u64,
151
152 pub libp2p_max_ihave_length: usize,
154
155 pub libp2p_max_ihave_messages: usize,
157
158 pub libp2p_published_message_ids_cache_time: Duration,
160
161 pub libp2p_iwant_followup_time: Duration,
163
164 pub libp2p_max_messages_per_rpc: Option<usize>,
166
167 pub libp2p_gossip_retransmission: u32,
169
170 pub libp2p_flood_publish: bool,
172
173 pub libp2p_duplicate_cache_time: Duration,
175
176 pub libp2p_fanout_ttl: Duration,
178
179 pub libp2p_heartbeat_initial_delay: Duration,
181
182 pub libp2p_gossip_factor: f64,
184
185 pub libp2p_gossip_lazy: usize,
187}
188
189pub struct L1Params {
190 pub urls: Vec<Url>,
191 pub options: L1ClientOptions,
192}
193
194#[allow(clippy::too_many_arguments)]
195pub async fn init_node<
196 P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
197 V: Versions,
198>(
199 genesis: Genesis,
200 network_params: NetworkParams,
201 metrics: &dyn Metrics,
202 mut persistence: P,
203 l1_params: L1Params,
204 storage: Option<RequestResponseStorage>,
205 seq_versions: V,
206 event_consumer: impl EventConsumer + 'static,
207 is_da: bool,
208 identity: Identity,
209 proposal_fetcher_config: ProposalFetcherConfig,
210) -> anyhow::Result<SequencerContext<network::Production, P, V>>
211where
212 Arc<P>: Storage<SeqTypes>,
213{
214 metrics
216 .text_family(
217 "version".into(),
218 vec!["rev".into(), "desc".into(), "timestamp".into()],
219 )
220 .create(vec![
221 env!("VERGEN_GIT_SHA").into(),
222 env!("VERGEN_GIT_DESCRIBE").into(),
223 env!("VERGEN_GIT_COMMIT_TIMESTAMP").into(),
224 ]);
225
226 metrics
228 .text_family(
229 "node_identity_general".into(),
230 vec![
231 "name".into(),
232 "company_name".into(),
233 "company_website".into(),
234 "operating_system".into(),
235 "node_type".into(),
236 "network_type".into(),
237 ],
238 )
239 .create(vec![
240 identity.node_name.unwrap_or("".into()),
241 identity.company_name.unwrap_or("".into()),
242 identity
243 .company_website
244 .map(|u| u.into())
245 .unwrap_or("".into()),
246 identity.operating_system.unwrap_or("".into()),
247 identity.node_type.unwrap_or("".into()),
248 identity.network_type.unwrap_or("".into()),
249 ]);
250
251 metrics
253 .text_family(
254 "node_identity_location".into(),
255 vec!["country".into(), "latitude".into(), "longitude".into()],
256 )
257 .create(vec![
258 identity.country_code.unwrap_or("".into()),
259 identity
260 .latitude
261 .map(|l| l.to_string())
262 .unwrap_or("".into()),
263 identity
264 .longitude
265 .map(|l| l.to_string())
266 .unwrap_or("".into()),
267 ]);
268
269 let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
271 metrics
272 .text_family("node".into(), vec!["key".into()])
273 .create(vec![pub_key.to_string()]);
274
275 let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
277 .with_context(|| {
278 format!(
279 "Failed to derive Libp2p bind address of {}",
280 &network_params.libp2p_bind_address
281 )
282 })?;
283 let libp2p_advertise_address =
284 derive_libp2p_multiaddr(&network_params.libp2p_advertise_address).with_context(|| {
285 format!(
286 "Failed to derive Libp2p advertise address of {}",
287 &network_params.libp2p_advertise_address
288 )
289 })?;
290
291 info!("Libp2p bind address: {}", libp2p_bind_address);
292 info!("Libp2p advertise address: {}", libp2p_advertise_address);
293
294 let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
296 let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
297 let validator_config = ValidatorConfig {
298 public_key: pub_key,
299 private_key: network_params.private_staking_key,
300 stake_value: U256::ONE,
301 state_public_key: state_key_pair.ver_key(),
302 state_private_key: state_key_pair.sign_key(),
303 is_da,
304 };
305
306 let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
308 &validator_config.private_key,
309 )
310 .with_context(|| "Failed to derive Libp2p peer ID")?;
311
312 info!("Starting Libp2p with PeerID: {libp2p_public_key}");
314
315 let (mut network_config, wait_for_orchestrator) = match (
316 persistence.load_config().await?,
317 network_params.config_peers,
318 ) {
319 (Some(config), _) => {
320 tracing::info!("loaded network config from storage, rejoining existing network");
321 (config, false)
322 },
323 (None, Some(peers)) => {
325 tracing::info!(?peers, "loading network config from peers");
326 let peers = StatePeers::<SequencerApiVersion>::from_urls(
327 peers,
328 network_params.catchup_backoff,
329 &NoMetrics,
330 );
331 let config = peers.fetch_config(validator_config.clone()).await?;
332
333 tracing::info!(
334 node_id = config.node_index,
335 stake_table = ?config.config.known_nodes_with_stake,
336 "loaded config",
337 );
338 persistence.save_config(&config).await?;
339 (config, false)
340 },
341 (None, None) => {
343 tracing::info!("loading network config from orchestrator");
344 tracing::error!(
345 "waiting for other nodes to connect, DO NOT RESTART until fully connected"
346 );
347 let config = get_complete_config(
348 &orchestrator_client,
349 validator_config.clone(),
350 Some(libp2p_advertise_address),
353 Some(libp2p_public_key),
354 )
355 .await?
356 .0;
357
358 tracing::info!(
359 node_id = config.node_index,
360 stake_table = ?config.config.known_nodes_with_stake,
361 "loaded config",
362 );
363 persistence.save_config(&config).await?;
364 tracing::error!("all nodes connected");
365 (config, true)
366 },
367 };
368
369 if let Some(upgrade) = genesis.upgrades.get(&V::Upgrade::VERSION) {
370 upgrade.set_hotshot_config_parameters(&mut network_config.config);
371 }
372
373 let epoch_height = genesis.epoch_height.unwrap_or_default();
374 let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
375 let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
376 let stake_table_capacity = genesis
377 .stake_table_capacity
378 .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);
379
380 tracing::info!("setting epoch_height={epoch_height:?}");
381 tracing::info!("setting drb_difficulty={drb_difficulty:?}");
382 tracing::info!("setting epoch_start_block={epoch_start_block:?}");
383 tracing::info!("setting stake_table_capacity={stake_table_capacity:?}");
384 network_config.config.epoch_height = epoch_height;
385 network_config.config.drb_difficulty = drb_difficulty;
386 network_config.config.epoch_start_block = epoch_start_block;
387 network_config.config.stake_table_capacity = stake_table_capacity;
388
389 if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
392 if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
393 libp2p_config.bootstrap_nodes = bootstrap_nodes
397 .into_iter()
398 .map(split_off_peer_id)
399 .collect::<Result<Vec<_>, _>>()
400 .with_context(|| "Failed to parse peer ID from bootstrap node")?;
401 } else {
402 tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
405 }
406 }
407
408 let node_index = network_config.node_index;
409
410 let topics = {
412 let mut topics = vec![CdnTopic::Global];
413 if is_da {
414 topics.push(CdnTopic::Da);
415 }
416 topics
417 };
418
419 let cdn_network = PushCdnNetwork::new(
421 network_params.cdn_endpoint,
422 topics,
423 KeyPair {
424 public_key: WrappedSignatureKey(validator_config.public_key),
425 private_key: validator_config.private_key.clone(),
426 },
427 CdnMetricsValue::new(metrics),
428 )
429 .with_context(|| format!("Failed to create CDN network {node_index}"))?;
430
431 let gossip_config = GossipConfig {
433 heartbeat_interval: network_params.libp2p_heartbeat_interval,
434 history_gossip: network_params.libp2p_history_gossip,
435 history_length: network_params.libp2p_history_length,
436 mesh_n: network_params.libp2p_mesh_n,
437 mesh_n_high: network_params.libp2p_mesh_n_high,
438 mesh_n_low: network_params.libp2p_mesh_n_low,
439 mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
440 max_ihave_messages: network_params.libp2p_max_ihave_messages,
441 max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
442 max_ihave_length: network_params.libp2p_max_ihave_length,
443 published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
444 iwant_followup_time: network_params.libp2p_iwant_followup_time,
445 max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
446 gossip_retransmission: network_params.libp2p_gossip_retransmission,
447 flood_publish: network_params.libp2p_flood_publish,
448 duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
449 fanout_ttl: network_params.libp2p_fanout_ttl,
450 heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
451 gossip_factor: network_params.libp2p_gossip_factor,
452 gossip_lazy: network_params.libp2p_gossip_lazy,
453 };
454
455 let request_response_config = RequestResponseConfig {
457 request_size_maximum: network_params.libp2p_max_direct_transmit_size,
458 response_size_maximum: network_params.libp2p_max_direct_transmit_size,
459 };
460
461 let l1_client = l1_params
462 .options
463 .with_metrics(metrics)
464 .connect(l1_params.urls)
465 .with_context(|| "failed to create L1 client")?;
466 genesis.validate_fee_contract(&l1_client).await?;
467
468 l1_client.spawn_tasks().await;
469 let l1_genesis = match genesis.l1_finalized {
470 L1Finalized::Block(b) => b,
471 L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
472 L1Finalized::Timestamp { timestamp } => {
473 l1_client
474 .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
475 .await
476 },
477 };
478
479 let mut genesis_state = ValidatedState {
480 chain_config: genesis.chain_config.into(),
481 ..Default::default()
482 };
483 for (address, amount) in genesis.accounts {
484 tracing::info!(%address, %amount, "Prefunding account for demo");
485 genesis_state.prefund_account(address, amount);
486 }
487
488 let state_catchup_providers = ParallelStateCatchup::new(&[]);
490
491 let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
493 network_params.state_peers,
494 network_params.catchup_backoff,
495 metrics,
496 );
497 state_catchup_providers.add_provider(Arc::new(state_peers));
498
499 match persistence
501 .clone()
502 .into_catchup_provider(network_params.catchup_backoff)
503 {
504 Ok(catchup) => {
505 state_catchup_providers.add_provider(Arc::new(catchup));
506 },
507 Err(e) => {
508 tracing::warn!(
509 "Failed to create local catchup provider: {e:#}. Only using remote catchup."
510 );
511 },
512 };
513
514 persistence.enable_metrics(metrics);
515
516 let fetcher = Fetcher::new(
517 Arc::new(state_catchup_providers.clone()),
518 Arc::new(Mutex::new(persistence.clone())),
519 l1_client.clone(),
520 genesis.chain_config,
521 );
522 fetcher.spawn_update_loop().await;
523 let block_reward = fetcher.fetch_block_reward().await.ok().unwrap_or_default();
524 let mut membership = EpochCommittees::new_stake(
526 network_config.config.known_nodes_with_stake.clone(),
527 network_config.config.known_da_nodes.clone(),
528 block_reward,
529 fetcher,
530 );
531 membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
532
533 let membership: Arc<RwLock<EpochCommittees>> = Arc::new(RwLock::new(membership));
534 let persistence = Arc::new(persistence);
535 let coordinator = EpochMembershipCoordinator::new(
536 membership,
537 network_config.config.epoch_height,
538 &persistence.clone(),
539 );
540
541 let instance_state = NodeState {
542 chain_config: genesis.chain_config,
543 l1_client,
544 genesis_header: genesis.header,
545 genesis_state,
546 l1_genesis: Some(l1_genesis),
547 node_id: node_index,
548 upgrades: genesis.upgrades,
549 current_version: V::Base::VERSION,
550 epoch_height: Some(epoch_height),
551 state_catchup: Arc::new(state_catchup_providers.clone()),
552 coordinator: coordinator.clone(),
553 genesis_version: genesis.genesis_version,
554 };
555
556 let network = {
558 let p2p_network = Libp2pNetwork::from_config(
559 network_config.clone(),
560 persistence.clone(),
561 coordinator.membership().clone(),
562 gossip_config,
563 request_response_config,
564 libp2p_bind_address,
565 &validator_config.public_key,
566 &validator_config.private_key,
569 hotshot::traits::implementations::Libp2pMetricsValue::new(metrics),
570 )
571 .await
572 .with_context(|| {
573 format!(
574 "Failed to create libp2p network on node {node_index}; binding to {:?}",
575 network_params.libp2p_bind_address
576 )
577 })?;
578
579 tracing::warn!("Waiting for at least one connection to be initialized");
580 select! {
581 _ = cdn_network.wait_for_ready() => {
582 tracing::warn!("CDN connection initialized");
583 },
584 _ = p2p_network.wait_for_ready() => {
585 tracing::warn!("P2P connection initialized");
586 },
587 };
588
589 Arc::from(CombinedNetworks::new(
591 cdn_network,
592 p2p_network,
593 Some(Duration::from_secs(1)),
594 ))
595 };
596
597 let mut ctx = SequencerContext::init(
598 network_config,
599 validator_config,
600 coordinator,
601 instance_state,
602 storage,
603 state_catchup_providers,
604 persistence,
605 network,
606 Some(network_params.state_relay_server_url),
607 metrics,
608 genesis.stake_table.capacity,
609 event_consumer,
610 seq_versions,
611 proposal_fetcher_config,
612 )
613 .await?;
614 if wait_for_orchestrator {
615 ctx = ctx.wait_for_orchestrator(orchestrator_client);
616 }
617 Ok(ctx)
618}
619
620pub fn empty_builder_commitment() -> BuilderCommitment {
621 BuilderCommitment::from_bytes([])
622}
623
624#[cfg(any(test, feature = "testing"))]
625pub mod testing {
626 use std::{
627 collections::{BTreeMap, HashMap},
628 time::Duration,
629 };
630
631 use alloy::{
632 network::EthereumWallet,
633 node_bindings::{Anvil, AnvilInstance},
634 primitives::U256,
635 providers::{
636 fillers::{
637 BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
638 },
639 layers::AnvilProvider,
640 ProviderBuilder, RootProvider,
641 },
642 signers::{
643 k256::ecdsa::SigningKey,
644 local::{LocalSigner, PrivateKeySigner},
645 },
646 };
647 use async_lock::RwLock;
648 use catchup::NullStateCatchup;
649 use committable::Committable;
650 use espresso_contract_deployer::{
651 builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
652 Contract, Contracts,
653 };
654 use espresso_types::{
655 eth_signature_key::EthKeyPair,
656 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
657 EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
658 Upgrade, UpgradeMap,
659 };
660 use futures::{
661 future::join_all,
662 stream::{Stream, StreamExt},
663 };
664 use hotshot::{
665 traits::{
666 implementations::{MasterMap, MemoryNetwork},
667 BlockPayload,
668 },
669 types::EventType::Decide,
670 };
671 use hotshot_builder_core_refactored::service::{
672 BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
673 };
674 use hotshot_testing::block_builder::{
675 BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
676 };
677 use hotshot_types::{
678 event::LeafInfo,
679 light_client::StateKeyPair,
680 signature_key::BLSKeyPair,
681 traits::{
682 block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
683 signature_key::BuilderSignatureKey, EncodeBytes,
684 },
685 HotShotConfig, PeerConfig,
686 };
687 use portpicker::pick_unused_port;
688 use rand::SeedableRng as _;
689 use rand_chacha::ChaCha20Rng;
690 use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
691 use tokio::spawn;
692 use vbs::version::Version;
693
694 use super::*;
695 use crate::{
696 catchup::ParallelStateCatchup,
697 persistence::no_storage::{self, NoStorage},
698 };
699
700 const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
701 const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
702 type AnvilFillProvider = AnvilProvider<
703 FillProvider<
704 JoinFill<
705 alloy::providers::Identity,
706 JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
707 >,
708 RootProvider,
709 >,
710 >;
711 struct LegacyBuilderImplementation {
712 global_state: Arc<LegacyGlobalState<SeqTypes>>,
713 }
714
715 impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
716 fn start(
717 self: Box<Self>,
718 stream: Box<
719 dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
720 + std::marker::Unpin
721 + Send
722 + 'static,
723 >,
724 ) {
725 spawn(async move {
726 let res = self.global_state.start_event_loop(stream).await;
727 tracing::error!(?res, "testing legacy builder service exited");
728 });
729 }
730 }
731
732 pub async fn run_legacy_builder<const NUM_NODES: usize>(
733 port: Option<u16>,
734 max_block_size: Option<u64>,
735 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
736 let builder_key_pair = TestConfig::<0>::builder_key();
737 let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));
738
739 let url: Url = format!("http://localhost:{port}")
741 .parse()
742 .expect("Failed to parse builder URL");
743
744 let global_state = LegacyGlobalState::new(
746 LegacyBuilderConfig {
747 builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
748 max_api_waiting_time: Duration::from_secs(1),
749 max_block_size_increment_period: Duration::from_secs(60),
750 maximize_txn_capture_timeout: Duration::from_millis(100),
751 txn_garbage_collect_duration: Duration::from_secs(60),
752 txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
753 tx_status_cache_capacity: 81920,
754 base_fee: 10,
755 },
756 NodeState::default(),
757 max_block_size.unwrap_or(300),
758 NUM_NODES,
759 );
760
761 let app = Arc::clone(&global_state)
763 .into_app()
764 .expect("Failed to create builder tide-disco app");
765
766 spawn(
767 app.serve(
768 format!("http://0.0.0.0:{port}")
769 .parse::<Url>()
770 .expect("Failed to parse builder listener"),
771 EpochVersion::instance(),
772 ),
773 );
774
775 (Box::new(LegacyBuilderImplementation { global_state }), url)
777 }
778
779 pub async fn run_test_builder<const NUM_NODES: usize>(
780 port: Option<u16>,
781 ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
782 let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));
783
784 let url: Url = format!("http://localhost:{port}")
786 .parse()
787 .expect("Failed to parse builder URL");
788 tracing::info!("Starting test builder on {url}");
789
790 (
791 <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
792 NUM_NODES,
793 format!("http://0.0.0.0:{port}")
794 .parse()
795 .expect("Failed to parse builder listener"),
796 (),
797 HashMap::new(),
798 )
799 .await,
800 url,
801 )
802 }
803
804 pub struct TestConfigBuilder<const NUM_NODES: usize> {
805 config: HotShotConfig<SeqTypes>,
806 priv_keys: Vec<BLSPrivKey>,
807 state_key_pairs: Vec<StateKeyPair>,
808 master_map: Arc<MasterMap<PubKey>>,
809 l1_url: Url,
810 l1_opt: L1ClientOptions,
811 anvil_provider: Option<AnvilFillProvider>,
812 signer: LocalSigner<SigningKey>,
813 state_relay_url: Option<Url>,
814 builder_port: Option<u16>,
815 upgrades: BTreeMap<Version, Upgrade>,
816 }
817
818 pub fn staking_priv_keys(
819 priv_keys: &[BLSPrivKey],
820 state_key_pairs: &[StateKeyPair],
821 num_nodes: usize,
822 ) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
823 let seed = [42u8; 32];
824 let mut rng = ChaCha20Rng::from_seed(seed); let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
826 eth_key_pairs
827 .zip(priv_keys.iter())
828 .zip(state_key_pairs.iter())
829 .map(|((eth, bls), state)| (eth, bls.clone().into(), state.clone()))
830 .collect()
831 }
832
833 impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
834 pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
835 self.builder_port = builder_port;
836 self
837 }
838
839 pub fn state_relay_url(mut self, url: Url) -> Self {
840 self.state_relay_url = Some(url);
841 self
842 }
843
844 pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
849 self.l1_url = anvil.endpoint().parse().unwrap();
850 let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
851 let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
852 self.anvil_provider = Some(anvil_provider);
853 self
854 }
855
856 pub fn l1_url(mut self, l1_url: Url) -> Self {
859 self.anvil_provider = None;
860 self.l1_url = l1_url;
861 self
862 }
863
864 pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
865 self.l1_opt = opt;
866 self
867 }
868
869 pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
870 self.signer = signer;
871 self
872 }
873
874 pub fn upgrades<V: Versions>(mut self, upgrades: BTreeMap<Version, Upgrade>) -> Self {
875 let upgrade = upgrades.get(&<V as Versions>::Upgrade::VERSION).unwrap();
876 upgrade.set_hotshot_config_parameters(&mut self.config);
877 self.upgrades = upgrades;
878 self
879 }
880
881 pub async fn set_upgrades(mut self, version: Version) -> Self {
884 let upgrade = match version {
885 version if version >= EpochVersion::VERSION => {
886 tracing::debug!(?version, "upgrade version");
887 let blocks_per_epoch = self.config.epoch_height;
888 let epoch_start_block = self.config.epoch_start_block;
889
890 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
891 &self.config.hotshot_stake_table(),
892 STAKE_TABLE_CAPACITY_FOR_TEST,
893 )
894 .unwrap();
895
896 let validators =
897 staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);
898
899 let deployer = ProviderBuilder::new()
900 .wallet(EthereumWallet::from(self.signer.clone()))
901 .on_http(self.l1_url.clone());
902
903 let mut contracts = Contracts::new();
904 let args = DeployerArgsBuilder::default()
905 .deployer(deployer.clone())
906 .mock_light_client(true)
907 .genesis_lc_state(genesis_state)
908 .genesis_st_state(genesis_stake)
909 .blocks_per_epoch(blocks_per_epoch)
910 .epoch_start_block(epoch_start_block)
911 .multisig_pauser(self.signer.address())
912 .build()
913 .unwrap();
914 args.deploy_all(&mut contracts)
915 .await
916 .expect("failed to deploy all contracts");
917
918 let st_addr = contracts
919 .address(Contract::StakeTableProxy)
920 .expect("StakeTableProxy address not found");
921 let token_addr = contracts
922 .address(Contract::EspTokenProxy)
923 .expect("EspTokenProxy address not found");
924 setup_stake_table_contract_for_test(
925 self.l1_url.clone(),
926 &deployer,
927 st_addr,
928 token_addr,
929 validators,
930 DelegationConfig::default(),
931 )
932 .await
933 .expect("stake table setup failed");
934
935 Upgrade::pos_view_based(st_addr)
936 },
937 _ => panic!("Upgrade not configured for version {version:?}"),
938 };
939
940 let mut upgrades = std::collections::BTreeMap::new();
941 upgrade.set_hotshot_config_parameters(&mut self.config);
942 upgrades.insert(version, upgrade);
943
944 self.upgrades = upgrades;
945 self
946 }
947
948 pub fn epoch_height(mut self, epoch_height: u64) -> Self {
949 self.config.epoch_height = epoch_height;
950 self
951 }
952
953 pub fn epoch_start_block(mut self, start_block: u64) -> Self {
954 self.config.epoch_start_block = start_block;
955 self
956 }
957
958 pub fn build(self) -> TestConfig<NUM_NODES> {
959 TestConfig {
960 config: self.config,
961 priv_keys: self.priv_keys,
962 state_key_pairs: self.state_key_pairs,
963 master_map: self.master_map,
964 l1_url: self.l1_url,
965 l1_opt: self.l1_opt,
966 signer: self.signer,
967 state_relay_url: self.state_relay_url,
968 builder_port: self.builder_port,
969 upgrades: self.upgrades,
970 anvil_provider: self.anvil_provider,
971 }
972 }
973
974 pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
975 self.config.stake_table_capacity = stake_table_capacity;
976 self
977 }
978 }
979
980 impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
981 fn default() -> Self {
982 let num_nodes = NUM_NODES;
983
984 let seed = [0; 32];
986 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
987 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
988 .unzip();
989 let state_key_pairs = (0..num_nodes)
990 .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
991 .collect::<Vec<_>>();
992 let known_nodes_with_stake = pub_keys
993 .iter()
994 .zip(&state_key_pairs)
995 .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
996 stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
997 state_ver_key: state_key_pair.ver_key(),
998 })
999 .collect::<Vec<_>>();
1000
1001 let master_map = MasterMap::new();
1002
1003 let config: HotShotConfig<SeqTypes> = HotShotConfig {
1004 fixed_leader_for_gpuvid: 0,
1005 num_nodes_with_stake: num_nodes.try_into().unwrap(),
1006 known_da_nodes: known_nodes_with_stake.clone(),
1007 known_nodes_with_stake: known_nodes_with_stake.clone(),
1008 next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1009 num_bootstrap: 1usize,
1010 da_staked_committee_size: num_nodes,
1011 view_sync_timeout: Duration::from_secs(1),
1012 data_request_delay: Duration::from_secs(1),
1013 builder_urls: vec1::vec1![Url::parse(&format!(
1014 "http://127.0.0.1:{}",
1015 pick_unused_port().unwrap()
1016 ))
1017 .unwrap()],
1018 builder_timeout: Duration::from_secs(1),
1019 start_threshold: (
1020 known_nodes_with_stake.clone().len() as u64,
1021 known_nodes_with_stake.clone().len() as u64,
1022 ),
1023 start_proposing_view: 0,
1024 stop_proposing_view: 0,
1025 start_voting_view: 0,
1026 stop_voting_view: 0,
1027 start_proposing_time: 0,
1028 start_voting_time: 0,
1029 stop_proposing_time: 0,
1030 stop_voting_time: 0,
1031 epoch_height: 30,
1032 epoch_start_block: 1,
1033 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1034 drb_difficulty: 10,
1035 drb_upgrade_difficulty: 20,
1036 };
1037
1038 let anvil = Anvil::new().args(["--slots-in-an-epoch", "0"]).spawn();
1039
1040 let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1041 let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1042
1043 let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1044 let signer = LocalSigner::from(l1_signer_key);
1045
1046 Self {
1047 config,
1048 priv_keys,
1049 state_key_pairs,
1050 master_map,
1051 l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1052 l1_opt: L1ClientOptions {
1053 stake_table_update_interval: Duration::from_secs(5),
1054 l1_events_max_block_range: 1000,
1055 l1_polling_interval: Duration::from_secs(1),
1056 subscription_timeout: Duration::from_secs(5),
1057 ..Default::default()
1058 },
1059 anvil_provider: Some(anvil_provider),
1060 signer,
1061 state_relay_url: None,
1062 builder_port: None,
1063 upgrades: Default::default(),
1064 }
1065 }
1066 }
1067
1068 #[derive(Clone)]
1069 pub struct TestConfig<const NUM_NODES: usize> {
1070 config: HotShotConfig<SeqTypes>,
1071 priv_keys: Vec<BLSPrivKey>,
1072 state_key_pairs: Vec<StateKeyPair>,
1073 master_map: Arc<MasterMap<PubKey>>,
1074 l1_url: Url,
1075 l1_opt: L1ClientOptions,
1076 anvil_provider: Option<AnvilFillProvider>,
1077 signer: LocalSigner<SigningKey>,
1078 state_relay_url: Option<Url>,
1079 builder_port: Option<u16>,
1080 upgrades: BTreeMap<Version, Upgrade>,
1081 }
1082
1083 impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
1084 pub fn num_nodes(&self) -> usize {
1085 self.priv_keys.len()
1086 }
1087
1088 pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1089 &self.config
1090 }
1091
1092 pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
1093 self.config.builder_urls = builder_urls;
1094 }
1095
1096 pub fn builder_port(&self) -> Option<u16> {
1097 self.builder_port
1098 }
1099
1100 pub fn signer(&self) -> LocalSigner<SigningKey> {
1101 self.signer.clone()
1102 }
1103
1104 pub fn l1_url(&self) -> Url {
1105 self.l1_url.clone()
1106 }
1107 pub fn anvil(&self) -> Option<&AnvilFillProvider> {
1108 self.anvil_provider.as_ref()
1109 }
1110
1111 pub fn get_upgrade_map(&self) -> UpgradeMap {
1112 self.upgrades.clone().into()
1113 }
1114
1115 pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
1116 self.upgrades.clone()
1117 }
1118
1119 pub fn staking_priv_keys(&self) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
1120 staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
1121 }
1122
1123 pub async fn init_nodes<V: Versions>(
1124 &self,
1125 bind_version: V,
1126 ) -> Vec<SequencerContext<network::Memory, NoStorage, V>> {
1127 join_all((0..self.num_nodes()).map(|i| async move {
1128 self.init_node(
1129 i,
1130 ValidatedState::default(),
1131 no_storage::Options,
1132 Some(NullStateCatchup::default()),
1133 None,
1134 &NoMetrics,
1135 STAKE_TABLE_CAPACITY_FOR_TEST,
1136 NullEventConsumer,
1137 bind_version,
1138 Default::default(),
1139 )
1140 .await
1141 }))
1142 .await
1143 }
1144
1145 pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
1146 &self.config.known_nodes_with_stake
1147 }
1148
1149 #[allow(clippy::too_many_arguments)]
1150 pub async fn init_node<V: Versions, P: PersistenceOptions>(
1151 &self,
1152 i: usize,
1153 mut state: ValidatedState,
1154 mut persistence_opt: P,
1155 state_peers: Option<impl StateCatchup + 'static>,
1156 storage: Option<RequestResponseStorage>,
1157 metrics: &dyn Metrics,
1158 stake_table_capacity: usize,
1159 event_consumer: impl EventConsumer + 'static,
1160 bind_version: V,
1161 upgrades: BTreeMap<Version, Upgrade>,
1162 ) -> SequencerContext<network::Memory, P::Persistence, V> {
1163 let config = self.config.clone();
1164 let my_peer_config = &config.known_nodes_with_stake[i];
1165 let is_da = config.known_da_nodes.contains(my_peer_config);
1166
1167 let validator_config = ValidatorConfig {
1169 public_key: my_peer_config.stake_table_entry.stake_key,
1170 private_key: self.priv_keys[i].clone(),
1171 stake_value: my_peer_config.stake_table_entry.stake_amount,
1172 state_public_key: self.state_key_pairs[i].ver_key(),
1173 state_private_key: self.state_key_pairs[i].sign_key(),
1174 is_da,
1175 };
1176
1177 let topics = if is_da {
1178 vec![Topic::Global, Topic::Da]
1179 } else {
1180 vec![Topic::Global]
1181 };
1182
1183 let network = Arc::new(MemoryNetwork::new(
1184 &my_peer_config.stake_table_entry.stake_key,
1185 &self.master_map,
1186 &topics,
1187 None,
1188 ));
1189
1190 let builder_account = Self::builder_key().fee_account();
1192 tracing::info!(%builder_account, "prefunding builder account");
1193 state.prefund_account(builder_account, U256::MAX.into());
1194
1195 let persistence = persistence_opt.create().await.unwrap();
1196
1197 let chain_config = state.chain_config.resolve().unwrap_or_default();
1198
1199 let catchup_providers = ParallelStateCatchup::new(&[]);
1201
1202 if let Some(state_peers) = state_peers {
1204 catchup_providers.add_provider(Arc::new(state_peers));
1205 }
1206
1207 match persistence
1209 .clone()
1210 .into_catchup_provider(BackoffParams::default())
1211 {
1212 Ok(local_catchup) => {
1213 catchup_providers.add_provider(local_catchup);
1214 },
1215 Err(e) => {
1216 tracing::warn!(
1217 "Failed to create local catchup provider: {e:#}. Only using remote catchup."
1218 );
1219 },
1220 };
1221
1222 let l1_client = self
1223 .l1_opt
1224 .clone()
1225 .connect(vec![self.l1_url.clone()])
1226 .expect("failed to create L1 client");
1227 l1_client.spawn_tasks().await;
1228
1229 let fetcher = Fetcher::new(
1230 Arc::new(catchup_providers.clone()),
1231 Arc::new(Mutex::new(persistence.clone())),
1232 l1_client.clone(),
1233 chain_config,
1234 );
1235 fetcher.spawn_update_loop().await;
1236
1237 let block_reward = fetcher.fetch_block_reward().await.ok().unwrap_or_default();
1238 let mut membership = EpochCommittees::new_stake(
1239 config.known_nodes_with_stake.clone(),
1240 config.known_da_nodes.clone(),
1241 block_reward,
1242 fetcher,
1243 );
1244 membership.reload_stake(50).await;
1245
1246 let membership = Arc::new(RwLock::new(membership));
1247 let persistence = Arc::new(persistence);
1248
1249 let coordinator = EpochMembershipCoordinator::new(
1250 membership,
1251 config.epoch_height,
1252 &persistence.clone(),
1253 );
1254
1255 let node_state = NodeState::new(
1256 i as u64,
1257 chain_config,
1258 l1_client,
1259 Arc::new(catchup_providers.clone()),
1260 V::Base::VERSION,
1261 coordinator.clone(),
1262 Version { major: 0, minor: 1 },
1263 )
1264 .with_current_version(V::Base::version())
1265 .with_genesis(state)
1266 .with_epoch_height(config.epoch_height)
1267 .with_upgrades(upgrades);
1268
1269 tracing::info!(
1270 i,
1271 key = %my_peer_config.stake_table_entry.stake_key,
1272 state_key = %my_peer_config.state_ver_key,
1273 "starting node",
1274 );
1275
1276 SequencerContext::init(
1277 NetworkConfig {
1278 config,
1279 ..Default::default()
1282 },
1283 validator_config,
1284 coordinator,
1285 node_state,
1286 storage,
1287 catchup_providers,
1288 persistence,
1289 network,
1290 self.state_relay_url.clone(),
1291 metrics,
1292 stake_table_capacity,
1293 event_consumer,
1294 bind_version,
1295 Default::default(),
1296 )
1297 .await
1298 .unwrap()
1299 }
1300
1301 pub fn builder_key() -> EthKeyPair {
1302 FeeAccount::generated_from_seed_indexed([1; 32], 0).1
1303 }
1304 }
1305
1306 pub async fn wait_for_decide_on_handle(
1309 events: &mut (impl Stream<Item = Event> + Unpin),
1310 submitted_txn: &Transaction,
1311 ) -> (u64, usize) {
1312 let commitment = submitted_txn.commit();
1313
1314 loop {
1316 let event = events.next().await.unwrap();
1317 tracing::info!("Received event from handle: {event:?}");
1318
1319 if let Decide { leaf_chain, .. } = event.event {
1320 if let Some((height, size)) =
1321 leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1322 if leaf
1323 .block_payload()
1324 .as_ref()?
1325 .transaction_commitments(leaf.block_header().metadata())
1326 .contains(&commitment)
1327 {
1328 let size = leaf.block_payload().unwrap().encode().len();
1329 Some((leaf.block_header().block_number(), size))
1330 } else {
1331 None
1332 }
1333 })
1334 {
1335 tracing::info!(height, "transaction {commitment} sequenced");
1336 return (height, size);
1337 }
1338 } else {
1339 }
1341 }
1342 }
1343}
1344
1345#[cfg(test)]
1346mod test {
1347
1348 use alloy::node_bindings::Anvil;
1349 use espresso_types::{Header, MockSequencerVersions, NamespaceId, Payload, Transaction};
1350 use futures::StreamExt;
1351 use hotshot::types::EventType::Decide;
1352 use hotshot_example_types::node_types::TestVersions;
1353 use hotshot_types::{
1354 event::LeafInfo,
1355 traits::block_contents::{BlockHeader, BlockPayload},
1356 };
1357 use sequencer_utils::test_utils::setup_test;
1358 use testing::{wait_for_decide_on_handle, TestConfigBuilder};
1359
1360 use self::testing::run_test_builder;
1361 use super::*;
1362
1363 #[tokio::test(flavor = "multi_thread")]
1364 async fn test_skeleton_instantiation() {
1365 setup_test();
1366 let anvil = Anvil::new().spawn();
1368 let url = anvil.endpoint_url();
1369 const NUM_NODES: usize = 5;
1370 let mut config = TestConfigBuilder::<NUM_NODES>::default()
1371 .l1_url(url)
1372 .build();
1373
1374 let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
1375
1376 config.set_builder_urls(vec1::vec1![builder_url]);
1377
1378 let handles = config.init_nodes(MockSequencerVersions::new()).await;
1379
1380 let handle_0 = &handles[0];
1381
1382 builder_task.start(Box::new(handle_0.event_stream().await));
1384
1385 let mut events = handle_0.event_stream().await;
1386
1387 for handle in handles.iter() {
1388 handle.start_consensus().await;
1389 }
1390
1391 let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
1393 handles[0]
1394 .submit_transaction(txn.clone())
1395 .await
1396 .expect("Failed to submit transaction");
1397 tracing::info!("Submitted transaction to handle: {txn:?}");
1398
1399 wait_for_decide_on_handle(&mut events, &txn).await;
1400 }
1401
1402 #[tokio::test(flavor = "multi_thread")]
1403 async fn test_header_invariants() {
1404 setup_test();
1405
1406 let success_height = 30;
1407 let anvil = Anvil::new().spawn();
1409 let url = anvil.endpoint_url();
1410 const NUM_NODES: usize = 5;
1411 let mut config = TestConfigBuilder::<NUM_NODES>::default()
1412 .l1_url(url)
1413 .build();
1414
1415 let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;
1416
1417 config.set_builder_urls(vec1::vec1![builder_url]);
1418 let handles = config.init_nodes(MockSequencerVersions::new()).await;
1419
1420 let handle_0 = &handles[0];
1421
1422 let mut events = handle_0.event_stream().await;
1423
1424 builder_task.start(Box::new(handle_0.event_stream().await));
1426
1427 for handle in handles.iter() {
1428 handle.start_consensus().await;
1429 }
1430
1431 let mut parent = {
1432 let (genesis_payload, genesis_ns_table) =
1434 Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
1435 .await
1436 .unwrap();
1437
1438 let genesis_state = NodeState::mock();
1439 Header::genesis::<TestVersions>(&genesis_state, genesis_payload, &genesis_ns_table)
1440 };
1441
1442 loop {
1443 let event = events.next().await.unwrap();
1444 tracing::info!("Received event from handle: {event:?}");
1445 let Decide { leaf_chain, .. } = event.event else {
1446 continue;
1447 };
1448 tracing::info!("Got decide {leaf_chain:?}");
1449
1450 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
1453 let header = leaf.block_header().clone();
1454 if header.height() == 0 {
1455 parent = header;
1456 continue;
1457 }
1458 assert_eq!(header.height(), parent.height() + 1);
1459 assert!(header.timestamp() >= parent.timestamp());
1460 assert!(header.l1_head() >= parent.l1_head());
1461 assert!(header.l1_finalized() >= parent.l1_finalized());
1462 parent = header;
1463 }
1464
1465 if parent.height() >= success_height {
1466 break;
1467 }
1468 }
1469 }
1470}