1use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16 data::{VidCommitment, VidCommon, ns_table},
17 traits::{EncodeBytes, block_contents::BlockHeader, node_implementation::NodeType},
18 vid::{
19 advz::{ADVZScheme, advz_scheme},
20 avidm::{AvidMScheme, init_avidm_param},
21 avidm_gf2::AvidmGf2Scheme,
22 },
23};
24use jf_advz::VidScheme;
25use surf_disco::{Client, Url};
26use vbs::{BinarySerializer, version::StaticVersionType};
27
28use super::Provider;
29use crate::{
30 Error, Header, Payload,
31 availability::{
32 ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
33 PayloadQueryData, VidCommonQueryData,
34 },
35 fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
36 types::HeightIndexed,
37};
38
39#[derive(Clone, Debug)]
44pub struct QueryServiceProvider<Ver: StaticVersionType> {
45 client: Client<Error, Ver>,
46}
47
48impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
49 pub fn new(url: Url, _: Ver) -> Self {
50 Self {
51 client: Client::new(url),
52 }
53 }
54}
55
56impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
57 async fn deserialize_legacy_payload<Types: NodeType>(
58 &self,
59 payload_bytes: Vec<u8>,
60 common_bytes: Vec<u8>,
61 req: PayloadRequest,
62 ) -> Option<Payload<Types>> {
63 let client_url = self.client.base_url();
64
65 let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
66 return None;
67 };
68
69 let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
70 &payload_bytes,
71 ) {
72 Ok(payload) => payload,
73 Err(err) => {
74 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
75 return None;
76 },
77 };
78
79 let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
80 &common_bytes,
81 ) {
82 Ok(common) => common,
83 Err(err) => {
84 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
85 return None;
86 },
87 };
88
89 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
90 let bytes = payload.data.encode();
91
92 let commit = advz_scheme(num_storage_nodes)
93 .commit_only(bytes)
94 .inspect_err(|err| {
95 tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
96 })
97 .ok()?;
98
99 if commit != advz_commit {
100 tracing::error!(
101 ?req,
102 expected_commit=%advz_commit,
103 actual_commit=%commit,
104 %client_url,
105 "received inconsistent legacy payload"
106 );
107 return None;
108 }
109
110 Some(payload.data)
111 }
112
113 async fn deserialize_legacy_vid_common<Types: NodeType>(
114 &self,
115 bytes: Vec<u8>,
116 req: VidCommonRequest,
117 ) -> Option<VidCommon> {
118 let client_url = self.client.base_url();
119 let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
120 return None;
121 };
122
123 match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
124 Ok(res) => {
125 if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
126 Some(VidCommon::V0(res.common))
127 } else {
128 tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
129 None
130 }
131 },
132 Err(err) => {
133 tracing::warn!(
134 %client_url,
135 ?req,
136 %err,
137 "failed to deserialize ADVZCommonQueryData"
138 );
139 None
140 },
141 }
142 }
143 async fn deserialize_legacy_leaf<Types: NodeType>(
144 &self,
145 bytes: Vec<u8>,
146 req: LeafRequest<Types>,
147 ) -> Option<LeafQueryData<Types>> {
148 let client_url = self.client.base_url();
149
150 match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
151 Ok(mut leaf) => {
152 if leaf.height() != req.height {
153 tracing::error!(
154 %client_url, ?req,
155 expected_height = req.height,
156 actual_height = leaf.height(),
157 "received leaf with the wrong height"
158 );
159 return None;
160 }
161
162 let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
163 let actual_leaf_commit: [u8; 32] = leaf.hash().into();
164 if actual_leaf_commit != expected_leaf_commit {
165 tracing::error!(
166 %client_url, ?req,
167 expected_leaf = %req.expected_leaf,
168 actual_leaf = %leaf.hash(),
169 "received leaf with the wrong hash"
170 );
171 return None;
172 }
173
174 let expected_qc_commit: [u8; 32] = req.expected_qc.into();
175 let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
176 if actual_qc_commit != expected_qc_commit {
177 tracing::error!(
178 %client_url, ?req,
179 expected_qc = %req.expected_qc,
180 actual_qc = %leaf.qc().commit(),
181 "received leaf with the wrong QC"
182 );
183 return None;
184 }
185
186 leaf.leaf.unfill_block_payload();
191
192 Some(leaf.into())
193 },
194 Err(err) => {
195 tracing::warn!(
196 %client_url, ?req, %err,
197 "failed to deserialize legacy LeafQueryData"
198 );
199 None
200 },
201 }
202 }
203}
204
205#[async_trait]
206impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
207where
208 Types: NodeType,
209{
210 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
218 let client_url = self.client.base_url();
219 let req_hash = req.0;
220 let payload_bytes = self
224 .client
225 .get::<()>(&format!("availability/payload/hash/{}", req.0))
226 .bytes()
227 .await
228 .inspect_err(|err| {
229 tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
230 })
231 .ok()?;
232
233 let common_bytes = self
234 .client
235 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
236 .bytes()
237 .await
238 .inspect_err(|err| {
239 tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
240 })
241 .ok()?;
242
243 let payload =
244 vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
245 .inspect_err(|err| {
246 tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
247 })
248 .ok();
249
250 let common =
251 vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
252 .inspect_err(|err| {
253 tracing::info!(%err, %req_hash,
254 "error deserializing VidCommonQueryData",
255 );
256 })
257 .ok();
258
259 let (payload, common) = match (payload, common) {
260 (Some(payload), Some(common)) => (payload, common),
261 _ => {
262 tracing::info!(%req_hash, "falling back to legacy payload deserialization");
263
264 return self
266 .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
267 .await;
268 },
269 };
270
271 match common.common() {
272 VidCommon::V0(common) => {
273 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
274 let bytes = payload.data().encode();
275
276 let commit = advz_scheme(num_storage_nodes)
277 .commit_only(bytes)
278 .map(VidCommitment::V0)
279 .inspect_err(|err| {
280 tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
281 })
282 .ok()?;
283
284 if commit != req.0 {
285 tracing::error!(
286 expected = %req_hash,
287 actual = ?commit,
288 %client_url,
289 "VID commitment mismatch (V0)"
290 );
291
292 return None;
293 }
294 },
295 VidCommon::V1(common) => {
296 let bytes = payload.data().encode();
297
298 let avidm_param = init_avidm_param(common.total_weights)
299 .inspect_err(|err| {
300 tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
301 })
302 .ok()?;
303
304 let header = self
305 .client
306 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
307 .send()
308 .await
309 .inspect_err(|err| {
310 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
311 })
312 .ok()?;
313
314 if header.payload_commitment() != req.0 {
315 tracing::error!(
316 expected = %req_hash,
317 actual = %header.payload_commitment(),
318 %client_url,
319 "header payload commitment mismatch (V1, AvidM)"
320 );
321 return None;
322 }
323
324 let metadata = header.metadata().encode();
325 let commit = AvidMScheme::commit(
326 &avidm_param,
327 &bytes,
328 ns_table::parse_ns_table(bytes.len(), &metadata),
329 )
330 .map(VidCommitment::V1)
331 .inspect_err(|err| {
332 tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
333 })
334 .ok()?;
335
336 if commit != req.0 {
338 tracing::warn!(
339 expected = %req_hash,
340 actual = %commit,
341 %client_url,
342 "commitment type mismatch for AVIDM check"
343 );
344 return None;
345 }
346 },
347 VidCommon::V2(common) => {
348 let bytes = payload.data().encode();
349
350 let header = self
351 .client
352 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
353 .send()
354 .await
355 .inspect_err(|err| {
356 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
357 })
358 .ok()?;
359
360 if header.payload_commitment() != req.0 {
361 tracing::error!(
362 expected = %req_hash,
363 actual = %header.payload_commitment(),
364 %client_url,
365 "header payload commitment mismatch (V2, AvidmGf2)"
366 );
367 return None;
368 }
369
370 let metadata = header.metadata().encode();
371 let commit = AvidmGf2Scheme::commit(
372 &common.param,
373 &bytes,
374 ns_table::parse_ns_table(bytes.len(), &metadata),
375 )
376 .map(|(commit, _)| VidCommitment::V2(commit))
377 .inspect_err(|err| {
378 tracing::error!(%err, %req_hash, "failed to compute AvidmGf2 commitment");
379 })
380 .ok()?;
381
382 if commit != req.0 {
384 tracing::warn!(
385 expected = %req_hash,
386 actual = %commit,
387 %client_url,
388 "commitment type mismatch for AvidmGf2 check"
389 );
390 return None;
391 }
392 },
393 }
394
395 Some(payload.data)
396 }
397}
398
399#[async_trait]
400impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
401 for QueryServiceProvider<Ver>
402where
403 Types: NodeType,
404{
405 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
413 let client_url = self.client.base_url();
414
415 let bytes = self
416 .client
417 .get::<()>(&format!("availability/leaf/{}", req.height))
418 .bytes()
419 .await;
420 let bytes = match bytes {
421 Ok(bytes) => bytes,
422 Err(err) => {
423 tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
424
425 return None;
426 },
427 };
428
429 match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
432 Ok(mut leaf) => {
433 if leaf.height() != req.height {
434 tracing::error!(
435 %client_url, ?req, ?leaf,
436 expected_height = req.height,
437 actual_height = leaf.height(),
438 "received leaf with the wrong height"
439 );
440 return None;
441 }
442 if leaf.hash() != req.expected_leaf {
443 tracing::error!(
444 %client_url, ?req, ?leaf,
445 expected_hash = %req.expected_leaf,
446 actual_hash = %leaf.hash(),
447 "received leaf with the wrong hash"
448 );
449 return None;
450 }
451 if leaf.qc().commit() != req.expected_qc {
452 tracing::error!(
453 %client_url, ?req, ?leaf,
454 expected_qc = %req.expected_qc,
455 actual_qc = %leaf.qc().commit(),
456 "received leaf with the wrong QC"
457 );
458 return None;
459 }
460
461 leaf.leaf.unfill_block_payload();
466
467 Some(leaf)
468 },
469 Err(err) => {
470 tracing::info!(
471 ?req, %err,
472 "failed to deserialize LeafQueryData, falling back to legacy deserialization"
473 );
474 self.deserialize_legacy_leaf(bytes, req).await
476 },
477 }
478 }
479}
480
481#[async_trait]
482impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
483where
484 Types: NodeType,
485{
486 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
494 let client_url = self.client.base_url();
495 let bytes = self
496 .client
497 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
498 .bytes()
499 .await;
500 let bytes = match bytes {
501 Ok(bytes) => bytes,
502 Err(err) => {
503 tracing::info!(
504 %client_url, ?req, %err,
505 "failed to fetch VID common bytes"
506 );
507 return None;
508 },
509 };
510
511 match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
512 Ok(res) => {
513 if !res.common().is_consistent(&req.0) {
514 tracing::error!(
515 %client_url, ?req, ?res.common,
516 "fetched inconsistent VID common data"
517 );
518 return None;
519 }
520 Some(res.common)
521 },
522 Err(err) => {
523 tracing::info!(
524 %client_url, ?req, %err,
525 "failed to deserialize as V1 VID common data, trying legacy fallback"
526 );
527 self.deserialize_legacy_vid_common::<Types>(bytes, req)
529 .await
530 },
531 }
532 }
533}
534
535#[cfg(all(test, not(target_os = "windows")))]
537mod test {
538 use std::{future::IntoFuture, time::Duration};
539
540 use committable::Committable;
541 use futures::{
542 future::{FutureExt, join},
543 stream::StreamExt,
544 };
545 #[allow(deprecated)]
548 use generic_array::GenericArray;
549 use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
550 use rand::RngCore;
551 use test_utils::reserve_tcp_port;
552 use tide_disco::{App, error::ServerError};
553 use vbs::version::StaticVersion;
554
555 use super::*;
556 use crate::{
557 ApiState,
558 api::load_api,
559 availability::{
560 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
561 Fetch, UpdateAvailabilityData, define_api,
562 },
563 data_source::{
564 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
565 sql::{self, SqlDataSource},
566 storage::{
567 AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
568 fail_storage::{FailStorage, FailableAction},
569 pruning::{PrunedHeightStorage, PrunerCfg},
570 sql::testing::TmpDb,
571 },
572 },
573 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
574 node::{SyncStatusQueryData, data_source::NodeDataSource},
575 task::BackgroundTask,
576 testing::{
577 consensus::{MockDataSource, MockNetwork},
578 mocks::{MockBase, MockTypes, mock_transaction},
579 sleep,
580 },
581 types::HeightIndexed,
582 };
583
584 type Provider = TestProvider<QueryServiceProvider<MockBase>>;
585 type EpochProvider = TestProvider<QueryServiceProvider<EpochVersion>>;
586
587 fn ignore<T>(_: T) {}
588
589 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
591 db: &TmpDb,
592 provider: &P,
593 ) -> sql::Builder<MockTypes, P> {
594 db.config()
595 .builder((*provider).clone())
596 .await
597 .unwrap()
598 .disable_proactive_fetching()
601 }
602
603 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
605 db: &TmpDb,
606 provider: &P,
607 ) -> SqlDataSource<MockTypes, P> {
608 builder(db, provider).await.build().await.unwrap()
609 }
610
611 #[test_log::test(tokio::test(flavor = "multi_thread"))]
612 async fn test_fetch_on_request() {
613 let mut network = MockNetwork::<MockDataSource>::init().await;
615
616 let port = reserve_tcp_port().unwrap();
618 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
619 app.register_module(
620 "availability",
621 define_api(
622 &Default::default(),
623 MockBase::instance(),
624 "1.0.0".parse().unwrap(),
625 )
626 .unwrap(),
627 )
628 .unwrap();
629 network.spawn(
630 "server",
631 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
632 );
633
634 let db = TmpDb::init().await;
636 let provider = Provider::new(QueryServiceProvider::new(
637 format!("http://localhost:{port}").parse().unwrap(),
638 MockBase::instance(),
639 ));
640 let data_source = data_source(&db, &provider).await;
641
642 network.start().await;
644
645 let leaves = network.data_source().subscribe_leaves(1).await;
652 let leaves = leaves.take(5).collect::<Vec<_>>().await;
653 let test_leaf = &leaves[0];
654 let test_block = &leaves[1];
655 let test_payload = &leaves[2];
656 let test_common = &leaves[3];
657
658 tracing::info!("requesting unfetchable resources");
660 let mut fetches = vec![];
661 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
663 fetches.push(
665 data_source
666 .get_leaf(test_leaf.height() as usize)
667 .await
668 .map(ignore),
669 );
670 fetches.push(
672 data_source
673 .get_block(test_block.block_hash())
674 .await
675 .map(ignore),
676 );
677 fetches.push(
678 data_source
679 .get_payload(test_payload.block_hash())
680 .await
681 .map(ignore),
682 );
683 fetches.push(
684 data_source
685 .get_vid_common(test_common.block_hash())
686 .await
687 .map(ignore),
688 );
689 fetches.push(
691 data_source
692 .get_block(test_block.height() as usize)
693 .await
694 .map(ignore),
695 );
696 fetches.push(
697 data_source
698 .get_payload(test_payload.height() as usize)
699 .await
700 .map(ignore),
701 );
702 fetches.push(
703 data_source
704 .get_vid_common(test_common.height() as usize)
705 .await
706 .map(ignore),
707 );
708 fetches.push(data_source.get_vid_common(0).await.map(ignore));
710 fetches.push(
712 data_source
713 .get_block_containing_transaction(mock_transaction(vec![]).commit())
714 .await
715 .map(ignore),
716 );
717
718 sleep(Duration::from_secs(1)).await;
721 for (i, fetch) in fetches.into_iter().enumerate() {
722 tracing::info!("checking fetch {i} is unresolved");
723 fetch.try_resolve().unwrap_err();
724 }
725
726 provider.block().await;
731 data_source
732 .append(leaves.last().cloned().unwrap().into())
733 .await
734 .unwrap();
735
736 tracing::info!("requesting fetchable resources");
737 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
738 let req_block = data_source.get_block(test_block.height() as usize).await;
739 let req_payload = data_source
740 .get_payload(test_payload.height() as usize)
741 .await;
742 let req_common = data_source
743 .get_vid_common(test_common.height() as usize)
744 .await;
745
746 sleep(Duration::from_secs(1)).await;
752 req_leaf.try_resolve().unwrap_err();
753 req_block.try_resolve().unwrap_err();
754 req_payload.try_resolve().unwrap_err();
755 req_common.try_resolve().unwrap_err();
756
757 provider.unblock().await;
759 let leaf = data_source
760 .get_leaf(test_leaf.height() as usize)
761 .await
762 .await;
763 let block = data_source
764 .get_block(test_block.height() as usize)
765 .await
766 .await;
767 let payload = data_source
768 .get_payload(test_payload.height() as usize)
769 .await
770 .await;
771 let common = data_source
772 .get_vid_common(test_common.height() as usize)
773 .await
774 .await;
775 {
776 let truth = network.data_source();
778 assert_eq!(
779 leaf,
780 truth.get_leaf(test_leaf.height() as usize).await.await
781 );
782 assert_eq!(
783 block,
784 truth.get_block(test_block.height() as usize).await.await
785 );
786 assert_eq!(
787 payload,
788 truth
789 .get_payload(test_payload.height() as usize)
790 .await
791 .await
792 );
793 assert_eq!(
794 common,
795 truth
796 .get_vid_common(test_common.height() as usize)
797 .await
798 .await
799 );
800 }
801
802 provider.block().await;
807 for leaf in [test_block, test_payload] {
808 tracing::info!("fetching existing leaf {}", leaf.height());
809 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
810 assert_eq!(*leaf, fetched_leaf);
811 }
812
813 tracing::info!("fetching block by hash");
818 provider.unblock().await;
819 {
820 let block = data_source.get_block(test_leaf.block_hash()).await.await;
821 assert_eq!(block.hash(), leaf.block_hash());
822 }
823
824 tracing::info!("fetching payload by hash");
828 {
829 let leaf = leaves.last().unwrap();
830 let payload = data_source.get_payload(leaf.block_hash()).await.await;
831 assert_eq!(payload.height(), leaf.height());
832 assert_eq!(payload.block_hash(), leaf.block_hash());
833 assert_eq!(payload.hash(), leaf.payload_hash());
834 }
835 }
836
837 #[tokio::test(flavor = "multi_thread")]
838 async fn test_fetch_on_request_epoch_version() {
839 tracing::info!("Starting test_fetch_on_request_epoch_version");
842
843 let mut network = MockNetwork::<MockDataSource>::init().await;
845
846 let port = reserve_tcp_port().unwrap();
848 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
849 app.register_module(
850 "availability",
851 define_api(
852 &Default::default(),
853 EpochVersion::instance(),
854 "1.0.0".parse().unwrap(),
855 )
856 .unwrap(),
857 )
858 .unwrap();
859 network.spawn(
860 "server",
861 app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
862 );
863
864 let db = TmpDb::init().await;
867 let provider = EpochProvider::new(QueryServiceProvider::new(
868 format!("http://localhost:{port}").parse().unwrap(),
869 EpochVersion::instance(),
870 ));
871 let data_source = data_source(&db, &provider).await;
872
873 network.start().await;
875
876 let leaves = network.data_source().subscribe_leaves(1).await;
883 let leaves = leaves.take(5).collect::<Vec<_>>().await;
884 let test_leaf = &leaves[0];
885 let test_block = &leaves[1];
886 let test_payload = &leaves[2];
887 let test_common = &leaves[3];
888
889 let mut fetches = vec![];
891 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
893 fetches.push(
895 data_source
896 .get_leaf(test_leaf.height() as usize)
897 .await
898 .map(ignore),
899 );
900 fetches.push(
902 data_source
903 .get_block(test_block.block_hash())
904 .await
905 .map(ignore),
906 );
907 fetches.push(
908 data_source
909 .get_payload(test_payload.block_hash())
910 .await
911 .map(ignore),
912 );
913 fetches.push(
914 data_source
915 .get_vid_common(test_common.block_hash())
916 .await
917 .map(ignore),
918 );
919 fetches.push(
921 data_source
922 .get_block(test_block.height() as usize)
923 .await
924 .map(ignore),
925 );
926 fetches.push(
927 data_source
928 .get_payload(test_payload.height() as usize)
929 .await
930 .map(ignore),
931 );
932 fetches.push(
933 data_source
934 .get_vid_common(test_common.height() as usize)
935 .await
936 .map(ignore),
937 );
938 fetches.push(data_source.get_vid_common(0).await.map(ignore));
940 fetches.push(
942 data_source
943 .get_block_containing_transaction(mock_transaction(vec![]).commit())
944 .await
945 .map(ignore),
946 );
947
948 sleep(Duration::from_secs(1)).await;
951 for (i, fetch) in fetches.into_iter().enumerate() {
952 tracing::info!("checking fetch {i} is unresolved");
953 fetch.try_resolve().unwrap_err();
954 }
955
956 provider.block().await;
961 data_source
962 .append(leaves.last().cloned().unwrap().into())
963 .await
964 .unwrap();
965
966 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
967 let req_block = data_source.get_block(test_block.height() as usize).await;
968 let req_payload = data_source
969 .get_payload(test_payload.height() as usize)
970 .await;
971 let req_common = data_source
972 .get_vid_common(test_common.height() as usize)
973 .await;
974
975 sleep(Duration::from_secs(1)).await;
981 req_leaf.try_resolve().unwrap_err();
982 req_block.try_resolve().unwrap_err();
983 req_payload.try_resolve().unwrap_err();
984 req_common.try_resolve().unwrap_err();
985
986 provider.unblock().await;
988 let leaf = data_source
989 .get_leaf(test_leaf.height() as usize)
990 .await
991 .await;
992 let block = data_source
993 .get_block(test_block.height() as usize)
994 .await
995 .await;
996 let payload = data_source
997 .get_payload(test_payload.height() as usize)
998 .await
999 .await;
1000 let common = data_source
1001 .get_vid_common(test_common.height() as usize)
1002 .await
1003 .await;
1004 {
1005 let truth = network.data_source();
1007 assert_eq!(
1008 leaf,
1009 truth.get_leaf(test_leaf.height() as usize).await.await
1010 );
1011 assert_eq!(
1012 block,
1013 truth.get_block(test_block.height() as usize).await.await
1014 );
1015 assert_eq!(
1016 payload,
1017 truth
1018 .get_payload(test_payload.height() as usize)
1019 .await
1020 .await
1021 );
1022 assert_eq!(
1023 common,
1024 truth
1025 .get_vid_common(test_common.height() as usize)
1026 .await
1027 .await
1028 );
1029 }
1030
1031 provider.block().await;
1036 for leaf in [test_block, test_payload] {
1037 tracing::info!("fetching existing leaf {}", leaf.height());
1038 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1039 assert_eq!(*leaf, fetched_leaf);
1040 }
1041
1042 provider.unblock().await;
1047 {
1048 let block = data_source.get_block(test_leaf.block_hash()).await.await;
1049 assert_eq!(block.hash(), leaf.block_hash());
1050 }
1051
1052 {
1056 let leaf = leaves.last().unwrap();
1057 let payload = data_source.get_payload(leaf.block_hash()).await.await;
1058 assert_eq!(payload.height(), leaf.height());
1059 assert_eq!(payload.block_hash(), leaf.block_hash());
1060 assert_eq!(payload.hash(), leaf.payload_hash());
1061 }
1062
1063 tracing::info!("Test completed successfully!");
1065 }
1066
1067 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1068 async fn test_fetch_block_and_leaf_concurrently() {
1069 let mut network = MockNetwork::<MockDataSource>::init().await;
1071
1072 let port = reserve_tcp_port().unwrap();
1074 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1075 app.register_module(
1076 "availability",
1077 define_api(
1078 &Default::default(),
1079 MockBase::instance(),
1080 "1.0.0".parse().unwrap(),
1081 )
1082 .unwrap(),
1083 )
1084 .unwrap();
1085 network.spawn(
1086 "server",
1087 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1088 );
1089
1090 let db = TmpDb::init().await;
1092 let provider = Provider::new(QueryServiceProvider::new(
1093 format!("http://localhost:{port}").parse().unwrap(),
1094 MockBase::instance(),
1095 ));
1096 let data_source = data_source(&db, &provider).await;
1097
1098 network.start().await;
1100
1101 let leaves = network.data_source().subscribe_leaves(1).await;
1104 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1105 let test_leaf = &leaves[0];
1106
1107 data_source.append(leaves[1].clone().into()).await.unwrap();
1109
1110 let (leaf, block) = join(
1114 data_source
1115 .get_leaf(test_leaf.height() as usize)
1116 .await
1117 .into_future(),
1118 data_source
1119 .get_block(test_leaf.height() as usize)
1120 .await
1121 .into_future(),
1122 )
1123 .await;
1124 assert_eq!(leaf, *test_leaf);
1125 assert_eq!(leaf.header(), block.header());
1126 }
1127
1128 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1129 async fn test_fetch_different_blocks_same_payload() {
1130 let mut network = MockNetwork::<MockDataSource>::init().await;
1132
1133 let port = reserve_tcp_port().unwrap();
1135 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1136 app.register_module(
1137 "availability",
1138 define_api(
1139 &Default::default(),
1140 MockBase::instance(),
1141 "1.0.0".parse().unwrap(),
1142 )
1143 .unwrap(),
1144 )
1145 .unwrap();
1146 network.spawn(
1147 "server",
1148 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1149 );
1150
1151 let db = TmpDb::init().await;
1153 let provider = Provider::new(QueryServiceProvider::new(
1154 format!("http://localhost:{port}").parse().unwrap(),
1155 MockBase::instance(),
1156 ));
1157 let data_source = data_source(&db, &provider).await;
1158
1159 network.start().await;
1161
1162 let leaves = network.data_source().subscribe_leaves(1).await;
1165 let leaves = leaves.take(4).collect::<Vec<_>>().await;
1166
1167 data_source
1170 .append(leaves.last().cloned().unwrap().into())
1171 .await
1172 .unwrap();
1173
1174 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1176 let (block1, block2) = join(
1179 data_source
1180 .get_block(leaves[0].height() as usize)
1181 .await
1182 .into_future(),
1183 data_source
1184 .get_block(leaves[1].height() as usize)
1185 .await
1186 .into_future(),
1187 )
1188 .await;
1189 assert_eq!(block1.header(), leaves[0].header());
1190 assert_eq!(block2.header(), leaves[1].header());
1191 }
1192
1193 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1194 async fn test_fetch_stream() {
1195 let mut network = MockNetwork::<MockDataSource>::init().await;
1197
1198 let port = reserve_tcp_port().unwrap();
1200 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1201 app.register_module(
1202 "availability",
1203 define_api(
1204 &Default::default(),
1205 MockBase::instance(),
1206 "1.0.0".parse().unwrap(),
1207 )
1208 .unwrap(),
1209 )
1210 .unwrap();
1211 network.spawn(
1212 "server",
1213 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1214 );
1215
1216 let db = TmpDb::init().await;
1218 let provider = Provider::new(QueryServiceProvider::new(
1219 format!("http://localhost:{port}").parse().unwrap(),
1220 MockBase::instance(),
1221 ));
1222 let data_source = data_source(&db, &provider).await;
1223
1224 network.start().await;
1226
1227 let blocks = data_source.subscribe_blocks(0).await;
1229 let leaves = data_source.subscribe_leaves(0).await;
1230 let common = data_source.subscribe_vid_common(0).await;
1231
1232 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1234 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1235
1236 data_source
1239 .append(finalized_leaves.last().cloned().unwrap().into())
1240 .await
1241 .unwrap();
1242
1243 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1245 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1246 let common = common.take(5).collect::<Vec<_>>().await;
1247 for i in 0..5 {
1248 tracing::info!("checking block {i}");
1249 assert_eq!(leaves[i], finalized_leaves[i]);
1250 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1251 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1252 }
1253 }
1254
1255 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1256 async fn test_fetch_range_start() {
1257 let mut network = MockNetwork::<MockDataSource>::init().await;
1259
1260 let port = reserve_tcp_port().unwrap();
1262 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1263 app.register_module(
1264 "availability",
1265 define_api(
1266 &Default::default(),
1267 MockBase::instance(),
1268 "1.0.0".parse().unwrap(),
1269 )
1270 .unwrap(),
1271 )
1272 .unwrap();
1273 network.spawn(
1274 "server",
1275 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1276 );
1277
1278 let db = TmpDb::init().await;
1280 let provider = Provider::new(QueryServiceProvider::new(
1281 format!("http://localhost:{port}").parse().unwrap(),
1282 MockBase::instance(),
1283 ));
1284 let data_source = data_source(&db, &provider).await;
1285
1286 network.start().await;
1288
1289 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1291 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1292
1293 let mut tx = data_source.write().await.unwrap();
1297 tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1298 tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1299 tx.commit().await.unwrap();
1300
1301 let leaves = data_source
1303 .get_leaf_range(..5)
1304 .await
1305 .then(Fetch::resolve)
1306 .collect::<Vec<_>>()
1307 .await;
1308 for i in 0..5 {
1309 tracing::info!("checking leaf {i}");
1310 assert_eq!(leaves[i], finalized_leaves[i]);
1311 }
1312 }
1313
1314 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1315 async fn fetch_transaction() {
1316 let mut network = MockNetwork::<MockDataSource>::init().await;
1318
1319 let port = reserve_tcp_port().unwrap();
1321 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1322 app.register_module(
1323 "availability",
1324 define_api(
1325 &Default::default(),
1326 MockBase::instance(),
1327 "1.0.0".parse().unwrap(),
1328 )
1329 .unwrap(),
1330 )
1331 .unwrap();
1332 network.spawn(
1333 "server",
1334 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1335 );
1336
1337 let db = TmpDb::init().await;
1340 let data_source = data_source(&db, &NoFetching).await;
1341
1342 let mut leaves = network.data_source().subscribe_leaves(1).await;
1344 let mut blocks = network.data_source().subscribe_blocks(1).await;
1345
1346 network.start().await;
1348
1349 let tx = mock_transaction(vec![1, 2, 3]);
1353 let fut = data_source
1354 .get_block_containing_transaction(tx.commit())
1355 .await;
1356
1357 network.submit_transaction(tx.clone()).await;
1359
1360 let block = loop {
1363 let leaf = leaves.next().await.unwrap();
1364 let block = blocks.next().await.unwrap();
1365
1366 data_source
1367 .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1368 .await
1369 .unwrap();
1370
1371 if block.transaction_by_hash(tx.commit()).is_some() {
1372 break block;
1373 }
1374 };
1375 tracing::info!("transaction included in block {}", block.height());
1376
1377 let fetched_tx = fut.await;
1378 assert_eq!(
1379 fetched_tx,
1380 BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1381 );
1382
1383 assert_eq!(
1385 fetched_tx,
1386 data_source
1387 .get_block_containing_transaction(tx.commit())
1388 .await
1389 .await
1390 );
1391 }
1392
1393 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1394 async fn test_retry() {
1395 let mut network = MockNetwork::<MockDataSource>::init().await;
1397
1398 let port = reserve_tcp_port().unwrap();
1400 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1401 app.register_module(
1402 "availability",
1403 define_api(
1404 &Default::default(),
1405 MockBase::instance(),
1406 "1.0.0".parse().unwrap(),
1407 )
1408 .unwrap(),
1409 )
1410 .unwrap();
1411 network.spawn(
1412 "server",
1413 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1414 );
1415
1416 let db = TmpDb::init().await;
1418 let provider = Provider::new(QueryServiceProvider::new(
1419 format!("http://localhost:{port}").parse().unwrap(),
1420 MockBase::instance(),
1421 ));
1422 let data_source = builder(&db, &provider)
1423 .await
1424 .with_max_retry_interval(Duration::from_secs(1))
1425 .build()
1426 .await
1427 .unwrap();
1428
1429 network.start().await;
1431
1432 let leaves = network.data_source().subscribe_leaves(1).await;
1435 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1436 let test_leaf = &leaves[0];
1437
1438 provider.fail();
1440
1441 data_source
1444 .append(leaves.last().cloned().unwrap().into())
1445 .await
1446 .unwrap();
1447
1448 tracing::info!("requesting leaf from failing providers");
1449 let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1450
1451 sleep(Duration::from_secs(5)).await;
1454 fut.try_resolve().unwrap_err();
1455
1456 provider.unfail();
1458 assert_eq!(
1459 data_source
1460 .get_leaf(test_leaf.height() as usize)
1461 .await
1462 .await,
1463 *test_leaf
1464 );
1465 }
1466
1467 #[allow(deprecated)]
1469 fn random_vid_commit() -> VidCommitment {
1470 let mut bytes = [0; 32];
1471 rand::thread_rng().fill_bytes(&mut bytes);
1472 VidCommitment::V0(GenericArray::from(bytes).into())
1473 }
1474
1475 async fn malicious_server(port: u16) {
1476 let mut api = load_api::<(), ServerError, MockBase>(
1477 None::<std::path::PathBuf>,
1478 include_str!("../../../api/availability.toml"),
1479 vec![],
1480 )
1481 .unwrap();
1482
1483 api.get("get_payload", move |_, _| {
1484 async move {
1485 Ok(PayloadQueryData::<MockTypes>::genesis(
1487 &Default::default(),
1488 &Default::default(),
1489 TEST_VERSIONS.test.base,
1490 )
1491 .await)
1492 }
1493 .boxed()
1494 })
1495 .unwrap()
1496 .get("get_vid_common", move |_, _| {
1497 async move {
1498 Ok(VidCommonQueryData::<MockTypes>::genesis(
1500 &Default::default(),
1501 &Default::default(),
1502 TEST_VERSIONS.test.base,
1503 )
1504 .await)
1505 }
1506 .boxed()
1507 })
1508 .unwrap();
1509
1510 let mut app = App::<(), ServerError>::with_state(());
1511 app.register_module("availability", api).unwrap();
1512 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1513 .await
1514 .ok();
1515 }
1516
1517 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1518 async fn test_fetch_from_malicious_server() {
1519 let port = reserve_tcp_port().unwrap();
1520 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1521
1522 let provider = QueryServiceProvider::new(
1523 format!("http://localhost:{port}").parse().unwrap(),
1524 MockBase::instance(),
1525 );
1526 provider.client.connect(None).await;
1527
1528 let res =
1531 ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1532 .await;
1533 assert_eq!(res, None);
1534
1535 let res =
1538 ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1539 .await;
1540 assert_eq!(res, None);
1541 }
1542
1543 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1544 async fn test_archive_recovery() {
1545 let mut network = MockNetwork::<MockDataSource>::init().await;
1547
1548 let port = reserve_tcp_port().unwrap();
1550 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1551 app.register_module(
1552 "availability",
1553 define_api(
1554 &Default::default(),
1555 MockBase::instance(),
1556 "1.0.0".parse().unwrap(),
1557 )
1558 .unwrap(),
1559 )
1560 .unwrap();
1561 network.spawn(
1562 "server",
1563 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1564 );
1565
1566 let db = TmpDb::init().await;
1569 let provider = Provider::new(QueryServiceProvider::new(
1570 format!("http://localhost:{port}").parse().unwrap(),
1571 MockBase::instance(),
1572 ));
1573 let mut data_source = db
1574 .config()
1575 .pruner_cfg(
1576 PrunerCfg::new()
1577 .with_target_retention(Duration::from_secs(0))
1578 .with_interval(Duration::from_secs(5)),
1579 )
1580 .unwrap()
1581 .builder(provider.clone())
1582 .await
1583 .unwrap()
1584 .with_min_retry_interval(Duration::from_millis(100))
1588 .with_retry_randomization_factor(3.)
1592 .build()
1593 .await
1594 .unwrap();
1595
1596 network.start().await;
1598
1599 let leaves = network.data_source().subscribe_leaves(1).await;
1601 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1602
1603 let pruned_height = data_source
1605 .read()
1606 .await
1607 .unwrap()
1608 .load_pruned_height()
1609 .await
1610 .unwrap();
1611 assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1613
1614 let last_leaf = leaves.last().unwrap();
1617 data_source.append(last_leaf.clone().into()).await.unwrap();
1618
1619 for i in 1..=last_leaf.height() {
1621 tracing::info!(i, "fetching leaf");
1622 assert_eq!(
1623 data_source.get_leaf(i as usize).await.await,
1624 leaves[i as usize - 1]
1625 );
1626 }
1627
1628 loop {
1630 let pruned_height = data_source
1631 .read()
1632 .await
1633 .unwrap()
1634 .load_pruned_height()
1635 .await
1636 .unwrap();
1637 if pruned_height == Some(last_leaf.height()) {
1638 break;
1639 }
1640 tracing::info!(
1641 ?pruned_height,
1642 target_height = last_leaf.height(),
1643 "waiting for pruner to run"
1644 );
1645 sleep(Duration::from_secs(1)).await;
1646 }
1647
1648 data_source = db
1650 .config()
1651 .archive()
1652 .builder(provider.clone())
1653 .await
1654 .unwrap()
1655 .with_proactive_interval(Duration::from_secs(1))
1656 .build()
1657 .await
1658 .unwrap();
1659
1660 let pruned_height = data_source
1662 .read()
1663 .await
1664 .unwrap()
1665 .load_pruned_height()
1666 .await
1667 .unwrap();
1668 assert_eq!(pruned_height, None);
1669
1670 data_source.append(last_leaf.clone().into()).await.unwrap();
1674
1675 loop {
1677 let sync_status = data_source.sync_status().await.unwrap();
1678
1679 if (SyncStatusQueryData {
1683 vid_shares: Default::default(),
1684 ..sync_status.clone()
1685 })
1686 .is_fully_synced()
1687 {
1688 break;
1689 }
1690 tracing::info!(?sync_status, "waiting for node to sync");
1691 sleep(Duration::from_secs(1)).await;
1692 }
1693
1694 sleep(Duration::from_secs(3)).await;
1696 let sync_status = data_source.sync_status().await.unwrap();
1697 assert!(
1698 (SyncStatusQueryData {
1699 vid_shares: Default::default(),
1700 ..sync_status.clone()
1701 })
1702 .is_fully_synced(),
1703 "{sync_status:#?}"
1704 );
1705 }
1706
    /// Selects which storage operation the storage-failure test helpers make fail.
    #[derive(Clone, Copy, Debug)]
    enum FailureType {
        /// Fail when beginning a writable transaction.
        Begin,
        /// Fail writes.
        Write,
        /// Fail commits.
        Commit,
    }
1713
1714 async fn test_fetch_storage_failure_helper(failure: FailureType) {
1715 let mut network = MockNetwork::<MockDataSource>::init().await;
1717
1718 let port = reserve_tcp_port().unwrap();
1720 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1721 app.register_module(
1722 "availability",
1723 define_api(
1724 &Default::default(),
1725 MockBase::instance(),
1726 "1.0.0".parse().unwrap(),
1727 )
1728 .unwrap(),
1729 )
1730 .unwrap();
1731 network.spawn(
1732 "server",
1733 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1734 );
1735
1736 let provider = Provider::new(QueryServiceProvider::new(
1738 format!("http://localhost:{port}").parse().unwrap(),
1739 MockBase::instance(),
1740 ));
1741 let db = TmpDb::init().await;
1742 let storage = FailStorage::from(
1743 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1744 .await
1745 .unwrap(),
1746 );
1747 let data_source = FetchingDataSource::builder(storage, provider)
1748 .disable_proactive_fetching()
1749 .disable_aggregator()
1750 .with_max_retry_interval(Duration::from_millis(100))
1751 .with_retry_timeout(Duration::from_secs(1))
1752 .build()
1753 .await
1754 .unwrap();
1755
1756 network.start().await;
1758
1759 let leaves = network.data_source().subscribe_leaves(1).await;
1761 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1762
1763 let last_leaf = leaves.last().unwrap();
1765 let mut tx = data_source.write().await.unwrap();
1766 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1767 tx.commit().await.unwrap();
1768
1769 tracing::info!("fetch with write failure");
1771 match failure {
1772 FailureType::Begin => {
1773 data_source
1774 .as_ref()
1775 .fail_begins_writable(FailableAction::Any)
1776 .await
1777 },
1778 FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1779 FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1780 }
1781 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1782 data_source.as_ref().pass().await;
1783
1784 sleep(Duration::from_secs(1)).await;
1789
1790 tracing::info!("fetch with write success");
1793 let fetch = data_source.get_leaf(1).await;
1794 assert!(fetch.is_pending());
1795 assert_eq!(leaves[0], fetch.await);
1796
1797 sleep(Duration::from_secs(1)).await;
1798
1799 tracing::info!("retrieve from storage");
1801 let fetch = data_source.get_leaf(1).await;
1802 assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1803 }
1804
1805 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1806 async fn test_fetch_storage_failure_on_begin() {
1807 test_fetch_storage_failure_helper(FailureType::Begin).await;
1808 }
1809
1810 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1811 async fn test_fetch_storage_failure_on_write() {
1812 test_fetch_storage_failure_helper(FailureType::Write).await;
1813 }
1814
1815 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1816 async fn test_fetch_storage_failure_on_commit() {
1817 test_fetch_storage_failure_helper(FailureType::Commit).await;
1818 }
1819
1820 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1821 let mut network = MockNetwork::<MockDataSource>::init().await;
1823
1824 let port = reserve_tcp_port().unwrap();
1826 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1827 app.register_module(
1828 "availability",
1829 define_api(
1830 &Default::default(),
1831 MockBase::instance(),
1832 "1.0.0".parse().unwrap(),
1833 )
1834 .unwrap(),
1835 )
1836 .unwrap();
1837 network.spawn(
1838 "server",
1839 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1840 );
1841
1842 let provider = Provider::new(QueryServiceProvider::new(
1844 format!("http://localhost:{port}").parse().unwrap(),
1845 MockBase::instance(),
1846 ));
1847 let db = TmpDb::init().await;
1848 let storage = FailStorage::from(
1849 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1850 .await
1851 .unwrap(),
1852 );
1853 let data_source = FetchingDataSource::builder(storage, provider)
1854 .disable_proactive_fetching()
1855 .disable_aggregator()
1856 .with_min_retry_interval(Duration::from_millis(100))
1857 .build()
1858 .await
1859 .unwrap();
1860
1861 network.start().await;
1863
1864 let leaves = network.data_source().subscribe_leaves(1).await;
1866 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1867
1868 let last_leaf = leaves.last().unwrap();
1870 let mut tx = data_source.write().await.unwrap();
1871 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1872 tx.commit().await.unwrap();
1873
1874 tracing::info!("fetch with write failure");
1876 match failure {
1877 FailureType::Begin => {
1878 data_source
1879 .as_ref()
1880 .fail_one_begin_writable(FailableAction::Any)
1881 .await
1882 },
1883 FailureType::Write => {
1884 data_source
1885 .as_ref()
1886 .fail_one_write(FailableAction::Any)
1887 .await
1888 },
1889 FailureType::Commit => {
1890 data_source
1891 .as_ref()
1892 .fail_one_commit(FailableAction::Any)
1893 .await
1894 },
1895 }
1896 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1897
1898 let mut tx = data_source.read().await.unwrap();
1900 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1901 }
1902
1903 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1904 async fn test_fetch_storage_failure_retry_on_begin() {
1905 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1906 }
1907
1908 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1909 async fn test_fetch_storage_failure_retry_on_write() {
1910 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1911 }
1912
1913 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1914 async fn test_fetch_storage_failure_retry_on_commit() {
1915 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1916 }
1917
1918 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1919 async fn test_fetch_on_decide() {
1920 let mut network = MockNetwork::<MockDataSource>::init().await;
1922
1923 let port = reserve_tcp_port().unwrap();
1925 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1926 app.register_module(
1927 "availability",
1928 define_api(
1929 &Default::default(),
1930 MockBase::instance(),
1931 "1.0.0".parse().unwrap(),
1932 )
1933 .unwrap(),
1934 )
1935 .unwrap();
1936 network.spawn(
1937 "server",
1938 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1939 );
1940
1941 let db = TmpDb::init().await;
1943 let provider = Provider::new(QueryServiceProvider::new(
1944 format!("http://localhost:{port}").parse().unwrap(),
1945 MockBase::instance(),
1946 ));
1947 let data_source = builder(&db, &provider)
1948 .await
1949 .with_max_retry_interval(Duration::from_secs(1))
1950 .build()
1951 .await
1952 .unwrap();
1953
1954 network.start().await;
1956
1957 let leaf = network
1959 .data_source()
1960 .subscribe_leaves(1)
1961 .await
1962 .next()
1963 .await
1964 .unwrap();
1965
1966 data_source.append(leaf.clone().into()).await.unwrap();
1968
1969 sleep(Duration::from_secs(5)).await;
1972
1973 let mut tx = data_source.read().await.unwrap();
1977 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1978 let block = tx.get_block(id).await.unwrap();
1979 let vid = tx.get_vid_common(id).await.unwrap();
1980
1981 assert_eq!(block.hash(), leaf.block_hash());
1982 assert_eq!(vid.block_hash(), leaf.block_hash());
1983 }
1984
1985 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1986 async fn test_fetch_begin_failure() {
1987 let mut network = MockNetwork::<MockDataSource>::init().await;
1989
1990 let port = reserve_tcp_port().unwrap();
1992 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1993 app.register_module(
1994 "availability",
1995 define_api(
1996 &Default::default(),
1997 MockBase::instance(),
1998 "1.0.0".parse().unwrap(),
1999 )
2000 .unwrap(),
2001 )
2002 .unwrap();
2003 network.spawn(
2004 "server",
2005 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2006 );
2007
2008 let provider = Provider::new(QueryServiceProvider::new(
2010 format!("http://localhost:{port}").parse().unwrap(),
2011 MockBase::instance(),
2012 ));
2013 let db = TmpDb::init().await;
2014 let storage = FailStorage::from(
2015 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2016 .await
2017 .unwrap(),
2018 );
2019 let data_source = FetchingDataSource::builder(storage, provider)
2020 .disable_proactive_fetching()
2021 .disable_aggregator()
2022 .with_min_retry_interval(Duration::from_millis(100))
2023 .build()
2024 .await
2025 .unwrap();
2026
2027 network.start().await;
2029
2030 let leaves = network.data_source().subscribe_leaves(1).await;
2032 let leaves = leaves.take(2).collect::<Vec<_>>().await;
2033
2034 let last_leaf = leaves.last().unwrap();
2036 let mut tx = data_source.write().await.unwrap();
2037 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2038 tx.commit().await.unwrap();
2039
2040 tracing::info!("fetch with transaction failure");
2043 data_source
2044 .as_ref()
2045 .fail_one_begin_read_only(FailableAction::Any)
2046 .await;
2047 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2048 }
2049
2050 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2051 async fn test_fetch_load_failure_block() {
2052 let mut network = MockNetwork::<MockDataSource>::init().await;
2054
2055 let port = reserve_tcp_port().unwrap();
2057 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2058 app.register_module(
2059 "availability",
2060 define_api(
2061 &Default::default(),
2062 MockBase::instance(),
2063 "1.0.0".parse().unwrap(),
2064 )
2065 .unwrap(),
2066 )
2067 .unwrap();
2068 network.spawn(
2069 "server",
2070 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2071 );
2072
2073 let provider = Provider::new(QueryServiceProvider::new(
2075 format!("http://localhost:{port}").parse().unwrap(),
2076 MockBase::instance(),
2077 ));
2078 let db = TmpDb::init().await;
2079 let storage = FailStorage::from(
2080 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2081 .await
2082 .unwrap(),
2083 );
2084 let data_source = FetchingDataSource::builder(storage, provider)
2085 .disable_proactive_fetching()
2086 .disable_aggregator()
2087 .with_min_retry_interval(Duration::from_millis(100))
2088 .build()
2089 .await
2090 .unwrap();
2091
2092 network.start().await;
2094
2095 let mut leaves = network.data_source().subscribe_leaves(1).await;
2097 let leaf = leaves.next().await.unwrap();
2098
2099 let mut tx = data_source.write().await.unwrap();
2102 tx.insert_leaf(leaf.clone()).await.unwrap();
2103 tx.commit().await.unwrap();
2104
2105 tracing::info!("fetch with read failure");
2119 data_source
2120 .as_ref()
2121 .fail_one_read(FailableAction::GetHeader)
2122 .await;
2123 let fetch = data_source.get_block(leaf.block_hash()).await;
2124
2125 sleep(Duration::from_secs(2)).await;
2127 data_source.as_ref().pass().await;
2128
2129 let block: BlockQueryData<MockTypes> = fetch.await;
2130 assert_eq!(block.hash(), leaf.block_hash());
2131 }
2132
2133 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2134 async fn test_fetch_load_failure_tx() {
2135 let mut network = MockNetwork::<MockDataSource>::init().await;
2137
2138 let port = reserve_tcp_port().unwrap();
2140 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2141 app.register_module(
2142 "availability",
2143 define_api(
2144 &Default::default(),
2145 MockBase::instance(),
2146 "1.0.0".parse().unwrap(),
2147 )
2148 .unwrap(),
2149 )
2150 .unwrap();
2151 network.spawn(
2152 "server",
2153 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2154 );
2155
2156 let provider = Provider::new(QueryServiceProvider::new(
2158 format!("http://localhost:{port}").parse().unwrap(),
2159 MockBase::instance(),
2160 ));
2161 let db = TmpDb::init().await;
2162 let storage = FailStorage::from(
2163 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2164 .await
2165 .unwrap(),
2166 );
2167 let data_source = FetchingDataSource::builder(storage, provider)
2168 .disable_proactive_fetching()
2169 .disable_aggregator()
2170 .with_min_retry_interval(Duration::from_millis(100))
2171 .build()
2172 .await
2173 .unwrap();
2174
2175 network.start().await;
2177
2178 let tx = mock_transaction(vec![1, 2, 3]);
2180 network.submit_transaction(tx.clone()).await;
2181 let tx = network
2182 .data_source()
2183 .get_block_containing_transaction(tx.commit())
2184 .await
2185 .await;
2186
2187 {
2189 let leaf = network
2190 .data_source()
2191 .get_leaf(tx.transaction.block_height() as usize)
2192 .await
2193 .await;
2194 let block = network
2195 .data_source()
2196 .get_block(tx.transaction.block_height() as usize)
2197 .await
2198 .await;
2199 let mut tx = data_source.write().await.unwrap();
2200 tx.insert_leaf(leaf.clone()).await.unwrap();
2201 tx.insert_block(block.clone()).await.unwrap();
2202 tx.commit().await.unwrap();
2203 }
2204
2205 tracing::info!("fetch success");
2207 assert_eq!(
2208 tx,
2209 data_source
2210 .get_block_containing_transaction(tx.transaction.hash())
2211 .await
2212 .await
2213 );
2214
2215 tracing::info!("fetch with read failure");
2227 data_source
2228 .as_ref()
2229 .fail_one_read(FailableAction::Any)
2230 .await;
2231 let fetch = data_source
2232 .get_block_containing_transaction(tx.transaction.hash())
2233 .await;
2234
2235 assert_eq!(tx, fetch.await);
2236 }
2237
2238 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2239 async fn test_stream_begin_failure() {
2240 let mut network = MockNetwork::<MockDataSource>::init().await;
2242
2243 let port = reserve_tcp_port().unwrap();
2245 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2246 app.register_module(
2247 "availability",
2248 define_api(
2249 &Default::default(),
2250 MockBase::instance(),
2251 "1.0.0".parse().unwrap(),
2252 )
2253 .unwrap(),
2254 )
2255 .unwrap();
2256 network.spawn(
2257 "server",
2258 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2259 );
2260
2261 let provider = Provider::new(QueryServiceProvider::new(
2263 format!("http://localhost:{port}").parse().unwrap(),
2264 MockBase::instance(),
2265 ));
2266 let db = TmpDb::init().await;
2267 let storage = FailStorage::from(
2268 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2269 .await
2270 .unwrap(),
2271 );
2272 let data_source = FetchingDataSource::builder(storage, provider)
2273 .disable_proactive_fetching()
2274 .disable_aggregator()
2275 .with_min_retry_interval(Duration::from_millis(100))
2276 .with_range_chunk_size(3)
2277 .build()
2278 .await
2279 .unwrap();
2280
2281 network.start().await;
2283
2284 let leaves = network.data_source().subscribe_leaves(1).await;
2286 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2287
2288 let last_leaf = leaves.last().unwrap();
2290 let mut tx = data_source.write().await.unwrap();
2291 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2292 tx.commit().await.unwrap();
2293
2294 tracing::info!("stream with transaction failure");
2297 data_source
2298 .as_ref()
2299 .fail_one_begin_read_only(FailableAction::Any)
2300 .await;
2301 assert_eq!(
2302 leaves,
2303 data_source
2304 .subscribe_leaves(1)
2305 .await
2306 .take(5)
2307 .collect::<Vec<_>>()
2308 .await
2309 );
2310 }
2311
2312 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2313 async fn test_stream_load_failure() {
2314 let mut network = MockNetwork::<MockDataSource>::init().await;
2316
2317 let port = reserve_tcp_port().unwrap();
2319 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2320 app.register_module(
2321 "availability",
2322 define_api(
2323 &Default::default(),
2324 MockBase::instance(),
2325 "1.0.0".parse().unwrap(),
2326 )
2327 .unwrap(),
2328 )
2329 .unwrap();
2330 network.spawn(
2331 "server",
2332 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2333 );
2334
2335 let provider = Provider::new(QueryServiceProvider::new(
2337 format!("http://localhost:{port}").parse().unwrap(),
2338 MockBase::instance(),
2339 ));
2340 let db = TmpDb::init().await;
2341 let storage = FailStorage::from(
2342 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2343 .await
2344 .unwrap(),
2345 );
2346 let data_source = FetchingDataSource::builder(storage, provider)
2347 .disable_proactive_fetching()
2348 .disable_aggregator()
2349 .with_min_retry_interval(Duration::from_millis(100))
2350 .with_range_chunk_size(3)
2351 .build()
2352 .await
2353 .unwrap();
2354
2355 network.start().await;
2357
2358 let leaves = network.data_source().subscribe_leaves(1).await;
2360 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2361
2362 let last_leaf = leaves.last().unwrap();
2364 let mut tx = data_source.write().await.unwrap();
2365 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2366 tx.commit().await.unwrap();
2367
2368 tracing::info!("stream with read failure");
2370 data_source.as_ref().fail_reads(FailableAction::Any).await;
2371 let fetches = data_source
2372 .get_block_range(1..=5)
2373 .await
2374 .collect::<Vec<_>>()
2375 .await;
2376
2377 sleep(Duration::from_secs(2)).await;
2379 data_source.as_ref().pass().await;
2380
2381 for (leaf, fetch) in leaves.iter().zip(fetches) {
2382 let block: BlockQueryData<MockTypes> = fetch.await;
2383 assert_eq!(block.hash(), leaf.block_hash());
2384 }
2385 }
2386
    /// Selects which metadata stream the metadata-stream failure helper subscribes to.
    enum MetadataType {
        /// Use the payload metadata stream.
        Payload,
        /// Use the VID common metadata stream.
        Vid,
    }
2391
2392 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2393 let mut network = MockNetwork::<MockDataSource>::init().await;
2395
2396 let port = reserve_tcp_port().unwrap();
2398 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2399 app.register_module(
2400 "availability",
2401 define_api(
2402 &Default::default(),
2403 MockBase::instance(),
2404 "1.0.0".parse().unwrap(),
2405 )
2406 .unwrap(),
2407 )
2408 .unwrap();
2409 network.spawn(
2410 "server",
2411 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2412 );
2413
2414 let provider = Provider::new(QueryServiceProvider::new(
2416 format!("http://localhost:{port}").parse().unwrap(),
2417 MockBase::instance(),
2418 ));
2419 let db = TmpDb::init().await;
2420 let storage = FailStorage::from(
2421 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2422 .await
2423 .unwrap(),
2424 );
2425 let data_source = FetchingDataSource::builder(storage, provider)
2426 .disable_proactive_fetching()
2427 .disable_aggregator()
2428 .with_min_retry_interval(Duration::from_millis(100))
2429 .with_range_chunk_size(3)
2430 .build()
2431 .await
2432 .unwrap();
2433
2434 network.start().await;
2436
2437 let leaves = network.data_source().subscribe_leaves(1).await;
2439 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2440
2441 let last_leaf = leaves.last().unwrap();
2443 let mut tx = data_source.write().await.unwrap();
2444 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2445 tx.commit().await.unwrap();
2446
2447 let leaf = network.data_source().get_leaf(1).await.await;
2452 let block = network.data_source().get_block(1).await.await;
2453 let vid = network.data_source().get_vid_common(1).await.await;
2454 data_source
2455 .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2456 .await
2457 .unwrap();
2458
2459 tracing::info!("stream with transaction failure");
2461 data_source
2462 .as_ref()
2463 .fail_begins_read_only(FailableAction::Any)
2464 .await;
2465 match stream {
2466 MetadataType::Payload => {
2467 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2468
2469 sleep(Duration::from_secs(2)).await;
2471 tracing::info!("stop failing transactions");
2472 data_source.as_ref().pass().await;
2473
2474 let payloads = payloads.collect::<Vec<_>>().await;
2475 for (leaf, payload) in leaves.iter().zip(payloads) {
2476 assert_eq!(payload.block_hash, leaf.block_hash());
2477 }
2478 },
2479 MetadataType::Vid => {
2480 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2481
2482 sleep(Duration::from_secs(2)).await;
2484 tracing::info!("stop failing transactions");
2485 data_source.as_ref().pass().await;
2486
2487 let vids = vids.collect::<Vec<_>>().await;
2488 for (leaf, vid) in leaves.iter().zip(vids) {
2489 assert_eq!(vid.block_hash, leaf.block_hash());
2490 }
2491 },
2492 }
2493 }
2494
2495 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2496 async fn test_metadata_stream_begin_failure_payload() {
2497 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2498 }
2499
2500 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2501 async fn test_metadata_stream_begin_failure_vid() {
2502 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2503 }
2504
2505 async fn run_fallback_deserialization_test_helper(port: u16, version: &str) {
2510 let mut network = MockNetwork::<MockDataSource>::init().await;
2511
2512 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2513
2514 app.register_module(
2516 "availability",
2517 define_api(
2518 &Default::default(),
2519 StaticVersion::<0, 1> {},
2520 "0.0.1".parse().unwrap(),
2521 )
2522 .unwrap(),
2523 )
2524 .unwrap();
2525
2526 app.register_module(
2527 "availability",
2528 define_api(
2529 &Default::default(),
2530 StaticVersion::<0, 1> {},
2531 "1.0.0".parse().unwrap(),
2532 )
2533 .unwrap(),
2534 )
2535 .unwrap();
2536
2537 network.spawn(
2538 "server",
2539 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2540 );
2541
2542 let db = TmpDb::init().await;
2543
2544 let provider_url = format!("http://localhost:{port}/{version}")
2545 .parse()
2546 .expect("Invalid URL");
2547
2548 let provider = Provider::new(QueryServiceProvider::new(
2549 provider_url,
2550 StaticVersion::<0, 1> {},
2551 ));
2552
2553 let ds = data_source(&db, &provider).await;
2554 network.start().await;
2555
2556 let leaves = network.data_source().subscribe_leaves(1).await;
2557 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2558 let test_leaf = &leaves[0];
2559 let test_payload = &leaves[2];
2560 let test_common = &leaves[3];
2561
2562 let mut fetches = vec![];
2563 fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2565 fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2566 fetches.push(
2567 ds.get_vid_common(test_common.block_hash())
2568 .await
2569 .map(ignore),
2570 );
2571
2572 sleep(Duration::from_secs(1)).await;
2575 for (i, fetch) in fetches.into_iter().enumerate() {
2576 tracing::info!("checking fetch {i} is unresolved");
2577 fetch.try_resolve().unwrap_err();
2578 }
2579
2580 ds.append(leaves.last().cloned().unwrap().into())
2585 .await
2586 .unwrap();
2587
2588 {
2590 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2591 let payload = ds.get_payload(test_payload.height() as usize).await;
2592 let common = ds.get_vid_common(test_common.height() as usize).await;
2593
2594 let truth = network.data_source();
2595 assert_eq!(
2596 leaf.await,
2597 truth.get_leaf(test_leaf.height() as usize).await.await
2598 );
2599 assert_eq!(
2600 payload.await,
2601 truth
2602 .get_payload(test_payload.height() as usize)
2603 .await
2604 .await
2605 );
2606 assert_eq!(
2607 common.await,
2608 truth
2609 .get_vid_common(test_common.height() as usize)
2610 .await
2611 .await
2612 );
2613 }
2614 }
2615
2616 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2617 async fn test_fallback_deserialization_for_fetch_requests_v0() {
2618 let port = reserve_tcp_port().unwrap();
2619
2620 run_fallback_deserialization_test_helper(port, "v0").await;
2626 }
2627
2628 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2629 async fn test_fallback_deserialization_for_fetch_requests_v1() {
2630 let port = reserve_tcp_port().unwrap();
2631
2632 run_fallback_deserialization_test_helper(port, "v1").await;
2636 }
2637
2638 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2639 async fn test_fallback_deserialization_for_fetch_requests_pos() {
2640 let port = reserve_tcp_port().unwrap();
2641
2642 run_fallback_deserialization_test_helper(port, "v1").await;
2645 }
2646 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2647 async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2648 let mut network = MockNetwork::<MockDataSource>::init().await;
2654
2655 let port = reserve_tcp_port().unwrap();
2656 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2657
2658 app.register_module(
2659 "availability",
2660 define_api(
2661 &Default::default(),
2662 StaticVersion::<0, 1> {},
2663 "0.0.1".parse().unwrap(),
2664 )
2665 .unwrap(),
2666 )
2667 .unwrap();
2668
2669 network.spawn(
2670 "server",
2671 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2672 );
2673
2674 let db = TmpDb::init().await;
2675 let provider = Provider::new(QueryServiceProvider::new(
2676 format!("http://localhost:{port}/v0").parse().unwrap(),
2677 StaticVersion::<0, 1> {},
2678 ));
2679 let ds = data_source(&db, &provider).await;
2680
2681 network.start().await;
2682
2683 let leaves = network.data_source().subscribe_leaves(1).await;
2684 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2685 let test_leaf = &leaves[0];
2686 let test_payload = &leaves[2];
2687 let test_common = &leaves[3];
2688
2689 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2690 let payload = ds.get_payload(test_payload.height() as usize).await;
2691 let common = ds.get_vid_common(test_common.height() as usize).await;
2692
2693 sleep(Duration::from_secs(3)).await;
2694
2695 leaf.try_resolve().unwrap_err();
2697 payload.try_resolve().unwrap_err();
2698 common.try_resolve().unwrap_err();
2699 }
2700}