// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Asynchronous retrieval of missing data.
//!
//! [`FetchingDataSource`] combines a local storage implementation with a remote data availability
//! provider to create a data source which caches data locally, but which is capable of fetching
//! missing data from a remote source, either proactively or on demand.
//!
//! This implementation supports three kinds of data fetching.
//!
//! # Proactive Fetching
//!
//! Proactive fetching means actively scanning the local database for missing objects and
//! proactively retrieving them from a remote provider, even if those objects have yet to be
//! requested by a client. Doing this increases the chance of success and decreases latency when a
//! client does eventually ask for those objects. This is also the mechanism by which a query
//! service joining a network late, or having been offline for some time, is able to catch up with
//! the events on the network that it missed.
//!
//! The current implementation of proactive fetching is meant to be the simplest effective algorithm
//! which still gives us a reasonable range of configuration options for experimentation. It is
//! subject to change as we learn about the behavior of proactive fetching in a realistic system.
//!
//! Proactive fetching is currently implemented by a background task which performs periodic scans
//! of the database, identifying and retrieving missing objects. This task is generally low
//! priority, since missing objects are rare, and it will take care not to monopolize resources that
//! could be used to serve requests. To reduce load and to optimize for the common case where blocks
//! are usually not missing once they have already been retrieved, we distinguish between _major_
//! and _minor_ scans.
//!
//! Minor scans are lightweight and can run very frequently. They will only look for missing
//! blocks among blocks that are new since the previous scan. Thus, the more frequently minor
//! scans run, the less work they have to do. This allows them to run frequently, giving low
//! latency for retrieval of newly produced blocks that we failed to receive initially. Between
//! each minor scan, the task will sleep for [a configurable
//! duration](Builder::with_minor_scan_interval) to wait for new blocks to be produced and give
//! other tasks full access to all shared resources.
//!
//! Every `n`th scan (`n` is [configurable](Builder::with_major_scan_interval)) is a major scan.
//! These scan all blocks from 0, which guarantees that we will eventually retrieve all blocks, even
//! if for some reason we have lost a block that we previously had (due to storage failures and
//! corruptions, or simple bugs in this software). These scans are rather expensive (although they
//! will release control of shared resources many times during the duration of the scan), but
//! because it is rather unlikely that a major scan will discover any missing blocks that the next
//! minor scan would have missed, it is ok if major scans run very infrequently.
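//!
/*!
A minimal sketch of the minor/major schedule described above. It assumes, purely for
illustration, that scans are numbered from 0 and that scan `k` is major when
`(k + major_scan_offset) % major_scan_interval == 0`; the `ScanKind`, `scan_kind`, and
`scan_range` names are hypothetical and do not come from this module.

```rust
use std::ops::Range;

// Hypothetical classification of a proactive scan.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ScanKind {
    Minor,
    Major,
}

// Assumed rule: every `major_interval`th scan is major, shifted by `major_offset`.
// An interval of 0 is treated as "never run a major scan" to avoid dividing by zero.
fn scan_kind(k: usize, major_interval: usize, major_offset: usize) -> ScanKind {
    if major_interval != 0 && (k + major_offset) % major_interval == 0 {
        ScanKind::Major
    } else {
        ScanKind::Minor
    }
}

// Heights a scan inspects, given the chain height recorded by the previous scan.
fn scan_range(kind: ScanKind, prev_height: u64, chain_height: u64) -> Range<u64> {
    match kind {
        // Minor scans only look at blocks that are new since the previous scan.
        ScanKind::Minor => prev_height.min(chain_height)..chain_height,
        // Major scans start over from block 0.
        ScanKind::Major => 0..chain_height,
    }
}
```

Under this model, the defaults (`major_scan_interval = 60`, `major_scan_offset = 0`) make scans
0, 60, 120, and so on major; with one-minute minor scans, that is roughly one major scan per hour.
*/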
//!
//! # Active Fetching
//!
//! Active fetching means reaching out to a remote data availability provider to retrieve a missing
//! resource upon receiving a request for that resource from a client. Not every request for a
//! missing resource triggers an active fetch. To avoid spamming peers with requests for missing
//! data, we only actively fetch resources that are known to exist somewhere. This means we can
//! actively fetch a leaf or header when a client requests it by a height that is less than the
//! current chain height. We can fetch a block when we have the corresponding header (matched by
//! height, hash, or payload hash) or can actively fetch that header.
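//!
/*!
The rule for when an active fetch is allowed can be sketched as a small decision function.
`Request`, its variants, and `can_fetch_actively` are hypothetical names for illustration;
`have_header` stands for "we have the corresponding header, or can actively fetch it".

```rust
// Hypothetical, simplified model of a client request for a missing object.
#[derive(Clone, Copy, Debug)]
enum Request {
    // A leaf or header requested by height.
    LeafOrHeaderByHeight(u64),
    // A block requested by height, hash, or payload hash.
    Block { have_header: bool },
}

// True if the object may be requested from a remote provider right away; false if the
// request has to fall back to passive fetching.
fn can_fetch_actively(req: Request, chain_height: u64) -> bool {
    match req {
        // Heights below the current chain height are known to exist somewhere.
        Request::LeafOrHeaderByHeight(height) => height < chain_height,
        // A block is known to exist only if a corresponding header proves it.
        Request::Block { have_header } => have_header,
    }
}
```
*/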
//!
//! # Passive Fetching
//!
//! For requests that cannot be actively fetched (for example, a block requested by hash, where we
//! do not have a header proving that a block with that hash exists), we use passive fetching. This
//! essentially means waiting passively until the query service receives an object that satisfies
//! the request. This object may be received because it was actively fetched in response to a
//! different request for the same object, one that permitted an active fetch. Or it may have been
//! fetched [proactively](#proactive-fetching).
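//!
/*!
Passive fetching amounts to registering interest in an object and being woken up when another
path (an active fetch for a different request, or a proactive scan) stores it. The
single-threaded sketch below shows that idea with `std::sync::mpsc` channels. `PassiveWaiters`,
`subscribe`, and `notify` are hypothetical names; this is not the API of the `Notifier` used by
this module.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// A predicate deciding whether a newly stored object satisfies a waiting request.
type Predicate<T> = Box<dyn Fn(&T) -> bool>;

// Hypothetical registry of passive waiters.
struct PassiveWaiters<T: Clone> {
    waiters: Vec<(Predicate<T>, Sender<T>)>,
}

impl<T: Clone> PassiveWaiters<T> {
    fn new() -> Self {
        Self { waiters: Vec::new() }
    }

    // Register interest in any object satisfying `pred`; the receiver yields it once it arrives.
    fn subscribe(&mut self, pred: impl Fn(&T) -> bool + 'static) -> Receiver<T> {
        let (tx, rx) = channel();
        self.waiters.push((Box::new(pred), tx));
        rx
    }

    // Called whenever an object is stored, however it was obtained. Satisfied waiters are
    // sent the object and removed; the rest keep waiting.
    fn notify(&mut self, obj: &T) {
        self.waiters.retain(|(pred, tx)| {
            if pred(obj) {
                // The receiver may already be gone (e.g. the client gave up); ignore that.
                let _ = tx.send(obj.clone());
                false
            } else {
                true
            }
        });
    }
}
```
*/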

use std::{
    cmp::{max, min},
    fmt::{Debug, Display},
    iter::repeat_with,
    marker::PhantomData,
    ops::{Bound, Range, RangeBounds},
    sync::Arc,
    time::Duration,
};

use anyhow::{bail, Context};
use async_lock::Semaphore;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
use derivative::Derivative;
use futures::{
    channel::oneshot,
    future::{self, join_all, BoxFuture, Either, Future, FutureExt},
    stream::{self, BoxStream, StreamExt},
};
use hotshot_types::{
    data::VidShare,
    traits::{
        metrics::{Gauge, Metrics},
        node_implementation::NodeType,
    },
};
use jf_merkle_tree::{prelude::MerkleProof, MerkleTreeScheme};
use tagged_base64::TaggedBase64;
use tokio::{spawn, time::sleep};
use tracing::Instrument;

use super::{
    notifier::Notifier,
    storage::{
        pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
        sql::MigrateTypes,
        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
        UpdateAvailabilityStorage,
    },
    Transaction, VersionedDataSource,
};
use crate::{
    availability::{
        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
        FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
        PayloadQueryData, QueryableHeader, QueryablePayload, StateCertQueryDataV2, TransactionHash,
        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
    },
    explorer::{self, ExplorerDataSource},
    fetching::{
        self,
        request::{self, StateCertRequest},
        Provider,
    },
    merklized_state::{
        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
    },
    metrics::PrometheusMetrics,
    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
    status::{HasMetrics, StatusDataSource},
    task::BackgroundTask,
    types::HeightIndexed,
    Header, Payload, QueryError, QueryResult,
};

mod block;
mod header;
mod leaf;
mod state_cert;
mod transaction;
mod vid;

use self::{
    block::PayloadFetcher,
    leaf::LeafFetcher,
    transaction::TransactionRequest,
    vid::{VidCommonFetcher, VidCommonRequest},
};

/// Builder for [`FetchingDataSource`] with configuration.
pub struct Builder<Types, S, P> {
    storage: S,
    provider: P,
    backoff: ExponentialBackoffBuilder,
    rate_limit: usize,
    range_chunk_size: usize,
    minor_scan_interval: Duration,
    major_scan_interval: usize,
    major_scan_offset: usize,
    proactive_range_chunk_size: Option<usize>,
    active_fetch_delay: Duration,
    chunk_fetch_delay: Duration,
    proactive_fetching: bool,
    aggregator: bool,
    aggregator_chunk_size: Option<usize>,
    types_migration_batch_size: u64,
    leaf_only: bool,
    _types: PhantomData<Types>,
}

impl<Types, S, P> Builder<Types, S, P> {
    /// Construct a new builder with the given storage and provider and the default options.
    pub fn new(storage: S, provider: P) -> Self {
        let mut default_backoff = ExponentialBackoffBuilder::default();
        default_backoff
            .with_initial_interval(Duration::from_secs(1))
            .with_multiplier(2.)
            .with_max_interval(Duration::from_secs(32))
            .with_max_elapsed_time(Some(Duration::from_secs(64)));

        Self {
            storage,
            provider,
            backoff: default_backoff,
            rate_limit: 32,
            range_chunk_size: 25,
            // By default, we run minor proactive scans fairly frequently: once every minute. These
            // scans are cheap (cheaper the more frequently they run) and can help us keep up with
            // the head of the chain even if our attached consensus instance is behind.
            minor_scan_interval: Duration::from_secs(60),
            // Major scans, on the other hand, are rather expensive and not especially important for
            // usability. We run them rarely, once every 60 minor scans, or once every hour by
            // default.
            major_scan_interval: 60,
            // Major scan offset can be used when starting multiple nodes at the same time, so they
            // don't all pause for a major scan together.
            major_scan_offset: 0,
            proactive_range_chunk_size: None,
            active_fetch_delay: Duration::from_millis(50),
            chunk_fetch_delay: Duration::from_millis(100),
            proactive_fetching: true,
            aggregator: true,
            aggregator_chunk_size: None,
            types_migration_batch_size: 10000,
            leaf_only: false,
            _types: Default::default(),
        }
    }

    /// Run in leaf-only mode.
    ///
    /// In this mode, the data source does not start the proactive scanner or the aggregator.
    pub fn leaf_only(mut self) -> Self {
        self.leaf_only = true;
        self
    }

    /// Set the minimum delay between retries of failed operations.
    pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
        self.backoff.with_initial_interval(interval);
        self
    }

    /// Set the maximum delay between retries of failed operations.
    pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
        self.backoff.with_max_interval(interval);
        self
    }

    /// Set the multiplier for exponential backoff when retrying failed requests.
    pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
        self.backoff.with_multiplier(multiplier);
        self
    }

    /// Set the randomization factor for randomized backoff when retrying failed requests.
    pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
        self.backoff.with_randomization_factor(factor);
        self
    }

    /// Set the maximum time to retry failed operations before giving up.
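    ///
/**
A rough model of the retry schedule these settings produce, assuming no randomization (see
`with_retry_randomization_factor`) and negligible time spent in the failed operations
themselves. The hypothetical `retry_delays` helper stops once the next delay would push the
total waiting time past the maximum elapsed time; the real schedule is computed by the
`backoff` crate and may differ in details.

```rust
use std::time::Duration;

// Hypothetical jitter-free exponential backoff: each delay is the previous one times
// `multiplier` (assumed positive), capped at `max_interval`. Retries stop once the next delay
// would push the total waiting time past `max_elapsed`, or if the delay shrinks to zero.
fn retry_delays(
    initial: Duration,
    multiplier: f64,
    max_interval: Duration,
    max_elapsed: Duration,
) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut next = initial;
    let mut total = Duration::ZERO;
    while !next.is_zero() && total + next <= max_elapsed {
        delays.push(next);
        total += next;
        next = next.mul_f64(multiplier).min(max_interval);
    }
    delays
}
```

Under this model, the defaults set in `Builder::new` (initial interval 1s, multiplier 2, maximum
interval 32s, retry timeout 64s) give delays of 1, 2, 4, 8, 16, and 32 seconds: six waits in
total before giving up.
*/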
    pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
        self.backoff.with_max_elapsed_time(Some(timeout));
        self
    }

    /// Set the maximum number of simultaneous fetches.
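    ///
/**
A counting semaphore is a natural way to enforce such a limit (this module imports
`async_lock::Semaphore`). The blocking, std-only sketch below uses threads as stand-ins for
fetches: each worker takes a permit before its simulated fetch and returns it afterwards, so at
most `rate_limit` workers are ever in flight. `CountingSemaphore` and `run_limited` are
hypothetical names, not part of this module.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Minimal blocking counting semaphore built from a mutex and a condition variable.
struct CountingSemaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl CountingSemaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    // Block until a permit is free, then take it. The loop guards against spurious wakeups.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    // Return a permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

// Run `jobs` simulated fetches with at most `rate_limit` in flight at once.
// Returns the peak number of fetches observed running concurrently.
fn run_limited(jobs: usize, rate_limit: usize) -> usize {
    assert!(rate_limit > 0, "a rate limit of 0 would block forever");
    let sem = Arc::new(CountingSemaphore::new(rate_limit));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let workers: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, in_flight, peak) = (sem.clone(), in_flight.clone(), peak.clone());
            thread::spawn(move || {
                sem.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulated fetch
                in_flight.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```
*/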
    pub fn with_rate_limit(mut self, rate_limit: usize) -> Self {
        self.rate_limit = rate_limit;
        self
    }

    /// Set the number of items to process at a time when loading a range or stream.
    ///
    /// This determines:
    /// * The number of objects to load from storage in a single request
    /// * The number of objects to buffer in memory per request/stream
    /// * The number of concurrent notification subscriptions per request/stream
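    ///
/**
A minimal sketch of how a requested range could be split into chunks of `range_chunk_size`
items, each of which is then loaded and buffered as a unit. The `chunks` helper is hypothetical
and only illustrates the arithmetic.

```rust
use std::ops::Range;

// Split `range` into consecutive sub-ranges of at most `chunk_size` elements.
fn chunks(range: Range<usize>, chunk_size: usize) -> Vec<Range<usize>> {
    assert!(chunk_size > 0, "chunk size must be nonzero");
    let mut out = Vec::new();
    let mut start = range.start;
    while start < range.end {
        // The last chunk may be shorter than `chunk_size`.
        let end = range.end.min(start.saturating_add(chunk_size));
        out.push(start..end);
        start = end;
    }
    out
}
```

With the default chunk size of 25, a request for heights `0..60` would be handled as `0..25`,
`25..50`, and `50..60`.
*/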
    pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
        self.range_chunk_size = range_chunk_size;
        self
    }

    /// Set the time interval between minor proactive fetching scans.
    ///
    /// See [proactive fetching](self#proactive-fetching).
    pub fn with_minor_scan_interval(mut self, interval: Duration) -> Self {
        self.minor_scan_interval = interval;
        self
    }

    /// Set the interval (denominated in [minor scans](Self::with_minor_scan_interval)) between
    /// major proactive fetching scans.
    ///
    /// See [proactive fetching](self#proactive-fetching).
    pub fn with_major_scan_interval(mut self, interval: usize) -> Self {
        self.major_scan_interval = interval;
        self
    }

    /// Set the offset (denominated in [minor scans](Self::with_minor_scan_interval)) before the
    /// first major proactive fetching scan.
    ///
    /// This is useful when starting multiple nodes at the same time: major proactive scans can have
    /// a measurable impact on the performance of the node for a brief time while the scan is
    /// running, so it may be desirable to prevent a group of nodes from all doing major scans at
    /// the same time. This can be achieved by giving each node a different `major_scan_offset`.
    ///
    /// See also [proactive fetching](self#proactive-fetching).
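    ///
/**
For example, a group of `n` nodes could spread out their major scans by giving node `i` the
offset `i * major_scan_interval / n`. The sketch below assumes, for illustration only, that scan
`k` is major when `(k + offset) % interval == 0` and that all nodes start together and scan in
lockstep; `staggered_offsets` and `is_major` are hypothetical helpers.

```rust
// Evenly spaced offsets for `n` nodes sharing the same major scan interval.
fn staggered_offsets(n: usize, major_interval: usize) -> Vec<usize> {
    (0..n).map(|i| i * major_interval / n).collect()
}

// Assumed scheduling rule: scan `k` is major when `(k + offset) % interval == 0`.
fn is_major(k: usize, offset: usize, interval: usize) -> bool {
    interval != 0 && (k + offset) % interval == 0
}
```

With 4 nodes and the default interval of 60, this yields offsets 0, 15, 30, and 45, so under the
assumed rule no two nodes run a major scan on the same scan index.
*/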
    pub fn with_major_scan_offset(mut self, offset: usize) -> Self {
        self.major_scan_offset = offset;
        self
    }

    /// Set the number of items to process at a time when scanning for proactive fetching.
    ///
    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
    /// proactive fetching scans, not for normal subscription streams. This can be useful to tune
    /// the proactive scanner to be more or less greedy with persistent storage resources.
    ///
    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
    /// whatever the normal range chunk size is.
    pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
        self.proactive_range_chunk_size = Some(range_chunk_size);
        self
    }

    /// Add a delay between active fetches in proactive scans.
    ///
    /// This can be used to limit the rate at which this query service makes requests to other query
    /// services during proactive scans. This is useful if the query service has a lot of blocks to
    /// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
    pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
        self.active_fetch_delay = active_fetch_delay;
        self
    }

    /// Add a delay between chunk fetches during proactive scans.
    ///
    /// In a proactive scan, we retrieve a range of objects in chunks, either from a provider or from
    /// local storage (e.g., a database). Without a delay between chunks, the scan can become very
    /// CPU-intensive, especially when chunks are served from local storage. The delay between
    /// active fetches (`active_fetch_delay`) does not help in that case: when a scan covers old data
    /// that is mostly already in local storage, few or no active fetches are made.
    ///
    /// This additional delay helps avoid sustained maximum CPU usage and ensures that local storage
    /// remains accessible to all processes, not just the proactive scanner.
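    ///
/**
A minimal sketch of a paced scan loop that waits `chunk_fetch_delay` after each chunk. It uses
the blocking `std::thread::sleep` for simplicity, whereas this module is asynchronous and
imports `tokio::time::sleep`; `paced_scan` is a hypothetical helper. Because each sleep lasts at
least the requested duration, scanning `n` chunks takes at least `n` times `chunk_fetch_delay`,
e.g. at least one second for 10 chunks at the default 100ms.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

// Hypothetical paced scan: process chunks `0..chunks` in order, sleeping `chunk_fetch_delay`
// after each one so other users of local storage get a turn. Returns the total elapsed time.
fn paced_scan<F: FnMut(usize)>(chunks: usize, chunk_fetch_delay: Duration, mut process: F) -> Duration {
    let start = Instant::now();
    for chunk in 0..chunks {
        process(chunk);
        sleep(chunk_fetch_delay);
    }
    start.elapsed()
}
```
*/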
    pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
        self.chunk_fetch_delay = chunk_fetch_delay;
        self
    }

    /// Run without [proactive fetching](self#proactive-fetching).
    ///
    /// This can reduce load on the CPU and the database, but increases the probability that
    /// requests will fail due to missing resources. If resources are constrained, it is recommended
    /// to run with rare proactive fetching (see
    /// [`with_major_scan_interval`](Self::with_major_scan_interval),
    /// [`with_minor_scan_interval`](Self::with_minor_scan_interval)), rather than disabling it
    /// entirely.
    pub fn disable_proactive_fetching(mut self) -> Self {
        self.proactive_fetching = false;
        self
    }

    /// Run without an aggregator.
    ///
    /// This can reduce load on the CPU and the database, but it will cause aggregate statistics
    /// (such as transaction counts) not to update.
    pub fn disable_aggregator(mut self) -> Self {
        self.aggregator = false;
        self
    }

    /// Set the number of items to process at a time when computing aggregate statistics.
    ///
    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
    /// the aggregator task, not for normal subscription streams. This can be useful to tune
    /// the aggregator to be more or less greedy with persistent storage resources.
    ///
    /// By default (i.e. if this method is not called) the aggregator chunk size will be set to
    /// whatever the normal range chunk size is.
    pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
        self.aggregator_chunk_size = Some(chunk_size);
        self
    }

    /// Set the batch size for the types migration.
    ///
    /// This determines how many `(leaf, vid)` rows are selected from the old types table and
    /// migrated at once.
    pub fn with_types_migration_batch_size(mut self, batch: u64) -> Self {
        self.types_migration_batch_size = batch;
        self
    }

    /// Whether this builder is configured to run in [leaf-only mode](Self::leaf_only).
    pub fn is_leaf_only(&self) -> bool {
        self.leaf_only
    }
}

impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + MigrateTypes<Types> + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with these options.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        FetchingDataSource::new(self).await
    }
}

/// The most basic kind of data source.
///
/// A data source is constructed modularly by combining a [storage](super::storage) implementation
/// with a [Fetcher](crate::fetching::Fetcher). The former allows the query service to store the
/// data it has persistently in an easily accessible storage medium, such as the local file system
/// or a database. This allows it to answer queries efficiently and to maintain its state across
/// restarts. The latter allows the query service to fetch data that is missing from its storage
/// from an external data availability provider, such as the Tiramisu DA network or another instance
/// of the query service.
///
/// These two components of a data source are combined in [`FetchingDataSource`], which is the
/// lowest-level kind of data source available. It simply uses the storage implementation to fetch
/// data when available, and fills in everything else using the fetcher. Various kinds of data
/// sources can be constructed out of [`FetchingDataSource`] by changing the storage and fetcher
/// implementations used, and more complex data sources can be built on top using data source
/// combinators.
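///
/**
A minimal, synchronous sketch of the "storage first, then fetcher" pattern described above. A
`HashMap` stands in for local storage, and the `RemoteProvider` trait, `CachingSource` type, and
`get` method are hypothetical; the real data source is asynchronous and also supports passive
fetching.

```rust
use std::collections::HashMap;

// Hypothetical remote data availability provider.
trait RemoteProvider {
    fn fetch(&self, height: u64) -> Option<String>;
}

// Hypothetical data source combining local storage with a remote provider.
struct CachingSource<P> {
    storage: HashMap<u64, String>,
    provider: P,
    remote_fetches: usize,
}

impl<P: RemoteProvider> CachingSource<P> {
    // Serve from local storage when possible; otherwise fetch remotely and cache the result.
    fn get(&mut self, height: u64) -> Option<String> {
        if let Some(obj) = self.storage.get(&height) {
            return Some(obj.clone());
        }
        self.remote_fetches += 1;
        let obj = self.provider.fetch(height)?;
        self.storage.insert(height, obj.clone());
        Some(obj)
    }
}
```
*/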
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    // The fetcher manages retrieval of resources from both local storage and a remote provider. It
    // encapsulates the data which may need to be shared with a long-lived task or future that
    // implements the asynchronous fetching of a particular object. This is why it gets its own
    // type, wrapped in an [`Arc`] for easy, efficient cloning.
    fetcher: Arc<Fetcher<Types, S, P>>,
    // The proactive scanner task. This is only saved here so that we can cancel it on drop.
    scanner: Option<BackgroundTask>,
    // The aggregator task, which derives aggregate statistics from a block stream.
    aggregator: Option<BackgroundTask>,
    pruner: Pruner<Types, S>,
}

#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug"))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    handle: Option<BackgroundTask>,
    _types: PhantomData<(Types, S)>,
}

impl<Types, S> Pruner<Types, S>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: PruneStorage + Send + Sync + 'static,
{
    async fn new(storage: Arc<S>) -> Self {
        let cfg = storage.get_pruning_config();
        let Some(cfg) = cfg else {
            return Self {
                handle: None,
                _types: Default::default(),
            };
        };

        let future = async move {
            for i in 1.. {
                tracing::warn!("starting pruner run {i} ");
                Self::prune(storage.clone()).await;
                sleep(cfg.interval()).await;
            }
        };

        let task = BackgroundTask::spawn("pruner", future);

        Self {
            handle: Some(task),
            _types: Default::default(),
        }
    }

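/**
The loop in `prune` drives a pruner to completion: each call makes some progress and reports the
height it pruned to, until it reports that the run is finished (`Ok(None)`) or fails. A minimal
sketch of the same pattern with a hypothetical in-memory pruner (`BatchPruner`, `prune_step`, and
`run_to_completion` are illustrative names, not the `PruneStorage` API); unlike `prune`, it
propagates errors instead of logging them.

```rust
// Hypothetical pruner that removes heights below `target`, `batch` heights per call.
struct BatchPruner {
    next: u64,
    target: u64,
    batch: u64,
}

impl BatchPruner {
    // Prune one batch: Ok(Some(height)) after progress, Ok(None) once the run is complete.
    fn prune_step(&mut self) -> Result<Option<u64>, String> {
        if self.batch == 0 {
            return Err("batch size must be nonzero".into());
        }
        if self.next >= self.target {
            return Ok(None);
        }
        self.next = self.target.min(self.next + self.batch);
        Ok(Some(self.next))
    }
}

// Call `prune_step` until the run completes or fails, collecting the reported heights.
fn run_to_completion(p: &mut BatchPruner) -> Result<Vec<u64>, String> {
    let mut heights = Vec::new();
    loop {
        match p.prune_step()? {
            Some(h) => heights.push(h),
            None => return Ok(heights),
        }
    }
}
```
*/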
    async fn prune(storage: Arc<S>) {
        // We loop until the whole pruner run is complete.
        let mut pruner = S::Pruner::default();
        loop {
            match storage.prune(&mut pruner).await {
                Ok(Some(height)) => {
                    tracing::warn!("Pruned to height {height}");
                },
                Ok(None) => {
                    tracing::warn!("pruner run complete.");
                    break;
                },
                Err(e) => {
                    tracing::error!("pruner run failed: {e:?}");
                    break;
                },
            }
        }
    }
}

impl<Types, S, P> FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + PruneStorage + HasMetrics + MigrateTypes<Types> + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
    pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
        Builder::new(storage, provider)
    }

    async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
        let leaf_only = builder.is_leaf_only();
        let aggregator = builder.aggregator;
        let aggregator_chunk_size = builder
            .aggregator_chunk_size
            .unwrap_or(builder.range_chunk_size);
        let proactive_fetching = builder.proactive_fetching;
        let minor_interval = builder.minor_scan_interval;
        let major_interval = builder.major_scan_interval;
        let major_offset = builder.major_scan_offset;
        let proactive_range_chunk_size = builder
            .proactive_range_chunk_size
            .unwrap_or(builder.range_chunk_size);
        let migration_batch_size = builder.types_migration_batch_size;
        let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
        let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());

        let fetcher = Arc::new(Fetcher::new(builder).await?);

        // Migrate the old types to the new PoS types (leaf1 storage to leaf2, and vid to vid2).
        // This is a one-time operation that should be done before starting the data source.
        fetcher.storage.migrate_types(migration_batch_size).await?;

        let scanner = if proactive_fetching && !leaf_only {
            Some(BackgroundTask::spawn(
                "proactive scanner",
                fetcher.clone().proactive_scan(
                    minor_interval,
                    major_interval,
                    major_offset,
                    proactive_range_chunk_size,
                    scanner_metrics,
                ),
            ))
        } else {
            None
        };

        let aggregator = if aggregator && !leaf_only {
            Some(BackgroundTask::spawn(
                "aggregator",
                fetcher
                    .clone()
                    .aggregate(aggregator_chunk_size, aggregator_metrics),
            ))
        } else {
            None
        };

        let storage = fetcher.storage.clone();

        let pruner = Pruner::new(storage).await;
        let ds = Self {
            fetcher,
            scanner,
            pruner,
            aggregator,
        };

        Ok(ds)
    }

    /// Get a handle to the shared inner storage.
586    pub fn inner(&self) -> Arc<S> {
587        self.fetcher.storage.clone()
588    }
589}
590
591impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
592where
593    Types: NodeType,
594{
595    fn as_ref(&self) -> &S {
596        &self.fetcher.storage
597    }
598}
599
600impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
601where
602    Types: NodeType,
603    S: HasMetrics,
604{
605    fn metrics(&self) -> &PrometheusMetrics {
606        self.as_ref().metrics()
607    }
608}
609
610#[async_trait]
611impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
612where
613    Types: NodeType,
614    Header<Types>: QueryableHeader<Types>,
615    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
616    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
617    P: Send + Sync,
618{
619    async fn block_height(&self) -> QueryResult<usize> {
620        let mut tx = self.read().await.map_err(|err| QueryError::Error {
621            message: err.to_string(),
622        })?;
623        tx.block_height().await
624    }
625}
626
627#[async_trait]
628impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
629where
630    Types: NodeType,
631    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
632    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
633    P: Send + Sync,
634{
635    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
636        let mut tx = self.read().await?;
637        tx.load_pruned_height().await
638    }
639}
640
641#[async_trait]
642impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
643where
644    Types: NodeType,
645    Header<Types>: QueryableHeader<Types>,
646    Payload<Types>: QueryablePayload<Types>,
647    S: VersionedDataSource + 'static,
648    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
649    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
650    P: AvailabilityProvider<Types>,
651{
652    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
653    where
654        ID: Into<LeafId<Types>> + Send + Sync,
655    {
656        self.fetcher.get(id.into()).await
657    }
658
659    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
660    where
661        ID: Into<BlockId<Types>> + Send + Sync,
662    {
663        self.fetcher
664            .get::<HeaderQueryData<_>>(id.into())
665            .await
666            .map(|h| h.header)
667    }
668
669    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
670    where
671        ID: Into<BlockId<Types>> + Send + Sync,
672    {
673        self.fetcher.get(id.into()).await
674    }
675
676    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
677    where
678        ID: Into<BlockId<Types>> + Send + Sync,
679    {
680        self.fetcher.get(id.into()).await
681    }
682
683    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
684    where
685        ID: Into<BlockId<Types>> + Send + Sync,
686    {
687        self.fetcher.get(id.into()).await
688    }
689
690    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
691    where
692        ID: Into<BlockId<Types>> + Send + Sync,
693    {
694        self.fetcher.get(VidCommonRequest::from(id.into())).await
695    }
696
697    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
698    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }

    async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryDataV2<Types>> {
        self.fetcher.get(StateCertRequest::from(epoch)).await
    }
}

impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;
        let fetch_block = info.block.is_none();
        let fetch_vid = info.vid_common.is_none();

        // Trigger a fetch of the parent leaf, if we don't already have it.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        self.fetcher.store_and_notify(info).await;

        if fetch_block || fetch_vid {
            // If data related to this block is missing, try to fetch it. This is a fire-and-forget
            // fetch, so we do it in a spawned task rather than blocking the caller.
            let fetcher = self.fetcher.clone();
            let span = tracing::info_span!("fetch missing data", height);
            spawn(
                async move {
                    tracing::info!(fetch_block, fetch_vid, "fetching missing data");
                    if fetch_block {
                        fetcher.get::<PayloadMetadata<Types>>(height).await;
                    }
                    if fetch_vid {
                        fetcher.get::<VidCommonMetadata<Types>>(height).await;
                    }
                }
                .instrument(span),
            );
        }
        Ok(())
    }
}

impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}

/// Asynchronous retrieval and storage of [`Fetchable`] resources.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    storage: Arc<S>,
    notifiers: Notifiers<Types>,
    provider: Arc<P>,
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    // Default number of objects loaded or fetched together when streaming a range.
    range_chunk_size: usize,
    // Duration to sleep after each active fetch.
    active_fetch_delay: Duration,
    // Duration to sleep after each chunk is fetched.
    chunk_fetch_delay: Duration,
    // Exponential backoff when retrying failed operations.
    backoff: ExponentialBackoff,
    // Semaphore limiting the number of simultaneous DB accesses we can have from tasks spawned to
    // retry failed loads.
    retry_semaphore: Arc<Semaphore>,
    leaf_only: bool,
}
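
// Illustrative sketch, not part of this crate's API: the retry tasks spawned by `Fetcher::get`
// and `Fetcher::get_chunk` below take their first delay from the backoff policy (falling back to
// one second), sleep after each failed attempt, and keep reusing the last delay once the policy
// stops yielding new ones. `DoublingBackoff` and `retry` are hypothetical stand-ins: the real
// code uses an `ExponentialBackoff` policy (whose jitter and limits are not modeled here), an
// async `sleep`, and `retry_semaphore` to bound concurrent database access, none of which appear
// in this synchronous, std-only model.
#[allow(dead_code)]
mod backoff_retry_sketch {
    use std::cmp::min;
    use std::time::Duration;

    /// Hypothetical backoff policy: doubles the delay up to `max`, and yields at most `limit`
    /// delays before returning `None`.
    pub struct DoublingBackoff {
        next: Duration,
        max: Duration,
        remaining: usize,
    }

    impl DoublingBackoff {
        pub fn new(initial: Duration, max: Duration, limit: usize) -> Self {
            Self {
                next: initial,
                max,
                remaining: limit,
            }
        }

        /// Same shape as the `next_backoff` calls used by the retry loops in this file.
        pub fn next_backoff(&mut self) -> Option<Duration> {
            if self.remaining == 0 {
                return None;
            }
            self.remaining -= 1;
            let current = self.next;
            self.next = min(2 * self.next, self.max);
            Some(current)
        }
    }

    /// Retry `op` until it succeeds, calling `sleep` with the current delay after each failure.
    pub fn retry<T, E>(
        mut backoff: DoublingBackoff,
        mut op: impl FnMut() -> Result<T, E>,
        mut sleep: impl FnMut(Duration),
    ) -> T {
        let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
        loop {
            match op() {
                Ok(value) => return value,
                Err(_) => {
                    sleep(delay);
                    // Once the policy is exhausted, keep retrying with the last delay.
                    if let Some(next_delay) = backoff.next_backoff() {
                        delay = next_delay;
                    }
                },
            }
        }
    }
}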

impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}
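
// Illustrative sketch, not part of this crate's API: `Fetcher::get_range` and
// `Fetcher::get_range_rev` below walk a range in chunks of `range_chunk_size` objects via the
// `range_chunks` and `range_chunks_rev` helpers, which are defined elsewhere and may differ in
// detail. The hypothetical `chunks_fwd` and `chunks_rev` functions model the behavior the doc
// comments describe: forward chunking never ends when the range has no upper bound, and reverse
// chunking starts from an inclusive `end`. Each chunk is an ascending `Range`, which is why the
// reverse stream also reverses the items within each chunk.
#[allow(dead_code)]
mod range_chunk_sketch {
    use std::ops::{Bound, Range, RangeBounds};

    /// Split `range` into consecutive ascending chunks of at most `chunk_size` heights.
    pub fn chunks_fwd(
        range: impl RangeBounds<usize>,
        chunk_size: usize,
    ) -> impl Iterator<Item = Range<usize>> {
        assert!(chunk_size > 0, "chunk size must be positive");
        let mut next = match range.start_bound() {
            Bound::Included(&start) => start,
            Bound::Excluded(&start) => start + 1,
            Bound::Unbounded => 0,
        };
        // `None` means the range has no upper bound, so the iterator never ends.
        let end = match range.end_bound() {
            Bound::Included(&end) => Some(end + 1),
            Bound::Excluded(&end) => Some(end),
            Bound::Unbounded => None,
        };
        std::iter::from_fn(move || {
            let stop = match end {
                Some(limit) if next >= limit => return None,
                Some(limit) => (next + chunk_size).min(limit),
                None => next + chunk_size,
            };
            let chunk = next..stop;
            next = stop;
            Some(chunk)
        })
    }

    /// Split `start..=end` into ascending chunks of at most `chunk_size` heights, yielded from
    /// the highest chunk down to the lowest.
    pub fn chunks_rev(
        start: Bound<usize>,
        end: usize,
        chunk_size: usize,
    ) -> impl Iterator<Item = Range<usize>> {
        assert!(chunk_size > 0, "chunk size must be positive");
        let low = match start {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start + 1,
            Bound::Unbounded => 0,
        };
        // `end` is inclusive, so the first chunk stops just past it.
        let mut high = end + 1;
        std::iter::from_fn(move || {
            if high <= low {
                return None;
            }
            let chunk_start = high.saturating_sub(chunk_size).max(low);
            let chunk = chunk_start..high;
            high = chunk_start;
            Some(chunk)
        })
    }
}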

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + Sync,
    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
{
    pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
        let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
        let backoff = builder.backoff.build();

        // In leaf-only mode, no payload or VID common fetchers are created. All fetchers share
        // one retry semaphore and clones of the same backoff policy.
        let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
            (None, None)
        } else {
            (
                Some(Arc::new(fetching::Fetcher::new(
                    retry_semaphore.clone(),
                    backoff.clone(),
                ))),
                Some(Arc::new(fetching::Fetcher::new(
                    retry_semaphore.clone(),
                    backoff.clone(),
                ))),
            )
        };
        let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());

        let leaf_only = builder.leaf_only;

        Ok(Self {
            storage: Arc::new(builder.storage),
            notifiers: Default::default(),
            provider: Arc::new(builder.provider),
            leaf_fetcher: Arc::new(leaf_fetcher),
            payload_fetcher,
            vid_common_fetcher: vid_fetcher,
            range_chunk_size: builder.range_chunk_size,
            active_fetch_delay: builder.active_fetch_delay,
            chunk_fetch_delay: builder.chunk_fetch_delay,
            backoff,
            retry_semaphore,
            leaf_only,
        })
    }
}
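
// Illustrative sketch, not part of this crate's API: `Fetcher::try_get_chunk` below turns the
// possibly sparse list returned by `load_range` into one `Option` per height in the chunk,
// pushing `None` (and triggering a fetch) for every skipped height, both between loaded objects
// and after the last one. `HasHeight` and `fill_gaps` are hypothetical; the `fetch` callback
// stands in for `Fetcher::fetch`, which in the real code also checks `might_exist` and can fail.
// Like the real loop, the sketch assumes loaded objects arrive in ascending height order and
// lie within the chunk.
#[allow(dead_code)]
mod chunk_gap_fill_sketch {
    use std::ops::Range;

    /// Hypothetical stand-in for objects that know their block height.
    pub trait HasHeight {
        fn height(&self) -> u64;
    }

    impl HasHeight for u64 {
        fn height(&self) -> u64 {
            *self
        }
    }

    /// Return one entry per height in `chunk`: `Some` for loaded objects and `None` for gaps,
    /// calling `fetch` with the height of each gap.
    pub fn fill_gaps<T: HasHeight>(
        chunk: &Range<usize>,
        loaded: impl IntoIterator<Item = T>,
        mut fetch: impl FnMut(usize),
    ) -> Vec<Option<T>> {
        let mut results = Vec::with_capacity(chunk.len());
        for t in loaded {
            // Fill the gap between the previous object (or the chunk start) and `t`.
            while chunk.start + results.len() < t.height() as usize {
                fetch(chunk.start + results.len());
                results.push(None);
            }
            results.push(Some(t));
        }
        // Fill any gap between the last loaded object and the end of the chunk.
        while results.len() < chunk.len() {
            fetch(chunk.start + results.len());
            results.push(None);
        }
        results
    }
}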

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications before we check storage for the requested object. This ensures
        // that this operation will always eventually succeed as long as the requested object
        // actually exists (or will exist). We will either find it in our local storage and succeed
        // immediately, or (if it exists) someone will *later* come and add it to storage, at which
        // point we will get a notification causing this passive fetch to resolve.
        //
        // Note the "someone" who later fetches the object and adds it to storage may be an active
        // fetch triggered by this very request, in cases where that is possible, but it need not
        // be.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use this channel to get the object back if we successfully load it on retry.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Limit the number of simultaneous retry tasks hitting the database. When
                        // the database is down, we might have a lot of these tasks running, and if
                        // they all hit the DB at once, they are only going to make things worse.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // If the object was immediately available after all, signal the
                            // original fetch. We probably just temporarily couldn't access it due
                            // to database errors.
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // The object was not immediately available after all, but we have
                            // successfully spawned a fetch for it if possible. The spawned fetch
                            // will notify the original request once it completes.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Wait for the object to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }

    /// Try to get an object from local storage, or start a fetch if it is missing.
    ///
    /// There are three possible scenarios in this function, indicated by the return type:
    /// * `Ok(Some(obj))`: the requested object was available locally and successfully retrieved
    ///   from the database; no fetch was spawned
    /// * `Ok(None)`: the requested object was not available locally, but a fetch was successfully
    ///   spawned if possible (in other words, if a fetch was not spawned, it was determined that
    ///   the requested object is not fetchable)
    /// * `Err(_)`: it could not be determined whether the object was available locally or whether
    ///   it could be fetched; no fetch was spawned even though the object may be fetchable
    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
    where
        T: Fetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        match T::load(&mut tx, req).await {
            Ok(t) => Ok(Some(t)),
            Err(QueryError::Missing | QueryError::NotFound) => {
                // We successfully queried the database, but the object wasn't there. Try to
                // fetch it.
                tracing::debug!(?req, "object missing from local storage, will try to fetch");
                self.fetch::<T>(&mut tx, req).await?;
                Ok(None)
            },
            Err(err) => {
                // An error occurred while querying the database. We don't know if we need to fetch
                // the object or not. Return an error so we can try again.
                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
            },
        }
    }

    /// Get a range of objects from local storage or a provider.
    ///
    /// This converts fallible local storage lookups into a (possibly infinite) stream of
    /// infallible fetches. Objects in `range` are loaded from local storage, and any gaps or
    /// missing objects are filled by fetching from a provider. Items in the resulting stream
    /// are futures that never fail to produce a resource, although they may block
    /// indefinitely if the resource needs to be fetched.
    ///
    /// Objects are loaded and fetched in chunks. This strikes a balance between limiting the
    /// total number of storage and network requests and keeping simultaneous resource
    /// consumption bounded.
    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size(chunk_size, range)
    }

    /// Same as [`Self::get_range`], but uses the given chunk size instead of the default.
    fn get_range_with_chunk_size<R, T>(
        self: Arc<Self>,
        chunk_size: usize,
        range: R,
    ) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks(range, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
                        // helps limit constant high CPU usage when fetching long ranges of data,
                        // especially for older streams that fetch most of the data from local
                        // storage.
                        sleep(chunk_fetch_delay).await;
                        stream::iter(chunk)
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                match f {
                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
                    // the catchup provider. The delay applies between pending fetches, not between
                    // chunks.
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }

    /// Same as [`Self::get_range`], but yields objects in reverse order by height.
    ///
    /// Note that unlike [`Self::get_range`], which accepts any range and yields an infinite stream
    /// if the range has no upper bound, this function requires a defined upper bound: without
    /// one, we would not know where the reversed stream should _start_. The `end` bound given
    /// here is inclusive; i.e. the first item yielded by the stream will have height `end`.
    fn get_range_rev<T>(
        self: Arc<Self>,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size_rev(chunk_size, start, end)
    }

    /// Same as [`Self::get_range_rev`], but uses the given chunk size instead of the default.
    fn get_range_with_chunk_size_rev<T>(
        self: Arc<Self>,
        chunk_size: usize,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks_rev(start, end, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
                        // helps limit constant high CPU usage when fetching long ranges of data,
                        // especially for older streams that fetch most of the data from local
                        // storage.
                        sleep(chunk_fetch_delay).await;
                        // `get_chunk` returns objects in ascending height order; reverse them so
                        // the stream yields heights in descending order.
                        stream::iter(chunk.into_iter().rev())
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                match f {
                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
                    // the catchup provider. The delay applies between pending fetches, not between
                    // chunks.
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }

    /// Get a range of objects from local storage or a provider.
    ///
    /// This method is similar to [`Self::get_range`], except that:
    /// * It fetches all desired objects together, as a single chunk.
    /// * It loads the objects or triggers fetches immediately, rather than providing a lazy
    ///   stream which only fetches objects when polled.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss
        // any notifications sent in between checking local storage and triggering a fetch if
        // necessary.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Convert to fetches. Objects which are not immediately available (`None` in the
                // chunk) become passive fetches awaiting a notification of availability.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use these channels to get back any objects that we successfully load on retry.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Limit the number of simultaneous retry tasks hitting the database.
                            // When the database is down, we might have a lot of these tasks
                            // running, and if they all hit the DB at once, they are only going to
                            // make things worse.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        // If the object was immediately available after all, signal
                                        // the original fetch. We probably just temporarily couldn't
                                        // access it due to database errors.
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // The object was not immediately available after all, but
                                        // we have successfully spawned a fetch for it if possible.
                                        // The spawned fetch will notify the original request once
                                        // it completes.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Wait for the objects to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }

    /// Try to get a range of objects from local storage, starting fetches for any missing ones.
    ///
    /// If this function succeeds, then for each object in the requested range, either:
    /// * the object was available locally, and corresponds to a `Some(_)` entry in the result
    /// * the object was not available locally (and corresponds to `None` in the result), but a
    ///   fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it
    ///   was determined that the requested object is not fetchable)
    ///
    /// This function will fail if it could not be determined which objects in the requested range
    /// are available locally, or if, for any missing object, it could not be determined whether
    /// that object is fetchable. In this case, there may be no fetch spawned for certain objects in
    /// the requested range, even if those objects are actually fetchable.
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Log and discard error information; we want a list of Option where None indicates an
        // object that needs to be fetched. Note that we don't use `FetchRequest::might_exist` to
        // silence the logs here when an object is missing that is not expected to exist at all.
        // When objects are not expected to exist, `load_range` should just return a truncated list
        // rather than returning `Err` objects, so if there are errors in here they are unexpected
        // and we do want to log them.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        // Kick off a fetch for each missing object.
        let mut results = Vec::with_capacity(chunk.len());
        for t in ts {
            // Fetch missing objects that should come before `t`.
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Fetch missing objects from the end of the range.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }
1402
1403    /// Spawn an active fetch for the requested object, if possible.
1404    ///
1405    /// On success, either an active fetch for `req` has been spawned, or it has been determined
1406    /// that `req` is not fetchable. Fails if it cannot be determined (e.g. due to errors in the
1407    /// local database) whether `req` is fetchable or not.
1408    async fn fetch<T>(
1409        self: &Arc<Self>,
1410        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1411        req: T::Request,
1412    ) -> anyhow::Result<()>
1413    where
1414        T: Fetchable<Types>,
1415    {
1416        tracing::debug!("fetching resource {req:?}");
1417
1418        // Trigger an active fetch from a remote provider if possible.
1419        let heights = Heights::load(tx)
1420            .await
1421            .context("failed to load heights; cannot definitively say object might exist")?;
1422        if req.might_exist(heights) {
1423            T::active_fetch(tx, self.clone(), req).await?;
1424        } else {
1425            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1426        }
1427        Ok(())
1428    }
1429
1430    /// Proactively search for and retrieve missing objects.
1431    ///
1432    /// This function will proactively identify and retrieve blocks and leaves which are missing
1433    /// from storage. It will run until cancelled, thus, it is meant to be spawned as a background
1434    /// task rather than called synchronously.
    async fn proactive_scan(
        self: Arc<Self>,
        minor_interval: Duration,
        major_interval: usize,
        major_offset: usize,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        let mut prev_height = 0;

        for i in 0.. {
            let major = i % major_interval == major_offset % major_interval;
            let span = tracing::warn_span!("proactive scan", i, major, prev_height);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            metrics.current_is_major.set(major as usize);
            async {
                let mut backoff = minor_interval;
                let max_backoff = Duration::from_secs(60);
                metrics.backoff.set(backoff.as_secs() as usize);

                // We can't start the scan until we know the current block height and pruned height,
                // so we know which blocks to scan. Thus we retry until this succeeds.
                let heights = loop {
                    let mut tx = match self.read().await {
                        Ok(tx) => tx,
                        Err(err) => {
                            tracing::error!(
                                ?backoff,
                                "unable to start transaction for scan: {err:#}"
                            );
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    let heights = match Heights::load(&mut tx).await {
                        Ok(heights) => heights,
                        Err(err) => {
                            tracing::error!(?backoff, "unable to load heights: {err:#}");
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    metrics.retries.set(0);
                    break heights;
                };

                // Get the pruned height or default to 0 if it is not set. We will not look for
                // blocks older than the pruned height.
                let minimum_block_height = heights.pruned_height.unwrap_or(0) as usize;
                // Get the block height; we will look for any missing blocks up to `block_height`.
                let block_height = heights.height as usize;

                // In a major scan, we scan every block from the pruned height (or 0 if nothing has
                // been pruned) up to `block_height`. In minor scans (much more frequent) we only
                // scan blocks added since the previous scan.
                let start = if major {
                    // We log major scans at WARN level, since they happen infrequently and can have
                    // a measurable impact on performance while running. This also serves as a
                    // useful progress heartbeat.
                    tracing::warn!(
                        start = minimum_block_height,
                        block_height,
                        "starting major scan"
                    );

                    // If we're starting a major scan, reset the major scan counts of missing data
                    // as we're about to recompute them.
                    metrics.major_missing_blocks.set(0);
                    metrics.major_missing_vid.set(0);

                    minimum_block_height
                } else {
                    tracing::info!(start = prev_height, block_height, "starting minor scan");
                    prev_height
                };
                prev_height = block_height;
                metrics.current_start.set(start);
                metrics.current_end.set(block_height);
                metrics.scanned_blocks.set(0);
                metrics.scanned_vid.set(0);

                // Iterate over all blocks that we should have. Fetching the payload metadata is
                // enough to trigger an active fetch of the corresponding leaf and the full block if
                // they are missing, without loading the full payload from storage if we already
                // have it.
                //
                // We iterate in reverse order for two reasons:
                // 1. More recent blocks are more likely to be requested sooner.
                // 2. Leaves are inherently fetched in reverse order, because we cannot (actively)
                //    fetch a leaf until we have the subsequent leaf, which tells us what the hash
                //    of its parent should be. Thus, if we iterated forwards, we could get stuck any
                //    time we came upon two consecutive missing leaves.
                let mut blocks = self
                    .clone()
                    .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
                        chunk_size,
                        Bound::Included(start),
                        block_height.saturating_sub(1),
                    );
                let mut missing_blocks = 0;
                while let Some(fetch) = blocks.next().await {
                    if fetch.is_pending() {
                        missing_blocks += 1;
                        // Wait for the block to be fetched. This slows down the scanner so that we
                        // don't waste memory generating more active fetch tasks than we can handle
                        // at a given time. Note that even with this await, all blocks within a
                        // chunk are fetched in parallel, so this does not block the next block in
                        // the chunk, only the next chunk until the current chunk completes.
                        fetch.await;
                    }
                    metrics.scanned_blocks.update(1);
                }
                metrics.add_missing_blocks(major, missing_blocks);

                // We have to trigger a separate fetch of the VID data, since this is fetched
                // independently of the block payload.
                let mut vid = self
                    .clone()
                    .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
                        chunk_size,
                        Bound::Included(start),
                        block_height.saturating_sub(1),
                    );
                let mut missing_vid = 0;
                while let Some(fetch) = vid.next().await {
                    if fetch.is_pending() {
                        missing_vid += 1;
                        // As above, limit the speed at which we spawn new fetches to the speed at
                        // which we can process them.
                        fetch.await;
                    }
                    metrics.scanned_vid.update(1);
                }
                metrics.add_missing_vid(major, missing_vid);

                tracing::info!("completed proactive scan, will scan again in {minor_interval:?}");

                // Reset metrics.
                metrics.running.set(0);
                if major {
                    // If we just completed a major scan, reset the incremental counts of missing
                    // data from minor scans, so the next round of minor scans can recompute/update
                    // them.
                    metrics.minor_missing_blocks.set(0);
                    metrics.minor_missing_vid.set(0);
                }

                sleep(minor_interval).await;
            }
            .instrument(span)
            .await;
        }
    }
}
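
/*
Illustrative sketch (not part of this crate): the scheduling decisions in `proactive_scan`,
restated as hypothetical synchronous helpers so the arithmetic can be checked in isolation.
`is_major_scan`, `scan_start`, and `backoff_schedule` are illustrative names only. They cover which
iterations are major scans, where each scan starts, and how the retry delay grows while waiting
for the heights to load (doubling from `minor_interval`, capped at 60 seconds).

```rust
use std::cmp::min;
use std::time::Duration;

/// Hypothetical helper: scan `i` is major when it lands on `major_offset`
/// modulo `major_interval`, as in `proactive_scan`.
fn is_major_scan(i: usize, major_interval: usize, major_offset: usize) -> bool {
    i % major_interval == major_offset % major_interval
}

/// Hypothetical helper: major scans restart from the pruned height (or 0 if
/// nothing has been pruned); minor scans resume from the block height
/// observed by the previous scan.
fn scan_start(major: bool, pruned_height: Option<u64>, prev_height: usize) -> usize {
    if major {
        pruned_height.unwrap_or(0) as usize
    } else {
        prev_height
    }
}

/// Hypothetical helper: the delays slept across `retries` consecutive
/// failures, doubling from `initial` and never exceeding `max`.
fn backoff_schedule(initial: Duration, max: Duration, retries: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(retries);
    let mut backoff = initial;
    for _ in 0..retries {
        // Sleep for the current delay before retrying...
        delays.push(backoff);
        // ...then double it, capped at `max`.
        backoff = min(2 * backoff, max);
    }
    delays
}
```

With `major_interval = 4` and `major_offset = 1`, iterations 1, 5, and 9 are major scans; with a
10-second `minor_interval`, successive retry delays are 10, 20, 40, 60, and 60 seconds.
*/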

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    // This is not supposed to happen, but if the chunk is empty, just skip it.
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
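
/*
Illustrative sketch (not part of this crate): the resume-and-fold structure of `aggregate`, with a
hypothetical `Aggregate` that only tracks the last aggregated height and a running transaction
count. It is not this crate's `Aggregate` type, and `fold_chunk` stands in for the storage-backed
`update_aggregates`. The two properties shown are the ones the loop above relies on: aggregation
resumes one past the last aggregated height (or at 0 if there is none), and each chunk is folded
into the previous aggregate, which then ends at the chunk's last height.

```rust
/// Hypothetical, simplified aggregate: running totals through `height`.
#[derive(Clone, Debug, Default, PartialEq)]
struct Aggregate {
    height: u64,
    num_transactions: u64,
}

/// Where aggregation resumes: one past the last aggregated height, or 0 if
/// nothing has been aggregated yet.
fn resume_height(prev: Option<&Aggregate>) -> u64 {
    prev.map_or(0, |agg| agg.height + 1)
}

/// Fold a chunk of `(height, num_transactions)` pairs into `prev`. Returns
/// `None` for an empty chunk, which the aggregator simply skips.
fn fold_chunk(prev: &Aggregate, chunk: &[(u64, u64)]) -> Option<Aggregate> {
    let &(last_height, _) = chunk.last()?;
    let added: u64 = chunk.iter().map(|&(_, n)| n).sum();
    Some(Aggregate {
        height: last_height,
        num_transactions: prev.num_transactions + added,
    })
}
```
*/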

impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
{
    /// Store an object and notify anyone waiting on this object that it is available.
    async fn store_and_notify<T>(&self, obj: T)
    where
        T: Storable<Types>,
    {
        let try_store = || async {
            let mut tx = self.storage.write().await?;
            obj.clone().store(&mut tx, self.leaf_only).await?;
            tx.commit().await
        };

        // Store the object in local storage, so we can avoid fetching it in the future.
        let mut backoff = self.backoff.clone();
        backoff.reset();
        loop {
            let Err(err) = try_store().await else {
                break;
            };
            // It is unfortunate if this fails, but we can still proceed by notifying with the
            // object that we fetched, keeping it in memory. Log the error, retry a few times, and
            // eventually move on.
            tracing::warn!(
                "failed to store fetched {} {}: {err:#}",
                T::name(),
                obj.height()
            );

            let Some(delay) = backoff.next_backoff() else {
                break;
            };
            tracing::info!(?delay, "retrying failed operation");
            sleep(delay).await;
        }

        // Send a notification about the newly received object. It is important that we do this
        // _after_ our attempt to store the object in local storage, otherwise there is a potential
        // missed notification deadlock:
        // * we send the notification
        // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the
        //   requested object is not in storage, and begins waiting for a notification
        // * we store the object. This ensures that no other task will be triggered to fetch it,
        //   which means no one will ever notify the waiting task.
        //
        // Note that we send the notification regardless of whether the store actually succeeded or
        // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we
        // failed to store, some fetches might not resolve, even though the object in question has
        // actually been fetched. In principle this is recoverable, because as long as the object is
        // not in storage, eventually some other task will come along and fetch, store, and notify
        // about it. However, this is certainly not ideal, since we could resolve those pending
        // fetches right now, and it causes bigger problems when the fetch that fails to resolve
        // belongs to the proactive scanner task, which is often the one that would eventually come
        // along and re-fetch the object.
        //
        // The key thing to note is that it does no harm to notify even if we fail to store: at best
        // we wake some tasks up sooner; at worst, anyone who misses the notification still
        // satisfies the invariant that we only wait on notifications for objects which are not in
        // storage, and eventually some other task will come along, find the object missing from
        // storage, and re-fetch it.
        obj.notify(&self.notifiers).await;
    }
}
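
/*
Illustrative sketch (not part of this crate): a synchronous analogy for the ordering argument in
`store_and_notify`, using `std::sync::{Mutex, Condvar}` in place of this crate's async `Notifier`.
`Store` and `demo` are hypothetical. Readers check storage and only wait while the object is
missing; the writer stores before notifying, so a reader that began waiting before the object
arrived is woken and finds it. If the writer notified first and stored second, a reader could
wake, still find nothing, go back to waiting, and never be notified again.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for local storage paired with a notifier.
#[derive(Default)]
struct Store {
    objects: Mutex<HashMap<u64, String>>,
    available: Condvar,
}

impl Store {
    /// Writer: store first, then notify, mirroring `store_and_notify`.
    fn store_and_notify(&self, height: u64, obj: String) {
        self.objects.lock().unwrap().insert(height, obj);
        self.available.notify_all();
    }

    /// Reader: return the object if it is stored; otherwise wait for a
    /// notification and check storage again (this also absorbs spurious wakeups).
    fn get(&self, height: u64) -> String {
        let mut objects = self.objects.lock().unwrap();
        loop {
            if let Some(obj) = objects.get(&height) {
                return obj.clone();
            }
            objects = self.available.wait(objects).unwrap();
        }
    }
}

/// A reader that may start waiting before the object arrives still gets it.
fn demo() -> String {
    let store = Arc::new(Store::default());
    let reader = {
        let store = Arc::clone(&store);
        thread::spawn(move || store.get(7))
    };
    store.store_and_notify(7, "leaf 7".to_string());
    reader.join().unwrap()
}
```
*/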

#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    block: Notifier<BlockQueryData<Types>>,
    leaf: Notifier<LeafQueryData<Types>>,
    vid_common: Notifier<VidCommonQueryData<Types>>,
    state_cert: Notifier<StateCertQueryDataV2<Types>>,
}

impl<Types> Default for Notifiers<Types>
where
    Types: NodeType,
{
    fn default() -> Self {
        Self {
            block: Notifier::new(),
            leaf: Notifier::new(),
            vid_common: Notifier::new(),
            state_cert: Notifier::new(),
        }
    }
}

#[derive(Clone, Copy, Debug)]
struct Heights {
    height: u64,
    pruned_height: Option<u64>,
}

impl Heights {
    async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
    where
        Types: NodeType,
        Header<Types>: QueryableHeader<Types>,
        T: NodeStorage<Types> + PrunedHeightStorage + Send,
    {
        let height = tx.block_height().await.context("loading block height")? as u64;
        let pruned_height = tx
            .load_pruned_height()
            .await
            .context("loading pruned height")?;
        Ok(Self {
            height,
            pruned_height,
        })
    }

    fn might_exist(self, h: u64) -> bool {
        h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
    }
}

#[async_trait]
impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
    for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
    P: Send + Sync,
    State: MerklizedState<Types, ARITY> + 'static,
    <State as MerkleTreeScheme>::Commitment: Send,
{
    async fn get_path(
        &self,
        snapshot: Snapshot<Types, State, ARITY>,
        key: State::Key,
    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_path(snapshot, key).await
    }
}

#[async_trait]
impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
    P: Send + Sync,
{
    async fn get_last_state_height(&self) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_last_state_height().await
    }
}

#[async_trait]
impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
    P: Send + Sync,
{
    async fn block_height(&self) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.block_height().await
    }

    async fn count_transactions_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.count_transactions_in_range(range, namespace).await
    }

    async fn payload_size_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.payload_size_in_range(range, namespace).await
    }

    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.vid_share(id).await
    }

    async fn sync_status(&self) -> QueryResult<SyncStatus> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.sync_status().await
    }

    async fn get_header_window(
        &self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_header_window(start, end, limit).await
    }
}

#[async_trait]
impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
    P: Send + Sync,
{
    async fn get_block_summaries(
        &self,
        request: explorer::query_data::GetBlockSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::BlockSummary<Types>>,
        explorer::query_data::GetBlockSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_summaries(request).await
    }

    async fn get_block_detail(
        &self,
        request: explorer::query_data::BlockIdentifier<Types>,
    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_detail(request).await
    }

    async fn get_transaction_summaries(
        &self,
        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::TransactionSummary<Types>>,
        explorer::query_data::GetTransactionSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_summaries(request).await
    }

    async fn get_transaction_detail(
        &self,
        request: explorer::query_data::TransactionIdentifier<Types>,
    ) -> Result<
        explorer::query_data::TransactionDetailResponse<Types>,
        explorer::query_data::GetTransactionDetailError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_detail(request).await
    }

    async fn get_explorer_summary(
        &self,
    ) -> Result<
        explorer::query_data::ExplorerSummary<Types>,
        explorer::query_data::GetExplorerSummaryError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_explorer_summary().await
    }

    async fn get_search_results(
        &self,
        query: TaggedBase64,
    ) -> Result<
        explorer::query_data::SearchResult<Types>,
        explorer::query_data::GetSearchResultsError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_search_results(query).await
    }
}

/// A provider which can be used as a fetcher by the availability service.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::VidCommonRequest>
    + Sync
    + 'static
{
}

impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::VidCommonRequest>
        + Sync
        + 'static
{
}
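
/*
Illustrative sketch (not part of this crate): `AvailabilityProvider` uses the "trait alias"
pattern, a trait with only supertraits plus a blanket impl, so any type implementing every
required `Provider` instance gets it automatically. The stand-ins below (`Provider`,
`LeafRequest`, `PayloadRequest`, `MockProvider`, `fetch_leaf_and_payload`) are hypothetical and
simplified; the real `Provider` trait also takes a `Types` parameter and has its own method
signatures.

```rust
/// Hypothetical request types.
struct LeafRequest(u64);
struct PayloadRequest(u64);

/// Hypothetical provider trait, generic over the request it can serve.
trait Provider<Req> {
    fn fetch(&self, req: Req) -> Option<String>;
}

/// The "trait alias": a supertrait list with no items of its own...
trait AvailabilityProvider:
    Provider<LeafRequest> + Provider<PayloadRequest> + Sync + 'static
{
}

/// ...plus a blanket impl, so every qualifying type implements it automatically.
impl<P> AvailabilityProvider for P where
    P: Provider<LeafRequest> + Provider<PayloadRequest> + Sync + 'static
{
}

/// Hypothetical provider that serves both request kinds.
struct MockProvider;

impl Provider<LeafRequest> for MockProvider {
    fn fetch(&self, req: LeafRequest) -> Option<String> {
        Some(format!("leaf {}", req.0))
    }
}

impl Provider<PayloadRequest> for MockProvider {
    fn fetch(&self, req: PayloadRequest) -> Option<String> {
        Some(format!("payload {}", req.0))
    }
}

/// Generic code can require the alias instead of repeating every bound.
/// Fully qualified calls pick the `Provider` instance unambiguously.
fn fetch_leaf_and_payload<P: AvailabilityProvider>(
    provider: &P,
    height: u64,
) -> (Option<String>, Option<String>) {
    (
        <P as Provider<LeafRequest>>::fetch(provider, LeafRequest(height)),
        <P as Provider<PayloadRequest>>::fetch(provider, PayloadRequest(height)),
    )
}
```
*/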

trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Indicate whether it is possible that this object exists.
    ///
    /// This can filter out requests quickly for objects that cannot possibly exist, such as
    /// requests for objects at or above the current block height. Not only does this let us fail
    /// faster for such requests (without touching storage at all), it also helps keep logging
    /// quieter when we fail to fetch an object because the user made a bad request, while still
    /// being fairly loud when we fail to fetch an object that might have really existed.
    ///
    /// This method is conservative: it returns `true` if it cannot tell whether the given object
    /// could exist or not.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
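
/*
Illustrative sketch (not part of this crate): how the conservative default of
`FetchRequest::might_exist` combines with requests that can be checked. `ByHeight` and `ByHash`
are hypothetical request types. A request by height is checked with the same predicate as
`Heights::might_exist` (written here with `map_or`, which is equivalent to `is_none_or`), so
heights at or below the pruned height, or at or above the block height, are ruled out. A request
by hash cannot be ruled out and keeps the default `true`.

```rust
/// Hypothetical copy of `Heights`: the current block height and, if any data
/// has been pruned, the pruned height.
#[derive(Clone, Copy, Debug)]
struct Heights {
    height: u64,
    pruned_height: Option<u64>,
}

impl Heights {
    /// Same predicate as `Heights::might_exist`.
    fn might_exist(self, h: u64) -> bool {
        h < self.height && self.pruned_height.map_or(true, |ph| h > ph)
    }
}

trait FetchRequest: Copy {
    /// Conservative default: assume the object might exist.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}

/// Hypothetical request by height: can be checked against `Heights`.
#[derive(Clone, Copy, Debug)]
struct ByHeight(u64);

impl FetchRequest for ByHeight {
    fn might_exist(self, heights: Heights) -> bool {
        heights.might_exist(self.0)
    }
}

/// Hypothetical request by hash: nothing can be ruled out, so the default applies.
#[derive(Clone, Copy, Debug)]
struct ByHash([u8; 32]);

impl FetchRequest for ByHash {}
```
*/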

/// Objects which can be fetched from a remote DA provider and cached in local storage.
///
/// This trait lets us abstract over leaves, blocks, and other types that can be fetched. Thus, the
/// logistics of fetching are shared between all objects, and only the low-level particulars are
/// type-specific.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// A succinct specification of the object to be fetched.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Spawn a task to fetch the object from a remote provider, if possible.
    ///
    /// An active fetch will only be triggered if:
    /// * There is not already an active fetch in progress for the same object
    /// * The requested object is known to exist. For example, we will fetch a leaf by height but
    ///   not by hash, since we can't guarantee that a leaf with an arbitrary hash exists. Note that
    ///   this function assumes `req.might_exist()` has already been checked before calling it, and
    ///   so may do unnecessary work if the caller does not ensure this.
    ///
    /// If we do trigger an active fetch for an object, any passive listeners for the object will be
    /// notified once it has been retrieved. If we do not trigger an active fetch for an object,
    /// this function does nothing. In either case, as long as the requested object does in fact
    /// exist, we will eventually receive it passively, since we will eventually receive all blocks
    /// and leaves that are ever produced. Active fetching merely helps us receive certain objects
    /// sooner.
    ///
    /// This function fails if it _might_ be possible to actively fetch the requested object, but we
    /// were unable to do so (e.g. due to errors in the database).
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait for someone else to fetch the object.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load an object from local storage.
    ///
    /// This function assumes `req.might_exist()` has already been checked before calling it, and so
    /// may do unnecessary work if the caller does not ensure this.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}

type PassiveFetch<T> = BoxFuture<'static, Option<T>>;

#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}

/// An object which can be stored in the database.
trait Storable<Types: NodeType>: HeightIndexed + Clone {
    /// The name of this type of object, for debugging purposes.
    fn name() -> &'static str;

    /// Notify anyone waiting for this object that it has become available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Store the object in the local database.
    fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}

impl<Types: NodeType> Storable<Types> for BlockInfo<Types> {
    fn name() -> &'static str {
        "block info"
    }

    async fn notify(&self, notifiers: &Notifiers<Types>) {
        self.leaf.notify(notifiers).await;

        if let Some(block) = &self.block {
            block.notify(notifiers).await;
        }
        if let Some(vid) = &self.vid_common {
            vid.notify(notifiers).await;
        }
    }

    async fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> anyhow::Result<()> {
        self.leaf.store(storage, leaf_only).await?;

        if let Some(common) = self.vid_common {
            (common, self.vid_share).store(storage, leaf_only).await?;
        }

        if let Some(block) = self.block {
            block.store(storage, leaf_only).await?;
        }

        if let Some(state_cert) = self.state_cert {
            state_cert.store(storage, leaf_only).await?;
        }

        Ok(())
    }
}

/// Break a range into fixed-size chunks.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => *i + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => *i + 1,
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        let chunk_end = min(start + chunk_size, end);
        if chunk_end == start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
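
/*
Illustrative sketch: a standalone copy of `range_chunks` for checking chunk boundaries. The range
is first normalized to a half-open `start..end`, then sliced into `chunk_size` pieces, with a
shorter final chunk when the length is not a multiple of `chunk_size`. An unbounded end is treated
as `usize::MAX`, so callers passing an unbounded range must limit the iterator themselves.

```rust
use std::cmp::min;
use std::ops::{Bound, Range, RangeBounds};

/// Break a range into fixed-size chunks (copy of `range_chunks`).
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Normalize to an inclusive start and an exclusive end.
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => *i + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => *i + 1,
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        // The last chunk is clipped to `end`; an empty chunk ends iteration.
        let chunk_end = min(start + chunk_size, end);
        if chunk_end == start {
            return None;
        }
        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
```
*/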

/// Break a range into fixed-size chunks, starting from the end and moving towards the start.
///
/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk starts
/// with `start`.
///
/// Note that unlike [`range_chunks`], which accepts any range and yields an infinite iterator if
/// the range has no upper bound, this function requires a defined upper bound; otherwise we would
/// not know where the reversed iterator should _start_. The `end` bound given here is inclusive;
/// i.e. the last element of the first chunk yielded by the iterator is exactly `end`.
2212fn range_chunks_rev(
2213    start: Bound<usize>,
2214    end: usize,
2215    chunk_size: usize,
2216) -> impl Iterator<Item = Range<usize>> {
2217    // Transform the start bound to be inclusive.
2218    let start = match start {
2219        Bound::Included(i) => i,
2220        Bound::Excluded(i) => i + 1,
2221        Bound::Unbounded => 0,
2222    };
2223    // Transform the end bound to be exclusive.
2224    let mut end = end + 1;
2225
2226    std::iter::from_fn(move || {
2227        let chunk_start = max(start, end.saturating_sub(chunk_size));
2228        if end <= chunk_start {
2229            return None;
2230        }
2231
2232        let chunk = chunk_start..end;
2233        end = chunk_start;
2234        Some(chunk)
2235    })
2236}
2237
/// Extension methods for [`Result`].
trait ResultExt<T, E> {
    /// Convert a result into an [`Option`], logging the error (if any) at `info` level.
    ///
    /// This is intended for lookups in local storage, where an error is not fatal because the
    /// caller can fall back to fetching the missing object.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}

impl<T, E> ResultExt<T, E> for Result<T, E> {
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display,
    {
        match self {
            Ok(t) => Some(t),
            Err(err) => {
                tracing::info!(
                    "error loading resource from local storage, will try to fetch: {err:#}"
                );
                None
            },
        }
    }
}

/// Metrics reported by the proactive fetching scanner.
#[derive(Debug)]
struct ScannerMetrics {
    /// Whether a scan is currently running (1) or not (0).
    running: Box<dyn Gauge>,
    /// The number of the scan that is currently running.
    current_scan: Box<dyn Gauge>,
    /// Whether the currently running scan is a major scan (1) or not (0).
    current_is_major: Box<dyn Gauge>,
    /// Current backoff delay for retries, in seconds.
    backoff: Box<dyn Gauge>,
    /// Number of retries since the last success.
    retries: Box<dyn Gauge>,
    /// Block height where the current scan started.
    current_start: Box<dyn Gauge>,
    /// Block height where the current scan will end.
    current_end: Box<dyn Gauge>,
    /// Number of blocks processed in the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// Number of VID entries processed in the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// The number of missing blocks encountered in the last major scan.
    major_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries encountered in the last major scan.
    major_missing_vid: Box<dyn Gauge>,
    /// The number of missing blocks encountered _cumulatively_ in minor scans since the last major
    /// scan.
    minor_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries encountered _cumulatively_ in minor scans since the last
    /// major scan.
    minor_missing_vid: Box<dyn Gauge>,
}

impl ScannerMetrics {
    fn new(metrics: &PrometheusMetrics) -> Self {
        let group = metrics.subgroup("scanner".into());
        Self {
            running: group.create_gauge("running".into(), None),
            current_scan: group.create_gauge("current".into(), None),
            current_is_major: group.create_gauge("is_major".into(), None),
            backoff: group.create_gauge("backoff".into(), Some("s".into())),
            retries: group.create_gauge("retries".into(), None),
            current_start: group.create_gauge("start".into(), None),
            current_end: group.create_gauge("end".into(), None),
            scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
            scanned_vid: group.create_gauge("scanned_vid".into(), None),
            major_missing_blocks: group.create_gauge("major_missing_blocks".into(), None),
            major_missing_vid: group.create_gauge("major_missing_vid".into(), None),
            minor_missing_blocks: group.create_gauge("minor_missing_blocks".into(), None),
            minor_missing_vid: group.create_gauge("minor_missing_vid".into(), None),
        }
    }

    /// Record the number of missing blocks found by a scan.
    ///
    /// For a major scan, `missing` replaces the value of `major_missing_blocks`; for a minor scan,
    /// it is added to the cumulative total in `minor_missing_blocks`.
    fn add_missing_blocks(&self, major: bool, missing: usize) {
        if major {
            self.major_missing_blocks.set(missing);
        } else {
            self.minor_missing_blocks.update(missing as i64);
        }
    }

    /// Record the number of missing VID entries found by a scan.
    ///
    /// For a major scan, `missing` replaces the value of `major_missing_vid`; for a minor scan, it
    /// is added to the cumulative total in `minor_missing_vid`.
    fn add_missing_vid(&self, major: bool, missing: usize) {
        if major {
            self.major_missing_vid.set(missing);
        } else {
            self.minor_missing_vid.update(missing as i64);
        }
    }
}

#[derive(Debug)]
struct AggregatorMetrics {
    /// The block height for which aggregate statistics are currently available.
    height: Box<dyn Gauge>,
}

impl AggregatorMetrics {
    fn new(metrics: &PrometheusMetrics) -> Self {
        let group = metrics.subgroup("aggregator".into());
        Self {
            height: group.create_gauge("height".into(), None),
        }
    }
}

/// Turn a fallible passive fetch future into an infallible [`Fetch`].
///
/// `fut` resolves to `None` only if the notifier is dropped without ever satisfying `req`, which
/// should never happen. In that case the returned [`Fetch`] panics rather than blocking forever;
/// see the comment below for the rationale.
fn passive<T>(
    req: impl Debug + Send + 'static,
    fut: impl Future<Output = Option<T>> + Send + 'static,
) -> Fetch<T>
where
    T: Send + 'static,
{
    Fetch::Pending(
        fut.then(move |opt| async move {
            match opt {
                Some(t) => t,
                None => {
                    // If `passive_fetch` returns `None`, it means the notifier was dropped without
                    // ever sending a notification. In this case, the correct behavior is actually
                    // to block forever (unless the `Fetch` itself is dropped), since the semantics
                    // of `Fetch` are to never fail. This is analogous to fetching an object which
                    // doesn't actually exist: the `Fetch` will never return.
                    //
                    // However, for ease of debugging, and since this is never expected to happen in
                    // normal usage, we panic instead. This should only happen in two cases:
                    // * The server was shut down (dropping the notifier) without cleaning up some
                    //   background tasks. This will not affect runtime behavior, but should be
                    //   fixed if it happens.
                    // * There is a very unexpected runtime bug resulting in the notifier being
                    //   dropped. If this happens, things are very broken in any case, and it is
                    //   better to panic loudly than simply block forever.
                    panic!("notifier dropped without satisfying request {req:?}");
                },
            }
        })
        .boxed(),
    )
}

/// Return the first `Some` produced by either future, or `None` if both resolve to `None`.
///
/// If the future that finishes first yields `None`, we keep waiting on the other one.
async fn select_some<T>(
    a: impl Future<Output = Option<T>> + Unpin,
    b: impl Future<Output = Option<T>> + Unpin,
) -> Option<T> {
    match future::select(a, b).await {
        // If whichever future finishes first resolves with `Some`, return that result
        // immediately, dropping the other future.
        Either::Left((Some(a), _)) => Some(a),
        Either::Right((Some(b), _)) => Some(b),

        // If it resolves with `None`, wait for the result of the other future.
        Either::Left((None, b)) => b.await,
        Either::Right((None, a)) => a.await,
    }
}

#[cfg(test)]
mod test {
    use super::*;
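
    // Sketch of the `ResultExt::ok_or_trace` contract: `Ok` values pass through unchanged, while
    // errors are logged at `info` level and mapped to `None` so that the caller can fall back to
    // fetching. Whether the log line is actually emitted depends on the installed tracing
    // subscriber, which does not affect the result checked here.
    #[test]
    fn test_ok_or_trace() {
        assert_eq!(Ok::<u32, String>(7).ok_or_trace(), Some(7));
        assert_eq!(Err::<u32, &str>("not found").ok_or_trace(), None);
    }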

    #[test]
    fn test_range_chunks() {
        // Inclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..=4, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Inclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..=5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Exclusive bounds, partial last chunk.
        assert_eq!(
            range_chunks(0..5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Exclusive bounds, complete last chunk.
        assert_eq!(
            range_chunks(0..6, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Unbounded.
        assert_eq!(
            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6, 6..8, 8..10]
        );
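
        // Additional edge cases, with expected values worked out by hand from the
        // implementation above (illustrative, not taken from a documented contract).

        // Empty range: no chunks.
        assert_eq!(range_chunks(3..3, 2).count(), 0);

        // Chunk size larger than the range: a single chunk.
        assert_eq!(range_chunks(0..3, 10).collect::<Vec<_>>(), [0..3]);

        // Explicit bounds: the half-open interval (1, 4] is the same set as 2..5.
        assert_eq!(
            range_chunks((Bound::Excluded(1), Bound::Included(4)), 2).collect::<Vec<_>>(),
            [2..4, 4..5]
        );

        // The chunks partition the input range exactly.
        let flat: Vec<usize> = range_chunks(7..20, 4).flatten().collect();
        assert_eq!(flat, (7..20).collect::<Vec<_>>());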
    }

    #[test]
    fn test_range_chunks_rev() {
        // Inclusive start bound, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        // Inclusive start bound, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 0..2]
        );

        // Exclusive start bound, partial last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 1..2]
        );

        // Exclusive start bound, complete last chunk.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3]
        );
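
        // Additional edge cases, with expected values worked out by hand from the
        // implementation above (illustrative, not taken from a documented contract).

        // An unbounded start behaves like `Bound::Included(0)`.
        assert_eq!(
            range_chunks_rev(Bound::Unbounded, 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        // A single-element range yields one single-element chunk.
        assert_eq!(
            range_chunks_rev(Bound::Included(7), 7, 3).collect::<Vec<_>>(),
            [7..8]
        );

        // An exclusive start equal to `end` leaves nothing to yield.
        assert_eq!(range_chunks_rev(Bound::Excluded(7), 7, 3).count(), 0);

        // Reversing the chunk order and flattening recovers `start..=end` in ascending order.
        let mut chunks: Vec<_> = range_chunks_rev(Bound::Included(2), 10, 3).collect();
        chunks.reverse();
        let flat: Vec<usize> = chunks.into_iter().flatten().collect();
        assert_eq!(flat, (2..=10).collect::<Vec<_>>());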
    }
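
    // Sketch of the `select_some` contract, assuming the `futures` crate is built with its
    // default `executor` feature (for `block_on`). Each future here either resolves immediately
    // or never resolves, so the expected results do not depend on which one is polled first.
    #[test]
    fn test_select_some() {
        use futures::executor::block_on;
        use std::future::{pending, ready};

        // A `None` from one future does not end the race: we wait for the other one.
        assert_eq!(block_on(select_some(ready(None), ready(Some(2)))), Some(2));
        assert_eq!(block_on(select_some(ready(Some(1)), ready(None))), Some(1));

        // A `Some` wins even if the other future never completes.
        assert_eq!(block_on(select_some(pending(), ready(Some(3)))), Some(3));

        // If both futures resolve to `None`, so does `select_some`.
        assert_eq!(block_on(select_some(ready(None::<u32>), ready(None))), None);
    }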
}