1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, LazyLock},
5 task::{Context, Poll, Waker},
6};
7
8use parking_lot::Mutex;
9use tokio::time::{Duration, Instant, Sleep, sleep};
10
/// Process-local reference instant against which every [`Timestamp`] is measured.
///
/// Initialised lazily on first access, so the first timestamp taken in a
/// process is (close to) zero.
static EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now);

/// A point in time expressed as whole microseconds elapsed since `EPOCH`.
///
/// The value has a fixed 8-byte big-endian wire representation, see
/// [`Timestamp::to_bytes`] and [`Timestamp::from_bytes`].
#[derive(Debug, Copy, Clone)]
pub struct Timestamp(u64);

impl Timestamp {
    /// Captures the current time relative to `EPOCH`.
    pub fn now() -> Self {
        let elapsed = Instant::now().saturating_duration_since(*EPOCH);
        // `as_micros` yields a `u128`; saturate rather than silently wrap in
        // the (practically unreachable) case that it does not fit in a `u64`.
        Self(u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX))
    }

    /// Decodes a timestamp from its 8-byte big-endian representation.
    pub fn from_bytes(bytes: [u8; 8]) -> Self {
        Self(u64::from_be_bytes(bytes))
    }

    /// Encodes the timestamp as 8 big-endian bytes.
    pub fn to_bytes(self) -> [u8; 8] {
        self.0.to_be_bytes()
    }

    /// Decodes a timestamp from a byte slice.
    ///
    /// Returns `None` unless `b` is exactly 8 bytes long.
    pub fn try_from_slice(b: &[u8]) -> Option<Self> {
        <[u8; 8]>::try_from(b).ok().map(Self::from_bytes)
    }

    /// Returns how much later `self` is than `other`.
    ///
    /// Returns `None` if `self` is earlier than `other`.
    // Within this file the method is only exercised by the tests, hence the
    // lint suppression.
    #[allow(unused)]
    pub fn diff(self, other: Self) -> Option<Duration> {
        self.0.checked_sub(other.0).map(Duration::from_micros)
    }
}
41
/// A countdown timer that can be started, stopped and awaited.
///
/// The state lives behind an `Arc`, so every clone of a `Countdown` refers to
/// the same underlying timer.
#[derive(Debug, Clone)]
pub struct Countdown {
    /// Mutex-guarded state shared between all clones.
    inner: Arc<Mutex<Inner>>,
}
47
/// Shared state behind a [`Countdown`].
#[derive(Debug)]
struct Inner {
    /// The timer future; `None` until the countdown is started for the first
    /// time, after which it is kept and re-armed on later starts.
    sleep: Option<Pin<Box<Sleep>>>,

    /// Whether the countdown is currently stopped. A new countdown starts
    /// out stopped.
    stopped: bool,

    /// Waker recorded by a poll that found the countdown stopped, so that
    /// starting the countdown can notify the waiting task.
    waker: Option<Waker>,
}
64
65impl Default for Countdown {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl Countdown {
72 pub fn new() -> Self {
76 Self {
77 inner: Arc::new(Mutex::new(Inner {
78 sleep: None,
79 stopped: true,
80 waker: None,
81 })),
82 }
83 }
84
85 pub fn start(&self, timeout: Duration) {
90 let mut inner = self.inner.lock();
91 if !inner.stopped {
92 return;
94 }
95 inner.stopped = false;
96 if let Some(sleep) = &mut inner.sleep {
97 sleep.as_mut().reset(Instant::now() + timeout)
98 } else {
99 inner.sleep = Some(Box::pin(sleep(timeout)))
100 }
101 if let Some(w) = inner.waker.take() {
102 w.wake()
103 }
104 }
105
106 pub fn stop(&self) {
108 self.inner.lock().stopped = true
109 }
110}
111
112impl Future for Countdown {
113 type Output = ();
114
115 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
116 let mut inner = self.inner.lock();
117 if inner.stopped {
118 if let Some(w) = inner.waker.as_mut() {
119 w.clone_from(cx.waker())
121 } else {
122 inner.waker = Some(cx.waker().clone())
123 }
124 return Poll::Pending;
125 }
126 debug_assert!(inner.waker.is_none());
127 let sleep = inner.sleep.as_mut().expect("!stopped => sleep future");
128 sleep.as_mut().poll(cx)
129 }
130}
131
#[cfg(test)]
mod tests {
    use tokio::time::{Duration, Instant, sleep, timeout};

    use super::{Countdown, Timestamp};

    /// Two timestamps taken around a sleep differ by roughly the time slept.
    #[tokio::test]
    async fn duration() {
        let d = Duration::from_millis(50);
        let before = Timestamp::now();
        sleep(d).await;
        let after = Timestamp::now();
        let measured = after.diff(before).unwrap();
        assert!(measured - d < Duration::from_millis(5))
    }

    /// Walks a countdown through start, completion, stop and restart.
    #[tokio::test]
    async fn countdown() {
        let mut c = Countdown::new();

        // A started countdown completes no earlier than its timeout.
        let now = Instant::now();
        c.start(Duration::from_secs(1));
        (&mut c).await;
        assert!(now.elapsed() >= Duration::from_secs(1));

        // Having elapsed without being stopped, awaiting again returns
        // immediately.
        let now = Instant::now();
        (&mut c).await;
        assert!(now.elapsed() < Duration::from_millis(1));

        // `start` is a no-op here because the countdown was never stopped;
        // after `stop`, awaiting stays pending until the outer timeout fires.
        c.start(Duration::from_secs(1));
        c.stop();
        assert!(timeout(Duration::from_secs(2), &mut c).await.is_err());

        // Starting a stopped countdown re-arms it. The reference instant is
        // taken *before* `start` so that the measured span fully covers the
        // interval the deadline was computed from.
        let now = Instant::now();
        c.start(Duration::from_secs(1));
        (&mut c).await;
        assert!(now.elapsed() >= Duration::from_secs(1));
    }
}