hotshot_query_service/data_source/
fetching.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Asynchronous retrieval of missing data.
14//!
15//! [`FetchingDataSource`] combines a local storage implementation with a remote data availability
//! provider to create a data source which caches data locally, but which is capable of fetching
17//! missing data from a remote source, either proactively or on demand.
18//!
19//! This implementation supports three kinds of data fetching.
20//!
21//! # Proactive Fetching
22//!
23//! Proactive fetching means actively scanning the local database for missing objects and
24//! proactively retrieving them from a remote provider, even if those objects have yet to be
25//! requested by a client. Doing this increases the chance of success and decreases latency when a
26//! client does eventually ask for those objects. This is also the mechanism by which a query
27//! service joining a network late, or having been offline for some time, is able to catch up with
28//! the events on the network that it missed.
29//!
30//! Proactive fetching is implemented by a background task which performs periodic scans of the
31//! database, identifying and retrieving missing objects. This task is generally low priority, since
32//! missing objects are rare, and it will take care not to monopolize resources that could be used
33//! to serve requests.
34//!
35//! # Active Fetching
36//!
37//! Active fetching means reaching out to a remote data availability provider to retrieve a missing
38//! resource, upon receiving a request for that resource from a client. Not every request for a
39//! missing resource triggers an active fetch. To avoid spamming peers with requests for missing
40//! data, we only actively fetch resources that are known to exist somewhere. This means we can
//! actively fetch leaves and headers when a client requests a leaf or header whose height
42//! is less than the current chain height. We can fetch a block when the corresponding header exists
43//! (corresponding based on height, hash, or payload hash) or can be actively fetched.
44//!
45//! # Passive Fetching
46//!
47//! For requests that cannot be actively fetched (for example, a block requested by hash, where we
48//! do not have a header proving that a block with that hash exists), we use passive fetching. This
49//! essentially means waiting passively until the query service receives an object that satisfies
//! the request. This object may be received because it was actively fetched in response to a
51//! different request for the same object, one that permitted an active fetch. Or it may have been
52//! fetched [proactively](#proactive-fetching).
53
54use std::{
55    cmp::{max, min},
56    fmt::{Debug, Display},
57    iter::repeat_with,
58    marker::PhantomData,
59    ops::{Bound, Range, RangeBounds},
60    sync::Arc,
61    time::Duration,
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use derivative::Derivative;
69use futures::{
70    channel::oneshot,
71    future::{self, BoxFuture, Either, Future, FutureExt, join_all},
72    stream::{self, BoxStream, StreamExt},
73};
74use hotshot_types::{
75    data::VidShare,
76    simple_certificate::CertificatePair,
77    traits::{
78        metrics::{Gauge, Metrics},
79        node_implementation::NodeType,
80    },
81};
82use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
83use tagged_base64::TaggedBase64;
84use tokio::{spawn, time::sleep};
85use tracing::Instrument;
86
87use super::{
88    Transaction, VersionedDataSource,
89    notifier::Notifier,
90    storage::{
91        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
92        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
93        UpdateAvailabilityStorage,
94        pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
95    },
96};
97use crate::{
98    Header, Payload, QueryError, QueryResult,
99    availability::{
100        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
101        FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
102        PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
103        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
104    },
105    explorer::{self, ExplorerDataSource},
106    fetching::{self, Provider, request},
107    merklized_state::{
108        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
109    },
110    metrics::PrometheusMetrics,
111    node::{
112        NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
113        WindowStart,
114    },
115    status::{HasMetrics, StatusDataSource},
116    task::BackgroundTask,
117    types::HeightIndexed,
118};
119
120mod block;
121mod header;
122mod leaf;
123mod transaction;
124mod vid;
125
126use self::{
127    block::PayloadFetcher,
128    leaf::LeafFetcher,
129    transaction::TransactionRequest,
130    vid::{VidCommonFetcher, VidCommonRequest},
131};
132
/// Builder for [`FetchingDataSource`] with configuration.
pub struct Builder<Types, S, P> {
    // Local persistent storage; also the source of the metrics registry.
    storage: S,
    // Remote data availability provider used to fetch missing objects.
    provider: P,
    // Retry policy for failed fetch operations.
    backoff: ExponentialBackoffBuilder,
    // Maximum number of simultaneous fetches.
    rate_limit: usize,
    // Number of items to load/buffer at a time for range and stream requests.
    range_chunk_size: usize,
    // Time between proactive fetching scans.
    proactive_interval: Duration,
    // Chunk size for proactive scans; `None` falls back to `range_chunk_size`.
    proactive_range_chunk_size: Option<usize>,
    // Number of items per transaction when scanning the database for missing objects.
    sync_status_chunk_size: usize,
    // Delay between active fetches in proactive scans, to avoid overwhelming peers.
    active_fetch_delay: Duration,
    // Delay between chunk fetches in proactive scans, to limit CPU/storage load.
    chunk_fetch_delay: Duration,
    // Whether to run the proactive fetching background task.
    proactive_fetching: bool,
    // Whether to run the aggregator background task.
    aggregator: bool,
    // Chunk size for the aggregator task; `None` falls back to `range_chunk_size`.
    aggregator_chunk_size: Option<usize>,
    // When set, the proactive scanner and aggregator are not started (see
    // `FetchingDataSource::new`).
    leaf_only: bool,
    _types: PhantomData<Types>,
}
151
152impl<Types, S, P> Builder<Types, S, P> {
153    /// Construct a new builder with the given storage and fetcher and the default options.
154    pub fn new(storage: S, provider: P) -> Self {
155        let mut default_backoff = ExponentialBackoffBuilder::default();
156        default_backoff
157            .with_initial_interval(Duration::from_secs(1))
158            .with_multiplier(2.)
159            .with_max_interval(Duration::from_secs(32))
160            .with_max_elapsed_time(Some(Duration::from_secs(64)));
161
162        Self {
163            storage,
164            provider,
165            backoff: default_backoff,
166            rate_limit: 32,
167            range_chunk_size: 25,
168            proactive_interval: Duration::from_hours(8),
169            proactive_range_chunk_size: None,
170            sync_status_chunk_size: 100_000,
171            active_fetch_delay: Duration::from_millis(50),
172            chunk_fetch_delay: Duration::from_millis(100),
173            proactive_fetching: true,
174            aggregator: true,
175            aggregator_chunk_size: None,
176            leaf_only: false,
177            _types: Default::default(),
178        }
179    }
180
181    pub fn leaf_only(mut self) -> Self {
182        self.leaf_only = true;
183        self
184    }
185
186    /// Set the minimum delay between retries of failed operations.
187    pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
188        self.backoff.with_initial_interval(interval);
189        self
190    }
191
192    /// Set the maximum delay between retries of failed operations.
193    pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
194        self.backoff.with_max_interval(interval);
195        self
196    }
197
198    /// Set the multiplier for exponential backoff when retrying failed requests.
199    pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
200        self.backoff.with_multiplier(multiplier);
201        self
202    }
203
204    /// Set the randomization factor for randomized backoff when retrying failed requests.
205    pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
206        self.backoff.with_randomization_factor(factor);
207        self
208    }
209
210    /// Set the maximum time to retry failed operations before giving up.
211    pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
212        self.backoff.with_max_elapsed_time(Some(timeout));
213        self
214    }
215
216    /// Set the maximum number of simultaneous fetches.
217    pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
218        self.rate_limit = with_rate_limit;
219        self
220    }
221
222    /// Set the number of items to process at a time when loading a range or stream.
223    ///
224    /// This determines:
225    /// * The number of objects to load from storage in a single request
226    /// * The number of objects to buffer in memory per request/stream
227    /// * The number of concurrent notification subscriptions per request/stream
228    pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
229        self.range_chunk_size = range_chunk_size;
230        self
231    }
232
233    /// Set the time interval between proactive fetching scans.
234    ///
235    /// See [proactive fetching](self#proactive-fetching).
236    pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
237        self.proactive_interval = interval;
238        self
239    }
240
241    /// Set the number of items to process at a time when scanning for proactive fetching.
242    ///
243    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
244    /// proactive fetching scans, not for normal subscription streams. This can be useful to tune
245    /// the proactive scanner to be more or less greedy with persistent storage resources.
246    ///
247    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
248    /// whatever the normal range chunk size is.
249    pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
250        self.proactive_range_chunk_size = Some(range_chunk_size);
251        self
252    }
253
254    /// Set the number of items to process in a single transaction when scanning the database for
255    /// missing objects.
256    pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
257        self.sync_status_chunk_size = chunk_size;
258        self
259    }
260
261    /// Add a delay between active fetches in proactive scans.
262    ///
263    /// This can be used to limit the rate at which this query service makes requests to other query
264    /// services during proactive scans. This is useful if the query service has a lot of blocks to
265    /// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
266    pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
267        self.active_fetch_delay = active_fetch_delay;
268        self
269    }
270
271    /// Adds a delay between chunk fetches during proactive scans.
272    ///
273    /// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a database).
274    /// Without a delay between fetching these chunks, the process can become very CPU-intensive, especially
275    /// when chunks are retrieved from local storage. While there is already a delay for active fetches
276    /// (`active_fetch_delay`), situations may arise when subscribed to an old stream that fetches most of the data
277    /// from local storage.
278    ///
279    /// This additional delay helps to limit constant maximum CPU usage
280    /// and ensures that local storage remains accessible to all processes,
281    /// not just the proactive scanner.
282    pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
283        self.chunk_fetch_delay = chunk_fetch_delay;
284        self
285    }
286
287    /// Run without [proactive fetching](self#proactive-fetching).
288    ///
289    /// This can reduce load on the CPU and the database, but increases the probability that
290    /// requests will fail due to missing resources. If resources are constrained, it is recommended
291    /// to run with rare proactive fetching (see
292    /// [`with_major_scan_interval`](Self::with_major_scan_interval),
293    /// [`with_minor_scan_interval`](Self::with_minor_scan_interval)), rather than disabling it
294    /// entirely.
295    pub fn disable_proactive_fetching(mut self) -> Self {
296        self.proactive_fetching = false;
297        self
298    }
299
300    /// Run without an aggregator.
301    ///
302    /// This can reduce load on the CPU and the database, but it will cause aggregate statistics
303    /// (such as transaction counts) not to update.
304    pub fn disable_aggregator(mut self) -> Self {
305        self.aggregator = false;
306        self
307    }
308
309    /// Set the number of items to process at a time when computing aggregate statistics.
310    ///
311    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
312    /// the aggregator task, not for normal subscription streams. This can be useful to tune
313    /// the aggregator to be more or less greedy with persistent storage resources.
314    ///
315    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
316    /// whatever the normal range chunk size is.
317    pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
318        self.aggregator_chunk_size = Some(chunk_size);
319        self
320    }
321
322    pub fn is_leaf_only(&self) -> bool {
323        self.leaf_only
324    }
325}
326
impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with these options.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        // Construction is async: it spawns the configured background tasks and consults storage
        // (e.g. for the pruning configuration).
        FetchingDataSource::new(self).await
    }
}
345
/// The most basic kind of data source.
///
/// A data source is constructed modularly by combining a [storage](super::storage) implementation
/// with a [Fetcher](crate::fetching::Fetcher). The former allows the query service to store the
/// data it has persistently in an easily accessible storage medium, such as the local file system
/// or a database. This allows it to answer queries efficiently and to maintain its state across
/// restarts. The latter allows the query service to fetch data that is missing from its storage
/// from an external data availability provider, such as the Tiramisu DA network or another instance
/// of the query service.
///
/// These two components of a data source are combined in [`FetchingDataSource`], which is the
/// lowest level kind of data source available. It simply uses the storage implementation to fetch
/// data when available, and fills in everything else using the fetcher. Various kinds of data
/// sources can be constructed out of [`FetchingDataSource`] by changing the storage and fetcher
/// implementations used, and more complex data sources can be built on top using data source
/// combinators.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    // The fetcher manages retrieval of resources from both local storage and a remote provider. It
    // encapsulates the data which may need to be shared with a long-lived task or future that
    // implements the asynchronous fetching of a particular object. This is why it gets its own
    // type, wrapped in an [`Arc`] for easy, efficient cloning.
    fetcher: Arc<Fetcher<Types, S, P>>,
    // The proactive scanner task. This is only saved here so that we can cancel it on drop.
    scanner: Option<BackgroundTask>,
    // The aggregator task, which derives aggregate statistics from a block stream.
    aggregator: Option<BackgroundTask>,
    // The pruner, which holds the handle of the background pruning task (if pruning is
    // configured) so the task is cancelled when the data source is dropped.
    pruner: Pruner<Types, S>,
}
379
/// Handle to the background pruning task.
///
/// Dropping this (e.g. as part of dropping the owning data source) cancels the task.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug,   "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    // `None` when the storage has no pruning configuration, in which case no task was spawned.
    handle: Option<BackgroundTask>,
    _types: PhantomData<(Types, S)>,
}
389
impl<Types, S> Pruner<Types, S>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: PruneStorage + Send + Sync + 'static,
{
    /// Spawn the background pruning task, if the storage is configured for pruning.
    ///
    /// If `storage.get_pruning_config()` returns `None`, no task is spawned and the returned
    /// `Pruner` holds no handle.
    async fn new(storage: Arc<S>) -> Self {
        let cfg = storage.get_pruning_config();
        let Some(cfg) = cfg else {
            return Self {
                handle: None,
                _types: Default::default(),
            };
        };

        // Run pruner passes forever, sleeping for the configured interval between passes.
        let future = async move {
            for i in 1.. {
                // NOTE(review): warn level is used for routine progress logging throughout the
                // pruner — presumably for visibility in production logs; confirm this is intended.
                tracing::warn!("starting pruner run {i} ");
                Self::prune(storage.clone()).await;
                sleep(cfg.interval()).await;
            }
        };

        let task = BackgroundTask::spawn("pruner", future);

        Self {
            handle: Some(task),
            _types: Default::default(),
        }
    }

    /// Perform a single pruner run, pruning in batches until the run completes or fails.
    async fn prune(storage: Arc<S>) {
        // We loop until the whole pruner run is complete: each `prune` call processes a batch,
        // reporting the height pruned so far, or `None` once there is nothing left to do.
        let mut pruner = S::Pruner::default();
        loop {
            match storage.prune(&mut pruner).await {
                Ok(Some(height)) => {
                    tracing::warn!("Pruned to height {height}");
                },
                Ok(None) => {
                    tracing::warn!("pruner run complete.");
                    break;
                },
                Err(e) => {
                    // Abort this run on error; the next scheduled run will retry from scratch.
                    tracing::error!("pruner run failed: {e:?}");
                    break;
                },
            }
        }
    }
}
442
443impl<Types, S, P> FetchingDataSource<Types, S, P>
444where
445    Types: NodeType,
446    Payload<Types>: QueryablePayload<Types>,
447    Header<Types>: QueryableHeader<Types>,
448    S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
449    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
450    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
451        + NodeStorage<Types>
452        + PrunedHeightStorage
453        + AggregatesStorage<Types>,
454    P: AvailabilityProvider<Types>,
455{
456    /// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
457    pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
458        Builder::new(storage, provider)
459    }
460
461    async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
462        let leaf_only = builder.is_leaf_only();
463        let aggregator = builder.aggregator;
464        let aggregator_chunk_size = builder
465            .aggregator_chunk_size
466            .unwrap_or(builder.range_chunk_size);
467        let proactive_fetching = builder.proactive_fetching;
468        let proactive_interval = builder.proactive_interval;
469        let proactive_range_chunk_size = builder
470            .proactive_range_chunk_size
471            .unwrap_or(builder.range_chunk_size);
472        let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
473        let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
474
475        let fetcher = Arc::new(Fetcher::new(builder).await?);
476        let scanner = if proactive_fetching && !leaf_only {
477            Some(BackgroundTask::spawn(
478                "proactive scanner",
479                fetcher.clone().proactive_scan(
480                    proactive_interval,
481                    proactive_range_chunk_size,
482                    scanner_metrics,
483                ),
484            ))
485        } else {
486            None
487        };
488
489        let aggregator = if aggregator && !leaf_only {
490            Some(BackgroundTask::spawn(
491                "aggregator",
492                fetcher
493                    .clone()
494                    .aggregate(aggregator_chunk_size, aggregator_metrics),
495            ))
496        } else {
497            None
498        };
499
500        let storage = fetcher.storage.clone();
501
502        let pruner = Pruner::new(storage).await;
503        let ds = Self {
504            fetcher,
505            scanner,
506            pruner,
507            aggregator,
508        };
509
510        Ok(ds)
511    }
512
513    /// Get a copy of the (shared) inner storage
514    pub fn inner(&self) -> Arc<S> {
515        self.fetcher.storage.clone()
516    }
517}
518
impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    /// Borrow the underlying storage shared with the fetcher.
    fn as_ref(&self) -> &S {
        &self.fetcher.storage
    }
}
527
528impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
529where
530    Types: NodeType,
531    S: HasMetrics,
532{
533    fn metrics(&self) -> &PrometheusMetrics {
534        self.as_ref().metrics()
535    }
536}
537
538#[async_trait]
539impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
540where
541    Types: NodeType,
542    Header<Types>: QueryableHeader<Types>,
543    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
544    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
545    P: Send + Sync,
546{
547    async fn block_height(&self) -> QueryResult<usize> {
548        let mut tx = self.read().await.map_err(|err| QueryError::Error {
549            message: err.to_string(),
550        })?;
551        tx.block_height().await
552    }
553}
554
#[async_trait]
impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
    P: Send + Sync,
{
    /// Load the height below which data has been pruned, if any pruning has occurred.
    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
        // Delegate to the storage layer via a read-only transaction.
        let mut tx = self.read().await?;
        tx.load_pruned_height().await
    }
}
568
#[async_trait]
impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    // Single-object lookups all delegate to `Fetcher::get`; the returned [`Fetch`] resolves
    // immediately if the object is already available, or pends on a fetch otherwise.

    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
    where
        ID: Into<LeafId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // Headers are fetched via `HeaderQueryData` and then unwrapped to the bare header.
        self.fetcher
            .get::<HeaderQueryData<_>>(id.into())
            .await
            .map(|h| h.header)
    }

    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    // VID lookups convert the block identifier into a dedicated `VidCommonRequest` first.

    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    // Range queries delegate to `Fetcher::get_range`, which streams objects by height.

    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        // There is no dedicated header range fetcher: stream leaves and project out each header.
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    // Reverse range queries delegate to `Fetcher::get_range_rev`, streaming from `end` down
    // toward `start`.

    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Look up the block containing the transaction with the given hash.
    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }
}
743
impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Append a newly decided block to the data source.
    ///
    /// Stores the leaf (and QC chain) and, if present in `info`, the block payload and VID
    /// common data; spawns fetches for any of these that are missing. Notifications for all
    /// stored objects are sent only after everything has been stored (see the comment below on
    /// ordering).
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;

        // Save the new decided leaf.
        self.fetcher
            .store(&(info.leaf.clone(), info.qc_chain))
            .await;

        // Trigger a fetch of the parent leaf, if we don't already have it.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        // Store and notify the block data and VID common, if available. Spawn a fetch to retrieve
        // it, if not.
        //
        // Note a special case here: if the data was not available in the decide event, but _is_
        // available locally in the database, without having to spawn a fetch for it, we _must_
        // notify now. Thus, we must pattern match to distinguish `Fetch::Ready`/`Fetch::Pending`.
        //
        // Why? As soon as we inserted the leaf, the corresponding object may become available, if
        // we already had an identical payload/VID common in the database, from a different block.
        // Then calling `get()` will not spawn a fetch/notification, and existing fetches waiting
        // for the newly decided object to arrive will miss it. Thus, if `get()` returned a `Ready`
        // object, it is our responsibility, as the task processing newly decided objects, to make
        // sure those fetches get notified.
        let block = match info.block {
            Some(block) => Some(block),
            None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
                Fetch::Ready(block) => Some(block),
                Fetch::Pending(fut) => {
                    // Drive the fetch to completion in the background; we don't block decide
                    // processing on it.
                    let span = tracing::info_span!("fetch missing block", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing block");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(block) = &block {
            self.fetcher.store(block).await;
        }
        // Same Ready/Pending handling for VID common as for the block above.
        let vid = match info.vid_common {
            Some(vid) => Some(vid),
            None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
                Fetch::Ready(vid) => Some(vid),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing VID common", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing VID common");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(vid) = &vid {
            self.fetcher.store(&(vid.clone(), info.vid_share)).await;
        }

        // Send notifications for the new objects after storing all of them. This ensures that as
        // soon as a fetch for any of these objects resolves, the corresponding data will
        // immediately be available. This isn't strictly required for correctness; after all,
        // objects can generally be fetched as asynchronously as we want. But this is the most
        // intuitive behavior to provide when possible.
        info.leaf.notify(&self.fetcher.notifiers).await;
        if let Some(block) = &block {
            block.notify(&self.fetcher.notifiers).await;
        }
        if let Some(vid) = &vid {
            vid.notify(&self.fetcher.notifiers).await;
        }

        Ok(())
    }
}
835
836impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
837where
838    Types: NodeType,
839    S: VersionedDataSource + Send + Sync,
840    P: Send + Sync,
841{
842    type Transaction<'a>
843        = S::Transaction<'a>
844    where
845        Self: 'a;
846    type ReadOnly<'a>
847        = S::ReadOnly<'a>
848    where
849        Self: 'a;
850
851    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
852        self.fetcher.write().await
853    }
854
855    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
856        self.fetcher.read().await
857    }
858}
859
/// Asynchronous retrieval and storage of [`Fetchable`] resources.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    /// Local persistent storage; always consulted before any remote fetch.
    storage: Arc<S>,
    /// Notification channels used to resolve passive fetches when objects become available.
    notifiers: Notifiers<Types>,
    /// Remote data availability provider used to satisfy active fetches.
    provider: Arc<P>,
    /// Active-fetch machinery for missing leaves.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    /// Active-fetch machinery for missing payloads; `None` in leaf-only mode (see [`Self::new`]).
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    /// Active-fetch machinery for missing VID common data; `None` in leaf-only mode.
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    /// Default number of objects loaded/fetched per chunk in range queries.
    range_chunk_size: usize,
    /// Number of objects per chunk when computing sync status.
    sync_status_chunk_size: usize,
    // Duration to sleep after each active fetch,
    active_fetch_delay: Duration,
    // Duration to sleep after each chunk fetched
    chunk_fetch_delay: Duration,
    // Exponential backoff when retrying failed operations.
    backoff: ExponentialBackoff,
    // Semaphore limiting the number of simultaneous DB accesses we can have from tasks spawned to
    // retry failed loads.
    retry_semaphore: Arc<Semaphore>,
    /// `true` if constructed from a leaf-only builder; the payload and VID fetchers above are
    /// `None` in that case.
    leaf_only: bool,
}
885
886impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
887where
888    Types: NodeType,
889    S: VersionedDataSource + Send + Sync,
890    P: Send + Sync,
891{
892    type Transaction<'a>
893        = S::Transaction<'a>
894    where
895        Self: 'a;
896    type ReadOnly<'a>
897        = S::ReadOnly<'a>
898    where
899        Self: 'a;
900
901    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
902        self.storage.write().await
903    }
904
905    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
906        self.storage.read().await
907    }
908}
909
910impl<Types, S, P> Fetcher<Types, S, P>
911where
912    Types: NodeType,
913    Header<Types>: QueryableHeader<Types>,
914    S: VersionedDataSource + Sync,
915    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
916{
917    pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
918        let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
919        let backoff = builder.backoff.build();
920
921        let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
922            (None, None)
923        } else {
924            (
925                Some(Arc::new(fetching::Fetcher::new(
926                    retry_semaphore.clone(),
927                    backoff.clone(),
928                ))),
929                Some(Arc::new(fetching::Fetcher::new(
930                    retry_semaphore.clone(),
931                    backoff.clone(),
932                ))),
933            )
934        };
935        let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
936
937        let leaf_only = builder.leaf_only;
938
939        Ok(Self {
940            storage: Arc::new(builder.storage),
941            notifiers: Default::default(),
942            provider: Arc::new(builder.provider),
943            leaf_fetcher: Arc::new(leaf_fetcher),
944            payload_fetcher,
945            vid_common_fetcher: vid_fetcher,
946            range_chunk_size: builder.range_chunk_size,
947            sync_status_chunk_size: builder.sync_status_chunk_size,
948            active_fetch_delay: builder.active_fetch_delay,
949            chunk_fetch_delay: builder.chunk_fetch_delay,
950            backoff,
951            retry_semaphore,
952            leaf_only,
953        })
954    }
955}
956
957impl<Types, S, P> Fetcher<Types, S, P>
958where
959    Types: NodeType,
960    Header<Types>: QueryableHeader<Types>,
961    Payload<Types>: QueryablePayload<Types>,
962    S: VersionedDataSource + 'static,
963    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
964    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
965    P: AvailabilityProvider<Types>,
966{
    /// Get an object from local storage, or arrange for it to be fetched if it is missing.
    ///
    /// Returns [`Fetch::Ready`] if the object was available locally. Otherwise, returns a
    /// [`Fetch::Pending`] which resolves once the object has been retrieved and stored, whether
    /// by an active fetch spawned via [`Self::try_get`], by the retry task spawned below (if
    /// storage errored), or by any other task which eventually stores the object.
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications before we check storage for the requested object. This ensures
        // that this operation will always eventually succeed as long as the requested object
        // actually exists (or will exist). We will either find it in our local storage and succeed
        // immediately, or (if it exists) someone will *later* come and add it to storage, at which
        // point we will get a notification causing this passive fetch to resolve.
        //
        // Note the "someone" who later fetches the object and adds it to storage may be an active
        // fetch triggered by this very requests, in cases where that is possible, but it need not
        // be.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // If we get here, storage errored, so we don't know whether the object is available
        // locally or whether it is fetchable. Spawn a background task which retries the load
        // (with backoff) until the database recovers.
        //
        // We'll use this channel to get the object back if we successfully load it on retry.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Limit the number of simultaneous retry tasks hitting the database. When
                        // the database is down, we might have a lot of these tasks running, and if
                        // they all hit the DB at once, they are only going to make things worse.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // If the object was immediately available after all, signal the
                            // original fetch. We probably just temporarily couldn't access it due
                            // to database errors.
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // The object was not immediately available after all, but we have
                            // successfully spawned a fetch for it if possible. The spawned fetch
                            // will notify the original request once it completes.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Keep the last delay if the backoff policy is exhausted.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Wait for the object to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
1049
1050    /// Try to get an object from local storage or initialize a fetch if it is missing.
1051    ///
1052    /// There are three possible scenarios in this function, indicated by the return type:
1053    /// * `Ok(Some(obj))`: the requested object was available locally and successfully retrieved
1054    ///   from the database; no fetch was spawned
1055    /// * `Ok(None)`: the requested object was not available locally, but a fetch was successfully
1056    ///   spawned if possible (in other words, if a fetch was not spawned, it was determined that
1057    ///   the requested object is not fetchable)
1058    /// * `Err(_)`: it could not be determined whether the object was available locally or whether
1059    ///   it could be fetched; no fetch was spawned even though the object may be fetchable
1060    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1061    where
1062        T: Fetchable<Types>,
1063    {
1064        let mut tx = self.read().await.context("opening read transaction")?;
1065        match T::load(&mut tx, req).await {
1066            Ok(t) => Ok(Some(t)),
1067            Err(QueryError::Missing | QueryError::NotFound) => {
1068                // We successfully queried the database, but the object wasn't there. Try to
1069                // fetch it.
1070                tracing::debug!(?req, "object missing from local storage, will try to fetch");
1071                self.fetch::<T>(&mut tx, req).await?;
1072                Ok(None)
1073            },
1074            Err(err) => {
1075                // An error occurred while querying the database. We don't know if we need to fetch
1076                // the object or not. Return an error so we can try again.
1077                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1078            },
1079        }
1080    }
1081
1082    /// Get a range of objects from local storage or a provider.
1083    ///
1084    /// Convert a finite stream of fallible local storage lookups into a (possibly infinite) stream
1085    /// of infallible fetches. Objects in `range` are loaded from local storage. Any gaps or missing
1086    /// objects are filled by fetching from a provider. Items in the resulting stream are futures
1087    /// that will never fail to produce a resource, although they may block indefinitely if the
1088    /// resource needs to be fetched.
1089    ///
1090    /// Objects are loaded and fetched in chunks, which strikes a good balance of limiting the total
1091    /// number of storage and network requests, while also keeping the amount of simultaneous
1092    /// resource consumption bounded.
1093    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1094    where
1095        R: RangeBounds<usize> + Send + 'static,
1096        T: RangedFetchable<Types>,
1097    {
1098        let chunk_size = self.range_chunk_size;
1099        self.get_range_with_chunk_size(chunk_size, range)
1100    }
1101
1102    /// Same as [`Self::get_range`], but uses the given chunk size instead of the default.
1103    fn get_range_with_chunk_size<R, T>(
1104        self: Arc<Self>,
1105        chunk_size: usize,
1106        range: R,
1107    ) -> BoxStream<'static, Fetch<T>>
1108    where
1109        R: RangeBounds<usize> + Send + 'static,
1110        T: RangedFetchable<Types>,
1111    {
1112        let chunk_fetch_delay = self.chunk_fetch_delay;
1113        let active_fetch_delay = self.active_fetch_delay;
1114
1115        stream::iter(range_chunks(range, chunk_size))
1116            .then(move |chunk| {
1117                let self_clone = self.clone();
1118                async move {
1119                    {
1120                        let chunk = self_clone.get_chunk(chunk).await;
1121
1122                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1123                        // helps to limit constant high CPU usage when fetching long range of data,
1124                        // especially for older streams that fetch most of the data from local
1125                        // storage.
1126                        sleep(chunk_fetch_delay).await;
1127                        stream::iter(chunk)
1128                    }
1129                }
1130            })
1131            .flatten()
1132            .then(move |f| async move {
1133                match f {
1134                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1135                    // the catchup provider. The delay applies between pending fetches, not between
1136                    // chunks.
1137                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1138                    Fetch::Ready(_) => (),
1139                };
1140                f
1141            })
1142            .boxed()
1143    }
1144
1145    /// Same as [`Self::get_range`], but yields objects in reverse order by height.
1146    ///
1147    /// Note that unlike [`Self::get_range`], which accepts any range and yields an infinite stream
1148    /// if the range has no upper bound, this function requires there to be a defined upper bound,
1149    /// otherwise we don't know where the reversed stream should _start_. The `end` bound given here
1150    /// is inclusive; i.e. the first item yielded by the stream will have height `end`.
1151    fn get_range_rev<T>(
1152        self: Arc<Self>,
1153        start: Bound<usize>,
1154        end: usize,
1155    ) -> BoxStream<'static, Fetch<T>>
1156    where
1157        T: RangedFetchable<Types>,
1158    {
1159        let chunk_size = self.range_chunk_size;
1160        self.get_range_with_chunk_size_rev(chunk_size, start, end)
1161    }
1162
1163    /// Same as [`Self::get_range_rev`], but uses the given chunk size instead of the default.
1164    fn get_range_with_chunk_size_rev<T>(
1165        self: Arc<Self>,
1166        chunk_size: usize,
1167        start: Bound<usize>,
1168        end: usize,
1169    ) -> BoxStream<'static, Fetch<T>>
1170    where
1171        T: RangedFetchable<Types>,
1172    {
1173        let chunk_fetch_delay = self.chunk_fetch_delay;
1174        let active_fetch_delay = self.active_fetch_delay;
1175
1176        stream::iter(range_chunks_rev(start, end, chunk_size))
1177            .then(move |chunk| {
1178                let self_clone = self.clone();
1179                async move {
1180                    {
1181                        let chunk = self_clone.get_chunk(chunk).await;
1182
1183                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1184                        // helps to limit constant high CPU usage when fetching long range of data,
1185                        // especially for older streams that fetch most of the data from local
1186                        // storage
1187                        sleep(chunk_fetch_delay).await;
1188                        stream::iter(chunk.into_iter().rev())
1189                    }
1190                }
1191            })
1192            .flatten()
1193            .then(move |f| async move {
1194                match f {
1195                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1196                    // the catchup provider. The delay applies between pending fetches, not between
1197                    // chunks.
1198                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1199                    Fetch::Ready(_) => (),
1200                };
1201                f
1202            })
1203            .boxed()
1204    }
1205
    /// Get a range of objects from local storage or a provider.
    ///
    /// This method is similar to `get_range`, except that:
    /// * It fetches all desired objects together, as a single chunk
    /// * It loads the object or triggers fetches right now rather than providing a lazy stream
    ///   which only fetches objects when polled.
    ///
    /// The result always has exactly `chunk.len()` entries, in order: each is either
    /// [`Fetch::Ready`] (available locally) or a [`Fetch::Pending`] passive fetch which resolves
    /// when the object is eventually stored.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss
        // any notifications sent in between checking local storage and triggering a fetch if
        // necessary.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Convert to fetches. Objects which are not immediately available (`None` in the
                // chunk) become passive fetches awaiting a notification of availability.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // If we get here, storage errored, so we don't know which objects are available locally.
        // Spawn a retry task (rate-limited, with backoff), mirroring the single-object retry in
        // [`get`](Self::get).
        //
        // We'll use these channels to get the objects back that we successfully load on retry.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Limit the number of simultaneous retry tasks hitting the database.
                            // When the database is down, we might have a lot of these tasks
                            // running, and if they all hit the DB at once, they are only going to
                            // make things worse.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        // If the object was immediately available after all, signal
                                        // the original fetch. We probably just temporarily couldn't
                                        // access it due to database errors.
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // The object was not immediately available after all, but
                                        // we have successfully spawned a fetch for it if possible.
                                        // The spawned fetch will notify the original request once
                                        // it completes.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                // Keep the last delay if the backoff policy is exhausted.
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Wait for the objects to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
1320
    /// Try to get a range of objects from local storage, initializing fetches if any are missing.
    ///
    /// If this function succeeded, then for each object in the requested range, either:
    /// * the object was available locally, and corresponds to `Some(_)` object in the result
    /// * the object was not available locally (and corresponds to `None` in the result), but a
    ///   fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it
    ///   was determined that the requested object is not fetchable)
    ///
    /// This function will fail if it could not be determined which objects in the requested range
    /// are available locally, or if, for any missing object, it could not be determined whether
    /// that object is fetchable. In this case, there may be no fetch spawned for certain objects in
    /// the requested range, even if those objects are actually fetchable.
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Log and discard error information; we want a list of Option where None indicates an
        // object that needs to be fetched. Note that we don't use `FetchRequest::might_exist` to
        // silence the logs here when an object is missing that is not expected to exist at all.
        // When objects are not expected to exist, `load_range` should just return a truncated list
        // rather than returning `Err` objects, so if there are errors in here they are unexpected
        // and we do want to log them.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        // Kick off a fetch for each missing object. Gaps are detected by comparing each loaded
        // object's height against the next height we expect (`chunk.start + results.len()`).
        //
        // NOTE(review): this gap detection assumes `load_range` yields objects in ascending
        // order by height — confirm against the storage implementations.
        let mut results = Vec::with_capacity(chunk.len());
        for t in ts {
            // Fetch missing objects that should come before `t`.
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Fetch missing objects from the end of the range.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }
1378
1379    /// Spawn an active fetch for the requested object, if possible.
1380    ///
1381    /// On success, either an active fetch for `req` has been spawned, or it has been determined
1382    /// that `req` is not fetchable. Fails if it cannot be determined (e.g. due to errors in the
1383    /// local database) whether `req` is fetchable or not.
1384    async fn fetch<T>(
1385        self: &Arc<Self>,
1386        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1387        req: T::Request,
1388    ) -> anyhow::Result<()>
1389    where
1390        T: Fetchable<Types>,
1391    {
1392        tracing::debug!("fetching resource {req:?}");
1393
1394        // Trigger an active fetch from a remote provider if possible.
1395        let heights = Heights::load(tx)
1396            .await
1397            .context("failed to load heights; cannot definitively say object might exist")?;
1398        if req.might_exist(heights) {
1399            T::active_fetch(tx, self.clone(), req).await?;
1400        } else {
1401            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1402        }
1403        Ok(())
1404    }
1405
    /// Proactively search for and retrieve missing objects.
    ///
    /// This function will proactively identify and retrieve blocks and leaves which are missing
    /// from storage. It will run until cancelled, thus, it is meant to be spawned as a background
    /// task rather than called synchronously. Each scan is followed by a sleep of `interval`, and
    /// missing objects are fetched `chunk_size` at a time.
    async fn proactive_scan(
        self: Arc<Self>,
        interval: Duration,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        for i in 0.. {
            let span = tracing::warn_span!("proactive scan", i);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            async {
                let sync_status = {
                    match self.sync_status().await {
                        Ok(st) => st,
                        Err(err) => {
                            // NOTE(review): on this early return, `metrics.running` stays 1 while
                            // we sleep until the next scan — confirm whether that is intended.
                            tracing::warn!(
                                "unable to load sync status, scan will be skipped: {err:#}"
                            );
                            return;
                        },
                    }
                };
                tracing::info!(?sync_status, "starting scan");
                metrics.missing_blocks.set(sync_status.blocks.missing);
                metrics.missing_vid.set(sync_status.vid_common.missing);

                // Fetch missing blocks. This will also trigger a fetch for the corresponding
                // missing leaves.
                for range in sync_status.blocks.ranges {
                    metrics.scanned_blocks.set(range.start);
                    if range.status != SyncStatus::Missing {
                        continue;
                    }

                    // NOTE(review): `range.end - 1` assumes missing ranges are non-empty
                    // (`end > 0`) — presumably guaranteed by `sync_status`; verify.
                    tracing::info!(?range, "fetching missing block range");
                    self.clone()
                        // Fetching the payload metadata is enough to trigger an active fetch of the
                        // corresponding leaf and the full block if they are missing.
                        //
                        // We iterate in reverse order because leaves are inherently fetched in
                        // reverse, since we cannot (actively) fetch a leaf until we have the
                        // subsequent leaf, which tells us what the hash of its parent should be.
                        .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
                            chunk_size,
                            Bound::Included(range.start),
                            range.end - 1,
                        )
                        .then(|fetch| async move {fetch.await;})
                        .collect::<()>()
                        .await;
                    // Decrease the missing count by the size of the range just fetched.
                    metrics
                        .missing_blocks
                        .update((range.start as i64) - (range.end as i64));
                }

                // Do the same for VID.
                for range in sync_status.vid_common.ranges {
                    metrics.scanned_vid.set(range.start);
                    if range.status != SyncStatus::Missing {
                        continue;
                    }

                    tracing::info!(?range, "fetching missing VID range");
                    self.clone()
                        .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
                            chunk_size,
                            Bound::Included(range.start),
                            range.end - 1,
                        )
                        .then(|fetch| async move {
                            fetch.await;
                        })
                        .collect::<()>()
                        .await;
                    // Decrease the missing count by the size of the range just fetched.
                    metrics
                        .missing_vid
                        .update((range.start as i64) - (range.end as i64));
                }

                tracing::info!("completed proactive scan, will scan again in {interval:?}");

                // Reset metrics.
                metrics.running.set(0);
            }
            .instrument(span)
            .await;

            sleep(interval).await;
        }
    }
1501}
1502
1503impl<Types, S, P> Fetcher<Types, S, P>
1504where
1505    Types: NodeType,
1506    Header<Types>: QueryableHeader<Types>,
1507    S: VersionedDataSource + 'static,
1508    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1509    P: Send + Sync,
1510{
1511    async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
1512        let heights = {
1513            let mut tx = self
1514                .read()
1515                .await
1516                .context("opening transaction to load heights")?;
1517            Heights::load(&mut tx).await.context("loading heights")?
1518        };
1519
1520        let mut res = SyncStatusQueryData {
1521            pruned_height: heights.pruned_height.map(|h| h as usize),
1522            ..Default::default()
1523        };
1524        let start = if let Some(height) = res.pruned_height {
1525            // Add an initial range for pruned data.
1526            let range = SyncStatusRange {
1527                status: SyncStatus::Pruned,
1528                start: 0,
1529                end: height + 1,
1530            };
1531            res.blocks.ranges.push(range);
1532            res.leaves.ranges.push(range);
1533            res.vid_common.ranges.push(range);
1534            res.vid_shares.ranges.push(range);
1535
1536            height + 1
1537        } else {
1538            0
1539        };
1540
1541        // Break the range into manageable chunks, so we don't hold any one database transaction
1542        // open for too long.
1543        for chunk in range_chunks(
1544            start..(heights.height as usize),
1545            self.sync_status_chunk_size,
1546        ) {
1547            tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
1548            let mut tx = self
1549                .read()
1550                .await
1551                .context("opening transaction to sync status range")?;
1552            let range_status = tx
1553                .sync_status_for_range(chunk.start, chunk.end)
1554                .await
1555                .context(format!("checking sync status in sub-range {chunk:?}"))?;
1556            tracing::debug!(
1557                chunk.start,
1558                chunk.end,
1559                ?range_status,
1560                "found sync status for range"
1561            );
1562
1563            res.blocks.extend(range_status.blocks);
1564            res.leaves.extend(range_status.leaves);
1565            res.vid_common.extend(range_status.vid_common);
1566            res.vid_shares.extend(range_status.vid_shares);
1567        }
1568
1569        Ok(res)
1570    }
1571}
1572
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Background task maintaining aggregate statistics over all blocks.
    ///
    /// This task streams payload metadata in batches of up to `chunk_size` blocks, starting from
    /// the height just after the last previously committed aggregate (or from genesis if there is
    /// none), and folds each batch into the running aggregate in a single write transaction.
    /// `metrics.height` tracks the height up to which aggregate statistics are available. The
    /// task never returns: database errors are retried indefinitely, and if the (nominally
    /// infinite) block stream ever ends, the whole scan is restarted from the last committed
    /// aggregate.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            // Load the most recently committed aggregate, retrying every 5 seconds until storage
            // is available.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume from the height just after the saved aggregate, or start from scratch at
            // height 0 with a default (empty) aggregate.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // Stream payload metadata from `start` onwards, resolving each fetch and batching up
            // to `chunk_size` already-resolved items per chunk.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    // This is not supposed to happen, but if the chunk is empty, just skip it.
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Fold this chunk into the running aggregate within one transaction, retrying
                // every second until the commit succeeds. `prev_aggregate` is only advanced after
                // a successful commit, so a failed attempt simply retries from the same state.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                // Statistics are now available up to and including `height`.
                metrics.height.set(height as usize);
            }
            // The block stream should be infinite; if it ends, loop around and rebuild it from
            // the last committed aggregate.
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1665
1666impl<Types, S, P> Fetcher<Types, S, P>
1667where
1668    Types: NodeType,
1669    S: VersionedDataSource,
1670    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1671{
1672    /// Store an object and notify anyone waiting on this object that it is available.
1673    async fn store_and_notify<T>(&self, obj: &T)
1674    where
1675        T: Storable<Types>,
1676    {
1677        self.store(obj).await;
1678
1679        // Send a notification about the newly received object. It is important that we do this
1680        // _after_ our attempt to store the object in local storage, otherwise there is a potential
1681        // missed notification deadlock:
1682        // * we send the notification
1683        // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the
1684        //   requested object is not in storage, and begins waiting for a notification
1685        // * we store the object. This ensures that no other task will be triggered to fetch it,
1686        //   which means no one will ever notify the waiting task.
1687        //
1688        // Note that we send the notification regardless of whether the store actually succeeded or
1689        // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we
1690        // failed to store, some fetches might not resolve, even though the object in question has
1691        // actually been fetched. This should actually be ok, because as long as the object is not
1692        // in storage, eventually some other task will come along and fetch, store, and notify about
1693        // it. However, this is certainly not ideal, since we could resolve those pending fetches
1694        // right now, and it causes bigger problems when the fetch that fails to resolve is the
1695        // proactive scanner task, who is often the one that would eventually come along and
1696        // re-fetch the object.
1697        //
1698        // The key thing to note is that it does no harm to notify even if we fail to store: at best
1699        // we wake some tasks up sooner; at worst, anyone who misses the notification still
1700        // satisfies the invariant that we only wait on notifications for objects which are not in
1701        // storage, and eventually some other task will come along, find the object missing from
1702        // storage, and re-fetch it.
1703        obj.notify(&self.notifiers).await;
1704    }
1705
1706    async fn store<T>(&self, obj: &T)
1707    where
1708        T: Storable<Types>,
1709    {
1710        let try_store = || async {
1711            let mut tx = self.storage.write().await?;
1712            obj.clone().store(&mut tx, self.leaf_only).await?;
1713            tx.commit().await
1714        };
1715
1716        // Store the object in local storage, so we can avoid fetching it in the future.
1717        let mut backoff = self.backoff.clone();
1718        backoff.reset();
1719        loop {
1720            let Err(err) = try_store().await else {
1721                break;
1722            };
1723            // It is unfortunate if this fails, but we can still proceed by notifying with the
1724            // object that we fetched, keeping it in memory. Log the error, retry a few times, and
1725            // eventually move on.
1726            tracing::warn!(
1727                "failed to store fetched {} {}: {err:#}",
1728                T::name(),
1729                obj.height()
1730            );
1731
1732            let Some(delay) = backoff.next_backoff() else {
1733                break;
1734            };
1735            tracing::info!(?delay, "retrying failed operation");
1736            sleep(delay).await;
1737        }
1738    }
1739}
1740
/// Channels used to wake tasks passively waiting for each kind of fetchable object.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    // Notifies waiters when a block becomes available.
    block: Notifier<BlockQueryData<Types>>,
    // Notifies waiters when a leaf becomes available.
    leaf: Notifier<LeafQueryData<Types>>,
    // Notifies waiters when VID common data becomes available.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1750
1751impl<Types> Default for Notifiers<Types>
1752where
1753    Types: NodeType,
1754{
1755    fn default() -> Self {
1756        Self {
1757            block: Notifier::new(),
1758            leaf: Notifier::new(),
1759            vid_common: Notifier::new(),
1760        }
1761    }
1762}
1763
/// Snapshot of the chain height and pruned height, used to filter impossible requests.
#[derive(Clone, Copy, Debug)]
struct Heights {
    // Current block height: objects at or above this height cannot exist yet
    // (see `might_exist`).
    height: u64,
    // Height up to which data has been pruned from local storage, if any pruning has occurred.
    pruned_height: Option<u64>,
}
1769
1770impl Heights {
1771    async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1772    where
1773        Types: NodeType,
1774        Header<Types>: QueryableHeader<Types>,
1775        T: NodeStorage<Types> + PrunedHeightStorage + Send,
1776    {
1777        let height = tx.block_height().await.context("loading block height")? as u64;
1778        let pruned_height = tx
1779            .load_pruned_height()
1780            .await
1781            .context("loading pruned height")?;
1782        Ok(Self {
1783            height,
1784            pruned_height,
1785        })
1786    }
1787
1788    fn might_exist(self, h: u64) -> bool {
1789        h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1790    }
1791}
1792
1793#[async_trait]
1794impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1795    for FetchingDataSource<Types, S, P>
1796where
1797    Types: NodeType,
1798    S: VersionedDataSource + 'static,
1799    for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1800    P: Send + Sync,
1801    State: MerklizedState<Types, ARITY> + 'static,
1802    <State as MerkleTreeScheme>::Commitment: Send,
1803{
1804    async fn get_path(
1805        &self,
1806        snapshot: Snapshot<Types, State, ARITY>,
1807        key: State::Key,
1808    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1809        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1810            message: err.to_string(),
1811        })?;
1812        tx.get_path(snapshot, key).await
1813    }
1814}
1815
1816#[async_trait]
1817impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1818where
1819    Types: NodeType,
1820    Header<Types>: QueryableHeader<Types>,
1821    Payload<Types>: QueryablePayload<Types>,
1822    S: VersionedDataSource + 'static,
1823    for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1824    P: Send + Sync,
1825{
1826    async fn get_last_state_height(&self) -> QueryResult<usize> {
1827        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1828            message: err.to_string(),
1829        })?;
1830        tx.get_last_state_height().await
1831    }
1832}
1833
1834#[async_trait]
1835impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1836where
1837    Types: NodeType,
1838    Header<Types>: QueryableHeader<Types>,
1839    S: VersionedDataSource + 'static,
1840    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1841    P: Send + Sync,
1842{
1843    async fn block_height(&self) -> QueryResult<usize> {
1844        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1845            message: err.to_string(),
1846        })?;
1847        tx.block_height().await
1848    }
1849
1850    async fn count_transactions_in_range(
1851        &self,
1852        range: impl RangeBounds<usize> + Send,
1853        namespace: Option<NamespaceId<Types>>,
1854    ) -> QueryResult<usize> {
1855        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1856            message: err.to_string(),
1857        })?;
1858        tx.count_transactions_in_range(range, namespace).await
1859    }
1860
1861    async fn payload_size_in_range(
1862        &self,
1863        range: impl RangeBounds<usize> + Send,
1864        namespace: Option<NamespaceId<Types>>,
1865    ) -> QueryResult<usize> {
1866        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1867            message: err.to_string(),
1868        })?;
1869        tx.payload_size_in_range(range, namespace).await
1870    }
1871
1872    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1873    where
1874        ID: Into<BlockId<Types>> + Send + Sync,
1875    {
1876        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1877            message: err.to_string(),
1878        })?;
1879        tx.vid_share(id).await
1880    }
1881
1882    async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
1883        self.fetcher
1884            .sync_status()
1885            .await
1886            .map_err(|err| QueryError::Error {
1887                message: format!("{err:#}"),
1888            })
1889    }
1890
1891    async fn get_header_window(
1892        &self,
1893        start: impl Into<WindowStart<Types>> + Send + Sync,
1894        end: u64,
1895        limit: usize,
1896    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
1897        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1898            message: err.to_string(),
1899        })?;
1900        tx.get_header_window(start, end, limit).await
1901    }
1902}
1903
1904#[async_trait]
1905impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
1906where
1907    Types: NodeType,
1908    Payload<Types>: QueryablePayload<Types>,
1909    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
1910    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
1911    S: VersionedDataSource + 'static,
1912    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
1913    P: Send + Sync,
1914{
1915    async fn get_block_summaries(
1916        &self,
1917        request: explorer::query_data::GetBlockSummariesRequest<Types>,
1918    ) -> Result<
1919        Vec<explorer::query_data::BlockSummary<Types>>,
1920        explorer::query_data::GetBlockSummariesError,
1921    > {
1922        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1923            message: err.to_string(),
1924        })?;
1925        tx.get_block_summaries(request).await
1926    }
1927
1928    async fn get_block_detail(
1929        &self,
1930        request: explorer::query_data::BlockIdentifier<Types>,
1931    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
1932    {
1933        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1934            message: err.to_string(),
1935        })?;
1936        tx.get_block_detail(request).await
1937    }
1938
1939    async fn get_transaction_summaries(
1940        &self,
1941        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
1942    ) -> Result<
1943        Vec<explorer::query_data::TransactionSummary<Types>>,
1944        explorer::query_data::GetTransactionSummariesError,
1945    > {
1946        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1947            message: err.to_string(),
1948        })?;
1949        tx.get_transaction_summaries(request).await
1950    }
1951
1952    async fn get_transaction_detail(
1953        &self,
1954        request: explorer::query_data::TransactionIdentifier<Types>,
1955    ) -> Result<
1956        explorer::query_data::TransactionDetailResponse<Types>,
1957        explorer::query_data::GetTransactionDetailError,
1958    > {
1959        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1960            message: err.to_string(),
1961        })?;
1962        tx.get_transaction_detail(request).await
1963    }
1964
1965    async fn get_explorer_summary(
1966        &self,
1967    ) -> Result<
1968        explorer::query_data::ExplorerSummary<Types>,
1969        explorer::query_data::GetExplorerSummaryError,
1970    > {
1971        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1972            message: err.to_string(),
1973        })?;
1974        tx.get_explorer_summary().await
1975    }
1976
1977    async fn get_search_results(
1978        &self,
1979        query: TaggedBase64,
1980    ) -> Result<
1981        explorer::query_data::SearchResult<Types>,
1982        explorer::query_data::GetSearchResultsError,
1983    > {
1984        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1985            message: err.to_string(),
1986        })?;
1987        tx.get_search_results(query).await
1988    }
1989}
1990
/// A provider which can be used as a fetcher by the availability service.
///
/// This is a convenience alias: a provider qualifies if it can serve all three kinds of
/// availability requests (leaves, payloads, and VID common data).
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::VidCommonRequest>
    + Sync
    + 'static
{
}
// Blanket implementation: any type implementing all three request providers automatically
// implements `AvailabilityProvider`; there is nothing else to implement.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::VidCommonRequest>
        + Sync
        + 'static
{
}
2008
/// A succinct specification of an object to be fetched (see [`Fetchable::Request`]).
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Indicate whether it is possible this object could exist.
    ///
    /// This can filter out requests quickly for objects that cannot possibly exist, such as
    /// requests for objects with a height greater than the current block height. Not only does this
    /// let us fail faster for such requests (without touching storage at all), it also helps keep
    /// logging quieter when we fail to fetch an object because the user made a bad request, while
    /// still being fairly loud when we fail to fetch an object that might have really existed.
    ///
    /// This method is conservative: it returns `true` if it cannot tell whether the given object
    /// could exist or not.
    fn might_exist(self, _heights: Heights) -> bool {
        // By default, assume the object may exist; request types with height information override
        // this to rule out impossible requests.
        true
    }
}
2024
/// Objects which can be fetched from a remote DA provider and cached in local storage.
///
/// This trait lets us abstract over leaves, blocks, and other types that can be fetched. Thus, the
/// logistics of fetching are shared between all objects, and only the low-level particulars are
/// type-specific.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// A succinct specification of the object to be fetched.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    ///
    /// NOTE(review): presumably used to match received objects against outstanding requests (e.g.
    /// when filtering notifications) — confirm against the call sites.
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Spawn a task to fetch the object from a remote provider, if possible.
    ///
    /// An active fetch will only be triggered if:
    /// * There is not already an active fetch in progress for the same object
    /// * The requested object is known to exist. For example, we will fetch a leaf by height but
    ///   not by hash, since we can't guarantee that a leaf with an arbitrary hash exists. Note that
    ///   this function assumes `req.might_exist()` has already been checked before calling it, and
    ///   so may do unnecessary work if the caller does not ensure this.
    ///
    /// If we do trigger an active fetch for an object, any passive listeners for the object will be
    /// notified once it has been retrieved. If we do not trigger an active fetch for an object,
    /// this function does nothing. In either case, as long as the requested object does in fact
    /// exist, we will eventually receive it passively, since we will eventually receive all blocks
    /// and leaves that are ever produced. Active fetching merely helps us receive certain objects
    /// sooner.
    ///
    /// This function fails if it _might_ be possible to actively fetch the requested object, but we
    /// were unable to do so (e.g. due to errors in the database).
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait for someone else to fetch the object.
    ///
    /// The returned future resolves to `None` only if the notifier is dropped before the object
    /// is received (see [`passive`]).
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load an object from local storage.
    ///
    /// This function assumes `req.might_exist()` has already been checked before calling it, and so
    /// may do unnecessary work if the caller does not ensure this.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2084
/// A future which resolves once another task fetches the object, or to `None` if the
/// corresponding notifier is dropped first.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;

/// A [`Fetchable`] object which is indexed by block height, allowing contiguous ranges of objects
/// to be loaded in bulk.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type, which must be constructible from a bare height (`usize`).
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage.
    ///
    /// Each element of the returned vector is itself a `QueryResult`, so individual missing
    /// objects within the range are reported without failing the whole query.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2102
/// An object which can be stored in the database.
trait Storable<Types: NodeType>: HeightIndexed + Clone {
    /// The name of this type of object, for debugging purposes.
    fn name() -> &'static str;

    /// Notify anyone waiting for this object that it has become available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Store the object in the local database.
    ///
    /// `leaf_only` indicates the data source is running in leaf-only mode; implementations may
    /// use it to skip non-leaf data (NOTE(review): the leaf-with-QC-chain impl below ignores it —
    /// confirm intended semantics against other implementors).
    fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2118
// A (leaf, optional QC chain) pair is indexed by the height of the leaf itself.
impl<Types: NodeType> HeightIndexed
    for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
{
    fn height(&self) -> u64 {
        self.0.height()
    }
}
2126
2127impl<Types: NodeType> Storable<Types>
2128    for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2129{
2130    fn name() -> &'static str {
2131        "leaf with QC chain"
2132    }
2133
2134    async fn notify(&self, notifiers: &Notifiers<Types>) {
2135        self.0.notify(notifiers).await;
2136    }
2137
2138    async fn store(
2139        self,
2140        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
2141        _leaf_only: bool,
2142    ) -> anyhow::Result<()> {
2143        storage.insert_leaf_with_qc_chain(self.0, self.1).await
2144    }
2145}
2146
/// Break a range into fixed-size chunks.
///
/// Chunks are yielded in ascending order. Each chunk is exactly `chunk_size` long, except possibly
/// the last, which is truncated to the end of `range`. An unbounded range yields an effectively
/// infinite iterator whose chunks run up to `usize::MAX`.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Transform range to explicit start (inclusive) and end (exclusive) bounds. Saturate on the
    // `+ 1` adjustments so degenerate bounds at `usize::MAX` yield an empty/truncated iterator
    // instead of overflowing (which would panic in debug builds).
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => i.saturating_add(1),
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        // Saturate here as well: for unbounded ranges `end` is `usize::MAX`, and `start +
        // chunk_size` would eventually overflow as the iterator approaches the end.
        let chunk_end = min(start.saturating_add(chunk_size), end);
        if chunk_end == start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
2174
/// Break a range into fixed-size chunks, starting from the end and moving towards the start.
///
/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk
/// starts with `start`.
///
/// Note that unlike [`range_chunks`], which accepts any range and yields an infinite iterator if
/// the range has no upper bound, this function requires there to be a defined upper bound,
/// otherwise we don't know where the reversed iterator should _start_. The `end` bound given here
/// is inclusive; i.e. the end of the first chunk yielded by the stream will be exactly `end`.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the lower bound to an inclusive index.
    let lower = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // `end` is inclusive; track the exclusive end of the next chunk to yield.
    let mut next_end = end + 1;

    std::iter::from_fn(move || {
        // Each chunk covers `chunk_size` indices, clamped at the lower bound.
        let chunk_start = max(lower, next_end.saturating_sub(chunk_size));
        if next_end <= chunk_start {
            None
        } else {
            let chunk = chunk_start..next_end;
            next_end = chunk_start;
            Some(chunk)
        }
    })
}
2210
/// Extension trait for converting a `Result` into an `Option` while logging any error.
trait ResultExt<T, E> {
    /// Discard the error, if any, after logging it (at INFO level, per the blanket impl below).
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2216
2217impl<T, E> ResultExt<T, E> for Result<T, E> {
2218    fn ok_or_trace(self) -> Option<T>
2219    where
2220        E: Display,
2221    {
2222        match self {
2223            Ok(t) => Some(t),
2224            Err(err) => {
2225                tracing::info!(
2226                    "error loading resource from local storage, will try to fetch: {err:#}"
2227                );
2228                None
2229            },
2230        }
2231    }
2232}
2233
/// Metrics exported by the proactive scanner task.
#[derive(Debug)]
struct ScannerMetrics {
    /// Whether a scan is currently running (1) or not (0).
    running: Box<dyn Gauge>,
    /// The sequence number of the scan that is currently running.
    ///
    /// NOTE(review): presumably a counter of scans performed so far — confirm against the scanner
    /// task that sets it.
    current_scan: Box<dyn Gauge>,
    /// Number of blocks processed in the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// Number of VID entries processed in the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// The number of missing blocks discovered and not yet resolved in the current scan.
    missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries discovered and not yet resolved in the current scan.
    missing_vid: Box<dyn Gauge>,
}
2249
2250impl ScannerMetrics {
2251    fn new(metrics: &PrometheusMetrics) -> Self {
2252        let group = metrics.subgroup("scanner".into());
2253        Self {
2254            running: group.create_gauge("running".into(), None),
2255            current_scan: group.create_gauge("current".into(), None),
2256            scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2257            scanned_vid: group.create_gauge("scanned_vid".into(), None),
2258            missing_blocks: group.create_gauge("missing_blocks".into(), None),
2259            missing_vid: group.create_gauge("missing_vid".into(), None),
2260        }
2261    }
2262}
2263
/// Metrics exported by the aggregator task.
#[derive(Debug)]
struct AggregatorMetrics {
    /// The block height for which aggregate statistics are currently available.
    height: Box<dyn Gauge>,
}
2269
2270impl AggregatorMetrics {
2271    fn new(metrics: &PrometheusMetrics) -> Self {
2272        let group = metrics.subgroup("aggregator".into());
2273        Self {
2274            height: group.create_gauge("height".into(), None),
2275        }
2276    }
2277}
2278
2279/// Turn a fallible passive fetch future into an infallible "fetch".
2280///
2281/// Basically, we ignore failures due to a channel sender being dropped, which should never happen.
2282fn passive<T>(
2283    req: impl Debug + Send + 'static,
2284    fut: impl Future<Output = Option<T>> + Send + 'static,
2285) -> Fetch<T>
2286where
2287    T: Send + 'static,
2288{
2289    Fetch::Pending(
2290        fut.then(move |opt| async move {
2291            match opt {
2292                Some(t) => t,
2293                None => {
2294                    // If `passive_fetch` returns `None`, it means the notifier was dropped without
2295                    // ever sending a notification. In this case, the correct behavior is actually
2296                    // to block forever (unless the `Fetch` itself is dropped), since the semantics
2297                    // of `Fetch` are to never fail. This is analogous to fetching an object which
2298                    // doesn't actually exist: the `Fetch` will never return.
2299                    //
2300                    // However, for ease of debugging, and since this is never expected to happen in
2301                    // normal usage, we panic instead. This should only happen in two cases:
2302                    // * The server was shut down (dropping the notifier) without cleaning up some
2303                    //   background tasks. This will not affect runtime behavior, but should be
2304                    //   fixed if it happens.
2305                    // * There is a very unexpected runtime bug resulting in the notifier being
2306                    //   dropped. If this happens, things are very broken in any case, and it is
2307                    //   better to panic loudly than simply block forever.
2308                    panic!("notifier dropped without satisfying request {req:?}");
2309                },
2310            }
2311        })
2312        .boxed(),
2313    )
2314}
2315
2316/// Get the result of the first future to return `Some`, if either do.
2317async fn select_some<T>(
2318    a: impl Future<Output = Option<T>> + Unpin,
2319    b: impl Future<Output = Option<T>> + Unpin,
2320) -> Option<T> {
2321    match future::select(a, b).await {
2322        // If the first future resolves with `Some`, immediately return the result.
2323        Either::Left((Some(a), _)) => Some(a),
2324        Either::Right((Some(b), _)) => Some(b),
2325
2326        // If the first future resolves with `None`, wait for the result of the second future.
2327        Either::Left((None, b)) => b.await,
2328        Either::Right((None, a)) => a.await,
2329    }
2330}
2331
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;

    use super::*;
    use crate::{
        data_source::{
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType},
        },
        fetching::provider::NoFetching,
        testing::{consensus::MockSqlDataSource, mocks::MockTypes},
    };

    /// Check that `range_chunks` splits an ascending range into fixed-size chunks, covering
    /// inclusive, exclusive, and unbounded inputs, with both partial and complete final chunks.
    #[test]
    fn test_range_chunks() {
        // Inclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..=4, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Inclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..=5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Exclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Exclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..6, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Unbounded.
        assert_eq!(
            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6, 6..8, 8..10]
        );
    }

    /// Check that `range_chunks_rev` yields chunks from high to low, so that any partial chunk
    /// falls at the low end of the range and is emitted last.
    #[test]
    fn test_range_chunks_rev() {
        // Inclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        // Inclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 0..2]
        );

        // Exclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 1..2]
        );

        // Exclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3]
        );
    }

    /// Shared harness for the `sync_status` tests.
    ///
    /// `present_ranges` gives the half-open ranges `[start, end)` of leaf heights to insert into a
    /// fresh SQL-backed data source; the verification logic below assumes these are sorted
    /// ascending and non-overlapping. If the first range does not start at 0, the heights before
    /// it are marked as pruned. The harness then queries `sync_status` (scanning with the given
    /// `chunk_size`) and checks both the reported missing count and the exact sequence of
    /// pruned/present/missing ranges.
    async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
        // The overall block height is the end of the last inserted range.
        let block_height = present_ranges.last().unwrap().1;
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let ds = MockSqlDataSource::builder(db, NoFetching)
            .with_sync_status_chunk_size(chunk_size)
            .build()
            .await
            .unwrap();

        // Generate some mock leaves to insert.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        // Derive each subsequent leaf from the previous one, bumping only the block number, so we
        // get one mock leaf per height in [0, block_height).
        for i in 1..block_height {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Set up.
        {
            let mut tx = ds.write().await.unwrap();

            // Insert only the leaves covered by `present_ranges`; heights in the gaps stay
            // missing.
            for &(start, end) in present_ranges {
                for leaf in leaves[start..end].iter() {
                    tracing::info!(height = leaf.height(), "insert leaf");
                    tx.insert_leaf(leaf.clone()).await.unwrap();
                }
            }

            // If there is a gap before the first present range, record it as pruned rather than
            // missing.
            if present_ranges[0].0 > 0 {
                tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
                    .await
                    .unwrap();
            }

            tx.commit().await.unwrap();
        }

        let sync_status = ds.sync_status().await.unwrap().leaves;

        // Verify missing.
        // Everything below `block_height` that was neither inserted nor pruned should be counted
        // as missing.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        assert_eq!(
            sync_status.missing,
            block_height - present - present_ranges[0].0
        );

        // Verify ranges.
        // Walk the reported ranges in order, tracking in `prev` the height up to which we have
        // already accounted for everything.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = 0;
        for &(start, end) in present_ranges {
            if start != prev {
                // Gap before this present range: pruned if it is the prefix starting at height 0
                // (we saved a pruned height for it above), otherwise missing.
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: if prev == 0 {
                            SyncStatus::Pruned
                        } else {
                            SyncStatus::Missing
                        },
                    }
                );
            }
            // The present range itself must be reported exactly as inserted.
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // Any trailing gap up to the block height is missing. (With `present_ranges` required to
        // end at `block_height`, this branch is currently never taken, but it keeps the check
        // robust.)
        if prev != block_height {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end: block_height,
                    status: SyncStatus::Missing,
                }
            );
        }

        // No extra ranges beyond those accounted for above.
        assert_eq!(ranges.next(), None);
    }

    /// Present data spanning several scan chunks, with pruned and missing gaps in between.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks() {
        test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
    }

    /// A single present range that straddles a chunk boundary.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
        test_sync_status(5, &[(1, 4)]).await;
    }

    /// A missing range that straddles a chunk boundary.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
        test_sync_status(5, &[(0, 1), (4, 5)]).await;
    }
}