sequencer/api/
options.rs

1//! Sequencer-specific API options and initialization.
2
3use std::sync::Arc;
4
5use anyhow::{bail, Context};
6use clap::Parser;
7use espresso_types::{
8    v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
9    v0_3::RewardMerkleTreeV1,
10    v0_4::RewardMerkleTreeV2,
11    BlockMerkleTree, PubKey,
12};
13use futures::{
14    channel::oneshot,
15    future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18    data_source::{ExtensibleDataSource, MetricsDataSource},
19    fetching::provider::QueryServiceProvider,
20    status::{self, UpdateStatusData},
21    ApiState as AppState, Error,
22};
23use hotshot_types::traits::{
24    metrics::{Metrics, NoMetrics},
25    network::ConnectedNetwork,
26    node_implementation::Versions,
27};
28use jf_merkle_tree::MerkleTreeScheme;
29use tide_disco::{listener::RateLimitListener, method::ReadState, Api, App, Url};
30use vbs::version::StaticVersionType;
31
32use super::{
33    data_source::{
34        provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35        SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
36    },
37    endpoints, fs, sql,
38    update::ApiEventConsumer,
39    ApiState, StorageState,
40};
41use crate::{
42    catchup::CatchupStorage,
43    context::{SequencerContext, TaskList},
44    persistence,
45    request_response::data_source::Storage as RequestResponseStorage,
46    state::update_state_storage_loop,
47    SequencerApiVersion,
48};
49
/// Configuration for the sequencer's HTTP API server.
///
/// Each optional field switches an API module (or a storage backend for the query module) on
/// when it is `Some`. `Options::serve` reads these fields to decide what to register.
#[derive(Clone, Debug)]
pub struct Options {
    /// Listener settings: the port to bind and an optional connection limit.
    pub http: Http,
    /// Query module options. When set, `serve` also requires `storage_sql` or `storage_fs`.
    pub query: Option<Query>,
    /// Enables the `submit` API module.
    pub submit: Option<Submit>,
    /// When no query module is configured, enables a status API that needs no persistent
    /// storage. With a query module, the status API is registered regardless of this field.
    pub status: Option<Status>,
    /// Enables the `catchup` API module when no query module is configured. With a query
    /// module, the catchup API is registered regardless of this field.
    pub catchup: Option<Catchup>,
    /// Enables the `config` API module.
    pub config: Option<Config>,
    /// Enables the `hotshot-events` API module.
    pub hotshot_events: Option<HotshotEvents>,
    /// Enables the `explorer` API module; only consulted by the SQL-backed query module.
    pub explorer: Option<Explorer>,
    /// File system storage for the query module, set by `query_fs`.
    pub storage_fs: Option<persistence::fs::Options>,
    /// SQL storage for the query module, set by `query_sql`. `serve` prefers this over
    /// `storage_fs` when both are set.
    pub storage_sql: Option<persistence::sql::Options>,
}
63
64impl From<Http> for Options {
65    fn from(http: Http) -> Self {
66        Self {
67            http,
68            query: None,
69            submit: None,
70            status: None,
71            catchup: None,
72            config: None,
73            hotshot_events: None,
74            explorer: None,
75            storage_fs: None,
76            storage_sql: None,
77        }
78    }
79}
80
81impl Options {
82    /// Default options for running a web server on the given port.
83    pub fn with_port(port: u16) -> Self {
84        Http::with_port(port).into()
85    }
86
87    /// Add a query API module backed by a Postgres database.
88    pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
89        self.query = Some(query);
90        self.storage_sql = Some(storage);
91        self
92    }
93
94    /// Add a query API module backed by the file system.
95    pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
96        self.query = Some(query);
97        self.storage_fs = Some(storage);
98        self
99    }
100
101    /// Add a submit API module.
102    pub fn submit(mut self, opt: Submit) -> Self {
103        self.submit = Some(opt);
104        self
105    }
106
107    /// Add a status API module.
108    pub fn status(mut self, opt: Status) -> Self {
109        self.status = Some(opt);
110        self
111    }
112
113    /// Add a catchup API module.
114    pub fn catchup(mut self, opt: Catchup) -> Self {
115        self.catchup = Some(opt);
116        self
117    }
118
119    /// Add a config API module.
120    pub fn config(mut self, opt: Config) -> Self {
121        self.config = Some(opt);
122        self
123    }
124
125    /// Add a Hotshot events streaming API module.
126    pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
127        self.hotshot_events = Some(opt);
128        self
129    }
130
131    /// Add an explorer API module.
132    pub fn explorer(mut self, opt: Explorer) -> Self {
133        self.explorer = Some(opt);
134        self
135    }
136
137    /// Whether these options will run the query API.
138    pub fn has_query_module(&self) -> bool {
139        self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
140    }
141
142    /// Start the server.
143    ///
144    /// The function `init_context` is used to create a sequencer context from a metrics object and
145    /// optional saved consensus state. The metrics object is created from the API data source, so
146    /// that consensus will populuate metrics that can then be read and served by the API.
147    pub async fn serve<N, P, F, V: Versions + 'static>(
148        mut self,
149        init_context: F,
150    ) -> anyhow::Result<SequencerContext<N, P, V>>
151    where
152        N: ConnectedNetwork<PubKey>,
153        P: SequencerPersistence,
154        F: FnOnce(
155            Box<dyn Metrics>,
156            Box<dyn EventConsumer>,
157            Option<RequestResponseStorage>,
158        ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P, V>>>,
159    {
160        // Create a channel to send the context to the web server after it is initialized. This
161        // allows the web server to start before initialization can complete, since initialization
162        // can take a long time (and is dependent on other nodes).
163        let (send_ctx, recv_ctx) = oneshot::channel();
164        let state = ApiState::new(async move {
165            recv_ctx
166                .await
167                .expect("context initialized and sent over channel")
168        });
169        let mut tasks = TaskList::default();
170
171        // The server state type depends on whether we are running a query or status API or not, so
172        // we handle the two cases differently.
173        #[allow(clippy::type_complexity)]
174        let (metrics, consumer, storage): (
175            Box<dyn Metrics>,
176            Box<dyn EventConsumer>,
177            Option<RequestResponseStorage>,
178        ) = if let Some(query_opt) = self.query.take() {
179            if let Some(opt) = self.storage_sql.take() {
180                self.init_with_query_module_sql(
181                    query_opt,
182                    opt,
183                    state,
184                    &mut tasks,
185                    SequencerApiVersion::instance(),
186                )
187                .await?
188            } else if let Some(opt) = self.storage_fs.take() {
189                self.init_with_query_module_fs(
190                    query_opt,
191                    opt,
192                    state,
193                    &mut tasks,
194                    SequencerApiVersion::instance(),
195                )
196                .await?
197            } else {
198                bail!("query module requested but not storage provided");
199            }
200        } else if self.status.is_some() {
201            // If a status API is requested but no availability API, we use the
202            // `MetricsDataSource`, which allows us to run the status API with no persistent
203            // storage.
204            let ds = MetricsDataSource::default();
205            let metrics = ds.populate_metrics();
206            let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
207                ds,
208                state.clone(),
209            )));
210
211            // Initialize v0 and v1 status API.
212            register_api("status", &mut app, move |ver| {
213                status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
214                    .context("failed to define status api")
215            })?;
216
217            self.init_hotshot_modules(&mut app)?;
218
219            // Initialize hotshot events API if enabled
220            if self.hotshot_events.is_some() {
221                self.init_hotshot_events_module(&mut app)?;
222            }
223
224            tasks.spawn(
225                "API server",
226                self.listen(self.http.port, app, SequencerApiVersion::instance()),
227            );
228
229            (metrics, Box::new(NullEventConsumer), None)
230        } else {
231            // If no status or availability API is requested, we don't need metrics or a query
232            // service data source. The only app state is the HotShot handle, which we use to
233            // submit transactions.
234            //
235            // If we have no availability API, we cannot load a saved leaf from local storage,
236            // so we better have been provided the leaf ahead of time if we want it at all.
237            let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
238
239            self.init_hotshot_modules(&mut app)?;
240
241            // Initialize hotshot events API if enabled
242            if self.hotshot_events.is_some() {
243                self.init_hotshot_events_module(&mut app)?;
244            }
245
246            tasks.spawn(
247                "API server",
248                self.listen(self.http.port, app, SequencerApiVersion::instance()),
249            );
250
251            (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
252        };
253
254        let ctx = init_context(metrics, consumer, storage.clone()).await?;
255        send_ctx
256            .send(ctx.clone())
257            .ok()
258            .context("API server exited without receiving context")?;
259        Ok(ctx.with_task_list(tasks))
260    }
261
262    async fn init_app_modules<N, P, D, V: Versions>(
263        &self,
264        ds: D,
265        state: ApiState<N, P, V>,
266        bind_version: SequencerApiVersion,
267    ) -> anyhow::Result<(
268        Box<dyn Metrics>,
269        Arc<StorageState<N, P, D, V>>,
270        App<AppState<StorageState<N, P, D, V>>, Error>,
271    )>
272    where
273        N: ConnectedNetwork<PubKey>,
274        P: SequencerPersistence,
275        D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
276    {
277        let metrics = ds.populate_metrics();
278        let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
279        let api_state: endpoints::AvailState<N, P, D, V> = ds.clone().into();
280        let mut app = App::<_, Error>::with_state(api_state);
281
282        // Initialize v0 and v1 status API.
283        register_api("status", &mut app, move |ver| {
284            status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
285                .context("failed to define status api")
286        })?;
287
288        // Initialize availability and node APIs (these both use the same data source).
289
290        // Note: We initialize two versions of the availability module: `availability/v0` and `availability/v1`.
291        // - `availability/v0/leaf/0` returns the old `Leaf1` type for backward compatibility.
292        // - `availability/v1/leaf/0` returns the new `Leaf2` type
293
294        register_api("availability", &mut app, move |ver| {
295            endpoints::availability(ver).context("failed to define availability api")
296        })?;
297
298        register_api("node", &mut app, move |ver| {
299            endpoints::node(ver).context("failed to define node api")
300        })?;
301
302        // Initialize submit API
303        if self.submit.is_some() {
304            register_api("submit", &mut app, move |ver| {
305                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
306                    .context("failed to define submit api")
307            })?;
308        }
309
310        tracing::info!("initializing catchup API");
311
312        register_api("catchup", &mut app, move |ver| {
313            endpoints::catchup(bind_version, ver).context("failed to define catchup api")
314        })?;
315
316        register_api("state-signature", &mut app, move |ver| {
317            endpoints::state_signature(bind_version, ver)
318                .context("failed to define state signature api")
319        })?;
320
321        if self.config.is_some() {
322            register_api("config", &mut app, move |ver| {
323                endpoints::config(bind_version, ver).context("failed to define config api")
324            })?;
325        }
326        Ok((metrics, ds, app))
327    }
328
329    async fn init_with_query_module_fs<N, P, V: Versions + 'static>(
330        &self,
331        query_opt: Query,
332        mod_opt: persistence::fs::Options,
333        state: ApiState<N, P, V>,
334        tasks: &mut TaskList,
335        bind_version: SequencerApiVersion,
336    ) -> anyhow::Result<(
337        Box<dyn Metrics>,
338        Box<dyn EventConsumer>,
339        Option<RequestResponseStorage>,
340    )>
341    where
342        N: ConnectedNetwork<PubKey>,
343        P: SequencerPersistence,
344    {
345        let ds = <fs::DataSource as SequencerDataSource>::create(
346            mod_opt,
347            provider::<V>(query_opt.peers, bind_version),
348            false,
349        )
350        .await?;
351
352        // Get the inner storage from the data source
353        let inner_storage = ds.inner();
354
355        let (metrics, ds, mut app) = self
356            .init_app_modules(ds, state.clone(), bind_version)
357            .await?;
358
359        // Initialize hotshot events API if enabled
360        if self.hotshot_events.is_some() {
361            self.init_hotshot_events_module(&mut app)?;
362        }
363
364        tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
365        Ok((
366            metrics,
367            Box::new(ApiEventConsumer::from(ds)),
368            Some(RequestResponseStorage::Fs(inner_storage)),
369        ))
370    }
371
372    async fn init_with_query_module_sql<N, P, V: Versions + 'static>(
373        self,
374        query_opt: Query,
375        mod_opt: persistence::sql::Options,
376        state: ApiState<N, P, V>,
377        tasks: &mut TaskList,
378        bind_version: SequencerApiVersion,
379    ) -> anyhow::Result<(
380        Box<dyn Metrics>,
381        Box<dyn EventConsumer>,
382        Option<RequestResponseStorage>,
383    )>
384    where
385        N: ConnectedNetwork<PubKey>,
386        P: SequencerPersistence,
387    {
388        let mut provider = Provider::default();
389
390        // Use the database itself as a fetching provider: sometimes we can fetch data that is
391        // missing from the query service from ephemeral consensus storage.
392        provider = provider.with_provider(mod_opt.clone().create().await?);
393        // If that fails, fetch missing data from peers.
394        for peer in query_opt.peers {
395            tracing::info!("will fetch missing data from {peer}");
396            provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
397        }
398
399        let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
400        let inner_storage = ds.inner();
401        let (metrics, ds, mut app) = self
402            .init_app_modules(ds, state.clone(), bind_version)
403            .await?;
404
405        if self.explorer.is_some() {
406            register_api("explorer", &mut app, move |ver| {
407                endpoints::explorer(ver).context("failed to define explorer api")
408            })?;
409        }
410
411        // Initialize merklized state module for block merkle tree
412
413        register_api("block-state", &mut app, move |ver| {
414            endpoints::merklized_state::<N, P, _, BlockMerkleTree, _, 3>(ver)
415                .context("failed to define block-state api")
416        })?;
417
418        // Initialize merklized state module for fee merkle tree
419
420        register_api("fee-state", &mut app, move |ver| {
421            endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
422        })?;
423
424        register_api("reward-state", &mut app, move |ver| {
425            endpoints::reward::<
426                _,
427                SequencerApiVersion,
428                RewardMerkleTreeV1,
429                { RewardMerkleTreeV1::ARITY },
430            >(ver)
431            .context("failed to define reward-state api")
432        })?;
433
434        // register new api for new reward merkle tree
435        register_api("reward-state-v2", &mut app, move |ver| {
436            endpoints::reward::<
437                _,
438                SequencerApiVersion,
439                RewardMerkleTreeV2,
440                { RewardMerkleTreeV2::ARITY },
441            >(ver)
442            .context("failed to define reward-state api")
443        })?;
444
445        let get_node_state = {
446            let state = state.clone();
447            async move { state.node_state().await.clone() }
448        };
449        tasks.spawn(
450            "merklized state storage update loop",
451            update_state_storage_loop(ds.clone(), get_node_state),
452        );
453
454        // Initialize hotshot events API if enabled
455        if self.hotshot_events.is_some() {
456            self.init_hotshot_events_module(&mut app)?;
457        }
458
459        tasks.spawn(
460            "API server",
461            self.listen(self.http.port, app, SequencerApiVersion::instance()),
462        );
463        Ok((
464            metrics,
465            Box::new(ApiEventConsumer::from(ds)),
466            Some(RequestResponseStorage::Sql(inner_storage)),
467        ))
468    }
469
470    /// Initialize the modules for interacting with HotShot.
471    ///
472    /// This function adds the `submit`, `state`, and `state_signature` API modules to the given
473    /// app. These modules only require a HotShot handle as state, and thus they work with any data
474    /// source, so initialization is the same no matter what mode the service is running in.
475    fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
476    where
477        S: 'static + Send + Sync + ReadState,
478        P: SequencerPersistence,
479        S::State: Send
480            + Sync
481            + SubmitDataSource<N, P>
482            + StateSignatureDataSource<N>
483            + NodeStateDataSource
484            + CatchupDataSource
485            + HotShotConfigDataSource,
486        N: ConnectedNetwork<PubKey>,
487    {
488        let bind_version = SequencerApiVersion::instance();
489        // Initialize submit API
490        if self.submit.is_some() {
491            register_api("submit", app, move |ver| {
492                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
493                    .context("failed to define submit api")
494            })?;
495        }
496
497        // Initialize state API.
498        if self.catchup.is_some() {
499            tracing::info!("initializing state API");
500
501            register_api("catchup", app, move |ver| {
502                endpoints::catchup(bind_version, ver).context("failed to define catchup api")
503            })?;
504        }
505
506        register_api("state-signature", app, move |ver| {
507            endpoints::state_signature(bind_version, ver)
508                .context("failed to define state signature api")
509        })?;
510
511        if self.config.is_some() {
512            register_api("config", app, move |ver| {
513                endpoints::config(bind_version, ver).context("failed to define config api")
514            })?;
515        }
516
517        Ok(())
518    }
519
520    /// Initialize the hotshot events API module if enabled.
521    ///
522    /// This function adds the hotshot events API module to the given app if the hotshot_events
523    /// option is enabled. This module requires the app state to implement EventsSource.
524    fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
525    where
526        S: 'static + Send + Sync + ReadState,
527        S::State:
528            Send + Sync + hotshot_events_service::events_source::EventsSource<crate::SeqTypes>,
529    {
530        tracing::info!("Initializing HotShot events API at /hotshot-events");
531        register_api("hotshot-events", app, move |ver| {
532            hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
533                &hotshot_events_service::events::Options::default(),
534                ver,
535            )
536            .with_context(|| "failed to define the HotShot events API")
537        })?;
538
539        Ok(())
540    }
541
542    fn listen<S, E, ApiVer>(
543        &self,
544        port: u16,
545        app: App<S, E>,
546        bind_version: ApiVer,
547    ) -> impl Future<Output = anyhow::Result<()>>
548    where
549        S: Send + Sync + 'static,
550        E: Send + Sync + tide_disco::Error,
551        ApiVer: StaticVersionType + 'static,
552    {
553        let max_connections = self.http.max_connections;
554
555        async move {
556            if let Some(limit) = max_connections {
557                app.serve(RateLimitListener::with_port(port, limit), bind_version)
558                    .await?;
559            } else {
560                app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
561            }
562            Ok(())
563        }
564    }
565}
566
567/// The minimal HTTP API.
568///
569/// The API automatically includes health and version endpoints. Additional API modules can be
570/// added by including the query-api or submit-api modules.
571#[derive(Parser, Clone, Copy, Debug)]
572pub struct Http {
573    /// Port that the HTTP API will use.
574    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
575    pub port: u16,
576
577    /// Maximum number of concurrent HTTP connections the server will allow.
578    ///
579    /// Connections exceeding this will receive and immediate 429 response and be closed.
580    ///
581    /// Leave unset for no connection limit.
582    #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
583    pub max_connections: Option<usize>,
584}
585
586impl Http {
587    /// Default options for running a web server on the given port.
588    pub fn with_port(port: u16) -> Self {
589        Self {
590            port,
591            max_connections: None,
592        }
593    }
594}
595
/// Options for the submission API module.
// Field-less marker: `Options::submit` being `Some` is what causes the `submit` API module to be
// registered.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Submit;
599
/// Options for the status API module.
// Field-less marker: when no query module is configured, `Options::status` being `Some` makes
// `Options::serve` run the status API on a `MetricsDataSource`. With a query module the status
// API is registered regardless.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Status;
603
/// Options for the catchup API module.
// Field-less marker: when no query module is configured, `Options::catchup` being `Some` is what
// causes the `catchup` API module to be registered. With a query module the catchup API is
// registered regardless.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Catchup;
607
/// Options for the config API module.
// Field-less marker: `Options::config` being `Some` is what causes the `config` API module to be
// registered.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Config;
611
/// Options for the query API module.
// `Options::serve` only initializes the query module when these options are accompanied by a
// storage backend (see `Options::query_sql` / `Options::query_fs`).
#[derive(Parser, Clone, Debug, Default)]
pub struct Query {
    /// Peers for fetching missing data for the query service.
    // Parsed as a comma-separated list (`value_delimiter = ','`). Each peer is used as a provider
    // from which the query data source fetches missing data.
    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
    pub peers: Vec<Url>,
}
619
/// Options for the state API module.
// NOTE(review): this field-less marker is not referenced by `Options` or by any other code
// visible in this file -- confirm whether it is still used elsewhere.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct State;
623
/// Options for the Hotshot events streaming API module.
// Field-less marker: `Options::hotshot_events` being `Some` is what causes the `hotshot-events`
// API module to be registered.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct HotshotEvents;
627
/// Options for the explorer API module.
// Field-less marker: `Options::explorer` being `Some` causes the `explorer` API module to be
// registered, but only by the SQL-backed query module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Explorer;
631
632/// Registers two versions (v0 and v1) of the same API module under the given path.
633fn register_api<E, S, F, ModuleError, ModuleVersion>(
634    path: &'static str,
635    app: &mut App<S, E>,
636    f: F,
637) -> anyhow::Result<()>
638where
639    S: 'static + Send + Sync,
640    E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
641    ModuleError: Send + Sync + 'static,
642    ModuleVersion: StaticVersionType + 'static,
643    F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
644{
645    let v0 = "0.0.1".parse().unwrap();
646    let v1 = "1.0.0".parse().unwrap();
647    let result1 = f(v0)?;
648    let result2 = f(v1)?;
649
650    app.register_module(path, result1)?;
651    app.register_module(path, result2)?;
652
653    Ok(())
654}