1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::SequencerPersistence;
4#[allow(unused_imports)]
5use espresso_types::{traits::NullEventConsumer, FeeVersion, SequencerVersions, V0_0};
6use futures::future::FutureExt;
7use hotshot_types::traits::{metrics::NoMetrics, node_implementation::Versions};
8use vbs::version::StaticVersionType;
9
10use super::{
11 api::{self, data_source::DataSourceOptions},
12 context::SequencerContext,
13 init_node, network,
14 options::{Modules, Options},
15 persistence, Genesis, L1Params, NetworkParams,
16};
17
18pub async fn main() -> anyhow::Result<()> {
19 let opt = Options::parse();
20 opt.logging.init();
21
22 let modules = opt.modules();
23 tracing::warn!(?modules, "sequencer starting up");
24
25 let genesis = Genesis::from_file(&opt.genesis_file)?;
26 tracing::warn!(?genesis, "genesis");
27
28 let base = genesis.base_version;
29 let upgrade = genesis.upgrade_version;
30
31 match (base, upgrade) {
32 #[cfg(all(feature = "fee", feature = "da-upgrade"))]
33 (espresso_types::FeeVersion::VERSION, espresso_types::DaUpgradeVersion::VERSION) => run(
34 genesis,
35 modules,
36 opt,
37 SequencerVersions::<espresso_types::FeeVersion, espresso_types::DaUpgradeVersion>::new(
38 ),
39 )
40 .await,
41 #[cfg(all(feature = "drb-and-header", feature = "da-upgrade"))]
42 (
43 espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
44 espresso_types::DaUpgradeVersion::VERSION,
45 ) => {
46 run(
47 genesis,
48 modules,
49 opt,
50 SequencerVersions::<
51 espresso_types::DrbAndHeaderUpgradeVersion,
52 espresso_types::DaUpgradeVersion,
53 >::new(),
54 )
55 .await
56 },
57 #[cfg(feature = "da-upgrade")]
58 (espresso_types::DaUpgradeVersion::VERSION, espresso_types::DaUpgradeVersion::VERSION) => {
59 run(
60 genesis,
61 modules,
62 opt,
63 SequencerVersions::<
64 espresso_types::DaUpgradeVersion,
65 espresso_types::DaUpgradeVersion,
66 >::new(),
67 )
68 .await
69 },
70 #[cfg(all(feature = "pos", feature = "drb-and-header"))]
71 (
72 espresso_types::EpochVersion::VERSION,
73 espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
74 ) => {
75 run(
76 genesis,
77 modules,
78 opt,
79 SequencerVersions::<
80 espresso_types::EpochVersion,
81 espresso_types::DrbAndHeaderUpgradeVersion,
82 >::new(),
83 )
84 .await
85 },
86 #[cfg(all(feature = "fee", feature = "drb-and-header"))]
87 (
88 espresso_types::FeeVersion::VERSION,
89 espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
90 ) => {
91 run(
92 genesis,
93 modules,
94 opt,
95 SequencerVersions::<
96 espresso_types::FeeVersion,
97 espresso_types::DrbAndHeaderUpgradeVersion,
98 >::new(),
99 )
100 .await
101 },
102 #[cfg(feature = "drb-and-header")]
103 (espresso_types::DrbAndHeaderUpgradeVersion::VERSION, _) => {
104 run(
105 genesis,
106 modules,
107 opt,
108 SequencerVersions::<
109 espresso_types::DrbAndHeaderUpgradeVersion,
110 espresso_types::DrbAndHeaderUpgradeVersion,
111 >::new(),
112 )
113 .await
114 },
115 #[cfg(all(feature = "fee", feature = "pos"))]
116 (FeeVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
117 run(
118 genesis,
119 modules,
120 opt,
121 SequencerVersions::<espresso_types::FeeVersion, espresso_types::EpochVersion>::new(
122 ),
123 )
124 .await
125 },
126 #[cfg(feature = "pos")]
127 (espresso_types::EpochVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
128 run(
129 genesis,
130 modules,
131 opt,
132 SequencerVersions::<espresso_types::EpochVersion, espresso_types::EpochVersion>::new(),
134 )
135 .await
136 },
137 #[cfg(feature = "fee")]
138 (FeeVersion::VERSION, espresso_types::FeeVersion::VERSION) => {
139 run(
140 genesis,
141 modules,
142 opt,
143 SequencerVersions::<FeeVersion, espresso_types::FeeVersion>::new(),
144 )
145 .await
146 },
147 _ => panic!(
148 "Invalid base ({base}) and upgrade ({upgrade}) versions specified in the toml file."
149 ),
150 }
151}
152
153async fn run<V>(
154 genesis: Genesis,
155 mut modules: Modules,
156 opt: Options,
157 versions: V,
158) -> anyhow::Result<()>
159where
160 V: Versions,
161{
162 if let Some(storage) = modules.storage_fs.take() {
163 run_with_storage(genesis, modules, opt, storage, versions).await
164 } else if let Some(storage) = modules.storage_sql.take() {
165 run_with_storage(genesis, modules, opt, storage, versions).await
166 } else {
167 run_with_storage(
169 genesis,
170 modules,
171 opt,
172 persistence::fs::Options::default(),
173 versions,
174 )
175 .await
176 }
177}
178
179async fn run_with_storage<S, V>(
180 genesis: Genesis,
181 modules: Modules,
182 opt: Options,
183 storage_opt: S,
184 versions: V,
185) -> anyhow::Result<()>
186where
187 S: DataSourceOptions,
188 V: Versions,
189{
190 let ctx = init_with_storage(genesis, modules, opt, storage_opt, versions).await?;
191
192 ctx.start_consensus().await;
194 ctx.join().await;
195
196 Ok(())
197}
198
199pub async fn init_with_storage<S, V>(
200 genesis: Genesis,
201 modules: Modules,
202 opt: Options,
203 mut storage_opt: S,
204 versions: V,
205) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, V>>
206where
207 S: DataSourceOptions,
208 V: Versions,
209{
210 let (private_staking_key, private_state_key) = opt.private_keys()?;
211 let l1_params = L1Params {
212 urls: opt.l1_provider_url,
213 options: opt.l1_options,
214 };
215
216 let network_params = NetworkParams {
217 cdn_endpoint: opt.cdn_endpoint,
218 libp2p_advertise_address: opt.libp2p_advertise_address,
219 libp2p_bind_address: opt.libp2p_bind_address,
220 libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
221 orchestrator_url: opt.orchestrator_url,
222 builder_urls: opt.builder_urls,
223 state_relay_server_url: opt.state_relay_server_url,
224 public_api_url: opt.public_api_url,
225 private_staking_key,
226 private_state_key,
227 state_peers: opt.state_peers,
228 config_peers: opt.config_peers,
229 catchup_backoff: opt.catchup_backoff,
230 libp2p_history_gossip: opt.libp2p_history_gossip,
231 libp2p_history_length: opt.libp2p_history_length,
232 libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
233 libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
234 libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
235 libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
236 libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
237 libp2p_mesh_n: opt.libp2p_mesh_n,
238 libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
239 libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
240 libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
241 libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
242 libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
243 libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
244 libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
245 libp2p_flood_publish: opt.libp2p_flood_publish,
246 libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
247 libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
248 libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
249 libp2p_gossip_factor: opt.libp2p_gossip_factor,
250 libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
251 };
252
253 let proposal_fetcher_config = opt.proposal_fetcher_config;
254
255 let persistence = storage_opt.create().await?;
256 persistence
257 .migrate_storage()
258 .await
259 .context("failed to migrate consensus data")?;
260
261 let ctx = match modules.http {
265 Some(http_opt) => {
266 let mut http_opt = api::Options::from(http_opt);
268 if let Some(query) = modules.query {
269 http_opt = storage_opt.enable_query_module(http_opt, query);
270 }
271 if let Some(submit) = modules.submit {
272 http_opt = http_opt.submit(submit);
273 }
274 if let Some(status) = modules.status {
275 http_opt = http_opt.status(status);
276 }
277
278 if let Some(catchup) = modules.catchup {
279 http_opt = http_opt.catchup(catchup);
280 }
281 if let Some(hotshot_events) = modules.hotshot_events {
282 http_opt = http_opt.hotshot_events(hotshot_events);
283 }
284 if let Some(explorer) = modules.explorer {
285 http_opt = http_opt.explorer(explorer);
286 }
287 if let Some(light_client) = modules.light_client {
288 http_opt = http_opt.light_client(light_client);
289 }
290 if let Some(config) = modules.config {
291 http_opt = http_opt.config(config);
292 }
293
294 http_opt
295 .serve(move |metrics, consumer, storage| {
296 async move {
297 init_node(
298 genesis,
299 network_params,
300 &*metrics,
301 persistence,
302 l1_params,
303 storage,
304 versions,
305 consumer,
306 opt.is_da,
307 opt.identity,
308 proposal_fetcher_config,
309 )
310 .await
311 }
312 .boxed()
313 })
314 .await?
315 },
316 None => {
317 init_node(
318 genesis,
319 network_params,
320 &NoMetrics,
321 persistence,
322 l1_params,
323 None,
324 versions,
325 NullEventConsumer,
326 opt.is_da,
327 opt.identity,
328 proposal_fetcher_config,
329 )
330 .await?
331 },
332 };
333
334 Ok(ctx)
335}
336
#[cfg(test)]
mod test {
    use std::time::Duration;

    use espresso_types::{MockSequencerVersions, PubKey};
    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey};
    use portpicker::pick_unused_port;
    use surf_disco::{error::ClientError, Client, Url};
    use tempfile::TempDir;
    use tokio::spawn;
    use vbs::version::Version;

    use super::*;
    use crate::{
        api::options::Http,
        genesis::{L1Finalized, StakeTableConfig},
        persistence::fs,
        SequencerApiVersion,
    };

    /// Runs `init_with_storage` in a background task and checks that the HTTP API answers the
    /// health check and exposes the expected metrics while that task is still alive.
    ///
    /// No orchestrator is started by this test; as the name indicates, it is meant to cover the
    /// window before an orchestrator is reachable.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_startup_before_orchestrator() {
        let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
        let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);

        let port = pick_unused_port().unwrap();
        let tmp = TempDir::new().unwrap();

        // Write a minimal genesis file into the temporary directory.
        let genesis_file = tmp.path().join("genesis.toml");
        let genesis = Genesis {
            chain_config: Default::default(),
            stake_table: StakeTableConfig { capacity: 10 },
            accounts: Default::default(),
            l1_finalized: L1Finalized::Number { number: 0 },
            header: Default::default(),
            upgrades: Default::default(),
            base_version: Version { major: 0, minor: 1 },
            upgrade_version: Version { major: 0, minor: 2 },
            epoch_height: None,
            drb_difficulty: None,
            drb_upgrade_difficulty: None,
            epoch_start_block: None,
            stake_table_capacity: None,
            genesis_version: Version { major: 0, minor: 1 },
            da_committees: None,
        };
        genesis.to_file(&genesis_file).unwrap();

        // Enable the HTTP and query modules on top of file-system storage.
        let modules = Modules {
            http: Some(Http::with_port(port)),
            query: Some(Default::default()),
            storage_fs: Some(fs::Options::new(tmp.path().into())),
            ..Default::default()
        };

        // Command-line argument values, rendered up front so the argv array only borrows them.
        let staking_key_arg = priv_key.to_tagged_base64().expect("valid key").to_string();
        let state_key_arg = state_key
            .sign_key_ref()
            .to_tagged_base64()
            .expect("valid key")
            .to_string();
        let genesis_file_arg = genesis_file.display().to_string();
        let opt = Options::parse_from([
            "sequencer",
            "--private-staking-key",
            staking_key_arg.as_str(),
            "--private-state-key",
            state_key_arg.as_str(),
            "--genesis-file",
            genesis_file_arg.as_str(),
        ]);

        tracing::info!(port, "starting sequencer");
        // Initialization runs in its own task so the test can probe the API concurrently. The
        // temporary directory is moved into the task along with everything else it uses.
        let init_task = spawn(async move {
            let storage = fs::Options::new(tmp.path().into());
            let outcome =
                init_with_storage(genesis, modules, opt, storage, MockSequencerVersions::new())
                    .await;
            if let Err(err) = outcome {
                tracing::error!("failed to start sequencer: {err:#}");
            }
        });

        tracing::info!("waiting for API to start");
        let url: Url = format!("http://localhost:{port}").parse().unwrap();
        let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
        let connected = client.connect(Some(Duration::from_secs(60))).await;
        assert!(connected);
        client.get::<()>("healthcheck").send().await.unwrap();

        // Fetch the raw metrics text and look for the exact lines we expect.
        let metrics_url = url.join("/status/metrics").unwrap();
        let res = reqwest::get(metrics_url).await.unwrap();
        assert!(res.status().is_success(), "{}", res.status());
        let metrics = res.text().await.unwrap();
        let lines = metrics.lines().collect::<Vec<_>>();

        let node_metric = format!("consensus_node{{key=\"{pub_key}\"}} 1");
        assert!(lines.contains(&node_metric.as_str()), "{lines:#?}");

        let version_metric = format!(
            "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
            env!("VERGEN_GIT_DESCRIBE"),
            env!("VERGEN_GIT_SHA"),
            env!("VERGEN_GIT_COMMIT_TIMESTAMP"),
        );
        assert!(lines.contains(&version_metric.as_str()), "{lines:#?}");

        init_task.abort();
    }
}