1use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34 data::{Leaf, Leaf2, QuorumProposal, VidCommitment},
35 simple_certificate::QuorumCertificate,
36 traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{api::ApiError, method::ReadState, Api, RequestError, RequestParams, StatusCode};
41use vbs::version::StaticVersionType;
42
43use crate::{api::load_api, types::HeightIndexed, Header, Payload, QueryError, VidCommon};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
/// Configuration consumed by [`define_api`] when building the availability API module.
#[derive(Debug)]
pub struct Options {
    /// Optional path handed to `load_api` alongside the built-in `availability.toml`
    /// specification.
    ///
    /// NOTE(review): presumably a specification found at this path takes precedence over the
    /// built-in one — confirm against `load_api`.
    pub api_path: Option<PathBuf>,

    /// Timeout applied to each fetch performed by a request handler.
    ///
    /// A fetch that yields nothing within this duration is reported to the client as a
    /// "missing or not available" error for the requested resource.
    pub fetch_timeout: Duration,

    /// Additional TOML values forwarded to `load_api` when the API is built.
    ///
    /// NOTE(review): presumably these extend the API specification — confirm against `load_api`.
    pub extensions: Vec<toml::Value>,

    /// Maximum width (`until - from`) accepted by the leaf range endpoint.
    pub small_object_range_limit: usize,

    /// Maximum width (`until - from`) accepted by the header, block, payload and block summary
    /// range endpoints.
    pub large_object_range_limit: usize,
}
87
88impl Default for Options {
89 fn default() -> Self {
90 Self {
91 api_path: None,
92 fetch_timeout: Duration::from_millis(500),
93 extensions: vec![],
94 large_object_range_limit: 100,
95 small_object_range_limit: 500,
96 }
97 }
98}
99
/// Errors returned by the availability API.
///
/// The HTTP status associated with each variant is given by [`Error::status`].
#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
#[snafu(visibility(pub))]
pub enum Error {
    // The request itself could not be interpreted (wraps the underlying `RequestError`).
    //
    // NOTE(review): deliberately a plain comment rather than a doc comment. This variant has no
    // `#[snafu(display(..))]` attribute, and snafu is understood to fall back to a variant's doc
    // comment for its `Display` text, so a `///` here could change the rendered message —
    // confirm against the snafu version in use before converting.
    Request {
        source: RequestError,
    },
    /// A leaf could not be fetched; `resource` is the textual form of the requested leaf ID.
    #[snafu(display("leaf {resource} missing or not available"))]
    #[from(ignore)]
    FetchLeaf {
        resource: String,
    },
    /// A block (or data looked up by block ID, such as a payload or VID common data) could not be
    /// fetched; `resource` is the textual form of the requested ID.
    #[snafu(display("block {resource} missing or not available"))]
    #[from(ignore)]
    FetchBlock {
        resource: String,
    },
    /// A header could not be fetched; `resource` is the textual form of the requested ID.
    #[snafu(display("header {resource} missing or not available"))]
    #[from(ignore)]
    FetchHeader {
        resource: String,
    },
    /// A transaction could not be fetched; `resource` is the textual form of the requested hash.
    #[snafu(display("transaction {resource} missing or not available"))]
    #[from(ignore)]
    FetchTransaction {
        resource: String,
    },
    /// The block at `height` was found but has no transaction at position `index`.
    #[snafu(display("transaction index {index} out of range for block {height}"))]
    #[from(ignore)]
    InvalidTransactionIndex {
        height: u64,
        index: u64,
    },
    /// A range request `from..until` spans more objects than the configured `limit`.
    #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
    #[from(ignore)]
    RangeLimit {
        from: usize,
        until: usize,
        limit: usize,
    },
    /// An underlying [`QueryError`], displayed verbatim.
    #[snafu(display("{source}"))]
    Query {
        source: QueryError,
    },
    /// No state certificate was available for `epoch`.
    #[snafu(display("State cert for epoch {epoch} not found"))]
    #[from(ignore)]
    FetchStateCert {
        epoch: u64,
    },
    /// A free-form error carrying its own message and HTTP status.
    #[snafu(display("error {status}: {message}"))]
    Custom {
        message: String,
        status: StatusCode,
    },
}
154
155impl Error {
156 pub fn internal<M: Display>(message: M) -> Self {
157 Self::Custom {
158 message: message.to_string(),
159 status: StatusCode::INTERNAL_SERVER_ERROR,
160 }
161 }
162
163 pub fn status(&self) -> StatusCode {
164 match self {
165 Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166 Self::FetchLeaf { .. }
167 | Self::FetchBlock { .. }
168 | Self::FetchTransaction { .. }
169 | Self::FetchHeader { .. }
170 | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171 Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172 Self::Custom { status, .. } => *status,
173 }
174 }
175}
176
/// Leaf query response in the legacy format: a [`Leaf`] paired with a [`QuorumCertificate`].
///
/// This is the shape served by the leaf endpoints when the API major version is 0; it is
/// produced from a `LeafQueryData` by `downgrade_leaf_query_data`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(bound = "")]
pub struct Leaf1QueryData<Types: NodeType> {
    /// The legacy-format leaf.
    pub(crate) leaf: Leaf<Types>,
    /// The legacy-format quorum certificate returned alongside the leaf.
    pub(crate) qc: QuorumCertificate<Types>,
}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185 pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186 Self { leaf, qc }
187 }
188
189 pub fn leaf(&self) -> &Leaf<Types> {
190 &self.leaf
191 }
192
193 pub fn qc(&self) -> &QuorumCertificate<Types> {
194 &self.qc
195 }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199 let quorum_proposal = QuorumProposal {
205 block_header: leaf2.block_header().clone(),
206 view_number: leaf2.view_number(),
207 justify_qc: leaf2.justify_qc().to_qc(),
208 upgrade_certificate: leaf2.upgrade_certificate(),
209 proposal_certificate: None,
210 };
211 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212 if let Some(payload) = leaf2.block_payload() {
213 leaf.fill_block_payload_unchecked(payload);
214 }
215 leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219 Leaf1QueryData {
220 leaf: downgrade_leaf(leaf.leaf),
221 qc: leaf.qc.to_qc(),
222 }
223}
224
225async fn get_leaf_handler<Types, State>(
226 req: tide_disco::RequestParams,
227 state: &State,
228 timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231 State: 'static + Send + Sync + ReadState,
232 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233 Types: NodeType,
234 Header<Types>: QueryableHeader<Types>,
235 Payload<Types>: QueryablePayload<Types>,
236{
237 let id = match req.opt_integer_param("height")? {
238 Some(height) => LeafId::Number(height),
239 None => LeafId::Hash(req.blob_param("hash")?),
240 };
241 let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243 resource: id.to_string(),
244 })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248 req: tide_disco::RequestParams,
249 state: &State,
250 timeout: Duration,
251 small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254 State: 'static + Send + Sync + ReadState,
255 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256 Types: NodeType,
257 Header<Types>: QueryableHeader<Types>,
258 Payload<Types>: QueryablePayload<Types>,
259{
260 let from = req.integer_param::<_, usize>("from")?;
261 let until = req.integer_param("until")?;
262 enforce_range_limit(from, until, small_object_range_limit)?;
263
264 let leaves = state
265 .read(|state| state.get_leaf_range(from..until).boxed())
266 .await;
267 leaves
268 .enumerate()
269 .then(|(index, fetch)| async move {
270 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271 resource: (index + from).to_string(),
272 })
273 })
274 .try_collect::<Vec<_>>()
275 .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279 data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281 let VidCommonQueryData {
282 height,
283 block_hash,
284 payload_hash: VidCommitment::V0(payload_hash),
285 common: VidCommon::V0(common),
286 } = data
287 else {
288 return None;
289 };
290 Some(ADVZCommonQueryData {
291 height,
292 block_hash,
293 payload_hash,
294 common,
295 })
296}
297
298async fn get_vid_common_handler<Types, State>(
299 req: tide_disco::RequestParams,
300 state: &State,
301 timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304 State: 'static + Send + Sync + ReadState,
305 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306 Types: NodeType,
307 Header<Types>: QueryableHeader<Types>,
308 Payload<Types>: QueryablePayload<Types>,
309{
310 let id = if let Some(height) = req.opt_integer_param("height")? {
311 BlockId::Number(height)
312 } else if let Some(hash) = req.opt_blob_param("hash")? {
313 BlockId::Hash(hash)
314 } else {
315 BlockId::PayloadHash(req.blob_param("payload-hash")?)
316 };
317 let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319 resource: id.to_string(),
320 })
321}
322
323pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
324 options: &Options,
325 _: Ver,
326 api_ver: semver::Version,
327) -> Result<Api<State, Error, Ver>, ApiError>
328where
329 State: 'static + Send + Sync + ReadState,
330 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
331 Header<Types>: QueryableHeader<Types>,
332 Payload<Types>: QueryablePayload<Types>,
333{
334 let mut api = load_api::<State, Error, Ver>(
335 options.api_path.as_ref(),
336 include_str!("../api/availability.toml"),
337 options.extensions.clone(),
338 )?;
339 let timeout = options.fetch_timeout;
340 let small_object_range_limit = options.small_object_range_limit;
341 let large_object_range_limit = options.large_object_range_limit;
342
343 api.with_version(api_ver.clone());
344
345 if api_ver.major == 0 {
353 api.at("get_leaf", move |req, state| {
354 get_leaf_handler(req, state, timeout)
355 .map(|res| res.map(downgrade_leaf_query_data))
356 .boxed()
357 })?;
358
359 api.at("get_leaf_range", move |req, state| {
360 get_leaf_range_handler(req, state, timeout, small_object_range_limit)
361 .map(|res| {
362 res.map(|r| {
363 r.into_iter()
364 .map(downgrade_leaf_query_data)
365 .collect::<Vec<Leaf1QueryData<_>>>()
366 })
367 })
368 .boxed()
369 })?;
370
371 api.stream("stream_leaves", move |req, state| {
372 async move {
373 let height = req.integer_param("height")?;
374 state
375 .read(|state| {
376 async move {
377 Ok(state
378 .subscribe_leaves(height)
379 .await
380 .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
381 }
382 .boxed()
383 })
384 .await
385 }
386 .try_flatten_stream()
387 .boxed()
388 })?;
389 } else {
390 api.at("get_leaf", move |req, state| {
391 get_leaf_handler(req, state, timeout).boxed()
392 })?;
393
394 api.at("get_leaf_range", move |req, state| {
395 get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
396 })?;
397
398 api.stream("stream_leaves", move |req, state| {
399 async move {
400 let height = req.integer_param("height")?;
401 state
402 .read(|state| {
403 async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
404 })
405 .await
406 }
407 .try_flatten_stream()
408 .boxed()
409 })?;
410 }
411
412 if api_ver.major == 0 {
415 api.at("get_vid_common", move |req, state| {
416 get_vid_common_handler(req, state, timeout)
417 .map(|r| match r {
418 Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
419 message: "Incompatible VID version.".to_string(),
420 status: StatusCode::BAD_REQUEST,
421 }),
422 Err(e) => Err(e),
423 })
424 .boxed()
425 })?
426 .stream("stream_vid_common", move |req, state| {
427 async move {
428 let height = req.integer_param("height")?;
429 state
430 .read(|state| {
431 async move {
432 Ok(state.subscribe_vid_common(height).await.map(|data| {
433 downgrade_vid_common_query_data(data).ok_or(Error::Custom {
434 message: "Incompatible VID version.".to_string(),
435 status: StatusCode::BAD_REQUEST,
436 })
437 }))
438 }
439 .boxed()
440 })
441 .await
442 }
443 .try_flatten_stream()
444 .boxed()
445 })?;
446 } else {
447 api.at("get_vid_common", move |req, state| {
448 get_vid_common_handler(req, state, timeout).boxed().boxed()
449 })?
450 .stream("stream_vid_common", move |req, state| {
451 async move {
452 let height = req.integer_param("height")?;
453 state
454 .read(|state| {
455 async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
456 })
457 .await
458 }
459 .try_flatten_stream()
460 .boxed()
461 })?;
462 }
463
464 api.at("get_header", move |req, state| {
465 async move {
466 let id = if let Some(height) = req.opt_integer_param("height")? {
467 BlockId::Number(height)
468 } else if let Some(hash) = req.opt_blob_param("hash")? {
469 BlockId::Hash(hash)
470 } else {
471 BlockId::PayloadHash(req.blob_param("payload-hash")?)
472 };
473 let fetch = state.read(|state| state.get_header(id).boxed()).await;
474 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
475 resource: id.to_string(),
476 })
477 }
478 .boxed()
479 })?
480 .at("get_header_range", move |req, state| {
481 async move {
482 let from = req.integer_param::<_, usize>("from")?;
483 let until = req.integer_param::<_, usize>("until")?;
484 enforce_range_limit(from, until, large_object_range_limit)?;
485
486 let headers = state
487 .read(|state| state.get_header_range(from..until).boxed())
488 .await;
489 headers
490 .enumerate()
491 .then(|(index, fetch)| async move {
492 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
493 resource: (index + from).to_string(),
494 })
495 })
496 .try_collect::<Vec<_>>()
497 .await
498 }
499 .boxed()
500 })?
501 .stream("stream_headers", move |req, state| {
502 async move {
503 let height = req.integer_param("height")?;
504 state
505 .read(|state| {
506 async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
507 })
508 .await
509 }
510 .try_flatten_stream()
511 .boxed()
512 })?
513 .at("get_block", move |req, state| {
514 async move {
515 let id = if let Some(height) = req.opt_integer_param("height")? {
516 BlockId::Number(height)
517 } else if let Some(hash) = req.opt_blob_param("hash")? {
518 BlockId::Hash(hash)
519 } else {
520 BlockId::PayloadHash(req.blob_param("payload-hash")?)
521 };
522 let fetch = state.read(|state| state.get_block(id).boxed()).await;
523 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
524 resource: id.to_string(),
525 })
526 }
527 .boxed()
528 })?
529 .at("get_block_range", move |req, state| {
530 async move {
531 let from = req.integer_param::<_, usize>("from")?;
532 let until = req.integer_param("until")?;
533 enforce_range_limit(from, until, large_object_range_limit)?;
534
535 let blocks = state
536 .read(|state| state.get_block_range(from..until).boxed())
537 .await;
538 blocks
539 .enumerate()
540 .then(|(index, fetch)| async move {
541 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
542 resource: (index + from).to_string(),
543 })
544 })
545 .try_collect::<Vec<_>>()
546 .await
547 }
548 .boxed()
549 })?
550 .stream("stream_blocks", move |req, state| {
551 async move {
552 let height = req.integer_param("height")?;
553 state
554 .read(|state| {
555 async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
556 })
557 .await
558 }
559 .try_flatten_stream()
560 .boxed()
561 })?
562 .at("get_payload", move |req, state| {
563 async move {
564 let id = if let Some(height) = req.opt_integer_param("height")? {
565 BlockId::Number(height)
566 } else if let Some(hash) = req.opt_blob_param("hash")? {
567 BlockId::PayloadHash(hash)
568 } else {
569 BlockId::Hash(req.blob_param("block-hash")?)
570 };
571 let fetch = state.read(|state| state.get_payload(id).boxed()).await;
572 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
573 resource: id.to_string(),
574 })
575 }
576 .boxed()
577 })?
578 .at("get_payload_range", move |req, state| {
579 async move {
580 let from = req.integer_param::<_, usize>("from")?;
581 let until = req.integer_param("until")?;
582 enforce_range_limit(from, until, large_object_range_limit)?;
583
584 let payloads = state
585 .read(|state| state.get_payload_range(from..until).boxed())
586 .await;
587 payloads
588 .enumerate()
589 .then(|(index, fetch)| async move {
590 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
591 resource: (index + from).to_string(),
592 })
593 })
594 .try_collect::<Vec<_>>()
595 .await
596 }
597 .boxed()
598 })?
599 .stream("stream_payloads", move |req, state| {
600 async move {
601 let height = req.integer_param("height")?;
602 state
603 .read(|state| {
604 async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
605 })
606 .await
607 }
608 .try_flatten_stream()
609 .boxed()
610 })?
611 .at("get_transaction_proof", move |req, state| {
612 async move {
613 let tx = get_transaction(req, state, timeout).await?;
614 let height = tx.block.height();
615 let vid = state
616 .read(|state| state.get_vid_common(height as usize))
617 .await
618 .with_timeout(timeout)
619 .await
620 .context(FetchBlockSnafu {
621 resource: height.to_string(),
622 })?;
623 let proof = tx.block.transaction_proof(&vid, &tx.index).context(
624 InvalidTransactionIndexSnafu {
625 height,
626 index: tx.transaction.index(),
627 },
628 )?;
629 Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
630 }
631 .boxed()
632 })?
633 .at("get_transaction", move |req, state| {
634 async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
635 })?
636 .stream("stream_transactions", move |req, state| {
637 async move {
638 let height = req.integer_param::<_, usize>("height")?;
639
640 let namespace: Option<i64> = req
641 .opt_integer_param::<_, usize>("namespace")?
642 .map(|i| {
643 i.try_into().map_err(|err| Error::Custom {
644 message: format!(
645 "Invalid 'namespace': could not convert usize to i64: {err}"
646 ),
647 status: StatusCode::BAD_REQUEST,
648 })
649 })
650 .transpose()?;
651
652 state
653 .read(|state| {
654 async move {
655 Ok(state
656 .subscribe_blocks(height)
657 .await
658 .map(move |block| {
659 let transactions = block.enumerate().enumerate();
660 let header = block.header();
661 let filtered_txs = transactions
662 .filter_map(|(i, (index, _tx))| {
663 if let Some(requested_ns) = namespace {
664 let ns_id = QueryableHeader::<Types>::namespace_id(
665 header,
666 &index.ns_index,
667 )?;
668
669 if ns_id.into() != requested_ns {
670 return None;
671 }
672 }
673
674 let tx = block.transaction(&index)?;
675 TransactionQueryData::new(tx, &block, &index, i as u64)
676 })
677 .collect::<Vec<_>>();
678
679 futures::stream::iter(filtered_txs.into_iter().map(Ok))
680 })
681 .flatten())
682 }
683 .boxed()
684 })
685 .await
686 }
687 .try_flatten_stream()
688 .boxed()
689 })?
690 .at("get_block_summary", move |req, state| {
691 async move {
692 let id: usize = req.integer_param("height")?;
693
694 let fetch = state.read(|state| state.get_block(id).boxed()).await;
695 fetch
696 .with_timeout(timeout)
697 .await
698 .context(FetchBlockSnafu {
699 resource: id.to_string(),
700 })
701 .map(BlockSummaryQueryData::from)
702 }
703 .boxed()
704 })?
705 .at("get_block_summary_range", move |req, state| {
706 async move {
707 let from: usize = req.integer_param("from")?;
708 let until: usize = req.integer_param("until")?;
709 enforce_range_limit(from, until, large_object_range_limit)?;
710
711 let blocks = state
712 .read(|state| state.get_block_range(from..until).boxed())
713 .await;
714 let result: Vec<BlockSummaryQueryData<Types>> = blocks
715 .enumerate()
716 .then(|(index, fetch)| async move {
717 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
718 resource: (index + from).to_string(),
719 })
720 })
721 .map(|result| result.map(BlockSummaryQueryData::from))
722 .try_collect()
723 .await?;
724
725 Ok(result)
726 }
727 .boxed()
728 })?
729 .at("get_limits", move |_req, _state| {
730 async move {
731 Ok(Limits {
732 small_object_range_limit,
733 large_object_range_limit,
734 })
735 }
736 .boxed()
737 })?
738 .at("get_state_cert", move |req, state| {
739 async move {
740 let epoch = req.integer_param("epoch")?;
741 let fetch = state
742 .read(|state| state.get_state_cert(epoch).boxed())
743 .await;
744 fetch
745 .with_timeout(timeout)
746 .await
747 .context(FetchStateCertSnafu { epoch })
748 .map(StateCertQueryDataV1::from)
749 }
750 .boxed()
751 })?
752 .at("get_state_cert_v2", move |req, state| {
753 async move {
754 let epoch = req.integer_param("epoch")?;
755 let fetch = state
756 .read(|state| state.get_state_cert(epoch).boxed())
757 .await;
758 fetch
759 .with_timeout(timeout)
760 .await
761 .context(FetchStateCertSnafu { epoch })
762 }
763 .boxed()
764 })?;
765 Ok(api)
766}
767
768fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
769 if until.saturating_sub(from) > limit {
770 return Err(Error::RangeLimit { from, until, limit });
771 }
772 Ok(())
773}
774
775async fn get_transaction<Types, State>(
776 req: RequestParams,
777 state: &State,
778 timeout: Duration,
779) -> Result<BlockWithTransaction<Types>, Error>
780where
781 Types: NodeType,
782 Header<Types>: QueryableHeader<Types>,
783 Payload<Types>: QueryablePayload<Types>,
784 State: 'static + Send + Sync + ReadState,
785 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
786{
787 match req.opt_blob_param("hash")? {
788 Some(hash) => state
789 .read(|state| state.get_block_containing_transaction(hash).boxed())
790 .await
791 .with_timeout(timeout)
792 .await
793 .context(FetchTransactionSnafu {
794 resource: hash.to_string(),
795 }),
796 None => {
797 let height: u64 = req.integer_param("height")?;
798 let fetch = state
799 .read(|state| state.get_block(height as usize).boxed())
800 .await;
801 let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
802 resource: height.to_string(),
803 })?;
804 let i: u64 = req.integer_param("index")?;
805 let index = block
806 .payload()
807 .nth(block.metadata(), i as usize)
808 .context(InvalidTransactionIndexSnafu { height, index: i })?;
809 let transaction = block
810 .transaction(&index)
811 .context(InvalidTransactionIndexSnafu { height, index: i })?;
812 let transaction = TransactionQueryData::new(transaction, &block, &index, i)
813 .context(InvalidTransactionIndexSnafu { height, index: i })?;
814 Ok(BlockWithTransaction {
815 transaction,
816 block,
817 index,
818 })
819 },
820 }
821}
822
823#[cfg(test)]
824mod test {
825 use std::{fmt::Debug, time::Duration};
826
827 use async_lock::RwLock;
828 use committable::Committable;
829 use futures::future::FutureExt;
830 use hotshot_example_types::node_types::EpochsTestVersions;
831 use hotshot_types::{
832 data::Leaf2, simple_certificate::QuorumCertificate2,
833 traits::node_implementation::ConsensusTime,
834 };
835 use portpicker::pick_unused_port;
836 use serde::de::DeserializeOwned;
837 use surf_disco::{Client, Error as _};
838 use tempfile::TempDir;
839 use tide_disco::App;
840 use toml::toml;
841
842 use super::*;
843 use crate::{
844 data_source::{storage::AvailabilityStorage, ExtensibleDataSource, VersionedDataSource},
845 status::StatusDataSource,
846 task::BackgroundTask,
847 testing::{
848 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
849 mocks::{mock_transaction, MockBase, MockHeader, MockPayload, MockTypes, MockVersions},
850 },
851 types::HeightIndexed,
852 ApiState, Error, Header,
853 };
854
855 async fn get_non_empty_blocks(
857 client: &Client<Error, MockBase>,
858 ) -> (
859 u64,
860 Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
861 ) {
862 let mut blocks = vec![];
863 for i in 1.. {
865 match client
866 .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
867 .send()
868 .await
869 {
870 Ok(block) => {
871 if !block.is_empty() {
872 let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
873 blocks.push((leaf, block));
874 }
875 },
876 Err(Error::Availability {
877 source: super::Error::FetchBlock { .. },
878 }) => {
879 tracing::info!(
880 "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
881 );
882 return (i, blocks);
883 },
884 Err(err) => panic!("unexpected error {err}"),
885 }
886 }
887 unreachable!()
888 }
889
    /// Cross-checks the availability endpoints against each other for a ledger of `height`
    /// blocks.
    ///
    /// For a sample of heights, every way of addressing the same leaf, block, header, payload,
    /// VID common data, block summary and transaction must return consistent data, and range
    /// queries starting at 0 must return the expected number of objects.
    async fn validate(client: &Client<Error, MockBase>, height: u64) {
        for i in 0..height {
            // Only the first two, the middle and the last height are checked.
            if ![0, 1, height / 2, height - 1].contains(&i) {
                continue;
            }
            tracing::info!("validate block {i}/{height}");

            // The leaf must be the same whether fetched by height or by hash.
            let leaf: LeafQueryData<MockTypes> =
                client.get(&format!("leaf/{i}")).send().await.unwrap();
            assert_eq!(leaf.height(), i);
            assert_eq!(
                leaf,
                client
                    .get(&format!("leaf/hash/{}", leaf.hash()))
                    .send()
                    .await
                    .unwrap()
            );

            // The block must match the leaf, and block, header and payload must agree whether
            // addressed by height or by block hash.
            let block: BlockQueryData<MockTypes> =
                client.get(&format!("block/{i}")).send().await.unwrap();
            let expected_payload = PayloadQueryData::from(block.clone());
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i);
            assert_eq!(
                block,
                client
                    .get(&format!("block/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );
            assert_eq!(
                *block.header(),
                client.get(&format!("header/{i}")).send().await.unwrap()
            );
            assert_eq!(
                *block.header(),
                client
                    .get(&format!("header/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );
            assert_eq!(
                expected_payload,
                client.get(&format!("payload/{i}")).send().await.unwrap(),
            );
            assert_eq!(
                expected_payload,
                client
                    .get(&format!("payload/block-hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap(),
            );
            // VID common data must describe the same block.
            let common: VidCommonQueryData<MockTypes> = client
                .get(&format!("vid/common/{}", block.height()))
                .send()
                .await
                .unwrap();
            assert_eq!(common.height(), block.height());
            assert_eq!(common.block_hash(), block.hash());
            assert_eq!(common.payload_hash(), block.payload_hash());
            assert_eq!(
                common,
                client
                    .get(&format!("vid/common/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );

            // The block summary must be exactly what converting the full block produces.
            let block_summary = client
                .get(&format!("block/summary/{i}"))
                .send()
                .await
                .unwrap();
            assert_eq!(
                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
                block_summary,
            );
            assert_eq!(block_summary.header(), block.header());
            assert_eq!(block_summary.hash(), block.hash());
            assert_eq!(block_summary.size(), block.size());
            assert_eq!(block_summary.num_transactions(), block.num_transactions());

            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
                .get(&format!("block/summaries/{}/{}", 0, i))
                .send()
                .await
                .unwrap();
            assert_eq!(block_summaries.len() as u64, i);

            // Lookups by payload hash compare only the payload-level data, not the whole
            // object.
            // NOTE(review): presumably because several blocks can share one payload hash, so the
            // lookup may return a different block with the same payload — confirm.
            assert_eq!(
                block.payload(),
                client
                    .get::<BlockQueryData<MockTypes>>(&format!(
                        "block/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .payload()
            );
            assert_eq!(
                block.payload_hash(),
                client
                    .get::<Header<MockTypes>>(&format!(
                        "header/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .payload_commitment
            );
            assert_eq!(
                block.payload(),
                client
                    .get::<PayloadQueryData<MockTypes>>(&format!(
                        "payload/hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .data(),
            );
            assert_eq!(
                common.common(),
                client
                    .get::<VidCommonQueryData<MockTypes>>(&format!(
                        "vid/common/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .common()
            );

            // Every transaction in the block must be retrievable by position and by hash, with
            // and without a proof.
            for (j, txn_from_block) in block.enumerate() {
                let txn: TransactionQueryData<MockTypes> = client
                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.block_height(), i);
                assert_eq!(txn.block_hash(), block.hash());
                assert_eq!(txn.index(), j.position as u64);
                assert_eq!(txn.hash(), txn_from_block.commit());
                assert_eq!(txn.transaction(), &txn_from_block);
                // Lookups by transaction hash compare only the hash.
                // NOTE(review): presumably because the same transaction may appear in more than
                // one block — confirm.
                assert_eq!(
                    txn.hash(),
                    client
                        .get::<TransactionQueryData<MockTypes>>(&format!(
                            "transaction/hash/{}/noproof",
                            txn.hash()
                        ))
                        .send()
                        .await
                        .unwrap()
                        .hash()
                );

                assert_eq!(
                    txn.hash(),
                    client
                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
                            "transaction/{}/{}/proof",
                            i, j.position
                        ))
                        .send()
                        .await
                        .unwrap()
                        .hash()
                );
                assert_eq!(
                    txn.hash(),
                    client
                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
                            "transaction/hash/{}/proof",
                            txn.hash()
                        ))
                        .send()
                        .await
                        .unwrap()
                        .hash()
                );
            }

            // Range queries over `0..i` must return exactly `i` objects of each kind.
            let block_range: Vec<BlockQueryData<MockTypes>> = client
                .get(&format!("block/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(block_range.len() as u64, i);

            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
                .get(&format!("leaf/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(leaf_range.len() as u64, i);

            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
                .get(&format!("payload/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(payload_range.len() as u64, i);

            let header_range: Vec<Header<MockTypes>> = client
                .get(&format!("header/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(header_range.len() as u64, i);
        }
    }
1133
    /// End-to-end test of the availability API (version 1.0.0) against a mock network.
    ///
    /// Subscribes to the leaf, header, block and VID common streams, submits three
    /// transactions one at a time, waits for each to land in a non-empty block, and validates
    /// the whole ledger through the HTTP endpoints after each one.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_api() {
        // Create the consensus network and start it.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
        network.start().await;

        // Serve the availability module on a free local port.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        // Both range limits are set to 500.
        // NOTE(review): presumably so the range queries from height 0 in `validate` stay within
        // the limit as the chain grows — confirm.
        let options = Options {
            small_object_range_limit: 500,
            large_object_range_limit: 500,
            ..Default::default()
        };

        app.register_module(
            "availability",
            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Connect a client and check that no non-empty block exists yet.
        let client = Client::<Error, MockBase>::new(
            format!("http://localhost:{port}/availability")
                .parse()
                .unwrap(),
        );
        assert!(client.connect(Some(Duration::from_secs(60))).await);
        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);

        // Subscribe to all four streams from height 0 before submitting anything, then zip them
        // so each step yields the leaf, header, block and VID common data of one height.
        let leaves = client
            .socket("stream/leaves/0")
            .subscribe::<LeafQueryData<MockTypes>>()
            .await
            .unwrap();
        let headers = client
            .socket("stream/headers/0")
            .subscribe::<Header<MockTypes>>()
            .await
            .unwrap();
        let blocks = client
            .socket("stream/blocks/0")
            .subscribe::<BlockQueryData<MockTypes>>()
            .await
            .unwrap();
        let vid_common = client
            .socket("stream/vid/common/0")
            .subscribe::<VidCommonQueryData<MockTypes>>()
            .await
            .unwrap();
        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
        for nonce in 0..3 {
            let txn = mock_transaction(vec![nonce]);
            network.submit_transaction(txn).await;

            // Consume the streams until a non-empty block shows up, checking on the way that
            // the four streams stay consistent with each other and with the stream position.
            let (i, leaf, block, common) = loop {
                tracing::info!("waiting for block with transaction {}", nonce);
                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
                tracing::info!(i, ?leaf, ?header, ?block, ?common);
                let leaf = leaf.unwrap();
                let header = header.unwrap();
                let block = block.unwrap();
                let common = common.unwrap();
                assert_eq!(leaf.height() as usize, i);
                assert_eq!(leaf.block_hash(), block.hash());
                assert_eq!(block.header(), &header);
                assert_eq!(common.height() as usize, i);
                if !block.is_empty() {
                    break (i, leaf, block, common);
                }
            };
            // Streamed objects must equal what the point queries return for the same height.
            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
            assert_eq!(
                block,
                client.get(&format!("block/{i}")).send().await.unwrap()
            );
            assert_eq!(
                common,
                client.get(&format!("vid/common/{i}")).send().await.unwrap()
            );

            // Validate the ledger up to and including the new block.
            validate(&client, (i + 1) as u64).await;
        }

        network.shut_down().await;
    }
1227
1228 async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1229 for i in 0..height {
1231 if ![0, 1, height / 2, height - 1].contains(&i) {
1234 continue;
1235 }
1236 tracing::info!("validate block {i}/{height}");
1237
1238 let leaf: Leaf1QueryData<MockTypes> =
1240 client.get(&format!("leaf/{i}")).send().await.unwrap();
1241 assert_eq!(leaf.leaf.height(), i);
1242 assert_eq!(
1243 leaf,
1244 client
1245 .get(&format!(
1246 "leaf/hash/{}",
1247 <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1248 ))
1249 .send()
1250 .await
1251 .unwrap()
1252 );
1253
1254 let block: BlockQueryData<MockTypes> =
1256 client.get(&format!("block/{i}")).send().await.unwrap();
1257 let expected_payload = PayloadQueryData::from(block.clone());
1258 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1259 assert_eq!(block.height(), i);
1260 assert_eq!(
1261 block,
1262 client
1263 .get(&format!("block/hash/{}", block.hash()))
1264 .send()
1265 .await
1266 .unwrap()
1267 );
1268 assert_eq!(
1269 *block.header(),
1270 client.get(&format!("header/{i}")).send().await.unwrap()
1271 );
1272 assert_eq!(
1273 *block.header(),
1274 client
1275 .get(&format!("header/hash/{}", block.hash()))
1276 .send()
1277 .await
1278 .unwrap()
1279 );
1280 assert_eq!(
1281 expected_payload,
1282 client.get(&format!("payload/{i}")).send().await.unwrap(),
1283 );
1284 assert_eq!(
1285 expected_payload,
1286 client
1287 .get(&format!("payload/block-hash/{}", block.hash()))
1288 .send()
1289 .await
1290 .unwrap(),
1291 );
1292 let common: ADVZCommonQueryData<MockTypes> = client
1294 .get(&format!("vid/common/{}", block.height()))
1295 .send()
1296 .await
1297 .unwrap();
1298 assert_eq!(common.height(), block.height());
1299 assert_eq!(common.block_hash(), block.hash());
1300 assert_eq!(
1301 VidCommitment::V0(common.payload_hash()),
1302 block.payload_hash(),
1303 );
1304 assert_eq!(
1305 common,
1306 client
1307 .get(&format!("vid/common/hash/{}", block.hash()))
1308 .send()
1309 .await
1310 .unwrap()
1311 );
1312
1313 let block_summary = client
1314 .get(&format!("block/summary/{i}"))
1315 .send()
1316 .await
1317 .unwrap();
1318 assert_eq!(
1319 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1320 block_summary,
1321 );
1322 assert_eq!(block_summary.header(), block.header());
1323 assert_eq!(block_summary.hash(), block.hash());
1324 assert_eq!(block_summary.size(), block.size());
1325 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1326
1327 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1328 .get(&format!("block/summaries/{}/{}", 0, i))
1329 .send()
1330 .await
1331 .unwrap();
1332 assert_eq!(block_summaries.len() as u64, i);
1333
1334 assert_eq!(
1339 block.payload(),
1340 client
1341 .get::<BlockQueryData<MockTypes>>(&format!(
1342 "block/payload-hash/{}",
1343 block.payload_hash()
1344 ))
1345 .send()
1346 .await
1347 .unwrap()
1348 .payload()
1349 );
1350 assert_eq!(
1351 block.payload_hash(),
1352 client
1353 .get::<Header<MockTypes>>(&format!(
1354 "header/payload-hash/{}",
1355 block.payload_hash()
1356 ))
1357 .send()
1358 .await
1359 .unwrap()
1360 .payload_commitment
1361 );
1362 assert_eq!(
1363 block.payload(),
1364 client
1365 .get::<PayloadQueryData<MockTypes>>(&format!(
1366 "payload/hash/{}",
1367 block.payload_hash()
1368 ))
1369 .send()
1370 .await
1371 .unwrap()
1372 .data(),
1373 );
1374 assert_eq!(
1375 common.common(),
1376 client
1377 .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1378 "vid/common/payload-hash/{}",
1379 block.payload_hash()
1380 ))
1381 .send()
1382 .await
1383 .unwrap()
1384 .common()
1385 );
1386
1387 for (j, txn_from_block) in block.enumerate() {
1390 let txn: TransactionQueryData<MockTypes> = client
1391 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1392 .send()
1393 .await
1394 .unwrap();
1395 assert_eq!(txn.block_height(), i);
1396 assert_eq!(txn.block_hash(), block.hash());
1397 assert_eq!(txn.index(), j.position as u64);
1398 assert_eq!(txn.hash(), txn_from_block.commit());
1399 assert_eq!(txn.transaction(), &txn_from_block);
1400 assert_eq!(
1405 txn.hash(),
1406 client
1407 .get::<TransactionQueryData<MockTypes>>(&format!(
1408 "transaction/hash/{}/noproof",
1409 txn.hash()
1410 ))
1411 .send()
1412 .await
1413 .unwrap()
1414 .hash()
1415 );
1416
1417 assert_eq!(
1418 txn.hash(),
1419 client
1420 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1421 "transaction/{}/{}/proof",
1422 i, j.position
1423 ))
1424 .send()
1425 .await
1426 .unwrap()
1427 .hash()
1428 );
1429
1430 assert_eq!(
1431 txn.hash(),
1432 client
1433 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1434 "transaction/hash/{}/proof",
1435 txn.hash()
1436 ))
1437 .send()
1438 .await
1439 .unwrap()
1440 .hash()
1441 );
1442 }
1443
1444 let block_range: Vec<BlockQueryData<MockTypes>> = client
1445 .get(&format!("block/{}/{}", 0, i))
1446 .send()
1447 .await
1448 .unwrap();
1449
1450 assert_eq!(block_range.len() as u64, i);
1451
1452 let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1453 .get(&format!("leaf/{}/{}", 0, i))
1454 .send()
1455 .await
1456 .unwrap();
1457
1458 assert_eq!(leaf_range.len() as u64, i);
1459
1460 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1461 .get(&format!("payload/{}/{}", 0, i))
1462 .send()
1463 .await
1464 .unwrap();
1465
1466 assert_eq!(payload_range.len() as u64, i);
1467
1468 let header_range: Vec<Header<MockTypes>> = client
1469 .get(&format!("header/{}/{}", 0, i))
1470 .send()
1471 .await
1472 .unwrap();
1473
1474 assert_eq!(header_range.len() as u64, i);
1475 }
1476 }
1477
1478 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1479 async fn test_api_epochs() {
1480 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
1482 let epoch_height = network.epoch_height();
1483 network.start().await;
1484
1485 let port = pick_unused_port().unwrap();
1487 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1488 app.register_module(
1489 "availability",
1490 define_api(
1491 &Default::default(),
1492 MockBase::instance(),
1493 "1.0.0".parse().unwrap(),
1494 )
1495 .unwrap(),
1496 )
1497 .unwrap();
1498 network.spawn(
1499 "server",
1500 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1501 );
1502
1503 let client = Client::<Error, MockBase>::new(
1505 format!("http://localhost:{port}/availability")
1506 .parse()
1507 .unwrap(),
1508 );
1509 assert!(client.connect(Some(Duration::from_secs(60))).await);
1510
1511 let headers = client
1514 .socket("stream/headers/0")
1515 .subscribe::<Header<MockTypes>>()
1516 .await
1517 .unwrap();
1518 let mut chain = headers.enumerate();
1519
1520 loop {
1521 let (i, header) = chain.next().await.unwrap();
1522 let header = header.unwrap();
1523 assert_eq!(header.height(), i as u64);
1524 if header.height() >= 3 * epoch_height {
1525 break;
1526 }
1527 }
1528
1529 for epoch in 1..4 {
1530 let state_cert: StateCertQueryDataV2<MockTypes> = client
1531 .get(&format!("state-cert-v2/{epoch}"))
1532 .send()
1533 .await
1534 .unwrap();
1535 tracing::info!("state-cert: {state_cert:?}");
1536 assert_eq!(state_cert.0.epoch.u64(), epoch);
1537 }
1538
1539 network.shut_down().await;
1540 }
1541
1542 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1543 async fn test_old_api() {
1544 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1546 network.start().await;
1547
1548 let port = pick_unused_port().unwrap();
1550
1551 let options = Options {
1552 small_object_range_limit: 500,
1553 large_object_range_limit: 500,
1554 ..Default::default()
1555 };
1556
1557 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1558 app.register_module(
1559 "availability",
1560 define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1561 )
1562 .unwrap();
1563 network.spawn(
1564 "server",
1565 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1566 );
1567
1568 let client = Client::<Error, MockBase>::new(
1570 format!("http://localhost:{port}/availability")
1571 .parse()
1572 .unwrap(),
1573 );
1574 assert!(client.connect(Some(Duration::from_secs(60))).await);
1575 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1576
1577 let leaves = client
1580 .socket("stream/leaves/0")
1581 .subscribe::<Leaf1QueryData<MockTypes>>()
1582 .await
1583 .unwrap();
1584 let headers = client
1585 .socket("stream/headers/0")
1586 .subscribe::<Header<MockTypes>>()
1587 .await
1588 .unwrap();
1589 let blocks = client
1590 .socket("stream/blocks/0")
1591 .subscribe::<BlockQueryData<MockTypes>>()
1592 .await
1593 .unwrap();
1594 let vid_common = client
1595 .socket("stream/vid/common/0")
1596 .subscribe::<ADVZCommonQueryData<MockTypes>>()
1597 .await
1598 .unwrap();
1599 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1600 for nonce in 0..3 {
1601 let txn = mock_transaction(vec![nonce]);
1602 network.submit_transaction(txn).await;
1603
1604 let (i, leaf, block, common) = loop {
1606 tracing::info!("waiting for block with transaction {}", nonce);
1607 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1608 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1609 let leaf = leaf.unwrap();
1610 let header = header.unwrap();
1611 let block = block.unwrap();
1612 let common = common.unwrap();
1613 assert_eq!(leaf.leaf.height() as usize, i);
1614 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1615 assert_eq!(block.header(), &header);
1616 assert_eq!(common.height() as usize, i);
1617 if !block.is_empty() {
1618 break (i, leaf, block, common);
1619 }
1620 };
1621 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1622 assert_eq!(
1623 block,
1624 client.get(&format!("block/{i}")).send().await.unwrap()
1625 );
1626 assert_eq!(
1627 common,
1628 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1629 );
1630
1631 validate_old(&client, (i + 1) as u64).await;
1632 }
1633
1634 network.shut_down().await;
1635 }
1636
1637 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1638 async fn test_extensions() {
1639 use hotshot_example_types::node_types::TestVersions;
1640
1641 let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1642 let data_source = ExtensibleDataSource::new(
1643 MockDataSource::create(dir.path(), Default::default())
1644 .await
1645 .unwrap(),
1646 0,
1647 );
1648
1649 let leaf =
1651 Leaf2::<MockTypes>::genesis::<MockVersions>(&Default::default(), &Default::default())
1652 .await;
1653 let qc =
1654 QuorumCertificate2::genesis::<TestVersions>(&Default::default(), &Default::default())
1655 .await;
1656 let leaf = LeafQueryData::new(leaf, qc).unwrap();
1657 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1658 data_source
1659 .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1660 .await
1661 .unwrap();
1662
1663 assert_eq!(
1665 ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1666 .await
1667 .unwrap(),
1668 1
1669 );
1670 assert_eq!(block, data_source.get_block(0).await.await);
1671
1672 let extensions = toml! {
1674 [route.post_ext]
1675 PATH = ["/ext/:val"]
1676 METHOD = "POST"
1677 ":val" = "Integer"
1678
1679 [route.get_ext]
1680 PATH = ["/ext"]
1681 METHOD = "GET"
1682 };
1683
1684 let mut api =
1685 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1686 &Options {
1687 extensions: vec![extensions.into()],
1688 ..Default::default()
1689 },
1690 MockBase::instance(),
1691 "1.0.0".parse().unwrap(),
1692 )
1693 .unwrap();
1694 api.get("get_ext", |_, state| {
1695 async move { Ok(*state.as_ref()) }.boxed()
1696 })
1697 .unwrap()
1698 .post("post_ext", |req, state| {
1699 async move {
1700 *state.as_mut() = req.integer_param("val")?;
1701 Ok(())
1702 }
1703 .boxed()
1704 })
1705 .unwrap();
1706
1707 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1708 app.register_module("availability", api).unwrap();
1709
1710 let port = pick_unused_port().unwrap();
1711 let _server = BackgroundTask::spawn(
1712 "server",
1713 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1714 );
1715
1716 let client = Client::<Error, MockBase>::new(
1717 format!("http://localhost:{port}/availability")
1718 .parse()
1719 .unwrap(),
1720 );
1721 assert!(client.connect(Some(Duration::from_secs(60))).await);
1722
1723 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1724 client.post::<()>("ext/42").send().await.unwrap();
1725 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1726
1727 assert_eq!(
1729 client
1730 .get::<MockHeader>("header/0")
1731 .send()
1732 .await
1733 .unwrap()
1734 .block_number,
1735 0
1736 );
1737 }
1738
1739 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1740 async fn test_range_limit() {
1741 let large_object_range_limit = 2;
1742 let small_object_range_limit = 3;
1743
1744 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1746 network.start().await;
1747
1748 let port = pick_unused_port().unwrap();
1750 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1751 app.register_module(
1752 "availability",
1753 define_api(
1754 &Options {
1755 large_object_range_limit,
1756 small_object_range_limit,
1757 ..Default::default()
1758 },
1759 MockBase::instance(),
1760 "1.0.0".parse().unwrap(),
1761 )
1762 .unwrap(),
1763 )
1764 .unwrap();
1765 network.spawn(
1766 "server",
1767 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1768 );
1769
1770 let client = Client::<Error, MockBase>::new(
1772 format!("http://localhost:{port}/availability")
1773 .parse()
1774 .unwrap(),
1775 );
1776 assert!(client.connect(Some(Duration::from_secs(60))).await);
1777
1778 assert_eq!(
1780 client.get::<Limits>("limits").send().await.unwrap(),
1781 Limits {
1782 small_object_range_limit,
1783 large_object_range_limit
1784 }
1785 );
1786
1787 client
1789 .socket("stream/blocks/0")
1790 .subscribe::<BlockQueryData<MockTypes>>()
1791 .await
1792 .unwrap()
1793 .take(small_object_range_limit + 1)
1794 .try_collect::<Vec<_>>()
1795 .await
1796 .unwrap();
1797
1798 async fn check_limit<T: DeserializeOwned + Debug>(
1799 client: &Client<Error, MockBase>,
1800 req: &str,
1801 limit: usize,
1802 ) {
1803 let range: Vec<T> = client
1804 .get(&format!("{req}/0/{limit}"))
1805 .send()
1806 .await
1807 .unwrap();
1808 assert_eq!(range.len(), limit);
1809 let err = client
1810 .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1811 .send()
1812 .await
1813 .unwrap_err();
1814 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1815 }
1816
1817 check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1818 check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1819 check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1820 check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1821 .await;
1822 check_limit::<BlockSummaryQueryData<MockTypes>>(
1823 &client,
1824 "block/summaries",
1825 large_object_range_limit,
1826 )
1827 .await;
1828
1829 network.shut_down().await;
1830 }
1831
1832 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1833 async fn test_header_endpoint() {
1834 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init().await;
1836 network.start().await;
1837
1838 let port = pick_unused_port().unwrap();
1840 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1841 app.register_module(
1842 "availability",
1843 define_api(
1844 &Default::default(),
1845 MockBase::instance(),
1846 "1.0.0".parse().unwrap(),
1847 )
1848 .unwrap(),
1849 )
1850 .unwrap();
1851 network.spawn(
1852 "server",
1853 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1854 );
1855
1856 let ds = network.data_source();
1857
1858 let block_height = ds.block_height().await.unwrap();
1861 let fetch = ds
1862 .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1863 .await;
1864
1865 assert!(fetch.is_pending());
1866 let header = fetch.await;
1867 assert_eq!(header.height() as usize, block_height + 25);
1868
1869 network.shut_down().await;
1870 }
1871
1872 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1873 async fn test_leaf_only_ds() {
1874 let mut network = MockNetwork::<MockSqlDataSource, MockVersions>::init_with_leaf_ds().await;
1876 network.start().await;
1877
1878 let port = pick_unused_port().unwrap();
1880 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1881 app.register_module(
1882 "availability",
1883 define_api(
1884 &Default::default(),
1885 MockBase::instance(),
1886 "1.0.0".parse().unwrap(),
1887 )
1888 .unwrap(),
1889 )
1890 .unwrap();
1891 network.spawn(
1892 "server",
1893 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1894 );
1895
1896 let client = Client::<Error, MockBase>::new(
1898 format!("http://localhost:{port}/availability")
1899 .parse()
1900 .unwrap(),
1901 );
1902 assert!(client.connect(Some(Duration::from_secs(60))).await);
1903
1904 client
1906 .socket("stream/headers/0")
1907 .subscribe::<Header<MockTypes>>()
1908 .await
1909 .unwrap()
1910 .take(5)
1911 .try_collect::<Vec<_>>()
1912 .await
1913 .unwrap();
1914
1915 client
1917 .socket("stream/leaves/5")
1918 .subscribe::<LeafQueryData<MockTypes>>()
1919 .await
1920 .unwrap()
1921 .take(5)
1922 .try_collect::<Vec<_>>()
1923 .await
1924 .unwrap();
1925
1926 let ds = network.data_source();
1927
1928 let block_height = ds.block_height().await.unwrap();
1932 let target_block_height = block_height + 20;
1933 let fetch = ds
1934 .get_block(BlockId::<MockTypes>::Number(target_block_height))
1935 .await;
1936
1937 assert!(fetch.is_pending());
1938 let block = fetch.await;
1939 assert_eq!(block.height() as usize, target_block_height);
1940
1941 let mut tx = ds.read().await.unwrap();
1942 tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1943 .await
1944 .unwrap_err();
1945 drop(tx);
1946
1947 network.shut_down().await;
1948 }
1949}