hotshot_query_service/fetching.rs
// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Fetching missing data from remote providers.
//!
//! This module provides a mechanism to fetch data that is missing from this query service's storage
//! from a remote data availability provider. [`Fetcher`] can be used to handle concurrent requests
//! for data, ensuring that each distinct resource is only fetched once at a time.
//!
//! Fetching is ultimately dispatched to a [`Provider`], which implements fetching for a specific
//! type of resource from a specific source. The [`provider`] module contains built-in
//! implementations of [`Provider`] for various data availability sources.
//!
use std::{
    collections::{hash_map::Entry, BTreeSet, HashMap},
    fmt::Debug,
    sync::Arc,
    time::Duration,
};

use async_lock::{Mutex, Semaphore};
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use tokio::{spawn, time::sleep};

pub mod provider;
pub mod request;

pub use provider::Provider;
pub use request::Request;

/// A callback to process the result of a request.
///
/// Sometimes, we may fetch the same object for multiple purposes, so a request may have more than
/// one callback registered. For example, we may fetch a leaf for its own sake and also to
/// reconstruct a block. Or, we may fetch the same payload for two different blocks. In both of
/// these cases, there are two objects that must be processed and stored after the fetch completes.
///
/// In these cases, we only want one task to actually fetch the resource, but there may be several
/// unrelated actions to take after the resource is fetched. This trait allows us to identify a
/// callback, so that when the task that actually fetched the resource completes, it will run one
/// instance of each distinct callback which was registered. Callbacks will run in the order
/// determined by `Ord`.
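///
/// A sketch of an implementation, assuming a hypothetical `LeafResponse` response type (any
/// `Debug` response works the same way); the callback here only logs the result:
///
/// ```ignore
/// #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
/// struct LogLeaf {
///     // Distinguishes otherwise identical callbacks and fixes their execution order.
///     label: String,
/// }
///
/// impl Callback<LeafResponse> for LogLeaf {
///     async fn run(self, response: LeafResponse) {
///         // A real callback would typically store the response; here we just log it.
///         tracing::info!(label = %self.label, ?response, "fetched leaf");
///     }
/// }
/// ```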
#[trait_variant::make(Callback: Send)]
pub trait LocalCallback<T>: Debug + Ord {
    async fn run(self, response: T);
}

/// Management of concurrent requests to fetch resources.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
pub struct Fetcher<T, C> {
    #[derivative(Debug = "ignore")]
    in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
    backoff: ExponentialBackoff,
    permit: Arc<Semaphore>,
}

impl<T, C> Fetcher<T, C> {
    pub fn new(permit: Arc<Semaphore>, backoff: ExponentialBackoff) -> Self {
        Self {
            in_progress: Default::default(),
            permit,
            backoff,
        }
    }
}

impl<T, C> Fetcher<T, C> {
    /// Fetch a resource, if it is not already being fetched.
    ///
    /// This function will spawn a new task to fetch the resource in the background, using callbacks
    /// to process the fetched resource upon success. If the resource is already being fetched, the
    /// spawned task will terminate without fetching the resource, but only after registering the
    /// provided callbacks to be executed by the existing fetching task upon completion, as long as
    /// equivalent callbacks are not already registered.
    ///
    /// We spawn a (short-lived) task even if the resource is already being fetched, because the
    /// check that the resource is being fetched requires an exclusive lock, and we do not want to
    /// block the caller, which might be on the critical path of request handling.
    ///
    /// Note that while callbacks are allowed to be async, they are executed sequentially while an
    /// exclusive lock is held, and thus they should not take too long to run or block indefinitely.
    ///
    /// The spawned task will continue trying to fetch the object until it succeeds, so it is the
    /// caller's responsibility to use this method only for resources which are known to exist and
    /// to be fetchable by `provider`.
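    ///
    /// As a sketch of the deduplication behavior (assuming hypothetical `fetcher`, `req`,
    /// `provider`, `store_leaf`, and `store_block` values of suitable types, with a `Copy` request
    /// and a cloneable provider):
    ///
    /// ```ignore
    /// // Both calls target the same resource, but only one spawned task actually contacts the
    /// // provider; the other just registers its callback with the in-progress fetch. When the
    /// // fetch completes, each distinct callback runs exactly once, in `Ord` order.
    /// fetcher.spawn_fetch(req, provider.clone(), [store_leaf]);
    /// fetcher.spawn_fetch(req, provider, [store_block]);
    /// ```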
    pub fn spawn_fetch<Types>(
        &self,
        req: T,
        provider: impl Provider<Types, T> + 'static,
        callbacks: impl IntoIterator<Item = C> + Send + 'static,
    ) where
        T: Request<Types> + 'static,
        C: Callback<T::Response> + 'static,
    {
        let in_progress = self.in_progress.clone();
        let permit = self.permit.clone();
        let mut backoff = self.backoff.clone();

        spawn(async move {
            tracing::info!("spawned active fetch for {req:?}");

            // Check if the requested object is already being fetched. If not, take a lock on it so
            // we are the only ones to fetch this particular object.
            {
                let mut in_progress = in_progress.lock().await;
                match in_progress.entry(req) {
                    Entry::Occupied(mut e) => {
                        // If the object is already being fetched, add our callback for the fetching
                        // task to execute upon completion.
                        e.get_mut().extend(callbacks);
                        tracing::info!(?req, callbacks = ?e.get(), "resource is already being fetched");
                        return;
                    },
                    Entry::Vacant(e) => {
                        // If the object is not being fetched, we will register our own callback and
                        // then fetch it ourselves.
                        e.insert(callbacks.into_iter().collect());
                    },
                }
            }

            // Now that we are responsible for fetching the object, reach out to the provider.
            backoff.reset();
            let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
            let res = loop {
                // Acquire a permit from the semaphore to rate limit the number of concurrent fetch
                // requests.
                let permit = permit.acquire().await;
                if let Some(res) = provider.fetch(req).await {
                    break res;
                }

                // We only fetch objects which are known to exist, so we should eventually succeed
                // in fetching if we retry enough. For example, we may be fetching a block from a
                // peer who hasn't received the block yet.
                //
                // To understand why it is ok to retry indefinitely, think about manual
                // intervention: if we don't retry, or retry with a limit, we may require manual
                // intervention whenever a query service fails to fetch a resource that should exist
                // and stops retrying, since it now may never receive that resource. With indefinite
                // fetching, we require manual intervention only when active fetches are
                // accumulating because a peer which _should_ have the resource isn't providing it.
                // In this case, we would require manual intervention on the peer anyway.
                tracing::warn!("failed to fetch {req:?}, will retry in {delay:?}");
                drop(permit);
                sleep(delay).await;

                if let Some(next_delay) = backoff.next_backoff() {
                    delay = next_delay;
                }
            };

            // Done fetching; remove our lock on the object and execute all callbacks.
            //
            // We will keep this lock the whole time we are running the callbacks. We can't release
            // it earlier because we can't allow another task to register a callback after we have
            // taken the list of callbacks that we will execute. We also don't want to allow any new
            // fetches until we have executed the callbacks, because one of the callbacks may store
            // some resource that obviates the need for another fetch.
            //
            // The callbacks may acquire arbitrary locks from this task, while we already hold the
            // lock on `in_progress`. This is fine because we are always running in a freshly
            // spawned task. Therefore we know that this task holds no locks _before_ acquiring
            // `in_progress`, and so it is safe to acquire any lock _after_ acquiring `in_progress`.
            let mut in_progress = in_progress.lock().await;
            let callbacks = in_progress.remove(&req).unwrap_or_default();
            for callback in callbacks {
                callback.run(res.clone()).await;
            }
        });
    }
}