1use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34 data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35 simple_certificate::QuorumCertificate,
36 traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54 pub api_path: Option<PathBuf>,
55
56 pub fetch_timeout: Duration,
62
63 pub extensions: Vec<toml::Value>,
68
69 pub small_object_range_limit: usize,
77
78 pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89 fn default() -> Self {
90 Self {
91 api_path: None,
92 fetch_timeout: Duration::from_millis(500),
93 extensions: vec![],
94 large_object_range_limit: 100,
95 small_object_range_limit: 500,
96 }
97 }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103 Request {
104 source: RequestError,
105 },
106 #[snafu(display("leaf {resource} missing or not available"))]
107 #[from(ignore)]
108 FetchLeaf {
109 resource: String,
110 },
111 #[snafu(display("block {resource} missing or not available"))]
112 #[from(ignore)]
113 FetchBlock {
114 resource: String,
115 },
116 #[snafu(display("header {resource} missing or not available"))]
117 #[from(ignore)]
118 FetchHeader {
119 resource: String,
120 },
121 #[snafu(display("transaction {resource} missing or not available"))]
122 #[from(ignore)]
123 FetchTransaction {
124 resource: String,
125 },
126 #[snafu(display("transaction index {index} out of range for block {height}"))]
127 #[from(ignore)]
128 InvalidTransactionIndex {
129 height: u64,
130 index: u64,
131 },
132 #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133 #[from(ignore)]
134 RangeLimit {
135 from: usize,
136 until: usize,
137 limit: usize,
138 },
139 #[snafu(display("{source}"))]
140 Query {
141 source: QueryError,
142 },
143 #[snafu(display("State cert for epoch {epoch} not found"))]
144 #[from(ignore)]
145 FetchStateCert {
146 epoch: u64,
147 },
148 #[snafu(display("error {status}: {message}"))]
149 Custom {
150 message: String,
151 status: StatusCode,
152 },
153}
154
155impl Error {
156 pub fn internal<M: Display>(message: M) -> Self {
157 Self::Custom {
158 message: message.to_string(),
159 status: StatusCode::INTERNAL_SERVER_ERROR,
160 }
161 }
162
163 pub fn status(&self) -> StatusCode {
164 match self {
165 Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166 Self::FetchLeaf { .. }
167 | Self::FetchBlock { .. }
168 | Self::FetchTransaction { .. }
169 | Self::FetchHeader { .. }
170 | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171 Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172 Self::Custom { status, .. } => *status,
173 }
174 }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180 pub(crate) leaf: Leaf<Types>,
181 pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185 pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186 Self { leaf, qc }
187 }
188
189 pub fn leaf(&self) -> &Leaf<Types> {
190 &self.leaf
191 }
192
193 pub fn qc(&self) -> &QuorumCertificate<Types> {
194 &self.qc
195 }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199 let quorum_proposal = QuorumProposal {
205 block_header: leaf2.block_header().clone(),
206 view_number: leaf2.view_number(),
207 justify_qc: leaf2.justify_qc().to_qc(),
208 upgrade_certificate: leaf2.upgrade_certificate(),
209 proposal_certificate: None,
210 };
211 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212 if let Some(payload) = leaf2.block_payload() {
213 leaf.fill_block_payload_unchecked(payload);
214 }
215 leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219 Leaf1QueryData {
220 leaf: downgrade_leaf(leaf.leaf),
221 qc: leaf.qc.to_qc(),
222 }
223}
224
225async fn get_leaf_handler<Types, State>(
226 req: tide_disco::RequestParams,
227 state: &State,
228 timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231 State: 'static + Send + Sync + ReadState,
232 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233 Types: NodeType,
234 Header<Types>: QueryableHeader<Types>,
235 Payload<Types>: QueryablePayload<Types>,
236{
237 let id = match req.opt_integer_param("height")? {
238 Some(height) => LeafId::Number(height),
239 None => LeafId::Hash(req.blob_param("hash")?),
240 };
241 let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243 resource: id.to_string(),
244 })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248 req: tide_disco::RequestParams,
249 state: &State,
250 timeout: Duration,
251 small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254 State: 'static + Send + Sync + ReadState,
255 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256 Types: NodeType,
257 Header<Types>: QueryableHeader<Types>,
258 Payload<Types>: QueryablePayload<Types>,
259{
260 let from = req.integer_param::<_, usize>("from")?;
261 let until = req.integer_param("until")?;
262 enforce_range_limit(from, until, small_object_range_limit)?;
263
264 let leaves = state
265 .read(|state| state.get_leaf_range(from..until).boxed())
266 .await;
267 leaves
268 .enumerate()
269 .then(|(index, fetch)| async move {
270 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271 resource: (index + from).to_string(),
272 })
273 })
274 .try_collect::<Vec<_>>()
275 .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279 data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281 let VidCommonQueryData {
282 height,
283 block_hash,
284 payload_hash: VidCommitment::V0(payload_hash),
285 common: VidCommon::V0(common),
286 } = data
287 else {
288 return None;
289 };
290 Some(ADVZCommonQueryData {
291 height,
292 block_hash,
293 payload_hash,
294 common,
295 })
296}
297
298async fn get_vid_common_handler<Types, State>(
299 req: tide_disco::RequestParams,
300 state: &State,
301 timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304 State: 'static + Send + Sync + ReadState,
305 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306 Types: NodeType,
307 Header<Types>: QueryableHeader<Types>,
308 Payload<Types>: QueryablePayload<Types>,
309{
310 let id = if let Some(height) = req.opt_integer_param("height")? {
311 BlockId::Number(height)
312 } else if let Some(hash) = req.opt_blob_param("hash")? {
313 BlockId::Hash(hash)
314 } else {
315 BlockId::PayloadHash(req.blob_param("payload-hash")?)
316 };
317 let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319 resource: id.to_string(),
320 })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324 options: &Options,
325 _: Ver,
326 api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329 State: 'static + Send + Sync + ReadState,
330 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331 Header<Types>: QueryableHeader<Types>,
332 Payload<Types>: QueryablePayload<Types>,
333{
334 let mut api = load_api::<State, Error, Ver>(
335 options.api_path.as_ref(),
336 include_str!("../api/availability.toml"),
337 options.extensions.clone(),
338 )?;
339 let timeout = options.fetch_timeout;
340 let small_object_range_limit = options.small_object_range_limit;
341 let large_object_range_limit = options.large_object_range_limit;
342
343 api.with_version(api_ver.clone());
344
345 if api_ver.major == 0 {
353 api.at("get_leaf", move |req, state| {
354 get_leaf_handler(req, state, timeout)
355 .map(|res| res.map(downgrade_leaf_query_data))
356 .boxed()
357 })?;
358
359 api.at("get_leaf_range", move |req, state| {
360 get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361 .map(|res| {
362 res.map(|r| {
363 r.into_iter()
364 .map(downgrade_leaf_query_data)
365 .collect::<Vec<Leaf1QueryData<_>>>()
366 })
367 })
368 .boxed()
369 })?;
370
371 api.stream("stream_leaves", move |req, state| {
372 async move {
373 let height = req.integer_param("height")?;
374 state
375 .read(|state| {
376 async move {
377 Ok(state
378 .subscribe_leaves(height)
379 .await
380 .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381 }
382 .boxed()
383 })
384 .await
385 }
386 .try_flatten_stream()
387 .boxed()
388 })?;
389 } else {
390 api.at("get_leaf", move |req, state| {
391 get_leaf_handler(req, state, timeout).boxed()
392 })?;
393
394 api.at("get_leaf_range", move |req, state| {
395 get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396 })?;
397
398 api.stream("stream_leaves", move |req, state| {
399 async move {
400 let height = req.integer_param("height")?;
401 state
402 .read(|state| {
403 async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404 })
405 .await
406 }
407 .try_flatten_stream()
408 .boxed()
409 })?;
410 }
411
412 if api_ver.major == 0 {
415 api.at("get_vid_common", move |req, state| {
416 get_vid_common_handler(req, state, timeout)
417 .map(|r| match r {
418 Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419 message: "Incompatible VID version.".to_string(),
420 status: StatusCode::BAD_REQUEST,
421 }),
422 Err(e) => Err(e),
423 })
424 .boxed()
425 })?
426 .stream("stream_vid_common", move |req, state| {
427 async move {
428 let height = req.integer_param("height")?;
429 state
430 .read(|state| {
431 async move {
432 Ok(state.subscribe_vid_common(height).await.map(|data| {
433 downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434 message: "Incompatible VID version.".to_string(),
435 status: StatusCode::BAD_REQUEST,
436 })
437 }))
438 }
439 .boxed()
440 })
441 .await
442 }
443 .try_flatten_stream()
444 .boxed()
445 })?;
446 } else {
447 api.at("get_vid_common", move |req, state| {
448 get_vid_common_handler(req, state, timeout).boxed().boxed()
449 })?
450 .stream("stream_vid_common", move |req, state| {
451 async move {
452 let height = req.integer_param("height")?;
453 state
454 .read(|state| {
455 async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456 })
457 .await
458 }
459 .try_flatten_stream()
460 .boxed()
461 })?;
462 }
463
464 api.at("get_header", move |req, state| {
465 async move {
466 let id = if let Some(height) = req.opt_integer_param("height")? {
467 BlockId::Number(height)
468 } else if let Some(hash) = req.opt_blob_param("hash")? {
469 BlockId::Hash(hash)
470 } else {
471 BlockId::PayloadHash(req.blob_param("payload-hash")?)
472 };
473 let fetch = state.read(|state| state.get_header(id).boxed()).await;
474 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475 resource: id.to_string(),
476 })
477 }
478 .boxed()
479 })?
480 .at("get_header_range", move |req, state| {
481 async move {
482 let from = req.integer_param::<_, usize>("from")?;
483 let until = req.integer_param::<_, usize>("until")?;
484 enforce_range_limit(from, until, large_object_range_limit)?;
485
486 let headers = state
487 .read(|state| state.get_header_range(from..until).boxed())
488 .await;
489 headers
490 .enumerate()
491 .then(|(index, fetch)| async move {
492 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493 resource: (index + from).to_string(),
494 })
495 })
496 .try_collect::<Vec<_>>()
497 .await
498 }
499 .boxed()
500 })?
501 .stream("stream_headers", move |req, state| {
502 async move {
503 let height = req.integer_param("height")?;
504 state
505 .read(|state| {
506 async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507 })
508 .await
509 }
510 .try_flatten_stream()
511 .boxed()
512 })?
513 .at("get_block", move |req, state| {
514 async move {
515 let id = if let Some(height) = req.opt_integer_param("height")? {
516 BlockId::Number(height)
517 } else if let Some(hash) = req.opt_blob_param("hash")? {
518 BlockId::Hash(hash)
519 } else {
520 BlockId::PayloadHash(req.blob_param("payload-hash")?)
521 };
522 let fetch = state.read(|state| state.get_block(id).boxed()).await;
523 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524 resource: id.to_string(),
525 })
526 }
527 .boxed()
528 })?
529 .at("get_block_range", move |req, state| {
530 async move {
531 let from = req.integer_param::<_, usize>("from")?;
532 let until = req.integer_param("until")?;
533 enforce_range_limit(from, until, large_object_range_limit)?;
534
535 let blocks = state
536 .read(|state| state.get_block_range(from..until).boxed())
537 .await;
538 blocks
539 .enumerate()
540 .then(|(index, fetch)| async move {
541 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542 resource: (index + from).to_string(),
543 })
544 })
545 .try_collect::<Vec<_>>()
546 .await
547 }
548 .boxed()
549 })?
550 .stream("stream_blocks", move |req, state| {
551 async move {
552 let height = req.integer_param("height")?;
553 state
554 .read(|state| {
555 async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556 })
557 .await
558 }
559 .try_flatten_stream()
560 .boxed()
561 })?
562 .at("get_payload", move |req, state| {
563 async move {
564 let id = if let Some(height) = req.opt_integer_param("height")? {
565 BlockId::Number(height)
566 } else if let Some(hash) = req.opt_blob_param("hash")? {
567 BlockId::PayloadHash(hash)
568 } else {
569 BlockId::Hash(req.blob_param("block-hash")?)
570 };
571 let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573 resource: id.to_string(),
574 })
575 }
576 .boxed()
577 })?
578 .at("get_payload_range", move |req, state| {
579 async move {
580 let from = req.integer_param::<_, usize>("from")?;
581 let until = req.integer_param("until")?;
582 enforce_range_limit(from, until, large_object_range_limit)?;
583
584 let payloads = state
585 .read(|state| state.get_payload_range(from..until).boxed())
586 .await;
587 payloads
588 .enumerate()
589 .then(|(index, fetch)| async move {
590 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591 resource: (index + from).to_string(),
592 })
593 })
594 .try_collect::<Vec<_>>()
595 .await
596 }
597 .boxed()
598 })?
599 .stream("stream_payloads", move |req, state| {
600 async move {
601 let height = req.integer_param("height")?;
602 state
603 .read(|state| {
604 async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605 })
606 .await
607 }
608 .try_flatten_stream()
609 .boxed()
610 })?
611 .at("get_transaction", move |req, state| {
612 async move {
613 match req.opt_blob_param("hash")? {
614 Some(hash) => {
615 let fetch = state
616 .read(|state| state.get_transaction(hash).boxed())
617 .await;
618 fetch
619 .with_timeout(timeout)
620 .await
621 .context(FetchTransactionSnafu {
622 resource: hash.to_string(),
623 })
624 },
625 None => {
626 let height: u64 = req.integer_param("height")?;
627 let fetch = state
628 .read(|state| state.get_block(height as usize).boxed())
629 .await;
630 let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
631 resource: height.to_string(),
632 })?;
633 let i: u64 = req.integer_param("index")?;
634 let index = block
635 .payload()
636 .nth(block.metadata(), i as usize)
637 .context(InvalidTransactionIndexSnafu { height, index: i })?;
638 TransactionQueryData::new(&block, index, i)
639 .context(InvalidTransactionIndexSnafu { height, index: i })
640 },
641 }
642 }
643 .boxed()
644 })?
645 .stream("stream_transactions", move |req, state| {
646 async move {
647 let height = req.integer_param::<_, usize>("height")?;
648
649 let namespace: Option<i64> = req
650 .opt_integer_param::<_, usize>("namespace")?
651 .map(|i| {
652 i.try_into().map_err(|err| Error::Custom {
653 message: format!(
654 "Invalid 'namespace': could not convert usize to i64: {err}"
655 ),
656 status: StatusCode::BAD_REQUEST,
657 })
658 })
659 .transpose()?;
660
661 state
662 .read(|state| {
663 async move {
664 Ok(state
665 .subscribe_blocks(height)
666 .await
667 .map(move |block| {
668 let transactions = block.enumerate().enumerate();
669 let header = block.header();
670 let filtered_txs = transactions
671 .filter_map(|(i, (index, _tx))| {
672 if let Some(requested_ns) = namespace {
673 let ns_id = QueryableHeader::<Types>::namespace_id(
674 header,
675 &index.ns_index,
676 )?;
677
678 if ns_id.into() != requested_ns {
679 return None;
680 }
681 }
682
683 TransactionQueryData::new(&block, index, i as u64)
684 })
685 .collect::<Vec<_>>();
686
687 futures::stream::iter(filtered_txs.into_iter().map(Ok))
688 })
689 .flatten())
690 }
691 .boxed()
692 })
693 .await
694 }
695 .try_flatten_stream()
696 .boxed()
697 })?
698 .at("get_block_summary", move |req, state| {
699 async move {
700 let id: usize = req.integer_param("height")?;
701
702 let fetch = state.read(|state| state.get_block(id).boxed()).await;
703 fetch
704 .with_timeout(timeout)
705 .await
706 .context(FetchBlockSnafu {
707 resource: id.to_string(),
708 })
709 .map(BlockSummaryQueryData::from)
710 }
711 .boxed()
712 })?
713 .at("get_block_summary_range", move |req, state| {
714 async move {
715 let from: usize = req.integer_param("from")?;
716 let until: usize = req.integer_param("until")?;
717 enforce_range_limit(from, until, large_object_range_limit)?;
718
719 let blocks = state
720 .read(|state| state.get_block_range(from..until).boxed())
721 .await;
722 let result: Vec<BlockSummaryQueryData<Types>> = blocks
723 .enumerate()
724 .then(|(index, fetch)| async move {
725 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
726 resource: (index + from).to_string(),
727 })
728 })
729 .map(|result| result.map(BlockSummaryQueryData::from))
730 .try_collect()
731 .await?;
732
733 Ok(result)
734 }
735 .boxed()
736 })?
737 .at("get_limits", move |_req, _state| {
738 async move {
739 Ok(Limits {
740 small_object_range_limit,
741 large_object_range_limit,
742 })
743 }
744 .boxed()
745 })?
746 .at("get_state_cert", move |req, state| {
747 async move {
748 let epoch = req.integer_param("epoch")?;
749 let fetch = state
750 .read(|state| state.get_state_cert(epoch).boxed())
751 .await;
752 fetch
753 .with_timeout(timeout)
754 .await
755 .context(FetchStateCertSnafu { epoch })
756 }
757 .boxed()
758 })?;
759 Ok(api)
760}
761
762fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
763 if until.saturating_sub(from) > limit {
764 return Err(Error::RangeLimit { from, until, limit });
765 }
766 Ok(())
767}
768
769#[cfg(test)]
770mod test {
771 use std::{fmt::Debug, time::Duration};
772
773 use async_lock::RwLock;
774 use committable::Committable;
775 use futures::future::FutureExt;
776 use hotshot_example_types::node_types::EpochsTestVersions;
777 use hotshot_types::{
778 data::Leaf2, simple_certificate::QuorumCertificate2,
779 traits::node_implementation::ConsensusTime,
780 };
781 use portpicker::pick_unused_port;
782 use serde::de::DeserializeOwned;
783 use surf_disco::{Client, Error as _};
784 use tempfile::TempDir;
785 use tide_disco::App;
786 use toml::toml;
787
788 use super::*;
789 use crate::{
790 data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
791 status::StatusDataSource,
792 task::BackgroundTask,
793 testing::{
794 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
795 mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
796 setup_test,
797 },
798 types::HeightIndexed,
799 ApiState, Error, Header,
800 };
801
802 async fn get_non_empty_blocks(
804 client: &Client<Error, MockBase>,
805 ) -> (
806 u64,
807 Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
808 ) {
809 let mut blocks = vec![];
810 for i in 1.. {
812 match client
813 .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
814 .send()
815 .await
816 {
817 Ok(block) => {
818 if !block.is_empty() {
819 let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
820 blocks.push((leaf, block));
821 }
822 },
823 Err(Error::Availability {
824 source: super::Error::FetchBlock { .. },
825 }) => {
826 tracing::info!(
827 "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
828 );
829 return (i, blocks);
830 },
831 Err(err) => panic!("unexpected error {err}"),
832 }
833 }
834 unreachable!()
835 }
836
837 async fn validate(client: &Client<Error, MockBase>, height: u64) {
838 for i in 0..height {
840 if ![0, 1, height / 2, height - 1].contains(&i) {
843 continue;
844 }
845 tracing::info!("validate block {i}/{height}");
846
847 let leaf: LeafQueryData<MockTypes> =
849 client.get(&format!("leaf/{i}")).send().await.unwrap();
850 assert_eq!(leaf.height(), i);
851 assert_eq!(
852 leaf,
853 client
854 .get(&format!("leaf/hash/{}", leaf.hash()))
855 .send()
856 .await
857 .unwrap()
858 );
859
860 let block: BlockQueryData<MockTypes> =
862 client.get(&format!("block/{i}")).send().await.unwrap();
863 let expected_payload = PayloadQueryData::from(block.clone());
864 assert_eq!(leaf.block_hash(), block.hash());
865 assert_eq!(block.height(), i);
866 assert_eq!(
867 block,
868 client
869 .get(&format!("block/hash/{}", block.hash()))
870 .send()
871 .await
872 .unwrap()
873 );
874 assert_eq!(
875 *block.header(),
876 client.get(&format!("header/{i}")).send().await.unwrap()
877 );
878 assert_eq!(
879 *block.header(),
880 client
881 .get(&format!("header/hash/{}", block.hash()))
882 .send()
883 .await
884 .unwrap()
885 );
886 assert_eq!(
887 expected_payload,
888 client.get(&format!("payload/{i}")).send().await.unwrap(),
889 );
890 assert_eq!(
891 expected_payload,
892 client
893 .get(&format!("payload/block-hash/{}", block.hash()))
894 .send()
895 .await
896 .unwrap(),
897 );
898 let common: VidCommonQueryData<MockTypes> = client
900 .get(&format!("vid/common/{}", block.height()))
901 .send()
902 .await
903 .unwrap();
904 assert_eq!(common.height(), block.height());
905 assert_eq!(common.block_hash(), block.hash());
906 assert_eq!(common.payload_hash(), block.payload_hash());
907 assert_eq!(
908 common,
909 client
910 .get(&format!("vid/common/hash/{}", block.hash()))
911 .send()
912 .await
913 .unwrap()
914 );
915
916 let block_summary = client
917 .get(&format!("block/summary/{i}"))
918 .send()
919 .await
920 .unwrap();
921 assert_eq!(
922 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
923 block_summary,
924 );
925 assert_eq!(block_summary.header(), block.header());
926 assert_eq!(block_summary.hash(), block.hash());
927 assert_eq!(block_summary.size(), block.size());
928 assert_eq!(block_summary.num_transactions(), block.num_transactions());
929
930 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
931 .get(&format!("block/summaries/{}/{}", 0, i))
932 .send()
933 .await
934 .unwrap();
935 assert_eq!(block_summaries.len() as u64, i);
936
937 assert_eq!(
942 block.payload(),
943 client
944 .get::<BlockQueryData<MockTypes>>(&format!(
945 "block/payload-hash/{}",
946 block.payload_hash()
947 ))
948 .send()
949 .await
950 .unwrap()
951 .payload()
952 );
953 assert_eq!(
954 block.payload_hash(),
955 client
956 .get::<Header<MockTypes>>(&format!(
957 "header/payload-hash/{}",
958 block.payload_hash()
959 ))
960 .send()
961 .await
962 .unwrap()
963 .payload_commitment
964 );
965 assert_eq!(
966 block.payload(),
967 client
968 .get::<PayloadQueryData<MockTypes>>(&format!(
969 "payload/hash/{}",
970 block.payload_hash()
971 ))
972 .send()
973 .await
974 .unwrap()
975 .data(),
976 );
977 assert_eq!(
978 common.common(),
979 client
980 .get::<VidCommonQueryData<MockTypes>>(&format!(
981 "vid/common/payload-hash/{}",
982 block.payload_hash()
983 ))
984 .send()
985 .await
986 .unwrap()
987 .common()
988 );
989
990 for (j, txn_from_block) in block.enumerate() {
993 let txn: TransactionQueryData<MockTypes> = client
994 .get(&format!("transaction/{}/{}", i, j.position))
995 .send()
996 .await
997 .unwrap();
998 assert_eq!(txn.block_height(), i);
999 assert_eq!(txn.block_hash(), block.hash());
1000 assert_eq!(txn.index(), j.position as u64);
1001 assert_eq!(txn.hash(), txn_from_block.commit());
1002 assert_eq!(txn.transaction(), &txn_from_block);
1003 assert_eq!(
1008 txn.hash(),
1009 client
1010 .get::<TransactionQueryData<MockTypes>>(&format!(
1011 "transaction/hash/{}",
1012 txn.hash()
1013 ))
1014 .send()
1015 .await
1016 .unwrap()
1017 .hash()
1018 );
1019 }
1020
1021 let block_range: Vec<BlockQueryData<MockTypes>> = client
1022 .get(&format!("block/{}/{}", 0, i))
1023 .send()
1024 .await
1025 .unwrap();
1026
1027 assert_eq!(block_range.len() as u64, i);
1028
1029 let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1030 .get(&format!("leaf/{}/{}", 0, i))
1031 .send()
1032 .await
1033 .unwrap();
1034
1035 assert_eq!(leaf_range.len() as u64, i);
1036
1037 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1038 .get(&format!("payload/{}/{}", 0, i))
1039 .send()
1040 .await
1041 .unwrap();
1042
1043 assert_eq!(payload_range.len() as u64, i);
1044
1045 let header_range: Vec<Header<MockTypes>> = client
1046 .get(&format!("header/{}/{}", 0, i))
1047 .send()
1048 .await
1049 .unwrap();
1050
1051 assert_eq!(header_range.len() as u64, i);
1052 }
1053 }
1054
1055 #[tokio::test(flavor = "multi_thread")]
1056 async fn test_api() {
1057 setup_test();
1058
1059 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1061 network.start().await;
1062
1063 let port = pick_unused_port().unwrap();
1065 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1066 let options = Options {
1067 small_object_range_limit: 500,
1068 large_object_range_limit: 500,
1069 ..Default::default()
1070 };
1071
1072 app.register_module(
1073 "availability",
1074 define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1075 )
1076 .unwrap();
1077 network.spawn(
1078 "server",
1079 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1080 );
1081
1082 let client = Client::<Error, MockBase>::new(
1084 format!("http://localhost:{port}/availability")
1085 .parse()
1086 .unwrap(),
1087 );
1088 assert!(client.connect(Some(Duration::from_secs(60))).await);
1089 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1090
1091 let leaves = client
1094 .socket("stream/leaves/0")
1095 .subscribe::<LeafQueryData<MockTypes>>()
1096 .await
1097 .unwrap();
1098 let headers = client
1099 .socket("stream/headers/0")
1100 .subscribe::<Header<MockTypes>>()
1101 .await
1102 .unwrap();
1103 let blocks = client
1104 .socket("stream/blocks/0")
1105 .subscribe::<BlockQueryData<MockTypes>>()
1106 .await
1107 .unwrap();
1108 let vid_common = client
1109 .socket("stream/vid/common/0")
1110 .subscribe::<VidCommonQueryData<MockTypes>>()
1111 .await
1112 .unwrap();
1113 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1114 for nonce in 0..3 {
1115 let txn = mock_transaction(vec![nonce]);
1116 network.submit_transaction(txn).await;
1117
1118 let (i, leaf, block, common) = loop {
1120 tracing::info!("waiting for block with transaction {}", nonce);
1121 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1122 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1123 let leaf = leaf.unwrap();
1124 let header = header.unwrap();
1125 let block = block.unwrap();
1126 let common = common.unwrap();
1127 assert_eq!(leaf.height() as usize, i);
1128 assert_eq!(leaf.block_hash(), block.hash());
1129 assert_eq!(block.header(), &header);
1130 assert_eq!(common.height() as usize, i);
1131 if !block.is_empty() {
1132 break (i, leaf, block, common);
1133 }
1134 };
1135 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1136 assert_eq!(
1137 block,
1138 client.get(&format!("block/{i}")).send().await.unwrap()
1139 );
1140 assert_eq!(
1141 common,
1142 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1143 );
1144
1145 validate(&client, (i + 1) as u64).await;
1146 }
1147
1148 network.shut_down().await;
1149 }
1150
1151 async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1152 for i in 0..height {
1154 if ![0, 1, height / 2, height - 1].contains(&i) {
1157 continue;
1158 }
1159 tracing::info!("validate block {i}/{height}");
1160
1161 let leaf: Leaf1QueryData<MockTypes> =
1163 client.get(&format!("leaf/{i}")).send().await.unwrap();
1164 assert_eq!(leaf.leaf.height(), i);
1165 assert_eq!(
1166 leaf,
1167 client
1168 .get(&format!(
1169 "leaf/hash/{}",
1170 <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1171 ))
1172 .send()
1173 .await
1174 .unwrap()
1175 );
1176
1177 let block: BlockQueryData<MockTypes> =
1179 client.get(&format!("block/{i}")).send().await.unwrap();
1180 let expected_payload = PayloadQueryData::from(block.clone());
1181 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1182 assert_eq!(block.height(), i);
1183 assert_eq!(
1184 block,
1185 client
1186 .get(&format!("block/hash/{}", block.hash()))
1187 .send()
1188 .await
1189 .unwrap()
1190 );
1191 assert_eq!(
1192 *block.header(),
1193 client.get(&format!("header/{i}")).send().await.unwrap()
1194 );
1195 assert_eq!(
1196 *block.header(),
1197 client
1198 .get(&format!("header/hash/{}", block.hash()))
1199 .send()
1200 .await
1201 .unwrap()
1202 );
1203 assert_eq!(
1204 expected_payload,
1205 client.get(&format!("payload/{i}")).send().await.unwrap(),
1206 );
1207 assert_eq!(
1208 expected_payload,
1209 client
1210 .get(&format!("payload/block-hash/{}", block.hash()))
1211 .send()
1212 .await
1213 .unwrap(),
1214 );
1215 let common: ADVZCommonQueryData<MockTypes> = client
1217 .get(&format!("vid/common/{}", block.height()))
1218 .send()
1219 .await
1220 .unwrap();
1221 assert_eq!(common.height(), block.height());
1222 assert_eq!(common.block_hash(), block.hash());
1223 assert_eq!(
1224 VidCommitment::V0(common.payload_hash()),
1225 block.payload_hash(),
1226 );
1227 assert_eq!(
1228 common,
1229 client
1230 .get(&format!("vid/common/hash/{}", block.hash()))
1231 .send()
1232 .await
1233 .unwrap()
1234 );
1235
1236 let block_summary = client
1237 .get(&format!("block/summary/{i}"))
1238 .send()
1239 .await
1240 .unwrap();
1241 assert_eq!(
1242 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1243 block_summary,
1244 );
1245 assert_eq!(block_summary.header(), block.header());
1246 assert_eq!(block_summary.hash(), block.hash());
1247 assert_eq!(block_summary.size(), block.size());
1248 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1249
1250 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1251 .get(&format!("block/summaries/{}/{}", 0, i))
1252 .send()
1253 .await
1254 .unwrap();
1255 assert_eq!(block_summaries.len() as u64, i);
1256
1257 assert_eq!(
1262 block.payload(),
1263 client
1264 .get::<BlockQueryData<MockTypes>>(&format!(
1265 "block/payload-hash/{}",
1266 block.payload_hash()
1267 ))
1268 .send()
1269 .await
1270 .unwrap()
1271 .payload()
1272 );
1273 assert_eq!(
1274 block.payload_hash(),
1275 client
1276 .get::<Header<MockTypes>>(&format!(
1277 "header/payload-hash/{}",
1278 block.payload_hash()
1279 ))
1280 .send()
1281 .await
1282 .unwrap()
1283 .payload_commitment
1284 );
1285 assert_eq!(
1286 block.payload(),
1287 client
1288 .get::<PayloadQueryData<MockTypes>>(&format!(
1289 "payload/hash/{}",
1290 block.payload_hash()
1291 ))
1292 .send()
1293 .await
1294 .unwrap()
1295 .data(),
1296 );
1297 assert_eq!(
1298 common.common(),
1299 client
1300 .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1301 "vid/common/payload-hash/{}",
1302 block.payload_hash()
1303 ))
1304 .send()
1305 .await
1306 .unwrap()
1307 .common()
1308 );
1309
1310 for (j, txn_from_block) in block.enumerate() {
1313 let txn: TransactionQueryData<MockTypes> = client
1314 .get(&format!("transaction/{}/{}", i, j.position))
1315 .send()
1316 .await
1317 .unwrap();
1318 assert_eq!(txn.block_height(), i);
1319 assert_eq!(txn.block_hash(), block.hash());
1320 assert_eq!(txn.index(), j.position as u64);
1321 assert_eq!(txn.hash(), txn_from_block.commit());
1322 assert_eq!(txn.transaction(), &txn_from_block);
1323 assert_eq!(
1328 txn.hash(),
1329 client
1330 .get::<TransactionQueryData<MockTypes>>(&format!(
1331 "transaction/hash/{}",
1332 txn.hash()
1333 ))
1334 .send()
1335 .await
1336 .unwrap()
1337 .hash()
1338 );
1339 }
1340
1341 let block_range: Vec<BlockQueryData<MockTypes>> = client
1342 .get(&format!("block/{}/{}", 0, i))
1343 .send()
1344 .await
1345 .unwrap();
1346
1347 assert_eq!(block_range.len() as u64, i);
1348
1349 let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1350 .get(&format!("leaf/{}/{}", 0, i))
1351 .send()
1352 .await
1353 .unwrap();
1354
1355 assert_eq!(leaf_range.len() as u64, i);
1356
1357 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1358 .get(&format!("payload/{}/{}", 0, i))
1359 .send()
1360 .await
1361 .unwrap();
1362
1363 assert_eq!(payload_range.len() as u64, i);
1364
1365 let header_range: Vec<Header<MockTypes>> = client
1366 .get(&format!("header/{}/{}", 0, i))
1367 .send()
1368 .await
1369 .unwrap();
1370
1371 assert_eq!(header_range.len() as u64, i);
1372 }
1373 }
1374
1375 #[tokio::test(flavor = "multi_thread")]
1376 async fn test_api_epochs() {
1377 setup_test();
1378
1379 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1381 let epoch_height = network.epoch_height();
1382 network.start().await;
1383
1384 let port = pick_unused_port().unwrap();
1386 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1387 app.register_module(
1388 "availability",
1389 define_api(
1390 &Default::default(),
1391 MockBase::instance(),
1392 "1.0.0".parse().unwrap(),
1393 )
1394 .unwrap(),
1395 )
1396 .unwrap();
1397 network.spawn(
1398 "server",
1399 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1400 );
1401
1402 let client = Client::<Error, MockBase>::new(
1404 format!("http://localhost:{port}/availability")
1405 .parse()
1406 .unwrap(),
1407 );
1408 assert!(client.connect(Some(Duration::from_secs(60))).await);
1409
1410 let headers = client
1413 .socket("stream/headers/0")
1414 .subscribe::<Header<MockTypes>>()
1415 .await
1416 .unwrap();
1417 let mut chain = headers.enumerate();
1418
1419 loop {
1420 let (i, header) = chain.next().await.unwrap();
1421 let header = header.unwrap();
1422 assert_eq!(header.height(), i as u64);
1423 if header.height() >= 3 * epoch_height {
1424 break;
1425 }
1426 }
1427
1428 for epoch in 1..4 {
1429 let state_cert: StateCertQueryData<MockTypes> = client
1430 .get(&format!("state-cert/{epoch}"))
1431 .send()
1432 .await
1433 .unwrap();
1434 tracing::info!("state-cert: {state_cert:?}");
1435 assert_eq!(state_cert.0.epoch.u64(), epoch);
1436 }
1437
1438 network.shut_down().await;
1439 }
1440
1441 #[tokio::test(flavor = "multi_thread")]
1442 async fn test_old_api() {
1443 setup_test();
1444
1445 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1447 network.start().await;
1448
1449 let port = pick_unused_port().unwrap();
1451
1452 let options = Options {
1453 small_object_range_limit: 500,
1454 large_object_range_limit: 500,
1455 ..Default::default()
1456 };
1457
1458 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1459 app.register_module(
1460 "availability",
1461 define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1462 )
1463 .unwrap();
1464 network.spawn(
1465 "server",
1466 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1467 );
1468
1469 let client = Client::<Error, MockBase>::new(
1471 format!("http://localhost:{port}/availability")
1472 .parse()
1473 .unwrap(),
1474 );
1475 assert!(client.connect(Some(Duration::from_secs(60))).await);
1476 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1477
1478 let leaves = client
1481 .socket("stream/leaves/0")
1482 .subscribe::<Leaf1QueryData<MockTypes>>()
1483 .await
1484 .unwrap();
1485 let headers = client
1486 .socket("stream/headers/0")
1487 .subscribe::<Header<MockTypes>>()
1488 .await
1489 .unwrap();
1490 let blocks = client
1491 .socket("stream/blocks/0")
1492 .subscribe::<BlockQueryData<MockTypes>>()
1493 .await
1494 .unwrap();
1495 let vid_common = client
1496 .socket("stream/vid/common/0")
1497 .subscribe::<ADVZCommonQueryData<MockTypes>>()
1498 .await
1499 .unwrap();
1500 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1501 for nonce in 0..3 {
1502 let txn = mock_transaction(vec![nonce]);
1503 network.submit_transaction(txn).await;
1504
1505 let (i, leaf, block, common) = loop {
1507 tracing::info!("waiting for block with transaction {}", nonce);
1508 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1509 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1510 let leaf = leaf.unwrap();
1511 let header = header.unwrap();
1512 let block = block.unwrap();
1513 let common = common.unwrap();
1514 assert_eq!(leaf.leaf.height() as usize, i);
1515 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1516 assert_eq!(block.header(), &header);
1517 assert_eq!(common.height() as usize, i);
1518 if !block.is_empty() {
1519 break (i, leaf, block, common);
1520 }
1521 };
1522 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1523 assert_eq!(
1524 block,
1525 client.get(&format!("block/{i}")).send().await.unwrap()
1526 );
1527 assert_eq!(
1528 common,
1529 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1530 );
1531
1532 validate_old(&client, (i + 1) as u64).await;
1533 }
1534
1535 network.shut_down().await;
1536 }
1537
1538 #[tokio::test(flavor = "multi_thread")]
1539 async fn test_extensions() {
1540 use hotshot_example_types::node_types::TestVersions;
1541
1542 setup_test();
1543
1544 let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1545 let data_source = ExtensibleDataSource::new(
1546 MockDataSource::create(dir.path(), Default::default())
1547 .await
1548 .unwrap(),
1549 0,
1550 );
1551
1552 let leaf =
1554 Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1555 .await;
1556 let qc =
1557 QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1558 .await;
1559 let leaf = LeafQueryData::new(leaf, qc).unwrap();
1560 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1561 data_source
1562 .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1563 .await
1564 .unwrap();
1565
1566 assert_eq!(
1568 ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1569 .await
1570 .unwrap(),
1571 1
1572 );
1573 assert_eq!(block, data_source.get_block(0).await.await);
1574
1575 let extensions = toml! {
1577 [route.post_ext]
1578 PATH = ["/ext/:val"]
1579 METHOD = "POST"
1580 ":val" = "Integer"
1581
1582 [route.get_ext]
1583 PATH = ["/ext"]
1584 METHOD = "GET"
1585 };
1586
1587 let mut api =
1588 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1589 &Options {
1590 extensions: vec![extensions.into()],
1591 ..Default::default()
1592 },
1593 MockBase::instance(),
1594 "1.0.0".parse().unwrap(),
1595 )
1596 .unwrap();
1597 api.get("get_ext", |_, state| {
1598 async move { Ok(*state.as_ref()) }.boxed()
1599 })
1600 .unwrap()
1601 .post("post_ext", |req, state| {
1602 async move {
1603 *state.as_mut() = req.integer_param("val")?;
1604 Ok(())
1605 }
1606 .boxed()
1607 })
1608 .unwrap();
1609
1610 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1611 app.register_module("availability", api).unwrap();
1612
1613 let port = pick_unused_port().unwrap();
1614 let _server = BackgroundTask::spawn(
1615 "server",
1616 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1617 );
1618
1619 let client = Client::<Error, MockBase>::new(
1620 format!("http://localhost:{port}/availability")
1621 .parse()
1622 .unwrap(),
1623 );
1624 assert!(client.connect(Some(Duration::from_secs(60))).await);
1625
1626 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1627 client.post::<()>("ext/42").send().await.unwrap();
1628 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1629
1630 assert_eq!(
1632 client
1633 .get::<MockHeader>("header/0")
1634 .send()
1635 .await
1636 .unwrap()
1637 .block_number,
1638 0
1639 );
1640 }
1641
1642 #[tokio::test(flavor = "multi_thread")]
1643 async fn test_range_limit() {
1644 setup_test();
1645
1646 let large_object_range_limit = 2;
1647 let small_object_range_limit = 3;
1648
1649 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1651 network.start().await;
1652
1653 let port = pick_unused_port().unwrap();
1655 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1656 app.register_module(
1657 "availability",
1658 define_api(
1659 &Options {
1660 large_object_range_limit,
1661 small_object_range_limit,
1662 ..Default::default()
1663 },
1664 MockBase::instance(),
1665 "1.0.0".parse().unwrap(),
1666 )
1667 .unwrap(),
1668 )
1669 .unwrap();
1670 network.spawn(
1671 "server",
1672 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1673 );
1674
1675 let client = Client::<Error, MockBase>::new(
1677 format!("http://localhost:{port}/availability")
1678 .parse()
1679 .unwrap(),
1680 );
1681 assert!(client.connect(Some(Duration::from_secs(60))).await);
1682
1683 assert_eq!(
1685 client.get::<Limits>("limits").send().await.unwrap(),
1686 Limits {
1687 small_object_range_limit,
1688 large_object_range_limit
1689 }
1690 );
1691
1692 client
1694 .socket("stream/blocks/0")
1695 .subscribe::<BlockQueryData<MockTypes>>()
1696 .await
1697 .unwrap()
1698 .take(small_object_range_limit + 1)
1699 .try_collect::<Vec<_>>()
1700 .await
1701 .unwrap();
1702
1703 async fn check_limit<T: DeserializeOwned + Debug>(
1704 client: &Client<Error, MockBase>,
1705 req: &str,
1706 limit: usize,
1707 ) {
1708 let range: Vec<T> = client
1709 .get(&format!("{req}/0/{limit}"))
1710 .send()
1711 .await
1712 .unwrap();
1713 assert_eq!(range.len(), limit);
1714 let err = client
1715 .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1716 .send()
1717 .await
1718 .unwrap_err();
1719 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1720 }
1721
1722 check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1723 check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1724 check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1725 check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1726 .await;
1727 check_limit::<BlockSummaryQueryData<MockTypes>>(
1728 &client,
1729 "block/summaries",
1730 large_object_range_limit,
1731 )
1732 .await;
1733
1734 network.shut_down().await;
1735 }
1736
1737 #[tokio::test(flavor = "multi_thread")]
1738 async fn test_header_endpoint() {
1739 setup_test();
1740
1741 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1743 network.start().await;
1744
1745 let port = pick_unused_port().unwrap();
1747 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1748 app.register_module(
1749 "availability",
1750 define_api(
1751 &Default::default(),
1752 MockBase::instance(),
1753 "1.0.0".parse().unwrap(),
1754 )
1755 .unwrap(),
1756 )
1757 .unwrap();
1758 network.spawn(
1759 "server",
1760 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1761 );
1762
1763 let ds = network.data_source();
1764
1765 let block_height = ds.block_height().await.unwrap();
1768 let fetch = ds
1769 .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1770 .await;
1771
1772 assert!(fetch.is_pending());
1773 let header = fetch.await;
1774 assert_eq!(header.height() as usize, block_height + 25);
1775
1776 network.shut_down().await;
1777 }
1778
1779 #[tokio::test(flavor = "multi_thread")]
1780 async fn test_leaf_only_ds() {
1781 setup_test();
1782
1783 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1785 network.start().await;
1786
1787 let port = pick_unused_port().unwrap();
1789 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1790 app.register_module(
1791 "availability",
1792 define_api(
1793 &Default::default(),
1794 MockBase::instance(),
1795 "1.0.0".parse().unwrap(),
1796 )
1797 .unwrap(),
1798 )
1799 .unwrap();
1800 network.spawn(
1801 "server",
1802 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1803 );
1804
1805 let client = Client::<Error, MockBase>::new(
1807 format!("http://localhost:{port}/availability")
1808 .parse()
1809 .unwrap(),
1810 );
1811 assert!(client.connect(Some(Duration::from_secs(60))).await);
1812
1813 client
1815 .socket("stream/headers/0")
1816 .subscribe::<Header<MockTypes>>()
1817 .await
1818 .unwrap()
1819 .take(5)
1820 .try_collect::<Vec<_>>()
1821 .await
1822 .unwrap();
1823
1824 client
1826 .socket("stream/leaves/5")
1827 .subscribe::<LeafQueryData<MockTypes>>()
1828 .await
1829 .unwrap()
1830 .take(5)
1831 .try_collect::<Vec<_>>()
1832 .await
1833 .unwrap();
1834
1835 let ds = network.data_source();
1836
1837 let block_height = ds.block_height().await.unwrap();
1841 let target_block_height = block_height + 20;
1842 let fetch = ds
1843 .get_block(BlockId::<MockTypes>::Number(target_block_height))
1844 .await;
1845
1846 assert!(fetch.is_pending());
1847 let block = fetch.await;
1848 assert_eq!(block.height() as usize, target_block_height);
1849
1850 let mut tx = ds.read().await.unwrap();
1851 tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1852 .await
1853 .unwrap_err();
1854 drop(tx);
1855
1856 network.shut_down().await;
1857 }
1858}