1use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16 data::{ns_table, VidCommitment},
17 traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18 vid::{
19 advz::{advz_scheme, ADVZScheme},
20 avidm::{init_avidm_param, AvidMScheme},
21 },
22};
23use jf_advz::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29 availability::{
30 ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31 PayloadQueryData, VidCommonQueryData,
32 },
33 fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34 types::HeightIndexed,
35 Error, Header, Payload, VidCommon,
36};
37
38#[derive(Clone, Debug)]
43pub struct QueryServiceProvider<Ver: StaticVersionType> {
44 client: Client<Error, Ver>,
45}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48 pub fn new(url: Url, _: Ver) -> Self {
49 Self {
50 client: Client::new(url),
51 }
52 }
53}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56 async fn deserialize_legacy_payload<Types: NodeType>(
57 &self,
58 payload_bytes: Vec<u8>,
59 common_bytes: Vec<u8>,
60 req: PayloadRequest,
61 ) -> Option<Payload<Types>> {
62 let client_url = self.client.base_url();
63
64 let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65 return None;
66 };
67
68 let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69 &payload_bytes,
70 ) {
71 Ok(payload) => payload,
72 Err(err) => {
73 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74 return None;
75 },
76 };
77
78 let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79 &common_bytes,
80 ) {
81 Ok(common) => common,
82 Err(err) => {
83 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
84 return None;
85 },
86 };
87
88 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89 let bytes = payload.data.encode();
90
91 let commit = advz_scheme(num_storage_nodes)
92 .commit_only(bytes)
93 .inspect_err(|err| {
94 tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95 })
96 .ok()?;
97
98 if commit != advz_commit {
99 tracing::error!(
100 ?req,
101 expected_commit=%advz_commit,
102 actual_commit=%commit,
103 %client_url,
104 "received inconsistent legacy payload"
105 );
106 return None;
107 }
108
109 Some(payload.data)
110 }
111
112 async fn deserialize_legacy_vid_common<Types: NodeType>(
113 &self,
114 bytes: Vec<u8>,
115 req: VidCommonRequest,
116 ) -> Option<VidCommon> {
117 let client_url = self.client.base_url();
118 let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119 return None;
120 };
121
122 match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123 Ok(res) => {
124 if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125 Some(VidCommon::V0(res.common))
126 } else {
127 tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128 None
129 }
130 },
131 Err(err) => {
132 tracing::warn!(
133 %client_url,
134 ?req,
135 %err,
136 "failed to deserialize ADVZCommonQueryData"
137 );
138 None
139 },
140 }
141 }
142 async fn deserialize_legacy_leaf<Types: NodeType>(
143 &self,
144 bytes: Vec<u8>,
145 req: LeafRequest<Types>,
146 ) -> Option<LeafQueryData<Types>> {
147 let client_url = self.client.base_url();
148
149 match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150 Ok(mut leaf) => {
151 if leaf.height() != req.height {
152 tracing::error!(
153 %client_url, ?req,
154 expected_height = req.height,
155 actual_height = leaf.height(),
156 "received leaf with the wrong height"
157 );
158 return None;
159 }
160
161 let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162 let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163 if actual_leaf_commit != expected_leaf_commit {
164 tracing::error!(
165 %client_url, ?req,
166 expected_leaf = %req.expected_leaf,
167 actual_leaf = %leaf.hash(),
168 "received leaf with the wrong hash"
169 );
170 return None;
171 }
172
173 let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174 let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175 if actual_qc_commit != expected_qc_commit {
176 tracing::error!(
177 %client_url, ?req,
178 expected_qc = %req.expected_qc,
179 actual_qc = %leaf.qc().commit(),
180 "received leaf with the wrong QC"
181 );
182 return None;
183 }
184
185 leaf.leaf.unfill_block_payload();
190
191 Some(leaf.into())
192 },
193 Err(err) => {
194 tracing::warn!(
195 %client_url, ?req, %err,
196 "failed to deserialize legacy LeafQueryData"
197 );
198 None
199 },
200 }
201 }
202}
203
204#[async_trait]
205impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
206where
207 Types: NodeType,
208{
209 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
217 let client_url = self.client.base_url();
218 let req_hash = req.0;
219 let payload_bytes = self
223 .client
224 .get::<()>(&format!("availability/payload/hash/{}", req.0))
225 .bytes()
226 .await
227 .inspect_err(|err| {
228 tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
229 })
230 .ok()?;
231
232 let common_bytes = self
233 .client
234 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
235 .bytes()
236 .await
237 .inspect_err(|err| {
238 tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
239 })
240 .ok()?;
241
242 let payload =
243 vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
244 .inspect_err(|err| {
245 tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
246 })
247 .ok();
248
249 let common =
250 vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
251 .inspect_err(|err| {
252 tracing::info!(%err, %req_hash,
253 "error deserializing VidCommonQueryData",
254 );
255 })
256 .ok();
257
258 let (payload, common) = match (payload, common) {
259 (Some(payload), Some(common)) => (payload, common),
260 _ => {
261 tracing::info!(%req_hash, "falling back to legacy payload deserialization");
262
263 return self
265 .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
266 .await;
267 },
268 };
269
270 match common.common() {
271 VidCommon::V0(common) => {
272 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
273 let bytes = payload.data().encode();
274
275 let commit = advz_scheme(num_storage_nodes)
276 .commit_only(bytes)
277 .map(VidCommitment::V0)
278 .inspect_err(|err| {
279 tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
280 })
281 .ok()?;
282
283 if commit != req.0 {
284 tracing::error!(
285 expected = %req_hash,
286 actual = ?commit,
287 %client_url,
288 "VID commitment mismatch (V0)"
289 );
290
291 return None;
292 }
293 },
294 VidCommon::V1(common) => {
295 let bytes = payload.data().encode();
296
297 let avidm_param = init_avidm_param(common.total_weights)
298 .inspect_err(|err| {
299 tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
300 })
301 .ok()?;
302
303 let header = self
304 .client
305 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
306 .send()
307 .await
308 .inspect_err(|err| {
309 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
310 })
311 .ok()?;
312
313 if header.payload_commitment() != req.0 {
314 tracing::error!(
315 expected = %req_hash,
316 actual = %header.payload_commitment(),
317 %client_url,
318 "header payload commitment mismatch (V1)"
319 );
320 return None;
321 }
322
323 let metadata = header.metadata().encode();
324 let commit = AvidMScheme::commit(
325 &avidm_param,
326 &bytes,
327 ns_table::parse_ns_table(bytes.len(), &metadata),
328 )
329 .map(VidCommitment::V1)
330 .inspect_err(|err| {
331 tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
332 })
333 .ok()?;
334
335 if commit != req.0 {
337 tracing::warn!(
338 expected = %req_hash,
339 actual = %commit,
340 %client_url,
341 "commitment type mismatch for AVIDM check"
342 );
343 return None;
344 }
345 },
346 }
347
348 Some(payload.data)
349 }
350}
351
352#[async_trait]
353impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
354 for QueryServiceProvider<Ver>
355where
356 Types: NodeType,
357{
358 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
366 let client_url = self.client.base_url();
367
368 let bytes = self
369 .client
370 .get::<()>(&format!("availability/leaf/{}", req.height))
371 .bytes()
372 .await;
373 let bytes = match bytes {
374 Ok(bytes) => bytes,
375 Err(err) => {
376 tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
377
378 return None;
379 },
380 };
381
382 match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
385 Ok(mut leaf) => {
386 if leaf.height() != req.height {
387 tracing::error!(
388 %client_url, ?req, ?leaf,
389 expected_height = req.height,
390 actual_height = leaf.height(),
391 "received leaf with the wrong height"
392 );
393 return None;
394 }
395 if leaf.hash() != req.expected_leaf {
396 tracing::error!(
397 %client_url, ?req, ?leaf,
398 expected_hash = %req.expected_leaf,
399 actual_hash = %leaf.hash(),
400 "received leaf with the wrong hash"
401 );
402 return None;
403 }
404 if leaf.qc().commit() != req.expected_qc {
405 tracing::error!(
406 %client_url, ?req, ?leaf,
407 expected_qc = %req.expected_qc,
408 actual_qc = %leaf.qc().commit(),
409 "received leaf with the wrong QC"
410 );
411 return None;
412 }
413
414 leaf.leaf.unfill_block_payload();
419
420 Some(leaf)
421 },
422 Err(err) => {
423 tracing::info!(
424 ?req, %err,
425 "failed to deserialize LeafQueryData, falling back to legacy deserialization"
426 );
427 self.deserialize_legacy_leaf(bytes, req).await
429 },
430 }
431 }
432}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437 Types: NodeType,
438{
439 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447 let client_url = self.client.base_url();
448 let bytes = self
449 .client
450 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451 .bytes()
452 .await;
453 let bytes = match bytes {
454 Ok(bytes) => bytes,
455 Err(err) => {
456 tracing::info!(
457 %client_url, ?req, %err,
458 "failed to fetch VID common bytes"
459 );
460 return None;
461 },
462 };
463
464 match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465 Ok(res) => match req.0 {
466 VidCommitment::V0(commit) => {
467 if let VidCommon::V0(common) = res.common {
468 if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469 Some(VidCommon::V0(common))
470 } else {
471 tracing::error!(
472 %client_url, ?req, ?commit, ?common,
473 "VID V0 common data is inconsistent with commitment"
474 );
475 None
476 }
477 } else {
478 tracing::warn!(?req, ?res, "Expect VID common data but found None");
479 None
480 }
481 },
482 VidCommitment::V1(_) => {
483 if let VidCommon::V1(common) = res.common {
484 Some(VidCommon::V1(common))
485 } else {
486 tracing::warn!(?req, ?res, "Expect VID common data but found None");
487 None
488 }
489 },
490 },
491 Err(err) => {
492 tracing::info!(
493 %client_url, ?req, %err,
494 "failed to deserialize as V1 VID common data, trying legacy fallback"
495 );
496 self.deserialize_legacy_vid_common::<Types>(bytes, req)
498 .await
499 },
500 }
501 }
502}
503
504#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507 use std::{future::IntoFuture, time::Duration};
508
509 use committable::Committable;
510 use futures::{
511 future::{join, FutureExt},
512 stream::StreamExt,
513 };
514 #[allow(deprecated)]
517 use generic_array::GenericArray;
518 use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
519 use hotshot_types::traits::node_implementation::Versions;
520 use portpicker::pick_unused_port;
521 use rand::RngCore;
522 use tide_disco::{error::ServerError, App};
523 use vbs::version::StaticVersion;
524
525 use super::*;
526 use crate::{
527 api::load_api,
528 availability::{
529 define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
530 BlockWithTransaction, Fetch, UpdateAvailabilityData,
531 },
532 data_source::{
533 sql::{self, SqlDataSource},
534 storage::{
535 fail_storage::{FailStorage, FailableAction},
536 pruning::{PrunedHeightStorage, PrunerCfg},
537 sql::testing::TmpDb,
538 AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
539 },
540 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
541 },
542 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
543 node::{data_source::NodeDataSource, SyncStatus},
544 task::BackgroundTask,
545 testing::{
546 consensus::{MockDataSource, MockNetwork},
547 mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
548 sleep,
549 },
550 types::HeightIndexed,
551 ApiState,
552 };
553
554 type Provider = TestProvider<QueryServiceProvider<MockBase>>;
555 type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
556
557 fn ignore<T>(_: T) {}
558
559 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
561 db: &TmpDb,
562 provider: &P,
563 ) -> sql::Builder<MockTypes, P> {
564 db.config()
565 .builder((*provider).clone())
566 .await
567 .unwrap()
568 .disable_proactive_fetching()
571 }
572
573 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
575 db: &TmpDb,
576 provider: &P,
577 ) -> SqlDataSource<MockTypes, P> {
578 builder(db, provider).await.build().await.unwrap()
579 }
580
    /// End-to-end check that a data source backed by [`QueryServiceProvider`] fetches leaves,
    /// blocks, payloads and VID common data on request, but only once it has been told about a
    /// later leaf and only while the provider is unblocked.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Serve the mock network's availability API over HTTP on an unused local port.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // The data source under test: a fresh database whose provider points at that server.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        network.start().await;

        // Take five leaves from the network (subscription starts at 1) and dedicate a different
        // one to each kind of object, so the requests below target distinct heights.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Nothing has been appended to `data_source` yet. Every request below is expected to
        // stay pending, as asserted after the sleep.
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // `map(ignore)` erases each fetch's output type so they all fit in one `Vec`.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Give any (unexpected) fetch a chance to complete, then check none did.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Block the provider, then tell the data source about the last leaf.
        // NOTE(review): presumably `block()` makes the test provider hold requests until
        // `unblock()` — the assertions after the next sleep rely on that.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // While the provider is blocked, the requests must still be pending.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // After unblocking, each object resolves and must equal what the network itself stores.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // With the provider blocked again, the leaves at the block and payload heights must
        // still resolve.
        // NOTE(review): presumably they were stored as a side effect of the block/payload
        // fetches above, so no provider round trip is needed — confirm.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // Look up a block by hash for a height whose leaf (but not block) was fetched above.
        // `leaf` here is the leaf fetched earlier at `test_leaf`'s height.
        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Look up the payload of the appended (last) leaf by block hash and cross-check its
        // identifying fields against that leaf.
        tracing::info!("fetching payload by hash");
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }
806
807 #[tokio::test(flavor = "multi_thread")]
808 async fn test_fetch_on_request_epoch_version() {
809 tracing::info!("Starting test_fetch_on_request_epoch_version");
812
813 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
815
816 let port = pick_unused_port().unwrap();
818 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
819 app.register_module(
820 "availability",
821 define_api(
822 &Default::default(),
823 <EpochsTestVersions as Versions>::Base::instance(),
824 "1.0.0".parse().unwrap(),
825 )
826 .unwrap(),
827 )
828 .unwrap();
829 network.spawn(
830 "server",
831 app.serve(
832 format!("0.0.0.0:{port}"),
833 <EpochsTestVersions as Versions>::Base::instance(),
834 ),
835 );
836
837 let db = TmpDb::init().await;
840 let provider = EpochProvider::new(QueryServiceProvider::new(
841 format!("http://localhost:{port}").parse().unwrap(),
842 <EpochsTestVersions as Versions>::Base::instance(),
843 ));
844 let data_source = data_source(&db, &provider).await;
845
846 network.start().await;
848
849 let leaves = network.data_source().subscribe_leaves(1).await;
856 let leaves = leaves.take(5).collect::<Vec<_>>().await;
857 let test_leaf = &leaves[0];
858 let test_block = &leaves[1];
859 let test_payload = &leaves[2];
860 let test_common = &leaves[3];
861
862 let mut fetches = vec![];
864 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
866 fetches.push(
868 data_source
869 .get_leaf(test_leaf.height() as usize)
870 .await
871 .map(ignore),
872 );
873 fetches.push(
875 data_source
876 .get_block(test_block.block_hash())
877 .await
878 .map(ignore),
879 );
880 fetches.push(
881 data_source
882 .get_payload(test_payload.block_hash())
883 .await
884 .map(ignore),
885 );
886 fetches.push(
887 data_source
888 .get_vid_common(test_common.block_hash())
889 .await
890 .map(ignore),
891 );
892 fetches.push(
894 data_source
895 .get_block(test_block.height() as usize)
896 .await
897 .map(ignore),
898 );
899 fetches.push(
900 data_source
901 .get_payload(test_payload.height() as usize)
902 .await
903 .map(ignore),
904 );
905 fetches.push(
906 data_source
907 .get_vid_common(test_common.height() as usize)
908 .await
909 .map(ignore),
910 );
911 fetches.push(data_source.get_vid_common(0).await.map(ignore));
913 fetches.push(
915 data_source
916 .get_block_containing_transaction(mock_transaction(vec![]).commit())
917 .await
918 .map(ignore),
919 );
920
921 sleep(Duration::from_secs(1)).await;
924 for (i, fetch) in fetches.into_iter().enumerate() {
925 tracing::info!("checking fetch {i} is unresolved");
926 fetch.try_resolve().unwrap_err();
927 }
928
929 provider.block().await;
934 data_source
935 .append(leaves.last().cloned().unwrap().into())
936 .await
937 .unwrap();
938
939 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
940 let req_block = data_source.get_block(test_block.height() as usize).await;
941 let req_payload = data_source
942 .get_payload(test_payload.height() as usize)
943 .await;
944 let req_common = data_source
945 .get_vid_common(test_common.height() as usize)
946 .await;
947
948 sleep(Duration::from_secs(1)).await;
954 req_leaf.try_resolve().unwrap_err();
955 req_block.try_resolve().unwrap_err();
956 req_payload.try_resolve().unwrap_err();
957 req_common.try_resolve().unwrap_err();
958
959 provider.unblock().await;
961 let leaf = data_source
962 .get_leaf(test_leaf.height() as usize)
963 .await
964 .await;
965 let block = data_source
966 .get_block(test_block.height() as usize)
967 .await
968 .await;
969 let payload = data_source
970 .get_payload(test_payload.height() as usize)
971 .await
972 .await;
973 let common = data_source
974 .get_vid_common(test_common.height() as usize)
975 .await
976 .await;
977 {
978 let truth = network.data_source();
980 assert_eq!(
981 leaf,
982 truth.get_leaf(test_leaf.height() as usize).await.await
983 );
984 assert_eq!(
985 block,
986 truth.get_block(test_block.height() as usize).await.await
987 );
988 assert_eq!(
989 payload,
990 truth
991 .get_payload(test_payload.height() as usize)
992 .await
993 .await
994 );
995 assert_eq!(
996 common,
997 truth
998 .get_vid_common(test_common.height() as usize)
999 .await
1000 .await
1001 );
1002 }
1003
1004 provider.block().await;
1009 for leaf in [test_block, test_payload] {
1010 tracing::info!("fetching existing leaf {}", leaf.height());
1011 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1012 assert_eq!(*leaf, fetched_leaf);
1013 }
1014
1015 provider.unblock().await;
1020 {
1021 let block = data_source.get_block(test_leaf.block_hash()).await.await;
1022 assert_eq!(block.hash(), leaf.block_hash());
1023 }
1024
1025 {
1029 let leaf = leaves.last().unwrap();
1030 let payload = data_source.get_payload(leaf.block_hash()).await.await;
1031 assert_eq!(payload.height(), leaf.height());
1032 assert_eq!(payload.block_hash(), leaf.block_hash());
1033 assert_eq!(payload.hash(), leaf.payload_hash());
1034 }
1035
1036 tracing::info!("Test completed successfully!");
1038 }
1039
1040 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1041 async fn test_fetch_block_and_leaf_concurrently() {
1042 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1044
1045 let port = pick_unused_port().unwrap();
1047 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1048 app.register_module(
1049 "availability",
1050 define_api(
1051 &Default::default(),
1052 MockBase::instance(),
1053 "1.0.0".parse().unwrap(),
1054 )
1055 .unwrap(),
1056 )
1057 .unwrap();
1058 network.spawn(
1059 "server",
1060 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1061 );
1062
1063 let db = TmpDb::init().await;
1065 let provider = Provider::new(QueryServiceProvider::new(
1066 format!("http://localhost:{port}").parse().unwrap(),
1067 MockBase::instance(),
1068 ));
1069 let data_source = data_source(&db, &provider).await;
1070
1071 network.start().await;
1073
1074 let leaves = network.data_source().subscribe_leaves(1).await;
1077 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1078 let test_leaf = &leaves[0];
1079
1080 data_source.append(leaves[1].clone().into()).await.unwrap();
1082
1083 let (leaf, block) = join(
1087 data_source
1088 .get_leaf(test_leaf.height() as usize)
1089 .await
1090 .into_future(),
1091 data_source
1092 .get_block(test_leaf.height() as usize)
1093 .await
1094 .into_future(),
1095 )
1096 .await;
1097 assert_eq!(leaf, *test_leaf);
1098 assert_eq!(leaf.header(), block.header());
1099 }
1100
1101 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1102 async fn test_fetch_different_blocks_same_payload() {
1103 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1105
1106 let port = pick_unused_port().unwrap();
1108 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1109 app.register_module(
1110 "availability",
1111 define_api(
1112 &Default::default(),
1113 MockBase::instance(),
1114 "1.0.0".parse().unwrap(),
1115 )
1116 .unwrap(),
1117 )
1118 .unwrap();
1119 network.spawn(
1120 "server",
1121 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1122 );
1123
1124 let db = TmpDb::init().await;
1126 let provider = Provider::new(QueryServiceProvider::new(
1127 format!("http://localhost:{port}").parse().unwrap(),
1128 MockBase::instance(),
1129 ));
1130 let data_source = data_source(&db, &provider).await;
1131
1132 network.start().await;
1134
1135 let leaves = network.data_source().subscribe_leaves(1).await;
1138 let leaves = leaves.take(4).collect::<Vec<_>>().await;
1139
1140 data_source
1143 .append(leaves.last().cloned().unwrap().into())
1144 .await
1145 .unwrap();
1146
1147 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1149 let (block1, block2) = join(
1152 data_source
1153 .get_block(leaves[0].height() as usize)
1154 .await
1155 .into_future(),
1156 data_source
1157 .get_block(leaves[1].height() as usize)
1158 .await
1159 .into_future(),
1160 )
1161 .await;
1162 assert_eq!(block1.header(), leaves[0].header());
1163 assert_eq!(block2.header(), leaves[1].header());
1164 }
1165
1166 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1167 async fn test_fetch_stream() {
1168 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1170
1171 let port = pick_unused_port().unwrap();
1173 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1174 app.register_module(
1175 "availability",
1176 define_api(
1177 &Default::default(),
1178 MockBase::instance(),
1179 "1.0.0".parse().unwrap(),
1180 )
1181 .unwrap(),
1182 )
1183 .unwrap();
1184 network.spawn(
1185 "server",
1186 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1187 );
1188
1189 let db = TmpDb::init().await;
1191 let provider = Provider::new(QueryServiceProvider::new(
1192 format!("http://localhost:{port}").parse().unwrap(),
1193 MockBase::instance(),
1194 ));
1195 let data_source = data_source(&db, &provider).await;
1196
1197 network.start().await;
1199
1200 let blocks = data_source.subscribe_blocks(0).await;
1202 let leaves = data_source.subscribe_leaves(0).await;
1203 let common = data_source.subscribe_vid_common(0).await;
1204
1205 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1207 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1208
1209 data_source
1212 .append(finalized_leaves.last().cloned().unwrap().into())
1213 .await
1214 .unwrap();
1215
1216 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1218 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1219 let common = common.take(5).collect::<Vec<_>>().await;
1220 for i in 0..5 {
1221 tracing::info!("checking block {i}");
1222 assert_eq!(leaves[i], finalized_leaves[i]);
1223 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1224 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1225 }
1226 }
1227
1228 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1229 async fn test_fetch_range_start() {
1230 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1232
1233 let port = pick_unused_port().unwrap();
1235 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1236 app.register_module(
1237 "availability",
1238 define_api(
1239 &Default::default(),
1240 MockBase::instance(),
1241 "1.0.0".parse().unwrap(),
1242 )
1243 .unwrap(),
1244 )
1245 .unwrap();
1246 network.spawn(
1247 "server",
1248 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1249 );
1250
1251 let db = TmpDb::init().await;
1253 let provider = Provider::new(QueryServiceProvider::new(
1254 format!("http://localhost:{port}").parse().unwrap(),
1255 MockBase::instance(),
1256 ));
1257 let data_source = data_source(&db, &provider).await;
1258
1259 network.start().await;
1261
1262 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1264 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1265
1266 let mut tx = data_source.write().await.unwrap();
1270 tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1271 tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1272 tx.commit().await.unwrap();
1273
1274 let leaves = data_source
1276 .get_leaf_range(..5)
1277 .await
1278 .then(Fetch::resolve)
1279 .collect::<Vec<_>>()
1280 .await;
1281 for i in 0..5 {
1282 tracing::info!("checking leaf {i}");
1283 assert_eq!(leaves[i], finalized_leaves[i]);
1284 }
1285 }
1286
    /// Checks that a request for the block containing a transaction, made before the transaction
    /// is even submitted, resolves once a block containing it is appended to the data source.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Serve the mock network's availability API over HTTP on an unused local port.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // The data source under test uses `NoFetching`, so everything it learns must come from
        // the explicit `append` calls below.
        let db = TmpDb::init().await;
        let data_source = data_source(&db, &NoFetching).await;

        // Subscribe before starting the network so no leaf or block is missed.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let mut blocks = network.data_source().subscribe_blocks(1).await;

        network.start().await;

        // Ask for the transaction's block first; the request is awaited only after the
        // transaction has been included.
        let tx = mock_transaction(vec![1, 2, 3]);
        let fut = data_source
            .get_block_containing_transaction(tx.commit())
            .await;

        network.submit_transaction(tx.clone()).await;

        // Feed each new leaf/block pair into the data source until the block containing the
        // transaction shows up.
        let block = loop {
            let leaf = leaves.next().await.unwrap();
            let block = blocks.next().await.unwrap();

            data_source
                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();

            if block.transaction_by_hash(tx.commit()).is_some() {
                break block;
            }
        };
        tracing::info!("transaction included in block {}", block.height());

        // The pending request must now resolve to that block/transaction pair.
        let fetched_tx = fut.await;
        assert_eq!(
            fetched_tx,
            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
        );

        // A fresh request made after inclusion must return the same result.
        assert_eq!(
            fetched_tx,
            data_source
                .get_block_containing_transaction(tx.commit())
                .await
                .await
        );
    }
1365
1366 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1367 async fn test_retry() {
1368 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1370
1371 let port = pick_unused_port().unwrap();
1373 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1374 app.register_module(
1375 "availability",
1376 define_api(
1377 &Default::default(),
1378 MockBase::instance(),
1379 "1.0.0".parse().unwrap(),
1380 )
1381 .unwrap(),
1382 )
1383 .unwrap();
1384 network.spawn(
1385 "server",
1386 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1387 );
1388
1389 let db = TmpDb::init().await;
1391 let provider = Provider::new(QueryServiceProvider::new(
1392 format!("http://localhost:{port}").parse().unwrap(),
1393 MockBase::instance(),
1394 ));
1395 let data_source = builder(&db, &provider)
1396 .await
1397 .with_max_retry_interval(Duration::from_secs(1))
1398 .build()
1399 .await
1400 .unwrap();
1401
1402 network.start().await;
1404
1405 let leaves = network.data_source().subscribe_leaves(1).await;
1408 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1409 let test_leaf = &leaves[0];
1410
1411 provider.fail();
1413
1414 data_source
1417 .append(leaves.last().cloned().unwrap().into())
1418 .await
1419 .unwrap();
1420
1421 tracing::info!("requesting leaf from failing providers");
1422 let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1423
1424 sleep(Duration::from_secs(5)).await;
1427 fut.try_resolve().unwrap_err();
1428
1429 provider.unfail();
1431 assert_eq!(
1432 data_source
1433 .get_leaf(test_leaf.height() as usize)
1434 .await
1435 .await,
1436 *test_leaf
1437 );
1438 }
1439
1440 #[allow(deprecated)]
1442 fn random_vid_commit() -> VidCommitment {
1443 let mut bytes = [0; 32];
1444 rand::thread_rng().fill_bytes(&mut bytes);
1445 VidCommitment::V0(GenericArray::from(bytes).into())
1446 }
1447
1448 async fn malicious_server(port: u16) {
1449 let mut api = load_api::<(), ServerError, MockBase>(
1450 None::<std::path::PathBuf>,
1451 include_str!("../../../api/availability.toml"),
1452 vec![],
1453 )
1454 .unwrap();
1455
1456 api.get("get_payload", move |_, _| {
1457 async move {
1458 Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1460 &Default::default(),
1461 &Default::default(),
1462 )
1463 .await)
1464 }
1465 .boxed()
1466 })
1467 .unwrap()
1468 .get("get_vid_common", move |_, _| {
1469 async move {
1470 Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1472 &Default::default(),
1473 &Default::default(),
1474 )
1475 .await)
1476 }
1477 .boxed()
1478 })
1479 .unwrap();
1480
1481 let mut app = App::<(), ServerError>::with_state(());
1482 app.register_module("availability", api).unwrap();
1483 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1484 .await
1485 .ok();
1486 }
1487
1488 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1489 async fn test_fetch_from_malicious_server() {
1490 let port = pick_unused_port().unwrap();
1491 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1492
1493 let provider = QueryServiceProvider::new(
1494 format!("http://localhost:{port}").parse().unwrap(),
1495 MockBase::instance(),
1496 );
1497 provider.client.connect(None).await;
1498
1499 let res =
1502 ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1503 .await;
1504 assert_eq!(res, None);
1505
1506 let res =
1509 ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1510 .await;
1511 assert_eq!(res, None);
1512 }
1513
1514 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1515 async fn test_archive_recovery() {
1516 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1518
1519 let port = pick_unused_port().unwrap();
1521 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1522 app.register_module(
1523 "availability",
1524 define_api(
1525 &Default::default(),
1526 MockBase::instance(),
1527 "1.0.0".parse().unwrap(),
1528 )
1529 .unwrap(),
1530 )
1531 .unwrap();
1532 network.spawn(
1533 "server",
1534 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1535 );
1536
1537 let db = TmpDb::init().await;
1540 let provider = Provider::new(QueryServiceProvider::new(
1541 format!("http://localhost:{port}").parse().unwrap(),
1542 MockBase::instance(),
1543 ));
1544 let mut data_source = db
1545 .config()
1546 .pruner_cfg(
1547 PrunerCfg::new()
1548 .with_target_retention(Duration::from_secs(0))
1549 .with_interval(Duration::from_secs(5)),
1550 )
1551 .unwrap()
1552 .builder(provider.clone())
1553 .await
1554 .unwrap()
1555 .with_min_retry_interval(Duration::from_millis(100))
1559 .with_retry_randomization_factor(3.)
1563 .build()
1564 .await
1565 .unwrap();
1566
1567 network.start().await;
1569
1570 let leaves = network.data_source().subscribe_leaves(1).await;
1572 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1573
1574 let pruned_height = data_source
1576 .read()
1577 .await
1578 .unwrap()
1579 .load_pruned_height()
1580 .await
1581 .unwrap();
1582 assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1584
1585 let last_leaf = leaves.last().unwrap();
1588 data_source.append(last_leaf.clone().into()).await.unwrap();
1589
1590 for i in 1..=last_leaf.height() {
1592 tracing::info!(i, "fetching leaf");
1593 assert_eq!(
1594 data_source.get_leaf(i as usize).await.await,
1595 leaves[i as usize - 1]
1596 );
1597 }
1598
1599 loop {
1601 let pruned_height = data_source
1602 .read()
1603 .await
1604 .unwrap()
1605 .load_pruned_height()
1606 .await
1607 .unwrap();
1608 if pruned_height == Some(last_leaf.height()) {
1609 break;
1610 }
1611 tracing::info!(
1612 ?pruned_height,
1613 target_height = last_leaf.height(),
1614 "waiting for pruner to run"
1615 );
1616 sleep(Duration::from_secs(1)).await;
1617 }
1618
1619 data_source = db
1621 .config()
1622 .archive()
1623 .builder(provider.clone())
1624 .await
1625 .unwrap()
1626 .with_minor_scan_interval(Duration::from_secs(1))
1627 .with_major_scan_interval(1)
1628 .build()
1629 .await
1630 .unwrap();
1631
1632 let pruned_height = data_source
1634 .read()
1635 .await
1636 .unwrap()
1637 .load_pruned_height()
1638 .await
1639 .unwrap();
1640 assert_eq!(pruned_height, None);
1641
1642 data_source.append(last_leaf.clone().into()).await.unwrap();
1646
1647 loop {
1649 let sync_status = data_source.sync_status().await.unwrap();
1650
1651 if (SyncStatus {
1655 missing_vid_shares: 0,
1656 ..sync_status
1657 })
1658 .is_fully_synced()
1659 {
1660 break;
1661 }
1662 tracing::info!(?sync_status, "waiting for node to sync");
1663 sleep(Duration::from_secs(1)).await;
1664 }
1665
1666 sleep(Duration::from_secs(3)).await;
1668 let sync_status = data_source.sync_status().await.unwrap();
1669 assert!(
1670 (SyncStatus {
1671 missing_vid_shares: 0,
1672 ..sync_status
1673 })
1674 .is_fully_synced(),
1675 "{sync_status:?}"
1676 );
1677 }
1678
1679 #[derive(Clone, Copy, Debug)]
1680 enum FailureType {
1681 Begin,
1682 Write,
1683 Commit,
1684 }
1685
1686 async fn test_fetch_storage_failure_helper(failure: FailureType) {
1687 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1689
1690 let port = pick_unused_port().unwrap();
1692 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1693 app.register_module(
1694 "availability",
1695 define_api(
1696 &Default::default(),
1697 MockBase::instance(),
1698 "1.0.0".parse().unwrap(),
1699 )
1700 .unwrap(),
1701 )
1702 .unwrap();
1703 network.spawn(
1704 "server",
1705 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1706 );
1707
1708 let provider = Provider::new(QueryServiceProvider::new(
1710 format!("http://localhost:{port}").parse().unwrap(),
1711 MockBase::instance(),
1712 ));
1713 let db = TmpDb::init().await;
1714 let storage = FailStorage::from(
1715 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1716 .await
1717 .unwrap(),
1718 );
1719 let data_source = FetchingDataSource::builder(storage, provider)
1720 .disable_proactive_fetching()
1721 .disable_aggregator()
1722 .with_max_retry_interval(Duration::from_millis(100))
1723 .with_retry_timeout(Duration::from_secs(1))
1724 .build()
1725 .await
1726 .unwrap();
1727
1728 network.start().await;
1730
1731 let leaves = network.data_source().subscribe_leaves(1).await;
1733 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1734
1735 let last_leaf = leaves.last().unwrap();
1737 let mut tx = data_source.write().await.unwrap();
1738 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1739 tx.commit().await.unwrap();
1740
1741 tracing::info!("fetch with write failure");
1743 match failure {
1744 FailureType::Begin => {
1745 data_source
1746 .as_ref()
1747 .fail_begins_writable(FailableAction::Any)
1748 .await
1749 },
1750 FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1751 FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1752 }
1753 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1754 data_source.as_ref().pass().await;
1755
1756 sleep(Duration::from_secs(1)).await;
1761
1762 tracing::info!("fetch with write success");
1765 let fetch = data_source.get_leaf(1).await;
1766 assert!(fetch.is_pending());
1767 assert_eq!(leaves[0], fetch.await);
1768
1769 sleep(Duration::from_secs(1)).await;
1770
1771 tracing::info!("retrieve from storage");
1773 let fetch = data_source.get_leaf(1).await;
1774 assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1775 }
1776
1777 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1778 async fn test_fetch_storage_failure_on_begin() {
1779 test_fetch_storage_failure_helper(FailureType::Begin).await;
1780 }
1781
1782 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1783 async fn test_fetch_storage_failure_on_write() {
1784 test_fetch_storage_failure_helper(FailureType::Write).await;
1785 }
1786
1787 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1788 async fn test_fetch_storage_failure_on_commit() {
1789 test_fetch_storage_failure_helper(FailureType::Commit).await;
1790 }
1791
1792 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1793 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1795
1796 let port = pick_unused_port().unwrap();
1798 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1799 app.register_module(
1800 "availability",
1801 define_api(
1802 &Default::default(),
1803 MockBase::instance(),
1804 "1.0.0".parse().unwrap(),
1805 )
1806 .unwrap(),
1807 )
1808 .unwrap();
1809 network.spawn(
1810 "server",
1811 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1812 );
1813
1814 let provider = Provider::new(QueryServiceProvider::new(
1816 format!("http://localhost:{port}").parse().unwrap(),
1817 MockBase::instance(),
1818 ));
1819 let db = TmpDb::init().await;
1820 let storage = FailStorage::from(
1821 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1822 .await
1823 .unwrap(),
1824 );
1825 let data_source = FetchingDataSource::builder(storage, provider)
1826 .disable_proactive_fetching()
1827 .disable_aggregator()
1828 .with_min_retry_interval(Duration::from_millis(100))
1829 .build()
1830 .await
1831 .unwrap();
1832
1833 network.start().await;
1835
1836 let leaves = network.data_source().subscribe_leaves(1).await;
1838 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1839
1840 let last_leaf = leaves.last().unwrap();
1842 let mut tx = data_source.write().await.unwrap();
1843 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1844 tx.commit().await.unwrap();
1845
1846 tracing::info!("fetch with write failure");
1848 match failure {
1849 FailureType::Begin => {
1850 data_source
1851 .as_ref()
1852 .fail_one_begin_writable(FailableAction::Any)
1853 .await
1854 },
1855 FailureType::Write => {
1856 data_source
1857 .as_ref()
1858 .fail_one_write(FailableAction::Any)
1859 .await
1860 },
1861 FailureType::Commit => {
1862 data_source
1863 .as_ref()
1864 .fail_one_commit(FailableAction::Any)
1865 .await
1866 },
1867 }
1868 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1869
1870 let mut tx = data_source.read().await.unwrap();
1872 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1873 }
1874
1875 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1876 async fn test_fetch_storage_failure_retry_on_begin() {
1877 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1878 }
1879
1880 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1881 async fn test_fetch_storage_failure_retry_on_write() {
1882 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1883 }
1884
1885 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1886 async fn test_fetch_storage_failure_retry_on_commit() {
1887 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1888 }
1889
1890 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1891 async fn test_fetch_on_decide() {
1892 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1894
1895 let port = pick_unused_port().unwrap();
1897 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1898 app.register_module(
1899 "availability",
1900 define_api(
1901 &Default::default(),
1902 MockBase::instance(),
1903 "1.0.0".parse().unwrap(),
1904 )
1905 .unwrap(),
1906 )
1907 .unwrap();
1908 network.spawn(
1909 "server",
1910 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1911 );
1912
1913 let db = TmpDb::init().await;
1915 let provider = Provider::new(QueryServiceProvider::new(
1916 format!("http://localhost:{port}").parse().unwrap(),
1917 MockBase::instance(),
1918 ));
1919 let data_source = builder(&db, &provider)
1920 .await
1921 .with_max_retry_interval(Duration::from_secs(1))
1922 .build()
1923 .await
1924 .unwrap();
1925
1926 network.start().await;
1928
1929 let leaf = network
1931 .data_source()
1932 .subscribe_leaves(1)
1933 .await
1934 .next()
1935 .await
1936 .unwrap();
1937
1938 data_source.append(leaf.clone().into()).await.unwrap();
1940
1941 sleep(Duration::from_secs(5)).await;
1944
1945 let mut tx = data_source.read().await.unwrap();
1949 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1950 let block = tx.get_block(id).await.unwrap();
1951 let vid = tx.get_vid_common(id).await.unwrap();
1952
1953 assert_eq!(block.hash(), leaf.block_hash());
1954 assert_eq!(vid.block_hash(), leaf.block_hash());
1955 }
1956
1957 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1958 async fn test_fetch_begin_failure() {
1959 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1961
1962 let port = pick_unused_port().unwrap();
1964 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1965 app.register_module(
1966 "availability",
1967 define_api(
1968 &Default::default(),
1969 MockBase::instance(),
1970 "1.0.0".parse().unwrap(),
1971 )
1972 .unwrap(),
1973 )
1974 .unwrap();
1975 network.spawn(
1976 "server",
1977 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1978 );
1979
1980 let provider = Provider::new(QueryServiceProvider::new(
1982 format!("http://localhost:{port}").parse().unwrap(),
1983 MockBase::instance(),
1984 ));
1985 let db = TmpDb::init().await;
1986 let storage = FailStorage::from(
1987 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1988 .await
1989 .unwrap(),
1990 );
1991 let data_source = FetchingDataSource::builder(storage, provider)
1992 .disable_proactive_fetching()
1993 .disable_aggregator()
1994 .with_min_retry_interval(Duration::from_millis(100))
1995 .build()
1996 .await
1997 .unwrap();
1998
1999 network.start().await;
2001
2002 let leaves = network.data_source().subscribe_leaves(1).await;
2004 let leaves = leaves.take(2).collect::<Vec<_>>().await;
2005
2006 let last_leaf = leaves.last().unwrap();
2008 let mut tx = data_source.write().await.unwrap();
2009 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2010 tx.commit().await.unwrap();
2011
2012 tracing::info!("fetch with transaction failure");
2015 data_source
2016 .as_ref()
2017 .fail_one_begin_read_only(FailableAction::Any)
2018 .await;
2019 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2020 }
2021
2022 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2023 async fn test_fetch_load_failure_block() {
2024 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2026
2027 let port = pick_unused_port().unwrap();
2029 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2030 app.register_module(
2031 "availability",
2032 define_api(
2033 &Default::default(),
2034 MockBase::instance(),
2035 "1.0.0".parse().unwrap(),
2036 )
2037 .unwrap(),
2038 )
2039 .unwrap();
2040 network.spawn(
2041 "server",
2042 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2043 );
2044
2045 let provider = Provider::new(QueryServiceProvider::new(
2047 format!("http://localhost:{port}").parse().unwrap(),
2048 MockBase::instance(),
2049 ));
2050 let db = TmpDb::init().await;
2051 let storage = FailStorage::from(
2052 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2053 .await
2054 .unwrap(),
2055 );
2056 let data_source = FetchingDataSource::builder(storage, provider)
2057 .disable_proactive_fetching()
2058 .disable_aggregator()
2059 .with_min_retry_interval(Duration::from_millis(100))
2060 .build()
2061 .await
2062 .unwrap();
2063
2064 network.start().await;
2066
2067 let mut leaves = network.data_source().subscribe_leaves(1).await;
2069 let leaf = leaves.next().await.unwrap();
2070
2071 let mut tx = data_source.write().await.unwrap();
2074 tx.insert_leaf(leaf.clone()).await.unwrap();
2075 tx.commit().await.unwrap();
2076
2077 tracing::info!("fetch with read failure");
2091 data_source
2092 .as_ref()
2093 .fail_one_read(FailableAction::GetHeader)
2094 .await;
2095 let fetch = data_source.get_block(leaf.block_hash()).await;
2096
2097 sleep(Duration::from_secs(2)).await;
2099 data_source.as_ref().pass().await;
2100
2101 let block: BlockQueryData<MockTypes> = fetch.await;
2102 assert_eq!(block.hash(), leaf.block_hash());
2103 }
2104
2105 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2106 async fn test_fetch_load_failure_tx() {
2107 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2109
2110 let port = pick_unused_port().unwrap();
2112 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2113 app.register_module(
2114 "availability",
2115 define_api(
2116 &Default::default(),
2117 MockBase::instance(),
2118 "1.0.0".parse().unwrap(),
2119 )
2120 .unwrap(),
2121 )
2122 .unwrap();
2123 network.spawn(
2124 "server",
2125 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2126 );
2127
2128 let provider = Provider::new(QueryServiceProvider::new(
2130 format!("http://localhost:{port}").parse().unwrap(),
2131 MockBase::instance(),
2132 ));
2133 let db = TmpDb::init().await;
2134 let storage = FailStorage::from(
2135 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2136 .await
2137 .unwrap(),
2138 );
2139 let data_source = FetchingDataSource::builder(storage, provider)
2140 .disable_proactive_fetching()
2141 .disable_aggregator()
2142 .with_min_retry_interval(Duration::from_millis(100))
2143 .build()
2144 .await
2145 .unwrap();
2146
2147 network.start().await;
2149
2150 let tx = mock_transaction(vec![1, 2, 3]);
2152 network.submit_transaction(tx.clone()).await;
2153 let tx = network
2154 .data_source()
2155 .get_block_containing_transaction(tx.commit())
2156 .await
2157 .await;
2158
2159 {
2161 let leaf = network
2162 .data_source()
2163 .get_leaf(tx.transaction.block_height() as usize)
2164 .await
2165 .await;
2166 let block = network
2167 .data_source()
2168 .get_block(tx.transaction.block_height() as usize)
2169 .await
2170 .await;
2171 let mut tx = data_source.write().await.unwrap();
2172 tx.insert_leaf(leaf.clone()).await.unwrap();
2173 tx.insert_block(block.clone()).await.unwrap();
2174 tx.commit().await.unwrap();
2175 }
2176
2177 tracing::info!("fetch success");
2179 assert_eq!(
2180 tx,
2181 data_source
2182 .get_block_containing_transaction(tx.transaction.hash())
2183 .await
2184 .await
2185 );
2186
2187 tracing::info!("fetch with read failure");
2199 data_source
2200 .as_ref()
2201 .fail_one_read(FailableAction::Any)
2202 .await;
2203 let fetch = data_source
2204 .get_block_containing_transaction(tx.transaction.hash())
2205 .await;
2206
2207 assert_eq!(tx, fetch.await);
2208 }
2209
2210 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2211 async fn test_stream_begin_failure() {
2212 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2214
2215 let port = pick_unused_port().unwrap();
2217 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2218 app.register_module(
2219 "availability",
2220 define_api(
2221 &Default::default(),
2222 MockBase::instance(),
2223 "1.0.0".parse().unwrap(),
2224 )
2225 .unwrap(),
2226 )
2227 .unwrap();
2228 network.spawn(
2229 "server",
2230 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2231 );
2232
2233 let provider = Provider::new(QueryServiceProvider::new(
2235 format!("http://localhost:{port}").parse().unwrap(),
2236 MockBase::instance(),
2237 ));
2238 let db = TmpDb::init().await;
2239 let storage = FailStorage::from(
2240 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2241 .await
2242 .unwrap(),
2243 );
2244 let data_source = FetchingDataSource::builder(storage, provider)
2245 .disable_proactive_fetching()
2246 .disable_aggregator()
2247 .with_min_retry_interval(Duration::from_millis(100))
2248 .with_range_chunk_size(3)
2249 .build()
2250 .await
2251 .unwrap();
2252
2253 network.start().await;
2255
2256 let leaves = network.data_source().subscribe_leaves(1).await;
2258 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2259
2260 let last_leaf = leaves.last().unwrap();
2262 let mut tx = data_source.write().await.unwrap();
2263 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2264 tx.commit().await.unwrap();
2265
2266 tracing::info!("stream with transaction failure");
2269 data_source
2270 .as_ref()
2271 .fail_one_begin_read_only(FailableAction::Any)
2272 .await;
2273 assert_eq!(
2274 leaves,
2275 data_source
2276 .subscribe_leaves(1)
2277 .await
2278 .take(5)
2279 .collect::<Vec<_>>()
2280 .await
2281 );
2282 }
2283
2284 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2285 async fn test_stream_load_failure() {
2286 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2288
2289 let port = pick_unused_port().unwrap();
2291 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2292 app.register_module(
2293 "availability",
2294 define_api(
2295 &Default::default(),
2296 MockBase::instance(),
2297 "1.0.0".parse().unwrap(),
2298 )
2299 .unwrap(),
2300 )
2301 .unwrap();
2302 network.spawn(
2303 "server",
2304 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2305 );
2306
2307 let provider = Provider::new(QueryServiceProvider::new(
2309 format!("http://localhost:{port}").parse().unwrap(),
2310 MockBase::instance(),
2311 ));
2312 let db = TmpDb::init().await;
2313 let storage = FailStorage::from(
2314 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2315 .await
2316 .unwrap(),
2317 );
2318 let data_source = FetchingDataSource::builder(storage, provider)
2319 .disable_proactive_fetching()
2320 .disable_aggregator()
2321 .with_min_retry_interval(Duration::from_millis(100))
2322 .with_range_chunk_size(3)
2323 .build()
2324 .await
2325 .unwrap();
2326
2327 network.start().await;
2329
2330 let leaves = network.data_source().subscribe_leaves(1).await;
2332 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2333
2334 let last_leaf = leaves.last().unwrap();
2336 let mut tx = data_source.write().await.unwrap();
2337 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2338 tx.commit().await.unwrap();
2339
2340 tracing::info!("stream with read failure");
2342 data_source.as_ref().fail_reads(FailableAction::Any).await;
2343 let fetches = data_source
2344 .get_block_range(1..=5)
2345 .await
2346 .collect::<Vec<_>>()
2347 .await;
2348
2349 sleep(Duration::from_secs(2)).await;
2351 data_source.as_ref().pass().await;
2352
2353 for (leaf, fetch) in leaves.iter().zip(fetches) {
2354 let block: BlockQueryData<MockTypes> = fetch.await;
2355 assert_eq!(block.hash(), leaf.block_hash());
2356 }
2357 }
2358
2359 enum MetadataType {
2360 Payload,
2361 Vid,
2362 }
2363
2364 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2365 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2367
2368 let port = pick_unused_port().unwrap();
2370 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2371 app.register_module(
2372 "availability",
2373 define_api(
2374 &Default::default(),
2375 MockBase::instance(),
2376 "1.0.0".parse().unwrap(),
2377 )
2378 .unwrap(),
2379 )
2380 .unwrap();
2381 network.spawn(
2382 "server",
2383 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2384 );
2385
2386 let provider = Provider::new(QueryServiceProvider::new(
2388 format!("http://localhost:{port}").parse().unwrap(),
2389 MockBase::instance(),
2390 ));
2391 let db = TmpDb::init().await;
2392 let storage = FailStorage::from(
2393 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2394 .await
2395 .unwrap(),
2396 );
2397 let data_source = FetchingDataSource::builder(storage, provider)
2398 .disable_proactive_fetching()
2399 .disable_aggregator()
2400 .with_min_retry_interval(Duration::from_millis(100))
2401 .with_range_chunk_size(3)
2402 .build()
2403 .await
2404 .unwrap();
2405
2406 network.start().await;
2408
2409 let leaves = network.data_source().subscribe_leaves(1).await;
2411 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2412
2413 let last_leaf = leaves.last().unwrap();
2415 let mut tx = data_source.write().await.unwrap();
2416 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2417 tx.commit().await.unwrap();
2418
2419 let leaf = network.data_source().get_leaf(1).await.await;
2424 let block = network.data_source().get_block(1).await.await;
2425 let vid = network.data_source().get_vid_common(1).await.await;
2426 data_source
2427 .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2428 .await
2429 .unwrap();
2430
2431 tracing::info!("stream with transaction failure");
2433 data_source
2434 .as_ref()
2435 .fail_begins_read_only(FailableAction::Any)
2436 .await;
2437 match stream {
2438 MetadataType::Payload => {
2439 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2440
2441 sleep(Duration::from_secs(2)).await;
2443 tracing::info!("stop failing transactions");
2444 data_source.as_ref().pass().await;
2445
2446 let payloads = payloads.collect::<Vec<_>>().await;
2447 for (leaf, payload) in leaves.iter().zip(payloads) {
2448 assert_eq!(payload.block_hash, leaf.block_hash());
2449 }
2450 },
2451 MetadataType::Vid => {
2452 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2453
2454 sleep(Duration::from_secs(2)).await;
2456 tracing::info!("stop failing transactions");
2457 data_source.as_ref().pass().await;
2458
2459 let vids = vids.collect::<Vec<_>>().await;
2460 for (leaf, vid) in leaves.iter().zip(vids) {
2461 assert_eq!(vid.block_hash, leaf.block_hash());
2462 }
2463 },
2464 }
2465 }
2466
2467 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2468 async fn test_metadata_stream_begin_failure_payload() {
2469 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2470 }
2471
2472 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2473 async fn test_metadata_stream_begin_failure_vid() {
2474 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2475 }
2476
2477 async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2482 let mut network = MockNetwork::<MockDataSource, V>::init().await;
2483
2484 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2485
2486 app.register_module(
2488 "availability",
2489 define_api(
2490 &Default::default(),
2491 StaticVersion::<0, 1> {},
2492 "0.0.1".parse().unwrap(),
2493 )
2494 .unwrap(),
2495 )
2496 .unwrap();
2497
2498 app.register_module(
2499 "availability",
2500 define_api(
2501 &Default::default(),
2502 StaticVersion::<0, 1> {},
2503 "1.0.0".parse().unwrap(),
2504 )
2505 .unwrap(),
2506 )
2507 .unwrap();
2508
2509 network.spawn(
2510 "server",
2511 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2512 );
2513
2514 let db = TmpDb::init().await;
2515
2516 let provider_url = format!("http://localhost:{port}/{version}")
2517 .parse()
2518 .expect("Invalid URL");
2519
2520 let provider = Provider::new(QueryServiceProvider::new(
2521 provider_url,
2522 StaticVersion::<0, 1> {},
2523 ));
2524
2525 let ds = data_source(&db, &provider).await;
2526 network.start().await;
2527
2528 let leaves = network.data_source().subscribe_leaves(1).await;
2529 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2530 let test_leaf = &leaves[0];
2531 let test_payload = &leaves[2];
2532 let test_common = &leaves[3];
2533
2534 let mut fetches = vec![];
2535 fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2537 fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2538 fetches.push(
2539 ds.get_vid_common(test_common.block_hash())
2540 .await
2541 .map(ignore),
2542 );
2543
2544 sleep(Duration::from_secs(1)).await;
2547 for (i, fetch) in fetches.into_iter().enumerate() {
2548 tracing::info!("checking fetch {i} is unresolved");
2549 fetch.try_resolve().unwrap_err();
2550 }
2551
2552 ds.append(leaves.last().cloned().unwrap().into())
2557 .await
2558 .unwrap();
2559
2560 {
2562 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2563 let payload = ds.get_payload(test_payload.height() as usize).await;
2564 let common = ds.get_vid_common(test_common.height() as usize).await;
2565
2566 let truth = network.data_source();
2567 assert_eq!(
2568 leaf.await,
2569 truth.get_leaf(test_leaf.height() as usize).await.await
2570 );
2571 assert_eq!(
2572 payload.await,
2573 truth
2574 .get_payload(test_payload.height() as usize)
2575 .await
2576 .await
2577 );
2578 assert_eq!(
2579 common.await,
2580 truth
2581 .get_vid_common(test_common.height() as usize)
2582 .await
2583 .await
2584 );
2585 }
2586 }
2587
2588 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2589 async fn test_fallback_deserialization_for_fetch_requests_v0() {
2590 let port = pick_unused_port().unwrap();
2591
2592 run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2598 }
2599
2600 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2601 async fn test_fallback_deserialization_for_fetch_requests_v1() {
2602 let port = pick_unused_port().unwrap();
2603
2604 run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2608 }
2609
2610 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2611 async fn test_fallback_deserialization_for_fetch_requests_pos() {
2612 let port = pick_unused_port().unwrap();
2613
2614 run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2617 }
2618 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2619 async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2620 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2626
2627 let port = pick_unused_port().unwrap();
2628 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2629
2630 app.register_module(
2631 "availability",
2632 define_api(
2633 &Default::default(),
2634 StaticVersion::<0, 1> {},
2635 "0.0.1".parse().unwrap(),
2636 )
2637 .unwrap(),
2638 )
2639 .unwrap();
2640
2641 network.spawn(
2642 "server",
2643 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2644 );
2645
2646 let db = TmpDb::init().await;
2647 let provider = Provider::new(QueryServiceProvider::new(
2648 format!("http://localhost:{port}/v0").parse().unwrap(),
2649 StaticVersion::<0, 1> {},
2650 ));
2651 let ds = data_source(&db, &provider).await;
2652
2653 network.start().await;
2654
2655 let leaves = network.data_source().subscribe_leaves(1).await;
2656 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2657 let test_leaf = &leaves[0];
2658 let test_payload = &leaves[2];
2659 let test_common = &leaves[3];
2660
2661 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2662 let payload = ds.get_payload(test_payload.height() as usize).await;
2663 let common = ds.get_vid_common(test_common.height() as usize).await;
2664
2665 sleep(Duration::from_secs(3)).await;
2666
2667 leaf.try_resolve().unwrap_err();
2669 payload.try_resolve().unwrap_err();
2670 common.try_resolve().unwrap_err();
2671 }
2672}