espresso_types/v0/
utils.rs

1use std::{
2    cmp::{min, Ordering},
3    fmt::{self, Debug, Display, Formatter},
4    num::ParseIntError,
5    str::FromStr,
6    time::Duration,
7};
8
9use anyhow::Context;
10use bytesize::ByteSize;
11use clap::Parser;
12use committable::Committable;
13use derive_more::{From, Into};
14use futures::future::BoxFuture;
15use hotshot_types::{
16    consensus::CommitmentMap,
17    data::{Leaf, Leaf2},
18    traits::node_implementation::NodeType,
19};
20use rand::Rng;
21use sequencer_utils::{impl_serde_from_string_or_integer, ser::FromStringOrInteger};
22use serde::{Deserialize, Serialize};
23use thiserror::Error;
24use time::{
25    format_description::well_known::Rfc3339 as TimestampFormat, macros::time, Date, OffsetDateTime,
26};
27use tokio::time::sleep;
28
29pub fn upgrade_commitment_map<Types: NodeType>(
30    map: CommitmentMap<Leaf<Types>>,
31) -> CommitmentMap<Leaf2<Types>> {
32    map.into_values()
33        .map(|leaf| {
34            let leaf2: Leaf2<Types> = leaf.into();
35            (leaf2.commit(), leaf2)
36        })
37        .collect()
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
41pub enum Update<T> {
42    #[default]
43    #[serde(rename = "__skip")]
44    Skip,
45    #[serde(untagged)]
46    Set(T),
47}
48
49impl<T> Update<T> {
50    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
51        match self {
52            Update::Skip => Update::Skip,
53            Update::Set(v) => Update::Set(f(v)),
54        }
55    }
56}
57
58/// Information about the genesis state which feeds into the genesis block header.
59#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
60pub struct GenesisHeader {
61    pub timestamp: Timestamp,
62}
63
64#[derive(Hash, Copy, Clone, Debug, derive_more::Display, PartialEq, Eq, From, Into)]
65#[display("{}", _0.format(&TimestampFormat).unwrap())]
66pub struct Timestamp(OffsetDateTime);
67
68impl_serde_from_string_or_integer!(Timestamp);
69
70impl Default for Timestamp {
71    fn default() -> Self {
72        Self::from_integer(0).unwrap()
73    }
74}
75
76impl Timestamp {
77    pub fn unix_timestamp(&self) -> u64 {
78        self.0.unix_timestamp() as u64
79    }
80
81    pub fn unix_timestamp_millis(&self) -> u64 {
82        (self.0.unix_timestamp_nanos() / 1_000_000) as u64
83    }
84
85    pub fn max() -> Self {
86        Self(OffsetDateTime::new_utc(Date::MAX, time!(23:59)))
87    }
88}
89
90impl FromStringOrInteger for Timestamp {
91    type Binary = u64;
92    type Integer = u64;
93
94    fn from_binary(b: Self::Binary) -> anyhow::Result<Self> {
95        Self::from_integer(b)
96    }
97
98    fn from_integer(i: Self::Integer) -> anyhow::Result<Self> {
99        let unix = i.try_into().context("timestamp out of range")?;
100        Ok(Self(
101            OffsetDateTime::from_unix_timestamp(unix).context("invalid timestamp")?,
102        ))
103    }
104
105    fn from_string(s: String) -> anyhow::Result<Self> {
106        Ok(Self(
107            OffsetDateTime::parse(&s, &TimestampFormat).context("invalid timestamp")?,
108        ))
109    }
110
111    fn to_binary(&self) -> anyhow::Result<Self::Binary> {
112        Ok(self.unix_timestamp())
113    }
114
115    fn to_string(&self) -> anyhow::Result<String> {
116        Ok(format!("{self}"))
117    }
118}
119
120#[derive(
121    Hash,
122    Copy,
123    Clone,
124    Debug,
125    derive_more::Display,
126    PartialEq,
127    Eq,
128    PartialOrd,
129    Ord,
130    Serialize,
131    Deserialize,
132)]
133#[display("{}", _0)]
134pub struct TimestampMillis(u64);
135
136impl TimestampMillis {
137    pub fn from_time(time: &OffsetDateTime) -> Self {
138        let timestamp = (time.unix_timestamp_nanos() / 1_000_000) as u64;
139
140        Self(timestamp)
141    }
142
143    pub fn from_millis(millis: u64) -> Self {
144        Self(millis)
145    }
146
147    pub fn u64(&self) -> u64 {
148        self.0
149    }
150}
151
/// A ratio of two unsigned integers, `numerator:denominator`.
///
/// The derived equality compares the two fields as written; the ratio is not reduced.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Ratio {
    /// Top part of the fraction.
    pub numerator: u64,
    /// Bottom part of the fraction.
    pub denominator: u64,
}
157
158impl From<Ratio> for (u64, u64) {
159    fn from(r: Ratio) -> Self {
160        (r.numerator, r.denominator)
161    }
162}
163
164impl Display for Ratio {
165    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
166        write!(f, "{}:{}", self.numerator, self.denominator)
167    }
168}
169
170impl PartialOrd for Ratio {
171    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
172        Some(self.cmp(other))
173    }
174}
175
176impl Ord for Ratio {
177    fn cmp(&self, other: &Self) -> Ordering {
178        (self.numerator * other.denominator).cmp(&(other.numerator * self.denominator))
179    }
180}
181
182#[derive(Debug, Error)]
183pub enum ParseRatioError {
184    #[error("numerator and denominator must be separated by :")]
185    MissingDelimiter,
186    #[error("Invalid numerator {err:?}")]
187    InvalidNumerator { err: ParseIntError },
188    #[error("Invalid denominator {err:?}")]
189    InvalidDenominator { err: ParseIntError },
190}
191
192impl FromStr for Ratio {
193    type Err = ParseRatioError;
194
195    fn from_str(s: &str) -> Result<Self, Self::Err> {
196        let (num, den) = s.split_once(':').ok_or(ParseRatioError::MissingDelimiter)?;
197        Ok(Self {
198            numerator: num
199                .parse()
200                .map_err(|err| ParseRatioError::InvalidNumerator { err })?,
201            denominator: den
202                .parse()
203                .map_err(|err| ParseRatioError::InvalidDenominator { err })?,
204        })
205    }
206}
207
208#[derive(Clone, Debug, Error)]
209#[error("Failed to parse duration {reason}")]
210pub struct ParseDurationError {
211    reason: String,
212}
213
214pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
215    cld::ClDuration::from_str(s)
216        .map(Duration::from)
217        .map_err(|err| ParseDurationError {
218            reason: err.to_string(),
219        })
220}
221
222#[derive(Clone, Debug, From, Error)]
223#[error("failed to parse ByteSize. {msg}")]
224pub struct ParseSizeError {
225    msg: String,
226}
227
228pub fn parse_size(s: &str) -> Result<u64, ParseSizeError> {
229    Ok(s.parse::<ByteSize>()?.0)
230}
231
/// Minimum retry delay: 500 milliseconds.
pub const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);

/// Maximum retry delay: 5 seconds.
pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);

/// Backoff factor.
pub const BACKOFF_FACTOR: u32 = 2;

/// Exponential backoff jitter as a fraction of the backoff delay, `(numerator, denominator)`.
pub const BACKOFF_JITTER: (u64, u64) = (1, 10);
237
238#[derive(Clone, Copy, Debug, Parser, PartialEq, Eq, PartialOrd, Ord)]
239pub struct BackoffParams {
240    /// Exponential backoff exponent.
241    #[clap(
242        long = "catchup-backoff-factor",
243        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_FACTOR",
244        default_value = "4"
245    )]
246    factor: u32,
247
248    /// Exponential backoff base delay.
249    #[clap(
250        long = "catchup-base-retry-delay",
251        env = "ESPRESSO_SEQUENCER_CATCHUP_BASE_RETRY_DELAY",
252        default_value = "200ms",
253        value_parser = parse_duration
254    )]
255    base: Duration,
256
257    /// Exponential max delay.
258    #[clap(
259        long = "catchup-max-retry-delay",
260        env = "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
261        default_value = "5s",
262        value_parser = parse_duration
263    )]
264    max: Duration,
265
266    /// Exponential backoff jitter as a ratio of the backoff delay, numerator:denominator.
267    #[clap(
268        long = "catchup-backoff-jitter",
269        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_JITTER",
270        default_value = "1:10"
271    )]
272    jitter: Ratio,
273
274    /// Disable retries and just fail after one failed attempt.
275    #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")]
276    disable: bool,
277}
278
279impl Default for BackoffParams {
280    fn default() -> Self {
281        Self::parse_from(std::iter::empty::<String>())
282    }
283}
284
285impl BackoffParams {
286    pub fn disabled() -> Self {
287        Self {
288            disable: true,
289            ..Default::default()
290        }
291    }
292
293    pub async fn retry<S, T>(
294        &self,
295        mut state: S,
296        f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result<T>>,
297    ) -> anyhow::Result<T> {
298        let mut delay = self.base;
299        for i in 0.. {
300            match f(&mut state, i).await {
301                Ok(res) => return Ok(res),
302                Err(err) if self.disable => {
303                    return Err(err.context("Retryable operation failed; retries disabled"));
304                },
305                Err(err) => {
306                    tracing::warn!(
307                        "Retryable operation failed, will retry after {delay:?}: {err:#}"
308                    );
309                    sleep(delay).await;
310                    delay = self.backoff(delay);
311                },
312            }
313        }
314        unreachable!()
315    }
316
317    #[must_use]
318    pub fn backoff(&self, delay: Duration) -> Duration {
319        if delay >= self.max {
320            return self.max;
321        }
322
323        let mut rng = rand::thread_rng();
324
325        // Increase the backoff by the backoff factor.
326        let ms = (delay * self.factor).as_millis() as u64;
327
328        // Sample a random jitter factor in the range [0, self.jitter].
329        let jitter_num = rng.gen_range(0..self.jitter.numerator);
330        let jitter_den = self.jitter.denominator;
331
332        // Increase the delay by the jitter factor.
333        let jitter = ms * jitter_num / jitter_den;
334        let delay = Duration::from_millis(ms + jitter);
335
336        // Bound the delay by the maximum.
337        min(delay, self.max)
338    }
339}