hotshot_query_service/fetching/provider/query_service.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.
12
13use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16    data::{ns_table, VidCommitment},
17    traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18    vid::{
19        advz::{advz_scheme, ADVZScheme},
20        avidm::{init_avidm_param, AvidMScheme},
21    },
22};
23use jf_vid::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29    availability::{
30        ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31        PayloadQueryData, VidCommonQueryData,
32    },
33    fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34    types::HeightIndexed,
35    Error, Header, Payload, VidCommon,
36};
37
/// Data availability provider backed by another instance of this query service.
///
/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
/// instance of this query service to try and retrieve missing objects.
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    // REST client bound to the peer query service's base URL; `Ver` pins the static
    // (de)serialization format used for all requests.
    client: Client<Error, Ver>,
}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48    pub fn new(url: Url, _: Ver) -> Self {
49        Self {
50            client: Client::new(url),
51        }
52    }
53}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56    async fn deserialize_legacy_payload<Types: NodeType>(
57        &self,
58        payload_bytes: Vec<u8>,
59        common_bytes: Vec<u8>,
60        req: PayloadRequest,
61    ) -> Option<Payload<Types>> {
62        let client_url = self.client.base_url();
63
64        let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65            return None;
66        };
67
68        let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69            &payload_bytes,
70        ) {
71            Ok(payload) => payload,
72            Err(err) => {
73                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74                return None;
75            },
76        };
77
78        let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79            &common_bytes,
80        ) {
81            Ok(common) => common,
82            Err(err) => {
83                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
84                return None;
85            },
86        };
87
88        let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89        let bytes = payload.data.encode();
90
91        let commit = advz_scheme(num_storage_nodes)
92            .commit_only(bytes)
93            .inspect_err(|err| {
94                tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95            })
96            .ok()?;
97
98        if commit != advz_commit {
99            tracing::error!(
100                ?req,
101                expected_commit=%advz_commit,
102                actual_commit=%commit,
103                %client_url,
104                "received inconsistent legacy payload"
105            );
106            return None;
107        }
108
109        Some(payload.data)
110    }
111
112    async fn deserialize_legacy_vid_common<Types: NodeType>(
113        &self,
114        bytes: Vec<u8>,
115        req: VidCommonRequest,
116    ) -> Option<VidCommon> {
117        let client_url = self.client.base_url();
118        let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119            return None;
120        };
121
122        match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123            Ok(res) => {
124                if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125                    Some(VidCommon::V0(res.common))
126                } else {
127                    tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128                    None
129                }
130            },
131            Err(err) => {
132                tracing::warn!(
133                    %client_url,
134                    ?req,
135                    %err,
136                    "failed to deserialize ADVZCommonQueryData"
137                );
138                None
139            },
140        }
141    }
142    async fn deserialize_legacy_leaf<Types: NodeType>(
143        &self,
144        bytes: Vec<u8>,
145        req: LeafRequest<Types>,
146    ) -> Option<LeafQueryData<Types>> {
147        let client_url = self.client.base_url();
148
149        match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150            Ok(mut leaf) => {
151                if leaf.height() != req.height {
152                    tracing::error!(
153                        %client_url, ?req,
154                        expected_height = req.height,
155                        actual_height = leaf.height(),
156                        "received leaf with the wrong height"
157                    );
158                    return None;
159                }
160
161                let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162                let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163                if actual_leaf_commit != expected_leaf_commit {
164                    tracing::error!(
165                        %client_url, ?req,
166                        expected_leaf = %req.expected_leaf,
167                        actual_leaf = %leaf.hash(),
168                        "received leaf with the wrong hash"
169                    );
170                    return None;
171                }
172
173                let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174                let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175                if actual_qc_commit != expected_qc_commit {
176                    tracing::error!(
177                        %client_url, ?req,
178                        expected_qc = %req.expected_qc,
179                        actual_qc = %leaf.qc().commit(),
180                        "received leaf with the wrong QC"
181                    );
182                    return None;
183                }
184
185                // There is a potential DOS attack where the peer sends us a leaf with the full
186                // payload in it, which uses redundant resources in the database, since we fetch and
187                // store payloads separately. We can defend ourselves by simply dropping the payload
188                // if present.
189                leaf.leaf.unfill_block_payload();
190
191                Some(leaf.into())
192            },
193            Err(err) => {
194                tracing::warn!(
195                    %client_url, ?req, %err,
196                    "failed to deserialize legacy LeafQueryData"
197                );
198                None
199            },
200        }
201    }
202}
203
#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Payload` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the new type first.
    /// If deserialization into the new type fails (e.g., because the provider is still returning
    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
    ///
    /// Returns `None` on any network, deserialization, or verification failure; errors are logged
    /// but never propagated, since fetching is best-effort.
    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
        let client_url = self.client.base_url();
        let req_hash = req.0;
        // Fetch the payload and the VID common data. We need the common data to recompute the VID
        // commitment, to ensure the payload we received is consistent with the commitment we
        // requested.
        let payload_bytes = self
            .client
            .get::<()>(&format!("availability/payload/hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
            })
            .ok()?;

        let common_bytes = self
            .client
            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
            })
            .ok()?;

        // Try the current (post-upgrade) wire formats first; if either fails we fall back to the
        // legacy format below.
        let payload =
            vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
                })
                .ok();

        let common =
            vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash,
                        "error deserializing VidCommonQueryData",
                    );
                })
                .ok();

        let (payload, common) = match (payload, common) {
            (Some(payload), Some(common)) => (payload, common),
            _ => {
                tracing::info!(%req_hash, "falling back to legacy payload deserialization");

                // fallback deserialization
                return self
                    .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
                    .await;
            },
        };

        // Verify the payload against the requested commitment. The verification procedure depends
        // on which VID scheme produced the commitment.
        match common.common() {
            VidCommon::V0(common) => {
                // ADVZ (V0): recompute the commitment directly from the payload bytes.
                let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
                let bytes = payload.data().encode();

                let commit = advz_scheme(num_storage_nodes)
                    .commit_only(bytes)
                    .map(VidCommitment::V0)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash,  %num_storage_nodes, "failed to compute VID commitment (V0)");
                    })
                    .ok()?;

                if commit != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = ?commit,
                        %client_url,
                        "VID commitment mismatch (V0)"
                    );

                    return None;
                }
            },
            VidCommon::V1(common) => {
                let bytes = payload.data().encode();

                let avidm_param = init_avidm_param(common.total_weights)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
                    })
                    .ok()?;

                // AVIDM (V1) commitments depend on the namespace table, which lives in the block
                // header, so an extra request is needed to fetch the header for this payload.
                let header = self
                    .client
                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
                    .send()
                    .await
                    .inspect_err(|err| {
                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
                    })
                    .ok()?;

                // Sanity-check the header itself before trusting its namespace table.
                if header.payload_commitment() != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %header.payload_commitment(),
                        %client_url,
                        "header payload commitment mismatch (V1)"
                    );
                    return None;
                }

                let metadata = header.metadata().encode();
                let commit = AvidMScheme::commit(
                    &avidm_param,
                    &bytes,
                    ns_table::parse_ns_table(bytes.len(), &metadata),
                )
                .map(VidCommitment::V1)
                .inspect_err(|err| {
                    tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
                })
                .ok()?;

                // Compare calculated commitment with requested commitment
                if commit != req.0 {
                    tracing::warn!(
                        expected = %req_hash,
                        actual = %commit,
                        %client_url,
                        "commitment type mismatch for AVIDM check"
                    );
                    return None;
                }
            },
        }

        Some(payload.data)
    }
}
351
#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
    for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Leaf` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the new type first.
    /// If deserialization into the new type fails (e.g., because the provider is still returning
    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
    ///
    /// The fetched leaf is only returned if its height, hash, and QC all match the request; any
    /// block payload attached by the peer is stripped before returning (see DOS note below).
    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
        let client_url = self.client.base_url();

        let bytes = self
            .client
            .get::<()>(&format!("availability/leaf/{}", req.height))
            .bytes()
            .await;
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");

                return None;
            },
        };

        // Attempt to deserialize using the new type

        match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
            Ok(mut leaf) => {
                // Validate the response against every field of the request before accepting it.
                if leaf.height() != req.height {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_height = req.height,
                        actual_height = leaf.height(),
                        "received leaf with the wrong height"
                    );
                    return None;
                }
                if leaf.hash() != req.expected_leaf {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_hash = %req.expected_leaf,
                        actual_hash = %leaf.hash(),
                        "received leaf with the wrong hash"
                    );
                    return None;
                }
                if leaf.qc().commit() != req.expected_qc {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_qc = %req.expected_qc,
                        actual_qc = %leaf.qc().commit(),
                        "received leaf with the wrong QC"
                    );
                    return None;
                }

                // There is a potential DOS attack where the peer sends us a leaf with the full
                // payload in it, which uses redundant resources in the database, since we fetch and
                // store payloads separately. We can defend ourselves by simply dropping the payload
                // if present.
                leaf.leaf.unfill_block_payload();

                Some(leaf)
            },
            Err(err) => {
                tracing::info!(
                    ?req, %err,
                    "failed to deserialize LeafQueryData, falling back to legacy deserialization"
                );
                // Fallback deserialization
                self.deserialize_legacy_leaf(bytes, req).await
            },
        }
    }
}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437    Types: NodeType,
438{
439    /// Fetches the `VidCommon` for a given request.
440    ///
441    /// Attempts to fetch and deserialize the requested data using the new type first.
442    /// If deserialization into the new type fails (e.g., because the provider is still returning
443    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
444    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
445    ///
446    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447        let client_url = self.client.base_url();
448        let bytes = self
449            .client
450            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451            .bytes()
452            .await;
453        let bytes = match bytes {
454            Ok(bytes) => bytes,
455            Err(err) => {
456                tracing::info!(
457                    %client_url, ?req, %err,
458                    "failed to fetch VID common bytes"
459                );
460                return None;
461            },
462        };
463
464        match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465            Ok(res) => match req.0 {
466                VidCommitment::V0(commit) => {
467                    if let VidCommon::V0(common) = res.common {
468                        if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469                            Some(VidCommon::V0(common))
470                        } else {
471                            tracing::error!(
472                                %client_url, ?req, ?commit, ?common,
473                                "VID V0 common data is inconsistent with commitment"
474                            );
475                            None
476                        }
477                    } else {
478                        tracing::warn!(?req, ?res, "Expect VID common data but found None");
479                        None
480                    }
481                },
482                VidCommitment::V1(_) => {
483                    if let VidCommon::V1(common) = res.common {
484                        Some(VidCommon::V1(common))
485                    } else {
486                        tracing::warn!(?req, ?res, "Expect VID common data but found None");
487                        None
488                    }
489                },
490            },
491            Err(err) => {
492                tracing::info!(
493                    %client_url, ?req, %err,
494                    "failed to deserialize as V1 VID common data, trying legacy fallback"
495                );
496                // Fallback deserialization
497                self.deserialize_legacy_vid_common::<Types>(bytes, req)
498                    .await
499            },
500        }
501    }
502}
503
504// These tests run the `postgres` Docker image, which doesn't work on Windows.
505#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507    use std::{future::IntoFuture, time::Duration};
508
509    use committable::Committable;
510    use futures::{
511        future::{join, FutureExt},
512        stream::StreamExt,
513    };
514    use generic_array::GenericArray;
515    use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
516    use hotshot_types::traits::node_implementation::Versions;
517    use portpicker::pick_unused_port;
518    use rand::RngCore;
519    use tide_disco::{error::ServerError, App};
520    use vbs::version::StaticVersion;
521
522    use super::*;
523    use crate::{
524        api::load_api,
525        availability::{
526            define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
527            BlockWithTransaction, Fetch, UpdateAvailabilityData,
528        },
529        data_source::{
530            sql::{self, SqlDataSource},
531            storage::{
532                fail_storage::{FailStorage, FailableAction},
533                pruning::{PrunedHeightStorage, PrunerCfg},
534                sql::testing::TmpDb,
535                AvailabilityStorage, SqlStorage, UpdateAvailabilityStorage,
536            },
537            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
538        },
539        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
540        node::{data_source::NodeDataSource, SyncStatus},
541        task::BackgroundTask,
542        testing::{
543            consensus::{MockDataSource, MockNetwork},
544            mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
545            sleep,
546        },
547        types::HeightIndexed,
548        ApiState,
549    };
550
    // Providers under test, wrapped in `TestProvider` so tests can block/unblock requests.
    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
    // Same, but using the base version of the epoch-enabled version set.
    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
553
554    fn ignore<T>(_: T) {}
555
    /// Build a data source suitable for this suite of tests.
    ///
    /// `db` supplies the connection config for a temporary Postgres instance; `provider` is the
    /// fetch provider the data source will consult for missing objects.
    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
        db: &TmpDb,
        provider: &P,
    ) -> sql::Builder<MockTypes, P> {
        db.config()
            .builder((*provider).clone())
            .await
            .unwrap()
            // We disable proactive fetching for these tests, since we are intending to test on
            // demand fetching, and proactive fetching could lead to false successes.
            .disable_proactive_fetching()
    }
569
    /// A data source suitable for this suite of tests, with the default options.
    ///
    /// Convenience wrapper around [`builder`] that immediately finishes the build.
    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
        db: &TmpDb,
        provider: &P,
    ) -> SqlDataSource<MockTypes, P> {
        builder(db, provider).await.build().await.unwrap()
    }
577
    /// End-to-end test of on-demand fetching: a node with no consensus connection must retrieve
    /// leaves, blocks, payloads, and VID common data from a peer query service, and must NOT
    /// fetch objects it cannot prove exist.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 6. This gives us the genesis block, one additional
        // block at the end, and then one block to play around with fetching each type of resource:
        // * Leaf
        // * Block
        // * Payload
        // * VID common
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Make requests for missing data that should _not_ trigger an active fetch:
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // * An unknown leaf hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // * An unknown leaf height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown block height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // * Genesis VID common (no VID for genesis)
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // * An unknown transaction.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Now we will actually fetch the missing data. First, since our node is not really
        // connected to consensus, we need to give it a leaf after the range of interest so it
        // learns about the correct block height. We will temporarily lock requests to the provider
        // so that we can verify that without the provider, the node does _not_ get the data.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // Give the requests some extra time to complete, and check that they still haven't
        // resolved, since the provider is blocked. This just ensures the integrity of the test by
        // checking the node didn't mysteriously get the block from somewhere else, so that when we
        // unblock the provider and the node finally gets the block, we know it came from the
        // provider.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the request and see that we eventually receive the data.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Verify the data.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Fetching the block and payload should have also fetched the corresponding leaves, since
        // we have an invariant that we should not store a block in the database without its
        // corresponding leaf and header. Thus we should be able to get the leaves even if the
        // provider is blocked.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
        // with this hash exists, and trigger a fetch for it.
        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Test a similar scenario, but with payload instead of block: we are aware of
        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
        // hash.
        tracing::info!("fetching payload by hash");
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }
803
    /// Fetch-on-request behavior when consensus runs with epoch-enabled versions.
    ///
    /// Exercises the passive/active fetching scenarios (requests by unknown hash do
    /// not trigger active fetches; requests by height below the known block height
    /// do) against a network and availability API instantiated with
    /// `EpochsTestVersions`, to cover epoch version transitions.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_fetch_on_request_epoch_version() {
        // This test verifies that our provider can handle fetching things by their hashes,
        // specifically focused on epoch version transitions
        tracing::info!("Starting test_fetch_on_request_epoch_version");

        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                <EpochsTestVersions as Versions>::Base::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(
                format!("0.0.0.0:{port}"),
                <EpochsTestVersions as Versions>::Base::instance(),
            ),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        // Use our special test provider that handles epoch version transitions
        let db = TmpDb::init().await;
        let provider = EpochProvider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            <EpochsTestVersions as Versions>::Base::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 6. This gives us the genesis block, one additional
        // block at the end, and then one block to play around with fetching each type of resource:
        // * Leaf
        // * Block
        // * Payload
        // * VID common
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Make requests for missing data that should _not_ trigger an active fetch:
        let mut fetches = vec![];
        // * An unknown leaf hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // * An unknown leaf height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown payload, requested by block hash.
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        // * Unknown VID common data, requested by block hash.
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown block height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown payload, requested by height.
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        // * Unknown VID common data, requested by height.
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // * Genesis VID common (no VID for genesis)
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // * An unknown transaction.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Now we will actually fetch the missing data. First, since our node is not really
        // connected to consensus, we need to give it a leaf after the range of interest so it
        // learns about the correct block height. We will temporarily lock requests to the provider
        // so that we can verify that without the provider, the node does _not_ get the data.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // These requests are by height below the known block height, so they trigger active
        // fetches (which cannot complete yet: the provider is blocked).
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // Give the requests some extra time to complete, and check that they still haven't
        // resolved, since the provider is blocked. This just ensures the integrity of the test by
        // checking the node didn't mysteriously get the block from somewhere else, so that when we
        // unblock the provider and the node finally gets the block, we know it came from the
        // provider.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the request and see that we eventually receive the data.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Verify the data against the source of truth (the consensus node's data source).
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Fetching the block and payload should have also fetched the corresponding leaves, since
        // we have an invariant that we should not store a block in the database without its
        // corresponding leaf and header. Thus we should be able to get the leaves even if the
        // provider is blocked.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
        // with this hash exists, and trigger a fetch for it.
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Test a similar scenario, but with payload instead of block: we are aware of
        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
        // hash.
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }

        tracing::info!("Test completed successfully!");
    }
1036
    /// Fetching a leaf and its block concurrently must not duplicate or corrupt work.
    ///
    /// Two simultaneous requests both need the same leaf; one fetch should win and
    /// notify the other, and both futures must resolve to consistent data.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_block_and_leaf_concurrently() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 3. This gives us the genesis block, one additional
        // block at the end, and then one block that we can use to test fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];

        // Tell the node about a leaf after the one of interest so it learns about the block height.
        data_source.append(leaves[1].clone().into()).await.unwrap();

        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
        // ends up not fetching anything.
        let (leaf, block) = join(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .into_future(),
            data_source
                .get_block(test_leaf.height() as usize)
                .await
                .into_future(),
        )
        .await;
        assert_eq!(leaf, *test_leaf);
        assert_eq!(leaf.header(), block.header());
    }
1097
    /// Concurrent fetches of two different blocks sharing one payload must both succeed.
    ///
    /// Guards against payload-fetch deduplication optimizations breaking the case
    /// where distinct blocks (distinct headers/heights) have identical payloads.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_different_blocks_same_payload() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 4. This gives us the genesis block, one additional
        // block at the end, and then two blocks that we can use to test fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(4).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // All the blocks here are empty, so they have the same payload:
        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
        // If we fetch both blocks at the same time, we can check that we haven't broken anything
        // with whatever optimizations we add to deduplicate payload fetching.
        let (block1, block2) = join(
            data_source
                .get_block(leaves[0].height() as usize)
                .await
                .into_future(),
            data_source
                .get_block(leaves[1].height() as usize)
                .await
                .into_future(),
        )
        .await;
        assert_eq!(block1.header(), leaves[0].header());
        assert_eq!(block2.header(), leaves[1].header());
    }
1162
    /// Subscriptions for future objects resolve via fetching once heights are known.
    ///
    /// Subscribes to blocks, leaves, and VID common before any data exists locally,
    /// then verifies the streams yield data matching the consensus node's source of
    /// truth after the node learns about the block height.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_stream() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Subscribe to objects from the future (before the node has any local data).
        let blocks = data_source.subscribe_blocks(0).await;
        let leaves = data_source.subscribe_leaves(0).await;
        let common = data_source.subscribe_vid_common(0).await;

        // Wait for a few blocks to be finalized.
        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(finalized_leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // Check the subscriptions against the finalized data.
        let blocks = blocks.take(5).collect::<Vec<_>>().await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let common = common.take(5).collect::<Vec<_>>().await;
        for i in 0..5 {
            tracing::info!("checking block {i}");
            assert_eq!(leaves[i], finalized_leaves[i]);
            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
        }
    }
1224
    /// A range query missing data at its start must still fetch and yield everything.
    ///
    /// Inserts only leaves 2 and 4 locally, then requests leaves 0..5 and checks the
    /// gaps (including the beginning of the range) are fetched from the provider.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_range_start() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait for a few blocks to be finalized.
        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest (so it learns about the block
        // height) and one in the middle of the range, to test what happens when data is missing
        // from the beginning of the range but other data is available.
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
        tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Get the whole range of leaves; missing entries resolve via active fetches.
        let leaves = data_source
            .get_leaf_range(..5)
            .await
            .then(Fetch::resolve)
            .collect::<Vec<_>>()
            .await;
        for i in 0..5 {
            tracing::info!("checking leaf {i}");
            assert_eq!(leaves[i], finalized_leaves[i]);
        }
    }
1283
    /// Transactions are resolved passively, with no fetcher required.
    ///
    /// Subscribes to a not-yet-sequenced transaction on a node with `NoFetching`,
    /// then feeds the node blocks until one contains the transaction and checks the
    /// subscription resolves to that block; subsequent queries resolve immediately.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus. We don't give it a
        // fetcher since transactions are always fetched passively anyways.
        let db = TmpDb::init().await;
        let data_source = data_source(&db, &NoFetching).await;

        // Subscribe to blocks.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let mut blocks = network.data_source().subscribe_blocks(1).await;

        // Start consensus.
        network.start().await;

        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
        // exist.
        let tx = mock_transaction(vec![1, 2, 3]);
        let fut = data_source
            .get_block_containing_transaction(tx.commit())
            .await;

        // Sequence the transaction.
        network.submit_transaction(tx.clone()).await;

        // Send blocks to the query service, the future will resolve as soon as it sees a block
        // containing the transaction.
        let block = loop {
            let leaf = leaves.next().await.unwrap();
            let block = blocks.next().await.unwrap();

            data_source
                .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
                .await
                .unwrap();

            if block.transaction_by_hash(tx.commit()).is_some() {
                break block;
            }
        };
        tracing::info!("transaction included in block {}", block.height());

        // The passive subscription resolves to the block containing the transaction.
        let fetched_tx = fut.await;
        assert_eq!(
            fetched_tx,
            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
        );

        // Future queries for this transaction resolve immediately.
        assert_eq!(
            fetched_tx,
            data_source
                .get_block_containing_transaction(tx.commit())
                .await
                .await
        );
    }
1362
    /// Failed fetches are retried until the provider recovers.
    ///
    /// Puts the provider in a failing state, requests a leaf (which cannot resolve
    /// while every attempt errors), then un-fails the provider and checks the
    /// request completes with the correct leaf.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_retry() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        // Cap the retry backoff so several retries fit within the test's sleep below.
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 3. This gives us the genesis block, one additional
        // block at the end, and one block to try fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];

        // Cause requests to fail temporarily, so we can test retries.
        provider.fail();

        // Give the node a leaf after the range of interest so it learns about the correct block
        // height.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting leaf from failing providers");
        let fut = data_source.get_leaf(test_leaf.height() as usize).await;

        // Wait a few retries and check that the request has not completed, since the provider is
        // failing.
        sleep(Duration::from_secs(5)).await;
        fut.try_resolve().unwrap_err();

        // As soon as the provider recovers, the request can complete.
        provider.unfail();
        assert_eq!(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .await,
            *test_leaf
        );
    }
1436
1437    fn random_vid_commit() -> VidCommitment {
1438        let mut bytes = [0; 32];
1439        rand::thread_rng().fill_bytes(&mut bytes);
1440        VidCommitment::V0(GenericArray::from(bytes).into())
1441    }
1442
    /// Serve an availability API that always returns bogus (genesis) data.
    ///
    /// Used to test that fetchers verify responses: no matter which payload or VID
    /// common object is requested, this server answers with dummy genesis data that
    /// will not match the requested commitment.
    async fn malicious_server(port: u16) {
        let mut api = load_api::<(), ServerError, MockBase>(
            None::<std::path::PathBuf>,
            include_str!("../../../api/availability.toml"),
            vec![],
        )
        .unwrap();

        api.get("get_payload", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
                    &Default::default(),
                    &Default::default(),
                )
                .await)
            }
            .boxed()
        })
        .unwrap()
        .get("get_vid_common", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
                    &Default::default(),
                    &Default::default(),
                )
                .await)
            }
            .boxed()
        })
        .unwrap();

        let mut app = App::<(), ServerError>::with_state(());
        app.register_module("availability", api).unwrap();
        // Run until the task is cancelled; ignore the server's exit result.
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
            .await
            .ok();
    }
1482
1483    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1484    async fn test_fetch_from_malicious_server() {
1485        let port = pick_unused_port().unwrap();
1486        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1487
1488        let provider = QueryServiceProvider::new(
1489            format!("http://localhost:{port}").parse().unwrap(),
1490            MockBase::instance(),
1491        );
1492        provider.client.connect(None).await;
1493
1494        // Query for a random payload, the server will respond with a different payload, and we
1495        // should detect the error.
1496        let res =
1497            ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1498                .await;
1499        assert_eq!(res, None);
1500
1501        // Query for a random VID common, the server will respond with a different one, and we
1502        // should detect the error.
1503        let res =
1504            ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1505                .await;
1506        assert_eq!(res, None);
1507    }
1508
    /// End-to-end test of archive recovery: a node that has aggressively pruned all of its data
    /// is restarted in archive mode and must restore the pruned objects by fetching them from a
    /// peer via its proactive scanner.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_archive_recovery() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer. The
        // data source is at first configured to aggressively prune data.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let mut data_source = db
            .config()
            .pruner_cfg(
                PrunerCfg::new()
                    // Retain nothing: every stored object becomes prunable immediately.
                    .with_target_retention(Duration::from_secs(0))
                    .with_interval(Duration::from_secs(5)),
            )
            .unwrap()
            .builder(provider.clone())
            .await
            .unwrap()
            // Set a fast retry for failed operations. Occasionally storage operations will fail due
            // to conflicting write-mode transactions running concurrently. This is ok as they will
            // be retried. Having a fast retry interval speeds up the test.
            .with_min_retry_interval(Duration::from_millis(100))
            // Randomize retries a lot. This will temporarily separate competing write transactions
            // with high probability, so that one of them quickly gets exclusive access to the
            // database.
            .with_retry_randomization_factor(3.)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // The disconnected data source has no data yet, so it hasn't done any pruning.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        // Either None or 0 is acceptable, depending on whether or not the prover has run yet.
        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

        // Send the last leaf to the disconnected data source so it learns about the height and
        // fetches the missing data.
        let last_leaf = leaves.last().unwrap();
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Trigger a fetch of each leaf so the database gets populated.
        for i in 1..=last_leaf.height() {
            tracing::info!(i, "fetching leaf");
            assert_eq!(
                data_source.get_leaf(i as usize).await.await,
                leaves[i as usize - 1]
            );
        }

        // After a bit of time, the pruner has run and deleted all the missing data we just fetched.
        loop {
            let pruned_height = data_source
                .read()
                .await
                .unwrap()
                .load_pruned_height()
                .await
                .unwrap();
            if pruned_height == Some(last_leaf.height()) {
                break;
            }
            tracing::info!(
                ?pruned_height,
                target_height = last_leaf.height(),
                "waiting for pruner to run"
            );
            sleep(Duration::from_secs(1)).await;
        }

        // Now close the data source and restart it with archive recovery.
        data_source = db
            .config()
            .archive()
            .builder(provider.clone())
            .await
            .unwrap()
            // Fast proactive scans so the archive is restored quickly in the test.
            .with_minor_scan_interval(Duration::from_secs(1))
            .with_major_scan_interval(1)
            .build()
            .await
            .unwrap();

        // Pruned height should be reset.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        assert_eq!(pruned_height, None);

        // The node has pruned all of its data including the latest block, so it's forgotten the
        // block height. We need to give it another leaf with some height so it will be willing to
        // fetch.
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Wait for the data to be restored. It should be restored by the next major scan.
        loop {
            let sync_status = data_source.sync_status().await.unwrap();

            // VID shares are unique to a node and will never be fetched from a peer; this is
            // acceptable since there is redundancy built into the VID scheme. Ignore missing VID
            // shares in the `is_fully_synced` check.
            if (SyncStatus {
                missing_vid_shares: 0,
                ..sync_status
            })
            .is_fully_synced()
            {
                break;
            }
            tracing::info!(?sync_status, "waiting for node to sync");
            sleep(Duration::from_secs(1)).await;
        }

        // The node remains fully synced even after some time; no pruning.
        sleep(Duration::from_secs(3)).await;
        let sync_status = data_source.sync_status().await.unwrap();
        assert!(
            (SyncStatus {
                missing_vid_shares: 0,
                ..sync_status
            })
            .is_fully_synced(),
            "{sync_status:?}"
        );
    }
1673
    /// The point in a storage transaction's lifecycle at which a simulated failure is injected
    /// (via `FailStorage`) by the storage-failure tests below.
    #[derive(Clone, Copy, Debug)]
    enum FailureType {
        /// Fail when beginning (opening) a writable transaction.
        Begin,
        /// Fail when performing a write within an open transaction.
        Write,
        /// Fail when committing the transaction.
        Commit,
    }
1680
    /// Shared body for the `test_fetch_storage_failure_on_*` tests.
    ///
    /// Injects a persistent storage failure of the given kind and checks that an active fetch
    /// still resolves (the result comes from the peer even though it cannot be stored), then
    /// lifts the failure and checks that a subsequent fetch resolves and finally persists the
    /// leaf so a third lookup is served from local storage.
    async fn test_fetch_storage_failure_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so individual operations can be made to fail.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            // Bound retries so the test does not hang when storage keeps failing.
            .with_max_retry_interval(Duration::from_millis(100))
            .with_retry_timeout(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_begins_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
        data_source.as_ref().pass().await;

        // It is possible that the fetch above completes, notifies the subscriber,
        // and the fetch below quickly subscribes and gets notified by the same loop.
        // We add a delay here to avoid this situation.
        // This is not a bug, as being notified after subscribing is fine.
        sleep(Duration::from_secs(1)).await;

        // We can get the same leaf again, this will again trigger an active fetch since storage
        // failed the first time.
        tracing::info!("fetch with write success");
        let fetch = data_source.get_leaf(1).await;
        // Pending proves the leaf was not persisted the first time around.
        assert!(fetch.is_pending());
        assert_eq!(leaves[0], fetch.await);

        sleep(Duration::from_secs(1)).await;

        // Finally, we should have the leaf locally and not need to fetch it.
        tracing::info!("retrieve from storage");
        let fetch = data_source.get_leaf(1).await;
        // `try_resolve` succeeds only if the object is already available locally.
        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
    }
1767
    /// Fetches must still resolve when beginning a writable transaction fails persistently.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_begin() {
        test_fetch_storage_failure_helper(FailureType::Begin).await;
    }
1772
    /// Fetches must still resolve when storage writes fail persistently.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_write() {
        test_fetch_storage_failure_helper(FailureType::Write).await;
    }
1777
    /// Fetches must still resolve when transaction commits fail persistently.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_commit() {
        test_fetch_storage_failure_helper(FailureType::Commit).await;
    }
1782
    /// Shared body for the `test_fetch_storage_failure_retry_on_*` tests.
    ///
    /// Injects a single (one-shot) storage failure of the given kind and checks that the fetch
    /// is retried until the leaf is successfully stored, then reads the leaf back directly from
    /// storage to confirm it was persisted.
    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so a single operation can be made to fail.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            // Fast retries keep the test quick; unlike the non-retry helper, no retry timeout is
            // set, so the store is retried until it succeeds.
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_one_begin_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => {
                data_source
                    .as_ref()
                    .fail_one_write(FailableAction::Any)
                    .await
            },
            FailureType::Commit => {
                data_source
                    .as_ref()
                    .fail_one_commit(FailableAction::Any)
                    .await
            },
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);

        // Check that the leaf ended up in local storage.
        let mut tx = data_source.read().await.unwrap();
        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
    }
1861
    /// A one-shot failure beginning a writable transaction must be retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_begin() {
        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
    }
1866
    /// A one-shot write failure must be retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_write() {
        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
    }
1871
    /// A one-shot commit failure must be retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_commit() {
        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
    }
1876
    /// Test that appending a decided leaf (with no block or VID data) triggers fetches of the
    /// corresponding block and VID common data from a peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_decide() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block has been decided.
        let leaf = network
            .data_source()
            .subscribe_leaves(1)
            .await
            .next()
            .await
            .unwrap();

        // Give the node a decide containing the leaf but no additional information.
        data_source.append(leaf.clone().into()).await.unwrap();

        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
        // the leaf.
        sleep(Duration::from_secs(5)).await;

        // Read the missing data directly from storage (via a database transaction), rather than
        // going through the data source, so that this request itself does not trigger a fetch.
        // Thus, this will only work if the data was already fetched, triggered by the leaf.
        let mut tx = data_source.read().await.unwrap();
        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
        let block = tx.get_block(id).await.unwrap();
        let vid = tx.get_vid_common(id).await.unwrap();

        assert_eq!(block.hash(), leaf.block_hash());
        assert_eq!(vid.block_hash(), leaf.block_hash());
    }
1943
1944    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1945    async fn test_fetch_begin_failure() {
1946        // Create the consensus network.
1947        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1948
1949        // Start a web server that the non-consensus node can use to fetch blocks.
1950        let port = pick_unused_port().unwrap();
1951        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1952        app.register_module(
1953            "availability",
1954            define_api(
1955                &Default::default(),
1956                MockBase::instance(),
1957                "1.0.0".parse().unwrap(),
1958            )
1959            .unwrap(),
1960        )
1961        .unwrap();
1962        network.spawn(
1963            "server",
1964            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1965        );
1966
1967        // Start a data source which is not receiving events from consensus, only from a peer.
1968        let provider = Provider::new(QueryServiceProvider::new(
1969            format!("http://localhost:{port}").parse().unwrap(),
1970            MockBase::instance(),
1971        ));
1972        let db = TmpDb::init().await;
1973        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1974        let data_source = FetchingDataSource::builder(storage, provider)
1975            .disable_proactive_fetching()
1976            .disable_aggregator()
1977            .with_min_retry_interval(Duration::from_millis(100))
1978            .build()
1979            .await
1980            .unwrap();
1981
1982        // Start consensus.
1983        network.start().await;
1984
1985        // Wait until a couple of blocks are produced.
1986        let leaves = network.data_source().subscribe_leaves(1).await;
1987        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1988
1989        // Send the last leaf to the disconnected data source so it learns about the height.
1990        let last_leaf = leaves.last().unwrap();
1991        let mut tx = data_source.write().await.unwrap();
1992        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1993        tx.commit().await.unwrap();
1994
1995        // Trigger a fetch of the first leaf; it should retry until it is able to determine
1996        // the leaf is fetchable and trigger the fetch.
1997        tracing::info!("fetch with transaction failure");
1998        data_source
1999            .as_ref()
2000            .fail_one_begin_read_only(FailableAction::Any)
2001            .await;
2002        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2003    }
2004
    /// Test that a transient failure loading a header during an active block fetch is retried
    /// rather than being treated as the header (and hence the block) being unfetchable.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_block() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so reads can be made to fail selectively.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block is produced.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let leaf = leaves.next().await.unwrap();

        // Send the leaf to the disconnected data source, so the corresponding block becomes
        // fetchable.
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the block by hash; it should retry until it is able to determine the
        // leaf is available, thus the block is fetchable, trigger the fetch.
        //
        // Failing only on the `get_header` call here hits an edge case which is only possible when
        // fetching blocks: we successfully determine that the object is not available locally and
        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
        // fail to load the header and erroneously treat this as the header not being available, we
        // will give up and consider the object unfetchable (since the next step would be to fetch
        // the corresponding leaf, but we cannot do this with just a block hash).
        //
        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
        // successfully able to load the header from storage and determine that the block is
        // fetchable.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::GetHeader)
            .await;
        let fetch = data_source.get_block(leaf.block_hash()).await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        let block: BlockQueryData<MockTypes> = fetch.await;
        assert_eq!(block.hash(), leaf.block_hash());
    }
2083
    /// Test that a transient read failure while looking up a transaction is retried rather than
    /// being treated as the transaction being missing, which would leave the fetch permanently
    /// unresolved (transactions cannot be actively fetched).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_tx() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so a single read can be made to fail.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a transaction is sequenced.
        let tx = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(tx.clone()).await;
        let tx = network
            .data_source()
            .get_block_containing_transaction(tx.commit())
            .await
            .await;

        // Send the block containing the transaction to the disconnected data source.
        {
            let leaf = network
                .data_source()
                .get_leaf(tx.transaction.block_height() as usize)
                .await
                .await;
            let block = network
                .data_source()
                .get_block(tx.transaction.block_height() as usize)
                .await
                .await;
            // NOTE: `tx` is shadowed here by the database transaction; the scope ends before the
            // outer `tx` (the transaction query data) is used again.
            let mut tx = data_source.write().await.unwrap();
            tx.insert_leaf(leaf.clone()).await.unwrap();
            tx.insert_block(block.clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Check that the transaction is there.
        tracing::info!("fetch success");
        assert_eq!(
            tx,
            data_source
                .get_block_containing_transaction(tx.transaction.hash())
                .await
                .await
        );

        // Fetch the transaction with storage failures.
        //
        // Failing only one read here hits an edge case that only exists for unfetchable objects
        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
        // if we erroneously treat this load failure as the transaction being missing, we will
        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
        // never resolve.
        //
        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
        // and we discover that the transaction doesn't need to be fetched at all.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::Any)
            .await;
        let fetch = data_source
            .get_block_containing_transaction(tx.transaction.hash())
            .await;

        assert_eq!(tx, fetch.await);
    }
2184
    /// Test that a leaf subscription stream survives a failure to begin a read-only transaction:
    /// the stream should retry and still yield every leaf.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_begin_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so a single begin can be made to fail.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            // Small chunks so the 5-leaf stream spans multiple storage transactions.
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
        // and trigger the fetch.
        tracing::info!("stream with transaction failure");
        data_source
            .as_ref()
            .fail_one_begin_read_only(FailableAction::Any)
            .await;
        assert_eq!(
            leaves,
            data_source
                .subscribe_leaves(1)
                .await
                .take(5)
                .collect::<Vec<_>>()
                .await
        );
    }
2254
    // A stream of fetched blocks must survive a period of database *read* failures:
    // each failed load is retried (at `min_retry_interval`) until storage recovers,
    // after which every fetch in the stream should resolve to the correct block.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_load_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so the test can inject read failures below.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            // Disable background tasks so the only fetches are the ones this test triggers.
            .disable_proactive_fetching()
            .disable_aggregator()
            // Retry quickly so the stream recovers soon after `pass()` below.
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the blocks with a period of database failures.
        tracing::info!("stream with read failure");
        // Make storage reads fail until `pass()` is called below.
        data_source.as_ref().fail_reads(FailableAction::Any).await;
        let fetches = data_source
            .get_block_range(1..=5)
            .await
            .collect::<Vec<_>>()
            .await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        // Once storage is healthy again, every fetch should resolve to the block
        // matching the corresponding leaf.
        for (leaf, fetch) in leaves.iter().zip(fetches) {
            let block: BlockQueryData<MockTypes> = fetch.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }
    }
2325
    /// Selects which metadata stream `test_metadata_stream_begin_failure_helper`
    /// exercises.
    enum MetadataType {
        /// Stream payload metadata (`subscribe_payload_metadata`).
        Payload,
        /// Stream VID common metadata (`subscribe_vid_common_metadata`).
        Vid,
    }
2330
    // Shared body for the `test_metadata_stream_begin_failure_*` tests: streams
    // either payload metadata or VID common metadata while storage temporarily
    // fails to begin read-only transactions, then checks that the stream recovers
    // and yields objects matching the leaves produced by consensus.
    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // `FailStorage` lets the test inject transaction-begin failures below.
        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(3).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Send the first object to the disconnected data source, so we hit all the cases:
        // * leaf present but not full object (from the last leaf)
        // * full object present but inaccessible due to storage failures (first object)
        // * nothing present (middle object)
        let leaf = network.data_source().get_leaf(1).await.await;
        let block = network.data_source().get_block(1).await.await;
        let vid = network.data_source().get_vid_common(1).await.await;
        data_source
            .append(BlockInfo::new(leaf, Some(block), Some(vid), None, None))
            .await
            .unwrap();

        // Stream the objects with a period of database failures. Fail attempts to
        // begin read-only transactions until `pass()` is called below.
        tracing::info!("stream with transaction failure");
        data_source
            .as_ref()
            .fail_begins_read_only(FailableAction::Any)
            .await;
        match stream {
            MetadataType::Payload => {
                // Subscribe while storage is failing; collection happens later, after
                // `pass()`, so the stream must retry through the failure window.
                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let payloads = payloads.collect::<Vec<_>>().await;
                for (leaf, payload) in leaves.iter().zip(payloads) {
                    assert_eq!(payload.block_hash, leaf.block_hash());
                }
            },
            MetadataType::Vid => {
                // Same scenario as above, but for the VID common metadata stream.
                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let vids = vids.collect::<Vec<_>>().await;
                for (leaf, vid) in leaves.iter().zip(vids) {
                    assert_eq!(vid.block_hash, leaf.block_hash());
                }
            },
        }
    }
2429
2430    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2431    async fn test_metadata_stream_begin_failure_payload() {
2432        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2433    }
2434
2435    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2436    async fn test_metadata_stream_begin_failure_vid() {
2437        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2438    }
2439
    // This helper function starts up a mock network with v0 and v1 availability
    // query modules, triggers fetches for a data source from the provider at the
    // given API `version` ("v0" or "v1"), and asserts that the fetched data is
    // correct.
    async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
        let mut network = MockNetwork::<MockDataSource, V>::init().await;

        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));

        // Register availability APIs for two versions: v0 and v1 (API versions
        // 0.0.1 and 1.0.0; the `version` argument selects which one the provider
        // URL below points at).
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "0.0.1".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
        );

        let db = TmpDb::init().await;

        // Point the provider at the requested API version of the server.
        let provider_url = format!("http://localhost:{port}/{version}")
            .parse()
            .expect("Invalid URL");

        let provider = Provider::new(QueryServiceProvider::new(
            provider_url,
            StaticVersion::<0, 1> {},
        ));

        let ds = data_source(&db, &provider).await;
        network.start().await;

        // Wait until a few blocks are produced, and pick objects at different
        // heights to request from the (initially empty) local data source.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        let mut fetches = vec![];
        // Issue requests for missing data (these should initially remain unresolved):
        fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
        fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
        fetches.push(
            ds.get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Append the latest known leaf to the local store.
        // This triggers fetches for the corresponding missing data,
        // such as header, vid and payload.
        // This also triggers fetches for the parent data.
        ds.append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // Check that the data has been fetched and matches the network data source.
        {
            let leaf = ds.get_leaf(test_leaf.height() as usize).await;
            let payload = ds.get_payload(test_payload.height() as usize).await;
            let common = ds.get_vid_common(test_common.height() as usize).await;

            // The consensus node's data source is the ground truth.
            let truth = network.data_source();
            assert_eq!(
                leaf.await,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                payload.await,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common.await,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }
    }
2550
2551    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2552    async fn test_fallback_deserialization_for_fetch_requests_v0() {
2553        let port = pick_unused_port().unwrap();
2554
2555        // This run will call v0 availalbilty api for fetch requests.
2556        // The fetch initially attempts deserialization with new types,
2557        // which fails because the v0 provider returns legacy types.
2558        // It then falls back to deserializing as legacy types,
2559        // and the fetch passes
2560        run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2561    }
2562
2563    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2564    async fn test_fallback_deserialization_for_fetch_requests_v1() {
2565        let port = pick_unused_port().unwrap();
2566
2567        // Fetch from the v1 availability API using MockVersions.
2568        // this one fetches from the v1 provider.
2569        // which would correctly deserialize the bytes in the first attempt, so no fallback deserialization is needed
2570        run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2571    }
2572
2573    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2574    async fn test_fallback_deserialization_for_fetch_requests_pos() {
2575        let port = pick_unused_port().unwrap();
2576
2577        // Fetch Proof of Stake (PoS) data using the v1 availability API
2578        // with proof of stake version
2579        run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2580    }
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
        // Run with the PoS version against a v0 provider.
        // Fetch requests are expected to fail because PoS commitments differ from the legacy commitments
        // returned by the v0 provider.
        // For example: a PoS Leaf2 commitment will not match the downgraded commitment from a legacy Leaf1.

        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;

        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));

        // Unlike the helper above, register only the legacy (v0, API version
        // 0.0.1) availability module, so the provider can only serve legacy types.
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "0.0.1".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
        );

        let db = TmpDb::init().await;
        // The disconnected data source fetches exclusively from the v0 endpoint.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}/v0").parse().unwrap(),
            StaticVersion::<0, 1> {},
        ));
        let ds = data_source(&db, &provider).await;

        network.start().await;

        // Wait for a few blocks and pick objects at different heights to request.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        let leaf = ds.get_leaf(test_leaf.height() as usize).await;
        let payload = ds.get_payload(test_payload.height() as usize).await;
        let common = ds.get_vid_common(test_common.height() as usize).await;

        // Give the fetches ample time to (fail to) resolve before checking.
        sleep(Duration::from_secs(3)).await;

        // fetches fail because of different commitments
        leaf.try_resolve().unwrap_err();
        payload.try_resolve().unwrap_err();
        common.try_resolve().unwrap_err();
    }
2635}