1use std::sync::Arc;
4
5use anyhow::{bail, Context};
6use clap::Parser;
7use espresso_types::{
8 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
9 v0_3::RewardMerkleTreeV1,
10 v0_4::RewardMerkleTreeV2,
11 BlockMerkleTree, PubKey,
12};
13use futures::{
14 channel::oneshot,
15 future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18 data_source::{ExtensibleDataSource, MetricsDataSource},
19 fetching::provider::QueryServiceProvider,
20 status::{self, UpdateStatusData},
21 ApiState as AppState, Error,
22};
23use hotshot_types::traits::{
24 metrics::{Metrics, NoMetrics},
25 network::ConnectedNetwork,
26 node_implementation::Versions,
27};
28use jf_merkle_tree::MerkleTreeScheme;
29use tide_disco::{listener::RateLimitListener, method::ReadState, Api, App, Url};
30use vbs::version::StaticVersionType;
31
32use super::{
33 data_source::{
34 provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35 SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
36 },
37 endpoints, fs, sql,
38 update::ApiEventConsumer,
39 ApiState, StorageState,
40};
41use crate::{
42 catchup::CatchupStorage,
43 context::{SequencerContext, TaskList},
44 persistence,
45 request_response::data_source::Storage as RequestResponseStorage,
46 state::update_state_storage_loop,
47 SequencerApiVersion,
48};
49
50#[derive(Clone, Debug)]
51pub struct Options {
52 pub http: Http,
53 pub query: Option<Query>,
54 pub submit: Option<Submit>,
55 pub status: Option<Status>,
56 pub catchup: Option<Catchup>,
57 pub config: Option<Config>,
58 pub hotshot_events: Option<HotshotEvents>,
59 pub explorer: Option<Explorer>,
60 pub storage_fs: Option<persistence::fs::Options>,
61 pub storage_sql: Option<persistence::sql::Options>,
62}
63
64impl From<Http> for Options {
65 fn from(http: Http) -> Self {
66 Self {
67 http,
68 query: None,
69 submit: None,
70 status: None,
71 catchup: None,
72 config: None,
73 hotshot_events: None,
74 explorer: None,
75 storage_fs: None,
76 storage_sql: None,
77 }
78 }
79}
80
81impl Options {
82 pub fn with_port(port: u16) -> Self {
84 Http::with_port(port).into()
85 }
86
87 pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
89 self.query = Some(query);
90 self.storage_sql = Some(storage);
91 self
92 }
93
94 pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
96 self.query = Some(query);
97 self.storage_fs = Some(storage);
98 self
99 }
100
101 pub fn submit(mut self, opt: Submit) -> Self {
103 self.submit = Some(opt);
104 self
105 }
106
107 pub fn status(mut self, opt: Status) -> Self {
109 self.status = Some(opt);
110 self
111 }
112
113 pub fn catchup(mut self, opt: Catchup) -> Self {
115 self.catchup = Some(opt);
116 self
117 }
118
119 pub fn config(mut self, opt: Config) -> Self {
121 self.config = Some(opt);
122 self
123 }
124
125 pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
127 self.hotshot_events = Some(opt);
128 self
129 }
130
131 pub fn explorer(mut self, opt: Explorer) -> Self {
133 self.explorer = Some(opt);
134 self
135 }
136
137 pub fn has_query_module(&self) -> bool {
139 self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
140 }
141
142 pub async fn serve<N, P, F, V: Versions + 'static>(
148 mut self,
149 init_context: F,
150 ) -> anyhow::Result<SequencerContext<N, P, V>>
151 where
152 N: ConnectedNetwork<PubKey>,
153 P: SequencerPersistence,
154 F: FnOnce(
155 Box<dyn Metrics>,
156 Box<dyn EventConsumer>,
157 Option<RequestResponseStorage>,
158 ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P, V>>>,
159 {
160 let (send_ctx, recv_ctx) = oneshot::channel();
164 let state = ApiState::new(async move {
165 recv_ctx
166 .await
167 .expect("context initialized and sent over channel")
168 });
169 let mut tasks = TaskList::default();
170
171 #[allow(clippy::type_complexity)]
174 let (metrics, consumer, storage): (
175 Box<dyn Metrics>,
176 Box<dyn EventConsumer>,
177 Option<RequestResponseStorage>,
178 ) = if let Some(query_opt) = self.query.take() {
179 if let Some(opt) = self.storage_sql.take() {
180 self.init_with_query_module_sql(
181 query_opt,
182 opt,
183 state,
184 &mut tasks,
185 SequencerApiVersion::instance(),
186 )
187 .await?
188 } else if let Some(opt) = self.storage_fs.take() {
189 self.init_with_query_module_fs(
190 query_opt,
191 opt,
192 state,
193 &mut tasks,
194 SequencerApiVersion::instance(),
195 )
196 .await?
197 } else {
198 bail!("query module requested but not storage provided");
199 }
200 } else if self.status.is_some() {
201 let ds = MetricsDataSource::default();
205 let metrics = ds.populate_metrics();
206 let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
207 ds,
208 state.clone(),
209 )));
210
211 register_api("status", &mut app, move |ver| {
213 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
214 .context("failed to define status api")
215 })?;
216
217 self.init_hotshot_modules(&mut app)?;
218
219 if self.hotshot_events.is_some() {
221 self.init_hotshot_events_module(&mut app)?;
222 }
223
224 tasks.spawn(
225 "API server",
226 self.listen(self.http.port, app, SequencerApiVersion::instance()),
227 );
228
229 (metrics, Box::new(NullEventConsumer), None)
230 } else {
231 let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
238
239 self.init_hotshot_modules(&mut app)?;
240
241 if self.hotshot_events.is_some() {
243 self.init_hotshot_events_module(&mut app)?;
244 }
245
246 tasks.spawn(
247 "API server",
248 self.listen(self.http.port, app, SequencerApiVersion::instance()),
249 );
250
251 (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
252 };
253
254 let ctx = init_context(metrics, consumer, storage.clone()).await?;
255 send_ctx
256 .send(ctx.clone())
257 .ok()
258 .context("API server exited without receiving context")?;
259 Ok(ctx.with_task_list(tasks))
260 }
261
262 async fn init_app_modules<N, P, D, V: Versions>(
263 &self,
264 ds: D,
265 state: ApiState<N, P, V>,
266 bind_version: SequencerApiVersion,
267 ) -> anyhow::Result<(
268 Box<dyn Metrics>,
269 Arc<StorageState<N, P, D, V>>,
270 App<AppState<StorageState<N, P, D, V>>, Error>,
271 )>
272 where
273 N: ConnectedNetwork<PubKey>,
274 P: SequencerPersistence,
275 D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
276 {
277 let metrics = ds.populate_metrics();
278 let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
279 let api_state: endpoints::AvailState<N, P, D, V> = ds.clone().into();
280 let mut app = App::<_, Error>::with_state(api_state);
281
282 register_api("status", &mut app, move |ver| {
284 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
285 .context("failed to define status api")
286 })?;
287
288 register_api("availability", &mut app, move |ver| {
295 endpoints::availability(ver).context("failed to define availability api")
296 })?;
297
298 register_api("node", &mut app, move |ver| {
299 endpoints::node(ver).context("failed to define node api")
300 })?;
301
302 if self.submit.is_some() {
304 register_api("submit", &mut app, move |ver| {
305 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
306 .context("failed to define submit api")
307 })?;
308 }
309
310 tracing::info!("initializing catchup API");
311
312 register_api("catchup", &mut app, move |ver| {
313 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
314 })?;
315
316 register_api("state-signature", &mut app, move |ver| {
317 endpoints::state_signature(bind_version, ver)
318 .context("failed to define state signature api")
319 })?;
320
321 if self.config.is_some() {
322 register_api("config", &mut app, move |ver| {
323 endpoints::config(bind_version, ver).context("failed to define config api")
324 })?;
325 }
326 Ok((metrics, ds, app))
327 }
328
329 async fn init_with_query_module_fs<N, P, V: Versions + 'static>(
330 &self,
331 query_opt: Query,
332 mod_opt: persistence::fs::Options,
333 state: ApiState<N, P, V>,
334 tasks: &mut TaskList,
335 bind_version: SequencerApiVersion,
336 ) -> anyhow::Result<(
337 Box<dyn Metrics>,
338 Box<dyn EventConsumer>,
339 Option<RequestResponseStorage>,
340 )>
341 where
342 N: ConnectedNetwork<PubKey>,
343 P: SequencerPersistence,
344 {
345 let ds = <fs::DataSource as SequencerDataSource>::create(
346 mod_opt,
347 provider::<V>(query_opt.peers, bind_version),
348 false,
349 )
350 .await?;
351
352 let inner_storage = ds.inner();
354
355 let (metrics, ds, mut app) = self
356 .init_app_modules(ds, state.clone(), bind_version)
357 .await?;
358
359 if self.hotshot_events.is_some() {
361 self.init_hotshot_events_module(&mut app)?;
362 }
363
364 tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
365 Ok((
366 metrics,
367 Box::new(ApiEventConsumer::from(ds)),
368 Some(RequestResponseStorage::Fs(inner_storage)),
369 ))
370 }
371
372 async fn init_with_query_module_sql<N, P, V: Versions + 'static>(
373 self,
374 query_opt: Query,
375 mod_opt: persistence::sql::Options,
376 state: ApiState<N, P, V>,
377 tasks: &mut TaskList,
378 bind_version: SequencerApiVersion,
379 ) -> anyhow::Result<(
380 Box<dyn Metrics>,
381 Box<dyn EventConsumer>,
382 Option<RequestResponseStorage>,
383 )>
384 where
385 N: ConnectedNetwork<PubKey>,
386 P: SequencerPersistence,
387 {
388 let mut provider = Provider::default();
389
390 provider = provider.with_provider(mod_opt.clone().create().await?);
393 for peer in query_opt.peers {
395 tracing::info!("will fetch missing data from {peer}");
396 provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
397 }
398
399 let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
400 let inner_storage = ds.inner();
401 let (metrics, ds, mut app) = self
402 .init_app_modules(ds, state.clone(), bind_version)
403 .await?;
404
405 if self.explorer.is_some() {
406 register_api("explorer", &mut app, move |ver| {
407 endpoints::explorer(ver).context("failed to define explorer api")
408 })?;
409 }
410
411 register_api("block-state", &mut app, move |ver| {
414 endpoints::merklized_state::<N, P, _, BlockMerkleTree, _, 3>(ver)
415 .context("failed to define block-state api")
416 })?;
417
418 register_api("fee-state", &mut app, move |ver| {
421 endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
422 })?;
423
424 register_api("reward-state", &mut app, move |ver| {
425 endpoints::reward::<
426 _,
427 SequencerApiVersion,
428 RewardMerkleTreeV1,
429 { RewardMerkleTreeV1::ARITY },
430 >(ver)
431 .context("failed to define reward-state api")
432 })?;
433
434 register_api("reward-state-v2", &mut app, move |ver| {
436 endpoints::reward::<
437 _,
438 SequencerApiVersion,
439 RewardMerkleTreeV2,
440 { RewardMerkleTreeV2::ARITY },
441 >(ver)
442 .context("failed to define reward-state api")
443 })?;
444
445 let get_node_state = {
446 let state = state.clone();
447 async move { state.node_state().await.clone() }
448 };
449 tasks.spawn(
450 "merklized state storage update loop",
451 update_state_storage_loop(ds.clone(), get_node_state),
452 );
453
454 if self.hotshot_events.is_some() {
456 self.init_hotshot_events_module(&mut app)?;
457 }
458
459 tasks.spawn(
460 "API server",
461 self.listen(self.http.port, app, SequencerApiVersion::instance()),
462 );
463 Ok((
464 metrics,
465 Box::new(ApiEventConsumer::from(ds)),
466 Some(RequestResponseStorage::Sql(inner_storage)),
467 ))
468 }
469
470 fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
476 where
477 S: 'static + Send + Sync + ReadState,
478 P: SequencerPersistence,
479 S::State: Send
480 + Sync
481 + SubmitDataSource<N, P>
482 + StateSignatureDataSource<N>
483 + NodeStateDataSource
484 + CatchupDataSource
485 + HotShotConfigDataSource,
486 N: ConnectedNetwork<PubKey>,
487 {
488 let bind_version = SequencerApiVersion::instance();
489 if self.submit.is_some() {
491 register_api("submit", app, move |ver| {
492 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
493 .context("failed to define submit api")
494 })?;
495 }
496
497 if self.catchup.is_some() {
499 tracing::info!("initializing state API");
500
501 register_api("catchup", app, move |ver| {
502 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
503 })?;
504 }
505
506 register_api("state-signature", app, move |ver| {
507 endpoints::state_signature(bind_version, ver)
508 .context("failed to define state signature api")
509 })?;
510
511 if self.config.is_some() {
512 register_api("config", app, move |ver| {
513 endpoints::config(bind_version, ver).context("failed to define config api")
514 })?;
515 }
516
517 Ok(())
518 }
519
520 fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
525 where
526 S: 'static + Send + Sync + ReadState,
527 S::State:
528 Send + Sync + hotshot_events_service::events_source::EventsSource<crate::SeqTypes>,
529 {
530 tracing::info!("Initializing HotShot events API at /hotshot-events");
531 register_api("hotshot-events", app, move |ver| {
532 hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
533 &hotshot_events_service::events::Options::default(),
534 ver,
535 )
536 .with_context(|| "failed to define the HotShot events API")
537 })?;
538
539 Ok(())
540 }
541
542 fn listen<S, E, ApiVer>(
543 &self,
544 port: u16,
545 app: App<S, E>,
546 bind_version: ApiVer,
547 ) -> impl Future<Output = anyhow::Result<()>>
548 where
549 S: Send + Sync + 'static,
550 E: Send + Sync + tide_disco::Error,
551 ApiVer: StaticVersionType + 'static,
552 {
553 let max_connections = self.http.max_connections;
554
555 async move {
556 if let Some(limit) = max_connections {
557 app.serve(RateLimitListener::with_port(port, limit), bind_version)
558 .await?;
559 } else {
560 app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
561 }
562 Ok(())
563 }
564 }
565}
566
567#[derive(Parser, Clone, Copy, Debug)]
572pub struct Http {
573 #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
575 pub port: u16,
576
577 #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
583 pub max_connections: Option<usize>,
584}
585
586impl Http {
587 pub fn with_port(port: u16) -> Self {
589 Self {
590 port,
591 max_connections: None,
592 }
593 }
594}
595
596#[derive(Parser, Clone, Copy, Debug, Default)]
598pub struct Submit;
599
600#[derive(Parser, Clone, Copy, Debug, Default)]
602pub struct Status;
603
604#[derive(Parser, Clone, Copy, Debug, Default)]
606pub struct Catchup;
607
608#[derive(Parser, Clone, Copy, Debug, Default)]
610pub struct Config;
611
612#[derive(Parser, Clone, Debug, Default)]
614pub struct Query {
615 #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
617 pub peers: Vec<Url>,
618}
619
620#[derive(Parser, Clone, Copy, Debug, Default)]
622pub struct State;
623
624#[derive(Parser, Clone, Copy, Debug, Default)]
626pub struct HotshotEvents;
627
628#[derive(Parser, Clone, Copy, Debug, Default)]
630pub struct Explorer;
631
632fn register_api<E, S, F, ModuleError, ModuleVersion>(
634 path: &'static str,
635 app: &mut App<S, E>,
636 f: F,
637) -> anyhow::Result<()>
638where
639 S: 'static + Send + Sync,
640 E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
641 ModuleError: Send + Sync + 'static,
642 ModuleVersion: StaticVersionType + 'static,
643 F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
644{
645 let v0 = "0.0.1".parse().unwrap();
646 let v1 = "1.0.0".parse().unwrap();
647 let result1 = f(v0)?;
648 let result2 = f(v1)?;
649
650 app.register_module(path, result1)?;
651 app.register_module(path, result2)?;
652
653 Ok(())
654}