hotshot_query_service/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Queries for HotShot chain state.
14//!
15//! The availability API provides an objective view of the HotShot blockchain. It provides access
16//! only to normative data: that is, data which is agreed upon by all honest consensus nodes and
17//! which is immutable. This means access to core consensus data structures including leaves,
18//! blocks, and headers, where each query is pure and idempotent. This also means that it is
19//! possible for a client to verify all of the information provided by this API, by running a
20//! HotShot light client and downloading the appropriate evidence with each query.
21//!
22//! This API does not provide any queries which represent only the _current_ state of the chain or
23//! may change over time, and it does not provide information for which there is not (yet) agreement
24//! of a supermajority of consensus nodes. For information about the current dynamic state of
25//! consensus and uncommitted state, try the [status](crate::status) API. For information about the
26//! chain which is tabulated by this specific node and not subject to full consensus agreement, try
27//! the [node](crate::node) API.
28
29use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34    data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35    simple_certificate::QuorumCertificate,
36    traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54    pub api_path: Option<PathBuf>,
55
56    /// Timeout for failing requests due to missing data.
57    ///
58    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
59    /// external provider. This parameter controls how long the request handler will wait for
60    /// missing data to be fetched before giving up and failing the request.
61    pub fetch_timeout: Duration,
62
63    /// Additional API specification files to merge with `availability-api-path`.
64    ///
65    /// These optional files may contain route definitions for application-specific routes that have
66    /// been added as extensions to the basic availability API.
67    pub extensions: Vec<toml::Value>,
68
69    /// The maximum number of small objects which can be loaded in a single range query.
70    ///
71    /// Currently small objects include leaves only. In the future this limit will also apply to
72    /// headers, block summaries, and VID common, however
73    /// * loading of headers and block summaries is currently implemented by loading the entire
74    ///   block
75    /// * imperfect VID parameter tuning means that VID common can be much larger than it should
76    pub small_object_range_limit: usize,
77
78    /// The maximum number of large objects which can be loaded in a single range query.
79    ///
80    /// Large objects include anything that _might_ contain a full payload or an object proportional
81    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
82    /// check the size of objects while loading to determine which limit to apply. If an object
83    /// belongs to a class which might contain a large payload, the large object limit always
84    /// applies.
85    pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89    fn default() -> Self {
90        Self {
91            api_path: None,
92            fetch_timeout: Duration::from_millis(500),
93            extensions: vec![],
94            large_object_range_limit: 100,
95            small_object_range_limit: 500,
96        }
97    }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103    Request {
104        source: RequestError,
105    },
106    #[snafu(display("leaf {resource} missing or not available"))]
107    #[from(ignore)]
108    FetchLeaf {
109        resource: String,
110    },
111    #[snafu(display("block {resource} missing or not available"))]
112    #[from(ignore)]
113    FetchBlock {
114        resource: String,
115    },
116    #[snafu(display("header {resource} missing or not available"))]
117    #[from(ignore)]
118    FetchHeader {
119        resource: String,
120    },
121    #[snafu(display("transaction {resource} missing or not available"))]
122    #[from(ignore)]
123    FetchTransaction {
124        resource: String,
125    },
126    #[snafu(display("transaction index {index} out of range for block {height}"))]
127    #[from(ignore)]
128    InvalidTransactionIndex {
129        height: u64,
130        index: u64,
131    },
132    #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133    #[from(ignore)]
134    RangeLimit {
135        from: usize,
136        until: usize,
137        limit: usize,
138    },
139    #[snafu(display("{source}"))]
140    Query {
141        source: QueryError,
142    },
143    #[snafu(display("State cert for epoch {epoch} not found"))]
144    #[from(ignore)]
145    FetchStateCert {
146        epoch: u64,
147    },
148    #[snafu(display("error {status}: {message}"))]
149    Custom {
150        message: String,
151        status: StatusCode,
152    },
153}
154
155impl Error {
156    pub fn internal<M: Display>(message: M) -> Self {
157        Self::Custom {
158            message: message.to_string(),
159            status: StatusCode::INTERNAL_SERVER_ERROR,
160        }
161    }
162
163    pub fn status(&self) -> StatusCode {
164        match self {
165            Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166            Self::FetchLeaf { .. }
167            | Self::FetchBlock { .. }
168            | Self::FetchTransaction { .. }
169            | Self::FetchHeader { .. }
170            | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171            Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172            Self::Custom { status, .. } => *status,
173        }
174    }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180    pub(crate) leaf: Leaf<Types>,
181    pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185    pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186        Self { leaf, qc }
187    }
188
189    pub fn leaf(&self) -> &Leaf<Types> {
190        &self.leaf
191    }
192
193    pub fn qc(&self) -> &QuorumCertificate<Types> {
194        &self.qc
195    }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199    // TODO do we still need some check here?
200    // `drb_seed` no longer exists on `Leaf2`
201    // if leaf2.drb_seed != [0; 32] && leaf2.drb_result != [0; 32] {
202    //     panic!("Downgrade of Leaf2 to Leaf will lose DRB information!");
203    // }
204    let quorum_proposal = QuorumProposal {
205        block_header: leaf2.block_header().clone(),
206        view_number: leaf2.view_number(),
207        justify_qc: leaf2.justify_qc().to_qc(),
208        upgrade_certificate: leaf2.upgrade_certificate(),
209        proposal_certificate: None,
210    };
211    let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212    if let Some(payload) = leaf2.block_payload() {
213        leaf.fill_block_payload_unchecked(payload);
214    }
215    leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219    Leaf1QueryData {
220        leaf: downgrade_leaf(leaf.leaf),
221        qc: leaf.qc.to_qc(),
222    }
223}
224
225async fn get_leaf_handler<Types, State>(
226    req: tide_disco::RequestParams,
227    state: &State,
228    timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231    State: 'static + Send + Sync + ReadState,
232    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233    Types: NodeType,
234    Header<Types>: QueryableHeader<Types>,
235    Payload<Types>: QueryablePayload<Types>,
236{
237    let id = match req.opt_integer_param("height")? {
238        Some(height) => LeafId::Number(height),
239        None => LeafId::Hash(req.blob_param("hash")?),
240    };
241    let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242    fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243        resource: id.to_string(),
244    })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248    req: tide_disco::RequestParams,
249    state: &State,
250    timeout: Duration,
251    small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254    State: 'static + Send + Sync + ReadState,
255    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256    Types: NodeType,
257    Header<Types>: QueryableHeader<Types>,
258    Payload<Types>: QueryablePayload<Types>,
259{
260    let from = req.integer_param::<_, usize>("from")?;
261    let until = req.integer_param("until")?;
262    enforce_range_limit(from, until, small_object_range_limit)?;
263
264    let leaves = state
265        .read(|state| state.get_leaf_range(from..until).boxed())
266        .await;
267    leaves
268        .enumerate()
269        .then(|(index, fetch)| async move {
270            fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271                resource: (index + from).to_string(),
272            })
273        })
274        .try_collect::<Vec<_>>()
275        .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279    data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281    let VidCommonQueryData {
282        height,
283        block_hash,
284        payload_hash: VidCommitment::V0(payload_hash),
285        common: VidCommon::V0(common),
286    } = data
287    else {
288        return None;
289    };
290    Some(ADVZCommonQueryData {
291        height,
292        block_hash,
293        payload_hash,
294        common,
295    })
296}
297
298async fn get_vid_common_handler<Types, State>(
299    req: tide_disco::RequestParams,
300    state: &State,
301    timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304    State: 'static + Send + Sync + ReadState,
305    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306    Types: NodeType,
307    Header<Types>: QueryableHeader<Types>,
308    Payload<Types>: QueryablePayload<Types>,
309{
310    let id = if let Some(height) = req.opt_integer_param("height")? {
311        BlockId::Number(height)
312    } else if let Some(hash) = req.opt_blob_param("hash")? {
313        BlockId::Hash(hash)
314    } else {
315        BlockId::PayloadHash(req.blob_param("payload-hash")?)
316    };
317    let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319        resource: id.to_string(),
320    })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324    options: &Options,
325    _: Ver,
326    api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329    State: 'static + Send + Sync + ReadState,
330    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331    Header<Types>: QueryableHeader<Types>,
332    Payload<Types>: QueryablePayload<Types>,
333{
334    let mut api = load_api::<State, Error, Ver>(
335        options.api_path.as_ref(),
336        include_str!("../api/availability.toml"),
337        options.extensions.clone(),
338    )?;
339    let timeout = options.fetch_timeout;
340    let small_object_range_limit = options.small_object_range_limit;
341    let large_object_range_limit = options.large_object_range_limit;
342
343    api.with_version(api_ver.clone());
344
345    // `LeafQueryData` now contains `Leaf2` and `QC2``, which is a breaking change.
346    // On node startup, all leaves are migrated to `Leaf2`.
347    //
348    // To maintain compatibility with nodes running an older version
349    // (which expect `LeafQueryData` with `Leaf1` and `QC1`),
350    // we downgrade `Leaf2` to `Leaf1` and `QC2` to `QC1` if the API version is V0.
351    // Otherwise, we return the new types.
352    if api_ver.major == 0 {
353        api.at("get_leaf", move |req, state| {
354            get_leaf_handler(req, state, timeout)
355                .map(|res| res.map(downgrade_leaf_query_data))
356                .boxed()
357        })?;
358
359        api.at("get_leaf_range", move |req, state| {
360            get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361                .map(|res| {
362                    res.map(|r| {
363                        r.into_iter()
364                            .map(downgrade_leaf_query_data)
365                            .collect::<Vec<Leaf1QueryData<_>>>()
366                    })
367                })
368                .boxed()
369        })?;
370
371        api.stream("stream_leaves", move |req, state| {
372            async move {
373                let height = req.integer_param("height")?;
374                state
375                    .read(|state| {
376                        async move {
377                            Ok(state
378                                .subscribe_leaves(height)
379                                .await
380                                .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381                        }
382                        .boxed()
383                    })
384                    .await
385            }
386            .try_flatten_stream()
387            .boxed()
388        })?;
389    } else {
390        api.at("get_leaf", move |req, state| {
391            get_leaf_handler(req, state, timeout).boxed()
392        })?;
393
394        api.at("get_leaf_range", move |req, state| {
395            get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396        })?;
397
398        api.stream("stream_leaves", move |req, state| {
399            async move {
400                let height = req.integer_param("height")?;
401                state
402                    .read(|state| {
403                        async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404                    })
405                    .await
406            }
407            .try_flatten_stream()
408            .boxed()
409        })?;
410    }
411
412    // VIDCommon data is version gated after the VID upgrade.
413    // We keep the old struct and data in the API version V0. Starting from V1 we are returning version gated structs.
414    if api_ver.major == 0 {
415        api.at("get_vid_common", move |req, state| {
416            get_vid_common_handler(req, state, timeout)
417                .map(|r| match r {
418                    Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419                        message: "Incompatible VID version.".to_string(),
420                        status: StatusCode::BAD_REQUEST,
421                    }),
422                    Err(e) => Err(e),
423                })
424                .boxed()
425        })?
426        .stream("stream_vid_common", move |req, state| {
427            async move {
428                let height = req.integer_param("height")?;
429                state
430                    .read(|state| {
431                        async move {
432                            Ok(state.subscribe_vid_common(height).await.map(|data| {
433                                downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434                                    message: "Incompatible VID version.".to_string(),
435                                    status: StatusCode::BAD_REQUEST,
436                                })
437                            }))
438                        }
439                        .boxed()
440                    })
441                    .await
442            }
443            .try_flatten_stream()
444            .boxed()
445        })?;
446    } else {
447        api.at("get_vid_common", move |req, state| {
448            get_vid_common_handler(req, state, timeout).boxed().boxed()
449        })?
450        .stream("stream_vid_common", move |req, state| {
451            async move {
452                let height = req.integer_param("height")?;
453                state
454                    .read(|state| {
455                        async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456                    })
457                    .await
458            }
459            .try_flatten_stream()
460            .boxed()
461        })?;
462    }
463
464    api.at("get_header", move |req, state| {
465        async move {
466            let id = if let Some(height) = req.opt_integer_param("height")? {
467                BlockId::Number(height)
468            } else if let Some(hash) = req.opt_blob_param("hash")? {
469                BlockId::Hash(hash)
470            } else {
471                BlockId::PayloadHash(req.blob_param("payload-hash")?)
472            };
473            let fetch = state.read(|state| state.get_header(id).boxed()).await;
474            fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475                resource: id.to_string(),
476            })
477        }
478        .boxed()
479    })?
480    .at("get_header_range", move |req, state| {
481        async move {
482            let from = req.integer_param::<_, usize>("from")?;
483            let until = req.integer_param::<_, usize>("until")?;
484            enforce_range_limit(from, until, large_object_range_limit)?;
485
486            let headers = state
487                .read(|state| state.get_header_range(from..until).boxed())
488                .await;
489            headers
490                .enumerate()
491                .then(|(index, fetch)| async move {
492                    fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493                        resource: (index + from).to_string(),
494                    })
495                })
496                .try_collect::<Vec<_>>()
497                .await
498        }
499        .boxed()
500    })?
501    .stream("stream_headers", move |req, state| {
502        async move {
503            let height = req.integer_param("height")?;
504            state
505                .read(|state| {
506                    async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507                })
508                .await
509        }
510        .try_flatten_stream()
511        .boxed()
512    })?
513    .at("get_block", move |req, state| {
514        async move {
515            let id = if let Some(height) = req.opt_integer_param("height")? {
516                BlockId::Number(height)
517            } else if let Some(hash) = req.opt_blob_param("hash")? {
518                BlockId::Hash(hash)
519            } else {
520                BlockId::PayloadHash(req.blob_param("payload-hash")?)
521            };
522            let fetch = state.read(|state| state.get_block(id).boxed()).await;
523            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524                resource: id.to_string(),
525            })
526        }
527        .boxed()
528    })?
529    .at("get_block_range", move |req, state| {
530        async move {
531            let from = req.integer_param::<_, usize>("from")?;
532            let until = req.integer_param("until")?;
533            enforce_range_limit(from, until, large_object_range_limit)?;
534
535            let blocks = state
536                .read(|state| state.get_block_range(from..until).boxed())
537                .await;
538            blocks
539                .enumerate()
540                .then(|(index, fetch)| async move {
541                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542                        resource: (index + from).to_string(),
543                    })
544                })
545                .try_collect::<Vec<_>>()
546                .await
547        }
548        .boxed()
549    })?
550    .stream("stream_blocks", move |req, state| {
551        async move {
552            let height = req.integer_param("height")?;
553            state
554                .read(|state| {
555                    async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556                })
557                .await
558        }
559        .try_flatten_stream()
560        .boxed()
561    })?
562    .at("get_payload", move |req, state| {
563        async move {
564            let id = if let Some(height) = req.opt_integer_param("height")? {
565                BlockId::Number(height)
566            } else if let Some(hash) = req.opt_blob_param("hash")? {
567                BlockId::PayloadHash(hash)
568            } else {
569                BlockId::Hash(req.blob_param("block-hash")?)
570            };
571            let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573                resource: id.to_string(),
574            })
575        }
576        .boxed()
577    })?
578    .at("get_payload_range", move |req, state| {
579        async move {
580            let from = req.integer_param::<_, usize>("from")?;
581            let until = req.integer_param("until")?;
582            enforce_range_limit(from, until, large_object_range_limit)?;
583
584            let payloads = state
585                .read(|state| state.get_payload_range(from..until).boxed())
586                .await;
587            payloads
588                .enumerate()
589                .then(|(index, fetch)| async move {
590                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591                        resource: (index + from).to_string(),
592                    })
593                })
594                .try_collect::<Vec<_>>()
595                .await
596        }
597        .boxed()
598    })?
599    .stream("stream_payloads", move |req, state| {
600        async move {
601            let height = req.integer_param("height")?;
602            state
603                .read(|state| {
604                    async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605                })
606                .await
607        }
608        .try_flatten_stream()
609        .boxed()
610    })?
611    .at("get_transaction", move |req, state| {
612        async move {
613            match req.opt_blob_param("hash")? {
614                Some(hash) => {
615                    let fetch = state
616                        .read(|state| state.get_transaction(hash).boxed())
617                        .await;
618                    fetch
619                        .with_timeout(timeout)
620                        .await
621                        .context(FetchTransactionSnafu {
622                            resource: hash.to_string(),
623                        })
624                },
625                None => {
626                    let height: u64 = req.integer_param("height")?;
627                    let fetch = state
628                        .read(|state| state.get_block(height as usize).boxed())
629                        .await;
630                    let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
631                        resource: height.to_string(),
632                    })?;
633                    let i: u64 = req.integer_param("index")?;
634                    let index = block
635                        .payload()
636                        .nth(block.metadata(), i as usize)
637                        .context(InvalidTransactionIndexSnafu { height, index: i })?;
638                    TransactionQueryData::new(&block, index, i)
639                        .context(InvalidTransactionIndexSnafu { height, index: i })
640                },
641            }
642        }
643        .boxed()
644    })?
645    .stream("stream_transactions", move |req, state| {
646        async move {
647            let height = req.integer_param::<_, usize>("height")?;
648
649            let namespace: Option<i64> = req
650                .opt_integer_param::<_, usize>("namespace")?
651                .map(|i| {
652                    i.try_into().map_err(|err| Error::Custom {
653                        message: format!(
654                            "Invalid 'namespace': could not convert usize to i64: {err}"
655                        ),
656                        status: StatusCode::BAD_REQUEST,
657                    })
658                })
659                .transpose()?;
660
661            state
662                .read(|state| {
663                    async move {
664                        Ok(state
665                            .subscribe_blocks(height)
666                            .await
667                            .map(move |block| {
668                                let transactions = block.enumerate().enumerate();
669                                let header = block.header();
670                                let filtered_txs = transactions
671                                    .filter_map(|(i, (index, _tx))| {
672                                        if let Some(requested_ns) = namespace {
673                                            let ns_id = QueryableHeader::<Types>::namespace_id(
674                                                header,
675                                                &index.ns_index,
676                                            )?;
677
678                                            if ns_id.into() != requested_ns {
679                                                return None;
680                                            }
681                                        }
682
683                                        TransactionQueryData::new(&block, index, i as u64)
684                                    })
685                                    .collect::<Vec<_>>();
686
687                                futures::stream::iter(filtered_txs.into_iter().map(Ok))
688                            })
689                            .flatten())
690                    }
691                    .boxed()
692                })
693                .await
694        }
695        .try_flatten_stream()
696        .boxed()
697    })?
698    .at("get_block_summary", move |req, state| {
699        async move {
700            let id: usize = req.integer_param("height")?;
701
702            let fetch = state.read(|state| state.get_block(id).boxed()).await;
703            fetch
704                .with_timeout(timeout)
705                .await
706                .context(FetchBlockSnafu {
707                    resource: id.to_string(),
708                })
709                .map(BlockSummaryQueryData::from)
710        }
711        .boxed()
712    })?
713    .at("get_block_summary_range", move |req, state| {
714        async move {
715            let from: usize = req.integer_param("from")?;
716            let until: usize = req.integer_param("until")?;
717            enforce_range_limit(from, until, large_object_range_limit)?;
718
719            let blocks = state
720                .read(|state| state.get_block_range(from..until).boxed())
721                .await;
722            let result: Vec<BlockSummaryQueryData<Types>> = blocks
723                .enumerate()
724                .then(|(index, fetch)| async move {
725                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
726                        resource: (index + from).to_string(),
727                    })
728                })
729                .map(|result| result.map(BlockSummaryQueryData::from))
730                .try_collect()
731                .await?;
732
733            Ok(result)
734        }
735        .boxed()
736    })?
737    .at("get_limits", move |_req, _state| {
738        async move {
739            Ok(Limits {
740                small_object_range_limit,
741                large_object_range_limit,
742            })
743        }
744        .boxed()
745    })?
746    .at("get_state_cert", move |req, state| {
747        async move {
748            let epoch = req.integer_param("epoch")?;
749            let fetch = state
750                .read(|state| state.get_state_cert(epoch).boxed())
751                .await;
752            fetch
753                .with_timeout(timeout)
754                .await
755                .context(FetchStateCertSnafu { epoch })
756        }
757        .boxed()
758    })?;
759    Ok(api)
760}
761
762fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
763    if until.saturating_sub(from) > limit {
764        return Err(Error::RangeLimit { from, until, limit });
765    }
766    Ok(())
767}
768
769#[cfg(test)]
770mod test {
771    use std::{fmt::Debug, time::Duration};
772
773    use async_lock::RwLock;
774    use committable::Committable;
775    use futures::future::FutureExt;
776    use hotshot_example_types::node_types::EpochsTestVersions;
777    use hotshot_types::{
778        data::Leaf2, simple_certificate::QuorumCertificate2,
779        traits::node_implementation::ConsensusTime,
780    };
781    use portpicker::pick_unused_port;
782    use serde::de::DeserializeOwned;
783    use surf_disco::{Client, Error as _};
784    use tempfile::TempDir;
785    use tide_disco::App;
786    use toml::toml;
787
788    use super::*;
789    use crate::{
790        data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
791        status::StatusDataSource,
792        task::BackgroundTask,
793        testing::{
794            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
795            mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
796            setup_test,
797        },
798        types::HeightIndexed,
799        ApiState, Error, Header,
800    };
801
802    /// Get the current ledger height and a list of non-empty leaf/block pairs.
803    async fn get_non_empty_blocks(
804        client: &Client<Error, MockBase>,
805    ) -> (
806        u64,
807        Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
808    ) {
809        let mut blocks = vec![];
810        // Ignore the genesis block (start from height 1).
811        for i in 1.. {
812            match client
813                .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
814                .send()
815                .await
816            {
817                Ok(block) => {
818                    if !block.is_empty() {
819                        let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
820                        blocks.push((leaf, block));
821                    }
822                },
823                Err(Error::Availability {
824                    source: super::Error::FetchBlock { .. },
825                }) => {
826                    tracing::info!(
827                        "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
828                    );
829                    return (i, blocks);
830                },
831                Err(err) => panic!("unexpected error {err}"),
832            }
833        }
834        unreachable!()
835    }
836
837    async fn validate(client: &Client<Error, MockBase>, height: u64) {
838        // Check the consistency of every block/leaf pair.
839        for i in 0..height {
840            // Limit the number of blocks we validate in order to
841            // speeed up the tests.
842            if ![0, 1, height / 2, height - 1].contains(&i) {
843                continue;
844            }
845            tracing::info!("validate block {i}/{height}");
846
847            // Check that looking up the leaf various ways returns the correct leaf.
848            let leaf: LeafQueryData<MockTypes> =
849                client.get(&format!("leaf/{i}")).send().await.unwrap();
850            assert_eq!(leaf.height(), i);
851            assert_eq!(
852                leaf,
853                client
854                    .get(&format!("leaf/hash/{}", leaf.hash()))
855                    .send()
856                    .await
857                    .unwrap()
858            );
859
860            // Check that looking up the block various ways returns the correct block.
861            let block: BlockQueryData<MockTypes> =
862                client.get(&format!("block/{i}")).send().await.unwrap();
863            let expected_payload = PayloadQueryData::from(block.clone());
864            assert_eq!(leaf.block_hash(), block.hash());
865            assert_eq!(block.height(), i);
866            assert_eq!(
867                block,
868                client
869                    .get(&format!("block/hash/{}", block.hash()))
870                    .send()
871                    .await
872                    .unwrap()
873            );
874            assert_eq!(
875                *block.header(),
876                client.get(&format!("header/{i}")).send().await.unwrap()
877            );
878            assert_eq!(
879                *block.header(),
880                client
881                    .get(&format!("header/hash/{}", block.hash()))
882                    .send()
883                    .await
884                    .unwrap()
885            );
886            assert_eq!(
887                expected_payload,
888                client.get(&format!("payload/{i}")).send().await.unwrap(),
889            );
890            assert_eq!(
891                expected_payload,
892                client
893                    .get(&format!("payload/block-hash/{}", block.hash()))
894                    .send()
895                    .await
896                    .unwrap(),
897            );
898            // Look up the common VID data.
899            let common: VidCommonQueryData<MockTypes> = client
900                .get(&format!("vid/common/{}", block.height()))
901                .send()
902                .await
903                .unwrap();
904            assert_eq!(common.height(), block.height());
905            assert_eq!(common.block_hash(), block.hash());
906            assert_eq!(common.payload_hash(), block.payload_hash());
907            assert_eq!(
908                common,
909                client
910                    .get(&format!("vid/common/hash/{}", block.hash()))
911                    .send()
912                    .await
913                    .unwrap()
914            );
915
916            let block_summary = client
917                .get(&format!("block/summary/{i}"))
918                .send()
919                .await
920                .unwrap();
921            assert_eq!(
922                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
923                block_summary,
924            );
925            assert_eq!(block_summary.header(), block.header());
926            assert_eq!(block_summary.hash(), block.hash());
927            assert_eq!(block_summary.size(), block.size());
928            assert_eq!(block_summary.num_transactions(), block.num_transactions());
929
930            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
931                .get(&format!("block/summaries/{}/{}", 0, i))
932                .send()
933                .await
934                .unwrap();
935            assert_eq!(block_summaries.len() as u64, i);
936
937            // We should be able to look up the block by payload hash. Note that for duplicate
938            // payloads, these endpoints may return a different block with the same payload, which
939            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
940            // response, only its payload.
941            assert_eq!(
942                block.payload(),
943                client
944                    .get::<BlockQueryData<MockTypes>>(&format!(
945                        "block/payload-hash/{}",
946                        block.payload_hash()
947                    ))
948                    .send()
949                    .await
950                    .unwrap()
951                    .payload()
952            );
953            assert_eq!(
954                block.payload_hash(),
955                client
956                    .get::<Header<MockTypes>>(&format!(
957                        "header/payload-hash/{}",
958                        block.payload_hash()
959                    ))
960                    .send()
961                    .await
962                    .unwrap()
963                    .payload_commitment
964            );
965            assert_eq!(
966                block.payload(),
967                client
968                    .get::<PayloadQueryData<MockTypes>>(&format!(
969                        "payload/hash/{}",
970                        block.payload_hash()
971                    ))
972                    .send()
973                    .await
974                    .unwrap()
975                    .data(),
976            );
977            assert_eq!(
978                common.common(),
979                client
980                    .get::<VidCommonQueryData<MockTypes>>(&format!(
981                        "vid/common/payload-hash/{}",
982                        block.payload_hash()
983                    ))
984                    .send()
985                    .await
986                    .unwrap()
987                    .common()
988            );
989
990            // Check that looking up each transaction in the block various ways returns the correct
991            // transaction.
992            for (j, txn_from_block) in block.enumerate() {
993                let txn: TransactionQueryData<MockTypes> = client
994                    .get(&format!("transaction/{}/{}", i, j.position))
995                    .send()
996                    .await
997                    .unwrap();
998                assert_eq!(txn.block_height(), i);
999                assert_eq!(txn.block_hash(), block.hash());
1000                assert_eq!(txn.index(), j.position as u64);
1001                assert_eq!(txn.hash(), txn_from_block.commit());
1002                assert_eq!(txn.transaction(), &txn_from_block);
1003                // We should be able to look up the transaction by hash. Note that for duplicate
1004                // transactions, this endpoint may return a different transaction with the same
1005                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1006                // `TransactionQueryData` response, only its commitment.
1007                assert_eq!(
1008                    txn.hash(),
1009                    client
1010                        .get::<TransactionQueryData<MockTypes>>(&format!(
1011                            "transaction/hash/{}",
1012                            txn.hash()
1013                        ))
1014                        .send()
1015                        .await
1016                        .unwrap()
1017                        .hash()
1018                );
1019            }
1020
1021            let block_range: Vec<BlockQueryData<MockTypes>> = client
1022                .get(&format!("block/{}/{}", 0, i))
1023                .send()
1024                .await
1025                .unwrap();
1026
1027            assert_eq!(block_range.len() as u64, i);
1028
1029            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1030                .get(&format!("leaf/{}/{}", 0, i))
1031                .send()
1032                .await
1033                .unwrap();
1034
1035            assert_eq!(leaf_range.len() as u64, i);
1036
1037            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1038                .get(&format!("payload/{}/{}", 0, i))
1039                .send()
1040                .await
1041                .unwrap();
1042
1043            assert_eq!(payload_range.len() as u64, i);
1044
1045            let header_range: Vec<Header<MockTypes>> = client
1046                .get(&format!("header/{}/{}", 0, i))
1047                .send()
1048                .await
1049                .unwrap();
1050
1051            assert_eq!(header_range.len() as u64, i);
1052        }
1053    }
1054
1055    #[tokio::test(flavor = "multi_thread")]
1056    async fn test_api() {
1057        setup_test();
1058
1059        // Create the consensus network.
1060        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1061        network.start().await;
1062
1063        // Start the web server.
1064        let port = pick_unused_port().unwrap();
1065        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1066        let options = Options {
1067            small_object_range_limit: 500,
1068            large_object_range_limit: 500,
1069            ..Default::default()
1070        };
1071
1072        app.register_module(
1073            "availability",
1074            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1075        )
1076        .unwrap();
1077        network.spawn(
1078            "server",
1079            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1080        );
1081
1082        // Start a client.
1083        let client = Client::<Error, MockBase>::new(
1084            format!("http://localhost:{port}/availability")
1085                .parse()
1086                .unwrap(),
1087        );
1088        assert!(client.connect(Some(Duration::from_secs(60))).await);
1089        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1090
1091        // Submit a few blocks and make sure each one gets reflected in the query service and
1092        // preserves the consistency of the data and indices.
1093        let leaves = client
1094            .socket("stream/leaves/0")
1095            .subscribe::<LeafQueryData<MockTypes>>()
1096            .await
1097            .unwrap();
1098        let headers = client
1099            .socket("stream/headers/0")
1100            .subscribe::<Header<MockTypes>>()
1101            .await
1102            .unwrap();
1103        let blocks = client
1104            .socket("stream/blocks/0")
1105            .subscribe::<BlockQueryData<MockTypes>>()
1106            .await
1107            .unwrap();
1108        let vid_common = client
1109            .socket("stream/vid/common/0")
1110            .subscribe::<VidCommonQueryData<MockTypes>>()
1111            .await
1112            .unwrap();
1113        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1114        for nonce in 0..3 {
1115            let txn = mock_transaction(vec![nonce]);
1116            network.submit_transaction(txn).await;
1117
1118            // Wait for the transaction to be finalized.
1119            let (i, leaf, block, common) = loop {
1120                tracing::info!("waiting for block with transaction {}", nonce);
1121                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1122                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1123                let leaf = leaf.unwrap();
1124                let header = header.unwrap();
1125                let block = block.unwrap();
1126                let common = common.unwrap();
1127                assert_eq!(leaf.height() as usize, i);
1128                assert_eq!(leaf.block_hash(), block.hash());
1129                assert_eq!(block.header(), &header);
1130                assert_eq!(common.height() as usize, i);
1131                if !block.is_empty() {
1132                    break (i, leaf, block, common);
1133                }
1134            };
1135            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1136            assert_eq!(
1137                block,
1138                client.get(&format!("block/{i}")).send().await.unwrap()
1139            );
1140            assert_eq!(
1141                common,
1142                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1143            );
1144
1145            validate(&client, (i + 1) as u64).await;
1146        }
1147
1148        network.shut_down().await;
1149    }
1150
1151    async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1152        // Check the consistency of every block/leaf pair.
1153        for i in 0..height {
1154            // Limit the number of blocks we validate in order to
1155            // speeed up the tests.
1156            if ![0, 1, height / 2, height - 1].contains(&i) {
1157                continue;
1158            }
1159            tracing::info!("validate block {i}/{height}");
1160
1161            // Check that looking up the leaf various ways returns the correct leaf.
1162            let leaf: Leaf1QueryData<MockTypes> =
1163                client.get(&format!("leaf/{i}")).send().await.unwrap();
1164            assert_eq!(leaf.leaf.height(), i);
1165            assert_eq!(
1166                leaf,
1167                client
1168                    .get(&format!(
1169                        "leaf/hash/{}",
1170                        <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1171                    ))
1172                    .send()
1173                    .await
1174                    .unwrap()
1175            );
1176
1177            // Check that looking up the block various ways returns the correct block.
1178            let block: BlockQueryData<MockTypes> =
1179                client.get(&format!("block/{i}")).send().await.unwrap();
1180            let expected_payload = PayloadQueryData::from(block.clone());
1181            assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1182            assert_eq!(block.height(), i);
1183            assert_eq!(
1184                block,
1185                client
1186                    .get(&format!("block/hash/{}", block.hash()))
1187                    .send()
1188                    .await
1189                    .unwrap()
1190            );
1191            assert_eq!(
1192                *block.header(),
1193                client.get(&format!("header/{i}")).send().await.unwrap()
1194            );
1195            assert_eq!(
1196                *block.header(),
1197                client
1198                    .get(&format!("header/hash/{}", block.hash()))
1199                    .send()
1200                    .await
1201                    .unwrap()
1202            );
1203            assert_eq!(
1204                expected_payload,
1205                client.get(&format!("payload/{i}")).send().await.unwrap(),
1206            );
1207            assert_eq!(
1208                expected_payload,
1209                client
1210                    .get(&format!("payload/block-hash/{}", block.hash()))
1211                    .send()
1212                    .await
1213                    .unwrap(),
1214            );
1215            // Look up the common VID data.
1216            let common: ADVZCommonQueryData<MockTypes> = client
1217                .get(&format!("vid/common/{}", block.height()))
1218                .send()
1219                .await
1220                .unwrap();
1221            assert_eq!(common.height(), block.height());
1222            assert_eq!(common.block_hash(), block.hash());
1223            assert_eq!(
1224                VidCommitment::V0(common.payload_hash()),
1225                block.payload_hash(),
1226            );
1227            assert_eq!(
1228                common,
1229                client
1230                    .get(&format!("vid/common/hash/{}", block.hash()))
1231                    .send()
1232                    .await
1233                    .unwrap()
1234            );
1235
1236            let block_summary = client
1237                .get(&format!("block/summary/{i}"))
1238                .send()
1239                .await
1240                .unwrap();
1241            assert_eq!(
1242                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1243                block_summary,
1244            );
1245            assert_eq!(block_summary.header(), block.header());
1246            assert_eq!(block_summary.hash(), block.hash());
1247            assert_eq!(block_summary.size(), block.size());
1248            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1249
1250            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1251                .get(&format!("block/summaries/{}/{}", 0, i))
1252                .send()
1253                .await
1254                .unwrap();
1255            assert_eq!(block_summaries.len() as u64, i);
1256
1257            // We should be able to look up the block by payload hash. Note that for duplicate
1258            // payloads, these endpoints may return a different block with the same payload, which
1259            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1260            // response, only its payload.
1261            assert_eq!(
1262                block.payload(),
1263                client
1264                    .get::<BlockQueryData<MockTypes>>(&format!(
1265                        "block/payload-hash/{}",
1266                        block.payload_hash()
1267                    ))
1268                    .send()
1269                    .await
1270                    .unwrap()
1271                    .payload()
1272            );
1273            assert_eq!(
1274                block.payload_hash(),
1275                client
1276                    .get::<Header<MockTypes>>(&format!(
1277                        "header/payload-hash/{}",
1278                        block.payload_hash()
1279                    ))
1280                    .send()
1281                    .await
1282                    .unwrap()
1283                    .payload_commitment
1284            );
1285            assert_eq!(
1286                block.payload(),
1287                client
1288                    .get::<PayloadQueryData<MockTypes>>(&format!(
1289                        "payload/hash/{}",
1290                        block.payload_hash()
1291                    ))
1292                    .send()
1293                    .await
1294                    .unwrap()
1295                    .data(),
1296            );
1297            assert_eq!(
1298                common.common(),
1299                client
1300                    .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1301                        "vid/common/payload-hash/{}",
1302                        block.payload_hash()
1303                    ))
1304                    .send()
1305                    .await
1306                    .unwrap()
1307                    .common()
1308            );
1309
1310            // Check that looking up each transaction in the block various ways returns the correct
1311            // transaction.
1312            for (j, txn_from_block) in block.enumerate() {
1313                let txn: TransactionQueryData<MockTypes> = client
1314                    .get(&format!("transaction/{}/{}", i, j.position))
1315                    .send()
1316                    .await
1317                    .unwrap();
1318                assert_eq!(txn.block_height(), i);
1319                assert_eq!(txn.block_hash(), block.hash());
1320                assert_eq!(txn.index(), j.position as u64);
1321                assert_eq!(txn.hash(), txn_from_block.commit());
1322                assert_eq!(txn.transaction(), &txn_from_block);
1323                // We should be able to look up the transaction by hash. Note that for duplicate
1324                // transactions, this endpoint may return a different transaction with the same
1325                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1326                // `TransactionQueryData` response, only its commitment.
1327                assert_eq!(
1328                    txn.hash(),
1329                    client
1330                        .get::<TransactionQueryData<MockTypes>>(&format!(
1331                            "transaction/hash/{}",
1332                            txn.hash()
1333                        ))
1334                        .send()
1335                        .await
1336                        .unwrap()
1337                        .hash()
1338                );
1339            }
1340
1341            let block_range: Vec<BlockQueryData<MockTypes>> = client
1342                .get(&format!("block/{}/{}", 0, i))
1343                .send()
1344                .await
1345                .unwrap();
1346
1347            assert_eq!(block_range.len() as u64, i);
1348
1349            let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1350                .get(&format!("leaf/{}/{}", 0, i))
1351                .send()
1352                .await
1353                .unwrap();
1354
1355            assert_eq!(leaf_range.len() as u64, i);
1356
1357            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1358                .get(&format!("payload/{}/{}", 0, i))
1359                .send()
1360                .await
1361                .unwrap();
1362
1363            assert_eq!(payload_range.len() as u64, i);
1364
1365            let header_range: Vec<Header<MockTypes>> = client
1366                .get(&format!("header/{}/{}", 0, i))
1367                .send()
1368                .await
1369                .unwrap();
1370
1371            assert_eq!(header_range.len() as u64, i);
1372        }
1373    }
1374
1375    #[tokio::test(flavor = "multi_thread")]
1376    async fn test_api_epochs() {
1377        setup_test();
1378
1379        // Create the consensus network.
1380        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1381        let epoch_height = network.epoch_height();
1382        network.start().await;
1383
1384        // Start the web server.
1385        let port = pick_unused_port().unwrap();
1386        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1387        app.register_module(
1388            "availability",
1389            define_api(
1390                &Default::default(),
1391                MockBase::instance(),
1392                "1.0.0".parse().unwrap(),
1393            )
1394            .unwrap(),
1395        )
1396        .unwrap();
1397        network.spawn(
1398            "server",
1399            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1400        );
1401
1402        // Start a client.
1403        let client = Client::<Error, MockBase>::new(
1404            format!("http://localhost:{port}/availability")
1405                .parse()
1406                .unwrap(),
1407        );
1408        assert!(client.connect(Some(Duration::from_secs(60))).await);
1409
1410        // Submit a few blocks and make sure each one gets reflected in the query service and
1411        // preserves the consistency of the data and indices.
1412        let headers = client
1413            .socket("stream/headers/0")
1414            .subscribe::<Header<MockTypes>>()
1415            .await
1416            .unwrap();
1417        let mut chain = headers.enumerate();
1418
1419        loop {
1420            let (i, header) = chain.next().await.unwrap();
1421            let header = header.unwrap();
1422            assert_eq!(header.height(), i as u64);
1423            if header.height() >= 3 * epoch_height {
1424                break;
1425            }
1426        }
1427
1428        for epoch in 1..4 {
1429            let state_cert: StateCertQueryData<MockTypes> = client
1430                .get(&format!("state-cert/{epoch}"))
1431                .send()
1432                .await
1433                .unwrap();
1434            tracing::info!("state-cert: {state_cert:?}");
1435            assert_eq!(state_cert.0.epoch.u64(), epoch);
1436        }
1437
1438        network.shut_down().await;
1439    }
1440
1441    #[tokio::test(flavor = "multi_thread")]
1442    async fn test_old_api() {
1443        setup_test();
1444
1445        // Create the consensus network.
1446        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1447        network.start().await;
1448
1449        // Start the web server.
1450        let port = pick_unused_port().unwrap();
1451
1452        let options = Options {
1453            small_object_range_limit: 500,
1454            large_object_range_limit: 500,
1455            ..Default::default()
1456        };
1457
1458        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1459        app.register_module(
1460            "availability",
1461            define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1462        )
1463        .unwrap();
1464        network.spawn(
1465            "server",
1466            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1467        );
1468
1469        // Start a client.
1470        let client = Client::<Error, MockBase>::new(
1471            format!("http://localhost:{port}/availability")
1472                .parse()
1473                .unwrap(),
1474        );
1475        assert!(client.connect(Some(Duration::from_secs(60))).await);
1476        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1477
1478        // Submit a few blocks and make sure each one gets reflected in the query service and
1479        // preserves the consistency of the data and indices.
1480        let leaves = client
1481            .socket("stream/leaves/0")
1482            .subscribe::<Leaf1QueryData<MockTypes>>()
1483            .await
1484            .unwrap();
1485        let headers = client
1486            .socket("stream/headers/0")
1487            .subscribe::<Header<MockTypes>>()
1488            .await
1489            .unwrap();
1490        let blocks = client
1491            .socket("stream/blocks/0")
1492            .subscribe::<BlockQueryData<MockTypes>>()
1493            .await
1494            .unwrap();
1495        let vid_common = client
1496            .socket("stream/vid/common/0")
1497            .subscribe::<ADVZCommonQueryData<MockTypes>>()
1498            .await
1499            .unwrap();
1500        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1501        for nonce in 0..3 {
1502            let txn = mock_transaction(vec![nonce]);
1503            network.submit_transaction(txn).await;
1504
1505            // Wait for the transaction to be finalized.
1506            let (i, leaf, block, common) = loop {
1507                tracing::info!("waiting for block with transaction {}", nonce);
1508                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1509                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1510                let leaf = leaf.unwrap();
1511                let header = header.unwrap();
1512                let block = block.unwrap();
1513                let common = common.unwrap();
1514                assert_eq!(leaf.leaf.height() as usize, i);
1515                assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1516                assert_eq!(block.header(), &header);
1517                assert_eq!(common.height() as usize, i);
1518                if !block.is_empty() {
1519                    break (i, leaf, block, common);
1520                }
1521            };
1522            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1523            assert_eq!(
1524                block,
1525                client.get(&format!("block/{i}")).send().await.unwrap()
1526            );
1527            assert_eq!(
1528                common,
1529                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1530            );
1531
1532            validate_old(&client, (i + 1) as u64).await;
1533        }
1534
1535        network.shut_down().await;
1536    }
1537
1538    #[tokio::test(flavor = "multi_thread")]
1539    async fn test_extensions() {
1540        use hotshot_example_types::node_types::TestVersions;
1541
1542        setup_test();
1543
1544        let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1545        let data_source = ExtensibleDataSource::new(
1546            MockDataSource::create(dir.path(), Default::default())
1547                .await
1548                .unwrap(),
1549            0,
1550        );
1551
1552        // mock up some consensus data.
1553        let leaf =
1554            Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1555                .await;
1556        let qc =
1557            QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1558                .await;
1559        let leaf = LeafQueryData::new(leaf, qc).unwrap();
1560        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1561        data_source
1562            .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1563            .await
1564            .unwrap();
1565
1566        // assert that the store has data before we move on to API requests
1567        assert_eq!(
1568            ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1569                .await
1570                .unwrap(),
1571            1
1572        );
1573        assert_eq!(block, data_source.get_block(0).await.await);
1574
1575        // Create the API extensions specification.
1576        let extensions = toml! {
1577            [route.post_ext]
1578            PATH = ["/ext/:val"]
1579            METHOD = "POST"
1580            ":val" = "Integer"
1581
1582            [route.get_ext]
1583            PATH = ["/ext"]
1584            METHOD = "GET"
1585        };
1586
1587        let mut api =
1588            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1589                &Options {
1590                    extensions: vec![extensions.into()],
1591                    ..Default::default()
1592                },
1593                MockBase::instance(),
1594                "1.0.0".parse().unwrap(),
1595            )
1596            .unwrap();
1597        api.get("get_ext", |_, state| {
1598            async move { Ok(*state.as_ref()) }.boxed()
1599        })
1600        .unwrap()
1601        .post("post_ext", |req, state| {
1602            async move {
1603                *state.as_mut() = req.integer_param("val")?;
1604                Ok(())
1605            }
1606            .boxed()
1607        })
1608        .unwrap();
1609
1610        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1611        app.register_module("availability", api).unwrap();
1612
1613        let port = pick_unused_port().unwrap();
1614        let _server = BackgroundTask::spawn(
1615            "server",
1616            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1617        );
1618
1619        let client = Client::<Error, MockBase>::new(
1620            format!("http://localhost:{port}/availability")
1621                .parse()
1622                .unwrap(),
1623        );
1624        assert!(client.connect(Some(Duration::from_secs(60))).await);
1625
1626        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1627        client.post::<()>("ext/42").send().await.unwrap();
1628        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1629
1630        // Ensure we can still access the built-in functionality.
1631        assert_eq!(
1632            client
1633                .get::<MockHeader>("header/0")
1634                .send()
1635                .await
1636                .unwrap()
1637                .block_number,
1638            0
1639        );
1640    }
1641
1642    #[tokio::test(flavor = "multi_thread")]
1643    async fn test_range_limit() {
1644        setup_test();
1645
1646        let large_object_range_limit = 2;
1647        let small_object_range_limit = 3;
1648
1649        // Create the consensus network.
1650        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1651        network.start().await;
1652
1653        // Start the web server.
1654        let port = pick_unused_port().unwrap();
1655        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1656        app.register_module(
1657            "availability",
1658            define_api(
1659                &Options {
1660                    large_object_range_limit,
1661                    small_object_range_limit,
1662                    ..Default::default()
1663                },
1664                MockBase::instance(),
1665                "1.0.0".parse().unwrap(),
1666            )
1667            .unwrap(),
1668        )
1669        .unwrap();
1670        network.spawn(
1671            "server",
1672            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1673        );
1674
1675        // Start a client.
1676        let client = Client::<Error, MockBase>::new(
1677            format!("http://localhost:{port}/availability")
1678                .parse()
1679                .unwrap(),
1680        );
1681        assert!(client.connect(Some(Duration::from_secs(60))).await);
1682
1683        // Check reported limits.
1684        assert_eq!(
1685            client.get::<Limits>("limits").send().await.unwrap(),
1686            Limits {
1687                small_object_range_limit,
1688                large_object_range_limit
1689            }
1690        );
1691
1692        // Wait for enough blocks to be produced.
1693        client
1694            .socket("stream/blocks/0")
1695            .subscribe::<BlockQueryData<MockTypes>>()
1696            .await
1697            .unwrap()
1698            .take(small_object_range_limit + 1)
1699            .try_collect::<Vec<_>>()
1700            .await
1701            .unwrap();
1702
1703        async fn check_limit<T: DeserializeOwned + Debug>(
1704            client: &Client<Error, MockBase>,
1705            req: &str,
1706            limit: usize,
1707        ) {
1708            let range: Vec<T> = client
1709                .get(&format!("{req}/0/{limit}"))
1710                .send()
1711                .await
1712                .unwrap();
1713            assert_eq!(range.len(), limit);
1714            let err = client
1715                .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1716                .send()
1717                .await
1718                .unwrap_err();
1719            assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1720        }
1721
1722        check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1723        check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1724        check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1725        check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1726            .await;
1727        check_limit::<BlockSummaryQueryData<MockTypes>>(
1728            &client,
1729            "block/summaries",
1730            large_object_range_limit,
1731        )
1732        .await;
1733
1734        network.shut_down().await;
1735    }
1736
1737    #[tokio::test(flavor = "multi_thread")]
1738    async fn test_header_endpoint() {
1739        setup_test();
1740
1741        // Create the consensus network.
1742        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1743        network.start().await;
1744
1745        // Start the web server.
1746        let port = pick_unused_port().unwrap();
1747        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1748        app.register_module(
1749            "availability",
1750            define_api(
1751                &Default::default(),
1752                MockBase::instance(),
1753                "1.0.0".parse().unwrap(),
1754            )
1755            .unwrap(),
1756        )
1757        .unwrap();
1758        network.spawn(
1759            "server",
1760            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1761        );
1762
1763        let ds = network.data_source();
1764
1765        // Get the current block height and fetch header for some later block height
1766        // This fetch will only resolve when we receive a leaf or block for that block height
1767        let block_height = ds.block_height().await.unwrap();
1768        let fetch = ds
1769            .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1770            .await;
1771
1772        assert!(fetch.is_pending());
1773        let header = fetch.await;
1774        assert_eq!(header.height() as usize, block_height + 25);
1775
1776        network.shut_down().await;
1777    }
1778
1779    #[tokio::test(flavor = "multi_thread")]
1780    async fn test_leaf_only_ds() {
1781        setup_test();
1782
1783        // Create the consensus network.
1784        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1785        network.start().await;
1786
1787        // Start the web server.
1788        let port = pick_unused_port().unwrap();
1789        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1790        app.register_module(
1791            "availability",
1792            define_api(
1793                &Default::default(),
1794                MockBase::instance(),
1795                "1.0.0".parse().unwrap(),
1796            )
1797            .unwrap(),
1798        )
1799        .unwrap();
1800        network.spawn(
1801            "server",
1802            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1803        );
1804
1805        // Start a client.
1806        let client = Client::<Error, MockBase>::new(
1807            format!("http://localhost:{port}/availability")
1808                .parse()
1809                .unwrap(),
1810        );
1811        assert!(client.connect(Some(Duration::from_secs(60))).await);
1812
1813        // Wait for some headers to be produced.
1814        client
1815            .socket("stream/headers/0")
1816            .subscribe::<Header<MockTypes>>()
1817            .await
1818            .unwrap()
1819            .take(5)
1820            .try_collect::<Vec<_>>()
1821            .await
1822            .unwrap();
1823
1824        // Wait for some leaves to be produced.
1825        client
1826            .socket("stream/leaves/5")
1827            .subscribe::<LeafQueryData<MockTypes>>()
1828            .await
1829            .unwrap()
1830            .take(5)
1831            .try_collect::<Vec<_>>()
1832            .await
1833            .unwrap();
1834
1835        let ds = network.data_source();
1836
1837        // Get the current block height and fetch header for some later block height
1838        // This fetch will only resolve if we get a block notification
1839        // However, this block will never be stored
1840        let block_height = ds.block_height().await.unwrap();
1841        let target_block_height = block_height + 20;
1842        let fetch = ds
1843            .get_block(BlockId::<MockTypes>::Number(target_block_height))
1844            .await;
1845
1846        assert!(fetch.is_pending());
1847        let block = fetch.await;
1848        assert_eq!(block.height() as usize, target_block_height);
1849
1850        let mut tx = ds.read().await.unwrap();
1851        tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1852            .await
1853            .unwrap_err();
1854        drop(tx);
1855
1856        network.shut_down().await;
1857    }
1858}