hotshot_query_service/fetching/provider/
query_service.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16    data::{VidCommitment, VidCommon, ns_table},
17    traits::{EncodeBytes, block_contents::BlockHeader, node_implementation::NodeType},
18    vid::{
19        advz::{ADVZScheme, advz_scheme},
20        avidm::{AvidMScheme, init_avidm_param},
21        avidm_gf2::AvidmGf2Scheme,
22    },
23};
24use jf_advz::VidScheme;
25use surf_disco::{Client, Url};
26use vbs::{BinarySerializer, version::StaticVersionType};
27
28use super::Provider;
29use crate::{
30    Error, Header, Payload,
31    availability::{
32        ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
33        PayloadQueryData, VidCommonQueryData,
34    },
35    fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
36    types::HeightIndexed,
37};
38
/// Data availability provider backed by another instance of this query service.
///
/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
/// instance of this query service to try and retrieve missing objects.
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    // Versioned REST client used for all requests to the peer query service; `Ver` pins the
    // binary serialization version used on the wire.
    client: Client<Error, Ver>,
}
47
48impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
49    pub fn new(url: Url, _: Ver) -> Self {
50        Self {
51            client: Client::new(url),
52        }
53    }
54}
55
56impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
57    async fn deserialize_legacy_payload<Types: NodeType>(
58        &self,
59        payload_bytes: Vec<u8>,
60        common_bytes: Vec<u8>,
61        req: PayloadRequest,
62    ) -> Option<Payload<Types>> {
63        let client_url = self.client.base_url();
64
65        let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
66            return None;
67        };
68
69        let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
70            &payload_bytes,
71        ) {
72            Ok(payload) => payload,
73            Err(err) => {
74                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
75                return None;
76            },
77        };
78
79        let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
80            &common_bytes,
81        ) {
82            Ok(common) => common,
83            Err(err) => {
84                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
85                return None;
86            },
87        };
88
89        let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
90        let bytes = payload.data.encode();
91
92        let commit = advz_scheme(num_storage_nodes)
93            .commit_only(bytes)
94            .inspect_err(|err| {
95                tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
96            })
97            .ok()?;
98
99        if commit != advz_commit {
100            tracing::error!(
101                ?req,
102                expected_commit=%advz_commit,
103                actual_commit=%commit,
104                %client_url,
105                "received inconsistent legacy payload"
106            );
107            return None;
108        }
109
110        Some(payload.data)
111    }
112
113    async fn deserialize_legacy_vid_common<Types: NodeType>(
114        &self,
115        bytes: Vec<u8>,
116        req: VidCommonRequest,
117    ) -> Option<VidCommon> {
118        let client_url = self.client.base_url();
119        let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
120            return None;
121        };
122
123        match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
124            Ok(res) => {
125                if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
126                    Some(VidCommon::V0(res.common))
127                } else {
128                    tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
129                    None
130                }
131            },
132            Err(err) => {
133                tracing::warn!(
134                    %client_url,
135                    ?req,
136                    %err,
137                    "failed to deserialize ADVZCommonQueryData"
138                );
139                None
140            },
141        }
142    }
143    async fn deserialize_legacy_leaf<Types: NodeType>(
144        &self,
145        bytes: Vec<u8>,
146        req: LeafRequest<Types>,
147    ) -> Option<LeafQueryData<Types>> {
148        let client_url = self.client.base_url();
149
150        match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
151            Ok(mut leaf) => {
152                if leaf.height() != req.height {
153                    tracing::error!(
154                        %client_url, ?req,
155                        expected_height = req.height,
156                        actual_height = leaf.height(),
157                        "received leaf with the wrong height"
158                    );
159                    return None;
160                }
161
162                let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
163                let actual_leaf_commit: [u8; 32] = leaf.hash().into();
164                if actual_leaf_commit != expected_leaf_commit {
165                    tracing::error!(
166                        %client_url, ?req,
167                        expected_leaf = %req.expected_leaf,
168                        actual_leaf = %leaf.hash(),
169                        "received leaf with the wrong hash"
170                    );
171                    return None;
172                }
173
174                let expected_qc_commit: [u8; 32] = req.expected_qc.into();
175                let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
176                if actual_qc_commit != expected_qc_commit {
177                    tracing::error!(
178                        %client_url, ?req,
179                        expected_qc = %req.expected_qc,
180                        actual_qc = %leaf.qc().commit(),
181                        "received leaf with the wrong QC"
182                    );
183                    return None;
184                }
185
186                // There is a potential DOS attack where the peer sends us a leaf with the full
187                // payload in it, which uses redundant resources in the database, since we fetch and
188                // store payloads separately. We can defend ourselves by simply dropping the payload
189                // if present.
190                leaf.leaf.unfill_block_payload();
191
192                Some(leaf.into())
193            },
194            Err(err) => {
195                tracing::warn!(
196                    %client_url, ?req, %err,
197                    "failed to deserialize legacy LeafQueryData"
198                );
199                None
200            },
201        }
202    }
203}
204
#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Payload` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the new type first.
    /// If deserialization into the new type fails (e.g., because the provider is still returning
    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
    ///
    /// The returned payload is only accepted after its VID commitment has been recomputed locally
    /// and matched against the requested commitment, so a malicious or buggy peer cannot make us
    /// store a payload under the wrong hash.
    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
        let client_url = self.client.base_url();
        let req_hash = req.0;
        // Fetch the payload and the VID common data. We need the common data to recompute the VID
        // commitment, to ensure the payload we received is consistent with the commitment we
        // requested.
        let payload_bytes = self
            .client
            .get::<()>(&format!("availability/payload/hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
            })
            .ok()?;

        let common_bytes = self
            .client
            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
            })
            .ok()?;

        // Try the current wire format first. Failures here are logged at info level only, since
        // they may simply indicate a not-yet-upgraded peer; the legacy fallback below handles
        // that case.
        let payload =
            vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
                })
                .ok();

        let common =
            vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash,
                        "error deserializing VidCommonQueryData",
                    );
                })
                .ok();

        // If either object failed to parse in the current format, retry both in the legacy
        // format; mixing formats between payload and common data is not supported.
        let (payload, common) = match (payload, common) {
            (Some(payload), Some(common)) => (payload, common),
            _ => {
                tracing::info!(%req_hash, "falling back to legacy payload deserialization");

                // fallback deserialization
                return self
                    .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
                    .await;
            },
        };

        // Verify the payload by recomputing its VID commitment with the scheme matching the
        // version of the fetched common data.
        match common.common() {
            VidCommon::V0(common) => {
                // ADVZ: the commitment depends only on the payload bytes and the number of
                // storage nodes recorded in the common data.
                let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
                let bytes = payload.data().encode();

                let commit = advz_scheme(num_storage_nodes)
                    .commit_only(bytes)
                    .map(VidCommitment::V0)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash,  %num_storage_nodes, "failed to compute VID commitment (V0)");
                    })
                    .ok()?;

                if commit != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = ?commit,
                        %client_url,
                        "VID commitment mismatch (V0)"
                    );

                    return None;
                }
            },
            VidCommon::V1(common) => {
                let bytes = payload.data().encode();

                let avidm_param = init_avidm_param(common.total_weights)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
                    })
                    .ok()?;

                // The AvidM commitment is computed over the namespace table as well as the raw
                // bytes, so we need the block header to obtain the namespace table metadata.
                let header = self
                    .client
                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
                    .send()
                    .await
                    .inspect_err(|err| {
                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
                    })
                    .ok()?;

                // Sanity-check the header before trusting its metadata: its payload commitment
                // must match the commitment we are fetching.
                if header.payload_commitment() != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %header.payload_commitment(),
                        %client_url,
                        "header payload commitment mismatch (V1, AvidM)"
                    );
                    return None;
                }

                let metadata = header.metadata().encode();
                let commit = AvidMScheme::commit(
                    &avidm_param,
                    &bytes,
                    ns_table::parse_ns_table(bytes.len(), &metadata),
                )
                .map(VidCommitment::V1)
                .inspect_err(|err| {
                    tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
                })
                .ok()?;

                // Compare calculated commitment with requested commitment
                if commit != req.0 {
                    tracing::warn!(
                        expected = %req_hash,
                        actual = %commit,
                        %client_url,
                        "commitment type mismatch for AVIDM check"
                    );
                    return None;
                }
            },
            VidCommon::V2(common) => {
                let bytes = payload.data().encode();

                // As with V1, the header supplies the namespace table metadata needed by the
                // commitment computation.
                let header = self
                    .client
                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
                    .send()
                    .await
                    .inspect_err(|err| {
                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
                    })
                    .ok()?;

                if header.payload_commitment() != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %header.payload_commitment(),
                        %client_url,
                        "header payload commitment mismatch (V2, AvidmGf2)"
                    );
                    return None;
                }

                let metadata = header.metadata().encode();
                let commit = AvidmGf2Scheme::commit(
                    &common.param,
                    &bytes,
                    ns_table::parse_ns_table(bytes.len(), &metadata),
                )
                .map(|(commit, _)| VidCommitment::V2(commit))
                .inspect_err(|err| {
                    tracing::error!(%err, %req_hash, "failed to compute AvidmGf2 commitment");
                })
                .ok()?;

                // Compare calculated commitment with requested commitment
                if commit != req.0 {
                    tracing::warn!(
                        expected = %req_hash,
                        actual = %commit,
                        %client_url,
                        "commitment type mismatch for AvidmGf2 check"
                    );
                    return None;
                }
            },
        }

        Some(payload.data)
    }
}
398
#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
    for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Leaf` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the new type first.
    /// If deserialization into the new type fails (e.g., because the provider is still returning
    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
    ///
    /// The fetched leaf is only returned after its height, hash, and QC commitment have all been
    /// checked against the values in the request, so an untrusted peer cannot substitute a
    /// different leaf.
    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
        let client_url = self.client.base_url();

        let bytes = self
            .client
            .get::<()>(&format!("availability/leaf/{}", req.height))
            .bytes()
            .await;
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");

                return None;
            },
        };

        // Attempt to deserialize using the new type

        match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
            Ok(mut leaf) => {
                // Validate the leaf against every expectation in the request before accepting it.
                if leaf.height() != req.height {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_height = req.height,
                        actual_height = leaf.height(),
                        "received leaf with the wrong height"
                    );
                    return None;
                }
                if leaf.hash() != req.expected_leaf {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_hash = %req.expected_leaf,
                        actual_hash = %leaf.hash(),
                        "received leaf with the wrong hash"
                    );
                    return None;
                }
                if leaf.qc().commit() != req.expected_qc {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_qc = %req.expected_qc,
                        actual_qc = %leaf.qc().commit(),
                        "received leaf with the wrong QC"
                    );
                    return None;
                }

                // There is a potential DOS attack where the peer sends us a leaf with the full
                // payload in it, which uses redundant resources in the database, since we fetch and
                // store payloads separately. We can defend ourselves by simply dropping the payload
                // if present.
                leaf.leaf.unfill_block_payload();

                Some(leaf)
            },
            Err(err) => {
                tracing::info!(
                    ?req, %err,
                    "failed to deserialize LeafQueryData, falling back to legacy deserialization"
                );
                // Fallback deserialization
                self.deserialize_legacy_leaf(bytes, req).await
            },
        }
    }
}
480
#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `VidCommon` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the new type first.
    /// If deserialization into the new type fails (e.g., because the provider is still returning
    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
    ///
    /// The common data is only returned after a consistency check against the requested payload
    /// commitment.
    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
        let client_url = self.client.base_url();
        let bytes = self
            .client
            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
            .bytes()
            .await;
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::info!(
                    %client_url, ?req, %err,
                    "failed to fetch VID common bytes"
                );
                return None;
            },
        };

        match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
            Ok(res) => {
                // Reject common data that is inconsistent with the requested commitment.
                if !res.common().is_consistent(&req.0) {
                    tracing::error!(
                        %client_url, ?req, ?res.common,
                        "fetched inconsistent VID common data"
                    );
                    return None;
                }
                Some(res.common)
            },
            Err(err) => {
                tracing::info!(
                    %client_url, ?req, %err,
                    "failed to deserialize as V1 VID common data, trying legacy fallback"
                );
                // Fallback deserialization
                self.deserialize_legacy_vid_common::<Types>(bytes, req)
                    .await
            },
        }
    }
}
534
535// These tests run the `postgres` Docker image, which doesn't work on Windows.
536#[cfg(all(test, not(target_os = "windows")))]
537mod test {
538    use std::{future::IntoFuture, time::Duration};
539
540    use committable::Committable;
541    use futures::{
542        future::{FutureExt, join},
543        stream::StreamExt,
544    };
545    // generic-array 0.14.x is deprecated, but VidCommitment requires this version
546    // for From<Output<H>> impl in jf-merkle-tree
547    #[allow(deprecated)]
548    use generic_array::GenericArray;
549    use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
550    use rand::RngCore;
551    use test_utils::reserve_tcp_port;
552    use tide_disco::{App, error::ServerError};
553    use vbs::version::StaticVersion;
554
555    use super::*;
556    use crate::{
557        ApiState,
558        api::load_api,
559        availability::{
560            AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
561            Fetch, UpdateAvailabilityData, define_api,
562        },
563        data_source::{
564            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
565            sql::{self, SqlDataSource},
566            storage::{
567                AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
568                fail_storage::{FailStorage, FailableAction},
569                pruning::{PrunedHeightStorage, PrunerCfg},
570                sql::testing::TmpDb,
571            },
572        },
573        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
574        node::{SyncStatusQueryData, data_source::NodeDataSource},
575        task::BackgroundTask,
576        testing::{
577            consensus::{MockDataSource, MockNetwork},
578            mocks::{MockBase, MockTypes, mock_transaction},
579            sleep,
580        },
581        types::HeightIndexed,
582    };
583
584    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
585    type EpochProvider = TestProvider<QueryServiceProvider<EpochVersion>>;
586
587    fn ignore<T>(_: T) {}
588
    /// Build a data source suitable for this suite of tests.
    ///
    /// Returns a SQL data-source builder backed by the temporary database `db` and using
    /// `provider` for fetching missing objects.
    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
        db: &TmpDb,
        provider: &P,
    ) -> sql::Builder<MockTypes, P> {
        db.config()
            .builder((*provider).clone())
            .await
            .unwrap()
            // We disable proactive fetching for these tests, since we are intending to test on
            // demand fetching, and proactive fetching could lead to false successes.
            .disable_proactive_fetching()
    }
602
603    /// A data source suitable for this suite of tests, with the default options.
604    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
605        db: &TmpDb,
606        provider: &P,
607    ) -> SqlDataSource<MockTypes, P> {
608        builder(db, provider).await.build().await.unwrap()
609    }
610
611    #[test_log::test(tokio::test(flavor = "multi_thread"))]
612    async fn test_fetch_on_request() {
613        // Create the consensus network.
614        let mut network = MockNetwork::<MockDataSource>::init().await;
615
616        // Start a web server that the non-consensus node can use to fetch blocks.
617        let port = reserve_tcp_port().unwrap();
618        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
619        app.register_module(
620            "availability",
621            define_api(
622                &Default::default(),
623                MockBase::instance(),
624                "1.0.0".parse().unwrap(),
625            )
626            .unwrap(),
627        )
628        .unwrap();
629        network.spawn(
630            "server",
631            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
632        );
633
634        // Start a data source which is not receiving events from consensus, only from a peer.
635        let db = TmpDb::init().await;
636        let provider = Provider::new(QueryServiceProvider::new(
637            format!("http://localhost:{port}").parse().unwrap(),
638            MockBase::instance(),
639        ));
640        let data_source = data_source(&db, &provider).await;
641
642        // Start consensus.
643        network.start().await;
644
645        // Wait until the block height reaches 6. This gives us the genesis block, one additional
646        // block at the end, and then one block to play around with fetching each type of resource:
647        // * Leaf
648        // * Block
649        // * Payload
650        // * VID common
651        let leaves = network.data_source().subscribe_leaves(1).await;
652        let leaves = leaves.take(5).collect::<Vec<_>>().await;
653        let test_leaf = &leaves[0];
654        let test_block = &leaves[1];
655        let test_payload = &leaves[2];
656        let test_common = &leaves[3];
657
658        // Make requests for missing data that should _not_ trigger an active fetch:
659        tracing::info!("requesting unfetchable resources");
660        let mut fetches = vec![];
661        // * An unknown leaf hash.
662        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
663        // * An unknown leaf height.
664        fetches.push(
665            data_source
666                .get_leaf(test_leaf.height() as usize)
667                .await
668                .map(ignore),
669        );
670        // * An unknown block hash.
671        fetches.push(
672            data_source
673                .get_block(test_block.block_hash())
674                .await
675                .map(ignore),
676        );
677        fetches.push(
678            data_source
679                .get_payload(test_payload.block_hash())
680                .await
681                .map(ignore),
682        );
683        fetches.push(
684            data_source
685                .get_vid_common(test_common.block_hash())
686                .await
687                .map(ignore),
688        );
689        // * An unknown block height.
690        fetches.push(
691            data_source
692                .get_block(test_block.height() as usize)
693                .await
694                .map(ignore),
695        );
696        fetches.push(
697            data_source
698                .get_payload(test_payload.height() as usize)
699                .await
700                .map(ignore),
701        );
702        fetches.push(
703            data_source
704                .get_vid_common(test_common.height() as usize)
705                .await
706                .map(ignore),
707        );
708        // * Genesis VID common (no VID for genesis)
709        fetches.push(data_source.get_vid_common(0).await.map(ignore));
710        // * An unknown transaction.
711        fetches.push(
712            data_source
713                .get_block_containing_transaction(mock_transaction(vec![]).commit())
714                .await
715                .map(ignore),
716        );
717
718        // Even if we give data extra time to propagate, these requests will not resolve, since we
719        // didn't trigger any active fetches.
720        sleep(Duration::from_secs(1)).await;
721        for (i, fetch) in fetches.into_iter().enumerate() {
722            tracing::info!("checking fetch {i} is unresolved");
723            fetch.try_resolve().unwrap_err();
724        }
725
726        // Now we will actually fetch the missing data. First, since our node is not really
727        // connected to consensus, we need to give it a leaf after the range of interest so it
728        // learns about the correct block height. We will temporarily lock requests to the provider
729        // so that we can verify that without the provider, the node does _not_ get the data.
730        provider.block().await;
731        data_source
732            .append(leaves.last().cloned().unwrap().into())
733            .await
734            .unwrap();
735
736        tracing::info!("requesting fetchable resources");
737        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
738        let req_block = data_source.get_block(test_block.height() as usize).await;
739        let req_payload = data_source
740            .get_payload(test_payload.height() as usize)
741            .await;
742        let req_common = data_source
743            .get_vid_common(test_common.height() as usize)
744            .await;
745
746        // Give the requests some extra time to complete, and check that they still haven't
747        // resolved, since the provider is blocked. This just ensures the integrity of the test by
748        // checking the node didn't mysteriously get the block from somewhere else, so that when we
749        // unblock the provider and the node finally gets the block, we know it came from the
750        // provider.
751        sleep(Duration::from_secs(1)).await;
752        req_leaf.try_resolve().unwrap_err();
753        req_block.try_resolve().unwrap_err();
754        req_payload.try_resolve().unwrap_err();
755        req_common.try_resolve().unwrap_err();
756
757        // Unblock the request and see that we eventually receive the data.
758        provider.unblock().await;
759        let leaf = data_source
760            .get_leaf(test_leaf.height() as usize)
761            .await
762            .await;
763        let block = data_source
764            .get_block(test_block.height() as usize)
765            .await
766            .await;
767        let payload = data_source
768            .get_payload(test_payload.height() as usize)
769            .await
770            .await;
771        let common = data_source
772            .get_vid_common(test_common.height() as usize)
773            .await
774            .await;
775        {
776            // Verify the data.
777            let truth = network.data_source();
778            assert_eq!(
779                leaf,
780                truth.get_leaf(test_leaf.height() as usize).await.await
781            );
782            assert_eq!(
783                block,
784                truth.get_block(test_block.height() as usize).await.await
785            );
786            assert_eq!(
787                payload,
788                truth
789                    .get_payload(test_payload.height() as usize)
790                    .await
791                    .await
792            );
793            assert_eq!(
794                common,
795                truth
796                    .get_vid_common(test_common.height() as usize)
797                    .await
798                    .await
799            );
800        }
801
802        // Fetching the block and payload should have also fetched the corresponding leaves, since
803        // we have an invariant that we should not store a block in the database without its
804        // corresponding leaf and header. Thus we should be able to get the leaves even if the
805        // provider is blocked.
806        provider.block().await;
807        for leaf in [test_block, test_payload] {
808            tracing::info!("fetching existing leaf {}", leaf.height());
809            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
810            assert_eq!(*leaf, fetched_leaf);
811        }
812
813        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
814        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
815        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
816        // with this hash exists, and trigger a fetch for it.
817        tracing::info!("fetching block by hash");
818        provider.unblock().await;
819        {
820            let block = data_source.get_block(test_leaf.block_hash()).await.await;
821            assert_eq!(block.hash(), leaf.block_hash());
822        }
823
824        // Test a similar scenario, but with payload instead of block: we are aware of
825        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
826        // hash.
827        tracing::info!("fetching payload by hash");
828        {
829            let leaf = leaves.last().unwrap();
830            let payload = data_source.get_payload(leaf.block_hash()).await.await;
831            assert_eq!(payload.height(), leaf.height());
832            assert_eq!(payload.block_hash(), leaf.block_hash());
833            assert_eq!(payload.hash(), leaf.payload_hash());
834        }
835    }
836
837    #[tokio::test(flavor = "multi_thread")]
838    async fn test_fetch_on_request_epoch_version() {
839        // This test verifies that our provider can handle fetching things by their hashes,
840        // specifically focused on epoch version transitions
841        tracing::info!("Starting test_fetch_on_request_epoch_version");
842
843        // Create the consensus network.
844        let mut network = MockNetwork::<MockDataSource>::init().await;
845
846        // Start a web server that the non-consensus node can use to fetch blocks.
847        let port = reserve_tcp_port().unwrap();
848        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
849        app.register_module(
850            "availability",
851            define_api(
852                &Default::default(),
853                EpochVersion::instance(),
854                "1.0.0".parse().unwrap(),
855            )
856            .unwrap(),
857        )
858        .unwrap();
859        network.spawn(
860            "server",
861            app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
862        );
863
864        // Start a data source which is not receiving events from consensus, only from a peer.
865        // Use our special test provider that handles epoch version transitions
866        let db = TmpDb::init().await;
867        let provider = EpochProvider::new(QueryServiceProvider::new(
868            format!("http://localhost:{port}").parse().unwrap(),
869            EpochVersion::instance(),
870        ));
871        let data_source = data_source(&db, &provider).await;
872
873        // Start consensus.
874        network.start().await;
875
876        // Wait until the block height reaches 6. This gives us the genesis block, one additional
877        // block at the end, and then one block to play around with fetching each type of resource:
878        // * Leaf
879        // * Block
880        // * Payload
881        // * VID common
882        let leaves = network.data_source().subscribe_leaves(1).await;
883        let leaves = leaves.take(5).collect::<Vec<_>>().await;
884        let test_leaf = &leaves[0];
885        let test_block = &leaves[1];
886        let test_payload = &leaves[2];
887        let test_common = &leaves[3];
888
889        // Make requests for missing data that should _not_ trigger an active fetch:
890        let mut fetches = vec![];
891        // * An unknown leaf hash.
892        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
893        // * An unknown leaf height.
894        fetches.push(
895            data_source
896                .get_leaf(test_leaf.height() as usize)
897                .await
898                .map(ignore),
899        );
900        // * An unknown block hash.
901        fetches.push(
902            data_source
903                .get_block(test_block.block_hash())
904                .await
905                .map(ignore),
906        );
907        fetches.push(
908            data_source
909                .get_payload(test_payload.block_hash())
910                .await
911                .map(ignore),
912        );
913        fetches.push(
914            data_source
915                .get_vid_common(test_common.block_hash())
916                .await
917                .map(ignore),
918        );
919        // * An unknown block height.
920        fetches.push(
921            data_source
922                .get_block(test_block.height() as usize)
923                .await
924                .map(ignore),
925        );
926        fetches.push(
927            data_source
928                .get_payload(test_payload.height() as usize)
929                .await
930                .map(ignore),
931        );
932        fetches.push(
933            data_source
934                .get_vid_common(test_common.height() as usize)
935                .await
936                .map(ignore),
937        );
938        // * Genesis VID common (no VID for genesis)
939        fetches.push(data_source.get_vid_common(0).await.map(ignore));
940        // * An unknown transaction.
941        fetches.push(
942            data_source
943                .get_block_containing_transaction(mock_transaction(vec![]).commit())
944                .await
945                .map(ignore),
946        );
947
948        // Even if we give data extra time to propagate, these requests will not resolve, since we
949        // didn't trigger any active fetches.
950        sleep(Duration::from_secs(1)).await;
951        for (i, fetch) in fetches.into_iter().enumerate() {
952            tracing::info!("checking fetch {i} is unresolved");
953            fetch.try_resolve().unwrap_err();
954        }
955
956        // Now we will actually fetch the missing data. First, since our node is not really
957        // connected to consensus, we need to give it a leaf after the range of interest so it
958        // learns about the correct block height. We will temporarily lock requests to the provider
959        // so that we can verify that without the provider, the node does _not_ get the data.
960        provider.block().await;
961        data_source
962            .append(leaves.last().cloned().unwrap().into())
963            .await
964            .unwrap();
965
966        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
967        let req_block = data_source.get_block(test_block.height() as usize).await;
968        let req_payload = data_source
969            .get_payload(test_payload.height() as usize)
970            .await;
971        let req_common = data_source
972            .get_vid_common(test_common.height() as usize)
973            .await;
974
975        // Give the requests some extra time to complete, and check that they still haven't
976        // resolved, since the provider is blocked. This just ensures the integrity of the test by
977        // checking the node didn't mysteriously get the block from somewhere else, so that when we
978        // unblock the provider and the node finally gets the block, we know it came from the
979        // provider.
980        sleep(Duration::from_secs(1)).await;
981        req_leaf.try_resolve().unwrap_err();
982        req_block.try_resolve().unwrap_err();
983        req_payload.try_resolve().unwrap_err();
984        req_common.try_resolve().unwrap_err();
985
986        // Unblock the request and see that we eventually receive the data.
987        provider.unblock().await;
988        let leaf = data_source
989            .get_leaf(test_leaf.height() as usize)
990            .await
991            .await;
992        let block = data_source
993            .get_block(test_block.height() as usize)
994            .await
995            .await;
996        let payload = data_source
997            .get_payload(test_payload.height() as usize)
998            .await
999            .await;
1000        let common = data_source
1001            .get_vid_common(test_common.height() as usize)
1002            .await
1003            .await;
1004        {
1005            // Verify the data.
1006            let truth = network.data_source();
1007            assert_eq!(
1008                leaf,
1009                truth.get_leaf(test_leaf.height() as usize).await.await
1010            );
1011            assert_eq!(
1012                block,
1013                truth.get_block(test_block.height() as usize).await.await
1014            );
1015            assert_eq!(
1016                payload,
1017                truth
1018                    .get_payload(test_payload.height() as usize)
1019                    .await
1020                    .await
1021            );
1022            assert_eq!(
1023                common,
1024                truth
1025                    .get_vid_common(test_common.height() as usize)
1026                    .await
1027                    .await
1028            );
1029        }
1030
1031        // Fetching the block and payload should have also fetched the corresponding leaves, since
1032        // we have an invariant that we should not store a block in the database without its
1033        // corresponding leaf and header. Thus we should be able to get the leaves even if the
1034        // provider is blocked.
1035        provider.block().await;
1036        for leaf in [test_block, test_payload] {
1037            tracing::info!("fetching existing leaf {}", leaf.height());
1038            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1039            assert_eq!(*leaf, fetched_leaf);
1040        }
1041
1042        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
1043        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
1044        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
1045        // with this hash exists, and trigger a fetch for it.
1046        provider.unblock().await;
1047        {
1048            let block = data_source.get_block(test_leaf.block_hash()).await.await;
1049            assert_eq!(block.hash(), leaf.block_hash());
1050        }
1051
1052        // Test a similar scenario, but with payload instead of block: we are aware of
1053        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
1054        // hash.
1055        {
1056            let leaf = leaves.last().unwrap();
1057            let payload = data_source.get_payload(leaf.block_hash()).await.await;
1058            assert_eq!(payload.height(), leaf.height());
1059            assert_eq!(payload.block_hash(), leaf.block_hash());
1060            assert_eq!(payload.hash(), leaf.payload_hash());
1061        }
1062
1063        // Add more debug logs throughout the test
1064        tracing::info!("Test completed successfully!");
1065    }
1066
1067    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1068    async fn test_fetch_block_and_leaf_concurrently() {
1069        // Create the consensus network.
1070        let mut network = MockNetwork::<MockDataSource>::init().await;
1071
1072        // Start a web server that the non-consensus node can use to fetch blocks.
1073        let port = reserve_tcp_port().unwrap();
1074        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1075        app.register_module(
1076            "availability",
1077            define_api(
1078                &Default::default(),
1079                MockBase::instance(),
1080                "1.0.0".parse().unwrap(),
1081            )
1082            .unwrap(),
1083        )
1084        .unwrap();
1085        network.spawn(
1086            "server",
1087            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1088        );
1089
1090        // Start a data source which is not receiving events from consensus, only from a peer.
1091        let db = TmpDb::init().await;
1092        let provider = Provider::new(QueryServiceProvider::new(
1093            format!("http://localhost:{port}").parse().unwrap(),
1094            MockBase::instance(),
1095        ));
1096        let data_source = data_source(&db, &provider).await;
1097
1098        // Start consensus.
1099        network.start().await;
1100
1101        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1102        // block at the end, and then one block that we can use to test fetching.
1103        let leaves = network.data_source().subscribe_leaves(1).await;
1104        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1105        let test_leaf = &leaves[0];
1106
1107        // Tell the node about a leaf after the one of interest so it learns about the block height.
1108        data_source.append(leaves[1].clone().into()).await.unwrap();
1109
1110        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
1111        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
1112        // ends up not fetching anything.
1113        let (leaf, block) = join(
1114            data_source
1115                .get_leaf(test_leaf.height() as usize)
1116                .await
1117                .into_future(),
1118            data_source
1119                .get_block(test_leaf.height() as usize)
1120                .await
1121                .into_future(),
1122        )
1123        .await;
1124        assert_eq!(leaf, *test_leaf);
1125        assert_eq!(leaf.header(), block.header());
1126    }
1127
    /// Two distinct blocks that share an identical (empty) payload can be
    /// fetched concurrently without any payload-deduplication optimization
    /// corrupting either result.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_different_blocks_same_payload() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 4. This gives us the genesis block, one additional
        // block at the end, and then two blocks that we can use to test fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(4).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // All the blocks here are empty, so they have the same payload:
        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
        // If we fetch both blocks at the same time, we can check that we haven't broken anything
        // with whatever optimizations we add to deduplicate payload fetching.
        let (block1, block2) = join(
            data_source
                .get_block(leaves[0].height() as usize)
                .await
                .into_future(),
            data_source
                .get_block(leaves[1].height() as usize)
                .await
                .into_future(),
        )
        .await;
        // Each fetch must resolve to its own block's header, even though the
        // payloads are identical.
        assert_eq!(block1.header(), leaves[0].header());
        assert_eq!(block2.header(), leaves[1].header());
    }
1192
    /// Subscriptions to future objects (blocks, leaves, VID common) resolve as
    /// the corresponding data becomes fetchable from the peer, and the streamed
    /// results match what the peer finalized.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_stream() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Subscribe to objects from the future.
        let blocks = data_source.subscribe_blocks(0).await;
        let leaves = data_source.subscribe_leaves(0).await;
        let common = data_source.subscribe_vid_common(0).await;

        // Wait for a few blocks to be finalized.
        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(finalized_leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // Check the subscriptions.
        let blocks = blocks.take(5).collect::<Vec<_>>().await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let common = common.take(5).collect::<Vec<_>>().await;
        for i in 0..5 {
            tracing::info!("checking block {i}");
            // Streamed objects must agree with what the peer actually finalized.
            assert_eq!(leaves[i], finalized_leaves[i]);
            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
        }
    }
1254
1255    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1256    async fn test_fetch_range_start() {
1257        // Create the consensus network.
1258        let mut network = MockNetwork::<MockDataSource>::init().await;
1259
1260        // Start a web server that the non-consensus node can use to fetch blocks.
1261        let port = reserve_tcp_port().unwrap();
1262        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1263        app.register_module(
1264            "availability",
1265            define_api(
1266                &Default::default(),
1267                MockBase::instance(),
1268                "1.0.0".parse().unwrap(),
1269            )
1270            .unwrap(),
1271        )
1272        .unwrap();
1273        network.spawn(
1274            "server",
1275            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1276        );
1277
1278        // Start a data source which is not receiving events from consensus, only from a peer.
1279        let db = TmpDb::init().await;
1280        let provider = Provider::new(QueryServiceProvider::new(
1281            format!("http://localhost:{port}").parse().unwrap(),
1282            MockBase::instance(),
1283        ));
1284        let data_source = data_source(&db, &provider).await;
1285
1286        // Start consensus.
1287        network.start().await;
1288
1289        // Wait for a few blocks to be finalized.
1290        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1291        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1292
1293        // Tell the node about a leaf after the range of interest (so it learns about the block
1294        // height) and one in the middle of the range, to test what happens when data is missing
1295        // from the beginning of the range but other data is available.
1296        let mut tx = data_source.write().await.unwrap();
1297        tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1298        tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1299        tx.commit().await.unwrap();
1300
1301        // Get the whole range of leaves.
1302        let leaves = data_source
1303            .get_leaf_range(..5)
1304            .await
1305            .then(Fetch::resolve)
1306            .collect::<Vec<_>>()
1307            .await;
1308        for i in 0..5 {
1309            tracing::info!("checking leaf {i}");
1310            assert_eq!(leaves[i], finalized_leaves[i]);
1311        }
1312    }
1313
    /// A subscription to a not-yet-sequenced transaction resolves passively
    /// (without any active fetching) once a block containing it is appended.
    // NOTE(review): the web server spawned below appears unused, since the data
    // source is built with `NoFetching` — presumably kept for parity with the
    // other tests; confirm before removing.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus. We don't give it a
        // fetcher since transactions are always fetched passively anyways.
        let db = TmpDb::init().await;
        let data_source = data_source(&db, &NoFetching).await;

        // Subscribe to blocks.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let mut blocks = network.data_source().subscribe_blocks(1).await;

        // Start consensus.
        network.start().await;

        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
        // exist.
        let tx = mock_transaction(vec![1, 2, 3]);
        let fut = data_source
            .get_block_containing_transaction(tx.commit())
            .await;

        // Sequence the transaction.
        network.submit_transaction(tx.clone()).await;

        // Send blocks to the query service, the future will resolve as soon as it sees a block
        // containing the transaction.
        let block = loop {
            let leaf = leaves.next().await.unwrap();
            let block = blocks.next().await.unwrap();

            data_source
                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();

            // Stop once the appended block actually contains our transaction.
            if block.transaction_by_hash(tx.commit()).is_some() {
                break block;
            }
        };
        tracing::info!("transaction included in block {}", block.height());

        // The passive subscription must resolve to the block/transaction pair.
        let fetched_tx = fut.await;
        assert_eq!(
            fetched_tx,
            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
        );

        // Future queries for this transaction resolve immediately.
        assert_eq!(
            fetched_tx,
            data_source
                .get_block_containing_transaction(tx.commit())
                .await
                .await
        );
    }
1392
    /// A fetch against a temporarily failing provider is retried (up to the
    /// configured max retry interval) and completes once the provider recovers.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_retry() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        // Cap the retry backoff at 1s so the 5s wait below spans several retries.
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 3. This gives us the genesis block, one additional
        // block at the end, and one block to try fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];

        // Cause requests to fail temporarily, so we can test retries.
        provider.fail();

        // Give the node a leaf after the range of interest so it learns about the correct block
        // height.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting leaf from failing providers");
        let fut = data_source.get_leaf(test_leaf.height() as usize).await;

        // Wait a few retries and check that the request has not completed, since the provider is
        // failing.
        sleep(Duration::from_secs(5)).await;
        fut.try_resolve().unwrap_err();

        // As soon as the provider recovers, the request can complete.
        provider.unfail();
        assert_eq!(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .await,
            *test_leaf
        );
    }
1466
1467    // Uses deprecated generic-array 0.14.x via digest, required for VidCommitment compatibility
1468    #[allow(deprecated)]
1469    fn random_vid_commit() -> VidCommitment {
1470        let mut bytes = [0; 32];
1471        rand::thread_rng().fill_bytes(&mut bytes);
1472        VidCommitment::V0(GenericArray::from(bytes).into())
1473    }
1474
    /// Run a web server on `port` that exposes the availability API but always
    /// answers payload and VID common queries with dummy (genesis) data,
    /// regardless of what was requested. Used to verify that the provider
    /// validates responses against the requested commitment instead of trusting
    /// the server.
    async fn malicious_server(port: u16) {
        let mut api = load_api::<(), ServerError, MockBase>(
            None::<std::path::PathBuf>,
            include_str!("../../../api/availability.toml"),
            vec![],
        )
        .unwrap();

        api.get("get_payload", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(PayloadQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await)
            }
            .boxed()
        })
        .unwrap()
        .get("get_vid_common", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(VidCommonQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await)
            }
            .boxed()
        })
        .unwrap();

        let mut app = App::<(), ServerError>::with_state(());
        app.register_module("availability", api).unwrap();
        // Serve until the hosting test tears the task down; shutdown errors are
        // deliberately ignored.
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
            .await
            .ok();
    }
1516
1517    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1518    async fn test_fetch_from_malicious_server() {
1519        let port = reserve_tcp_port().unwrap();
1520        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1521
1522        let provider = QueryServiceProvider::new(
1523            format!("http://localhost:{port}").parse().unwrap(),
1524            MockBase::instance(),
1525        );
1526        provider.client.connect(None).await;
1527
1528        // Query for a random payload, the server will respond with a different payload, and we
1529        // should detect the error.
1530        let res =
1531            ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1532                .await;
1533        assert_eq!(res, None);
1534
1535        // Query for a random VID common, the server will respond with a different one, and we
1536        // should detect the error.
1537        let res =
1538            ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1539                .await;
1540        assert_eq!(res, None);
1541    }
1542
    // End-to-end check that a node which aggressively pruned its data can, when
    // restarted in archive mode, recover everything it pruned from a peer and then
    // remain fully synced with no further pruning.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_archive_recovery() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer. The
        // data source is at first configured to aggressively prune data (zero retention, frequent
        // pruner runs).
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let mut data_source = db
            .config()
            .pruner_cfg(
                PrunerCfg::new()
                    .with_target_retention(Duration::from_secs(0))
                    .with_interval(Duration::from_secs(5)),
            )
            .unwrap()
            .builder(provider.clone())
            .await
            .unwrap()
            // Set a fast retry for failed operations. Occasionally storage operations will fail due
            // to conflicting write-mode transactions running concurrently. This is ok as they will
            // be retried. Having a fast retry interval speeds up the test.
            .with_min_retry_interval(Duration::from_millis(100))
            // Randomize retries a lot. This will temporally separate competing write transactions
            // with high probability, so that one of them quickly gets exclusive access to the
            // database.
            .with_retry_randomization_factor(3.)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // The disconnected data source has no data yet, so it hasn't done any pruning.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        // Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

        // Send the last leaf to the disconnected data source so it learns about the height and
        // fetches the missing data.
        let last_leaf = leaves.last().unwrap();
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Trigger a fetch of each leaf so the database gets populated.
        for i in 1..=last_leaf.height() {
            tracing::info!(i, "fetching leaf");
            assert_eq!(
                data_source.get_leaf(i as usize).await.await,
                leaves[i as usize - 1]
            );
        }

        // After a bit of time, the pruner has run and deleted all the missing data we just fetched.
        loop {
            let pruned_height = data_source
                .read()
                .await
                .unwrap()
                .load_pruned_height()
                .await
                .unwrap();
            if pruned_height == Some(last_leaf.height()) {
                break;
            }
            tracing::info!(
                ?pruned_height,
                target_height = last_leaf.height(),
                "waiting for pruner to run"
            );
            sleep(Duration::from_secs(1)).await;
        }

        // Now close the data source and restart it with archive recovery.
        data_source = db
            .config()
            .archive()
            .builder(provider.clone())
            .await
            .unwrap()
            .with_proactive_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Pruned height should be reset.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        assert_eq!(pruned_height, None);

        // The node has pruned all of its data including the latest block, so it's forgotten the
        // block height. We need to give it another leaf with some height so it will be willing to
        // fetch.
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Wait for the data to be restored. It should be restored by the next major scan.
        loop {
            let sync_status = data_source.sync_status().await.unwrap();

            // VID shares are unique to a node and will never be fetched from a peer; this is
            // acceptable since there is redundancy built into the VID scheme. Ignore missing VID
            // shares in the `is_fully_synced` check.
            if (SyncStatusQueryData {
                vid_shares: Default::default(),
                ..sync_status.clone()
            })
            .is_fully_synced()
            {
                break;
            }
            tracing::info!(?sync_status, "waiting for node to sync");
            sleep(Duration::from_secs(1)).await;
        }

        // The node remains fully synced even after some time; no pruning.
        sleep(Duration::from_secs(3)).await;
        let sync_status = data_source.sync_status().await.unwrap();
        assert!(
            (SyncStatusQueryData {
                vid_shares: Default::default(),
                ..sync_status.clone()
            })
            .is_fully_synced(),
            "{sync_status:#?}"
        );
    }
1706
    /// The stage of a storage transaction at which the fail-storage tests inject a
    /// fault (dispatched to `fail_begins_writable` / `fail_writes` / `fail_commits`
    /// in the helpers below).
    #[derive(Clone, Copy, Debug)]
    enum FailureType {
        /// Fail when beginning a writable transaction.
        Begin,
        /// Fail when performing a write inside a transaction.
        Write,
        /// Fail when committing a transaction.
        Commit,
    }
1713
    // Helper: verify that a fetch still resolves for the caller even when persisting
    // the fetched object fails at the given `failure` stage, and that once storage
    // recovers the object is re-fetched and finally served from local storage.
    async fn test_fetch_storage_failure_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer so individual transaction
        // stages can be made to fail on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_max_retry_interval(Duration::from_millis(100))
            // Short retry timeout: the injected failures are permanent until `pass()`
            // is called, so give up quickly instead of retrying for the default period.
            .with_retry_timeout(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_begins_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
        // Stop injecting failures; storage works normally from here on.
        data_source.as_ref().pass().await;

        // It is possible that the fetch above completes, notifies the subscriber,
        // and the fetch below quickly subscribes and gets notified by the same loop.
        // We add a delay here to avoid this situation.
        // This is not a bug, as being notified after subscribing is fine.
        sleep(Duration::from_secs(1)).await;

        // We can get the same leaf again, this will again trigger an active fetch since storage
        // failed the first time.
        tracing::info!("fetch with write success");
        let fetch = data_source.get_leaf(1).await;
        assert!(fetch.is_pending());
        assert_eq!(leaves[0], fetch.await);

        sleep(Duration::from_secs(1)).await;

        // Finally, we should have the leaf locally and not need to fetch it.
        tracing::info!("retrieve from storage");
        let fetch = data_source.get_leaf(1).await;
        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
    }
1804
    // Storage fails to begin a writable transaction; the fetch must still resolve.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_begin() {
        test_fetch_storage_failure_helper(FailureType::Begin).await;
    }
1809
    // Storage fails on writes within a transaction; the fetch must still resolve.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_write() {
        test_fetch_storage_failure_helper(FailureType::Write).await;
    }
1814
    // Storage fails at commit time; the fetch must still resolve.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_commit() {
        test_fetch_storage_failure_helper(FailureType::Commit).await;
    }
1819
    // Helper: verify that when exactly one storage operation fails at the given
    // `failure` stage, the store of a fetched leaf is retried until it succeeds, so
    // the leaf ends up in local storage after a single fetch.
    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer; here each failure mode is
        // armed for a single occurrence only, so retries should succeed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_one_begin_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => {
                data_source
                    .as_ref()
                    .fail_one_write(FailableAction::Any)
                    .await
            },
            FailureType::Commit => {
                data_source
                    .as_ref()
                    .fail_one_commit(FailableAction::Any)
                    .await
            },
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);

        // Check that the leaf ended up in local storage.
        let mut tx = data_source.read().await.unwrap();
        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
    }
1902
    // A single failure to begin a writable transaction is retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_begin() {
        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
    }
1907
    // A single write failure is retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_write() {
        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
    }
1912
    // A single commit failure is retried until the leaf is stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_commit() {
        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
    }
1917
    // Verify that merely appending a decided leaf (with no payload or VID info) is
    // enough to trigger background fetches of the corresponding block and VID common
    // data from a peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_decide() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block has been decided.
        let leaf = network
            .data_source()
            .subscribe_leaves(1)
            .await
            .next()
            .await
            .unwrap();

        // Give the node a decide containing the leaf but no additional information.
        data_source.append(leaf.clone().into()).await.unwrap();

        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
        // the leaf.
        sleep(Duration::from_secs(5)).await;

        // Read the missing data directly from storage (via a database transaction), rather than
        // going through the data source, so that this request itself does not trigger a fetch.
        // Thus, this will only work if the data was already fetched, triggered by the leaf.
        let mut tx = data_source.read().await.unwrap();
        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
        let block = tx.get_block(id).await.unwrap();
        let vid = tx.get_vid_common(id).await.unwrap();

        assert_eq!(block.hash(), leaf.block_hash());
        assert_eq!(vid.block_hash(), leaf.block_hash());
    }
1984
    // Verify that a failure to open a read-only transaction (used to decide whether
    // an object is fetchable) is retried, so the fetch still resolves.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_begin_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer so transaction begins can
        // be made to fail on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should retry until it is able to determine
        // the leaf is fetchable and trigger the fetch.
        tracing::info!("fetch with transaction failure");
        data_source
            .as_ref()
            .fail_one_begin_read_only(FailableAction::Any)
            .await;
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
    }
2049
    // Verify that a transient failure to load a header from storage does not cause a
    // block fetch to be wrongly classified as unfetchable; the load is retried until
    // it succeeds and the block is fetched.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_block() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer so specific reads can be
        // made to fail on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block is produced.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let leaf = leaves.next().await.unwrap();

        // Send the leaf to the disconnected data source, so the corresponding block becomes
        // fetchable.
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the block by hash; it should retry until it is able to determine the
        // leaf is available, thus the block is fetchable, trigger the fetch.
        //
        // Failing only on the `get_header` call here hits an edge case which is only possible when
        // fetching blocks: we successfully determine that the object is not available locally and
        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
        // fail to load the header and erroneously treat this as the header not being available, we
        // will give up and consider the object unfetchable (since the next step would be to fetch
        // the corresponding leaf, but we cannot do this with just a block hash).
        //
        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
        // successfully able to load the header from storage and determine that the block is
        // fetchable.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::GetHeader)
            .await;
        let fetch = data_source.get_block(leaf.block_hash()).await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        let block: BlockQueryData<MockTypes> = fetch.await;
        assert_eq!(block.hash(), leaf.block_hash());
    }
2132
    // Verify that a transient read failure while looking up a transaction (an object
    // that cannot be actively fetched) is retried, rather than being treated as the
    // transaction missing — which would leave the fetch pending forever.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_tx() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer so a single read can be
        // made to fail on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a transaction is sequenced.
        let tx = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(tx.clone()).await;
        let tx = network
            .data_source()
            .get_block_containing_transaction(tx.commit())
            .await
            .await;

        // Send the block containing the transaction to the disconnected data source.
        {
            let leaf = network
                .data_source()
                .get_leaf(tx.transaction.block_height() as usize)
                .await
                .await;
            let block = network
                .data_source()
                .get_block(tx.transaction.block_height() as usize)
                .await
                .await;
            let mut tx = data_source.write().await.unwrap();
            tx.insert_leaf(leaf.clone()).await.unwrap();
            tx.insert_block(block.clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Check that the transaction is there.
        tracing::info!("fetch success");
        assert_eq!(
            tx,
            data_source
                .get_block_containing_transaction(tx.transaction.hash())
                .await
                .await
        );

        // Fetch the transaction with storage failures.
        //
        // Failing only one read here hits an edge case that only exists for unfetchable objects
        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
        // if we erroneously treat this load failure as the transaction being missing, we will
        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
        // never resolve.
        //
        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
        // and we discover that the transaction doesn't need to be fetched at all.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::Any)
            .await;
        let fetch = data_source
            .get_block_containing_transaction(tx.transaction.hash())
            .await;

        assert_eq!(tx, fetch.await);
    }
2237
    // Verify that a leaf subscription stream survives a failure to open a read-only
    // transaction: the fetchability check is retried and the stream still yields
    // every leaf in order.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_begin_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in a fault-injection layer so a single transaction
        // begin can be made to fail on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            // Small chunks so the 5-leaf stream spans multiple range queries.
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
        // and trigger the fetch.
        tracing::info!("stream with transaction failure");
        data_source
            .as_ref()
            .fail_one_begin_read_only(FailableAction::Any)
            .await;
        assert_eq!(
            leaves,
            data_source
                .subscribe_leaves(1)
                .await
                .take(5)
                .collect::<Vec<_>>()
                .await
        );
    }
2311
    // Regression test: a block stream must survive a sustained period of storage
    // read failures, retrying each load until reads succeed again rather than
    // yielding errors or stalling forever.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_load_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        // `FailStorage` wraps the SQL storage so the test can inject read faults on demand.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        // Proactive fetching and the aggregator are disabled so the range stream
        // under test is the only mechanism that can discover objects.
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the blocks with a period of database failures.
        tracing::info!("stream with read failure");
        data_source.as_ref().fail_reads(FailableAction::Any).await;
        let fetches = data_source
            .get_block_range(1..=5)
            .await
            .collect::<Vec<_>>()
            .await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        // Once reads recover, every fetch should resolve to the correct block.
        for (leaf, fetch) in leaves.iter().zip(fetches) {
            let block: BlockQueryData<MockTypes> = fetch.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }
    }
2386
    /// Selects which metadata stream `test_metadata_stream_begin_failure_helper`
    /// exercises.
    enum MetadataType {
        // Stream payload metadata (`subscribe_payload_metadata`).
        Payload,
        // Stream VID common metadata (`subscribe_vid_common_metadata`).
        Vid,
    }
2391
    // Shared driver for the metadata-stream begin-failure tests: a metadata
    // stream (payload or VID, per `stream`) must survive a period in which every
    // attempt to begin a read-only transaction fails, retrying until storage
    // recovers and then yielding all objects.
    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        // `FailStorage` wraps the SQL storage so the test can inject faults on demand.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(3).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Send the first object to the disconnected data source, so we hit all the cases:
        // * leaf present but not full object (from the last leaf)
        // * full object present but inaccessible due to storage failures (first object)
        // * nothing present (middle object)
        let leaf = network.data_source().get_leaf(1).await.await;
        let block = network.data_source().get_block(1).await.await;
        let vid = network.data_source().get_vid_common(1).await.await;
        data_source
            .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
            .await
            .unwrap();

        // Stream the objects with a period of database failures.
        tracing::info!("stream with transaction failure");
        // Fail every attempt to begin a read-only transaction until `pass()` is
        // called below; the stream should keep retrying in the meantime.
        data_source
            .as_ref()
            .fail_begins_read_only(FailableAction::Any)
            .await;
        match stream {
            MetadataType::Payload => {
                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let payloads = payloads.collect::<Vec<_>>().await;
                for (leaf, payload) in leaves.iter().zip(payloads) {
                    assert_eq!(payload.block_hash, leaf.block_hash());
                }
            },
            MetadataType::Vid => {
                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let vids = vids.collect::<Vec<_>>().await;
                for (leaf, vid) in leaves.iter().zip(vids) {
                    assert_eq!(vid.block_hash, leaf.block_hash());
                }
            },
        }
    }
2494
2495    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2496    async fn test_metadata_stream_begin_failure_payload() {
2497        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2498    }
2499
2500    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2501    async fn test_metadata_stream_begin_failure_vid() {
2502        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2503    }
2504
    // This helper function starts up a mock network with both v0 and v1
    // availability query modules registered, triggers fetches for a data source
    // from the provider at the given API `version` ("v0" or "v1"), and asserts
    // that the fetched data is correct.
    async fn run_fallback_deserialization_test_helper(port: u16, version: &str) {
        let mut network = MockNetwork::<MockDataSource>::init().await;

        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));

        // Register availability APIs for two versions: v0 and v1
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "0.0.1".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
        );

        let db = TmpDb::init().await;

        // Point the provider at the requested API version so the test can probe
        // both the legacy (v0) and current (v1) wire formats.
        let provider_url = format!("http://localhost:{port}/{version}")
            .parse()
            .expect("Invalid URL");

        let provider = Provider::new(QueryServiceProvider::new(
            provider_url,
            StaticVersion::<0, 1> {},
        ));

        let ds = data_source(&db, &provider).await;
        network.start().await;

        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        // Use distinct heights for each object type so the three fetches are independent.
        let test_leaf = &leaves[0];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        let mut fetches = vec![];
        // Issue requests for missing data (these should initially remain unresolved):
        fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
        fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
        fetches.push(
            ds.get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Append the latest known leaf to the local store
        // This would trigger fetches for the corresponding missing data
        // such as header, vid and payload
        // This would also trigger fetches for the parent data
        ds.append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // Check that the data has been fetched and matches the network data source
        {
            let leaf = ds.get_leaf(test_leaf.height() as usize).await;
            let payload = ds.get_payload(test_payload.height() as usize).await;
            let common = ds.get_vid_common(test_common.height() as usize).await;

            let truth = network.data_source();
            assert_eq!(
                leaf.await,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                payload.await,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common.await,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }
    }
2615
2616    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2617    async fn test_fallback_deserialization_for_fetch_requests_v0() {
2618        let port = reserve_tcp_port().unwrap();
2619
2620        // This run will call v0 availalbilty api for fetch requests.
2621        // The fetch initially attempts deserialization with new types,
2622        // which fails because the v0 provider returns legacy types.
2623        // It then falls back to deserializing as legacy types,
2624        // and the fetch passes
2625        run_fallback_deserialization_test_helper(port, "v0").await;
2626    }
2627
2628    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2629    async fn test_fallback_deserialization_for_fetch_requests_v1() {
2630        let port = reserve_tcp_port().unwrap();
2631
2632        // Fetch from the v1 availability API using MockVersions.
2633        // this one fetches from the v1 provider.
2634        // which would correctly deserialize the bytes in the first attempt, so no fallback deserialization is needed
2635        run_fallback_deserialization_test_helper(port, "v1").await;
2636    }
2637
2638    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2639    async fn test_fallback_deserialization_for_fetch_requests_pos() {
2640        let port = reserve_tcp_port().unwrap();
2641
2642        // Fetch Proof of Stake (PoS) data using the v1 availability API
2643        // with proof of stake version
2644        run_fallback_deserialization_test_helper(port, "v1").await;
2645    }
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
        // Run with the PoS version against a v0 provider.
        // Fetch requests are expected to fail because PoS commitments differ from the legacy commitments
        // returned by the v0 provider.
        // For example: a PoS Leaf2 commitment will not match the downgraded commitment from a legacy Leaf1.

        let mut network = MockNetwork::<MockDataSource>::init().await;

        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));

        // Only the legacy (v0, "0.0.1") availability module is registered, so the
        // provider can serve nothing but legacy encodings.
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                StaticVersion::<0, 1> {},
                "0.0.1".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();

        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
        );

        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}/v0").parse().unwrap(),
            StaticVersion::<0, 1> {},
        ));
        let ds = data_source(&db, &provider).await;

        network.start().await;

        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        // Use distinct heights for each object type so the three fetches are independent.
        let test_leaf = &leaves[0];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        let leaf = ds.get_leaf(test_leaf.height() as usize).await;
        let payload = ds.get_payload(test_payload.height() as usize).await;
        let common = ds.get_vid_common(test_common.height() as usize).await;

        // Allow ample time for the fetches to be attempted against the provider.
        sleep(Duration::from_secs(3)).await;

        // fetches fail because of different commitments
        leaf.try_resolve().unwrap_err();
        payload.try_resolve().unwrap_err();
        common.try_resolve().unwrap_err();
    }
2700}