// sequencer/lib.rs

1pub mod api;
2pub mod catchup;
3pub mod context;
4pub mod genesis;
5mod proposal_fetcher;
6mod request_response;
7pub mod state_cert;
8pub mod util;
9
10mod external_event_handler;
11pub mod options;
12pub mod state_signature;
13
14mod message_compat_tests;
15
16use std::sync::Arc;
17
18use alloy::primitives::U256;
19use anyhow::Context;
20use async_lock::{Mutex, RwLock};
21use catchup::{ParallelStateCatchup, StatePeers};
22use context::SequencerContext;
23use espresso_types::{
24    traits::{EventConsumer, MembershipPersistence},
25    v0_3::Fetcher,
26    BackoffParams, EpochCommittees, L1ClientOptions, NodeState, PubKey, SeqTypes, ValidatedState,
27};
28use genesis::L1Finalized;
29use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
30use libp2p::Multiaddr;
31use network::libp2p::split_off_peer_id;
32use options::Identity;
33use proposal_fetcher::ProposalFetcherConfig;
34use tokio::select;
35use tracing::info;
36use url::Url;
37
38use crate::request_response::data_source::Storage as RequestResponseStorage;
39pub mod persistence;
40pub mod state;
41use std::{fmt::Debug, marker::PhantomData, time::Duration};
42
43use derivative::Derivative;
44use espresso_types::v0::traits::SequencerPersistence;
45pub use genesis::Genesis;
46use hotshot::{
47    traits::implementations::{
48        derive_libp2p_multiaddr, derive_libp2p_peer_id, CdnMetricsValue, CdnTopic,
49        CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork,
50        RequestResponseConfig, WrappedSignatureKey,
51    },
52    types::SignatureKey,
53};
54use hotshot_orchestrator::client::{get_complete_config, OrchestratorClient};
55use hotshot_types::{
56    data::ViewNumber,
57    epoch_membership::EpochMembershipCoordinator,
58    light_client::{StateKeyPair, StateSignKey},
59    signature_key::{BLSPrivKey, BLSPubKey},
60    traits::{
61        metrics::{Metrics, NoMetrics},
62        network::ConnectedNetwork,
63        node_implementation::{NodeImplementation, NodeType, Versions},
64        storage::Storage,
65    },
66    utils::BuilderCommitment,
67    ValidatorConfig,
68};
69pub use options::Options;
70use serde::{Deserialize, Serialize};
71use vbs::version::{StaticVersion, StaticVersionType};
72pub mod network;
73
74pub mod run;
75pub use run::main;
76
77pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
/// The Sequencer node is generic over the hotshot CommChannel.
///
/// Zero-sized marker type: the `PhantomData<fn(&N, &P)>` ties the type to its
/// generic parameters without owning values of them, so `Copy`/`Send`/`Sync`
/// hold regardless of `N` and `P` (fn pointers always implement these).
#[derive(Derivative, Serialize, Deserialize)]
#[derivative(
    Copy(bound = ""),
    Debug(bound = ""),
    Default(bound = ""),
    PartialEq(bound = ""),
    Eq(bound = ""),
    Hash(bound = "")
)]
pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
89
// Using derivative to derive Clone triggers the clippy lint
// https://rust-lang.github.io/rust-clippy/master/index.html#/incorrect_clone_impl_on_copy_type
// so Clone is written by hand, delegating to the (derived) Copy impl.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
    fn clone(&self) -> Self {
        *self
    }
}
97
/// Static version tag for the sequencer's public API (v0.1).
pub type SequencerApiVersion = StaticVersion<0, 1>;
99
/// Wire `Node` into HotShot: the network transport is the generic `N`, and
/// consensus storage is the (shared) persistence layer `Arc<P>`.
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
    for Node<N, P>
{
    type Network = N;
    type Storage = Arc<P>;
}
106
/// Networking configuration for a sequencer node, gathered from the CLI.
#[derive(Clone, Debug)]
pub struct NetworkParams {
    /// The address where a CDN marshal is located
    pub cdn_endpoint: String,
    /// The URL of the orchestrator used to bootstrap a fresh network
    pub orchestrator_url: Url,
    /// The URL of the state relay server this node reports to
    pub state_relay_server_url: Url,

    /// The URLs of the builders to use for submitting transactions
    pub builder_urls: Vec<Url>,

    /// This node's private BLS staking (consensus) key
    pub private_staking_key: BLSPrivKey,
    /// This node's private state signing key (used to derive the state key pair)
    pub private_state_key: StateSignKey,
    /// Peer URLs used for state catchup
    pub state_peers: Vec<Url>,
    /// Optional peer URLs to fetch the network config from (instead of the orchestrator)
    pub config_peers: Option<Vec<Url>>,
    /// Backoff parameters for catchup/config-fetch retries
    pub catchup_backoff: BackoffParams,
    /// The address to advertise as our public API's URL
    pub public_api_url: Option<Url>,

    /// The address to send to other Libp2p nodes to contact us
    pub libp2p_advertise_address: String,
    /// The address to bind to for Libp2p
    pub libp2p_bind_address: String,
    /// The (optional) bootstrap node addresses for Libp2p. If supplied, these will
    /// override the bootstrap nodes specified in the config file.
    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,

    /// The heartbeat interval
    pub libp2p_heartbeat_interval: Duration,

    /// The number of past heartbeats to gossip about
    pub libp2p_history_gossip: usize,
    /// The number of past heartbeats to remember the full messages for
    pub libp2p_history_length: usize,

    /// The target number of peers in the mesh
    pub libp2p_mesh_n: usize,
    /// The maximum number of peers in the mesh
    pub libp2p_mesh_n_high: usize,
    /// The minimum number of peers in the mesh
    pub libp2p_mesh_n_low: usize,
    /// The minimum number of mesh peers that must be outbound
    pub libp2p_mesh_outbound_min: usize,

    /// The maximum gossip message size
    pub libp2p_max_gossip_transmit_size: usize,

    /// The maximum direct message size
    pub libp2p_max_direct_transmit_size: u64,

    /// The maximum number of message IDs to include in a single IHAVE message
    /// (presumably mirrors gossipsub's `max_ihave_length` — TODO confirm)
    pub libp2p_max_ihave_length: usize,

    /// The maximum number of IHAVE messages to accept from a Libp2p peer within a heartbeat
    pub libp2p_max_ihave_messages: usize,

    /// The time period that message hashes are stored in the cache
    pub libp2p_published_message_ids_cache_time: Duration,

    /// The time to wait for a Libp2p message requested through IWANT following an IHAVE advertisement
    pub libp2p_iwant_followup_time: Duration,

    /// The maximum number of Libp2p messages we will process in a given RPC
    pub libp2p_max_messages_per_rpc: Option<usize>,

    /// How many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
    pub libp2p_gossip_retransmission: u32,

    /// If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
    pub libp2p_flood_publish: bool,

    /// The time period that Libp2p message hashes are stored in the cache
    pub libp2p_duplicate_cache_time: Duration,

    /// Time to live for Libp2p fanout peers
    pub libp2p_fanout_ttl: Duration,

    /// Initial delay in each Libp2p heartbeat
    pub libp2p_heartbeat_initial_delay: Duration,

    /// How many Libp2p peers we will emit gossip to at each heartbeat
    pub libp2p_gossip_factor: f64,

    /// Minimum number of Libp2p peers to emit gossip to during a heartbeat
    pub libp2p_gossip_lazy: usize,
}
192
/// Configuration for connecting to the L1 (Ethereum) chain.
pub struct L1Params {
    /// RPC endpoints for the L1 client (multiple for failover)
    pub urls: Vec<Url>,
    /// Client options (metrics are attached at connect time)
    pub options: L1ClientOptions,
}
197
198#[allow(clippy::too_many_arguments)]
199pub async fn init_node<
200    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
201    V: Versions,
202>(
203    genesis: Genesis,
204    network_params: NetworkParams,
205    metrics: &dyn Metrics,
206    mut persistence: P,
207    l1_params: L1Params,
208    storage: Option<RequestResponseStorage>,
209    seq_versions: V,
210    event_consumer: impl EventConsumer + 'static,
211    is_da: bool,
212    identity: Identity,
213    proposal_fetcher_config: ProposalFetcherConfig,
214) -> anyhow::Result<SequencerContext<network::Production, P, V>>
215where
216    Arc<P>: Storage<SeqTypes>,
217{
218    // Expose git information via status API.
219    metrics
220        .text_family(
221            "version".into(),
222            vec!["rev".into(), "desc".into(), "timestamp".into()],
223        )
224        .create(vec![
225            env!("VERGEN_GIT_SHA").into(),
226            env!("VERGEN_GIT_DESCRIBE").into(),
227            env!("VERGEN_GIT_COMMIT_TIMESTAMP").into(),
228        ]);
229
230    // Expose Node Entity Information via the status/metrics API
231    metrics
232        .text_family(
233            "node_identity_general".into(),
234            vec![
235                "name".into(),
236                "description".into(),
237                "company_name".into(),
238                "company_website".into(),
239                "operating_system".into(),
240                "node_type".into(),
241                "network_type".into(),
242            ],
243        )
244        .create(vec![
245            identity.node_name.unwrap_or_default(),
246            identity.node_description.unwrap_or_default(),
247            identity.company_name.unwrap_or_default(),
248            identity
249                .company_website
250                .map(|u| u.into())
251                .unwrap_or_default(),
252            identity.operating_system.unwrap_or_default(),
253            identity.node_type.unwrap_or_default(),
254            identity.network_type.unwrap_or_default(),
255        ]);
256
257    // Expose Node Identity Location via the status/metrics API
258    metrics
259        .text_family(
260            "node_identity_location".into(),
261            vec!["country".into(), "latitude".into(), "longitude".into()],
262        )
263        .create(vec![
264            identity.country_code.unwrap_or_default(),
265            identity.latitude.map(|l| l.to_string()).unwrap_or_default(),
266            identity
267                .longitude
268                .map(|l| l.to_string())
269                .unwrap_or_default(),
270        ]);
271
272    // Expose icons for node dashboard via the status/metrics API
273    metrics
274        .text_family(
275            "node_identity_icon".into(),
276            vec![
277                "small_1x".into(),
278                "small_2x".into(),
279                "small_3x".into(),
280                "large_1x".into(),
281                "large_2x".into(),
282                "large_3x".into(),
283            ],
284        )
285        .create(vec![
286            identity
287                .icon_14x14_1x
288                .map(|u| u.to_string())
289                .unwrap_or_default(),
290            identity
291                .icon_14x14_2x
292                .map(|u| u.to_string())
293                .unwrap_or_default(),
294            identity
295                .icon_14x14_3x
296                .map(|u| u.to_string())
297                .unwrap_or_default(),
298            identity
299                .icon_24x24_1x
300                .map(|u| u.to_string())
301                .unwrap_or_default(),
302            identity
303                .icon_24x24_2x
304                .map(|u| u.to_string())
305                .unwrap_or_default(),
306            identity
307                .icon_24x24_3x
308                .map(|u| u.to_string())
309                .unwrap_or_default(),
310        ]);
311
312    // Stick our public key in `metrics` so it is easily accessible via the status API.
313    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
314    metrics
315        .text_family("node".into(), vec!["key".into()])
316        .create(vec![pub_key.to_string()]);
317
318    // Parse the Libp2p bind and advertise addresses to multiaddresses
319    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
320        .with_context(|| {
321            format!(
322                "Failed to derive Libp2p bind address of {}",
323                &network_params.libp2p_bind_address
324            )
325        })?;
326    let libp2p_advertise_address =
327        derive_libp2p_multiaddr(&network_params.libp2p_advertise_address).with_context(|| {
328            format!(
329                "Failed to derive Libp2p advertise address of {}",
330                &network_params.libp2p_advertise_address
331            )
332        })?;
333
334    info!("Libp2p bind address: {}", libp2p_bind_address);
335    info!("Libp2p advertise address: {}", libp2p_advertise_address);
336
337    // Orchestrator client
338    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
339    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
340    let validator_config = ValidatorConfig {
341        public_key: pub_key,
342        private_key: network_params.private_staking_key,
343        stake_value: U256::ONE,
344        state_public_key: state_key_pair.ver_key(),
345        state_private_key: state_key_pair.sign_key(),
346        is_da,
347    };
348
349    // Derive our Libp2p public key from our private key
350    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
351        &validator_config.private_key,
352    )
353    .with_context(|| "Failed to derive Libp2p peer ID")?;
354
355    // Print the libp2p public key
356    info!("Starting Libp2p with PeerID: {libp2p_public_key}");
357
358    let loaded_network_config_from_persistence = persistence.load_config().await?;
359    let (mut network_config, wait_for_orchestrator) = match (
360        loaded_network_config_from_persistence,
361        network_params.config_peers,
362    ) {
363        (Some(config), _) => {
364            tracing::warn!("loaded network config from storage, rejoining existing network");
365            (config, false)
366        },
367        // If we were told to fetch the config from an already-started peer, do so.
368        (None, Some(peers)) => {
369            tracing::warn!(?peers, "loading network config from peers");
370            let peers = StatePeers::<SequencerApiVersion>::from_urls(
371                peers,
372                network_params.catchup_backoff,
373                &NoMetrics,
374            );
375            let config = peers.fetch_config(validator_config.clone()).await?;
376
377            tracing::warn!(
378                node_id = config.node_index,
379                stake_table = ?config.config.known_nodes_with_stake,
380                "loaded config",
381            );
382            persistence.save_config(&config).await?;
383            (config, false)
384        },
385        // Otherwise, this is a fresh network; load from the orchestrator.
386        (None, None) => {
387            tracing::warn!("loading network config from orchestrator");
388            tracing::warn!(
389                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
390            );
391            let config = get_complete_config(
392                &orchestrator_client,
393                validator_config.clone(),
394                // Register in our Libp2p advertise address and public key so other nodes
395                // can contact us on startup
396                Some(libp2p_advertise_address),
397                Some(libp2p_public_key),
398            )
399            .await?
400            .0;
401
402            tracing::warn!(
403                node_id = config.node_index,
404                stake_table = ?config.config.known_nodes_with_stake,
405                "loaded config",
406            );
407            persistence.save_config(&config).await?;
408            tracing::warn!("all nodes connected");
409            (config, true)
410        },
411    };
412
413    if let Some(upgrade) = genesis.upgrades.get(&V::Upgrade::VERSION) {
414        upgrade.set_hotshot_config_parameters(&mut network_config.config);
415    }
416
417    // Override the builder URLs in the network config with the ones from the command line
418    // if any were provided
419    if !network_params.builder_urls.is_empty() {
420        network_config.config.builder_urls = network_params.builder_urls.try_into().unwrap();
421    }
422
423    let epoch_height = genesis.epoch_height.unwrap_or_default();
424    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
425    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
426    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
427    let stake_table_capacity = genesis
428        .stake_table_capacity
429        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);
430
431    tracing::warn!("setting epoch_height={epoch_height:?}");
432    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
433    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
434    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
435    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
436    network_config.config.epoch_height = epoch_height;
437    network_config.config.drb_difficulty = drb_difficulty;
438    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
439    network_config.config.epoch_start_block = epoch_start_block;
440    network_config.config.stake_table_capacity = stake_table_capacity;
441
442    if let Some(da_committees) = &genesis.da_committees {
443        tracing::warn!("setting da_committees from genesis: {da_committees:?}");
444        network_config.config.da_committees = da_committees.clone();
445    }
446
447    // If the `Libp2p` bootstrap nodes were supplied via the command line, override those
448    // present in the config file.
449    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
450        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
451            // If the libp2p configuration is present, we can override the bootstrap nodes.
452
453            // Split off the peer ID from the addresses
454            libp2p_config.bootstrap_nodes = bootstrap_nodes
455                .into_iter()
456                .map(split_off_peer_id)
457                .collect::<Result<Vec<_>, _>>()
458                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
459        } else {
460            // If not, don't try launching with them. Eventually we may want to
461            // provide a default configuration here instead.
462            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
463        }
464    }
465
466    let node_index = network_config.node_index;
467
468    // If we are a DA node, we need to subscribe to the DA topic
469    let topics = {
470        let mut topics = vec![CdnTopic::Global];
471        if is_da {
472            topics.push(CdnTopic::Da);
473        }
474        topics
475    };
476
477    // Initialize the push CDN network (and perform the initial connection)
478    let cdn_network = PushCdnNetwork::new(
479        network_params.cdn_endpoint,
480        topics,
481        KeyPair {
482            public_key: WrappedSignatureKey(validator_config.public_key),
483            private_key: validator_config.private_key.clone(),
484        },
485        CdnMetricsValue::new(metrics),
486    )
487    .with_context(|| format!("Failed to create CDN network {node_index}"))?;
488
489    // Configure gossipsub based on the command line options
490    let gossip_config = GossipConfig {
491        heartbeat_interval: network_params.libp2p_heartbeat_interval,
492        history_gossip: network_params.libp2p_history_gossip,
493        history_length: network_params.libp2p_history_length,
494        mesh_n: network_params.libp2p_mesh_n,
495        mesh_n_high: network_params.libp2p_mesh_n_high,
496        mesh_n_low: network_params.libp2p_mesh_n_low,
497        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
498        max_ihave_messages: network_params.libp2p_max_ihave_messages,
499        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
500        max_ihave_length: network_params.libp2p_max_ihave_length,
501        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
502        iwant_followup_time: network_params.libp2p_iwant_followup_time,
503        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
504        gossip_retransmission: network_params.libp2p_gossip_retransmission,
505        flood_publish: network_params.libp2p_flood_publish,
506        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
507        fanout_ttl: network_params.libp2p_fanout_ttl,
508        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
509        gossip_factor: network_params.libp2p_gossip_factor,
510        gossip_lazy: network_params.libp2p_gossip_lazy,
511    };
512
513    // Configure request/response based on the command line options
514    let request_response_config = RequestResponseConfig {
515        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
516        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
517    };
518
519    let l1_client = l1_params
520        .options
521        .with_metrics(metrics)
522        .connect(l1_params.urls)
523        .with_context(|| "failed to create L1 client")?;
524
525    info!("Validating fee contract");
526
527    genesis.validate_fee_contract(&l1_client).await?;
528
529    info!("Fee contract validated. Spawning L1 tasks");
530
531    l1_client.spawn_tasks().await;
532
533    info!(
534        "L1 tasks spawned. Waiting for L1 genesis: {:?}",
535        genesis.l1_finalized
536    );
537
538    let l1_genesis = match genesis.l1_finalized {
539        L1Finalized::Block(b) => b,
540        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
541        L1Finalized::Timestamp { timestamp } => {
542            l1_client
543                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
544                .await
545        },
546    };
547
548    info!("L1 genesis found: {:?}", l1_genesis);
549
550    let genesis_chain_config = genesis.header.chain_config;
551    let mut genesis_state = ValidatedState {
552        chain_config: genesis_chain_config.into(),
553        ..Default::default()
554    };
555    for (address, amount) in genesis.accounts {
556        tracing::warn!(%address, %amount, "Prefunding account for demo");
557        genesis_state.prefund_account(address, amount);
558    }
559
560    // Create the list of parallel catchup providers
561    let state_catchup_providers = ParallelStateCatchup::new(&[]);
562
563    // Add the state peers to the list
564    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
565        network_params.state_peers,
566        network_params.catchup_backoff,
567        metrics,
568    );
569    state_catchup_providers.add_provider(Arc::new(state_peers));
570
571    // Add the local (persistence) catchup provider to the list (if we can)
572    match persistence
573        .clone()
574        .into_catchup_provider(network_params.catchup_backoff)
575    {
576        Ok(catchup) => {
577            state_catchup_providers.add_provider(Arc::new(catchup));
578        },
579        Err(e) => {
580            tracing::warn!(
581                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
582            );
583        },
584    };
585
586    persistence.enable_metrics(metrics);
587
588    let fetcher = Fetcher::new(
589        Arc::new(state_catchup_providers.clone()),
590        Arc::new(Mutex::new(persistence.clone())),
591        l1_client.clone(),
592        genesis.chain_config,
593    );
594
595    info!("Spawning update loop");
596
597    fetcher.spawn_update_loop().await;
598    info!("Update loop spawned. Fetching block reward");
599
600    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
601    info!("Block reward fetched: {:?}", block_reward);
602    // Create the HotShot membership
603    let mut membership = EpochCommittees::new_stake(
604        network_config.config.known_nodes_with_stake.clone(),
605        network_config.config.known_da_nodes.clone(),
606        block_reward,
607        fetcher,
608        epoch_height,
609    );
610    info!("Membership created. Reloading stake");
611    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
612    info!("Stake reloaded");
613
614    let membership: Arc<RwLock<EpochCommittees>> = Arc::new(RwLock::new(membership));
615    let persistence = Arc::new(persistence);
616    let coordinator = EpochMembershipCoordinator::new(
617        membership,
618        network_config.config.epoch_height,
619        &persistence.clone(),
620    );
621
622    let instance_state = NodeState {
623        chain_config: genesis.chain_config,
624        genesis_chain_config,
625        l1_client,
626        genesis_header: genesis.header,
627        genesis_state,
628        l1_genesis: Some(l1_genesis),
629        node_id: node_index,
630        upgrades: genesis.upgrades,
631        current_version: V::Base::VERSION,
632        epoch_height: Some(epoch_height),
633        state_catchup: Arc::new(state_catchup_providers.clone()),
634        coordinator: coordinator.clone(),
635        genesis_version: genesis.genesis_version,
636        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
637    };
638
639    // Initialize the Libp2p network
640    let network = {
641        info!("Initializing Libp2p network");
642        let p2p_network = Libp2pNetwork::from_config(
643            network_config.clone(),
644            persistence.clone(),
645            gossip_config,
646            request_response_config,
647            libp2p_bind_address,
648            &validator_config.public_key,
649            // We need the private key so we can derive our Libp2p keypair
650            // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html)
651            &validator_config.private_key,
652            hotshot::traits::implementations::Libp2pMetricsValue::new(metrics),
653        )
654        .await
655        .with_context(|| {
656            format!(
657                "Failed to create libp2p network on node {node_index}; binding to {:?}",
658                network_params.libp2p_bind_address
659            )
660        })?;
661
662        info!("Libp2p network initialized");
663
664        tracing::warn!("Waiting for at least one connection to be initialized");
665        select! {
666            _ = cdn_network.wait_for_ready() => {
667                tracing::warn!("CDN connection initialized");
668            },
669            _ = p2p_network.wait_for_ready() => {
670                tracing::warn!("P2P connection initialized");
671            },
672        };
673
674        // Combine the CDN and P2P networks
675        Arc::from(CombinedNetworks::new(
676            cdn_network,
677            p2p_network,
678            Some(Duration::from_secs(1)),
679        ))
680    };
681
682    let mut ctx = SequencerContext::init(
683        network_config,
684        validator_config,
685        coordinator,
686        instance_state,
687        storage,
688        state_catchup_providers,
689        persistence,
690        network,
691        Some(network_params.state_relay_server_url),
692        metrics,
693        genesis.stake_table.capacity,
694        event_consumer,
695        seq_versions,
696        proposal_fetcher_config,
697    )
698    .await?;
699    if wait_for_orchestrator {
700        ctx = ctx.wait_for_orchestrator(orchestrator_client);
701    }
702    Ok(ctx)
703}
704
705pub fn empty_builder_commitment() -> BuilderCommitment {
706    BuilderCommitment::from_bytes([])
707}
708
709#[cfg(any(test, feature = "testing"))]
710pub mod testing {
711    use std::{
712        cmp::max,
713        collections::{BTreeMap, HashMap},
714        time::Duration,
715    };
716
717    use alloy::{
718        network::EthereumWallet,
719        node_bindings::{Anvil, AnvilInstance},
720        primitives::{Address, U256},
721        providers::{
722            fillers::{
723                BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
724            },
725            layers::AnvilProvider,
726            Provider, ProviderBuilder, RootProvider,
727        },
728        signers::{
729            k256::ecdsa::SigningKey,
730            local::{LocalSigner, PrivateKeySigner},
731        },
732    };
733    use async_lock::RwLock;
734    use catchup::NullStateCatchup;
735    use committable::Committable;
736    use espresso_contract_deployer::{
737        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
738        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
739    };
740    use espresso_types::{
741        eth_signature_key::EthKeyPair,
742        v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
743        EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
744        Upgrade, UpgradeMap,
745    };
746    use futures::{
747        future::join_all,
748        stream::{Stream, StreamExt},
749    };
750    use hotshot::{
751        traits::{
752            implementations::{MasterMap, MemoryNetwork},
753            BlockPayload,
754        },
755        types::EventType::{self, Decide},
756    };
757    use hotshot_builder_refactored::service::{
758        BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
759    };
760    use hotshot_testing::block_builder::{
761        BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
762    };
763    use hotshot_types::{
764        data::EpochNumber,
765        event::LeafInfo,
766        light_client::StateKeyPair,
767        signature_key::BLSKeyPair,
768        traits::{
769            block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
770            node_implementation::ConsensusTime as _, signature_key::BuilderSignatureKey,
771            EncodeBytes,
772        },
773        HotShotConfig, PeerConfig,
774    };
775    use portpicker::pick_unused_port;
776    use rand::SeedableRng as _;
777    use rand_chacha::ChaCha20Rng;
778    use staking_cli::demo::{DelegationConfig, StakingTransactions};
779    use tokio::spawn;
780    use vbs::version::Version;
781
782    use super::*;
783    use crate::{
784        catchup::ParallelStateCatchup,
785        persistence::no_storage::{self, NoStorage},
786    };
787
    // Stake table size used when generating test light-client genesis state.
    const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
    // Transaction channel capacity passed to the legacy test builder.
    const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
    /// Anvil-backed provider with alloy's default fill layers
    /// (gas, blob gas, nonce, chain id) over a root provider.
    type AnvilFillProvider = AnvilProvider<
        FillProvider<
            JoinFill<
                alloy::providers::Identity,
                JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
            >,
            RootProvider,
        >,
    >;
    /// Test builder task wrapping the legacy builder's shared global state.
    struct LegacyBuilderImplementation {
        // Shared state driven by the builder's event loop (see `start`).
        global_state: Arc<LegacyGlobalState<SeqTypes>>,
    }
802
    impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
        /// Spawn a background task that feeds the HotShot event `stream`
        /// into the legacy builder's event loop; logs if the loop exits.
        fn start(
            self: Box<Self>,
            stream: Box<
                dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
                    + std::marker::Unpin
                    + Send
                    + 'static,
            >,
        ) {
            spawn(async move {
                // The event loop normally runs for the lifetime of the test;
                // reaching this log line means it terminated (with `res`).
                let res = self.global_state.start_event_loop(stream).await;
                tracing::error!(?res, "testing legacy builder service exited");
            });
        }
    }
819
    /// Start the legacy block builder service for testing.
    ///
    /// Binds the builder API to `port` (or an unused port if `None`), serves
    /// it via tide-disco, and returns the builder task (to be injected into
    /// the testing harness) together with the builder's public URL.
    pub async fn run_legacy_builder<const NUM_NODES: usize>(
        port: Option<u16>,
        max_block_size: Option<u64>,
    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
        let builder_key_pair = TestConfig::<0>::builder_key();
        let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));

        // This should never fail.
        let url: Url = format!("http://localhost:{port}")
            .parse()
            .expect("Failed to parse builder URL");

        // create the global state
        let global_state = LegacyGlobalState::new(
            LegacyBuilderConfig {
                builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
                max_api_waiting_time: Duration::from_secs(1),
                max_block_size_increment_period: Duration::from_secs(60),
                maximize_txn_capture_timeout: Duration::from_millis(100),
                txn_garbage_collect_duration: Duration::from_secs(60),
                txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
                tx_status_cache_capacity: 81920,
                base_fee: 10,
            },
            NodeState::default(),
            max_block_size.unwrap_or(300),
            NUM_NODES,
        );

        // Create and spawn the tide-disco app to serve the builder APIs
        let app = Arc::clone(&global_state)
            .into_app()
            .expect("Failed to create builder tide-disco app");

        // Serve on all interfaces; the task is detached for the test's lifetime.
        spawn(
            app.serve(
                format!("http://0.0.0.0:{port}")
                    .parse::<Url>()
                    .expect("Failed to parse builder listener"),
                EpochVersion::instance(),
            ),
        );

        // Pass on the builder task to be injected in the testing harness
        (Box::new(LegacyBuilderImplementation { global_state }), url)
    }
866
867    pub async fn run_test_builder<const NUM_NODES: usize>(
868        port: Option<u16>,
869    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
870        let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));
871
872        // This should never fail.
873        let url: Url = format!("http://localhost:{port}")
874            .parse()
875            .expect("Failed to parse builder URL");
876        tracing::info!("Starting test builder on {url}");
877
878        (
879            <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
880                NUM_NODES,
881                format!("http://0.0.0.0:{port}")
882                    .parse()
883                    .expect("Failed to parse builder listener"),
884                (),
885                HashMap::new(),
886            )
887            .await,
888            url,
889        )
890    }
891
    /// Builder for [`TestConfig`]: accumulates consensus, key, and L1 settings
    /// before freezing them into the test configuration via `build`.
    pub struct TestConfigBuilder<const NUM_NODES: usize> {
        // HotShot consensus configuration shared by all test nodes.
        config: HotShotConfig<SeqTypes>,
        // Per-node BLS private keys, indexed by node id.
        priv_keys: Vec<BLSPrivKey>,
        // Per-node light-client state key pairs, indexed by node id.
        state_key_pairs: Vec<StateKeyPair>,
        // Shared routing map backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        // L1 (Ethereum) JSON-RPC endpoint.
        l1_url: Url,
        // Options used when constructing the L1 client.
        l1_opt: L1ClientOptions,
        // Handle to the spawned Anvil instance, if the L1 is a local Anvil.
        anvil_provider: Option<AnvilFillProvider>,
        // Signer used for contract deployment in `set_upgrades`.
        signer: LocalSigner<SigningKey>,
        // Optional URL of the state relay service.
        state_relay_url: Option<Url>,
        // Optional fixed port for the test builder.
        builder_port: Option<u16>,
        // Planned protocol upgrades, keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
905
906    pub fn staking_priv_keys(
907        priv_keys: &[BLSPrivKey],
908        state_key_pairs: &[StateKeyPair],
909        num_nodes: usize,
910    ) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
911        let seed = [42u8; 32];
912        let mut rng = ChaCha20Rng::from_seed(seed); // Create a deterministic RNG
913        let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
914        eth_key_pairs
915            .zip(priv_keys.iter())
916            .zip(state_key_pairs.iter())
917            .map(|((eth, bls), state)| (eth, bls.clone().into(), state.clone()))
918            .collect()
919    }
920
    impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
        /// Sets (or clears) the fixed port for the test builder.
        pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
            self.builder_port = builder_port;
            self
        }

        /// Sets the URL of the state relay service.
        pub fn state_relay_url(mut self, url: Url) -> Self {
            self.state_relay_url = Some(url);
            self
        }

        /// Sets the Anvil provider, constructed using the Anvil instance.
        /// Also sets the L1 URL based on the Anvil endpoint.
        /// The `AnvilProvider` can be used to configure the Anvil, for example,
        /// by enabling interval mining after the test network is initialized.
        pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
            self.l1_url = anvil.endpoint().parse().unwrap();
            let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
            self.anvil_provider = Some(anvil_provider);
            self
        }

        /// Sets a custom L1 URL, overriding any previously set Anvil instance URL.
        /// This removes the anvil provider, as well as it is no longer needed
        pub fn l1_url(mut self, l1_url: Url) -> Self {
            self.anvil_provider = None;
            self.l1_url = l1_url;
            self
        }

        /// Sets the options used when constructing the L1 client.
        pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
            self.l1_opt = opt;
            self
        }

        /// Sets the signer used for contract deployment in [`Self::set_upgrades`].
        pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
            self.signer = signer;
            self
        }

        /// Installs a pre-built upgrade map and applies the entry for
        /// `V::Upgrade::VERSION` to the HotShot config.
        ///
        /// # Panics
        /// Panics if `upgrades` has no entry for `V::Upgrade::VERSION`.
        pub fn upgrades<V: Versions>(mut self, upgrades: BTreeMap<Version, Upgrade>) -> Self {
            let upgrade = upgrades.get(&<V as Versions>::Upgrade::VERSION).unwrap();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            self.upgrades = upgrades;
            self
        }

        /// Version specific upgrade setup. Extend to future upgrades
        /// by adding a branch to the `match` statement.
        ///
        /// For epoch-enabled versions this deploys the full contract suite to
        /// the configured L1 and registers the test validators in the stake
        /// table, then records a proof-of-stake upgrade for `version`.
        ///
        /// # Panics
        /// Panics for versions below `EpochVersion::VERSION`, and on any
        /// deployment or stake-table transaction failure.
        pub async fn set_upgrades(mut self, version: Version) -> Self {
            let upgrade = match version {
                version if version >= EpochVersion::VERSION => {
                    tracing::debug!(?version, "upgrade version");
                    let blocks_per_epoch = self.config.epoch_height;
                    let epoch_start_block = self.config.epoch_start_block;

                    // Derive the light-client genesis from the test stake table.
                    let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
                        &self.config.hotshot_stake_table(),
                        STAKE_TABLE_CAPACITY_FOR_TEST,
                    )
                    .unwrap();

                    let validators =
                        staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);

                    // Provider signing with the builder's configured signer.
                    let deployer = ProviderBuilder::new()
                        .wallet(EthereumWallet::from(self.signer.clone()))
                        .connect_http(self.l1_url.clone());

                    let mut contracts = Contracts::new();
                    let args = DeployerArgsBuilder::default()
                        .deployer(deployer.clone())
                        .rpc_url(self.l1_url.clone())
                        .mock_light_client(true)
                        .genesis_lc_state(genesis_state)
                        .genesis_st_state(genesis_stake)
                        .blocks_per_epoch(blocks_per_epoch)
                        .epoch_start_block(epoch_start_block)
                        .exit_escrow_period(U256::from(max(
                            blocks_per_epoch * 15 + 100,
                            DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
                        )))
                        .multisig_pauser(self.signer.address())
                        .token_name("Espresso".to_string())
                        .token_symbol("ESP".to_string())
                        .initial_token_supply(U256::from(3590000000u64))
                        .ops_timelock_delay(U256::from(0))
                        .ops_timelock_admin(self.signer.address())
                        .ops_timelock_proposers(vec![self.signer.address()])
                        .ops_timelock_executors(vec![self.signer.address()])
                        .safe_exit_timelock_delay(U256::from(10))
                        .safe_exit_timelock_admin(self.signer.address())
                        .safe_exit_timelock_proposers(vec![self.signer.address()])
                        .safe_exit_timelock_executors(vec![self.signer.address()])
                        .build()
                        .unwrap();
                    // Deploy every contract needed for the upgrade to the L1.
                    args.deploy_all(&mut contracts)
                        .await
                        .expect("failed to deploy all contracts");

                    let st_addr = contracts
                        .address(Contract::StakeTableProxy)
                        .expect("StakeTableProxy address not found");
                    // Register validators and delegations in the stake table.
                    StakingTransactions::create(
                        self.l1_url.clone(),
                        &deployer,
                        st_addr,
                        validators,
                        None,
                        DelegationConfig::default(),
                    )
                    .await
                    .expect("stake table setup failed")
                    .apply_all()
                    .await
                    .expect("send all txns failed");

                    Upgrade::pos_view_based(st_addr)
                },
                _ => panic!("Upgrade not configured for version {version:?}"),
            };

            // Replace any previous upgrade map with the single new entry and
            // mirror its parameters into the HotShot config.
            let mut upgrades = std::collections::BTreeMap::new();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            upgrades.insert(version, upgrade);

            self.upgrades = upgrades;
            self
        }

        /// Sets the number of blocks per epoch.
        pub fn epoch_height(mut self, epoch_height: u64) -> Self {
            self.config.epoch_height = epoch_height;
            self
        }

        /// Sets the block at which epochs begin.
        pub fn epoch_start_block(mut self, start_block: u64) -> Self {
            self.config.epoch_start_block = start_block;
            self
        }

        /// Freezes the builder into a [`TestConfig`].
        pub fn build(self) -> TestConfig<NUM_NODES> {
            TestConfig {
                config: self.config,
                priv_keys: self.priv_keys,
                state_key_pairs: self.state_key_pairs,
                master_map: self.master_map,
                l1_url: self.l1_url,
                l1_opt: self.l1_opt,
                signer: self.signer,
                state_relay_url: self.state_relay_url,
                builder_port: self.builder_port,
                upgrades: self.upgrades,
                anvil_provider: self.anvil_provider,
            }
        }

        /// Sets the stake table capacity in the HotShot config.
        pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
            self.config.stake_table_capacity = stake_table_capacity;
            self
        }
    }
1083
    impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
        /// Default test setup: deterministic per-node keys, an in-memory
        /// network map, and a freshly spawned local Anvil chain as the L1.
        fn default() -> Self {
            let num_nodes = NUM_NODES;

            // Generate keys for the nodes.
            // Fixed seed => the same keys on every run.
            let seed = [0; 32];
            let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
                .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
                .unzip();
            let state_key_pairs = (0..num_nodes)
                .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
                .collect::<Vec<_>>();
            // Every node stakes 1 unit; all nodes appear in the stake table.
            let known_nodes_with_stake = pub_keys
                .iter()
                .zip(&state_key_pairs)
                .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
                    stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
                    state_ver_key: state_key_pair.ver_key(),
                })
                .collect::<Vec<_>>();

            // Shared routing table for the in-memory test network.
            let master_map = MasterMap::new();

            let config: HotShotConfig<SeqTypes> = HotShotConfig {
                fixed_leader_for_gpuvid: 0,
                num_nodes_with_stake: num_nodes.try_into().unwrap(),
                // Every staked node also serves as a DA node in tests.
                known_da_nodes: known_nodes_with_stake.clone(),
                da_committees: Default::default(),
                known_nodes_with_stake: known_nodes_with_stake.clone(),
                next_view_timeout: Duration::from_secs(5).as_millis() as u64,
                num_bootstrap: 1usize,
                da_staked_committee_size: num_nodes,
                view_sync_timeout: Duration::from_secs(1),
                data_request_delay: Duration::from_secs(1),
                // Placeholder URL; tests typically override via `set_builder_urls`.
                builder_urls: vec1::vec1![Url::parse(&format!(
                    "http://127.0.0.1:{}",
                    pick_unused_port().unwrap()
                ))
                .unwrap()],
                builder_timeout: Duration::from_secs(1),
                // Require all nodes to be connected before starting.
                start_threshold: (
                    known_nodes_with_stake.clone().len() as u64,
                    known_nodes_with_stake.clone().len() as u64,
                ),
                start_proposing_view: 0,
                stop_proposing_view: 0,
                start_voting_view: 0,
                stop_voting_view: 0,
                start_proposing_time: 0,
                start_voting_time: 0,
                stop_proposing_time: 0,
                stop_voting_time: 0,
                epoch_height: 30,
                epoch_start_block: 1,
                stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
                drb_difficulty: 10,
                drb_upgrade_difficulty: 20,
            };

            // Spawn a local Anvil chain to act as the L1 for this test config.
            let anvil = Anvil::new()
                .args(["--slots-in-an-epoch", "0", "--balance", "1000000"])
                .spawn();

            let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));

            // Use Anvil's first pre-funded account as the default signer.
            let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
            let signer = LocalSigner::from(l1_signer_key);

            Self {
                config,
                priv_keys,
                state_key_pairs,
                master_map,
                l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
                l1_opt: L1ClientOptions {
                    stake_table_update_interval: Duration::from_secs(5),
                    l1_events_max_block_range: 1000,
                    l1_polling_interval: Duration::from_secs(1),
                    subscription_timeout: Duration::from_secs(5),
                    ..Default::default()
                },
                anvil_provider: Some(anvil_provider),
                signer,
                state_relay_url: None,
                builder_port: None,
                upgrades: Default::default(),
            }
        }
    }
1174
    /// Immutable configuration for a test network of `NUM_NODES` sequencer
    /// nodes, produced by [`TestConfigBuilder::build`]. Fields mirror the
    /// builder's.
    #[derive(Clone)]
    pub struct TestConfig<const NUM_NODES: usize> {
        // HotShot consensus configuration shared by all test nodes.
        config: HotShotConfig<SeqTypes>,
        // Per-node BLS private keys, indexed by node id.
        priv_keys: Vec<BLSPrivKey>,
        // Per-node light-client state key pairs, indexed by node id.
        state_key_pairs: Vec<StateKeyPair>,
        // Shared routing map backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        // L1 (Ethereum) JSON-RPC endpoint.
        l1_url: Url,
        // Options used when constructing the L1 client.
        l1_opt: L1ClientOptions,
        // Handle to the spawned Anvil instance, if the L1 is a local Anvil.
        anvil_provider: Option<AnvilFillProvider>,
        // Signer used for contract deployment and funding.
        signer: LocalSigner<SigningKey>,
        // Optional URL of the state relay service.
        state_relay_url: Option<Url>,
        // Optional fixed port for the test builder.
        builder_port: Option<u16>,
        // Planned protocol upgrades, keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1189
    impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
        /// Number of nodes in this configuration.
        pub fn num_nodes(&self) -> usize {
            self.priv_keys.len()
        }

        /// Borrows the shared HotShot consensus configuration.
        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
            &self.config
        }

        /// Replaces the builder URLs in the consensus configuration.
        pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
            self.config.builder_urls = builder_urls;
        }

        /// The fixed builder port, if one was configured.
        pub fn builder_port(&self) -> Option<u16> {
            self.builder_port
        }

        /// A clone of the configured L1 signer.
        pub fn signer(&self) -> LocalSigner<SigningKey> {
            self.signer.clone()
        }

        /// The L1 JSON-RPC endpoint URL.
        pub fn l1_url(&self) -> Url {
            self.l1_url.clone()
        }

        /// The Anvil provider, if the L1 is a locally spawned Anvil.
        pub fn anvil(&self) -> Option<&AnvilFillProvider> {
            self.anvil_provider.as_ref()
        }

        /// The configured upgrades converted into an [`UpgradeMap`].
        pub fn get_upgrade_map(&self) -> UpgradeMap {
            self.upgrades.clone().into()
        }

        /// The configured upgrades, keyed by version.
        pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
            self.upgrades.clone()
        }

        /// Deterministic (Ethereum signer, BLS key, state key) triples for all
        /// nodes; see the free function [`staking_priv_keys`].
        pub fn staking_priv_keys(&self) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
            staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
        }

        /// One `(address, provider)` pair per validator, where each provider
        /// signs with that validator's Ethereum wallet against the L1 URL.
        pub fn validator_providers(&self) -> Vec<(Address, impl Provider + Clone)> {
            self.staking_priv_keys()
                .into_iter()
                .map(|(signer, ..)| {
                    (
                        signer.address(),
                        ProviderBuilder::new()
                            .wallet(EthereumWallet::from(signer))
                            .connect_http(self.l1_url.clone()),
                    )
                })
                .collect()
        }

        /// Initializes all nodes concurrently with default state, no storage,
        /// null catchup/metrics/event-consumer; returns contexts in node order.
        pub async fn init_nodes<V: Versions>(
            &self,
            bind_version: V,
        ) -> Vec<SequencerContext<network::Memory, NoStorage, V>> {
            join_all((0..self.num_nodes()).map(|i| async move {
                self.init_node(
                    i,
                    ValidatedState::default(),
                    no_storage::Options,
                    Some(NullStateCatchup::default()),
                    None,
                    &NoMetrics,
                    STAKE_TABLE_CAPACITY_FOR_TEST,
                    NullEventConsumer,
                    bind_version,
                    Default::default(),
                )
                .await
            }))
            .await
        }

        /// The stake table shared by every node in this configuration.
        pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
            &self.config.known_nodes_with_stake
        }

        /// Initializes node `i` of the test network: builds its validator
        /// config and in-memory network, sets up persistence, catchup, the L1
        /// client, and epoch membership, then starts its `SequencerContext`.
        ///
        /// # Panics
        /// Panics if persistence creation, L1 client construction, or context
        /// initialization fails.
        #[allow(clippy::too_many_arguments)]
        pub async fn init_node<V: Versions, P: PersistenceOptions>(
            &self,
            i: usize,
            mut state: ValidatedState,
            mut persistence_opt: P,
            state_peers: Option<impl StateCatchup + 'static>,
            storage: Option<RequestResponseStorage>,
            metrics: &dyn Metrics,
            stake_table_capacity: usize,
            event_consumer: impl EventConsumer + 'static,
            bind_version: V,
            upgrades: BTreeMap<Version, Upgrade>,
        ) -> SequencerContext<network::Memory, P::Persistence, V> {
            let config = self.config.clone();
            let my_peer_config = &config.known_nodes_with_stake[i];
            let is_da = config.known_da_nodes.contains(my_peer_config);

            // Create our own (private, local) validator config
            let validator_config = ValidatorConfig {
                public_key: my_peer_config.stake_table_entry.stake_key,
                private_key: self.priv_keys[i].clone(),
                stake_value: my_peer_config.stake_table_entry.stake_amount,
                state_public_key: self.state_key_pairs[i].ver_key(),
                state_private_key: self.state_key_pairs[i].sign_key(),
                is_da,
            };

            // DA nodes also subscribe to the DA gossip topic.
            let topics = if is_da {
                vec![Topic::Global, Topic::Da]
            } else {
                vec![Topic::Global]
            };

            let network = Arc::new(MemoryNetwork::new(
                &my_peer_config.stake_table_entry.stake_key,
                &self.master_map,
                &topics,
                None,
            ));

            // Make sure the builder account is funded.
            let builder_account = Self::builder_key().fee_account();
            tracing::info!(%builder_account, "prefunding builder account");
            state.prefund_account(builder_account, U256::MAX.into());

            let persistence = persistence_opt.create().await.unwrap();

            let chain_config = state.chain_config.resolve().unwrap_or_default();

            // Create an empty list of catchup providers
            let catchup_providers = ParallelStateCatchup::new(&[]);

            // If we have the state peers, add them
            if let Some(state_peers) = state_peers {
                catchup_providers.add_provider(Arc::new(state_peers));
            }

            // If we have a working local catchup provider, add it
            match persistence
                .clone()
                .into_catchup_provider(BackoffParams::default())
            {
                Ok(local_catchup) => {
                    catchup_providers.add_provider(local_catchup);
                },
                Err(e) => {
                    tracing::warn!(
                        "Failed to create local catchup provider: {e:#}. Only using remote \
                         catchup."
                    );
                },
            };

            let l1_client = self
                .l1_opt
                .clone()
                .connect(vec![self.l1_url.clone()])
                .expect("failed to create L1 client");
            l1_client.spawn_tasks().await;

            let fetcher = Fetcher::new(
                Arc::new(catchup_providers.clone()),
                Arc::new(Mutex::new(persistence.clone())),
                l1_client.clone(),
                chain_config,
            );
            fetcher.spawn_update_loop().await;

            let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
            let mut membership = EpochCommittees::new_stake(
                config.known_nodes_with_stake.clone(),
                config.known_da_nodes.clone(),
                block_reward,
                fetcher,
                config.epoch_height,
            );
            membership.reload_stake(50).await;

            let membership = Arc::new(RwLock::new(membership));
            let persistence = Arc::new(persistence);

            let coordinator = EpochMembershipCoordinator::new(
                membership,
                config.epoch_height,
                &persistence.clone(),
            );

            let node_state = NodeState::new(
                i as u64,
                chain_config,
                l1_client,
                Arc::new(catchup_providers.clone()),
                V::Base::VERSION,
                coordinator.clone(),
                V::Base::VERSION,
            )
            .with_current_version(V::Base::version())
            .with_genesis(state)
            .with_epoch_height(config.epoch_height)
            .with_upgrades(upgrades)
            .with_epoch_start_block(config.epoch_start_block);

            tracing::info!(
                i,
                key = %my_peer_config.stake_table_entry.stake_key,
                state_key = %my_peer_config.state_ver_key,
                "starting node",
            );

            SequencerContext::init(
                NetworkConfig {
                    config,
                    // For testing, we use a fake network, so the rest of the network config beyond
                    // the base consensus config does not matter.
                    ..Default::default()
                },
                validator_config,
                coordinator,
                node_state,
                storage,
                catchup_providers,
                persistence,
                network,
                self.state_relay_url.clone(),
                metrics,
                stake_table_capacity,
                event_consumer,
                bind_version,
                Default::default(),
            )
            .await
            .unwrap()
        }

        /// Deterministic fee key pair used as the builder account in tests.
        pub fn builder_key() -> EthKeyPair {
            FeeAccount::generated_from_seed_indexed([1; 32], 0).1
        }
    }
1430
1431    // Wait for decide event, make sure it matches submitted transaction. Return the block number
1432    // containing the transaction and the block payload size
1433    pub async fn wait_for_decide_on_handle(
1434        events: &mut (impl Stream<Item = Event> + Unpin),
1435        submitted_txn: &Transaction,
1436    ) -> (u64, usize) {
1437        let commitment = submitted_txn.commit();
1438
1439        // Keep getting events until we see a Decide event
1440        loop {
1441            let event = events.next().await.unwrap();
1442            tracing::info!("Received event from handle: {event:?}");
1443
1444            if let Decide { leaf_chain, .. } = event.event {
1445                if let Some((height, size)) =
1446                    leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1447                        if leaf
1448                            .block_payload()
1449                            .as_ref()?
1450                            .transaction_commitments(leaf.block_header().metadata())
1451                            .contains(&commitment)
1452                        {
1453                            let size = leaf.block_payload().unwrap().encode().len();
1454                            Some((leaf.block_header().block_number(), size))
1455                        } else {
1456                            None
1457                        }
1458                    })
1459                {
1460                    tracing::info!(height, "transaction {commitment} sequenced");
1461                    return (height, size);
1462                }
1463            } else {
1464                // Keep waiting
1465            }
1466        }
1467    }
1468
1469    /// Waits until a node has reached the given target epoch (exclusive).
1470    /// The function returns once the first event indicates an epoch higher than `target_epoch`.
1471    pub async fn wait_for_epochs(
1472        events: &mut (impl futures::Stream<Item = hotshot_types::event::Event<SeqTypes>>
1473                  + std::marker::Unpin),
1474        epoch_height: u64,
1475        target_epoch: u64,
1476    ) {
1477        while let Some(event) = events.next().await {
1478            if let EventType::Decide { leaf_chain, .. } = event.event {
1479                let leaf = leaf_chain[0].leaf.clone();
1480                let epoch = leaf.epoch(epoch_height);
1481                println!(
1482                    "Node decided at height: {}, epoch: {epoch:?}",
1483                    leaf.height(),
1484                );
1485
1486                if epoch > Some(EpochNumber::new(target_epoch)) {
1487                    break;
1488                }
1489            }
1490        }
1491    }
1492}
1493
#[cfg(test)]
mod test {

    use alloy::node_bindings::Anvil;
    use espresso_types::{Header, MockSequencerVersions, NamespaceId, Payload, Transaction};
    use futures::StreamExt;
    use hotshot::types::EventType::Decide;
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_types::{
        event::LeafInfo,
        traits::block_contents::{BlockHeader, BlockPayload},
    };
    use testing::{wait_for_decide_on_handle, TestConfigBuilder};

    use self::testing::run_test_builder;
    use super::*;

    // Smoke test: spin up a 5-node in-memory network against a local Anvil,
    // submit one transaction, and wait for it to be decided.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        // Assign `config` so it isn't dropped early.
        let anvil = Anvil::new().spawn();
        let url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;

        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MockSequencerVersions::new()).await;

        let handle_0 = &handles[0];

        // Hook the builder up to the event stream from the first node
        builder_task.start(Box::new(handle_0.event_stream().await));

        let mut events = handle_0.event_stream().await;

        for handle in handles.iter() {
            handle.start_consensus().await;
        }

        // Submit target transaction to handle
        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        handles[0]
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }

    // Runs a 5-node network up to `success_height` and checks that each decided
    // header is consistent with its parent: height increments by one and
    // timestamp, L1 head, and L1 finalized are all monotonic.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_header_invariants() {
        let success_height = 30;
        // Assign `config` so it isn't dropped early.
        let anvil = Anvil::new().spawn();
        let url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;

        config.set_builder_urls(vec1::vec1![builder_url]);
        let handles = config.init_nodes(MockSequencerVersions::new()).await;

        let handle_0 = &handles[0];

        let mut events = handle_0.event_stream().await;

        // Hook the builder up to the event stream from the first node
        builder_task.start(Box::new(handle_0.event_stream().await));

        for handle in handles.iter() {
            handle.start_consensus().await;
        }

        // Seed the parent with a genesis header so the first comparison works.
        let mut parent = {
            // TODO refactor repeated code from other tests
            let (genesis_payload, genesis_ns_table) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
                    .await
                    .unwrap();

            let genesis_state = NodeState::mock();
            Header::genesis::<TestVersions>(&genesis_state, genesis_payload, &genesis_ns_table)
        };

        loop {
            let event = events.next().await.unwrap();
            tracing::info!("Received event from handle: {event:?}");
            let Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            tracing::info!("Got decide {leaf_chain:?}");

            // Check that each successive header satisfies invariants relative to its parent: all
            // the fields which should be monotonic are.
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let header = leaf.block_header().clone();
                if header.height() == 0 {
                    parent = header;
                    continue;
                }
                assert_eq!(header.height(), parent.height() + 1);
                assert!(header.timestamp() >= parent.timestamp());
                assert!(header.l1_head() >= parent.l1_head());
                assert!(header.l1_finalized() >= parent.l1_finalized());
                parent = header;
            }

            if parent.height() >= success_height {
                break;
            }
        }
    }
}