1use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16 data::{ns_table, VidCommitment, VidCommon},
17 traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18 vid::{
19 advz::{advz_scheme, ADVZScheme},
20 avidm::{init_avidm_param, AvidMScheme},
21 avidm_gf2::AvidmGf2Scheme,
22 },
23};
24use jf_advz::VidScheme;
25use surf_disco::{Client, Url};
26use vbs::{version::StaticVersionType, BinarySerializer};
27
28use super::Provider;
29use crate::{
30 availability::{
31 ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
32 PayloadQueryData, VidCommonQueryData,
33 },
34 fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
35 types::HeightIndexed,
36 Error, Header, Payload,
37};
38
39#[derive(Clone, Debug)]
44pub struct QueryServiceProvider<Ver: StaticVersionType> {
45 client: Client<Error, Ver>,
46}
47
48impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
49 pub fn new(url: Url, _: Ver) -> Self {
50 Self {
51 client: Client::new(url),
52 }
53 }
54}
55
56impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
57 async fn deserialize_legacy_payload<Types: NodeType>(
58 &self,
59 payload_bytes: Vec<u8>,
60 common_bytes: Vec<u8>,
61 req: PayloadRequest,
62 ) -> Option<Payload<Types>> {
63 let client_url = self.client.base_url();
64
65 let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
66 return None;
67 };
68
69 let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
70 &payload_bytes,
71 ) {
72 Ok(payload) => payload,
73 Err(err) => {
74 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
75 return None;
76 },
77 };
78
79 let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
80 &common_bytes,
81 ) {
82 Ok(common) => common,
83 Err(err) => {
84 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
85 return None;
86 },
87 };
88
89 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
90 let bytes = payload.data.encode();
91
92 let commit = advz_scheme(num_storage_nodes)
93 .commit_only(bytes)
94 .inspect_err(|err| {
95 tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
96 })
97 .ok()?;
98
99 if commit != advz_commit {
100 tracing::error!(
101 ?req,
102 expected_commit=%advz_commit,
103 actual_commit=%commit,
104 %client_url,
105 "received inconsistent legacy payload"
106 );
107 return None;
108 }
109
110 Some(payload.data)
111 }
112
113 async fn deserialize_legacy_vid_common<Types: NodeType>(
114 &self,
115 bytes: Vec<u8>,
116 req: VidCommonRequest,
117 ) -> Option<VidCommon> {
118 let client_url = self.client.base_url();
119 let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
120 return None;
121 };
122
123 match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
124 Ok(res) => {
125 if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
126 Some(VidCommon::V0(res.common))
127 } else {
128 tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
129 None
130 }
131 },
132 Err(err) => {
133 tracing::warn!(
134 %client_url,
135 ?req,
136 %err,
137 "failed to deserialize ADVZCommonQueryData"
138 );
139 None
140 },
141 }
142 }
143 async fn deserialize_legacy_leaf<Types: NodeType>(
144 &self,
145 bytes: Vec<u8>,
146 req: LeafRequest<Types>,
147 ) -> Option<LeafQueryData<Types>> {
148 let client_url = self.client.base_url();
149
150 match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
151 Ok(mut leaf) => {
152 if leaf.height() != req.height {
153 tracing::error!(
154 %client_url, ?req,
155 expected_height = req.height,
156 actual_height = leaf.height(),
157 "received leaf with the wrong height"
158 );
159 return None;
160 }
161
162 let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
163 let actual_leaf_commit: [u8; 32] = leaf.hash().into();
164 if actual_leaf_commit != expected_leaf_commit {
165 tracing::error!(
166 %client_url, ?req,
167 expected_leaf = %req.expected_leaf,
168 actual_leaf = %leaf.hash(),
169 "received leaf with the wrong hash"
170 );
171 return None;
172 }
173
174 let expected_qc_commit: [u8; 32] = req.expected_qc.into();
175 let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
176 if actual_qc_commit != expected_qc_commit {
177 tracing::error!(
178 %client_url, ?req,
179 expected_qc = %req.expected_qc,
180 actual_qc = %leaf.qc().commit(),
181 "received leaf with the wrong QC"
182 );
183 return None;
184 }
185
186 leaf.leaf.unfill_block_payload();
191
192 Some(leaf.into())
193 },
194 Err(err) => {
195 tracing::warn!(
196 %client_url, ?req, %err,
197 "failed to deserialize legacy LeafQueryData"
198 );
199 None
200 },
201 }
202 }
203}
204
205#[async_trait]
206impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
207where
208 Types: NodeType,
209{
210 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
218 let client_url = self.client.base_url();
219 let req_hash = req.0;
220 let payload_bytes = self
224 .client
225 .get::<()>(&format!("availability/payload/hash/{}", req.0))
226 .bytes()
227 .await
228 .inspect_err(|err| {
229 tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
230 })
231 .ok()?;
232
233 let common_bytes = self
234 .client
235 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
236 .bytes()
237 .await
238 .inspect_err(|err| {
239 tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
240 })
241 .ok()?;
242
243 let payload =
244 vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
245 .inspect_err(|err| {
246 tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
247 })
248 .ok();
249
250 let common =
251 vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
252 .inspect_err(|err| {
253 tracing::info!(%err, %req_hash,
254 "error deserializing VidCommonQueryData",
255 );
256 })
257 .ok();
258
259 let (payload, common) = match (payload, common) {
260 (Some(payload), Some(common)) => (payload, common),
261 _ => {
262 tracing::info!(%req_hash, "falling back to legacy payload deserialization");
263
264 return self
266 .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
267 .await;
268 },
269 };
270
271 match common.common() {
272 VidCommon::V0(common) => {
273 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
274 let bytes = payload.data().encode();
275
276 let commit = advz_scheme(num_storage_nodes)
277 .commit_only(bytes)
278 .map(VidCommitment::V0)
279 .inspect_err(|err| {
280 tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
281 })
282 .ok()?;
283
284 if commit != req.0 {
285 tracing::error!(
286 expected = %req_hash,
287 actual = ?commit,
288 %client_url,
289 "VID commitment mismatch (V0)"
290 );
291
292 return None;
293 }
294 },
295 VidCommon::V1(common) => {
296 let bytes = payload.data().encode();
297
298 let avidm_param = init_avidm_param(common.total_weights)
299 .inspect_err(|err| {
300 tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
301 })
302 .ok()?;
303
304 let header = self
305 .client
306 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
307 .send()
308 .await
309 .inspect_err(|err| {
310 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
311 })
312 .ok()?;
313
314 if header.payload_commitment() != req.0 {
315 tracing::error!(
316 expected = %req_hash,
317 actual = %header.payload_commitment(),
318 %client_url,
319 "header payload commitment mismatch (V1, AvidM)"
320 );
321 return None;
322 }
323
324 let metadata = header.metadata().encode();
325 let commit = AvidMScheme::commit(
326 &avidm_param,
327 &bytes,
328 ns_table::parse_ns_table(bytes.len(), &metadata),
329 )
330 .map(VidCommitment::V1)
331 .inspect_err(|err| {
332 tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
333 })
334 .ok()?;
335
336 if commit != req.0 {
338 tracing::warn!(
339 expected = %req_hash,
340 actual = %commit,
341 %client_url,
342 "commitment type mismatch for AVIDM check"
343 );
344 return None;
345 }
346 },
347 VidCommon::V2(common) => {
348 let bytes = payload.data().encode();
349
350 let header = self
351 .client
352 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
353 .send()
354 .await
355 .inspect_err(|err| {
356 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
357 })
358 .ok()?;
359
360 if header.payload_commitment() != req.0 {
361 tracing::error!(
362 expected = %req_hash,
363 actual = %header.payload_commitment(),
364 %client_url,
365 "header payload commitment mismatch (V2, AvidmGf2)"
366 );
367 return None;
368 }
369
370 let metadata = header.metadata().encode();
371 let commit = AvidmGf2Scheme::commit(
372 &common.param,
373 &bytes,
374 ns_table::parse_ns_table(bytes.len(), &metadata),
375 )
376 .map(|(commit, _)| VidCommitment::V2(commit))
377 .inspect_err(|err| {
378 tracing::error!(%err, %req_hash, "failed to compute AvidmGf2 commitment");
379 })
380 .ok()?;
381
382 if commit != req.0 {
384 tracing::warn!(
385 expected = %req_hash,
386 actual = %commit,
387 %client_url,
388 "commitment type mismatch for AvidmGf2 check"
389 );
390 return None;
391 }
392 },
393 }
394
395 Some(payload.data)
396 }
397}
398
399#[async_trait]
400impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
401 for QueryServiceProvider<Ver>
402where
403 Types: NodeType,
404{
405 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
413 let client_url = self.client.base_url();
414
415 let bytes = self
416 .client
417 .get::<()>(&format!("availability/leaf/{}", req.height))
418 .bytes()
419 .await;
420 let bytes = match bytes {
421 Ok(bytes) => bytes,
422 Err(err) => {
423 tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
424
425 return None;
426 },
427 };
428
429 match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
432 Ok(mut leaf) => {
433 if leaf.height() != req.height {
434 tracing::error!(
435 %client_url, ?req, ?leaf,
436 expected_height = req.height,
437 actual_height = leaf.height(),
438 "received leaf with the wrong height"
439 );
440 return None;
441 }
442 if leaf.hash() != req.expected_leaf {
443 tracing::error!(
444 %client_url, ?req, ?leaf,
445 expected_hash = %req.expected_leaf,
446 actual_hash = %leaf.hash(),
447 "received leaf with the wrong hash"
448 );
449 return None;
450 }
451 if leaf.qc().commit() != req.expected_qc {
452 tracing::error!(
453 %client_url, ?req, ?leaf,
454 expected_qc = %req.expected_qc,
455 actual_qc = %leaf.qc().commit(),
456 "received leaf with the wrong QC"
457 );
458 return None;
459 }
460
461 leaf.leaf.unfill_block_payload();
466
467 Some(leaf)
468 },
469 Err(err) => {
470 tracing::info!(
471 ?req, %err,
472 "failed to deserialize LeafQueryData, falling back to legacy deserialization"
473 );
474 self.deserialize_legacy_leaf(bytes, req).await
476 },
477 }
478 }
479}
480
481#[async_trait]
482impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
483where
484 Types: NodeType,
485{
486 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
494 let client_url = self.client.base_url();
495 let bytes = self
496 .client
497 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
498 .bytes()
499 .await;
500 let bytes = match bytes {
501 Ok(bytes) => bytes,
502 Err(err) => {
503 tracing::info!(
504 %client_url, ?req, %err,
505 "failed to fetch VID common bytes"
506 );
507 return None;
508 },
509 };
510
511 match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
512 Ok(res) => {
513 if !res.common().is_consistent(&req.0) {
514 tracing::error!(
515 %client_url, ?req, ?res.common,
516 "fetched inconsistent VID common data"
517 );
518 return None;
519 }
520 Some(res.common)
521 },
522 Err(err) => {
523 tracing::info!(
524 %client_url, ?req, %err,
525 "failed to deserialize as V1 VID common data, trying legacy fallback"
526 );
527 self.deserialize_legacy_vid_common::<Types>(bytes, req)
529 .await
530 },
531 }
532 }
533}
534
535#[cfg(all(test, not(target_os = "windows")))]
537mod test {
538 use std::{future::IntoFuture, time::Duration};
539
540 use committable::Committable;
541 use futures::{
542 future::{join, FutureExt},
543 stream::StreamExt,
544 };
545 #[allow(deprecated)]
548 use generic_array::GenericArray;
549 use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
550 use hotshot_types::traits::node_implementation::Versions;
551 use portpicker::pick_unused_port;
552 use rand::RngCore;
553 use tide_disco::{error::ServerError, App};
554 use vbs::version::StaticVersion;
555
556 use super::*;
557 use crate::{
558 api::load_api,
559 availability::{
560 define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
561 BlockWithTransaction, Fetch, UpdateAvailabilityData,
562 },
563 data_source::{
564 sql::{self, SqlDataSource},
565 storage::{
566 fail_storage::{FailStorage, FailableAction},
567 pruning::{PrunedHeightStorage, PrunerCfg},
568 sql::testing::TmpDb,
569 AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
570 },
571 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
572 },
573 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
574 node::{data_source::NodeDataSource, SyncStatus},
575 task::BackgroundTask,
576 testing::{
577 consensus::{MockDataSource, MockNetwork},
578 mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
579 sleep,
580 },
581 types::HeightIndexed,
582 ApiState,
583 };
584
585 type Provider = TestProvider<QueryServiceProvider<MockBase>>;
586 type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
587
588 fn ignore<T>(_: T) {}
589
590 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
592 db: &TmpDb,
593 provider: &P,
594 ) -> sql::Builder<MockTypes, P> {
595 db.config()
596 .builder((*provider).clone())
597 .await
598 .unwrap()
599 .disable_proactive_fetching()
602 }
603
604 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
606 db: &TmpDb,
607 provider: &P,
608 ) -> SqlDataSource<MockTypes, P> {
609 builder(db, provider).await.build().await.unwrap()
610 }
611
612 #[test_log::test(tokio::test(flavor = "multi_thread"))]
613 async fn test_fetch_on_request() {
614 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
616
617 let port = pick_unused_port().unwrap();
619 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
620 app.register_module(
621 "availability",
622 define_api(
623 &Default::default(),
624 MockBase::instance(),
625 "1.0.0".parse().unwrap(),
626 )
627 .unwrap(),
628 )
629 .unwrap();
630 network.spawn(
631 "server",
632 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
633 );
634
635 let db = TmpDb::init().await;
637 let provider = Provider::new(QueryServiceProvider::new(
638 format!("http://localhost:{port}").parse().unwrap(),
639 MockBase::instance(),
640 ));
641 let data_source = data_source(&db, &provider).await;
642
643 network.start().await;
645
646 let leaves = network.data_source().subscribe_leaves(1).await;
653 let leaves = leaves.take(5).collect::<Vec<_>>().await;
654 let test_leaf = &leaves[0];
655 let test_block = &leaves[1];
656 let test_payload = &leaves[2];
657 let test_common = &leaves[3];
658
659 tracing::info!("requesting unfetchable resources");
661 let mut fetches = vec![];
662 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
664 fetches.push(
666 data_source
667 .get_leaf(test_leaf.height() as usize)
668 .await
669 .map(ignore),
670 );
671 fetches.push(
673 data_source
674 .get_block(test_block.block_hash())
675 .await
676 .map(ignore),
677 );
678 fetches.push(
679 data_source
680 .get_payload(test_payload.block_hash())
681 .await
682 .map(ignore),
683 );
684 fetches.push(
685 data_source
686 .get_vid_common(test_common.block_hash())
687 .await
688 .map(ignore),
689 );
690 fetches.push(
692 data_source
693 .get_block(test_block.height() as usize)
694 .await
695 .map(ignore),
696 );
697 fetches.push(
698 data_source
699 .get_payload(test_payload.height() as usize)
700 .await
701 .map(ignore),
702 );
703 fetches.push(
704 data_source
705 .get_vid_common(test_common.height() as usize)
706 .await
707 .map(ignore),
708 );
709 fetches.push(data_source.get_vid_common(0).await.map(ignore));
711 fetches.push(
713 data_source
714 .get_block_containing_transaction(mock_transaction(vec![]).commit())
715 .await
716 .map(ignore),
717 );
718
719 sleep(Duration::from_secs(1)).await;
722 for (i, fetch) in fetches.into_iter().enumerate() {
723 tracing::info!("checking fetch {i} is unresolved");
724 fetch.try_resolve().unwrap_err();
725 }
726
727 provider.block().await;
732 data_source
733 .append(leaves.last().cloned().unwrap().into())
734 .await
735 .unwrap();
736
737 tracing::info!("requesting fetchable resources");
738 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
739 let req_block = data_source.get_block(test_block.height() as usize).await;
740 let req_payload = data_source
741 .get_payload(test_payload.height() as usize)
742 .await;
743 let req_common = data_source
744 .get_vid_common(test_common.height() as usize)
745 .await;
746
747 sleep(Duration::from_secs(1)).await;
753 req_leaf.try_resolve().unwrap_err();
754 req_block.try_resolve().unwrap_err();
755 req_payload.try_resolve().unwrap_err();
756 req_common.try_resolve().unwrap_err();
757
758 provider.unblock().await;
760 let leaf = data_source
761 .get_leaf(test_leaf.height() as usize)
762 .await
763 .await;
764 let block = data_source
765 .get_block(test_block.height() as usize)
766 .await
767 .await;
768 let payload = data_source
769 .get_payload(test_payload.height() as usize)
770 .await
771 .await;
772 let common = data_source
773 .get_vid_common(test_common.height() as usize)
774 .await
775 .await;
776 {
777 let truth = network.data_source();
779 assert_eq!(
780 leaf,
781 truth.get_leaf(test_leaf.height() as usize).await.await
782 );
783 assert_eq!(
784 block,
785 truth.get_block(test_block.height() as usize).await.await
786 );
787 assert_eq!(
788 payload,
789 truth
790 .get_payload(test_payload.height() as usize)
791 .await
792 .await
793 );
794 assert_eq!(
795 common,
796 truth
797 .get_vid_common(test_common.height() as usize)
798 .await
799 .await
800 );
801 }
802
803 provider.block().await;
808 for leaf in [test_block, test_payload] {
809 tracing::info!("fetching existing leaf {}", leaf.height());
810 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
811 assert_eq!(*leaf, fetched_leaf);
812 }
813
814 tracing::info!("fetching block by hash");
819 provider.unblock().await;
820 {
821 let block = data_source.get_block(test_leaf.block_hash()).await.await;
822 assert_eq!(block.hash(), leaf.block_hash());
823 }
824
825 tracing::info!("fetching payload by hash");
829 {
830 let leaf = leaves.last().unwrap();
831 let payload = data_source.get_payload(leaf.block_hash()).await.await;
832 assert_eq!(payload.height(), leaf.height());
833 assert_eq!(payload.block_hash(), leaf.block_hash());
834 assert_eq!(payload.hash(), leaf.payload_hash());
835 }
836 }
837
838 #[tokio::test(flavor = "multi_thread")]
839 async fn test_fetch_on_request_epoch_version() {
840 tracing::info!("Starting test_fetch_on_request_epoch_version");
843
844 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
846
847 let port = pick_unused_port().unwrap();
849 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
850 app.register_module(
851 "availability",
852 define_api(
853 &Default::default(),
854 <EpochsTestVersions as Versions>::Base::instance(),
855 "1.0.0".parse().unwrap(),
856 )
857 .unwrap(),
858 )
859 .unwrap();
860 network.spawn(
861 "server",
862 app.serve(
863 format!("0.0.0.0:{port}"),
864 <EpochsTestVersions as Versions>::Base::instance(),
865 ),
866 );
867
868 let db = TmpDb::init().await;
871 let provider = EpochProvider::new(QueryServiceProvider::new(
872 format!("http://localhost:{port}").parse().unwrap(),
873 <EpochsTestVersions as Versions>::Base::instance(),
874 ));
875 let data_source = data_source(&db, &provider).await;
876
877 network.start().await;
879
880 let leaves = network.data_source().subscribe_leaves(1).await;
887 let leaves = leaves.take(5).collect::<Vec<_>>().await;
888 let test_leaf = &leaves[0];
889 let test_block = &leaves[1];
890 let test_payload = &leaves[2];
891 let test_common = &leaves[3];
892
893 let mut fetches = vec![];
895 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
897 fetches.push(
899 data_source
900 .get_leaf(test_leaf.height() as usize)
901 .await
902 .map(ignore),
903 );
904 fetches.push(
906 data_source
907 .get_block(test_block.block_hash())
908 .await
909 .map(ignore),
910 );
911 fetches.push(
912 data_source
913 .get_payload(test_payload.block_hash())
914 .await
915 .map(ignore),
916 );
917 fetches.push(
918 data_source
919 .get_vid_common(test_common.block_hash())
920 .await
921 .map(ignore),
922 );
923 fetches.push(
925 data_source
926 .get_block(test_block.height() as usize)
927 .await
928 .map(ignore),
929 );
930 fetches.push(
931 data_source
932 .get_payload(test_payload.height() as usize)
933 .await
934 .map(ignore),
935 );
936 fetches.push(
937 data_source
938 .get_vid_common(test_common.height() as usize)
939 .await
940 .map(ignore),
941 );
942 fetches.push(data_source.get_vid_common(0).await.map(ignore));
944 fetches.push(
946 data_source
947 .get_block_containing_transaction(mock_transaction(vec![]).commit())
948 .await
949 .map(ignore),
950 );
951
952 sleep(Duration::from_secs(1)).await;
955 for (i, fetch) in fetches.into_iter().enumerate() {
956 tracing::info!("checking fetch {i} is unresolved");
957 fetch.try_resolve().unwrap_err();
958 }
959
960 provider.block().await;
965 data_source
966 .append(leaves.last().cloned().unwrap().into())
967 .await
968 .unwrap();
969
970 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
971 let req_block = data_source.get_block(test_block.height() as usize).await;
972 let req_payload = data_source
973 .get_payload(test_payload.height() as usize)
974 .await;
975 let req_common = data_source
976 .get_vid_common(test_common.height() as usize)
977 .await;
978
979 sleep(Duration::from_secs(1)).await;
985 req_leaf.try_resolve().unwrap_err();
986 req_block.try_resolve().unwrap_err();
987 req_payload.try_resolve().unwrap_err();
988 req_common.try_resolve().unwrap_err();
989
990 provider.unblock().await;
992 let leaf = data_source
993 .get_leaf(test_leaf.height() as usize)
994 .await
995 .await;
996 let block = data_source
997 .get_block(test_block.height() as usize)
998 .await
999 .await;
1000 let payload = data_source
1001 .get_payload(test_payload.height() as usize)
1002 .await
1003 .await;
1004 let common = data_source
1005 .get_vid_common(test_common.height() as usize)
1006 .await
1007 .await;
1008 {
1009 let truth = network.data_source();
1011 assert_eq!(
1012 leaf,
1013 truth.get_leaf(test_leaf.height() as usize).await.await
1014 );
1015 assert_eq!(
1016 block,
1017 truth.get_block(test_block.height() as usize).await.await
1018 );
1019 assert_eq!(
1020 payload,
1021 truth
1022 .get_payload(test_payload.height() as usize)
1023 .await
1024 .await
1025 );
1026 assert_eq!(
1027 common,
1028 truth
1029 .get_vid_common(test_common.height() as usize)
1030 .await
1031 .await
1032 );
1033 }
1034
1035 provider.block().await;
1040 for leaf in [test_block, test_payload] {
1041 tracing::info!("fetching existing leaf {}", leaf.height());
1042 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1043 assert_eq!(*leaf, fetched_leaf);
1044 }
1045
1046 provider.unblock().await;
1051 {
1052 let block = data_source.get_block(test_leaf.block_hash()).await.await;
1053 assert_eq!(block.hash(), leaf.block_hash());
1054 }
1055
1056 {
1060 let leaf = leaves.last().unwrap();
1061 let payload = data_source.get_payload(leaf.block_hash()).await.await;
1062 assert_eq!(payload.height(), leaf.height());
1063 assert_eq!(payload.block_hash(), leaf.block_hash());
1064 assert_eq!(payload.hash(), leaf.payload_hash());
1065 }
1066
1067 tracing::info!("Test completed successfully!");
1069 }
1070
1071 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1072 async fn test_fetch_block_and_leaf_concurrently() {
1073 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1075
1076 let port = pick_unused_port().unwrap();
1078 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1079 app.register_module(
1080 "availability",
1081 define_api(
1082 &Default::default(),
1083 MockBase::instance(),
1084 "1.0.0".parse().unwrap(),
1085 )
1086 .unwrap(),
1087 )
1088 .unwrap();
1089 network.spawn(
1090 "server",
1091 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1092 );
1093
1094 let db = TmpDb::init().await;
1096 let provider = Provider::new(QueryServiceProvider::new(
1097 format!("http://localhost:{port}").parse().unwrap(),
1098 MockBase::instance(),
1099 ));
1100 let data_source = data_source(&db, &provider).await;
1101
1102 network.start().await;
1104
1105 let leaves = network.data_source().subscribe_leaves(1).await;
1108 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1109 let test_leaf = &leaves[0];
1110
1111 data_source.append(leaves[1].clone().into()).await.unwrap();
1113
1114 let (leaf, block) = join(
1118 data_source
1119 .get_leaf(test_leaf.height() as usize)
1120 .await
1121 .into_future(),
1122 data_source
1123 .get_block(test_leaf.height() as usize)
1124 .await
1125 .into_future(),
1126 )
1127 .await;
1128 assert_eq!(leaf, *test_leaf);
1129 assert_eq!(leaf.header(), block.header());
1130 }
1131
1132 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1133 async fn test_fetch_different_blocks_same_payload() {
1134 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1136
1137 let port = pick_unused_port().unwrap();
1139 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1140 app.register_module(
1141 "availability",
1142 define_api(
1143 &Default::default(),
1144 MockBase::instance(),
1145 "1.0.0".parse().unwrap(),
1146 )
1147 .unwrap(),
1148 )
1149 .unwrap();
1150 network.spawn(
1151 "server",
1152 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1153 );
1154
1155 let db = TmpDb::init().await;
1157 let provider = Provider::new(QueryServiceProvider::new(
1158 format!("http://localhost:{port}").parse().unwrap(),
1159 MockBase::instance(),
1160 ));
1161 let data_source = data_source(&db, &provider).await;
1162
1163 network.start().await;
1165
1166 let leaves = network.data_source().subscribe_leaves(1).await;
1169 let leaves = leaves.take(4).collect::<Vec<_>>().await;
1170
1171 data_source
1174 .append(leaves.last().cloned().unwrap().into())
1175 .await
1176 .unwrap();
1177
1178 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1180 let (block1, block2) = join(
1183 data_source
1184 .get_block(leaves[0].height() as usize)
1185 .await
1186 .into_future(),
1187 data_source
1188 .get_block(leaves[1].height() as usize)
1189 .await
1190 .into_future(),
1191 )
1192 .await;
1193 assert_eq!(block1.header(), leaves[0].header());
1194 assert_eq!(block2.header(), leaves[1].header());
1195 }
1196
1197 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1198 async fn test_fetch_stream() {
1199 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1201
1202 let port = pick_unused_port().unwrap();
1204 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1205 app.register_module(
1206 "availability",
1207 define_api(
1208 &Default::default(),
1209 MockBase::instance(),
1210 "1.0.0".parse().unwrap(),
1211 )
1212 .unwrap(),
1213 )
1214 .unwrap();
1215 network.spawn(
1216 "server",
1217 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1218 );
1219
1220 let db = TmpDb::init().await;
1222 let provider = Provider::new(QueryServiceProvider::new(
1223 format!("http://localhost:{port}").parse().unwrap(),
1224 MockBase::instance(),
1225 ));
1226 let data_source = data_source(&db, &provider).await;
1227
1228 network.start().await;
1230
1231 let blocks = data_source.subscribe_blocks(0).await;
1233 let leaves = data_source.subscribe_leaves(0).await;
1234 let common = data_source.subscribe_vid_common(0).await;
1235
1236 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1238 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1239
1240 data_source
1243 .append(finalized_leaves.last().cloned().unwrap().into())
1244 .await
1245 .unwrap();
1246
1247 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1249 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1250 let common = common.take(5).collect::<Vec<_>>().await;
1251 for i in 0..5 {
1252 tracing::info!("checking block {i}");
1253 assert_eq!(leaves[i], finalized_leaves[i]);
1254 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1255 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1256 }
1257 }
1258
1259 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1260 async fn test_fetch_range_start() {
1261 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1263
1264 let port = pick_unused_port().unwrap();
1266 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1267 app.register_module(
1268 "availability",
1269 define_api(
1270 &Default::default(),
1271 MockBase::instance(),
1272 "1.0.0".parse().unwrap(),
1273 )
1274 .unwrap(),
1275 )
1276 .unwrap();
1277 network.spawn(
1278 "server",
1279 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1280 );
1281
1282 let db = TmpDb::init().await;
1284 let provider = Provider::new(QueryServiceProvider::new(
1285 format!("http://localhost:{port}").parse().unwrap(),
1286 MockBase::instance(),
1287 ));
1288 let data_source = data_source(&db, &provider).await;
1289
1290 network.start().await;
1292
1293 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1295 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1296
1297 let mut tx = data_source.write().await.unwrap();
1301 tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1302 tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1303 tx.commit().await.unwrap();
1304
1305 let leaves = data_source
1307 .get_leaf_range(..5)
1308 .await
1309 .then(Fetch::resolve)
1310 .collect::<Vec<_>>()
1311 .await;
1312 for i in 0..5 {
1313 tracing::info!("checking leaf {i}");
1314 assert_eq!(leaves[i], finalized_leaves[i]);
1315 }
1316 }
1317
1318 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1319 async fn fetch_transaction() {
1320 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1322
1323 let port = pick_unused_port().unwrap();
1325 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1326 app.register_module(
1327 "availability",
1328 define_api(
1329 &Default::default(),
1330 MockBase::instance(),
1331 "1.0.0".parse().unwrap(),
1332 )
1333 .unwrap(),
1334 )
1335 .unwrap();
1336 network.spawn(
1337 "server",
1338 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1339 );
1340
1341 let db = TmpDb::init().await;
1344 let data_source = data_source(&db, &NoFetching).await;
1345
1346 let mut leaves = network.data_source().subscribe_leaves(1).await;
1348 let mut blocks = network.data_source().subscribe_blocks(1).await;
1349
1350 network.start().await;
1352
1353 let tx = mock_transaction(vec![1, 2, 3]);
1357 let fut = data_source
1358 .get_block_containing_transaction(tx.commit())
1359 .await;
1360
1361 network.submit_transaction(tx.clone()).await;
1363
1364 let block = loop {
1367 let leaf = leaves.next().await.unwrap();
1368 let block = blocks.next().await.unwrap();
1369
1370 data_source
1371 .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1372 .await
1373 .unwrap();
1374
1375 if block.transaction_by_hash(tx.commit()).is_some() {
1376 break block;
1377 }
1378 };
1379 tracing::info!("transaction included in block {}", block.height());
1380
1381 let fetched_tx = fut.await;
1382 assert_eq!(
1383 fetched_tx,
1384 BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1385 );
1386
1387 assert_eq!(
1389 fetched_tx,
1390 data_source
1391 .get_block_containing_transaction(tx.commit())
1392 .await
1393 .await
1394 );
1395 }
1396
1397 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1398 async fn test_retry() {
1399 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1401
1402 let port = pick_unused_port().unwrap();
1404 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1405 app.register_module(
1406 "availability",
1407 define_api(
1408 &Default::default(),
1409 MockBase::instance(),
1410 "1.0.0".parse().unwrap(),
1411 )
1412 .unwrap(),
1413 )
1414 .unwrap();
1415 network.spawn(
1416 "server",
1417 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1418 );
1419
1420 let db = TmpDb::init().await;
1422 let provider = Provider::new(QueryServiceProvider::new(
1423 format!("http://localhost:{port}").parse().unwrap(),
1424 MockBase::instance(),
1425 ));
1426 let data_source = builder(&db, &provider)
1427 .await
1428 .with_max_retry_interval(Duration::from_secs(1))
1429 .build()
1430 .await
1431 .unwrap();
1432
1433 network.start().await;
1435
1436 let leaves = network.data_source().subscribe_leaves(1).await;
1439 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1440 let test_leaf = &leaves[0];
1441
1442 provider.fail();
1444
1445 data_source
1448 .append(leaves.last().cloned().unwrap().into())
1449 .await
1450 .unwrap();
1451
1452 tracing::info!("requesting leaf from failing providers");
1453 let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1454
1455 sleep(Duration::from_secs(5)).await;
1458 fut.try_resolve().unwrap_err();
1459
1460 provider.unfail();
1462 assert_eq!(
1463 data_source
1464 .get_leaf(test_leaf.height() as usize)
1465 .await
1466 .await,
1467 *test_leaf
1468 );
1469 }
1470
1471 #[allow(deprecated)]
1473 fn random_vid_commit() -> VidCommitment {
1474 let mut bytes = [0; 32];
1475 rand::thread_rng().fill_bytes(&mut bytes);
1476 VidCommitment::V0(GenericArray::from(bytes).into())
1477 }
1478
1479 async fn malicious_server(port: u16) {
1480 let mut api = load_api::<(), ServerError, MockBase>(
1481 None::<std::path::PathBuf>,
1482 include_str!("../../../api/availability.toml"),
1483 vec![],
1484 )
1485 .unwrap();
1486
1487 api.get("get_payload", move |_, _| {
1488 async move {
1489 Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1491 &Default::default(),
1492 &Default::default(),
1493 )
1494 .await)
1495 }
1496 .boxed()
1497 })
1498 .unwrap()
1499 .get("get_vid_common", move |_, _| {
1500 async move {
1501 Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1503 &Default::default(),
1504 &Default::default(),
1505 )
1506 .await)
1507 }
1508 .boxed()
1509 })
1510 .unwrap();
1511
1512 let mut app = App::<(), ServerError>::with_state(());
1513 app.register_module("availability", api).unwrap();
1514 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1515 .await
1516 .ok();
1517 }
1518
1519 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1520 async fn test_fetch_from_malicious_server() {
1521 let port = pick_unused_port().unwrap();
1522 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1523
1524 let provider = QueryServiceProvider::new(
1525 format!("http://localhost:{port}").parse().unwrap(),
1526 MockBase::instance(),
1527 );
1528 provider.client.connect(None).await;
1529
1530 let res =
1533 ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1534 .await;
1535 assert_eq!(res, None);
1536
1537 let res =
1540 ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1541 .await;
1542 assert_eq!(res, None);
1543 }
1544
1545 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1546 async fn test_archive_recovery() {
1547 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1549
1550 let port = pick_unused_port().unwrap();
1552 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1553 app.register_module(
1554 "availability",
1555 define_api(
1556 &Default::default(),
1557 MockBase::instance(),
1558 "1.0.0".parse().unwrap(),
1559 )
1560 .unwrap(),
1561 )
1562 .unwrap();
1563 network.spawn(
1564 "server",
1565 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1566 );
1567
1568 let db = TmpDb::init().await;
1571 let provider = Provider::new(QueryServiceProvider::new(
1572 format!("http://localhost:{port}").parse().unwrap(),
1573 MockBase::instance(),
1574 ));
1575 let mut data_source = db
1576 .config()
1577 .pruner_cfg(
1578 PrunerCfg::new()
1579 .with_target_retention(Duration::from_secs(0))
1580 .with_interval(Duration::from_secs(5)),
1581 )
1582 .unwrap()
1583 .builder(provider.clone())
1584 .await
1585 .unwrap()
1586 .with_min_retry_interval(Duration::from_millis(100))
1590 .with_retry_randomization_factor(3.)
1594 .build()
1595 .await
1596 .unwrap();
1597
1598 network.start().await;
1600
1601 let leaves = network.data_source().subscribe_leaves(1).await;
1603 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1604
1605 let pruned_height = data_source
1607 .read()
1608 .await
1609 .unwrap()
1610 .load_pruned_height()
1611 .await
1612 .unwrap();
1613 assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1615
1616 let last_leaf = leaves.last().unwrap();
1619 data_source.append(last_leaf.clone().into()).await.unwrap();
1620
1621 for i in 1..=last_leaf.height() {
1623 tracing::info!(i, "fetching leaf");
1624 assert_eq!(
1625 data_source.get_leaf(i as usize).await.await,
1626 leaves[i as usize - 1]
1627 );
1628 }
1629
1630 loop {
1632 let pruned_height = data_source
1633 .read()
1634 .await
1635 .unwrap()
1636 .load_pruned_height()
1637 .await
1638 .unwrap();
1639 if pruned_height == Some(last_leaf.height()) {
1640 break;
1641 }
1642 tracing::info!(
1643 ?pruned_height,
1644 target_height = last_leaf.height(),
1645 "waiting for pruner to run"
1646 );
1647 sleep(Duration::from_secs(1)).await;
1648 }
1649
1650 data_source = db
1652 .config()
1653 .archive()
1654 .builder(provider.clone())
1655 .await
1656 .unwrap()
1657 .with_minor_scan_interval(Duration::from_secs(1))
1658 .with_major_scan_interval(1)
1659 .build()
1660 .await
1661 .unwrap();
1662
1663 let pruned_height = data_source
1665 .read()
1666 .await
1667 .unwrap()
1668 .load_pruned_height()
1669 .await
1670 .unwrap();
1671 assert_eq!(pruned_height, None);
1672
1673 data_source.append(last_leaf.clone().into()).await.unwrap();
1677
1678 loop {
1680 let sync_status = data_source.sync_status().await.unwrap();
1681
1682 if (SyncStatus {
1686 missing_vid_shares: 0,
1687 ..sync_status
1688 })
1689 .is_fully_synced()
1690 {
1691 break;
1692 }
1693 tracing::info!(?sync_status, "waiting for node to sync");
1694 sleep(Duration::from_secs(1)).await;
1695 }
1696
1697 sleep(Duration::from_secs(3)).await;
1699 let sync_status = data_source.sync_status().await.unwrap();
1700 assert!(
1701 (SyncStatus {
1702 missing_vid_shares: 0,
1703 ..sync_status
1704 })
1705 .is_fully_synced(),
1706 "{sync_status:?}"
1707 );
1708 }
1709
1710 #[derive(Clone, Copy, Debug)]
1711 enum FailureType {
1712 Begin,
1713 Write,
1714 Commit,
1715 }
1716
1717 async fn test_fetch_storage_failure_helper(failure: FailureType) {
1718 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1720
1721 let port = pick_unused_port().unwrap();
1723 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1724 app.register_module(
1725 "availability",
1726 define_api(
1727 &Default::default(),
1728 MockBase::instance(),
1729 "1.0.0".parse().unwrap(),
1730 )
1731 .unwrap(),
1732 )
1733 .unwrap();
1734 network.spawn(
1735 "server",
1736 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1737 );
1738
1739 let provider = Provider::new(QueryServiceProvider::new(
1741 format!("http://localhost:{port}").parse().unwrap(),
1742 MockBase::instance(),
1743 ));
1744 let db = TmpDb::init().await;
1745 let storage = FailStorage::from(
1746 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1747 .await
1748 .unwrap(),
1749 );
1750 let data_source = FetchingDataSource::builder(storage, provider)
1751 .disable_proactive_fetching()
1752 .disable_aggregator()
1753 .with_max_retry_interval(Duration::from_millis(100))
1754 .with_retry_timeout(Duration::from_secs(1))
1755 .build()
1756 .await
1757 .unwrap();
1758
1759 network.start().await;
1761
1762 let leaves = network.data_source().subscribe_leaves(1).await;
1764 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1765
1766 let last_leaf = leaves.last().unwrap();
1768 let mut tx = data_source.write().await.unwrap();
1769 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1770 tx.commit().await.unwrap();
1771
1772 tracing::info!("fetch with write failure");
1774 match failure {
1775 FailureType::Begin => {
1776 data_source
1777 .as_ref()
1778 .fail_begins_writable(FailableAction::Any)
1779 .await
1780 },
1781 FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1782 FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1783 }
1784 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1785 data_source.as_ref().pass().await;
1786
1787 sleep(Duration::from_secs(1)).await;
1792
1793 tracing::info!("fetch with write success");
1796 let fetch = data_source.get_leaf(1).await;
1797 assert!(fetch.is_pending());
1798 assert_eq!(leaves[0], fetch.await);
1799
1800 sleep(Duration::from_secs(1)).await;
1801
1802 tracing::info!("retrieve from storage");
1804 let fetch = data_source.get_leaf(1).await;
1805 assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1806 }
1807
1808 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1809 async fn test_fetch_storage_failure_on_begin() {
1810 test_fetch_storage_failure_helper(FailureType::Begin).await;
1811 }
1812
1813 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1814 async fn test_fetch_storage_failure_on_write() {
1815 test_fetch_storage_failure_helper(FailureType::Write).await;
1816 }
1817
1818 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1819 async fn test_fetch_storage_failure_on_commit() {
1820 test_fetch_storage_failure_helper(FailureType::Commit).await;
1821 }
1822
1823 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1824 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1826
1827 let port = pick_unused_port().unwrap();
1829 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1830 app.register_module(
1831 "availability",
1832 define_api(
1833 &Default::default(),
1834 MockBase::instance(),
1835 "1.0.0".parse().unwrap(),
1836 )
1837 .unwrap(),
1838 )
1839 .unwrap();
1840 network.spawn(
1841 "server",
1842 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1843 );
1844
1845 let provider = Provider::new(QueryServiceProvider::new(
1847 format!("http://localhost:{port}").parse().unwrap(),
1848 MockBase::instance(),
1849 ));
1850 let db = TmpDb::init().await;
1851 let storage = FailStorage::from(
1852 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1853 .await
1854 .unwrap(),
1855 );
1856 let data_source = FetchingDataSource::builder(storage, provider)
1857 .disable_proactive_fetching()
1858 .disable_aggregator()
1859 .with_min_retry_interval(Duration::from_millis(100))
1860 .build()
1861 .await
1862 .unwrap();
1863
1864 network.start().await;
1866
1867 let leaves = network.data_source().subscribe_leaves(1).await;
1869 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1870
1871 let last_leaf = leaves.last().unwrap();
1873 let mut tx = data_source.write().await.unwrap();
1874 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1875 tx.commit().await.unwrap();
1876
1877 tracing::info!("fetch with write failure");
1879 match failure {
1880 FailureType::Begin => {
1881 data_source
1882 .as_ref()
1883 .fail_one_begin_writable(FailableAction::Any)
1884 .await
1885 },
1886 FailureType::Write => {
1887 data_source
1888 .as_ref()
1889 .fail_one_write(FailableAction::Any)
1890 .await
1891 },
1892 FailureType::Commit => {
1893 data_source
1894 .as_ref()
1895 .fail_one_commit(FailableAction::Any)
1896 .await
1897 },
1898 }
1899 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1900
1901 let mut tx = data_source.read().await.unwrap();
1903 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1904 }
1905
1906 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1907 async fn test_fetch_storage_failure_retry_on_begin() {
1908 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1909 }
1910
1911 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1912 async fn test_fetch_storage_failure_retry_on_write() {
1913 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1914 }
1915
1916 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1917 async fn test_fetch_storage_failure_retry_on_commit() {
1918 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1919 }
1920
1921 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1922 async fn test_fetch_on_decide() {
1923 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1925
1926 let port = pick_unused_port().unwrap();
1928 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1929 app.register_module(
1930 "availability",
1931 define_api(
1932 &Default::default(),
1933 MockBase::instance(),
1934 "1.0.0".parse().unwrap(),
1935 )
1936 .unwrap(),
1937 )
1938 .unwrap();
1939 network.spawn(
1940 "server",
1941 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1942 );
1943
1944 let db = TmpDb::init().await;
1946 let provider = Provider::new(QueryServiceProvider::new(
1947 format!("http://localhost:{port}").parse().unwrap(),
1948 MockBase::instance(),
1949 ));
1950 let data_source = builder(&db, &provider)
1951 .await
1952 .with_max_retry_interval(Duration::from_secs(1))
1953 .build()
1954 .await
1955 .unwrap();
1956
1957 network.start().await;
1959
1960 let leaf = network
1962 .data_source()
1963 .subscribe_leaves(1)
1964 .await
1965 .next()
1966 .await
1967 .unwrap();
1968
1969 data_source.append(leaf.clone().into()).await.unwrap();
1971
1972 sleep(Duration::from_secs(5)).await;
1975
1976 let mut tx = data_source.read().await.unwrap();
1980 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1981 let block = tx.get_block(id).await.unwrap();
1982 let vid = tx.get_vid_common(id).await.unwrap();
1983
1984 assert_eq!(block.hash(), leaf.block_hash());
1985 assert_eq!(vid.block_hash(), leaf.block_hash());
1986 }
1987
1988 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1989 async fn test_fetch_begin_failure() {
1990 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1992
1993 let port = pick_unused_port().unwrap();
1995 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1996 app.register_module(
1997 "availability",
1998 define_api(
1999 &Default::default(),
2000 MockBase::instance(),
2001 "1.0.0".parse().unwrap(),
2002 )
2003 .unwrap(),
2004 )
2005 .unwrap();
2006 network.spawn(
2007 "server",
2008 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2009 );
2010
2011 let provider = Provider::new(QueryServiceProvider::new(
2013 format!("http://localhost:{port}").parse().unwrap(),
2014 MockBase::instance(),
2015 ));
2016 let db = TmpDb::init().await;
2017 let storage = FailStorage::from(
2018 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2019 .await
2020 .unwrap(),
2021 );
2022 let data_source = FetchingDataSource::builder(storage, provider)
2023 .disable_proactive_fetching()
2024 .disable_aggregator()
2025 .with_min_retry_interval(Duration::from_millis(100))
2026 .build()
2027 .await
2028 .unwrap();
2029
2030 network.start().await;
2032
2033 let leaves = network.data_source().subscribe_leaves(1).await;
2035 let leaves = leaves.take(2).collect::<Vec<_>>().await;
2036
2037 let last_leaf = leaves.last().unwrap();
2039 let mut tx = data_source.write().await.unwrap();
2040 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2041 tx.commit().await.unwrap();
2042
2043 tracing::info!("fetch with transaction failure");
2046 data_source
2047 .as_ref()
2048 .fail_one_begin_read_only(FailableAction::Any)
2049 .await;
2050 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2051 }
2052
2053 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2054 async fn test_fetch_load_failure_block() {
2055 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2057
2058 let port = pick_unused_port().unwrap();
2060 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2061 app.register_module(
2062 "availability",
2063 define_api(
2064 &Default::default(),
2065 MockBase::instance(),
2066 "1.0.0".parse().unwrap(),
2067 )
2068 .unwrap(),
2069 )
2070 .unwrap();
2071 network.spawn(
2072 "server",
2073 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2074 );
2075
2076 let provider = Provider::new(QueryServiceProvider::new(
2078 format!("http://localhost:{port}").parse().unwrap(),
2079 MockBase::instance(),
2080 ));
2081 let db = TmpDb::init().await;
2082 let storage = FailStorage::from(
2083 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2084 .await
2085 .unwrap(),
2086 );
2087 let data_source = FetchingDataSource::builder(storage, provider)
2088 .disable_proactive_fetching()
2089 .disable_aggregator()
2090 .with_min_retry_interval(Duration::from_millis(100))
2091 .build()
2092 .await
2093 .unwrap();
2094
2095 network.start().await;
2097
2098 let mut leaves = network.data_source().subscribe_leaves(1).await;
2100 let leaf = leaves.next().await.unwrap();
2101
2102 let mut tx = data_source.write().await.unwrap();
2105 tx.insert_leaf(leaf.clone()).await.unwrap();
2106 tx.commit().await.unwrap();
2107
2108 tracing::info!("fetch with read failure");
2122 data_source
2123 .as_ref()
2124 .fail_one_read(FailableAction::GetHeader)
2125 .await;
2126 let fetch = data_source.get_block(leaf.block_hash()).await;
2127
2128 sleep(Duration::from_secs(2)).await;
2130 data_source.as_ref().pass().await;
2131
2132 let block: BlockQueryData<MockTypes> = fetch.await;
2133 assert_eq!(block.hash(), leaf.block_hash());
2134 }
2135
2136 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2137 async fn test_fetch_load_failure_tx() {
2138 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2140
2141 let port = pick_unused_port().unwrap();
2143 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2144 app.register_module(
2145 "availability",
2146 define_api(
2147 &Default::default(),
2148 MockBase::instance(),
2149 "1.0.0".parse().unwrap(),
2150 )
2151 .unwrap(),
2152 )
2153 .unwrap();
2154 network.spawn(
2155 "server",
2156 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2157 );
2158
2159 let provider = Provider::new(QueryServiceProvider::new(
2161 format!("http://localhost:{port}").parse().unwrap(),
2162 MockBase::instance(),
2163 ));
2164 let db = TmpDb::init().await;
2165 let storage = FailStorage::from(
2166 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2167 .await
2168 .unwrap(),
2169 );
2170 let data_source = FetchingDataSource::builder(storage, provider)
2171 .disable_proactive_fetching()
2172 .disable_aggregator()
2173 .with_min_retry_interval(Duration::from_millis(100))
2174 .build()
2175 .await
2176 .unwrap();
2177
2178 network.start().await;
2180
2181 let tx = mock_transaction(vec![1, 2, 3]);
2183 network.submit_transaction(tx.clone()).await;
2184 let tx = network
2185 .data_source()
2186 .get_block_containing_transaction(tx.commit())
2187 .await
2188 .await;
2189
2190 {
2192 let leaf = network
2193 .data_source()
2194 .get_leaf(tx.transaction.block_height() as usize)
2195 .await
2196 .await;
2197 let block = network
2198 .data_source()
2199 .get_block(tx.transaction.block_height() as usize)
2200 .await
2201 .await;
2202 let mut tx = data_source.write().await.unwrap();
2203 tx.insert_leaf(leaf.clone()).await.unwrap();
2204 tx.insert_block(block.clone()).await.unwrap();
2205 tx.commit().await.unwrap();
2206 }
2207
2208 tracing::info!("fetch success");
2210 assert_eq!(
2211 tx,
2212 data_source
2213 .get_block_containing_transaction(tx.transaction.hash())
2214 .await
2215 .await
2216 );
2217
2218 tracing::info!("fetch with read failure");
2230 data_source
2231 .as_ref()
2232 .fail_one_read(FailableAction::Any)
2233 .await;
2234 let fetch = data_source
2235 .get_block_containing_transaction(tx.transaction.hash())
2236 .await;
2237
2238 assert_eq!(tx, fetch.await);
2239 }
2240
2241 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2242 async fn test_stream_begin_failure() {
2243 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2245
2246 let port = pick_unused_port().unwrap();
2248 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2249 app.register_module(
2250 "availability",
2251 define_api(
2252 &Default::default(),
2253 MockBase::instance(),
2254 "1.0.0".parse().unwrap(),
2255 )
2256 .unwrap(),
2257 )
2258 .unwrap();
2259 network.spawn(
2260 "server",
2261 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2262 );
2263
2264 let provider = Provider::new(QueryServiceProvider::new(
2266 format!("http://localhost:{port}").parse().unwrap(),
2267 MockBase::instance(),
2268 ));
2269 let db = TmpDb::init().await;
2270 let storage = FailStorage::from(
2271 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2272 .await
2273 .unwrap(),
2274 );
2275 let data_source = FetchingDataSource::builder(storage, provider)
2276 .disable_proactive_fetching()
2277 .disable_aggregator()
2278 .with_min_retry_interval(Duration::from_millis(100))
2279 .with_range_chunk_size(3)
2280 .build()
2281 .await
2282 .unwrap();
2283
2284 network.start().await;
2286
2287 let leaves = network.data_source().subscribe_leaves(1).await;
2289 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2290
2291 let last_leaf = leaves.last().unwrap();
2293 let mut tx = data_source.write().await.unwrap();
2294 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2295 tx.commit().await.unwrap();
2296
2297 tracing::info!("stream with transaction failure");
2300 data_source
2301 .as_ref()
2302 .fail_one_begin_read_only(FailableAction::Any)
2303 .await;
2304 assert_eq!(
2305 leaves,
2306 data_source
2307 .subscribe_leaves(1)
2308 .await
2309 .take(5)
2310 .collect::<Vec<_>>()
2311 .await
2312 );
2313 }
2314
2315 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2316 async fn test_stream_load_failure() {
2317 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2319
2320 let port = pick_unused_port().unwrap();
2322 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2323 app.register_module(
2324 "availability",
2325 define_api(
2326 &Default::default(),
2327 MockBase::instance(),
2328 "1.0.0".parse().unwrap(),
2329 )
2330 .unwrap(),
2331 )
2332 .unwrap();
2333 network.spawn(
2334 "server",
2335 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2336 );
2337
2338 let provider = Provider::new(QueryServiceProvider::new(
2340 format!("http://localhost:{port}").parse().unwrap(),
2341 MockBase::instance(),
2342 ));
2343 let db = TmpDb::init().await;
2344 let storage = FailStorage::from(
2345 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2346 .await
2347 .unwrap(),
2348 );
2349 let data_source = FetchingDataSource::builder(storage, provider)
2350 .disable_proactive_fetching()
2351 .disable_aggregator()
2352 .with_min_retry_interval(Duration::from_millis(100))
2353 .with_range_chunk_size(3)
2354 .build()
2355 .await
2356 .unwrap();
2357
2358 network.start().await;
2360
2361 let leaves = network.data_source().subscribe_leaves(1).await;
2363 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2364
2365 let last_leaf = leaves.last().unwrap();
2367 let mut tx = data_source.write().await.unwrap();
2368 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2369 tx.commit().await.unwrap();
2370
2371 tracing::info!("stream with read failure");
2373 data_source.as_ref().fail_reads(FailableAction::Any).await;
2374 let fetches = data_source
2375 .get_block_range(1..=5)
2376 .await
2377 .collect::<Vec<_>>()
2378 .await;
2379
2380 sleep(Duration::from_secs(2)).await;
2382 data_source.as_ref().pass().await;
2383
2384 for (leaf, fetch) in leaves.iter().zip(fetches) {
2385 let block: BlockQueryData<MockTypes> = fetch.await;
2386 assert_eq!(block.hash(), leaf.block_hash());
2387 }
2388 }
2389
2390 enum MetadataType {
2391 Payload,
2392 Vid,
2393 }
2394
2395 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2396 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2398
2399 let port = pick_unused_port().unwrap();
2401 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2402 app.register_module(
2403 "availability",
2404 define_api(
2405 &Default::default(),
2406 MockBase::instance(),
2407 "1.0.0".parse().unwrap(),
2408 )
2409 .unwrap(),
2410 )
2411 .unwrap();
2412 network.spawn(
2413 "server",
2414 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2415 );
2416
2417 let provider = Provider::new(QueryServiceProvider::new(
2419 format!("http://localhost:{port}").parse().unwrap(),
2420 MockBase::instance(),
2421 ));
2422 let db = TmpDb::init().await;
2423 let storage = FailStorage::from(
2424 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2425 .await
2426 .unwrap(),
2427 );
2428 let data_source = FetchingDataSource::builder(storage, provider)
2429 .disable_proactive_fetching()
2430 .disable_aggregator()
2431 .with_min_retry_interval(Duration::from_millis(100))
2432 .with_range_chunk_size(3)
2433 .build()
2434 .await
2435 .unwrap();
2436
2437 network.start().await;
2439
2440 let leaves = network.data_source().subscribe_leaves(1).await;
2442 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2443
2444 let last_leaf = leaves.last().unwrap();
2446 let mut tx = data_source.write().await.unwrap();
2447 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2448 tx.commit().await.unwrap();
2449
2450 let leaf = network.data_source().get_leaf(1).await.await;
2455 let block = network.data_source().get_block(1).await.await;
2456 let vid = network.data_source().get_vid_common(1).await.await;
2457 data_source
2458 .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2459 .await
2460 .unwrap();
2461
2462 tracing::info!("stream with transaction failure");
2464 data_source
2465 .as_ref()
2466 .fail_begins_read_only(FailableAction::Any)
2467 .await;
2468 match stream {
2469 MetadataType::Payload => {
2470 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2471
2472 sleep(Duration::from_secs(2)).await;
2474 tracing::info!("stop failing transactions");
2475 data_source.as_ref().pass().await;
2476
2477 let payloads = payloads.collect::<Vec<_>>().await;
2478 for (leaf, payload) in leaves.iter().zip(payloads) {
2479 assert_eq!(payload.block_hash, leaf.block_hash());
2480 }
2481 },
2482 MetadataType::Vid => {
2483 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2484
2485 sleep(Duration::from_secs(2)).await;
2487 tracing::info!("stop failing transactions");
2488 data_source.as_ref().pass().await;
2489
2490 let vids = vids.collect::<Vec<_>>().await;
2491 for (leaf, vid) in leaves.iter().zip(vids) {
2492 assert_eq!(vid.block_hash, leaf.block_hash());
2493 }
2494 },
2495 }
2496 }
2497
2498 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2499 async fn test_metadata_stream_begin_failure_payload() {
2500 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2501 }
2502
2503 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2504 async fn test_metadata_stream_begin_failure_vid() {
2505 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2506 }
2507
2508 async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2513 let mut network = MockNetwork::<MockDataSource, V>::init().await;
2514
2515 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2516
2517 app.register_module(
2519 "availability",
2520 define_api(
2521 &Default::default(),
2522 StaticVersion::<0, 1> {},
2523 "0.0.1".parse().unwrap(),
2524 )
2525 .unwrap(),
2526 )
2527 .unwrap();
2528
2529 app.register_module(
2530 "availability",
2531 define_api(
2532 &Default::default(),
2533 StaticVersion::<0, 1> {},
2534 "1.0.0".parse().unwrap(),
2535 )
2536 .unwrap(),
2537 )
2538 .unwrap();
2539
2540 network.spawn(
2541 "server",
2542 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2543 );
2544
2545 let db = TmpDb::init().await;
2546
2547 let provider_url = format!("http://localhost:{port}/{version}")
2548 .parse()
2549 .expect("Invalid URL");
2550
2551 let provider = Provider::new(QueryServiceProvider::new(
2552 provider_url,
2553 StaticVersion::<0, 1> {},
2554 ));
2555
2556 let ds = data_source(&db, &provider).await;
2557 network.start().await;
2558
2559 let leaves = network.data_source().subscribe_leaves(1).await;
2560 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2561 let test_leaf = &leaves[0];
2562 let test_payload = &leaves[2];
2563 let test_common = &leaves[3];
2564
2565 let mut fetches = vec![];
2566 fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2568 fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2569 fetches.push(
2570 ds.get_vid_common(test_common.block_hash())
2571 .await
2572 .map(ignore),
2573 );
2574
2575 sleep(Duration::from_secs(1)).await;
2578 for (i, fetch) in fetches.into_iter().enumerate() {
2579 tracing::info!("checking fetch {i} is unresolved");
2580 fetch.try_resolve().unwrap_err();
2581 }
2582
2583 ds.append(leaves.last().cloned().unwrap().into())
2588 .await
2589 .unwrap();
2590
2591 {
2593 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2594 let payload = ds.get_payload(test_payload.height() as usize).await;
2595 let common = ds.get_vid_common(test_common.height() as usize).await;
2596
2597 let truth = network.data_source();
2598 assert_eq!(
2599 leaf.await,
2600 truth.get_leaf(test_leaf.height() as usize).await.await
2601 );
2602 assert_eq!(
2603 payload.await,
2604 truth
2605 .get_payload(test_payload.height() as usize)
2606 .await
2607 .await
2608 );
2609 assert_eq!(
2610 common.await,
2611 truth
2612 .get_vid_common(test_common.height() as usize)
2613 .await
2614 .await
2615 );
2616 }
2617 }
2618
2619 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2620 async fn test_fallback_deserialization_for_fetch_requests_v0() {
2621 let port = pick_unused_port().unwrap();
2622
2623 run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2629 }
2630
2631 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2632 async fn test_fallback_deserialization_for_fetch_requests_v1() {
2633 let port = pick_unused_port().unwrap();
2634
2635 run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2639 }
2640
2641 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2642 async fn test_fallback_deserialization_for_fetch_requests_pos() {
2643 let port = pick_unused_port().unwrap();
2644
2645 run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2648 }
2649 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2650 async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2651 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2657
2658 let port = pick_unused_port().unwrap();
2659 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2660
2661 app.register_module(
2662 "availability",
2663 define_api(
2664 &Default::default(),
2665 StaticVersion::<0, 1> {},
2666 "0.0.1".parse().unwrap(),
2667 )
2668 .unwrap(),
2669 )
2670 .unwrap();
2671
2672 network.spawn(
2673 "server",
2674 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2675 );
2676
2677 let db = TmpDb::init().await;
2678 let provider = Provider::new(QueryServiceProvider::new(
2679 format!("http://localhost:{port}/v0").parse().unwrap(),
2680 StaticVersion::<0, 1> {},
2681 ));
2682 let ds = data_source(&db, &provider).await;
2683
2684 network.start().await;
2685
2686 let leaves = network.data_source().subscribe_leaves(1).await;
2687 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2688 let test_leaf = &leaves[0];
2689 let test_payload = &leaves[2];
2690 let test_common = &leaves[3];
2691
2692 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2693 let payload = ds.get_payload(test_payload.height() as usize).await;
2694 let common = ds.get_vid_common(test_common.height() as usize).await;
2695
2696 sleep(Duration::from_secs(3)).await;
2697
2698 leaf.try_resolve().unwrap_err();
2700 payload.try_resolve().unwrap_err();
2701 common.try_resolve().unwrap_err();
2702 }
2703}