// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Asynchronous retrieval of missing data.
//!
//! [`FetchingDataSource`] combines a local storage implementation with a remote data availability
//! provider to create a data source which caches data locally, but which is capable of fetching
//! missing data from a remote source, either proactively or on demand.
//!
//! This implementation supports three kinds of data fetching.
//!
//! # Proactive Fetching
//!
//! Proactive fetching means actively scanning the local database for missing objects and
//! proactively retrieving them from a remote provider, even if those objects have yet to be
//! requested by a client. Doing this increases the chance of success and decreases latency when a
//! client does eventually ask for those objects. This is also the mechanism by which a query
//! service joining a network late, or having been offline for some time, is able to catch up with
//! the events on the network that it missed.
//!
//! The current implementation of proactive fetching is meant to be the simplest effective algorithm
//! which still gives us a reasonable range of configuration options for experimentation. It is
//! subject to change as we learn about the behavior of proactive fetching in a realistic system.
//!
//! Proactive fetching is currently implemented by a background task which performs periodic scans
//! of the database, identifying and retrieving missing objects. This task is generally low
//! priority, since missing objects are rare, and it will take care not to monopolize resources that
//! could be used to serve requests. To reduce load and to optimize for the common case where blocks
//! are usually not missing once they have already been retrieved, we distinguish between _major_
//! and _minor_ scans.
//!
//! Minor scans are lightweight and can run very frequently. They will only look for missing
//! blocks among blocks that are new since the previous scan. Thus, the more frequently minor
//! scans run, the less work they have to do. This allows them to run frequently, giving low
//! latency for retrieval of newly produced blocks that we failed to receive initially. Between
//! minor scans, the task will sleep for [a configurable
//! duration](Builder::with_minor_scan_interval) to wait for new blocks to be produced and give
//! other tasks full access to all shared resources.
//!
//! Every `n`th scan (`n` is [configurable](Builder::with_major_scan_interval)) is a major scan.
//! These scan all blocks from 0, which guarantees that we will eventually retrieve all blocks, even
//! if for some reason we have lost a block that we previously had (due to storage failures and
//! corruptions, or simple bugs in this software). These scans are rather expensive (although they
//! will release control of shared resources many times during the duration of the scan), but
//! because it is rather unlikely that a major scan will discover any missing blocks that the next
//! minor scan would have missed, it is acceptable for major scans to run very infrequently.
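/*!
As an illustration of the major/minor schedule described above, here is a minimal sketch; it is
not the scanner's actual code. It assumes (hypothetically) that scans are numbered from 0, that
the first major scan happens after `major_scan_offset` minor scans, and that one major scan
happens every `major_scan_interval` scans after that. `is_major_scan` and `scan_range` are
illustrative names only.

```rust
use std::ops::Range;

/// Hypothetical: is the scan with index `i` (0-based) a major scan?
fn is_major_scan(i: usize, interval: usize, offset: usize) -> bool {
    // Treat an interval of 0 as 1 to avoid dividing by zero.
    let interval = interval.max(1);
    i >= offset && (i - offset) % interval == 0
}

/// Hypothetical: the block heights a scan covers. Major scans start from 0;
/// minor scans only cover blocks that are new since the previous scan.
fn scan_range(major: bool, prev_height: usize, height: usize) -> Range<usize> {
    if major {
        0..height
    } else {
        prev_height..height
    }
}
```
*/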
//!
//! # Active Fetching
//!
//! Active fetching means reaching out to a remote data availability provider to retrieve a missing
//! resource, upon receiving a request for that resource from a client. Not every request for a
//! missing resource triggers an active fetch. To avoid spamming peers with requests for missing
//! data, we only actively fetch resources that are known to exist somewhere. This means we can
//! actively fetch a leaf or header requested by height, provided that height is less than the
//! current chain height. We can fetch a block when the corresponding header (matched by height,
//! hash, or payload hash) exists or can be actively fetched.
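/*!
The eligibility rule above can be sketched as follows. This is a simplified, hypothetical model
rather than the fetcher's real request types: `BlockKey` and both helper functions are
illustrative names, and a block requested by hash is assumed to be fetchable only when its header
is already stored locally.

```rust
/// Hypothetical ways a client might identify a block.
#[derive(Clone, Copy)]
enum BlockKey {
    Height(usize),
    Hash,
}

/// Leaves and headers are known to exist if they are below the chain height.
fn can_fetch_leaf_or_header(height: usize, chain_height: usize) -> bool {
    height < chain_height
}

/// A block may be actively fetched if its header is stored locally, or if
/// the header itself can be actively fetched (requested by a known height).
fn can_fetch_block(key: BlockKey, chain_height: usize, header_stored: bool) -> bool {
    header_stored
        || match key {
            BlockKey::Height(h) => can_fetch_leaf_or_header(h, chain_height),
            BlockKey::Hash => false,
        }
}
```
*/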
//!
//! # Passive Fetching
//!
//! For requests that cannot be actively fetched (for example, a block requested by hash, where we
//! do not have a header proving that a block with that hash exists), we use passive fetching. This
//! essentially means waiting passively until the query service receives an object that satisfies
//! the request. This object may be received because it was actively fetched in response to a
//! different request for the same object, one that permitted an active fetch. Or it may have been
//! fetched [proactively](#proactive-fetching).
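/*!
Passive fetching amounts to registering interest in an object and being woken when any code path
stores it. The following is a minimal single-threaded sketch using `std::sync::mpsc` channels; it
is not this crate's `Notifier`, and `PassiveWaiters`, `wait_for`, and `notify` are hypothetical
names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical registry of passive requests waiting for objects of type `T`.
struct PassiveWaiters<T> {
    waiters: Vec<(Box<dyn Fn(&T) -> bool>, Sender<T>)>,
}

impl<T: Clone> PassiveWaiters<T> {
    fn new() -> Self {
        Self { waiters: Vec::new() }
    }

    /// Wait passively for any object satisfying `pred`.
    fn wait_for(&mut self, pred: impl Fn(&T) -> bool + 'static) -> Receiver<T> {
        let (tx, rx) = channel();
        self.waiters.push((Box::new(pred), tx));
        rx
    }

    /// Called whenever an object arrives, whether actively or proactively
    /// fetched. Satisfied waiters receive a copy and are removed.
    fn notify(&mut self, obj: &T) {
        self.waiters.retain(|(pred, tx)| {
            if pred(obj) {
                // Ignore send errors: the requester may have stopped waiting.
                let _ = tx.send(obj.clone());
                false
            } else {
                true
            }
        });
    }
}
```
*/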

use std::{
    cmp::{max, min},
    fmt::{Debug, Display},
    iter::repeat_with,
    marker::PhantomData,
    ops::{Bound, Range, RangeBounds},
    sync::Arc,
    time::Duration,
};

use anyhow::{bail, Context};
use async_lock::Semaphore;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
use derivative::Derivative;
use futures::{
    channel::oneshot,
    future::{self, join_all, BoxFuture, Either, Future, FutureExt},
    stream::{self, BoxStream, StreamExt},
};
use hotshot_types::{
    data::VidShare,
    traits::{
        metrics::{Gauge, Metrics},
        node_implementation::NodeType,
    },
};
use jf_merkle_tree_compat::{prelude::MerkleProof, MerkleTreeScheme};
use tagged_base64::TaggedBase64;
use tokio::{spawn, time::sleep};
use tracing::Instrument;

use super::{
    notifier::Notifier,
    storage::{
        pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
        sql::MigrateTypes,
        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
        UpdateAvailabilityStorage,
    },
    Transaction, VersionedDataSource,
};
use crate::{
    availability::{
        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
        FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
        PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
    },
    explorer::{self, ExplorerDataSource},
    fetching::{self, request, Provider},
    merklized_state::{
        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
    },
    metrics::PrometheusMetrics,
    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
    status::{HasMetrics, StatusDataSource},
    task::BackgroundTask,
    types::HeightIndexed,
    Header, Payload, QueryError, QueryResult,
};

mod block;
mod header;
mod leaf;
mod transaction;
mod vid;

use self::{
    block::PayloadFetcher,
    leaf::LeafFetcher,
    transaction::TransactionRequest,
    vid::{VidCommonFetcher, VidCommonRequest},
};

/// Builder for [`FetchingDataSource`] with configuration.
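/**
For intuition about the default retry policy set in [`Builder::new`] (1 s initial interval,
multiplier 2, 32 s maximum interval, 64 s maximum elapsed time), here is a deterministic
approximation of the resulting delays. It is a sketch only: it ignores the randomization applied
by the `backoff` crate and the time spent on the failed attempts themselves, and it assumes
retries stop once the cumulative wait would exceed the maximum elapsed time. `backoff_schedule`
is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical: approximate exponential backoff delays, without jitter.
fn backoff_schedule(
    initial: Duration,
    multiplier: f64,
    max_interval: Duration,
    max_elapsed: Duration,
) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut current = initial;
    let mut total = Duration::ZERO;
    // Stop before the cumulative wait exceeds the retry timeout.
    while !current.is_zero() && total + current <= max_elapsed {
        total += current;
        delays.push(current);
        // Grow the delay geometrically, but never beyond the maximum interval.
        current = current.mul_f64(multiplier).min(max_interval);
    }
    delays
}
```

Under these assumptions the defaults yield delays of 1, 2, 4, 8, 16, and 32 seconds (63 seconds
in total) before giving up.
*/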
pub struct Builder<Types, S, P> {
    storage: S,
    provider: P,
    backoff: ExponentialBackoffBuilder,
    rate_limit: usize,
    range_chunk_size: usize,
    minor_scan_interval: Duration,
    major_scan_interval: usize,
    major_scan_offset: usize,
    proactive_range_chunk_size: Option<usize>,
    active_fetch_delay: Duration,
    chunk_fetch_delay: Duration,
    proactive_fetching: bool,
    aggregator: bool,
    aggregator_chunk_size: Option<usize>,
    types_migration_batch_size: u64,
    leaf_only: bool,
    _types: PhantomData<Types>,
}

impl<Types, S, P> Builder<Types, S, P> {
    /// Construct a new builder with the given storage and provider and the default options.
    pub fn new(storage: S, provider: P) -> Self {
        let mut default_backoff = ExponentialBackoffBuilder::default();
        default_backoff
            .with_initial_interval(Duration::from_secs(1))
            .with_multiplier(2.)
            .with_max_interval(Duration::from_secs(32))
            .with_max_elapsed_time(Some(Duration::from_secs(64)));

        Self {
            storage,
            provider,
            backoff: default_backoff,
            rate_limit: 32,
            range_chunk_size: 25,
            // By default, we run minor proactive scans fairly frequently: once every minute. These
            // scans are cheap (cheaper the more frequently they run) and can help us keep up with
            // the head of the chain even if our attached consensus instance is behind.
            minor_scan_interval: Duration::from_secs(60),
            // Major scans, on the other hand, are rather expensive and not especially important for
            // usability. We run them rarely, once every 60 minor scans, or once every hour by
            // default.
            major_scan_interval: 60,
            // Major scan offset can be used when starting multiple nodes at the same time, so they
            // don't all pause for a major scan together.
            major_scan_offset: 0,
            proactive_range_chunk_size: None,
            active_fetch_delay: Duration::from_millis(50),
            chunk_fetch_delay: Duration::from_millis(100),
            proactive_fetching: true,
            aggregator: true,
            aggregator_chunk_size: None,
            types_migration_batch_size: 10000,
            leaf_only: false,
            _types: Default::default(),
        }
    }

    /// Run in leaf-only mode.
    ///
    /// In this mode, the [proactive scanner](self#proactive-fetching) and the aggregator background
    /// tasks are not started.
    pub fn leaf_only(mut self) -> Self {
        self.leaf_only = true;
        self
    }

    /// Set the minimum delay between retries of failed operations.
    pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
        self.backoff.with_initial_interval(interval);
        self
    }

    /// Set the maximum delay between retries of failed operations.
    pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
        self.backoff.with_max_interval(interval);
        self
    }

    /// Set the multiplier for exponential backoff when retrying failed requests.
    pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff.with_multiplier(multiplier);
        self
    }

    /// Set the randomization factor for randomized backoff when retrying failed requests.
    pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
        self.backoff.with_randomization_factor(factor);
        self
    }

    /// Set the maximum time to retry failed operations before giving up.
    pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
        self.backoff.with_max_elapsed_time(Some(timeout));
        self
    }

    /// Set the maximum number of simultaneous fetches.
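    /**
One common way to cap the number of simultaneous operations is a counting semaphore. The sketch
below is a blocking, std-only illustration (a hypothetical `Limiter` built on `Mutex` and
`Condvar`), not this crate's mechanism: each task acquires a permit before doing its work, so at
most `limit` tasks run at once.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical counting semaphore allowing at most `limit` holders at once.
struct Limiter {
    available: Mutex<usize>,
    cond: Condvar,
}

impl Limiter {
    fn new(limit: usize) -> Self {
        Self { available: Mutex::new(limit), cond: Condvar::new() }
    }

    /// Run `f` while holding a permit, blocking until one is free.
    /// For brevity, a panic inside `f` would leak the permit.
    fn run<R>(&self, f: impl FnOnce() -> R) -> R {
        let mut available = self.available.lock().unwrap();
        // Wait (releasing the lock while blocked) until a permit is free.
        while *available == 0 {
            available = self.cond.wait(available).unwrap();
        }
        *available -= 1;
        drop(available);

        let result = f();

        // Return the permit and wake one waiting task.
        *self.available.lock().unwrap() += 1;
        self.cond.notify_one();
        result
    }
}
```
    */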
    pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
        self.rate_limit = with_rate_limit;
        self
    }

    /// Set the number of items to process at a time when loading a range or stream.
    ///
    /// This determines:
    /// * The number of objects to load from storage in a single request
    /// * The number of objects to buffer in memory per request/stream
    /// * The number of concurrent notification subscriptions per request/stream
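    /**
To make the effect of the chunk size concrete, the sketch below splits a requested range into
consecutive chunks of at most `chunk_size` heights, resolving an unbounded end against the
current block height. It is a hypothetical helper (`chunk_range`) under those assumptions, not the
fetcher's actual range-loading code, which may treat open-ended ranges differently (for example,
by waiting for future blocks).

```rust
use std::ops::{Bound, Range, RangeBounds};

/// Hypothetical: split `range` into chunks of at most `chunk_size` heights.
fn chunk_range(
    range: impl RangeBounds<usize>,
    block_height: usize,
    chunk_size: usize,
) -> Vec<Range<usize>> {
    let start = match range.start_bound() {
        Bound::Included(&s) => s,
        Bound::Excluded(&s) => s + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&e) => e + 1,
        Bound::Excluded(&e) => e,
        // Assumption: an open-ended range stops at the current block height.
        Bound::Unbounded => block_height,
    };
    // A chunk size of 0 would never make progress, so treat it as 1.
    let chunk_size = chunk_size.max(1);
    (start..end)
        .step_by(chunk_size)
        .map(|chunk_start| chunk_start..(chunk_start + chunk_size).min(end))
        .collect()
}
```
    */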
    pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
        self.range_chunk_size = range_chunk_size;
        self
    }

    /// Set the time interval between minor proactive fetching scans.
    ///
    /// See [proactive fetching](self#proactive-fetching).
    pub fn with_minor_scan_interval(mut self, interval: Duration) -> Self {
        self.minor_scan_interval = interval;
        self
    }

    /// Set the interval (denominated in [minor scans](Self::with_minor_scan_interval)) between
    /// major proactive fetching scans.
    ///
    /// See [proactive fetching](self#proactive-fetching).
    pub fn with_major_scan_interval(mut self, interval: usize) -> Self {
        self.major_scan_interval = interval;
        self
    }

    /// Set the offset (denominated in [minor scans](Self::with_minor_scan_interval)) before the
    /// first major proactive fetching scan.
    ///
    /// This is useful when starting multiple nodes at the same time: major proactive scans can have
    /// a measurable impact on the performance of the node for a brief time while the scan is
    /// running, so it may be desirable to prevent a group of nodes from all doing major scans at
    /// the same time. This can be achieved by giving each node a different `major_scan_offset`.
    ///
    /// See also [proactive fetching](self#proactive-fetching).
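    /**
For example, an operator could spread `n` nodes evenly across one major scan interval by giving
node `i` an offset of `i * major_scan_interval / n`. The helper below is a hypothetical sketch of
that arithmetic, not part of this crate.

```rust
/// Hypothetical: evenly staggered `major_scan_offset` values for `nodes` nodes.
fn staggered_offsets(nodes: usize, major_scan_interval: usize) -> Vec<usize> {
    // The map body never runs when `nodes == 0`, so there is no division by zero.
    (0..nodes)
        .map(|i| i * major_scan_interval / nodes)
        .collect()
}
```

With the default interval of 60 minor scans, four nodes would get offsets 0, 15, 30, and 45.
    */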
    pub fn with_major_scan_offset(mut self, offset: usize) -> Self {
        self.major_scan_offset = offset;
        self
    }

    /// Set the number of items to process at a time when scanning for proactive fetching.
    ///
    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
    /// proactive fetching scans, not for normal subscription streams. This can be useful to tune
    /// the proactive scanner to be more or less greedy with persistent storage resources.
    ///
    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
    /// whatever the normal range chunk size is.
    pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
        self.proactive_range_chunk_size = Some(range_chunk_size);
        self
    }

    /// Add a delay between active fetches in proactive scans.
    ///
    /// This can be used to limit the rate at which this query service makes requests to other query
    /// services during proactive scans. This is useful if the query service has a lot of blocks to
    /// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
    pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
        self.active_fetch_delay = active_fetch_delay;
        self
    }

    /// Add a delay between chunk fetches during proactive scans.
    ///
    /// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a
    /// database). Without a delay between fetching these chunks, the process can become very
    /// CPU-intensive, especially when chunks are retrieved from local storage. There is already a
    /// delay for active fetches (`active_fetch_delay`), but it only applies to active fetches, so it
    /// does not help when, for example, the scan is subscribed to an old stream whose data comes
    /// mostly from local storage.
    ///
    /// This additional delay helps to avoid sustained maximum CPU usage and ensures that local
    /// storage remains accessible to all processes, not just the proactive scanner.
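    /**
A minimal, blocking sketch of the idea: process chunks one at a time and sleep for a fixed delay
between consecutive chunks, so a long scan cannot monopolize the CPU or storage. The function
name and the use of `std::thread::sleep` are illustrative assumptions; the scanner itself is
asynchronous.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical: apply `process` to each chunk, pausing `delay` between chunks.
fn process_chunks_with_delay<T>(
    chunks: impl IntoIterator<Item = T>,
    delay: Duration,
    mut process: impl FnMut(T),
) {
    for (i, chunk) in chunks.into_iter().enumerate() {
        // Pause before every chunk except the first.
        if i > 0 {
            sleep(delay);
        }
        process(chunk);
    }
}
```
    */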
    pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
        self.chunk_fetch_delay = chunk_fetch_delay;
        self
    }

    /// Run without [proactive fetching](self#proactive-fetching).
    ///
    /// This can reduce load on the CPU and the database, but increases the probability that
    /// requests will fail due to missing resources. If resources are constrained, it is recommended
    /// to run with rare proactive fetching (see
    /// [`with_major_scan_interval`](Self::with_major_scan_interval),
    /// [`with_minor_scan_interval`](Self::with_minor_scan_interval)), rather than disabling it
    /// entirely.
    pub fn disable_proactive_fetching(mut self) -> Self {
        self.proactive_fetching = false;
        self
    }

    /// Run without an aggregator.
    ///
    /// This can reduce load on the CPU and the database, but it will cause aggregate statistics
    /// (such as transaction counts) not to update.
    pub fn disable_aggregator(mut self) -> Self {
        self.aggregator = false;
        self
    }

    /// Set the number of items to process at a time when computing aggregate statistics.
    ///
    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
    /// the aggregator task, not for normal subscription streams. This can be useful to tune
    /// the aggregator to be more or less greedy with persistent storage resources.
    ///
    /// By default (i.e. if this method is not called) the aggregator chunk size will be set to
    /// whatever the normal range chunk size is.
    pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
        self.aggregator_chunk_size = Some(chunk_size);
        self
    }

    /// Set the batch size for the types migration.
    ///
    /// This determines how many `(leaf, vid)` rows are selected from the old types table and
    /// migrated at once.
    pub fn with_types_migration_batch_size(mut self, batch: u64) -> Self {
        self.types_migration_batch_size = batch;
        self
    }

    /// Whether this builder is configured to run in [leaf-only mode](Self::leaf_only).
    pub fn is_leaf_only(&self) -> bool {
        self.leaf_only
    }
}

impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + MigrateTypes<Types> + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with these options.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        FetchingDataSource::new(self).await
    }
}

/// The most basic kind of data source.
///
/// A data source is constructed modularly by combining a [storage](super::storage) implementation
/// with a [Fetcher](crate::fetching::Fetcher). The former allows the query service to store the
/// data it has persistently in an easily accessible storage medium, such as the local file system
/// or a database. This allows it to answer queries efficiently and to maintain its state across
/// restarts. The latter allows the query service to fetch data that is missing from its storage
/// from an external data availability provider, such as the Tiramisu DA network or another instance
/// of the query service.
///
/// These two components of a data source are combined in [`FetchingDataSource`], which is the
/// lowest-level kind of data source available. It simply uses the storage implementation to fetch
/// data when available, and fills in everything else using the fetcher. Various kinds of data
/// sources can be constructed out of [`FetchingDataSource`] by changing the storage and fetcher
/// implementations used, and more complex data sources can be built on top using data source
/// combinators.
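/**
The "storage first, fetcher for the rest" behavior described above can be sketched with a
`HashMap` standing in for local storage and a closure standing in for the remote provider. This
is a hypothetical, synchronous model (`CachingSource` and `get` are illustrative names); real
fetches are asynchronous and may fail or be retried.

```rust
use std::collections::HashMap;

/// Hypothetical local cache backed by a remote lookup.
struct CachingSource<F> {
    storage: HashMap<u64, String>,
    remote: F,
}

impl<F: FnMut(u64) -> Option<String>> CachingSource<F> {
    /// Serve from local storage if possible; otherwise fetch remotely and
    /// store the result so later requests are answered locally.
    fn get(&mut self, height: u64) -> Option<String> {
        if let Some(obj) = self.storage.get(&height) {
            return Some(obj.clone());
        }
        let obj = (self.remote)(height)?;
        self.storage.insert(height, obj.clone());
        Some(obj)
    }
}
```
*/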
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    // The fetcher manages retrieval of resources from both local storage and a remote provider. It
    // encapsulates the data which may need to be shared with a long-lived task or future that
    // implements the asynchronous fetching of a particular object. This is why it gets its own
    // type, wrapped in an [`Arc`] for easy, efficient cloning.
    fetcher: Arc<Fetcher<Types, S, P>>,
    // The proactive scanner task. This is only saved here so that we can cancel it on drop.
    scanner: Option<BackgroundTask>,
    // The aggregator task, which derives aggregate statistics from a block stream.
    aggregator: Option<BackgroundTask>,
    pruner: Pruner<Types, S>,
}

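/**
The pruner task below repeatedly calls `storage.prune` until a run is complete. The same control
flow can be sketched against a hypothetical in-memory store that prunes a bounded batch per call
and reports the height it reached, returning `None` once nothing is left. `MockStore`,
`prune_step`, and `run_pruner` are illustrative names only.

```rust
/// Hypothetical store that has pruned everything below `oldest` and
/// should eventually prune everything below `target`.
struct MockStore {
    oldest: u64,
    target: u64,
    batch: u64,
}

impl MockStore {
    /// Prune up to `batch` heights. Returns the new oldest height, or
    /// `None` when there is nothing left to prune.
    fn prune_step(&mut self) -> Result<Option<u64>, String> {
        if self.oldest >= self.target {
            return Ok(None);
        }
        // A batch of 0 would never make progress, so prune at least one height.
        self.oldest = (self.oldest + self.batch.max(1)).min(self.target);
        Ok(Some(self.oldest))
    }
}

/// Drive one full pruner run: continue on progress, stop on completion or error.
fn run_pruner(store: &mut MockStore) -> Vec<u64> {
    let mut progress = Vec::new();
    loop {
        match store.prune_step() {
            Ok(Some(height)) => progress.push(height),
            Ok(None) => break,
            Err(e) => {
                eprintln!("pruner run failed: {e}");
                break;
            }
        }
    }
    progress
}
```
*/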
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug"))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    handle: Option<BackgroundTask>,
    _types: PhantomData<(Types, S)>,
}

impl<Types, S> Pruner<Types, S>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: PruneStorage + Send + Sync + 'static,
{
    async fn new(storage: Arc<S>) -> Self {
        let cfg = storage.get_pruning_config();
        let Some(cfg) = cfg else {
            return Self {
                handle: None,
                _types: Default::default(),
            };
        };

        let future = async move {
            for i in 1.. {
                tracing::warn!("starting pruner run {i}");
                Self::prune(storage.clone()).await;
                sleep(cfg.interval()).await;
            }
        };

        let task = BackgroundTask::spawn("pruner", future);

        Self {
            handle: Some(task),
            _types: Default::default(),
        }
    }

    async fn prune(storage: Arc<S>) {
        // Loop until the whole pruner run is complete.
        let mut pruner = S::Pruner::default();
        loop {
            match storage.prune(&mut pruner).await {
                Ok(Some(height)) => {
                    tracing::warn!("Pruned to height {height}");
                },
                Ok(None) => {
                    tracing::warn!("pruner run complete.");
                    break;
                },
                Err(e) => {
                    tracing::error!("pruner run failed: {e:?}");
                    break;
                },
            }
        }
    }
}

impl<Types, S, P> FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + PruneStorage + HasMetrics + MigrateTypes<Types> + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
    pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
        Builder::new(storage, provider)
    }

    async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
        let leaf_only = builder.is_leaf_only();
        let aggregator = builder.aggregator;
        let aggregator_chunk_size = builder
            .aggregator_chunk_size
            .unwrap_or(builder.range_chunk_size);
        let proactive_fetching = builder.proactive_fetching;
        let minor_interval = builder.minor_scan_interval;
        let major_interval = builder.major_scan_interval;
        let major_offset = builder.major_scan_offset;
        let proactive_range_chunk_size = builder
            .proactive_range_chunk_size
            .unwrap_or(builder.range_chunk_size);
        let migration_batch_size = builder.types_migration_batch_size;
        let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
        let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());

        let fetcher = Arc::new(Fetcher::new(builder).await?);

        // Migrate the old types to the new PoS types (leaf1 storage to leaf2, and vid to vid2).
        // This is a one-time operation that should be done before starting the data source.
        fetcher.storage.migrate_types(migration_batch_size).await?;

        let scanner = if proactive_fetching && !leaf_only {
            Some(BackgroundTask::spawn(
                "proactive scanner",
                fetcher.clone().proactive_scan(
                    minor_interval,
                    major_interval,
                    major_offset,
                    proactive_range_chunk_size,
                    scanner_metrics,
                ),
            ))
        } else {
            None
        };

        let aggregator = if aggregator && !leaf_only {
            Some(BackgroundTask::spawn(
                "aggregator",
                fetcher
                    .clone()
                    .aggregate(aggregator_chunk_size, aggregator_metrics),
            ))
        } else {
            None
        };

        let storage = fetcher.storage.clone();

        let pruner = Pruner::new(storage).await;
        let ds = Self {
            fetcher,
            scanner,
            pruner,
            aggregator,
        };

        Ok(ds)
    }

    /// Get a shared handle to the inner storage.
    pub fn inner(&self) -> Arc<S> {
        self.fetcher.storage.clone()
    }
}

586impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
587where
588    Types: NodeType,
589{
590    fn as_ref(&self) -> &S {
591        &self.fetcher.storage
592    }
593}
594
595impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
596where
597    Types: NodeType,
598    S: HasMetrics,
599{
600    fn metrics(&self) -> &PrometheusMetrics {
601        self.as_ref().metrics()
602    }
603}
604
605#[async_trait]
606impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
607where
608    Types: NodeType,
609    Header<Types>: QueryableHeader<Types>,
610    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
611    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
612    P: Send + Sync,
613{
614    async fn block_height(&self) -> QueryResult<usize> {
615        let mut tx = self.read().await.map_err(|err| QueryError::Error {
616            message: err.to_string(),
617        })?;
618        tx.block_height().await
619    }
620}
621
622#[async_trait]
623impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
624where
625    Types: NodeType,
626    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
627    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
628    P: Send + Sync,
629{
630    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
631        let mut tx = self.read().await?;
632        tx.load_pruned_height().await
633    }
634}
635
636#[async_trait]
637impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
638where
639    Types: NodeType,
640    Header<Types>: QueryableHeader<Types>,
641    Payload<Types>: QueryablePayload<Types>,
642    S: VersionedDataSource + 'static,
643    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
644    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
645    P: AvailabilityProvider<Types>,
646{
647    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
648    where
649        ID: Into<LeafId<Types>> + Send + Sync,
650    {
651        self.fetcher.get(id.into()).await
652    }
653
654    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
655    where
656        ID: Into<BlockId<Types>> + Send + Sync,
657    {
658        self.fetcher
659            .get::<HeaderQueryData<_>>(id.into())
660            .await
661            .map(|h| h.header)
662    }
663
664    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
665    where
666        ID: Into<BlockId<Types>> + Send + Sync,
667    {
668        self.fetcher.get(id.into()).await
669    }
670
671    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
672    where
673        ID: Into<BlockId<Types>> + Send + Sync,
674    {
675        self.fetcher.get(id.into()).await
676    }
677
678    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
679    where
680        ID: Into<BlockId<Types>> + Send + Sync,
681    {
682        self.fetcher.get(id.into()).await
683    }
684
685    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
686    where
687        ID: Into<BlockId<Types>> + Send + Sync,
688    {
689        self.fetcher.get(VidCommonRequest::from(id.into())).await
690    }
691
692    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
693    where
694        ID: Into<BlockId<Types>> + Send + Sync,
695    {
696        self.fetcher.get(VidCommonRequest::from(id.into())).await
697    }
698
    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }
}
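
/*
Sketch (not part of this crate): `get_header_range` reuses the leaf stream and projects each leaf
onto its header with `Fetch::map`, so a header request never starts a fetch of its own. The block
below models that projection using a hypothetical, simplified `Fetch` enum in which `Pending`
holds a deferred closure. The real `Fetch` in this crate wraps a future instead. `Leaf` and
`header_fetch` are assumed stand-ins, not crate items.

```rust
/// Hypothetical, simplified stand-in for `Fetch<T>`: the value is either
/// available now or produced later by a deferred computation.
enum Fetch<T> {
    Ready(T),
    Pending(Box<dyn FnOnce() -> T>),
}

impl<T: 'static> Fetch<T> {
    /// Transform the eventual value without forcing a pending fetch.
    fn map<U: 'static, F>(self, f: F) -> Fetch<U>
    where
        F: FnOnce(T) -> U + 'static,
    {
        match self {
            Fetch::Ready(t) => Fetch::Ready(f(t)),
            // Compose the deferred computation with `f`; nothing runs yet.
            Fetch::Pending(later) => Fetch::Pending(Box::new(move || f(later()))),
        }
    }

    /// Obtain the value, running the deferred computation if necessary.
    fn resolve(self) -> T {
        match self {
            Fetch::Ready(t) => t,
            Fetch::Pending(later) => later(),
        }
    }
}

/// Hypothetical leaf type carrying its block header (here just a number).
struct Leaf {
    header: u64,
}

/// Project a leaf fetch onto its header, as `get_header_range` does per item.
fn header_fetch(leaf: Fetch<Leaf>) -> Fetch<u64> {
    leaf.map(|leaf| leaf.header)
}
```

In this model, mapping a `Pending` value only composes closures. Nothing is loaded until the
caller resolves the result, so the header view stays exactly as lazy as the leaf view.
*/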

impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;
        let fetch_block = info.block.is_none();
        let fetch_vid = info.vid_common.is_none();

        // Trigger a fetch of the parent leaf, if we don't already have it.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        self.fetcher.store_and_notify(info).await;

        if fetch_block || fetch_vid {
            // If data related to this block is missing, try to fetch it. Do this in an async task:
            // this is a fire-and-forget fetch, so there is no need to block the caller on it.
            let fetcher = self.fetcher.clone();
            let span = tracing::info_span!("fetch missing data", height);
            spawn(
                async move {
                    tracing::info!(fetch_block, fetch_vid, "fetching missing data");
                    if fetch_block {
                        fetcher.get::<PayloadMetadata<Types>>(height).await;
                    }
                    if fetch_vid {
                        fetcher.get::<VidCommonMetadata<Types>>(height).await;
                    }
                }
                .instrument(span),
            );
        }
        Ok(())
    }
}

impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}

/// Asynchronous retrieval and storage of [`Fetchable`] resources.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    storage: Arc<S>,
    notifiers: Notifiers<Types>,
    provider: Arc<P>,
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    // Default number of objects loaded or fetched together when serving range queries.
    range_chunk_size: usize,
    // Duration to sleep after each active fetch.
    active_fetch_delay: Duration,
    // Duration to sleep after fetching each chunk.
    chunk_fetch_delay: Duration,
    // Exponential backoff when retrying failed operations.
    backoff: ExponentialBackoff,
    // Semaphore limiting the number of simultaneous DB accesses we can have from tasks spawned to
    // retry failed loads.
    retry_semaphore: Arc<Semaphore>,
    // If set, only leaves are fetched: no payload or VID common fetchers are created.
    leaf_only: bool,
}

impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + Sync,
    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
{
    pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
        let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
        let backoff = builder.backoff.build();

        let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
            (None, None)
        } else {
            (
                Some(Arc::new(fetching::Fetcher::new(
                    retry_semaphore.clone(),
                    backoff.clone(),
                ))),
                Some(Arc::new(fetching::Fetcher::new(
                    retry_semaphore.clone(),
                    backoff.clone(),
                ))),
            )
        };
        let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());

        let leaf_only = builder.leaf_only;

        Ok(Self {
            storage: Arc::new(builder.storage),
            notifiers: Default::default(),
            provider: Arc::new(builder.provider),
            leaf_fetcher: Arc::new(leaf_fetcher),
            payload_fetcher,
            vid_common_fetcher: vid_fetcher,
            range_chunk_size: builder.range_chunk_size,
            active_fetch_delay: builder.active_fetch_delay,
            chunk_fetch_delay: builder.chunk_fetch_delay,
            backoff,
            retry_semaphore,
            leaf_only,
        })
    }
}

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications before we check storage for the requested object. This ensures
        // that this operation will always eventually succeed as long as the requested object
        // actually exists (or will exist). We will either find it in our local storage and succeed
        // immediately, or (if it exists) someone will *later* come and add it to storage, at which
        // point we will get a notification causing this passive fetch to resolve.
        //
        // Note that the "someone" who later fetches the object and adds it to storage may be an
        // active fetch triggered by this very request, in cases where that is possible, but it need
        // not be.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use this channel to get the object back if we successfully load it on retry.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Limit the number of simultaneous retry tasks hitting the database. When
                        // the database is down, we might have a lot of these tasks running, and if
                        // they all hit the DB at once, they are only going to make things worse.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // If the object was immediately available after all, signal the
                            // original fetch. We probably just temporarily couldn't access it due
                            // to database errors.
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // The object was not immediately available after all, but we have
                            // successfully spawned a fetch for it if possible. The spawned fetch
                            // will notify the original request once it completes.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Wait for the object to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
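
/*
Sketch: the ordering argument in `get` (subscribe to notifications first, then check storage) can
be reproduced with the standard library alone. `Store`, `subscribe`, and `insert` below are
hypothetical stand-ins for local storage and `Notifiers`, not this crate's types. The subscription
exists before the storage check, so an insert that lands between the two steps is still delivered
through the channel. Checking first and subscribing second could miss such an insert and wait
forever.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical stand-in for local storage plus notifiers: objects keyed by
/// height, and channels waiting for heights that have not arrived yet.
#[derive(Default)]
struct Store {
    objects: Mutex<HashMap<u64, String>>,
    waiters: Mutex<HashMap<u64, Vec<Sender<String>>>>,
}

impl Store {
    /// Register interest in `height`; the receiver fires once it is inserted.
    fn subscribe(&self, height: u64) -> Receiver<String> {
        let (send, recv) = channel();
        self.waiters.lock().unwrap().entry(height).or_default().push(send);
        recv
    }

    /// Store an object, then notify everyone waiting for its height.
    fn insert(&self, height: u64, obj: String) {
        self.objects.lock().unwrap().insert(height, obj.clone());
        let waiters = self.waiters.lock().unwrap().remove(&height).unwrap_or_default();
        for waiter in waiters {
            // A closed receiver only means that requester stopped waiting.
            let _ = waiter.send(obj.clone());
        }
    }

    /// Subscribe *before* checking storage, mirroring `get`: an insert that
    /// happens between the two steps is still seen through the subscription.
    fn get(&self, height: u64) -> String {
        let passive = self.subscribe(height);
        if let Some(obj) = self.objects.lock().unwrap().get(&height) {
            return obj.clone();
        }
        passive.recv().expect("sender dropped without notifying")
    }
}
```

Suppose a writer thread races with a reader calling `get`. Whichever way the two interleave,
the reader returns the object: either it finds the object in storage, or the writer finds the
reader's subscription and notifies it. For brevity, this sketch never removes a subscription
whose object was found directly in storage.
*/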

    /// Try to get an object from local storage or initialize a fetch if it is missing.
    ///
    /// There are three possible scenarios in this function, indicated by the return type:
    /// * `Ok(Some(obj))`: the requested object was available locally and successfully retrieved
    ///   from the database; no fetch was spawned
    /// * `Ok(None)`: the requested object was not available locally, but a fetch was successfully
    ///   spawned if possible (in other words, if a fetch was not spawned, it was determined that
    ///   the requested object is not fetchable)
    /// * `Err(_)`: it could not be determined whether the object was available locally or whether
    ///   it could be fetched; no fetch was spawned even though the object may be fetchable
    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
    where
        T: Fetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        match T::load(&mut tx, req).await {
            Ok(t) => Ok(Some(t)),
            Err(QueryError::Missing | QueryError::NotFound) => {
                // We successfully queried the database, but the object wasn't there. Try to
                // fetch it.
                tracing::debug!(?req, "object missing from local storage, will try to fetch");
                self.fetch::<T>(&mut tx, req).await?;
                Ok(None)
            },
            Err(err) => {
                // An error occurred while querying the database. We don't know if we need to fetch
                // the object or not. Return an error so we can try again.
                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
            },
        }
    }

    /// Get a range of objects from local storage or a provider.
    ///
    /// Convert a finite stream of fallible local storage lookups into a (possibly infinite) stream
    /// of infallible fetches. Objects in `range` are loaded from local storage. Any gaps or missing
    /// objects are filled by fetching from a provider. Items in the resulting stream are futures
    /// that will never fail to produce a resource, although they may block indefinitely if the
    /// resource needs to be fetched.
    ///
    /// Objects are loaded and fetched in chunks. This strikes a balance between limiting the total
    /// number of storage and network requests and keeping simultaneous resource consumption
    /// bounded.
    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size(chunk_size, range)
    }

    /// Same as [`Self::get_range`], but uses the given chunk size instead of the default.
    fn get_range_with_chunk_size<R, T>(
        self: Arc<Self>,
        chunk_size: usize,
        range: R,
    ) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks(range, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
                        // helps to limit constant high CPU usage when fetching long ranges of data,
                        // especially for older streams that fetch most of the data from local
                        // storage.
                        sleep(chunk_fetch_delay).await;
                        stream::iter(chunk)
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                match f {
                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
                    // the catchup provider. The delay applies between pending fetches, not between
                    // chunks.
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }
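
/*
Sketch: `get_range_with_chunk_size` walks `range_chunks(range, chunk_size)`, whose definition is not
shown in this section. The hypothetical `range_chunks_sketch` below is one way to split an
arbitrary `RangeBounds<usize>` into consecutive half-open chunks. It assumes the real helper
behaves similarly. In particular, a missing upper bound yields an endless sequence of chunks,
which is consistent with the stream being infinite for unbounded ranges.

```rust
use std::ops::{Bound, Range, RangeBounds};

/// Hypothetical helper: split `range` into consecutive half-open chunks of at
/// most `chunk_size` heights. With no upper bound the iterator never ends
/// (in practice it stops at `usize::MAX`).
fn range_chunks_sketch(
    range: impl RangeBounds<usize>,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    assert!(chunk_size > 0, "chunk size must be positive");
    let start = match range.start_bound() {
        Bound::Included(&s) => s,
        Bound::Excluded(&s) => s.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Exclusive upper limit; `usize::MAX` stands in for "no upper bound".
    let limit = match range.end_bound() {
        Bound::Included(&e) => e.saturating_add(1),
        Bound::Excluded(&e) => e,
        Bound::Unbounded => usize::MAX,
    };
    let mut next = start;
    std::iter::from_fn(move || {
        if next >= limit {
            return None;
        }
        let chunk_end = next.saturating_add(chunk_size).min(limit);
        let chunk = next..chunk_end;
        next = chunk_end;
        Some(chunk)
    })
}
```

For example, `3..10` with a chunk size of 4 splits into `3..7` and `7..10`. Because the iterator
is lazy, an open range such as `5..` costs nothing until the stream is polled for more chunks.
*/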

    /// Same as [`Self::get_range`], but yields objects in reverse order by height.
    ///
    /// Note that unlike [`Self::get_range`], which accepts any range and yields an infinite stream
    /// if the range has no upper bound, this function requires a defined upper bound; otherwise we
    /// would not know where the reversed stream should _start_. The `end` bound given here is
    /// inclusive; i.e. the first item yielded by the stream will have height `end`.
    fn get_range_rev<T>(
        self: Arc<Self>,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size_rev(chunk_size, start, end)
    }

    /// Same as [`Self::get_range_rev`], but uses the given chunk size instead of the default.
    fn get_range_with_chunk_size_rev<T>(
        self: Arc<Self>,
        chunk_size: usize,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks_rev(start, end, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
                        // helps to limit constant high CPU usage when fetching long ranges of data,
                        // especially for older streams that fetch most of the data from local
                        // storage.
                        sleep(chunk_fetch_delay).await;
                        stream::iter(chunk.into_iter().rev())
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                match f {
                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
                    // the catchup provider. The delay applies between pending fetches, not between
                    // chunks.
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }
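
/*
Sketch: the reverse stream needs chunks that start at `end` (inclusive) and move down toward
`start`. The contents of each chunk are reversed so that heights come out strictly descending.
The hypothetical `range_chunks_rev_sketch` and `heights_rev` below illustrate that shape. They
assume chunks are aligned to `end`. The real `range_chunks_rev` helper is not shown in this
section and may align chunks differently.

```rust
use std::ops::{Bound, Range};

/// Hypothetical helper: split the heights from `end` (inclusive) down to
/// `start` into half-open chunks of at most `chunk_size`, highest chunk first.
/// Aligning chunks to `end` is an assumption of this sketch.
fn range_chunks_rev_sketch(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    assert!(chunk_size > 0, "chunk size must be positive");
    let lowest = match start {
        Bound::Included(s) => s,
        Bound::Excluded(s) => s.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Exclusive upper limit of the next chunk; `end` itself is included.
    let mut upper = end.saturating_add(1);
    std::iter::from_fn(move || {
        if upper <= lowest {
            return None;
        }
        let chunk_start = upper.saturating_sub(chunk_size).max(lowest);
        let chunk = chunk_start..upper;
        upper = chunk_start;
        Some(chunk)
    })
}

/// Flatten the chunks into descending heights, reversing each chunk the way
/// `get_range_with_chunk_size_rev` reverses each loaded chunk.
fn heights_rev(start: Bound<usize>, end: usize, chunk_size: usize) -> Vec<usize> {
    range_chunks_rev_sketch(start, end, chunk_size)
        .flat_map(|chunk| chunk.rev())
        .collect()
}
```

Each chunk remains an ascending `Range`, so it can still be loaded with a single ranged query. Only
the order in which items are emitted is reversed.
*/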

    /// Get a range of objects from local storage or a provider.
    ///
    /// This method is similar to `get_range`, except that:
    /// * It fetches all desired objects together, as a single chunk
    /// * It loads the objects or triggers fetches right now, rather than providing a lazy stream
    ///   which only fetches objects when polled.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss
        // any notifications sent in between checking local storage and triggering a fetch if
        // necessary.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Convert to fetches. Objects which are not immediately available (`None` in the
                // chunk) become passive fetches awaiting a notification of availability.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use these channels to get back the objects that we successfully load on retry.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Limit the number of simultaneous retry tasks hitting the database.
                            // When the database is down, we might have a lot of these tasks
                            // running, and if they all hit the DB at once, they are only going to
                            // make things worse.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        // If the object was immediately available after all, signal
                                        // the original fetch. We probably just temporarily couldn't
                                        // access it due to database errors.
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // The object was not immediately available after all, but
                                        // we have successfully spawned a fetch for it if possible.
                                        // The spawned fetch will notify the original request once
                                        // it completes.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Wait for the objects to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
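
/*
Sketch: the retry tasks in `get` and `get_chunk` follow the same pattern. They take their first
delay from the backoff policy, falling back to 1 second. After that, they advance the delay only
while the policy still yields values. Once the policy gives up, the task keeps retrying at the
last delay instead of stopping. The hypothetical `SimpleBackoff` below stands in for
`ExponentialBackoff`. It has no jitter, and it gives up after a fixed number of calls; the real
policy's stopping rule may differ. `retry_delays` lists the sleeps such a loop would perform over
a run of consecutive failures.

```rust
use std::time::Duration;

/// Hypothetical stand-in for an exponential backoff policy: doubles the
/// interval up to `max` and gives up (returns `None`) after `limit` calls.
struct SimpleBackoff {
    current: Duration,
    max: Duration,
    limit: usize,
}

impl SimpleBackoff {
    fn next_backoff(&mut self) -> Option<Duration> {
        if self.limit == 0 {
            return None;
        }
        self.limit -= 1;
        let delay = self.current;
        self.current = (self.current * 2).min(self.max);
        Some(delay)
    }
}

/// Sleeps performed by a retry loop shaped like the ones in `get` and
/// `get_chunk` over `failures` consecutive errors: the first delay comes from
/// the policy (or defaults to 1 s), and later delays advance only while the
/// policy still yields values; after that the last delay is reused.
fn retry_delays(mut backoff: SimpleBackoff, failures: usize) -> Vec<Duration> {
    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
    let mut sleeps = Vec::with_capacity(failures);
    for _ in 0..failures {
        sleeps.push(delay);
        if let Some(next) = backoff.next_backoff() {
            delay = next;
        }
    }
    sleeps
}
```

Reusing the last delay means a task never abandons a request it has already accepted. The shared
`retry_semaphore` is what keeps many such tasks from overwhelming the database together.
*/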

    /// Try to get a range of objects from local storage, initializing fetches if any are missing.
    ///
    /// If this function succeeds, then for each object in the requested range, either:
    /// * the object was available locally, and corresponds to a `Some(_)` entry in the result
    /// * the object was not available locally (and corresponds to `None` in the result), but a
    ///   fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it
    ///   was determined that the requested object is not fetchable)
    ///
    /// This function will fail if it could not be determined which objects in the requested range
    /// are available locally, or if, for any missing object, it could not be determined whether
    /// that object is fetchable. In this case, there may be no fetch spawned for certain objects in
    /// the requested range, even if those objects are actually fetchable.
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Log and discard error information; we want a list of Option where None indicates an
        // object that needs to be fetched. Note that we don't use `FetchRequest::might_exist` to
        // silence the logs here when an object is missing that is not expected to exist at all.
        // When objects are not expected to exist, `load_range` should just return a truncated list
        // rather than returning `Err` objects, so if there are errors in here they are unexpected
        // and we do want to log them.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        // Kick off a fetch for each missing object.
        let mut results = Vec::with_capacity(chunk.len());
        for t in ts {
            // Fetch missing objects that should come before `t`.
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Fetch missing objects from the end of the range.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }
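
/*
Sketch: the core of `try_get_chunk` aligns a sorted, possibly truncated list of locally loaded
objects with the requested chunk, so that every height gets exactly one slot. The hypothetical
`fill_gaps` below isolates that step on `(height, object)` pairs. Instead of calling `fetch`, it
records which heights would be fetched. It assumes the loaded heights are strictly increasing
and lie inside `chunk`, an assumption the loop in `try_get_chunk` also relies on.

```rust
use std::ops::Range;

/// Align sorted, possibly gappy local results with `chunk`, producing one
/// slot per height (`None` = missing) plus the heights that need fetching.
/// Assumes `loaded` is strictly increasing by height and within `chunk`.
fn fill_gaps<T>(chunk: &Range<usize>, loaded: Vec<(usize, T)>) -> (Vec<Option<T>>, Vec<usize>) {
    let mut results = Vec::with_capacity(chunk.len());
    let mut to_fetch = Vec::new();
    for (height, obj) in loaded {
        // Every unfilled height before `height` is missing locally.
        while chunk.start + results.len() < height {
            to_fetch.push(chunk.start + results.len());
            results.push(None);
        }
        results.push(Some(obj));
    }
    // Anything after the last loaded object (e.g. a truncated result) is missing too.
    while results.len() < chunk.len() {
        to_fetch.push(chunk.start + results.len());
        results.push(None);
    }
    (results, to_fetch)
}
```

For chunk `10..16` with objects loaded at heights 11, 12, and 14, the slots come out as
`[None, Some, Some, None, Some, None]`, and heights 10, 13, and 15 are the ones to fetch.
*/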

    /// Spawn an active fetch for the requested object, if possible.
    ///
    /// On success, either an active fetch for `req` has been spawned, or it has been determined
    /// that `req` is not fetchable. Fails if it cannot be determined (e.g. due to errors in the
    /// local database) whether `req` is fetchable or not.
    async fn fetch<T>(
        self: &Arc<Self>,
        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
        req: T::Request,
    ) -> anyhow::Result<()>
    where
        T: Fetchable<Types>,
    {
        tracing::debug!("fetching resource {req:?}");

        // Trigger an active fetch from a remote provider if possible.
        let heights = Heights::load(tx)
            .await
            .context("failed to load heights; cannot definitively say object might exist")?;
        if req.might_exist(heights) {
            T::active_fetch(tx, self.clone(), req).await?;
        } else {
            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
        }
        Ok(())
    }
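
/*
Sketch: two small policies drive `proactive_scan`. First, scan `i` is major when
`i % major_interval == major_offset % major_interval`. Majors therefore recur every
`major_interval` scans, starting at scan `major_offset % major_interval`. Second, while waiting to
load the current heights, each scan starts from `minor_interval`, sleeps for the current backoff
after every failure, and then doubles it up to a 60-second cap. The helpers `is_major` and
`height_retry_backoffs` are hypothetical extractions of those expressions, not functions in this
crate. As in the original expression, a `major_interval` of 0 would make the modulo panic.

```rust
use std::cmp::min;
use std::time::Duration;

/// Whether scan number `i` is a major scan (same modular test as the scan loop).
fn is_major(i: usize, major_interval: usize, major_offset: usize) -> bool {
    i % major_interval == major_offset % major_interval
}

/// Sleeps taken while waiting for heights over `failures` consecutive errors:
/// start at `minor_interval`, double after each sleep, cap at 60 seconds.
fn height_retry_backoffs(minor_interval: Duration, failures: usize) -> Vec<Duration> {
    let max_backoff = Duration::from_secs(60);
    let mut backoff = minor_interval;
    let mut sleeps = Vec::with_capacity(failures);
    for _ in 0..failures {
        sleeps.push(backoff);
        backoff = min(2 * backoff, max_backoff);
    }
    sleeps
}
```

With `major_interval = 5` and `major_offset = 2`, scans 2, 7, 12, and so on are major. With a
10-second minor interval, repeated failures sleep 10, 20, 40, then 60 seconds from then on.
*/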

    /// Proactively search for and retrieve missing objects.
    ///
    /// This function proactively identifies and retrieves blocks and leaves which are missing
    /// from storage. It runs until cancelled, so it is meant to be spawned as a background task
    /// rather than awaited directly by a caller.
    async fn proactive_scan(
        self: Arc<Self>,
        minor_interval: Duration,
        major_interval: usize,
        major_offset: usize,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        let mut prev_height = 0;

        for i in 0.. {
            let major = i % major_interval == major_offset % major_interval;
            let span = tracing::warn_span!("proactive scan", i, major, prev_height);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            metrics.current_is_major.set(major as usize);
            async {
                let mut backoff = minor_interval;
                let max_backoff = Duration::from_secs(60);
                metrics.backoff.set(backoff.as_secs() as usize);

                // We can't start the scan until we know the current block height and pruned height,
                // so we know which blocks to scan. Thus we retry until this succeeds.
                let heights = loop {
                    let mut tx = match self.read().await {
                        Ok(tx) => tx,
                        Err(err) => {
                            tracing::error!(
                                ?backoff,
                                "unable to start transaction for scan: {err:#}"
                            );
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    let heights = match Heights::load(&mut tx).await {
                        Ok(heights) => heights,
                        Err(err) => {
                            tracing::error!(?backoff, "unable to load heights: {err:#}");
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    metrics.retries.set(0);
                    break heights;
                };

                // Get the pruned height or default to 0 if it is not set. We will not look for
                // blocks older than the pruned height.
                let minimum_block_height = heights.pruned_height.unwrap_or(0) as usize;
                // Get the block height; we will look for any missing blocks up to `block_height`.
                let block_height = heights.height as usize;

                // In a major scan, we fetch all missing blocks between the pruned height (or 0, if
                // nothing has been pruned) and `block_height`. In minor scans (much more frequent)
                // we only look for missing blocks starting from where the previous scan ended.
                let start = if major {
                    // We log major scans at WARN level, since they happen infrequently and can have
                    // a measurable impact on performance while running. This also serves as a
                    // useful progress heartbeat.
                    tracing::warn!(
                        start = minimum_block_height,
                        block_height,
                        "starting major scan"
                    );

                    // If we're starting a major scan, reset the major scan counts of missing data
                    // as we're about to recompute them.
                    metrics.major_missing_blocks.set(0);
                    metrics.major_missing_vid.set(0);

                    minimum_block_height
                } else {
                    tracing::info!(start = prev_height, block_height, "starting minor scan");
1505                    prev_height
1506                };
1507                prev_height = block_height;
1508                metrics.current_start.set(start);
1509                metrics.current_end.set(block_height);
1510                metrics.scanned_blocks.set(0);
1511                metrics.scanned_vid.set(0);
1512
1513                // Iterate over all blocks that we should have. Fetching the payload metadata is
1514                // enough to trigger an active fetch of the corresponding leaf and the full block if
1515                // they are missing, without loading the full payload from storage if we already
1516                // have it.
1517                //
1518                // We iterate in reverse order for two reasons:
1519                // 1. More recent blocks are more likely to be requested sooner.
1520                // 2. Leaves are inherently fetched in reverse order, because we cannot (actively)
1521                //    fetch a leaf until we have the subsequent leaf, which tells us what the hash
1522                //    of its parent should be. Thus, if we iterated forwards, we could get stuck any
1523                //    time we came upon two consecutive missing leaves.
1524                let mut blocks = self
1525                    .clone()
1526                    .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
1527                        chunk_size,
1528                        Bound::Included(start),
1529                        block_height.saturating_sub(1),
1530                    );
1531                let mut missing_blocks = 0;
1532                while let Some(fetch) = blocks.next().await {
1533                    if fetch.is_pending() {
1534                        missing_blocks += 1;
1535                        // Wait for the block to be fetched. This slows down the scanner so that we
1536                        // don't waste memory generating more active fetch tasks then we can handle
1537                        // at a given time. Note that even with this await, all blocks within a
1538                        // chunk are fetched in parallel, so this does not block the next block in
1539                        // the chunk, only the next chunk until the current chunk completes.
1540                        fetch.await;
1541                    }
1542                    metrics.scanned_blocks.update(1);
1543                }
1544                metrics.add_missing_blocks(major, missing_blocks);
1545
1546                // We have to trigger a separate fetch of the VID data, since this is fetched
1547                // independently of the block payload.
1548                let mut vid = self
1549                    .clone()
1550                    .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
1551                        chunk_size,
1552                        Bound::Included(start),
1553                        block_height.saturating_sub(1),
1554                    );
1555                let mut missing_vid = 0;
1556                while let Some(fetch) = vid.next().await {
1557                    if fetch.is_pending() {
1558                        missing_vid += 1;
1559                        // As above, limit the speed at which we spawn new fetches to the speed at
1560                        // which we can process them.
1561                        fetch.await;
1562                    }
1563                    metrics.scanned_vid.update(1);
1564                }
1565                metrics.add_missing_vid(major, missing_vid);
1566
1567                tracing::info!("completed proactive scan, will scan again in {minor_interval:?}");
1568
1569                // Reset metrics.
1570                metrics.running.set(0);
1571                if major {
1572                    // If we just completed a major scan, reset the incremental counts of missing
1573                    // data from minor scans, so the next round of minor scans can recompute/update
1574                    // them.
1575                    metrics.minor_missing_blocks.set(0);
1576                    metrics.minor_missing_vid.set(0);
1577                }
1578
1579                sleep(minor_interval).await;
1580            }
1581            .instrument(span)
1582            .await;
1583        }
1584    }
1585}
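
/*
Illustrative sketch (not part of this module): the scheduling arithmetic of the proactive
scanner above, pulled out into free functions so it can be checked in isolation. The names
`is_major`, `scan_start`, and `next_backoff` are hypothetical. The expressions inside them are
copied from the scan loop: `i % major_interval == major_offset % major_interval`,
`pruned_height.unwrap_or(0)`, and `min(2 * backoff, max_backoff)`.

```rust
use std::cmp::min;
use std::time::Duration;

/// Same rule as the scan loop: scan `i` is major when it lands on the configured offset.
/// Panics if `major_interval` is zero, exactly like the original expression.
fn is_major(i: usize, major_interval: usize, major_offset: usize) -> bool {
    i % major_interval == major_offset % major_interval
}

/// First height to scan. Major scans restart from the pruned height (or 0 if nothing has been
/// pruned). Minor scans resume from the block height seen by the previous scan.
fn scan_start(major: bool, pruned_height: Option<u64>, prev_height: usize) -> usize {
    if major {
        pruned_height.unwrap_or(0) as usize
    } else {
        prev_height
    }
}

/// Capped exponential backoff, as used while retrying the height lookup.
fn next_backoff(current: Duration, max_backoff: Duration) -> Duration {
    min(2 * current, max_backoff)
}
```

For example, with `major_interval = 3` and `major_offset = 1`, scans 1, 4, 7, ... are major and
all others are minor. With a `minor_interval` of 10s, repeated failures to load the heights
grow the retry delay from 10s to 20s and 40s, after which it stays at the 60s cap.
*/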

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    // This is not supposed to happen, but if the chunk is empty, just skip it.
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
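
/*
Illustrative sketch (not part of this module): a simplified model of the aggregator's
resume-and-fold loop above. It assumes an aggregate is a running sum tagged with the height of
the last block it covers. `Aggregate`'s fields, `BlockSummary`, `resume_height`, and
`update_aggregate` are hypothetical stand-ins. The real `update_aggregates` runs inside a write
transaction and is not reproduced here.

```rust
/// Hypothetical running totals, tagged with the height of the last block folded in.
#[derive(Clone, Debug, Default, PartialEq)]
struct Aggregate {
    height: u64,
    num_transactions: u64,
    payload_size: u64,
}

/// Hypothetical per-block statistics.
#[derive(Clone, Copy, Debug)]
struct BlockSummary {
    height: u64,
    num_transactions: u64,
    payload_size: u64,
}

/// Where to resume: one past the last aggregated height, or 0 if nothing has been aggregated.
fn resume_height(prev: Option<&Aggregate>) -> u64 {
    prev.map_or(0, |agg| agg.height + 1)
}

/// Fold one chunk into the running aggregate. Returns `None` for an empty chunk, which the loop
/// above also skips.
fn update_aggregate(prev: &Aggregate, chunk: &[BlockSummary]) -> Option<Aggregate> {
    let last = chunk.last()?;
    Some(Aggregate {
        height: last.height,
        num_transactions: prev.num_transactions
            + chunk.iter().map(|b| b.num_transactions).sum::<u64>(),
        payload_size: prev.payload_size + chunk.iter().map(|b| b.payload_size).sum::<u64>(),
    })
}
```

Each chunk's result becomes the previous aggregate for the next chunk. A restart can therefore
resume at `height + 1` without re-reading earlier blocks, which mirrors how the loop above
computes `start`.
*/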

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
{
    /// Store an object and notify anyone waiting on this object that it is available.
    async fn store_and_notify<T>(&self, obj: T)
    where
        T: Storable<Types>,
    {
        let try_store = || async {
            let mut tx = self.storage.write().await?;
            obj.clone().store(&mut tx, self.leaf_only).await?;
            tx.commit().await
        };

        // Store the object in local storage, so we can avoid fetching it in the future.
        let mut backoff = self.backoff.clone();
        backoff.reset();
        loop {
            let Err(err) = try_store().await else {
                break;
            };
            // It is unfortunate if this fails, but we can still proceed by notifying with the
            // object that we fetched, keeping it in memory. Log the error, retry a few times, and
            // eventually move on.
            tracing::warn!(
                "failed to store fetched {} {}: {err:#}",
                T::name(),
                obj.height()
            );

            let Some(delay) = backoff.next_backoff() else {
                break;
            };
            tracing::info!(?delay, "retrying failed operation");
            sleep(delay).await;
        }

        // Send a notification about the newly received object. It is important that we do this
        // _after_ our attempt to store the object in local storage, otherwise there is a potential
        // missed notification deadlock:
        // * we send the notification
        // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the
        //   requested object is not in storage, and begins waiting for a notification
        // * we store the object. This ensures that no other task will be triggered to fetch it,
        //   which means no one will ever notify the waiting task.
        //
        // Note that we send the notification regardless of whether the store actually succeeded or
        // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we
        // failed to store, some fetches might not resolve, even though the object in question has
        // already been fetched. In principle this would be tolerable, because as long as the object
        // is not in storage, eventually some other task will come along and fetch, store, and notify
        // about it. However, it is certainly not ideal, since we could resolve those pending fetches
        // right now, and it causes bigger problems when the fetch that fails to resolve belongs to
        // the proactive scanner task: the scanner awaits each pending fetch, so it would stall, and
        // it is often the very task that would eventually come along and re-fetch the object.
        //
        // The key thing to note is that it does no harm to notify even if we fail to store: at best
        // we wake some tasks up sooner; at worst, anyone who misses the notification still
        // satisfies the invariant that we only wait on notifications for objects which are not in
        // storage, and eventually some other task will come along, find the object missing from
        // storage, and re-fetch it.
        obj.notify(&self.notifiers).await;
    }
}
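
/*
Illustrative sketch (not part of this module): a deterministic toy model of the
missed-notification hazard described in `store_and_notify` above. Each `Step` is treated as
atomic, and a notification reaches only readers that are already waiting. `Step`, `Model`, and
`stranded_readers` are hypothetical and much simpler than the real `Notifier` machinery.

```rust
use std::collections::HashSet;

/// One atomic step in an interleaving of a fetcher and a single reader.
#[derive(Clone, Copy, Debug)]
enum Step {
    /// The fetcher writes the object to storage.
    Store,
    /// The fetcher wakes every reader that is already waiting for the object.
    Notify,
    /// The reader checks storage and, if the object is missing, starts waiting.
    Get,
}

#[derive(Default)]
struct Model {
    storage: HashSet<u64>,
    waiting: Vec<u64>,
}

impl Model {
    fn apply(&mut self, step: Step, height: u64) {
        match step {
            Step::Store => {
                self.storage.insert(height);
            }
            // Notified readers stop waiting; readers that subscribe later never see this.
            Step::Notify => self.waiting.retain(|&h| h != height),
            Step::Get => {
                if !self.storage.contains(&height) {
                    self.waiting.push(height);
                }
            }
        }
    }
}

/// Runs one interleaving and counts readers still waiting at the end. In this model no further
/// notification ever arrives, so they are stranded.
fn stranded_readers(order: &[Step]) -> usize {
    let mut model = Model::default();
    for &step in order {
        model.apply(step, 7);
    }
    model.waiting.len()
}
```

With store-then-notify, every placement of the reader's check leaves nobody waiting. With
notify-then-store, a reader that checks between the two steps waits forever.
*/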

#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    block: Notifier<BlockQueryData<Types>>,
    leaf: Notifier<LeafQueryData<Types>>,
    vid_common: Notifier<VidCommonQueryData<Types>>,
}

impl<Types> Default for Notifiers<Types>
where
    Types: NodeType,
{
    fn default() -> Self {
        Self {
            block: Notifier::new(),
            leaf: Notifier::new(),
            vid_common: Notifier::new(),
        }
    }
}

#[derive(Clone, Copy, Debug)]
struct Heights {
    height: u64,
    pruned_height: Option<u64>,
}

impl Heights {
    async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
    where
        Types: NodeType,
        Header<Types>: QueryableHeader<Types>,
        T: NodeStorage<Types> + PrunedHeightStorage + Send,
    {
        let height = tx.block_height().await.context("loading block height")? as u64;
        let pruned_height = tx
            .load_pruned_height()
            .await
            .context("loading pruned height")?;
        Ok(Self {
            height,
            pruned_height,
        })
    }

    fn might_exist(self, h: u64) -> bool {
        h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
    }
}
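
/*
Illustrative sketch (not part of this module): a standalone copy of `Heights::might_exist` with
a few boundary checks. It uses `map_or(true, ...)` in place of `Option::is_none_or`. The two are
equivalent here, but `map_or` also compiles on older toolchains that lack `is_none_or`.

```rust
#[derive(Clone, Copy, Debug)]
struct Heights {
    height: u64,
    pruned_height: Option<u64>,
}

impl Heights {
    /// True if `h` is below the block height and above the pruned height (if any).
    fn might_exist(self, h: u64) -> bool {
        h < self.height && self.pruned_height.map_or(true, |ph| h > ph)
    }
}
```

With `height = 10` and `pruned_height = Some(3)`, only heights 4 through 9 might exist. The
block height is an exclusive bound, and the pruned height itself counts as pruned.
*/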

#[async_trait]
impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
    for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
    P: Send + Sync,
    State: MerklizedState<Types, ARITY> + 'static,
    <State as MerkleTreeScheme>::Commitment: Send,
{
    async fn get_path(
        &self,
        snapshot: Snapshot<Types, State, ARITY>,
        key: State::Key,
    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_path(snapshot, key).await
    }
}

#[async_trait]
impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
    P: Send + Sync,
{
    async fn get_last_state_height(&self) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_last_state_height().await
    }
}

#[async_trait]
impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
    P: Send + Sync,
{
    async fn block_height(&self) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.block_height().await
    }

    async fn count_transactions_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.count_transactions_in_range(range, namespace).await
    }

    async fn payload_size_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.payload_size_in_range(range, namespace).await
    }

    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.vid_share(id).await
    }

    async fn sync_status(&self) -> QueryResult<SyncStatus> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.sync_status().await
    }

    async fn get_header_window(
        &self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_header_window(start, end, limit).await
    }
}

#[async_trait]
impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
    P: Send + Sync,
{
    async fn get_block_summaries(
        &self,
        request: explorer::query_data::GetBlockSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::BlockSummary<Types>>,
        explorer::query_data::GetBlockSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_summaries(request).await
    }

    async fn get_block_detail(
        &self,
        request: explorer::query_data::BlockIdentifier<Types>,
    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_detail(request).await
    }

    async fn get_transaction_summaries(
        &self,
        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::TransactionSummary<Types>>,
        explorer::query_data::GetTransactionSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_summaries(request).await
    }

    async fn get_transaction_detail(
        &self,
        request: explorer::query_data::TransactionIdentifier<Types>,
    ) -> Result<
        explorer::query_data::TransactionDetailResponse<Types>,
        explorer::query_data::GetTransactionDetailError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_detail(request).await
    }

    async fn get_explorer_summary(
        &self,
    ) -> Result<
        explorer::query_data::ExplorerSummary<Types>,
        explorer::query_data::GetExplorerSummaryError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_explorer_summary().await
    }

    async fn get_search_results(
        &self,
        query: TaggedBase64,
    ) -> Result<
        explorer::query_data::SearchResult<Types>,
        explorer::query_data::GetSearchResultsError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_search_results(query).await
    }
}

/// A provider which can be used as a fetcher by the availability service.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::VidCommonRequest>
    + Sync
    + 'static
{
}
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::VidCommonRequest>
        + Sync
        + 'static
{
}
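
/*
Illustrative sketch (not part of this module): the "trait alias" pattern that
`AvailabilityProvider` uses. A trait whose only content is its supertrait bounds is paired with
a blanket impl, so every type meeting those bounds implements it automatically. `Provider`,
`LeafRequest`, `PayloadRequest`, `StaticProvider`, and `fetch_both` here are simplified,
hypothetical stand-ins, not the crate's real definitions (the real `Provider` is generic over
`Types` as well).

```rust
/// Hypothetical, synchronous stand-in for a provider of one kind of request.
trait Provider<Req> {
    fn fetch(&self, req: Req) -> Option<String>;
}

struct LeafRequest(u64);
struct PayloadRequest(u64);

/// One name for a bundle of bounds; the blanket impl below does all the implementing.
trait AvailabilityProvider: Provider<LeafRequest> + Provider<PayloadRequest> + Sync + 'static {}

impl<P> AvailabilityProvider for P where
    P: Provider<LeafRequest> + Provider<PayloadRequest> + Sync + 'static
{
}

/// Hypothetical provider that answers every request with a fixed string.
struct StaticProvider;

impl Provider<LeafRequest> for StaticProvider {
    fn fetch(&self, req: LeafRequest) -> Option<String> {
        Some(format!("leaf {}", req.0))
    }
}

impl Provider<PayloadRequest> for StaticProvider {
    fn fetch(&self, req: PayloadRequest) -> Option<String> {
        Some(format!("payload {}", req.0))
    }
}

/// Generic code names a single bound instead of repeating each `Provider<...>` bound.
fn fetch_both<P: AvailabilityProvider>(provider: &P, height: u64) -> (Option<String>, Option<String>) {
    // Fully qualified calls pick the intended `Provider` impl explicitly.
    (
        <P as Provider<LeafRequest>>::fetch(provider, LeafRequest(height)),
        <P as Provider<PayloadRequest>>::fetch(provider, PayloadRequest(height)),
    )
}
```

`StaticProvider` never mentions `AvailabilityProvider`, yet it can be passed to `fetch_both`
because the blanket impl covers it.
*/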

trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Indicate whether this object could possibly exist.
    ///
    /// This can filter out requests quickly for objects that cannot possibly exist, such as
    /// requests for objects at or above the current block height. Not only does this let us fail
    /// faster for such requests (without touching storage at all), it also helps keep logging
    /// quieter when we fail to fetch an object because the user made a bad request, while still
    /// being fairly loud when we fail to fetch an object that might have really existed.
    ///
    /// This method is conservative: it returns `true` if it cannot tell whether the given object
    /// could exist or not.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
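
/*
Illustrative sketch (not part of this module): how the conservative default on
`FetchRequest::might_exist` can be combined with overrides. A request keyed by height can be
checked against the block and pruned heights. A request keyed by hash keeps the default `true`,
since a hash alone says nothing about whether the object exists. `ByHeight`, `ByHash`, and this
trimmed-down `FetchRequest` and `Heights` are hypothetical.

```rust
#[derive(Clone, Copy, Debug)]
struct Heights {
    height: u64,
    pruned_height: Option<u64>,
}

trait FetchRequest: Copy {
    /// Conservative default: assume the object might exist.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}

/// Hypothetical request by height, which can be filtered using `Heights`.
#[derive(Clone, Copy, Debug)]
struct ByHeight(u64);

impl FetchRequest for ByHeight {
    fn might_exist(self, heights: Heights) -> bool {
        self.0 < heights.height && heights.pruned_height.map_or(true, |ph| self.0 > ph)
    }
}

/// Hypothetical request by hash, which cannot be filtered, so it keeps the default.
#[derive(Clone, Copy, Debug)]
struct ByHash([u8; 4]);

impl FetchRequest for ByHash {}
```
*/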

/// Objects which can be fetched from a remote DA provider and cached in local storage.
///
/// This trait lets us abstract over leaves, blocks, and other types that can be fetched. Thus, the
/// logistics of fetching are shared between all objects, and only the low-level particulars are
/// type-specific.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// A succinct specification of the object to be fetched.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Spawn a task to fetch the object from a remote provider, if possible.
    ///
    /// An active fetch will only be triggered if:
    /// * There is not already an active fetch in progress for the same object
    /// * The requested object is known to exist. For example, we will fetch a leaf by height but
    ///   not by hash, since we can't guarantee that a leaf with an arbitrary hash exists. Note that
    ///   this function assumes `req.might_exist()` has already been checked before calling it, and
    ///   so may do unnecessary work if the caller does not ensure this.
    ///
    /// If we do trigger an active fetch for an object, any passive listeners for the object will be
    /// notified once it has been retrieved. If we do not trigger an active fetch for an object,
    /// this function does nothing. In either case, as long as the requested object does in fact
    /// exist, we will eventually receive it passively, since we will eventually receive all blocks
    /// and leaves that are ever produced. Active fetching merely helps us receive certain objects
    /// sooner.
    ///
    /// This function fails if it _might_ be possible to actively fetch the requested object, but we
    /// were unable to do so (e.g. due to errors in the database).
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait for someone else to fetch the object.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load an object from local storage.
    ///
    /// This function assumes `req.might_exist()` has already been checked before calling it, and so
    /// may do unnecessary work if the caller does not ensure this.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}

type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
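
/*
Illustrative sketch (not part of this module): `active_fetch` promises to start a fetch only
when none is already in progress for the same object. This section does not show how that check
is implemented. The sketch below assumes a simple mutex-guarded set of in-flight request keys.
`InFlight`, `try_begin`, and `finish` are hypothetical names.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical registry of requests that currently have an active fetch running.
#[derive(Default)]
struct InFlight {
    requests: Mutex<HashSet<u64>>,
}

impl InFlight {
    /// Returns `true` if the caller should start a fetch for `req`, or `false` if one is
    /// already running. `HashSet::insert` returns `false` for a key that is already present,
    /// so the check and the registration happen in one locked step.
    fn try_begin(&self, req: u64) -> bool {
        self.requests.lock().unwrap().insert(req)
    }

    /// Marks the fetch for `req` as finished, so a later request can try again.
    fn finish(&self, req: u64) {
        self.requests.lock().unwrap().remove(&req);
    }
}
```

A caller would call `finish` whether the fetch succeeds or fails. Otherwise a failed fetch
would block every later attempt for the same object.
*/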

#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}

/// An object which can be stored in the database.
trait Storable<Types: NodeType>: HeightIndexed + Clone {
    /// The name of this type of object, for debugging purposes.
    fn name() -> &'static str;

    /// Notify anyone waiting for this object that it has become available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Store the object in the local database.
    fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}

impl<Types: NodeType> Storable<Types> for BlockInfo<Types> {
    fn name() -> &'static str {
        "block info"
    }

    async fn notify(&self, notifiers: &Notifiers<Types>) {
        self.leaf.notify(notifiers).await;

        if let Some(block) = &self.block {
            block.notify(notifiers).await;
        }
        if let Some(vid) = &self.vid_common {
            vid.notify(notifiers).await;
        }
    }

    async fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> anyhow::Result<()> {
        storage
            .insert_leaf_with_qc_chain(self.leaf, self.qc_chain)
            .await?;

        if let Some(common) = self.vid_common {
            (common, self.vid_share).store(storage, leaf_only).await?;
        }

        if let Some(block) = self.block {
            block.store(storage, leaf_only).await?;
        }

        Ok(())
    }
}

/// Break a range into fixed-size chunks, in ascending order. The final chunk may be shorter than
/// `chunk_size`.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => *i + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => *i + 1,
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        let chunk_end = min(start + chunk_size, end);
        if chunk_end == start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
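
/*
Illustrative sketch: a standalone copy of `range_chunks` above, with checks of its boundary
handling for inclusive, exclusive, empty, and unbounded ranges. The function body is unchanged.
Only the imports are added so it compiles on its own.

```rust
use std::cmp::min;
use std::ops::{Bound, Range, RangeBounds};

fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Normalize to an inclusive start and an exclusive end.
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => *i + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => *i + 1,
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        // Stop once the next chunk would be empty.
        let chunk_end = min(start + chunk_size, end);
        if chunk_end == start {
            return None;
        }
        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
```

With an unbounded end, the iterator effectively never terminates on its own, so the consumer
decides when to stop (for instance with `take`, or by stopping at the current block height).
*/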

/// Break a range into fixed-size chunks, starting from the end and moving towards the start.
///
/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk starts
/// with `start`.
///
/// Note that unlike [`range_chunks`], which accepts any range and yields an infinite iterator if
/// the range has no upper bound, this function requires a defined upper bound; otherwise we would
/// not know where the reversed iterator should _start_. The `end` bound given here is inclusive;
/// i.e. the last element of the first chunk yielded by the iterator will be exactly `end`.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Transform the start bound to be inclusive.
    let start = match start {
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
        Bound::Unbounded => 0,
    };
    // Transform the end bound to be exclusive.
    let mut end = end + 1;

    std::iter::from_fn(move || {
        let chunk_start = max(start, end.saturating_sub(chunk_size));
        if end <= chunk_start {
            return None;
        }

        let chunk = chunk_start..end;
        end = chunk_start;
        Some(chunk)
    })
}
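
The ordering contract of `range_chunks_rev` (chunks yielded from the end backwards, each chunk ascending, the last element of the first chunk equal to the inclusive `end`) can be cross-checked against a deliberately naive reference: materialize the range, reverse it, split it with `slice::chunks`, and turn each descending chunk back into an ascending `Range`. `range_chunks_rev_naive` is a hypothetical test helper, not part of this crate; it allocates every index, so it only suits small inputs.

```rust
use std::ops::{Bound, Range};

/// Hypothetical reference implementation of `range_chunks_rev`, for testing only.
/// Allocates every index in the range, and panics if `chunk_size` is 0 (because
/// `slice::chunks` does), whereas `range_chunks_rev` simply yields nothing in that case.
fn range_chunks_rev_naive(start: Bound<usize>, end: usize, chunk_size: usize) -> Vec<Range<usize>> {
    // Make the start bound inclusive, as `range_chunks_rev` does.
    let start = match start {
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
        Bound::Unbounded => 0,
    };
    // `end` is inclusive; collect the indices from `end` down to `start`.
    let descending: Vec<usize> = (start..=end).rev().collect();
    descending
        .chunks(chunk_size)
        .map(|c| {
            // Each chunk is descending, e.g. [4, 3]; its ascending range is 3..5.
            let (lo, hi) = (c[c.len() - 1], c[0]);
            lo..hi + 1
        })
        .collect()
}
```

On the cases exercised by `test_range_chunks_rev` at the end of this file, this reference produces the same vectors, so it could serve as an oracle for randomized tests of the allocation-free version.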

/// Extension methods for [`Result`].
trait ResultExt<T, E> {
    /// Convert a [`Result`] into an [`Option`], logging the error (at `info` level) if there is
    /// one.
    ///
    /// This is meant for reads from local storage whose failure is not fatal, because the missing
    /// object can still be fetched from a remote provider.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}

impl<T, E> ResultExt<T, E> for Result<T, E> {
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display,
    {
        match self {
            Ok(t) => Some(t),
            Err(err) => {
                tracing::info!(
                    "error loading resource from local storage, will try to fetch: {err:#}"
                );
                None
            },
        }
    }
}
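
The intended use of `ok_or_trace` is to turn a local-storage error into a `None` that falls through to a fetch. The sketch below mirrors the trait but uses `eprintln!` in place of `tracing::info!` so that it has no dependencies; `load_local`, `fetch_remote`, and `get_block` are hypothetical stand-ins, not functions from this file.

```rust
use std::fmt::Display;

trait ResultExt<T, E> {
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}

impl<T, E> ResultExt<T, E> for Result<T, E> {
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display,
    {
        match self {
            Ok(t) => Some(t),
            Err(err) => {
                // Stand-in for `tracing::info!`: log the error, then discard it.
                eprintln!("error loading resource from local storage, will try to fetch: {err:#}");
                None
            }
        }
    }
}

/// Hypothetical local lookup: only heights below 10 are stored locally.
fn load_local(height: u64) -> Result<String, String> {
    if height < 10 {
        Ok(format!("local block {height}"))
    } else {
        Err(format!("block {height} not found"))
    }
}

/// Hypothetical remote fetch, used as the fallback.
fn fetch_remote(height: u64) -> String {
    format!("fetched block {height}")
}

/// Prefer local storage; on any error, log it and fall back to fetching.
fn get_block(height: u64) -> String {
    load_local(height)
        .ok_or_trace()
        .unwrap_or_else(|| fetch_remote(height))
}
```

Keeping the logging inside the extension method means every call site gets the same message without repeating a `match`.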

#[derive(Debug)]
struct ScannerMetrics {
    /// Whether a scan is currently running (1) or not (0).
    running: Box<dyn Gauge>,
    /// The number of the scan that is currently running.
    current_scan: Box<dyn Gauge>,
    /// Whether the currently running scan is a major scan (1) or not (0).
    current_is_major: Box<dyn Gauge>,
    /// Current backoff delay for retries, in seconds.
    backoff: Box<dyn Gauge>,
    /// Number of retries since the last success.
    retries: Box<dyn Gauge>,
    /// Block height where the current scan started.
    current_start: Box<dyn Gauge>,
    /// Block height where the current scan will end.
    current_end: Box<dyn Gauge>,
    /// Number of blocks processed in the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// Number of VID entries processed in the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// The number of missing blocks encountered in the last major scan.
    major_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries encountered in the last major scan.
    major_missing_vid: Box<dyn Gauge>,
    /// The number of missing blocks encountered _cumulatively_ in minor scans since the last major
    /// scan.
    minor_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries encountered _cumulatively_ in minor scans since the last
    /// major scan.
    minor_missing_vid: Box<dyn Gauge>,
}

impl ScannerMetrics {
    fn new(metrics: &PrometheusMetrics) -> Self {
        let group = metrics.subgroup("scanner".into());
        Self {
            running: group.create_gauge("running".into(), None),
            current_scan: group.create_gauge("current".into(), None),
            current_is_major: group.create_gauge("is_major".into(), None),
            backoff: group.create_gauge("backoff".into(), Some("s".into())),
            retries: group.create_gauge("retries".into(), None),
            current_start: group.create_gauge("start".into(), None),
            current_end: group.create_gauge("end".into(), None),
            scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
            scanned_vid: group.create_gauge("scanned_vid".into(), None),
            major_missing_blocks: group.create_gauge("major_missing_blocks".into(), None),
            major_missing_vid: group.create_gauge("major_missing_vid".into(), None),
            minor_missing_blocks: group.create_gauge("minor_missing_blocks".into(), None),
            minor_missing_vid: group.create_gauge("minor_missing_vid".into(), None),
        }
    }

    fn add_missing_blocks(&self, major: bool, missing: usize) {
        if major {
            self.major_missing_blocks.set(missing);
        } else {
            self.minor_missing_blocks.update(missing as i64);
        }
    }

    fn add_missing_vid(&self, major: bool, missing: usize) {
        if major {
            self.major_missing_vid.set(missing);
        } else {
            self.minor_missing_vid.update(missing as i64);
        }
    }
}
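
`add_missing_blocks` and `add_missing_vid` treat the two kinds of gauge differently: for a major scan the gauge is overwritten with `set`, while for minor scans the count is added with `update`, so the minor gauges accumulate across calls. The sketch below reproduces that split with a hypothetical `TestGauge` backed by an `AtomicI64`. It assumes `set` stores an absolute value and `update` applies a signed delta; it is an illustration, not the real metrics API.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical gauge with the two operations used by `ScannerMetrics`:
/// `set` overwrites the value, `update` adds a signed delta.
#[derive(Default)]
struct TestGauge(AtomicI64);

impl TestGauge {
    fn set(&self, value: usize) {
        self.0.store(value as i64, Ordering::Relaxed);
    }
    fn update(&self, delta: i64) {
        self.0.fetch_add(delta, Ordering::Relaxed);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical pair of gauges, one for major scans and one for minor scans.
#[derive(Default)]
struct MissingBlocks {
    major: TestGauge,
    minor: TestGauge,
}

impl MissingBlocks {
    /// Same branching as `ScannerMetrics::add_missing_blocks`.
    fn add(&self, major: bool, missing: usize) {
        if major {
            // Major scan: the gauge reflects only the latest reported value.
            self.major.set(missing);
        } else {
            // Minor scan: the gauge accumulates across reports.
            self.minor.update(missing as i64);
        }
    }
}
```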

#[derive(Debug)]
struct AggregatorMetrics {
    /// The block height for which aggregate statistics are currently available.
    height: Box<dyn Gauge>,
}

impl AggregatorMetrics {
    fn new(metrics: &PrometheusMetrics) -> Self {
        let group = metrics.subgroup("aggregator".into());
        Self {
            height: group.create_gauge("height".into(), None),
        }
    }
}

/// Turn a fallible passive fetch future into an infallible [`Fetch`].
///
/// The future resolves to `None` only if the notifier is dropped without ever satisfying the
/// request, which should never happen. The returned [`Fetch`] treats that case as a bug and panics
/// instead of reporting an error.
fn passive<T>(
    req: impl Debug + Send + 'static,
    fut: impl Future<Output = Option<T>> + Send + 'static,
) -> Fetch<T>
where
    T: Send + 'static,
{
    Fetch::Pending(
        fut.then(move |opt| async move {
            match opt {
                Some(t) => t,
                None => {
                    // If `passive_fetch` returns `None`, it means the notifier was dropped without
                    // ever sending a notification. In this case, the correct behavior is actually
                    // to block forever (unless the `Fetch` itself is dropped), since the semantics
                    // of `Fetch` are to never fail. This is analogous to fetching an object which
                    // doesn't actually exist: the `Fetch` will never return.
                    //
                    // However, for ease of debugging, and since this is never expected to happen in
                    // normal usage, we panic instead. This should only happen in two cases:
                    // * The server was shut down (dropping the notifier) without cleaning up some
                    //   background tasks. This will not affect runtime behavior, but should be
                    //   fixed if it happens.
                    // * There is a very unexpected runtime bug resulting in the notifier being
                    //   dropped. If this happens, things are very broken in any case, and it is
                    //   better to panic loudly than simply block forever.
                    panic!("notifier dropped without satisfying request {req:?}");
                },
            }
        })
        .boxed(),
    )
}
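
The core of `passive` is a map from `Option<T>` to `T` that treats `None` as a bug. Stripped of the `Fetch` wrapper and the `futures` combinators, that step can be sketched as a plain `async fn` driven by a tiny single-threaded executor. Both `expect_notified` and `block_on` are hypothetical helpers written only for this illustration.

```rust
use std::fmt::Debug;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll on the current thread, parking until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Resolve to the fetched value, panicking if the notifier was dropped (`None`).
async fn expect_notified<T>(req: impl Debug, fut: impl Future<Output = Option<T>>) -> T {
    match fut.await {
        Some(t) => t,
        None => panic!("notifier dropped without satisfying request {req:?}"),
    }
}
```

As the comment in `passive` explains, blocking forever would be the semantically faithful choice; panicking simply makes this never-expected case loud.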

/// Get the result of the first future to return `Some`, if either does.
///
/// If both futures resolve to `None`, the result is `None`.
async fn select_some<T>(
    a: impl Future<Output = Option<T>> + Unpin,
    b: impl Future<Output = Option<T>> + Unpin,
) -> Option<T> {
    match future::select(a, b).await {
        // If whichever future finishes first resolves to `Some`, return its result immediately.
        Either::Left((Some(a), _)) => Some(a),
        Either::Right((Some(b), _)) => Some(b),

        // If whichever future finishes first resolves to `None`, wait for the result of the other
        // future.
        Either::Left((None, b)) => b.await,
        Either::Right((None, a)) => a.await,
    }
}
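
`select_some` relies on `future::select` handing back the future that has not finished yet, so that an early `None` from one side does not discard the other. To make the polling order explicit, the sketch below expresses the same rule as a hand-written `Future` over two `Unpin` futures. `SelectSome`, its `select_some` constructor, and the `block_on` helper are hypothetical and not part of the crate.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Resolves to the first `Some` produced by either future, or `None` if both yield `None`.
struct SelectSome<A, B> {
    a: Option<A>,
    b: Option<B>,
}

impl<T, A, B> Future for SelectSome<A, B>
where
    A: Future<Output = Option<T>> + Unpin,
    B: Future<Output = Option<T>> + Unpin,
{
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        // Poll `a` before `b`, matching the left bias of `future::select`.
        if let Some(a) = this.a.as_mut() {
            if let Poll::Ready(res) = Pin::new(a).poll(cx) {
                if res.is_some() {
                    return Poll::Ready(res);
                }
                // `a` finished with `None`: stop polling it and keep waiting on `b`.
                this.a = None;
            }
        }
        if let Some(b) = this.b.as_mut() {
            if let Poll::Ready(res) = Pin::new(b).poll(cx) {
                if res.is_some() {
                    return Poll::Ready(res);
                }
                this.b = None;
            }
        }
        if this.a.is_none() && this.b.is_none() {
            // Both futures finished with `None`.
            Poll::Ready(None)
        } else {
            // At least one future is still pending and has registered the waker.
            Poll::Pending
        }
    }
}

fn select_some<T, A, B>(a: A, b: B) -> SelectSome<A, B>
where
    A: Future<Output = Option<T>> + Unpin,
    B: Future<Output = Option<T>> + Unpin,
{
    SelectSome { a: Some(a), b: Some(b) }
}

/// Minimal single-threaded executor for the example.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Because each future is dropped only after it yields `None`, a quick `None` from one side never hides a later `Some` from the other, which is the property the `future::select` version gets by matching on the returned `Either`.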

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_range_chunks() {
        // Inclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..=4, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Inclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..=5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Exclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Exclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..6, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Unbounded.
        assert_eq!(
            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6, 6..8, 8..10]
        );
    }

    #[test]
    fn test_range_chunks_rev() {
        // The `end` argument is always inclusive; only the start bound varies below.

        // Inclusive start bound, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        // Inclusive start bound, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 0..2]
        );

        // Exclusive start bound, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 1..2]
        );

        // Exclusive start bound, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3]
        );
    }
}