hotshot_query_service/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Queries for HotShot chain state.
14//!
15//! The availability API provides an objective view of the HotShot blockchain. It provides access
16//! only to normative data: that is, data which is agreed upon by all honest consensus nodes and
17//! which is immutable. This means access to core consensus data structures including leaves,
18//! blocks, and headers, where each query is pure and idempotent. This also means that it is
19//! possible for a client to verify all of the information provided by this API, by running a
20//! HotShot light client and downloading the appropriate evidence with each query.
21//!
22//! This API does not provide any queries which represent only the _current_ state of the chain or
23//! may change over time, and it does not provide information for which there is not (yet) agreement
24//! of a supermajority of consensus nodes. For information about the current dynamic state of
25//! consensus and uncommitted state, try the [status](crate::status) API. For information about the
26//! chain which is tabulated by this specific node and not subject to full consensus agreement, try
27//! the [node](crate::node) API.
28
29use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34    data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35    simple_certificate::QuorumCertificate,
36    traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, RequestParams, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, types::HeightIndexed, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
/// Configuration for the availability API.
///
/// [`define_api`] reads every field of this struct when it builds the API: the specification
/// sources (`api_path`, `extensions`), the fetch timeout applied by the request handlers, and the
/// two range limits enforced on range queries.
#[derive(Debug)]
pub struct Options {
    /// Optional path to an API specification file.
    ///
    /// [`define_api`] passes this path to `load_api` together with the specification that is
    /// embedded in the binary (`api/availability.toml`). Presumably a file at this path takes the
    /// place of the embedded specification; confirm against `load_api`.
    pub api_path: Option<PathBuf>,

    /// Timeout for failing requests due to missing data.
    ///
    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
    /// external provider. This parameter controls how long the request handler will wait for
    /// missing data to be fetched before giving up and failing the request.
    pub fetch_timeout: Duration,

    /// Additional API specification files to merge with `availability-api-path`.
    ///
    /// These optional files may contain route definitions for application-specific routes that have
    /// been added as extensions to the basic availability API.
    pub extensions: Vec<toml::Value>,

    /// The maximum number of small objects which can be loaded in a single range query.
    ///
    /// Currently small objects include leaves only. In the future this limit will also apply to
    /// headers, block summaries, and VID common, however
    /// * loading of headers and block summaries is currently implemented by loading the entire
    ///   block
    /// * imperfect VID parameter tuning means that VID common can be much larger than it should
    pub small_object_range_limit: usize,

    /// The maximum number of large objects which can be loaded in a single range query.
    ///
    /// Large objects include anything that _might_ contain a full payload or an object proportional
    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
    /// check the size of objects while loading to determine which limit to apply. If an object
    /// belongs to a class which might contain a large payload, the large object limit always
    /// applies.
    pub large_object_range_limit: usize,
}
87
88impl Default for Options {
89    fn default() -> Self {
90        Self {
91            api_path: None,
92            fetch_timeout: Duration::from_millis(500),
93            extensions: vec![],
94            large_object_range_limit: 100,
95            small_object_range_limit: 500,
96        }
97    }
98}
99
/// Errors reported by the availability API.
///
/// [`Error::status`] maps each variant to the HTTP status code it is reported with. Variants
/// marked `#[from(ignore)]` are excluded from the `From` conversions generated by the `From`
/// derive, so they can only be built explicitly (directly or through their snafu context
/// selectors).
#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
#[snafu(visibility(pub))]
pub enum Error {
    /// The request could not be processed; wraps the underlying [`RequestError`].
    Request {
        source: RequestError,
    },
    /// A leaf could not be obtained. `resource` is the textual form of the requested leaf
    /// identifier (or height, for range queries).
    #[snafu(display("leaf {resource} missing or not available"))]
    #[from(ignore)]
    FetchLeaf {
        resource: String,
    },
    /// A block could not be obtained. The handlers in this module also use this variant when a
    /// payload, VID common data, or a block summary cannot be obtained.
    #[snafu(display("block {resource} missing or not available"))]
    #[from(ignore)]
    FetchBlock {
        resource: String,
    },
    /// A header could not be obtained. `resource` is the textual form of the requested block
    /// identifier (or height, for range queries).
    #[snafu(display("header {resource} missing or not available"))]
    #[from(ignore)]
    FetchHeader {
        resource: String,
    },
    /// A transaction could not be obtained. `resource` is the textual form of the requested
    /// transaction hash.
    #[snafu(display("transaction {resource} missing or not available"))]
    #[from(ignore)]
    FetchTransaction {
        resource: String,
    },
    /// The block at `height` has no transaction at position `index`.
    #[snafu(display("transaction index {index} out of range for block {height}"))]
    #[from(ignore)]
    InvalidTransactionIndex {
        height: u64,
        index: u64,
    },
    /// A range query `from..until` asked for more objects than `limit` allows.
    #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
    #[from(ignore)]
    RangeLimit {
        from: usize,
        until: usize,
        limit: usize,
    },
    /// Wraps a [`QueryError`]; displayed using the message of the wrapped error.
    #[snafu(display("{source}"))]
    Query {
        source: QueryError,
    },
    /// No state certificate was found for `epoch`.
    #[snafu(display("State cert for epoch {epoch} not found"))]
    #[from(ignore)]
    FetchStateCert {
        epoch: u64,
    },
    /// A free-form error carrying its own message and HTTP status code.
    #[snafu(display("error {status}: {message}"))]
    Custom {
        message: String,
        status: StatusCode,
    },
}
154
155impl Error {
156    pub fn internal<M: Display>(message: M) -> Self {
157        Self::Custom {
158            message: message.to_string(),
159            status: StatusCode::INTERNAL_SERVER_ERROR,
160        }
161    }
162
163    pub fn status(&self) -> StatusCode {
164        match self {
165            Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166            Self::FetchLeaf { .. }
167            | Self::FetchBlock { .. }
168            | Self::FetchTransaction { .. }
169            | Self::FetchHeader { .. }
170            | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171            Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172            Self::Custom { status, .. } => *status,
173        }
174    }
175}
176
/// A leaf in the legacy format ([`Leaf`] with a [`QuorumCertificate`]).
///
/// [`define_api`] serves this type in place of [`LeafQueryData`] when the API major version is 0,
/// producing it by downgrading the newer leaf and certificate types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
// The empty bound stops the serde derives from adding their own trait bounds on `Types`.
#[serde(bound = "")]
pub struct Leaf1QueryData<Types: NodeType> {
    /// The legacy-format leaf.
    pub(crate) leaf: Leaf<Types>,
    /// The legacy-format quorum certificate stored alongside the leaf.
    pub(crate) qc: QuorumCertificate<Types>,
}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185    pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186        Self { leaf, qc }
187    }
188
189    pub fn leaf(&self) -> &Leaf<Types> {
190        &self.leaf
191    }
192
193    pub fn qc(&self) -> &QuorumCertificate<Types> {
194        &self.qc
195    }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199    // TODO do we still need some check here?
200    // `drb_seed` no longer exists on `Leaf2`
201    // if leaf2.drb_seed != [0; 32] && leaf2.drb_result != [0; 32] {
202    //     panic!("Downgrade of Leaf2 to Leaf will lose DRB information!");
203    // }
204    let quorum_proposal = QuorumProposal {
205        block_header: leaf2.block_header().clone(),
206        view_number: leaf2.view_number(),
207        justify_qc: leaf2.justify_qc().to_qc(),
208        upgrade_certificate: leaf2.upgrade_certificate(),
209        proposal_certificate: None,
210    };
211    let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212    if let Some(payload) = leaf2.block_payload() {
213        leaf.fill_block_payload_unchecked(payload);
214    }
215    leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219    Leaf1QueryData {
220        leaf: downgrade_leaf(leaf.leaf),
221        qc: leaf.qc.to_qc(),
222    }
223}
224
225async fn get_leaf_handler<Types, State>(
226    req: tide_disco::RequestParams,
227    state: &State,
228    timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231    State: 'static + Send + Sync + ReadState,
232    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233    Types: NodeType,
234    Header<Types>: QueryableHeader<Types>,
235    Payload<Types>: QueryablePayload<Types>,
236{
237    let id = match req.opt_integer_param("height")? {
238        Some(height) => LeafId::Number(height),
239        None => LeafId::Hash(req.blob_param("hash")?),
240    };
241    let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242    fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243        resource: id.to_string(),
244    })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248    req: tide_disco::RequestParams,
249    state: &State,
250    timeout: Duration,
251    small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254    State: 'static + Send + Sync + ReadState,
255    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256    Types: NodeType,
257    Header<Types>: QueryableHeader<Types>,
258    Payload<Types>: QueryablePayload<Types>,
259{
260    let from = req.integer_param::<_, usize>("from")?;
261    let until = req.integer_param("until")?;
262    enforce_range_limit(from, until, small_object_range_limit)?;
263
264    let leaves = state
265        .read(|state| state.get_leaf_range(from..until).boxed())
266        .await;
267    leaves
268        .enumerate()
269        .then(|(index, fetch)| async move {
270            fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271                resource: (index + from).to_string(),
272            })
273        })
274        .try_collect::<Vec<_>>()
275        .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279    data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281    let VidCommonQueryData {
282        height,
283        block_hash,
284        payload_hash: VidCommitment::V0(payload_hash),
285        common: VidCommon::V0(common),
286    } = data
287    else {
288        return None;
289    };
290    Some(ADVZCommonQueryData {
291        height,
292        block_hash,
293        payload_hash,
294        common,
295    })
296}
297
298async fn get_vid_common_handler<Types, State>(
299    req: tide_disco::RequestParams,
300    state: &State,
301    timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304    State: 'static + Send + Sync + ReadState,
305    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306    Types: NodeType,
307    Header<Types>: QueryableHeader<Types>,
308    Payload<Types>: QueryablePayload<Types>,
309{
310    let id = if let Some(height) = req.opt_integer_param("height")? {
311        BlockId::Number(height)
312    } else if let Some(hash) = req.opt_blob_param("hash")? {
313        BlockId::Hash(hash)
314    } else {
315        BlockId::PayloadHash(req.blob_param("payload-hash")?)
316    };
317    let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319        resource: id.to_string(),
320    })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324    options: &Options,
325    _: Ver,
326    api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329    State: 'static + Send + Sync + ReadState,
330    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331    Header<Types>: QueryableHeader<Types>,
332    Payload<Types>: QueryablePayload<Types>,
333{
334    let mut api = load_api::<State, Error, Ver>(
335        options.api_path.as_ref(),
336        include_str!("../api/availability.toml"),
337        options.extensions.clone(),
338    )?;
339    let timeout = options.fetch_timeout;
340    let small_object_range_limit = options.small_object_range_limit;
341    let large_object_range_limit = options.large_object_range_limit;
342
343    api.with_version(api_ver.clone());
344
345    // `LeafQueryData` now contains `Leaf2` and `QC2``, which is a breaking change.
346    // On node startup, all leaves are migrated to `Leaf2`.
347    //
348    // To maintain compatibility with nodes running an older version
349    // (which expect `LeafQueryData` with `Leaf1` and `QC1`),
350    // we downgrade `Leaf2` to `Leaf1` and `QC2` to `QC1` if the API version is V0.
351    // Otherwise, we return the new types.
352    if api_ver.major == 0 {
353        api.at("get_leaf", move |req, state| {
354            get_leaf_handler(req, state, timeout)
355                .map(|res| res.map(downgrade_leaf_query_data))
356                .boxed()
357        })?;
358
359        api.at("get_leaf_range", move |req, state| {
360            get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361                .map(|res| {
362                    res.map(|r| {
363                        r.into_iter()
364                            .map(downgrade_leaf_query_data)
365                            .collect::<Vec<Leaf1QueryData<_>>>()
366                    })
367                })
368                .boxed()
369        })?;
370
371        api.stream("stream_leaves", move |req, state| {
372            async move {
373                let height = req.integer_param("height")?;
374                state
375                    .read(|state| {
376                        async move {
377                            Ok(state
378                                .subscribe_leaves(height)
379                                .await
380                                .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381                        }
382                        .boxed()
383                    })
384                    .await
385            }
386            .try_flatten_stream()
387            .boxed()
388        })?;
389    } else {
390        api.at("get_leaf", move |req, state| {
391            get_leaf_handler(req, state, timeout).boxed()
392        })?;
393
394        api.at("get_leaf_range", move |req, state| {
395            get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396        })?;
397
398        api.stream("stream_leaves", move |req, state| {
399            async move {
400                let height = req.integer_param("height")?;
401                state
402                    .read(|state| {
403                        async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404                    })
405                    .await
406            }
407            .try_flatten_stream()
408            .boxed()
409        })?;
410    }
411
412    // VIDCommon data is version gated after the VID upgrade.
413    // We keep the old struct and data in the API version V0. Starting from V1 we are returning version gated structs.
414    if api_ver.major == 0 {
415        api.at("get_vid_common", move |req, state| {
416            get_vid_common_handler(req, state, timeout)
417                .map(|r| match r {
418                    Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419                        message: "Incompatible VID version.".to_string(),
420                        status: StatusCode::BAD_REQUEST,
421                    }),
422                    Err(e) => Err(e),
423                })
424                .boxed()
425        })?
426        .stream("stream_vid_common", move |req, state| {
427            async move {
428                let height = req.integer_param("height")?;
429                state
430                    .read(|state| {
431                        async move {
432                            Ok(state.subscribe_vid_common(height).await.map(|data| {
433                                downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434                                    message: "Incompatible VID version.".to_string(),
435                                    status: StatusCode::BAD_REQUEST,
436                                })
437                            }))
438                        }
439                        .boxed()
440                    })
441                    .await
442            }
443            .try_flatten_stream()
444            .boxed()
445        })?;
446    } else {
447        api.at("get_vid_common", move |req, state| {
448            get_vid_common_handler(req, state, timeout).boxed().boxed()
449        })?
450        .stream("stream_vid_common", move |req, state| {
451            async move {
452                let height = req.integer_param("height")?;
453                state
454                    .read(|state| {
455                        async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456                    })
457                    .await
458            }
459            .try_flatten_stream()
460            .boxed()
461        })?;
462    }
463
464    api.at("get_header", move |req, state| {
465        async move {
466            let id = if let Some(height) = req.opt_integer_param("height")? {
467                BlockId::Number(height)
468            } else if let Some(hash) = req.opt_blob_param("hash")? {
469                BlockId::Hash(hash)
470            } else {
471                BlockId::PayloadHash(req.blob_param("payload-hash")?)
472            };
473            let fetch = state.read(|state| state.get_header(id).boxed()).await;
474            fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475                resource: id.to_string(),
476            })
477        }
478        .boxed()
479    })?
480    .at("get_header_range", move |req, state| {
481        async move {
482            let from = req.integer_param::<_, usize>("from")?;
483            let until = req.integer_param::<_, usize>("until")?;
484            enforce_range_limit(from, until, large_object_range_limit)?;
485
486            let headers = state
487                .read(|state| state.get_header_range(from..until).boxed())
488                .await;
489            headers
490                .enumerate()
491                .then(|(index, fetch)| async move {
492                    fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493                        resource: (index + from).to_string(),
494                    })
495                })
496                .try_collect::<Vec<_>>()
497                .await
498        }
499        .boxed()
500    })?
501    .stream("stream_headers", move |req, state| {
502        async move {
503            let height = req.integer_param("height")?;
504            state
505                .read(|state| {
506                    async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507                })
508                .await
509        }
510        .try_flatten_stream()
511        .boxed()
512    })?
513    .at("get_block", move |req, state| {
514        async move {
515            let id = if let Some(height) = req.opt_integer_param("height")? {
516                BlockId::Number(height)
517            } else if let Some(hash) = req.opt_blob_param("hash")? {
518                BlockId::Hash(hash)
519            } else {
520                BlockId::PayloadHash(req.blob_param("payload-hash")?)
521            };
522            let fetch = state.read(|state| state.get_block(id).boxed()).await;
523            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524                resource: id.to_string(),
525            })
526        }
527        .boxed()
528    })?
529    .at("get_block_range", move |req, state| {
530        async move {
531            let from = req.integer_param::<_, usize>("from")?;
532            let until = req.integer_param("until")?;
533            enforce_range_limit(from, until, large_object_range_limit)?;
534
535            let blocks = state
536                .read(|state| state.get_block_range(from..until).boxed())
537                .await;
538            blocks
539                .enumerate()
540                .then(|(index, fetch)| async move {
541                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542                        resource: (index + from).to_string(),
543                    })
544                })
545                .try_collect::<Vec<_>>()
546                .await
547        }
548        .boxed()
549    })?
550    .stream("stream_blocks", move |req, state| {
551        async move {
552            let height = req.integer_param("height")?;
553            state
554                .read(|state| {
555                    async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556                })
557                .await
558        }
559        .try_flatten_stream()
560        .boxed()
561    })?
562    .at("get_payload", move |req, state| {
563        async move {
564            let id = if let Some(height) = req.opt_integer_param("height")? {
565                BlockId::Number(height)
566            } else if let Some(hash) = req.opt_blob_param("hash")? {
567                BlockId::PayloadHash(hash)
568            } else {
569                BlockId::Hash(req.blob_param("block-hash")?)
570            };
571            let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573                resource: id.to_string(),
574            })
575        }
576        .boxed()
577    })?
578    .at("get_payload_range", move |req, state| {
579        async move {
580            let from = req.integer_param::<_, usize>("from")?;
581            let until = req.integer_param("until")?;
582            enforce_range_limit(from, until, large_object_range_limit)?;
583
584            let payloads = state
585                .read(|state| state.get_payload_range(from..until).boxed())
586                .await;
587            payloads
588                .enumerate()
589                .then(|(index, fetch)| async move {
590                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591                        resource: (index + from).to_string(),
592                    })
593                })
594                .try_collect::<Vec<_>>()
595                .await
596        }
597        .boxed()
598    })?
599    .stream("stream_payloads", move |req, state| {
600        async move {
601            let height = req.integer_param("height")?;
602            state
603                .read(|state| {
604                    async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605                })
606                .await
607        }
608        .try_flatten_stream()
609        .boxed()
610    })?
611    .at("get_transaction_proof", move |req, state| {
612        async move {
613            let tx = get_transaction(req, state, timeout).await?;
614            let height = tx.block.height();
615            let vid = state
616                .read(|state| state.get_vid_common(height as usize))
617                .await
618                .with_timeout(timeout)
619                .await
620                .context(FetchBlockSnafu {
621                    resource: height.to_string(),
622                })?;
623            let proof = tx.block.transaction_proof(&vid, &tx.index).context(
624                InvalidTransactionIndexSnafu {
625                    height,
626                    index: tx.transaction.index(),
627                },
628            )?;
629            Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
630        }
631        .boxed()
632    })?
633    .at("get_transaction", move |req, state| {
634        async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
635    })?
636    .stream("stream_transactions", move |req, state| {
637        async move {
638            let height = req.integer_param::<_, usize>("height")?;
639
640            let namespace: Option<i64> = req
641                .opt_integer_param::<_, usize>("namespace")?
642                .map(|i| {
643                    i.try_into().map_err(|err| Error::Custom {
644                        message: format!(
645                            "Invalid 'namespace': could not convert usize to i64: {err}"
646                        ),
647                        status: StatusCode::BAD_REQUEST,
648                    })
649                })
650                .transpose()?;
651
652            state
653                .read(|state| {
654                    async move {
655                        Ok(state
656                            .subscribe_blocks(height)
657                            .await
658                            .map(move |block| {
659                                let transactions = block.enumerate().enumerate();
660                                let header = block.header();
661                                let filtered_txs = transactions
662                                    .filter_map(|(i, (index, _tx))| {
663                                        if let Some(requested_ns) = namespace {
664                                            let ns_id = QueryableHeader::<Types>::namespace_id(
665                                                header,
666                                                &index.ns_index,
667                                            )?;
668
669                                            if ns_id.into() != requested_ns {
670                                                return None;
671                                            }
672                                        }
673
674                                        let tx = block.transaction(&index)?;
675                                        TransactionQueryData::new(tx, &block, &index, i as u64)
676                                    })
677                                    .collect::<Vec<_>>();
678
679                                futures::stream::iter(filtered_txs.into_iter().map(Ok))
680                            })
681                            .flatten())
682                    }
683                    .boxed()
684                })
685                .await
686        }
687        .try_flatten_stream()
688        .boxed()
689    })?
690    .at("get_block_summary", move |req, state| {
691        async move {
692            let id: usize = req.integer_param("height")?;
693
694            let fetch = state.read(|state| state.get_block(id).boxed()).await;
695            fetch
696                .with_timeout(timeout)
697                .await
698                .context(FetchBlockSnafu {
699                    resource: id.to_string(),
700                })
701                .map(BlockSummaryQueryData::from)
702        }
703        .boxed()
704    })?
705    .at("get_block_summary_range", move |req, state| {
706        async move {
707            let from: usize = req.integer_param("from")?;
708            let until: usize = req.integer_param("until")?;
709            enforce_range_limit(from, until, large_object_range_limit)?;
710
711            let blocks = state
712                .read(|state| state.get_block_range(from..until).boxed())
713                .await;
714            let result: Vec<BlockSummaryQueryData<Types>> = blocks
715                .enumerate()
716                .then(|(index, fetch)| async move {
717                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
718                        resource: (index + from).to_string(),
719                    })
720                })
721                .map(|result| result.map(BlockSummaryQueryData::from))
722                .try_collect()
723                .await?;
724
725            Ok(result)
726        }
727        .boxed()
728    })?
729    .at("get_limits", move |_req, _state| {
730        async move {
731            Ok(Limits {
732                small_object_range_limit,
733                large_object_range_limit,
734            })
735        }
736        .boxed()
737    })?
738    .at("get_state_cert", move |req, state| {
739        async move {
740            let epoch = req.integer_param("epoch")?;
741            let fetch = state
742                .read(|state| state.get_state_cert(epoch).boxed())
743                .await;
744            fetch
745                .with_timeout(timeout)
746                .await
747                .context(FetchStateCertSnafu { epoch })
748                .map(StateCertQueryDataV1::from)
749        }
750        .boxed()
751    })?
752    .at("get_state_cert_v2", move |req, state| {
753        async move {
754            let epoch = req.integer_param("epoch")?;
755            let fetch = state
756                .read(|state| state.get_state_cert(epoch).boxed())
757                .await;
758            fetch
759                .with_timeout(timeout)
760                .await
761                .context(FetchStateCertSnafu { epoch })
762        }
763        .boxed()
764    })?;
765    Ok(api)
766}
767
768fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
769    if until.saturating_sub(from) > limit {
770        return Err(Error::RangeLimit { from, until, limit });
771    }
772    Ok(())
773}
774
775async fn get_transaction<Types, State>(
776    req: RequestParams,
777    state: &State,
778    timeout: Duration,
779) -> Result<BlockWithTransaction<Types>, Error>
780where
781    Types: NodeType,
782    Header<Types>: QueryableHeader<Types>,
783    Payload<Types>: QueryablePayload<Types>,
784    State: 'static + Send + Sync + ReadState,
785    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
786{
787    match req.opt_blob_param("hash")? {
788        Some(hash) => state
789            .read(|state| state.get_block_containing_transaction(hash).boxed())
790            .await
791            .with_timeout(timeout)
792            .await
793            .context(FetchTransactionSnafu {
794                resource: hash.to_string(),
795            }),
796        None => {
797            let height: u64 = req.integer_param("height")?;
798            let fetch = state
799                .read(|state| state.get_block(height as usize).boxed())
800                .await;
801            let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
802                resource: height.to_string(),
803            })?;
804            let i: u64 = req.integer_param("index")?;
805            let index = block
806                .payload()
807                .nth(block.metadata(), i as usize)
808                .context(InvalidTransactionIndexSnafu { height, index: i })?;
809            let transaction = block
810                .transaction(&index)
811                .context(InvalidTransactionIndexSnafu { height, index: i })?;
812            let transaction = TransactionQueryData::new(transaction, &block, &index, i)
813                .context(InvalidTransactionIndexSnafu { height, index: i })?;
814            Ok(BlockWithTransaction {
815                transaction,
816                block,
817                index,
818            })
819        },
820    }
821}
822
823#[cfg(test)]
824mod test {
825    use std::{fmt::Debug, time::Duration};
826
827    use async_lock::RwLock;
828    use committable::Committable;
829    use futures::future::FutureExt;
830    use hotshot_example_types::node_types::EpochsTestVersions;
831    use hotshot_types::{
832        data::Leaf2, simple_certificate::QuorumCertificate2,
833        traits::node_implementation::ConsensusTime,
834    };
835    use portpicker::pick_unused_port;
836    use serde::de::DeserializeOwned;
837    use surf_disco::{Client, Error as _};
838    use tempfile::TempDir;
839    use tide_disco::App;
840    use toml::toml;
841
842    use super::*;
843    use crate::{
844        data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
845        status::StatusDataSource,
846        task::BackgroundTask,
847        testing::{
848            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
849            mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
850        },
851        types::HeightIndexed,
852        ApiState, Error, Header,
853    };
854
855    /// Get the current ledger height and a list of non-empty leaf/block pairs.
856    async fn get_non_empty_blocks(
857        client: &Client<Error, MockBase>,
858    ) -> (
859        u64,
860        Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
861    ) {
862        let mut blocks = vec![];
863        // Ignore the genesis block (start from height 1).
864        for i in 1.. {
865            match client
866                .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
867                .send()
868                .await
869            {
870                Ok(block) => {
871                    if !block.is_empty() {
872                        let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
873                        blocks.push((leaf, block));
874                    }
875                },
876                Err(Error::Availability {
877                    source: super::Error::FetchBlock { .. },
878                }) => {
879                    tracing::info!(
880                        "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
881                    );
882                    return (i, blocks);
883                },
884                Err(err) => panic!("unexpected error {err}"),
885            }
886        }
887        unreachable!()
888    }
889
890    async fn validate(client: &Client<Error, MockBase>, height: u64) {
891        // Check the consistency of every block/leaf pair.
892        for i in 0..height {
893            // Limit the number of blocks we validate in order to
894            // speeed up the tests.
895            if ![0, 1, height / 2, height - 1].contains(&i) {
896                continue;
897            }
898            tracing::info!("validate block {i}/{height}");
899
900            // Check that looking up the leaf various ways returns the correct leaf.
901            let leaf: LeafQueryData<MockTypes> =
902                client.get(&format!("leaf/{i}")).send().await.unwrap();
903            assert_eq!(leaf.height(), i);
904            assert_eq!(
905                leaf,
906                client
907                    .get(&format!("leaf/hash/{}", leaf.hash()))
908                    .send()
909                    .await
910                    .unwrap()
911            );
912
913            // Check that looking up the block various ways returns the correct block.
914            let block: BlockQueryData<MockTypes> =
915                client.get(&format!("block/{i}")).send().await.unwrap();
916            let expected_payload = PayloadQueryData::from(block.clone());
917            assert_eq!(leaf.block_hash(), block.hash());
918            assert_eq!(block.height(), i);
919            assert_eq!(
920                block,
921                client
922                    .get(&format!("block/hash/{}", block.hash()))
923                    .send()
924                    .await
925                    .unwrap()
926            );
927            assert_eq!(
928                *block.header(),
929                client.get(&format!("header/{i}")).send().await.unwrap()
930            );
931            assert_eq!(
932                *block.header(),
933                client
934                    .get(&format!("header/hash/{}", block.hash()))
935                    .send()
936                    .await
937                    .unwrap()
938            );
939            assert_eq!(
940                expected_payload,
941                client.get(&format!("payload/{i}")).send().await.unwrap(),
942            );
943            assert_eq!(
944                expected_payload,
945                client
946                    .get(&format!("payload/block-hash/{}", block.hash()))
947                    .send()
948                    .await
949                    .unwrap(),
950            );
951            // Look up the common VID data.
952            let common: VidCommonQueryData<MockTypes> = client
953                .get(&format!("vid/common/{}", block.height()))
954                .send()
955                .await
956                .unwrap();
957            assert_eq!(common.height(), block.height());
958            assert_eq!(common.block_hash(), block.hash());
959            assert_eq!(common.payload_hash(), block.payload_hash());
960            assert_eq!(
961                common,
962                client
963                    .get(&format!("vid/common/hash/{}", block.hash()))
964                    .send()
965                    .await
966                    .unwrap()
967            );
968
969            let block_summary = client
970                .get(&format!("block/summary/{i}"))
971                .send()
972                .await
973                .unwrap();
974            assert_eq!(
975                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
976                block_summary,
977            );
978            assert_eq!(block_summary.header(), block.header());
979            assert_eq!(block_summary.hash(), block.hash());
980            assert_eq!(block_summary.size(), block.size());
981            assert_eq!(block_summary.num_transactions(), block.num_transactions());
982
983            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
984                .get(&format!("block/summaries/{}/{}", 0, i))
985                .send()
986                .await
987                .unwrap();
988            assert_eq!(block_summaries.len() as u64, i);
989
990            // We should be able to look up the block by payload hash. Note that for duplicate
991            // payloads, these endpoints may return a different block with the same payload, which
992            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
993            // response, only its payload.
994            assert_eq!(
995                block.payload(),
996                client
997                    .get::<BlockQueryData<MockTypes>>(&format!(
998                        "block/payload-hash/{}",
999                        block.payload_hash()
1000                    ))
1001                    .send()
1002                    .await
1003                    .unwrap()
1004                    .payload()
1005            );
1006            assert_eq!(
1007                block.payload_hash(),
1008                client
1009                    .get::<Header<MockTypes>>(&format!(
1010                        "header/payload-hash/{}",
1011                        block.payload_hash()
1012                    ))
1013                    .send()
1014                    .await
1015                    .unwrap()
1016                    .payload_commitment
1017            );
1018            assert_eq!(
1019                block.payload(),
1020                client
1021                    .get::<PayloadQueryData<MockTypes>>(&format!(
1022                        "payload/hash/{}",
1023                        block.payload_hash()
1024                    ))
1025                    .send()
1026                    .await
1027                    .unwrap()
1028                    .data(),
1029            );
1030            assert_eq!(
1031                common.common(),
1032                client
1033                    .get::<VidCommonQueryData<MockTypes>>(&format!(
1034                        "vid/common/payload-hash/{}",
1035                        block.payload_hash()
1036                    ))
1037                    .send()
1038                    .await
1039                    .unwrap()
1040                    .common()
1041            );
1042
1043            // Check that looking up each transaction in the block various ways returns the correct
1044            // transaction.
1045            for (j, txn_from_block) in block.enumerate() {
1046                let txn: TransactionQueryData<MockTypes> = client
1047                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1048                    .send()
1049                    .await
1050                    .unwrap();
1051                assert_eq!(txn.block_height(), i);
1052                assert_eq!(txn.block_hash(), block.hash());
1053                assert_eq!(txn.index(), j.position as u64);
1054                assert_eq!(txn.hash(), txn_from_block.commit());
1055                assert_eq!(txn.transaction(), &txn_from_block);
1056                // We should be able to look up the transaction by hash. Note that for duplicate
1057                // transactions, this endpoint may return a different transaction with the same
1058                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1059                // `TransactionWithProofQueryData` response, only its commitment.
1060                assert_eq!(
1061                    txn.hash(),
1062                    client
1063                        .get::<TransactionQueryData<MockTypes>>(&format!(
1064                            "transaction/hash/{}/noproof",
1065                            txn.hash()
1066                        ))
1067                        .send()
1068                        .await
1069                        .unwrap()
1070                        .hash()
1071                );
1072
1073                // We can also get the transaction with proof omitted.
1074                assert_eq!(
1075                    txn.hash(),
1076                    client
1077                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1078                            "transaction/{}/{}/proof",
1079                            i, j.position
1080                        ))
1081                        .send()
1082                        .await
1083                        .unwrap()
1084                        .hash()
1085                );
1086                assert_eq!(
1087                    txn.hash(),
1088                    client
1089                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1090                            "transaction/hash/{}/proof",
1091                            txn.hash()
1092                        ))
1093                        .send()
1094                        .await
1095                        .unwrap()
1096                        .hash()
1097                );
1098            }
1099
1100            let block_range: Vec<BlockQueryData<MockTypes>> = client
1101                .get(&format!("block/{}/{}", 0, i))
1102                .send()
1103                .await
1104                .unwrap();
1105
1106            assert_eq!(block_range.len() as u64, i);
1107
1108            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1109                .get(&format!("leaf/{}/{}", 0, i))
1110                .send()
1111                .await
1112                .unwrap();
1113
1114            assert_eq!(leaf_range.len() as u64, i);
1115
1116            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1117                .get(&format!("payload/{}/{}", 0, i))
1118                .send()
1119                .await
1120                .unwrap();
1121
1122            assert_eq!(payload_range.len() as u64, i);
1123
1124            let header_range: Vec<Header<MockTypes>> = client
1125                .get(&format!("header/{}/{}", 0, i))
1126                .send()
1127                .await
1128                .unwrap();
1129
1130            assert_eq!(header_range.len() as u64, i);
1131        }
1132    }
1133
1134    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1135    async fn test_api() {
1136        // Create the consensus network.
1137        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1138        network.start().await;
1139
1140        // Start the web server.
1141        let port = pick_unused_port().unwrap();
1142        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1143        let options = Options {
1144            small_object_range_limit: 500,
1145            large_object_range_limit: 500,
1146            ..Default::default()
1147        };
1148
1149        app.register_module(
1150            "availability",
1151            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1152        )
1153        .unwrap();
1154        network.spawn(
1155            "server",
1156            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1157        );
1158
1159        // Start a client.
1160        let client = Client::<Error, MockBase>::new(
1161            format!("http://localhost:{port}/availability")
1162                .parse()
1163                .unwrap(),
1164        );
1165        assert!(client.connect(Some(Duration::from_secs(60))).await);
1166        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1167
1168        // Submit a few blocks and make sure each one gets reflected in the query service and
1169        // preserves the consistency of the data and indices.
1170        let leaves = client
1171            .socket("stream/leaves/0")
1172            .subscribe::<LeafQueryData<MockTypes>>()
1173            .await
1174            .unwrap();
1175        let headers = client
1176            .socket("stream/headers/0")
1177            .subscribe::<Header<MockTypes>>()
1178            .await
1179            .unwrap();
1180        let blocks = client
1181            .socket("stream/blocks/0")
1182            .subscribe::<BlockQueryData<MockTypes>>()
1183            .await
1184            .unwrap();
1185        let vid_common = client
1186            .socket("stream/vid/common/0")
1187            .subscribe::<VidCommonQueryData<MockTypes>>()
1188            .await
1189            .unwrap();
1190        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1191        for nonce in 0..3 {
1192            let txn = mock_transaction(vec![nonce]);
1193            network.submit_transaction(txn).await;
1194
1195            // Wait for the transaction to be finalized.
1196            let (i, leaf, block, common) = loop {
1197                tracing::info!("waiting for block with transaction {}", nonce);
1198                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1199                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1200                let leaf = leaf.unwrap();
1201                let header = header.unwrap();
1202                let block = block.unwrap();
1203                let common = common.unwrap();
1204                assert_eq!(leaf.height() as usize, i);
1205                assert_eq!(leaf.block_hash(), block.hash());
1206                assert_eq!(block.header(), &header);
1207                assert_eq!(common.height() as usize, i);
1208                if !block.is_empty() {
1209                    break (i, leaf, block, common);
1210                }
1211            };
1212            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1213            assert_eq!(
1214                block,
1215                client.get(&format!("block/{i}")).send().await.unwrap()
1216            );
1217            assert_eq!(
1218                common,
1219                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1220            );
1221
1222            validate(&client, (i + 1) as u64).await;
1223        }
1224
1225        network.shut_down().await;
1226    }
1227
1228    async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1229        // Check the consistency of every block/leaf pair.
1230        for i in 0..height {
1231            // Limit the number of blocks we validate in order to
1232            // speeed up the tests.
1233            if ![0, 1, height / 2, height - 1].contains(&i) {
1234                continue;
1235            }
1236            tracing::info!("validate block {i}/{height}");
1237
1238            // Check that looking up the leaf various ways returns the correct leaf.
1239            let leaf: Leaf1QueryData<MockTypes> =
1240                client.get(&format!("leaf/{i}")).send().await.unwrap();
1241            assert_eq!(leaf.leaf.height(), i);
1242            assert_eq!(
1243                leaf,
1244                client
1245                    .get(&format!(
1246                        "leaf/hash/{}",
1247                        <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1248                    ))
1249                    .send()
1250                    .await
1251                    .unwrap()
1252            );
1253
1254            // Check that looking up the block various ways returns the correct block.
1255            let block: BlockQueryData<MockTypes> =
1256                client.get(&format!("block/{i}")).send().await.unwrap();
1257            let expected_payload = PayloadQueryData::from(block.clone());
1258            assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1259            assert_eq!(block.height(), i);
1260            assert_eq!(
1261                block,
1262                client
1263                    .get(&format!("block/hash/{}", block.hash()))
1264                    .send()
1265                    .await
1266                    .unwrap()
1267            );
1268            assert_eq!(
1269                *block.header(),
1270                client.get(&format!("header/{i}")).send().await.unwrap()
1271            );
1272            assert_eq!(
1273                *block.header(),
1274                client
1275                    .get(&format!("header/hash/{}", block.hash()))
1276                    .send()
1277                    .await
1278                    .unwrap()
1279            );
1280            assert_eq!(
1281                expected_payload,
1282                client.get(&format!("payload/{i}")).send().await.unwrap(),
1283            );
1284            assert_eq!(
1285                expected_payload,
1286                client
1287                    .get(&format!("payload/block-hash/{}", block.hash()))
1288                    .send()
1289                    .await
1290                    .unwrap(),
1291            );
1292            // Look up the common VID data.
1293            let common: ADVZCommonQueryData<MockTypes> = client
1294                .get(&format!("vid/common/{}", block.height()))
1295                .send()
1296                .await
1297                .unwrap();
1298            assert_eq!(common.height(), block.height());
1299            assert_eq!(common.block_hash(), block.hash());
1300            assert_eq!(
1301                VidCommitment::V0(common.payload_hash()),
1302                block.payload_hash(),
1303            );
1304            assert_eq!(
1305                common,
1306                client
1307                    .get(&format!("vid/common/hash/{}", block.hash()))
1308                    .send()
1309                    .await
1310                    .unwrap()
1311            );
1312
1313            let block_summary = client
1314                .get(&format!("block/summary/{i}"))
1315                .send()
1316                .await
1317                .unwrap();
1318            assert_eq!(
1319                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1320                block_summary,
1321            );
1322            assert_eq!(block_summary.header(), block.header());
1323            assert_eq!(block_summary.hash(), block.hash());
1324            assert_eq!(block_summary.size(), block.size());
1325            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1326
1327            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1328                .get(&format!("block/summaries/{}/{}", 0, i))
1329                .send()
1330                .await
1331                .unwrap();
1332            assert_eq!(block_summaries.len() as u64, i);
1333
1334            // We should be able to look up the block by payload hash. Note that for duplicate
1335            // payloads, these endpoints may return a different block with the same payload, which
1336            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1337            // response, only its payload.
1338            assert_eq!(
1339                block.payload(),
1340                client
1341                    .get::<BlockQueryData<MockTypes>>(&format!(
1342                        "block/payload-hash/{}",
1343                        block.payload_hash()
1344                    ))
1345                    .send()
1346                    .await
1347                    .unwrap()
1348                    .payload()
1349            );
1350            assert_eq!(
1351                block.payload_hash(),
1352                client
1353                    .get::<Header<MockTypes>>(&format!(
1354                        "header/payload-hash/{}",
1355                        block.payload_hash()
1356                    ))
1357                    .send()
1358                    .await
1359                    .unwrap()
1360                    .payload_commitment
1361            );
1362            assert_eq!(
1363                block.payload(),
1364                client
1365                    .get::<PayloadQueryData<MockTypes>>(&format!(
1366                        "payload/hash/{}",
1367                        block.payload_hash()
1368                    ))
1369                    .send()
1370                    .await
1371                    .unwrap()
1372                    .data(),
1373            );
1374            assert_eq!(
1375                common.common(),
1376                client
1377                    .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1378                        "vid/common/payload-hash/{}",
1379                        block.payload_hash()
1380                    ))
1381                    .send()
1382                    .await
1383                    .unwrap()
1384                    .common()
1385            );
1386
1387            // Check that looking up each transaction in the block various ways returns the correct
1388            // transaction.
1389            for (j, txn_from_block) in block.enumerate() {
1390                let txn: TransactionQueryData<MockTypes> = client
1391                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1392                    .send()
1393                    .await
1394                    .unwrap();
1395                assert_eq!(txn.block_height(), i);
1396                assert_eq!(txn.block_hash(), block.hash());
1397                assert_eq!(txn.index(), j.position as u64);
1398                assert_eq!(txn.hash(), txn_from_block.commit());
1399                assert_eq!(txn.transaction(), &txn_from_block);
1400                // We should be able to look up the transaction by hash. Note that for duplicate
1401                // transactions, this endpoint may return a different transaction with the same
1402                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1403                // `TransactionQueryData` response, only its commitment.
1404                assert_eq!(
1405                    txn.hash(),
1406                    client
1407                        .get::<TransactionQueryData<MockTypes>>(&format!(
1408                            "transaction/hash/{}/noproof",
1409                            txn.hash()
1410                        ))
1411                        .send()
1412                        .await
1413                        .unwrap()
1414                        .hash()
1415                );
1416
1417                assert_eq!(
1418                    txn.hash(),
1419                    client
1420                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1421                            "transaction/{}/{}/proof",
1422                            i, j.position
1423                        ))
1424                        .send()
1425                        .await
1426                        .unwrap()
1427                        .hash()
1428                );
1429
1430                assert_eq!(
1431                    txn.hash(),
1432                    client
1433                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1434                            "transaction/hash/{}/proof",
1435                            txn.hash()
1436                        ))
1437                        .send()
1438                        .await
1439                        .unwrap()
1440                        .hash()
1441                );
1442            }
1443
1444            let block_range: Vec<BlockQueryData<MockTypes>> = client
1445                .get(&format!("block/{}/{}", 0, i))
1446                .send()
1447                .await
1448                .unwrap();
1449
1450            assert_eq!(block_range.len() as u64, i);
1451
1452            let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1453                .get(&format!("leaf/{}/{}", 0, i))
1454                .send()
1455                .await
1456                .unwrap();
1457
1458            assert_eq!(leaf_range.len() as u64, i);
1459
1460            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1461                .get(&format!("payload/{}/{}", 0, i))
1462                .send()
1463                .await
1464                .unwrap();
1465
1466            assert_eq!(payload_range.len() as u64, i);
1467
1468            let header_range: Vec<Header<MockTypes>> = client
1469                .get(&format!("header/{}/{}", 0, i))
1470                .send()
1471                .await
1472                .unwrap();
1473
1474            assert_eq!(header_range.len() as u64, i);
1475        }
1476    }
1477
1478    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1479    async fn test_api_epochs() {
1480        // Create the consensus network.
1481        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1482        let epoch_height = network.epoch_height();
1483        network.start().await;
1484
1485        // Start the web server.
1486        let port = pick_unused_port().unwrap();
1487        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1488        app.register_module(
1489            "availability",
1490            define_api(
1491                &Default::default(),
1492                MockBase::instance(),
1493                "1.0.0".parse().unwrap(),
1494            )
1495            .unwrap(),
1496        )
1497        .unwrap();
1498        network.spawn(
1499            "server",
1500            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1501        );
1502
1503        // Start a client.
1504        let client = Client::<Error, MockBase>::new(
1505            format!("http://localhost:{port}/availability")
1506                .parse()
1507                .unwrap(),
1508        );
1509        assert!(client.connect(Some(Duration::from_secs(60))).await);
1510
1511        // Submit a few blocks and make sure each one gets reflected in the query service and
1512        // preserves the consistency of the data and indices.
1513        let headers = client
1514            .socket("stream/headers/0")
1515            .subscribe::<Header<MockTypes>>()
1516            .await
1517            .unwrap();
1518        let mut chain = headers.enumerate();
1519
1520        loop {
1521            let (i, header) = chain.next().await.unwrap();
1522            let header = header.unwrap();
1523            assert_eq!(header.height(), i as u64);
1524            if header.height() >= 3 * epoch_height {
1525                break;
1526            }
1527        }
1528
1529        for epoch in 1..4 {
1530            let state_cert: StateCertQueryDataV2<MockTypes> = client
1531                .get(&format!("state-cert-v2/{epoch}"))
1532                .send()
1533                .await
1534                .unwrap();
1535            tracing::info!("state-cert: {state_cert:?}");
1536            assert_eq!(state_cert.0.epoch.u64(), epoch);
1537        }
1538
1539        network.shut_down().await;
1540    }
1541
1542    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1543    async fn test_old_api() {
1544        // Create the consensus network.
1545        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1546        network.start().await;
1547
1548        // Start the web server.
1549        let port = pick_unused_port().unwrap();
1550
1551        let options = Options {
1552            small_object_range_limit: 500,
1553            large_object_range_limit: 500,
1554            ..Default::default()
1555        };
1556
1557        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1558        app.register_module(
1559            "availability",
1560            define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1561        )
1562        .unwrap();
1563        network.spawn(
1564            "server",
1565            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1566        );
1567
1568        // Start a client.
1569        let client = Client::<Error, MockBase>::new(
1570            format!("http://localhost:{port}/availability")
1571                .parse()
1572                .unwrap(),
1573        );
1574        assert!(client.connect(Some(Duration::from_secs(60))).await);
1575        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1576
1577        // Submit a few blocks and make sure each one gets reflected in the query service and
1578        // preserves the consistency of the data and indices.
1579        let leaves = client
1580            .socket("stream/leaves/0")
1581            .subscribe::<Leaf1QueryData<MockTypes>>()
1582            .await
1583            .unwrap();
1584        let headers = client
1585            .socket("stream/headers/0")
1586            .subscribe::<Header<MockTypes>>()
1587            .await
1588            .unwrap();
1589        let blocks = client
1590            .socket("stream/blocks/0")
1591            .subscribe::<BlockQueryData<MockTypes>>()
1592            .await
1593            .unwrap();
1594        let vid_common = client
1595            .socket("stream/vid/common/0")
1596            .subscribe::<ADVZCommonQueryData<MockTypes>>()
1597            .await
1598            .unwrap();
1599        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1600        for nonce in 0..3 {
1601            let txn = mock_transaction(vec![nonce]);
1602            network.submit_transaction(txn).await;
1603
1604            // Wait for the transaction to be finalized.
1605            let (i, leaf, block, common) = loop {
1606                tracing::info!("waiting for block with transaction {}", nonce);
1607                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1608                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1609                let leaf = leaf.unwrap();
1610                let header = header.unwrap();
1611                let block = block.unwrap();
1612                let common = common.unwrap();
1613                assert_eq!(leaf.leaf.height() as usize, i);
1614                assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1615                assert_eq!(block.header(), &header);
1616                assert_eq!(common.height() as usize, i);
1617                if !block.is_empty() {
1618                    break (i, leaf, block, common);
1619                }
1620            };
1621            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1622            assert_eq!(
1623                block,
1624                client.get(&format!("block/{i}")).send().await.unwrap()
1625            );
1626            assert_eq!(
1627                common,
1628                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1629            );
1630
1631            validate_old(&client, (i + 1) as u64).await;
1632        }
1633
1634        network.shut_down().await;
1635    }
1636
1637    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1638    async fn test_extensions() {
1639        use hotshot_example_types::node_types::TestVersions;
1640
1641        let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1642        let data_source = ExtensibleDataSource::new(
1643            MockDataSource::create(dir.path(), Default::default())
1644                .await
1645                .unwrap(),
1646            0,
1647        );
1648
1649        // mock up some consensus data.
1650        let leaf =
1651            Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1652                .await;
1653        let qc =
1654            QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1655                .await;
1656        let leaf = LeafQueryData::new(leaf, qc).unwrap();
1657        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1658        data_source
1659            .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1660            .await
1661            .unwrap();
1662
1663        // assert that the store has data before we move on to API requests
1664        assert_eq!(
1665            ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1666                .await
1667                .unwrap(),
1668            1
1669        );
1670        assert_eq!(block, data_source.get_block(0).await.await);
1671
1672        // Create the API extensions specification.
1673        let extensions = toml! {
1674            [route.post_ext]
1675            PATH = ["/ext/:val"]
1676            METHOD = "POST"
1677            ":val" = "Integer"
1678
1679            [route.get_ext]
1680            PATH = ["/ext"]
1681            METHOD = "GET"
1682        };
1683
1684        let mut api =
1685            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1686                &Options {
1687                    extensions: vec![extensions.into()],
1688                    ..Default::default()
1689                },
1690                MockBase::instance(),
1691                "1.0.0".parse().unwrap(),
1692            )
1693            .unwrap();
1694        api.get("get_ext", |_, state| {
1695            async move { Ok(*state.as_ref()) }.boxed()
1696        })
1697        .unwrap()
1698        .post("post_ext", |req, state| {
1699            async move {
1700                *state.as_mut() = req.integer_param("val")?;
1701                Ok(())
1702            }
1703            .boxed()
1704        })
1705        .unwrap();
1706
1707        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1708        app.register_module("availability", api).unwrap();
1709
1710        let port = pick_unused_port().unwrap();
1711        let _server = BackgroundTask::spawn(
1712            "server",
1713            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1714        );
1715
1716        let client = Client::<Error, MockBase>::new(
1717            format!("http://localhost:{port}/availability")
1718                .parse()
1719                .unwrap(),
1720        );
1721        assert!(client.connect(Some(Duration::from_secs(60))).await);
1722
1723        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1724        client.post::<()>("ext/42").send().await.unwrap();
1725        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1726
1727        // Ensure we can still access the built-in functionality.
1728        assert_eq!(
1729            client
1730                .get::<MockHeader>("header/0")
1731                .send()
1732                .await
1733                .unwrap()
1734                .block_number,
1735            0
1736        );
1737    }
1738
1739    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1740    async fn test_range_limit() {
1741        let large_object_range_limit = 2;
1742        let small_object_range_limit = 3;
1743
1744        // Create the consensus network.
1745        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1746        network.start().await;
1747
1748        // Start the web server.
1749        let port = pick_unused_port().unwrap();
1750        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1751        app.register_module(
1752            "availability",
1753            define_api(
1754                &Options {
1755                    large_object_range_limit,
1756                    small_object_range_limit,
1757                    ..Default::default()
1758                },
1759                MockBase::instance(),
1760                "1.0.0".parse().unwrap(),
1761            )
1762            .unwrap(),
1763        )
1764        .unwrap();
1765        network.spawn(
1766            "server",
1767            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1768        );
1769
1770        // Start a client.
1771        let client = Client::<Error, MockBase>::new(
1772            format!("http://localhost:{port}/availability")
1773                .parse()
1774                .unwrap(),
1775        );
1776        assert!(client.connect(Some(Duration::from_secs(60))).await);
1777
1778        // Check reported limits.
1779        assert_eq!(
1780            client.get::<Limits>("limits").send().await.unwrap(),
1781            Limits {
1782                small_object_range_limit,
1783                large_object_range_limit
1784            }
1785        );
1786
1787        // Wait for enough blocks to be produced.
1788        client
1789            .socket("stream/blocks/0")
1790            .subscribe::<BlockQueryData<MockTypes>>()
1791            .await
1792            .unwrap()
1793            .take(small_object_range_limit + 1)
1794            .try_collect::<Vec<_>>()
1795            .await
1796            .unwrap();
1797
1798        async fn check_limit<T: DeserializeOwned + Debug>(
1799            client: &Client<Error, MockBase>,
1800            req: &str,
1801            limit: usize,
1802        ) {
1803            let range: Vec<T> = client
1804                .get(&format!("{req}/0/{limit}"))
1805                .send()
1806                .await
1807                .unwrap();
1808            assert_eq!(range.len(), limit);
1809            let err = client
1810                .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1811                .send()
1812                .await
1813                .unwrap_err();
1814            assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1815        }
1816
1817        check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1818        check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1819        check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1820        check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1821            .await;
1822        check_limit::<BlockSummaryQueryData<MockTypes>>(
1823            &client,
1824            "block/summaries",
1825            large_object_range_limit,
1826        )
1827        .await;
1828
1829        network.shut_down().await;
1830    }
1831
1832    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1833    async fn test_header_endpoint() {
1834        // Create the consensus network.
1835        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1836        network.start().await;
1837
1838        // Start the web server.
1839        let port = pick_unused_port().unwrap();
1840        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1841        app.register_module(
1842            "availability",
1843            define_api(
1844                &Default::default(),
1845                MockBase::instance(),
1846                "1.0.0".parse().unwrap(),
1847            )
1848            .unwrap(),
1849        )
1850        .unwrap();
1851        network.spawn(
1852            "server",
1853            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1854        );
1855
1856        let ds = network.data_source();
1857
1858        // Get the current block height and fetch header for some later block height
1859        // This fetch will only resolve when we receive a leaf or block for that block height
1860        let block_height = ds.block_height().await.unwrap();
1861        let fetch = ds
1862            .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1863            .await;
1864
1865        assert!(fetch.is_pending());
1866        let header = fetch.await;
1867        assert_eq!(header.height() as usize, block_height + 25);
1868
1869        network.shut_down().await;
1870    }
1871
1872    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1873    async fn test_leaf_only_ds() {
1874        // Create the consensus network.
1875        let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1876        network.start().await;
1877
1878        // Start the web server.
1879        let port = pick_unused_port().unwrap();
1880        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1881        app.register_module(
1882            "availability",
1883            define_api(
1884                &Default::default(),
1885                MockBase::instance(),
1886                "1.0.0".parse().unwrap(),
1887            )
1888            .unwrap(),
1889        )
1890        .unwrap();
1891        network.spawn(
1892            "server",
1893            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1894        );
1895
1896        // Start a client.
1897        let client = Client::<Error, MockBase>::new(
1898            format!("http://localhost:{port}/availability")
1899                .parse()
1900                .unwrap(),
1901        );
1902        assert!(client.connect(Some(Duration::from_secs(60))).await);
1903
1904        // Wait for some headers to be produced.
1905        client
1906            .socket("stream/headers/0")
1907            .subscribe::<Header<MockTypes>>()
1908            .await
1909            .unwrap()
1910            .take(5)
1911            .try_collect::<Vec<_>>()
1912            .await
1913            .unwrap();
1914
1915        // Wait for some leaves to be produced.
1916        client
1917            .socket("stream/leaves/5")
1918            .subscribe::<LeafQueryData<MockTypes>>()
1919            .await
1920            .unwrap()
1921            .take(5)
1922            .try_collect::<Vec<_>>()
1923            .await
1924            .unwrap();
1925
1926        let ds = network.data_source();
1927
1928        // Get the current block height and fetch header for some later block height
1929        // This fetch will only resolve if we get a block notification
1930        // However, this block will never be stored
1931        let block_height = ds.block_height().await.unwrap();
1932        let target_block_height = block_height + 20;
1933        let fetch = ds
1934            .get_block(BlockId::<MockTypes>::Number(target_block_height))
1935            .await;
1936
1937        assert!(fetch.is_pending());
1938        let block = fetch.await;
1939        assert_eq!(block.height() as usize, target_block_height);
1940
1941        let mut tx = ds.read().await.unwrap();
1942        tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1943            .await
1944            .unwrap_err();
1945        drop(tx);
1946
1947        network.shut_down().await;
1948    }
1949}