1use std::{collections::VecDeque, sync::Arc};
5
6use nohash_hasher::IntSet;
7use parking_lot::Mutex;
8use tokio::sync::Notify;
9
10use crate::Id;
11
12#[derive(Debug)]
13pub struct Sender<T>(Arc<Chan<T>>);
14
15#[derive(Debug)]
16pub struct Receiver<T>(Arc<Chan<T>>);
17
18#[derive(Debug)]
19struct Chan<T> {
20 cap: usize,
22 sig: Notify,
24 buf: Mutex<Buf<T>>,
26}
27
28#[derive(Debug)]
29struct Buf<T> {
30 xs: VecDeque<(Option<Id>, T)>,
32 ids: IntSet<Id>,
34}
35
36impl<T> Clone for Sender<T> {
37 fn clone(&self) -> Self {
38 Self(self.0.clone())
39 }
40}
41
42pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
43 let chan = Arc::new(Chan {
44 cap,
45 sig: Notify::new(),
46 buf: Mutex::new(Buf {
47 xs: VecDeque::new(),
48 ids: IntSet::default(),
49 }),
50 });
51 (Sender(chan.clone()), Receiver(chan))
52}
53
54impl<T> Sender<T> {
55 pub fn send(&self, id: Option<Id>, val: T) {
56 if let Some(id) = id {
57 let mut buf = self.0.buf.lock();
58 if buf.ids.contains(&id) {
59 return;
60 }
61 if buf.xs.len() == self.0.cap
62 && let Some((Some(id), _)) = buf.xs.pop_front()
63 {
64 buf.ids.remove(&id);
65 }
66 buf.xs.push_back((Some(id), val));
67 buf.ids.insert(id);
68 } else {
69 let mut buf = self.0.buf.lock();
70 if buf.xs.len() == self.0.cap
71 && let Some((Some(id), _)) = buf.xs.pop_front()
72 {
73 buf.ids.remove(&id);
74 }
75 buf.xs.push_back((None, val));
76 }
77 self.0.sig.notify_waiters();
78 }
79
80 pub fn capacity(&self) -> usize {
81 self.0.cap
82 }
83}
84
85impl<T> Receiver<T> {
86 pub async fn recv(&self) -> Option<T> {
87 loop {
88 let future = self.0.sig.notified();
89 {
90 let mut buf = self.0.buf.lock();
91 if let Some((id, val)) = buf.xs.pop_front() {
92 if let Some(id) = id {
93 buf.ids.remove(&id);
94 }
95 return Some(val);
96 }
97 }
98 future.await;
99 }
100 }
101}