espresso_types/v0/
utils.rs

1use std::{
2    cmp::{min, Ordering},
3    fmt::{self, Debug, Display, Formatter},
4    num::ParseIntError,
5    str::FromStr,
6    time::Duration,
7};
8
9use anyhow::Context;
10use bytesize::ByteSize;
11use clap::Parser;
12use committable::Committable;
13use derive_more::{From, Into};
14use futures::future::BoxFuture;
15use hotshot_types::{
16    consensus::CommitmentMap,
17    data::{Leaf, Leaf2},
18    traits::node_implementation::NodeType,
19};
20use rand::Rng;
21use sequencer_utils::{impl_serde_from_string_or_integer, ser::FromStringOrInteger};
22use serde::{Deserialize, Serialize};
23use thiserror::Error;
24use time::{
25    format_description::well_known::Rfc3339 as TimestampFormat, macros::time, Date, OffsetDateTime,
26};
27use tokio::time::sleep;
28
29pub fn upgrade_commitment_map<Types: NodeType>(
30    map: CommitmentMap<Leaf<Types>>,
31) -> CommitmentMap<Leaf2<Types>> {
32    map.into_values()
33        .map(|leaf| {
34            let leaf2: Leaf2<Types> = leaf.into();
35            (leaf2.commit(), leaf2)
36        })
37        .collect()
38}
39
/// A value that is either explicitly provided or deliberately left untouched.
///
/// The serde attributes give the two variants distinct encodings: `Skip` is renamed to
/// `__skip`, while `Set` is marked `untagged`, so it is (de)serialized as the bare inner
/// value without a variant wrapper.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum Update<T> {
    /// No value is supplied. This is the `Default` variant.
    #[default]
    #[serde(rename = "__skip")]
    Skip,
    /// A concrete value of type `T` is supplied.
    #[serde(untagged)]
    Set(T),
}
48
49impl<T> Update<T> {
50    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
51        match self {
52            Update::Skip => Update::Skip,
53            Update::Set(v) => Update::Set(f(v)),
54        }
55    }
56}
57
/// Information about the genesis state which feeds into the genesis block header.
///
/// `Default` yields a header whose timestamp is `Timestamp::default()`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenesisHeader {
    /// The [`Timestamp`] associated with the genesis state.
    pub timestamp: Timestamp,
}
63
/// A point in time, stored as a [`time::OffsetDateTime`].
///
/// `Display` renders the value using the RFC 3339 format description (imported here as
/// `TimestampFormat`). The display attribute unwraps the formatter result, so displaying a
/// value that cannot be represented in that format panics.
///
/// `From`/`Into` are derived, so the type converts to and from `OffsetDateTime` directly.
#[derive(Hash, Copy, Clone, Debug, derive_more::Display, PartialEq, Eq, From, Into)]
#[display("{}", _0.format(&TimestampFormat).unwrap())]
pub struct Timestamp(OffsetDateTime);

// Presumably generates the serde `Serialize`/`Deserialize` impls on top of the
// `FromStringOrInteger` implementation for `Timestamp` in this file — confirm against
// `sequencer_utils`.
impl_serde_from_string_or_integer!(Timestamp);
69
70impl Default for Timestamp {
71    fn default() -> Self {
72        Self::from_integer(0).unwrap()
73    }
74}
75
76impl Timestamp {
77    pub fn unix_timestamp(&self) -> u64 {
78        self.0.unix_timestamp() as u64
79    }
80
81    pub fn max() -> Self {
82        Self(OffsetDateTime::new_utc(Date::MAX, time!(23:59)))
83    }
84}
85
86impl FromStringOrInteger for Timestamp {
87    type Binary = u64;
88    type Integer = u64;
89
90    fn from_binary(b: Self::Binary) -> anyhow::Result<Self> {
91        Self::from_integer(b)
92    }
93
94    fn from_integer(i: Self::Integer) -> anyhow::Result<Self> {
95        let unix = i.try_into().context("timestamp out of range")?;
96        Ok(Self(
97            OffsetDateTime::from_unix_timestamp(unix).context("invalid timestamp")?,
98        ))
99    }
100
101    fn from_string(s: String) -> anyhow::Result<Self> {
102        Ok(Self(
103            OffsetDateTime::parse(&s, &TimestampFormat).context("invalid timestamp")?,
104        ))
105    }
106
107    fn to_binary(&self) -> anyhow::Result<Self::Binary> {
108        Ok(self.unix_timestamp())
109    }
110
111    fn to_string(&self) -> anyhow::Result<String> {
112        Ok(format!("{self}"))
113    }
114}
115
/// A non-negative fraction expressed as `numerator / denominator`.
///
/// NOTE(review): equality is derived field-by-field, whereas ordering compares the values
/// of the fractions. `1:2` and `2:4` are therefore unequal under `==` yet compare as
/// `Ordering::Equal`; callers should not rely on the two being consistent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Ratio {
    pub numerator: u64,
    pub denominator: u64,
}

impl From<Ratio> for (u64, u64) {
    /// Splits the ratio into a `(numerator, denominator)` pair.
    fn from(r: Ratio) -> Self {
        (r.numerator, r.denominator)
    }
}

impl Display for Ratio {
    /// Formats the ratio as `numerator:denominator`, the same syntax accepted when parsing.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "{}:{}", self.numerator, self.denominator)
    }
}

impl PartialOrd for Ratio {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Ratio {
    /// Orders ratios by the value of the fraction, using cross-multiplication.
    ///
    /// The products are computed in `u128`: the product of two `u64` values always fits,
    /// so large numerators or denominators can neither panic (debug builds) nor wrap and
    /// yield a wrong ordering (release builds).
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = u128::from(self.numerator) * u128::from(other.denominator);
        let rhs = u128::from(other.numerator) * u128::from(self.denominator);
        lhs.cmp(&rhs)
    }
}
145
/// Errors produced when parsing a [`Ratio`] from its `numerator:denominator` text form.
#[derive(Debug, Error)]
pub enum ParseRatioError {
    /// The input contained no `:` separating the two parts.
    #[error("numerator and denominator must be separated by :")]
    MissingDelimiter,
    /// The text before the `:` could not be parsed as an integer.
    #[error("Invalid numerator {err:?}")]
    InvalidNumerator { err: ParseIntError },
    /// The text after the `:` could not be parsed as an integer.
    #[error("Invalid denominator {err:?}")]
    InvalidDenominator { err: ParseIntError },
}
155
156impl FromStr for Ratio {
157    type Err = ParseRatioError;
158
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        let (num, den) = s.split_once(':').ok_or(ParseRatioError::MissingDelimiter)?;
161        Ok(Self {
162            numerator: num
163                .parse()
164                .map_err(|err| ParseRatioError::InvalidNumerator { err })?,
165            denominator: den
166                .parse()
167                .map_err(|err| ParseRatioError::InvalidDenominator { err })?,
168        })
169    }
170}
171
/// Error returned by `parse_duration` when the input is not a valid duration.
#[derive(Clone, Debug, Error)]
#[error("Failed to parse duration {reason}")]
pub struct ParseDurationError {
    /// Stringified message of the underlying parser error.
    reason: String,
}
177
178pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
179    cld::ClDuration::from_str(s)
180        .map(Duration::from)
181        .map_err(|err| ParseDurationError {
182            reason: err.to_string(),
183        })
184}
185
/// Error returned by `parse_size` when the input is not a valid byte size.
///
/// The derived `From` impl lets the error be built from its single `String` field, which is
/// what allows `parse_size` to use `?` on the parser's result.
#[derive(Clone, Debug, From, Error)]
#[error("failed to parse ByteSize. {msg}")]
pub struct ParseSizeError {
    /// Message describing why parsing failed.
    msg: String,
}
191
192pub fn parse_size(s: &str) -> Result<u64, ParseSizeError> {
193    Ok(s.parse::<ByteSize>()?.0)
194}
195
// NOTE(review): none of these constants is referenced by the code visible in this file,
// and they differ from the `BackoffParams` CLI defaults (factor "4", base "20ms"), so they
// are presumably consumed elsewhere — confirm before changing them.

/// Smallest retry delay: 500 milliseconds.
pub const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);
/// Largest retry delay: 5 seconds.
pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);
/// Multiplier applied to the delay on each backoff step.
pub const BACKOFF_FACTOR: u32 = 2;
/// Exponential backoff jitter as a fraction of the backoff delay, (numerator, denominator).
pub const BACKOFF_JITTER: (u64, u64) = (1, 10);
201
// Tunable parameters for retrying a fallible operation with exponential backoff. Every
// field can be supplied as a command-line flag or through the environment variable named
// in its `clap` attribute.
//
// NOTE: the comments added here are deliberately plain `//` comments. With
// `#[derive(Parser)]`, `///` doc comments on the struct and its fields are turned into CLI
// help text, so adding or rewording them would change the program's output.
#[derive(Clone, Copy, Debug, Parser, PartialEq, Eq, PartialOrd, Ord)]
pub struct BackoffParams {
    /// Exponential backoff exponent.
    // NOTE(review): despite the word "exponent" in the help text, `backoff` multiplies the
    // current delay by this value on every step.
    #[clap(
        long = "catchup-backoff-factor",
        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_FACTOR",
        default_value = "4"
    )]
    factor: u32,

    /// Exponential backoff base delay.
    // Initial delay used by `retry` before the first backoff step; parsed by
    // `parse_duration`.
    #[clap(
        long = "catchup-base-retry-delay",
        env = "ESPRESSO_SEQUENCER_CATCHUP_BASE_RETRY_DELAY",
        default_value = "20ms",
        value_parser = parse_duration
    )]
    base: Duration,

    /// Exponential max delay.
    // Upper bound that `backoff` never exceeds; parsed by `parse_duration`.
    #[clap(
        long = "catchup-max-retry-delay",
        env = "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
        default_value = "5s",
        value_parser = parse_duration
    )]
    max: Duration,

    /// Exponential backoff jitter as a ratio of the backoff delay, numerator:denominator.
    // Parsed through `Ratio`'s `FromStr` impl, which does not reject a zero denominator.
    #[clap(
        long = "catchup-backoff-jitter",
        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_JITTER",
        default_value = "1:10"
    )]
    jitter: Ratio,

    /// Disable retries and just fail after one failed attempt.
    #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")]
    disable: bool,
}
242
243impl Default for BackoffParams {
244    fn default() -> Self {
245        Self::parse_from(std::iter::empty::<String>())
246    }
247}
248
249impl BackoffParams {
250    pub fn disabled() -> Self {
251        Self {
252            disable: true,
253            ..Default::default()
254        }
255    }
256
257    pub async fn retry<S, T>(
258        &self,
259        mut state: S,
260        f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result<T>>,
261    ) -> anyhow::Result<T> {
262        let mut delay = self.base;
263        for i in 0.. {
264            match f(&mut state, i).await {
265                Ok(res) => return Ok(res),
266                Err(err) if self.disable => {
267                    return Err(err.context("Retryable operation failed; retries disabled"));
268                },
269                Err(err) => {
270                    tracing::warn!(
271                        "Retryable operation failed, will retry after {delay:?}: {err:#}"
272                    );
273                    sleep(delay).await;
274                    delay = self.backoff(delay);
275                },
276            }
277        }
278        unreachable!()
279    }
280
281    #[must_use]
282    pub fn backoff(&self, delay: Duration) -> Duration {
283        if delay >= self.max {
284            return self.max;
285        }
286
287        let mut rng = rand::thread_rng();
288
289        // Increase the backoff by the backoff factor.
290        let ms = (delay * self.factor).as_millis() as u64;
291
292        // Sample a random jitter factor in the range [0, self.jitter].
293        let jitter_num = rng.gen_range(0..self.jitter.numerator);
294        let jitter_den = self.jitter.denominator;
295
296        // Increase the delay by the jitter factor.
297        let jitter = ms * jitter_num / jitter_den;
298        let delay = Duration::from_millis(ms + jitter);
299
300        // Bound the delay by the maximum.
301        min(delay, self.max)
302    }
303}