1use std::{
2 cmp::{min, Ordering},
3 fmt::{self, Debug, Display, Formatter},
4 num::ParseIntError,
5 str::FromStr,
6 time::Duration,
7};
8
9use anyhow::Context;
10use bytesize::ByteSize;
11use clap::Parser;
12use committable::Committable;
13use derive_more::{From, Into};
14use futures::future::BoxFuture;
15use hotshot_types::{
16 consensus::CommitmentMap,
17 data::{Leaf, Leaf2},
18 traits::node_implementation::NodeType,
19};
20use rand::Rng;
21use sequencer_utils::{impl_serde_from_string_or_integer, ser::FromStringOrInteger};
22use serde::{Deserialize, Serialize};
23use thiserror::Error;
24use time::{
25 format_description::well_known::Rfc3339 as TimestampFormat, macros::time, Date, OffsetDateTime,
26};
27use tokio::time::sleep;
28
29pub fn upgrade_commitment_map<Types: NodeType>(
30 map: CommitmentMap<Leaf<Types>>,
31) -> CommitmentMap<Leaf2<Types>> {
32 map.into_values()
33 .map(|leaf| {
34 let leaf2: Leaf2<Types> = leaf.into();
35 (leaf2.commit(), leaf2)
36 })
37 .collect()
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
41pub enum Update<T> {
42 #[default]
43 #[serde(rename = "__skip")]
44 Skip,
45 #[serde(untagged)]
46 Set(T),
47}
48
49impl<T> Update<T> {
50 pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
51 match self {
52 Update::Skip => Update::Skip,
53 Update::Set(v) => Update::Set(f(v)),
54 }
55 }
56}
57
58#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
60pub struct GenesisHeader {
61 pub timestamp: Timestamp,
62}
63
64#[derive(Hash, Copy, Clone, Debug, derive_more::Display, PartialEq, Eq, From, Into)]
65#[display("{}", _0.format(&TimestampFormat).unwrap())]
66pub struct Timestamp(OffsetDateTime);
67
68impl_serde_from_string_or_integer!(Timestamp);
69
70impl Default for Timestamp {
71 fn default() -> Self {
72 Self::from_integer(0).unwrap()
73 }
74}
75
76impl Timestamp {
77 pub fn unix_timestamp(&self) -> u64 {
78 self.0.unix_timestamp() as u64
79 }
80
81 pub fn unix_timestamp_millis(&self) -> u64 {
82 (self.0.unix_timestamp_nanos() / 1_000_000) as u64
83 }
84
85 pub fn max() -> Self {
86 Self(OffsetDateTime::new_utc(Date::MAX, time!(23:59)))
87 }
88}
89
90impl FromStringOrInteger for Timestamp {
91 type Binary = u64;
92 type Integer = u64;
93
94 fn from_binary(b: Self::Binary) -> anyhow::Result<Self> {
95 Self::from_integer(b)
96 }
97
98 fn from_integer(i: Self::Integer) -> anyhow::Result<Self> {
99 let unix = i.try_into().context("timestamp out of range")?;
100 Ok(Self(
101 OffsetDateTime::from_unix_timestamp(unix).context("invalid timestamp")?,
102 ))
103 }
104
105 fn from_string(s: String) -> anyhow::Result<Self> {
106 Ok(Self(
107 OffsetDateTime::parse(&s, &TimestampFormat).context("invalid timestamp")?,
108 ))
109 }
110
111 fn to_binary(&self) -> anyhow::Result<Self::Binary> {
112 Ok(self.unix_timestamp())
113 }
114
115 fn to_string(&self) -> anyhow::Result<String> {
116 Ok(format!("{self}"))
117 }
118}
119
120#[derive(
121 Hash,
122 Copy,
123 Clone,
124 Debug,
125 derive_more::Display,
126 PartialEq,
127 Eq,
128 PartialOrd,
129 Ord,
130 Serialize,
131 Deserialize,
132)]
133#[display("{}", _0)]
134pub struct TimestampMillis(u64);
135
136impl TimestampMillis {
137 pub fn from_time(time: &OffsetDateTime) -> Self {
138 let timestamp = (time.unix_timestamp_nanos() / 1_000_000) as u64;
139
140 Self(timestamp)
141 }
142
143 pub fn from_millis(millis: u64) -> Self {
144 Self(millis)
145 }
146
147 pub fn u64(&self) -> u64 {
148 self.0
149 }
150}
151
152#[derive(Clone, Copy, Debug, PartialEq, Eq)]
153pub struct Ratio {
154 pub numerator: u64,
155 pub denominator: u64,
156}
157
158impl From<Ratio> for (u64, u64) {
159 fn from(r: Ratio) -> Self {
160 (r.numerator, r.denominator)
161 }
162}
163
164impl Display for Ratio {
165 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
166 write!(f, "{}:{}", self.numerator, self.denominator)
167 }
168}
169
170impl PartialOrd for Ratio {
171 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
172 Some(self.cmp(other))
173 }
174}
175
176impl Ord for Ratio {
177 fn cmp(&self, other: &Self) -> Ordering {
178 (self.numerator * other.denominator).cmp(&(other.numerator * self.denominator))
179 }
180}
181
182#[derive(Debug, Error)]
183pub enum ParseRatioError {
184 #[error("numerator and denominator must be separated by :")]
185 MissingDelimiter,
186 #[error("Invalid numerator {err:?}")]
187 InvalidNumerator { err: ParseIntError },
188 #[error("Invalid denominator {err:?}")]
189 InvalidDenominator { err: ParseIntError },
190}
191
192impl FromStr for Ratio {
193 type Err = ParseRatioError;
194
195 fn from_str(s: &str) -> Result<Self, Self::Err> {
196 let (num, den) = s.split_once(':').ok_or(ParseRatioError::MissingDelimiter)?;
197 Ok(Self {
198 numerator: num
199 .parse()
200 .map_err(|err| ParseRatioError::InvalidNumerator { err })?,
201 denominator: den
202 .parse()
203 .map_err(|err| ParseRatioError::InvalidDenominator { err })?,
204 })
205 }
206}
207
208#[derive(Clone, Debug, Error)]
209#[error("Failed to parse duration {reason}")]
210pub struct ParseDurationError {
211 reason: String,
212}
213
214pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
215 cld::ClDuration::from_str(s)
216 .map(Duration::from)
217 .map_err(|err| ParseDurationError {
218 reason: err.to_string(),
219 })
220}
221
222#[derive(Clone, Debug, From, Error)]
223#[error("failed to parse ByteSize. {msg}")]
224pub struct ParseSizeError {
225 msg: String,
226}
227
228pub fn parse_size(s: &str) -> Result<u64, ParseSizeError> {
229 Ok(s.parse::<ByteSize>()?.0)
230}
231
232pub const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);
233pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);
234pub const BACKOFF_FACTOR: u32 = 2;
235pub const BACKOFF_JITTER: (u64, u64) = (1, 10);
237
238#[derive(Clone, Copy, Debug, Parser, PartialEq, Eq, PartialOrd, Ord)]
239pub struct BackoffParams {
240 #[clap(
242 long = "catchup-backoff-factor",
243 env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_FACTOR",
244 default_value = "4"
245 )]
246 factor: u32,
247
248 #[clap(
250 long = "catchup-base-retry-delay",
251 env = "ESPRESSO_SEQUENCER_CATCHUP_BASE_RETRY_DELAY",
252 default_value = "200ms",
253 value_parser = parse_duration
254 )]
255 base: Duration,
256
257 #[clap(
259 long = "catchup-max-retry-delay",
260 env = "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
261 default_value = "5s",
262 value_parser = parse_duration
263 )]
264 max: Duration,
265
266 #[clap(
268 long = "catchup-backoff-jitter",
269 env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_JITTER",
270 default_value = "1:10"
271 )]
272 jitter: Ratio,
273
274 #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")]
276 disable: bool,
277}
278
279impl Default for BackoffParams {
280 fn default() -> Self {
281 Self::parse_from(std::iter::empty::<String>())
282 }
283}
284
285impl BackoffParams {
286 pub fn disabled() -> Self {
287 Self {
288 disable: true,
289 ..Default::default()
290 }
291 }
292
293 pub async fn retry<S, T>(
294 &self,
295 mut state: S,
296 f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result<T>>,
297 ) -> anyhow::Result<T> {
298 let mut delay = self.base;
299 for i in 0.. {
300 match f(&mut state, i).await {
301 Ok(res) => return Ok(res),
302 Err(err) if self.disable => {
303 return Err(err.context("Retryable operation failed; retries disabled"));
304 },
305 Err(err) => {
306 tracing::warn!(
307 "Retryable operation failed, will retry after {delay:?}: {err:#}"
308 );
309 sleep(delay).await;
310 delay = self.backoff(delay);
311 },
312 }
313 }
314 unreachable!()
315 }
316
317 #[must_use]
318 pub fn backoff(&self, delay: Duration) -> Duration {
319 if delay >= self.max {
320 return self.max;
321 }
322
323 let mut rng = rand::thread_rng();
324
325 let ms = (delay * self.factor).as_millis() as u64;
327
328 let jitter_num = rng.gen_range(0..self.jitter.numerator);
330 let jitter_den = self.jitter.denominator;
331
332 let jitter = ms * jitter_num / jitter_den;
334 let delay = Duration::from_millis(ms + jitter);
335
336 min(delay, self.max)
338 }
339}