1use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16 data::{ns_table, VidCommitment},
17 traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18 vid::{
19 advz::{advz_scheme, ADVZScheme},
20 avidm::{init_avidm_param, AvidMScheme},
21 },
22};
23use jf_vid::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29 availability::{
30 ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31 PayloadQueryData, VidCommonQueryData,
32 },
33 fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34 types::HeightIndexed,
35 Error, Header, Payload, VidCommon,
36};
37
/// Data provider that fetches missing resources from another query service.
///
/// Requests are sent to routes under `availability/` relative to the client's base URL, and
/// responses are decoded with `vbs::Serializer::<Ver>`.
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    /// Client used for every request to the remote query service.
    client: Client<Error, Ver>,
}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48 pub fn new(url: Url, _: Ver) -> Self {
49 Self {
50 client: Client::new(url),
51 }
52 }
53}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56 async fn deserialize_legacy_payload<Types: NodeType>(
57 &self,
58 payload_bytes: Vec<u8>,
59 common_bytes: Vec<u8>,
60 req: PayloadRequest,
61 ) -> Option<Payload<Types>> {
62 let client_url = self.client.base_url();
63
64 let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65 return None;
66 };
67
68 let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69 &payload_bytes,
70 ) {
71 Ok(payload) => payload,
72 Err(err) => {
73 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74 return None;
75 },
76 };
77
78 let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79 &common_bytes,
80 ) {
81 Ok(common) => common,
82 Err(err) => {
83 tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
84 return None;
85 },
86 };
87
88 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89 let bytes = payload.data.encode();
90
91 let commit = advz_scheme(num_storage_nodes)
92 .commit_only(bytes)
93 .inspect_err(|err| {
94 tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95 })
96 .ok()?;
97
98 if commit != advz_commit {
99 tracing::error!(
100 ?req,
101 expected_commit=%advz_commit,
102 actual_commit=%commit,
103 %client_url,
104 "received inconsistent legacy payload"
105 );
106 return None;
107 }
108
109 Some(payload.data)
110 }
111
112 async fn deserialize_legacy_vid_common<Types: NodeType>(
113 &self,
114 bytes: Vec<u8>,
115 req: VidCommonRequest,
116 ) -> Option<VidCommon> {
117 let client_url = self.client.base_url();
118 let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119 return None;
120 };
121
122 match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123 Ok(res) => {
124 if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125 Some(VidCommon::V0(res.common))
126 } else {
127 tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128 None
129 }
130 },
131 Err(err) => {
132 tracing::warn!(
133 %client_url,
134 ?req,
135 %err,
136 "failed to deserialize ADVZCommonQueryData"
137 );
138 None
139 },
140 }
141 }
142 async fn deserialize_legacy_leaf<Types: NodeType>(
143 &self,
144 bytes: Vec<u8>,
145 req: LeafRequest<Types>,
146 ) -> Option<LeafQueryData<Types>> {
147 let client_url = self.client.base_url();
148
149 match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150 Ok(mut leaf) => {
151 if leaf.height() != req.height {
152 tracing::error!(
153 %client_url, ?req,
154 expected_height = req.height,
155 actual_height = leaf.height(),
156 "received leaf with the wrong height"
157 );
158 return None;
159 }
160
161 let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162 let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163 if actual_leaf_commit != expected_leaf_commit {
164 tracing::error!(
165 %client_url, ?req,
166 expected_leaf = %req.expected_leaf,
167 actual_leaf = %leaf.hash(),
168 "received leaf with the wrong hash"
169 );
170 return None;
171 }
172
173 let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174 let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175 if actual_qc_commit != expected_qc_commit {
176 tracing::error!(
177 %client_url, ?req,
178 expected_qc = %req.expected_qc,
179 actual_qc = %leaf.qc().commit(),
180 "received leaf with the wrong QC"
181 );
182 return None;
183 }
184
185 leaf.leaf.unfill_block_payload();
190
191 Some(leaf.into())
192 },
193 Err(err) => {
194 tracing::warn!(
195 %client_url, ?req, %err,
196 "failed to deserialize legacy LeafQueryData"
197 );
198 None
199 },
200 }
201 }
202}
203
204#[async_trait]
205impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
206where
207 Types: NodeType,
208{
209 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
217 let client_url = self.client.base_url();
218 let req_hash = req.0;
219 let payload_bytes = self
223 .client
224 .get::<()>(&format!("availability/payload/hash/{}", req.0))
225 .bytes()
226 .await
227 .inspect_err(|err| {
228 tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
229 })
230 .ok()?;
231
232 let common_bytes = self
233 .client
234 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
235 .bytes()
236 .await
237 .inspect_err(|err| {
238 tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
239 })
240 .ok()?;
241
242 let payload =
243 vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
244 .inspect_err(|err| {
245 tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
246 })
247 .ok();
248
249 let common =
250 vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
251 .inspect_err(|err| {
252 tracing::info!(%err, %req_hash,
253 "error deserializing VidCommonQueryData",
254 );
255 })
256 .ok();
257
258 let (payload, common) = match (payload, common) {
259 (Some(payload), Some(common)) => (payload, common),
260 _ => {
261 tracing::info!(%req_hash, "falling back to legacy payload deserialization");
262
263 return self
265 .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
266 .await;
267 },
268 };
269
270 match common.common() {
271 VidCommon::V0(common) => {
272 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
273 let bytes = payload.data().encode();
274
275 let commit = advz_scheme(num_storage_nodes)
276 .commit_only(bytes)
277 .map(VidCommitment::V0)
278 .inspect_err(|err| {
279 tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
280 })
281 .ok()?;
282
283 if commit != req.0 {
284 tracing::error!(
285 expected = %req_hash,
286 actual = ?commit,
287 %client_url,
288 "VID commitment mismatch (V0)"
289 );
290
291 return None;
292 }
293 },
294 VidCommon::V1(common) => {
295 let bytes = payload.data().encode();
296
297 let avidm_param = init_avidm_param(common.total_weights)
298 .inspect_err(|err| {
299 tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
300 })
301 .ok()?;
302
303 let header = self
304 .client
305 .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
306 .send()
307 .await
308 .inspect_err(|err| {
309 tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
310 })
311 .ok()?;
312
313 if header.payload_commitment() != req.0 {
314 tracing::error!(
315 expected = %req_hash,
316 actual = %header.payload_commitment(),
317 %client_url,
318 "header payload commitment mismatch (V1)"
319 );
320 return None;
321 }
322
323 let metadata = header.metadata().encode();
324 let commit = AvidMScheme::commit(
325 &avidm_param,
326 &bytes,
327 ns_table::parse_ns_table(bytes.len(), &metadata),
328 )
329 .map(VidCommitment::V1)
330 .inspect_err(|err| {
331 tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
332 })
333 .ok()?;
334
335 if commit != req.0 {
337 tracing::warn!(
338 expected = %req_hash,
339 actual = %commit,
340 %client_url,
341 "commitment type mismatch for AVIDM check"
342 );
343 return None;
344 }
345 },
346 }
347
348 Some(payload.data)
349 }
350}
351
352#[async_trait]
353impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
354 for QueryServiceProvider<Ver>
355where
356 Types: NodeType,
357{
358 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
366 let client_url = self.client.base_url();
367
368 let bytes = self
369 .client
370 .get::<()>(&format!("availability/leaf/{}", req.height))
371 .bytes()
372 .await;
373 let bytes = match bytes {
374 Ok(bytes) => bytes,
375 Err(err) => {
376 tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
377
378 return None;
379 },
380 };
381
382 match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
385 Ok(mut leaf) => {
386 if leaf.height() != req.height {
387 tracing::error!(
388 %client_url, ?req, ?leaf,
389 expected_height = req.height,
390 actual_height = leaf.height(),
391 "received leaf with the wrong height"
392 );
393 return None;
394 }
395 if leaf.hash() != req.expected_leaf {
396 tracing::error!(
397 %client_url, ?req, ?leaf,
398 expected_hash = %req.expected_leaf,
399 actual_hash = %leaf.hash(),
400 "received leaf with the wrong hash"
401 );
402 return None;
403 }
404 if leaf.qc().commit() != req.expected_qc {
405 tracing::error!(
406 %client_url, ?req, ?leaf,
407 expected_qc = %req.expected_qc,
408 actual_qc = %leaf.qc().commit(),
409 "received leaf with the wrong QC"
410 );
411 return None;
412 }
413
414 leaf.leaf.unfill_block_payload();
419
420 Some(leaf)
421 },
422 Err(err) => {
423 tracing::info!(
424 ?req, %err,
425 "failed to deserialize LeafQueryData, falling back to legacy deserialization"
426 );
427 self.deserialize_legacy_leaf(bytes, req).await
429 },
430 }
431 }
432}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437 Types: NodeType,
438{
439 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447 let client_url = self.client.base_url();
448 let bytes = self
449 .client
450 .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451 .bytes()
452 .await;
453 let bytes = match bytes {
454 Ok(bytes) => bytes,
455 Err(err) => {
456 tracing::info!(
457 %client_url, ?req, %err,
458 "failed to fetch VID common bytes"
459 );
460 return None;
461 },
462 };
463
464 match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465 Ok(res) => match req.0 {
466 VidCommitment::V0(commit) => {
467 if let VidCommon::V0(common) = res.common {
468 if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469 Some(VidCommon::V0(common))
470 } else {
471 tracing::error!(
472 %client_url, ?req, ?commit, ?common,
473 "VID V0 common data is inconsistent with commitment"
474 );
475 None
476 }
477 } else {
478 tracing::warn!(?req, ?res, "Expect VID common data but found None");
479 None
480 }
481 },
482 VidCommitment::V1(_) => {
483 if let VidCommon::V1(common) = res.common {
484 Some(VidCommon::V1(common))
485 } else {
486 tracing::warn!(?req, ?res, "Expect VID common data but found None");
487 None
488 }
489 },
490 },
491 Err(err) => {
492 tracing::info!(
493 %client_url, ?req, %err,
494 "failed to deserialize as V1 VID common data, trying legacy fallback"
495 );
496 self.deserialize_legacy_vid_common::<Types>(bytes, req)
498 .await
499 },
500 }
501 }
502}
503
504#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507 use std::{future::IntoFuture, time::Duration};
508
509 use committable::Committable;
510 use futures::{
511 future::{join, FutureExt},
512 stream::StreamExt,
513 };
514 use generic_array::GenericArray;
515 use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
516 use hotshot_types::traits::node_implementation::Versions;
517 use portpicker::pick_unused_port;
518 use rand::RngCore;
519 use tide_disco::{error::ServerError, App};
520 use vbs::version::StaticVersion;
521
522 use super::*;
523 use crate::{
524 api::load_api,
525 availability::{
526 define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
527 BlockWithTransaction, Fetch, UpdateAvailabilityData,
528 },
529 data_source::{
530 sql::{self, SqlDataSource},
531 storage::{
532 fail_storage::{FailStorage, FailableAction},
533 pruning::{PrunedHeightStorage, PrunerCfg},
534 sql::testing::TmpDb,
535 AvailabilityStorage, SqlStorage, UpdateAvailabilityStorage,
536 },
537 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
538 },
539 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
540 node::{data_source::NodeDataSource, SyncStatus},
541 task::BackgroundTask,
542 testing::{
543 consensus::{MockDataSource, MockNetwork},
544 mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
545 sleep,
546 },
547 types::HeightIndexed,
548 ApiState,
549 };
550
    /// Query service provider for the mock base version, wrapped in `TestProvider` so tests can
    /// call `block`/`unblock` and `fail`/`unfail` on it.
    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
    /// Same wrapper, using the base version of `EpochsTestVersions`.
    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
553
554 fn ignore<T>(_: T) {}
555
556 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
558 db: &TmpDb,
559 provider: &P,
560 ) -> sql::Builder<MockTypes, P> {
561 db.config()
562 .builder((*provider).clone())
563 .await
564 .unwrap()
565 .disable_proactive_fetching()
568 }
569
570 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
572 db: &TmpDb,
573 provider: &P,
574 ) -> SqlDataSource<MockTypes, P> {
575 builder(db, provider).await.build().await.unwrap()
576 }
577
    /// Checks that a data source fetches leaves, blocks, payloads and VID common data from a
    /// query service provider on request, and only once it is able to.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        // Create the mock network that produces the data to be fetched.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Serve the network's data source through an `availability` API on a free port.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // The data source under test uses a temporary database and fetches from that server.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        network.start().await;

        // Collect five leaves from the network, starting at height 1. The first four each serve
        // as the subject for one kind of resource; the last is appended to the data source later.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Requests made before the data source has been given any leaf. Each fetch is mapped
        // through `ignore` so they can share one vector.
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // Leaf by hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // Leaf by height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // Block, payload and VID common by block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // Block, payload and VID common by height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // VID common at height 0.
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // Block containing a transaction, looked up by transaction hash.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // After a short wait, none of the fetches above may have resolved.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Block the provider, then give the data source the last collected leaf.
        // NOTE(review): presumably the appended leaf is what lets the data source learn the
        // block height while the blocked provider holds back the data itself; confirm.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // While the provider is blocked, the requests must stay unresolved.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the provider and await each resource.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Everything fetched must equal what the network's own data source reports.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // With the provider blocked again, the leaves at the block and payload heights must
        // still resolve.
        // NOTE(review): presumably they were stored as a side effect of the block and payload
        // fetches above; confirm.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // Fetch a block by hash; `leaf` in the assertion is the leaf fetched at `test_leaf`'s
        // height above.
        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Fetch a payload by block hash, using the last collected leaf as the reference.
        tracing::info!("fetching payload by hash");
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }
803
804 #[tokio::test(flavor = "multi_thread")]
805 async fn test_fetch_on_request_epoch_version() {
806 tracing::info!("Starting test_fetch_on_request_epoch_version");
809
810 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
812
813 let port = pick_unused_port().unwrap();
815 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
816 app.register_module(
817 "availability",
818 define_api(
819 &Default::default(),
820 <EpochsTestVersions as Versions>::Base::instance(),
821 "1.0.0".parse().unwrap(),
822 )
823 .unwrap(),
824 )
825 .unwrap();
826 network.spawn(
827 "server",
828 app.serve(
829 format!("0.0.0.0:{port}"),
830 <EpochsTestVersions as Versions>::Base::instance(),
831 ),
832 );
833
834 let db = TmpDb::init().await;
837 let provider = EpochProvider::new(QueryServiceProvider::new(
838 format!("http://localhost:{port}").parse().unwrap(),
839 <EpochsTestVersions as Versions>::Base::instance(),
840 ));
841 let data_source = data_source(&db, &provider).await;
842
843 network.start().await;
845
846 let leaves = network.data_source().subscribe_leaves(1).await;
853 let leaves = leaves.take(5).collect::<Vec<_>>().await;
854 let test_leaf = &leaves[0];
855 let test_block = &leaves[1];
856 let test_payload = &leaves[2];
857 let test_common = &leaves[3];
858
859 let mut fetches = vec![];
861 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
863 fetches.push(
865 data_source
866 .get_leaf(test_leaf.height() as usize)
867 .await
868 .map(ignore),
869 );
870 fetches.push(
872 data_source
873 .get_block(test_block.block_hash())
874 .await
875 .map(ignore),
876 );
877 fetches.push(
878 data_source
879 .get_payload(test_payload.block_hash())
880 .await
881 .map(ignore),
882 );
883 fetches.push(
884 data_source
885 .get_vid_common(test_common.block_hash())
886 .await
887 .map(ignore),
888 );
889 fetches.push(
891 data_source
892 .get_block(test_block.height() as usize)
893 .await
894 .map(ignore),
895 );
896 fetches.push(
897 data_source
898 .get_payload(test_payload.height() as usize)
899 .await
900 .map(ignore),
901 );
902 fetches.push(
903 data_source
904 .get_vid_common(test_common.height() as usize)
905 .await
906 .map(ignore),
907 );
908 fetches.push(data_source.get_vid_common(0).await.map(ignore));
910 fetches.push(
912 data_source
913 .get_block_containing_transaction(mock_transaction(vec![]).commit())
914 .await
915 .map(ignore),
916 );
917
918 sleep(Duration::from_secs(1)).await;
921 for (i, fetch) in fetches.into_iter().enumerate() {
922 tracing::info!("checking fetch {i} is unresolved");
923 fetch.try_resolve().unwrap_err();
924 }
925
926 provider.block().await;
931 data_source
932 .append(leaves.last().cloned().unwrap().into())
933 .await
934 .unwrap();
935
936 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
937 let req_block = data_source.get_block(test_block.height() as usize).await;
938 let req_payload = data_source
939 .get_payload(test_payload.height() as usize)
940 .await;
941 let req_common = data_source
942 .get_vid_common(test_common.height() as usize)
943 .await;
944
945 sleep(Duration::from_secs(1)).await;
951 req_leaf.try_resolve().unwrap_err();
952 req_block.try_resolve().unwrap_err();
953 req_payload.try_resolve().unwrap_err();
954 req_common.try_resolve().unwrap_err();
955
956 provider.unblock().await;
958 let leaf = data_source
959 .get_leaf(test_leaf.height() as usize)
960 .await
961 .await;
962 let block = data_source
963 .get_block(test_block.height() as usize)
964 .await
965 .await;
966 let payload = data_source
967 .get_payload(test_payload.height() as usize)
968 .await
969 .await;
970 let common = data_source
971 .get_vid_common(test_common.height() as usize)
972 .await
973 .await;
974 {
975 let truth = network.data_source();
977 assert_eq!(
978 leaf,
979 truth.get_leaf(test_leaf.height() as usize).await.await
980 );
981 assert_eq!(
982 block,
983 truth.get_block(test_block.height() as usize).await.await
984 );
985 assert_eq!(
986 payload,
987 truth
988 .get_payload(test_payload.height() as usize)
989 .await
990 .await
991 );
992 assert_eq!(
993 common,
994 truth
995 .get_vid_common(test_common.height() as usize)
996 .await
997 .await
998 );
999 }
1000
1001 provider.block().await;
1006 for leaf in [test_block, test_payload] {
1007 tracing::info!("fetching existing leaf {}", leaf.height());
1008 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1009 assert_eq!(*leaf, fetched_leaf);
1010 }
1011
1012 provider.unblock().await;
1017 {
1018 let block = data_source.get_block(test_leaf.block_hash()).await.await;
1019 assert_eq!(block.hash(), leaf.block_hash());
1020 }
1021
1022 {
1026 let leaf = leaves.last().unwrap();
1027 let payload = data_source.get_payload(leaf.block_hash()).await.await;
1028 assert_eq!(payload.height(), leaf.height());
1029 assert_eq!(payload.block_hash(), leaf.block_hash());
1030 assert_eq!(payload.hash(), leaf.payload_hash());
1031 }
1032
1033 tracing::info!("Test completed successfully!");
1035 }
1036
1037 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1038 async fn test_fetch_block_and_leaf_concurrently() {
1039 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1041
1042 let port = pick_unused_port().unwrap();
1044 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1045 app.register_module(
1046 "availability",
1047 define_api(
1048 &Default::default(),
1049 MockBase::instance(),
1050 "1.0.0".parse().unwrap(),
1051 )
1052 .unwrap(),
1053 )
1054 .unwrap();
1055 network.spawn(
1056 "server",
1057 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1058 );
1059
1060 let db = TmpDb::init().await;
1062 let provider = Provider::new(QueryServiceProvider::new(
1063 format!("http://localhost:{port}").parse().unwrap(),
1064 MockBase::instance(),
1065 ));
1066 let data_source = data_source(&db, &provider).await;
1067
1068 network.start().await;
1070
1071 let leaves = network.data_source().subscribe_leaves(1).await;
1074 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1075 let test_leaf = &leaves[0];
1076
1077 data_source.append(leaves[1].clone().into()).await.unwrap();
1079
1080 let (leaf, block) = join(
1084 data_source
1085 .get_leaf(test_leaf.height() as usize)
1086 .await
1087 .into_future(),
1088 data_source
1089 .get_block(test_leaf.height() as usize)
1090 .await
1091 .into_future(),
1092 )
1093 .await;
1094 assert_eq!(leaf, *test_leaf);
1095 assert_eq!(leaf.header(), block.header());
1096 }
1097
1098 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1099 async fn test_fetch_different_blocks_same_payload() {
1100 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1102
1103 let port = pick_unused_port().unwrap();
1105 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1106 app.register_module(
1107 "availability",
1108 define_api(
1109 &Default::default(),
1110 MockBase::instance(),
1111 "1.0.0".parse().unwrap(),
1112 )
1113 .unwrap(),
1114 )
1115 .unwrap();
1116 network.spawn(
1117 "server",
1118 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1119 );
1120
1121 let db = TmpDb::init().await;
1123 let provider = Provider::new(QueryServiceProvider::new(
1124 format!("http://localhost:{port}").parse().unwrap(),
1125 MockBase::instance(),
1126 ));
1127 let data_source = data_source(&db, &provider).await;
1128
1129 network.start().await;
1131
1132 let leaves = network.data_source().subscribe_leaves(1).await;
1135 let leaves = leaves.take(4).collect::<Vec<_>>().await;
1136
1137 data_source
1140 .append(leaves.last().cloned().unwrap().into())
1141 .await
1142 .unwrap();
1143
1144 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1146 let (block1, block2) = join(
1149 data_source
1150 .get_block(leaves[0].height() as usize)
1151 .await
1152 .into_future(),
1153 data_source
1154 .get_block(leaves[1].height() as usize)
1155 .await
1156 .into_future(),
1157 )
1158 .await;
1159 assert_eq!(block1.header(), leaves[0].header());
1160 assert_eq!(block2.header(), leaves[1].header());
1161 }
1162
1163 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1164 async fn test_fetch_stream() {
1165 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1167
1168 let port = pick_unused_port().unwrap();
1170 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1171 app.register_module(
1172 "availability",
1173 define_api(
1174 &Default::default(),
1175 MockBase::instance(),
1176 "1.0.0".parse().unwrap(),
1177 )
1178 .unwrap(),
1179 )
1180 .unwrap();
1181 network.spawn(
1182 "server",
1183 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1184 );
1185
1186 let db = TmpDb::init().await;
1188 let provider = Provider::new(QueryServiceProvider::new(
1189 format!("http://localhost:{port}").parse().unwrap(),
1190 MockBase::instance(),
1191 ));
1192 let data_source = data_source(&db, &provider).await;
1193
1194 network.start().await;
1196
1197 let blocks = data_source.subscribe_blocks(0).await;
1199 let leaves = data_source.subscribe_leaves(0).await;
1200 let common = data_source.subscribe_vid_common(0).await;
1201
1202 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1204 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1205
1206 data_source
1209 .append(finalized_leaves.last().cloned().unwrap().into())
1210 .await
1211 .unwrap();
1212
1213 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1215 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1216 let common = common.take(5).collect::<Vec<_>>().await;
1217 for i in 0..5 {
1218 tracing::info!("checking block {i}");
1219 assert_eq!(leaves[i], finalized_leaves[i]);
1220 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1221 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1222 }
1223 }
1224
1225 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1226 async fn test_fetch_range_start() {
1227 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1229
1230 let port = pick_unused_port().unwrap();
1232 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1233 app.register_module(
1234 "availability",
1235 define_api(
1236 &Default::default(),
1237 MockBase::instance(),
1238 "1.0.0".parse().unwrap(),
1239 )
1240 .unwrap(),
1241 )
1242 .unwrap();
1243 network.spawn(
1244 "server",
1245 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1246 );
1247
1248 let db = TmpDb::init().await;
1250 let provider = Provider::new(QueryServiceProvider::new(
1251 format!("http://localhost:{port}").parse().unwrap(),
1252 MockBase::instance(),
1253 ));
1254 let data_source = data_source(&db, &provider).await;
1255
1256 network.start().await;
1258
1259 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1261 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1262
1263 let mut tx = data_source.write().await.unwrap();
1267 tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1268 tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1269 tx.commit().await.unwrap();
1270
1271 let leaves = data_source
1273 .get_leaf_range(..5)
1274 .await
1275 .then(Fetch::resolve)
1276 .collect::<Vec<_>>()
1277 .await;
1278 for i in 0..5 {
1279 tracing::info!("checking leaf {i}");
1280 assert_eq!(leaves[i], finalized_leaves[i]);
1281 }
1282 }
1283
1284 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1285 async fn fetch_transaction() {
1286 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1288
1289 let port = pick_unused_port().unwrap();
1291 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1292 app.register_module(
1293 "availability",
1294 define_api(
1295 &Default::default(),
1296 MockBase::instance(),
1297 "1.0.0".parse().unwrap(),
1298 )
1299 .unwrap(),
1300 )
1301 .unwrap();
1302 network.spawn(
1303 "server",
1304 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1305 );
1306
1307 let db = TmpDb::init().await;
1310 let data_source = data_source(&db, &NoFetching).await;
1311
1312 let mut leaves = network.data_source().subscribe_leaves(1).await;
1314 let mut blocks = network.data_source().subscribe_blocks(1).await;
1315
1316 network.start().await;
1318
1319 let tx = mock_transaction(vec![1, 2, 3]);
1323 let fut = data_source
1324 .get_block_containing_transaction(tx.commit())
1325 .await;
1326
1327 network.submit_transaction(tx.clone()).await;
1329
1330 let block = loop {
1333 let leaf = leaves.next().await.unwrap();
1334 let block = blocks.next().await.unwrap();
1335
1336 data_source
1337 .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1338 .await
1339 .unwrap();
1340
1341 if block.transaction_by_hash(tx.commit()).is_some() {
1342 break block;
1343 }
1344 };
1345 tracing::info!("transaction included in block {}", block.height());
1346
1347 let fetched_tx = fut.await;
1348 assert_eq!(
1349 fetched_tx,
1350 BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1351 );
1352
1353 assert_eq!(
1355 fetched_tx,
1356 data_source
1357 .get_block_containing_transaction(tx.commit())
1358 .await
1359 .await
1360 );
1361 }
1362
1363 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1364 async fn test_retry() {
1365 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1367
1368 let port = pick_unused_port().unwrap();
1370 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1371 app.register_module(
1372 "availability",
1373 define_api(
1374 &Default::default(),
1375 MockBase::instance(),
1376 "1.0.0".parse().unwrap(),
1377 )
1378 .unwrap(),
1379 )
1380 .unwrap();
1381 network.spawn(
1382 "server",
1383 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1384 );
1385
1386 let db = TmpDb::init().await;
1388 let provider = Provider::new(QueryServiceProvider::new(
1389 format!("http://localhost:{port}").parse().unwrap(),
1390 MockBase::instance(),
1391 ));
1392 let data_source = builder(&db, &provider)
1393 .await
1394 .with_max_retry_interval(Duration::from_secs(1))
1395 .build()
1396 .await
1397 .unwrap();
1398
1399 network.start().await;
1401
1402 let leaves = network.data_source().subscribe_leaves(1).await;
1405 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1406 let test_leaf = &leaves[0];
1407
1408 provider.fail();
1410
1411 data_source
1414 .append(leaves.last().cloned().unwrap().into())
1415 .await
1416 .unwrap();
1417
1418 tracing::info!("requesting leaf from failing providers");
1419 let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1420
1421 sleep(Duration::from_secs(5)).await;
1424 fut.try_resolve().unwrap_err();
1425
1426 provider.unfail();
1428 assert_eq!(
1429 data_source
1430 .get_leaf(test_leaf.height() as usize)
1431 .await
1432 .await,
1433 *test_leaf
1434 );
1435 }
1436
1437 fn random_vid_commit() -> VidCommitment {
1438 let mut bytes = [0; 32];
1439 rand::thread_rng().fill_bytes(&mut bytes);
1440 VidCommitment::V0(GenericArray::from(bytes).into())
1441 }
1442
1443 async fn malicious_server(port: u16) {
1444 let mut api = load_api::<(), ServerError, MockBase>(
1445 None::<std::path::PathBuf>,
1446 include_str!("../../../api/availability.toml"),
1447 vec![],
1448 )
1449 .unwrap();
1450
1451 api.get("get_payload", move |_, _| {
1452 async move {
1453 Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1455 &Default::default(),
1456 &Default::default(),
1457 )
1458 .await)
1459 }
1460 .boxed()
1461 })
1462 .unwrap()
1463 .get("get_vid_common", move |_, _| {
1464 async move {
1465 Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1467 &Default::default(),
1468 &Default::default(),
1469 )
1470 .await)
1471 }
1472 .boxed()
1473 })
1474 .unwrap();
1475
1476 let mut app = App::<(), ServerError>::with_state(());
1477 app.register_module("availability", api).unwrap();
1478 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1479 .await
1480 .ok();
1481 }
1482
1483 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1484 async fn test_fetch_from_malicious_server() {
1485 let port = pick_unused_port().unwrap();
1486 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1487
1488 let provider = QueryServiceProvider::new(
1489 format!("http://localhost:{port}").parse().unwrap(),
1490 MockBase::instance(),
1491 );
1492 provider.client.connect(None).await;
1493
1494 let res =
1497 ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1498 .await;
1499 assert_eq!(res, None);
1500
1501 let res =
1504 ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1505 .await;
1506 assert_eq!(res, None);
1507 }
1508
/// Exercises a data source that first runs with a pruner configured, waits until the pruned
/// height reaches the last known leaf, then reopens the same database with the `archive()`
/// configuration and waits for it to report itself fully synced (ignoring missing VID shares).
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_archive_recovery() {
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    // Serve the network's data source through the availability API on a free port.
    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test, fetching from the server above. The pruner is configured with a
    // zero target retention and a 5 second interval.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let mut data_source = db
        .config()
        .pruner_cfg(
            PrunerCfg::new()
                .with_target_retention(Duration::from_secs(0))
                .with_interval(Duration::from_secs(5)),
        )
        .unwrap()
        .builder(provider.clone())
        .await
        .unwrap()
        // NOTE(review): short retry interval with a large randomization factor; presumably to
        // recover quickly from storage operations that fail while the pruner is running —
        // confirm the intent.
        .with_min_retry_interval(Duration::from_millis(100))
        .with_retry_randomization_factor(3.)
        .build()
        .await
        .unwrap();

    network.start().await;

    // Collect the first five leaves produced by the network.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(5).collect::<Vec<_>>().await;

    // Nothing has been stored yet, so the pruned height must be unset or zero.
    let pruned_height = data_source
        .read()
        .await
        .unwrap()
        .load_pruned_height()
        .await
        .unwrap();
    assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

    // Hand the data source only the last leaf.
    let last_leaf = leaves.last().unwrap();
    data_source.append(last_leaf.clone().into()).await.unwrap();

    // Every leaf from height 1 up to the last leaf's height must be obtainable and must match
    // what the network produced (`leaves[0]` corresponds to height 1).
    for i in 1..=last_leaf.height() {
        tracing::info!(i, "fetching leaf");
        assert_eq!(
            data_source.get_leaf(i as usize).await.await,
            leaves[i as usize - 1]
        );
    }

    // Poll once per second until the stored pruned height equals the last leaf's height.
    loop {
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        if pruned_height == Some(last_leaf.height()) {
            break;
        }
        tracing::info!(
            ?pruned_height,
            target_height = last_leaf.height(),
            "waiting for pruner to run"
        );
        sleep(Duration::from_secs(1)).await;
    }

    // Replace the data source with one built from the same database using the `archive()`
    // configuration, with a 1 second minor scan interval and a major scan interval of 1.
    data_source = db
        .config()
        .archive()
        .builder(provider.clone())
        .await
        .unwrap()
        .with_minor_scan_interval(Duration::from_secs(1))
        .with_major_scan_interval(1)
        .build()
        .await
        .unwrap();

    // After reopening in this configuration the pruned height must read back as `None`.
    let pruned_height = data_source
        .read()
        .await
        .unwrap()
        .load_pruned_height()
        .await
        .unwrap();
    assert_eq!(pruned_height, None);

    // Hand the new data source the last leaf again.
    data_source.append(last_leaf.clone().into()).await.unwrap();

    // Poll once per second until the sync status is fully synced. `missing_vid_shares` is
    // overwritten with 0 before the check, so missing VID shares never block completion.
    loop {
        let sync_status = data_source.sync_status().await.unwrap();

        if (SyncStatus {
            missing_vid_shares: 0,
            ..sync_status
        })
        .is_fully_synced()
        {
            break;
        }
        tracing::info!(?sync_status, "waiting for node to sync");
        sleep(Duration::from_secs(1)).await;
    }

    // Wait a little longer and check that the node is still fully synced (same override).
    sleep(Duration::from_secs(3)).await;
    let sync_status = data_source.sync_status().await.unwrap();
    assert!(
        (SyncStatus {
            missing_vid_shares: 0,
            ..sync_status
        })
        .is_fully_synced(),
        "{sync_status:?}"
    );
}
1673
/// Selects which storage operation the storage-failure test helpers make fail.
#[derive(Debug, Clone, Copy)]
enum FailureType {
    /// Fail when a writable transaction is begun.
    Begin,
    /// Fail on write operations.
    Write,
    /// Fail when a transaction is committed.
    Commit,
}
1680
/// Shared body for the `test_fetch_storage_failure_on_*` tests.
///
/// While storage is made to fail in the way selected by `failure`, fetching leaf 1 must still
/// return the right leaf. After storage is healthy again, a new fetch of leaf 1 is expected to
/// start out pending, and a later one to resolve immediately.
async fn test_fetch_storage_failure_helper(failure: FailureType) {
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    // Serve the network's data source through the availability API on a free port.
    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: SQL storage wrapped in `FailStorage` so failures can be injected,
    // fetching from the server above. Proactive fetching and the aggregator are disabled.
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let db = TmpDb::init().await;
    let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
    let data_source = FetchingDataSource::builder(storage, provider)
        .disable_proactive_fetching()
        .disable_aggregator()
        .with_max_retry_interval(Duration::from_millis(100))
        .with_retry_timeout(Duration::from_secs(1))
        .build()
        .await
        .unwrap();

    network.start().await;

    // Collect the first two leaves.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(2).collect::<Vec<_>>().await;

    // Store only the last leaf directly, leaving the first one (`leaves[0]`) to be fetched.
    let last_leaf = leaves.last().unwrap();
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(last_leaf.clone()).await.unwrap();
    tx.commit().await.unwrap();

    // Turn on the selected failure mode, then fetch leaf 1. The fetch must still return the
    // correct leaf even though storage is failing.
    tracing::info!("fetch with write failure");
    match failure {
        FailureType::Begin => {
            data_source
                .as_ref()
                .fail_begins_writable(FailableAction::Any)
                .await
        },
        FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
        FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
    }
    assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
    // Stop injecting failures.
    data_source.as_ref().pass().await;

    // NOTE(review): presumably gives background work from the previous fetch time to settle
    // before the next request is issued — confirm.
    sleep(Duration::from_secs(1)).await;

    // The new fetch is asserted to be pending, i.e. leaf 1 is not immediately available from
    // the data source at this point.
    tracing::info!("fetch with write success");
    let fetch = data_source.get_leaf(1).await;
    assert!(fetch.is_pending());
    assert_eq!(leaves[0], fetch.await);

    // NOTE(review): presumably allows the fetched leaf to be written to storage — confirm.
    sleep(Duration::from_secs(1)).await;

    // This time the fetch must resolve immediately, without awaiting.
    tracing::info!("retrieve from storage");
    let fetch = data_source.get_leaf(1).await;
    assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
}
1767
1768 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1769 async fn test_fetch_storage_failure_on_begin() {
1770 test_fetch_storage_failure_helper(FailureType::Begin).await;
1771 }
1772
1773 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1774 async fn test_fetch_storage_failure_on_write() {
1775 test_fetch_storage_failure_helper(FailureType::Write).await;
1776 }
1777
1778 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1779 async fn test_fetch_storage_failure_on_commit() {
1780 test_fetch_storage_failure_helper(FailureType::Commit).await;
1781 }
1782
1783 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1784 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1786
1787 let port = pick_unused_port().unwrap();
1789 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1790 app.register_module(
1791 "availability",
1792 define_api(
1793 &Default::default(),
1794 MockBase::instance(),
1795 "1.0.0".parse().unwrap(),
1796 )
1797 .unwrap(),
1798 )
1799 .unwrap();
1800 network.spawn(
1801 "server",
1802 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1803 );
1804
1805 let provider = Provider::new(QueryServiceProvider::new(
1807 format!("http://localhost:{port}").parse().unwrap(),
1808 MockBase::instance(),
1809 ));
1810 let db = TmpDb::init().await;
1811 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1812 let data_source = FetchingDataSource::builder(storage, provider)
1813 .disable_proactive_fetching()
1814 .disable_aggregator()
1815 .with_min_retry_interval(Duration::from_millis(100))
1816 .build()
1817 .await
1818 .unwrap();
1819
1820 network.start().await;
1822
1823 let leaves = network.data_source().subscribe_leaves(1).await;
1825 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1826
1827 let last_leaf = leaves.last().unwrap();
1829 let mut tx = data_source.write().await.unwrap();
1830 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1831 tx.commit().await.unwrap();
1832
1833 tracing::info!("fetch with write failure");
1835 match failure {
1836 FailureType::Begin => {
1837 data_source
1838 .as_ref()
1839 .fail_one_begin_writable(FailableAction::Any)
1840 .await
1841 },
1842 FailureType::Write => {
1843 data_source
1844 .as_ref()
1845 .fail_one_write(FailableAction::Any)
1846 .await
1847 },
1848 FailureType::Commit => {
1849 data_source
1850 .as_ref()
1851 .fail_one_commit(FailableAction::Any)
1852 .await
1853 },
1854 }
1855 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1856
1857 let mut tx = data_source.read().await.unwrap();
1859 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1860 }
1861
1862 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1863 async fn test_fetch_storage_failure_retry_on_begin() {
1864 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1865 }
1866
1867 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1868 async fn test_fetch_storage_failure_retry_on_write() {
1869 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1870 }
1871
1872 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1873 async fn test_fetch_storage_failure_retry_on_commit() {
1874 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1875 }
1876
1877 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1878 async fn test_fetch_on_decide() {
1879 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1881
1882 let port = pick_unused_port().unwrap();
1884 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1885 app.register_module(
1886 "availability",
1887 define_api(
1888 &Default::default(),
1889 MockBase::instance(),
1890 "1.0.0".parse().unwrap(),
1891 )
1892 .unwrap(),
1893 )
1894 .unwrap();
1895 network.spawn(
1896 "server",
1897 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1898 );
1899
1900 let db = TmpDb::init().await;
1902 let provider = Provider::new(QueryServiceProvider::new(
1903 format!("http://localhost:{port}").parse().unwrap(),
1904 MockBase::instance(),
1905 ));
1906 let data_source = builder(&db, &provider)
1907 .await
1908 .with_max_retry_interval(Duration::from_secs(1))
1909 .build()
1910 .await
1911 .unwrap();
1912
1913 network.start().await;
1915
1916 let leaf = network
1918 .data_source()
1919 .subscribe_leaves(1)
1920 .await
1921 .next()
1922 .await
1923 .unwrap();
1924
1925 data_source.append(leaf.clone().into()).await.unwrap();
1927
1928 sleep(Duration::from_secs(5)).await;
1931
1932 let mut tx = data_source.read().await.unwrap();
1936 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1937 let block = tx.get_block(id).await.unwrap();
1938 let vid = tx.get_vid_common(id).await.unwrap();
1939
1940 assert_eq!(block.hash(), leaf.block_hash());
1941 assert_eq!(vid.block_hash(), leaf.block_hash());
1942 }
1943
1944 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1945 async fn test_fetch_begin_failure() {
1946 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1948
1949 let port = pick_unused_port().unwrap();
1951 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1952 app.register_module(
1953 "availability",
1954 define_api(
1955 &Default::default(),
1956 MockBase::instance(),
1957 "1.0.0".parse().unwrap(),
1958 )
1959 .unwrap(),
1960 )
1961 .unwrap();
1962 network.spawn(
1963 "server",
1964 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1965 );
1966
1967 let provider = Provider::new(QueryServiceProvider::new(
1969 format!("http://localhost:{port}").parse().unwrap(),
1970 MockBase::instance(),
1971 ));
1972 let db = TmpDb::init().await;
1973 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1974 let data_source = FetchingDataSource::builder(storage, provider)
1975 .disable_proactive_fetching()
1976 .disable_aggregator()
1977 .with_min_retry_interval(Duration::from_millis(100))
1978 .build()
1979 .await
1980 .unwrap();
1981
1982 network.start().await;
1984
1985 let leaves = network.data_source().subscribe_leaves(1).await;
1987 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1988
1989 let last_leaf = leaves.last().unwrap();
1991 let mut tx = data_source.write().await.unwrap();
1992 tx.insert_leaf(last_leaf.clone()).await.unwrap();
1993 tx.commit().await.unwrap();
1994
1995 tracing::info!("fetch with transaction failure");
1998 data_source
1999 .as_ref()
2000 .fail_one_begin_read_only(FailableAction::Any)
2001 .await;
2002 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2003 }
2004
/// Checks that fetching a block by hash eventually succeeds even though one `GetHeader` read is
/// made to fail when the fetch is issued.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_load_failure_block() {
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    // Serve the network's data source through the availability API on a free port.
    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: SQL storage wrapped in `FailStorage` so failures can be injected,
    // fetching from the server above. Proactive fetching and the aggregator are disabled.
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let db = TmpDb::init().await;
    let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
    let data_source = FetchingDataSource::builder(storage, provider)
        .disable_proactive_fetching()
        .disable_aggregator()
        .with_min_retry_interval(Duration::from_millis(100))
        .build()
        .await
        .unwrap();

    network.start().await;

    // Take the first leaf produced by the network.
    let mut leaves = network.data_source().subscribe_leaves(1).await;
    let leaf = leaves.next().await.unwrap();

    // Store the leaf (but not its block) directly in the data source.
    // NOTE(review): presumably this makes the header available locally so the block can be
    // looked up by hash — confirm.
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(leaf.clone()).await.unwrap();
    tx.commit().await.unwrap();

    // Arm a single failure for the `GetHeader` read action, then request the block by hash
    // while that failure is pending.
    tracing::info!("fetch with read failure");
    data_source
        .as_ref()
        .fail_one_read(FailableAction::GetHeader)
        .await;
    let fetch = data_source.get_block(leaf.block_hash()).await;

    // Wait a while, then clear any remaining injected failures.
    sleep(Duration::from_secs(2)).await;
    data_source.as_ref().pass().await;

    // The fetch must resolve to the block whose hash matches the leaf.
    let block: BlockQueryData<MockTypes> = fetch.await;
    assert_eq!(block.hash(), leaf.block_hash());
}
2083
2084 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2085 async fn test_fetch_load_failure_tx() {
2086 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2088
2089 let port = pick_unused_port().unwrap();
2091 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2092 app.register_module(
2093 "availability",
2094 define_api(
2095 &Default::default(),
2096 MockBase::instance(),
2097 "1.0.0".parse().unwrap(),
2098 )
2099 .unwrap(),
2100 )
2101 .unwrap();
2102 network.spawn(
2103 "server",
2104 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2105 );
2106
2107 let provider = Provider::new(QueryServiceProvider::new(
2109 format!("http://localhost:{port}").parse().unwrap(),
2110 MockBase::instance(),
2111 ));
2112 let db = TmpDb::init().await;
2113 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2114 let data_source = FetchingDataSource::builder(storage, provider)
2115 .disable_proactive_fetching()
2116 .disable_aggregator()
2117 .with_min_retry_interval(Duration::from_millis(100))
2118 .build()
2119 .await
2120 .unwrap();
2121
2122 network.start().await;
2124
2125 let tx = mock_transaction(vec![1, 2, 3]);
2127 network.submit_transaction(tx.clone()).await;
2128 let tx = network
2129 .data_source()
2130 .get_block_containing_transaction(tx.commit())
2131 .await
2132 .await;
2133
2134 {
2136 let leaf = network
2137 .data_source()
2138 .get_leaf(tx.transaction.block_height() as usize)
2139 .await
2140 .await;
2141 let block = network
2142 .data_source()
2143 .get_block(tx.transaction.block_height() as usize)
2144 .await
2145 .await;
2146 let mut tx = data_source.write().await.unwrap();
2147 tx.insert_leaf(leaf.clone()).await.unwrap();
2148 tx.insert_block(block.clone()).await.unwrap();
2149 tx.commit().await.unwrap();
2150 }
2151
2152 tracing::info!("fetch success");
2154 assert_eq!(
2155 tx,
2156 data_source
2157 .get_block_containing_transaction(tx.transaction.hash())
2158 .await
2159 .await
2160 );
2161
2162 tracing::info!("fetch with read failure");
2174 data_source
2175 .as_ref()
2176 .fail_one_read(FailableAction::Any)
2177 .await;
2178 let fetch = data_source
2179 .get_block_containing_transaction(tx.transaction.hash())
2180 .await;
2181
2182 assert_eq!(tx, fetch.await);
2183 }
2184
2185 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2186 async fn test_stream_begin_failure() {
2187 let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2189
2190 let port = pick_unused_port().unwrap();
2192 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2193 app.register_module(
2194 "availability",
2195 define_api(
2196 &Default::default(),
2197 MockBase::instance(),
2198 "1.0.0".parse().unwrap(),
2199 )
2200 .unwrap(),
2201 )
2202 .unwrap();
2203 network.spawn(
2204 "server",
2205 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2206 );
2207
2208 let provider = Provider::new(QueryServiceProvider::new(
2210 format!("http://localhost:{port}").parse().unwrap(),
2211 MockBase::instance(),
2212 ));
2213 let db = TmpDb::init().await;
2214 let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2215 let data_source = FetchingDataSource::builder(storage, provider)
2216 .disable_proactive_fetching()
2217 .disable_aggregator()
2218 .with_min_retry_interval(Duration::from_millis(100))
2219 .with_range_chunk_size(3)
2220 .build()
2221 .await
2222 .unwrap();
2223
2224 network.start().await;
2226
2227 let leaves = network.data_source().subscribe_leaves(1).await;
2229 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2230
2231 let last_leaf = leaves.last().unwrap();
2233 let mut tx = data_source.write().await.unwrap();
2234 tx.insert_leaf(last_leaf.clone()).await.unwrap();
2235 tx.commit().await.unwrap();
2236
2237 tracing::info!("stream with transaction failure");
2240 data_source
2241 .as_ref()
2242 .fail_one_begin_read_only(FailableAction::Any)
2243 .await;
2244 assert_eq!(
2245 leaves,
2246 data_source
2247 .subscribe_leaves(1)
2248 .await
2249 .take(5)
2250 .collect::<Vec<_>>()
2251 .await
2252 );
2253 }
2254
/// Checks that a block range request made while all storage reads are failing still resolves to
/// the right blocks once reads are allowed to succeed again.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_stream_load_failure() {
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    // Serve the network's data source through the availability API on a free port.
    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: SQL storage wrapped in `FailStorage` so failures can be injected,
    // fetching from the server above, with a range chunk size of 3 (smaller than the 5-block
    // range requested below).
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let db = TmpDb::init().await;
    let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
    let data_source = FetchingDataSource::builder(storage, provider)
        .disable_proactive_fetching()
        .disable_aggregator()
        .with_min_retry_interval(Duration::from_millis(100))
        .with_range_chunk_size(3)
        .build()
        .await
        .unwrap();

    network.start().await;

    // Collect the first five leaves.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(5).collect::<Vec<_>>().await;

    // Store only the last leaf directly in the data source.
    let last_leaf = leaves.last().unwrap();
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(last_leaf.clone()).await.unwrap();
    tx.commit().await.unwrap();

    // Make every read fail, then request blocks 1..=5 and collect the (unresolved) fetches.
    tracing::info!("stream with read failure");
    data_source.as_ref().fail_reads(FailableAction::Any).await;
    let fetches = data_source
        .get_block_range(1..=5)
        .await
        .collect::<Vec<_>>()
        .await;

    // Keep failing for a while, then let reads succeed again.
    sleep(Duration::from_secs(2)).await;
    data_source.as_ref().pass().await;

    // Each fetch must resolve to the block matching the leaf at the same position.
    for (leaf, fetch) in leaves.iter().zip(fetches) {
        let block: BlockQueryData<MockTypes> = fetch.await;
        assert_eq!(block.hash(), leaf.block_hash());
    }
}
2325
/// Selects which metadata stream `test_metadata_stream_begin_failure_helper` exercises.
enum MetadataType {
    // Use `subscribe_payload_metadata`.
    Payload,
    // Use `subscribe_vid_common_metadata`.
    Vid,
}
2330
/// Shared body for the `test_metadata_stream_begin_failure_*` tests.
///
/// Creates the metadata stream selected by `stream` while every read-only transaction begin is
/// failing, re-enables storage after a delay, and checks that the first three stream items
/// carry the block hashes of the first three leaves.
async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
    let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

    // Serve the network's data source through the availability API on a free port.
    let port = pick_unused_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: SQL storage wrapped in `FailStorage` so failures can be injected,
    // fetching from the server above, with a range chunk size of 3.
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let db = TmpDb::init().await;
    let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
    let data_source = FetchingDataSource::builder(storage, provider)
        .disable_proactive_fetching()
        .disable_aggregator()
        .with_min_retry_interval(Duration::from_millis(100))
        .with_range_chunk_size(3)
        .build()
        .await
        .unwrap();

    network.start().await;

    // Collect the first three leaves.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(3).collect::<Vec<_>>().await;

    // Store only the last leaf directly in the data source.
    let last_leaf = leaves.last().unwrap();
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(last_leaf.clone()).await.unwrap();
    tx.commit().await.unwrap();

    // Additionally give the data source the complete object at height 1 (leaf, block and VID
    // common). Together with the step above, the streamed range therefore contains one height
    // with everything present, one with only a leaf, and one with nothing stored.
    let leaf = network.data_source().get_leaf(1).await.await;
    let block = network.data_source().get_block(1).await.await;
    let vid = network.data_source().get_vid_common(1).await.await;
    data_source
        .append(BlockInfo::new(leaf, Some(block), Some(vid), None, None))
        .await
        .unwrap();

    // Make every read-only transaction begin fail before the stream is created.
    tracing::info!("stream with transaction failure");
    data_source
        .as_ref()
        .fail_begins_read_only(FailableAction::Any)
        .await;
    match stream {
        MetadataType::Payload => {
            // Create the stream while failures are active; it is not consumed yet.
            let payloads = data_source.subscribe_payload_metadata(1).await.take(3);

            // Keep failing for a while, then let storage succeed again.
            sleep(Duration::from_secs(2)).await;
            tracing::info!("stop failing transactions");
            data_source.as_ref().pass().await;

            // The first three items must match the first three leaves by block hash.
            let payloads = payloads.collect::<Vec<_>>().await;
            for (leaf, payload) in leaves.iter().zip(payloads) {
                assert_eq!(payload.block_hash, leaf.block_hash());
            }
        },
        MetadataType::Vid => {
            // Create the stream while failures are active; it is not consumed yet.
            let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);

            // Keep failing for a while, then let storage succeed again.
            sleep(Duration::from_secs(2)).await;
            tracing::info!("stop failing transactions");
            data_source.as_ref().pass().await;

            // The first three items must match the first three leaves by block hash.
            let vids = vids.collect::<Vec<_>>().await;
            for (leaf, vid) in leaves.iter().zip(vids) {
                assert_eq!(vid.block_hash, leaf.block_hash());
            }
        },
    }
}
2429
2430 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2431 async fn test_metadata_stream_begin_failure_payload() {
2432 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2433 }
2434
2435 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2436 async fn test_metadata_stream_begin_failure_vid() {
2437 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2438 }
2439
2440 async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2445 let mut network = MockNetwork::<MockDataSource, V>::init().await;
2446
2447 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2448
2449 app.register_module(
2451 "availability",
2452 define_api(
2453 &Default::default(),
2454 StaticVersion::<0, 1> {},
2455 "0.0.1".parse().unwrap(),
2456 )
2457 .unwrap(),
2458 )
2459 .unwrap();
2460
2461 app.register_module(
2462 "availability",
2463 define_api(
2464 &Default::default(),
2465 StaticVersion::<0, 1> {},
2466 "1.0.0".parse().unwrap(),
2467 )
2468 .unwrap(),
2469 )
2470 .unwrap();
2471
2472 network.spawn(
2473 "server",
2474 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2475 );
2476
2477 let db = TmpDb::init().await;
2478
2479 let provider_url = format!("http://localhost:{port}/{version}")
2480 .parse()
2481 .expect("Invalid URL");
2482
2483 let provider = Provider::new(QueryServiceProvider::new(
2484 provider_url,
2485 StaticVersion::<0, 1> {},
2486 ));
2487
2488 let ds = data_source(&db, &provider).await;
2489 network.start().await;
2490
2491 let leaves = network.data_source().subscribe_leaves(1).await;
2492 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2493 let test_leaf = &leaves[0];
2494 let test_payload = &leaves[2];
2495 let test_common = &leaves[3];
2496
2497 let mut fetches = vec![];
2498 fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2500 fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2501 fetches.push(
2502 ds.get_vid_common(test_common.block_hash())
2503 .await
2504 .map(ignore),
2505 );
2506
2507 sleep(Duration::from_secs(1)).await;
2510 for (i, fetch) in fetches.into_iter().enumerate() {
2511 tracing::info!("checking fetch {i} is unresolved");
2512 fetch.try_resolve().unwrap_err();
2513 }
2514
2515 ds.append(leaves.last().cloned().unwrap().into())
2520 .await
2521 .unwrap();
2522
2523 {
2525 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2526 let payload = ds.get_payload(test_payload.height() as usize).await;
2527 let common = ds.get_vid_common(test_common.height() as usize).await;
2528
2529 let truth = network.data_source();
2530 assert_eq!(
2531 leaf.await,
2532 truth.get_leaf(test_leaf.height() as usize).await.await
2533 );
2534 assert_eq!(
2535 payload.await,
2536 truth
2537 .get_payload(test_payload.height() as usize)
2538 .await
2539 .await
2540 );
2541 assert_eq!(
2542 common.await,
2543 truth
2544 .get_vid_common(test_common.height() as usize)
2545 .await
2546 .await
2547 );
2548 }
2549 }
2550
2551 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2552 async fn test_fallback_deserialization_for_fetch_requests_v0() {
2553 let port = pick_unused_port().unwrap();
2554
2555 run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2561 }
2562
2563 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2564 async fn test_fallback_deserialization_for_fetch_requests_v1() {
2565 let port = pick_unused_port().unwrap();
2566
2567 run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2571 }
2572
2573 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2574 async fn test_fallback_deserialization_for_fetch_requests_pos() {
2575 let port = pick_unused_port().unwrap();
2576
2577 run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2580 }
2581 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2582 async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2583 let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2589
2590 let port = pick_unused_port().unwrap();
2591 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2592
2593 app.register_module(
2594 "availability",
2595 define_api(
2596 &Default::default(),
2597 StaticVersion::<0, 1> {},
2598 "0.0.1".parse().unwrap(),
2599 )
2600 .unwrap(),
2601 )
2602 .unwrap();
2603
2604 network.spawn(
2605 "server",
2606 app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2607 );
2608
2609 let db = TmpDb::init().await;
2610 let provider = Provider::new(QueryServiceProvider::new(
2611 format!("http://localhost:{port}/v0").parse().unwrap(),
2612 StaticVersion::<0, 1> {},
2613 ));
2614 let ds = data_source(&db, &provider).await;
2615
2616 network.start().await;
2617
2618 let leaves = network.data_source().subscribe_leaves(1).await;
2619 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2620 let test_leaf = &leaves[0];
2621 let test_payload = &leaves[2];
2622 let test_common = &leaves[3];
2623
2624 let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2625 let payload = ds.get_payload(test_payload.height() as usize).await;
2626 let common = ds.get_vid_common(test_common.height() as usize).await;
2627
2628 sleep(Duration::from_secs(3)).await;
2629
2630 leaf.try_resolve().unwrap_err();
2632 payload.try_resolve().unwrap_err();
2633 common.try_resolve().unwrap_err();
2634 }
2635}