hotshot_query_service/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Queries for HotShot chain state.
14//!
15//! The availability API provides an objective view of the HotShot blockchain. It provides access
16//! only to normative data: that is, data which is agreed upon by all honest consensus nodes and
17//! which is immutable. This means access to core consensus data structures including leaves,
18//! blocks, and headers, where each query is pure and idempotent. This also means that it is
19//! possible for a client to verify all of the information provided by this API, by running a
20//! HotShot light client and downloading the appropriate evidence with each query.
21//!
22//! This API does not provide any queries which represent only the _current_ state of the chain or
23//! may change over time, and it does not provide information for which there is not (yet) agreement
24//! of a supermajority of consensus nodes. For information about the current dynamic state of
25//! consensus and uncommitted state, try the [status](crate::status) API. For information about the
26//! chain which is tabulated by this specific node and not subject to full consensus agreement, try
27//! the [node](crate::node) API.
28
29use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34    data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35    simple_certificate::QuorumCertificate,
36    traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, RequestParams, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, types::HeightIndexed, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54    pub api_path: Option<PathBuf>,
55
56    /// Timeout for failing requests due to missing data.
57    ///
58    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
59    /// external provider. This parameter controls how long the request handler will wait for
60    /// missing data to be fetched before giving up and failing the request.
61    pub fetch_timeout: Duration,
62
63    /// Additional API specification files to merge with `availability-api-path`.
64    ///
65    /// These optional files may contain route definitions for application-specific routes that have
66    /// been added as extensions to the basic availability API.
67    pub extensions: Vec<toml::Value>,
68
69    /// The maximum number of small objects which can be loaded in a single range query.
70    ///
71    /// Currently small objects include leaves only. In the future this limit will also apply to
72    /// headers, block summaries, and VID common, however
73    /// * loading of headers and block summaries is currently implemented by loading the entire
74    ///   block
75    /// * imperfect VID parameter tuning means that VID common can be much larger than it should
76    pub small_object_range_limit: usize,
77
78    /// The maximum number of large objects which can be loaded in a single range query.
79    ///
80    /// Large objects include anything that _might_ contain a full payload or an object proportional
81    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
82    /// check the size of objects while loading to determine which limit to apply. If an object
83    /// belongs to a class which might contain a large payload, the large object limit always
84    /// applies.
85    pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89    fn default() -> Self {
90        Self {
91            api_path: None,
92            fetch_timeout: Duration::from_millis(500),
93            extensions: vec![],
94            large_object_range_limit: 100,
95            small_object_range_limit: 500,
96        }
97    }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103    Request {
104        source: RequestError,
105    },
106    #[snafu(display("leaf {resource} missing or not available"))]
107    #[from(ignore)]
108    FetchLeaf {
109        resource: String,
110    },
111    #[snafu(display("block {resource} missing or not available"))]
112    #[from(ignore)]
113    FetchBlock {
114        resource: String,
115    },
116    #[snafu(display("header {resource} missing or not available"))]
117    #[from(ignore)]
118    FetchHeader {
119        resource: String,
120    },
121    #[snafu(display("transaction {resource} missing or not available"))]
122    #[from(ignore)]
123    FetchTransaction {
124        resource: String,
125    },
126    #[snafu(display("transaction index {index} out of range for block {height}"))]
127    #[from(ignore)]
128    InvalidTransactionIndex {
129        height: u64,
130        index: u64,
131    },
132    #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133    #[from(ignore)]
134    RangeLimit {
135        from: usize,
136        until: usize,
137        limit: usize,
138    },
139    #[snafu(display("{source}"))]
140    Query {
141        source: QueryError,
142    },
143    #[snafu(display("State cert for epoch {epoch} not found"))]
144    #[from(ignore)]
145    FetchStateCert {
146        epoch: u64,
147    },
148    #[snafu(display("error {status}: {message}"))]
149    Custom {
150        message: String,
151        status: StatusCode,
152    },
153}
154
155impl Error {
156    pub fn internal<M: Display>(message: M) -> Self {
157        Self::Custom {
158            message: message.to_string(),
159            status: StatusCode::INTERNAL_SERVER_ERROR,
160        }
161    }
162
163    pub fn status(&self) -> StatusCode {
164        match self {
165            Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166            Self::FetchLeaf { .. }
167            | Self::FetchBlock { .. }
168            | Self::FetchTransaction { .. }
169            | Self::FetchHeader { .. }
170            | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171            Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172            Self::Custom { status, .. } => *status,
173        }
174    }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180    pub(crate) leaf: Leaf<Types>,
181    pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185    pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186        Self { leaf, qc }
187    }
188
189    pub fn leaf(&self) -> &Leaf<Types> {
190        &self.leaf
191    }
192
193    pub fn qc(&self) -> &QuorumCertificate<Types> {
194        &self.qc
195    }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199    // TODO do we still need some check here?
200    // `drb_seed` no longer exists on `Leaf2`
201    // if leaf2.drb_seed != [0; 32] && leaf2.drb_result != [0; 32] {
202    //     panic!("Downgrade of Leaf2 to Leaf will lose DRB information!");
203    // }
204    let quorum_proposal = QuorumProposal {
205        block_header: leaf2.block_header().clone(),
206        view_number: leaf2.view_number(),
207        justify_qc: leaf2.justify_qc().to_qc(),
208        upgrade_certificate: leaf2.upgrade_certificate(),
209        proposal_certificate: None,
210    };
211    let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212    if let Some(payload) = leaf2.block_payload() {
213        leaf.fill_block_payload_unchecked(payload);
214    }
215    leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219    Leaf1QueryData {
220        leaf: downgrade_leaf(leaf.leaf),
221        qc: leaf.qc.to_qc(),
222    }
223}
224
225async fn get_leaf_handler<Types, State>(
226    req: tide_disco::RequestParams,
227    state: &State,
228    timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231    State: 'static + Send + Sync + ReadState,
232    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233    Types: NodeType,
234    Header<Types>: QueryableHeader<Types>,
235    Payload<Types>: QueryablePayload<Types>,
236{
237    let id = match req.opt_integer_param("height")? {
238        Some(height) => LeafId::Number(height),
239        None => LeafId::Hash(req.blob_param("hash")?),
240    };
241    let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242    fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243        resource: id.to_string(),
244    })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248    req: tide_disco::RequestParams,
249    state: &State,
250    timeout: Duration,
251    small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254    State: 'static + Send + Sync + ReadState,
255    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256    Types: NodeType,
257    Header<Types>: QueryableHeader<Types>,
258    Payload<Types>: QueryablePayload<Types>,
259{
260    let from = req.integer_param::<_, usize>("from")?;
261    let until = req.integer_param("until")?;
262    enforce_range_limit(from, until, small_object_range_limit)?;
263
264    let leaves = state
265        .read(|state| state.get_leaf_range(from..until).boxed())
266        .await;
267    leaves
268        .enumerate()
269        .then(|(index, fetch)| async move {
270            fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271                resource: (index + from).to_string(),
272            })
273        })
274        .try_collect::<Vec<_>>()
275        .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279    data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281    let VidCommonQueryData {
282        height,
283        block_hash,
284        payload_hash: VidCommitment::V0(payload_hash),
285        common: VidCommon::V0(common),
286    } = data
287    else {
288        return None;
289    };
290    Some(ADVZCommonQueryData {
291        height,
292        block_hash,
293        payload_hash,
294        common,
295    })
296}
297
298async fn get_vid_common_handler<Types, State>(
299    req: tide_disco::RequestParams,
300    state: &State,
301    timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304    State: 'static + Send + Sync + ReadState,
305    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306    Types: NodeType,
307    Header<Types>: QueryableHeader<Types>,
308    Payload<Types>: QueryablePayload<Types>,
309{
310    let id = if let Some(height) = req.opt_integer_param("height")? {
311        BlockId::Number(height)
312    } else if let Some(hash) = req.opt_blob_param("hash")? {
313        BlockId::Hash(hash)
314    } else {
315        BlockId::PayloadHash(req.blob_param("payload-hash")?)
316    };
317    let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319        resource: id.to_string(),
320    })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324    options: &Options,
325    _: Ver,
326    api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329    State: 'static + Send + Sync + ReadState,
330    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331    Header<Types>: QueryableHeader<Types>,
332    Payload<Types>: QueryablePayload<Types>,
333{
334    let mut api = load_api::<State, Error, Ver>(
335        options.api_path.as_ref(),
336        include_str!("../api/availability.toml"),
337        options.extensions.clone(),
338    )?;
339    let timeout = options.fetch_timeout;
340    let small_object_range_limit = options.small_object_range_limit;
341    let large_object_range_limit = options.large_object_range_limit;
342
343    api.with_version(api_ver.clone());
344
345    // `LeafQueryData` now contains `Leaf2` and `QC2``, which is a breaking change.
346    // On node startup, all leaves are migrated to `Leaf2`.
347    //
348    // To maintain compatibility with nodes running an older version
349    // (which expect `LeafQueryData` with `Leaf1` and `QC1`),
350    // we downgrade `Leaf2` to `Leaf1` and `QC2` to `QC1` if the API version is V0.
351    // Otherwise, we return the new types.
352    if api_ver.major == 0 {
353        api.at("get_leaf", move |req, state| {
354            get_leaf_handler(req, state, timeout)
355                .map(|res| res.map(downgrade_leaf_query_data))
356                .boxed()
357        })?;
358
359        api.at("get_leaf_range", move |req, state| {
360            get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361                .map(|res| {
362                    res.map(|r| {
363                        r.into_iter()
364                            .map(downgrade_leaf_query_data)
365                            .collect::<Vec<Leaf1QueryData<_>>>()
366                    })
367                })
368                .boxed()
369        })?;
370
371        api.stream("stream_leaves", move |req, state| {
372            async move {
373                let height = req.integer_param("height")?;
374                state
375                    .read(|state| {
376                        async move {
377                            Ok(state
378                                .subscribe_leaves(height)
379                                .await
380                                .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381                        }
382                        .boxed()
383                    })
384                    .await
385            }
386            .try_flatten_stream()
387            .boxed()
388        })?;
389    } else {
390        api.at("get_leaf", move |req, state| {
391            get_leaf_handler(req, state, timeout).boxed()
392        })?;
393
394        api.at("get_leaf_range", move |req, state| {
395            get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396        })?;
397
398        api.stream("stream_leaves", move |req, state| {
399            async move {
400                let height = req.integer_param("height")?;
401                state
402                    .read(|state| {
403                        async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404                    })
405                    .await
406            }
407            .try_flatten_stream()
408            .boxed()
409        })?;
410    }
411
412    // VIDCommon data is version gated after the VID upgrade.
413    // We keep the old struct and data in the API version V0. Starting from V1 we are returning version gated structs.
414    if api_ver.major == 0 {
415        api.at("get_vid_common", move |req, state| {
416            get_vid_common_handler(req, state, timeout)
417                .map(|r| match r {
418                    Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419                        message: "Incompatible VID version.".to_string(),
420                        status: StatusCode::BAD_REQUEST,
421                    }),
422                    Err(e) => Err(e),
423                })
424                .boxed()
425        })?
426        .stream("stream_vid_common", move |req, state| {
427            async move {
428                let height = req.integer_param("height")?;
429                state
430                    .read(|state| {
431                        async move {
432                            Ok(state.subscribe_vid_common(height).await.map(|data| {
433                                downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434                                    message: "Incompatible VID version.".to_string(),
435                                    status: StatusCode::BAD_REQUEST,
436                                })
437                            }))
438                        }
439                        .boxed()
440                    })
441                    .await
442            }
443            .try_flatten_stream()
444            .boxed()
445        })?;
446    } else {
447        api.at("get_vid_common", move |req, state| {
448            get_vid_common_handler(req, state, timeout).boxed().boxed()
449        })?
450        .stream("stream_vid_common", move |req, state| {
451            async move {
452                let height = req.integer_param("height")?;
453                state
454                    .read(|state| {
455                        async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456                    })
457                    .await
458            }
459            .try_flatten_stream()
460            .boxed()
461        })?;
462    }
463
464    api.at("get_header", move |req, state| {
465        async move {
466            let id = if let Some(height) = req.opt_integer_param("height")? {
467                BlockId::Number(height)
468            } else if let Some(hash) = req.opt_blob_param("hash")? {
469                BlockId::Hash(hash)
470            } else {
471                BlockId::PayloadHash(req.blob_param("payload-hash")?)
472            };
473            let fetch = state.read(|state| state.get_header(id).boxed()).await;
474            fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475                resource: id.to_string(),
476            })
477        }
478        .boxed()
479    })?
480    .at("get_header_range", move |req, state| {
481        async move {
482            let from = req.integer_param::<_, usize>("from")?;
483            let until = req.integer_param::<_, usize>("until")?;
484            enforce_range_limit(from, until, large_object_range_limit)?;
485
486            let headers = state
487                .read(|state| state.get_header_range(from..until).boxed())
488                .await;
489            headers
490                .enumerate()
491                .then(|(index, fetch)| async move {
492                    fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493                        resource: (index + from).to_string(),
494                    })
495                })
496                .try_collect::<Vec<_>>()
497                .await
498        }
499        .boxed()
500    })?
501    .stream("stream_headers", move |req, state| {
502        async move {
503            let height = req.integer_param("height")?;
504            state
505                .read(|state| {
506                    async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507                })
508                .await
509        }
510        .try_flatten_stream()
511        .boxed()
512    })?
513    .at("get_block", move |req, state| {
514        async move {
515            let id = if let Some(height) = req.opt_integer_param("height")? {
516                BlockId::Number(height)
517            } else if let Some(hash) = req.opt_blob_param("hash")? {
518                BlockId::Hash(hash)
519            } else {
520                BlockId::PayloadHash(req.blob_param("payload-hash")?)
521            };
522            let fetch = state.read(|state| state.get_block(id).boxed()).await;
523            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524                resource: id.to_string(),
525            })
526        }
527        .boxed()
528    })?
529    .at("get_block_range", move |req, state| {
530        async move {
531            let from = req.integer_param::<_, usize>("from")?;
532            let until = req.integer_param("until")?;
533            enforce_range_limit(from, until, large_object_range_limit)?;
534
535            let blocks = state
536                .read(|state| state.get_block_range(from..until).boxed())
537                .await;
538            blocks
539                .enumerate()
540                .then(|(index, fetch)| async move {
541                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542                        resource: (index + from).to_string(),
543                    })
544                })
545                .try_collect::<Vec<_>>()
546                .await
547        }
548        .boxed()
549    })?
550    .stream("stream_blocks", move |req, state| {
551        async move {
552            let height = req.integer_param("height")?;
553            state
554                .read(|state| {
555                    async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556                })
557                .await
558        }
559        .try_flatten_stream()
560        .boxed()
561    })?
562    .at("get_payload", move |req, state| {
563        async move {
564            let id = if let Some(height) = req.opt_integer_param("height")? {
565                BlockId::Number(height)
566            } else if let Some(hash) = req.opt_blob_param("hash")? {
567                BlockId::PayloadHash(hash)
568            } else {
569                BlockId::Hash(req.blob_param("block-hash")?)
570            };
571            let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573                resource: id.to_string(),
574            })
575        }
576        .boxed()
577    })?
578    .at("get_payload_range", move |req, state| {
579        async move {
580            let from = req.integer_param::<_, usize>("from")?;
581            let until = req.integer_param("until")?;
582            enforce_range_limit(from, until, large_object_range_limit)?;
583
584            let payloads = state
585                .read(|state| state.get_payload_range(from..until).boxed())
586                .await;
587            payloads
588                .enumerate()
589                .then(|(index, fetch)| async move {
590                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591                        resource: (index + from).to_string(),
592                    })
593                })
594                .try_collect::<Vec<_>>()
595                .await
596        }
597        .boxed()
598    })?
599    .stream("stream_payloads", move |req, state| {
600        async move {
601            let height = req.integer_param("height")?;
602            state
603                .read(|state| {
604                    async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605                })
606                .await
607        }
608        .try_flatten_stream()
609        .boxed()
610    })?
611    .at("get_transaction_proof", move |req, state| {
612        async move {
613            let tx = get_transaction(req, state, timeout).await?;
614            let height = tx.block.height();
615            let vid = state
616                .read(|state| state.get_vid_common(height as usize))
617                .await
618                .with_timeout(timeout)
619                .await
620                .context(FetchBlockSnafu {
621                    resource: height.to_string(),
622                })?;
623            let proof = tx.block.transaction_proof(&vid, &tx.index).context(
624                InvalidTransactionIndexSnafu {
625                    height,
626                    index: tx.transaction.index(),
627                },
628            )?;
629            Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
630        }
631        .boxed()
632    })?
633    .at("get_transaction", move |req, state| {
634        async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
635    })?
636    .stream("stream_transactions", move |req, state| {
637        async move {
638            let height = req.integer_param::<_, usize>("height")?;
639
640            let namespace: Option<i64> = req
641                .opt_integer_param::<_, usize>("namespace")?
642                .map(|i| {
643                    i.try_into().map_err(|err| Error::Custom {
644                        message: format!(
645                            "Invalid 'namespace': could not convert usize to i64: {err}"
646                        ),
647                        status: StatusCode::BAD_REQUEST,
648                    })
649                })
650                .transpose()?;
651
652            state
653                .read(|state| {
654                    async move {
655                        Ok(state
656                            .subscribe_blocks(height)
657                            .await
658                            .map(move |block| {
659                                let transactions = block.enumerate().enumerate();
660                                let header = block.header();
661                                let filtered_txs = transactions
662                                    .filter_map(|(i, (index, _tx))| {
663                                        if let Some(requested_ns) = namespace {
664                                            let ns_id = QueryableHeader::<Types>::namespace_id(
665                                                header,
666                                                &index.ns_index,
667                                            )?;
668
669                                            if ns_id.into() != requested_ns {
670                                                return None;
671                                            }
672                                        }
673
674                                        let tx = block.transaction(&index)?;
675                                        TransactionQueryData::new(tx, &block, &index, i as u64)
676                                    })
677                                    .collect::<Vec<_>>();
678
679                                futures::stream::iter(filtered_txs.into_iter().map(Ok))
680                            })
681                            .flatten())
682                    }
683                    .boxed()
684                })
685                .await
686        }
687        .try_flatten_stream()
688        .boxed()
689    })?
690    .at("get_block_summary", move |req, state| {
691        async move {
692            let id: usize = req.integer_param("height")?;
693
694            let fetch = state.read(|state| state.get_block(id).boxed()).await;
695            fetch
696                .with_timeout(timeout)
697                .await
698                .context(FetchBlockSnafu {
699                    resource: id.to_string(),
700                })
701                .map(BlockSummaryQueryData::from)
702        }
703        .boxed()
704    })?
705    .at("get_block_summary_range", move |req, state| {
706        async move {
707            let from: usize = req.integer_param("from")?;
708            let until: usize = req.integer_param("until")?;
709            enforce_range_limit(from, until, large_object_range_limit)?;
710
711            let blocks = state
712                .read(|state| state.get_block_range(from..until).boxed())
713                .await;
714            let result: Vec<BlockSummaryQueryData<Types>> = blocks
715                .enumerate()
716                .then(|(index, fetch)| async move {
717                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
718                        resource: (index + from).to_string(),
719                    })
720                })
721                .map(|result| result.map(BlockSummaryQueryData::from))
722                .try_collect()
723                .await?;
724
725            Ok(result)
726        }
727        .boxed()
728    })?
729    .at("get_limits", move |_req, _state| {
730        async move {
731            Ok(Limits {
732                small_object_range_limit,
733                large_object_range_limit,
734            })
735        }
736        .boxed()
737    })?;
738    Ok(api)
739}
740
741fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
742    if until.saturating_sub(from) > limit {
743        return Err(Error::RangeLimit { from, until, limit });
744    }
745    Ok(())
746}
747
748async fn get_transaction<Types, State>(
749    req: RequestParams,
750    state: &State,
751    timeout: Duration,
752) -> Result<BlockWithTransaction<Types>, Error>
753where
754    Types: NodeType,
755    Header<Types>: QueryableHeader<Types>,
756    Payload<Types>: QueryablePayload<Types>,
757    State: 'static + Send + Sync + ReadState,
758    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
759{
760    match req.opt_blob_param("hash")? {
761        Some(hash) => state
762            .read(|state| state.get_block_containing_transaction(hash).boxed())
763            .await
764            .with_timeout(timeout)
765            .await
766            .context(FetchTransactionSnafu {
767                resource: hash.to_string(),
768            }),
769        None => {
770            let height: u64 = req.integer_param("height")?;
771            let fetch = state
772                .read(|state| state.get_block(height as usize).boxed())
773                .await;
774            let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
775                resource: height.to_string(),
776            })?;
777            let i: u64 = req.integer_param("index")?;
778            let index = block
779                .payload()
780                .nth(block.metadata(), i as usize)
781                .context(InvalidTransactionIndexSnafu { height, index: i })?;
782            let transaction = block
783                .transaction(&index)
784                .context(InvalidTransactionIndexSnafu { height, index: i })?;
785            let transaction = TransactionQueryData::new(transaction, &block, &index, i)
786                .context(InvalidTransactionIndexSnafu { height, index: i })?;
787            Ok(BlockWithTransaction {
788                transaction,
789                block,
790                index,
791            })
792        },
793    }
794}
795
796#[cfg(test)]
797mod test {
798    use std::{fmt::Debug, time::Duration};
799
800    use async_lock::RwLock;
801    use committable::Committable;
802    use futures::future::FutureExt;
803    use hotshot_example_types::node_types::EpochsTestVersions;
804    use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
805    use portpicker::pick_unused_port;
806    use serde::de::DeserializeOwned;
807    use surf_disco::{Client, Error as _};
808    use tempfile::TempDir;
809    use tide_disco::App;
810    use toml::toml;
811
812    use super::*;
813    use crate::{
814        data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
815        status::StatusDataSource,
816        task::BackgroundTask,
817        testing::{
818            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
819            mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
820        },
821        types::HeightIndexed,
822        ApiState, Error, Header,
823    };
824
825    /// Get the current ledger height and a list of non-empty leaf/block pairs.
826    async fn get_non_empty_blocks(
827        client: &Client<Error, MockBase>,
828    ) -> (
829        u64,
830        Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
831    ) {
832        let mut blocks = vec![];
833        // Ignore the genesis block (start from height 1).
834        for i in 1.. {
835            match client
836                .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
837                .send()
838                .await
839            {
840                Ok(block) => {
841                    if !block.is_empty() {
842                        let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
843                        blocks.push((leaf, block));
844                    }
845                },
846                Err(Error::Availability {
847                    source: super::Error::FetchBlock { .. },
848                }) => {
849                    tracing::info!(
850                        "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
851                    );
852                    return (i, blocks);
853                },
854                Err(err) => panic!("unexpected error {err}"),
855            }
856        }
857        unreachable!()
858    }
859
860    async fn validate(client: &Client<Error, MockBase>, height: u64) {
861        // Check the consistency of every block/leaf pair.
862        for i in 0..height {
863            // Limit the number of blocks we validate in order to
864            // speeed up the tests.
865            if ![0, 1, height / 2, height - 1].contains(&i) {
866                continue;
867            }
868            tracing::info!("validate block {i}/{height}");
869
870            // Check that looking up the leaf various ways returns the correct leaf.
871            let leaf: LeafQueryData<MockTypes> =
872                client.get(&format!("leaf/{i}")).send().await.unwrap();
873            assert_eq!(leaf.height(), i);
874            assert_eq!(
875                leaf,
876                client
877                    .get(&format!("leaf/hash/{}", leaf.hash()))
878                    .send()
879                    .await
880                    .unwrap()
881            );
882
883            // Check that looking up the block various ways returns the correct block.
884            let block: BlockQueryData<MockTypes> =
885                client.get(&format!("block/{i}")).send().await.unwrap();
886            let expected_payload = PayloadQueryData::from(block.clone());
887            assert_eq!(leaf.block_hash(), block.hash());
888            assert_eq!(block.height(), i);
889            assert_eq!(
890                block,
891                client
892                    .get(&format!("block/hash/{}", block.hash()))
893                    .send()
894                    .await
895                    .unwrap()
896            );
897            assert_eq!(
898                *block.header(),
899                client.get(&format!("header/{i}")).send().await.unwrap()
900            );
901            assert_eq!(
902                *block.header(),
903                client
904                    .get(&format!("header/hash/{}", block.hash()))
905                    .send()
906                    .await
907                    .unwrap()
908            );
909            assert_eq!(
910                expected_payload,
911                client.get(&format!("payload/{i}")).send().await.unwrap(),
912            );
913            assert_eq!(
914                expected_payload,
915                client
916                    .get(&format!("payload/block-hash/{}", block.hash()))
917                    .send()
918                    .await
919                    .unwrap(),
920            );
921            // Look up the common VID data.
922            let common: VidCommonQueryData<MockTypes> = client
923                .get(&format!("vid/common/{}", block.height()))
924                .send()
925                .await
926                .unwrap();
927            assert_eq!(common.height(), block.height());
928            assert_eq!(common.block_hash(), block.hash());
929            assert_eq!(common.payload_hash(), block.payload_hash());
930            assert_eq!(
931                common,
932                client
933                    .get(&format!("vid/common/hash/{}", block.hash()))
934                    .send()
935                    .await
936                    .unwrap()
937            );
938
939            let block_summary = client
940                .get(&format!("block/summary/{i}"))
941                .send()
942                .await
943                .unwrap();
944            assert_eq!(
945                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
946                block_summary,
947            );
948            assert_eq!(block_summary.header(), block.header());
949            assert_eq!(block_summary.hash(), block.hash());
950            assert_eq!(block_summary.size(), block.size());
951            assert_eq!(block_summary.num_transactions(), block.num_transactions());
952
953            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
954                .get(&format!("block/summaries/{}/{}", 0, i))
955                .send()
956                .await
957                .unwrap();
958            assert_eq!(block_summaries.len() as u64, i);
959
960            // We should be able to look up the block by payload hash. Note that for duplicate
961            // payloads, these endpoints may return a different block with the same payload, which
962            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
963            // response, only its payload.
964            assert_eq!(
965                block.payload(),
966                client
967                    .get::<BlockQueryData<MockTypes>>(&format!(
968                        "block/payload-hash/{}",
969                        block.payload_hash()
970                    ))
971                    .send()
972                    .await
973                    .unwrap()
974                    .payload()
975            );
976            assert_eq!(
977                block.payload_hash(),
978                client
979                    .get::<Header<MockTypes>>(&format!(
980                        "header/payload-hash/{}",
981                        block.payload_hash()
982                    ))
983                    .send()
984                    .await
985                    .unwrap()
986                    .payload_commitment
987            );
988            assert_eq!(
989                block.payload(),
990                client
991                    .get::<PayloadQueryData<MockTypes>>(&format!(
992                        "payload/hash/{}",
993                        block.payload_hash()
994                    ))
995                    .send()
996                    .await
997                    .unwrap()
998                    .data(),
999            );
1000            assert_eq!(
1001                common.common(),
1002                client
1003                    .get::<VidCommonQueryData<MockTypes>>(&format!(
1004                        "vid/common/payload-hash/{}",
1005                        block.payload_hash()
1006                    ))
1007                    .send()
1008                    .await
1009                    .unwrap()
1010                    .common()
1011            );
1012
1013            // Check that looking up each transaction in the block various ways returns the correct
1014            // transaction.
1015            for (j, txn_from_block) in block.enumerate() {
1016                let txn: TransactionQueryData<MockTypes> = client
1017                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1018                    .send()
1019                    .await
1020                    .unwrap();
1021                assert_eq!(txn.block_height(), i);
1022                assert_eq!(txn.block_hash(), block.hash());
1023                assert_eq!(txn.index(), j.position as u64);
1024                assert_eq!(txn.hash(), txn_from_block.commit());
1025                assert_eq!(txn.transaction(), &txn_from_block);
1026                // We should be able to look up the transaction by hash. Note that for duplicate
1027                // transactions, this endpoint may return a different transaction with the same
1028                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1029                // `TransactionWithProofQueryData` response, only its commitment.
1030                assert_eq!(
1031                    txn.hash(),
1032                    client
1033                        .get::<TransactionQueryData<MockTypes>>(&format!(
1034                            "transaction/hash/{}/noproof",
1035                            txn.hash()
1036                        ))
1037                        .send()
1038                        .await
1039                        .unwrap()
1040                        .hash()
1041                );
1042
1043                let tx_with_proof = client
1044                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1045                        "transaction/{}/{}/proof",
1046                        i, j.position
1047                    ))
1048                    .send()
1049                    .await
1050                    .unwrap();
1051                assert_eq!(txn.hash(), tx_with_proof.hash());
1052                assert!(tx_with_proof.proof().verify(
1053                    block.metadata(),
1054                    txn.transaction(),
1055                    &block.payload_hash(),
1056                    common.common()
1057                ));
1058
1059                // Similar to above, but by hash
1060                let tx_with_proof = client
1061                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1062                        "transaction/hash/{}/proof",
1063                        txn.hash()
1064                    ))
1065                    .send()
1066                    .await
1067                    .unwrap();
1068                assert_eq!(txn.hash(), tx_with_proof.hash());
1069                assert!(tx_with_proof.proof().verify(
1070                    block.metadata(),
1071                    txn.transaction(),
1072                    &block.payload_hash(),
1073                    common.common()
1074                ));
1075            }
1076
1077            let block_range: Vec<BlockQueryData<MockTypes>> = client
1078                .get(&format!("block/{}/{}", 0, i))
1079                .send()
1080                .await
1081                .unwrap();
1082
1083            assert_eq!(block_range.len() as u64, i);
1084
1085            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1086                .get(&format!("leaf/{}/{}", 0, i))
1087                .send()
1088                .await
1089                .unwrap();
1090
1091            assert_eq!(leaf_range.len() as u64, i);
1092
1093            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1094                .get(&format!("payload/{}/{}", 0, i))
1095                .send()
1096                .await
1097                .unwrap();
1098
1099            assert_eq!(payload_range.len() as u64, i);
1100
1101            let header_range: Vec<Header<MockTypes>> = client
1102                .get(&format!("header/{}/{}", 0, i))
1103                .send()
1104                .await
1105                .unwrap();
1106
1107            assert_eq!(header_range.len() as u64, i);
1108        }
1109    }
1110
1111    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1112    async fn test_api() {
1113        // Create the consensus network.
1114        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1115        network.start().await;
1116
1117        // Start the web server.
1118        let port = pick_unused_port().unwrap();
1119        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1120        let options = Options {
1121            small_object_range_limit: 500,
1122            large_object_range_limit: 500,
1123            ..Default::default()
1124        };
1125
1126        app.register_module(
1127            "availability",
1128            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1129        )
1130        .unwrap();
1131        network.spawn(
1132            "server",
1133            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1134        );
1135
1136        // Start a client.
1137        let client = Client::<Error, MockBase>::new(
1138            format!("http://localhost:{port}/availability")
1139                .parse()
1140                .unwrap(),
1141        );
1142        assert!(client.connect(Some(Duration::from_secs(60))).await);
1143        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1144
1145        // Submit a few blocks and make sure each one gets reflected in the query service and
1146        // preserves the consistency of the data and indices.
1147        let leaves = client
1148            .socket("stream/leaves/0")
1149            .subscribe::<LeafQueryData<MockTypes>>()
1150            .await
1151            .unwrap();
1152        let headers = client
1153            .socket("stream/headers/0")
1154            .subscribe::<Header<MockTypes>>()
1155            .await
1156            .unwrap();
1157        let blocks = client
1158            .socket("stream/blocks/0")
1159            .subscribe::<BlockQueryData<MockTypes>>()
1160            .await
1161            .unwrap();
1162        let vid_common = client
1163            .socket("stream/vid/common/0")
1164            .subscribe::<VidCommonQueryData<MockTypes>>()
1165            .await
1166            .unwrap();
1167        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1168        for nonce in 0..3 {
1169            let txn = mock_transaction(vec![nonce]);
1170            network.submit_transaction(txn).await;
1171
1172            // Wait for the transaction to be finalized.
1173            let (i, leaf, block, common) = loop {
1174                tracing::info!("waiting for block with transaction {}", nonce);
1175                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1176                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1177                let leaf = leaf.unwrap();
1178                let header = header.unwrap();
1179                let block = block.unwrap();
1180                let common = common.unwrap();
1181                assert_eq!(leaf.height() as usize, i);
1182                assert_eq!(leaf.block_hash(), block.hash());
1183                assert_eq!(block.header(), &header);
1184                assert_eq!(common.height() as usize, i);
1185                if !block.is_empty() {
1186                    break (i, leaf, block, common);
1187                }
1188            };
1189            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1190            assert_eq!(
1191                block,
1192                client.get(&format!("block/{i}")).send().await.unwrap()
1193            );
1194            assert_eq!(
1195                common,
1196                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1197            );
1198
1199            validate(&client, (i + 1) as u64).await;
1200        }
1201
1202        network.shut_down().await;
1203    }
1204
1205    async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1206        // Check the consistency of every block/leaf pair.
1207        for i in 0..height {
1208            // Limit the number of blocks we validate in order to
1209            // speeed up the tests.
1210            if ![0, 1, height / 2, height - 1].contains(&i) {
1211                continue;
1212            }
1213            tracing::info!("validate block {i}/{height}");
1214
1215            // Check that looking up the leaf various ways returns the correct leaf.
1216            let leaf: Leaf1QueryData<MockTypes> =
1217                client.get(&format!("leaf/{i}")).send().await.unwrap();
1218            assert_eq!(leaf.leaf.height(), i);
1219            assert_eq!(
1220                leaf,
1221                client
1222                    .get(&format!(
1223                        "leaf/hash/{}",
1224                        <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1225                    ))
1226                    .send()
1227                    .await
1228                    .unwrap()
1229            );
1230
1231            // Check that looking up the block various ways returns the correct block.
1232            let block: BlockQueryData<MockTypes> =
1233                client.get(&format!("block/{i}")).send().await.unwrap();
1234            let expected_payload = PayloadQueryData::from(block.clone());
1235            assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1236            assert_eq!(block.height(), i);
1237            assert_eq!(
1238                block,
1239                client
1240                    .get(&format!("block/hash/{}", block.hash()))
1241                    .send()
1242                    .await
1243                    .unwrap()
1244            );
1245            assert_eq!(
1246                *block.header(),
1247                client.get(&format!("header/{i}")).send().await.unwrap()
1248            );
1249            assert_eq!(
1250                *block.header(),
1251                client
1252                    .get(&format!("header/hash/{}", block.hash()))
1253                    .send()
1254                    .await
1255                    .unwrap()
1256            );
1257            assert_eq!(
1258                expected_payload,
1259                client.get(&format!("payload/{i}")).send().await.unwrap(),
1260            );
1261            assert_eq!(
1262                expected_payload,
1263                client
1264                    .get(&format!("payload/block-hash/{}", block.hash()))
1265                    .send()
1266                    .await
1267                    .unwrap(),
1268            );
1269            // Look up the common VID data.
1270            let common: ADVZCommonQueryData<MockTypes> = client
1271                .get(&format!("vid/common/{}", block.height()))
1272                .send()
1273                .await
1274                .unwrap();
1275            assert_eq!(common.height(), block.height());
1276            assert_eq!(common.block_hash(), block.hash());
1277            assert_eq!(
1278                VidCommitment::V0(common.payload_hash()),
1279                block.payload_hash(),
1280            );
1281            assert_eq!(
1282                common,
1283                client
1284                    .get(&format!("vid/common/hash/{}", block.hash()))
1285                    .send()
1286                    .await
1287                    .unwrap()
1288            );
1289
1290            let block_summary = client
1291                .get(&format!("block/summary/{i}"))
1292                .send()
1293                .await
1294                .unwrap();
1295            assert_eq!(
1296                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1297                block_summary,
1298            );
1299            assert_eq!(block_summary.header(), block.header());
1300            assert_eq!(block_summary.hash(), block.hash());
1301            assert_eq!(block_summary.size(), block.size());
1302            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1303
1304            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1305                .get(&format!("block/summaries/{}/{}", 0, i))
1306                .send()
1307                .await
1308                .unwrap();
1309            assert_eq!(block_summaries.len() as u64, i);
1310
1311            // We should be able to look up the block by payload hash. Note that for duplicate
1312            // payloads, these endpoints may return a different block with the same payload, which
1313            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1314            // response, only its payload.
1315            assert_eq!(
1316                block.payload(),
1317                client
1318                    .get::<BlockQueryData<MockTypes>>(&format!(
1319                        "block/payload-hash/{}",
1320                        block.payload_hash()
1321                    ))
1322                    .send()
1323                    .await
1324                    .unwrap()
1325                    .payload()
1326            );
1327            assert_eq!(
1328                block.payload_hash(),
1329                client
1330                    .get::<Header<MockTypes>>(&format!(
1331                        "header/payload-hash/{}",
1332                        block.payload_hash()
1333                    ))
1334                    .send()
1335                    .await
1336                    .unwrap()
1337                    .payload_commitment
1338            );
1339            assert_eq!(
1340                block.payload(),
1341                client
1342                    .get::<PayloadQueryData<MockTypes>>(&format!(
1343                        "payload/hash/{}",
1344                        block.payload_hash()
1345                    ))
1346                    .send()
1347                    .await
1348                    .unwrap()
1349                    .data(),
1350            );
1351            assert_eq!(
1352                common.common(),
1353                client
1354                    .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1355                        "vid/common/payload-hash/{}",
1356                        block.payload_hash()
1357                    ))
1358                    .send()
1359                    .await
1360                    .unwrap()
1361                    .common()
1362            );
1363
1364            // Check that looking up each transaction in the block various ways returns the correct
1365            // transaction.
1366            for (j, txn_from_block) in block.enumerate() {
1367                let txn: TransactionQueryData<MockTypes> = client
1368                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1369                    .send()
1370                    .await
1371                    .unwrap();
1372                assert_eq!(txn.block_height(), i);
1373                assert_eq!(txn.block_hash(), block.hash());
1374                assert_eq!(txn.index(), j.position as u64);
1375                assert_eq!(txn.hash(), txn_from_block.commit());
1376                assert_eq!(txn.transaction(), &txn_from_block);
1377                // We should be able to look up the transaction by hash. Note that for duplicate
1378                // transactions, this endpoint may return a different transaction with the same
1379                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1380                // `TransactionQueryData` response, only its commitment.
1381                assert_eq!(
1382                    txn.hash(),
1383                    client
1384                        .get::<TransactionQueryData<MockTypes>>(&format!(
1385                            "transaction/hash/{}/noproof",
1386                            txn.hash()
1387                        ))
1388                        .send()
1389                        .await
1390                        .unwrap()
1391                        .hash()
1392                );
1393
1394                assert_eq!(
1395                    txn.hash(),
1396                    client
1397                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1398                            "transaction/{}/{}/proof",
1399                            i, j.position
1400                        ))
1401                        .send()
1402                        .await
1403                        .unwrap()
1404                        .hash()
1405                );
1406
1407                assert_eq!(
1408                    txn.hash(),
1409                    client
1410                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1411                            "transaction/hash/{}/proof",
1412                            txn.hash()
1413                        ))
1414                        .send()
1415                        .await
1416                        .unwrap()
1417                        .hash()
1418                );
1419            }
1420
1421            let block_range: Vec<BlockQueryData<MockTypes>> = client
1422                .get(&format!("block/{}/{}", 0, i))
1423                .send()
1424                .await
1425                .unwrap();
1426
1427            assert_eq!(block_range.len() as u64, i);
1428
1429            let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1430                .get(&format!("leaf/{}/{}", 0, i))
1431                .send()
1432                .await
1433                .unwrap();
1434
1435            assert_eq!(leaf_range.len() as u64, i);
1436
1437            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1438                .get(&format!("payload/{}/{}", 0, i))
1439                .send()
1440                .await
1441                .unwrap();
1442
1443            assert_eq!(payload_range.len() as u64, i);
1444
1445            let header_range: Vec<Header<MockTypes>> = client
1446                .get(&format!("header/{}/{}", 0, i))
1447                .send()
1448                .await
1449                .unwrap();
1450
1451            assert_eq!(header_range.len() as u64, i);
1452        }
1453    }
1454
1455    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1456    async fn test_api_epochs() {
1457        // Create the consensus network.
1458        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1459        let epoch_height = network.epoch_height();
1460        network.start().await;
1461
1462        // Start the web server.
1463        let port = pick_unused_port().unwrap();
1464        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1465        app.register_module(
1466            "availability",
1467            define_api(
1468                &Default::default(),
1469                MockBase::instance(),
1470                "1.0.0".parse().unwrap(),
1471            )
1472            .unwrap(),
1473        )
1474        .unwrap();
1475        network.spawn(
1476            "server",
1477            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1478        );
1479
1480        // Start a client.
1481        let client = Client::<Error, MockBase>::new(
1482            format!("http://localhost:{port}/availability")
1483                .parse()
1484                .unwrap(),
1485        );
1486        assert!(client.connect(Some(Duration::from_secs(60))).await);
1487
1488        // Submit a few blocks and make sure each one gets reflected in the query service and
1489        // preserves the consistency of the data and indices.
1490        let headers = client
1491            .socket("stream/headers/0")
1492            .subscribe::<Header<MockTypes>>()
1493            .await
1494            .unwrap();
1495        let mut chain = headers.enumerate();
1496
1497        loop {
1498            let (i, header) = chain.next().await.unwrap();
1499            let header = header.unwrap();
1500            assert_eq!(header.height(), i as u64);
1501            if header.height() >= 3 * epoch_height {
1502                break;
1503            }
1504        }
1505
1506        network.shut_down().await;
1507    }
1508
1509    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1510    async fn test_old_api() {
1511        // Create the consensus network.
1512        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1513        network.start().await;
1514
1515        // Start the web server.
1516        let port = pick_unused_port().unwrap();
1517
1518        let options = Options {
1519            small_object_range_limit: 500,
1520            large_object_range_limit: 500,
1521            ..Default::default()
1522        };
1523
1524        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1525        app.register_module(
1526            "availability",
1527            define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1528        )
1529        .unwrap();
1530        network.spawn(
1531            "server",
1532            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1533        );
1534
1535        // Start a client.
1536        let client = Client::<Error, MockBase>::new(
1537            format!("http://localhost:{port}/availability")
1538                .parse()
1539                .unwrap(),
1540        );
1541        assert!(client.connect(Some(Duration::from_secs(60))).await);
1542        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1543
1544        // Submit a few blocks and make sure each one gets reflected in the query service and
1545        // preserves the consistency of the data and indices.
1546        let leaves = client
1547            .socket("stream/leaves/0")
1548            .subscribe::<Leaf1QueryData<MockTypes>>()
1549            .await
1550            .unwrap();
1551        let headers = client
1552            .socket("stream/headers/0")
1553            .subscribe::<Header<MockTypes>>()
1554            .await
1555            .unwrap();
1556        let blocks = client
1557            .socket("stream/blocks/0")
1558            .subscribe::<BlockQueryData<MockTypes>>()
1559            .await
1560            .unwrap();
1561        let vid_common = client
1562            .socket("stream/vid/common/0")
1563            .subscribe::<ADVZCommonQueryData<MockTypes>>()
1564            .await
1565            .unwrap();
1566        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1567        for nonce in 0..3 {
1568            let txn = mock_transaction(vec![nonce]);
1569            network.submit_transaction(txn).await;
1570
1571            // Wait for the transaction to be finalized.
1572            let (i, leaf, block, common) = loop {
1573                tracing::info!("waiting for block with transaction {}", nonce);
1574                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1575                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1576                let leaf = leaf.unwrap();
1577                let header = header.unwrap();
1578                let block = block.unwrap();
1579                let common = common.unwrap();
1580                assert_eq!(leaf.leaf.height() as usize, i);
1581                assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1582                assert_eq!(block.header(), &header);
1583                assert_eq!(common.height() as usize, i);
1584                if !block.is_empty() {
1585                    break (i, leaf, block, common);
1586                }
1587            };
1588            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1589            assert_eq!(
1590                block,
1591                client.get(&format!("block/{i}")).send().await.unwrap()
1592            );
1593            assert_eq!(
1594                common,
1595                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1596            );
1597
1598            validate_old(&client, (i + 1) as u64).await;
1599        }
1600
1601        network.shut_down().await;
1602    }
1603
1604    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1605    async fn test_extensions() {
1606        use hotshot_example_types::node_types::TestVersions;
1607
1608        let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1609        let data_source = ExtensibleDataSource::new(
1610            MockDataSource::create(dir.path(), Default::default())
1611                .await
1612                .unwrap(),
1613            0,
1614        );
1615
1616        // mock up some consensus data.
1617        let leaf =
1618            Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1619                .await;
1620        let qc =
1621            QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1622                .await;
1623        let leaf = LeafQueryData::new(leaf, qc).unwrap();
1624        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1625        data_source
1626            .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1627            .await
1628            .unwrap();
1629
1630        // assert that the store has data before we move on to API requests
1631        assert_eq!(
1632            ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1633                .await
1634                .unwrap(),
1635            1
1636        );
1637        assert_eq!(block, data_source.get_block(0).await.await);
1638
1639        // Create the API extensions specification.
1640        let extensions = toml! {
1641            [route.post_ext]
1642            PATH = ["/ext/:val"]
1643            METHOD = "POST"
1644            ":val" = "Integer"
1645
1646            [route.get_ext]
1647            PATH = ["/ext"]
1648            METHOD = "GET"
1649        };
1650
1651        let mut api =
1652            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1653                &Options {
1654                    extensions: vec![extensions.into()],
1655                    ..Default::default()
1656                },
1657                MockBase::instance(),
1658                "1.0.0".parse().unwrap(),
1659            )
1660            .unwrap();
1661        api.get("get_ext", |_, state| {
1662            async move { Ok(*state.as_ref()) }.boxed()
1663        })
1664        .unwrap()
1665        .post("post_ext", |req, state| {
1666            async move {
1667                *state.as_mut() = req.integer_param("val")?;
1668                Ok(())
1669            }
1670            .boxed()
1671        })
1672        .unwrap();
1673
1674        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1675        app.register_module("availability", api).unwrap();
1676
1677        let port = pick_unused_port().unwrap();
1678        let _server = BackgroundTask::spawn(
1679            "server",
1680            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1681        );
1682
1683        let client = Client::<Error, MockBase>::new(
1684            format!("http://localhost:{port}/availability")
1685                .parse()
1686                .unwrap(),
1687        );
1688        assert!(client.connect(Some(Duration::from_secs(60))).await);
1689
1690        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1691        client.post::<()>("ext/42").send().await.unwrap();
1692        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1693
1694        // Ensure we can still access the built-in functionality.
1695        assert_eq!(
1696            client
1697                .get::<MockHeader>("header/0")
1698                .send()
1699                .await
1700                .unwrap()
1701                .block_number,
1702            0
1703        );
1704    }
1705
1706    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1707    async fn test_range_limit() {
1708        let large_object_range_limit = 2;
1709        let small_object_range_limit = 3;
1710
1711        // Create the consensus network.
1712        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1713        network.start().await;
1714
1715        // Start the web server.
1716        let port = pick_unused_port().unwrap();
1717        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1718        app.register_module(
1719            "availability",
1720            define_api(
1721                &Options {
1722                    large_object_range_limit,
1723                    small_object_range_limit,
1724                    ..Default::default()
1725                },
1726                MockBase::instance(),
1727                "1.0.0".parse().unwrap(),
1728            )
1729            .unwrap(),
1730        )
1731        .unwrap();
1732        network.spawn(
1733            "server",
1734            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1735        );
1736
1737        // Start a client.
1738        let client = Client::<Error, MockBase>::new(
1739            format!("http://localhost:{port}/availability")
1740                .parse()
1741                .unwrap(),
1742        );
1743        assert!(client.connect(Some(Duration::from_secs(60))).await);
1744
1745        // Check reported limits.
1746        assert_eq!(
1747            client.get::<Limits>("limits").send().await.unwrap(),
1748            Limits {
1749                small_object_range_limit,
1750                large_object_range_limit
1751            }
1752        );
1753
1754        // Wait for enough blocks to be produced.
1755        client
1756            .socket("stream/blocks/0")
1757            .subscribe::<BlockQueryData<MockTypes>>()
1758            .await
1759            .unwrap()
1760            .take(small_object_range_limit + 1)
1761            .try_collect::<Vec<_>>()
1762            .await
1763            .unwrap();
1764
1765        async fn check_limit<T: DeserializeOwned + Debug>(
1766            client: &Client<Error, MockBase>,
1767            req: &str,
1768            limit: usize,
1769        ) {
1770            let range: Vec<T> = client
1771                .get(&format!("{req}/0/{limit}"))
1772                .send()
1773                .await
1774                .unwrap();
1775            assert_eq!(range.len(), limit);
1776            let err = client
1777                .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1778                .send()
1779                .await
1780                .unwrap_err();
1781            assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1782        }
1783
1784        check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1785        check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1786        check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1787        check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1788            .await;
1789        check_limit::<BlockSummaryQueryData<MockTypes>>(
1790            &client,
1791            "block/summaries",
1792            large_object_range_limit,
1793        )
1794        .await;
1795
1796        network.shut_down().await;
1797    }
1798
1799    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1800    async fn test_header_endpoint() {
1801        // Create the consensus network.
1802        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1803        network.start().await;
1804
1805        // Start the web server.
1806        let port = pick_unused_port().unwrap();
1807        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1808        app.register_module(
1809            "availability",
1810            define_api(
1811                &Default::default(),
1812                MockBase::instance(),
1813                "1.0.0".parse().unwrap(),
1814            )
1815            .unwrap(),
1816        )
1817        .unwrap();
1818        network.spawn(
1819            "server",
1820            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1821        );
1822
1823        let ds = network.data_source();
1824
1825        // Get the current block height and fetch header for some later block height
1826        // This fetch will only resolve when we receive a leaf or block for that block height
1827        let block_height = ds.block_height().await.unwrap();
1828        let fetch = ds
1829            .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1830            .await;
1831
1832        assert!(fetch.is_pending());
1833        let header = fetch.await;
1834        assert_eq!(header.height() as usize, block_height + 25);
1835
1836        network.shut_down().await;
1837    }
1838
1839    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1840    async fn test_leaf_only_ds() {
1841        // Create the consensus network.
1842        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1843        network.start().await;
1844
1845        // Start the web server.
1846        let port = pick_unused_port().unwrap();
1847        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1848        app.register_module(
1849            "availability",
1850            define_api(
1851                &Default::default(),
1852                MockBase::instance(),
1853                "1.0.0".parse().unwrap(),
1854            )
1855            .unwrap(),
1856        )
1857        .unwrap();
1858        network.spawn(
1859            "server",
1860            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1861        );
1862
1863        // Start a client.
1864        let client = Client::<Error, MockBase>::new(
1865            format!("http://localhost:{port}/availability")
1866                .parse()
1867                .unwrap(),
1868        );
1869        assert!(client.connect(Some(Duration::from_secs(60))).await);
1870
1871        // Wait for some headers to be produced.
1872        client
1873            .socket("stream/headers/0")
1874            .subscribe::<Header<MockTypes>>()
1875            .await
1876            .unwrap()
1877            .take(5)
1878            .try_collect::<Vec<_>>()
1879            .await
1880            .unwrap();
1881
1882        // Wait for some leaves to be produced.
1883        client
1884            .socket("stream/leaves/5")
1885            .subscribe::<LeafQueryData<MockTypes>>()
1886            .await
1887            .unwrap()
1888            .take(5)
1889            .try_collect::<Vec<_>>()
1890            .await
1891            .unwrap();
1892
1893        let ds = network.data_source();
1894
1895        // Get the current block height and fetch header for some later block height
1896        // This fetch will only resolve if we get a block notification
1897        // However, this block will never be stored
1898        let block_height = ds.block_height().await.unwrap();
1899        let target_block_height = block_height + 20;
1900        let fetch = ds
1901            .get_block(BlockId::<MockTypes>::Number(target_block_height))
1902            .await;
1903
1904        assert!(fetch.is_pending());
1905        let block = fetch.await;
1906        assert_eq!(block.height() as usize, target_block_height);
1907
1908        let mut tx = ds.read().await.unwrap();
1909        tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1910            .await
1911            .unwrap_err();
1912        drop(tx);
1913
1914        network.shut_down().await;
1915    }
1916}