1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::SequencerPersistence;
4#[allow(unused_imports)]
5use espresso_types::{traits::NullEventConsumer, FeeVersion, SequencerVersions, V0_0};
6use futures::future::FutureExt;
7use hotshot_types::traits::{metrics::NoMetrics, node_implementation::Versions};
8use vbs::version::StaticVersionType;
9
10use super::{
11 api::{self, data_source::DataSourceOptions},
12 context::SequencerContext,
13 init_node, network,
14 options::{Modules, Options},
15 persistence, Genesis, L1Params, NetworkParams,
16};
17
18pub async fn main() -> anyhow::Result<()> {
19 let opt = Options::parse();
20 opt.logging.init();
21
22 let modules = opt.modules();
23 tracing::warn!(?modules, "sequencer starting up");
24
25 let genesis = Genesis::from_file(&opt.genesis_file)?;
26 tracing::warn!(?genesis, "genesis");
27
28 let base = genesis.base_version;
29 let upgrade = genesis.upgrade_version;
30
31 match (base, upgrade) {
32 #[cfg(all(feature = "pos", feature = "drb-and-header"))]
33 (
34 espresso_types::EpochVersion::VERSION,
35 espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
36 ) => {
37 run(
38 genesis,
39 modules,
40 opt,
41 SequencerVersions::<
42 espresso_types::EpochVersion,
43 espresso_types::DrbAndHeaderUpgradeVersion,
44 >::new(),
45 )
46 .await
47 },
48 #[cfg(all(feature = "fee", feature = "drb-and-header"))]
49 (
50 espresso_types::FeeVersion::VERSION,
51 espresso_types::DrbAndHeaderUpgradeVersion::VERSION,
52 ) => {
53 run(
54 genesis,
55 modules,
56 opt,
57 SequencerVersions::<
58 espresso_types::FeeVersion,
59 espresso_types::DrbAndHeaderUpgradeVersion,
60 >::new(),
61 )
62 .await
63 },
64 #[cfg(feature = "drb-and-header")]
65 (espresso_types::DrbAndHeaderUpgradeVersion::VERSION, _) => {
66 run(
67 genesis,
68 modules,
69 opt,
70 SequencerVersions::<
71 espresso_types::DrbAndHeaderUpgradeVersion,
72 espresso_types::DrbAndHeaderUpgradeVersion,
73 >::new(),
74 )
75 .await
76 },
77 #[cfg(all(feature = "fee", feature = "pos"))]
78 (FeeVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
79 run(
80 genesis,
81 modules,
82 opt,
83 SequencerVersions::<espresso_types::FeeVersion, espresso_types::EpochVersion>::new(
84 ),
85 )
86 .await
87 },
88 #[cfg(feature = "pos")]
89 (espresso_types::EpochVersion::VERSION, espresso_types::EpochVersion::VERSION) => {
90 run(
91 genesis,
92 modules,
93 opt,
94 SequencerVersions::<espresso_types::EpochVersion, espresso_types::EpochVersion>::new(),
96 )
97 .await
98 },
99 #[cfg(feature = "fee")]
100 (FeeVersion::VERSION, espresso_types::FeeVersion::VERSION) => {
101 run(
102 genesis,
103 modules,
104 opt,
105 SequencerVersions::<FeeVersion, espresso_types::FeeVersion>::new(),
106 )
107 .await
108 },
109 _ => panic!(
110 "Invalid base ({base}) and upgrade ({upgrade}) versions specified in the toml file."
111 ),
112 }
113}
114
115async fn run<V>(
116 genesis: Genesis,
117 mut modules: Modules,
118 opt: Options,
119 versions: V,
120) -> anyhow::Result<()>
121where
122 V: Versions,
123{
124 if let Some(storage) = modules.storage_fs.take() {
125 run_with_storage(genesis, modules, opt, storage, versions).await
126 } else if let Some(storage) = modules.storage_sql.take() {
127 run_with_storage(genesis, modules, opt, storage, versions).await
128 } else {
129 run_with_storage(
131 genesis,
132 modules,
133 opt,
134 persistence::fs::Options::default(),
135 versions,
136 )
137 .await
138 }
139}
140
141async fn run_with_storage<S, V>(
142 genesis: Genesis,
143 modules: Modules,
144 opt: Options,
145 storage_opt: S,
146 versions: V,
147) -> anyhow::Result<()>
148where
149 S: DataSourceOptions,
150 V: Versions,
151{
152 let ctx = init_with_storage(genesis, modules, opt, storage_opt, versions).await?;
153
154 ctx.start_consensus().await;
156 ctx.join().await;
157
158 Ok(())
159}
160
161pub(crate) async fn init_with_storage<S, V>(
162 genesis: Genesis,
163 modules: Modules,
164 opt: Options,
165 mut storage_opt: S,
166 versions: V,
167) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, V>>
168where
169 S: DataSourceOptions,
170 V: Versions,
171{
172 let (private_staking_key, private_state_key) = opt.private_keys()?;
173 let l1_params = L1Params {
174 urls: opt.l1_provider_url,
175 options: opt.l1_options,
176 };
177
178 let network_params = NetworkParams {
179 cdn_endpoint: opt.cdn_endpoint,
180 libp2p_advertise_address: opt.libp2p_advertise_address,
181 libp2p_bind_address: opt.libp2p_bind_address,
182 libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
183 orchestrator_url: opt.orchestrator_url,
184 state_relay_server_url: opt.state_relay_server_url,
185 public_api_url: opt.public_api_url,
186 private_staking_key,
187 private_state_key,
188 state_peers: opt.state_peers,
189 config_peers: opt.config_peers,
190 catchup_backoff: opt.catchup_backoff,
191 libp2p_history_gossip: opt.libp2p_history_gossip,
192 libp2p_history_length: opt.libp2p_history_length,
193 libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
194 libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
195 libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
196 libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
197 libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
198 libp2p_mesh_n: opt.libp2p_mesh_n,
199 libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
200 libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
201 libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
202 libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
203 libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
204 libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
205 libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
206 libp2p_flood_publish: opt.libp2p_flood_publish,
207 libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
208 libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
209 libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
210 libp2p_gossip_factor: opt.libp2p_gossip_factor,
211 libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
212 };
213
214 let proposal_fetcher_config = opt.proposal_fetcher_config;
215
216 let persistence = storage_opt.create().await?;
217 persistence
218 .migrate_consensus()
219 .await
220 .context("failed to migrate consensus data")?;
221
222 let ctx = match modules.http {
226 Some(http_opt) => {
227 let mut http_opt = api::Options::from(http_opt);
229 if let Some(query) = modules.query {
230 http_opt = storage_opt.enable_query_module(http_opt, query);
231 }
232 if let Some(submit) = modules.submit {
233 http_opt = http_opt.submit(submit);
234 }
235 if let Some(status) = modules.status {
236 http_opt = http_opt.status(status);
237 }
238
239 if let Some(catchup) = modules.catchup {
240 http_opt = http_opt.catchup(catchup);
241 }
242 if let Some(hotshot_events) = modules.hotshot_events {
243 http_opt = http_opt.hotshot_events(hotshot_events);
244 }
245 if let Some(explorer) = modules.explorer {
246 http_opt = http_opt.explorer(explorer);
247 }
248 if let Some(config) = modules.config {
249 http_opt = http_opt.config(config);
250 }
251
252 http_opt
253 .serve(move |metrics, consumer, storage| {
254 async move {
255 init_node(
256 genesis,
257 network_params,
258 &*metrics,
259 persistence,
260 l1_params,
261 storage,
262 versions,
263 consumer,
264 opt.is_da,
265 opt.identity,
266 proposal_fetcher_config,
267 )
268 .await
269 }
270 .boxed()
271 })
272 .await?
273 },
274 None => {
275 init_node(
276 genesis,
277 network_params,
278 &NoMetrics,
279 persistence,
280 l1_params,
281 None,
282 versions,
283 NullEventConsumer,
284 opt.is_da,
285 opt.identity,
286 proposal_fetcher_config,
287 )
288 .await?
289 },
290 };
291
292 Ok(ctx)
293}
294
#[cfg(test)]
mod test {
    use std::time::Duration;

    use espresso_types::{MockSequencerVersions, PubKey};
    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey};
    use portpicker::pick_unused_port;
    use surf_disco::{error::ClientError, Client, Url};
    use tempfile::TempDir;
    use tokio::spawn;
    use vbs::version::Version;

    use super::*;
    use crate::{
        api::options::Http,
        genesis::{L1Finalized, StakeTableConfig},
        persistence::fs,
        SequencerApiVersion,
    };

    /// Checks that the HTTP API (healthcheck and metrics) becomes reachable while
    /// `init_with_storage` is still running in a background task.
    ///
    /// NOTE(review): judging by the test name, no orchestrator is expected to be reachable here,
    /// so node initialization presumably never completes — confirm against `init_node`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_startup_before_orchestrator() {
        let (pub_key, staking_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
        let state_keys = StateKeyPair::generate_from_seed_indexed([0; 32], 0);

        let port = pick_unused_port().unwrap();
        let workdir = TempDir::new().unwrap();

        // Write a minimal genesis file into the temporary directory.
        let genesis_path = workdir.path().join("genesis.toml");
        let genesis = Genesis {
            chain_config: Default::default(),
            stake_table: StakeTableConfig { capacity: 10 },
            accounts: Default::default(),
            l1_finalized: L1Finalized::Number { number: 0 },
            header: Default::default(),
            upgrades: Default::default(),
            base_version: Version { major: 0, minor: 1 },
            upgrade_version: Version { major: 0, minor: 2 },
            epoch_height: None,
            drb_difficulty: None,
            drb_upgrade_difficulty: None,
            epoch_start_block: None,
            stake_table_capacity: None,
            genesis_version: Version { major: 0, minor: 1 },
        };
        genesis.to_file(&genesis_path).unwrap();

        // Enable the HTTP server with the query module, backed by file-system storage.
        let modules = Modules {
            http: Some(Http::with_port(port)),
            query: Some(Default::default()),
            storage_fs: Some(fs::Options::new(workdir.path().into())),
            ..Default::default()
        };

        // Command-line arguments: only the keys and the genesis file are given explicitly.
        let staking_key_arg = staking_key
            .to_tagged_base64()
            .expect("valid key")
            .to_string();
        let state_key_arg = state_keys
            .sign_key_ref()
            .to_tagged_base64()
            .expect("valid key")
            .to_string();
        let genesis_arg = genesis_path.display().to_string();
        let opt = Options::parse_from([
            "sequencer",
            "--private-staking-key",
            &staking_key_arg,
            "--private-state-key",
            &state_key_arg,
            "--genesis-file",
            &genesis_arg,
        ]);

        // Initialize the node in the background; the temporary directory moves into the task so
        // it stays alive for as long as the task does.
        tracing::info!(port, "starting sequencer");
        let node_task = spawn(async move {
            let outcome = init_with_storage(
                genesis,
                modules,
                opt,
                fs::Options::new(workdir.path().into()),
                MockSequencerVersions::new(),
            )
            .await;
            if let Err(err) = outcome {
                tracing::error!("failed to start sequencer: {err:#}");
            }
        });

        // The API must come up and answer the healthcheck while initialization is in flight.
        tracing::info!("waiting for API to start");
        let base_url: Url = format!("http://localhost:{port}").parse().unwrap();
        let api_client = Client::<ClientError, SequencerApiVersion>::new(base_url.clone());
        assert!(api_client.connect(Some(Duration::from_secs(60))).await);
        api_client.get::<()>("healthcheck").send().await.unwrap();

        // The metrics endpoint must already expose this node's key and the build version.
        let response = reqwest::get(base_url.join("/status/metrics").unwrap())
            .await
            .unwrap();
        assert!(response.status().is_success(), "{}", response.status());
        let metrics_text = response.text().await.unwrap();
        let lines = metrics_text.lines().collect::<Vec<_>>();

        let node_line = format!("consensus_node{{key=\"{pub_key}\"}} 1");
        assert!(lines.contains(&node_line.as_str()), "{lines:#?}");

        let version_line = format!(
            "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
            env!("VERGEN_GIT_DESCRIBE"),
            env!("VERGEN_GIT_SHA"),
            env!("VERGEN_GIT_COMMIT_TIMESTAMP"),
        );
        assert!(lines.contains(&version_line.as_str()), "{lines:#?}");

        node_task.abort();
    }
}