sequencer/
run.rs

1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::SequencerPersistence;
4#[allow(unused_imports)]
5use espresso_types::{traits::NullEventConsumer, FeeVersion, SequencerVersions, V0_0};
6use futures::future::FutureExt;
7use hotshot_types::traits::{metrics::NoMetrics, node_implementation::Versions};
8use vbs::version::StaticVersionType;
9
10use super::{
11    api::{self, data_source::DataSourceOptions},
12    context::SequencerContext,
13    init_node, network,
14    options::{Modules, Options},
15    persistence, Genesis, L1Params, NetworkParams,
16};
17
18pub async fn main() -> anyhow::Result<()> {
19    let opt = Options::parse();
20    opt.logging.init();
21
22    let modules = opt.modules();
23    tracing::warn!(?modules, "sequencer starting up");
24
25    let genesis = Genesis::from_file(&opt.genesis_file)?;
26    tracing::warn!(?genesis, "genesis");
27
28    let base = genesis.base_version;
29    let upgrade = genesis.upgrade_version;
30
31    match (base, upgrade) {
32        #[cfg(all(feature = "fee", feature = "da-upgrade"))]
33        (espresso_types::FeeVersion::VERSION, espresso_types::DaUpgradeVersion::VERSION) => run(
34            genesis,
35            modules,
36            opt,
37            SequencerVersions::<espresso_types::FeeVersion, espresso_types::DaUpgradeVersion>::new(
38            ),
39        )
40        .await,
41        #[cfg(all(feature = "drb-and-header", feature = "da-upgrade"))]
42        (
43            espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
44            espresso_types::DaUpgradeVersion::VERSION,
45        ) => {
46            run(
47                genesis,
48                modules,
49                opt,
50                SequencerVersions::<
51                    espresso_types::DrbAndHeaderUpgradeVersion,
52                    espresso_types::DaUpgradeVersion,
53                >::new(),
54            )
55            .await
56        },
57        #[cfg(feature = "da-upgrade")]
58        (espresso_types::DaUpgradeVersion::VERSION, espresso_types::DaUpgradeVersion::VERSION) => {
59            run(
60                genesis,
61                modules,
62                opt,
63                SequencerVersions::<
64                    espresso_types::DaUpgradeVersion,
65                    espresso_types::DaUpgradeVersion,
66                >::new(),
67            )
68            .await
69        },
70        #[cfg(all(feature = "pos", feature = "drb-and-header"))]
71        (
72            espresso_types::EpochVersion::VERSION,
73            espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
74        ) => {
75            run(
76                genesis,
77                modules,
78                opt,
79                SequencerVersions::<
80                    espresso_types::EpochVersion,
81                    espresso_types::DrbAndHeaderUpgradeVersion,
82                >::new(),
83            )
84            .await
85        },
86        #[cfg(all(feature = "fee", feature = "drb-and-header"))]
87        (
88            espresso_types::FeeVersion::VERSION,
89            espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
90        ) => {
91            run(
92                genesis,
93                modules,
94                opt,
95                SequencerVersions::<
96                    espresso_types::FeeVersion,
97                    espresso_types::DrbAndHeaderUpgradeVersion,
98                >::new(),
99            )
100            .await
101        },
102        #[cfg(feature = "drb-and-header")]
103        (espresso_types::DrbAndHeaderUpgradeVersion::VERSION, _) => {
104            run(
105                genesis,
106                modules,
107                opt,
108                SequencerVersions::<
109                    espresso_types::DrbAndHeaderUpgradeVersion,
110                    espresso_types::DrbAndHeaderUpgradeVersion,
111                >::new(),
112            )
113            .await
114        },
115        #[cfg(all(feature = "fee", feature = "pos"))]
116        (FeeVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
117            run(
118                genesis,
119                modules,
120                opt,
121                SequencerVersions::<espresso_types::FeeVersion, espresso_types::EpochVersion>::new(
122                ),
123            )
124            .await
125        },
126        #[cfg(feature = "pos")]
127        (espresso_types::EpochVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
128            run(
129                genesis,
130                modules,
131                opt,
132                // Specifying V0_0 disables upgrades
133                SequencerVersions::<espresso_types::EpochVersion, espresso_types::EpochVersion>::new(),
134            )
135            .await
136        },
137        #[cfg(feature = "fee")]
138        (FeeVersion::VERSION, espresso_types::FeeVersion::VERSION) => {
139            run(
140                genesis,
141                modules,
142                opt,
143                SequencerVersions::<FeeVersion, espresso_types::FeeVersion>::new(),
144            )
145            .await
146        },
147        _ => panic!(
148            "Invalid base ({base}) and upgrade ({upgrade}) versions specified in the toml file."
149        ),
150    }
151}
152
153async fn run<V>(
154    genesis: Genesis,
155    mut modules: Modules,
156    opt: Options,
157    versions: V,
158) -> anyhow::Result<()>
159where
160    V: Versions,
161{
162    if let Some(storage) = modules.storage_fs.take() {
163        run_with_storage(genesis, modules, opt, storage, versions).await
164    } else if let Some(storage) = modules.storage_sql.take() {
165        run_with_storage(genesis, modules, opt, storage, versions).await
166    } else {
167        // Persistence is required. If none is provided, just use the local file system.
168        run_with_storage(
169            genesis,
170            modules,
171            opt,
172            persistence::fs::Options::default(),
173            versions,
174        )
175        .await
176    }
177}
178
179async fn run_with_storage<S, V>(
180    genesis: Genesis,
181    modules: Modules,
182    opt: Options,
183    storage_opt: S,
184    versions: V,
185) -> anyhow::Result<()>
186where
187    S: DataSourceOptions,
188    V: Versions,
189{
190    let ctx = init_with_storage(genesis, modules, opt, storage_opt, versions).await?;
191
192    // Start doing consensus.
193    ctx.start_consensus().await;
194    ctx.join().await;
195
196    Ok(())
197}
198
199pub async fn init_with_storage<S, V>(
200    genesis: Genesis,
201    modules: Modules,
202    opt: Options,
203    mut storage_opt: S,
204    versions: V,
205) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, V>>
206where
207    S: DataSourceOptions,
208    V: Versions,
209{
210    let (private_staking_key, private_state_key) = opt.private_keys()?;
211    let l1_params = L1Params {
212        urls: opt.l1_provider_url,
213        options: opt.l1_options,
214    };
215
216    let network_params = NetworkParams {
217        cdn_endpoint: opt.cdn_endpoint,
218        libp2p_advertise_address: opt.libp2p_advertise_address,
219        libp2p_bind_address: opt.libp2p_bind_address,
220        libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
221        orchestrator_url: opt.orchestrator_url,
222        builder_urls: opt.builder_urls,
223        state_relay_server_url: opt.state_relay_server_url,
224        public_api_url: opt.public_api_url,
225        private_staking_key,
226        private_state_key,
227        state_peers: opt.state_peers,
228        config_peers: opt.config_peers,
229        catchup_backoff: opt.catchup_backoff,
230        libp2p_history_gossip: opt.libp2p_history_gossip,
231        libp2p_history_length: opt.libp2p_history_length,
232        libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
233        libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
234        libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
235        libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
236        libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
237        libp2p_mesh_n: opt.libp2p_mesh_n,
238        libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
239        libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
240        libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
241        libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
242        libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
243        libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
244        libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
245        libp2p_flood_publish: opt.libp2p_flood_publish,
246        libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
247        libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
248        libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
249        libp2p_gossip_factor: opt.libp2p_gossip_factor,
250        libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
251    };
252
253    let proposal_fetcher_config = opt.proposal_fetcher_config;
254
255    let persistence = storage_opt.create().await?;
256    persistence
257        .migrate_storage()
258        .await
259        .context("failed to migrate consensus data")?;
260
261    // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
262    // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
263    // the handle directly, with no metrics.
264    let ctx = match modules.http {
265        Some(http_opt) => {
266            // Add optional API modules as requested.
267            let mut http_opt = api::Options::from(http_opt);
268            if let Some(query) = modules.query {
269                http_opt = storage_opt.enable_query_module(http_opt, query);
270            }
271            if let Some(submit) = modules.submit {
272                http_opt = http_opt.submit(submit);
273            }
274            if let Some(status) = modules.status {
275                http_opt = http_opt.status(status);
276            }
277
278            if let Some(catchup) = modules.catchup {
279                http_opt = http_opt.catchup(catchup);
280            }
281            if let Some(hotshot_events) = modules.hotshot_events {
282                http_opt = http_opt.hotshot_events(hotshot_events);
283            }
284            if let Some(explorer) = modules.explorer {
285                http_opt = http_opt.explorer(explorer);
286            }
287            if let Some(light_client) = modules.light_client {
288                http_opt = http_opt.light_client(light_client);
289            }
290            if let Some(config) = modules.config {
291                http_opt = http_opt.config(config);
292            }
293
294            http_opt
295                .serve(move |metrics, consumer, storage| {
296                    async move {
297                        init_node(
298                            genesis,
299                            network_params,
300                            &*metrics,
301                            persistence,
302                            l1_params,
303                            storage,
304                            versions,
305                            consumer,
306                            opt.is_da,
307                            opt.identity,
308                            proposal_fetcher_config,
309                        )
310                        .await
311                    }
312                    .boxed()
313                })
314                .await?
315        },
316        None => {
317            init_node(
318                genesis,
319                network_params,
320                &NoMetrics,
321                persistence,
322                l1_params,
323                None,
324                versions,
325                NullEventConsumer,
326                opt.is_da,
327                opt.identity,
328                proposal_fetcher_config,
329            )
330            .await?
331        },
332    };
333
334    Ok(ctx)
335}
336
337#[cfg(test)]
338mod test {
339    use std::time::Duration;
340
341    use espresso_types::{MockSequencerVersions, PubKey};
342    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey};
343    use portpicker::pick_unused_port;
344    use surf_disco::{error::ClientError, Client, Url};
345    use tempfile::TempDir;
346    use tokio::spawn;
347    use vbs::version::Version;
348
349    use super::*;
350    use crate::{
351        api::options::Http,
352        genesis::{L1Finalized, StakeTableConfig},
353        persistence::fs,
354        SequencerApiVersion,
355    };
356
357    #[test_log::test(tokio::test(flavor = "multi_thread"))]
358    async fn test_startup_before_orchestrator() {
359        let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
360        let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);
361
362        let port = pick_unused_port().unwrap();
363        let tmp = TempDir::new().unwrap();
364
365        let genesis_file = tmp.path().join("genesis.toml");
366        let genesis = Genesis {
367            chain_config: Default::default(),
368            stake_table: StakeTableConfig { capacity: 10 },
369            accounts: Default::default(),
370            l1_finalized: L1Finalized::Number { number: 0 },
371            header: Default::default(),
372            upgrades: Default::default(),
373            base_version: Version { major: 0, minor: 1 },
374            upgrade_version: Version { major: 0, minor: 2 },
375            epoch_height: None,
376            drb_difficulty: None,
377            drb_upgrade_difficulty: None,
378            epoch_start_block: None,
379            stake_table_capacity: None,
380            genesis_version: Version { major: 0, minor: 1 },
381            da_committees: None,
382        };
383        genesis.to_file(&genesis_file).unwrap();
384
385        let modules = Modules {
386            http: Some(Http::with_port(port)),
387            query: Some(Default::default()),
388            storage_fs: Some(fs::Options::new(tmp.path().into())),
389            ..Default::default()
390        };
391        let opt = Options::parse_from([
392            "sequencer",
393            "--private-staking-key",
394            &priv_key.to_tagged_base64().expect("valid key").to_string(),
395            "--private-state-key",
396            &state_key
397                .sign_key_ref()
398                .to_tagged_base64()
399                .expect("valid key")
400                .to_string(),
401            "--genesis-file",
402            &genesis_file.display().to_string(),
403        ]);
404
405        // Start the sequencer in a background task. This process will not complete, because it will
406        // be waiting for the orchestrator, but it should at least start up the API server and
407        // populate some metrics.
408        tracing::info!(port, "starting sequencer");
409        let task = spawn(async move {
410            if let Err(err) = init_with_storage(
411                genesis,
412                modules,
413                opt,
414                fs::Options::new(tmp.path().into()),
415                MockSequencerVersions::new(),
416            )
417            .await
418            {
419                tracing::error!("failed to start sequencer: {err:#}");
420            }
421        });
422
423        // The healthcheck should eventually come up even though the node is waiting for the
424        // orchestrator.
425        tracing::info!("waiting for API to start");
426        let url: Url = format!("http://localhost:{port}").parse().unwrap();
427        let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
428        assert!(client.connect(Some(Duration::from_secs(60))).await);
429        client.get::<()>("healthcheck").send().await.unwrap();
430
431        // The metrics should include information about the node and software version. surf-disco
432        // doesn't currently support fetching a plaintext file, so we use a raw reqwest client.
433        let res = reqwest::get(url.join("/status/metrics").unwrap())
434            .await
435            .unwrap();
436        assert!(res.status().is_success(), "{}", res.status());
437        let metrics = res.text().await.unwrap();
438        let lines = metrics.lines().collect::<Vec<_>>();
439        assert!(
440            lines.contains(&format!("consensus_node{{key=\"{pub_key}\"}} 1").as_str()),
441            "{lines:#?}"
442        );
443        assert!(
444            lines.contains(
445                &format!(
446                    "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
447                    env!("VERGEN_GIT_DESCRIBE"),
448                    env!("VERGEN_GIT_SHA"),
449                    env!("VERGEN_GIT_COMMIT_TIMESTAMP"),
450                )
451                .as_str()
452            ),
453            "{lines:#?}"
454        );
455
456        task.abort();
457    }
458}