1use std::sync::Arc;
4
5use anyhow::{bail, Context};
6use clap::Parser;
7use espresso_types::{
8 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
9 v0_3::RewardMerkleTreeV1,
10 v0_4::RewardMerkleTreeV2,
11 BlockMerkleTree, PubKey, SeqTypes,
12};
13use futures::{
14 channel::oneshot,
15 future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18 data_source::{ExtensibleDataSource, MetricsDataSource},
19 fetching::provider::QueryServiceProvider,
20 status::{self, UpdateStatusData},
21 ApiState as AppState, Error,
22};
23use hotshot_types::traits::{
24 metrics::{Metrics, NoMetrics},
25 network::ConnectedNetwork,
26 node_implementation::Versions,
27};
28use jf_merkle_tree_compat::MerkleTreeScheme;
29use tide_disco::{listener::RateLimitListener, method::ReadState, Api, App, Url};
30use vbs::version::StaticVersionType;
31
32use super::{
33 data_source::{
34 provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35 SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
36 },
37 endpoints, fs, light_client, sql,
38 update::ApiEventConsumer,
39 ApiState, StorageState,
40};
41use crate::{
42 api::endpoints::RewardMerkleTreeVersion,
43 catchup::CatchupStorage,
44 context::{SequencerContext, TaskList},
45 persistence,
46 request_response::data_source::Storage as RequestResponseStorage,
47 state::update_state_storage_loop,
48 SequencerApiVersion,
49};
50
51#[derive(Clone, Debug)]
52pub struct Options {
53 pub http: Http,
54 pub query: Option<Query>,
55 pub submit: Option<Submit>,
56 pub status: Option<Status>,
57 pub catchup: Option<Catchup>,
58 pub config: Option<Config>,
59 pub hotshot_events: Option<HotshotEvents>,
60 pub explorer: Option<Explorer>,
61 pub light_client: Option<LightClient>,
62 pub storage_fs: Option<persistence::fs::Options>,
63 pub storage_sql: Option<persistence::sql::Options>,
64}
65
66impl From<Http> for Options {
67 fn from(http: Http) -> Self {
68 Self {
69 http,
70 query: None,
71 submit: None,
72 status: None,
73 catchup: None,
74 config: None,
75 hotshot_events: None,
76 explorer: None,
77 light_client: None,
78 storage_fs: None,
79 storage_sql: None,
80 }
81 }
82}
83
84impl Options {
85 pub fn with_port(port: u16) -> Self {
87 Http::with_port(port).into()
88 }
89
90 pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
92 self.query = Some(query);
93 self.storage_sql = Some(storage);
94 self
95 }
96
97 pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
99 self.query = Some(query);
100 self.storage_fs = Some(storage);
101 self
102 }
103
104 pub fn submit(mut self, opt: Submit) -> Self {
106 self.submit = Some(opt);
107 self
108 }
109
110 pub fn status(mut self, opt: Status) -> Self {
112 self.status = Some(opt);
113 self
114 }
115
116 pub fn catchup(mut self, opt: Catchup) -> Self {
118 self.catchup = Some(opt);
119 self
120 }
121
122 pub fn config(mut self, opt: Config) -> Self {
124 self.config = Some(opt);
125 self
126 }
127
128 pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
130 self.hotshot_events = Some(opt);
131 self
132 }
133
134 pub fn explorer(mut self, opt: Explorer) -> Self {
136 self.explorer = Some(opt);
137 self
138 }
139
140 pub fn light_client(mut self, opt: LightClient) -> Self {
142 self.light_client = Some(opt);
143 self
144 }
145
146 pub fn has_query_module(&self) -> bool {
148 self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
149 }
150
151 pub async fn serve<N, P, F, V: Versions + 'static>(
157 mut self,
158 init_context: F,
159 ) -> anyhow::Result<SequencerContext<N, P, V>>
160 where
161 N: ConnectedNetwork<PubKey>,
162 P: SequencerPersistence,
163 F: FnOnce(
164 Box<dyn Metrics>,
165 Box<dyn EventConsumer>,
166 Option<RequestResponseStorage>,
167 ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P, V>>>,
168 {
169 let (send_ctx, recv_ctx) = oneshot::channel();
173 let state = ApiState::new(async move {
174 recv_ctx
175 .await
176 .expect("context initialized and sent over channel")
177 });
178 let mut tasks = TaskList::default();
179
180 #[allow(clippy::type_complexity)]
183 let (metrics, consumer, storage): (
184 Box<dyn Metrics>,
185 Box<dyn EventConsumer>,
186 Option<RequestResponseStorage>,
187 ) = if let Some(query_opt) = self.query.take() {
188 if let Some(opt) = self.storage_sql.take() {
189 self.init_with_query_module_sql(
190 query_opt,
191 opt,
192 state,
193 &mut tasks,
194 SequencerApiVersion::instance(),
195 )
196 .await?
197 } else if let Some(opt) = self.storage_fs.take() {
198 self.init_with_query_module_fs(
199 query_opt,
200 opt,
201 state,
202 &mut tasks,
203 SequencerApiVersion::instance(),
204 )
205 .await?
206 } else {
207 bail!("query module requested but not storage provided");
208 }
209 } else if self.status.is_some() {
210 let ds = MetricsDataSource::default();
214 let metrics = ds.populate_metrics();
215 let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
216 ds,
217 state.clone(),
218 )));
219
220 register_api("status", &mut app, move |ver| {
222 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
223 .context("failed to define status api")
224 })?;
225
226 self.init_hotshot_modules(&mut app)?;
227
228 if self.hotshot_events.is_some() {
230 self.init_hotshot_events_module(&mut app)?;
231 }
232
233 tasks.spawn(
234 "API server",
235 self.listen(self.http.port, app, SequencerApiVersion::instance()),
236 );
237
238 (metrics, Box::new(NullEventConsumer), None)
239 } else {
240 let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
247
248 self.init_hotshot_modules(&mut app)?;
249
250 if self.hotshot_events.is_some() {
252 self.init_hotshot_events_module(&mut app)?;
253 }
254
255 tasks.spawn(
256 "API server",
257 self.listen(self.http.port, app, SequencerApiVersion::instance()),
258 );
259
260 (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
261 };
262
263 let ctx = init_context(metrics, consumer, storage.clone()).await?;
264 send_ctx
265 .send(ctx.clone())
266 .ok()
267 .context("API server exited without receiving context")?;
268 Ok(ctx.with_task_list(tasks))
269 }
270
271 async fn init_app_modules<N, P, D, V: Versions>(
272 &self,
273 ds: D,
274 state: ApiState<N, P, V>,
275 bind_version: SequencerApiVersion,
276 ) -> anyhow::Result<(
277 Box<dyn Metrics>,
278 Arc<StorageState<N, P, D, V>>,
279 App<AppState<StorageState<N, P, D, V>>, Error>,
280 )>
281 where
282 N: ConnectedNetwork<PubKey>,
283 P: SequencerPersistence,
284 D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
285 {
286 let metrics = ds.populate_metrics();
287 let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
288 let api_state: endpoints::AvailState<N, P, D, V> = ds.clone().into();
289 let mut app = App::<_, Error>::with_state(api_state);
290
291 register_api("status", &mut app, move |ver| {
293 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
294 .context("failed to define status api")
295 })?;
296
297 register_api("availability", &mut app, move |ver| {
304 endpoints::availability(ver).context("failed to define availability api")
305 })?;
306
307 register_api("node", &mut app, move |ver| {
308 endpoints::node(ver).context("failed to define node api")
309 })?;
310
311 register_api("token", &mut app, move |ver| {
312 endpoints::token(ver).context("failed to define token api")
313 })?;
314
315 if self.submit.is_some() {
317 register_api("submit", &mut app, move |ver| {
318 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
319 .context("failed to define submit api")
320 })?;
321 }
322
323 tracing::info!("initializing catchup API");
324
325 register_api("catchup", &mut app, move |ver| {
326 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
327 })?;
328
329 register_api("state-signature", &mut app, move |ver| {
330 endpoints::state_signature(bind_version, ver)
331 .context("failed to define state signature api")
332 })?;
333
334 if self.config.is_some() {
335 register_api("config", &mut app, move |ver| {
336 endpoints::config(bind_version, ver).context("failed to define config api")
337 })?;
338 }
339 Ok((metrics, ds, app))
340 }
341
342 async fn init_with_query_module_fs<N, P, V: Versions + 'static>(
343 &self,
344 query_opt: Query,
345 mod_opt: persistence::fs::Options,
346 state: ApiState<N, P, V>,
347 tasks: &mut TaskList,
348 bind_version: SequencerApiVersion,
349 ) -> anyhow::Result<(
350 Box<dyn Metrics>,
351 Box<dyn EventConsumer>,
352 Option<RequestResponseStorage>,
353 )>
354 where
355 N: ConnectedNetwork<PubKey>,
356 P: SequencerPersistence,
357 {
358 let ds = <fs::DataSource as SequencerDataSource>::create(
359 mod_opt,
360 provider::<V>(query_opt.peers, bind_version),
361 false,
362 )
363 .await?;
364
365 let inner_storage = ds.inner();
367
368 let (metrics, ds, mut app) = self
369 .init_app_modules(ds, state.clone(), bind_version)
370 .await?;
371
372 if self.hotshot_events.is_some() {
374 self.init_hotshot_events_module(&mut app)?;
375 }
376
377 tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
378 Ok((
379 metrics,
380 Box::new(ApiEventConsumer::from(ds)),
381 Some(RequestResponseStorage::Fs(inner_storage)),
382 ))
383 }
384
385 async fn init_with_query_module_sql<N, P, V: Versions + 'static>(
386 self,
387 query_opt: Query,
388 mod_opt: persistence::sql::Options,
389 state: ApiState<N, P, V>,
390 tasks: &mut TaskList,
391 bind_version: SequencerApiVersion,
392 ) -> anyhow::Result<(
393 Box<dyn Metrics>,
394 Box<dyn EventConsumer>,
395 Option<RequestResponseStorage>,
396 )>
397 where
398 N: ConnectedNetwork<PubKey>,
399 P: SequencerPersistence,
400 {
401 let mut provider = Provider::default();
402
403 provider = provider.with_provider(mod_opt.clone().create().await?);
406 for peer in query_opt.peers {
408 tracing::info!("will fetch missing data from {peer}");
409 provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
410 }
411
412 let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
413 let inner_storage = ds.inner();
414 let (metrics, ds, mut app) = self
415 .init_app_modules(ds, state.clone(), bind_version)
416 .await?;
417
418 if self.explorer.is_some() {
419 register_api("explorer", &mut app, move |ver| {
420 endpoints::explorer(ver).context("failed to define explorer api")
421 })?;
422 }
423
424 register_api("block-state", &mut app, move |ver| {
427 endpoints::merklized_state::<N, P, _, BlockMerkleTree, _, 3>(ver)
428 .context("failed to define block-state api")
429 })?;
430
431 register_api("fee-state", &mut app, move |ver| {
434 endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
435 })?;
436
437 register_api("reward-state", &mut app, move |ver| {
438 endpoints::reward::<
439 _,
440 SequencerApiVersion,
441 RewardMerkleTreeV1,
442 { RewardMerkleTreeV1::ARITY },
443 >(ver, RewardMerkleTreeVersion::V1)
444 .context("failed to define reward-state api")
445 })?;
446
447 register_api("reward-state-v2", &mut app, move |ver| {
449 endpoints::reward::<
450 _,
451 SequencerApiVersion,
452 RewardMerkleTreeV2,
453 { RewardMerkleTreeV2::ARITY },
454 >(ver, RewardMerkleTreeVersion::V2)
455 .context("failed to define reward-state api")
456 })?;
457
458 let get_node_state = {
459 let state = state.clone();
460 async move { state.node_state().await.clone() }
461 };
462 tasks.spawn(
463 "merklized state storage update loop",
464 update_state_storage_loop(ds.clone(), get_node_state),
465 );
466
467 if self.hotshot_events.is_some() {
469 self.init_hotshot_events_module(&mut app)?;
470 }
471
472 if self.light_client.is_some() {
474 register_api("light-client", &mut app, move |ver| {
475 light_client::define_api::<_, SequencerApiVersion>(Default::default(), ver)
476 .context("failed to define light client api")
477 })?;
478 }
479
480 tasks.spawn(
481 "API server",
482 self.listen(self.http.port, app, SequencerApiVersion::instance()),
483 );
484 Ok((
485 metrics,
486 Box::new(ApiEventConsumer::from(ds)),
487 Some(RequestResponseStorage::Sql(inner_storage)),
488 ))
489 }
490
491 fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
497 where
498 S: 'static + Send + Sync + ReadState,
499 P: SequencerPersistence,
500 S::State: Send
501 + Sync
502 + SubmitDataSource<N, P>
503 + StateSignatureDataSource<N>
504 + NodeStateDataSource
505 + CatchupDataSource
506 + HotShotConfigDataSource,
507 N: ConnectedNetwork<PubKey>,
508 {
509 let bind_version = SequencerApiVersion::instance();
510 if self.submit.is_some() {
512 register_api("submit", app, move |ver| {
513 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
514 .context("failed to define submit api")
515 })?;
516 }
517
518 if self.catchup.is_some() {
520 tracing::info!("initializing state API");
521
522 register_api("catchup", app, move |ver| {
523 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
524 })?;
525 }
526
527 register_api("state-signature", app, move |ver| {
528 endpoints::state_signature(bind_version, ver)
529 .context("failed to define state signature api")
530 })?;
531
532 if self.config.is_some() {
533 register_api("config", app, move |ver| {
534 endpoints::config(bind_version, ver).context("failed to define config api")
535 })?;
536 }
537
538 Ok(())
539 }
540
541 fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
546 where
547 S: 'static + Send + Sync + ReadState,
548 S::State: Send + Sync + hotshot_events_service::events_source::EventsSource<SeqTypes>,
549 {
550 tracing::info!("Initializing HotShot events API at /hotshot-events");
551 register_api("hotshot-events", app, move |ver| {
552 hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
553 &hotshot_events_service::events::Options::default(),
554 ver,
555 )
556 .with_context(|| "failed to define the HotShot events API")
557 })?;
558
559 Ok(())
560 }
561
562 fn listen<S, E, ApiVer>(
563 &self,
564 port: u16,
565 app: App<S, E>,
566 bind_version: ApiVer,
567 ) -> impl Future<Output = anyhow::Result<()>>
568 where
569 S: Send + Sync + 'static,
570 E: Send + Sync + tide_disco::Error,
571 ApiVer: StaticVersionType + 'static,
572 {
573 let max_connections = self.http.max_connections;
574
575 async move {
576 if let Some(limit) = max_connections {
577 app.serve(RateLimitListener::with_port(port, limit), bind_version)
578 .await?;
579 } else {
580 app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
581 }
582 Ok(())
583 }
584 }
585}
586
587#[derive(Parser, Clone, Copy, Debug)]
592pub struct Http {
593 #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
595 pub port: u16,
596
597 #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
603 pub max_connections: Option<usize>,
604}
605
606impl Http {
607 pub fn with_port(port: u16) -> Self {
609 Self {
610 port,
611 max_connections: None,
612 }
613 }
614}
615
616#[derive(Parser, Clone, Copy, Debug, Default)]
618pub struct Submit;
619
620#[derive(Parser, Clone, Copy, Debug, Default)]
622pub struct Status;
623
624#[derive(Parser, Clone, Copy, Debug, Default)]
626pub struct Catchup;
627
628#[derive(Parser, Clone, Copy, Debug, Default)]
630pub struct Config;
631
632#[derive(Parser, Clone, Debug, Default)]
634pub struct Query {
635 #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
637 pub peers: Vec<Url>,
638}
639
640#[derive(Parser, Clone, Copy, Debug, Default)]
642pub struct State;
643
644#[derive(Parser, Clone, Copy, Debug, Default)]
646pub struct HotshotEvents;
647
648#[derive(Parser, Clone, Copy, Debug, Default)]
650pub struct Explorer;
651
652#[derive(Parser, Clone, Copy, Debug, Default)]
654pub struct LightClient;
655
656fn register_api<E, S, F, ModuleError, ModuleVersion>(
658 path: &'static str,
659 app: &mut App<S, E>,
660 f: F,
661) -> anyhow::Result<()>
662where
663 S: 'static + Send + Sync,
664 E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
665 ModuleError: Send + Sync + 'static,
666 ModuleVersion: StaticVersionType + 'static,
667 F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
668{
669 let v0 = "0.0.1".parse().unwrap();
670 let v1 = "1.1.0".parse().unwrap();
671 let result1 = f(v0)?;
672 let result2 = f(v1)?;
673
674 app.register_module(path, result1)?;
675 app.register_module(path, result2)?;
676
677 Ok(())
678}