1use std::{
2 cmp::{min, Ordering},
3 fmt::{self, Debug, Display, Formatter},
4 num::ParseIntError,
5 str::FromStr,
6 time::Duration,
7};
8
9use anyhow::Context;
10use bytesize::ByteSize;
11use clap::Parser;
12use committable::Committable;
13use derive_more::{From, Into};
14use futures::future::BoxFuture;
15use hotshot_types::{
16 consensus::CommitmentMap,
17 data::{Leaf, Leaf2},
18 traits::node_implementation::NodeType,
19};
20use rand::Rng;
21use sequencer_utils::{impl_serde_from_string_or_integer, ser::FromStringOrInteger};
22use serde::{Deserialize, Serialize};
23use thiserror::Error;
24use time::{
25 format_description::well_known::Rfc3339 as TimestampFormat, macros::time, Date, OffsetDateTime,
26};
27use tokio::time::sleep;
28
29pub fn upgrade_commitment_map<Types: NodeType>(
30 map: CommitmentMap<Leaf<Types>>,
31) -> CommitmentMap<Leaf2<Types>> {
32 map.into_values()
33 .map(|leaf| {
34 let leaf2: Leaf2<Types> = leaf.into();
35 (leaf2.commit(), leaf2)
36 })
37 .collect()
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
41pub enum Update<T> {
42 #[default]
43 #[serde(rename = "__skip")]
44 Skip,
45 #[serde(untagged)]
46 Set(T),
47}
48
49impl<T> Update<T> {
50 pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
51 match self {
52 Update::Skip => Update::Skip,
53 Update::Set(v) => Update::Set(f(v)),
54 }
55 }
56}
57
58#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
60pub struct GenesisHeader {
61 pub timestamp: Timestamp,
62}
63
64#[derive(Hash, Copy, Clone, Debug, derive_more::Display, PartialEq, Eq, From, Into)]
65#[display("{}", _0.format(&TimestampFormat).unwrap())]
66pub struct Timestamp(OffsetDateTime);
67
68impl_serde_from_string_or_integer!(Timestamp);
69
70impl Default for Timestamp {
71 fn default() -> Self {
72 Self::from_integer(0).unwrap()
73 }
74}
75
76impl Timestamp {
77 pub fn unix_timestamp(&self) -> u64 {
78 self.0.unix_timestamp() as u64
79 }
80
81 pub fn max() -> Self {
82 Self(OffsetDateTime::new_utc(Date::MAX, time!(23:59)))
83 }
84}
85
86impl FromStringOrInteger for Timestamp {
87 type Binary = u64;
88 type Integer = u64;
89
90 fn from_binary(b: Self::Binary) -> anyhow::Result<Self> {
91 Self::from_integer(b)
92 }
93
94 fn from_integer(i: Self::Integer) -> anyhow::Result<Self> {
95 let unix = i.try_into().context("timestamp out of range")?;
96 Ok(Self(
97 OffsetDateTime::from_unix_timestamp(unix).context("invalid timestamp")?,
98 ))
99 }
100
101 fn from_string(s: String) -> anyhow::Result<Self> {
102 Ok(Self(
103 OffsetDateTime::parse(&s, &TimestampFormat).context("invalid timestamp")?,
104 ))
105 }
106
107 fn to_binary(&self) -> anyhow::Result<Self::Binary> {
108 Ok(self.unix_timestamp())
109 }
110
111 fn to_string(&self) -> anyhow::Result<String> {
112 Ok(format!("{self}"))
113 }
114}
115
116#[derive(Clone, Copy, Debug, PartialEq, Eq)]
117pub struct Ratio {
118 pub numerator: u64,
119 pub denominator: u64,
120}
121
122impl From<Ratio> for (u64, u64) {
123 fn from(r: Ratio) -> Self {
124 (r.numerator, r.denominator)
125 }
126}
127
128impl Display for Ratio {
129 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
130 write!(f, "{}:{}", self.numerator, self.denominator)
131 }
132}
133
134impl PartialOrd for Ratio {
135 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
136 Some(self.cmp(other))
137 }
138}
139
140impl Ord for Ratio {
141 fn cmp(&self, other: &Self) -> Ordering {
142 (self.numerator * other.denominator).cmp(&(other.numerator * self.denominator))
143 }
144}
145
146#[derive(Debug, Error)]
147pub enum ParseRatioError {
148 #[error("numerator and denominator must be separated by :")]
149 MissingDelimiter,
150 #[error("Invalid numerator {err:?}")]
151 InvalidNumerator { err: ParseIntError },
152 #[error("Invalid denominator {err:?}")]
153 InvalidDenominator { err: ParseIntError },
154}
155
156impl FromStr for Ratio {
157 type Err = ParseRatioError;
158
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 let (num, den) = s.split_once(':').ok_or(ParseRatioError::MissingDelimiter)?;
161 Ok(Self {
162 numerator: num
163 .parse()
164 .map_err(|err| ParseRatioError::InvalidNumerator { err })?,
165 denominator: den
166 .parse()
167 .map_err(|err| ParseRatioError::InvalidDenominator { err })?,
168 })
169 }
170}
171
172#[derive(Clone, Debug, Error)]
173#[error("Failed to parse duration {reason}")]
174pub struct ParseDurationError {
175 reason: String,
176}
177
178pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
179 cld::ClDuration::from_str(s)
180 .map(Duration::from)
181 .map_err(|err| ParseDurationError {
182 reason: err.to_string(),
183 })
184}
185
186#[derive(Clone, Debug, From, Error)]
187#[error("failed to parse ByteSize. {msg}")]
188pub struct ParseSizeError {
189 msg: String,
190}
191
192pub fn parse_size(s: &str) -> Result<u64, ParseSizeError> {
193 Ok(s.parse::<ByteSize>()?.0)
194}
195
196pub const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);
197pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);
198pub const BACKOFF_FACTOR: u32 = 2;
199pub const BACKOFF_JITTER: (u64, u64) = (1, 10);
201
202#[derive(Clone, Copy, Debug, Parser, PartialEq, Eq, PartialOrd, Ord)]
203pub struct BackoffParams {
204 #[clap(
206 long = "catchup-backoff-factor",
207 env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_FACTOR",
208 default_value = "4"
209 )]
210 factor: u32,
211
212 #[clap(
214 long = "catchup-base-retry-delay",
215 env = "ESPRESSO_SEQUENCER_CATCHUP_BASE_RETRY_DELAY",
216 default_value = "20ms",
217 value_parser = parse_duration
218 )]
219 base: Duration,
220
221 #[clap(
223 long = "catchup-max-retry-delay",
224 env = "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
225 default_value = "5s",
226 value_parser = parse_duration
227 )]
228 max: Duration,
229
230 #[clap(
232 long = "catchup-backoff-jitter",
233 env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_JITTER",
234 default_value = "1:10"
235 )]
236 jitter: Ratio,
237
238 #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")]
240 disable: bool,
241}
242
243impl Default for BackoffParams {
244 fn default() -> Self {
245 Self::parse_from(std::iter::empty::<String>())
246 }
247}
248
249impl BackoffParams {
250 pub fn disabled() -> Self {
251 Self {
252 disable: true,
253 ..Default::default()
254 }
255 }
256
257 pub async fn retry<S, T>(
258 &self,
259 mut state: S,
260 f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result<T>>,
261 ) -> anyhow::Result<T> {
262 let mut delay = self.base;
263 for i in 0.. {
264 match f(&mut state, i).await {
265 Ok(res) => return Ok(res),
266 Err(err) if self.disable => {
267 return Err(err.context("Retryable operation failed; retries disabled"));
268 },
269 Err(err) => {
270 tracing::warn!(
271 "Retryable operation failed, will retry after {delay:?}: {err:#}"
272 );
273 sleep(delay).await;
274 delay = self.backoff(delay);
275 },
276 }
277 }
278 unreachable!()
279 }
280
281 #[must_use]
282 pub fn backoff(&self, delay: Duration) -> Duration {
283 if delay >= self.max {
284 return self.max;
285 }
286
287 let mut rng = rand::thread_rng();
288
289 let ms = (delay * self.factor).as_millis() as u64;
291
292 let jitter_num = rng.gen_range(0..self.jitter.numerator);
294 let jitter_den = self.jitter.denominator;
295
296 let jitter = ms * jitter_num / jitter_den;
298 let delay = Duration::from_millis(ms + jitter);
299
300 min(delay, self.max)
302 }
303}