hotshot_query_service/availability/
fetch.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{future::IntoFuture, time::Duration};
14
15use futures::future::{BoxFuture, Future, FutureExt};
16use snafu::{Error, ErrorCompat, IntoError, NoneError, OptionExt};
17use tokio::time::timeout;
18
19/// An in-progress request to fetch some data.
20///
21/// A [`Fetch`] represents the period of uncertainty after some data has been requested, before it
22/// is known whether that data exists locally or must be retrieved from an external source (or
23/// whether we must wait for the data to be created in the first place).
24///
25/// For data that is already available locally, a request for that data may return [`Fetch::Ready`]
26/// with the data itself. Otherwise, the request will return [`Fetch::Pending`] with a future that
27/// will resolve once the data is available.
28///
29/// Depending on the context, [`Fetch`] can behave a bit like a [`Future`](futures::Future) or a bit
30/// like a [`Result`]. Therefore, it implements [`IntoFuture`], so it can be awaited (this is the
31/// same as calling [`resolve`](Self::resolve)) and it implements methods [`context`](Self::context)
32/// and [`with_context`](Self::with_context), which make it easy to convert a [`Fetch`] to a
33/// [`Result`], mimicking the methods from Snafu traits [`OptionExt`](snafu::OptionExt) and
34/// [`ResultExt`](snafu::ResultExt).
35pub enum Fetch<T> {
36    Ready(T),
37    Pending(BoxFuture<'static, T>),
38}
39
40impl<T> Fetch<T> {
41    /// Get the requested data if it is available immediately.
42    ///
43    /// If the requested data is not immediately available, `Err(self)` is returned so that the
44    /// [`Fetch`] object may be used again.
45    pub fn try_resolve(self) -> Result<T, Self> {
46        match self {
47            Self::Ready(obj) => Ok(obj),
48            Self::Pending(fut) => Err(Self::Pending(fut)),
49        }
50    }
51
52    /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
53    ///
54    /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
55    /// is returned. Otherwise, an error is created from `context` using Snafu.
56    pub fn context<C, E>(self, context: C) -> Result<T, E>
57    where
58        C: IntoError<E, Source = NoneError>,
59        E: Error + ErrorCompat,
60    {
61        self.try_resolve().ok().context(context)
62    }
63
64    /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
65    ///
66    /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
67    /// is returned. Otherwise, an error is created from `context` using Snafu.
68    pub fn with_context<F, C, E>(self, context: F) -> Result<T, E>
69    where
70        F: FnOnce() -> C,
71        C: IntoError<E, Source = NoneError>,
72        E: Error + ErrorCompat,
73    {
74        self.try_resolve().ok().with_context(context)
75    }
76
77    /// Does this fetch represent an unresolved query?
78    pub fn is_pending(&self) -> bool {
79        matches!(self, Self::Pending(_))
80    }
81
82    pub async fn then_fetch<Fut, R>(
83        self,
84        f: impl Send + FnOnce(&T) -> Fut + 'static,
85    ) -> Fetch<(T, R)>
86    where
87        Fut: Send + Future<Output = Fetch<R>>,
88        T: Send + 'static,
89        R: Send + 'static,
90    {
91        match self {
92            Fetch::Ready(t) => f(&t).await.map(|r| (t, r)),
93            Fetch::Pending(t) => Fetch::Pending(
94                async move {
95                    let t = t.await;
96                    let r = f(&t).await.await;
97                    (t, r)
98                }
99                .boxed(),
100            ),
101        }
102    }
103}
104
105impl<T: Send + 'static> Fetch<T> {
106    /// Wait for the data to become available, if it is not already.
107    pub async fn resolve(self) -> T {
108        self.await
109    }
110
111    /// Wait for the requested data to become available, but only for up to `timeout`.
112    ///
113    /// This function is similar to [`resolve`](Self::resolve), but if the future does not resolve
114    /// within `timeout`, then [`with_timeout`](Self::with_timeout) will resolve with [`None`].
115    pub async fn with_timeout(self, timeout_duration: Duration) -> Option<T> {
116        timeout(timeout_duration, self.into_future()).await.ok()
117    }
118}
119
120impl<T: 'static> Fetch<T> {
121    /// Transform the result of this fetch.
122    ///
123    /// If the requested data is already available, `f` will be applied immediately. Otherwise, `f`
124    /// will be saved and applied after the pending future resolves.
125    pub fn map<F, U>(self, f: F) -> Fetch<U>
126    where
127        F: 'static + Send + FnOnce(T) -> U,
128    {
129        match self {
130            Self::Ready(obj) => Fetch::Ready(f(obj)),
131            Self::Pending(fut) => Fetch::Pending(fut.map(f).boxed()),
132        }
133    }
134}
135
136impl<T: Send + 'static> IntoFuture for Fetch<T> {
137    type Output = T;
138    type IntoFuture = BoxFuture<'static, Self::Output>;
139
140    fn into_future(self) -> Self::IntoFuture {
141        async move {
142            match self {
143                Self::Ready(obj) => obj,
144                Self::Pending(fut) => fut.await,
145            }
146        }
147        .boxed()
148    }
149}