// hotshot_query_service/fetching.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Fetching missing data from remote providers.
14//!
15//! This module provides a mechanism to fetch data that is missing from this query service's storage
16//! from a remote data availability provider. [`Fetcher`] can be used to handle concurrent requests
17//! for data, ensuring that each distinct resource is only fetched once at a time.
18//!
19//! Fetching is ultimately dispatched to a [`Provider`], which implements fetching for a specific
20//! type of resource from a specific source. The [`provider`] module contains built-in
21//! implementations of [`Provider`] for various data availability sources.
22//!
23
24use std::{
25    collections::{hash_map::Entry, BTreeSet, HashMap},
26    fmt::Debug,
27    sync::Arc,
28    time::Duration,
29};
30
31use async_lock::{Mutex, Semaphore};
32use backoff::{backoff::Backoff, ExponentialBackoff};
33use derivative::Derivative;
34use tokio::{spawn, time::sleep};
35
36pub mod provider;
37pub mod request;
38
39pub use provider::Provider;
40pub use request::Request;
41
42/// A callback to process the result of a request.
43///
44/// Sometimes, we may fetch the same object for multiple purposes, so a request may have more than
45/// one callback registered. For example, we may fetch a leaf for its own sake and also to
46/// reconstruct a block. Or, we may fetch the same payload for two different blocks. In both of
47/// these cases, there are two objects that must be processed and stored after the fetch completes.
48///
49/// In these cases, we only want one task to actually fetch the resource, but there may be several
50/// unrelated actions to take after the resource is fetched. This trait allows us to identify a
51/// callback, so that when the task that actually fetched the resource completes, it will run one
52/// instance of each distinct callback which was registered. Callbacks will run in the order
53/// determined by `Ord`.
#[trait_variant::make(Callback: Send)]
pub trait LocalCallback<T>: Debug + Ord {
    /// Process the fetched response.
    ///
    /// Consumes the callback, so each registered callback runs at most once. When multiple
    /// callbacks are registered for the same resource, they are executed in the order given by
    /// `Ord`.
    async fn run(self, response: T);
}
58
/// Management of concurrent requests to fetch resources.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
pub struct Fetcher<T, C> {
    /// Requests currently being fetched, mapped to the callbacks to run when each fetch completes.
    ///
    /// An entry in this map acts as an exclusive lock on fetching that particular resource: the
    /// task which inserts the entry is the one responsible for fetching, and later requests for
    /// the same resource merely add their callbacks to the existing entry. The `BTreeSet`
    /// deduplicates equivalent callbacks and fixes their execution order (by `Ord`).
    #[derivative(Debug = "ignore")]
    in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
    /// Retry policy used to space out repeated fetch attempts for a resource.
    backoff: ExponentialBackoff,
    /// Semaphore bounding the number of concurrent fetch requests across all tasks.
    permit: Arc<Semaphore>,
}
68
69impl<T, C> Fetcher<T, C> {
70    pub fn new(permit: Arc<Semaphore>, backoff: ExponentialBackoff) -> Self {
71        Self {
72            in_progress: Default::default(),
73            permit,
74            backoff,
75        }
76    }
77}
78
impl<T, C> Fetcher<T, C> {
    /// Fetch a resource, if it is not already being fetched.
    ///
    /// This function will spawn a new task to fetch the resource in the background, using callbacks
    /// to process the fetched resource upon success. If the resource is already being fetched, the
    /// spawned task will terminate without fetching the resource, but only after registering the
    /// provided callbacks to be executed by the existing fetching task upon completion, as long as
    /// there are not already equivalent callbacks registered.
    ///
    /// We spawn a (short-lived) task even if the resource is already being fetched, because the
    /// check that the resource is being fetched requires an exclusive lock, and we do not want to
    /// block the caller, which might be on the critical path of request handling.
    ///
    /// Note that while callbacks are allowed to be async, they are executed sequentially while an
    /// exclusive lock is held, and thus they should not take too long to run or block indefinitely.
    ///
    /// The spawned task will continue trying to fetch the object until it succeeds, so it is the
    /// caller's responsibility only to use this method for resources which are known to exist and
    /// be fetchable by `provider`.
    pub fn spawn_fetch<Types>(
        &self,
        req: T,
        provider: impl Provider<Types, T> + 'static,
        callbacks: impl IntoIterator<Item = C> + Send + 'static,
    ) where
        T: Request<Types> + 'static,
        C: Callback<T::Response> + 'static,
    {
        // Clone the shared state the background task must own (`spawn` requires 'static).
        let in_progress = self.in_progress.clone();
        let permit = self.permit.clone();
        let mut backoff = self.backoff.clone();

        spawn(async move {
            tracing::info!("spawned active fetch for {req:?}");

            // Check if the requested object is already being fetched. If not, take a lock on it so
            // we are the only ones to fetch this particular object.
            {
                let mut in_progress = in_progress.lock().await;
                match in_progress.entry(req) {
                    Entry::Occupied(mut e) => {
                        // If the object is already being fetched, add our callback for the fetching
                        // task to execute upon completion. Since the entry is a `BTreeSet`,
                        // callbacks equivalent to ones already registered are deduplicated here.
                        e.get_mut().extend(callbacks);
                        tracing::info!(?req, callbacks = ?e.get(), "resource is already being fetched");
                        return;
                    },
                    Entry::Vacant(e) => {
                        // If the object is not being fetched, we will register our own callback and
                        // then fetch it ourselves.
                        e.insert(callbacks.into_iter().collect());
                    },
                }
            }
            // The map lock is released at the end of the scope above; from here on we hold only
            // the per-object "lock" (our entry in the map) while fetching.

            // Now we are responsible for fetching the object, reach out to the provider.
            backoff.reset();
            // Fall back to a 1s delay in case the policy yields no interval right after a reset.
            let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
            let res = loop {
                // Acquire a permit from the semaphore to rate limit the number of concurrent fetch requests
                // The guard is released when dropped: explicitly before sleeping on failure below,
                // or when the loop scope ends as we break out on success.
                let permit = permit.acquire().await;
                if let Some(res) = provider.fetch(req).await {
                    break res;
                }

                // We only fetch objects which are known to exist, so we should eventually succeed
                // in fetching if we retry enough. For example, we may be fetching a block from a
                // peer who hasn't received the block yet.
                //
                // To understand why it is ok to retry indefinitely, think about manual
                // intervention: if we don't retry, or retry with a limit, we may require manual
                // intervention whenever a query service fails to fetch a resource that should exist
                // and stops retrying, since it now may never receive that resource. With indefinite
                // fetching, we require manual intervention only when active fetches are
                // accumulating because a peer which _should_ have the resource isn't providing it.
                // In this case, we would require manual intervention on the peer anyways.
                tracing::warn!("failed to fetch {req:?}, will retry in {delay:?}");
                // Release the permit while we sleep so other fetch tasks can make progress.
                drop(permit);
                sleep(delay).await;

                // Advance the delay per the backoff policy; if the policy stops yielding
                // intervals, keep retrying at the current delay.
                if let Some(next_delay) = backoff.next_backoff() {
                    delay = next_delay;
                }
            };

            // Done fetching, remove our lock on the object and execute all callbacks.
            //
            // We will keep this lock the whole time we are running the callbacks. We can't release
            // it earlier because we can't allow another task to register a callback after we have
            // taken the list of callbacks that we will execute. We also don't want to allow any new
            // fetches until we have executed the callbacks, because one of the callbacks may store
            // some resource that obviates the need for another fetch.
            //
            // The callbacks may acquire arbitrary locks from this task, while we already hold the
            // lock on `in_progress`. This is fine because we are always running in a freshly
            // spawned task. Therefore we know that this task holds no locks _before_ acquiring
            // `in_progress`, and so it is safe to acquire any lock _after_ acquiring `in_progress`.
            let mut in_progress = in_progress.lock().await;
            // Take our entry out of the map; callbacks run in `BTreeSet` iteration order, i.e. the
            // order determined by `Ord`. Each callback receives its own clone of the response.
            let callbacks = in_progress.remove(&req).unwrap_or_default();
            for callback in callbacks {
                callback.run(res.clone()).await;
            }
        });
    }
}