hotshot_query_service/
lib.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! The HotShot Query Service is a minimal, generic query service that can be integrated into any
14//! decentralized application running on the [hotshot] consensus layer. It provides all the features
15//! that HotShot itself expects of a query service (such as providing consensus-related data for
16//! catchup and synchronization) as well as some application-level features that deal only with
17//! consensus-related or application-agnostic data. In addition, the query service is provided as an
18//! extensible library, which makes it easy to add additional, application-specific features.
19//!
20//! # Basic usage
21//!
22//! ```
23//! # use hotshot::types::SystemContextHandle;
24//! # use hotshot_query_service::testing::mocks::{
25//! #   MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions,
26//! # };
27//! # use hotshot_example_types::node_types::TestVersions;
28//! # use hotshot_types::consensus::ConsensusMetricsValue;
29//! # use std::path::Path;
30//! # async fn doc(storage_path: &std::path::Path) -> anyhow::Result<()> {
31//! use hotshot_query_service::{
32//!     availability,
33//!     data_source::{FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource},
34//!     fetching::provider::NoFetching,
35//!     node,
36//!     status::UpdateStatusData,
37//!     status,
38//!     testing::mocks::MockBase,
39//!     ApiState, Error,
40//! };
41//!
42//! use futures::StreamExt;
43//! use vbs::version::StaticVersionType;
44//! use hotshot::SystemContext;
45//! use std::sync::Arc;
46//! use tide_disco::App;
47//! use tokio::spawn;
48//!
49//! // Create or open a data source.
50//! let data_source = FileSystemDataSource::<AppTypes, NoFetching>::create(storage_path, NoFetching)
51//!     .await?;
52//!
53//! // Create hotshot, giving it a handle to the status metrics.
54//! let hotshot = SystemContext::<AppTypes, AppNodeImpl, AppVersions>::init(
55//! #   panic!(), panic!(), panic!(), panic!(), panic!(), panic!(), panic!(),
56//!     ConsensusMetricsValue::new(&*data_source.populate_metrics()), panic!(),
57//!     panic!()
58//!     // Other fields omitted
59//! ).await?.0;
60//!
61//! // Create API modules.
62//! let availability_api = availability::define_api(&Default::default(),  MockBase::instance())?;
63//! let node_api = node::define_api(&Default::default(),  MockBase::instance())?;
64//! let status_api = status::define_api(&Default::default(),  MockBase::instance())?;
65//!
66//! // Create app.
67//! let data_source = ApiState::from(data_source);
68//! let mut app = App::<_, Error>::with_state(data_source.clone());
69//! app
70//!     .register_module("availability", availability_api)?
71//!     .register_module("node", node_api)?
72//!     .register_module("status", status_api)?;
73//!
74//! // Serve app.
75//! spawn(app.serve("0.0.0.0:8080", MockBase::instance()));
76//!
77//! // Update query data using HotShot events.
78//! let mut events = hotshot.event_stream();
79//! while let Some(event) = events.next().await {
80//!     // Update the query data based on this event.
81//!     data_source.update(&event).await.ok();
82//! }
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! Shortcut for starting an out-of-the-box service with no extensions (does exactly the above and
88//! nothing more):
89//!
90//! ```
91//! # use hotshot::types::SystemContextHandle;
92//! # use vbs::version::StaticVersionType;
93//! # use hotshot_query_service::{data_source::FileSystemDataSource, Error, Options};
94//! # use hotshot_query_service::fetching::provider::NoFetching;
95//! # use hotshot_query_service::testing::mocks::{MockBase, MockNodeImpl, MockTypes, MockVersions};
96//! # use std::path::Path;
97//! # use tokio::spawn;
98//! # async fn doc(storage_path: &Path, options: Options, hotshot: SystemContextHandle<MockTypes, MockNodeImpl, MockVersions>) -> Result<(), Error> {
99//! use hotshot_query_service::run_standalone_service;
100//!
101//! let data_source = FileSystemDataSource::create(storage_path, NoFetching).await.map_err(Error::internal)?;
102//! spawn(run_standalone_service(options, data_source, hotshot,  MockBase::instance()));
103//! # Ok(())
104//! # }
105//! ```
106//!
107//! # Persistence
108//!
109//! Naturally, an archival query service such as this is heavily dependent on a persistent storage
110//! implementation. The APIs provided by this query service are generic over the specific type of
111//! the persistence layer, which we call a _data source_. This crate provides several data source
112//! implementations in the [`data_source`] module.
113//!
114//! # Interaction with other components
115//!
116//! While the HotShot Query Service [can be used as a standalone service](run_standalone_service),
117//! it is designed to be used as a single component of a larger service consisting of several other
118//! interacting components. This interaction has two dimensions:
119//! * _extension_, adding new functionality to the API modules provided by this crate
120//! * _composition_, combining the API modules from this crate with other, application-specific API
121//!   modules to create a single [tide_disco] API
122//!
123//! ## Extension
124//!
125//! It is possible to add new functionality directly to the modules provided by this create. This
126//! allows you to keep semantically related functionality grouped together in a single API module,
127//! for interface purposes, even while some of the functionality of that module is provided by this
128//! crate and some of it is an application-specific extension.
129//!
130//! For example, consider an application which is a UTXO-based blockchain. Each transaction consists
131//! of a handful of new _output records_, and you want your query service to provide an API for
132//! looking up a specific output by its index. Semantically, this functionality belongs in the
133//! _data availability_ API, however it is application-specific -- HotShot itself makes no
134//! assumptions and provides no guarantees about the internal structure of a transaction. In order
135//! to expose this UTXO-specific functionality as well as the generic data availability
136//! functionality provided by this crate as part of the same public API, you can extend the
137//! [availability] module of this crate with additional data structures and endpoints that know
138//! about the internal structure of your transactions.
139//!
140//! There are two parts to adding additional functionality to a module in this crate: adding the
141//! required additional data structures to the data source, and creating a new API endpoint to
142//! expose the functionality. The mechanism for the former will depend on the specific data source
143//! you are using. Check the documentation for your data source implementation to see how it can be
144//! extended.
145//!
146//! For the latter, you can modify the default availability API with the addition of a new endpoint
147//! that accesses the custom state you have added to the data source. It is good practice to define
148//! a trait for accessing this custom state, so that if you want to switch data sources in the
149//! future, you can easily extend the new data source, implement the trait, and then transparently
150//! replace the data source that you use to set up your API. In the case of
151//! adding a UTXO index, this trait might look like this:
152//!
153//! ```
154//! # use hotshot_query_service::{
155//! #   availability::{AvailabilityDataSource, TransactionIndex},
156//! #   testing::mocks::MockTypes as AppTypes,
157//! # };
158//! use async_trait::async_trait;
159//!
160//! #[async_trait]
161//! trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
162//!     // Index mapping UTXO index to (block index, transaction index, output index)
163//!     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
164//! }
165//! ```
166//!
167//! Implement this trait for the extended data source you're using, and then add a new endpoint to
168//! the availability API like so:
169//!
170//! ```
171//! # use async_trait::async_trait;
172//! # use futures::FutureExt;
173//! # use hotshot_query_service::availability::{
174//! #   self, AvailabilityDataSource, FetchBlockSnafu, TransactionIndex,
175//! # };
176//! # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
177//! # use hotshot_query_service::testing::mocks::MockBase;
178//! # use hotshot_query_service::{ApiState, Error};
179//! # use snafu::ResultExt;
180//! # use tide_disco::{api::ApiError, method::ReadState, Api, App, StatusCode};
181//! # use vbs::version::StaticVersionType;
182//! # #[async_trait]
183//! # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
184//! #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
185//! # }
186//!
187//! fn define_app_specific_availability_api<State, Ver: StaticVersionType + 'static>(
188//!     options: &availability::Options,
189//!     bind_version: Ver,
190//! ) -> Result<Api<State, availability::Error, Ver>, ApiError>
191//! where
192//!     State: 'static + Send + Sync + ReadState,
193//!     <State as ReadState>::State: UtxoDataSource + Send + Sync,
194//! {
195//!     let mut api = availability::define_api(options, bind_version)?;
196//!     api.get("get_utxo", |req, state: &<State as ReadState>::State| async move {
197//!         let utxo_index = req.integer_param("index")?;
198//!         let (block_index, txn_index, output_index) = state
199//!             .find_utxo(utxo_index)
200//!             .await
201//!             .ok_or_else(|| availability::Error::Custom {
202//!                 message: format!("no such UTXO {}", utxo_index),
203//!                 status: StatusCode::NOT_FOUND,
204//!             })?;
205//!         let block = state
206//!             .get_block(block_index)
207//!             .await
208//!             .context(FetchBlockSnafu { resource: block_index.to_string() })?;
209//!         let txn = block.transaction(&txn_index).unwrap();
210//!         let utxo = // Application-specific logic to extract a UTXO from a transaction.
211//! #           todo!();
212//!         Ok(utxo)
213//!     }.boxed())?;
214//!     Ok(api)
215//! }
216//!
217//! fn init_server<D: UtxoDataSource + Send + Sync + 'static, Ver: StaticVersionType + 'static>(
218//!     options: &availability::Options,
219//!     data_source: D,
220//!     bind_version: Ver,
221//! ) -> Result<App<ApiState<D>, Error>, availability::Error> {
222//!     let api = define_app_specific_availability_api(options, bind_version)
223//!         .map_err(availability::Error::internal)?;
224//!     let mut app = App::<_, _>::with_state(ApiState::from(data_source));
225//!     app.register_module("availability", api).map_err(availability::Error::internal)?;
226//!     Ok(app)
227//! }
228//! ```
229//!
230//! Now you need to define the new route, `get_utxo`, in your API specification. Create a file
231//! `app_specific_availability.toml`:
232//!
233//! ```toml
234//! [route.get_utxo]
235//! PATH = ["utxo/:index"]
236//! ":index" = "Integer"
237//! DOC = "Get a UTXO by its index"
238//! ```
239//!
240//! and make sure `options.extensions` includes `"app_specific_availability.toml"`.
241//!
242//! ## Composition
243//!
244//! Composing the modules provided by this crate with other, unrelated modules to create a unified
245//! service is fairly simple, as most of the complexity is handled by [tide_disco], which already
246//! provides a mechanism for composing several modules into a single application. In principle, all
247//! you need to do is register the [availability], [node], and [status] APIs provided by this crate
248//! with a [tide_disco::App], and then register your own API modules with the same app.
249//!
250//! The one wrinkle is that all modules within a [tide_disco] app must share the same state type. It
251//! is for this reason that the modules provided by this crate are generic on the state type --
252//! [availability::define_api], [node::define_api], and [status::define_api] can all work with any
253//! state type, provided that type implements the corresponding data source traits. The data sources
254//! provided by this crate implement these traits, but if you want to use a custom state type that
255//! includes state for other modules, you will need to implement these traits for your custom type.
256//! The basic pattern looks like this:
257//!
258//! ```
259//! # use async_trait::async_trait;
260//! # use hotshot_query_service::{Header, QueryResult, VidShare};
261//! # use hotshot_query_service::availability::{
262//! #   AvailabilityDataSource, BlockId, BlockQueryData, Fetch, FetchStream, LeafId, LeafQueryData,
263//! #   PayloadMetadata, PayloadQueryData, TransactionHash, TransactionQueryData,
264//! #   VidCommonMetadata, VidCommonQueryData,
265//! # };
266//! # use hotshot_query_service::metrics::PrometheusMetrics;
267//! # use hotshot_query_service::node::{
268//! #   NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart,
269//! # };
270//! # use hotshot_query_service::status::{HasMetrics, StatusDataSource};
271//! # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
272//! # use std::ops::{Bound, RangeBounds};
273//! # type AppQueryData = ();
274//! // Our AppState takes an underlying data source `D` which already implements the relevant
275//! // traits, and adds some state for use with other modules.
276//! struct AppState<D> {
277//!     hotshot_qs: D,
278//!     // additional state for other modules
279//! }
280//!
281//! // Implement data source trait for availability API by delegating to the underlying data source.
282//! #[async_trait]
283//! impl<D: AvailabilityDataSource<AppTypes> + Send + Sync>
284//!     AvailabilityDataSource<AppTypes> for AppState<D>
285//! {
286//!     async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<AppTypes>>
287//!     where
288//!         ID: Into<LeafId<AppTypes>> + Send + Sync,
289//!     {
290//!         self.hotshot_qs.get_leaf(id).await
291//!     }
292//!
293//!     // etc
294//! #   async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<AppTypes>>
295//! #   where
296//! #       ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
297//! #   async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<AppTypes>>
298//! #   where
299//! #       ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
300//! #   async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<AppTypes>>
301//! #   where
302//! #       ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
303//! #   async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<AppTypes>>
304//! #   where
305//! #       ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
306//! #   async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<AppTypes>>
307//! #   where
308//! #       ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
309//! #   async fn get_transaction(&self, hash: TransactionHash<AppTypes>) -> Fetch<TransactionQueryData<AppTypes>> { todo!() }
310//! #   async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<AppTypes>>
311//! #   where
312//! #       R: RangeBounds<usize> + Send { todo!() }
313//! #   async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<AppTypes>>
314//! #   where
315//! #       R: RangeBounds<usize> + Send { todo!() }
316//! #   async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<AppTypes>>
317//! #   where
318//! #       R: RangeBounds<usize> + Send { todo!() }
319//! #   async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<AppTypes>>
320//! #   where
321//! #       R: RangeBounds<usize> + Send { todo!() }
322//! #   async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<AppTypes>>
323//! #   where
324//! #       R: RangeBounds<usize> + Send { todo!() }
325//! #   async fn get_vid_common_metadata_range<R>(&self, range: R) -> FetchStream<VidCommonMetadata<AppTypes>>
326//! #   where
327//! #       R: RangeBounds<usize> + Send { todo!() }
328//! #   async fn get_leaf_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<LeafQueryData<AppTypes>> { todo!() }
329//! #   async fn get_block_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<BlockQueryData<AppTypes>> { todo!() }
330//! #   async fn get_payload_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<PayloadQueryData<AppTypes>> { todo!() }
331//! #   async fn get_payload_metadata_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<PayloadMetadata<AppTypes>> { todo!() }
332//! #   async fn get_vid_common_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<VidCommonQueryData<AppTypes>> { todo!() }
333//! #   async fn get_vid_common_metadata_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<VidCommonMetadata<AppTypes>> { todo!() }
334//! }
335//!
336//! // Implement data source trait for node API by delegating to the underlying data source.
337//! #[async_trait]
338//! impl<D: NodeDataSource<AppTypes> + Send + Sync> NodeDataSource<AppTypes> for AppState<D> {
339//!     async fn block_height(&self) -> QueryResult<usize> {
340//!         self.hotshot_qs.block_height().await
341//!     }
342//!
343//!     async fn count_transactions_in_range(
344//!         &self,
345//!         range: impl RangeBounds<usize> + Send,
346//!     ) -> QueryResult<usize> {
347//!         self.hotshot_qs.count_transactions_in_range(range).await
348//!     }
349//!
350//!     async fn payload_size_in_range(
351//!         &self,
352//!         range: impl RangeBounds<usize> + Send,
353//!     ) -> QueryResult<usize> {
354//!         self.hotshot_qs.payload_size_in_range(range).await
355//!     }
356//!
357//!     async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
358//!     where
359//!         ID: Into<BlockId<AppTypes>> + Send + Sync,
360//!     {
361//!         self.hotshot_qs.vid_share(id).await
362//!     }
363//!
364//!     async fn sync_status(&self) -> QueryResult<SyncStatus> {
365//!         self.hotshot_qs.sync_status().await
366//!     }
367//!
368//!     async fn get_header_window(
369//!         &self,
370//!         start: impl Into<WindowStart<AppTypes>> + Send + Sync,
371//!         end: u64,
372//!         limit: usize,
373//!     ) -> QueryResult<TimeWindowQueryData<Header<AppTypes>>> {
374//!         self.hotshot_qs.get_header_window(start, end, limit).await
375//!     }
376//! }
377//!
378//! // Implement data source trait for status API by delegating to the underlying data source.
379//! impl<D: HasMetrics> HasMetrics for AppState<D> {
380//!     fn metrics(&self) -> &PrometheusMetrics {
381//!         self.hotshot_qs.metrics()
382//!     }
383//! }
384//! #[async_trait]
385//! impl<D: StatusDataSource + Send + Sync> StatusDataSource for AppState<D> {
386//!     async fn block_height(&self) -> QueryResult<usize> {
387//!         self.hotshot_qs.block_height().await
388//!     }
389//! }
390//!
391//! // Implement data source traits for other modules, using additional state from AppState.
392//! ```
393//!
394//! In the future, we may provide derive macros for
395//! [AvailabilityDataSource](availability::AvailabilityDataSource),
396//! [NodeDataSource](node::NodeDataSource), and [StatusDataSource](status::StatusDataSource) to
397//! eliminate the boilerplate of implementing them for a custom type that has an existing
398//! implementation as one of its fields.
399//!
400//! Once you have created your `AppState` type aggregating the state for each API module, you can
401//! initialize the state as normal, instantiating `D` with a concrete implementation of a data
402//! source and initializing `hotshot_qs` as you normally would that data source.
403//!
404//! _However_, this only works if you want the persistent storage for the availability and node
405//! modules (managed by `hotshot_qs`) to be independent of the persistent storage for other modules.
406//! You may well want to synchronize the storage for all modules together, so that updates to the
407//! entire application state can be done atomically. This is particularly relevant if one of your
408//! application-specific modules updates its storage based on a stream of HotShot leaves. Since the
409//! availability and node modules also update with each new leaf, you probably want all of these
410//! modules to stay in sync. The data source implementations provided by this crate provide means by
411//! which you can add additional data to the same persistent store and synchronize the entire store
412//! together. Refer to the documentation for you specific data source for information on how to
413//! achieve this.
414//!
415
416mod api;
417pub mod availability;
418pub mod data_source;
419mod error;
420pub mod explorer;
421pub mod fetching;
422pub mod merklized_state;
423pub mod metrics;
424pub mod node;
425mod resolvable;
426pub mod status;
427pub mod task;
428pub mod testing;
429pub mod types;
430
431use std::sync::Arc;
432
433use async_trait::async_trait;
434use derive_more::{Deref, From, Into};
435pub use error::Error;
436use futures::{future::BoxFuture, stream::StreamExt};
437use hotshot::types::SystemContextHandle;
438use hotshot_types::traits::{
439    node_implementation::{NodeImplementation, NodeType, Versions},
440    BlockPayload,
441};
442pub use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate};
443pub use resolvable::Resolvable;
444use serde::{Deserialize, Serialize};
445use snafu::Snafu;
446use task::BackgroundTask;
447use tide_disco::{method::ReadState, App, StatusCode};
448use vbs::version::StaticVersionType;
449
450#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
451pub enum VidCommon {
452    V0(hotshot_types::vid::advz::ADVZCommon),
453    V1(hotshot_types::vid::avidm::AvidMCommon),
454}
455
456pub type Payload<Types> = <Types as NodeType>::BlockPayload;
457pub type Header<Types> = <Types as NodeType>::BlockHeader;
458pub type Metadata<Types> = <Payload<Types> as BlockPayload<Types>>::Metadata;
459/// Item within a [`Payload`].
460pub type Transaction<Types> = <Payload<Types> as BlockPayload<Types>>::Transaction;
461pub type SignatureKey<Types> = <Types as NodeType>::SignatureKey;
462
463#[derive(Clone, Debug, Snafu, Deserialize, Serialize)]
464#[snafu(visibility(pub))]
465pub enum QueryError {
466    /// The requested resource does not exist or is not known to this query service.
467    NotFound,
468    /// The requested resource exists but is not currently available.
469    ///
470    /// In most cases a missing resource can be recovered from DA.
471    Missing,
472    /// There was an error while trying to fetch the requested resource.
473    #[snafu(display("Failed to fetch requested resource: {message}"))]
474    #[snafu(context(suffix(ErrorSnafu)))]
475    Error { message: String },
476}
477
478impl QueryError {
479    pub fn status(&self) -> StatusCode {
480        match self {
481            Self::NotFound | Self::Missing => StatusCode::NOT_FOUND,
482            Self::Error { .. } => StatusCode::INTERNAL_SERVER_ERROR,
483        }
484    }
485}
486
487pub type QueryResult<T> = Result<T, QueryError>;
488
489#[derive(Default)]
490pub struct Options {
491    pub availability: availability::Options,
492    pub node: node::Options,
493    pub status: status::Options,
494    pub port: u16,
495}
496
497/// Read-only wrapper for API state which does not require locking.
498#[derive(Clone, Debug, Deref, From, Into)]
499pub struct ApiState<D>(Arc<D>);
500
501#[async_trait]
502impl<D: 'static + Send + Sync> ReadState for ApiState<D> {
503    type State = D;
504    async fn read<T>(
505        &self,
506        op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
507    ) -> T {
508        op(&self.0).await
509    }
510}
511
512impl<D> From<D> for ApiState<D> {
513    fn from(d: D) -> Self {
514        Self::from(Arc::new(d))
515    }
516}
517
518/// Run an instance of the HotShot Query service with no customization.
519pub async fn run_standalone_service<
520    Types: NodeType,
521    I: NodeImplementation<Types>,
522    D,
523    ApiVer,
524    HsVer: Versions,
525>(
526    options: Options,
527    data_source: D,
528    hotshot: SystemContextHandle<Types, I, HsVer>,
529    bind_version: ApiVer,
530) -> Result<(), Error>
531where
532    Payload<Types>: availability::QueryablePayload<Types>,
533    Header<Types>: availability::QueryableHeader<Types>,
534    D: availability::AvailabilityDataSource<Types>
535        + data_source::UpdateDataSource<Types>
536        + node::NodeDataSource<Types>
537        + status::StatusDataSource
538        + data_source::VersionedDataSource
539        + Send
540        + Sync
541        + 'static,
542    ApiVer: StaticVersionType + 'static,
543{
544    // Create API modules.
545    let availability_api_v0 = availability::define_api(
546        &options.availability,
547        bind_version,
548        "0.0.1".parse().unwrap(),
549    )
550    .map_err(Error::internal)?;
551
552    let availability_api_v1 = availability::define_api(
553        &options.availability,
554        bind_version,
555        "1.0.0".parse().unwrap(),
556    )
557    .map_err(Error::internal)?;
558    let node_api = node::define_api(&options.node, bind_version, "0.0.1".parse().unwrap())
559        .map_err(Error::internal)?;
560    let status_api = status::define_api(&options.status, bind_version, "0.0.1".parse().unwrap())
561        .map_err(Error::internal)?;
562
563    // Create app.
564    let data_source = Arc::new(data_source);
565    let mut app = App::<_, Error>::with_state(ApiState(data_source.clone()));
566    app.register_module("availability", availability_api_v0)
567        .map_err(Error::internal)?
568        .register_module("availability", availability_api_v1)
569        .map_err(Error::internal)?
570        .register_module("node", node_api)
571        .map_err(Error::internal)?
572        .register_module("status", status_api)
573        .map_err(Error::internal)?;
574
575    // Serve app.
576    let url = format!("0.0.0.0:{}", options.port);
577    let _server =
578        BackgroundTask::spawn("server", async move { app.serve(&url, bind_version).await });
579
580    // Subscribe to events before starting consensus, so we don't miss any events.
581    let mut events = hotshot.event_stream();
582    hotshot.hotshot.start_consensus().await;
583
584    // Update query data using HotShot events.
585    while let Some(event) = events.next().await {
586        // Update the query data based on this event. It is safe to ignore errors here; the error
587        // just returns the failed block height for use in garbage collection, but this simple
588        // implementation isn't doing any kind of garbage collection.
589        data_source.update(&event).await.ok();
590    }
591
592    Ok(())
593}
594
595#[cfg(test)]
596mod test {
597    use std::{
598        ops::{Bound, RangeBounds},
599        time::Duration,
600    };
601
602    use async_lock::RwLock;
603    use async_trait::async_trait;
604    use atomic_store::{load_store::BincodeLoadStore, AtomicStore, AtomicStoreLoader, RollingLog};
605    use futures::future::FutureExt;
606    use hotshot_types::{data::VidShare, simple_certificate::QuorumCertificate2};
607    use portpicker::pick_unused_port;
608    use surf_disco::Client;
609    use tempfile::TempDir;
610    use testing::mocks::MockBase;
611    use tide_disco::App;
612    use toml::toml;
613
614    use super::*;
615    use crate::{
616        availability::{
617            AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, FetchStream, LeafId,
618            LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData, StateCertQueryData,
619            TransactionHash, TransactionQueryData, UpdateAvailabilityData, VidCommonMetadata,
620            VidCommonQueryData,
621        },
622        metrics::PrometheusMetrics,
623        node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
624        status::{HasMetrics, StatusDataSource},
625        testing::{
626            consensus::MockDataSource,
627            mocks::{MockHeader, MockPayload, MockTypes},
628        },
629    };
630
631    struct CompositeState {
632        store: AtomicStore,
633        hotshot_qs: MockDataSource,
634        module_state: RollingLog<BincodeLoadStore<u64>>,
635    }
636
637    #[async_trait]
638    impl AvailabilityDataSource<MockTypes> for CompositeState {
639        async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<MockTypes>>
640        where
641            ID: Into<LeafId<MockTypes>> + Send + Sync,
642        {
643            self.hotshot_qs.get_leaf(id).await
644        }
645        async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<MockTypes>>
646        where
647            ID: Into<BlockId<MockTypes>> + Send + Sync,
648        {
649            self.hotshot_qs.get_block(id).await
650        }
651
652        async fn get_header<ID>(&self, id: ID) -> Fetch<Header<MockTypes>>
653        where
654            ID: Into<BlockId<MockTypes>> + Send + Sync,
655        {
656            self.hotshot_qs.get_header(id).await
657        }
658        async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<MockTypes>>
659        where
660            ID: Into<BlockId<MockTypes>> + Send + Sync,
661        {
662            self.hotshot_qs.get_payload(id).await
663        }
664        async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<MockTypes>>
665        where
666            ID: Into<BlockId<MockTypes>> + Send + Sync,
667        {
668            self.hotshot_qs.get_payload_metadata(id).await
669        }
670        async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<MockTypes>>
671        where
672            ID: Into<BlockId<MockTypes>> + Send + Sync,
673        {
674            self.hotshot_qs.get_vid_common(id).await
675        }
676        async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<MockTypes>>
677        where
678            ID: Into<BlockId<MockTypes>> + Send + Sync,
679        {
680            self.hotshot_qs.get_vid_common_metadata(id).await
681        }
682        async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<MockTypes>>
683        where
684            R: RangeBounds<usize> + Send + 'static,
685        {
686            self.hotshot_qs.get_leaf_range(range).await
687        }
688        async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<MockTypes>>
689        where
690            R: RangeBounds<usize> + Send + 'static,
691        {
692            self.hotshot_qs.get_block_range(range).await
693        }
694
695        async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<MockTypes>>
696        where
697            R: RangeBounds<usize> + Send + 'static,
698        {
699            self.hotshot_qs.get_header_range(range).await
700        }
701        async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<MockTypes>>
702        where
703            R: RangeBounds<usize> + Send + 'static,
704        {
705            self.hotshot_qs.get_payload_range(range).await
706        }
707        async fn get_payload_metadata_range<R>(
708            &self,
709            range: R,
710        ) -> FetchStream<PayloadMetadata<MockTypes>>
711        where
712            R: RangeBounds<usize> + Send + 'static,
713        {
714            self.hotshot_qs.get_payload_metadata_range(range).await
715        }
716        async fn get_vid_common_range<R>(
717            &self,
718            range: R,
719        ) -> FetchStream<VidCommonQueryData<MockTypes>>
720        where
721            R: RangeBounds<usize> + Send + 'static,
722        {
723            self.hotshot_qs.get_vid_common_range(range).await
724        }
725        async fn get_vid_common_metadata_range<R>(
726            &self,
727            range: R,
728        ) -> FetchStream<VidCommonMetadata<MockTypes>>
729        where
730            R: RangeBounds<usize> + Send + 'static,
731        {
732            self.hotshot_qs.get_vid_common_metadata_range(range).await
733        }
734        async fn get_leaf_range_rev(
735            &self,
736            start: Bound<usize>,
737            end: usize,
738        ) -> FetchStream<LeafQueryData<MockTypes>> {
739            self.hotshot_qs.get_leaf_range_rev(start, end).await
740        }
741        async fn get_block_range_rev(
742            &self,
743            start: Bound<usize>,
744            end: usize,
745        ) -> FetchStream<BlockQueryData<MockTypes>> {
746            self.hotshot_qs.get_block_range_rev(start, end).await
747        }
748        async fn get_payload_range_rev(
749            &self,
750            start: Bound<usize>,
751            end: usize,
752        ) -> FetchStream<PayloadQueryData<MockTypes>> {
753            self.hotshot_qs.get_payload_range_rev(start, end).await
754        }
755        async fn get_payload_metadata_range_rev(
756            &self,
757            start: Bound<usize>,
758            end: usize,
759        ) -> FetchStream<PayloadMetadata<MockTypes>> {
760            self.hotshot_qs
761                .get_payload_metadata_range_rev(start, end)
762                .await
763        }
764        async fn get_vid_common_range_rev(
765            &self,
766            start: Bound<usize>,
767            end: usize,
768        ) -> FetchStream<VidCommonQueryData<MockTypes>> {
769            self.hotshot_qs.get_vid_common_range_rev(start, end).await
770        }
771        async fn get_vid_common_metadata_range_rev(
772            &self,
773            start: Bound<usize>,
774            end: usize,
775        ) -> FetchStream<VidCommonMetadata<MockTypes>> {
776            self.hotshot_qs
777                .get_vid_common_metadata_range_rev(start, end)
778                .await
779        }
780        async fn get_transaction(
781            &self,
782            hash: TransactionHash<MockTypes>,
783        ) -> Fetch<TransactionQueryData<MockTypes>> {
784            self.hotshot_qs.get_transaction(hash).await
785        }
786        async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryData<MockTypes>> {
787            self.hotshot_qs.get_state_cert(epoch).await
788        }
789    }
790
791    // Imiplement data source trait for node API.
792    #[async_trait]
793    impl NodeDataSource<MockTypes> for CompositeState {
794        async fn block_height(&self) -> QueryResult<usize> {
795            StatusDataSource::block_height(self).await
796        }
797        async fn count_transactions_in_range(
798            &self,
799            range: impl RangeBounds<usize> + Send,
800            namespace: Option<NamespaceId<MockTypes>>,
801        ) -> QueryResult<usize> {
802            self.hotshot_qs
803                .count_transactions_in_range(range, namespace)
804                .await
805        }
806        async fn payload_size_in_range(
807            &self,
808            range: impl RangeBounds<usize> + Send,
809            namespace: Option<NamespaceId<MockTypes>>,
810        ) -> QueryResult<usize> {
811            self.hotshot_qs
812                .payload_size_in_range(range, namespace)
813                .await
814        }
815        async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
816        where
817            ID: Into<BlockId<MockTypes>> + Send + Sync,
818        {
819            self.hotshot_qs.vid_share(id).await
820        }
821        async fn sync_status(&self) -> QueryResult<SyncStatus> {
822            self.hotshot_qs.sync_status().await
823        }
824        async fn get_header_window(
825            &self,
826            start: impl Into<WindowStart<MockTypes>> + Send + Sync,
827            end: u64,
828            limit: usize,
829        ) -> QueryResult<TimeWindowQueryData<Header<MockTypes>>> {
830            self.hotshot_qs.get_header_window(start, end, limit).await
831        }
832    }
833
834    // Implement data source trait for status API.
835    impl HasMetrics for CompositeState {
836        fn metrics(&self) -> &PrometheusMetrics {
837            self.hotshot_qs.metrics()
838        }
839    }
840    #[async_trait]
841    impl StatusDataSource for CompositeState {
842        async fn block_height(&self) -> QueryResult<usize> {
843            StatusDataSource::block_height(&self.hotshot_qs).await
844        }
845    }
846
847    #[tokio::test(flavor = "multi_thread")]
848    async fn test_composition() {
849        use hotshot_example_types::node_types::TestVersions;
850
851        let dir = TempDir::with_prefix("test_composition").unwrap();
852        let mut loader = AtomicStoreLoader::create(dir.path(), "test_composition").unwrap();
853        let hotshot_qs = MockDataSource::create_with_store(&mut loader, Default::default())
854            .await
855            .unwrap();
856
857        // Mock up some data and add a block to the store.
858        let leaf =
859            Leaf2::<MockTypes>::genesis::<TestVersions>(&Default::default(), &Default::default())
860                .await;
861        let qc =
862            QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
863                .await;
864        let leaf = LeafQueryData::new(leaf, qc).unwrap();
865        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
866        hotshot_qs
867            .append(BlockInfo::new(leaf, Some(block), None, None, None))
868            .await
869            .unwrap();
870
871        let module_state =
872            RollingLog::create(&mut loader, Default::default(), "module_state", 1024).unwrap();
873        let state = CompositeState {
874            hotshot_qs,
875            module_state,
876            store: AtomicStore::open(loader).unwrap(),
877        };
878
879        let module_spec = toml! {
880            [route.post_ext]
881            PATH = ["/ext/:val"]
882            METHOD = "POST"
883            ":val" = "Integer"
884
885            [route.get_ext]
886            PATH = ["/ext"]
887            METHOD = "GET"
888        };
889
890        let mut app = App::<_, Error>::with_state(RwLock::new(state));
891        app.register_module(
892            "availability",
893            availability::define_api(
894                &Default::default(),
895                MockBase::instance(),
896                "0.0.1".parse().unwrap(),
897            )
898            .unwrap(),
899        )
900        .unwrap()
901        .register_module(
902            "node",
903            node::define_api(
904                &Default::default(),
905                MockBase::instance(),
906                "0.0.1".parse().unwrap(),
907            )
908            .unwrap(),
909        )
910        .unwrap()
911        .register_module(
912            "status",
913            status::define_api(
914                &Default::default(),
915                MockBase::instance(),
916                "0.0.1".parse().unwrap(),
917            )
918            .unwrap(),
919        )
920        .unwrap()
921        .module::<Error, MockBase>("mod", module_spec)
922        .unwrap()
923        .get("get_ext", |_, state| {
924            async move { state.module_state.load_latest().map_err(Error::internal) }.boxed()
925        })
926        .unwrap()
927        .post("post_ext", |req, state| {
928            async move {
929                state
930                    .module_state
931                    .store_resource(&req.integer_param("val").map_err(Error::internal)?)
932                    .map_err(Error::internal)?;
933                state
934                    .module_state
935                    .commit_version()
936                    .map_err(Error::internal)?;
937                state
938                    .hotshot_qs
939                    .skip_version()
940                    .await
941                    .map_err(Error::internal)?;
942                state.store.commit_version().map_err(Error::internal)
943            }
944            .boxed()
945        })
946        .unwrap();
947
948        let port = pick_unused_port().unwrap();
949        let _server = BackgroundTask::spawn(
950            "server",
951            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
952        );
953
954        let client =
955            Client::<Error, MockBase>::new(format!("http://localhost:{port}").parse().unwrap());
956        assert!(client.connect(Some(Duration::from_secs(60))).await);
957
958        client.post::<()>("mod/ext/42").send().await.unwrap();
959        assert_eq!(client.get::<u64>("mod/ext").send().await.unwrap(), 42);
960
961        // Check that we can still access the built-in modules.
962        assert_eq!(
963            client
964                .get::<u64>("status/block-height")
965                .send()
966                .await
967                .unwrap(),
968            1
969        );
970        let sync_status: SyncStatus = client.get("node/sync-status").send().await.unwrap();
971        assert_eq!(
972            sync_status,
973            SyncStatus {
974                missing_blocks: 0,
975                missing_leaves: 0,
976                missing_vid_common: 1,
977                missing_vid_shares: 1,
978                pruned_height: None
979            }
980        );
981        assert_eq!(
982            client
983                .get::<MockHeader>("availability/header/0")
984                .send()
985                .await
986                .unwrap()
987                .block_number,
988            0
989        );
990    }
991}