// hotshot_query_service/data_source/notifier.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Multi-producer, multi-consumer oneshot notification channel
14//!
15//! [`Notifier`] is an asynchronous, multi-producer, multi-consumer, oneshot channel satisfying
16//! three additional requirements:
17//! 1. Dropped receivers do not consume resources.
18//! 2. Messages are not copied for receivers who don't want them.
19//! 3. Minimal resource contention for concurrent subscriptions.
20//!
21//! ## Dropped receivers do not consume resources
22//!
23//! This requirement is a direct prerequisite of the broader requirement that passive requests for
24//! resources do not consume resources. This is important because in general, passive requests may
25//! be for resources that are not guaranteed to exist, and thus may never terminate. Just like we
26//! avoid spawning a task for passive requests, since it may never complete, we need receivers for
27//! passive requests not to persist beyond the lifetime of the request, or they may never be closed.
28//!
29//! This requirement is implemented via garbage collection: each time a message is sent, resources
30//! belonging to dropped receivers are cleaned up. Thus, strictly speaking, dropped receivers do
31//! consume resources, but only briefly. There is no need to keep them around until the desired
32//! message is delivered, for example.
33//!
34//! ## Messages are not copied for receivers who don't want them.
35//!
36//! The second requirement simplifies the higher level fetching logic by allowing us to maintain a
37//! single channel for all notifications about a particular resource type, rather than separate
38//! channels for each specific request. Since messages are not copied for all subscribers, but only
39//! for the subscribers interested in a particular message, this simplification becomes nearly
40//! cost-free.
41//!
42//! This requirement is implemented by attaching a predicate to each subscription, which takes a
43//! message by reference. The predicate is checked on the sending side, and the message is only
44//! copied to the subscription if the predicate is satisfied.
45//!
46//! ## Minimal resource contention for concurrent subscriptions.
47//!
48//! This is important because subscriptions are requested in response to read-only client requests,
49//! which are supposed to run in parallel as much as possible. By contrast, notifications are
//! usually sent from internal server tasks (e.g. the background task that updates the data source
51//! when new blocks are committed). It is less of a problem if these internal tasks contend with
52//! each other, because they are not directly blocking responses to clients, and we have more
53//! control over how and when they acquire shared resources.
54//!
55//! This requirement also empowers us to create a simpler design for the high-level fetching logic.
56//! Specifically, we can reuse the same code for fetching individual resources as we use for
57//! long-lived subscription streams (e.g. `subscribe_blocks` is a thin wrapper around
58//! `get_block_range`). We do not have to worry about adding complex logic to reuse notification
59//! subscriptions for long-lived streams, because subscribing anew for each entry in the stream has
//! low overhead in terms of contention over shared resources -- the dominant cost in any
//! concurrent channel, after data copying (see above).
62//!
63//! This further lets us simplify the interface of this channel a bit: since all notifications are
64//! oneshot, consumers deal with futures rather than streams.
65//!
66//! This requirement is satisfied by maintaining the list of subscribers to a [`Notifier`] in a way
67//! that moves most resource contention to message senders, rather than receivers. We make the
68//! assumption that there is less concurrency among senders. In the common case, there is just one
69//! sender: the task monitoring HotShot for new blocks. Occasionally, there may be other tasks
70//! spawned to fetch missing resources and send them through the [`Notifier`], but these should be
71//! relatively few and rare.
72
73use std::{
74 future::IntoFuture,
75 sync::{
76 atomic::{AtomicBool, Ordering},
77 Arc,
78 },
79};
80
81use async_lock::Mutex;
82use derivative::Derivative;
83use futures::future::{BoxFuture, FutureExt};
84use tokio::sync::{
85 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
86 oneshot,
87};
88use tracing::warn;
89
/// A predicate on a type `<T>`.
///
/// [`Predicate`] is an alias for any type implementing `Fn(&T) -> bool` (with a few extra bounds
/// to support concurrency). It is used by [`Notifier`] to represent the preferences of subscribers
/// when filtering messages of type `T`.
pub trait Predicate<T>: 'static + Send + Sync + Fn(&T) -> bool {}
// Blanket impl: any closure or function with the right signature and bounds is a `Predicate`.
impl<F, T> Predicate<T> for F where F: 'static + Send + Sync + Fn(&T) -> bool {}
97
/// State for a single subscription to a [`Notifier`].
#[derive(Derivative)]
#[derivative(Debug)]
struct Subscriber<T> {
    // Filter deciding which messages this subscriber wants to receive.
    #[derivative(Debug = "ignore")]
    predicate: Box<dyn Predicate<T>>,
    // Oneshot channel used to deliver the message; `take`n (left as `None`) once notified.
    #[derivative(Debug = "ignore")]
    sender: Option<oneshot::Sender<T>>,
    // Set to `true` when the receiving side gives up (see `ReceiveHandle::drop`), allowing the
    // sender side to skip and garbage collect this subscription.
    closed: Arc<AtomicBool>,
}
107
108impl<T> Subscriber<T> {
109 fn is_closed(&self) -> bool {
110 // A subscriber can be closed because it has already been notified, which `take`s the
111 // oneshot sender.
112 self.sender.is_none() ||
113 // Or because it was explicitly closed by its receiver (e.g. the receiver was dropped)
114 self.closed.load(Ordering::Relaxed)
115 }
116}
117
118impl<T: Clone> Subscriber<T> {
119 fn notify(&mut self, msg: &T) {
120 // First, check if the subscriber has been closed. If it has, we can skip it and save the
121 // work of evaluating the predicate.
122 if self.is_closed() {
123 return;
124 }
125 // At this point, it is likely, but not guaranteed, that the subscriber is not closed.
126 // It may have been closed the instant after the check above. However, it is harmless to
127 // evaluate the predicate now; at worst we waste a bit of computation.
128 if !(self.predicate)(msg) {
129 return;
130 }
131 // Now we are committed to sending the message to this subscriber if possible. We can take
132 // the sender. We need to check for closed again in case the subscriber was closed since the
133 // previous check.
134 if let Some(sender) = self.sender.take() {
135 if sender.send(msg.clone()).is_err() {
136 // This is equivalent to the previous behavior in `async-compatibility-layer`
137 warn!("Failed to send notification: channel closed");
138 }
139 }
140 }
141}
142
/// Multi-producer, multi-consumer oneshot notification channel
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Notifier<T> {
    // Active subscribers.
    active: Mutex<Vec<Subscriber<T>>>,
    // Pending subscribers.
    //
    // When a new subscriber joins the subscriber set, they do not immediately add themselves to
    // `active`. Instead, they simply send themselves to this channel. Every time a message is sent,
    // it will drain pending subscribers from here and add them to `active`. In this way, almost all
    // of the cost is paid by senders, rather than receivers, and receivers do not contend with
    // senders. We adopt this design for two reasons:
    // 1. Most basically, more messages are received than sent, since the intended use of this
    //    channel is broadcast, with each message being delivered to multiple receivers. Thus,
    //    moving cost from receivers to senders is always good.
    // 2. This channel is intended to be highly concurrent, and `active` is a shared resource that
    //    is ripe for contention. With this design, _only_ sends contend for a lock on the
    //    subscribers list, but new subscriptions do not. Again, since there are often more
    //    receivers than senders, this can reduce contention significantly.
    //
    // Consider the example use case of `Fetcher::fetching_stream`. This design of the notification
    // channel lets us go with a very simple design of fetching stream, where we fetch each
    // subsequent entry individually, with a one-shot notification future. This pattern leads to
    // many concurrent subscription requests: each time a new entry is produced, every open stream
    // will subscribe anew to notifications for the next entry, at the same time. However, these
    // concurrent subscriptions go through the high-throughput multi-producer channel
    // implementation, and do not contend for the lock on `active`.
    #[derivative(Debug = "ignore")]
    pending: Mutex<UnboundedReceiver<Subscriber<T>>>,
    // Send side of `pending`, used by `wait_for` to register new subscribers without locking.
    #[derivative(Debug = "ignore")]
    subscribe: UnboundedSender<Subscriber<T>>,
}
176
177impl<T> Notifier<T> {
178 pub fn new() -> Self {
179 let (subscribe, pending) = unbounded_channel();
180 Self {
181 active: Default::default(),
182 pending: Mutex::new(pending),
183 subscribe,
184 }
185 }
186}
187
impl<T: Clone> Notifier<T> {
    /// Notify all subscribers whose predicate is satisfied by `msg`.
    ///
    /// Each satisfied subscriber receives a clone of `msg` and is then closed (notifications are
    /// oneshot). This call also garbage collects closed subscribers and promotes pending
    /// subscribers to the active list, so senders pay the bookkeeping cost, not receivers.
    pub async fn notify(&self, msg: &T) {
        let mut active = self.active.lock().await;

        // Try sending the message to each active subscriber.
        for subscriber in &mut *active {
            subscriber.notify(msg);
        }

        // Some subscribers may be closed, either because the receiver was dropped or because we
        // just sent it its message. Remove these from the `active` list.
        active.retain(|subscriber| !subscriber.is_closed());

        // Promote pending subscribers to active and send them the message.
        // There is no contention here since we only have one receiver. This is what
        // `async-compatibility-layer` did internally.
        // Note that we still hold the `active` lock, so promotion cannot interleave with a
        // concurrent `notify`.
        let mut pending_guard = self.pending.lock().await;
        while let Ok(mut subscriber) = pending_guard.try_recv() {
            subscriber.notify(msg);
            if !subscriber.is_closed() {
                // This message didn't satisfy the subscriber and its receiver is still alive, so
                // add it to the active list so it will get future messages.
                active.push(subscriber);
            }
        }
        drop(pending_guard);
    }
}
217
218impl<T> Notifier<T> {
219 /// Wait for a message satisfying `predicate`.
220 pub async fn wait_for(&self, predicate: impl Predicate<T>) -> WaitFor<T> {
221 // Create a oneshot channel for receiving the notification.
222 let (sender, receiver) = oneshot::channel();
223 let sender = Some(sender);
224 let closed = Arc::new(AtomicBool::new(false));
225
226 // Create a handle which will close the subscription when dropped.
227 let handle = ReceiveHandle {
228 closed: closed.clone(),
229 };
230
231 // Create a subscriber with our predicate and the oneshot channel.
232 let subscriber = Subscriber {
233 predicate: Box::new(predicate),
234 sender,
235 closed,
236 };
237
238 // Add the subscriber to the channel and return it. We can ignore errors here: `send` only
239 // fails when the receive end of the channel has been dropped, which means the notifier has
240 // been dropped, and thus the send end of the oneshot handle has been dropped. The caller
241 // will discover this when they try to await a notification and get [`None`].
242 let _ = self.subscribe.send(subscriber);
243 WaitFor { handle, receiver }
244 }
245}
246
/// A handle that closes a subscriber when dropped.
struct ReceiveHandle {
    // Shared with the corresponding [`Subscriber`]; set to `true` on drop so the sender side can
    // skip and garbage collect the subscription.
    closed: Arc<AtomicBool>,
}
251
impl Drop for ReceiveHandle {
    fn drop(&mut self) {
        // Mark the subscription closed so the next `notify` skips it and garbage collects it.
        // Relaxed ordering matches the load in `Subscriber::is_closed`; the flag is a standalone
        // signal, not used to synchronize access to other data.
        self.closed.store(true, Ordering::Relaxed);
    }
}
257
/// A pending request for notification.
///
/// This object can be `await`ed to block until the requested notification arrives. The result is an
/// `Option<T>`, which is [`Some`] except in the case that the [`Notifier`] was dropped without ever
/// sending a satisfying message.
///
/// If [`WaitFor`] is dropped before a notification is delivered, it will automatically clean up its
/// resources in the [`Notifier`].
pub struct WaitFor<T> {
    // Closes the subscription (via its `Drop` impl) when this `WaitFor` is dropped.
    handle: ReceiveHandle,
    // Receive side of the oneshot channel the subscriber's message arrives on.
    receiver: oneshot::Receiver<T>,
}
270
271impl<T> IntoFuture for WaitFor<T>
272where
273 T: Send + 'static,
274{
275 type Output = Option<T>;
276 type IntoFuture = BoxFuture<'static, Self::Output>;
277
278 fn into_future(self) -> Self::IntoFuture {
279 async move {
280 let res = self.receiver.await.ok();
281
282 // Explicitly drop `handle` _after_ we're done with `receiver`. If the compiler decides
283 // that it can drop `handle` earlier, we might never get a notification.
284 drop(self.handle);
285
286 res
287 }
288 .boxed()
289 }
290}
291
#[cfg(test)]
mod test {
    use std::time::Duration;

    use tokio::time::timeout;

    use super::*;
    use crate::testing::setup_test;

    // A satisfied subscriber is delivered its message and garbage collected; a dropped,
    // unsatisfied subscriber is closed but not collected until the next `notify`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_notify_drop() {
        setup_test();
        let n = Notifier::new();

        // Create two subscribers with different predicates.
        let wait_for_zero = n.wait_for(|i| *i == 0).await;
        let wait_for_one = n.wait_for(|i| *i == 1).await;

        // Send a message which satisfies only one of the subscribers.
        n.notify(&0).await;
        assert_eq!(wait_for_zero.await.unwrap(), 0);

        // Check that the other subscriber was not notified (its future times out).
        timeout(Duration::from_secs(1), wait_for_one.into_future())
            .await
            .unwrap_err();

        // Check subscribers. The first subscriber should have been cleaned up when it was notified.
        // The second should have been closed when it was dropped without completing, but not yet
        // garbage collected, since garbage collection only runs on `notify`.
        let active = n.active.lock().await;
        assert_eq!(active.len(), 1);
        assert!(active[0].is_closed());
    }

    // Non-satisfying messages promote pending subscribers to active and garbage collect dropped
    // ones; a later satisfying message is still delivered.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_notify_active() {
        setup_test();
        let n = Notifier::new();

        // Create two subscribers.
        let s1 = n.wait_for(|i| *i == 1).await;
        let s2 = n.wait_for(|i| *i == 1).await;

        // Send a message that doesn't notify either subscriber, but just promotes them from pending
        // to active.
        n.notify(&0).await;
        // Check active subscribers.
        {
            let active = n.active.lock().await;
            assert_eq!(active.len(), 2);
            assert!(!active[0].is_closed());
            assert!(!active[1].is_closed());
        }

        // Drop one of the subscribers, then send another non-satisfying message. This should cause
        // the dropped subscriber to get garbage collected.
        drop(s2);
        n.notify(&0).await;
        {
            let active = n.active.lock().await;
            assert_eq!(active.len(), 1);
            assert!(!active[0].is_closed());
        }

        // Satisfy the final subscriber.
        n.notify(&1).await;
        assert_eq!(s1.await.unwrap(), 1);
    }

    // A subscriber dropped while still pending never reaches the active list.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_pending_dropped() {
        setup_test();
        let n = Notifier::new();

        // Create and immediately drop a pending subscriber.
        drop(n.wait_for(|_| false).await);

        // Check that the subscriber gets garbage collected on the next notification.
        n.notify(&0).await;
        assert_eq!(n.active.lock().await.len(), 0);
    }

    // Dropping the notifier resolves all outstanding futures (active and pending) to `None`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_notifier_dropped() {
        setup_test();

        let n = Notifier::new();

        // Create an active subscriber.
        let fut1 = n.wait_for(|_| false).await;
        n.notify(&0).await;

        // Create a pending subscriber.
        let fut2 = n.wait_for(|_| false).await;

        // Drop the notifier while both kinds of subscribers are blocked.
        drop(n);
        assert_eq!(fut1.await, None);
        assert_eq!(fut2.await, None);
    }
}
393}