hotshot_query_service/fetching/provider/query_service.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16    data::{ns_table, VidCommitment},
17    traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18    vid::{
19        advz::{advz_scheme, ADVZScheme},
20        avidm::{init_avidm_param, AvidMScheme},
21    },
22};
23use jf_advz::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29    availability::{
30        ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31        PayloadQueryData, VidCommonQueryData,
32    },
33    fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34    types::HeightIndexed,
35    Error, Header, Payload, VidCommon,
36};
37
38/// Data availability provider backed by another instance of this query service.
39///
40/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
41/// instance of this query service to try to retrieve missing objects.
42#[derive(Clone, Debug)]
43pub struct QueryServiceProvider<Ver: StaticVersionType> {
44    client: Client<Error, Ver>,
45}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48    pub fn new(url: Url, _: Ver) -> Self {
49        Self {
50            client: Client::new(url),
51        }
52    }
53}
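
// A minimal construction sketch (the URL and the concrete `StaticVersionType` below are
// illustrative assumptions, not values mandated by this crate):
//
//     let peer: Url = "https://query.example.com/v0".parse().unwrap();
//     let provider = QueryServiceProvider::new(peer, StaticVersion::<0, 1>::instance());
//
// The provider is then handed to a fetching data source; the tests at the bottom of this file
// show the full wiring, wrapping it in a `TestProvider` and passing it to a `SqlDataSource`.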
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56    async fn deserialize_legacy_payload<Types: NodeType>(
57        &self,
58        payload_bytes: Vec<u8>,
59        common_bytes: Vec<u8>,
60        req: PayloadRequest,
61    ) -> Option<Payload<Types>> {
62        let client_url = self.client.base_url();
63
64        let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65            return None;
66        };
67
68        let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69            &payload_bytes,
70        ) {
71            Ok(payload) => payload,
72            Err(err) => {
73                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74                return None;
75            },
76        };
77
78        let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79            &common_bytes,
80        ) {
81            Ok(common) => common,
82            Err(err) => {
83                tracing::warn!(%err, ?req, "failed to deserialize ADVZCommonQueryData");
84                return None;
85            },
86        };
87
88        let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89        let bytes = payload.data.encode();
90
91        let commit = advz_scheme(num_storage_nodes)
92            .commit_only(bytes)
93            .inspect_err(|err| {
94                tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95            })
96            .ok()?;
97
98        if commit != advz_commit {
99            tracing::error!(
100                ?req,
101                expected_commit=%advz_commit,
102                actual_commit=%commit,
103                %client_url,
104                "received inconsistent legacy payload"
105            );
106            return None;
107        }
108
109        Some(payload.data)
110    }
111
112    async fn deserialize_legacy_vid_common<Types: NodeType>(
113        &self,
114        bytes: Vec<u8>,
115        req: VidCommonRequest,
116    ) -> Option<VidCommon> {
117        let client_url = self.client.base_url();
118        let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119            return None;
120        };
121
122        match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123            Ok(res) => {
124                if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125                    Some(VidCommon::V0(res.common))
126                } else {
127                    tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128                    None
129                }
130            },
131            Err(err) => {
132                tracing::warn!(
133                    %client_url,
134                    ?req,
135                    %err,
136                    "failed to deserialize ADVZCommonQueryData"
137                );
138                None
139            },
140        }
141    }
142    async fn deserialize_legacy_leaf<Types: NodeType>(
143        &self,
144        bytes: Vec<u8>,
145        req: LeafRequest<Types>,
146    ) -> Option<LeafQueryData<Types>> {
147        let client_url = self.client.base_url();
148
149        match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150            Ok(mut leaf) => {
151                if leaf.height() != req.height {
152                    tracing::error!(
153                        %client_url, ?req,
154                        expected_height = req.height,
155                        actual_height = leaf.height(),
156                        "received leaf with the wrong height"
157                    );
158                    return None;
159                }
160
161                let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162                let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163                if actual_leaf_commit != expected_leaf_commit {
164                    tracing::error!(
165                        %client_url, ?req,
166                        expected_leaf = %req.expected_leaf,
167                        actual_leaf = %leaf.hash(),
168                        "received leaf with the wrong hash"
169                    );
170                    return None;
171                }
172
173                let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174                let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175                if actual_qc_commit != expected_qc_commit {
176                    tracing::error!(
177                        %client_url, ?req,
178                        expected_qc = %req.expected_qc,
179                        actual_qc = %leaf.qc().commit(),
180                        "received leaf with the wrong QC"
181                    );
182                    return None;
183                }
184
185                // There is a potential DOS attack where the peer sends us a leaf with the full
186                // payload in it, which uses redundant resources in the database, since we fetch and
187                // store payloads separately. We can defend ourselves by simply dropping the payload
188                // if present.
189                leaf.leaf.unfill_block_payload();
190
191                Some(leaf.into())
192            },
193            Err(err) => {
194                tracing::warn!(
195                    %client_url, ?req, %err,
196                    "failed to deserialize legacy LeafQueryData"
197                );
198                None
199            },
200        }
201    }
202}
203
204#[async_trait]
205impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
206where
207    Types: NodeType,
208{
209    /// Fetches the `Payload` for a given request.
210    ///
211    /// Attempts to fetch and deserialize the requested data using the new type first.
212    /// If deserialization into the new type fails (e.g., because the provider is still returning
213    /// legacy data), it falls back to deserializing using an older, legacy type instead.
214    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
215    ///
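    /// A sketch of the pattern (illustrative pseudocode, not the exact implementation):
    ///
    /// ```ignore
    /// match deserialize::<NewType>(&bytes) {
    ///     Ok(data) => verify_and_return(data),                 // current, versioned format
    ///     Err(_) => deserialize_legacy::<LegacyType>(&bytes),  // legacy fallback
    /// }
    /// ```
    ///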
216    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
217        let client_url = self.client.base_url();
218        let req_hash = req.0;
219        // Fetch the payload and the VID common data. We need the common data to recompute the VID
220        // commitment, to ensure the payload we received is consistent with the commitment we
221        // requested.
222        let payload_bytes = self
223            .client
224            .get::<()>(&format!("availability/payload/hash/{}", req.0))
225            .bytes()
226            .await
227            .inspect_err(|err| {
228                tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
229            })
230            .ok()?;
231
232        let common_bytes = self
233            .client
234            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
235            .bytes()
236            .await
237            .inspect_err(|err| {
238                tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
239            })
240            .ok()?;
241
242        let payload =
243            vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
244                .inspect_err(|err| {
245                    tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
246                })
247                .ok();
248
249        let common =
250            vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
251                .inspect_err(|err| {
252                    tracing::info!(%err, %req_hash,
253                        "failed to deserialize VidCommonQueryData",
254                    );
255                })
256                .ok();
257
258        let (payload, common) = match (payload, common) {
259            (Some(payload), Some(common)) => (payload, common),
260            _ => {
261                tracing::info!(%req_hash, "falling back to legacy payload deserialization");
262
263                // fallback deserialization
264                return self
265                    .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
266                    .await;
267            },
268        };
269
270        match common.common() {
271            VidCommon::V0(common) => {
272                let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
273                let bytes = payload.data().encode();
274
275                let commit = advz_scheme(num_storage_nodes)
276                    .commit_only(bytes)
277                    .map(VidCommitment::V0)
278                    .inspect_err(|err| {
279                        tracing::error!(%err, %req_hash,  %num_storage_nodes, "failed to compute VID commitment (V0)");
280                    })
281                    .ok()?;
282
283                if commit != req.0 {
284                    tracing::error!(
285                        expected = %req_hash,
286                        actual = ?commit,
287                        %client_url,
288                        "VID commitment mismatch (V0)"
289                    );
290
291                    return None;
292                }
293            },
294            VidCommon::V1(common) => {
295                let bytes = payload.data().encode();
296
297                let avidm_param = init_avidm_param(common.total_weights)
298                    .inspect_err(|err| {
299                        tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weights={}", common.total_weights);
300                    })
301                    .ok()?;
302
303                let header = self
304                    .client
305                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
306                    .send()
307                    .await
308                    .inspect_err(|err| {
309                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
310                    })
311                    .ok()?;
312
313                if header.payload_commitment() != req.0 {
314                    tracing::error!(
315                        expected = %req_hash,
316                        actual = %header.payload_commitment(),
317                        %client_url,
318                        "header payload commitment mismatch (V1)"
319                    );
320                    return None;
321                }
322
323                let metadata = header.metadata().encode();
324                let commit = AvidMScheme::commit(
325                    &avidm_param,
326                    &bytes,
327                    ns_table::parse_ns_table(bytes.len(), &metadata),
328                )
329                .map(VidCommitment::V1)
330                .inspect_err(|err| {
331                    tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
332                })
333                .ok()?;
334
335                // Compare calculated commitment with requested commitment
336                if commit != req.0 {
337                    tracing::warn!(
338                        expected = %req_hash,
339                        actual = %commit,
340                        %client_url,
341                        "VID commitment mismatch (V1)"
342                    );
343                    return None;
344                }
345            },
346        }
347
348        Some(payload.data)
349    }
350}
351
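// Note on the verification above: for V0 (ADVZ), the commitment is recomputed directly from the
// payload bytes plus the number of storage nodes taken from the VID common data; for V1 (AVIDM),
// the namespace table must first be parsed from the header metadata. Condensed sketch, with
// helper names standing in for the calls above (illustrative only):
//
//     VidCommon::V0(c) => advz_scheme(num_nodes(c)).commit_only(bytes)? == requested,
//     VidCommon::V1(c) => AvidMScheme::commit(&params(c), &bytes, ns_table)? == requested,
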
352#[async_trait]
353impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
354    for QueryServiceProvider<Ver>
355where
356    Types: NodeType,
357{
358    /// Fetches the `Leaf` for a given request.
359    ///
360    /// Attempts to fetch and deserialize the requested data using the new type first.
361    /// If deserialization into the new type fails (e.g., because the provider is still returning
362    /// legacy data), it falls back to deserializing using an older, legacy type instead.
363    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
364    ///
365    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
366        let client_url = self.client.base_url();
367
368        let bytes = self
369            .client
370            .get::<()>(&format!("availability/leaf/{}", req.height))
371            .bytes()
372            .await;
373        let bytes = match bytes {
374            Ok(bytes) => bytes,
375            Err(err) => {
376                tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
377
378                return None;
379            },
380        };
381
382        // Attempt to deserialize using the new type
383
384        match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
385            Ok(mut leaf) => {
386                if leaf.height() != req.height {
387                    tracing::error!(
388                        %client_url, ?req, ?leaf,
389                        expected_height = req.height,
390                        actual_height = leaf.height(),
391                        "received leaf with the wrong height"
392                    );
393                    return None;
394                }
395                if leaf.hash() != req.expected_leaf {
396                    tracing::error!(
397                        %client_url, ?req, ?leaf,
398                        expected_hash = %req.expected_leaf,
399                        actual_hash = %leaf.hash(),
400                        "received leaf with the wrong hash"
401                    );
402                    return None;
403                }
404                if leaf.qc().commit() != req.expected_qc {
405                    tracing::error!(
406                        %client_url, ?req, ?leaf,
407                        expected_qc = %req.expected_qc,
408                        actual_qc = %leaf.qc().commit(),
409                        "received leaf with the wrong QC"
410                    );
411                    return None;
412                }
413
414                // There is a potential DOS attack where the peer sends us a leaf with the full
415                // payload in it, which uses redundant resources in the database, since we fetch and
416                // store payloads separately. We can defend ourselves by simply dropping the payload
417                // if present.
418                leaf.leaf.unfill_block_payload();
419
420                Some(leaf)
421            },
422            Err(err) => {
423                tracing::info!(
424                    ?req, %err,
425                    "failed to deserialize LeafQueryData, falling back to legacy deserialization"
426                );
427                // Fallback deserialization
428                self.deserialize_legacy_leaf(bytes, req).await
429            },
430        }
431    }
432}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437    Types: NodeType,
438{
439    /// Fetches the `VidCommon` for a given request.
440    ///
441    /// Attempts to fetch and deserialize the requested data using the new type first.
442    /// If deserialization into the new type fails (e.g., because the provider is still returning
443    /// legacy data), it falls back to deserializing using an older, legacy type instead.
444    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
445    ///
446    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447        let client_url = self.client.base_url();
448        let bytes = self
449            .client
450            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451            .bytes()
452            .await;
453        let bytes = match bytes {
454            Ok(bytes) => bytes,
455            Err(err) => {
456                tracing::info!(
457                    %client_url, ?req, %err,
458                    "failed to fetch VID common bytes"
459                );
460                return None;
461            },
462        };
463
464        match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465            Ok(res) => match req.0 {
466                VidCommitment::V0(commit) => {
467                    if let VidCommon::V0(common) = res.common {
468                        if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469                            Some(VidCommon::V0(common))
470                        } else {
471                            tracing::error!(
472                                %client_url, ?req, ?commit, ?common,
473                                "VID V0 common data is inconsistent with commitment"
474                            );
475                            None
476                        }
477                    } else {
478                        tracing::warn!(?req, ?res, "expected V0 VID common data but found a different version");
479                        None
480                    }
481                },
482                VidCommitment::V1(_) => {
483                    if let VidCommon::V1(common) = res.common {
484                        Some(VidCommon::V1(common))
485                    } else {
486                        tracing::warn!(?req, ?res, "expected V1 VID common data but found a different version");
487                        None
488                    }
489                },
490            },
491            Err(err) => {
492                tracing::info!(
493                    %client_url, ?req, %err,
494                    "failed to deserialize VidCommonQueryData, trying legacy fallback"
495                );
496                // Fallback deserialization
497                self.deserialize_legacy_vid_common::<Types>(bytes, req)
498                    .await
499            },
500        }
501    }
502}
503
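// Taken together, the three `Provider` impls above (payload, leaf, and VID common) let a single
// `QueryServiceProvider` serve as a complete availability provider (cf. the `AvailabilityProvider`
// bound used by the test helpers below). A direct-fetch sketch (the commitment value here is an
// illustrative assumption):
//
//     let payload: Option<Payload<Types>> = provider.fetch(PayloadRequest(commitment)).await;
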
504// These tests run the `postgres` Docker image, which doesn't work on Windows.
505#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507    use std::{future::IntoFuture, time::Duration};
508
509    use committable::Committable;
510    use futures::{
511        future::{join, FutureExt},
512        stream::StreamExt,
513    };
514    // generic-array 0.14.x is deprecated, but VidCommitment requires this version
515    // for From<Output<H>> impl in jf-merkle-tree
516    #[allow(deprecated)]
517    use generic_array::GenericArray;
518    use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
519    use hotshot_types::traits::node_implementation::Versions;
520    use portpicker::pick_unused_port;
521    use rand::RngCore;
522    use tide_disco::{error::ServerError, App};
523    use vbs::version::StaticVersion;
524
525    use super::*;
526    use crate::{
527        api::load_api,
528        availability::{
529            define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
530            BlockWithTransaction, Fetch, UpdateAvailabilityData,
531        },
532        data_source::{
533            sql::{self, SqlDataSource},
534            storage::{
535                fail_storage::{FailStorage, FailableAction},
536                pruning::{PrunedHeightStorage, PrunerCfg},
537                sql::testing::TmpDb,
538                AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
539            },
540            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
541        },
542        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
543        node::{data_source::NodeDataSource, SyncStatus},
544        task::BackgroundTask,
545        testing::{
546            consensus::{MockDataSource, MockNetwork},
547            mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
548            sleep,
549        },
550        types::HeightIndexed,
551        ApiState,
552    };
553
554    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
555    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
556
557    fn ignore<T>(_: T) {}
558
559    /// Build a data source suitable for this suite of tests.
560    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
561        db: &TmpDb,
562        provider: &P,
563    ) -> sql::Builder<MockTypes, P> {
564        db.config()
565            .builder((*provider).clone())
566            .await
567            .unwrap()
568            // We disable proactive fetching for these tests, since we are intending to test on
569            // demand fetching, and proactive fetching could lead to false successes.
570            .disable_proactive_fetching()
571    }
572
573    /// A data source suitable for this suite of tests, with the default options.
574    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
575        db: &TmpDb,
576        provider: &P,
577    ) -> SqlDataSource<MockTypes, P> {
578        builder(db, provider).await.build().await.unwrap()
579    }
580
581    #[test_log::test(tokio::test(flavor = "multi_thread"))]
582    async fn test_fetch_on_request() {
583        // Create the consensus network.
584        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
585
586        // Start a web server that the non-consensus node can use to fetch blocks.
587        let port = pick_unused_port().unwrap();
588        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
589        app.register_module(
590            "availability",
591            define_api(
592                &Default::default(),
593                MockBase::instance(),
594                "1.0.0".parse().unwrap(),
595            )
596            .unwrap(),
597        )
598        .unwrap();
599        network.spawn(
600            "server",
601            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
602        );
603
604        // Start a data source which is not receiving events from consensus, only from a peer.
605        let db = TmpDb::init().await;
606        let provider = Provider::new(QueryServiceProvider::new(
607            format!("http://localhost:{port}").parse().unwrap(),
608            MockBase::instance(),
609        ));
610        let data_source = data_source(&db, &provider).await;
611
612        // Start consensus.
613        network.start().await;
614
615        // Wait until the block height reaches 6. This gives us the genesis block, one additional
616        // block at the end, and then one block to play around with fetching each type of resource:
617        // * Leaf
618        // * Block
619        // * Payload
620        // * VID common
621        let leaves = network.data_source().subscribe_leaves(1).await;
622        let leaves = leaves.take(5).collect::<Vec<_>>().await;
623        let test_leaf = &leaves[0];
624        let test_block = &leaves[1];
625        let test_payload = &leaves[2];
626        let test_common = &leaves[3];
627
628        // Make requests for missing data that should _not_ trigger an active fetch:
629        tracing::info!("requesting unfetchable resources");
630        let mut fetches = vec![];
631        // * An unknown leaf hash.
632        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
633        // * An unknown leaf height.
634        fetches.push(
635            data_source
636                .get_leaf(test_leaf.height() as usize)
637                .await
638                .map(ignore),
639        );
640        // * An unknown block hash.
641        fetches.push(
642            data_source
643                .get_block(test_block.block_hash())
644                .await
645                .map(ignore),
646        );
647        fetches.push(
648            data_source
649                .get_payload(test_payload.block_hash())
650                .await
651                .map(ignore),
652        );
653        fetches.push(
654            data_source
655                .get_vid_common(test_common.block_hash())
656                .await
657                .map(ignore),
658        );
659        // * An unknown block height.
660        fetches.push(
661            data_source
662                .get_block(test_block.height() as usize)
663                .await
664                .map(ignore),
665        );
666        fetches.push(
667            data_source
668                .get_payload(test_payload.height() as usize)
669                .await
670                .map(ignore),
671        );
672        fetches.push(
673            data_source
674                .get_vid_common(test_common.height() as usize)
675                .await
676                .map(ignore),
677        );
678        // * Genesis VID common (no VID for genesis)
679        fetches.push(data_source.get_vid_common(0).await.map(ignore));
680        // * An unknown transaction.
681        fetches.push(
682            data_source
683                .get_block_containing_transaction(mock_transaction(vec![]).commit())
684                .await
685                .map(ignore),
686        );
687
688        // Even if we give data extra time to propagate, these requests will not resolve, since we
689        // didn't trigger any active fetches.
690        sleep(Duration::from_secs(1)).await;
691        for (i, fetch) in fetches.into_iter().enumerate() {
692            tracing::info!("checking fetch {i} is unresolved");
693            fetch.try_resolve().unwrap_err();
694        }
695
696        // Now we will actually fetch the missing data. First, since our node is not really
697        // connected to consensus, we need to give it a leaf after the range of interest so it
698        // learns about the correct block height. We will temporarily lock requests to the provider
699        // so that we can verify that without the provider, the node does _not_ get the data.
700        provider.block().await;
701        data_source
702            .append(leaves.last().cloned().unwrap().into())
703            .await
704            .unwrap();
705
706        tracing::info!("requesting fetchable resources");
707        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
708        let req_block = data_source.get_block(test_block.height() as usize).await;
709        let req_payload = data_source
710            .get_payload(test_payload.height() as usize)
711            .await;
712        let req_common = data_source
713            .get_vid_common(test_common.height() as usize)
714            .await;
715
716        // Give the requests some extra time to complete, and check that they still haven't
717        // resolved, since the provider is blocked. This just ensures the integrity of the test by
718        // checking the node didn't mysteriously get the block from somewhere else, so that when we
719        // unblock the provider and the node finally gets the block, we know it came from the
720        // provider.
721        sleep(Duration::from_secs(1)).await;
722        req_leaf.try_resolve().unwrap_err();
723        req_block.try_resolve().unwrap_err();
724        req_payload.try_resolve().unwrap_err();
725        req_common.try_resolve().unwrap_err();
726
727        // Unblock the request and see that we eventually receive the data.
728        provider.unblock().await;
729        let leaf = data_source
730            .get_leaf(test_leaf.height() as usize)
731            .await
732            .await;
733        let block = data_source
734            .get_block(test_block.height() as usize)
735            .await
736            .await;
737        let payload = data_source
738            .get_payload(test_payload.height() as usize)
739            .await
740            .await;
741        let common = data_source
742            .get_vid_common(test_common.height() as usize)
743            .await
744            .await;
745        {
746            // Verify the data.
747            let truth = network.data_source();
748            assert_eq!(
749                leaf,
750                truth.get_leaf(test_leaf.height() as usize).await.await
751            );
752            assert_eq!(
753                block,
754                truth.get_block(test_block.height() as usize).await.await
755            );
756            assert_eq!(
757                payload,
758                truth
759                    .get_payload(test_payload.height() as usize)
760                    .await
761                    .await
762            );
763            assert_eq!(
764                common,
765                truth
766                    .get_vid_common(test_common.height() as usize)
767                    .await
768                    .await
769            );
770        }
771
772        // Fetching the block and payload should have also fetched the corresponding leaves, since
773        // we have an invariant that we should not store a block in the database without its
774        // corresponding leaf and header. Thus we should be able to get the leaves even if the
775        // provider is blocked.
776        provider.block().await;
777        for leaf in [test_block, test_payload] {
778            tracing::info!("fetching existing leaf {}", leaf.height());
779            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
780            assert_eq!(*leaf, fetched_leaf);
781        }
782
783        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
784        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
785        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
786        // with this hash exists, and trigger a fetch for it.
787        tracing::info!("fetching block by hash");
788        provider.unblock().await;
789        {
790            let block = data_source.get_block(test_leaf.block_hash()).await.await;
791            assert_eq!(block.hash(), leaf.block_hash());
792        }
793
794        // Test a similar scenario, but with payload instead of block: we are aware of
795        // `leaves.last()` but not the corresponding payload; we can still fetch that payload by
796        // block hash.
797        tracing::info!("fetching payload by hash");
798        {
799            let leaf = leaves.last().unwrap();
800            let payload = data_source.get_payload(leaf.block_hash()).await.await;
801            assert_eq!(payload.height(), leaf.height());
802            assert_eq!(payload.block_hash(), leaf.block_hash());
803            assert_eq!(payload.hash(), leaf.payload_hash());
804        }
805    }
806
807    #[tokio::test(flavor = "multi_thread")]
808    async fn test_fetch_on_request_epoch_version() {
809        // This test verifies that our provider can handle fetching objects by their hashes,
810        // with a specific focus on epoch version transitions.
811        tracing::info!("Starting test_fetch_on_request_epoch_version");
812
813        // Create the consensus network.
814        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
815
816        // Start a web server that the non-consensus node can use to fetch blocks.
817        let port = pick_unused_port().unwrap();
818        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
819        app.register_module(
820            "availability",
821            define_api(
822                &Default::default(),
823                <EpochsTestVersions as Versions>::Base::instance(),
824                "1.0.0".parse().unwrap(),
825            )
826            .unwrap(),
827        )
828        .unwrap();
829        network.spawn(
830            "server",
831            app.serve(
832                format!("0.0.0.0:{port}"),
833                <EpochsTestVersions as Versions>::Base::instance(),
834            ),
835        );
836
837        // Start a data source which is not receiving events from consensus, only from a peer.
838        // Use our special test provider that handles epoch version transitions
839        let db = TmpDb::init().await;
840        let provider = EpochProvider::new(QueryServiceProvider::new(
841            format!("http://localhost:{port}").parse().unwrap(),
842            <EpochsTestVersions as Versions>::Base::instance(),
843        ));
844        let data_source = data_source(&db, &provider).await;
845
846        // Start consensus.
847        network.start().await;
848
849        // Wait until the block height reaches 6. This gives us the genesis block, one additional
850        // block at the end, and then one block to play around with fetching each type of resource:
851        // * Leaf
852        // * Block
853        // * Payload
854        // * VID common
855        let leaves = network.data_source().subscribe_leaves(1).await;
856        let leaves = leaves.take(5).collect::<Vec<_>>().await;
857        let test_leaf = &leaves[0];
858        let test_block = &leaves[1];
859        let test_payload = &leaves[2];
860        let test_common = &leaves[3];
861
862        // Make requests for missing data that should _not_ trigger an active fetch:
863        let mut fetches = vec![];
864        // * An unknown leaf hash.
865        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
866        // * An unknown leaf height.
867        fetches.push(
868            data_source
869                .get_leaf(test_leaf.height() as usize)
870                .await
871                .map(ignore),
872        );
873        // * An unknown block hash.
874        fetches.push(
875            data_source
876                .get_block(test_block.block_hash())
877                .await
878                .map(ignore),
879        );
880        fetches.push(
881            data_source
882                .get_payload(test_payload.block_hash())
883                .await
884                .map(ignore),
885        );
886        fetches.push(
887            data_source
888                .get_vid_common(test_common.block_hash())
889                .await
890                .map(ignore),
891        );
892        // * An unknown block height.
893        fetches.push(
894            data_source
895                .get_block(test_block.height() as usize)
896                .await
897                .map(ignore),
898        );
899        fetches.push(
900            data_source
901                .get_payload(test_payload.height() as usize)
902                .await
903                .map(ignore),
904        );
905        fetches.push(
906            data_source
907                .get_vid_common(test_common.height() as usize)
908                .await
909                .map(ignore),
910        );
911        // * Genesis VID common (no VID for genesis)
912        fetches.push(data_source.get_vid_common(0).await.map(ignore));
913        // * An unknown transaction.
914        fetches.push(
915            data_source
916                .get_block_containing_transaction(mock_transaction(vec![]).commit())
917                .await
918                .map(ignore),
919        );
920
921        // Even if we give data extra time to propagate, these requests will not resolve, since we
922        // didn't trigger any active fetches.
923        sleep(Duration::from_secs(1)).await;
924        for (i, fetch) in fetches.into_iter().enumerate() {
925            tracing::info!("checking fetch {i} is unresolved");
926            fetch.try_resolve().unwrap_err();
927        }
928
929        // Now we will actually fetch the missing data. First, since our node is not really
930        // connected to consensus, we need to give it a leaf after the range of interest so it
931        // learns about the correct block height. We will temporarily lock requests to the provider
932        // so that we can verify that without the provider, the node does _not_ get the data.
933        provider.block().await;
934        data_source
935            .append(leaves.last().cloned().unwrap().into())
936            .await
937            .unwrap();
938
939        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
940        let req_block = data_source.get_block(test_block.height() as usize).await;
941        let req_payload = data_source
942            .get_payload(test_payload.height() as usize)
943            .await;
944        let req_common = data_source
945            .get_vid_common(test_common.height() as usize)
946            .await;
947
948        // Give the requests some extra time to complete, and check that they still haven't
949        // resolved, since the provider is blocked. This just ensures the integrity of the test by
950        // checking the node didn't mysteriously get the block from somewhere else, so that when we
951        // unblock the provider and the node finally gets the block, we know it came from the
952        // provider.
953        sleep(Duration::from_secs(1)).await;
954        req_leaf.try_resolve().unwrap_err();
955        req_block.try_resolve().unwrap_err();
956        req_payload.try_resolve().unwrap_err();
957        req_common.try_resolve().unwrap_err();
958
959        // Unblock the request and see that we eventually receive the data.
960        provider.unblock().await;
961        let leaf = data_source
962            .get_leaf(test_leaf.height() as usize)
963            .await
964            .await;
965        let block = data_source
966            .get_block(test_block.height() as usize)
967            .await
968            .await;
969        let payload = data_source
970            .get_payload(test_payload.height() as usize)
971            .await
972            .await;
973        let common = data_source
974            .get_vid_common(test_common.height() as usize)
975            .await
976            .await;
977        {
978            // Verify the data.
979            let truth = network.data_source();
980            assert_eq!(
981                leaf,
982                truth.get_leaf(test_leaf.height() as usize).await.await
983            );
984            assert_eq!(
985                block,
986                truth.get_block(test_block.height() as usize).await.await
987            );
988            assert_eq!(
989                payload,
990                truth
991                    .get_payload(test_payload.height() as usize)
992                    .await
993                    .await
994            );
995            assert_eq!(
996                common,
997                truth
998                    .get_vid_common(test_common.height() as usize)
999                    .await
1000                    .await
1001            );
1002        }
1003
1004        // Fetching the block and payload should have also fetched the corresponding leaves, since
1005        // we have an invariant that we should not store a block in the database without its
1006        // corresponding leaf and header. Thus we should be able to get the leaves even if the
1007        // provider is blocked.
1008        provider.block().await;
1009        for leaf in [test_block, test_payload] {
1010            tracing::info!("fetching existing leaf {}", leaf.height());
1011            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1012            assert_eq!(*leaf, fetched_leaf);
1013        }
1014
1015        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
1016        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
1017        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
1018        // with this hash exists, and trigger a fetch for it.
1019        provider.unblock().await;
1020        {
1021            let block = data_source.get_block(test_leaf.block_hash()).await.await;
1022            assert_eq!(block.hash(), leaf.block_hash());
1023        }
1024
1025        // Test a similar scenario, but with payload instead of block: we are aware of
1026        // `leaves.last()` but not the corresponding payload; we can still fetch that payload by
1027        // block hash.
1028        {
1029            let leaf = leaves.last().unwrap();
1030            let payload = data_source.get_payload(leaf.block_hash()).await.await;
1031            assert_eq!(payload.height(), leaf.height());
1032            assert_eq!(payload.block_hash(), leaf.block_hash());
1033            assert_eq!(payload.hash(), leaf.payload_hash());
1034        }
1035
1036        // Mark the end of the test in the logs.
1037        tracing::info!("Test completed successfully!");
1038    }
1039
1040    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1041    async fn test_fetch_block_and_leaf_concurrently() {
1042        // Create the consensus network.
1043        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1044
1045        // Start a web server that the non-consensus node can use to fetch blocks.
1046        let port = pick_unused_port().unwrap();
1047        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1048        app.register_module(
1049            "availability",
1050            define_api(
1051                &Default::default(),
1052                MockBase::instance(),
1053                "1.0.0".parse().unwrap(),
1054            )
1055            .unwrap(),
1056        )
1057        .unwrap();
1058        network.spawn(
1059            "server",
1060            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1061        );
1062
1063        // Start a data source which is not receiving events from consensus, only from a peer.
1064        let db = TmpDb::init().await;
1065        let provider = Provider::new(QueryServiceProvider::new(
1066            format!("http://localhost:{port}").parse().unwrap(),
1067            MockBase::instance(),
1068        ));
1069        let data_source = data_source(&db, &provider).await;
1070
1071        // Start consensus.
1072        network.start().await;
1073
1074        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1075        // block at the end, and then one block that we can use to test fetching.
1076        let leaves = network.data_source().subscribe_leaves(1).await;
1077        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1078        let test_leaf = &leaves[0];
1079
1080        // Tell the node about a leaf after the one of interest so it learns about the block height.
1081        data_source.append(leaves[1].clone().into()).await.unwrap();
1082
1083        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
1084        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
1085        // ends up not fetching anything.
1086        let (leaf, block) = join(
1087            data_source
1088                .get_leaf(test_leaf.height() as usize)
1089                .await
1090                .into_future(),
1091            data_source
1092                .get_block(test_leaf.height() as usize)
1093                .await
1094                .into_future(),
1095        )
1096        .await;
1097        assert_eq!(leaf, *test_leaf);
1098        assert_eq!(leaf.header(), block.header());
1099    }
1100
1101    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1102    async fn test_fetch_different_blocks_same_payload() {
1103        // Create the consensus network.
1104        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1105
1106        // Start a web server that the non-consensus node can use to fetch blocks.
1107        let port = pick_unused_port().unwrap();
1108        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1109        app.register_module(
1110            "availability",
1111            define_api(
1112                &Default::default(),
1113                MockBase::instance(),
1114                "1.0.0".parse().unwrap(),
1115            )
1116            .unwrap(),
1117        )
1118        .unwrap();
1119        network.spawn(
1120            "server",
1121            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1122        );
1123
1124        // Start a data source which is not receiving events from consensus, only from a peer.
1125        let db = TmpDb::init().await;
1126        let provider = Provider::new(QueryServiceProvider::new(
1127            format!("http://localhost:{port}").parse().unwrap(),
1128            MockBase::instance(),
1129        ));
1130        let data_source = data_source(&db, &provider).await;
1131
1132        // Start consensus.
1133        network.start().await;
1134
1135        // Wait until the block height reaches 4. This gives us the genesis block, one additional
1136        // block at the end, and then two blocks that we can use to test fetching.
1137        let leaves = network.data_source().subscribe_leaves(1).await;
1138        let leaves = leaves.take(4).collect::<Vec<_>>().await;
1139
1140        // Tell the node about a leaf after the range of interest so it learns about the block
1141        // height.
1142        data_source
1143            .append(leaves.last().cloned().unwrap().into())
1144            .await
1145            .unwrap();
1146
1147        // All the blocks here are empty, so they have the same payload:
1148        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1149        // If we fetch both blocks at the same time, we can check that we haven't broken anything
1150        // with whatever optimizations we add to deduplicate payload fetching.
1151        let (block1, block2) = join(
1152            data_source
1153                .get_block(leaves[0].height() as usize)
1154                .await
1155                .into_future(),
1156            data_source
1157                .get_block(leaves[1].height() as usize)
1158                .await
1159                .into_future(),
1160        )
1161        .await;
1162        assert_eq!(block1.header(), leaves[0].header());
1163        assert_eq!(block2.header(), leaves[1].header());
1164    }
1165
1166    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1167    async fn test_fetch_stream() {
1168        // Create the consensus network.
1169        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1170
1171        // Start a web server that the non-consensus node can use to fetch blocks.
1172        let port = pick_unused_port().unwrap();
1173        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1174        app.register_module(
1175            "availability",
1176            define_api(
1177                &Default::default(),
1178                MockBase::instance(),
1179                "1.0.0".parse().unwrap(),
1180            )
1181            .unwrap(),
1182        )
1183        .unwrap();
1184        network.spawn(
1185            "server",
1186            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1187        );
1188
1189        // Start a data source which is not receiving events from consensus, only from a peer.
1190        let db = TmpDb::init().await;
1191        let provider = Provider::new(QueryServiceProvider::new(
1192            format!("http://localhost:{port}").parse().unwrap(),
1193            MockBase::instance(),
1194        ));
1195        let data_source = data_source(&db, &provider).await;
1196
1197        // Start consensus.
1198        network.start().await;
1199
1200        // Subscribe to objects from the future.
1201        let blocks = data_source.subscribe_blocks(0).await;
1202        let leaves = data_source.subscribe_leaves(0).await;
1203        let common = data_source.subscribe_vid_common(0).await;
1204
1205        // Wait for a few blocks to be finalized.
1206        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1207        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1208
1209        // Tell the node about a leaf after the range of interest so it learns about the block
1210        // height.
1211        data_source
1212            .append(finalized_leaves.last().cloned().unwrap().into())
1213            .await
1214            .unwrap();
1215
1216        // Check the subscriptions.
1217        let blocks = blocks.take(5).collect::<Vec<_>>().await;
1218        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1219        let common = common.take(5).collect::<Vec<_>>().await;
1220        for i in 0..5 {
1221            tracing::info!("checking block {i}");
1222            assert_eq!(leaves[i], finalized_leaves[i]);
1223            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1224            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1225        }
1226    }
1227
1228    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1229    async fn test_fetch_range_start() {
1230        // Create the consensus network.
1231        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1232
1233        // Start a web server that the non-consensus node can use to fetch blocks.
1234        let port = pick_unused_port().unwrap();
1235        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1236        app.register_module(
1237            "availability",
1238            define_api(
1239                &Default::default(),
1240                MockBase::instance(),
1241                "1.0.0".parse().unwrap(),
1242            )
1243            .unwrap(),
1244        )
1245        .unwrap();
1246        network.spawn(
1247            "server",
1248            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1249        );
1250
1251        // Start a data source which is not receiving events from consensus, only from a peer.
1252        let db = TmpDb::init().await;
1253        let provider = Provider::new(QueryServiceProvider::new(
1254            format!("http://localhost:{port}").parse().unwrap(),
1255            MockBase::instance(),
1256        ));
1257        let data_source = data_source(&db, &provider).await;
1258
1259        // Start consensus.
1260        network.start().await;
1261
1262        // Wait for a few blocks to be finalized.
1263        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1264        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1265
1266        // Tell the node about a leaf after the range of interest (so it learns about the block
1267        // height) and one in the middle of the range, to test what happens when data is missing
1268        // from the beginning of the range but other data is available.
1269        let mut tx = data_source.write().await.unwrap();
1270        tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1271        tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1272        tx.commit().await.unwrap();
1273
1274        // Get the whole range of leaves.
1275        let leaves = data_source
1276            .get_leaf_range(..5)
1277            .await
1278            .then(Fetch::resolve)
1279            .collect::<Vec<_>>()
1280            .await;
1281        for i in 0..5 {
1282            tracing::info!("checking leaf {i}");
1283            assert_eq!(leaves[i], finalized_leaves[i]);
1284        }
1285    }
1286
1287    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1288    async fn fetch_transaction() {
1289        // Create the consensus network.
1290        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1291
1292        // Start a web server that the non-consensus node can use to fetch blocks.
1293        let port = pick_unused_port().unwrap();
1294        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1295        app.register_module(
1296            "availability",
1297            define_api(
1298                &Default::default(),
1299                MockBase::instance(),
1300                "1.0.0".parse().unwrap(),
1301            )
1302            .unwrap(),
1303        )
1304        .unwrap();
1305        network.spawn(
1306            "server",
1307            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1308        );
1309
1310        // Start a data source which is not receiving events from consensus. We don't give it a
1311        // fetcher since transactions are always fetched passively anyways.
1312        let db = TmpDb::init().await;
1313        let data_source = data_source(&db, &NoFetching).await;
1314
1315        // Subscribe to blocks.
1316        let mut leaves = network.data_source().subscribe_leaves(1).await;
1317        let mut blocks = network.data_source().subscribe_blocks(1).await;
1318
1319        // Start consensus.
1320        network.start().await;
1321
1322        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
1323        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
1324        // exist.
1325        let tx = mock_transaction(vec![1, 2, 3]);
1326        let fut = data_source
1327            .get_block_containing_transaction(tx.commit())
1328            .await;
1329
1330        // Sequence the transaction.
1331        network.submit_transaction(tx.clone()).await;
1332
1333        // Send blocks to the query service, the future will resolve as soon as it sees a block
1334        // containing the transaction.
1335        let block = loop {
1336            let leaf = leaves.next().await.unwrap();
1337            let block = blocks.next().await.unwrap();
1338
1339            data_source
1340                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1341                .await
1342                .unwrap();
1343
1344            if block.transaction_by_hash(tx.commit()).is_some() {
1345                break block;
1346            }
1347        };
1348        tracing::info!("transaction included in block {}", block.height());
1349
1350        let fetched_tx = fut.await;
1351        assert_eq!(
1352            fetched_tx,
1353            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1354        );
1355
1356        // Future queries for this transaction resolve immediately.
1357        assert_eq!(
1358            fetched_tx,
1359            data_source
1360                .get_block_containing_transaction(tx.commit())
1361                .await
1362                .await
1363        );
1364    }
1365
1366    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1367    async fn test_retry() {
1368        // Create the consensus network.
1369        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1370
1371        // Start a web server that the non-consensus node can use to fetch blocks.
1372        let port = pick_unused_port().unwrap();
1373        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1374        app.register_module(
1375            "availability",
1376            define_api(
1377                &Default::default(),
1378                MockBase::instance(),
1379                "1.0.0".parse().unwrap(),
1380            )
1381            .unwrap(),
1382        )
1383        .unwrap();
1384        network.spawn(
1385            "server",
1386            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1387        );
1388
1389        // Start a data source which is not receiving events from consensus.
1390        let db = TmpDb::init().await;
1391        let provider = Provider::new(QueryServiceProvider::new(
1392            format!("http://localhost:{port}").parse().unwrap(),
1393            MockBase::instance(),
1394        ));
1395        let data_source = builder(&db, &provider)
1396            .await
1397            .with_max_retry_interval(Duration::from_secs(1))
1398            .build()
1399            .await
1400            .unwrap();
1401
1402        // Start consensus.
1403        network.start().await;
1404
1405        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1406        // block at the end, and one block to try fetching.
1407        let leaves = network.data_source().subscribe_leaves(1).await;
1408        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1409        let test_leaf = &leaves[0];
1410
1411        // Cause requests to fail temporarily, so we can test retries.
1412        provider.fail();
1413
1414        // Give the node a leaf after the range of interest so it learns about the correct block
1415        // height.
1416        data_source
1417            .append(leaves.last().cloned().unwrap().into())
1418            .await
1419            .unwrap();
1420
1421        tracing::info!("requesting leaf from failing providers");
1422        let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1423
1424        // Wait a few retries and check that the request has not completed, since the provider is
1425        // failing.
1426        sleep(Duration::from_secs(5)).await;
1427        fut.try_resolve().unwrap_err();
1428
1429        // As soon as the provider recovers, the request can complete.
1430        provider.unfail();
1431        assert_eq!(
1432            data_source
1433                .get_leaf(test_leaf.height() as usize)
1434                .await
1435                .await,
1436            *test_leaf
1437        );
1438    }
1439
1440    // Uses deprecated generic-array 0.14.x via digest, required for VidCommitment compatibility
1441    #[allow(deprecated)]
1442    fn random_vid_commit() -> VidCommitment {
1443        let mut bytes = [0; 32];
1444        rand::thread_rng().fill_bytes(&mut bytes);
1445        VidCommitment::V0(GenericArray::from(bytes).into())
1446    }
1447
1448    async fn malicious_server(port: u16) {
1449        let mut api = load_api::<(), ServerError, MockBase>(
1450            None::<std::path::PathBuf>,
1451            include_str!("../../../api/availability.toml"),
1452            vec![],
1453        )
1454        .unwrap();
1455
1456        api.get("get_payload", move |_, _| {
1457            async move {
1458                // No matter what data we are asked for, always respond with dummy data.
1459                Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1460                    &Default::default(),
1461                    &Default::default(),
1462                )
1463                .await)
1464            }
1465            .boxed()
1466        })
1467        .unwrap()
1468        .get("get_vid_common", move |_, _| {
1469            async move {
1470                // No matter what data we are asked for, always respond with dummy data.
1471                Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1472                    &Default::default(),
1473                    &Default::default(),
1474                )
1475                .await)
1476            }
1477            .boxed()
1478        })
1479        .unwrap();
1480
1481        let mut app = App::<(), ServerError>::with_state(());
1482        app.register_module("availability", api).unwrap();
1483        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1484            .await
1485            .ok();
1486    }
1487
1488    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1489    async fn test_fetch_from_malicious_server() {
1490        let port = pick_unused_port().unwrap();
1491        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1492
1493        let provider = QueryServiceProvider::new(
1494            format!("http://localhost:{port}").parse().unwrap(),
1495            MockBase::instance(),
1496        );
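        // Wait for the (malicious) mock server to come up before issuing requests.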
1497        provider.client.connect(None).await;
1498
1499        // Query for a random payload; the server will respond with a different payload, and we
1500        // should detect the error.
1501        let res =
1502            ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1503                .await;
1504        assert_eq!(res, None);
1505
1506        // Query for a random VID common; the server will respond with a different one, and we
1507        // should detect the error.
1508        let res =
1509            ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1510                .await;
1511        assert_eq!(res, None);
1512    }
1513
1514    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1515    async fn test_archive_recovery() {
1516        // Create the consensus network.
1517        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1518
1519        // Start a web server that the non-consensus node can use to fetch blocks.
1520        let port = pick_unused_port().unwrap();
1521        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1522        app.register_module(
1523            "availability",
1524            define_api(
1525                &Default::default(),
1526                MockBase::instance(),
1527                "1.0.0".parse().unwrap(),
1528            )
1529            .unwrap(),
1530        )
1531        .unwrap();
1532        network.spawn(
1533            "server",
1534            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1535        );
1536
1537        // Start a data source which is not receiving events from consensus, only from a peer. The
1538        // data source is at first configured to aggressively prune data.
1539        let db = TmpDb::init().await;
1540        let provider = Provider::new(QueryServiceProvider::new(
1541            format!("http://localhost:{port}").parse().unwrap(),
1542            MockBase::instance(),
1543        ));
1544        let mut data_source = db
1545            .config()
1546            .pruner_cfg(
1547                PrunerCfg::new()
1548                    .with_target_retention(Duration::from_secs(0))
1549                    .with_interval(Duration::from_secs(5)),
1550            )
1551            .unwrap()
1552            .builder(provider.clone())
1553            .await
1554            .unwrap()
1555            // Set a fast retry for failed operations. Occasionally storage operations will fail due
1556            // to conflicting write-mode transactions running concurrently. This is ok as they will
1557            // be retried. Having a fast retry interval speeds up the test.
1558            .with_min_retry_interval(Duration::from_millis(100))
1559            // Randomize retries a lot. This will temporally separate competing write transactions
1560            // with high probability, so that one of them quickly gets exclusive access to the
1561            // database.
1562            .with_retry_randomization_factor(3.)
1563            .build()
1564            .await
1565            .unwrap();
1566
1567        // Start consensus.
1568        network.start().await;
1569
1570        // Wait until a few blocks are produced.
1571        let leaves = network.data_source().subscribe_leaves(1).await;
1572        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1573
1574        // The disconnected data source has no data yet, so it hasn't done any pruning.
1575        let pruned_height = data_source
1576            .read()
1577            .await
1578            .unwrap()
1579            .load_pruned_height()
1580            .await
1581            .unwrap();
1582        // Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
1583        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1584
1585        // Send the last leaf to the disconnected data source so it learns about the height and
1586        // fetches the missing data.
1587        let last_leaf = leaves.last().unwrap();
1588        data_source.append(last_leaf.clone().into()).await.unwrap();
1589
1590        // Trigger a fetch of each leaf so the database gets populated.
1591        for i in 1..=last_leaf.height() {
1592            tracing::info!(i, "fetching leaf");
1593            assert_eq!(
1594                data_source.get_leaf(i as usize).await.await,
1595                leaves[i as usize - 1]
1596            );
1597        }
1598
1599        // After a bit of time, the pruner has run and deleted all the data we just fetched.
1600        loop {
1601            let pruned_height = data_source
1602                .read()
1603                .await
1604                .unwrap()
1605                .load_pruned_height()
1606                .await
1607                .unwrap();
1608            if pruned_height == Some(last_leaf.height()) {
1609                break;
1610            }
1611            tracing::info!(
1612                ?pruned_height,
1613                target_height = last_leaf.height(),
1614                "waiting for pruner to run"
1615            );
1616            sleep(Duration::from_secs(1)).await;
1617        }
1618
1619        // Now close the data source and restart it with archive recovery.
1620        data_source = db
1621            .config()
1622            .archive()
1623            .builder(provider.clone())
1624            .await
1625            .unwrap()
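            // Scan aggressively so the pruned data is restored from the peer quickly.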
1626            .with_minor_scan_interval(Duration::from_secs(1))
1627            .with_major_scan_interval(1)
1628            .build()
1629            .await
1630            .unwrap();
1631
1632        // Pruned height should be reset.
1633        let pruned_height = data_source
1634            .read()
1635            .await
1636            .unwrap()
1637            .load_pruned_height()
1638            .await
1639            .unwrap();
1640        assert_eq!(pruned_height, None);
1641
1642        // The node has pruned all of its data, including the latest block, so it's forgotten the
1643        // block height. We need to give it another leaf with some height so it will be willing to
1644        // fetch.
1645        data_source.append(last_leaf.clone().into()).await.unwrap();
1646
1647        // Wait for the data to be restored. It should be restored by the next major scan.
1648        loop {
1649            let sync_status = data_source.sync_status().await.unwrap();
1650
1651            // VID shares are unique to a node and will never be fetched from a peer; this is
1652            // acceptable since there is redundancy built into the VID scheme. Ignore missing VID
1653            // shares in the `is_fully_synced` check.
1654            if (SyncStatus {
1655                missing_vid_shares: 0,
1656                ..sync_status
1657            })
1658            .is_fully_synced()
1659            {
1660                break;
1661            }
1662            tracing::info!(?sync_status, "waiting for node to sync");
1663            sleep(Duration::from_secs(1)).await;
1664        }
1665
1666        // The node remains fully synced even after some time; no pruning.
1667        sleep(Duration::from_secs(3)).await;
1668        let sync_status = data_source.sync_status().await.unwrap();
1669        assert!(
1670            (SyncStatus {
1671                missing_vid_shares: 0,
1672                ..sync_status
1673            })
1674            .is_fully_synced(),
1675            "{sync_status:?}"
1676        );
1677    }
1678
1679    #[derive(Clone, Copy, Debug)]
1680    enum FailureType {
1681        Begin,
1682        Write,
1683        Commit,
1684    }
1685
1686    async fn test_fetch_storage_failure_helper(failure: FailureType) {
1687        // Create the consensus network.
1688        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1689
1690        // Start a web server that the non-consensus node can use to fetch blocks.
1691        let port = pick_unused_port().unwrap();
1692        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1693        app.register_module(
1694            "availability",
1695            define_api(
1696                &Default::default(),
1697                MockBase::instance(),
1698                "1.0.0".parse().unwrap(),
1699            )
1700            .unwrap(),
1701        )
1702        .unwrap();
1703        network.spawn(
1704            "server",
1705            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1706        );
1707
1708        // Start a data source which is not receiving events from consensus, only from a peer.
1709        let provider = Provider::new(QueryServiceProvider::new(
1710            format!("http://localhost:{port}").parse().unwrap(),
1711            MockBase::instance(),
1712        ));
1713        let db = TmpDb::init().await;
1714        let storage = FailStorage::from(
1715            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1716                .await
1717                .unwrap(),
1718        );
1719        let data_source = FetchingDataSource::builder(storage, provider)
1720            .disable_proactive_fetching()
1721            .disable_aggregator()
1722            .with_max_retry_interval(Duration::from_millis(100))
1723            .with_retry_timeout(Duration::from_secs(1))
1724            .build()
1725            .await
1726            .unwrap();
1727
1728        // Start consensus.
1729        network.start().await;
1730
1731        // Wait until a couple of blocks are produced.
1732        let leaves = network.data_source().subscribe_leaves(1).await;
1733        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1734
1735        // Send the last leaf to the disconnected data source so it learns about the height.
1736        let last_leaf = leaves.last().unwrap();
1737        let mut tx = data_source.write().await.unwrap();
1738        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1739        tx.commit().await.unwrap();
1740
1741        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
1742        tracing::info!("fetch with write failure");
1743        match failure {
1744            FailureType::Begin => {
1745                data_source
1746                    .as_ref()
1747                    .fail_begins_writable(FailableAction::Any)
1748                    .await
1749            },
1750            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1751            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1752        }
1753        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
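        // Let storage operations succeed again.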
1754        data_source.as_ref().pass().await;
1755
1756        // It is possible that the fetch above completes, notifies the subscriber,
1757        // and the fetch below quickly subscribes and gets notified by the same loop.
1758        // We add a delay here to avoid this situation.
1759        // This is not a bug, as being notified after subscribing is fine.
1760        sleep(Duration::from_secs(1)).await;
1761
1762        // We can get the same leaf again; this will again trigger an active fetch since storage
1763        // failed the first time.
1764        tracing::info!("fetch with write success");
1765        let fetch = data_source.get_leaf(1).await;
1766        assert!(fetch.is_pending());
1767        assert_eq!(leaves[0], fetch.await);
1768
1769        sleep(Duration::from_secs(1)).await;
1770
1771        // Finally, we should have the leaf locally and not need to fetch it.
1772        tracing::info!("retrieve from storage");
1773        let fetch = data_source.get_leaf(1).await;
1774        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1775    }
1776
1777    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1778    async fn test_fetch_storage_failure_on_begin() {
1779        test_fetch_storage_failure_helper(FailureType::Begin).await;
1780    }
1781
1782    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1783    async fn test_fetch_storage_failure_on_write() {
1784        test_fetch_storage_failure_helper(FailureType::Write).await;
1785    }
1786
1787    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1788    async fn test_fetch_storage_failure_on_commit() {
1789        test_fetch_storage_failure_helper(FailureType::Commit).await;
1790    }
1791
1792    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1793        // Create the consensus network.
1794        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1795
1796        // Start a web server that the non-consensus node can use to fetch blocks.
1797        let port = pick_unused_port().unwrap();
1798        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1799        app.register_module(
1800            "availability",
1801            define_api(
1802                &Default::default(),
1803                MockBase::instance(),
1804                "1.0.0".parse().unwrap(),
1805            )
1806            .unwrap(),
1807        )
1808        .unwrap();
1809        network.spawn(
1810            "server",
1811            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1812        );
1813
1814        // Start a data source which is not receiving events from consensus, only from a peer.
1815        let provider = Provider::new(QueryServiceProvider::new(
1816            format!("http://localhost:{port}").parse().unwrap(),
1817            MockBase::instance(),
1818        ));
1819        let db = TmpDb::init().await;
1820        let storage = FailStorage::from(
1821            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1822                .await
1823                .unwrap(),
1824        );
1825        let data_source = FetchingDataSource::builder(storage, provider)
1826            .disable_proactive_fetching()
1827            .disable_aggregator()
1828            .with_min_retry_interval(Duration::from_millis(100))
1829            .build()
1830            .await
1831            .unwrap();
1832
1833        // Start consensus.
1834        network.start().await;
1835
1836        // Wait until a couple of blocks are produced.
1837        let leaves = network.data_source().subscribe_leaves(1).await;
1838        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1839
1840        // Send the last leaf to the disconnected data source so it learns about the height.
1841        let last_leaf = leaves.last().unwrap();
1842        let mut tx = data_source.write().await.unwrap();
1843        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1844        tx.commit().await.unwrap();
1845
1846        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
1847        tracing::info!("fetch with write failure");
1848        match failure {
1849            FailureType::Begin => {
1850                data_source
1851                    .as_ref()
1852                    .fail_one_begin_writable(FailableAction::Any)
1853                    .await
1854            },
1855            FailureType::Write => {
1856                data_source
1857                    .as_ref()
1858                    .fail_one_write(FailableAction::Any)
1859                    .await
1860            },
1861            FailureType::Commit => {
1862                data_source
1863                    .as_ref()
1864                    .fail_one_commit(FailableAction::Any)
1865                    .await
1866            },
1867        }
1868        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1869
1870        // Check that the leaf ended up in local storage.
1871        let mut tx = data_source.read().await.unwrap();
1872        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1873    }
1874
1875    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1876    async fn test_fetch_storage_failure_retry_on_begin() {
1877        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1878    }
1879
1880    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1881    async fn test_fetch_storage_failure_retry_on_write() {
1882        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1883    }
1884
1885    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1886    async fn test_fetch_storage_failure_retry_on_commit() {
1887        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1888    }
1889
1890    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1891    async fn test_fetch_on_decide() {
1892        // Create the consensus network.
1893        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1894
1895        // Start a web server that the non-consensus node can use to fetch blocks.
1896        let port = pick_unused_port().unwrap();
1897        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1898        app.register_module(
1899            "availability",
1900            define_api(
1901                &Default::default(),
1902                MockBase::instance(),
1903                "1.0.0".parse().unwrap(),
1904            )
1905            .unwrap(),
1906        )
1907        .unwrap();
1908        network.spawn(
1909            "server",
1910            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1911        );
1912
1913        // Start a data source which is not receiving events from consensus.
1914        let db = TmpDb::init().await;
1915        let provider = Provider::new(QueryServiceProvider::new(
1916            format!("http://localhost:{port}").parse().unwrap(),
1917            MockBase::instance(),
1918        ));
1919        let data_source = builder(&db, &provider)
1920            .await
1921            .with_max_retry_interval(Duration::from_secs(1))
1922            .build()
1923            .await
1924            .unwrap();
1925
1926        // Start consensus.
1927        network.start().await;
1928
1929        // Wait until a block has been decided.
1930        let leaf = network
1931            .data_source()
1932            .subscribe_leaves(1)
1933            .await
1934            .next()
1935            .await
1936            .unwrap();
1937
1938        // Give the node a decide containing the leaf but no additional information.
1939        data_source.append(leaf.clone().into()).await.unwrap();
1940
1941        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
1942        // the leaf.
1943        sleep(Duration::from_secs(5)).await;
1944
1945        // Read the missing data directly from storage (via a database transaction), rather than
1946        // going through the data source, so that this request itself does not trigger a fetch.
1947        // Thus, this will only work if the data was already fetched, triggered by the leaf.
1948        let mut tx = data_source.read().await.unwrap();
1949        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1950        let block = tx.get_block(id).await.unwrap();
1951        let vid = tx.get_vid_common(id).await.unwrap();
1952
1953        assert_eq!(block.hash(), leaf.block_hash());
1954        assert_eq!(vid.block_hash(), leaf.block_hash());
1955    }
1956
1957    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1958    async fn test_fetch_begin_failure() {
1959        // Create the consensus network.
1960        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1961
1962        // Start a web server that the non-consensus node can use to fetch blocks.
1963        let port = pick_unused_port().unwrap();
1964        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1965        app.register_module(
1966            "availability",
1967            define_api(
1968                &Default::default(),
1969                MockBase::instance(),
1970                "1.0.0".parse().unwrap(),
1971            )
1972            .unwrap(),
1973        )
1974        .unwrap();
1975        network.spawn(
1976            "server",
1977            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1978        );
1979
1980        // Start a data source which is not receiving events from consensus, only from a peer.
1981        let provider = Provider::new(QueryServiceProvider::new(
1982            format!("http://localhost:{port}").parse().unwrap(),
1983            MockBase::instance(),
1984        ));
1985        let db = TmpDb::init().await;
1986        let storage = FailStorage::from(
1987            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1988                .await
1989                .unwrap(),
1990        );
1991        let data_source = FetchingDataSource::builder(storage, provider)
1992            .disable_proactive_fetching()
1993            .disable_aggregator()
1994            .with_min_retry_interval(Duration::from_millis(100))
1995            .build()
1996            .await
1997            .unwrap();
1998
1999        // Start consensus.
2000        network.start().await;
2001
2002        // Wait until a couple of blocks are produced.
2003        let leaves = network.data_source().subscribe_leaves(1).await;
2004        let leaves = leaves.take(2).collect::<Vec<_>>().await;
2005
2006        // Send the last leaf to the disconnected data source so it learns about the height.
2007        let last_leaf = leaves.last().unwrap();
2008        let mut tx = data_source.write().await.unwrap();
2009        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2010        tx.commit().await.unwrap();
2011
2012        // Trigger a fetch of the first leaf; it should retry until it is able to determine
2013        // the leaf is fetchable and trigger the fetch.
2014        tracing::info!("fetch with transaction failure");
2015        data_source
2016            .as_ref()
2017            .fail_one_begin_read_only(FailableAction::Any)
2018            .await;
2019        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2020    }
2021
2022    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2023    async fn test_fetch_load_failure_block() {
2024        // Create the consensus network.
2025        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2026
2027        // Start a web server that the non-consensus node can use to fetch blocks.
2028        let port = pick_unused_port().unwrap();
2029        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2030        app.register_module(
2031            "availability",
2032            define_api(
2033                &Default::default(),
2034                MockBase::instance(),
2035                "1.0.0".parse().unwrap(),
2036            )
2037            .unwrap(),
2038        )
2039        .unwrap();
2040        network.spawn(
2041            "server",
2042            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2043        );
2044
2045        // Start a data source which is not receiving events from consensus, only from a peer.
2046        let provider = Provider::new(QueryServiceProvider::new(
2047            format!("http://localhost:{port}").parse().unwrap(),
2048            MockBase::instance(),
2049        ));
2050        let db = TmpDb::init().await;
2051        let storage = FailStorage::from(
2052            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2053                .await
2054                .unwrap(),
2055        );
2056        let data_source = FetchingDataSource::builder(storage, provider)
2057            .disable_proactive_fetching()
2058            .disable_aggregator()
2059            .with_min_retry_interval(Duration::from_millis(100))
2060            .build()
2061            .await
2062            .unwrap();
2063
2064        // Start consensus.
2065        network.start().await;
2066
2067        // Wait until a block is produced.
2068        let mut leaves = network.data_source().subscribe_leaves(1).await;
2069        let leaf = leaves.next().await.unwrap();
2070
2071        // Send the leaf to the disconnected data source, so the corresponding block becomes
2072        // fetchable.
2073        let mut tx = data_source.write().await.unwrap();
2074        tx.insert_leaf(leaf.clone()).await.unwrap();
2075        tx.commit().await.unwrap();
2076
2077        // Trigger a fetch of the block by hash; it should retry until it is able to determine that
2078        // the leaf is available and thus the block is fetchable, and then trigger the fetch.
2079        //
2080        // Failing only on the `get_header` call here hits an edge case which is only possible when
2081        // fetching blocks: we successfully determine that the object is not available locally and
2082        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
2083        // fail to load the header and erroneously treat this as the header not being available, we
2084        // will give up and consider the object unfetchable (since the next step would be to fetch
2085        // the corresponding leaf, but we cannot do this with just a block hash).
2086        //
2087        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
2088        // successfully able to load the header from storage and determine that the block is
2089        // fetchable.
2090        tracing::info!("fetch with read failure");
2091        data_source
2092            .as_ref()
2093            .fail_one_read(FailableAction::GetHeader)
2094            .await;
2095        let fetch = data_source.get_block(leaf.block_hash()).await;
2096
2097        // Give some time for a few reads to fail before letting them succeed.
2098        sleep(Duration::from_secs(2)).await;
2099        data_source.as_ref().pass().await;
2100
2101        let block: BlockQueryData<MockTypes> = fetch.await;
2102        assert_eq!(block.hash(), leaf.block_hash());
2103    }
2104
2105    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2106    async fn test_fetch_load_failure_tx() {
2107        // Create the consensus network.
2108        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2109
2110        // Start a web server that the non-consensus node can use to fetch blocks.
2111        let port = pick_unused_port().unwrap();
2112        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2113        app.register_module(
2114            "availability",
2115            define_api(
2116                &Default::default(),
2117                MockBase::instance(),
2118                "1.0.0".parse().unwrap(),
2119            )
2120            .unwrap(),
2121        )
2122        .unwrap();
2123        network.spawn(
2124            "server",
2125            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2126        );
2127
2128        // Start a data source which is not receiving events from consensus, only from a peer.
2129        let provider = Provider::new(QueryServiceProvider::new(
2130            format!("http://localhost:{port}").parse().unwrap(),
2131            MockBase::instance(),
2132        ));
2133        let db = TmpDb::init().await;
2134        let storage = FailStorage::from(
2135            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2136                .await
2137                .unwrap(),
2138        );
2139        let data_source = FetchingDataSource::builder(storage, provider)
2140            .disable_proactive_fetching()
2141            .disable_aggregator()
2142            .with_min_retry_interval(Duration::from_millis(100))
2143            .build()
2144            .await
2145            .unwrap();
2146
2147        // Start consensus.
2148        network.start().await;
2149
2150        // Wait until a transaction is sequenced.
2151        let tx = mock_transaction(vec![1, 2, 3]);
2152        network.submit_transaction(tx.clone()).await;
2153        let tx = network
2154            .data_source()
2155            .get_block_containing_transaction(tx.commit())
2156            .await
2157            .await;
2158
2159        // Send the block containing the transaction to the disconnected data source.
2160        {
2161            let leaf = network
2162                .data_source()
2163                .get_leaf(tx.transaction.block_height() as usize)
2164                .await
2165                .await;
2166            let block = network
2167                .data_source()
2168                .get_block(tx.transaction.block_height() as usize)
2169                .await
2170                .await;
2171            let mut tx = data_source.write().await.unwrap();
2172            tx.insert_leaf(leaf.clone()).await.unwrap();
2173            tx.insert_block(block.clone()).await.unwrap();
2174            tx.commit().await.unwrap();
2175        }
2176
2177        // Check that the transaction is there.
2178        tracing::info!("fetch success");
2179        assert_eq!(
2180            tx,
2181            data_source
2182                .get_block_containing_transaction(tx.transaction.hash())
2183                .await
2184                .await
2185        );
2186
2187        // Fetch the transaction with storage failures.
2188        //
2189        // Failing only one read here hits an edge case that only exists for unfetchable objects
2190        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
2191        // if we erroneously treat this load failure as the transaction being missing, we will
2192        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
2193        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
2194        // never resolve.
2195        //
2196        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
2197        // and we discover that the transaction doesn't need to be fetched at all.
2198        tracing::info!("fetch with read failure");
2199        data_source
2200            .as_ref()
2201            .fail_one_read(FailableAction::Any)
2202            .await;
2203        let fetch = data_source
2204            .get_block_containing_transaction(tx.transaction.hash())
2205            .await;
2206
2207        assert_eq!(tx, fetch.await);
2208    }
2209
2210    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2211    async fn test_stream_begin_failure() {
2212        // Create the consensus network.
2213        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2214
2215        // Start a web server that the non-consensus node can use to fetch blocks.
2216        let port = pick_unused_port().unwrap();
2217        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2218        app.register_module(
2219            "availability",
2220            define_api(
2221                &Default::default(),
2222                MockBase::instance(),
2223                "1.0.0".parse().unwrap(),
2224            )
2225            .unwrap(),
2226        )
2227        .unwrap();
2228        network.spawn(
2229            "server",
2230            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2231        );
2232
2233        // Start a data source which is not receiving events from consensus, only from a peer.
2234        let provider = Provider::new(QueryServiceProvider::new(
2235            format!("http://localhost:{port}").parse().unwrap(),
2236            MockBase::instance(),
2237        ));
2238        let db = TmpDb::init().await;
2239        let storage = FailStorage::from(
2240            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2241                .await
2242                .unwrap(),
2243        );
2244        let data_source = FetchingDataSource::builder(storage, provider)
2245            .disable_proactive_fetching()
2246            .disable_aggregator()
2247            .with_min_retry_interval(Duration::from_millis(100))
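            // Use a small chunk size so the 5-object range spans multiple storage chunks.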
2248            .with_range_chunk_size(3)
2249            .build()
2250            .await
2251            .unwrap();
2252
2253        // Start consensus.
2254        network.start().await;
2255
2256        // Wait until a few blocks are produced.
2257        let leaves = network.data_source().subscribe_leaves(1).await;
2258        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2259
2260        // Send the last leaf to the disconnected data source so it learns about the height.
2261        let last_leaf = leaves.last().unwrap();
2262        let mut tx = data_source.write().await.unwrap();
2263        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2264        tx.commit().await.unwrap();
2265
2266        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
2267        // and trigger the fetch.
2268        tracing::info!("stream with transaction failure");
2269        data_source
2270            .as_ref()
2271            .fail_one_begin_read_only(FailableAction::Any)
2272            .await;
2273        assert_eq!(
2274            leaves,
2275            data_source
2276                .subscribe_leaves(1)
2277                .await
2278                .take(5)
2279                .collect::<Vec<_>>()
2280                .await
2281        );
2282    }
2283
2284    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2285    async fn test_stream_load_failure() {
2286        // Create the consensus network.
2287        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2288
2289        // Start a web server that the non-consensus node can use to fetch blocks.
2290        let port = pick_unused_port().unwrap();
2291        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2292        app.register_module(
2293            "availability",
2294            define_api(
2295                &Default::default(),
2296                MockBase::instance(),
2297                "1.0.0".parse().unwrap(),
2298            )
2299            .unwrap(),
2300        )
2301        .unwrap();
2302        network.spawn(
2303            "server",
2304            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2305        );
2306
2307        // Start a data source which is not receiving events from consensus, only from a peer.
2308        let provider = Provider::new(QueryServiceProvider::new(
2309            format!("http://localhost:{port}").parse().unwrap(),
2310            MockBase::instance(),
2311        ));
2312        let db = TmpDb::init().await;
2313        let storage = FailStorage::from(
2314            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2315                .await
2316                .unwrap(),
2317        );
2318        let data_source = FetchingDataSource::builder(storage, provider)
2319            .disable_proactive_fetching()
2320            .disable_aggregator()
2321            .with_min_retry_interval(Duration::from_millis(100))
2322            .with_range_chunk_size(3)
2323            .build()
2324            .await
2325            .unwrap();
2326
2327        // Start consensus.
2328        network.start().await;
2329
2330        // Wait until a few blocks are produced.
2331        let leaves = network.data_source().subscribe_leaves(1).await;
2332        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2333
2334        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2335        let last_leaf = leaves.last().unwrap();
2336        let mut tx = data_source.write().await.unwrap();
2337        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2338        tx.commit().await.unwrap();
2339
2340        // Stream the blocks with a period of database failures.
2341        tracing::info!("stream with read failure");
2342        data_source.as_ref().fail_reads(FailableAction::Any).await;
2343        let fetches = data_source
2344            .get_block_range(1..=5)
2345            .await
2346            .collect::<Vec<_>>()
2347            .await;
2348
2349        // Give some time for a few reads to fail before letting them succeed.
2350        sleep(Duration::from_secs(2)).await;
2351        data_source.as_ref().pass().await;
2352
2353        for (leaf, fetch) in leaves.iter().zip(fetches) {
2354            let block: BlockQueryData<MockTypes> = fetch.await;
2355            assert_eq!(block.hash(), leaf.block_hash());
2356        }
2357    }
2358
2359    enum MetadataType {
2360        Payload,
2361        Vid,
2362    }
2363
2364    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2365        // Create the consensus network.
2366        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2367
2368        // Start a web server that the non-consensus node can use to fetch blocks.
2369        let port = pick_unused_port().unwrap();
2370        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2371        app.register_module(
2372            "availability",
2373            define_api(
2374                &Default::default(),
2375                MockBase::instance(),
2376                "1.0.0".parse().unwrap(),
2377            )
2378            .unwrap(),
2379        )
2380        .unwrap();
2381        network.spawn(
2382            "server",
2383            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2384        );
2385
2386        // Start a data source which is not receiving events from consensus, only from a peer.
2387        let provider = Provider::new(QueryServiceProvider::new(
2388            format!("http://localhost:{port}").parse().unwrap(),
2389            MockBase::instance(),
2390        ));
2391        let db = TmpDb::init().await;
2392        let storage = FailStorage::from(
2393            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2394                .await
2395                .unwrap(),
2396        );
2397        let data_source = FetchingDataSource::builder(storage, provider)
2398            .disable_proactive_fetching()
2399            .disable_aggregator()
2400            .with_min_retry_interval(Duration::from_millis(100))
2401            .with_range_chunk_size(3)
2402            .build()
2403            .await
2404            .unwrap();
2405
2406        // Start consensus.
2407        network.start().await;
2408
2409        // Wait until a few blocks are produced.
2410        let leaves = network.data_source().subscribe_leaves(1).await;
2411        let leaves = leaves.take(3).collect::<Vec<_>>().await;
2412
2413        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2414        let last_leaf = leaves.last().unwrap();
2415        let mut tx = data_source.write().await.unwrap();
2416        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2417        tx.commit().await.unwrap();
2418
2419        // Send the first object to the disconnected data source, so we hit all the cases:
2420        // * leaf present but not full object (from the last leaf)
2421        // * full object present but inaccessible due to storage failures (first object)
2422        // * nothing present (middle object)
2423        let leaf = network.data_source().get_leaf(1).await.await;
2424        let block = network.data_source().get_block(1).await.await;
2425        let vid = network.data_source().get_vid_common(1).await.await;
2426        data_source
2427            .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2428            .await
2429            .unwrap();
2430
2431        // Stream the objects with a period of database failures.
2432        tracing::info!("stream with transaction failure");
2433        data_source
2434            .as_ref()
2435            .fail_begins_read_only(FailableAction::Any)
2436            .await;
2437        match stream {
2438            MetadataType::Payload => {
2439                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2440
2441                // Give some time for a few reads to fail before letting them succeed.
2442                sleep(Duration::from_secs(2)).await;
2443                tracing::info!("stop failing transactions");
2444                data_source.as_ref().pass().await;
2445
2446                let payloads = payloads.collect::<Vec<_>>().await;
2447                for (leaf, payload) in leaves.iter().zip(payloads) {
2448                    assert_eq!(payload.block_hash, leaf.block_hash());
2449                }
2450            },
2451            MetadataType::Vid => {
2452                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2453
2454                // Give some time for a few reads to fail before letting them succeed.
2455                sleep(Duration::from_secs(2)).await;
2456                tracing::info!("stop failing transactions");
2457                data_source.as_ref().pass().await;
2458
2459                let vids = vids.collect::<Vec<_>>().await;
2460                for (leaf, vid) in leaves.iter().zip(vids) {
2461                    assert_eq!(vid.block_hash, leaf.block_hash());
2462                }
2463            },
2464        }
2465    }
2466
2467    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2468    async fn test_metadata_stream_begin_failure_payload() {
2469        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2470    }
2471
2472    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2473    async fn test_metadata_stream_begin_failure_vid() {
2474        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2475    }
2476
2477    // This helper function starts up a mock network
2478    // with v0 and v1 availability query modules,
2479    // triggers fetches for a data source from the provider,
2480    // and asserts that the fetched data is correct.
2481    async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2482        let mut network = MockNetwork::<MockDataSource, V>::init().await;
2483
2484        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2485
2486        // Register availability APIs for two versions: v0 and v1
2487        app.register_module(
2488            "availability",
2489            define_api(
2490                &Default::default(),
2491                StaticVersion::<0, 1> {},
2492                "0.0.1".parse().unwrap(),
2493            )
2494            .unwrap(),
2495        )
2496        .unwrap();
2497
2498        app.register_module(
2499            "availability",
2500            define_api(
2501                &Default::default(),
2502                StaticVersion::<0, 1> {},
2503                "1.0.0".parse().unwrap(),
2504            )
2505            .unwrap(),
2506        )
2507        .unwrap();
2508
2509        network.spawn(
2510            "server",
2511            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2512        );
2513
2514        let db = TmpDb::init().await;
2515
2516        let provider_url = format!("http://localhost:{port}/{version}")
2517            .parse()
2518            .expect("Invalid URL");
2519
2520        let provider = Provider::new(QueryServiceProvider::new(
2521            provider_url,
2522            StaticVersion::<0, 1> {},
2523        ));
2524
2525        let ds = data_source(&db, &provider).await;
2526        network.start().await;
2527
2528        let leaves = network.data_source().subscribe_leaves(1).await;
2529        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2530        let test_leaf = &leaves[0];
2531        let test_payload = &leaves[2];
2532        let test_common = &leaves[3];
2533
2534        let mut fetches = vec![];
2535        // Issue requests for missing data (these should initially remain unresolved):
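        // (`map(ignore)` discards the fetched values so fetches of different types can be collected
        // in one `Vec`.)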
2536        fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2537        fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2538        fetches.push(
2539            ds.get_vid_common(test_common.block_hash())
2540                .await
2541                .map(ignore),
2542        );
2543
2544        // Even if we give data extra time to propagate, these requests will not resolve, since we
2545        // didn't trigger any active fetches.
2546        sleep(Duration::from_secs(1)).await;
2547        for (i, fetch) in fetches.into_iter().enumerate() {
2548            tracing::info!("checking fetch {i} is unresolved");
2549            fetch.try_resolve().unwrap_err();
2550        }
2551
2552        // Append the latest known leaf to the local store.
2553        // This triggers fetches for the corresponding missing data,
2554        // such as the header, VID common, and payload.
2555        // It also triggers fetches for the parent data.
2556        ds.append(leaves.last().cloned().unwrap().into())
2557            .await
2558            .unwrap();
2559
2560        // Check that the data has been fetched and matches the network data source.
2561        {
2562            let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2563            let payload = ds.get_payload(test_payload.height() as usize).await;
2564            let common = ds.get_vid_common(test_common.height() as usize).await;
2565
2566            let truth = network.data_source();
2567            assert_eq!(
2568                leaf.await,
2569                truth.get_leaf(test_leaf.height() as usize).await.await
2570            );
2571            assert_eq!(
2572                payload.await,
2573                truth
2574                    .get_payload(test_payload.height() as usize)
2575                    .await
2576                    .await
2577            );
2578            assert_eq!(
2579                common.await,
2580                truth
2581                    .get_vid_common(test_common.height() as usize)
2582                    .await
2583                    .await
2584            );
2585        }
2586    }
2587
2588    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2589    async fn test_fallback_deserialization_for_fetch_requests_v0() {
2590        let port = pick_unused_port().unwrap();
2591
2592        // This run will call the v0 availability API for fetch requests.
2593        // The fetch initially attempts deserialization with new types,
2594        // which fails because the v0 provider returns legacy types.
2595        // It then falls back to deserializing as legacy types,
2596        // and the fetch succeeds.
2597        run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2598    }
2599
2600    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2601    async fn test_fallback_deserialization_for_fetch_requests_v1() {
2602        let port = pick_unused_port().unwrap();
2603
2604        // Fetch from the v1 availability API using MockVersions.
2605        // This one fetches from the v1 provider, which correctly deserializes the bytes on the
2606        // first attempt, so no fallback deserialization is needed.
2607        run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2608    }
2609
2610    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2611    async fn test_fallback_deserialization_for_fetch_requests_pos() {
2612        let port = pick_unused_port().unwrap();
2613
2614        // Fetch Proof of Stake (PoS) data using the v1 availability API
2615        // with the proof-of-stake versions.
2616        run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2617    }
2618    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2619    async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2620        // Run with the PoS version against a v0 provider.
2621        // Fetch requests are expected to fail because PoS commitments differ from the legacy commitments
2622        // returned by the v0 provider.
2623        // For example: a PoS Leaf2 commitment will not match the downgraded commitment from a legacy Leaf1.
2624
2625        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2626
2627        let port = pick_unused_port().unwrap();
2628        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2629
2630        app.register_module(
2631            "availability",
2632            define_api(
2633                &Default::default(),
2634                StaticVersion::<0, 1> {},
2635                "0.0.1".parse().unwrap(),
2636            )
2637            .unwrap(),
2638        )
2639        .unwrap();
2640
2641        network.spawn(
2642            "server",
2643            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2644        );
2645
2646        let db = TmpDb::init().await;
2647        let provider = Provider::new(QueryServiceProvider::new(
2648            format!("http://localhost:{port}/v0").parse().unwrap(),
2649            StaticVersion::<0, 1> {},
2650        ));
2651        let ds = data_source(&db, &provider).await;
2652
2653        network.start().await;
2654
2655        let leaves = network.data_source().subscribe_leaves(1).await;
2656        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2657        let test_leaf = &leaves[0];
2658        let test_payload = &leaves[2];
2659        let test_common = &leaves[3];
2660
2661        let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2662        let payload = ds.get_payload(test_payload.height() as usize).await;
2663        let common = ds.get_vid_common(test_common.height() as usize).await;
2664
2665        sleep(Duration::from_secs(3)).await;
2666
2667        // Fetches fail because of the different commitments.
2668        leaf.try_resolve().unwrap_err();
2669        payload.try_resolve().unwrap_err();
2670        common.try_resolve().unwrap_err();
2671    }
2672}