hotshot_query_service/fetching/provider/
query_service.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use async_trait::async_trait;
14use committable::Committable;
15use hotshot_types::{
16    data::{ns_table, VidCommitment},
17    traits::{block_contents::BlockHeader, node_implementation::NodeType, EncodeBytes},
18    vid::{
19        advz::{advz_scheme, ADVZScheme},
20        avidm::{init_avidm_param, AvidMScheme},
21    },
22};
23use jf_vid::VidScheme;
24use surf_disco::{Client, Url};
25use vbs::{version::StaticVersionType, BinarySerializer};
26
27use super::Provider;
28use crate::{
29    availability::{
30        ADVZCommonQueryData, ADVZPayloadQueryData, LeafQueryData, LeafQueryDataLegacy,
31        PayloadQueryData, VidCommonQueryData,
32    },
33    fetching::request::{LeafRequest, PayloadRequest, VidCommonRequest},
34    types::HeightIndexed,
35    Error, Header, Payload, VidCommon,
36};
37
38/// Data availability provider backed by another instance of this query service.
39///
40/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
41/// instance of this query service to try and retrieve missing objects.
42#[derive(Clone, Debug)]
43pub struct QueryServiceProvider<Ver: StaticVersionType> {
44    client: Client<Error, Ver>,
45}
46
47impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
48    pub fn new(url: Url, _: Ver) -> Self {
49        Self {
50            client: Client::new(url),
51        }
52    }
53}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56    async fn deserialize_legacy_payload<Types: NodeType>(
57        &self,
58        payload_bytes: Vec<u8>,
59        common_bytes: Vec<u8>,
60        req: PayloadRequest,
61    ) -> Option<Payload<Types>> {
62        let client_url = self.client.base_url();
63
64        let PayloadRequest(VidCommitment::V0(advz_commit)) = req else {
65            return None;
66        };
67
68        let payload = match vbs::Serializer::<Ver>::deserialize::<ADVZPayloadQueryData<Types>>(
69            &payload_bytes,
70        ) {
71            Ok(payload) => payload,
72            Err(err) => {
73                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
74                return None;
75            },
76        };
77
78        let common = match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(
79            &common_bytes,
80        ) {
81            Ok(common) => common,
82            Err(err) => {
83                tracing::warn!(%err, ?req, "failed to deserialize ADVZPayloadQueryData");
84                return None;
85            },
86        };
87
88        let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common.common()) as usize;
89        let bytes = payload.data.encode();
90
91        let commit = advz_scheme(num_storage_nodes)
92            .commit_only(bytes)
93            .inspect_err(|err| {
94                tracing::error!(%err, ?req, "failed to compute legacy VID commitment");
95            })
96            .ok()?;
97
98        if commit != advz_commit {
99            tracing::error!(
100                ?req,
101                expected_commit=%advz_commit,
102                actual_commit=%commit,
103                %client_url,
104                "received inconsistent legacy payload"
105            );
106            return None;
107        }
108
109        Some(payload.data)
110    }
111
112    async fn deserialize_legacy_vid_common<Types: NodeType>(
113        &self,
114        bytes: Vec<u8>,
115        req: VidCommonRequest,
116    ) -> Option<VidCommon> {
117        let client_url = self.client.base_url();
118        let VidCommonRequest(VidCommitment::V0(advz_commit)) = req else {
119            return None;
120        };
121
122        match vbs::Serializer::<Ver>::deserialize::<ADVZCommonQueryData<Types>>(&bytes) {
123            Ok(res) => {
124                if ADVZScheme::is_consistent(&advz_commit, &res.common).is_ok() {
125                    Some(VidCommon::V0(res.common))
126                } else {
127                    tracing::error!(%client_url, ?req, ?res.common, "fetched inconsistent VID common data");
128                    None
129                }
130            },
131            Err(err) => {
132                tracing::warn!(
133                    %client_url,
134                    ?req,
135                    %err,
136                    "failed to deserialize ADVZCommonQueryData"
137                );
138                None
139            },
140        }
141    }
142    async fn deserialize_legacy_leaf<Types: NodeType>(
143        &self,
144        bytes: Vec<u8>,
145        req: LeafRequest<Types>,
146    ) -> Option<LeafQueryData<Types>> {
147        let client_url = self.client.base_url();
148
149        match vbs::Serializer::<Ver>::deserialize::<LeafQueryDataLegacy<Types>>(&bytes) {
150            Ok(mut leaf) => {
151                if leaf.height() != req.height {
152                    tracing::error!(
153                        %client_url, ?req,
154                        expected_height = req.height,
155                        actual_height = leaf.height(),
156                        "received leaf with the wrong height"
157                    );
158                    return None;
159                }
160
161                let expected_leaf_commit: [u8; 32] = req.expected_leaf.into();
162                let actual_leaf_commit: [u8; 32] = leaf.hash().into();
163                if actual_leaf_commit != expected_leaf_commit {
164                    tracing::error!(
165                        %client_url, ?req,
166                        expected_leaf = %req.expected_leaf,
167                        actual_leaf = %leaf.hash(),
168                        "received leaf with the wrong hash"
169                    );
170                    return None;
171                }
172
173                let expected_qc_commit: [u8; 32] = req.expected_qc.into();
174                let actual_qc_commit: [u8; 32] = leaf.qc().commit().into();
175                if actual_qc_commit != expected_qc_commit {
176                    tracing::error!(
177                        %client_url, ?req,
178                        expected_qc = %req.expected_qc,
179                        actual_qc = %leaf.qc().commit(),
180                        "received leaf with the wrong QC"
181                    );
182                    return None;
183                }
184
185                // There is a potential DOS attack where the peer sends us a leaf with the full
186                // payload in it, which uses redundant resources in the database, since we fetch and
187                // store payloads separately. We can defend ourselves by simply dropping the payload
188                // if present.
189                leaf.leaf.unfill_block_payload();
190
191                Some(leaf.into())
192            },
193            Err(err) => {
194                tracing::warn!(
195                    %client_url, ?req, %err,
196                    "failed to deserialize legacy LeafQueryData"
197                );
198                None
199            },
200        }
201    }
202}
203
204#[async_trait]
205impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
206where
207    Types: NodeType,
208{
209    /// Fetches the `Payload` for a given request.
210    ///
211    /// Attempts to fetch and deserialize the requested data using the new type first.
212    /// If deserialization into the new type fails (e.g., because the provider is still returning
213    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
214    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
215    ///
216    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
217        let client_url = self.client.base_url();
218        let req_hash = req.0;
219        // Fetch the payload and the VID common data. We need the common data to recompute the VID
220        // commitment, to ensure the payload we received is consistent with the commitment we
221        // requested.
222        let payload_bytes = self
223            .client
224            .get::<()>(&format!("availability/payload/hash/{}", req.0))
225            .bytes()
226            .await
227            .inspect_err(|err| {
228                tracing::info!(%err, %req_hash, %client_url, "failed to fetch payload bytes");
229            })
230            .ok()?;
231
232        let common_bytes = self
233            .client
234            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
235            .bytes()
236            .await
237            .inspect_err(|err| {
238                tracing::info!(%err, %req_hash, %client_url, "failed to fetch VID common bytes");
239            })
240            .ok()?;
241
242        let payload =
243            vbs::Serializer::<Ver>::deserialize::<PayloadQueryData<Types>>(&payload_bytes)
244                .inspect_err(|err| {
245                    tracing::info!(%err, %req_hash, "failed to deserialize PayloadQueryData");
246                })
247                .ok();
248
249        let common =
250            vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&common_bytes)
251                .inspect_err(|err| {
252                    tracing::info!(%err, %req_hash,
253                        "error deserializing VidCommonQueryData",
254                    );
255                })
256                .ok();
257
258        let (payload, common) = match (payload, common) {
259            (Some(payload), Some(common)) => (payload, common),
260            _ => {
261                tracing::info!(%req_hash, "falling back to legacy payload deserialization");
262
263                // fallback deserialization
264                return self
265                    .deserialize_legacy_payload::<Types>(payload_bytes, common_bytes, req)
266                    .await;
267            },
268        };
269
270        match common.common() {
271            VidCommon::V0(common) => {
272                let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
273                let bytes = payload.data().encode();
274
275                let commit = advz_scheme(num_storage_nodes)
276                    .commit_only(bytes)
277                    .map(VidCommitment::V0)
278                    .inspect_err(|err| {
279                        tracing::error!(%err, %req_hash,  %num_storage_nodes, "failed to compute VID commitment (V0)");
280                    })
281                    .ok()?;
282
283                if commit != req.0 {
284                    tracing::error!(
285                        expected = %req_hash,
286                        actual = ?commit,
287                        %client_url,
288                        "VID commitment mismatch (V0)"
289                    );
290
291                    return None;
292                }
293            },
294            VidCommon::V1(common) => {
295                let bytes = payload.data().encode();
296
297                let avidm_param = init_avidm_param(common.total_weights)
298                    .inspect_err(|err| {
299                        tracing::error!(%err, %req_hash, "failed to initialize AVIDM params. total_weight={}", common.total_weights);
300                    })
301                    .ok()?;
302
303                let header = self
304                    .client
305                    .get::<Header<Types>>(&format!("availability/header/{}", payload.height()))
306                    .send()
307                    .await
308                    .inspect_err(|err| {
309                        tracing::warn!(%client_url, %err, "failed to fetch header for payload. height={}", payload.height());
310                    })
311                    .ok()?;
312
313                if header.payload_commitment() != req.0 {
314                    tracing::error!(
315                        expected = %req_hash,
316                        actual = %header.payload_commitment(),
317                        %client_url,
318                        "header payload commitment mismatch (V1)"
319                    );
320                    return None;
321                }
322
323                let metadata = header.metadata().encode();
324                let commit = AvidMScheme::commit(
325                    &avidm_param,
326                    &bytes,
327                    ns_table::parse_ns_table(bytes.len(), &metadata),
328                )
329                .map(VidCommitment::V1)
330                .inspect_err(|err| {
331                    tracing::error!(%err, %req_hash, "failed to compute AVIDM commitment");
332                })
333                .ok()?;
334
335                // Compare calculated commitment with requested commitment
336                if commit != req.0 {
337                    tracing::warn!(
338                        expected = %req_hash,
339                        actual = %commit,
340                        %client_url,
341                        "commitment type mismatch for AVIDM check"
342                    );
343                    return None;
344                }
345            },
346        }
347
348        Some(payload.data)
349    }
350}
351
352#[async_trait]
353impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
354    for QueryServiceProvider<Ver>
355where
356    Types: NodeType,
357{
358    /// Fetches the `Leaf` for a given request.
359    ///
360    /// Attempts to fetch and deserialize the requested data using the new type first.
361    /// If deserialization into the new type fails (e.g., because the provider is still returning
362    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
363    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
364    ///
365    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
366        let client_url = self.client.base_url();
367
368        let bytes = self
369            .client
370            .get::<()>(&format!("availability/leaf/{}", req.height))
371            .bytes()
372            .await;
373        let bytes = match bytes {
374            Ok(bytes) => bytes,
375            Err(err) => {
376                tracing::info!(%client_url, ?req, %err, "failed to fetch bytes for leaf");
377
378                return None;
379            },
380        };
381
382        // Attempt to deserialize using the new type
383
384        match vbs::Serializer::<Ver>::deserialize::<LeafQueryData<Types>>(&bytes) {
385            Ok(mut leaf) => {
386                if leaf.height() != req.height {
387                    tracing::error!(
388                        %client_url, ?req, ?leaf,
389                        expected_height = req.height,
390                        actual_height = leaf.height(),
391                        "received leaf with the wrong height"
392                    );
393                    return None;
394                }
395                if leaf.hash() != req.expected_leaf {
396                    tracing::error!(
397                        %client_url, ?req, ?leaf,
398                        expected_hash = %req.expected_leaf,
399                        actual_hash = %leaf.hash(),
400                        "received leaf with the wrong hash"
401                    );
402                    return None;
403                }
404                if leaf.qc().commit() != req.expected_qc {
405                    tracing::error!(
406                        %client_url, ?req, ?leaf,
407                        expected_qc = %req.expected_qc,
408                        actual_qc = %leaf.qc().commit(),
409                        "received leaf with the wrong QC"
410                    );
411                    return None;
412                }
413
414                // There is a potential DOS attack where the peer sends us a leaf with the full
415                // payload in it, which uses redundant resources in the database, since we fetch and
416                // store payloads separately. We can defend ourselves by simply dropping the payload
417                // if present.
418                leaf.leaf.unfill_block_payload();
419
420                Some(leaf)
421            },
422            Err(err) => {
423                tracing::info!(
424                    ?req, %err,
425                    "failed to deserialize LeafQueryData, falling back to legacy deserialization"
426                );
427                // Fallback deserialization
428                self.deserialize_legacy_leaf(bytes, req).await
429            },
430        }
431    }
432}
433
434#[async_trait]
435impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
436where
437    Types: NodeType,
438{
439    /// Fetches the `VidCommon` for a given request.
440    ///
441    /// Attempts to fetch and deserialize the requested data using the new type first.
442    /// If deserialization into the new type fails (e.g., because the provider is still returning
443    /// legacy data), it falls back to attempt deserialization using an older, legacy type instead.
444    /// This fallback ensures compatibility with older nodes or providers that have not yet upgraded.
445    ///
446    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
447        let client_url = self.client.base_url();
448        let bytes = self
449            .client
450            .get::<()>(&format!("availability/vid/common/payload-hash/{}", req.0))
451            .bytes()
452            .await;
453        let bytes = match bytes {
454            Ok(bytes) => bytes,
455            Err(err) => {
456                tracing::info!(
457                    %client_url, ?req, %err,
458                    "failed to fetch VID common bytes"
459                );
460                return None;
461            },
462        };
463
464        match vbs::Serializer::<Ver>::deserialize::<VidCommonQueryData<Types>>(&bytes) {
465            Ok(res) => match req.0 {
466                VidCommitment::V0(commit) => {
467                    if let VidCommon::V0(common) = res.common {
468                        if ADVZScheme::is_consistent(&commit, &common).is_ok() {
469                            Some(VidCommon::V0(common))
470                        } else {
471                            tracing::error!(
472                                %client_url, ?req, ?commit, ?common,
473                                "VID V0 common data is inconsistent with commitment"
474                            );
475                            None
476                        }
477                    } else {
478                        tracing::warn!(?req, ?res, "Expect VID common data but found None");
479                        None
480                    }
481                },
482                VidCommitment::V1(_) => {
483                    if let VidCommon::V1(common) = res.common {
484                        Some(VidCommon::V1(common))
485                    } else {
486                        tracing::warn!(?req, ?res, "Expect VID common data but found None");
487                        None
488                    }
489                },
490            },
491            Err(err) => {
492                tracing::info!(
493                    %client_url, ?req, %err,
494                    "failed to deserialize as V1 VID common data, trying legacy fallback"
495                );
496                // Fallback deserialization
497                self.deserialize_legacy_vid_common::<Types>(bytes, req)
498                    .await
499            },
500        }
501    }
502}
503
504// These tests run the `postgres` Docker image, which doesn't work on Windows.
505#[cfg(all(test, not(target_os = "windows")))]
506mod test {
507    use std::{future::IntoFuture, time::Duration};
508
509    use committable::Committable;
510    use futures::{
511        future::{join, FutureExt},
512        stream::StreamExt,
513    };
514    use generic_array::GenericArray;
515    use hotshot_example_types::node_types::{EpochsTestVersions, TestVersions};
516    use hotshot_types::traits::node_implementation::Versions;
517    use portpicker::pick_unused_port;
518    use rand::RngCore;
519    use tide_disco::{error::ServerError, App};
520    use vbs::version::StaticVersion;
521
522    use super::*;
523    use crate::{
524        api::load_api,
525        availability::{
526            define_api, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch,
527            TransactionQueryData, UpdateAvailabilityData,
528        },
529        data_source::{
530            sql::{self, SqlDataSource},
531            storage::{
532                fail_storage::{FailStorage, FailableAction},
533                pruning::{PrunedHeightStorage, PrunerCfg},
534                sql::testing::TmpDb,
535                AvailabilityStorage, SqlStorage, UpdateAvailabilityStorage,
536            },
537            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
538        },
539        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
540        node::{data_source::NodeDataSource, SyncStatus},
541        task::BackgroundTask,
542        testing::{
543            consensus::{MockDataSource, MockNetwork},
544            mocks::{mock_transaction, MockBase, MockTypes, MockVersions},
545            setup_test, sleep,
546        },
547        types::HeightIndexed,
548        ApiState,
549    };
550
551    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
552    type EpochProvider = TestProvider<QueryServiceProvider<<EpochsTestVersions as Versions>::Base>>;
553
554    fn ignore<T>(_: T) {}
555
556    /// Build a data source suitable for this suite of tests.
557    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
558        db: &TmpDb,
559        provider: &P,
560    ) -> sql::Builder<MockTypes, P> {
561        db.config()
562            .builder((*provider).clone())
563            .await
564            .unwrap()
565            // We disable proactive fetching for these tests, since we are intending to test on
566            // demand fetching, and proactive fetching could lead to false successes.
567            .disable_proactive_fetching()
568    }
569
570    /// A data source suitable for this suite of tests, with the default options.
571    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
572        db: &TmpDb,
573        provider: &P,
574    ) -> SqlDataSource<MockTypes, P> {
575        builder(db, provider).await.build().await.unwrap()
576    }
577
578    #[tokio::test(flavor = "multi_thread")]
579    async fn test_fetch_on_request() {
580        setup_test();
581
582        // Create the consensus network.
583        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
584
585        // Start a web server that the non-consensus node can use to fetch blocks.
586        let port = pick_unused_port().unwrap();
587        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
588        app.register_module(
589            "availability",
590            define_api(
591                &Default::default(),
592                MockBase::instance(),
593                "1.0.0".parse().unwrap(),
594            )
595            .unwrap(),
596        )
597        .unwrap();
598        network.spawn(
599            "server",
600            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
601        );
602
603        // Start a data source which is not receiving events from consensus, only from a peer.
604        let db = TmpDb::init().await;
605        let provider = Provider::new(QueryServiceProvider::new(
606            format!("http://localhost:{port}").parse().unwrap(),
607            MockBase::instance(),
608        ));
609        let data_source = data_source(&db, &provider).await;
610
611        // Start consensus.
612        network.start().await;
613
614        // Wait until the block height reaches 6. This gives us the genesis block, one additional
615        // block at the end, and then one block to play around with fetching each type of resource:
616        // * Leaf
617        // * Block
618        // * Payload
619        // * VID common
620        let leaves = network.data_source().subscribe_leaves(1).await;
621        let leaves = leaves.take(5).collect::<Vec<_>>().await;
622        let test_leaf = &leaves[0];
623        let test_block = &leaves[1];
624        let test_payload = &leaves[2];
625        let test_common = &leaves[3];
626
627        // Make requests for missing data that should _not_ trigger an active fetch:
628        tracing::info!("requesting unfetchable resources");
629        let mut fetches = vec![];
630        // * An unknown leaf hash.
631        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
632        // * An unknown leaf height.
633        fetches.push(
634            data_source
635                .get_leaf(test_leaf.height() as usize)
636                .await
637                .map(ignore),
638        );
639        // * An unknown block hash.
640        fetches.push(
641            data_source
642                .get_block(test_block.block_hash())
643                .await
644                .map(ignore),
645        );
646        fetches.push(
647            data_source
648                .get_payload(test_payload.block_hash())
649                .await
650                .map(ignore),
651        );
652        fetches.push(
653            data_source
654                .get_vid_common(test_common.block_hash())
655                .await
656                .map(ignore),
657        );
658        // * An unknown block height.
659        fetches.push(
660            data_source
661                .get_block(test_block.height() as usize)
662                .await
663                .map(ignore),
664        );
665        fetches.push(
666            data_source
667                .get_payload(test_payload.height() as usize)
668                .await
669                .map(ignore),
670        );
671        fetches.push(
672            data_source
673                .get_vid_common(test_common.height() as usize)
674                .await
675                .map(ignore),
676        );
677        // * Genesis VID common (no VID for genesis)
678        fetches.push(data_source.get_vid_common(0).await.map(ignore));
679        // * An unknown transaction.
680        fetches.push(
681            data_source
682                .get_transaction(mock_transaction(vec![]).commit())
683                .await
684                .map(ignore),
685        );
686
687        // Even if we give data extra time to propagate, these requests will not resolve, since we
688        // didn't trigger any active fetches.
689        sleep(Duration::from_secs(1)).await;
690        for (i, fetch) in fetches.into_iter().enumerate() {
691            tracing::info!("checking fetch {i} is unresolved");
692            fetch.try_resolve().unwrap_err();
693        }
694
695        // Now we will actually fetch the missing data. First, since our node is not really
696        // connected to consensus, we need to give it a leaf after the range of interest so it
697        // learns about the correct block height. We will temporarily lock requests to the provider
698        // so that we can verify that without the provider, the node does _not_ get the data.
699        provider.block().await;
700        data_source
701            .append(leaves.last().cloned().unwrap().into())
702            .await
703            .unwrap();
704
705        tracing::info!("requesting fetchable resources");
706        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
707        let req_block = data_source.get_block(test_block.height() as usize).await;
708        let req_payload = data_source
709            .get_payload(test_payload.height() as usize)
710            .await;
711        let req_common = data_source
712            .get_vid_common(test_common.height() as usize)
713            .await;
714
715        // Give the requests some extra time to complete, and check that they still haven't
716        // resolved, since the provider is blocked. This just ensures the integrity of the test by
717        // checking the node didn't mysteriously get the block from somewhere else, so that when we
718        // unblock the provider and the node finally gets the block, we know it came from the
719        // provider.
720        sleep(Duration::from_secs(1)).await;
721        req_leaf.try_resolve().unwrap_err();
722        req_block.try_resolve().unwrap_err();
723        req_payload.try_resolve().unwrap_err();
724        req_common.try_resolve().unwrap_err();
725
726        // Unblock the request and see that we eventually receive the data.
727        provider.unblock().await;
728        let leaf = data_source
729            .get_leaf(test_leaf.height() as usize)
730            .await
731            .await;
732        let block = data_source
733            .get_block(test_block.height() as usize)
734            .await
735            .await;
736        let payload = data_source
737            .get_payload(test_payload.height() as usize)
738            .await
739            .await;
740        let common = data_source
741            .get_vid_common(test_common.height() as usize)
742            .await
743            .await;
744        {
745            // Verify the data.
746            let truth = network.data_source();
747            assert_eq!(
748                leaf,
749                truth.get_leaf(test_leaf.height() as usize).await.await
750            );
751            assert_eq!(
752                block,
753                truth.get_block(test_block.height() as usize).await.await
754            );
755            assert_eq!(
756                payload,
757                truth
758                    .get_payload(test_payload.height() as usize)
759                    .await
760                    .await
761            );
762            assert_eq!(
763                common,
764                truth
765                    .get_vid_common(test_common.height() as usize)
766                    .await
767                    .await
768            );
769        }
770
771        // Fetching the block and payload should have also fetched the corresponding leaves, since
772        // we have an invariant that we should not store a block in the database without its
773        // corresponding leaf and header. Thus we should be able to get the leaves even if the
774        // provider is blocked.
775        provider.block().await;
776        for leaf in [test_block, test_payload] {
777            tracing::info!("fetching existing leaf {}", leaf.height());
778            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
779            assert_eq!(*leaf, fetched_leaf);
780        }
781
782        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
783        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
784        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
785        // with this hash exists, and trigger a fetch for it.
786        tracing::info!("fetching block by hash");
787        provider.unblock().await;
788        {
789            let block = data_source.get_block(test_leaf.block_hash()).await.await;
790            assert_eq!(block.hash(), leaf.block_hash());
791        }
792
793        // Test a similar scenario, but with payload instead of block: we are aware of
794        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
795        // hash.
796        tracing::info!("fetching payload by hash");
797        {
798            let leaf = leaves.last().unwrap();
799            let payload = data_source.get_payload(leaf.block_hash()).await.await;
800            assert_eq!(payload.height(), leaf.height());
801            assert_eq!(payload.block_hash(), leaf.block_hash());
802            assert_eq!(payload.hash(), leaf.payload_hash());
803        }
804    }
805
806    #[tokio::test(flavor = "multi_thread")]
807    async fn test_fetch_on_request_epoch_version() {
808        // This test verifies that our provider can handle fetching things by their hashes,
809        // specifically focused on epoch version transitions
810        tracing::info!("Starting test_fetch_on_request_epoch_version");
811
812        setup_test();
813
814        // Create the consensus network.
815        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
816
817        // Start a web server that the non-consensus node can use to fetch blocks.
818        let port = pick_unused_port().unwrap();
819        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
820        app.register_module(
821            "availability",
822            define_api(
823                &Default::default(),
824                <EpochsTestVersions as Versions>::Base::instance(),
825                "1.0.0".parse().unwrap(),
826            )
827            .unwrap(),
828        )
829        .unwrap();
830        network.spawn(
831            "server",
832            app.serve(
833                format!("0.0.0.0:{port}"),
834                <EpochsTestVersions as Versions>::Base::instance(),
835            ),
836        );
837
838        // Start a data source which is not receiving events from consensus, only from a peer.
839        // Use our special test provider that handles epoch version transitions
840        let db = TmpDb::init().await;
841        let provider = EpochProvider::new(QueryServiceProvider::new(
842            format!("http://localhost:{port}").parse().unwrap(),
843            <EpochsTestVersions as Versions>::Base::instance(),
844        ));
845        let data_source = data_source(&db, &provider).await;
846
847        // Start consensus.
848        network.start().await;
849
850        // Wait until the block height reaches 6. This gives us the genesis block, one additional
851        // block at the end, and then one block to play around with fetching each type of resource:
852        // * Leaf
853        // * Block
854        // * Payload
855        // * VID common
856        let leaves = network.data_source().subscribe_leaves(1).await;
857        let leaves = leaves.take(5).collect::<Vec<_>>().await;
858        let test_leaf = &leaves[0];
859        let test_block = &leaves[1];
860        let test_payload = &leaves[2];
861        let test_common = &leaves[3];
862
863        // Make requests for missing data that should _not_ trigger an active fetch:
864        let mut fetches = vec![];
865        // * An unknown leaf hash.
866        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
867        // * An unknown leaf height.
868        fetches.push(
869            data_source
870                .get_leaf(test_leaf.height() as usize)
871                .await
872                .map(ignore),
873        );
874        // * An unknown block hash.
875        fetches.push(
876            data_source
877                .get_block(test_block.block_hash())
878                .await
879                .map(ignore),
880        );
881        fetches.push(
882            data_source
883                .get_payload(test_payload.block_hash())
884                .await
885                .map(ignore),
886        );
887        fetches.push(
888            data_source
889                .get_vid_common(test_common.block_hash())
890                .await
891                .map(ignore),
892        );
893        // * An unknown block height.
894        fetches.push(
895            data_source
896                .get_block(test_block.height() as usize)
897                .await
898                .map(ignore),
899        );
900        fetches.push(
901            data_source
902                .get_payload(test_payload.height() as usize)
903                .await
904                .map(ignore),
905        );
906        fetches.push(
907            data_source
908                .get_vid_common(test_common.height() as usize)
909                .await
910                .map(ignore),
911        );
912        // * Genesis VID common (no VID for genesis)
913        fetches.push(data_source.get_vid_common(0).await.map(ignore));
914        // * An unknown transaction.
915        fetches.push(
916            data_source
917                .get_transaction(mock_transaction(vec![]).commit())
918                .await
919                .map(ignore),
920        );
921
922        // Even if we give data extra time to propagate, these requests will not resolve, since we
923        // didn't trigger any active fetches.
924        sleep(Duration::from_secs(1)).await;
925        for (i, fetch) in fetches.into_iter().enumerate() {
926            tracing::info!("checking fetch {i} is unresolved");
927            fetch.try_resolve().unwrap_err();
928        }
929
930        // Now we will actually fetch the missing data. First, since our node is not really
931        // connected to consensus, we need to give it a leaf after the range of interest so it
932        // learns about the correct block height. We will temporarily lock requests to the provider
933        // so that we can verify that without the provider, the node does _not_ get the data.
934        provider.block().await;
935        data_source
936            .append(leaves.last().cloned().unwrap().into())
937            .await
938            .unwrap();
939
940        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
941        let req_block = data_source.get_block(test_block.height() as usize).await;
942        let req_payload = data_source
943            .get_payload(test_payload.height() as usize)
944            .await;
945        let req_common = data_source
946            .get_vid_common(test_common.height() as usize)
947            .await;
948
949        // Give the requests some extra time to complete, and check that they still haven't
950        // resolved, since the provider is blocked. This just ensures the integrity of the test by
951        // checking the node didn't mysteriously get the block from somewhere else, so that when we
952        // unblock the provider and the node finally gets the block, we know it came from the
953        // provider.
954        sleep(Duration::from_secs(1)).await;
955        req_leaf.try_resolve().unwrap_err();
956        req_block.try_resolve().unwrap_err();
957        req_payload.try_resolve().unwrap_err();
958        req_common.try_resolve().unwrap_err();
959
960        // Unblock the request and see that we eventually receive the data.
961        provider.unblock().await;
962        let leaf = data_source
963            .get_leaf(test_leaf.height() as usize)
964            .await
965            .await;
966        let block = data_source
967            .get_block(test_block.height() as usize)
968            .await
969            .await;
970        let payload = data_source
971            .get_payload(test_payload.height() as usize)
972            .await
973            .await;
974        let common = data_source
975            .get_vid_common(test_common.height() as usize)
976            .await
977            .await;
978        {
979            // Verify the data.
980            let truth = network.data_source();
981            assert_eq!(
982                leaf,
983                truth.get_leaf(test_leaf.height() as usize).await.await
984            );
985            assert_eq!(
986                block,
987                truth.get_block(test_block.height() as usize).await.await
988            );
989            assert_eq!(
990                payload,
991                truth
992                    .get_payload(test_payload.height() as usize)
993                    .await
994                    .await
995            );
996            assert_eq!(
997                common,
998                truth
999                    .get_vid_common(test_common.height() as usize)
1000                    .await
1001                    .await
1002            );
1003        }
1004
1005        // Fetching the block and payload should have also fetched the corresponding leaves, since
1006        // we have an invariant that we should not store a block in the database without its
1007        // corresponding leaf and header. Thus we should be able to get the leaves even if the
1008        // provider is blocked.
1009        provider.block().await;
1010        for leaf in [test_block, test_payload] {
1011            tracing::info!("fetching existing leaf {}", leaf.height());
1012            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
1013            assert_eq!(*leaf, fetched_leaf);
1014        }
1015
1016        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
1017        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
1018        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
1019        // with this hash exists, and trigger a fetch for it.
1020        provider.unblock().await;
1021        {
1022            let block = data_source.get_block(test_leaf.block_hash()).await.await;
1023            assert_eq!(block.hash(), leaf.block_hash());
1024        }
1025
1026        // Test a similar scenario, but with payload instead of block: we are aware of
1027        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
1028        // hash.
1029        {
1030            let leaf = leaves.last().unwrap();
1031            let payload = data_source.get_payload(leaf.block_hash()).await.await;
1032            assert_eq!(payload.height(), leaf.height());
1033            assert_eq!(payload.block_hash(), leaf.block_hash());
1034            assert_eq!(payload.hash(), leaf.payload_hash());
1035        }
1036
1037        // Add more debug logs throughout the test
1038        tracing::info!("Test completed successfully!");
1039    }
1040
1041    #[tokio::test(flavor = "multi_thread")]
1042    async fn test_fetch_block_and_leaf_concurrently() {
1043        setup_test();
1044
1045        // Create the consensus network.
1046        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1047
1048        // Start a web server that the non-consensus node can use to fetch blocks.
1049        let port = pick_unused_port().unwrap();
1050        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1051        app.register_module(
1052            "availability",
1053            define_api(
1054                &Default::default(),
1055                MockBase::instance(),
1056                "1.0.0".parse().unwrap(),
1057            )
1058            .unwrap(),
1059        )
1060        .unwrap();
1061        network.spawn(
1062            "server",
1063            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1064        );
1065
1066        // Start a data source which is not receiving events from consensus, only from a peer.
1067        let db = TmpDb::init().await;
1068        let provider = Provider::new(QueryServiceProvider::new(
1069            format!("http://localhost:{port}").parse().unwrap(),
1070            MockBase::instance(),
1071        ));
1072        let data_source = data_source(&db, &provider).await;
1073
1074        // Start consensus.
1075        network.start().await;
1076
1077        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1078        // block at the end, and then one block that we can use to test fetching.
1079        let leaves = network.data_source().subscribe_leaves(1).await;
1080        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1081        let test_leaf = &leaves[0];
1082
1083        // Tell the node about a leaf after the one of interest so it learns about the block height.
1084        data_source.append(leaves[1].clone().into()).await.unwrap();
1085
1086        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
1087        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
1088        // ends up not fetching anything.
1089        let (leaf, block) = join(
1090            data_source
1091                .get_leaf(test_leaf.height() as usize)
1092                .await
1093                .into_future(),
1094            data_source
1095                .get_block(test_leaf.height() as usize)
1096                .await
1097                .into_future(),
1098        )
1099        .await;
1100        assert_eq!(leaf, *test_leaf);
1101        assert_eq!(leaf.header(), block.header());
1102    }
1103
1104    #[tokio::test(flavor = "multi_thread")]
1105    async fn test_fetch_different_blocks_same_payload() {
1106        setup_test();
1107
1108        // Create the consensus network.
1109        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1110
1111        // Start a web server that the non-consensus node can use to fetch blocks.
1112        let port = pick_unused_port().unwrap();
1113        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1114        app.register_module(
1115            "availability",
1116            define_api(
1117                &Default::default(),
1118                MockBase::instance(),
1119                "1.0.0".parse().unwrap(),
1120            )
1121            .unwrap(),
1122        )
1123        .unwrap();
1124        network.spawn(
1125            "server",
1126            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1127        );
1128
1129        // Start a data source which is not receiving events from consensus, only from a peer.
1130        let db = TmpDb::init().await;
1131        let provider = Provider::new(QueryServiceProvider::new(
1132            format!("http://localhost:{port}").parse().unwrap(),
1133            MockBase::instance(),
1134        ));
1135        let data_source = data_source(&db, &provider).await;
1136
1137        // Start consensus.
1138        network.start().await;
1139
1140        // Wait until the block height reaches 4. This gives us the genesis block, one additional
1141        // block at the end, and then two blocks that we can use to test fetching.
1142        let leaves = network.data_source().subscribe_leaves(1).await;
1143        let leaves = leaves.take(4).collect::<Vec<_>>().await;
1144
1145        // Tell the node about a leaf after the range of interest so it learns about the block
1146        // height.
1147        data_source
1148            .append(leaves.last().cloned().unwrap().into())
1149            .await
1150            .unwrap();
1151
1152        // All the blocks here are empty, so they have the same payload:
1153        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1154        // If we fetch both blocks at the same time, we can check that we haven't broken anything
1155        // with whatever optimizations we add to deduplicate payload fetching.
1156        let (block1, block2) = join(
1157            data_source
1158                .get_block(leaves[0].height() as usize)
1159                .await
1160                .into_future(),
1161            data_source
1162                .get_block(leaves[1].height() as usize)
1163                .await
1164                .into_future(),
1165        )
1166        .await;
1167        assert_eq!(block1.header(), leaves[0].header());
1168        assert_eq!(block2.header(), leaves[1].header());
1169    }
1170
1171    #[tokio::test(flavor = "multi_thread")]
1172    async fn test_fetch_stream() {
1173        setup_test();
1174
1175        // Create the consensus network.
1176        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1177
1178        // Start a web server that the non-consensus node can use to fetch blocks.
1179        let port = pick_unused_port().unwrap();
1180        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1181        app.register_module(
1182            "availability",
1183            define_api(
1184                &Default::default(),
1185                MockBase::instance(),
1186                "1.0.0".parse().unwrap(),
1187            )
1188            .unwrap(),
1189        )
1190        .unwrap();
1191        network.spawn(
1192            "server",
1193            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1194        );
1195
1196        // Start a data source which is not receiving events from consensus, only from a peer.
1197        let db = TmpDb::init().await;
1198        let provider = Provider::new(QueryServiceProvider::new(
1199            format!("http://localhost:{port}").parse().unwrap(),
1200            MockBase::instance(),
1201        ));
1202        let data_source = data_source(&db, &provider).await;
1203
1204        // Start consensus.
1205        network.start().await;
1206
1207        // Subscribe to objects from the future.
1208        let blocks = data_source.subscribe_blocks(0).await;
1209        let leaves = data_source.subscribe_leaves(0).await;
1210        let common = data_source.subscribe_vid_common(0).await;
1211
1212        // Wait for a few blocks to be finalized.
1213        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1214        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1215
1216        // Tell the node about a leaf after the range of interest so it learns about the block
1217        // height.
1218        data_source
1219            .append(finalized_leaves.last().cloned().unwrap().into())
1220            .await
1221            .unwrap();
1222
1223        // Check the subscriptions.
1224        let blocks = blocks.take(5).collect::<Vec<_>>().await;
1225        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1226        let common = common.take(5).collect::<Vec<_>>().await;
1227        for i in 0..5 {
1228            tracing::info!("checking block {i}");
1229            assert_eq!(leaves[i], finalized_leaves[i]);
1230            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1231            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1232        }
1233    }
1234
1235    #[tokio::test(flavor = "multi_thread")]
1236    async fn test_fetch_range_start() {
1237        setup_test();
1238
1239        // Create the consensus network.
1240        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1241
1242        // Start a web server that the non-consensus node can use to fetch blocks.
1243        let port = pick_unused_port().unwrap();
1244        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1245        app.register_module(
1246            "availability",
1247            define_api(
1248                &Default::default(),
1249                MockBase::instance(),
1250                "1.0.0".parse().unwrap(),
1251            )
1252            .unwrap(),
1253        )
1254        .unwrap();
1255        network.spawn(
1256            "server",
1257            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1258        );
1259
1260        // Start a data source which is not receiving events from consensus, only from a peer.
1261        let db = TmpDb::init().await;
1262        let provider = Provider::new(QueryServiceProvider::new(
1263            format!("http://localhost:{port}").parse().unwrap(),
1264            MockBase::instance(),
1265        ));
1266        let data_source = data_source(&db, &provider).await;
1267
1268        // Start consensus.
1269        network.start().await;
1270
1271        // Wait for a few blocks to be finalized.
1272        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1273        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1274
1275        // Tell the node about a leaf after the range of interest (so it learns about the block
1276        // height) and one in the middle of the range, to test what happens when data is missing
1277        // from the beginning of the range but other data is available.
1278        let mut tx = data_source.write().await.unwrap();
1279        tx.insert_leaf(finalized_leaves[2].clone()).await.unwrap();
1280        tx.insert_leaf(finalized_leaves[4].clone()).await.unwrap();
1281        tx.commit().await.unwrap();
1282
1283        // Get the whole range of leaves.
1284        let leaves = data_source
1285            .get_leaf_range(..5)
1286            .await
1287            .then(Fetch::resolve)
1288            .collect::<Vec<_>>()
1289            .await;
1290        for i in 0..5 {
1291            tracing::info!("checking leaf {i}");
1292            assert_eq!(leaves[i], finalized_leaves[i]);
1293        }
1294    }
1295
1296    #[tokio::test(flavor = "multi_thread")]
1297    async fn fetch_transaction() {
1298        setup_test();
1299
1300        // Create the consensus network.
1301        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1302
1303        // Start a web server that the non-consensus node can use to fetch blocks.
1304        let port = pick_unused_port().unwrap();
1305        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1306        app.register_module(
1307            "availability",
1308            define_api(
1309                &Default::default(),
1310                MockBase::instance(),
1311                "1.0.0".parse().unwrap(),
1312            )
1313            .unwrap(),
1314        )
1315        .unwrap();
1316        network.spawn(
1317            "server",
1318            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1319        );
1320
1321        // Start a data source which is not receiving events from consensus. We don't give it a
1322        // fetcher since transactions are always fetched passively anyways.
1323        let db = TmpDb::init().await;
1324        let data_source = data_source(&db, &NoFetching).await;
1325
1326        // Subscribe to blocks.
1327        let mut leaves = network.data_source().subscribe_leaves(1).await;
1328        let mut blocks = network.data_source().subscribe_blocks(1).await;
1329
1330        // Start consensus.
1331        network.start().await;
1332
1333        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
1334        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
1335        // exist.
1336        let tx = mock_transaction(vec![1, 2, 3]);
1337        let fut = data_source.get_transaction(tx.commit()).await;
1338
1339        // Sequence the transaction.
1340        network.submit_transaction(tx.clone()).await;
1341
1342        // Send blocks to the query service, the future will resolve as soon as it sees a block
1343        // containing the transaction.
1344        let block = loop {
1345            let leaf = leaves.next().await.unwrap();
1346            let block = blocks.next().await.unwrap();
1347
1348            data_source
1349                .append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1350                .await
1351                .unwrap();
1352
1353            if block.transaction_by_hash(tx.commit()).is_some() {
1354                break block;
1355            }
1356        };
1357        tracing::info!("transaction included in block {}", block.height());
1358
1359        let fetched_tx = fut.await;
1360        assert_eq!(
1361            fetched_tx,
1362            TransactionQueryData::with_hash(&block, tx.commit()).unwrap()
1363        );
1364
1365        // Future queries for this transaction resolve immediately.
1366        assert_eq!(
1367            fetched_tx,
1368            data_source.get_transaction(tx.commit()).await.await
1369        );
1370    }
1371
1372    #[tokio::test(flavor = "multi_thread")]
1373    async fn test_retry() {
1374        setup_test();
1375
1376        // Create the consensus network.
1377        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1378
1379        // Start a web server that the non-consensus node can use to fetch blocks.
1380        let port = pick_unused_port().unwrap();
1381        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1382        app.register_module(
1383            "availability",
1384            define_api(
1385                &Default::default(),
1386                MockBase::instance(),
1387                "1.0.0".parse().unwrap(),
1388            )
1389            .unwrap(),
1390        )
1391        .unwrap();
1392        network.spawn(
1393            "server",
1394            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1395        );
1396
1397        // Start a data source which is not receiving events from consensus.
1398        let db = TmpDb::init().await;
1399        let provider = Provider::new(QueryServiceProvider::new(
1400            format!("http://localhost:{port}").parse().unwrap(),
1401            MockBase::instance(),
1402        ));
1403        let data_source = builder(&db, &provider)
1404            .await
1405            .with_max_retry_interval(Duration::from_secs(1))
1406            .build()
1407            .await
1408            .unwrap();
1409
1410        // Start consensus.
1411        network.start().await;
1412
1413        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1414        // block at the end, and one block to try fetching.
1415        let leaves = network.data_source().subscribe_leaves(1).await;
1416        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1417        let test_leaf = &leaves[0];
1418
1419        // Cause requests to fail temporarily, so we can test retries.
1420        provider.fail();
1421
1422        // Give the node a leaf after the range of interest so it learns about the correct block
1423        // height.
1424        data_source
1425            .append(leaves.last().cloned().unwrap().into())
1426            .await
1427            .unwrap();
1428
1429        tracing::info!("requesting leaf from failing providers");
1430        let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1431
1432        // Wait a few retries and check that the request has not completed, since the provider is
1433        // failing.
1434        sleep(Duration::from_secs(5)).await;
1435        fut.try_resolve().unwrap_err();
1436
1437        // As soon as the provider recovers, the request can complete.
1438        provider.unfail();
1439        assert_eq!(
1440            data_source
1441                .get_leaf(test_leaf.height() as usize)
1442                .await
1443                .await,
1444            *test_leaf
1445        );
1446    }
1447
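    /// Generate a random V0 (ADVZ) VID commitment, used to request data that no server actually has.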
1448    fn random_vid_commit() -> VidCommitment {
1449        let mut bytes = [0; 32];
1450        rand::thread_rng().fill_bytes(&mut bytes);
1451        VidCommitment::V0(GenericArray::from(bytes).into())
1452    }
1453
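    /// Serve an availability API that answers every request with genesis data, regardless of what
    /// was asked for, to simulate a malicious or buggy peer.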
1454    async fn malicious_server(port: u16) {
1455        let mut api = load_api::<(), ServerError, MockBase>(
1456            None::<std::path::PathBuf>,
1457            include_str!("../../../api/availability.toml"),
1458            vec![],
1459        )
1460        .unwrap();
1461
1462        api.get("get_payload", move |_, _| {
1463            async move {
1464                // No matter what data we are asked for, always respond with dummy data.
1465                Ok(PayloadQueryData::<MockTypes>::genesis::<TestVersions>(
1466                    &Default::default(),
1467                    &Default::default(),
1468                )
1469                .await)
1470            }
1471            .boxed()
1472        })
1473        .unwrap()
1474        .get("get_vid_common", move |_, _| {
1475            async move {
1476                // No matter what data we are asked for, always respond with dummy data.
1477                Ok(VidCommonQueryData::<MockTypes>::genesis::<TestVersions>(
1478                    &Default::default(),
1479                    &Default::default(),
1480                )
1481                .await)
1482            }
1483            .boxed()
1484        })
1485        .unwrap();
1486
1487        let mut app = App::<(), ServerError>::with_state(());
1488        app.register_module("availability", api).unwrap();
1489        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1490            .await
1491            .ok();
1492    }
1493
1494    #[tokio::test(flavor = "multi_thread")]
1495    async fn test_fetch_from_malicious_server() {
1496        setup_test();
1497
1498        let port = pick_unused_port().unwrap();
1499        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1500
1501        let provider = QueryServiceProvider::new(
1502            format!("http://localhost:{port}").parse().unwrap(),
1503            MockBase::instance(),
1504        );
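        // Wait (with no timeout) for the malicious server to become reachable before issuing requests.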
1505        provider.client.connect(None).await;
1506
1507        // Query for a random payload, the server will respond with a different payload, and we
1508        // should detect the error.
1509        let res =
1510            ProviderTrait::<MockTypes, _>::fetch(&provider, PayloadRequest(random_vid_commit()))
1511                .await;
1512        assert_eq!(res, None);
1513
1514        // Query for a random VID common, the server will respond with a different one, and we
1515        // should detect the error.
1516        let res =
1517            ProviderTrait::<MockTypes, _>::fetch(&provider, VidCommonRequest(random_vid_commit()))
1518                .await;
1519        assert_eq!(res, None);
1520    }
1521
1522    #[tokio::test(flavor = "multi_thread")]
1523    async fn test_archive_recovery() {
1524        setup_test();
1525
1526        // Create the consensus network.
1527        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1528
1529        // Start a web server that the non-consensus node can use to fetch blocks.
1530        let port = pick_unused_port().unwrap();
1531        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1532        app.register_module(
1533            "availability",
1534            define_api(
1535                &Default::default(),
1536                MockBase::instance(),
1537                "1.0.0".parse().unwrap(),
1538            )
1539            .unwrap(),
1540        )
1541        .unwrap();
1542        network.spawn(
1543            "server",
1544            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1545        );
1546
1547        // Start a data source which is not receiving events from consensus, only from a peer. The
1548        // data source is at first configured to aggressively prune data.
1549        let db = TmpDb::init().await;
1550        let provider = Provider::new(QueryServiceProvider::new(
1551            format!("http://localhost:{port}").parse().unwrap(),
1552            MockBase::instance(),
1553        ));
1554        let mut data_source = db
1555            .config()
1556            .pruner_cfg(
1557                PrunerCfg::new()
1558                    .with_target_retention(Duration::from_secs(0))
1559                    .with_interval(Duration::from_secs(5)),
1560            )
1561            .unwrap()
1562            .builder(provider.clone())
1563            .await
1564            .unwrap()
1565            // Set a fast retry for failed operations. Occasionally storage operations will fail due
1566            // to conflicting write-mode transactions running concurrently. This is ok as they will
1567            // be retried. Having a fast retry interval speeds up the test.
1568            .with_min_retry_interval(Duration::from_millis(100))
1569            // Randomize retries a lot. This will temporally separate competing write transactions
1570            // with high probability, so that one of them quickly gets exclusive access
1571            // to the database.
1572            .with_retry_randomization_factor(3.)
1573            .build()
1574            .await
1575            .unwrap();
1576
1577        // Start consensus.
1578        network.start().await;
1579
1580        // Wait until a few blocks are produced.
1581        let leaves = network.data_source().subscribe_leaves(1).await;
1582        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1583
1584        // The disconnected data source has no data yet, so it hasn't done any pruning.
1585        let pruned_height = data_source
1586            .read()
1587            .await
1588            .unwrap()
1589            .load_pruned_height()
1590            .await
1591            .unwrap();
1592        // Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
1593        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1594
1595        // Send the last leaf to the disconnected data source so it learns about the height and
1596        // fetches the missing data.
1597        let last_leaf = leaves.last().unwrap();
1598        data_source.append(last_leaf.clone().into()).await.unwrap();
1599
1600        // Trigger a fetch of each leaf so the database gets populated.
1601        for i in 1..=last_leaf.height() {
1602            tracing::info!(i, "fetching leaf");
1603            assert_eq!(
1604                data_source.get_leaf(i as usize).await.await,
1605                leaves[i as usize - 1]
1606            );
1607        }
1608
1609        // After a bit of time, the pruner has run and deleted all the data we just fetched.
1610        loop {
1611            let pruned_height = data_source
1612                .read()
1613                .await
1614                .unwrap()
1615                .load_pruned_height()
1616                .await
1617                .unwrap();
1618            if pruned_height == Some(last_leaf.height()) {
1619                break;
1620            }
1621            tracing::info!(
1622                ?pruned_height,
1623                target_height = last_leaf.height(),
1624                "waiting for pruner to run"
1625            );
1626            sleep(Duration::from_secs(1)).await;
1627        }
1628
1629        // Now close the data source and restart it with archive recovery.
1630        data_source = db
1631            .config()
1632            .archive()
1633            .builder(provider.clone())
1634            .await
1635            .unwrap()
1636            .with_minor_scan_interval(Duration::from_secs(1))
1637            .with_major_scan_interval(1)
1638            .build()
1639            .await
1640            .unwrap();
1641
1642        // Pruned height should be reset.
1643        let pruned_height = data_source
1644            .read()
1645            .await
1646            .unwrap()
1647            .load_pruned_height()
1648            .await
1649            .unwrap();
1650        assert_eq!(pruned_height, None);
1651
1652        // The node has pruned all of its data, including the latest block, so it has forgotten the
1653        // block height. We need to give it another leaf with some height so it will be willing to
1654        // fetch.
1655        data_source.append(last_leaf.clone().into()).await.unwrap();
1656
1657        // Wait for the data to be restored. It should be restored by the next major scan.
1658        loop {
1659            let sync_status = data_source.sync_status().await.unwrap();
1660
1661            // VID shares are unique to a node and will never be fetched from a peer; this is
1662            // acceptable since there is redundancy built into the VID scheme. Ignore missing VID
1663            // shares in the `is_fully_synced` check.
1664            if (SyncStatus {
1665                missing_vid_shares: 0,
1666                ..sync_status
1667            })
1668            .is_fully_synced()
1669            {
1670                break;
1671            }
1672            tracing::info!(?sync_status, "waiting for node to sync");
1673            sleep(Duration::from_secs(1)).await;
1674        }
1675
1676        // The node remains fully synced even after some time; no pruning.
1677        sleep(Duration::from_secs(3)).await;
1678        let sync_status = data_source.sync_status().await.unwrap();
1679        assert!(
1680            (SyncStatus {
1681                missing_vid_shares: 0,
1682                ..sync_status
1683            })
1684            .is_fully_synced(),
1685            "{sync_status:?}"
1686        );
1687    }
1688
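    /// The stage of a storage transaction at which a failure is injected, used to exercise the
    /// fetcher's error handling at each point in the write path.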
1689    #[derive(Clone, Copy, Debug)]
1690    enum FailureType {
1691        Begin,
1692        Write,
1693        Commit,
1694    }
1695
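    /// Check that a triggered fetch still resolves even when storing the fetched leaf fails at the
    /// given stage, and that a later request re-fetches and persists it once storage recovers.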
1696    async fn test_fetch_storage_failure_helper(failure: FailureType) {
1697        setup_test();
1698
1699        // Create the consensus network.
1700        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1701
1702        // Start a web server that the non-consensus node can use to fetch blocks.
1703        let port = pick_unused_port().unwrap();
1704        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1705        app.register_module(
1706            "availability",
1707            define_api(
1708                &Default::default(),
1709                MockBase::instance(),
1710                "1.0.0".parse().unwrap(),
1711            )
1712            .unwrap(),
1713        )
1714        .unwrap();
1715        network.spawn(
1716            "server",
1717            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1718        );
1719
1720        // Start a data source which is not receiving events from consensus, only from a peer.
1721        let provider = Provider::new(QueryServiceProvider::new(
1722            format!("http://localhost:{port}").parse().unwrap(),
1723            MockBase::instance(),
1724        ));
1725        let db = TmpDb::init().await;
1726        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1727        let data_source = FetchingDataSource::builder(storage, provider)
1728            .disable_proactive_fetching()
1729            .disable_aggregator()
1730            .with_max_retry_interval(Duration::from_millis(100))
1731            .with_retry_timeout(Duration::from_secs(1))
1732            .build()
1733            .await
1734            .unwrap();
1735
1736        // Start consensus.
1737        network.start().await;
1738
1739        // Wait until a couple of blocks are produced.
1740        let leaves = network.data_source().subscribe_leaves(1).await;
1741        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1742
1743        // Send the last leaf to the disconnected data source so it learns about the height.
1744        let last_leaf = leaves.last().unwrap();
1745        let mut tx = data_source.write().await.unwrap();
1746        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1747        tx.commit().await.unwrap();
1748
1749        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
1750        tracing::info!("fetch with write failure");
1751        match failure {
1752            FailureType::Begin => {
1753                data_source
1754                    .as_ref()
1755                    .fail_begins_writable(FailableAction::Any)
1756                    .await
1757            },
1758            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1759            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1760        }
1761        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1762        data_source.as_ref().pass().await;
1763
1764        // It is possible that the fetch above completes, notifies the subscriber,
1765        // and the fetch below quickly subscribes and gets notified by the same loop.
1766        // We add a delay here to avoid this situation.
1767        // This is not a bug, as being notified after subscribing is fine.
1768        sleep(Duration::from_secs(1)).await;
1769
1770        // We can get the same leaf again; this will again trigger an active fetch since storage
1771        // failed the first time.
1772        tracing::info!("fetch with write success");
1773        let fetch = data_source.get_leaf(1).await;
1774        assert!(fetch.is_pending());
1775        assert_eq!(leaves[0], fetch.await);
1776
1777        sleep(Duration::from_secs(1)).await;
1778
1779        // Finally, we should have the leaf locally and not need to fetch it.
1780        tracing::info!("retrieve from storage");
1781        let fetch = data_source.get_leaf(1).await;
1782        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1783    }
1784
1785    #[tokio::test(flavor = "multi_thread")]
1786    async fn test_fetch_storage_failure_on_begin() {
1787        test_fetch_storage_failure_helper(FailureType::Begin).await;
1788    }
1789
1790    #[tokio::test(flavor = "multi_thread")]
1791    async fn test_fetch_storage_failure_on_write() {
1792        test_fetch_storage_failure_helper(FailureType::Write).await;
1793    }
1794
1795    #[tokio::test(flavor = "multi_thread")]
1796    async fn test_fetch_storage_failure_on_commit() {
1797        test_fetch_storage_failure_helper(FailureType::Commit).await;
1798    }
1799
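    /// Check that a fetch retries the storage operation after a single injected failure at the
    /// given stage, so the fetched leaf still ends up in local storage.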
1800    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1801        setup_test();
1802
1803        // Create the consensus network.
1804        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1805
1806        // Start a web server that the non-consensus node can use to fetch blocks.
1807        let port = pick_unused_port().unwrap();
1808        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1809        app.register_module(
1810            "availability",
1811            define_api(
1812                &Default::default(),
1813                MockBase::instance(),
1814                "1.0.0".parse().unwrap(),
1815            )
1816            .unwrap(),
1817        )
1818        .unwrap();
1819        network.spawn(
1820            "server",
1821            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1822        );
1823
1824        // Start a data source which is not receiving events from consensus, only from a peer.
1825        let provider = Provider::new(QueryServiceProvider::new(
1826            format!("http://localhost:{port}").parse().unwrap(),
1827            MockBase::instance(),
1828        ));
1829        let db = TmpDb::init().await;
1830        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1831        let data_source = FetchingDataSource::builder(storage, provider)
1832            .disable_proactive_fetching()
1833            .disable_aggregator()
1834            .with_min_retry_interval(Duration::from_millis(100))
1835            .build()
1836            .await
1837            .unwrap();
1838
1839        // Start consensus.
1840        network.start().await;
1841
1842        // Wait until a couple of blocks are produced.
1843        let leaves = network.data_source().subscribe_leaves(1).await;
1844        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1845
1846        // Send the last leaf to the disconnected data source so it learns about the height.
1847        let last_leaf = leaves.last().unwrap();
1848        let mut tx = data_source.write().await.unwrap();
1849        tx.insert_leaf(last_leaf.clone()).await.unwrap();
1850        tx.commit().await.unwrap();
1851
1852        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
1853        tracing::info!("fetch with write failure");
1854        match failure {
1855            FailureType::Begin => {
1856                data_source
1857                    .as_ref()
1858                    .fail_one_begin_writable(FailableAction::Any)
1859                    .await
1860            },
1861            FailureType::Write => {
1862                data_source
1863                    .as_ref()
1864                    .fail_one_write(FailableAction::Any)
1865                    .await
1866            },
1867            FailureType::Commit => {
1868                data_source
1869                    .as_ref()
1870                    .fail_one_commit(FailableAction::Any)
1871                    .await
1872            },
1873        }
1874        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1875
1876        // Check that the leaf ended up in local storage.
1877        let mut tx = data_source.read().await.unwrap();
1878        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1879    }
1880
1881    #[tokio::test(flavor = "multi_thread")]
1882    async fn test_fetch_storage_failure_retry_on_begin() {
1883        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1884    }
1885
1886    #[tokio::test(flavor = "multi_thread")]
1887    async fn test_fetch_storage_failure_retry_on_write() {
1888        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1889    }
1890
1891    #[tokio::test(flavor = "multi_thread")]
1892    async fn test_fetch_storage_failure_retry_on_commit() {
1893        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1894    }
1895
1896    #[tokio::test(flavor = "multi_thread")]
1897    async fn test_fetch_on_decide() {
1898        setup_test();
1899
1900        // Create the consensus network.
1901        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1902
1903        // Start a web server that the non-consensus node can use to fetch blocks.
1904        let port = pick_unused_port().unwrap();
1905        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1906        app.register_module(
1907            "availability",
1908            define_api(
1909                &Default::default(),
1910                MockBase::instance(),
1911                "1.0.0".parse().unwrap(),
1912            )
1913            .unwrap(),
1914        )
1915        .unwrap();
1916        network.spawn(
1917            "server",
1918            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1919        );
1920
1921        // Start a data source which is not receiving events from consensus.
1922        let db = TmpDb::init().await;
1923        let provider = Provider::new(QueryServiceProvider::new(
1924            format!("http://localhost:{port}").parse().unwrap(),
1925            MockBase::instance(),
1926        ));
1927        let data_source = builder(&db, &provider)
1928            .await
1929            .with_max_retry_interval(Duration::from_secs(1))
1930            .build()
1931            .await
1932            .unwrap();
1933
1934        // Start consensus.
1935        network.start().await;
1936
1937        // Wait until a block has been decided.
1938        let leaf = network
1939            .data_source()
1940            .subscribe_leaves(1)
1941            .await
1942            .next()
1943            .await
1944            .unwrap();
1945
1946        // Give the node a decide containing the leaf but no additional information.
1947        data_source.append(leaf.clone().into()).await.unwrap();
1948
1949        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
1950        // the leaf.
1951        sleep(Duration::from_secs(5)).await;
1952
1953        // Read the missing data directly from storage (via a database transaction), rather than
1954        // going through the data source, so that this request itself does not trigger a fetch.
1955        // Thus, this will only work if the data was already fetched, triggered by the leaf.
1956        let mut tx = data_source.read().await.unwrap();
1957        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1958        let block = tx.get_block(id).await.unwrap();
1959        let vid = tx.get_vid_common(id).await.unwrap();
1960
1961        assert_eq!(block.hash(), leaf.block_hash());
1962        assert_eq!(vid.block_hash(), leaf.block_hash());
1963    }
1964
1965    #[tokio::test(flavor = "multi_thread")]
1966    async fn test_fetch_begin_failure() {
1967        setup_test();
1968
1969        // Create the consensus network.
1970        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
1971
1972        // Start a web server that the non-consensus node can use to fetch blocks.
1973        let port = pick_unused_port().unwrap();
1974        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1975        app.register_module(
1976            "availability",
1977            define_api(
1978                &Default::default(),
1979                MockBase::instance(),
1980                "1.0.0".parse().unwrap(),
1981            )
1982            .unwrap(),
1983        )
1984        .unwrap();
1985        network.spawn(
1986            "server",
1987            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1988        );
1989
1990        // Start a data source which is not receiving events from consensus, only from a peer.
1991        let provider = Provider::new(QueryServiceProvider::new(
1992            format!("http://localhost:{port}").parse().unwrap(),
1993            MockBase::instance(),
1994        ));
1995        let db = TmpDb::init().await;
1996        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
1997        let data_source = FetchingDataSource::builder(storage, provider)
1998            .disable_proactive_fetching()
1999            .disable_aggregator()
2000            .with_min_retry_interval(Duration::from_millis(100))
2001            .build()
2002            .await
2003            .unwrap();
2004
2005        // Start consensus.
2006        network.start().await;
2007
2008        // Wait until a couple of blocks are produced.
2009        let leaves = network.data_source().subscribe_leaves(1).await;
2010        let leaves = leaves.take(2).collect::<Vec<_>>().await;
2011
2012        // Send the last leaf to the disconnected data source so it learns about the height.
2013        let last_leaf = leaves.last().unwrap();
2014        let mut tx = data_source.write().await.unwrap();
2015        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2016        tx.commit().await.unwrap();
2017
2018        // Trigger a fetch of the first leaf; it should retry until it is able to determine
2019        // the leaf is fetchable and trigger the fetch.
2020        tracing::info!("fetch with transaction failure");
2021        data_source
2022            .as_ref()
2023            .fail_one_begin_read_only(FailableAction::Any)
2024            .await;
2025        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2026    }
2027
2028    #[tokio::test(flavor = "multi_thread")]
2029    async fn test_fetch_load_failure_block() {
2030        setup_test();
2031
2032        // Create the consensus network.
2033        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2034
2035        // Start a web server that the non-consensus node can use to fetch blocks.
2036        let port = pick_unused_port().unwrap();
2037        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2038        app.register_module(
2039            "availability",
2040            define_api(
2041                &Default::default(),
2042                MockBase::instance(),
2043                "1.0.0".parse().unwrap(),
2044            )
2045            .unwrap(),
2046        )
2047        .unwrap();
2048        network.spawn(
2049            "server",
2050            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2051        );
2052
2053        // Start a data source which is not receiving events from consensus, only from a peer.
2054        let provider = Provider::new(QueryServiceProvider::new(
2055            format!("http://localhost:{port}").parse().unwrap(),
2056            MockBase::instance(),
2057        ));
2058        let db = TmpDb::init().await;
2059        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2060        let data_source = FetchingDataSource::builder(storage, provider)
2061            .disable_proactive_fetching()
2062            .disable_aggregator()
2063            .with_min_retry_interval(Duration::from_millis(100))
2064            .build()
2065            .await
2066            .unwrap();
2067
2068        // Start consensus.
2069        network.start().await;
2070
2071        // Wait until a block is produced.
2072        let mut leaves = network.data_source().subscribe_leaves(1).await;
2073        let leaf = leaves.next().await.unwrap();
2074
2075        // Send the leaf to the disconnected data source, so the corresponding block becomes
2076        // fetchable.
2077        let mut tx = data_source.write().await.unwrap();
2078        tx.insert_leaf(leaf.clone()).await.unwrap();
2079        tx.commit().await.unwrap();
2080
2081        // Trigger a fetch of the block by hash; it should retry until it is able to determine the
2082        // leaf is available (and thus the block is fetchable) and trigger the fetch.
2083        //
2084        // Failing only on the `get_header` call here hits an edge case which is only possible when
2085        // fetching blocks: we successfully determine that the object is not available locally and
2086        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
2087        // fail to load the header and erroneously treat this as the header not being available, we
2088        // will give up and consider the object unfetchable (since the next step would be to fetch
2089        // the corresponding leaf, but we cannot do this with just a block hash).
2090        //
2091        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
2092        // successfully able to load the header from storage and determine that the block is
2093        // fetchable.
2094        tracing::info!("fetch with read failure");
2095        data_source
2096            .as_ref()
2097            .fail_one_read(FailableAction::GetHeader)
2098            .await;
2099        let fetch = data_source.get_block(leaf.block_hash()).await;
2100
2101        // Give some time for a few reads to fail before letting them succeed.
2102        sleep(Duration::from_secs(2)).await;
2103        data_source.as_ref().pass().await;
2104
2105        let block: BlockQueryData<MockTypes> = fetch.await;
2106        assert_eq!(block.hash(), leaf.block_hash());
2107    }
2108
2109    #[tokio::test(flavor = "multi_thread")]
2110    async fn test_fetch_load_failure_tx() {
2111        setup_test();
2112
2113        // Create the consensus network.
2114        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2115
2116        // Start a web server that the non-consensus node can use to fetch blocks.
2117        let port = pick_unused_port().unwrap();
2118        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2119        app.register_module(
2120            "availability",
2121            define_api(
2122                &Default::default(),
2123                MockBase::instance(),
2124                "1.0.0".parse().unwrap(),
2125            )
2126            .unwrap(),
2127        )
2128        .unwrap();
2129        network.spawn(
2130            "server",
2131            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2132        );
2133
2134        // Start a data source which is not receiving events from consensus, only from a peer.
2135        let provider = Provider::new(QueryServiceProvider::new(
2136            format!("http://localhost:{port}").parse().unwrap(),
2137            MockBase::instance(),
2138        ));
2139        let db = TmpDb::init().await;
2140        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2141        let data_source = FetchingDataSource::builder(storage, provider)
2142            .disable_proactive_fetching()
2143            .disable_aggregator()
2144            .with_min_retry_interval(Duration::from_millis(100))
2145            .build()
2146            .await
2147            .unwrap();
2148
2149        // Start consensus.
2150        network.start().await;
2151
2152        // Wait until a transaction is sequenced.
2153        let tx = mock_transaction(vec![1, 2, 3]);
2154        network.submit_transaction(tx.clone()).await;
2155        let tx = network
2156            .data_source()
2157            .get_transaction(tx.commit())
2158            .await
2159            .await;
2160
2161        // Send the block containing the transaction to the disconnected data source.
2162        {
2163            let leaf = network
2164                .data_source()
2165                .get_leaf(tx.block_height() as usize)
2166                .await
2167                .await;
2168            let block = network
2169                .data_source()
2170                .get_block(tx.block_height() as usize)
2171                .await
2172                .await;
2173            let mut tx = data_source.write().await.unwrap();
2174            tx.insert_leaf(leaf.clone()).await.unwrap();
2175            tx.insert_block(block.clone()).await.unwrap();
2176            tx.commit().await.unwrap();
2177        }
2178
2179        // Check that the transaction is there.
2180        tracing::info!("fetch success");
2181        assert_eq!(tx, data_source.get_transaction(tx.hash()).await.await);
2182
2183        // Fetch the transaction with storage failures.
2184        //
2185        // Failing only one read here hits an edge case that only exists for unfetchable objects
2186        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
2187        // if we erroneously treat this load failure as the transaction being missing, we will
2188        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
2189        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
2190        // never resolve.
2191        //
2192        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
2193        // and we discover that the transaction doesn't need to be fetched at all.
2194        tracing::info!("fetch with read failure");
2195        data_source
2196            .as_ref()
2197            .fail_one_read(FailableAction::Any)
2198            .await;
2199        let fetch = data_source.get_transaction(tx.hash()).await;
2200
2201        assert_eq!(tx, fetch.await);
2202    }
2203
2204    #[tokio::test(flavor = "multi_thread")]
2205    async fn test_stream_begin_failure() {
2206        setup_test();
2207
2208        // Create the consensus network.
2209        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2210
2211        // Start a web server that the non-consensus node can use to fetch blocks.
2212        let port = pick_unused_port().unwrap();
2213        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2214        app.register_module(
2215            "availability",
2216            define_api(
2217                &Default::default(),
2218                MockBase::instance(),
2219                "1.0.0".parse().unwrap(),
2220            )
2221            .unwrap(),
2222        )
2223        .unwrap();
2224        network.spawn(
2225            "server",
2226            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2227        );
2228
2229        // Start a data source which is not receiving events from consensus, only from a peer.
2230        let provider = Provider::new(QueryServiceProvider::new(
2231            format!("http://localhost:{port}").parse().unwrap(),
2232            MockBase::instance(),
2233        ));
2234        let db = TmpDb::init().await;
2235        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2236        let data_source = FetchingDataSource::builder(storage, provider)
2237            .disable_proactive_fetching()
2238            .disable_aggregator()
2239            .with_min_retry_interval(Duration::from_millis(100))
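            // Use a small chunk size so the 5-leaf stream below spans multiple storage reads.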
2240            .with_range_chunk_size(3)
2241            .build()
2242            .await
2243            .unwrap();
2244
2245        // Start consensus.
2246        network.start().await;
2247
2248        // Wait until a few blocks are produced.
2249        let leaves = network.data_source().subscribe_leaves(1).await;
2250        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2251
2252        // Send the last leaf to the disconnected data source so it learns about the height.
2253        let last_leaf = leaves.last().unwrap();
2254        let mut tx = data_source.write().await.unwrap();
2255        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2256        tx.commit().await.unwrap();
2257
2258        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
2259        // and trigger the fetch.
2260        tracing::info!("stream with transaction failure");
2261        data_source
2262            .as_ref()
2263            .fail_one_begin_read_only(FailableAction::Any)
2264            .await;
2265        assert_eq!(
2266            leaves,
2267            data_source
2268                .subscribe_leaves(1)
2269                .await
2270                .take(5)
2271                .collect::<Vec<_>>()
2272                .await
2273        );
2274    }
2275
2276    #[tokio::test(flavor = "multi_thread")]
2277    async fn test_stream_load_failure() {
2278        setup_test();
2279
2280        // Create the consensus network.
2281        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2282
2283        // Start a web server that the non-consensus node can use to fetch blocks.
2284        let port = pick_unused_port().unwrap();
2285        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2286        app.register_module(
2287            "availability",
2288            define_api(
2289                &Default::default(),
2290                MockBase::instance(),
2291                "1.0.0".parse().unwrap(),
2292            )
2293            .unwrap(),
2294        )
2295        .unwrap();
2296        network.spawn(
2297            "server",
2298            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2299        );
2300
2301        // Start a data source which is not receiving events from consensus, only from a peer.
2302        let provider = Provider::new(QueryServiceProvider::new(
2303            format!("http://localhost:{port}").parse().unwrap(),
2304            MockBase::instance(),
2305        ));
2306        let db = TmpDb::init().await;
2307        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2308        let data_source = FetchingDataSource::builder(storage, provider)
2309            .disable_proactive_fetching()
2310            .disable_aggregator()
2311            .with_min_retry_interval(Duration::from_millis(100))
2312            .with_range_chunk_size(3)
2313            .build()
2314            .await
2315            .unwrap();
2316
2317        // Start consensus.
2318        network.start().await;
2319
2320        // Wait until a few blocks are produced.
2321        let leaves = network.data_source().subscribe_leaves(1).await;
2322        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2323
2324        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2325        let last_leaf = leaves.last().unwrap();
2326        let mut tx = data_source.write().await.unwrap();
2327        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2328        tx.commit().await.unwrap();
2329
2330        // Stream the blocks with a period of database failures.
2331        tracing::info!("stream with read failure");
2332        data_source.as_ref().fail_reads(FailableAction::Any).await;
2333        let fetches = data_source
2334            .get_block_range(1..=5)
2335            .await
2336            .collect::<Vec<_>>()
2337            .await;
2338
2339        // Give some time for a few reads to fail before letting them succeed.
2340        sleep(Duration::from_secs(2)).await;
2341        data_source.as_ref().pass().await;
2342
2343        for (leaf, fetch) in leaves.iter().zip(fetches) {
2344            let block: BlockQueryData<MockTypes> = fetch.await;
2345            assert_eq!(block.hash(), leaf.block_hash());
2346        }
2347    }
2348
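    /// Which metadata stream `test_metadata_stream_begin_failure_helper` exercises.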
2349    enum MetadataType {
2350        Payload,
2351        Vid,
2352    }
2353
2354    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2355        setup_test();
2356
2357        // Create the consensus network.
2358        let mut network = MockNetwork::<MockDataSource, MockVersions>::init().await;
2359
2360        // Start a web server that the non-consensus node can use to fetch blocks.
2361        let port = pick_unused_port().unwrap();
2362        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2363        app.register_module(
2364            "availability",
2365            define_api(
2366                &Default::default(),
2367                MockBase::instance(),
2368                "1.0.0".parse().unwrap(),
2369            )
2370            .unwrap(),
2371        )
2372        .unwrap();
2373        network.spawn(
2374            "server",
2375            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2376        );
2377
2378        // Start a data source which is not receiving events from consensus, only from a peer.
2379        let provider = Provider::new(QueryServiceProvider::new(
2380            format!("http://localhost:{port}").parse().unwrap(),
2381            MockBase::instance(),
2382        ));
2383        let db = TmpDb::init().await;
2384        let storage = FailStorage::from(SqlStorage::connect(db.config()).await.unwrap());
2385        let data_source = FetchingDataSource::builder(storage, provider)
2386            .disable_proactive_fetching()
2387            .disable_aggregator()
2388            .with_min_retry_interval(Duration::from_millis(100))
2389            .with_range_chunk_size(3)
2390            .build()
2391            .await
2392            .unwrap();
2393
2394        // Start consensus.
2395        network.start().await;
2396
2397        // Wait until a few blocks are produced.
2398        let leaves = network.data_source().subscribe_leaves(1).await;
2399        let leaves = leaves.take(3).collect::<Vec<_>>().await;
2400
2401        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
2402        let last_leaf = leaves.last().unwrap();
2403        let mut tx = data_source.write().await.unwrap();
2404        tx.insert_leaf(last_leaf.clone()).await.unwrap();
2405        tx.commit().await.unwrap();
2406
2407        // Send the first object to the disconnected data source, so we hit all the cases:
2408        // * leaf present but not full object (from the last leaf)
2409        // * full object present but inaccessible due to storage failures (first object)
2410        // * nothing present (middle object)
2411        let leaf = network.data_source().get_leaf(1).await.await;
2412        let block = network.data_source().get_block(1).await.await;
2413        let vid = network.data_source().get_vid_common(1).await.await;
2414        data_source
2415            .append(BlockInfo::new(leaf, Some(block), Some(vid), None, None))
2416            .await
2417            .unwrap();
2418
2419        // Stream the objects with a period of database failures.
2420        tracing::info!("stream with transaction failure");
2421        data_source
2422            .as_ref()
2423            .fail_begins_read_only(FailableAction::Any)
2424            .await;
2425        match stream {
2426            MetadataType::Payload => {
2427                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2428
2429                // Give some time for a few reads to fail before letting them succeed.
2430                sleep(Duration::from_secs(2)).await;
2431                tracing::info!("stop failing transactions");
2432                data_source.as_ref().pass().await;
2433
2434                let payloads = payloads.collect::<Vec<_>>().await;
2435                for (leaf, payload) in leaves.iter().zip(payloads) {
2436                    assert_eq!(payload.block_hash, leaf.block_hash());
2437                }
2438            },
2439            MetadataType::Vid => {
2440                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2441
2442                // Give some time for a few reads to fail before letting them succeed.
2443                sleep(Duration::from_secs(2)).await;
2444                tracing::info!("stop failing transactions");
2445                data_source.as_ref().pass().await;
2446
2447                let vids = vids.collect::<Vec<_>>().await;
2448                for (leaf, vid) in leaves.iter().zip(vids) {
2449                    assert_eq!(vid.block_hash, leaf.block_hash());
2450                }
2451            },
2452        }
2453    }
2454
2455    #[tokio::test(flavor = "multi_thread")]
2456    async fn test_metadata_stream_begin_failure_payload() {
2457        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2458    }
2459
2460    #[tokio::test(flavor = "multi_thread")]
2461    async fn test_metadata_stream_begin_failure_vid() {
2462        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2463    }
2464
2465    // This helper function starts up a mock network
2466    // with v0 and v1 availability query modules,
2467    // triggers fetches for a data source from the provider,
2468    // and asserts that the fetched data is correct.
2469    async fn run_fallback_deserialization_test_helper<V: Versions>(port: u16, version: &str) {
2470        let mut network = MockNetwork::<MockDataSource, V>::init().await;
2471
2472        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2473
2474        // Register availability APIs for two versions: v0 and v1
2475        app.register_module(
2476            "availability",
2477            define_api(
2478                &Default::default(),
2479                StaticVersion::<0, 1> {},
2480                "0.0.1".parse().unwrap(),
2481            )
2482            .unwrap(),
2483        )
2484        .unwrap();
2485
2486        app.register_module(
2487            "availability",
2488            define_api(
2489                &Default::default(),
2490                StaticVersion::<0, 1> {},
2491                "1.0.0".parse().unwrap(),
2492            )
2493            .unwrap(),
2494        )
2495        .unwrap();
2496
2497        network.spawn(
2498            "server",
2499            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2500        );
2501
2502        let db = TmpDb::init().await;
2503
2504        let provider_url = format!("http://localhost:{port}/{version}")
2505            .parse()
2506            .expect("Invalid URL");
2507
2508        let provider = Provider::new(QueryServiceProvider::new(
2509            provider_url,
2510            StaticVersion::<0, 1> {},
2511        ));
2512
2513        let ds = data_source(&db, &provider).await;
2514        network.start().await;
2515
2516        let leaves = network.data_source().subscribe_leaves(1).await;
2517        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2518        let test_leaf = &leaves[0];
2519        let test_payload = &leaves[2];
2520        let test_common = &leaves[3];
2521
2522        let mut fetches = vec![];
2523        // Issue requests for missing data (these should initially remain unresolved):
2524        fetches.push(ds.get_leaf(test_leaf.height() as usize).await.map(ignore));
2525        fetches.push(ds.get_payload(test_payload.block_hash()).await.map(ignore));
2526        fetches.push(
2527            ds.get_vid_common(test_common.block_hash())
2528                .await
2529                .map(ignore),
2530        );
2531
2532        // Even if we give data extra time to propagate, these requests will not resolve, since we
2533        // didn't trigger any active fetches.
2534        sleep(Duration::from_secs(1)).await;
2535        for (i, fetch) in fetches.into_iter().enumerate() {
2536            tracing::info!("checking fetch {i} is unresolved");
2537            fetch.try_resolve().unwrap_err();
2538        }
2539
2540        // Append the latest known leaf to the local store.
2541        // This triggers fetches for the corresponding missing data,
2542        // such as the header, VID common, and payload.
2543        // It also triggers fetches for the parent data.
2544        ds.append(leaves.last().cloned().unwrap().into())
2545            .await
2546            .unwrap();
2547
2548        // Check that the data has been fetched and matches the network data source.
2549        {
2550            let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2551            let payload = ds.get_payload(test_payload.height() as usize).await;
2552            let common = ds.get_vid_common(test_common.height() as usize).await;
2553
2554            let truth = network.data_source();
2555            assert_eq!(
2556                leaf.await,
2557                truth.get_leaf(test_leaf.height() as usize).await.await
2558            );
2559            assert_eq!(
2560                payload.await,
2561                truth
2562                    .get_payload(test_payload.height() as usize)
2563                    .await
2564                    .await
2565            );
2566            assert_eq!(
2567                common.await,
2568                truth
2569                    .get_vid_common(test_common.height() as usize)
2570                    .await
2571                    .await
2572            );
2573        }
2574    }
2575
2576    #[tokio::test(flavor = "multi_thread")]
2577    async fn test_fallback_deserialization_for_fetch_requests_v0() {
2578        setup_test();
2579
2580        let port = pick_unused_port().unwrap();
2581
2582        // This run calls the v0 availability API for fetch requests.
2583        // The fetch initially attempts deserialization with new types,
2584        // which fails because the v0 provider returns legacy types.
2585        // It then falls back to deserializing as legacy types,
2586        // and the fetch succeeds.
2587        run_fallback_deserialization_test_helper::<MockVersions>(port, "v0").await;
2588    }
2589
2590    #[tokio::test(flavor = "multi_thread")]
2591    async fn test_fallback_deserialization_for_fetch_requests_v1() {
2592        setup_test();
2593        let port = pick_unused_port().unwrap();
2594
2595        // Fetch from the v1 availability API using MockVersions.
2596        // This one fetches from the v1 provider, which correctly deserializes the bytes
2597        // on the first attempt, so no fallback deserialization is needed.
2598        run_fallback_deserialization_test_helper::<MockVersions>(port, "v1").await;
2599    }
2600
2601    #[tokio::test(flavor = "multi_thread")]
2602    async fn test_fallback_deserialization_for_fetch_requests_pos() {
2603        setup_test();
2604        let port = pick_unused_port().unwrap();
2605
2606        // Fetch Proof of Stake (PoS) data using the v1 availability API
2607        // with the proof-of-stake versions (EpochsTestVersions).
2608        run_fallback_deserialization_test_helper::<EpochsTestVersions>(port, "v1").await;
2609    }

2610    #[tokio::test(flavor = "multi_thread")]
2611    async fn test_fallback_deserialization_for_fetch_requests_v0_pos() {
2612        setup_test();
2613
2614        // Run with the PoS version against a v0 provider.
2615        // Fetch requests are expected to fail because PoS commitments differ from the legacy commitments
2616        // returned by the v0 provider.
2617        // For example: a PoS Leaf2 commitment will not match the downgraded commitment from a legacy Leaf1.
2618
2619        let mut network = MockNetwork::<MockDataSource, EpochsTestVersions>::init().await;
2620
2621        let port = pick_unused_port().unwrap();
2622        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2623
2624        app.register_module(
2625            "availability",
2626            define_api(
2627                &Default::default(),
2628                StaticVersion::<0, 1> {},
2629                "0.0.1".parse().unwrap(),
2630            )
2631            .unwrap(),
2632        )
2633        .unwrap();
2634
2635        network.spawn(
2636            "server",
2637            app.serve(format!("0.0.0.0:{port}"), StaticVersion::<0, 1> {}),
2638        );
2639
2640        let db = TmpDb::init().await;
2641        let provider = Provider::new(QueryServiceProvider::new(
2642            format!("http://localhost:{port}/v0").parse().unwrap(),
2643            StaticVersion::<0, 1> {},
2644        ));
2645        let ds = data_source(&db, &provider).await;
2646
2647        network.start().await;
2648
2649        let leaves = network.data_source().subscribe_leaves(1).await;
2650        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2651        let test_leaf = &leaves[0];
2652        let test_payload = &leaves[2];
2653        let test_common = &leaves[3];
2654
2655        let leaf = ds.get_leaf(test_leaf.height() as usize).await;
2656        let payload = ds.get_payload(test_payload.height() as usize).await;
2657        let common = ds.get_vid_common(test_common.height() as usize).await;
2658
2659        sleep(Duration::from_secs(3)).await;
2660
2661        // Fetches fail because the commitments do not match.
2662        leaf.try_resolve().unwrap_err();
2663        payload.try_resolve().unwrap_err();
2664        common.try_resolve().unwrap_err();
2665    }
2666}