1use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34 data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35 simple_certificate::QuorumCertificate,
36 traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, RequestParams, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, types::HeightIndexed, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54 pub api_path: Option<PathBuf>,
55
56 pub fetch_timeout: Duration,
62
63 pub extensions: Vec<toml::Value>,
68
69 pub small_object_range_limit: usize,
77
78 pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89 fn default() -> Self {
90 Self {
91 api_path: None,
92 fetch_timeout: Duration::from_millis(500),
93 extensions: vec![],
94 large_object_range_limit: 100,
95 small_object_range_limit: 500,
96 }
97 }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103 Request {
104 source: RequestError,
105 },
106 #[snafu(display("leaf {resource} missing or not available"))]
107 #[from(ignore)]
108 FetchLeaf {
109 resource: String,
110 },
111 #[snafu(display("block {resource} missing or not available"))]
112 #[from(ignore)]
113 FetchBlock {
114 resource: String,
115 },
116 #[snafu(display("header {resource} missing or not available"))]
117 #[from(ignore)]
118 FetchHeader {
119 resource: String,
120 },
121 #[snafu(display("transaction {resource} missing or not available"))]
122 #[from(ignore)]
123 FetchTransaction {
124 resource: String,
125 },
126 #[snafu(display("transaction index {index} out of range for block {height}"))]
127 #[from(ignore)]
128 InvalidTransactionIndex {
129 height: u64,
130 index: u64,
131 },
132 #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133 #[from(ignore)]
134 RangeLimit {
135 from: usize,
136 until: usize,
137 limit: usize,
138 },
139 #[snafu(display("{source}"))]
140 Query {
141 source: QueryError,
142 },
143 #[snafu(display("State cert for epoch {epoch} not found"))]
144 #[from(ignore)]
145 FetchStateCert {
146 epoch: u64,
147 },
148 #[snafu(display("error {status}: {message}"))]
149 Custom {
150 message: String,
151 status: StatusCode,
152 },
153}
154
155impl Error {
156 pub fn internal<M: Display>(message: M) -> Self {
157 Self::Custom {
158 message: message.to_string(),
159 status: StatusCode::INTERNAL_SERVER_ERROR,
160 }
161 }
162
163 pub fn status(&self) -> StatusCode {
164 match self {
165 Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166 Self::FetchLeaf { .. }
167 | Self::FetchBlock { .. }
168 | Self::FetchTransaction { .. }
169 | Self::FetchHeader { .. }
170 | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171 Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172 Self::Custom { status, .. } => *status,
173 }
174 }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180 pub(crate) leaf: Leaf<Types>,
181 pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185 pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186 Self { leaf, qc }
187 }
188
189 pub fn leaf(&self) -> &Leaf<Types> {
190 &self.leaf
191 }
192
193 pub fn qc(&self) -> &QuorumCertificate<Types> {
194 &self.qc
195 }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199 let quorum_proposal = QuorumProposal {
205 block_header: leaf2.block_header().clone(),
206 view_number: leaf2.view_number(),
207 justify_qc: leaf2.justify_qc().to_qc(),
208 upgrade_certificate: leaf2.upgrade_certificate(),
209 proposal_certificate: None,
210 };
211 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212 if let Some(payload) = leaf2.block_payload() {
213 leaf.fill_block_payload_unchecked(payload);
214 }
215 leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219 Leaf1QueryData {
220 leaf: downgrade_leaf(leaf.leaf),
221 qc: leaf.qc.to_qc(),
222 }
223}
224
225async fn get_leaf_handler<Types, State>(
226 req: tide_disco::RequestParams,
227 state: &State,
228 timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231 State: 'static + Send + Sync + ReadState,
232 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233 Types: NodeType,
234 Header<Types>: QueryableHeader<Types>,
235 Payload<Types>: QueryablePayload<Types>,
236{
237 let id = match req.opt_integer_param("height")? {
238 Some(height) => LeafId::Number(height),
239 None => LeafId::Hash(req.blob_param("hash")?),
240 };
241 let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243 resource: id.to_string(),
244 })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248 req: tide_disco::RequestParams,
249 state: &State,
250 timeout: Duration,
251 small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254 State: 'static + Send + Sync + ReadState,
255 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256 Types: NodeType,
257 Header<Types>: QueryableHeader<Types>,
258 Payload<Types>: QueryablePayload<Types>,
259{
260 let from = req.integer_param::<_, usize>("from")?;
261 let until = req.integer_param("until")?;
262 enforce_range_limit(from, until, small_object_range_limit)?;
263
264 let leaves = state
265 .read(|state| state.get_leaf_range(from..until).boxed())
266 .await;
267 leaves
268 .enumerate()
269 .then(|(index, fetch)| async move {
270 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271 resource: (index + from).to_string(),
272 })
273 })
274 .try_collect::<Vec<_>>()
275 .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279 data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281 let VidCommonQueryData {
282 height,
283 block_hash,
284 payload_hash: VidCommitment::V0(payload_hash),
285 common: VidCommon::V0(common),
286 } = data
287 else {
288 return None;
289 };
290 Some(ADVZCommonQueryData {
291 height,
292 block_hash,
293 payload_hash,
294 common,
295 })
296}
297
298async fn get_vid_common_handler<Types, State>(
299 req: tide_disco::RequestParams,
300 state: &State,
301 timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304 State: 'static + Send + Sync + ReadState,
305 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306 Types: NodeType,
307 Header<Types>: QueryableHeader<Types>,
308 Payload<Types>: QueryablePayload<Types>,
309{
310 let id = if let Some(height) = req.opt_integer_param("height")? {
311 BlockId::Number(height)
312 } else if let Some(hash) = req.opt_blob_param("hash")? {
313 BlockId::Hash(hash)
314 } else {
315 BlockId::PayloadHash(req.blob_param("payload-hash")?)
316 };
317 let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319 resource: id.to_string(),
320 })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324 options: &Options,
325 _: Ver,
326 api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329 State: 'static + Send + Sync + ReadState,
330 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331 Header<Types>: QueryableHeader<Types>,
332 Payload<Types>: QueryablePayload<Types>,
333{
334 let mut api = load_api::<State, Error, Ver>(
335 options.api_path.as_ref(),
336 include_str!("../api/availability.toml"),
337 options.extensions.clone(),
338 )?;
339 let timeout = options.fetch_timeout;
340 let small_object_range_limit = options.small_object_range_limit;
341 let large_object_range_limit = options.large_object_range_limit;
342
343 api.with_version(api_ver.clone());
344
345 if api_ver.major == 0 {
353 api.at("get_leaf", move |req, state| {
354 get_leaf_handler(req, state, timeout)
355 .map(|res| res.map(downgrade_leaf_query_data))
356 .boxed()
357 })?;
358
359 api.at("get_leaf_range", move |req, state| {
360 get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361 .map(|res| {
362 res.map(|r| {
363 r.into_iter()
364 .map(downgrade_leaf_query_data)
365 .collect::<Vec<Leaf1QueryData<_>>>()
366 })
367 })
368 .boxed()
369 })?;
370
371 api.stream("stream_leaves", move |req, state| {
372 async move {
373 let height = req.integer_param("height")?;
374 state
375 .read(|state| {
376 async move {
377 Ok(state
378 .subscribe_leaves(height)
379 .await
380 .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381 }
382 .boxed()
383 })
384 .await
385 }
386 .try_flatten_stream()
387 .boxed()
388 })?;
389 } else {
390 api.at("get_leaf", move |req, state| {
391 get_leaf_handler(req, state, timeout).boxed()
392 })?;
393
394 api.at("get_leaf_range", move |req, state| {
395 get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396 })?;
397
398 api.stream("stream_leaves", move |req, state| {
399 async move {
400 let height = req.integer_param("height")?;
401 state
402 .read(|state| {
403 async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404 })
405 .await
406 }
407 .try_flatten_stream()
408 .boxed()
409 })?;
410 }
411
412 if api_ver.major == 0 {
415 api.at("get_vid_common", move |req, state| {
416 get_vid_common_handler(req, state, timeout)
417 .map(|r| match r {
418 Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419 message: "Incompatible VID version.".to_string(),
420 status: StatusCode::BAD_REQUEST,
421 }),
422 Err(e) => Err(e),
423 })
424 .boxed()
425 })?
426 .stream("stream_vid_common", move |req, state| {
427 async move {
428 let height = req.integer_param("height")?;
429 state
430 .read(|state| {
431 async move {
432 Ok(state.subscribe_vid_common(height).await.map(|data| {
433 downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434 message: "Incompatible VID version.".to_string(),
435 status: StatusCode::BAD_REQUEST,
436 })
437 }))
438 }
439 .boxed()
440 })
441 .await
442 }
443 .try_flatten_stream()
444 .boxed()
445 })?;
446 } else {
447 api.at("get_vid_common", move |req, state| {
448 get_vid_common_handler(req, state, timeout).boxed().boxed()
449 })?
450 .stream("stream_vid_common", move |req, state| {
451 async move {
452 let height = req.integer_param("height")?;
453 state
454 .read(|state| {
455 async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456 })
457 .await
458 }
459 .try_flatten_stream()
460 .boxed()
461 })?;
462 }
463
464 api.at("get_header", move |req, state| {
465 async move {
466 let id = if let Some(height) = req.opt_integer_param("height")? {
467 BlockId::Number(height)
468 } else if let Some(hash) = req.opt_blob_param("hash")? {
469 BlockId::Hash(hash)
470 } else {
471 BlockId::PayloadHash(req.blob_param("payload-hash")?)
472 };
473 let fetch = state.read(|state| state.get_header(id).boxed()).await;
474 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475 resource: id.to_string(),
476 })
477 }
478 .boxed()
479 })?
480 .at("get_header_range", move |req, state| {
481 async move {
482 let from = req.integer_param::<_, usize>("from")?;
483 let until = req.integer_param::<_, usize>("until")?;
484 enforce_range_limit(from, until, large_object_range_limit)?;
485
486 let headers = state
487 .read(|state| state.get_header_range(from..until).boxed())
488 .await;
489 headers
490 .enumerate()
491 .then(|(index, fetch)| async move {
492 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493 resource: (index + from).to_string(),
494 })
495 })
496 .try_collect::<Vec<_>>()
497 .await
498 }
499 .boxed()
500 })?
501 .stream("stream_headers", move |req, state| {
502 async move {
503 let height = req.integer_param("height")?;
504 state
505 .read(|state| {
506 async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507 })
508 .await
509 }
510 .try_flatten_stream()
511 .boxed()
512 })?
513 .at("get_block", move |req, state| {
514 async move {
515 let id = if let Some(height) = req.opt_integer_param("height")? {
516 BlockId::Number(height)
517 } else if let Some(hash) = req.opt_blob_param("hash")? {
518 BlockId::Hash(hash)
519 } else {
520 BlockId::PayloadHash(req.blob_param("payload-hash")?)
521 };
522 let fetch = state.read(|state| state.get_block(id).boxed()).await;
523 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524 resource: id.to_string(),
525 })
526 }
527 .boxed()
528 })?
529 .at("get_block_range", move |req, state| {
530 async move {
531 let from = req.integer_param::<_, usize>("from")?;
532 let until = req.integer_param("until")?;
533 enforce_range_limit(from, until, large_object_range_limit)?;
534
535 let blocks = state
536 .read(|state| state.get_block_range(from..until).boxed())
537 .await;
538 blocks
539 .enumerate()
540 .then(|(index, fetch)| async move {
541 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542 resource: (index + from).to_string(),
543 })
544 })
545 .try_collect::<Vec<_>>()
546 .await
547 }
548 .boxed()
549 })?
550 .stream("stream_blocks", move |req, state| {
551 async move {
552 let height = req.integer_param("height")?;
553 state
554 .read(|state| {
555 async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556 })
557 .await
558 }
559 .try_flatten_stream()
560 .boxed()
561 })?
562 .at("get_payload", move |req, state| {
563 async move {
564 let id = if let Some(height) = req.opt_integer_param("height")? {
565 BlockId::Number(height)
566 } else if let Some(hash) = req.opt_blob_param("hash")? {
567 BlockId::PayloadHash(hash)
568 } else {
569 BlockId::Hash(req.blob_param("block-hash")?)
570 };
571 let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573 resource: id.to_string(),
574 })
575 }
576 .boxed()
577 })?
578 .at("get_payload_range", move |req, state| {
579 async move {
580 let from = req.integer_param::<_, usize>("from")?;
581 let until = req.integer_param("until")?;
582 enforce_range_limit(from, until, large_object_range_limit)?;
583
584 let payloads = state
585 .read(|state| state.get_payload_range(from..until).boxed())
586 .await;
587 payloads
588 .enumerate()
589 .then(|(index, fetch)| async move {
590 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591 resource: (index + from).to_string(),
592 })
593 })
594 .try_collect::<Vec<_>>()
595 .await
596 }
597 .boxed()
598 })?
599 .stream("stream_payloads", move |req, state| {
600 async move {
601 let height = req.integer_param("height")?;
602 state
603 .read(|state| {
604 async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605 })
606 .await
607 }
608 .try_flatten_stream()
609 .boxed()
610 })?
611 .at("get_transaction_proof", move |req, state| {
612 async move {
613 let tx = get_transaction(req, state, timeout).await?;
614 let height = tx.block.height();
615 let vid = state
616 .read(|state| state.get_vid_common(height as usize))
617 .await
618 .with_timeout(timeout)
619 .await
620 .context(FetchBlockSnafu {
621 resource: height.to_string(),
622 })?;
623 let proof = tx.block.transaction_proof(&vid, &tx.index).context(
624 InvalidTransactionIndexSnafu {
625 height,
626 index: tx.transaction.index(),
627 },
628 )?;
629 Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
630 }
631 .boxed()
632 })?
633 .at("get_transaction", move |req, state| {
634 async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
635 })?
636 .stream("stream_transactions", move |req, state| {
637 async move {
638 let height = req.integer_param::<_, usize>("height")?;
639
640 let namespace: Option<i64> = req
641 .opt_integer_param::<_, usize>("namespace")?
642 .map(|i| {
643 i.try_into().map_err(|err| Error::Custom {
644 message: format!(
645 "Invalid 'namespace': could not convert usize to i64: {err}"
646 ),
647 status: StatusCode::BAD_REQUEST,
648 })
649 })
650 .transpose()?;
651
652 state
653 .read(|state| {
654 async move {
655 Ok(state
656 .subscribe_blocks(height)
657 .await
658 .map(move |block| {
659 let transactions = block.enumerate().enumerate();
660 let header = block.header();
661 let filtered_txs = transactions
662 .filter_map(|(i, (index, _tx))| {
663 if let Some(requested_ns) = namespace {
664 let ns_id = QueryableHeader::<Types>::namespace_id(
665 header,
666 &index.ns_index,
667 )?;
668
669 if ns_id.into() != requested_ns {
670 return None;
671 }
672 }
673
674 let tx = block.transaction(&index)?;
675 TransactionQueryData::new(tx, &block, &index, i as u64)
676 })
677 .collect::<Vec<_>>();
678
679 futures::stream::iter(filtered_txs.into_iter().map(Ok))
680 })
681 .flatten())
682 }
683 .boxed()
684 })
685 .await
686 }
687 .try_flatten_stream()
688 .boxed()
689 })?
690 .at("get_block_summary", move |req, state| {
691 async move {
692 let id: usize = req.integer_param("height")?;
693
694 let fetch = state.read(|state| state.get_block(id).boxed()).await;
695 fetch
696 .with_timeout(timeout)
697 .await
698 .context(FetchBlockSnafu {
699 resource: id.to_string(),
700 })
701 .map(BlockSummaryQueryData::from)
702 }
703 .boxed()
704 })?
705 .at("get_block_summary_range", move |req, state| {
706 async move {
707 let from: usize = req.integer_param("from")?;
708 let until: usize = req.integer_param("until")?;
709 enforce_range_limit(from, until, large_object_range_limit)?;
710
711 let blocks = state
712 .read(|state| state.get_block_range(from..until).boxed())
713 .await;
714 let result: Vec<BlockSummaryQueryData<Types>> = blocks
715 .enumerate()
716 .then(|(index, fetch)| async move {
717 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
718 resource: (index + from).to_string(),
719 })
720 })
721 .map(|result| result.map(BlockSummaryQueryData::from))
722 .try_collect()
723 .await?;
724
725 Ok(result)
726 }
727 .boxed()
728 })?
729 .at("get_limits", move |_req, _state| {
730 async move {
731 Ok(Limits {
732 small_object_range_limit,
733 large_object_range_limit,
734 })
735 }
736 .boxed()
737 })?;
738 Ok(api)
739}
740
741fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
742 if until.saturating_sub(from) > limit {
743 return Err(Error::RangeLimit { from, until, limit });
744 }
745 Ok(())
746}
747
748async fn get_transaction<Types, State>(
749 req: RequestParams,
750 state: &State,
751 timeout: Duration,
752) -> Result<BlockWithTransaction<Types>, Error>
753where
754 Types: NodeType,
755 Header<Types>: QueryableHeader<Types>,
756 Payload<Types>: QueryablePayload<Types>,
757 State: 'static + Send + Sync + ReadState,
758 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
759{
760 match req.opt_blob_param("hash")? {
761 Some(hash) => state
762 .read(|state| state.get_block_containing_transaction(hash).boxed())
763 .await
764 .with_timeout(timeout)
765 .await
766 .context(FetchTransactionSnafu {
767 resource: hash.to_string(),
768 }),
769 None => {
770 let height: u64 = req.integer_param("height")?;
771 let fetch = state
772 .read(|state| state.get_block(height as usize).boxed())
773 .await;
774 let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
775 resource: height.to_string(),
776 })?;
777 let i: u64 = req.integer_param("index")?;
778 let index = block
779 .payload()
780 .nth(block.metadata(), i as usize)
781 .context(InvalidTransactionIndexSnafu { height, index: i })?;
782 let transaction = block
783 .transaction(&index)
784 .context(InvalidTransactionIndexSnafu { height, index: i })?;
785 let transaction = TransactionQueryData::new(transaction, &block, &index, i)
786 .context(InvalidTransactionIndexSnafu { height, index: i })?;
787 Ok(BlockWithTransaction {
788 transaction,
789 block,
790 index,
791 })
792 },
793 }
794}
795
796#[cfg(test)]
797mod test {
798 use std::{fmt::Debug, time::Duration};
799
800 use async_lock::RwLock;
801 use committable::Committable;
802 use futures::future::FutureExt;
803 use hotshot_example_types::node_types::EpochsTestVersions;
804 use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
805 use portpicker::pick_unused_port;
806 use serde::de::DeserializeOwned;
807 use surf_disco::{Client, Error as _};
808 use tempfile::TempDir;
809 use tide_disco::App;
810 use toml::toml;
811
812 use super::*;
813 use crate::{
814 data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
815 status::StatusDataSource,
816 task::BackgroundTask,
817 testing::{
818 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
819 mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
820 },
821 types::HeightIndexed,
822 ApiState, Error, Header,
823 };
824
825 async fn get_non_empty_blocks(
827 client: &Client<Error, MockBase>,
828 ) -> (
829 u64,
830 Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
831 ) {
832 let mut blocks = vec![];
833 for i in 1.. {
835 match client
836 .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
837 .send()
838 .await
839 {
840 Ok(block) => {
841 if !block.is_empty() {
842 let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
843 blocks.push((leaf, block));
844 }
845 },
846 Err(Error::Availability {
847 source: super::Error::FetchBlock { .. },
848 }) => {
849 tracing::info!(
850 "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
851 );
852 return (i, blocks);
853 },
854 Err(err) => panic!("unexpected error {err}"),
855 }
856 }
857 unreachable!()
858 }
859
860 async fn validate(client: &Client<Error, MockBase>, height: u64) {
861 for i in 0..height {
863 if ![0, 1, height / 2, height - 1].contains(&i) {
866 continue;
867 }
868 tracing::info!("validate block {i}/{height}");
869
870 let leaf: LeafQueryData<MockTypes> =
872 client.get(&format!("leaf/{i}")).send().await.unwrap();
873 assert_eq!(leaf.height(), i);
874 assert_eq!(
875 leaf,
876 client
877 .get(&format!("leaf/hash/{}", leaf.hash()))
878 .send()
879 .await
880 .unwrap()
881 );
882
883 let block: BlockQueryData<MockTypes> =
885 client.get(&format!("block/{i}")).send().await.unwrap();
886 let expected_payload = PayloadQueryData::from(block.clone());
887 assert_eq!(leaf.block_hash(), block.hash());
888 assert_eq!(block.height(), i);
889 assert_eq!(
890 block,
891 client
892 .get(&format!("block/hash/{}", block.hash()))
893 .send()
894 .await
895 .unwrap()
896 );
897 assert_eq!(
898 *block.header(),
899 client.get(&format!("header/{i}")).send().await.unwrap()
900 );
901 assert_eq!(
902 *block.header(),
903 client
904 .get(&format!("header/hash/{}", block.hash()))
905 .send()
906 .await
907 .unwrap()
908 );
909 assert_eq!(
910 expected_payload,
911 client.get(&format!("payload/{i}")).send().await.unwrap(),
912 );
913 assert_eq!(
914 expected_payload,
915 client
916 .get(&format!("payload/block-hash/{}", block.hash()))
917 .send()
918 .await
919 .unwrap(),
920 );
921 let common: VidCommonQueryData<MockTypes> = client
923 .get(&format!("vid/common/{}", block.height()))
924 .send()
925 .await
926 .unwrap();
927 assert_eq!(common.height(), block.height());
928 assert_eq!(common.block_hash(), block.hash());
929 assert_eq!(common.payload_hash(), block.payload_hash());
930 assert_eq!(
931 common,
932 client
933 .get(&format!("vid/common/hash/{}", block.hash()))
934 .send()
935 .await
936 .unwrap()
937 );
938
939 let block_summary = client
940 .get(&format!("block/summary/{i}"))
941 .send()
942 .await
943 .unwrap();
944 assert_eq!(
945 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
946 block_summary,
947 );
948 assert_eq!(block_summary.header(), block.header());
949 assert_eq!(block_summary.hash(), block.hash());
950 assert_eq!(block_summary.size(), block.size());
951 assert_eq!(block_summary.num_transactions(), block.num_transactions());
952
953 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
954 .get(&format!("block/summaries/{}/{}", 0, i))
955 .send()
956 .await
957 .unwrap();
958 assert_eq!(block_summaries.len() as u64, i);
959
960 assert_eq!(
965 block.payload(),
966 client
967 .get::<BlockQueryData<MockTypes>>(&format!(
968 "block/payload-hash/{}",
969 block.payload_hash()
970 ))
971 .send()
972 .await
973 .unwrap()
974 .payload()
975 );
976 assert_eq!(
977 block.payload_hash(),
978 client
979 .get::<Header<MockTypes>>(&format!(
980 "header/payload-hash/{}",
981 block.payload_hash()
982 ))
983 .send()
984 .await
985 .unwrap()
986 .payload_commitment
987 );
988 assert_eq!(
989 block.payload(),
990 client
991 .get::<PayloadQueryData<MockTypes>>(&format!(
992 "payload/hash/{}",
993 block.payload_hash()
994 ))
995 .send()
996 .await
997 .unwrap()
998 .data(),
999 );
1000 assert_eq!(
1001 common.common(),
1002 client
1003 .get::<VidCommonQueryData<MockTypes>>(&format!(
1004 "vid/common/payload-hash/{}",
1005 block.payload_hash()
1006 ))
1007 .send()
1008 .await
1009 .unwrap()
1010 .common()
1011 );
1012
1013 for (j, txn_from_block) in block.enumerate() {
1016 let txn: TransactionQueryData<MockTypes> = client
1017 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1018 .send()
1019 .await
1020 .unwrap();
1021 assert_eq!(txn.block_height(), i);
1022 assert_eq!(txn.block_hash(), block.hash());
1023 assert_eq!(txn.index(), j.position as u64);
1024 assert_eq!(txn.hash(), txn_from_block.commit());
1025 assert_eq!(txn.transaction(), &txn_from_block);
1026 assert_eq!(
1031 txn.hash(),
1032 client
1033 .get::<TransactionQueryData<MockTypes>>(&format!(
1034 "transaction/hash/{}/noproof",
1035 txn.hash()
1036 ))
1037 .send()
1038 .await
1039 .unwrap()
1040 .hash()
1041 );
1042
1043 let tx_with_proof = client
1044 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1045 "transaction/{}/{}/proof",
1046 i, j.position
1047 ))
1048 .send()
1049 .await
1050 .unwrap();
1051 assert_eq!(txn.hash(), tx_with_proof.hash());
1052 assert!(tx_with_proof.proof().verify(
1053 block.metadata(),
1054 txn.transaction(),
1055 &block.payload_hash(),
1056 common.common()
1057 ));
1058
1059 let tx_with_proof = client
1061 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1062 "transaction/hash/{}/proof",
1063 txn.hash()
1064 ))
1065 .send()
1066 .await
1067 .unwrap();
1068 assert_eq!(txn.hash(), tx_with_proof.hash());
1069 assert!(tx_with_proof.proof().verify(
1070 block.metadata(),
1071 txn.transaction(),
1072 &block.payload_hash(),
1073 common.common()
1074 ));
1075 }
1076
1077 let block_range: Vec<BlockQueryData<MockTypes>> = client
1078 .get(&format!("block/{}/{}", 0, i))
1079 .send()
1080 .await
1081 .unwrap();
1082
1083 assert_eq!(block_range.len() as u64, i);
1084
1085 let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1086 .get(&format!("leaf/{}/{}", 0, i))
1087 .send()
1088 .await
1089 .unwrap();
1090
1091 assert_eq!(leaf_range.len() as u64, i);
1092
1093 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1094 .get(&format!("payload/{}/{}", 0, i))
1095 .send()
1096 .await
1097 .unwrap();
1098
1099 assert_eq!(payload_range.len() as u64, i);
1100
1101 let header_range: Vec<Header<MockTypes>> = client
1102 .get(&format!("header/{}/{}", 0, i))
1103 .send()
1104 .await
1105 .unwrap();
1106
1107 assert_eq!(header_range.len() as u64, i);
1108 }
1109 }
1110
1111 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1112 async fn test_api() {
1113 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1115 network.start().await;
1116
1117 let port = pick_unused_port().unwrap();
1119 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1120 let options = Options {
1121 small_object_range_limit: 500,
1122 large_object_range_limit: 500,
1123 ..Default::default()
1124 };
1125
1126 app.register_module(
1127 "availability",
1128 define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1129 )
1130 .unwrap();
1131 network.spawn(
1132 "server",
1133 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1134 );
1135
1136 let client = Client::<Error, MockBase>::new(
1138 format!("http://localhost:{port}/availability")
1139 .parse()
1140 .unwrap(),
1141 );
1142 assert!(client.connect(Some(Duration::from_secs(60))).await);
1143 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1144
1145 let leaves = client
1148 .socket("stream/leaves/0")
1149 .subscribe::<LeafQueryData<MockTypes>>()
1150 .await
1151 .unwrap();
1152 let headers = client
1153 .socket("stream/headers/0")
1154 .subscribe::<Header<MockTypes>>()
1155 .await
1156 .unwrap();
1157 let blocks = client
1158 .socket("stream/blocks/0")
1159 .subscribe::<BlockQueryData<MockTypes>>()
1160 .await
1161 .unwrap();
1162 let vid_common = client
1163 .socket("stream/vid/common/0")
1164 .subscribe::<VidCommonQueryData<MockTypes>>()
1165 .await
1166 .unwrap();
1167 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1168 for nonce in 0..3 {
1169 let txn = mock_transaction(vec![nonce]);
1170 network.submit_transaction(txn).await;
1171
1172 let (i, leaf, block, common) = loop {
1174 tracing::info!("waiting for block with transaction {}", nonce);
1175 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1176 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1177 let leaf = leaf.unwrap();
1178 let header = header.unwrap();
1179 let block = block.unwrap();
1180 let common = common.unwrap();
1181 assert_eq!(leaf.height() as usize, i);
1182 assert_eq!(leaf.block_hash(), block.hash());
1183 assert_eq!(block.header(), &header);
1184 assert_eq!(common.height() as usize, i);
1185 if !block.is_empty() {
1186 break (i, leaf, block, common);
1187 }
1188 };
1189 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1190 assert_eq!(
1191 block,
1192 client.get(&format!("block/{i}")).send().await.unwrap()
1193 );
1194 assert_eq!(
1195 common,
1196 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1197 );
1198
1199 validate(&client, (i + 1) as u64).await;
1200 }
1201
1202 network.shut_down().await;
1203 }
1204
1205 async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1206 for i in 0..height {
1208 if ![0, 1, height / 2, height - 1].contains(&i) {
1211 continue;
1212 }
1213 tracing::info!("validate block {i}/{height}");
1214
1215 let leaf: Leaf1QueryData<MockTypes> =
1217 client.get(&format!("leaf/{i}")).send().await.unwrap();
1218 assert_eq!(leaf.leaf.height(), i);
1219 assert_eq!(
1220 leaf,
1221 client
1222 .get(&format!(
1223 "leaf/hash/{}",
1224 <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1225 ))
1226 .send()
1227 .await
1228 .unwrap()
1229 );
1230
1231 let block: BlockQueryData<MockTypes> =
1233 client.get(&format!("block/{i}")).send().await.unwrap();
1234 let expected_payload = PayloadQueryData::from(block.clone());
1235 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1236 assert_eq!(block.height(), i);
1237 assert_eq!(
1238 block,
1239 client
1240 .get(&format!("block/hash/{}", block.hash()))
1241 .send()
1242 .await
1243 .unwrap()
1244 );
1245 assert_eq!(
1246 *block.header(),
1247 client.get(&format!("header/{i}")).send().await.unwrap()
1248 );
1249 assert_eq!(
1250 *block.header(),
1251 client
1252 .get(&format!("header/hash/{}", block.hash()))
1253 .send()
1254 .await
1255 .unwrap()
1256 );
1257 assert_eq!(
1258 expected_payload,
1259 client.get(&format!("payload/{i}")).send().await.unwrap(),
1260 );
1261 assert_eq!(
1262 expected_payload,
1263 client
1264 .get(&format!("payload/block-hash/{}", block.hash()))
1265 .send()
1266 .await
1267 .unwrap(),
1268 );
1269 let common: ADVZCommonQueryData<MockTypes> = client
1271 .get(&format!("vid/common/{}", block.height()))
1272 .send()
1273 .await
1274 .unwrap();
1275 assert_eq!(common.height(), block.height());
1276 assert_eq!(common.block_hash(), block.hash());
1277 assert_eq!(
1278 VidCommitment::V0(common.payload_hash()),
1279 block.payload_hash(),
1280 );
1281 assert_eq!(
1282 common,
1283 client
1284 .get(&format!("vid/common/hash/{}", block.hash()))
1285 .send()
1286 .await
1287 .unwrap()
1288 );
1289
1290 let block_summary = client
1291 .get(&format!("block/summary/{i}"))
1292 .send()
1293 .await
1294 .unwrap();
1295 assert_eq!(
1296 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1297 block_summary,
1298 );
1299 assert_eq!(block_summary.header(), block.header());
1300 assert_eq!(block_summary.hash(), block.hash());
1301 assert_eq!(block_summary.size(), block.size());
1302 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1303
1304 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1305 .get(&format!("block/summaries/{}/{}", 0, i))
1306 .send()
1307 .await
1308 .unwrap();
1309 assert_eq!(block_summaries.len() as u64, i);
1310
1311 assert_eq!(
1316 block.payload(),
1317 client
1318 .get::<BlockQueryData<MockTypes>>(&format!(
1319 "block/payload-hash/{}",
1320 block.payload_hash()
1321 ))
1322 .send()
1323 .await
1324 .unwrap()
1325 .payload()
1326 );
1327 assert_eq!(
1328 block.payload_hash(),
1329 client
1330 .get::<Header<MockTypes>>(&format!(
1331 "header/payload-hash/{}",
1332 block.payload_hash()
1333 ))
1334 .send()
1335 .await
1336 .unwrap()
1337 .payload_commitment
1338 );
1339 assert_eq!(
1340 block.payload(),
1341 client
1342 .get::<PayloadQueryData<MockTypes>>(&format!(
1343 "payload/hash/{}",
1344 block.payload_hash()
1345 ))
1346 .send()
1347 .await
1348 .unwrap()
1349 .data(),
1350 );
1351 assert_eq!(
1352 common.common(),
1353 client
1354 .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1355 "vid/common/payload-hash/{}",
1356 block.payload_hash()
1357 ))
1358 .send()
1359 .await
1360 .unwrap()
1361 .common()
1362 );
1363
1364 for (j, txn_from_block) in block.enumerate() {
1367 let txn: TransactionQueryData<MockTypes> = client
1368 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1369 .send()
1370 .await
1371 .unwrap();
1372 assert_eq!(txn.block_height(), i);
1373 assert_eq!(txn.block_hash(), block.hash());
1374 assert_eq!(txn.index(), j.position as u64);
1375 assert_eq!(txn.hash(), txn_from_block.commit());
1376 assert_eq!(txn.transaction(), &txn_from_block);
1377 assert_eq!(
1382 txn.hash(),
1383 client
1384 .get::<TransactionQueryData<MockTypes>>(&format!(
1385 "transaction/hash/{}/noproof",
1386 txn.hash()
1387 ))
1388 .send()
1389 .await
1390 .unwrap()
1391 .hash()
1392 );
1393
1394 assert_eq!(
1395 txn.hash(),
1396 client
1397 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1398 "transaction/{}/{}/proof",
1399 i, j.position
1400 ))
1401 .send()
1402 .await
1403 .unwrap()
1404 .hash()
1405 );
1406
1407 assert_eq!(
1408 txn.hash(),
1409 client
1410 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1411 "transaction/hash/{}/proof",
1412 txn.hash()
1413 ))
1414 .send()
1415 .await
1416 .unwrap()
1417 .hash()
1418 );
1419 }
1420
1421 let block_range: Vec<BlockQueryData<MockTypes>> = client
1422 .get(&format!("block/{}/{}", 0, i))
1423 .send()
1424 .await
1425 .unwrap();
1426
1427 assert_eq!(block_range.len() as u64, i);
1428
1429 let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1430 .get(&format!("leaf/{}/{}", 0, i))
1431 .send()
1432 .await
1433 .unwrap();
1434
1435 assert_eq!(leaf_range.len() as u64, i);
1436
1437 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1438 .get(&format!("payload/{}/{}", 0, i))
1439 .send()
1440 .await
1441 .unwrap();
1442
1443 assert_eq!(payload_range.len() as u64, i);
1444
1445 let header_range: Vec<Header<MockTypes>> = client
1446 .get(&format!("header/{}/{}", 0, i))
1447 .send()
1448 .await
1449 .unwrap();
1450
1451 assert_eq!(header_range.len() as u64, i);
1452 }
1453 }
1454
1455 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1456 async fn test_api_epochs() {
1457 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1459 let epoch_height = network.epoch_height();
1460 network.start().await;
1461
1462 let port = pick_unused_port().unwrap();
1464 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1465 app.register_module(
1466 "availability",
1467 define_api(
1468 &Default::default(),
1469 MockBase::instance(),
1470 "1.0.0".parse().unwrap(),
1471 )
1472 .unwrap(),
1473 )
1474 .unwrap();
1475 network.spawn(
1476 "server",
1477 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1478 );
1479
1480 let client = Client::<Error, MockBase>::new(
1482 format!("http://localhost:{port}/availability")
1483 .parse()
1484 .unwrap(),
1485 );
1486 assert!(client.connect(Some(Duration::from_secs(60))).await);
1487
1488 let headers = client
1491 .socket("stream/headers/0")
1492 .subscribe::<Header<MockTypes>>()
1493 .await
1494 .unwrap();
1495 let mut chain = headers.enumerate();
1496
1497 loop {
1498 let (i, header) = chain.next().await.unwrap();
1499 let header = header.unwrap();
1500 assert_eq!(header.height(), i as u64);
1501 if header.height() >= 3 * epoch_height {
1502 break;
1503 }
1504 }
1505
1506 network.shut_down().await;
1507 }
1508
1509 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1510 async fn test_old_api() {
1511 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1513 network.start().await;
1514
1515 let port = pick_unused_port().unwrap();
1517
1518 let options = Options {
1519 small_object_range_limit: 500,
1520 large_object_range_limit: 500,
1521 ..Default::default()
1522 };
1523
1524 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1525 app.register_module(
1526 "availability",
1527 define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1528 )
1529 .unwrap();
1530 network.spawn(
1531 "server",
1532 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1533 );
1534
1535 let client = Client::<Error, MockBase>::new(
1537 format!("http://localhost:{port}/availability")
1538 .parse()
1539 .unwrap(),
1540 );
1541 assert!(client.connect(Some(Duration::from_secs(60))).await);
1542 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1543
1544 let leaves = client
1547 .socket("stream/leaves/0")
1548 .subscribe::<Leaf1QueryData<MockTypes>>()
1549 .await
1550 .unwrap();
1551 let headers = client
1552 .socket("stream/headers/0")
1553 .subscribe::<Header<MockTypes>>()
1554 .await
1555 .unwrap();
1556 let blocks = client
1557 .socket("stream/blocks/0")
1558 .subscribe::<BlockQueryData<MockTypes>>()
1559 .await
1560 .unwrap();
1561 let vid_common = client
1562 .socket("stream/vid/common/0")
1563 .subscribe::<ADVZCommonQueryData<MockTypes>>()
1564 .await
1565 .unwrap();
1566 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1567 for nonce in 0..3 {
1568 let txn = mock_transaction(vec![nonce]);
1569 network.submit_transaction(txn).await;
1570
1571 let (i, leaf, block, common) = loop {
1573 tracing::info!("waiting for block with transaction {}", nonce);
1574 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1575 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1576 let leaf = leaf.unwrap();
1577 let header = header.unwrap();
1578 let block = block.unwrap();
1579 let common = common.unwrap();
1580 assert_eq!(leaf.leaf.height() as usize, i);
1581 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1582 assert_eq!(block.header(), &header);
1583 assert_eq!(common.height() as usize, i);
1584 if !block.is_empty() {
1585 break (i, leaf, block, common);
1586 }
1587 };
1588 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1589 assert_eq!(
1590 block,
1591 client.get(&format!("block/{i}")).send().await.unwrap()
1592 );
1593 assert_eq!(
1594 common,
1595 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1596 );
1597
1598 validate_old(&client, (i + 1) as u64).await;
1599 }
1600
1601 network.shut_down().await;
1602 }
1603
1604 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1605 async fn test_extensions() {
1606 use hotshot_example_types::node_types::TestVersions;
1607
1608 let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1609 let data_source = ExtensibleDataSource::new(
1610 MockDataSource::create(dir.path(), Default::default())
1611 .await
1612 .unwrap(),
1613 0,
1614 );
1615
1616 let leaf =
1618 Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1619 .await;
1620 let qc =
1621 QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1622 .await;
1623 let leaf = LeafQueryData::new(leaf, qc).unwrap();
1624 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1625 data_source
1626 .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1627 .await
1628 .unwrap();
1629
1630 assert_eq!(
1632 ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1633 .await
1634 .unwrap(),
1635 1
1636 );
1637 assert_eq!(block, data_source.get_block(0).await.await);
1638
1639 let extensions = toml! {
1641 [route.post_ext]
1642 PATH = ["/ext/:val"]
1643 METHOD = "POST"
1644 ":val" = "Integer"
1645
1646 [route.get_ext]
1647 PATH = ["/ext"]
1648 METHOD = "GET"
1649 };
1650
1651 let mut api =
1652 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1653 &Options {
1654 extensions: vec![extensions.into()],
1655 ..Default::default()
1656 },
1657 MockBase::instance(),
1658 "1.0.0".parse().unwrap(),
1659 )
1660 .unwrap();
1661 api.get("get_ext", |_, state| {
1662 async move { Ok(*state.as_ref()) }.boxed()
1663 })
1664 .unwrap()
1665 .post("post_ext", |req, state| {
1666 async move {
1667 *state.as_mut() = req.integer_param("val")?;
1668 Ok(())
1669 }
1670 .boxed()
1671 })
1672 .unwrap();
1673
1674 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1675 app.register_module("availability", api).unwrap();
1676
1677 let port = pick_unused_port().unwrap();
1678 let _server = BackgroundTask::spawn(
1679 "server",
1680 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1681 );
1682
1683 let client = Client::<Error, MockBase>::new(
1684 format!("http://localhost:{port}/availability")
1685 .parse()
1686 .unwrap(),
1687 );
1688 assert!(client.connect(Some(Duration::from_secs(60))).await);
1689
1690 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1691 client.post::<()>("ext/42").send().await.unwrap();
1692 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1693
1694 assert_eq!(
1696 client
1697 .get::<MockHeader>("header/0")
1698 .send()
1699 .await
1700 .unwrap()
1701 .block_number,
1702 0
1703 );
1704 }
1705
1706 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1707 async fn test_range_limit() {
1708 let large_object_range_limit = 2;
1709 let small_object_range_limit = 3;
1710
1711 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1713 network.start().await;
1714
1715 let port = pick_unused_port().unwrap();
1717 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1718 app.register_module(
1719 "availability",
1720 define_api(
1721 &Options {
1722 large_object_range_limit,
1723 small_object_range_limit,
1724 ..Default::default()
1725 },
1726 MockBase::instance(),
1727 "1.0.0".parse().unwrap(),
1728 )
1729 .unwrap(),
1730 )
1731 .unwrap();
1732 network.spawn(
1733 "server",
1734 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1735 );
1736
1737 let client = Client::<Error, MockBase>::new(
1739 format!("http://localhost:{port}/availability")
1740 .parse()
1741 .unwrap(),
1742 );
1743 assert!(client.connect(Some(Duration::from_secs(60))).await);
1744
1745 assert_eq!(
1747 client.get::<Limits>("limits").send().await.unwrap(),
1748 Limits {
1749 small_object_range_limit,
1750 large_object_range_limit
1751 }
1752 );
1753
1754 client
1756 .socket("stream/blocks/0")
1757 .subscribe::<BlockQueryData<MockTypes>>()
1758 .await
1759 .unwrap()
1760 .take(small_object_range_limit + 1)
1761 .try_collect::<Vec<_>>()
1762 .await
1763 .unwrap();
1764
1765 async fn check_limit<T: DeserializeOwned + Debug>(
1766 client: &Client<Error, MockBase>,
1767 req: &str,
1768 limit: usize,
1769 ) {
1770 let range: Vec<T> = client
1771 .get(&format!("{req}/0/{limit}"))
1772 .send()
1773 .await
1774 .unwrap();
1775 assert_eq!(range.len(), limit);
1776 let err = client
1777 .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1778 .send()
1779 .await
1780 .unwrap_err();
1781 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1782 }
1783
1784 check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1785 check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1786 check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1787 check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1788 .await;
1789 check_limit::<BlockSummaryQueryData<MockTypes>>(
1790 &client,
1791 "block/summaries",
1792 large_object_range_limit,
1793 )
1794 .await;
1795
1796 network.shut_down().await;
1797 }
1798
1799 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1800 async fn test_header_endpoint() {
1801 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1803 network.start().await;
1804
1805 let port = pick_unused_port().unwrap();
1807 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1808 app.register_module(
1809 "availability",
1810 define_api(
1811 &Default::default(),
1812 MockBase::instance(),
1813 "1.0.0".parse().unwrap(),
1814 )
1815 .unwrap(),
1816 )
1817 .unwrap();
1818 network.spawn(
1819 "server",
1820 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1821 );
1822
1823 let ds = network.data_source();
1824
1825 let block_height = ds.block_height().await.unwrap();
1828 let fetch = ds
1829 .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1830 .await;
1831
1832 assert!(fetch.is_pending());
1833 let header = fetch.await;
1834 assert_eq!(header.height() as usize, block_height + 25);
1835
1836 network.shut_down().await;
1837 }
1838
1839 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1840 async fn test_leaf_only_ds() {
1841 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1843 network.start().await;
1844
1845 let port = pick_unused_port().unwrap();
1847 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1848 app.register_module(
1849 "availability",
1850 define_api(
1851 &Default::default(),
1852 MockBase::instance(),
1853 "1.0.0".parse().unwrap(),
1854 )
1855 .unwrap(),
1856 )
1857 .unwrap();
1858 network.spawn(
1859 "server",
1860 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1861 );
1862
1863 let client = Client::<Error, MockBase>::new(
1865 format!("http://localhost:{port}/availability")
1866 .parse()
1867 .unwrap(),
1868 );
1869 assert!(client.connect(Some(Duration::from_secs(60))).await);
1870
1871 client
1873 .socket("stream/headers/0")
1874 .subscribe::<Header<MockTypes>>()
1875 .await
1876 .unwrap()
1877 .take(5)
1878 .try_collect::<Vec<_>>()
1879 .await
1880 .unwrap();
1881
1882 client
1884 .socket("stream/leaves/5")
1885 .subscribe::<LeafQueryData<MockTypes>>()
1886 .await
1887 .unwrap()
1888 .take(5)
1889 .try_collect::<Vec<_>>()
1890 .await
1891 .unwrap();
1892
1893 let ds = network.data_source();
1894
1895 let block_height = ds.block_height().await.unwrap();
1899 let target_block_height = block_height + 20;
1900 let fetch = ds
1901 .get_block(BlockId::<MockTypes>::Number(target_block_height))
1902 .await;
1903
1904 assert!(fetch.is_pending());
1905 let block = fetch.await;
1906 assert_eq!(block.height() as usize, target_block_height);
1907
1908 let mut tx = ds.read().await.unwrap();
1909 tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1910 .await
1911 .unwrap_err();
1912 drop(tx);
1913
1914 network.shut_down().await;
1915 }
1916}