sequencer/api/
options.rs

1//! Sequencer-specific API options and initialization.
2
3use std::sync::Arc;
4
5use anyhow::{bail, Context};
6use clap::Parser;
7use espresso_types::{
8    v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
9    v0_3::RewardMerkleTreeV1,
10    v0_4::RewardMerkleTreeV2,
11    BlockMerkleTree, PubKey, SeqTypes,
12};
13use futures::{
14    channel::oneshot,
15    future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18    data_source::{ExtensibleDataSource, MetricsDataSource},
19    fetching::provider::QueryServiceProvider,
20    status::{self, UpdateStatusData},
21    ApiState as AppState, Error,
22};
23use hotshot_types::traits::{
24    metrics::{Metrics, NoMetrics},
25    network::ConnectedNetwork,
26    node_implementation::Versions,
27};
28use jf_merkle_tree_compat::MerkleTreeScheme;
29use tide_disco::{listener::RateLimitListener, method::ReadState, Api, App, Url};
30use vbs::version::StaticVersionType;
31
32use super::{
33    data_source::{
34        provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35        SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
36    },
37    endpoints, fs, light_client, sql,
38    update::ApiEventConsumer,
39    ApiState, StorageState,
40};
41use crate::{
42    api::endpoints::RewardMerkleTreeVersion,
43    catchup::CatchupStorage,
44    context::{SequencerContext, TaskList},
45    persistence,
46    request_response::data_source::Storage as RequestResponseStorage,
47    state::update_state_storage_loop,
48    SequencerApiVersion,
49};
50
51#[derive(Clone, Debug)]
52pub struct Options {
53    pub http: Http,
54    pub query: Option<Query>,
55    pub submit: Option<Submit>,
56    pub status: Option<Status>,
57    pub catchup: Option<Catchup>,
58    pub config: Option<Config>,
59    pub hotshot_events: Option<HotshotEvents>,
60    pub explorer: Option<Explorer>,
61    pub light_client: Option<LightClient>,
62    pub storage_fs: Option<persistence::fs::Options>,
63    pub storage_sql: Option<persistence::sql::Options>,
64}
65
66impl From<Http> for Options {
67    fn from(http: Http) -> Self {
68        Self {
69            http,
70            query: None,
71            submit: None,
72            status: None,
73            catchup: None,
74            config: None,
75            hotshot_events: None,
76            explorer: None,
77            light_client: None,
78            storage_fs: None,
79            storage_sql: None,
80        }
81    }
82}
83
84impl Options {
85    /// Default options for running a web server on the given port.
86    pub fn with_port(port: u16) -> Self {
87        Http::with_port(port).into()
88    }
89
90    /// Add a query API module backed by a Postgres database.
91    pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
92        self.query = Some(query);
93        self.storage_sql = Some(storage);
94        self
95    }
96
97    /// Add a query API module backed by the file system.
98    pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
99        self.query = Some(query);
100        self.storage_fs = Some(storage);
101        self
102    }
103
104    /// Add a submit API module.
105    pub fn submit(mut self, opt: Submit) -> Self {
106        self.submit = Some(opt);
107        self
108    }
109
110    /// Add a status API module.
111    pub fn status(mut self, opt: Status) -> Self {
112        self.status = Some(opt);
113        self
114    }
115
116    /// Add a catchup API module.
117    pub fn catchup(mut self, opt: Catchup) -> Self {
118        self.catchup = Some(opt);
119        self
120    }
121
122    /// Add a config API module.
123    pub fn config(mut self, opt: Config) -> Self {
124        self.config = Some(opt);
125        self
126    }
127
128    /// Add a Hotshot events streaming API module.
129    pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
130        self.hotshot_events = Some(opt);
131        self
132    }
133
134    /// Add an explorer API module.
135    pub fn explorer(mut self, opt: Explorer) -> Self {
136        self.explorer = Some(opt);
137        self
138    }
139
140    /// Add a light client API module.
141    pub fn light_client(mut self, opt: LightClient) -> Self {
142        self.light_client = Some(opt);
143        self
144    }
145
146    /// Whether these options will run the query API.
147    pub fn has_query_module(&self) -> bool {
148        self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
149    }
150
151    /// Start the server.
152    ///
153    /// The function `init_context` is used to create a sequencer context from a metrics object and
154    /// optional saved consensus state. The metrics object is created from the API data source, so
155    /// that consensus will populuate metrics that can then be read and served by the API.
156    pub async fn serve<N, P, F, V: Versions + 'static>(
157        mut self,
158        init_context: F,
159    ) -> anyhow::Result<SequencerContext<N, P, V>>
160    where
161        N: ConnectedNetwork<PubKey>,
162        P: SequencerPersistence,
163        F: FnOnce(
164            Box<dyn Metrics>,
165            Box<dyn EventConsumer>,
166            Option<RequestResponseStorage>,
167        ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P, V>>>,
168    {
169        // Create a channel to send the context to the web server after it is initialized. This
170        // allows the web server to start before initialization can complete, since initialization
171        // can take a long time (and is dependent on other nodes).
172        let (send_ctx, recv_ctx) = oneshot::channel();
173        let state = ApiState::new(async move {
174            recv_ctx
175                .await
176                .expect("context initialized and sent over channel")
177        });
178        let mut tasks = TaskList::default();
179
180        // The server state type depends on whether we are running a query or status API or not, so
181        // we handle the two cases differently.
182        #[allow(clippy::type_complexity)]
183        let (metrics, consumer, storage): (
184            Box<dyn Metrics>,
185            Box<dyn EventConsumer>,
186            Option<RequestResponseStorage>,
187        ) = if let Some(query_opt) = self.query.take() {
188            if let Some(opt) = self.storage_sql.take() {
189                self.init_with_query_module_sql(
190                    query_opt,
191                    opt,
192                    state,
193                    &mut tasks,
194                    SequencerApiVersion::instance(),
195                )
196                .await?
197            } else if let Some(opt) = self.storage_fs.take() {
198                self.init_with_query_module_fs(
199                    query_opt,
200                    opt,
201                    state,
202                    &mut tasks,
203                    SequencerApiVersion::instance(),
204                )
205                .await?
206            } else {
207                bail!("query module requested but not storage provided");
208            }
209        } else if self.status.is_some() {
210            // If a status API is requested but no availability API, we use the
211            // `MetricsDataSource`, which allows us to run the status API with no persistent
212            // storage.
213            let ds = MetricsDataSource::default();
214            let metrics = ds.populate_metrics();
215            let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
216                ds,
217                state.clone(),
218            )));
219
220            // Initialize v0 and v1 status API.
221            register_api("status", &mut app, move |ver| {
222                status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
223                    .context("failed to define status api")
224            })?;
225
226            self.init_hotshot_modules(&mut app)?;
227
228            // Initialize hotshot events API if enabled
229            if self.hotshot_events.is_some() {
230                self.init_hotshot_events_module(&mut app)?;
231            }
232
233            tasks.spawn(
234                "API server",
235                self.listen(self.http.port, app, SequencerApiVersion::instance()),
236            );
237
238            (metrics, Box::new(NullEventConsumer), None)
239        } else {
240            // If no status or availability API is requested, we don't need metrics or a query
241            // service data source. The only app state is the HotShot handle, which we use to
242            // submit transactions.
243            //
244            // If we have no availability API, we cannot load a saved leaf from local storage,
245            // so we better have been provided the leaf ahead of time if we want it at all.
246            let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
247
248            self.init_hotshot_modules(&mut app)?;
249
250            // Initialize hotshot events API if enabled
251            if self.hotshot_events.is_some() {
252                self.init_hotshot_events_module(&mut app)?;
253            }
254
255            tasks.spawn(
256                "API server",
257                self.listen(self.http.port, app, SequencerApiVersion::instance()),
258            );
259
260            (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
261        };
262
263        let ctx = init_context(metrics, consumer, storage.clone()).await?;
264        send_ctx
265            .send(ctx.clone())
266            .ok()
267            .context("API server exited without receiving context")?;
268        Ok(ctx.with_task_list(tasks))
269    }
270
271    async fn init_app_modules<N, P, D, V: Versions>(
272        &self,
273        ds: D,
274        state: ApiState<N, P, V>,
275        bind_version: SequencerApiVersion,
276    ) -> anyhow::Result<(
277        Box<dyn Metrics>,
278        Arc<StorageState<N, P, D, V>>,
279        App<AppState<StorageState<N, P, D, V>>, Error>,
280    )>
281    where
282        N: ConnectedNetwork<PubKey>,
283        P: SequencerPersistence,
284        D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
285    {
286        let metrics = ds.populate_metrics();
287        let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
288        let api_state: endpoints::AvailState<N, P, D, V> = ds.clone().into();
289        let mut app = App::<_, Error>::with_state(api_state);
290
291        // Initialize v0 and v1 status API.
292        register_api("status", &mut app, move |ver| {
293            status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
294                .context("failed to define status api")
295        })?;
296
297        // Initialize availability and node APIs (these both use the same data source).
298
299        // Note: We initialize two versions of the availability module: `availability/v0` and `availability/v1`.
300        // - `availability/v0/leaf/0` returns the old `Leaf1` type for backward compatibility.
301        // - `availability/v1/leaf/0` returns the new `Leaf2` type
302
303        register_api("availability", &mut app, move |ver| {
304            endpoints::availability(ver).context("failed to define availability api")
305        })?;
306
307        register_api("node", &mut app, move |ver| {
308            endpoints::node(ver).context("failed to define node api")
309        })?;
310
311        register_api("token", &mut app, move |ver| {
312            endpoints::token(ver).context("failed to define token api")
313        })?;
314
315        // Initialize submit API
316        if self.submit.is_some() {
317            register_api("submit", &mut app, move |ver| {
318                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
319                    .context("failed to define submit api")
320            })?;
321        }
322
323        tracing::info!("initializing catchup API");
324
325        register_api("catchup", &mut app, move |ver| {
326            endpoints::catchup(bind_version, ver).context("failed to define catchup api")
327        })?;
328
329        register_api("state-signature", &mut app, move |ver| {
330            endpoints::state_signature(bind_version, ver)
331                .context("failed to define state signature api")
332        })?;
333
334        if self.config.is_some() {
335            register_api("config", &mut app, move |ver| {
336                endpoints::config(bind_version, ver).context("failed to define config api")
337            })?;
338        }
339        Ok((metrics, ds, app))
340    }
341
342    async fn init_with_query_module_fs<N, P, V: Versions + 'static>(
343        &self,
344        query_opt: Query,
345        mod_opt: persistence::fs::Options,
346        state: ApiState<N, P, V>,
347        tasks: &mut TaskList,
348        bind_version: SequencerApiVersion,
349    ) -> anyhow::Result<(
350        Box<dyn Metrics>,
351        Box<dyn EventConsumer>,
352        Option<RequestResponseStorage>,
353    )>
354    where
355        N: ConnectedNetwork<PubKey>,
356        P: SequencerPersistence,
357    {
358        let ds = <fs::DataSource as SequencerDataSource>::create(
359            mod_opt,
360            provider::<V>(query_opt.peers, bind_version),
361            false,
362        )
363        .await?;
364
365        // Get the inner storage from the data source
366        let inner_storage = ds.inner();
367
368        let (metrics, ds, mut app) = self
369            .init_app_modules(ds, state.clone(), bind_version)
370            .await?;
371
372        // Initialize hotshot events API if enabled
373        if self.hotshot_events.is_some() {
374            self.init_hotshot_events_module(&mut app)?;
375        }
376
377        tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
378        Ok((
379            metrics,
380            Box::new(ApiEventConsumer::from(ds)),
381            Some(RequestResponseStorage::Fs(inner_storage)),
382        ))
383    }
384
385    async fn init_with_query_module_sql<N, P, V: Versions + 'static>(
386        self,
387        query_opt: Query,
388        mod_opt: persistence::sql::Options,
389        state: ApiState<N, P, V>,
390        tasks: &mut TaskList,
391        bind_version: SequencerApiVersion,
392    ) -> anyhow::Result<(
393        Box<dyn Metrics>,
394        Box<dyn EventConsumer>,
395        Option<RequestResponseStorage>,
396    )>
397    where
398        N: ConnectedNetwork<PubKey>,
399        P: SequencerPersistence,
400    {
401        let mut provider = Provider::default();
402
403        // Use the database itself as a fetching provider: sometimes we can fetch data that is
404        // missing from the query service from ephemeral consensus storage.
405        provider = provider.with_provider(mod_opt.clone().create().await?);
406        // If that fails, fetch missing data from peers.
407        for peer in query_opt.peers {
408            tracing::info!("will fetch missing data from {peer}");
409            provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
410        }
411
412        let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
413        let inner_storage = ds.inner();
414        let (metrics, ds, mut app) = self
415            .init_app_modules(ds, state.clone(), bind_version)
416            .await?;
417
418        if self.explorer.is_some() {
419            register_api("explorer", &mut app, move |ver| {
420                endpoints::explorer(ver).context("failed to define explorer api")
421            })?;
422        }
423
424        // Initialize merklized state module for block merkle tree
425
426        register_api("block-state", &mut app, move |ver| {
427            endpoints::merklized_state::<N, P, _, BlockMerkleTree, _, 3>(ver)
428                .context("failed to define block-state api")
429        })?;
430
431        // Initialize merklized state module for fee merkle tree
432
433        register_api("fee-state", &mut app, move |ver| {
434            endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
435        })?;
436
437        register_api("reward-state", &mut app, move |ver| {
438            endpoints::reward::<
439                _,
440                SequencerApiVersion,
441                RewardMerkleTreeV1,
442                { RewardMerkleTreeV1::ARITY },
443            >(ver, RewardMerkleTreeVersion::V1)
444            .context("failed to define reward-state api")
445        })?;
446
447        // register new api for new reward merkle tree
448        register_api("reward-state-v2", &mut app, move |ver| {
449            endpoints::reward::<
450                _,
451                SequencerApiVersion,
452                RewardMerkleTreeV2,
453                { RewardMerkleTreeV2::ARITY },
454            >(ver, RewardMerkleTreeVersion::V2)
455            .context("failed to define reward-state api")
456        })?;
457
458        let get_node_state = {
459            let state = state.clone();
460            async move { state.node_state().await.clone() }
461        };
462        tasks.spawn(
463            "merklized state storage update loop",
464            update_state_storage_loop(ds.clone(), get_node_state),
465        );
466
467        // Initialize hotshot events API if enabled
468        if self.hotshot_events.is_some() {
469            self.init_hotshot_events_module(&mut app)?;
470        }
471
472        // Initialize light client API if enabled.
473        if self.light_client.is_some() {
474            register_api("light-client", &mut app, move |ver| {
475                light_client::define_api::<_, SequencerApiVersion>(Default::default(), ver)
476                    .context("failed to define light client api")
477            })?;
478        }
479
480        tasks.spawn(
481            "API server",
482            self.listen(self.http.port, app, SequencerApiVersion::instance()),
483        );
484        Ok((
485            metrics,
486            Box::new(ApiEventConsumer::from(ds)),
487            Some(RequestResponseStorage::Sql(inner_storage)),
488        ))
489    }
490
491    /// Initialize the modules for interacting with HotShot.
492    ///
493    /// This function adds the `submit`, `state`, and `state_signature` API modules to the given
494    /// app. These modules only require a HotShot handle as state, and thus they work with any data
495    /// source, so initialization is the same no matter what mode the service is running in.
496    fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
497    where
498        S: 'static + Send + Sync + ReadState,
499        P: SequencerPersistence,
500        S::State: Send
501            + Sync
502            + SubmitDataSource<N, P>
503            + StateSignatureDataSource<N>
504            + NodeStateDataSource
505            + CatchupDataSource
506            + HotShotConfigDataSource,
507        N: ConnectedNetwork<PubKey>,
508    {
509        let bind_version = SequencerApiVersion::instance();
510        // Initialize submit API
511        if self.submit.is_some() {
512            register_api("submit", app, move |ver| {
513                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
514                    .context("failed to define submit api")
515            })?;
516        }
517
518        // Initialize state API.
519        if self.catchup.is_some() {
520            tracing::info!("initializing state API");
521
522            register_api("catchup", app, move |ver| {
523                endpoints::catchup(bind_version, ver).context("failed to define catchup api")
524            })?;
525        }
526
527        register_api("state-signature", app, move |ver| {
528            endpoints::state_signature(bind_version, ver)
529                .context("failed to define state signature api")
530        })?;
531
532        if self.config.is_some() {
533            register_api("config", app, move |ver| {
534                endpoints::config(bind_version, ver).context("failed to define config api")
535            })?;
536        }
537
538        Ok(())
539    }
540
541    /// Initialize the hotshot events API module if enabled.
542    ///
543    /// This function adds the hotshot events API module to the given app if the hotshot_events
544    /// option is enabled. This module requires the app state to implement EventsSource.
545    fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
546    where
547        S: 'static + Send + Sync + ReadState,
548        S::State: Send + Sync + hotshot_events_service::events_source::EventsSource<SeqTypes>,
549    {
550        tracing::info!("Initializing HotShot events API at /hotshot-events");
551        register_api("hotshot-events", app, move |ver| {
552            hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
553                &hotshot_events_service::events::Options::default(),
554                ver,
555            )
556            .with_context(|| "failed to define the HotShot events API")
557        })?;
558
559        Ok(())
560    }
561
562    fn listen<S, E, ApiVer>(
563        &self,
564        port: u16,
565        app: App<S, E>,
566        bind_version: ApiVer,
567    ) -> impl Future<Output = anyhow::Result<()>>
568    where
569        S: Send + Sync + 'static,
570        E: Send + Sync + tide_disco::Error,
571        ApiVer: StaticVersionType + 'static,
572    {
573        let max_connections = self.http.max_connections;
574
575        async move {
576            if let Some(limit) = max_connections {
577                app.serve(RateLimitListener::with_port(port, limit), bind_version)
578                    .await?;
579            } else {
580                app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
581            }
582            Ok(())
583        }
584    }
585}
586
587/// The minimal HTTP API.
588///
589/// The API automatically includes health and version endpoints. Additional API modules can be
590/// added by including the query-api or submit-api modules.
591#[derive(Parser, Clone, Copy, Debug)]
592pub struct Http {
593    /// Port that the HTTP API will use.
594    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
595    pub port: u16,
596
597    /// Maximum number of concurrent HTTP connections the server will allow.
598    ///
599    /// Connections exceeding this will receive and immediate 429 response and be closed.
600    ///
601    /// Leave unset for no connection limit.
602    #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
603    pub max_connections: Option<usize>,
604}
605
606impl Http {
607    /// Default options for running a web server on the given port.
608    pub fn with_port(port: u16) -> Self {
609        Self {
610            port,
611            max_connections: None,
612        }
613    }
614}
615
616/// Options for the submission API module.
617#[derive(Parser, Clone, Copy, Debug, Default)]
618pub struct Submit;
619
620/// Options for the status API module.
621#[derive(Parser, Clone, Copy, Debug, Default)]
622pub struct Status;
623
624/// Options for the catchup API module.
625#[derive(Parser, Clone, Copy, Debug, Default)]
626pub struct Catchup;
627
628/// Options for the config API module.
629#[derive(Parser, Clone, Copy, Debug, Default)]
630pub struct Config;
631
632/// Options for the query API module.
633#[derive(Parser, Clone, Debug, Default)]
634pub struct Query {
635    /// Peers for fetching missing data for the query service.
636    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
637    pub peers: Vec<Url>,
638}
639
640/// Options for the state API module.
641#[derive(Parser, Clone, Copy, Debug, Default)]
642pub struct State;
643
644/// Options for the Hotshot events streaming API module.
645#[derive(Parser, Clone, Copy, Debug, Default)]
646pub struct HotshotEvents;
647
648/// Options for the explorer API module.
649#[derive(Parser, Clone, Copy, Debug, Default)]
650pub struct Explorer;
651
652/// Options for the light client API module.
653#[derive(Parser, Clone, Copy, Debug, Default)]
654pub struct LightClient;
655
656/// Registers two versions (v0 and v1) of the same API module under the given path.
657fn register_api<E, S, F, ModuleError, ModuleVersion>(
658    path: &'static str,
659    app: &mut App<S, E>,
660    f: F,
661) -> anyhow::Result<()>
662where
663    S: 'static + Send + Sync,
664    E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
665    ModuleError: Send + Sync + 'static,
666    ModuleVersion: StaticVersionType + 'static,
667    F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
668{
669    let v0 = "0.0.1".parse().unwrap();
670    let v1 = "1.1.0".parse().unwrap();
671    let result1 = f(v0)?;
672    let result2 = f(v1)?;
673
674    app.register_module(path, result1)?;
675    app.register_module(path, result2)?;
676
677    Ok(())
678}