hotshot_query_service/task.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Async task utilities.
14
15use std::{fmt::Display, sync::Arc};
16
17use derivative::Derivative;
18use futures::future::Future;
19use tokio::{
20 spawn,
21 task::{JoinError, JoinHandle},
22};
23use tracing::{info_span, Instrument};
24
25/// A background task which is cancelled on [`Drop`]
26///
27/// This handle can be cloned; cloning it does not clone the underlying task. There may be many
28/// handles to the same background task, and the task will be cancelled when all handles are
29/// dropped.
30#[derive(Clone, Debug)]
31pub struct BackgroundTask {
32 // A handle to the inner task. This exists solely so that we can hold it and have it be dropped
33 // when the last clone of this object is dropped.
34 _inner: Arc<Task<()>>,
35}
36
37impl BackgroundTask {
38 /// Spawn a background task, which will be cancelled when every clone is dropped.
39 ///
40 /// The caller should ensure that `future` yields back to the executor fairly frequently, to
41 /// ensure timely cancellation in case the task is dropped. If an operation in `future` may run
42 /// for a long time without blocking or yielding, consider using
43 /// [`yield_now`](async_std::task::yield_now) periodically, or using [`spawn`] or
44 /// [`spawn_blocking`](async_std::task::spawn_blocking) to run long operations in a sub-task.
45 pub fn spawn<F>(name: impl Display, future: F) -> Self
46 where
47 F: Future + Send + 'static,
48 {
49 // Ignore the output of the background task.
50 let future = async move {
51 future.await;
52 };
53 Self {
54 _inner: Arc::new(Task::spawn(name, future)),
55 }
56 }
57}
58
59#[derive(Derivative)]
60#[derivative(Debug(bound = ""))]
61struct TaskInner<T: Send> {
62 name: String,
63 #[derivative(Debug = "ignore")]
64 handle: JoinHandle<T>,
65}
66
67/// A task handle which can be joined, but is cancelled on [`Drop`]
68#[derive(Derivative)]
69#[derivative(Debug(bound = ""))]
70pub struct Task<T: Send + 'static> {
71 // The task handle is an `Option` so we can `take()` out of it during `join` and `drop`. This
72 // will always be `Some` except during joining or cancellation.
73 inner: Option<TaskInner<T>>,
74}
75
76impl<T: Send + 'static> Task<T> {
77 /// Spawn a task, which will be cancelled when dropped.
78 ///
79 /// The caller should ensure that `future` yields back to the executor fairly frequently, to
80 /// ensure timely cancellation in case the task is dropped. If an operation in `future` may run
81 /// for a long time without blocking or yielding, consider using
82 /// [`yield_now`](async_std::task::yield_now) periodically, or using
83 /// [`spawn`] or [`spawn_blocking`](async_std::task::spawn_blocking) to run long operations in a
84 /// sub-task.
85 pub fn spawn<F>(name: impl Display, future: F) -> Self
86 where
87 F: Future<Output = T> + Send + 'static,
88 {
89 let name = name.to_string();
90 let handle = {
91 let span = info_span!("task", name);
92 spawn(
93 async move {
94 tracing::info!("spawning task");
95 let res = future.await;
96 tracing::info!("completed task");
97 res
98 }
99 .instrument(span),
100 )
101 };
102
103 Self {
104 inner: Some(TaskInner { name, handle }),
105 }
106 }
107
108 /// Wait for the task to complete and get its output.
109 pub async fn join(mut self) -> Result<T, JoinError> {
110 // We take here so that we will not attempt to cancel the joined task when this handle is
111 // dropped at the end of the function. We can unwrap here because `inner` is only `None`
112 // during `join` or `drop`. Since `join` consumes `self`, it is not possible that `join`
113 // already ran, and of course `self` has not been dropped yet.
114 let inner = self.inner.take().unwrap();
115 inner.handle.await
116 }
117}
118
119impl<T: Send + 'static> Drop for Task<T> {
120 fn drop(&mut self) {
121 if let Some(inner) = self.inner.take() {
122 tracing::info!(name = inner.name, "cancelling task");
123 inner.handle.abort();
124 tracing::info!(name = inner.name, "cancelled task");
125 }
126 }
127}