hotshot_query_service/availability/
fetch.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{future::IntoFuture, time::Duration};
14
15use futures::future::{BoxFuture, FutureExt};
16use snafu::{Error, ErrorCompat, IntoError, NoneError, OptionExt};
17use tokio::time::timeout;
18
19/// An in-progress request to fetch some data.
20///
21/// A [`Fetch`] represents the period of uncertainty after some data has been requested, before it
22/// is known whether that data exists locally or must be retrieved from an external source (or
23/// whether we must wait for the data to be created in the first place).
24///
25/// For data that is already available locally, a request for that data may return [`Fetch::Ready`]
26/// with the data itself. Otherwise, the request will return [`Fetch::Pending`] with a future that
27/// will resolve once the data is available.
28///
29/// Depending on the context, [`Fetch`] can behave a bit like a [`Future`](futures::Future) or a bit
30/// like a [`Result`]. Therefore, it implements [`IntoFuture`], so it can be awaited (this is the
31/// same as calling [`resolve`](Self::resolve)) and it implements methods [`context`](Self::context)
32/// and [`with_context`](Self::with_context), which make it easy to convert a [`Fetch`] to a
33/// [`Result`], mimicking the methods from Snafu traits [`OptionExt`](snafu::OptionExt) and
34/// [`ResultExt`](snafu::ResultExt).
35pub enum Fetch<T> {
36    Ready(T),
37    Pending(BoxFuture<'static, T>),
38}
39
40impl<T> Fetch<T> {
41    /// Get the requested data if it is available immediately.
42    ///
43    /// If the requested data is not immediately available, `Err(self)` is returned so that the
44    /// [`Fetch`] object may be used again.
45    pub fn try_resolve(self) -> Result<T, Self> {
46        match self {
47            Self::Ready(obj) => Ok(obj),
48            Self::Pending(fut) => Err(Self::Pending(fut)),
49        }
50    }
51
52    /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
53    ///
54    /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
55    /// is returned. Otherwise, an error is created from `context` using Snafu.
56    pub fn context<C, E>(self, context: C) -> Result<T, E>
57    where
58        C: IntoError<E, Source = NoneError>,
59        E: Error + ErrorCompat,
60    {
61        self.try_resolve().ok().context(context)
62    }
63
64    /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
65    ///
66    /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
67    /// is returned. Otherwise, an error is created from `context` using Snafu.
68    pub fn with_context<F, C, E>(self, context: F) -> Result<T, E>
69    where
70        F: FnOnce() -> C,
71        C: IntoError<E, Source = NoneError>,
72        E: Error + ErrorCompat,
73    {
74        self.try_resolve().ok().with_context(context)
75    }
76
77    /// Does this fetch represent an unresolved query?
78    pub fn is_pending(&self) -> bool {
79        matches!(self, Self::Pending(_))
80    }
81}
82
83impl<T: Send + 'static> Fetch<T> {
84    /// Wait for the data to become available, if it is not already.
85    pub async fn resolve(self) -> T {
86        self.await
87    }
88
89    /// Wait for the requested data to become available, but only for up to `timeout`.
90    ///
91    /// This function is similar to [`resolve`](Self::resolve), but if the future does not resolve
92    /// within `timeout`, then [`with_timeout`](Self::with_timeout) will resolve with [`None`].
93    pub async fn with_timeout(self, timeout_duration: Duration) -> Option<T> {
94        timeout(timeout_duration, self.into_future()).await.ok()
95    }
96}
97
98impl<T: 'static> Fetch<T> {
99    /// Transform the result of this fetch.
100    ///
101    /// If the requested data is already available, `f` will be applied immediately. Otherwise, `f`
102    /// will be saved and applied after the pending future resolves.
103    pub fn map<F, U>(self, f: F) -> Fetch<U>
104    where
105        F: 'static + Send + FnOnce(T) -> U,
106    {
107        match self {
108            Self::Ready(obj) => Fetch::Ready(f(obj)),
109            Self::Pending(fut) => Fetch::Pending(fut.map(f).boxed()),
110        }
111    }
112}
113
114impl<T: Send + 'static> IntoFuture for Fetch<T> {
115    type Output = T;
116    type IntoFuture = BoxFuture<'static, Self::Output>;
117
118    fn into_future(self) -> Self::IntoFuture {
119        async move {
120            match self {
121                Self::Ready(obj) => obj,
122                Self::Pending(fut) => fut.await,
123            }
124        }
125        .boxed()
126    }
127}