cliquenet/
chan.rs

1//! A channel implementation that keeps only a single copy of an item, as
2//! identified by its Id.
3
4use std::{collections::VecDeque, sync::Arc};
5
6use nohash_hasher::IntSet;
7use parking_lot::Mutex;
8use tokio::sync::Notify;
9
10use crate::Id;
11
12#[derive(Debug)]
13pub struct Sender<T>(Arc<Chan<T>>);
14
15#[derive(Debug)]
16pub struct Receiver<T>(Arc<Chan<T>>);
17
18#[derive(Debug)]
19struct Chan<T> {
20    /// Channel capacity.
21    cap: usize,
22    /// Notifier for receivers that are waiting for items.
23    sig: Notify,
24    /// The items currently in flight.
25    buf: Mutex<Buf<T>>,
26}
27
28#[derive(Debug)]
29struct Buf<T> {
30    /// Ordered queue of items.
31    xs: VecDeque<(Option<Id>, T)>,
32    /// The set of Ids in the queue.
33    ids: IntSet<Id>,
34}
35
36impl<T> Clone for Sender<T> {
37    fn clone(&self) -> Self {
38        Self(self.0.clone())
39    }
40}
41
42pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
43    let chan = Arc::new(Chan {
44        cap,
45        sig: Notify::new(),
46        buf: Mutex::new(Buf {
47            xs: VecDeque::new(),
48            ids: IntSet::default(),
49        }),
50    });
51    (Sender(chan.clone()), Receiver(chan))
52}
53
54impl<T> Sender<T> {
55    pub fn send(&self, id: Option<Id>, val: T) {
56        if let Some(id) = id {
57            let mut buf = self.0.buf.lock();
58            if buf.ids.contains(&id) {
59                return;
60            }
61            if buf.xs.len() == self.0.cap
62                && let Some((Some(id), _)) = buf.xs.pop_front()
63            {
64                buf.ids.remove(&id);
65            }
66            buf.xs.push_back((Some(id), val));
67            buf.ids.insert(id);
68        } else {
69            let mut buf = self.0.buf.lock();
70            if buf.xs.len() == self.0.cap
71                && let Some((Some(id), _)) = buf.xs.pop_front()
72            {
73                buf.ids.remove(&id);
74            }
75            buf.xs.push_back((None, val));
76        }
77        self.0.sig.notify_waiters();
78    }
79
80    pub fn capacity(&self) -> usize {
81        self.0.cap
82    }
83}
84
85impl<T> Receiver<T> {
86    pub async fn recv(&self) -> Option<T> {
87        loop {
88            let future = self.0.sig.notified();
89            {
90                let mut buf = self.0.buf.lock();
91                if let Some((id, val)) = buf.xs.pop_front() {
92                    if let Some(id) = id {
93                        buf.ids.remove(&id);
94                    }
95                    return Some(val);
96                }
97            }
98            future.await;
99        }
100    }
101}