hotshot_query_service/availability/fetch.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{future::IntoFuture, time::Duration};
14
15use futures::future::{BoxFuture, FutureExt};
16use snafu::{Error, ErrorCompat, IntoError, NoneError, OptionExt};
17use tokio::time::timeout;
18
19/// An in-progress request to fetch some data.
20///
21/// A [`Fetch`] represents the period of uncertainty after some data has been requested, before it
22/// is known whether that data exists locally or must be retrieved from an external source (or
23/// whether we must wait for the data to be created in the first place).
24///
25/// For data that is already available locally, a request for that data may return [`Fetch::Ready`]
26/// with the data itself. Otherwise, the request will return [`Fetch::Pending`] with a future that
27/// will resolve once the data is available.
28///
29/// Depending on the context, [`Fetch`] can behave a bit like a [`Future`](futures::Future) or a bit
30/// like a [`Result`]. Therefore, it implements [`IntoFuture`], so it can be awaited (this is the
31/// same as calling [`resolve`](Self::resolve)) and it implements methods [`context`](Self::context)
32/// and [`with_context`](Self::with_context), which make it easy to convert a [`Fetch`] to a
33/// [`Result`], mimicking the methods from Snafu traits [`OptionExt`](snafu::OptionExt) and
34/// [`ResultExt`](snafu::ResultExt).
35pub enum Fetch<T> {
36 Ready(T),
37 Pending(BoxFuture<'static, T>),
38}
39
40impl<T> Fetch<T> {
41 /// Get the requested data if it is available immediately.
42 ///
43 /// If the requested data is not immediately available, `Err(self)` is returned so that the
44 /// [`Fetch`] object may be used again.
45 pub fn try_resolve(self) -> Result<T, Self> {
46 match self {
47 Self::Ready(obj) => Ok(obj),
48 Self::Pending(fut) => Err(Self::Pending(fut)),
49 }
50 }
51
52 /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
53 ///
54 /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
55 /// is returned. Otherwise, an error is created from `context` using Snafu.
56 pub fn context<C, E>(self, context: C) -> Result<T, E>
57 where
58 C: IntoError<E, Source = NoneError>,
59 E: Error + ErrorCompat,
60 {
61 self.try_resolve().ok().context(context)
62 }
63
64 /// Convert this [`Fetch`] to a [`Result`] with the provided error context.
65 ///
66 /// The result indicates whether the requested data is immediately available. If it is, [`Ok`]
67 /// is returned. Otherwise, an error is created from `context` using Snafu.
68 pub fn with_context<F, C, E>(self, context: F) -> Result<T, E>
69 where
70 F: FnOnce() -> C,
71 C: IntoError<E, Source = NoneError>,
72 E: Error + ErrorCompat,
73 {
74 self.try_resolve().ok().with_context(context)
75 }
76
77 /// Does this fetch represent an unresolved query?
78 pub fn is_pending(&self) -> bool {
79 matches!(self, Self::Pending(_))
80 }
81}
82
83impl<T: Send + 'static> Fetch<T> {
84 /// Wait for the data to become available, if it is not already.
85 pub async fn resolve(self) -> T {
86 self.await
87 }
88
89 /// Wait for the requested data to become available, but only for up to `timeout`.
90 ///
91 /// This function is similar to [`resolve`](Self::resolve), but if the future does not resolve
92 /// within `timeout`, then [`with_timeout`](Self::with_timeout) will resolve with [`None`].
93 pub async fn with_timeout(self, timeout_duration: Duration) -> Option<T> {
94 timeout(timeout_duration, self.into_future()).await.ok()
95 }
96}
97
98impl<T: 'static> Fetch<T> {
99 /// Transform the result of this fetch.
100 ///
101 /// If the requested data is already available, `f` will be applied immediately. Otherwise, `f`
102 /// will be saved and applied after the pending future resolves.
103 pub fn map<F, U>(self, f: F) -> Fetch<U>
104 where
105 F: 'static + Send + FnOnce(T) -> U,
106 {
107 match self {
108 Self::Ready(obj) => Fetch::Ready(f(obj)),
109 Self::Pending(fut) => Fetch::Pending(fut.map(f).boxed()),
110 }
111 }
112}
113
114impl<T: Send + 'static> IntoFuture for Fetch<T> {
115 type Output = T;
116 type IntoFuture = BoxFuture<'static, Self::Output>;
117
118 fn into_future(self) -> Self::IntoFuture {
119 async move {
120 match self {
121 Self::Ready(obj) => obj,
122 Self::Pending(fut) => fut.await,
123 }
124 }
125 .boxed()
126 }
127}