//! `sequencer` crate library root (`sequencer/lib.rs`).

1pub mod api;
2pub mod catchup;
3pub mod context;
4pub mod genesis;
5mod proposal_fetcher;
6mod request_response;
7
8mod external_event_handler;
9pub mod options;
10pub mod state_signature;
11
12mod restart_tests;
13
14mod message_compat_tests;
15
16use std::sync::Arc;
17
18use alloy::primitives::U256;
19use anyhow::Context;
20use async_lock::{Mutex, RwLock};
21use catchup::{ParallelStateCatchup, StatePeers};
22use context::SequencerContext;
23use espresso_types::{
24    traits::{EventConsumer, MembershipPersistence},
25    v0_3::Fetcher,
26    BackoffParams, EpochCommittees, L1ClientOptions, NodeState, PubKey, SeqTypes, ValidatedState,
27};
28use genesis::L1Finalized;
29use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
30use libp2p::Multiaddr;
31use network::libp2p::split_off_peer_id;
32use options::Identity;
33use proposal_fetcher::ProposalFetcherConfig;
34use tokio::select;
35use tracing::info;
36use url::Url;
37
38use crate::request_response::data_source::Storage as RequestResponseStorage;
39pub mod persistence;
40pub mod state;
41use std::{fmt::Debug, marker::PhantomData, time::Duration};
42
43use derivative::Derivative;
44use espresso_types::v0::traits::SequencerPersistence;
45pub use genesis::Genesis;
46use hotshot::{
47    traits::implementations::{
48        derive_libp2p_multiaddr, derive_libp2p_peer_id, CdnMetricsValue, CdnTopic,
49        CombinedNetworks, GossipConfig, KeyPair, Libp2pNetwork, MemoryNetwork, PushCdnNetwork,
50        RequestResponseConfig, WrappedSignatureKey,
51    },
52    types::SignatureKey,
53};
54use hotshot_orchestrator::client::{get_complete_config, OrchestratorClient};
55use hotshot_types::{
56    data::ViewNumber,
57    epoch_membership::EpochMembershipCoordinator,
58    light_client::{StateKeyPair, StateSignKey},
59    signature_key::{BLSPrivKey, BLSPubKey},
60    traits::{
61        metrics::{Metrics, NoMetrics},
62        network::ConnectedNetwork,
63        node_implementation::{NodeImplementation, NodeType, Versions},
64        storage::Storage,
65    },
66    utils::BuilderCommitment,
67    ValidatorConfig,
68};
69pub use options::Options;
70use serde::{Deserialize, Serialize};
71use vbs::version::{StaticVersion, StaticVersionType};
72pub mod network;
73
74mod run;
75pub use run::main;
76
/// How many recent stake tables `EpochCommittees::reload_stake` loads on startup.
pub const RECENT_STAKE_TABLES_LIMIT: u64 = 20;
78/// The Sequencer node is generic over the hotshot CommChannel.
79#[derive(Derivative, Serialize, Deserialize)]
80#[derivative(
81    Copy(bound = ""),
82    Debug(bound = ""),
83    Default(bound = ""),
84    PartialEq(bound = ""),
85    Eq(bound = ""),
86    Hash(bound = "")
87)]
88pub struct Node<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(PhantomData<fn(&N, &P)>);
89
90// Using derivative to derive Clone triggers the clippy lint
91// https://rust-lang.github.io/rust-clippy/master/index.html#/incorrect_clone_impl_on_copy_type
92impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Clone for Node<N, P> {
93    fn clone(&self) -> Self {
94        *self
95    }
96}
97
98pub type SequencerApiVersion = StaticVersion<0, 1>;
99
100impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> NodeImplementation<SeqTypes>
101    for Node<N, P>
102{
103    type Network = N;
104    type Storage = Arc<P>;
105}
106
107#[derive(Clone, Debug)]
108pub struct NetworkParams {
109    /// The address where a CDN marshal is located
110    pub cdn_endpoint: String,
111    pub orchestrator_url: Url,
112    pub state_relay_server_url: Url,
113    pub private_staking_key: BLSPrivKey,
114    pub private_state_key: StateSignKey,
115    pub state_peers: Vec<Url>,
116    pub config_peers: Option<Vec<Url>>,
117    pub catchup_backoff: BackoffParams,
118    /// The address to advertise as our public API's URL
119    pub public_api_url: Option<Url>,
120
121    /// The address to send to other Libp2p nodes to contact us
122    pub libp2p_advertise_address: String,
123    /// The address to bind to for Libp2p
124    pub libp2p_bind_address: String,
125    /// The (optional) bootstrap node addresses for Libp2p. If supplied, these will
126    /// override the bootstrap nodes specified in the config file.
127    pub libp2p_bootstrap_nodes: Option<Vec<Multiaddr>>,
128
129    /// The heartbeat interval
130    pub libp2p_heartbeat_interval: Duration,
131
132    /// The number of past heartbeats to gossip about
133    pub libp2p_history_gossip: usize,
134    /// The number of past heartbeats to remember the full messages for
135    pub libp2p_history_length: usize,
136
137    /// The target number of peers in the mesh
138    pub libp2p_mesh_n: usize,
139    /// The maximum number of peers in the mesh
140    pub libp2p_mesh_n_high: usize,
141    /// The minimum number of peers in the mesh
142    pub libp2p_mesh_n_low: usize,
143    /// The minimum number of mesh peers that must be outbound
144    pub libp2p_mesh_outbound_min: usize,
145
146    /// The maximum gossip message size
147    pub libp2p_max_gossip_transmit_size: usize,
148
149    /// The maximum direct message size
150    pub libp2p_max_direct_transmit_size: u64,
151
152    /// The maximum number of IHAVE messages to accept from a Libp2p peer within a heartbeat
153    pub libp2p_max_ihave_length: usize,
154
155    /// The maximum number of IHAVE messages to accept from a Libp2p peer within a heartbeat
156    pub libp2p_max_ihave_messages: usize,
157
158    /// The time period that message hashes are stored in the cache
159    pub libp2p_published_message_ids_cache_time: Duration,
160
161    /// The time to wait for a Libp2p message requested through IWANT following an IHAVE advertisement
162    pub libp2p_iwant_followup_time: Duration,
163
164    /// The maximum number of Libp2p messages we will process in a given RPC
165    pub libp2p_max_messages_per_rpc: Option<usize>,
166
167    /// How many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
168    pub libp2p_gossip_retransmission: u32,
169
170    /// If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
171    pub libp2p_flood_publish: bool,
172
173    /// The time period that Libp2p message hashes are stored in the cache
174    pub libp2p_duplicate_cache_time: Duration,
175
176    /// Time to live for Libp2p fanout peers
177    pub libp2p_fanout_ttl: Duration,
178
179    /// Initial delay in each Libp2p heartbeat
180    pub libp2p_heartbeat_initial_delay: Duration,
181
182    /// How many Libp2p peers we will emit gossip to at each heartbeat
183    pub libp2p_gossip_factor: f64,
184
185    /// Minimum number of Libp2p peers to emit gossip to during a heartbeat
186    pub libp2p_gossip_lazy: usize,
187}
188
189pub struct L1Params {
190    pub urls: Vec<Url>,
191    pub options: L1ClientOptions,
192}
193
194#[allow(clippy::too_many_arguments)]
195pub async fn init_node<
196    P: SequencerPersistence + MembershipPersistence + DhtPersistentStorage,
197    V: Versions,
198>(
199    genesis: Genesis,
200    network_params: NetworkParams,
201    metrics: &dyn Metrics,
202    mut persistence: P,
203    l1_params: L1Params,
204    storage: Option<RequestResponseStorage>,
205    seq_versions: V,
206    event_consumer: impl EventConsumer + 'static,
207    is_da: bool,
208    identity: Identity,
209    proposal_fetcher_config: ProposalFetcherConfig,
210) -> anyhow::Result<SequencerContext<network::Production, P, V>>
211where
212    Arc<P>: Storage<SeqTypes>,
213{
214    // Expose git information via status API.
215    metrics
216        .text_family(
217            "version".into(),
218            vec!["rev".into(), "desc".into(), "timestamp".into()],
219        )
220        .create(vec![
221            env!("VERGEN_GIT_SHA").into(),
222            env!("VERGEN_GIT_DESCRIBE").into(),
223            env!("VERGEN_GIT_COMMIT_TIMESTAMP").into(),
224        ]);
225
226    // Expose Node Entity Information via the status/metrics API
227    metrics
228        .text_family(
229            "node_identity_general".into(),
230            vec![
231                "name".into(),
232                "company_name".into(),
233                "company_website".into(),
234                "operating_system".into(),
235                "node_type".into(),
236                "network_type".into(),
237            ],
238        )
239        .create(vec![
240            identity.node_name.unwrap_or("".into()),
241            identity.company_name.unwrap_or("".into()),
242            identity
243                .company_website
244                .map(|u| u.into())
245                .unwrap_or("".into()),
246            identity.operating_system.unwrap_or("".into()),
247            identity.node_type.unwrap_or("".into()),
248            identity.network_type.unwrap_or("".into()),
249        ]);
250
251    // Expose Node Identity Location via the status/metrics API
252    metrics
253        .text_family(
254            "node_identity_location".into(),
255            vec!["country".into(), "latitude".into(), "longitude".into()],
256        )
257        .create(vec![
258            identity.country_code.unwrap_or("".into()),
259            identity
260                .latitude
261                .map(|l| l.to_string())
262                .unwrap_or("".into()),
263            identity
264                .longitude
265                .map(|l| l.to_string())
266                .unwrap_or("".into()),
267        ]);
268
269    // Stick our public key in `metrics` so it is easily accessible via the status API.
270    let pub_key = BLSPubKey::from_private(&network_params.private_staking_key);
271    metrics
272        .text_family("node".into(), vec!["key".into()])
273        .create(vec![pub_key.to_string()]);
274
275    // Parse the Libp2p bind and advertise addresses to multiaddresses
276    let libp2p_bind_address = derive_libp2p_multiaddr(&network_params.libp2p_bind_address)
277        .with_context(|| {
278            format!(
279                "Failed to derive Libp2p bind address of {}",
280                &network_params.libp2p_bind_address
281            )
282        })?;
283    let libp2p_advertise_address =
284        derive_libp2p_multiaddr(&network_params.libp2p_advertise_address).with_context(|| {
285            format!(
286                "Failed to derive Libp2p advertise address of {}",
287                &network_params.libp2p_advertise_address
288            )
289        })?;
290
291    info!("Libp2p bind address: {}", libp2p_bind_address);
292    info!("Libp2p advertise address: {}", libp2p_advertise_address);
293
294    // Orchestrator client
295    let orchestrator_client = OrchestratorClient::new(network_params.orchestrator_url);
296    let state_key_pair = StateKeyPair::from_sign_key(network_params.private_state_key);
297    let validator_config = ValidatorConfig {
298        public_key: pub_key,
299        private_key: network_params.private_staking_key,
300        stake_value: U256::ONE,
301        state_public_key: state_key_pair.ver_key(),
302        state_private_key: state_key_pair.sign_key(),
303        is_da,
304    };
305
306    // Derive our Libp2p public key from our private key
307    let libp2p_public_key = derive_libp2p_peer_id::<<SeqTypes as NodeType>::SignatureKey>(
308        &validator_config.private_key,
309    )
310    .with_context(|| "Failed to derive Libp2p peer ID")?;
311
312    // Print the libp2p public key
313    info!("Starting Libp2p with PeerID: {libp2p_public_key}");
314
315    let loaded_network_config_from_persistence = persistence.load_config().await?;
316    let (mut network_config, wait_for_orchestrator) = match (
317        loaded_network_config_from_persistence,
318        network_params.config_peers,
319    ) {
320        (Some(config), _) => {
321            tracing::warn!("loaded network config from storage, rejoining existing network");
322            (config, false)
323        },
324        // If we were told to fetch the config from an already-started peer, do so.
325        (None, Some(peers)) => {
326            tracing::warn!(?peers, "loading network config from peers");
327            let peers = StatePeers::<SequencerApiVersion>::from_urls(
328                peers,
329                network_params.catchup_backoff,
330                &NoMetrics,
331            );
332            let config = peers.fetch_config(validator_config.clone()).await?;
333
334            tracing::warn!(
335                node_id = config.node_index,
336                stake_table = ?config.config.known_nodes_with_stake,
337                "loaded config",
338            );
339            persistence.save_config(&config).await?;
340            (config, false)
341        },
342        // Otherwise, this is a fresh network; load from the orchestrator.
343        (None, None) => {
344            tracing::warn!("loading network config from orchestrator");
345            tracing::warn!(
346                "waiting for other nodes to connect, DO NOT RESTART until fully connected"
347            );
348            let config = get_complete_config(
349                &orchestrator_client,
350                validator_config.clone(),
351                // Register in our Libp2p advertise address and public key so other nodes
352                // can contact us on startup
353                Some(libp2p_advertise_address),
354                Some(libp2p_public_key),
355            )
356            .await?
357            .0;
358
359            tracing::warn!(
360                node_id = config.node_index,
361                stake_table = ?config.config.known_nodes_with_stake,
362                "loaded config",
363            );
364            persistence.save_config(&config).await?;
365            tracing::warn!("all nodes connected");
366            (config, true)
367        },
368    };
369
370    if let Some(upgrade) = genesis.upgrades.get(&V::Upgrade::VERSION) {
371        upgrade.set_hotshot_config_parameters(&mut network_config.config);
372    }
373
374    let epoch_height = genesis.epoch_height.unwrap_or_default();
375    let drb_difficulty = genesis.drb_difficulty.unwrap_or_default();
376    let drb_upgrade_difficulty = genesis.drb_upgrade_difficulty.unwrap_or_default();
377    let epoch_start_block = genesis.epoch_start_block.unwrap_or_default();
378    let stake_table_capacity = genesis
379        .stake_table_capacity
380        .unwrap_or(hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY);
381
382    tracing::warn!("setting epoch_height={epoch_height:?}");
383    tracing::warn!("setting drb_difficulty={drb_difficulty:?}");
384    tracing::warn!("setting drb_upgrade_difficulty={drb_upgrade_difficulty:?}");
385    tracing::warn!("setting epoch_start_block={epoch_start_block:?}");
386    tracing::warn!("setting stake_table_capacity={stake_table_capacity:?}");
387    network_config.config.epoch_height = epoch_height;
388    network_config.config.drb_difficulty = drb_difficulty;
389    network_config.config.drb_upgrade_difficulty = drb_upgrade_difficulty;
390    network_config.config.epoch_start_block = epoch_start_block;
391    network_config.config.stake_table_capacity = stake_table_capacity;
392
393    // If the `Libp2p` bootstrap nodes were supplied via the command line, override those
394    // present in the config file.
395    if let Some(bootstrap_nodes) = network_params.libp2p_bootstrap_nodes {
396        if let Some(libp2p_config) = network_config.libp2p_config.as_mut() {
397            // If the libp2p configuration is present, we can override the bootstrap nodes.
398
399            // Split off the peer ID from the addresses
400            libp2p_config.bootstrap_nodes = bootstrap_nodes
401                .into_iter()
402                .map(split_off_peer_id)
403                .collect::<Result<Vec<_>, _>>()
404                .with_context(|| "Failed to parse peer ID from bootstrap node")?;
405        } else {
406            // If not, don't try launching with them. Eventually we may want to
407            // provide a default configuration here instead.
408            tracing::warn!("No libp2p configuration found, ignoring supplied bootstrap nodes");
409        }
410    }
411
412    let node_index = network_config.node_index;
413
414    // If we are a DA node, we need to subscribe to the DA topic
415    let topics = {
416        let mut topics = vec![CdnTopic::Global];
417        if is_da {
418            topics.push(CdnTopic::Da);
419        }
420        topics
421    };
422
423    // Initialize the push CDN network (and perform the initial connection)
424    let cdn_network = PushCdnNetwork::new(
425        network_params.cdn_endpoint,
426        topics,
427        KeyPair {
428            public_key: WrappedSignatureKey(validator_config.public_key),
429            private_key: validator_config.private_key.clone(),
430        },
431        CdnMetricsValue::new(metrics),
432    )
433    .with_context(|| format!("Failed to create CDN network {node_index}"))?;
434
435    // Configure gossipsub based on the command line options
436    let gossip_config = GossipConfig {
437        heartbeat_interval: network_params.libp2p_heartbeat_interval,
438        history_gossip: network_params.libp2p_history_gossip,
439        history_length: network_params.libp2p_history_length,
440        mesh_n: network_params.libp2p_mesh_n,
441        mesh_n_high: network_params.libp2p_mesh_n_high,
442        mesh_n_low: network_params.libp2p_mesh_n_low,
443        mesh_outbound_min: network_params.libp2p_mesh_outbound_min,
444        max_ihave_messages: network_params.libp2p_max_ihave_messages,
445        max_transmit_size: network_params.libp2p_max_gossip_transmit_size,
446        max_ihave_length: network_params.libp2p_max_ihave_length,
447        published_message_ids_cache_time: network_params.libp2p_published_message_ids_cache_time,
448        iwant_followup_time: network_params.libp2p_iwant_followup_time,
449        max_messages_per_rpc: network_params.libp2p_max_messages_per_rpc,
450        gossip_retransmission: network_params.libp2p_gossip_retransmission,
451        flood_publish: network_params.libp2p_flood_publish,
452        duplicate_cache_time: network_params.libp2p_duplicate_cache_time,
453        fanout_ttl: network_params.libp2p_fanout_ttl,
454        heartbeat_initial_delay: network_params.libp2p_heartbeat_initial_delay,
455        gossip_factor: network_params.libp2p_gossip_factor,
456        gossip_lazy: network_params.libp2p_gossip_lazy,
457    };
458
459    // Configure request/response based on the command line options
460    let request_response_config = RequestResponseConfig {
461        request_size_maximum: network_params.libp2p_max_direct_transmit_size,
462        response_size_maximum: network_params.libp2p_max_direct_transmit_size,
463    };
464
465    let l1_client = l1_params
466        .options
467        .with_metrics(metrics)
468        .connect(l1_params.urls)
469        .with_context(|| "failed to create L1 client")?;
470    genesis.validate_fee_contract(&l1_client).await?;
471
472    l1_client.spawn_tasks().await;
473    let l1_genesis = match genesis.l1_finalized {
474        L1Finalized::Block(b) => b,
475        L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
476        L1Finalized::Timestamp { timestamp } => {
477            l1_client
478                .wait_for_finalized_block_with_timestamp(U256::from(timestamp.unix_timestamp()))
479                .await
480        },
481    };
482
483    let mut genesis_state = ValidatedState {
484        chain_config: genesis.chain_config.into(),
485        ..Default::default()
486    };
487    for (address, amount) in genesis.accounts {
488        tracing::warn!(%address, %amount, "Prefunding account for demo");
489        genesis_state.prefund_account(address, amount);
490    }
491
492    // Create the list of parallel catchup providers
493    let state_catchup_providers = ParallelStateCatchup::new(&[]);
494
495    // Add the state peers to the list
496    let state_peers = StatePeers::<SequencerApiVersion>::from_urls(
497        network_params.state_peers,
498        network_params.catchup_backoff,
499        metrics,
500    );
501    state_catchup_providers.add_provider(Arc::new(state_peers));
502
503    // Add the local (persistence) catchup provider to the list (if we can)
504    match persistence
505        .clone()
506        .into_catchup_provider(network_params.catchup_backoff)
507    {
508        Ok(catchup) => {
509            state_catchup_providers.add_provider(Arc::new(catchup));
510        },
511        Err(e) => {
512            tracing::warn!(
513                "Failed to create local catchup provider: {e:#}. Only using remote catchup."
514            );
515        },
516    };
517
518    persistence.enable_metrics(metrics);
519
520    let fetcher = Fetcher::new(
521        Arc::new(state_catchup_providers.clone()),
522        Arc::new(Mutex::new(persistence.clone())),
523        l1_client.clone(),
524        genesis.chain_config,
525    );
526    fetcher.spawn_update_loop().await;
527    let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
528    // Create the HotShot membership
529    let mut membership = EpochCommittees::new_stake(
530        network_config.config.known_nodes_with_stake.clone(),
531        network_config.config.known_da_nodes.clone(),
532        block_reward,
533        fetcher,
534        epoch_height,
535    );
536    membership.reload_stake(RECENT_STAKE_TABLES_LIMIT).await;
537
538    let membership: Arc<RwLock<EpochCommittees>> = Arc::new(RwLock::new(membership));
539    let persistence = Arc::new(persistence);
540    let coordinator = EpochMembershipCoordinator::new(
541        membership,
542        network_config.config.epoch_height,
543        &persistence.clone(),
544    );
545
546    let instance_state = NodeState {
547        chain_config: genesis.chain_config,
548        l1_client,
549        genesis_header: genesis.header,
550        genesis_state,
551        l1_genesis: Some(l1_genesis),
552        node_id: node_index,
553        upgrades: genesis.upgrades,
554        current_version: V::Base::VERSION,
555        epoch_height: Some(epoch_height),
556        state_catchup: Arc::new(state_catchup_providers.clone()),
557        coordinator: coordinator.clone(),
558        genesis_version: genesis.genesis_version,
559        epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
560    };
561
562    // Initialize the Libp2p network
563    let network = {
564        let p2p_network = Libp2pNetwork::from_config(
565            network_config.clone(),
566            persistence.clone(),
567            coordinator.membership().clone(),
568            gossip_config,
569            request_response_config,
570            libp2p_bind_address,
571            &validator_config.public_key,
572            // We need the private key so we can derive our Libp2p keypair
573            // (using https://docs.rs/blake3/latest/blake3/fn.derive_key.html)
574            &validator_config.private_key,
575            hotshot::traits::implementations::Libp2pMetricsValue::new(metrics),
576        )
577        .await
578        .with_context(|| {
579            format!(
580                "Failed to create libp2p network on node {node_index}; binding to {:?}",
581                network_params.libp2p_bind_address
582            )
583        })?;
584
585        tracing::warn!("Waiting for at least one connection to be initialized");
586        select! {
587            _ = cdn_network.wait_for_ready() => {
588                tracing::warn!("CDN connection initialized");
589            },
590            _ = p2p_network.wait_for_ready() => {
591                tracing::warn!("P2P connection initialized");
592            },
593        };
594
595        // Combine the CDN and P2P networks
596        Arc::from(CombinedNetworks::new(
597            cdn_network,
598            p2p_network,
599            Some(Duration::from_secs(1)),
600        ))
601    };
602
603    let mut ctx = SequencerContext::init(
604        network_config,
605        validator_config,
606        coordinator,
607        instance_state,
608        storage,
609        state_catchup_providers,
610        persistence,
611        network,
612        Some(network_params.state_relay_server_url),
613        metrics,
614        genesis.stake_table.capacity,
615        event_consumer,
616        seq_versions,
617        proposal_fetcher_config,
618    )
619    .await?;
620    if wait_for_orchestrator {
621        ctx = ctx.wait_for_orchestrator(orchestrator_client);
622    }
623    Ok(ctx)
624}
625
626pub fn empty_builder_commitment() -> BuilderCommitment {
627    BuilderCommitment::from_bytes([])
628}
629
630#[cfg(any(test, feature = "testing"))]
631pub mod testing {
632    use std::{
633        collections::{BTreeMap, HashMap},
634        time::Duration,
635    };
636
637    use alloy::{
638        network::EthereumWallet,
639        node_bindings::{Anvil, AnvilInstance},
640        primitives::U256,
641        providers::{
642            fillers::{
643                BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
644            },
645            layers::AnvilProvider,
646            ProviderBuilder, RootProvider,
647        },
648        signers::{
649            k256::ecdsa::SigningKey,
650            local::{LocalSigner, PrivateKeySigner},
651        },
652    };
653    use async_lock::RwLock;
654    use catchup::NullStateCatchup;
655    use committable::Committable;
656    use espresso_contract_deployer::{
657        builder::DeployerArgsBuilder, network_config::light_client_genesis_from_stake_table,
658        Contract, Contracts,
659    };
660    use espresso_types::{
661        eth_signature_key::EthKeyPair,
662        v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup},
663        EpochVersion, Event, FeeAccount, L1Client, NetworkConfig, PubKey, SeqTypes, Transaction,
664        Upgrade, UpgradeMap,
665    };
666    use futures::{
667        future::join_all,
668        stream::{Stream, StreamExt},
669    };
670    use hotshot::{
671        traits::{
672            implementations::{MasterMap, MemoryNetwork},
673            BlockPayload,
674        },
675        types::EventType::Decide,
676    };
677    use hotshot_builder_core_refactored::service::{
678        BuilderConfig as LegacyBuilderConfig, GlobalState as LegacyGlobalState,
679    };
680    use hotshot_testing::block_builder::{
681        BuilderTask, SimpleBuilderImplementation, TestBuilderImplementation,
682    };
683    use hotshot_types::{
684        event::LeafInfo,
685        light_client::StateKeyPair,
686        signature_key::BLSKeyPair,
687        traits::{
688            block_contents::BlockHeader, metrics::NoMetrics, network::Topic,
689            signature_key::BuilderSignatureKey, EncodeBytes,
690        },
691        HotShotConfig, PeerConfig,
692    };
693    use portpicker::pick_unused_port;
694    use rand::SeedableRng as _;
695    use rand_chacha::ChaCha20Rng;
696    use staking_cli::demo::{setup_stake_table_contract_for_test, DelegationConfig};
697    use tokio::spawn;
698    use vbs::version::Version;
699
700    use super::*;
701    use crate::{
702        catchup::ParallelStateCatchup,
703        persistence::no_storage::{self, NoStorage},
704    };
705
706    const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
707    const BUILDER_CHANNEL_CAPACITY_FOR_TEST: usize = 128;
708    type AnvilFillProvider = AnvilProvider<
709        FillProvider<
710            JoinFill<
711                alloy::providers::Identity,
712                JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
713            >,
714            RootProvider,
715        >,
716    >;
717    struct LegacyBuilderImplementation {
718        global_state: Arc<LegacyGlobalState<SeqTypes>>,
719    }
720
721    impl BuilderTask<SeqTypes> for LegacyBuilderImplementation {
722        fn start(
723            self: Box<Self>,
724            stream: Box<
725                dyn futures::prelude::Stream<Item = hotshot::types::Event<SeqTypes>>
726                    + std::marker::Unpin
727                    + Send
728                    + 'static,
729            >,
730        ) {
731            spawn(async move {
732                let res = self.global_state.start_event_loop(stream).await;
733                tracing::error!(?res, "testing legacy builder service exited");
734            });
735        }
736    }
737
738    pub async fn run_legacy_builder<const NUM_NODES: usize>(
739        port: Option<u16>,
740        max_block_size: Option<u64>,
741    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
742        let builder_key_pair = TestConfig::<0>::builder_key();
743        let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));
744
745        // This should never fail.
746        let url: Url = format!("http://localhost:{port}")
747            .parse()
748            .expect("Failed to parse builder URL");
749
750        // create the global state
751        let global_state = LegacyGlobalState::new(
752            LegacyBuilderConfig {
753                builder_keys: (builder_key_pair.fee_account(), builder_key_pair),
754                max_api_waiting_time: Duration::from_secs(1),
755                max_block_size_increment_period: Duration::from_secs(60),
756                maximize_txn_capture_timeout: Duration::from_millis(100),
757                txn_garbage_collect_duration: Duration::from_secs(60),
758                txn_channel_capacity: BUILDER_CHANNEL_CAPACITY_FOR_TEST,
759                tx_status_cache_capacity: 81920,
760                base_fee: 10,
761            },
762            NodeState::default(),
763            max_block_size.unwrap_or(300),
764            NUM_NODES,
765        );
766
767        // Create and spawn the tide-disco app to serve the builder APIs
768        let app = Arc::clone(&global_state)
769            .into_app()
770            .expect("Failed to create builder tide-disco app");
771
772        spawn(
773            app.serve(
774                format!("http://0.0.0.0:{port}")
775                    .parse::<Url>()
776                    .expect("Failed to parse builder listener"),
777                EpochVersion::instance(),
778            ),
779        );
780
781        // Pass on the builder task to be injected in the testing harness
782        (Box::new(LegacyBuilderImplementation { global_state }), url)
783    }
784
785    pub async fn run_test_builder<const NUM_NODES: usize>(
786        port: Option<u16>,
787    ) -> (Box<dyn BuilderTask<SeqTypes>>, Url) {
788        let port = port.unwrap_or_else(|| pick_unused_port().expect("No ports available"));
789
790        // This should never fail.
791        let url: Url = format!("http://localhost:{port}")
792            .parse()
793            .expect("Failed to parse builder URL");
794        tracing::info!("Starting test builder on {url}");
795
796        (
797            <SimpleBuilderImplementation as TestBuilderImplementation<SeqTypes>>::start(
798                NUM_NODES,
799                format!("http://0.0.0.0:{port}")
800                    .parse()
801                    .expect("Failed to parse builder listener"),
802                (),
803                HashMap::new(),
804            )
805            .await,
806            url,
807        )
808    }
809
    /// Builder for a [`TestConfig`]: accumulates consensus config, per-node
    /// keys, network plumbing, and L1 settings before `build()` freezes them.
    pub struct TestConfigBuilder<const NUM_NODES: usize> {
        // Shared HotShot consensus configuration for all nodes.
        config: HotShotConfig<SeqTypes>,
        // One BLS private key per node (index-aligned with `state_key_pairs`).
        priv_keys: Vec<BLSPrivKey>,
        // One light-client state key pair per node.
        state_key_pairs: Vec<StateKeyPair>,
        // Registry backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        // Endpoint of the L1 (Anvil by default, or a custom URL).
        l1_url: Url,
        // Options used when constructing the nodes' L1 clients.
        l1_opt: L1ClientOptions,
        // Present only when the L1 is a test-managed Anvil instance.
        anvil_provider: Option<AnvilFillProvider>,
        // Signer used for L1 transactions (e.g. contract deployment).
        signer: LocalSigner<SigningKey>,
        // Optional state relay service URL.
        state_relay_url: Option<Url>,
        // Optional fixed port for the test builder.
        builder_port: Option<u16>,
        // Protocol upgrades keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
823
824    pub fn staking_priv_keys(
825        priv_keys: &[BLSPrivKey],
826        state_key_pairs: &[StateKeyPair],
827        num_nodes: usize,
828    ) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
829        let seed = [42u8; 32];
830        let mut rng = ChaCha20Rng::from_seed(seed); // Create a deterministic RNG
831        let eth_key_pairs = (0..num_nodes).map(|_| SigningKey::random(&mut rng).into());
832        eth_key_pairs
833            .zip(priv_keys.iter())
834            .zip(state_key_pairs.iter())
835            .map(|((eth, bls), state)| (eth, bls.clone().into(), state.clone()))
836            .collect()
837    }
838
    impl<const NUM_NODES: usize> TestConfigBuilder<NUM_NODES> {
        /// Sets the port for the test builder; `None` means a port will be
        /// picked later.
        pub fn builder_port(mut self, builder_port: Option<u16>) -> Self {
            self.builder_port = builder_port;
            self
        }

        /// Sets the URL of the state relay service.
        pub fn state_relay_url(mut self, url: Url) -> Self {
            self.state_relay_url = Some(url);
            self
        }

        /// Sets the Anvil provider, constructed using the Anvil instance.
        /// Also sets the L1 URL based on the Anvil endpoint.
        /// The `AnvilProvider` can be used to configure the Anvil, for example,
        /// by enabling interval mining after the test network is initialized.
        pub fn anvil_provider(mut self, anvil: AnvilInstance) -> Self {
            self.l1_url = anvil.endpoint().parse().unwrap();
            let l1_client = L1Client::anvil(&anvil).expect("create l1 client");
            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
            self.anvil_provider = Some(anvil_provider);
            self
        }

        /// Sets a custom L1 URL, overriding any previously set Anvil instance URL.
        /// This removes the anvil provider, as well as it is no longer needed
        pub fn l1_url(mut self, l1_url: Url) -> Self {
            self.anvil_provider = None;
            self.l1_url = l1_url;
            self
        }

        /// Sets the options used when creating the nodes' L1 clients.
        pub fn l1_opt(mut self, opt: L1ClientOptions) -> Self {
            self.l1_opt = opt;
            self
        }

        /// Sets the signer used for L1 transactions (e.g. contract deployment).
        pub fn signer(mut self, signer: LocalSigner<SigningKey>) -> Self {
            self.signer = signer;
            self
        }

        /// Installs a pre-built upgrade map and applies the HotShot config
        /// parameters of the upgrade keyed by `V`'s upgrade version.
        ///
        /// Panics if the map has no entry for that version.
        pub fn upgrades<V: Versions>(mut self, upgrades: BTreeMap<Version, Upgrade>) -> Self {
            let upgrade = upgrades.get(&<V as Versions>::Upgrade::VERSION).unwrap();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            self.upgrades = upgrades;
            self
        }

        /// Version specific upgrade setup. Extend to future upgrades
        /// by adding a branch to the `match` statement.
        pub async fn set_upgrades(mut self, version: Version) -> Self {
            let upgrade = match version {
                version if version >= EpochVersion::VERSION => {
                    tracing::debug!(?version, "upgrade version");
                    let blocks_per_epoch = self.config.epoch_height;
                    let epoch_start_block = self.config.epoch_start_block;

                    // Derive the light client genesis state from the
                    // configured stake table.
                    let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
                        &self.config.hotshot_stake_table(),
                        STAKE_TABLE_CAPACITY_FOR_TEST,
                    )
                    .unwrap();

                    let validators =
                        staking_priv_keys(&self.priv_keys, &self.state_key_pairs, NUM_NODES);

                    // L1 provider with the test signer's wallet attached, used
                    // to send the deployment transactions.
                    let deployer = ProviderBuilder::new()
                        .wallet(EthereumWallet::from(self.signer.clone()))
                        .on_http(self.l1_url.clone());

                    // Deploy the full contract suite (mock light client,
                    // stake table, token, timelocks) to the configured L1.
                    let mut contracts = Contracts::new();
                    let args = DeployerArgsBuilder::default()
                        .deployer(deployer.clone())
                        .mock_light_client(true)
                        .genesis_lc_state(genesis_state)
                        .genesis_st_state(genesis_stake)
                        .blocks_per_epoch(blocks_per_epoch)
                        .epoch_start_block(epoch_start_block)
                        .multisig_pauser(self.signer.address())
                        .token_name("Espresso".to_string())
                        .token_symbol("ESP".to_string())
                        .initial_token_supply(U256::from(3590000000u64))
                        .ops_timelock_delay(U256::from(0))
                        .ops_timelock_admin(self.signer.address())
                        .ops_timelock_proposers(vec![self.signer.address()])
                        .ops_timelock_executors(vec![self.signer.address()])
                        .safe_exit_timelock_delay(U256::from(10))
                        .safe_exit_timelock_admin(self.signer.address())
                        .safe_exit_timelock_proposers(vec![self.signer.address()])
                        .safe_exit_timelock_executors(vec![self.signer.address()])
                        .build()
                        .unwrap();
                    args.deploy_all(&mut contracts)
                        .await
                        .expect("failed to deploy all contracts");

                    // Register the test validators in the freshly deployed
                    // stake table contract.
                    let st_addr = contracts
                        .address(Contract::StakeTableProxy)
                        .expect("StakeTableProxy address not found");
                    setup_stake_table_contract_for_test(
                        self.l1_url.clone(),
                        &deployer,
                        st_addr,
                        validators,
                        DelegationConfig::default(),
                    )
                    .await
                    .expect("stake table setup failed");

                    Upgrade::pos_view_based(st_addr)
                },
                _ => panic!("Upgrade not configured for version {version:?}"),
            };

            // Replace any previously configured upgrades with just this one.
            let mut upgrades = std::collections::BTreeMap::new();
            upgrade.set_hotshot_config_parameters(&mut self.config);
            upgrades.insert(version, upgrade);

            self.upgrades = upgrades;
            self
        }

        /// Sets the number of blocks per epoch.
        pub fn epoch_height(mut self, epoch_height: u64) -> Self {
            self.config.epoch_height = epoch_height;
            self
        }

        /// Sets the L1 block at which epochs start.
        pub fn epoch_start_block(mut self, start_block: u64) -> Self {
            self.config.epoch_start_block = start_block;
            self
        }

        /// Consumes the builder, producing the final immutable [`TestConfig`].
        pub fn build(self) -> TestConfig<NUM_NODES> {
            TestConfig {
                config: self.config,
                priv_keys: self.priv_keys,
                state_key_pairs: self.state_key_pairs,
                master_map: self.master_map,
                l1_url: self.l1_url,
                l1_opt: self.l1_opt,
                signer: self.signer,
                state_relay_url: self.state_relay_url,
                builder_port: self.builder_port,
                upgrades: self.upgrades,
                anvil_provider: self.anvil_provider,
            }
        }

        /// Sets the maximum stake table size.
        pub fn stake_table_capacity(mut self, stake_table_capacity: usize) -> Self {
            self.config.stake_table_capacity = stake_table_capacity;
            self
        }
    }
992
993    impl<const NUM_NODES: usize> Default for TestConfigBuilder<NUM_NODES> {
994        fn default() -> Self {
995            let num_nodes = NUM_NODES;
996
997            // Generate keys for the nodes.
998            let seed = [0; 32];
999            let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..num_nodes)
1000                .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed(seed, i as u64))
1001                .unzip();
1002            let state_key_pairs = (0..num_nodes)
1003                .map(|i| StateKeyPair::generate_from_seed_indexed(seed, i as u64))
1004                .collect::<Vec<_>>();
1005            let known_nodes_with_stake = pub_keys
1006                .iter()
1007                .zip(&state_key_pairs)
1008                .map(|(pub_key, state_key_pair)| PeerConfig::<SeqTypes> {
1009                    stake_table_entry: pub_key.stake_table_entry(U256::from(1)),
1010                    state_ver_key: state_key_pair.ver_key(),
1011                })
1012                .collect::<Vec<_>>();
1013
1014            let master_map = MasterMap::new();
1015
1016            let config: HotShotConfig<SeqTypes> = HotShotConfig {
1017                fixed_leader_for_gpuvid: 0,
1018                num_nodes_with_stake: num_nodes.try_into().unwrap(),
1019                known_da_nodes: known_nodes_with_stake.clone(),
1020                known_nodes_with_stake: known_nodes_with_stake.clone(),
1021                next_view_timeout: Duration::from_secs(5).as_millis() as u64,
1022                num_bootstrap: 1usize,
1023                da_staked_committee_size: num_nodes,
1024                view_sync_timeout: Duration::from_secs(1),
1025                data_request_delay: Duration::from_secs(1),
1026                builder_urls: vec1::vec1![Url::parse(&format!(
1027                    "http://127.0.0.1:{}",
1028                    pick_unused_port().unwrap()
1029                ))
1030                .unwrap()],
1031                builder_timeout: Duration::from_secs(1),
1032                start_threshold: (
1033                    known_nodes_with_stake.clone().len() as u64,
1034                    known_nodes_with_stake.clone().len() as u64,
1035                ),
1036                start_proposing_view: 0,
1037                stop_proposing_view: 0,
1038                start_voting_view: 0,
1039                stop_voting_view: 0,
1040                start_proposing_time: 0,
1041                start_voting_time: 0,
1042                stop_proposing_time: 0,
1043                stop_voting_time: 0,
1044                epoch_height: 30,
1045                epoch_start_block: 1,
1046                stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
1047                drb_difficulty: 10,
1048                drb_upgrade_difficulty: 20,
1049            };
1050
1051            let anvil = Anvil::new().args(["--slots-in-an-epoch", "0"]).spawn();
1052
1053            let l1_client = L1Client::anvil(&anvil).expect("failed to create l1 client");
1054            let anvil_provider = AnvilProvider::new(l1_client.provider, Arc::new(anvil));
1055
1056            let l1_signer_key = anvil_provider.anvil().keys()[0].clone();
1057            let signer = LocalSigner::from(l1_signer_key);
1058
1059            Self {
1060                config,
1061                priv_keys,
1062                state_key_pairs,
1063                master_map,
1064                l1_url: anvil_provider.anvil().endpoint().parse().unwrap(),
1065                l1_opt: L1ClientOptions {
1066                    stake_table_update_interval: Duration::from_secs(5),
1067                    l1_events_max_block_range: 1000,
1068                    l1_polling_interval: Duration::from_secs(1),
1069                    subscription_timeout: Duration::from_secs(5),
1070                    ..Default::default()
1071                },
1072                anvil_provider: Some(anvil_provider),
1073                signer,
1074                state_relay_url: None,
1075                builder_port: None,
1076                upgrades: Default::default(),
1077            }
1078        }
1079    }
1080
    /// Finalized configuration for a test network of `NUM_NODES` sequencer
    /// nodes, produced by [`TestConfigBuilder::build`].
    #[derive(Clone)]
    pub struct TestConfig<const NUM_NODES: usize> {
        // Shared HotShot consensus configuration for all nodes.
        config: HotShotConfig<SeqTypes>,
        // One BLS private key per node (index-aligned with `state_key_pairs`).
        priv_keys: Vec<BLSPrivKey>,
        // One light-client state key pair per node.
        state_key_pairs: Vec<StateKeyPair>,
        // Registry backing the in-memory test network.
        master_map: Arc<MasterMap<PubKey>>,
        // Endpoint of the L1 (Anvil by default, or a custom URL).
        l1_url: Url,
        // Options used when constructing the nodes' L1 clients.
        l1_opt: L1ClientOptions,
        // Present only when the L1 is a test-managed Anvil instance.
        anvil_provider: Option<AnvilFillProvider>,
        // Signer used for L1 transactions.
        signer: LocalSigner<SigningKey>,
        // Optional state relay service URL.
        state_relay_url: Option<Url>,
        // Optional fixed port for the test builder.
        builder_port: Option<u16>,
        // Protocol upgrades keyed by version.
        upgrades: BTreeMap<Version, Upgrade>,
    }
1095
1096    impl<const NUM_NODES: usize> TestConfig<NUM_NODES> {
1097        pub fn num_nodes(&self) -> usize {
1098            self.priv_keys.len()
1099        }
1100
1101        pub fn hotshot_config(&self) -> &HotShotConfig<SeqTypes> {
1102            &self.config
1103        }
1104
1105        pub fn set_builder_urls(&mut self, builder_urls: vec1::Vec1<Url>) {
1106            self.config.builder_urls = builder_urls;
1107        }
1108
1109        pub fn builder_port(&self) -> Option<u16> {
1110            self.builder_port
1111        }
1112
1113        pub fn signer(&self) -> LocalSigner<SigningKey> {
1114            self.signer.clone()
1115        }
1116
1117        pub fn l1_url(&self) -> Url {
1118            self.l1_url.clone()
1119        }
1120        pub fn anvil(&self) -> Option<&AnvilFillProvider> {
1121            self.anvil_provider.as_ref()
1122        }
1123
1124        pub fn get_upgrade_map(&self) -> UpgradeMap {
1125            self.upgrades.clone().into()
1126        }
1127
1128        pub fn upgrades(&self) -> BTreeMap<Version, Upgrade> {
1129            self.upgrades.clone()
1130        }
1131
1132        pub fn staking_priv_keys(&self) -> Vec<(PrivateKeySigner, BLSKeyPair, StateKeyPair)> {
1133            staking_priv_keys(&self.priv_keys, &self.state_key_pairs, self.num_nodes())
1134        }
1135
1136        pub async fn init_nodes<V: Versions>(
1137            &self,
1138            bind_version: V,
1139        ) -> Vec<SequencerContext<network::Memory, NoStorage, V>> {
1140            join_all((0..self.num_nodes()).map(|i| async move {
1141                self.init_node(
1142                    i,
1143                    ValidatedState::default(),
1144                    no_storage::Options,
1145                    Some(NullStateCatchup::default()),
1146                    None,
1147                    &NoMetrics,
1148                    STAKE_TABLE_CAPACITY_FOR_TEST,
1149                    NullEventConsumer,
1150                    bind_version,
1151                    Default::default(),
1152                )
1153                .await
1154            }))
1155            .await
1156        }
1157
1158        pub fn known_nodes_with_stake(&self) -> &[PeerConfig<SeqTypes>] {
1159            &self.config.known_nodes_with_stake
1160        }
1161
1162        #[allow(clippy::too_many_arguments)]
1163        pub async fn init_node<V: Versions, P: PersistenceOptions>(
1164            &self,
1165            i: usize,
1166            mut state: ValidatedState,
1167            mut persistence_opt: P,
1168            state_peers: Option<impl StateCatchup + 'static>,
1169            storage: Option<RequestResponseStorage>,
1170            metrics: &dyn Metrics,
1171            stake_table_capacity: usize,
1172            event_consumer: impl EventConsumer + 'static,
1173            bind_version: V,
1174            upgrades: BTreeMap<Version, Upgrade>,
1175        ) -> SequencerContext<network::Memory, P::Persistence, V> {
1176            let config = self.config.clone();
1177            let my_peer_config = &config.known_nodes_with_stake[i];
1178            let is_da = config.known_da_nodes.contains(my_peer_config);
1179
1180            // Create our own (private, local) validator config
1181            let validator_config = ValidatorConfig {
1182                public_key: my_peer_config.stake_table_entry.stake_key,
1183                private_key: self.priv_keys[i].clone(),
1184                stake_value: my_peer_config.stake_table_entry.stake_amount,
1185                state_public_key: self.state_key_pairs[i].ver_key(),
1186                state_private_key: self.state_key_pairs[i].sign_key(),
1187                is_da,
1188            };
1189
1190            let topics = if is_da {
1191                vec![Topic::Global, Topic::Da]
1192            } else {
1193                vec![Topic::Global]
1194            };
1195
1196            let network = Arc::new(MemoryNetwork::new(
1197                &my_peer_config.stake_table_entry.stake_key,
1198                &self.master_map,
1199                &topics,
1200                None,
1201            ));
1202
1203            // Make sure the builder account is funded.
1204            let builder_account = Self::builder_key().fee_account();
1205            tracing::info!(%builder_account, "prefunding builder account");
1206            state.prefund_account(builder_account, U256::MAX.into());
1207
1208            let persistence = persistence_opt.create().await.unwrap();
1209
1210            let chain_config = state.chain_config.resolve().unwrap_or_default();
1211
1212            // Create an empty list of catchup providers
1213            let catchup_providers = ParallelStateCatchup::new(&[]);
1214
1215            // If we have the state peers, add them
1216            if let Some(state_peers) = state_peers {
1217                catchup_providers.add_provider(Arc::new(state_peers));
1218            }
1219
1220            // If we have a working local catchup provider, add it
1221            match persistence
1222                .clone()
1223                .into_catchup_provider(BackoffParams::default())
1224            {
1225                Ok(local_catchup) => {
1226                    catchup_providers.add_provider(local_catchup);
1227                },
1228                Err(e) => {
1229                    tracing::warn!(
1230                        "Failed to create local catchup provider: {e:#}. Only using remote \
1231                         catchup."
1232                    );
1233                },
1234            };
1235
1236            let l1_client = self
1237                .l1_opt
1238                .clone()
1239                .connect(vec![self.l1_url.clone()])
1240                .expect("failed to create L1 client");
1241            l1_client.spawn_tasks().await;
1242
1243            let fetcher = Fetcher::new(
1244                Arc::new(catchup_providers.clone()),
1245                Arc::new(Mutex::new(persistence.clone())),
1246                l1_client.clone(),
1247                chain_config,
1248            );
1249            fetcher.spawn_update_loop().await;
1250
1251            let block_reward = fetcher.fetch_fixed_block_reward().await.ok();
1252            let mut membership = EpochCommittees::new_stake(
1253                config.known_nodes_with_stake.clone(),
1254                config.known_da_nodes.clone(),
1255                block_reward,
1256                fetcher,
1257                config.epoch_height,
1258            );
1259            membership.reload_stake(50).await;
1260
1261            let membership = Arc::new(RwLock::new(membership));
1262            let persistence = Arc::new(persistence);
1263
1264            let coordinator = EpochMembershipCoordinator::new(
1265                membership,
1266                config.epoch_height,
1267                &persistence.clone(),
1268            );
1269
1270            let node_state = NodeState::new(
1271                i as u64,
1272                chain_config,
1273                l1_client,
1274                Arc::new(catchup_providers.clone()),
1275                V::Base::VERSION,
1276                coordinator.clone(),
1277                Version { major: 0, minor: 1 },
1278            )
1279            .with_current_version(V::Base::version())
1280            .with_genesis(state)
1281            .with_epoch_height(config.epoch_height)
1282            .with_upgrades(upgrades)
1283            .with_epoch_start_block(config.epoch_start_block);
1284
1285            tracing::info!(
1286                i,
1287                key = %my_peer_config.stake_table_entry.stake_key,
1288                state_key = %my_peer_config.state_ver_key,
1289                "starting node",
1290            );
1291
1292            SequencerContext::init(
1293                NetworkConfig {
1294                    config,
1295                    // For testing, we use a fake network, so the rest of the network config beyond
1296                    // the base consensus config does not matter.
1297                    ..Default::default()
1298                },
1299                validator_config,
1300                coordinator,
1301                node_state,
1302                storage,
1303                catchup_providers,
1304                persistence,
1305                network,
1306                self.state_relay_url.clone(),
1307                metrics,
1308                stake_table_capacity,
1309                event_consumer,
1310                bind_version,
1311                Default::default(),
1312            )
1313            .await
1314            .unwrap()
1315        }
1316
1317        pub fn builder_key() -> EthKeyPair {
1318            FeeAccount::generated_from_seed_indexed([1; 32], 0).1
1319        }
1320    }
1321
1322    // Wait for decide event, make sure it matches submitted transaction. Return the block number
1323    // containing the transaction and the block payload size
1324    pub async fn wait_for_decide_on_handle(
1325        events: &mut (impl Stream<Item = Event> + Unpin),
1326        submitted_txn: &Transaction,
1327    ) -> (u64, usize) {
1328        let commitment = submitted_txn.commit();
1329
1330        // Keep getting events until we see a Decide event
1331        loop {
1332            let event = events.next().await.unwrap();
1333            tracing::info!("Received event from handle: {event:?}");
1334
1335            if let Decide { leaf_chain, .. } = event.event {
1336                if let Some((height, size)) =
1337                    leaf_chain.iter().find_map(|LeafInfo { leaf, .. }| {
1338                        if leaf
1339                            .block_payload()
1340                            .as_ref()?
1341                            .transaction_commitments(leaf.block_header().metadata())
1342                            .contains(&commitment)
1343                        {
1344                            let size = leaf.block_payload().unwrap().encode().len();
1345                            Some((leaf.block_header().block_number(), size))
1346                        } else {
1347                            None
1348                        }
1349                    })
1350                {
1351                    tracing::info!(height, "transaction {commitment} sequenced");
1352                    return (height, size);
1353                }
1354            } else {
1355                // Keep waiting
1356            }
1357        }
1358    }
1359}
1360
/// Smoke tests exercising a small in-memory sequencer network end to end.
#[cfg(test)]
mod test {

    use alloy::node_bindings::Anvil;
    use espresso_types::{Header, MockSequencerVersions, NamespaceId, Payload, Transaction};
    use futures::StreamExt;
    use hotshot::types::EventType::Decide;
    use hotshot_example_types::node_types::TestVersions;
    use hotshot_types::{
        event::LeafInfo,
        traits::block_contents::{BlockHeader, BlockPayload},
    };
    use testing::{wait_for_decide_on_handle, TestConfigBuilder};

    use self::testing::run_test_builder;
    use super::*;

    // Spins up a 5-node in-memory network with a test builder, submits one
    // transaction, and waits for it to be sequenced into a decided block.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_skeleton_instantiation() {
        // Assign `config` so it isn't dropped early.
        let anvil = Anvil::new().spawn();
        let url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;

        config.set_builder_urls(vec1::vec1![builder_url]);

        let handles = config.init_nodes(MockSequencerVersions::new()).await;

        let handle_0 = &handles[0];

        // Hook the builder up to the event stream from the first node
        builder_task.start(Box::new(handle_0.event_stream().await));

        // Subscribe to events before consensus starts so none are missed.
        let mut events = handle_0.event_stream().await;

        for handle in handles.iter() {
            handle.start_consensus().await;
        }

        // Submit target transaction to handle
        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3]);
        handles[0]
            .submit_transaction(txn.clone())
            .await
            .expect("Failed to submit transaction");
        tracing::info!("Submitted transaction to handle: {txn:?}");

        wait_for_decide_on_handle(&mut events, &txn).await;
    }

    // Runs a 5-node network and checks that, across decided leaves up to
    // `success_height`, header fields that must be monotonic (height,
    // timestamp, L1 head, L1 finalized) actually are.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_header_invariants() {
        let success_height = 30;
        // Assign `config` so it isn't dropped early.
        let anvil = Anvil::new().spawn();
        let url = anvil.endpoint_url();
        const NUM_NODES: usize = 5;
        let mut config = TestConfigBuilder::<NUM_NODES>::default()
            .l1_url(url)
            .build();

        let (builder_task, builder_url) = run_test_builder::<NUM_NODES>(None).await;

        config.set_builder_urls(vec1::vec1![builder_url]);
        let handles = config.init_nodes(MockSequencerVersions::new()).await;

        let handle_0 = &handles[0];

        let mut events = handle_0.event_stream().await;

        // Hook the builder up to the event stream from the first node
        builder_task.start(Box::new(handle_0.event_stream().await));

        for handle in handles.iter() {
            handle.start_consensus().await;
        }

        // Genesis header to seed the parent chain for invariant checks.
        let mut parent = {
            // TODO refactor repeated code from other tests
            let (genesis_payload, genesis_ns_table) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::mock())
                    .await
                    .unwrap();

            let genesis_state = NodeState::mock();
            Header::genesis::<TestVersions>(&genesis_state, genesis_payload, &genesis_ns_table)
        };

        loop {
            let event = events.next().await.unwrap();
            tracing::info!("Received event from handle: {event:?}");
            let Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            tracing::info!("Got decide {leaf_chain:?}");

            // Check that each successive header satisfies invariants relative to its parent: all
            // the fields which should be monotonic are.
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let header = leaf.block_header().clone();
                if header.height() == 0 {
                    parent = header;
                    continue;
                }
                assert_eq!(header.height(), parent.height() + 1);
                assert!(header.timestamp() >= parent.timestamp());
                assert!(header.l1_head() >= parent.l1_head());
                assert!(header.l1_finalized() >= parent.l1_finalized());
                parent = header;
            }

            // Stop once enough blocks have been checked.
            if parent.height() >= success_height {
                break;
            }
        }
    }
}