hotshot_query_service/
task.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Async task utilities.
14
15use std::{fmt::Display, sync::Arc};
16
17use derivative::Derivative;
18use futures::future::Future;
19use tokio::{
20    spawn,
21    task::{JoinError, JoinHandle},
22};
23use tracing::{info_span, Instrument};
24
25/// A background task which is cancelled on [`Drop`]
26///
27/// This handle can be cloned; cloning it does not clone the underlying task. There may be many
28/// handles to the same background task, and the task will be cancelled when all handles are
29/// dropped.
30#[derive(Clone, Debug)]
31pub struct BackgroundTask {
32    // A handle to the inner task. This exists solely so that we can hold it and have it be dropped
33    // when the last clone of this object is dropped.
34    _inner: Arc<Task<()>>,
35}
36
37impl BackgroundTask {
38    /// Spawn a background task, which will be cancelled when every clone is dropped.
39    ///
40    /// The caller should ensure that `future` yields back to the executor fairly frequently, to
41    /// ensure timely cancellation in case the task is dropped. If an operation in `future` may run
42    /// for a long time without blocking or yielding, consider using
43    /// [`yield_now`](async_std::task::yield_now) periodically, or using [`spawn`] or
44    /// [`spawn_blocking`](async_std::task::spawn_blocking) to run long operations in a sub-task.
45    pub fn spawn<F>(name: impl Display, future: F) -> Self
46    where
47        F: Future + Send + 'static,
48    {
49        // Ignore the output of the background task.
50        let future = async move {
51            future.await;
52        };
53        Self {
54            _inner: Arc::new(Task::spawn(name, future)),
55        }
56    }
57}
58
59#[derive(Derivative)]
60#[derivative(Debug(bound = ""))]
61struct TaskInner<T: Send> {
62    name: String,
63    #[derivative(Debug = "ignore")]
64    handle: JoinHandle<T>,
65}
66
67/// A task handle which can be joined, but is cancelled on [`Drop`]
68#[derive(Derivative)]
69#[derivative(Debug(bound = ""))]
70pub struct Task<T: Send + 'static> {
71    // The task handle is an `Option` so we can `take()` out of it during `join` and `drop`. This
72    // will always be `Some` except during joining or cancellation.
73    inner: Option<TaskInner<T>>,
74}
75
76impl<T: Send + 'static> Task<T> {
77    /// Spawn a task, which will be cancelled when dropped.
78    ///
79    /// The caller should ensure that `future` yields back to the executor fairly frequently, to
80    /// ensure timely cancellation in case the task is dropped. If an operation in `future` may run
81    /// for a long time without blocking or yielding, consider using
82    /// [`yield_now`](async_std::task::yield_now) periodically, or using
83    /// [`spawn`] or [`spawn_blocking`](async_std::task::spawn_blocking) to run long operations in a
84    /// sub-task.
85    pub fn spawn<F>(name: impl Display, future: F) -> Self
86    where
87        F: Future<Output = T> + Send + 'static,
88    {
89        let name = name.to_string();
90        let handle = {
91            let span = info_span!("task", name);
92            spawn(
93                async move {
94                    tracing::info!("spawning task");
95                    let res = future.await;
96                    tracing::info!("completed task");
97                    res
98                }
99                .instrument(span),
100            )
101        };
102
103        Self {
104            inner: Some(TaskInner { name, handle }),
105        }
106    }
107
108    /// Wait for the task to complete and get its output.
109    pub async fn join(mut self) -> Result<T, JoinError> {
110        // We take here so that we will not attempt to cancel the joined task when this handle is
111        // dropped at the end of the function. We can unwrap here because `inner` is only `None`
112        // during `join` or `drop`. Since `join` consumes `self`, it is not possible that `join`
113        // already ran, and of course `self` has not been dropped yet.
114        let inner = self.inner.take().unwrap();
115        inner.handle.await
116    }
117}
118
119impl<T: Send + 'static> Drop for Task<T> {
120    fn drop(&mut self) {
121        if let Some(inner) = self.inner.take() {
122            tracing::info!(name = inner.name, "cancelling task");
123            inner.handle.abort();
124            tracing::info!(name = inner.name, "cancelled task");
125        }
126    }
127}