sequencer/
run.rs

1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::SequencerPersistence;
4#[allow(unused_imports)]
5use espresso_types::{traits::NullEventConsumer, FeeVersion, SequencerVersions, V0_0};
6use futures::future::FutureExt;
7use hotshot_types::traits::{metrics::NoMetrics, node_implementation::Versions};
8use vbs::version::StaticVersionType;
9
10use super::{
11    api::{self, data_source::DataSourceOptions},
12    context::SequencerContext,
13    init_node, network,
14    options::{Modules, Options},
15    persistence, Genesis, L1Params, NetworkParams,
16};
17
18pub async fn main() -> anyhow::Result<()> {
19    let opt = Options::parse();
20    opt.logging.init();
21
22    let modules = opt.modules();
23    tracing::warn!(?modules, "sequencer starting up");
24
25    let genesis = Genesis::from_file(&opt.genesis_file)?;
26    tracing::warn!(?genesis, "genesis");
27
28    let base = genesis.base_version;
29    let upgrade = genesis.upgrade_version;
30
31    match (base, upgrade) {
32        #[cfg(all(feature = "pos", feature = "drb-and-header"))]
33        (
34            espresso_types::EpochVersion::VERSION,
35            espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
36        ) => {
37            run(
38                genesis,
39                modules,
40                opt,
41                SequencerVersions::<
42                    espresso_types::EpochVersion,
43                    espresso_types::DrbAndHeaderUpgradeVersion,
44                >::new(),
45            )
46            .await
47        },
48        #[cfg(all(feature = "fee", feature = "drb-and-header"))]
49        (
50            espresso_types::FeeVersion::VERSION,
51            espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
52        ) => {
53            run(
54                genesis,
55                modules,
56                opt,
57                SequencerVersions::<
58                    espresso_types::FeeVersion,
59                    espresso_types::DrbAndHeaderUpgradeVersion,
60                >::new(),
61            )
62            .await
63        },
64        #[cfg(feature = "drb-and-header")]
65        (espresso_types::DrbAndHeaderUpgradeVersion::VERSION, _) => {
66            run(
67                genesis,
68                modules,
69                opt,
70                SequencerVersions::<
71                    espresso_types::DrbAndHeaderUpgradeVersion,
72                    espresso_types::DrbAndHeaderUpgradeVersion,
73                >::new(),
74            )
75            .await
76        },
77        #[cfg(all(feature = "fee", feature = "pos"))]
78        (FeeVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
79            run(
80                genesis,
81                modules,
82                opt,
83                SequencerVersions::<espresso_types::FeeVersion, espresso_types::EpochVersion>::new(
84                ),
85            )
86            .await
87        },
88        #[cfg(feature = "pos")]
89        (espresso_types::EpochVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
90            run(
91                genesis,
92                modules,
93                opt,
94                // Specifying V0_0 disables upgrades
95                SequencerVersions::<espresso_types::EpochVersion, espresso_types::EpochVersion>::new(),
96            )
97            .await
98        },
99        #[cfg(feature = "fee")]
100        (FeeVersion::VERSION, espresso_types::FeeVersion::VERSION) => {
101            run(
102                genesis,
103                modules,
104                opt,
105                SequencerVersions::<FeeVersion, espresso_types::FeeVersion>::new(),
106            )
107            .await
108        },
109        _ => panic!(
110            "Invalid base ({base}) and upgrade ({upgrade}) versions specified in the toml file."
111        ),
112    }
113}
114
115async fn run<V>(
116    genesis: Genesis,
117    mut modules: Modules,
118    opt: Options,
119    versions: V,
120) -> anyhow::Result<()>
121where
122    V: Versions,
123{
124    if let Some(storage) = modules.storage_fs.take() {
125        run_with_storage(genesis, modules, opt, storage, versions).await
126    } else if let Some(storage) = modules.storage_sql.take() {
127        run_with_storage(genesis, modules, opt, storage, versions).await
128    } else {
129        // Persistence is required. If none is provided, just use the local file system.
130        run_with_storage(
131            genesis,
132            modules,
133            opt,
134            persistence::fs::Options::default(),
135            versions,
136        )
137        .await
138    }
139}
140
141async fn run_with_storage<S, V>(
142    genesis: Genesis,
143    modules: Modules,
144    opt: Options,
145    storage_opt: S,
146    versions: V,
147) -> anyhow::Result<()>
148where
149    S: DataSourceOptions,
150    V: Versions,
151{
152    let ctx = init_with_storage(genesis, modules, opt, storage_opt, versions).await?;
153
154    // Start doing consensus.
155    ctx.start_consensus().await;
156    ctx.join().await;
157
158    Ok(())
159}
160
161pub(crate) async fn init_with_storage<S, V>(
162    genesis: Genesis,
163    modules: Modules,
164    opt: Options,
165    mut storage_opt: S,
166    versions: V,
167) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, V>>
168where
169    S: DataSourceOptions,
170    V: Versions,
171{
172    let (private_staking_key, private_state_key) = opt.private_keys()?;
173    let l1_params = L1Params {
174        urls: opt.l1_provider_url,
175        options: opt.l1_options,
176    };
177
178    let network_params = NetworkParams {
179        cdn_endpoint: opt.cdn_endpoint,
180        libp2p_advertise_address: opt.libp2p_advertise_address,
181        libp2p_bind_address: opt.libp2p_bind_address,
182        libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
183        orchestrator_url: opt.orchestrator_url,
184        state_relay_server_url: opt.state_relay_server_url,
185        public_api_url: opt.public_api_url,
186        private_staking_key,
187        private_state_key,
188        state_peers: opt.state_peers,
189        config_peers: opt.config_peers,
190        catchup_backoff: opt.catchup_backoff,
191        libp2p_history_gossip: opt.libp2p_history_gossip,
192        libp2p_history_length: opt.libp2p_history_length,
193        libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
194        libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
195        libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
196        libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
197        libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
198        libp2p_mesh_n: opt.libp2p_mesh_n,
199        libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
200        libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
201        libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
202        libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
203        libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
204        libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
205        libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
206        libp2p_flood_publish: opt.libp2p_flood_publish,
207        libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
208        libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
209        libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
210        libp2p_gossip_factor: opt.libp2p_gossip_factor,
211        libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
212    };
213
214    let proposal_fetcher_config = opt.proposal_fetcher_config;
215
216    let persistence = storage_opt.create().await?;
217    persistence
218        .migrate_consensus()
219        .await
220        .context("failed to migrate consensus data")?;
221
222    // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
223    // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
224    // the handle directly, with no metrics.
225    let ctx = match modules.http {
226        Some(http_opt) => {
227            // Add optional API modules as requested.
228            let mut http_opt = api::Options::from(http_opt);
229            if let Some(query) = modules.query {
230                http_opt = storage_opt.enable_query_module(http_opt, query);
231            }
232            if let Some(submit) = modules.submit {
233                http_opt = http_opt.submit(submit);
234            }
235            if let Some(status) = modules.status {
236                http_opt = http_opt.status(status);
237            }
238
239            if let Some(catchup) = modules.catchup {
240                http_opt = http_opt.catchup(catchup);
241            }
242            if let Some(hotshot_events) = modules.hotshot_events {
243                http_opt = http_opt.hotshot_events(hotshot_events);
244            }
245            if let Some(explorer) = modules.explorer {
246                http_opt = http_opt.explorer(explorer);
247            }
248            if let Some(config) = modules.config {
249                http_opt = http_opt.config(config);
250            }
251
252            http_opt
253                .serve(move |metrics, consumer, storage| {
254                    async move {
255                        init_node(
256                            genesis,
257                            network_params,
258                            &*metrics,
259                            persistence,
260                            l1_params,
261                            storage,
262                            versions,
263                            consumer,
264                            opt.is_da,
265                            opt.identity,
266                            proposal_fetcher_config,
267                        )
268                        .await
269                    }
270                    .boxed()
271                })
272                .await?
273        },
274        None => {
275            init_node(
276                genesis,
277                network_params,
278                &NoMetrics,
279                persistence,
280                l1_params,
281                None,
282                versions,
283                NullEventConsumer,
284                opt.is_da,
285                opt.identity,
286                proposal_fetcher_config,
287            )
288            .await?
289        },
290    };
291
292    Ok(ctx)
293}
294
#[cfg(test)]
mod test {
    use std::time::Duration;

    use espresso_types::{MockSequencerVersions, PubKey};
    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey};
    use portpicker::pick_unused_port;
    use surf_disco::{error::ClientError, Client, Url};
    use tempfile::TempDir;
    use tokio::spawn;
    use vbs::version::Version;

    use super::*;
    use crate::{
        api::options::Http,
        genesis::{L1Finalized, StakeTableConfig},
        persistence::fs,
        SequencerApiVersion,
    };

    /// The API server must come up and expose node/version metrics even while node
    /// initialization is still pending on the orchestrator.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_startup_before_orchestrator() {
        // Deterministic keys for node 0.
        let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
        let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);

        let port = pick_unused_port().unwrap();
        let tmp = TempDir::new().unwrap();

        // Write a minimal genesis file into the temporary directory.
        let genesis_file = tmp.path().join("genesis.toml");
        let genesis = Genesis {
            chain_config: Default::default(),
            stake_table: StakeTableConfig { capacity: 10 },
            accounts: Default::default(),
            l1_finalized: L1Finalized::Number { number: 0 },
            header: Default::default(),
            upgrades: Default::default(),
            base_version: Version { major: 0, minor: 1 },
            upgrade_version: Version { major: 0, minor: 2 },
            epoch_height: None,
            drb_difficulty: None,
            drb_upgrade_difficulty: None,
            epoch_start_block: None,
            stake_table_capacity: None,
            genesis_version: Version { major: 0, minor: 1 },
        };
        genesis.to_file(&genesis_file).unwrap();

        let modules = Modules {
            http: Some(Http::with_port(port)),
            query: Some(Default::default()),
            storage_fs: Some(fs::Options::new(tmp.path().into())),
            ..Default::default()
        };

        // Command-line arguments, rendered to strings up front.
        let staking_key_arg = priv_key.to_tagged_base64().expect("valid key").to_string();
        let state_key_arg = state_key
            .sign_key_ref()
            .to_tagged_base64()
            .expect("valid key")
            .to_string();
        let genesis_file_arg = genesis_file.display().to_string();
        let opt = Options::parse_from([
            "sequencer",
            "--private-staking-key",
            &staking_key_arg,
            "--private-state-key",
            &state_key_arg,
            "--genesis-file",
            &genesis_file_arg,
        ]);

        // Run initialization in the background. It is not expected to finish, since it waits
        // for the orchestrator, but the API server should start and some metrics should be
        // populated in the meantime.
        tracing::info!(port, "starting sequencer");
        let task = spawn(async move {
            if let Err(err) = init_with_storage(
                genesis,
                modules,
                opt,
                fs::Options::new(tmp.path().into()),
                MockSequencerVersions::new(),
            )
            .await
            {
                tracing::error!("failed to start sequencer: {err:#}");
            }
        });

        // The healthcheck must become reachable despite the node still waiting for the
        // orchestrator.
        tracing::info!("waiting for API to start");
        let url: Url = format!("http://localhost:{port}").parse().unwrap();
        let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
        let connected = client.connect(Some(Duration::from_secs(60))).await;
        assert!(connected);
        client.get::<()>("healthcheck").send().await.unwrap();

        // Fetch the plaintext metrics. surf-disco cannot currently fetch a plaintext file, so a
        // raw reqwest request is used instead.
        let metrics_url = url.join("/status/metrics").unwrap();
        let res = reqwest::get(metrics_url).await.unwrap();
        assert!(res.status().is_success(), "{}", res.status());
        let metrics = res.text().await.unwrap();
        let lines: Vec<&str> = metrics.lines().collect();

        // The metrics must identify the node...
        let expected_node = format!("consensus_node{{key=\"{pub_key}\"}} 1");
        assert!(lines.contains(&expected_node.as_str()), "{lines:#?}");

        // ...and the software version it was built from.
        let expected_version = format!(
            "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
            env!("VERGEN_GIT_DESCRIBE"),
            env!("VERGEN_GIT_SHA"),
            env!("VERGEN_GIT_COMMIT_TIMESTAMP"),
        );
        assert!(lines.contains(&expected_version.as_str()), "{lines:#?}");

        task.abort();
    }
}
415}