1use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16 data::{ns_table, VidCommitment},
17 traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18 vid::{
19 advz::{advz_scheme, ADVZScheme},
20 avidm::{init_avidm_param, AvidMScheme},
21 },
22};
23use jf_vid::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29 availability::{
30 ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31 PayloadQueryData, VidCommonQueryData,
32 },
33 fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34 types::HeightIndexed,
35 Error, Header, Payload, VidCommon,
36};
37
38#[derive(Clone, Debug)]
43pub struct QueryServiceProvider<Ver: StaticVersionType> {
44 client: Client<Error, Ver>,
45}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48 pub fn new(url: Url, _: Ver) -> Self {
49 Self {
50 client: Client::new(url),
51 }
52 }
53}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56 async fn deserialize_legacy_payload<Types: NodeType>(
57 &self,
58 payload_bytes: Vec<u8>,
59 common_bytes: Vec<u8>,
60 req: PayloadRequest,
61 ) -> Option<Payload<Types>> {
62 let client_url = self.client.base_url();
63
64 let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65 return None;
66 };
67
68 let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69 &payload_bytes,
70 ) {
71 Ok(payload) => payload,
72 Err(err) => {
73 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74 return None;
75 },
76 };
77
78 let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79 &common_bytes,
80 ) {
81 Ok(common) => common,
82 Err(err) => {
83 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
84 return None;
85 },
86 };
87
88 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89 let bytes = payload.data.encode();
90
91 let commit = advz_scheme(num_storage_nodes)
92 .commit_only(bytes)
93 .inspect_err(|err| {
94 tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95 })
96 .ok()?;
97
98 if commit != advz_commit {
99 tracing::error!(
100 ?req,
101 expected_commit=%advz_commit,
102 actual_commit=%commit,
103 %client_url,
104 "received inconsistent legacy payload"
105 );
106 return None;
107 }
108
109 Some(payload.data)
110 }
111
112 async fn deserialize_legacy_vid_common<Types: NodeType>(
113 &self,
114 bytes: Vec<u8>,
115 req: VidCommonRequest,
116 ) -> Option<VidCommon> {
117 let client_url = self.client.base_url();
118 let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119 return None;
120 };
121
122 match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123 Ok(res) => {
124 if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125 Some(VidCommon::V0(res.common))
126 } else {
127 tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128 None
129 }
130 },
131 Err(err) => {
132 tracing::warn!(
133 %client_url,
134 ?req,
135 %err,
136 "failed to deserialize ADVZCommonQueryData"
137 );
138 None
139 },
140 }
141 }
142 async fn deserialize_legacy_leaf<Types: NodeType>(
143 &self,
144 bytes: Vec<u8>,
145 req: LeafRequest<Types>,
146 ) -> Option<LeafQueryData<Types>> {
147 let client_url = self.client.base_url();
148
149 match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150 Ok(mut leaf) => {
151 if leaf.height() != req.height {
152 tracing::error!(
153 %client_url, ?req,
154 expected_height = req.height,
155 actual_height = leaf.height(),
156 "received leaf with the wrong height"
157 );
158 return None;
159 }
160
161 let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162 let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163 if actual_leaf_commit != expected_leaf_commit {
164 tracing::error!(
165 %client_url, ?req,
166 expected_leaf = %req.expected_leaf,
167 actual_leaf = %leaf.hash(),
168 "received leaf with the wrong hash"
169 );
170 return None;
171 }
172
173 let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174 let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175 if actual_qc_commit != expected_qc_commit {
176 tracing::error!(
177 %client_url, ?req,
178 expected_qc = %req.expected_qc,
179 actual_qc = %leaf.qc().commit(),
180 "received leaf with the wrong QC"
181 );
182 return None;
183 }
184
185 leaf.leaf.unfill_block_payload();
190
191 Some(leaf.into())
192 },
193 Err(err) => {
194 tracing::warn!(
195 %client_url, ?req, %err,
196 "failed to deserialize legacy LeafQueryData"
197 );
198 None
199 },
200 }
201 }
202}
203
204#[async_trait]
205impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
206where
207 Types: NodeType,
208{
209 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
217 let client_url = self.client.base_url();
218 let req_hash = req.0;
219 let payload_bytes = self
223 .client
224 .get::<()>(&format!("availability/payload/hash/{}", req.0))
225 .bytes()
226 .await
227 .inspect_err(|err| {
228 tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
229 })
230 .ok()?;
231
232 let common_bytes = self
233 .client
234 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
235 .bytes()
236 .await
237 .inspect_err(|err| {
238 tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
239 })
240 .ok()?;
241
242 let payload =
243 vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
244 .inspect_err(|err| {
245 tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
246 })
247 .ok();
248
249 let common =
250 vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
251 .inspect_err(|err| {
252 tracing::info!(%err, %req_hash,
253 "error deserializing VidCommonQueryData",
254 );
255 })
256 .ok();
257
258 let (payload, common) = match (payload, common) {
259 (Some(payload), Some(common)) => (payload, common),
260 _ => {
261 tracing::info!(%req_hash, "falling back to legacy payload deserialization");
262
263 return self
265 .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
266 .await;
267 },
268 };
269
270 match common.common() {
271 VidCommon::V0(common) => {
272 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
273 let bytes = payload.data().encode();
274
275 let commit = advz_scheme(num_storage_nodes)
276 .commit_only(bytes)
277 .map(VidCommitment::V0)
278 .inspect_err(|err| {
279 tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
280 })
281 .ok()?;
282
283 if commit != req.0 {
284 tracing::error!(
285 expected = %req_hash,
286 actual = ?commit,
287 %client_url,
288 "VID commitment mismatch (V0)"
289 );
290
291 return None;
292 }
293 },
294 VidCommon::V1(common) => {
295 let bytes = payload.data().encode();
296
297 let avidm_param = init_avidm_param(common.total_weights)
298 .inspect_err(|err| {
299 tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
300 })
301 .ok()?;
302
303 let header = self
304 .client
305 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
306 .send()
307 .await
308 .inspect_err(|err| {
309 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
310 })
311 .ok()?;
312
313 if header.payload_commitment() != req.0 {
314 tracing::error!(
315 expected = %req_hash,
316 actual = %header.payload_commitment(),
317 %client_url,
318 "header payload commitment mismatch (V1)"
319 );
320 return None;
321 }
322
323 let metadata = header.metadata().encode();
324 let commit = AvidMScheme::commit(
325 &avidm_param,
326 &bytes,
327 ns_table::parse_ns_table(bytes.len(), &metadata),
328 )
329 .map(VidCommitment::V1)
330 .inspect_err(|err| {
331 tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
332 })
333 .ok()?;
334
335 if commit != req.0 {
337 tracing::warn!(
338 expected = %req_hash,
339 actual = %commit,
340 %client_url,
341 "commitment type mismatch for AVIDM check"
342 );
343 return None;
344 }
345 },
346 }
347
348 Some(payload.data)
349 }
350}
351
352#[async_trait]
353impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
354 for QueryServiceProvider<Ver>
355where
356 Types: NodeType,
357{
358 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
366 let client_url = self.client.base_url();
367
368 let bytes = self
369 .client
370 .get::<()>(&format!("availability/leaf/{}", req.height))
371 .bytes()
372 .await;
373 let bytes = match bytes {
374 Ok(bytes) => bytes,
375 Err(err) => {
376 tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
377
378 return None;
379 },
380 };
381
382 match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
385 Ok(mut leaf) => {
386 if leaf.height() != req.height {
387 tracing::error!(
388 %client_url, ?req, ?leaf,
389 expected_height = req.height,
390 actual_height = leaf.height(),
391 "received leaf with the wrong height"
392 );
393 return None;
394 }
395 if leaf.hash() != req.expected_leaf {
396 tracing::error!(
397 %client_url, ?req, ?leaf,
398 expected_hash = %req.expected_leaf,
399 actual_hash = %leaf.hash(),
400 "received leaf with the wrong hash"
401 );
402 return None;
403 }
404 if leaf.qc().commit() != req.expected_qc {
405 tracing::error!(
406 %client_url, ?req, ?leaf,
407 expected_qc = %req.expected_qc,
408 actual_qc = %leaf.qc().commit(),
409 "received leaf with the wrong QC"
410 );
411 return None;
412 }
413
414 leaf.leaf.unfill_block_payload();
419
420 Some(leaf)
421 },
422 Err(err) => {
423 tracing::info!(
424 ?req, %err,
425 "failed to deserialize LeafQueryData, falling back to legacy deserialization"
426 );
427 self.deserialize_legacy_leaf(bytes, req).await
429 },
430 }
431 }
432}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437 Types: NodeType,
438{
439 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447 let client_url = self.client.base_url();
448 let bytes = self
449 .client
450 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451 .bytes()
452 .await;
453 let bytes = match bytes {
454 Ok(bytes) => bytes,
455 Err(err) => {
456 tracing::info!(
457 %client_url, ?req, %err,
458 "failed to fetch VID common bytes"
459 );
460 return None;
461 },
462 };
463
464 match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465 Ok(res) => match req.0 {
466 VidCommitment::V0(commit) => {
467 if let VidCommon::V0(common) = res.common {
468 if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469 Some(VidCommon::V0(common))
470 } else {
471 tracing::error!(
472 %client_url, ?req, ?commit, ?common,
473 "VID V0 common data is inconsistent with commitment"
474 );
475 None
476 }
477 } else {
478 tracing::warn!(?req, ?res, "Expect VID common data but found None");
479 None
480 }
481 },
482 VidCommitment::V1(_) => {
483 if let VidCommon::V1(common) = res.common {
484 Some(VidCommon::V1(common))
485 } else {
486 tracing::warn!(?req, ?res, "Expect VID common data but found None");
487 None
488 }
489 },
490 },
491 Err(err) => {
492 tracing::info!(
493 %client_url, ?req, %err,
494 "failed to deserialize as V1 VID common data, trying legacy fallback"
495 );
496 self.deserialize_legacy_vid_common::<Types>(bytes, req)
498 .await
499 },
500 }
501 }
502}
503
504#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507 use std::{future::IntoFuture, time::Duration};
508
509 use committable::Committable;
510 use futures::{
511 future::{join, FutureExt},
512 stream::StreamExt,
513 };
514 use generic_array::GenericArray;
515 use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
516 use hotshot_types::traits::node_implementation::Versions;
517 use portpicker::pick_unused_port;
518 use rand::RngCore;
519 use tide_disco::{error::ServerError, App};
520 use vbs::version::StaticVersion;
521
522 use super::*;
523 use crate::{
524 api::load_api,
525 availability::{
526 define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch,
527 TransactionQueryData, UpdateAvailabilityData,
528 },
529 data_source::{
530 sql::{self, SqlDataSource},
531 storage::{
532 fail_storage::{FailStorage, FailableAction},
533 pruning::{PrunedHeightStorage, PrunerCfg},
534 sql::testing::TmpDb,
535 AvailabilityStorage, SqlStorage, UpdateAvailabilityStorage,
536 },
537 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
538 },
539 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
540 node::{data_source::NodeDataSource, SyncStatus},
541 task::BackgroundTask,
542 testing::{
543 consensus::{MockDataSource, MockNetwork},
544 mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
545 setup_test, sleep,
546 },
547 types::HeightIndexed,
548 ApiState,
549 };
550
    /// Query service provider at the mock base version, wrapped in `TestProvider`; the tests
    /// call `block`/`unblock` and `fail`/`unfail` on it.
    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
    /// Same wrapper, using the base version of `EpochsTestVersions`.
    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
553
554 fn ignore<T>(_: T) {}
555
556 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
558 db: &TmpDb,
559 provider: &P,
560 ) -> sql::Builder<MockTypes, P> {
561 db.config()
562 .builder((*provider).clone())
563 .await
564 .unwrap()
565 .disable_proactive_fetching()
568 }
569
570 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
572 db: &TmpDb,
573 provider: &P,
574 ) -> SqlDataSource<MockTypes, P> {
575 builder(db, provider).await.build().await.unwrap()
576 }
577
578 #[tokio::test(flavor = "multi_thread")]
579 async fn test_fetch_on_request() {
580 setup_test();
581
582 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
584
585 let port = pick_unused_port().unwrap();
587 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
588 app.register_module(
589 "availability",
590 define_api(
591 &Default::default(),
592 MockBase::instance(),
593 "1.0.0".parse().unwrap(),
594 )
595 .unwrap(),
596 )
597 .unwrap();
598 network.spawn(
599 "server",
600 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
601 );
602
603 let db = TmpDb::init().await;
605 let provider = Provider::new(QueryServiceProvider::new(
606 format!("http://localhost:{port}").parse().unwrap(),
607 MockBase::instance(),
608 ));
609 let data_source = data_source(&db, &provider).await;
610
611 network.start().await;
613
614 let leaves = network.data_source().subscribe_leaves(1).await;
621 let leaves = leaves.take(5).collect::<Vec<_>>().await;
622 let test_leaf = &leaves[0];
623 let test_block = &leaves[1];
624 let test_payload = &leaves[2];
625 let test_common = &leaves[3];
626
627 tracing::info!("requesting unfetchable resources");
629 let mut fetches = vec![];
630 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
632 fetches.push(
634 data_source
635 .get_leaf(test_leaf.height() as usize)
636 .await
637 .map(ignore),
638 );
639 fetches.push(
641 data_source
642 .get_block(test_block.block_hash())
643 .await
644 .map(ignore),
645 );
646 fetches.push(
647 data_source
648 .get_payload(test_payload.block_hash())
649 .await
650 .map(ignore),
651 );
652 fetches.push(
653 data_source
654 .get_vid_common(test_common.block_hash())
655 .await
656 .map(ignore),
657 );
658 fetches.push(
660 data_source
661 .get_block(test_block.height() as usize)
662 .await
663 .map(ignore),
664 );
665 fetches.push(
666 data_source
667 .get_payload(test_payload.height() as usize)
668 .await
669 .map(ignore),
670 );
671 fetches.push(
672 data_source
673 .get_vid_common(test_common.height() as usize)
674 .await
675 .map(ignore),
676 );
677 fetches.push(data_source.get_vid_common(0).await.map(ignore));
679 fetches.push(
681 data_source
682 .get_transaction(mock_transaction(vec![]).commit())
683 .await
684 .map(ignore),
685 );
686
687 sleep(Duration::from_secs(1)).await;
690 for (i, fetch) in fetches.into_iter().enumerate() {
691 tracing::info!("checking fetch {i} is unresolved");
692 fetch.try_resolve().unwrap_err();
693 }
694
695 provider.block().await;
700 data_source
701 .append(leaves.last().cloned().unwrap().into())
702 .await
703 .unwrap();
704
705 tracing::info!("requesting fetchable resources");
706 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
707 let req_block = data_source.get_block(test_block.height() as usize).await;
708 let req_payload = data_source
709 .get_payload(test_payload.height() as usize)
710 .await;
711 let req_common = data_source
712 .get_vid_common(test_common.height() as usize)
713 .await;
714
715 sleep(Duration::from_secs(1)).await;
721 req_leaf.try_resolve().unwrap_err();
722 req_block.try_resolve().unwrap_err();
723 req_payload.try_resolve().unwrap_err();
724 req_common.try_resolve().unwrap_err();
725
726 provider.unblock().await;
728 let leaf = data_source
729 .get_leaf(test_leaf.height() as usize)
730 .await
731 .await;
732 let block = data_source
733 .get_block(test_block.height() as usize)
734 .await
735 .await;
736 let payload = data_source
737 .get_payload(test_payload.height() as usize)
738 .await
739 .await;
740 let common = data_source
741 .get_vid_common(test_common.height() as usize)
742 .await
743 .await;
744 {
745 let truth = network.data_source();
747 assert_eq!(
748 leaf,
749 truth.get_leaf(test_leaf.height() as usize).await.await
750 );
751 assert_eq!(
752 block,
753 truth.get_block(test_block.height() as usize).await.await
754 );
755 assert_eq!(
756 payload,
757 truth
758 .get_payload(test_payload.height() as usize)
759 .await
760 .await
761 );
762 assert_eq!(
763 common,
764 truth
765 .get_vid_common(test_common.height() as usize)
766 .await
767 .await
768 );
769 }
770
771 provider.block().await;
776 for leaf in [test_block, test_payload] {
777 tracing::info!("fetching existing leaf {}", leaf.height());
778 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
779 assert_eq!(*leaf, fetched_leaf);
780 }
781
782 tracing::info!("fetching block by hash");
787 provider.unblock().await;
788 {
789 let block = data_source.get_block(test_leaf.block_hash()).await.await;
790 assert_eq!(block.hash(), leaf.block_hash());
791 }
792
793 tracing::info!("fetching payload by hash");
797 {
798 let leaf = leaves.last().unwrap();
799 let payload = data_source.get_payload(leaf.block_hash()).await.await;
800 assert_eq!(payload.height(), leaf.height());
801 assert_eq!(payload.block_hash(), leaf.block_hash());
802 assert_eq!(payload.hash(), leaf.payload_hash());
803 }
804 }
805
806 #[tokio::test(flavor = "multi_thread")]
807 async fn test_fetch_on_request_epoch_version() {
808 tracing::info!("Starting test_fetch_on_request_epoch_version");
811
812 setup_test();
813
814 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
816
817 let port = pick_unused_port().unwrap();
819 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
820 app.register_module(
821 "availability",
822 define_api(
823 &Default::default(),
824 <EpochsTestVersions as Versions>::Base::instance(),
825 "1.0.0".parse().unwrap(),
826 )
827 .unwrap(),
828 )
829 .unwrap();
830 network.spawn(
831 "server",
832 app.serve(
833 format!("0.0.0.0:{port}"),
834 <EpochsTestVersions as Versions>::Base::instance(),
835 ),
836 );
837
838 let db = TmpDb::init().await;
841 let provider = EpochProvider::new(QueryServiceProvider::new(
842 format!("http://localhost:{port}").parse().unwrap(),
843 <EpochsTestVersions as Versions>::Base::instance(),
844 ));
845 let data_source = data_source(&db, &provider).await;
846
847 network.start().await;
849
850 let leaves = network.data_source().subscribe_leaves(1).await;
857 let leaves = leaves.take(5).collect::<Vec<_>>().await;
858 let test_leaf = &leaves[0];
859 let test_block = &leaves[1];
860 let test_payload = &leaves[2];
861 let test_common = &leaves[3];
862
863 let mut fetches = vec![];
865 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
867 fetches.push(
869 data_source
870 .get_leaf(test_leaf.height() as usize)
871 .await
872 .map(ignore),
873 );
874 fetches.push(
876 data_source
877 .get_block(test_block.block_hash())
878 .await
879 .map(ignore),
880 );
881 fetches.push(
882 data_source
883 .get_payload(test_payload.block_hash())
884 .await
885 .map(ignore),
886 );
887 fetches.push(
888 data_source
889 .get_vid_common(test_common.block_hash())
890 .await
891 .map(ignore),
892 );
893 fetches.push(
895 data_source
896 .get_block(test_block.height() as usize)
897 .await
898 .map(ignore),
899 );
900 fetches.push(
901 data_source
902 .get_payload(test_payload.height() as usize)
903 .await
904 .map(ignore),
905 );
906 fetches.push(
907 data_source
908 .get_vid_common(test_common.height() as usize)
909 .await
910 .map(ignore),
911 );
912 fetches.push(data_source.get_vid_common(0).await.map(ignore));
914 fetches.push(
916 data_source
917 .get_transaction(mock_transaction(vec![]).commit())
918 .await
919 .map(ignore),
920 );
921
922 sleep(Duration::from_secs(1)).await;
925 for (i, fetch) in fetches.into_iter().enumerate() {
926 tracing::info!("checking fetch {i} is unresolved");
927 fetch.try_resolve().unwrap_err();
928 }
929
930 provider.block().await;
935 data_source
936 .append(leaves.last().cloned().unwrap().into())
937 .await
938 .unwrap();
939
940 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
941 let req_block = data_source.get_block(test_block.height() as usize).await;
942 let req_payload = data_source
943 .get_payload(test_payload.height() as usize)
944 .await;
945 let req_common = data_source
946 .get_vid_common(test_common.height() as usize)
947 .await;
948
949 sleep(Duration::from_secs(1)).await;
955 req_leaf.try_resolve().unwrap_err();
956 req_block.try_resolve().unwrap_err();
957 req_payload.try_resolve().unwrap_err();
958 req_common.try_resolve().unwrap_err();
959
960 provider.unblock().await;
962 let leaf = data_source
963 .get_leaf(test_leaf.height() as usize)
964 .await
965 .await;
966 let block = data_source
967 .get_block(test_block.height() as usize)
968 .await
969 .await;
970 let payload = data_source
971 .get_payload(test_payload.height() as usize)
972 .await
973 .await;
974 let common = data_source
975 .get_vid_common(test_common.height() as usize)
976 .await
977 .await;
978 {
979 let truth = network.data_source();
981 assert_eq!(
982 leaf,
983 truth.get_leaf(test_leaf.height() as usize).await.await
984 );
985 assert_eq!(
986 block,
987 truth.get_block(test_block.height() as usize).await.await
988 );
989 assert_eq!(
990 payload,
991 truth
992 .get_payload(test_payload.height() as usize)
993 .await
994 .await
995 );
996 assert_eq!(
997 common,
998 truth
999 .get_vid_common(test_common.height() as usize)
1000 .await
1001 .await
1002 );
1003 }
1004
1005 provider.block().await;
1010 for leaf in [test_block, test_payload] {
1011 tracing::info!("fetching existing leaf {}", leaf.height());
1012 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1013 assert_eq!(*leaf, fetched_leaf);
1014 }
1015
1016 provider.unblock().await;
1021 {
1022 let block = data_source.get_block(test_leaf.block_hash()).await.await;
1023 assert_eq!(block.hash(), leaf.block_hash());
1024 }
1025
1026 {
1030 let leaf = leaves.last().unwrap();
1031 let payload = data_source.get_payload(leaf.block_hash()).await.await;
1032 assert_eq!(payload.height(), leaf.height());
1033 assert_eq!(payload.block_hash(), leaf.block_hash());
1034 assert_eq!(payload.hash(), leaf.payload_hash());
1035 }
1036
1037 tracing::info!("Test completed successfully!");
1039 }
1040
1041 #[tokio::test(flavor = "multi_thread")]
1042 async fn test_fetch_block_and_leaf_concurrently() {
1043 setup_test();
1044
1045 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1047
1048 let port = pick_unused_port().unwrap();
1050 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1051 app.register_module(
1052 "availability",
1053 define_api(
1054 &Default::default(),
1055 MockBase::instance(),
1056 "1.0.0".parse().unwrap(),
1057 )
1058 .unwrap(),
1059 )
1060 .unwrap();
1061 network.spawn(
1062 "server",
1063 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1064 );
1065
1066 let db = TmpDb::init().await;
1068 let provider = Provider::new(QueryServiceProvider::new(
1069 format!("http://localhost:{port}").parse().unwrap(),
1070 MockBase::instance(),
1071 ));
1072 let data_source = data_source(&db, &provider).await;
1073
1074 network.start().await;
1076
1077 let leaves = network.data_source().subscribe_leaves(1).await;
1080 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1081 let test_leaf = &leaves[0];
1082
1083 data_source.append(leaves[1].clone().into()).await.unwrap();
1085
1086 let (leaf, block) = join(
1090 data_source
1091 .get_leaf(test_leaf.height() as usize)
1092 .await
1093 .into_future(),
1094 data_source
1095 .get_block(test_leaf.height() as usize)
1096 .await
1097 .into_future(),
1098 )
1099 .await;
1100 assert_eq!(leaf, *test_leaf);
1101 assert_eq!(leaf.header(), block.header());
1102 }
1103
1104 #[tokio::test(flavor = "multi_thread")]
1105 async fn test_fetch_different_blocks_same_payload() {
1106 setup_test();
1107
1108 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1110
1111 let port = pick_unused_port().unwrap();
1113 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1114 app.register_module(
1115 "availability",
1116 define_api(
1117 &Default::default(),
1118 MockBase::instance(),
1119 "1.0.0".parse().unwrap(),
1120 )
1121 .unwrap(),
1122 )
1123 .unwrap();
1124 network.spawn(
1125 "server",
1126 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1127 );
1128
1129 let db = TmpDb::init().await;
1131 let provider = Provider::new(QueryServiceProvider::new(
1132 format!("http://localhost:{port}").parse().unwrap(),
1133 MockBase::instance(),
1134 ));
1135 let data_source = data_source(&db, &provider).await;
1136
1137 network.start().await;
1139
1140 let leaves = network.data_source().subscribe_leaves(1).await;
1143 let leaves = leaves.take(4).collect::<Vec<_>>().await;
1144
1145 data_source
1148 .append(leaves.last().cloned().unwrap().into())
1149 .await
1150 .unwrap();
1151
1152 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1154 let (block1, block2) = join(
1157 data_source
1158 .get_block(leaves[0].height() as usize)
1159 .await
1160 .into_future(),
1161 data_source
1162 .get_block(leaves[1].height() as usize)
1163 .await
1164 .into_future(),
1165 )
1166 .await;
1167 assert_eq!(block1.header(), leaves[0].header());
1168 assert_eq!(block2.header(), leaves[1].header());
1169 }
1170
1171 #[tokio::test(flavor = "multi_thread")]
1172 async fn test_fetch_stream() {
1173 setup_test();
1174
1175 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1177
1178 let port = pick_unused_port().unwrap();
1180 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1181 app.register_module(
1182 "availability",
1183 define_api(
1184 &Default::default(),
1185 MockBase::instance(),
1186 "1.0.0".parse().unwrap(),
1187 )
1188 .unwrap(),
1189 )
1190 .unwrap();
1191 network.spawn(
1192 "server",
1193 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1194 );
1195
1196 let db = TmpDb::init().await;
1198 let provider = Provider::new(QueryServiceProvider::new(
1199 format!("http://localhost:{port}").parse().unwrap(),
1200 MockBase::instance(),
1201 ));
1202 let data_source = data_source(&db, &provider).await;
1203
1204 network.start().await;
1206
1207 let blocks = data_source.subscribe_blocks(0).await;
1209 let leaves = data_source.subscribe_leaves(0).await;
1210 let common = data_source.subscribe_vid_common(0).await;
1211
1212 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1214 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1215
1216 data_source
1219 .append(finalized_leaves.last().cloned().unwrap().into())
1220 .await
1221 .unwrap();
1222
1223 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1225 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1226 let common = common.take(5).collect::<Vec<_>>().await;
1227 for i in 0..5 {
1228 tracing::info!("checking block {i}");
1229 assert_eq!(leaves[i], finalized_leaves[i]);
1230 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1231 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1232 }
1233 }
1234
1235 #[tokio::test(flavor = "multi_thread")]
1236 async fn test_fetch_range_start() {
1237 setup_test();
1238
1239 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1241
1242 let port = pick_unused_port().unwrap();
1244 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1245 app.register_module(
1246 "availability",
1247 define_api(
1248 &Default::default(),
1249 MockBase::instance(),
1250 "1.0.0".parse().unwrap(),
1251 )
1252 .unwrap(),
1253 )
1254 .unwrap();
1255 network.spawn(
1256 "server",
1257 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1258 );
1259
1260 let db = TmpDb::init().await;
1262 let provider = Provider::new(QueryServiceProvider::new(
1263 format!("http://localhost:{port}").parse().unwrap(),
1264 MockBase::instance(),
1265 ));
1266 let data_source = data_source(&db, &provider).await;
1267
1268 network.start().await;
1270
1271 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1273 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1274
1275 let mut tx = data_source.write().await.unwrap();
1279 tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1280 tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1281 tx.commit().await.unwrap();
1282
1283 let leaves = data_source
1285 .get_leaf_range(..5)
1286 .await
1287 .then(Fetch::resolve)
1288 .collect::<Vec<_>>()
1289 .await;
1290 for i in 0..5 {
1291 tracing::info!("checking leaf {i}");
1292 assert_eq!(leaves[i], finalized_leaves[i]);
1293 }
1294 }
1295
1296 #[tokio::test(flavor = "multi_thread")]
1297 async fn fetch_transaction() {
1298 setup_test();
1299
1300 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1302
1303 let port = pick_unused_port().unwrap();
1305 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1306 app.register_module(
1307 "availability",
1308 define_api(
1309 &Default::default(),
1310 MockBase::instance(),
1311 "1.0.0".parse().unwrap(),
1312 )
1313 .unwrap(),
1314 )
1315 .unwrap();
1316 network.spawn(
1317 "server",
1318 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1319 );
1320
1321 let db = TmpDb::init().await;
1324 let data_source = data_source(&db, &NoFetching).await;
1325
1326 let mut leaves = network.data_source().subscribe_leaves(1).await;
1328 let mut blocks = network.data_source().subscribe_blocks(1).await;
1329
1330 network.start().await;
1332
1333 let tx = mock_transaction(vec![1, 2, 3]);
1337 let fut = data_source.get_transaction(tx.commit()).await;
1338
1339 network.submit_transaction(tx.clone()).await;
1341
1342 let block = loop {
1345 let leaf = leaves.next().await.unwrap();
1346 let block = blocks.next().await.unwrap();
1347
1348 data_source
1349 .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1350 .await
1351 .unwrap();
1352
1353 if block.transaction_by_hash(tx.commit()).is_some() {
1354 break block;
1355 }
1356 };
1357 tracing::info!("transaction included in block {}", block.height());
1358
1359 let fetched_tx = fut.await;
1360 assert_eq!(
1361 fetched_tx,
1362 TransactionQueryData::with_hash(&block, tx.commit()).unwrap()
1363 );
1364
1365 assert_eq!(
1367 fetched_tx,
1368 data_source.get_transaction(tx.commit()).await.await
1369 );
1370 }
1371
1372 #[tokio::test(flavor = "multi_thread")]
1373 async fn test_retry() {
1374 setup_test();
1375
1376 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1378
1379 let port = pick_unused_port().unwrap();
1381 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1382 app.register_module(
1383 "availability",
1384 define_api(
1385 &Default::default(),
1386 MockBase::instance(),
1387 "1.0.0".parse().unwrap(),
1388 )
1389 .unwrap(),
1390 )
1391 .unwrap();
1392 network.spawn(
1393 "server",
1394 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1395 );
1396
1397 let db = TmpDb::init().await;
1399 let provider = Provider::new(QueryServiceProvider::new(
1400 format!("http://localhost:{port}").parse().unwrap(),
1401 MockBase::instance(),
1402 ));
1403 let data_source = builder(&db, &provider)
1404 .await
1405 .with_max_retry_interval(Duration::from_secs(1))
1406 .build()
1407 .await
1408 .unwrap();
1409
1410 network.start().await;
1412
1413 let leaves = network.data_source().subscribe_leaves(1).await;
1416 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1417 let test_leaf = &leaves[0];
1418
1419 provider.fail();
1421
1422 data_source
1425 .append(leaves.last().cloned().unwrap().into())
1426 .await
1427 .unwrap();
1428
1429 tracing::info!("requesting leaf from failing providers");
1430 let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1431
1432 sleep(Duration::from_secs(5)).await;
1435 fut.try_resolve().unwrap_err();
1436
1437 provider.unfail();
1439 assert_eq!(
1440 data_source
1441 .get_leaf(test_leaf.height() as usize)
1442 .await
1443 .await,
1444 *test_leaf
1445 );
1446 }
1447
1448 fn random_vid_commit() -> VidCommitment {
1449 let mut bytes = [0; 32];
1450 rand::thread_rng().fill_bytes(&mut bytes);
1451 VidCommitment::V0(GenericArray::from(bytes).into())
1452 }
1453
1454 async fn malicious_server(port: u16) {
1455 let mut api = load_api::<(), ServerError, MockBase>(
1456 None::<std::path::PathBuf>,
1457 include_str!("../../../api/availability.toml"),
1458 vec![],
1459 )
1460 .unwrap();
1461
1462 api.get("get_payload", move |_, _| {
1463 async move {
1464 Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1466 &Default::default(),
1467 &Default::default(),
1468 )
1469 .await)
1470 }
1471 .boxed()
1472 })
1473 .unwrap()
1474 .get("get_vid_common", move |_, _| {
1475 async move {
1476 Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1478 &Default::default(),
1479 &Default::default(),
1480 )
1481 .await)
1482 }
1483 .boxed()
1484 })
1485 .unwrap();
1486
1487 let mut app = App::<(), ServerError>::with_state(());
1488 app.register_module("availability", api).unwrap();
1489 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1490 .await
1491 .ok();
1492 }
1493
1494 #[tokio::test(flavor = "multi_thread")]
1495 async fn test_fetch_from_malicious_server() {
1496 setup_test();
1497
1498 let port = pick_unused_port().unwrap();
1499 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1500
1501 let provider = QueryServiceProvider::new(
1502 format!("http://localhost:{port}").parse().unwrap(),
1503 MockBase::instance(),
1504 );
1505 provider.client.connect(None).await;
1506
1507 let res =
1510 ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1511 .await;
1512 assert_eq!(res, None);
1513
1514 let res =
1517 ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1518 .await;
1519 assert_eq!(res, None);
1520 }
1521
/// Exercises re-opening a pruned database in archive mode.
///
/// The test first runs a data source with a pruner configured, fetches a handful of leaves, and
/// waits until the reported pruned height reaches the last leaf. It then re-opens the same
/// database with `.archive()`, checks that no pruned height is reported any more, and waits for
/// the sync status (ignoring missing VID shares) to report fully synced.
#[tokio::test(flavor = "multi_thread")]
async fn test_archive_recovery() {
    setup_test();

    // Mock network, with its data source exposed through an availability API server.
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // First incarnation of the data source: pruner enabled with a zero target retention and a
    // five second interval, fetching from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let mut data_source = db
        .config()
        .pruner_cfg(
            PrunerCfg::new()
                .with_target_retention(Duration::from_secs(0))
                .with_interval(Duration::from_secs(5)),
        )
        .unwrap()
        .builder(provider.clone())
        .await
        .unwrap()
        .with_min_retry_interval(Duration::from_millis(100))
        // NOTE(review): presumably widens the retry jitter so fetches are spread out; confirm
        // against the builder documentation.
        .with_retry_randomization_factor(3.)
        .build()
        .await
        .unwrap();

    network.start().await;

    // Collect the first five leaves produced by the network.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(5).collect::<Vec<_>>().await;

    // Before anything has been appended, the pruned height is either absent or zero.
    let pruned_height = data_source
        .read()
        .await
        .unwrap()
        .load_pruned_height()
        .await
        .unwrap();
    assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

    // Append only the last leaf; the earlier ones are requested through `get_leaf` below.
    let last_leaf = leaves.last().unwrap();
    data_source.append(last_leaf.clone().into()).await.unwrap();

    // Every leaf from height 1 up to the last leaf's height must match what the network produced.
    // `leaves[0]` is the leaf at height 1, hence the `- 1`.
    for i in 1..=last_leaf.height() {
        tracing::info!(i, "fetching leaf");
        assert_eq!(
            data_source.get_leaf(i as usize).await.await,
            leaves[i as usize - 1]
        );
    }

    // Poll once per second until the pruned height equals the last leaf's height.
    loop {
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        if pruned_height == Some(last_leaf.height()) {
            break;
        }
        tracing::info!(
            ?pruned_height,
            target_height = last_leaf.height(),
            "waiting for pruner to run"
        );
        sleep(Duration::from_secs(1)).await;
    }

    // Second incarnation: same database, now opened with `.archive()` and short scan intervals.
    // Assigning to `data_source` replaces (and drops) the first instance.
    data_source = db
        .config()
        .archive()
        .builder(provider.clone())
        .await
        .unwrap()
        .with_minor_scan_interval(Duration::from_secs(1))
        .with_major_scan_interval(1)
        .build()
        .await
        .unwrap();

    // After re-opening in archive mode no pruned height is reported.
    let pruned_height = data_source
        .read()
        .await
        .unwrap()
        .load_pruned_height()
        .await
        .unwrap();
    assert_eq!(pruned_height, None);

    // Append the last leaf again to the re-opened data source.
    data_source.append(last_leaf.clone().into()).await.unwrap();

    // Poll once per second until the node reports fully synced. `missing_vid_shares` is forced
    // to 0 before the check, so missing VID shares never count against being synced here.
    loop {
        let sync_status = data_source.sync_status().await.unwrap();

        if (SyncStatus {
            missing_vid_shares: 0,
            ..sync_status
        })
        .is_fully_synced()
        {
            break;
        }
        tracing::info!(?sync_status, "waiting for node to sync");
        sleep(Duration::from_secs(1)).await;
    }

    // Wait a little longer and check that the node is still reported as synced.
    sleep(Duration::from_secs(3)).await;
    let sync_status = data_source.sync_status().await.unwrap();
    assert!(
        (SyncStatus {
            missing_vid_shares: 0,
            ..sync_status
        })
        .is_fully_synced(),
        "{sync_status:?}"
    );
}
1688
/// Selects which storage failure hook the storage-failure test helpers arm.
#[derive(Debug, Clone, Copy)]
enum FailureType {
    /// Arms `fail_begins_writable` / `fail_one_begin_writable`.
    Begin,
    /// Arms `fail_writes` / `fail_one_write`.
    Write,
    /// Arms `fail_commits` / `fail_one_commit`.
    Commit,
}
1695
/// Shared body of the `test_fetch_storage_failure_on_*` tests.
///
/// `failure` selects which storage hook is armed (`fail_begins_writable`, `fail_writes` or
/// `fail_commits`). While the hook is armed, fetching leaf 1 must still resolve to the right
/// leaf. After `pass()` is called, a second fetch is expected to start out pending and resolve to
/// the same leaf, and a third lookup is expected to resolve immediately.
async fn test_fetch_storage_failure_helper(failure: FailureType) {
    setup_test();

    // Mock network, with its data source exposed through an availability API server.
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: SQL storage wrapped in `FailStorage` so failures can be injected,
    // with proactive fetching and the aggregator disabled.
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let db = TmpDb::init().await;
    let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
    let data_source = FetchingDataSource::builder(storage, provider)
        .disable_proactive_fetching()
        .disable_aggregator()
        .with_max_retry_interval(Duration::from_millis(100))
        .with_retry_timeout(Duration::from_secs(1))
        .build()
        .await
        .unwrap();

    network.start().await;

    // Take the first two leaves from the network.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(2).collect::<Vec<_>>().await;

    // Insert only the last leaf directly; `leaves[0]` is not inserted by this test.
    let last_leaf = leaves.last().unwrap();
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(last_leaf.clone()).await.unwrap();
    tx.commit().await.unwrap();

    tracing::info!("fetch with write failure");
    // Arm the selected failure hook for any action.
    match failure {
        FailureType::Begin => {
            data_source
                .as_ref()
                .fail_begins_writable(FailableAction::Any)
                .await
        },
        FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
        FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
    }
    // The fetch must still resolve to the right leaf while the hook is armed.
    assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
    // NOTE(review): `pass()` presumably clears the injected failure; confirm against
    // `FailStorage`.
    data_source.as_ref().pass().await;

    sleep(Duration::from_secs(1)).await;

    tracing::info!("fetch with write success");
    // The new fetch is asserted to be pending before it is awaited.
    // NOTE(review): this suggests the leaf was not persisted by the first fetch; confirm.
    let fetch = data_source.get_leaf(1).await;
    assert!(fetch.is_pending());
    assert_eq!(leaves[0], fetch.await);

    sleep(Duration::from_secs(1)).await;

    tracing::info!("retrieve from storage");
    // This time `try_resolve` must succeed right away, without awaiting the fetch.
    let fetch = data_source.get_leaf(1).await;
    assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
}
1784
1785 #[tokio::test(flavor = "multi_thread")]
1786 async fn test_fetch_storage_failure_on_begin() {
1787 test_fetch_storage_failure_helper(FailureType::Begin).await;
1788 }
1789
1790 #[tokio::test(flavor = "multi_thread")]
1791 async fn test_fetch_storage_failure_on_write() {
1792 test_fetch_storage_failure_helper(FailureType::Write).await;
1793 }
1794
1795 #[tokio::test(flavor = "multi_thread")]
1796 async fn test_fetch_storage_failure_on_commit() {
1797 test_fetch_storage_failure_helper(FailureType::Commit).await;
1798 }
1799
1800 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1801 setup_test();
1802
1803 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1805
1806 let port = pick_unused_port().unwrap();
1808 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1809 app.register_module(
1810 "availability",
1811 define_api(
1812 &Default::default(),
1813 MockBase::instance(),
1814 "1.0.0".parse().unwrap(),
1815 )
1816 .unwrap(),
1817 )
1818 .unwrap();
1819 network.spawn(
1820 "server",
1821 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1822 );
1823
1824 let provider = Provider::new(QueryServiceProvider::new(
1826 format!("http://localhost:{port}").parse().unwrap(),
1827 MockBase::instance(),
1828 ));
1829 let db = TmpDb::init().await;
1830 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1831 let data_source = FetchingDataSource::builder(storage, provider)
1832 .disable_proactive_fetching()
1833 .disable_aggregator()
1834 .with_min_retry_interval(Duration::from_millis(100))
1835 .build()
1836 .await
1837 .unwrap();
1838
1839 network.start().await;
1841
1842 let leaves = network.data_source().subscribe_leaves(1).await;
1844 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1845
1846 let last_leaf = leaves.last().unwrap();
1848 let mut tx = data_source.write().await.unwrap();
1849 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1850 tx.commit().await.unwrap();
1851
1852 tracing::info!("fetch with write failure");
1854 match failure {
1855 FailureType::Begin => {
1856 data_source
1857 .as_ref()
1858 .fail_one_begin_writable(FailableAction::Any)
1859 .await
1860 },
1861 FailureType::Write => {
1862 data_source
1863 .as_ref()
1864 .fail_one_write(FailableAction::Any)
1865 .await
1866 },
1867 FailureType::Commit => {
1868 data_source
1869 .as_ref()
1870 .fail_one_commit(FailableAction::Any)
1871 .await
1872 },
1873 }
1874 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1875
1876 let mut tx = data_source.read().await.unwrap();
1878 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1879 }
1880
1881 #[tokio::test(flavor = "multi_thread")]
1882 async fn test_fetch_storage_failure_retry_on_begin() {
1883 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1884 }
1885
1886 #[tokio::test(flavor = "multi_thread")]
1887 async fn test_fetch_storage_failure_retry_on_write() {
1888 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1889 }
1890
1891 #[tokio::test(flavor = "multi_thread")]
1892 async fn test_fetch_storage_failure_retry_on_commit() {
1893 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1894 }
1895
1896 #[tokio::test(flavor = "multi_thread")]
1897 async fn test_fetch_on_decide() {
1898 setup_test();
1899
1900 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1902
1903 let port = pick_unused_port().unwrap();
1905 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1906 app.register_module(
1907 "availability",
1908 define_api(
1909 &Default::default(),
1910 MockBase::instance(),
1911 "1.0.0".parse().unwrap(),
1912 )
1913 .unwrap(),
1914 )
1915 .unwrap();
1916 network.spawn(
1917 "server",
1918 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1919 );
1920
1921 let db = TmpDb::init().await;
1923 let provider = Provider::new(QueryServiceProvider::new(
1924 format!("http://localhost:{port}").parse().unwrap(),
1925 MockBase::instance(),
1926 ));
1927 let data_source = builder(&db, &provider)
1928 .await
1929 .with_max_retry_interval(Duration::from_secs(1))
1930 .build()
1931 .await
1932 .unwrap();
1933
1934 network.start().await;
1936
1937 let leaf = network
1939 .data_source()
1940 .subscribe_leaves(1)
1941 .await
1942 .next()
1943 .await
1944 .unwrap();
1945
1946 data_source.append(leaf.clone().into()).await.unwrap();
1948
1949 sleep(Duration::from_secs(5)).await;
1952
1953 let mut tx = data_source.read().await.unwrap();
1957 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1958 let block = tx.get_block(id).await.unwrap();
1959 let vid = tx.get_vid_common(id).await.unwrap();
1960
1961 assert_eq!(block.hash(), leaf.block_hash());
1962 assert_eq!(vid.block_hash(), leaf.block_hash());
1963 }
1964
1965 #[tokio::test(flavor = "multi_thread")]
1966 async fn test_fetch_begin_failure() {
1967 setup_test();
1968
1969 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1971
1972 let port = pick_unused_port().unwrap();
1974 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1975 app.register_module(
1976 "availability",
1977 define_api(
1978 &Default::default(),
1979 MockBase::instance(),
1980 "1.0.0".parse().unwrap(),
1981 )
1982 .unwrap(),
1983 )
1984 .unwrap();
1985 network.spawn(
1986 "server",
1987 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1988 );
1989
1990 let provider = Provider::new(QueryServiceProvider::new(
1992 format!("http://localhost:{port}").parse().unwrap(),
1993 MockBase::instance(),
1994 ));
1995 let db = TmpDb::init().await;
1996 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1997 let data_source = FetchingDataSource::builder(storage, provider)
1998 .disable_proactive_fetching()
1999 .disable_aggregator()
2000 .with_min_retry_interval(Duration::from_millis(100))
2001 .build()
2002 .await
2003 .unwrap();
2004
2005 network.start().await;
2007
2008 let leaves = network.data_source().subscribe_leaves(1).await;
2010 let leaves = leaves.take(2).collect::<Vec<_>>().await;
2011
2012 let last_leaf = leaves.last().unwrap();
2014 let mut tx = data_source.write().await.unwrap();
2015 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2016 tx.commit().await.unwrap();
2017
2018 tracing::info!("fetch with transaction failure");
2021 data_source
2022 .as_ref()
2023 .fail_one_begin_read_only(FailableAction::Any)
2024 .await;
2025 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2026 }
2027
2028 #[tokio::test(flavor = "multi_thread")]
2029 async fn test_fetch_load_failure_block() {
2030 setup_test();
2031
2032 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2034
2035 let port = pick_unused_port().unwrap();
2037 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2038 app.register_module(
2039 "availability",
2040 define_api(
2041 &Default::default(),
2042 MockBase::instance(),
2043 "1.0.0".parse().unwrap(),
2044 )
2045 .unwrap(),
2046 )
2047 .unwrap();
2048 network.spawn(
2049 "server",
2050 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2051 );
2052
2053 let provider = Provider::new(QueryServiceProvider::new(
2055 format!("http://localhost:{port}").parse().unwrap(),
2056 MockBase::instance(),
2057 ));
2058 let db = TmpDb::init().await;
2059 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2060 let data_source = FetchingDataSource::builder(storage, provider)
2061 .disable_proactive_fetching()
2062 .disable_aggregator()
2063 .with_min_retry_interval(Duration::from_millis(100))
2064 .build()
2065 .await
2066 .unwrap();
2067
2068 network.start().await;
2070
2071 let mut leaves = network.data_source().subscribe_leaves(1).await;
2073 let leaf = leaves.next().await.unwrap();
2074
2075 let mut tx = data_source.write().await.unwrap();
2078 tx.insert_leaf(leaf.clone()).await.unwrap();
2079 tx.commit().await.unwrap();
2080
2081 tracing::info!("fetch with read failure");
2095 data_source
2096 .as_ref()
2097 .fail_one_read(FailableAction::GetHeader)
2098 .await;
2099 let fetch = data_source.get_block(leaf.block_hash()).await;
2100
2101 sleep(Duration::from_secs(2)).await;
2103 data_source.as_ref().pass().await;
2104
2105 let block: BlockQueryData<MockTypes> = fetch.await;
2106 assert_eq!(block.hash(), leaf.block_hash());
2107 }
2108
2109 #[tokio::test(flavor = "multi_thread")]
2110 async fn test_fetch_load_failure_tx() {
2111 setup_test();
2112
2113 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2115
2116 let port = pick_unused_port().unwrap();
2118 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2119 app.register_module(
2120 "availability",
2121 define_api(
2122 &Default::default(),
2123 MockBase::instance(),
2124 "1.0.0".parse().unwrap(),
2125 )
2126 .unwrap(),
2127 )
2128 .unwrap();
2129 network.spawn(
2130 "server",
2131 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2132 );
2133
2134 let provider = Provider::new(QueryServiceProvider::new(
2136 format!("http://localhost:{port}").parse().unwrap(),
2137 MockBase::instance(),
2138 ));
2139 let db = TmpDb::init().await;
2140 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2141 let data_source = FetchingDataSource::builder(storage, provider)
2142 .disable_proactive_fetching()
2143 .disable_aggregator()
2144 .with_min_retry_interval(Duration::from_millis(100))
2145 .build()
2146 .await
2147 .unwrap();
2148
2149 network.start().await;
2151
2152 let tx = mock_transaction(vec![1, 2, 3]);
2154 network.submit_transaction(tx.clone()).await;
2155 let tx = network
2156 .data_source()
2157 .get_transaction(tx.commit())
2158 .await
2159 .await;
2160
2161 {
2163 let leaf = network
2164 .data_source()
2165 .get_leaf(tx.block_height() as usize)
2166 .await
2167 .await;
2168 let block = network
2169 .data_source()
2170 .get_block(tx.block_height() as usize)
2171 .await
2172 .await;
2173 let mut tx = data_source.write().await.unwrap();
2174 tx.insert_leaf(leaf.clone()).await.unwrap();
2175 tx.insert_block(block.clone()).await.unwrap();
2176 tx.commit().await.unwrap();
2177 }
2178
2179 tracing::info!("fetch success");
2181 assert_eq!(tx, data_source.get_transaction(tx.hash()).await.await);
2182
2183 tracing::info!("fetch with read failure");
2195 data_source
2196 .as_ref()
2197 .fail_one_read(FailableAction::Any)
2198 .await;
2199 let fetch = data_source.get_transaction(tx.hash()).await;
2200
2201 assert_eq!(tx, fetch.await);
2202 }
2203
2204 #[tokio::test(flavor = "multi_thread")]
2205 async fn test_stream_begin_failure() {
2206 setup_test();
2207
2208 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2210
2211 let port = pick_unused_port().unwrap();
2213 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2214 app.register_module(
2215 "availability",
2216 define_api(
2217 &Default::default(),
2218 MockBase::instance(),
2219 "1.0.0".parse().unwrap(),
2220 )
2221 .unwrap(),
2222 )
2223 .unwrap();
2224 network.spawn(
2225 "server",
2226 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2227 );
2228
2229 let provider = Provider::new(QueryServiceProvider::new(
2231 format!("http://localhost:{port}").parse().unwrap(),
2232 MockBase::instance(),
2233 ));
2234 let db = TmpDb::init().await;
2235 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2236 let data_source = FetchingDataSource::builder(storage, provider)
2237 .disable_proactive_fetching()
2238 .disable_aggregator()
2239 .with_min_retry_interval(Duration::from_millis(100))
2240 .with_range_chunk_size(3)
2241 .build()
2242 .await
2243 .unwrap();
2244
2245 network.start().await;
2247
2248 let leaves = network.data_source().subscribe_leaves(1).await;
2250 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2251
2252 let last_leaf = leaves.last().unwrap();
2254 let mut tx = data_source.write().await.unwrap();
2255 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2256 tx.commit().await.unwrap();
2257
2258 tracing::info!("stream with transaction failure");
2261 data_source
2262 .as_ref()
2263 .fail_one_begin_read_only(FailableAction::Any)
2264 .await;
2265 assert_eq!(
2266 leaves,
2267 data_source
2268 .subscribe_leaves(1)
2269 .await
2270 .take(5)
2271 .collect::<Vec<_>>()
2272 .await
2273 );
2274 }
2275
2276 #[tokio::test(flavor = "multi_thread")]
2277 async fn test_stream_load_failure() {
2278 setup_test();
2279
2280 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2282
2283 let port = pick_unused_port().unwrap();
2285 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2286 app.register_module(
2287 "availability",
2288 define_api(
2289 &Default::default(),
2290 MockBase::instance(),
2291 "1.0.0".parse().unwrap(),
2292 )
2293 .unwrap(),
2294 )
2295 .unwrap();
2296 network.spawn(
2297 "server",
2298 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2299 );
2300
2301 let provider = Provider::new(QueryServiceProvider::new(
2303 format!("http://localhost:{port}").parse().unwrap(),
2304 MockBase::instance(),
2305 ));
2306 let db = TmpDb::init().await;
2307 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2308 let data_source = FetchingDataSource::builder(storage, provider)
2309 .disable_proactive_fetching()
2310 .disable_aggregator()
2311 .with_min_retry_interval(Duration::from_millis(100))
2312 .with_range_chunk_size(3)
2313 .build()
2314 .await
2315 .unwrap();
2316
2317 network.start().await;
2319
2320 let leaves = network.data_source().subscribe_leaves(1).await;
2322 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2323
2324 let last_leaf = leaves.last().unwrap();
2326 let mut tx = data_source.write().await.unwrap();
2327 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2328 tx.commit().await.unwrap();
2329
2330 tracing::info!("stream with read failure");
2332 data_source.as_ref().fail_reads(FailableAction::Any).await;
2333 let fetches = data_source
2334 .get_block_range(1..=5)
2335 .await
2336 .collect::<Vec<_>>()
2337 .await;
2338
2339 sleep(Duration::from_secs(2)).await;
2341 data_source.as_ref().pass().await;
2342
2343 for (leaf, fetch) in leaves.iter().zip(fetches) {
2344 let block: BlockQueryData<MockTypes> = fetch.await;
2345 assert_eq!(block.hash(), leaf.block_hash());
2346 }
2347 }
2348
/// Selects which metadata stream `test_metadata_stream_begin_failure_helper` exercises.
#[derive(Clone, Copy, Debug)]
enum MetadataType {
    /// Use `subscribe_payload_metadata`.
    Payload,
    /// Use `subscribe_vid_common_metadata`.
    Vid,
}
2353
2354 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2355 setup_test();
2356
2357 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2359
2360 let port = pick_unused_port().unwrap();
2362 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2363 app.register_module(
2364 "availability",
2365 define_api(
2366 &Default::default(),
2367 MockBase::instance(),
2368 "1.0.0".parse().unwrap(),
2369 )
2370 .unwrap(),
2371 )
2372 .unwrap();
2373 network.spawn(
2374 "server",
2375 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2376 );
2377
2378 let provider = Provider::new(QueryServiceProvider::new(
2380 format!("http://localhost:{port}").parse().unwrap(),
2381 MockBase::instance(),
2382 ));
2383 let db = TmpDb::init().await;
2384 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2385 let data_source = FetchingDataSource::builder(storage, provider)
2386 .disable_proactive_fetching()
2387 .disable_aggregator()
2388 .with_min_retry_interval(Duration::from_millis(100))
2389 .with_range_chunk_size(3)
2390 .build()
2391 .await
2392 .unwrap();
2393
2394 network.start().await;
2396
2397 let leaves = network.data_source().subscribe_leaves(1).await;
2399 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2400
2401 let last_leaf = leaves.last().unwrap();
2403 let mut tx = data_source.write().await.unwrap();
2404 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2405 tx.commit().await.unwrap();
2406
2407 let leaf = network.data_source().get_leaf(1).await.await;
2412 let block = network.data_source().get_block(1).await.await;
2413 let vid = network.data_source().get_vid_common(1).await.await;
2414 data_source
2415 .append(BlockInfo::new(leaf, Some(block), Some(vid), None, None))
2416 .await
2417 .unwrap();
2418
2419 tracing::info!("stream with transaction failure");
2421 data_source
2422 .as_ref()
2423 .fail_begins_read_only(FailableAction::Any)
2424 .await;
2425 match stream {
2426 MetadataType::Payload => {
2427 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2428
2429 sleep(Duration::from_secs(2)).await;
2431 tracing::info!("stop failing transactions");
2432 data_source.as_ref().pass().await;
2433
2434 let payloads = payloads.collect::<Vec<_>>().await;
2435 for (leaf, payload) in leaves.iter().zip(payloads) {
2436 assert_eq!(payload.block_hash, leaf.block_hash());
2437 }
2438 },
2439 MetadataType::Vid => {
2440 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2441
2442 sleep(Duration::from_secs(2)).await;
2444 tracing::info!("stop failing transactions");
2445 data_source.as_ref().pass().await;
2446
2447 let vids = vids.collect::<Vec<_>>().await;
2448 for (leaf, vid) in leaves.iter().zip(vids) {
2449 assert_eq!(vid.block_hash, leaf.block_hash());
2450 }
2451 },
2452 }
2453 }
2454
2455 #[tokio::test(flavor = "multi_thread")]
2456 async fn test_metadata_stream_begin_failure_payload() {
2457 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2458 }
2459
2460 #[tokio::test(flavor = "multi_thread")]
2461 async fn test_metadata_stream_begin_failure_vid() {
2462 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2463 }
2464
2465 async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2470 let mut network = MockNetwork::<MockDataSource, V>::init().await;
2471
2472 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2473
2474 app.register_module(
2476 "availability",
2477 define_api(
2478 &Default::default(),
2479 StaticVersion::<0, 1> {},
2480 "0.0.1".parse().unwrap(),
2481 )
2482 .unwrap(),
2483 )
2484 .unwrap();
2485
2486 app.register_module(
2487 "availability",
2488 define_api(
2489 &Default::default(),
2490 StaticVersion::<0, 1> {},
2491 "1.0.0".parse().unwrap(),
2492 )
2493 .unwrap(),
2494 )
2495 .unwrap();
2496
2497 network.spawn(
2498 "server",
2499 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2500 );
2501
2502 let db = TmpDb::init().await;
2503
2504 let provider_url = format!("http://localhost:{port}/{version}")
2505 .parse()
2506 .expect("Invalid URL");
2507
2508 let provider = Provider::new(QueryServiceProvider::new(
2509 provider_url,
2510 StaticVersion::<0, 1> {},
2511 ));
2512
2513 let ds = data_source(&db, &provider).await;
2514 network.start().await;
2515
2516 let leaves = network.data_source().subscribe_leaves(1).await;
2517 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2518 let test_leaf = &leaves[0];
2519 let test_payload = &leaves[2];
2520 let test_common = &leaves[3];
2521
2522 let mut fetches = vec![];
2523 fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2525 fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2526 fetches.push(
2527 ds.get_vid_common(test_common.block_hash())
2528 .await
2529 .map(ignore),
2530 );
2531
2532 sleep(Duration::from_secs(1)).await;
2535 for (i, fetch) in fetches.into_iter().enumerate() {
2536 tracing::info!("checking fetch {i} is unresolved");
2537 fetch.try_resolve().unwrap_err();
2538 }
2539
2540 ds.append(leaves.last().cloned().unwrap().into())
2545 .await
2546 .unwrap();
2547
2548 {
2550 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2551 let payload = ds.get_payload(test_payload.height() as usize).await;
2552 let common = ds.get_vid_common(test_common.height() as usize).await;
2553
2554 let truth = network.data_source();
2555 assert_eq!(
2556 leaf.await,
2557 truth.get_leaf(test_leaf.height() as usize).await.await
2558 );
2559 assert_eq!(
2560 payload.await,
2561 truth
2562 .get_payload(test_payload.height() as usize)
2563 .await
2564 .await
2565 );
2566 assert_eq!(
2567 common.await,
2568 truth
2569 .get_vid_common(test_common.height() as usize)
2570 .await
2571 .await
2572 );
2573 }
2574 }
2575
2576 #[tokio::test(flavor = "multi_thread")]
2577 async fn test_fallback_deserialization_for_fetch_requests_v0() {
2578 setup_test();
2579
2580 let port = pick_unused_port().unwrap();
2581
2582 run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2588 }
2589
2590 #[tokio::test(flavor = "multi_thread")]
2591 async fn test_fallback_deserialization_for_fetch_requests_v1() {
2592 setup_test();
2593 let port = pick_unused_port().unwrap();
2594
2595 run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2599 }
2600
2601 #[tokio::test(flavor = "multi_thread")]
2602 async fn test_fallback_deserialization_for_fetch_requests_pos() {
2603 setup_test();
2604 let port = pick_unused_port().unwrap();
2605
2606 run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2609 }
2610 #[tokio::test(flavor = "multi_thread")]
2611 async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2612 setup_test();
2613
2614 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2620
2621 let port = pick_unused_port().unwrap();
2622 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2623
2624 app.register_module(
2625 "availability",
2626 define_api(
2627 &Default::default(),
2628 StaticVersion::<0, 1> {},
2629 "0.0.1".parse().unwrap(),
2630 )
2631 .unwrap(),
2632 )
2633 .unwrap();
2634
2635 network.spawn(
2636 "server",
2637 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2638 );
2639
2640 let db = TmpDb::init().await;
2641 let provider = Provider::new(QueryServiceProvider::new(
2642 format!("http://localhost:{port}/v0").parse().unwrap(),
2643 StaticVersion::<0, 1> {},
2644 ));
2645 let ds = data_source(&db, &provider).await;
2646
2647 network.start().await;
2648
2649 let leaves = network.data_source().subscribe_leaves(1).await;
2650 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2651 let test_leaf = &leaves[0];
2652 let test_payload = &leaves[2];
2653 let test_common = &leaves[3];
2654
2655 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2656 let payload = ds.get_payload(test_payload.height() as usize).await;
2657 let common = ds.get_vid_common(test_common.height() as usize).await;
2658
2659 sleep(Duration::from_secs(3)).await;
2660
2661 leaf.try_resolve().unwrap_err();
2663 payload.try_resolve().unwrap_err();
2664 common.try_resolve().unwrap_err();
2665 }
2666}