hotshot_query_service/fetching/provider/query_service.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

use async_trait::async_trait;
use committable::Committable;
use hotshot_types::{
    data::{ns_table, VidCommitment, VidCommon},
    traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
    vid::{
        advz::{advz_scheme, ADVZScheme},
        avidm::{init_avidm_param, AvidMScheme},
        avidm_gf2::AvidmGf2Scheme,
    },
};
use jf_advz::VidScheme;
use surf_disco::{Client, Url};
use vbs::{version::StaticVersionType, BinarySerializer};

use super::Provider;
use crate::{
    availability::{
        ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
        PayloadQueryData, VidCommonQueryData,
    },
    fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
    types::HeightIndexed,
    Error, Header, Payload,
};

/// Data availability provider backed by another instance of this query service.
///
/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
/// instance of this query service to try and retrieve missing objects.
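///
/// # Example
///
/// A minimal construction sketch; the URL and API version below are illustrative placeholders, not
/// values prescribed by this crate:
///
/// ```ignore
/// use vbs::version::StaticVersion;
///
/// // Fetch missing objects from a hypothetical peer query service.
/// let provider = QueryServiceProvider::new(
///     "https://peer.example.com/v0".parse().unwrap(),
///     StaticVersion::<0, 1>::instance(),
/// );
/// ```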
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    client: Client<Error, Ver>,
}

impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
    pub fn new(url: Url, _: Ver) -> Self {
        Self {
            client: Client::new(url),
        }
    }
}

impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
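    /// Fallback deserialization for a payload served in the legacy (ADVZ) encoding.
    ///
    /// The decoded payload is only returned if it matches the ADVZ commitment in the request.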
    async fn deserialize_legacy_payload<Types: NodeType>(
        &self,
        payload_bytes: Vec<u8>,
        common_bytes: Vec<u8>,
        req: PayloadRequest,
    ) -> Option<Payload<Types>> {
        let client_url = self.client.base_url();

        let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
            return None;
        };

        let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
            &payload_bytes,
        ) {
            Ok(payload) => payload,
            Err(err) => {
                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
                return None;
            },
        };

        let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
            &common_bytes,
        ) {
            Ok(common) => common,
            Err(err) => {
                tracing::warn!(%err, ?req, "failed to deserialize ADVZCommonQueryData");
                return None;
            },
        };

        let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
        let bytes = payload.data.encode();

        let commit = advz_scheme(num_storage_nodes)
            .commit_only(bytes)
            .inspect_err(|err| {
                tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
            })
            .ok()?;

        if commit != advz_commit {
            tracing::error!(
                ?req,
                expected_commit=%advz_commit,
                actual_commit=%commit,
                %client_url,
                "received inconsistent legacy payload"
            );
            return None;
        }

        Some(payload.data)
    }

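    /// Fallback deserialization for VID common data served in the legacy (ADVZ) encoding.
    ///
    /// The decoded data is only returned if it is consistent with the ADVZ commitment in the
    /// request.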
    async fn deserialize_legacy_vid_common<Types: NodeType>(
        &self,
        bytes: Vec<u8>,
        req: VidCommonRequest,
    ) -> Option<VidCommon> {
        let client_url = self.client.base_url();
        let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
            return None;
        };

        match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
            Ok(res) => {
                if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
                    Some(VidCommon::V0(res.common))
                } else {
                    tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
                    None
                }
            },
            Err(err) => {
                tracing::warn!(
                    %client_url,
                    ?req,
                    %err,
                    "failed to deserialize ADVZCommonQueryData"
                );
                None
            },
        }
    }

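    /// Fallback deserialization for a leaf served in the legacy encoding.
    ///
    /// The decoded leaf is only returned if its height, hash, and QC all match the request.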
    async fn deserialize_legacy_leaf<Types: NodeType>(
        &self,
        bytes: Vec<u8>,
        req: LeafRequest<Types>,
    ) -> Option<LeafQueryData<Types>> {
        let client_url = self.client.base_url();

        match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
            Ok(mut leaf) => {
                if leaf.height() != req.height {
                    tracing::error!(
                        %client_url, ?req,
                        expected_height = req.height,
                        actual_height = leaf.height(),
                        "received leaf with the wrong height"
                    );
                    return None;
                }

                let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
                let actual_leaf_commit: [u8; 32] = leaf.hash().into();
                if actual_leaf_commit != expected_leaf_commit {
                    tracing::error!(
                        %client_url, ?req,
                        expected_leaf = %req.expected_leaf,
                        actual_leaf = %leaf.hash(),
                        "received leaf with the wrong hash"
                    );
                    return None;
                }

                let expected_qc_commit: [u8; 32] = req.expected_qc.into();
                let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
                if actual_qc_commit != expected_qc_commit {
                    tracing::error!(
                        %client_url, ?req,
                        expected_qc = %req.expected_qc,
                        actual_qc = %leaf.qc().commit(),
                        "received leaf with the wrong QC"
                    );
                    return None;
                }

                // There is a potential DoS attack where the peer sends us a leaf with the full
                // payload in it, which uses redundant resources in the database, since we fetch and
                // store payloads separately. We can defend ourselves by simply dropping the payload
                // if present.
                leaf.leaf.unfill_block_payload();

                Some(leaf.into())
            },
            Err(err) => {
                tracing::warn!(
                    %client_url, ?req, %err,
                    "failed to deserialize legacy LeafQueryData"
                );
                None
            },
        }
    }
}

#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Payload` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the current type first. If that
    /// deserialization fails (e.g., because the provider is still returning legacy data), it falls
    /// back to deserializing with the older, legacy type instead. This fallback preserves
    /// compatibility with peers that have not yet upgraded.
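    ///
    /// In either case, the payload is verified against the requested VID commitment before it is
    /// returned, so an inconsistent response from a peer is discarded rather than stored.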
    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
        let client_url = self.client.base_url();
        let req_hash = req.0;
        // Fetch the payload and the VID common data. We need the common data to recompute the VID
        // commitment, to ensure the payload we received is consistent with the commitment we
        // requested.
        let payload_bytes = self
            .client
            .get::<()>(&format!("availability/payload/hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
            })
            .ok()?;

        let common_bytes = self
            .client
            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
            .bytes()
            .await
            .inspect_err(|err| {
                tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
            })
            .ok()?;

        let payload =
            vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
                })
                .ok();

        let common =
            vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
                .inspect_err(|err| {
                    tracing::info!(%err, %req_hash, "error deserializing VidCommonQueryData");
                })
                .ok();

        let (payload, common) = match (payload, common) {
            (Some(payload), Some(common)) => (payload, common),
            _ => {
                tracing::info!(%req_hash, "falling back to legacy payload deserialization");

                // Fallback deserialization
                return self
                    .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
                    .await;
            },
        };

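        // Verify the payload against the requested commitment. The verification procedure depends
        // on which version of the VID scheme produced the common data: V0 uses the original ADVZ
        // scheme, V1 uses AvidM, and V2 uses the `avidm_gf2` variant of AvidM.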
        match common.common() {
            VidCommon::V0(common) => {
                let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
                let bytes = payload.data().encode();

                let commit = advz_scheme(num_storage_nodes)
                    .commit_only(bytes)
                    .map(VidCommitment::V0)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash, %num_storage_nodes, "failed to compute VID commitment (V0)");
                    })
                    .ok()?;

                if commit != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = ?commit,
                        %client_url,
                        "VID commitment mismatch (V0)"
                    );

                    return None;
                }
            },
            VidCommon::V1(common) => {
                let bytes = payload.data().encode();

                let avidm_param = init_avidm_param(common.total_weights)
                    .inspect_err(|err| {
                        tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weights={}", common.total_weights);
                    })
                    .ok()?;

                let header = self
                    .client
                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
                    .send()
                    .await
                    .inspect_err(|err| {
                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
                    })
                    .ok()?;

                if header.payload_commitment() != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %header.payload_commitment(),
                        %client_url,
                        "header payload commitment mismatch (V1, AvidM)"
                    );
                    return None;
                }

                let metadata = header.metadata().encode();
                let commit = AvidMScheme::commit(
                    &avidm_param,
                    &bytes,
                    ns_table::parse_ns_table(bytes.len(), &metadata),
                )
                .map(VidCommitment::V1)
                .inspect_err(|err| {
                    tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
                })
                .ok()?;

                // Compare the recomputed commitment with the requested commitment.
                if commit != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %commit,
                        %client_url,
                        "VID commitment mismatch (V1, AvidM)"
                    );
                    return None;
                }
            },
            VidCommon::V2(common) => {
                let bytes = payload.data().encode();

                let header = self
                    .client
                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
                    .send()
                    .await
                    .inspect_err(|err| {
                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
                    })
                    .ok()?;

                if header.payload_commitment() != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %header.payload_commitment(),
                        %client_url,
                        "header payload commitment mismatch (V2, AvidmGf2)"
                    );
                    return None;
                }

                let metadata = header.metadata().encode();
                let commit = AvidmGf2Scheme::commit(
                    &common.param,
                    &bytes,
                    ns_table::parse_ns_table(bytes.len(), &metadata),
                )
                .map(|(commit, _)| VidCommitment::V2(commit))
                .inspect_err(|err| {
                    tracing::error!(%err, %req_hash, "failed to compute AvidmGf2 commitment");
                })
                .ok()?;

                // Compare the recomputed commitment with the requested commitment.
                if commit != req.0 {
                    tracing::error!(
                        expected = %req_hash,
                        actual = %commit,
                        %client_url,
                        "VID commitment mismatch (V2, AvidmGf2)"
                    );
                    return None;
                }
            },
        }

        Some(payload.data)
    }
}

#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
    for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `Leaf` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the current type first. If that
    /// deserialization fails (e.g., because the provider is still returning legacy data), it falls
    /// back to deserializing with the older, legacy type instead. This fallback preserves
    /// compatibility with peers that have not yet upgraded.
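    ///
    /// In either case, the leaf is only accepted if its height, hash, and QC match the request,
    /// and any embedded block payload is stripped before the leaf is returned.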
    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
        let client_url = self.client.base_url();

        let bytes = self
            .client
            .get::<()>(&format!("availability/leaf/{}", req.height))
            .bytes()
            .await;
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");

                return None;
            },
        };

        // Attempt to deserialize using the new type.
        match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
            Ok(mut leaf) => {
                if leaf.height() != req.height {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_height = req.height,
                        actual_height = leaf.height(),
                        "received leaf with the wrong height"
                    );
                    return None;
                }
                if leaf.hash() != req.expected_leaf {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_hash = %req.expected_leaf,
                        actual_hash = %leaf.hash(),
                        "received leaf with the wrong hash"
                    );
                    return None;
                }
                if leaf.qc().commit() != req.expected_qc {
                    tracing::error!(
                        %client_url, ?req, ?leaf,
                        expected_qc = %req.expected_qc,
                        actual_qc = %leaf.qc().commit(),
                        "received leaf with the wrong QC"
                    );
                    return None;
                }

                // There is a potential DoS attack where the peer sends us a leaf with the full
                // payload in it, which uses redundant resources in the database, since we fetch and
                // store payloads separately. We can defend ourselves by simply dropping the payload
                // if present.
                leaf.leaf.unfill_block_payload();

                Some(leaf)
            },
            Err(err) => {
                tracing::info!(
                    ?req, %err,
                    "failed to deserialize LeafQueryData, falling back to legacy deserialization"
                );
                // Fallback deserialization
                self.deserialize_legacy_leaf(bytes, req).await
            },
        }
    }
}

#[async_trait]
impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
where
    Types: NodeType,
{
    /// Fetches the `VidCommon` for a given request.
    ///
    /// Attempts to fetch and deserialize the requested data using the current type first. If that
    /// deserialization fails (e.g., because the provider is still returning legacy data), it falls
    /// back to deserializing with the older, legacy type instead. This fallback preserves
    /// compatibility with peers that have not yet upgraded.
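    ///
    /// In either case, the common data is only returned if it is consistent with the requested
    /// payload commitment.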
    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
        let client_url = self.client.base_url();
        let bytes = self
            .client
            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
            .bytes()
            .await;
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::info!(
                    %client_url, ?req, %err,
                    "failed to fetch VID common bytes"
                );
                return None;
            },
        };

        match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
            Ok(res) => {
                if !res.common().is_consistent(&req.0) {
                    tracing::error!(
                        %client_url, ?req, ?res.common,
                        "fetched inconsistent VID common data"
                    );
                    return None;
                }
                Some(res.common)
            },
            Err(err) => {
                tracing::info!(
                    %client_url, ?req, %err,
                    "failed to deserialize as V1 VID common data, trying legacy fallback"
                );
                // Fallback deserialization
                self.deserialize_legacy_vid_common::<Types>(bytes, req)
                    .await
            },
        }
    }
}

// These tests run the `postgres` Docker image, which doesn't work on Windows.
#[cfg(all(test, not(target_os = "windows")))]
mod test {
    use std::{future::IntoFuture, time::Duration};

    use committable::Committable;
    use futures::{
        future::{join, FutureExt},
        stream::StreamExt,
    };
    // generic-array 0.14.x is deprecated, but VidCommitment requires this version
    // for From<Output<H>> impl in jf-merkle-tree
    #[allow(deprecated)]
    use generic_array::GenericArray;
    use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
    use hotshot_types::traits::node_implementation::Versions;
    use portpicker::pick_unused_port;
    use rand::RngCore;
    use tide_disco::{error::ServerError, App};
    use vbs::version::StaticVersion;

    use super::*;
    use crate::{
        api::load_api,
        availability::{
            define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData,
            BlockWithTransaction, Fetch, UpdateAvailabilityData,
        },
        data_source::{
            sql::{self, SqlDataSource},
            storage::{
                fail_storage::{FailStorage, FailableAction},
                pruning::{PrunedHeightStorage, PrunerCfg},
                sql::testing::TmpDb,
                AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
            },
            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
        },
        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
        node::{data_source::NodeDataSource, SyncStatus},
        task::BackgroundTask,
        testing::{
            consensus::{MockDataSource, MockNetwork},
            mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
            sleep,
        },
        types::HeightIndexed,
        ApiState,
    };

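    // Wrappers around `QueryServiceProvider` that let the tests block and unblock outgoing
    // requests to the peer.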
    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;

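    /// Discards a fetch result; used where a test only cares whether the request resolves.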
    fn ignore<T>(_: T) {}

    /// Build a data source suitable for this suite of tests.
    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
        db: &TmpDb,
        provider: &P,
    ) -> sql::Builder<MockTypes, P> {
        db.config()
            .builder((*provider).clone())
            .await
            .unwrap()
            // We disable proactive fetching for these tests, since we intend to test on-demand
            // fetching, and proactive fetching could lead to false successes.
            .disable_proactive_fetching()
    }

    /// A data source suitable for this suite of tests, with the default options.
    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
        db: &TmpDb,
        provider: &P,
    ) -> SqlDataSource<MockTypes, P> {
        builder(db, provider).await.build().await.unwrap()
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 6. This gives us the genesis block, one additional
        // block at the end, and then one block apiece for fetching each type of resource:
        // * Leaf
        // * Block
        // * Payload
        // * VID common
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Make requests for missing data that should _not_ trigger an active fetch:
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // * An unknown leaf hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // * An unknown leaf height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown block height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // * Genesis VID common (no VID for genesis)
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // * An unknown transaction.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Now we will actually fetch the missing data. First, since our node is not really
        // connected to consensus, we need to give it a leaf after the range of interest so it
        // learns about the correct block height. We will temporarily lock requests to the provider
        // so that we can verify that without the provider, the node does _not_ get the data.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // Give the requests some extra time to complete, and check that they still haven't
        // resolved, since the provider is blocked. This just ensures the integrity of the test by
        // checking the node didn't mysteriously get the block from somewhere else, so that when we
        // unblock the provider and the node finally gets the block, we know it came from the
        // provider.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the request and see that we eventually receive the data.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Verify the data.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Fetching the block and payload should have also fetched the corresponding leaves, since
        // we have an invariant that we should not store a block in the database without its
        // corresponding leaf and header. Thus we should be able to get the leaves even if the
        // provider is blocked.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
        // with this hash exists, and trigger a fetch for it.
        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Test a similar scenario, but with payload instead of block: we are aware of
        // `leaves.last()` but not the corresponding payload; we can nonetheless fetch that
        // payload by block hash.
        tracing::info!("fetching payload by hash");
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_fetch_on_request_epoch_version() {
        // This test verifies that our provider can handle fetching things by their hashes,
        // specifically focused on epoch version transitions.
        tracing::info!("Starting test_fetch_on_request_epoch_version");

        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                <EpochsTestVersions as Versions>::Base::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(
                format!("0.0.0.0:{port}"),
                <EpochsTestVersions as Versions>::Base::instance(),
            ),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        // Use our special test provider that handles epoch version transitions.
        let db = TmpDb::init().await;
        let provider = EpochProvider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            <EpochsTestVersions as Versions>::Base::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 6. This gives us the genesis block, one additional
        // block at the end, and then one block apiece for fetching each type of resource:
        // * Leaf
        // * Block
        // * Payload
        // * VID common
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Make requests for missing data that should _not_ trigger an active fetch:
        let mut fetches = vec![];
        // * An unknown leaf hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // * An unknown leaf height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown block height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // * Genesis VID common (no VID for genesis)
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // * An unknown transaction.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Now we will actually fetch the missing data. First, since our node is not really
        // connected to consensus, we need to give it a leaf after the range of interest so it
        // learns about the correct block height. We will temporarily lock requests to the provider
        // so that we can verify that without the provider, the node does _not_ get the data.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // Give the requests some extra time to complete, and check that they still haven't
        // resolved, since the provider is blocked. This just ensures the integrity of the test by
        // checking the node didn't mysteriously get the block from somewhere else, so that when we
        // unblock the provider and the node finally gets the block, we know it came from the
        // provider.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the request and see that we eventually receive the data.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Verify the data.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Fetching the block and payload should have also fetched the corresponding leaves, since
        // we have an invariant that we should not store a block in the database without its
        // corresponding leaf and header. Thus we should be able to get the leaves even if the
        // provider is blocked.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
        // with this hash exists, and trigger a fetch for it.
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }
        // Test a similar scenario, but with payload instead of block: we are aware of
        // `leaves.last()` but not the corresponding payload; we can nonetheless fetch that
        // payload by block hash.
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }

        tracing::info!("Test completed successfully!");
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_block_and_leaf_concurrently() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 3. This gives us the genesis block, one additional
        // block at the end, and then one block that we can use to test fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];

        // Tell the node about a leaf after the one of interest so it learns about the block height.
        data_source.append(leaves[1].clone().into()).await.unwrap();

        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
        // ends up not fetching anything.
        let (leaf, block) = join(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .into_future(),
            data_source
                .get_block(test_leaf.height() as usize)
                .await
                .into_future(),
        )
        .await;
        assert_eq!(leaf, *test_leaf);
        assert_eq!(leaf.header(), block.header());
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_different_blocks_same_payload() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 5. This gives us the genesis block, one additional
        // block at the end, and then two blocks that we can use to test fetching.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(4).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // All the blocks here are empty, so they have the same payload:
        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
        // If we fetch both blocks at the same time, we can check that we haven't broken anything
        // with whatever optimizations we add to deduplicate payload fetching.
        let (block1, block2) = join(
            data_source
                .get_block(leaves[0].height() as usize)
                .await
                .into_future(),
            data_source
                .get_block(leaves[1].height() as usize)
                .await
                .into_future(),
        )
        .await;
        assert_eq!(block1.header(), leaves[0].header());
        assert_eq!(block2.header(), leaves[1].header());
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_stream() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Subscribe to objects from the future.
        let blocks = data_source.subscribe_blocks(0).await;
        let leaves = data_source.subscribe_leaves(0).await;
        let common = data_source.subscribe_vid_common(0).await;

        // Wait for a few blocks to be finalized.
        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest so it learns about the block
        // height.
        data_source
            .append(finalized_leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        // Check the subscriptions.
        let blocks = blocks.take(5).collect::<Vec<_>>().await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let common = common.take(5).collect::<Vec<_>>().await;
        for i in 0..5 {
            tracing::info!("checking block {i}");
            assert_eq!(leaves[i], finalized_leaves[i]);
            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_range_start() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait for a few blocks to be finalized.
        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

        // Tell the node about a leaf after the range of interest (so it learns about the block
        // height) and one in the middle of the range, to test what happens when data is missing
        // from the beginning of the range but other data is available.
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
        tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Get the whole range of leaves.
        let leaves = data_source
            .get_leaf_range(..5)
            .await
            .then(Fetch::resolve)
            .collect::<Vec<_>>()
            .await;
        for i in 0..5 {
            tracing::info!("checking leaf {i}");
            assert_eq!(leaves[i], finalized_leaves[i]);
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = pick_unused_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus. We don't give it a
        // fetcher, since transactions are always fetched passively anyway.
1343        let db = TmpDb::init().await;
1344        let data_source = data_source(&db, &NoFetching).await;
1345
1346        // Subscribe to blocks.
1347        let mut leaves = network.data_source().subscribe_leaves(1).await;
1348        let mut blocks = network.data_source().subscribe_blocks(1).await;
1349
1350        // Start consensus.
1351        network.start().await;
1352
1353        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
1354        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
1355        // exist.
1356        let tx = mock_transaction(vec![1, 2, 3]);
1357        let fut = data_source
1358            .get_block_containing_transaction(tx.commit())
1359            .await;
1360
1361        // Sequence the transaction.
1362        network.submit_transaction(tx.clone()).await;
1363
1364        // Send blocks to the query service; the future will resolve as soon as it sees a block
1365        // containing the transaction.
1366        let block = loop {
1367            let leaf = leaves.next().await.unwrap();
1368            let block = blocks.next().await.unwrap();
1369
1370            data_source
1371                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1372                .await
1373                .unwrap();
1374
1375            if block.transaction_by_hash(tx.commit()).is_some() {
1376                break block;
1377            }
1378        };
1379        tracing::info!("transaction included in block {}", block.height());
1380
1381        let fetched_tx = fut.await;
1382        assert_eq!(
1383            fetched_tx,
1384            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1385        );
1386
1387        // Future queries for this transaction resolve immediately.
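        // Note the double `.await`: the first obtains the `Fetch` handle, the second resolves
        // it, which is immediate now that the block is in local storage.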
1388        assert_eq!(
1389            fetched_tx,
1390            data_source
1391                .get_block_containing_transaction(tx.commit())
1392                .await
1393                .await
1394        );
1395    }
1396
1397    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1398    async fn test_retry() {
1399        // Create the consensus network.
1400        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1401
1402        // Start a web server that the non-consensus node can use to fetch blocks.
1403        let port = pick_unused_port().unwrap();
1404        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1405        app.register_module(
1406            "availability",
1407            define_api(
1408                &Default::default(),
1409                MockBase::instance(),
1410                "1.0.0".parse().unwrap(),
1411            )
1412            .unwrap(),
1413        )
1414        .unwrap();
1415        network.spawn(
1416            "server",
1417            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1418        );
1419
1420        // Start a data source which is not receiving events from consensus.
1421        let db = TmpDb::init().await;
1422        let provider = Provider::new(QueryServiceProvider::new(
1423            format!("http://localhost:{port}").parse().unwrap(),
1424            MockBase::instance(),
1425        ));
1426        let data_source = builder(&db, &provider)
1427            .await
1428            .with_max_retry_interval(Duration::from_secs(1))
1429            .build()
1430            .await
1431            .unwrap();
1432
1433        // Start consensus.
1434        network.start().await;
1435
1436        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1437        // block at the end, and one block to try fetching.
1438        let leaves = network.data_source().subscribe_leaves(1).await;
1439        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1440        let test_leaf = &leaves[0];
1441
1442        // Cause requests to fail temporarily, so we can test retries.
1443        provider.fail();
1444
1445        // Give the node a leaf after the range of interest so it learns about the correct block
1446        // height.
1447        data_source
1448            .append(leaves.last().cloned().unwrap().into())
1449            .await
1450            .unwrap();
1451
1452        tracing::info!("requesting leaf from failing providers");
1453        let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1454
1455        // Wait for a few retries and check that the request has not completed, since the
1456        // provider is failing.
1457        sleep(Duration::from_secs(5)).await;
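        // `try_resolve` hands back the unresolved fetch as an `Err` when the object is still
        // pending, which is what we expect while the provider keeps failing.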
1458        fut.try_resolve().unwrap_err();
1459
1460        // As soon as the provider recovers, the request can complete.
1461        provider.unfail();
1462        assert_eq!(
1463            data_source
1464                .get_leaf(test_leaf.height() as usize)
1465                .await
1466                .await,
1467            *test_leaf
1468        );
1469    }
1470
1471    // Uses deprecated generic-array 0.14.x via digest, required for VidCommitment compatibility
1472    #[allow(deprecated)]
1473    fn random_vid_commit() -> VidCommitment {
1474        let mut bytes = [0; 32];
1475        rand::thread_rng().fill_bytes(&mut bytes);
1476        VidCommitment::V0(GenericArray::from(bytes).into())
1477    }
1478
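    // A server that speaks the availability API but answers every payload and VID common
    // request with (well-formed) genesis data, regardless of what was asked for. Clients are
    // expected to detect that the response does not match the requested commitment.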
1479    async fn malicious_server(port: u16) {
1480        let mut api = load_api::<(), ServerError, MockBase>(
1481            None::<std::path::PathBuf>,
1482            include_str!("../../../api/availability.toml"),
1483            vec![],
1484        )
1485        .unwrap();
1486
1487        api.get("get_payload", move |_, _| {
1488            async move {
1489                // No matter what data we are asked for, always respond with dummy data.
1490                Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1491                    &Default::default(),
1492                    &Default::default(),
1493                )
1494                .await)
1495            }
1496            .boxed()
1497        })
1498        .unwrap()
1499        .get("get_vid_common", move |_, _| {
1500            async move {
1501                // No matter what data we are asked for, always respond with dummy data.
1502                Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1503                    &Default::default(),
1504                    &Default::default(),
1505                )
1506                .await)
1507            }
1508            .boxed()
1509        })
1510        .unwrap();
1511
1512        let mut app = App::<(), ServerError>::with_state(());
1513        app.register_module("availability", api).unwrap();
1514        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1515            .await
1516            .ok();
1517    }
1518
1519    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1520    async fn test_fetch_from_malicious_server() {
1521        let port = pick_unused_port().unwrap();
1522        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1523
1524        let provider = QueryServiceProvider::new(
1525            format!("http://localhost:{port}").parse().unwrap(),
1526            MockBase::instance(),
1527        );
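        // Wait for the malicious server to come up before issuing requests.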
1528        provider.client.connect(None).await;
1529
1530        // Query for a random payload; the server will respond with a different payload, and we
1531        // should detect the error.
1532        let res =
1533            ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1534                .await;
1535        assert_eq!(res, None);
1536
1537        // Query for a random VID common; the server will respond with a different one, and we
1538        // should detect the error.
1539        let res =
1540            ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1541                .await;
1542        assert_eq!(res, None);
1543    }
1544
1545    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1546    async fn test_archive_recovery() {
1547        // Create the consensus network.
1548        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1549
1550        // Start a web server that the non-consensus node can use to fetch blocks.
1551        let port = pick_unused_port().unwrap();
1552        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1553        app.register_module(
1554            "availability",
1555            define_api(
1556                &Default::default(),
1557                MockBase::instance(),
1558                "1.0.0".parse().unwrap(),
1559            )
1560            .unwrap(),
1561        )
1562        .unwrap();
1563        network.spawn(
1564            "server",
1565            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1566        );
1567
1568        // Start a data source which is not receiving events from consensus, only from a peer. The
1569        // data source is at first configured to aggressively prune data.
1570        let db = TmpDb::init().await;
1571        let provider = Provider::new(QueryServiceProvider::new(
1572            format!("http://localhost:{port}").parse().unwrap(),
1573            MockBase::instance(),
1574        ));
1575        let mut data_source = db
1576            .config()
1577            .pruner_cfg(
1578                PrunerCfg::new()
1579                    .with_target_retention(Duration::from_secs(0))
1580                    .with_interval(Duration::from_secs(5)),
1581            )
1582            .unwrap()
1583            .builder(provider.clone())
1584            .await
1585            .unwrap()
1586            // Set a fast retry for failed operations. Occasionally storage operations will fail due
1587            // to conflicting write-mode transactions running concurrently. This is ok as they will
1588            // be retried. Having a fast retry interval speeds up the test.
1589            .with_min_retry_interval(Duration::from_millis(100))
1590            // Randomize retries a lot. This will temporarily separate competing write
1591            // transactions with high probability, so that one of them quickly gets exclusive access
1592            // to the database.
1593            .with_retry_randomization_factor(3.)
1594            .build()
1595            .await
1596            .unwrap();
1597
1598        // Start consensus.
1599        network.start().await;
1600
1601        // Wait until a few blocks are produced.
1602        let leaves = network.data_source().subscribe_leaves(1).await;
1603        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1604
1605        // The disconnected data source has no data yet, so it hasn't done any pruning.
1606        let pruned_height = data_source
1607            .read()
1608            .await
1609            .unwrap()
1610            .load_pruned_height()
1611            .await
1612            .unwrap();
1613        // Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
1614        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1615
1616        // Send the last leaf to the disconnected data source so it learns about the height and
1617        // fetches the missing data.
1618        let last_leaf = leaves.last().unwrap();
1619        data_source.append(last_leaf.clone().into()).await.unwrap();
1620
1621        // Trigger a fetch of each leaf so the database gets populated.
1622        for i in 1..=last_leaf.height() {
1623            tracing::info!(i, "fetching leaf");
1624            assert_eq!(
1625                data_source.get_leaf(i as usize).await.await,
1626                leaves[i as usize - 1]
1627            );
1628        }
1629
1630        // After a bit of time, the pruner has run and deleted all the data we just fetched.
1631        loop {
1632            let pruned_height = data_source
1633                .read()
1634                .await
1635                .unwrap()
1636                .load_pruned_height()
1637                .await
1638                .unwrap();
1639            if pruned_height == Some(last_leaf.height()) {
1640                break;
1641            }
1642            tracing::info!(
1643                ?pruned_height,
1644                target_height = last_leaf.height(),
1645                "waiting for pruner to run"
1646            );
1647            sleep(Duration::from_secs(1)).await;
1648        }
1649
1650        // Now close the data source and restart it with archive recovery.
1651        data_source = db
1652            .config()
1653            .archive()
1654            .builder(provider.clone())
1655            .await
1656            .unwrap()
1657            .with_minor_scan_interval(Duration::from_secs(1))
1658            .with_major_scan_interval(1)
1659            .build()
1660            .await
1661            .unwrap();
1662
1663        // Pruned height should be reset.
1664        let pruned_height = data_source
1665            .read()
1666            .await
1667            .unwrap()
1668            .load_pruned_height()
1669            .await
1670            .unwrap();
1671        assert_eq!(pruned_height, None);
1672
1673        // The node has pruned all of its data, including the latest block, so it has forgotten
1674        // the block height. We need to give it another leaf with some height so it will be
1675        // willing to fetch.
1676        data_source.append(last_leaf.clone().into()).await.unwrap();
1677
1678        // Wait for the data to be restored. It should be restored by the next major scan.
1679        loop {
1680            let sync_status = data_source.sync_status().await.unwrap();
1681
1682            // VID shares are unique to a node and will never be fetched from a peer; this is
1683            // acceptable since there is redundancy built into the VID scheme. Ignore missing VID
1684            // shares in the `is_fully_synced` check.
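        // The struct-update expression below copies `sync_status` with `missing_vid_shares`
        // zeroed out, so `is_fully_synced` ignores that field.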
1685            if (SyncStatus {
1686                missing_vid_shares: 0,
1687                ..sync_status
1688            })
1689            .is_fully_synced()
1690            {
1691                break;
1692            }
1693            tracing::info!(?sync_status, "waiting for node to sync");
1694            sleep(Duration::from_secs(1)).await;
1695        }
1696
1697        // The node remains fully synced even after some time; no pruning.
1698        sleep(Duration::from_secs(3)).await;
1699        let sync_status = data_source.sync_status().await.unwrap();
1700        assert!(
1701            (SyncStatus {
1702                missing_vid_shares: 0,
1703                ..sync_status
1704            })
1705            .is_fully_synced(),
1706            "{sync_status:?}"
1707        );
1708    }
1709
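    // Which storage operation to inject failures into when testing fetches against flaky
    // storage.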
1710    #[derive(Clone, Copy, Debug)]
1711    enum FailureType {
1712        Begin,
1713        Write,
1714        Commit,
1715    }
1716
1717    async fn test_fetch_storage_failure_helper(failure: FailureType) {
1718        // Create the consensus network.
1719        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1720
1721        // Start a web server that the non-consensus node can use to fetch blocks.
1722        let port = pick_unused_port().unwrap();
1723        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1724        app.register_module(
1725            "availability",
1726            define_api(
1727                &Default::default(),
1728                MockBase::instance(),
1729                "1.0.0".parse().unwrap(),
1730            )
1731            .unwrap(),
1732        )
1733        .unwrap();
1734        network.spawn(
1735            "server",
1736            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1737        );
1738
1739        // Start a data source which is not receiving events from consensus, only from a peer.
1740        let provider = Provider::new(QueryServiceProvider::new(
1741            format!("http://localhost:{port}").parse().unwrap(),
1742            MockBase::instance(),
1743        ));
1744        let db = TmpDb::init().await;
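        // Wrap the SQL backend in `FailStorage`, which lets us make individual storage
        // operations (begins, writes, commits) fail on demand via the `fail_*` and `pass`
        // methods.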
1745        let storage = FailStorage::from(
1746            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1747                .await
1748                .unwrap(),
1749        );
1750        let data_source = FetchingDataSource::builder(storage, provider)
1751            .disable_proactive_fetching()
1752            .disable_aggregator()
1753            .with_max_retry_interval(Duration::from_millis(100))
1754            .with_retry_timeout(Duration::from_secs(1))
1755            .build()
1756            .await
1757            .unwrap();
1758
1759        // Start consensus.
1760        network.start().await;
1761
1762        // Wait until a couple of blocks are produced.
1763        let leaves = network.data_source().subscribe_leaves(1).await;
1764        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1765
1766        // Send the last leaf to the disconnected data source so it learns about the height.
1767        let last_leaf = leaves.last().unwrap();
1768        let mut tx = data_source.write().await.unwrap();
1769        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1770        tx.commit().await.unwrap();
1771
1772        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
1773        tracing::info!("fetch with write failure");
1774        match failure {
1775            FailureType::Begin => {
1776                data_source
1777                    .as_ref()
1778                    .fail_begins_writable(FailableAction::Any)
1779                    .await
1780            },
1781            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1782            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1783        }
1784        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1785        data_source.as_ref().pass().await;
1786
1787        // It is possible that the fetch above completes, notifies the subscriber,
1788        // and the fetch below quickly subscribes and gets notified by the same loop.
1789        // We add a delay here to avoid this situation.
1790        // This is not a bug, as being notified after subscribing is fine.
1791        sleep(Duration::from_secs(1)).await;
1792
1793        // We can get the same leaf again; this will again trigger an active fetch since storage
1794        // failed the first time.
1795        tracing::info!("fetch with write success");
1796        let fetch = data_source.get_leaf(1).await;
1797        assert!(fetch.is_pending());
1798        assert_eq!(leaves[0], fetch.await);
1799
1800        sleep(Duration::from_secs(1)).await;
1801
1802        // Finally, we should have the leaf locally and not need to fetch it.
1803        tracing::info!("retrieve from storage");
1804        let fetch = data_source.get_leaf(1).await;
1805        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1806    }
1807
1808    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1809    async fn test_fetch_storage_failure_on_begin() {
1810        test_fetch_storage_failure_helper(FailureType::Begin).await;
1811    }
1812
1813    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1814    async fn test_fetch_storage_failure_on_write() {
1815        test_fetch_storage_failure_helper(FailureType::Write).await;
1816    }
1817
1818    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1819    async fn test_fetch_storage_failure_on_commit() {
1820        test_fetch_storage_failure_helper(FailureType::Commit).await;
1821    }
1822
1823    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1824        // Create the consensus network.
1825        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1826
1827        // Start a web server that the non-consensus node can use to fetch blocks.
1828        let port = pick_unused_port().unwrap();
1829        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1830        app.register_module(
1831            "availability",
1832            define_api(
1833                &Default::default(),
1834                MockBase::instance(),
1835                "1.0.0".parse().unwrap(),
1836            )
1837            .unwrap(),
1838        )
1839        .unwrap();
1840        network.spawn(
1841            "server",
1842            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1843        );
1844
1845        // Start a data source which is not receiving events from consensus, only from a peer.
1846        let provider = Provider::new(QueryServiceProvider::new(
1847            format!("http://localhost:{port}").parse().unwrap(),
1848            MockBase::instance(),
1849        ));
1850        let db = TmpDb::init().await;
1851        let storage = FailStorage::from(
1852            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1853                .await
1854                .unwrap(),
1855        );
1856        let data_source = FetchingDataSource::builder(storage, provider)
1857            .disable_proactive_fetching()
1858            .disable_aggregator()
1859            .with_min_retry_interval(Duration::from_millis(100))
1860            .build()
1861            .await
1862            .unwrap();
1863
1864        // Start consensus.
1865        network.start().await;
1866
1867        // Wait until a couple of blocks are produced.
1868        let leaves = network.data_source().subscribe_leaves(1).await;
1869        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1870
1871        // Send the last leaf to the disconnected data source so it learns about the height.
1872        let last_leaf = leaves.last().unwrap();
1873        let mut tx = data_source.write().await.unwrap();
1874        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1875        tx.commit().await.unwrap();
1876
1877        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
1878        tracing::info!("fetch with write failure");
1879        match failure {
1880            FailureType::Begin => {
1881                data_source
1882                    .as_ref()
1883                    .fail_one_begin_writable(FailableAction::Any)
1884                    .await
1885            },
1886            FailureType::Write => {
1887                data_source
1888                    .as_ref()
1889                    .fail_one_write(FailableAction::Any)
1890                    .await
1891            },
1892            FailureType::Commit => {
1893                data_source
1894                    .as_ref()
1895                    .fail_one_commit(FailableAction::Any)
1896                    .await
1897            },
1898        }
1899        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1900
1901        // Check that the leaf ended up in local storage.
1902        let mut tx = data_source.read().await.unwrap();
1903        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1904    }
1905
1906    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1907    async fn test_fetch_storage_failure_retry_on_begin() {
1908        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1909    }
1910
1911    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1912    async fn test_fetch_storage_failure_retry_on_write() {
1913        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1914    }
1915
1916    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1917    async fn test_fetch_storage_failure_retry_on_commit() {
1918        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1919    }
1920
1921    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1922    async fn test_fetch_on_decide() {
1923        // Create the consensus network.
1924        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1925
1926        // Start a web server that the non-consensus node can use to fetch blocks.
1927        let port = pick_unused_port().unwrap();
1928        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1929        app.register_module(
1930            "availability",
1931            define_api(
1932                &Default::default(),
1933                MockBase::instance(),
1934                "1.0.0".parse().unwrap(),
1935            )
1936            .unwrap(),
1937        )
1938        .unwrap();
1939        network.spawn(
1940            "server",
1941            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1942        );
1943
1944        // Start a data source which is not receiving events from consensus.
1945        let db = TmpDb::init().await;
1946        let provider = Provider::new(QueryServiceProvider::new(
1947            format!("http://localhost:{port}").parse().unwrap(),
1948            MockBase::instance(),
1949        ));
1950        let data_source = builder(&db, &provider)
1951            .await
1952            .with_max_retry_interval(Duration::from_secs(1))
1953            .build()
1954            .await
1955            .unwrap();
1956
1957        // Start consensus.
1958        network.start().await;
1959
1960        // Wait until a block has been decided.
1961        let leaf = network
1962            .data_source()
1963            .subscribe_leaves(1)
1964            .await
1965            .next()
1966            .await
1967            .unwrap();
1968
1969        // Give the node a decide containing the leaf but no additional information.
1970        data_source.append(leaf.clone().into()).await.unwrap();
1971
1972        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
1973        // the leaf.
1974        sleep(Duration::from_secs(5)).await;
1975
1976        // Read the missing data directly from storage (via a database transaction), rather than
1977        // going through the data source, so that this request itself does not trigger a fetch.
1978        // Thus, this will only work if the data was already fetched, triggered by the leaf.
1979        let mut tx = data_source.read().await.unwrap();
1980        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1981        let block = tx.get_block(id).await.unwrap();
1982        let vid = tx.get_vid_common(id).await.unwrap();
1983
1984        assert_eq!(block.hash(), leaf.block_hash());
1985        assert_eq!(vid.block_hash(), leaf.block_hash());
1986    }
1987
1988    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1989    async fn test_fetch_begin_failure() {
1990        // Create the consensus network.
1991        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1992
1993        // Start a web server that the non-consensus node can use to fetch blocks.
1994        let port = pick_unused_port().unwrap();
1995        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1996        app.register_module(
1997            "availability",
1998            define_api(
1999                &Default::default(),
2000                MockBase::instance(),
2001                "1.0.0".parse().unwrap(),
2002            )
2003            .unwrap(),
2004        )
2005        .unwrap();
2006        network.spawn(
2007            "server",
2008            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2009        );
2010
2011        // Start a data source which is not receiving events from consensus, only from a peer.
2012        let provider = Provider::new(QueryServiceProvider::new(
2013            format!("http://localhost:{port}").parse().unwrap(),
2014            MockBase::instance(),
2015        ));
2016        let db = TmpDb::init().await;
2017        let storage = FailStorage::from(
2018            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2019                .await
2020                .unwrap(),
2021        );
2022        let data_source = FetchingDataSource::builder(storage, provider)
2023            .disable_proactive_fetching()
2024            .disable_aggregator()
2025            .with_min_retry_interval(Duration::from_millis(100))
2026            .build()
2027            .await
2028            .unwrap();
2029
2030        // Start consensus.
2031        network.start().await;
2032
2033        // Wait until a couple of blocks are produced.
2034        let leaves = network.data_source().subscribe_leaves(1).await;
2035        let leaves = leaves.take(2).collect::<Vec<_>>().await;
2036
2037        // Send the last leaf to the disconnected data source so it learns about the height.
2038        let last_leaf = leaves.last().unwrap();
2039        let mut tx = data_source.write().await.unwrap();
2040        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2041        tx.commit().await.unwrap();
2042
2043        // Trigger a fetch of the first leaf; it should retry until it is able to determine
2044        // the leaf is fetchable and trigger the fetch.
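        // `fail_one_begin_read_only` fails only the next read transaction, so the fetch's retry
        // succeeds on a subsequent attempt.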
2045        tracing::info!("fetch with transaction failure");
2046        data_source
2047            .as_ref()
2048            .fail_one_begin_read_only(FailableAction::Any)
2049            .await;
2050        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2051    }
2052
2053    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2054    async fn test_fetch_load_failure_block() {
2055        // Create the consensus network.
2056        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2057
2058        // Start a web server that the non-consensus node can use to fetch blocks.
2059        let port = pick_unused_port().unwrap();
2060        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2061        app.register_module(
2062            "availability",
2063            define_api(
2064                &Default::default(),
2065                MockBase::instance(),
2066                "1.0.0".parse().unwrap(),
2067            )
2068            .unwrap(),
2069        )
2070        .unwrap();
2071        network.spawn(
2072            "server",
2073            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2074        );
2075
2076        // Start a data source which is not receiving events from consensus, only from a peer.
2077        let provider = Provider::new(QueryServiceProvider::new(
2078            format!("http://localhost:{port}").parse().unwrap(),
2079            MockBase::instance(),
2080        ));
2081        let db = TmpDb::init().await;
2082        let storage = FailStorage::from(
2083            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2084                .await
2085                .unwrap(),
2086        );
2087        let data_source = FetchingDataSource::builder(storage, provider)
2088            .disable_proactive_fetching()
2089            .disable_aggregator()
2090            .with_min_retry_interval(Duration::from_millis(100))
2091            .build()
2092            .await
2093            .unwrap();
2094
2095        // Start consensus.
2096        network.start().await;
2097
2098        // Wait until a block is produced.
2099        let mut leaves = network.data_source().subscribe_leaves(1).await;
2100        let leaf = leaves.next().await.unwrap();
2101
2102        // Send the leaf to the disconnected data source, so the corresponding block becomes
2103        // fetchable.
2104        let mut tx = data_source.write().await.unwrap();
2105        tx.insert_leaf(leaf.clone()).await.unwrap();
2106        tx.commit().await.unwrap();
2107
2108        // Trigger a fetch of the block by hash; it should retry until it is able to determine
2109        // that the leaf is available (and thus the block is fetchable) and trigger the fetch.
2110        //
2111        // Failing only on the `get_header` call here hits an edge case which is only possible when
2112        // fetching blocks: we successfully determine that the object is not available locally and
2113        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
2114        // fail to load the header and erroneously treat this as the header not being available, we
2115        // will give up and consider the object unfetchable (since the next step would be to fetch
2116        // the corresponding leaf, but we cannot do this with just a block hash).
2117        //
2118        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
2119        // successfully able to load the header from storage and determine that the block is
2120        // fetchable.
2121        tracing::info!("fetch with read failure");
2122        data_source
2123            .as_ref()
2124            .fail_one_read(FailableAction::GetHeader)
2125            .await;
2126        let fetch = data_source.get_block(leaf.block_hash()).await;
2127
2128        // Give some time for a few reads to fail before letting them succeed.
2129        sleep(Duration::from_secs(2)).await;
2130        data_source.as_ref().pass().await;
2131
2132        let block: BlockQueryData<MockTypes> = fetch.await;
2133        assert_eq!(block.hash(), leaf.block_hash());
2134    }
2135
2136    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2137    async fn test_fetch_load_failure_tx() {
2138        // Create the consensus network.
2139        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2140
2141        // Start a web server that the non-consensus node can use to fetch blocks.
2142        let port = pick_unused_port().unwrap();
2143        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2144        app.register_module(
2145            "availability",
2146            define_api(
2147                &Default::default(),
2148                MockBase::instance(),
2149                "1.0.0".parse().unwrap(),
2150            )
2151            .unwrap(),
2152        )
2153        .unwrap();
2154        network.spawn(
2155            "server",
2156            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2157        );
2158
2159        // Start a data source which is not receiving events from consensus, only from a peer.
2160        let provider = Provider::new(QueryServiceProvider::new(
2161            format!("http://localhost:{port}").parse().unwrap(),
2162            MockBase::instance(),
2163        ));
2164        let db = TmpDb::init().await;
2165        let storage = FailStorage::from(
2166            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2167                .await
2168                .unwrap(),
2169        );
2170        let data_source = FetchingDataSource::builder(storage, provider)
2171            .disable_proactive_fetching()
2172            .disable_aggregator()
2173            .with_min_retry_interval(Duration::from_millis(100))
2174            .build()
2175            .await
2176            .unwrap();
2177
2178        // Start consensus.
2179        network.start().await;
2180
2181        // Wait until a transaction is sequenced.
2182        let tx = mock_transaction(vec![1, 2, 3]);
2183        network.submit_transaction(tx.clone()).await;
2184        let tx = network
2185            .data_source()
2186            .get_block_containing_transaction(tx.commit())
2187            .await
2188            .await;
2189
2190        // Send the block containing the transaction to the disconnected data source.
2191        {
2192            let leaf = network
2193                .data_source()
2194                .get_leaf(tx.transaction.block_height() as usize)
2195                .await
2196                .await;
2197            let block = network
2198                .data_source()
2199                .get_block(tx.transaction.block_height() as usize)
2200                .await
2201                .await;
2202            let mut tx = data_source.write().await.unwrap();
2203            tx.insert_leaf(leaf.clone()).await.unwrap();
2204            tx.insert_block(block.clone()).await.unwrap();
2205            tx.commit().await.unwrap();
2206        }
2207
2208        // Check that the transaction is there.
2209        tracing::info!("fetch success");
2210        assert_eq!(
2211            tx,
2212            data_source
2213                .get_block_containing_transaction(tx.transaction.hash())
2214                .await
2215                .await
2216        );
2217
2218        // Fetch the transaction with storage failures.
2219        //
2220        // Failing only one read here hits an edge case that only exists for unfetchable objects
2221        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
2222        // if we erroneously treat this load failure as the transaction being missing, we will
2223        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
2224        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
2225        // never resolve.
2226        //
2227        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
2228        // and we discover that the transaction doesn't need to be fetched at all.
2229        tracing::info!("fetch with read failure");
2230        data_source
2231            .as_ref()
2232            .fail_one_read(FailableAction::Any)
2233            .await;
2234        let fetch = data_source
2235            .get_block_containing_transaction(tx.transaction.hash())
2236            .await;
2237
2238        assert_eq!(tx, fetch.await);
2239    }
2240
2241    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2242    async fn test_stream_begin_failure() {
2243        // Create the consensus network.
2244        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2245
2246        // Start a web server that the non-consensus node can use to fetch blocks.
2247        let port = pick_unused_port().unwrap();
2248        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2249        app.register_module(
2250            "availability",
2251            define_api(
2252                &Default::default(),
2253                MockBase::instance(),
2254                "1.0.0".parse().unwrap(),
2255            )
2256            .unwrap(),
2257        )
2258        .unwrap();
2259        network.spawn(
2260            "server",
2261            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2262        );
2263
2264        // Start a data source which is not receiving events from consensus, only from a peer.
2265        let provider = Provider::new(QueryServiceProvider::new(
2266            format!("http://localhost:{port}").parse().unwrap(),
2267            MockBase::instance(),
2268        ));
2269        let db = TmpDb::init().await;
2270        let storage = FailStorage::from(
2271            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2272                .await
2273                .unwrap(),
2274        );
2275        let data_source = FetchingDataSource::builder(storage, provider)
2276            .disable_proactive_fetching()
2277            .disable_aggregator()
2278            .with_min_retry_interval(Duration::from_millis(100))
2279            .with_range_chunk_size(3)
2280            .build()
2281            .await
2282            .unwrap();
2283
2284        // Start consensus.
2285        network.start().await;
2286
2287        // Wait until a few blocks are produced.
2288        let leaves = network.data_source().subscribe_leaves(1).await;
2289        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2290
2291        // Send the last leaf to the disconnected data source so it learns about the height.
2292        let last_leaf = leaves.last().unwrap();
2293        let mut tx = data_source.write().await.unwrap();
2294        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2295        tx.commit().await.unwrap();
2296
2297        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
2298        // and trigger the fetch.
2299        tracing::info!("stream with transaction failure");
2300        data_source
2301            .as_ref()
2302            .fail_one_begin_read_only(FailableAction::Any)
2303            .await;
2304        assert_eq!(
2305            leaves,
2306            data_source
2307                .subscribe_leaves(1)
2308                .await
2309                .take(5)
2310                .collect::<Vec<_>>()
2311                .await
2312        );
2313    }
2314
2315    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2316    async fn test_stream_load_failure() {
2317        // Create the consensus network.
2318        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2319
2320        // Start a web server that the non-consensus node can use to fetch blocks.
2321        let port = pick_unused_port().unwrap();
2322        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2323        app.register_module(
2324            "availability",
2325            define_api(
2326                &Default::default(),
2327                MockBase::instance(),
2328                "1.0.0".parse().unwrap(),
2329            )
2330            .unwrap(),
2331        )
2332        .unwrap();
2333        network.spawn(
2334            "server",
2335            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2336        );
2337
2338        // Start a data source which is not receiving events from consensus, only from a peer.
2339        let provider = Provider::new(QueryServiceProvider::new(
2340            format!("http://localhost:{port}").parse().unwrap(),
2341            MockBase::instance(),
2342        ));
2343        let db = TmpDb::init().await;
2344        let storage = FailStorage::from(
2345            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2346                .await
2347                .unwrap(),
2348        );
2349        let data_source = FetchingDataSource::builder(storage, provider)
2350            .disable_proactive_fetching()
2351            .disable_aggregator()
2352            .with_min_retry_interval(Duration::from_millis(100))
2353            .with_range_chunk_size(3)
2354            .build()
2355            .await
2356            .unwrap();
2357
2358        // Start consensus.
2359        network.start().await;
2360
2361        // Wait until a few blocks are produced.
2362        let leaves = network.data_source().subscribe_leaves(1).await;
2363        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2364
2365        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2366        let last_leaf = leaves.last().unwrap();
2367        let mut tx = data_source.write().await.unwrap();
2368        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2369        tx.commit().await.unwrap();
2370
2371        // Stream the blocks with a period of database failures.
2372        tracing::info!("stream with read failure");
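        // `fail_reads` fails every read until `pass` is called below; the stream should keep
        // retrying in the meantime.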
2373        data_source.as_ref().fail_reads(FailableAction::Any).await;
2374        let fetches = data_source
2375            .get_block_range(1..=5)
2376            .await
2377            .collect::<Vec<_>>()
2378            .await;
2379
2380        // Give some time for a few reads to fail before letting them succeed.
2381        sleep(Duration::from_secs(2)).await;
2382        data_source.as_ref().pass().await;
2383
2384        for (leaf, fetch) in leaves.iter().zip(fetches) {
2385            let block: BlockQueryData<MockTypes> = fetch.await;
2386            assert_eq!(block.hash(), leaf.block_hash());
2387        }
2388    }
2389
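    // Which metadata stream to exercise in the stream-begin-failure tests.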
2390    enum MetadataType {
2391        Payload,
2392        Vid,
2393    }
2394
2395    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2396        // Create the consensus network.
2397        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2398
2399        // Start a web server that the non-consensus node can use to fetch blocks.
2400        let port = pick_unused_port().unwrap();
2401        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2402        app.register_module(
2403            "availability",
2404            define_api(
2405                &Default::default(),
2406                MockBase::instance(),
2407                "1.0.0".parse().unwrap(),
2408            )
2409            .unwrap(),
2410        )
2411        .unwrap();
2412        network.spawn(
2413            "server",
2414            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2415        );
2416
2417        // Start a data source which is not receiving events from consensus, only from a peer.
2418        let provider = Provider::new(QueryServiceProvider::new(
2419            format!("http://localhost:{port}").parse().unwrap(),
2420            MockBase::instance(),
2421        ));
2422        let db = TmpDb::init().await;
2423        let storage = FailStorage::from(
2424            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2425                .await
2426                .unwrap(),
2427        );
2428        let data_source = FetchingDataSource::builder(storage, provider)
2429            .disable_proactive_fetching()
2430            .disable_aggregator()
2431            .with_min_retry_interval(Duration::from_millis(100))
2432            .with_range_chunk_size(3)
2433            .build()
2434            .await
2435            .unwrap();
2436
2437        // Start consensus.
2438        network.start().await;
2439
2440        // Wait until a few blocks are produced.
2441        let leaves = network.data_source().subscribe_leaves(1).await;
2442        let leaves = leaves.take(3).collect::<Vec<_>>().await;
2443
2444        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2445        let last_leaf = leaves.last().unwrap();
2446        let mut tx = data_source.write().await.unwrap();
2447        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2448        tx.commit().await.unwrap();
2449
2450        // Send the first object to the disconnected data source, so we hit all the cases:
2451        // * leaf present but not full object (from the last leaf)
2452        // * full object present but inaccessible due to storage failures (first object)
2453        // * nothing present (middle object)
2454        let leaf = network.data_source().get_leaf(1).await.await;
2455        let block = network.data_source().get_block(1).await.await;
2456        let vid = network.data_source().get_vid_common(1).await.await;
2457        data_source
2458            .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2459            .await
2460            .unwrap();
2461
2462        // Stream the objects with a period of database failures.
2463        tracing::info!("stream with transaction failure");
2464        data_source
2465            .as_ref()
2466            .fail_begins_read_only(FailableAction::Any)
2467            .await;
2468        match stream {
2469            MetadataType::Payload => {
2470                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2471
2472                // Give some time for a few reads to fail before letting them succeed.
2473                sleep(Duration::from_secs(2)).await;
2474                tracing::info!("stop failing transactions");
2475                data_source.as_ref().pass().await;
2476
2477                let payloads = payloads.collect::<Vec<_>>().await;
2478                for (leaf, payload) in leaves.iter().zip(payloads) {
2479                    assert_eq!(payload.block_hash, leaf.block_hash());
2480                }
2481            },
2482            MetadataType::Vid => {
2483                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2484
2485                // Give some time for a few reads to fail before letting them succeed.
2486                sleep(Duration::from_secs(2)).await;
2487                tracing::info!("stop failing transactions");
2488                data_source.as_ref().pass().await;
2489
2490                let vids = vids.collect::<Vec<_>>().await;
2491                for (leaf, vid) in leaves.iter().zip(vids) {
2492                    assert_eq!(vid.block_hash, leaf.block_hash());
2493                }
2494            },
2495        }
2496    }
2497
2498    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2499    async fn test_metadata_stream_begin_failure_payload() {
2500        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2501    }
2502
2503    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2504    async fn test_metadata_stream_begin_failure_vid() {
2505        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2506    }
2507
2508    // This helper function starts up a mock network
2509    // with v0 and v1 availability query modules,
2510    // triggers fetches for a data source from the provider,
2511    // and asserts that the fetched data is correct.
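    // Both modules are registered under the same "availability" path but with different API
    // versions (0.0.1 and 1.0.0); the `/v0` or `/v1` segment of the provider URL selects which
    // version serves the fetch requests.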
2512    async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2513        let mut network = MockNetwork::<MockDataSource, V>::init().await;
2514
2515        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2516
2517        // Register availability APIs for two versions: v0 and v1
2518        app.register_module(
2519            "availability",
2520            define_api(
2521                &Default::default(),
2522                StaticVersion::<0, 1> {},
2523                "0.0.1".parse().unwrap(),
2524            )
2525            .unwrap(),
2526        )
2527        .unwrap();
2528
2529        app.register_module(
2530            "availability",
2531            define_api(
2532                &Default::default(),
2533                StaticVersion::<0, 1> {},
2534                "1.0.0".parse().unwrap(),
2535            )
2536            .unwrap(),
2537        )
2538        .unwrap();
2539
2540        network.spawn(
2541            "server",
2542            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2543        );
2544
2545        let db = TmpDb::init().await;
2546
2547        let provider_url = format!("http://localhost:{port}/{version}")
2548            .parse()
2549            .expect("Invalid URL");
2550
2551        let provider = Provider::new(QueryServiceProvider::new(
2552            provider_url,
2553            StaticVersion::<0, 1> {},
2554        ));
2555
2556        let ds = data_source(&db, &provider).await;
2557        network.start().await;
2558
2559        let leaves = network.data_source().subscribe_leaves(1).await;
2560        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2561        let test_leaf = &leaves[0];
2562        let test_payload = &leaves[2];
2563        let test_common = &leaves[3];
2564
2565        let mut fetches = vec![];
2566        // Issue requests for missing data (these should initially remain unresolved):
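        // (`map(ignore)` discards each fetch's typed value so the differently-typed fetches can
        // be collected in one `Vec`.)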
2567        fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2568        fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2569        fetches.push(
2570            ds.get_vid_common(test_common.block_hash())
2571                .await
2572                .map(ignore),
2573        );
2574
2575        // Even if we give data extra time to propagate, these requests will not resolve, since we
2576        // didn't trigger any active fetches.
2577        sleep(Duration::from_secs(1)).await;
2578        for (i, fetch) in fetches.into_iter().enumerate() {
2579            tracing::info!("checking fetch {i} is unresolved");
2580            fetch.try_resolve().unwrap_err();
2581        }
2582
2583        // Append the latest known leaf to the local store.
2584        // This triggers fetches for the corresponding missing data,
2585        // such as the header, VID common, and payload.
2586        // It also triggers fetches for the parent data.
2587        ds.append(leaves.last().cloned().unwrap().into())
2588            .await
2589            .unwrap();
2590
2591        // Check that the data has been fetched and matches the network data source.
2592        {
2593            let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2594            let payload = ds.get_payload(test_payload.height() as usize).await;
2595            let common = ds.get_vid_common(test_common.height() as usize).await;
2596
2597            let truth = network.data_source();
2598            assert_eq!(
2599                leaf.await,
2600                truth.get_leaf(test_leaf.height() as usize).await.await
2601            );
2602            assert_eq!(
2603                payload.await,
2604                truth
2605                    .get_payload(test_payload.height() as usize)
2606                    .await
2607                    .await
2608            );
2609            assert_eq!(
2610                common.await,
2611                truth
2612                    .get_vid_common(test_common.height() as usize)
2613                    .await
2614                    .await
2615            );
2616        }
2617    }
2618
2619    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2620    async fn test_fallback_deserialization_for_fetch_requests_v0() {
2621        let port = pick_unused_port().unwrap();
2622
2623        // This run calls the v0 availability API for fetch requests.
2624        // The fetch initially attempts deserialization with the new types,
2625        // which fails because the v0 provider returns legacy types.
2626        // It then falls back to deserializing as legacy types,
2627        // and the fetch succeeds.
2628        run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2629    }
2630
2631    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2632    async fn test_fallback_deserialization_for_fetch_requests_v1() {
2633        let port = pick_unused_port().unwrap();
2634
2635        // Fetch from the v1 availability API using MockVersions. The v1 provider returns
2636        // current types, which deserialize correctly on the first attempt, so no fallback
2637        // deserialization is needed.
2638        run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2639    }
2640
2641    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2642    async fn test_fallback_deserialization_for_fetch_requests_pos() {
2643        let port = pick_unused_port().unwrap();
2644
2645        // Fetch Proof of Stake (PoS) data from the v1 availability API
2646        // using the PoS test versions.
2647        run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2648    }

2649    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2650    async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2651        // Run with the PoS version against a v0 provider.
2652        // Fetch requests are expected to fail because PoS commitments differ from the legacy commitments
2653        // returned by the v0 provider.
2654        // For example: a PoS Leaf2 commitment will not match the downgraded commitment from a legacy Leaf1.
2655
2656        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2657
2658        let port = pick_unused_port().unwrap();
2659        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2660
2661        app.register_module(
2662            "availability",
2663            define_api(
2664                &Default::default(),
2665                StaticVersion::<0, 1> {},
2666                "0.0.1".parse().unwrap(),
2667            )
2668            .unwrap(),
2669        )
2670        .unwrap();
2671
2672        network.spawn(
2673            "server",
2674            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2675        );
2676
2677        let db = TmpDb::init().await;
2678        let provider = Provider::new(QueryServiceProvider::new(
2679            format!("http://localhost:{port}/v0").parse().unwrap(),
2680            StaticVersion::<0, 1> {},
2681        ));
2682        let ds = data_source(&db, &provider).await;
2683
2684        network.start().await;
2685
2686        let leaves = network.data_source().subscribe_leaves(1).await;
2687        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2688        let test_leaf = &leaves[0];
2689        let test_payload = &leaves[2];
2690        let test_common = &leaves[3];
2691
2692        let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2693        let payload = ds.get_payload(test_payload.height() as usize).await;
2694        let common = ds.get_vid_common(test_common.height() as usize).await;
2695
2696        sleep(Duration::from_secs(3)).await;
2697
2698        // The fetches never resolve because the commitments differ.
2699        leaf.try_resolve().unwrap_err();
2700        payload.try_resolve().unwrap_err();
2701        common.try_resolve().unwrap_err();
2702    }
2703}