espresso_types/v0/
utils.rs

1use std::{
2    cmp::{min, Ordering},
3    fmt::{self, Debug, Display, Formatter},
4    num::ParseIntError,
5    str::FromStr,
6    time::Duration,
7};
8
9use anyhow::Context;
10use bytesize::ByteSize;
11use clap::Parser;
12use committable::Committable;
13use derive_more::{From, Into};
14use futures::future::BoxFuture;
15use hotshot_types::{
16    consensus::CommitmentMap,
17    data::{Leaf, Leaf2},
18    traits::node_implementation::NodeType,
19};
20use rand::Rng;
21use sequencer_utils::{impl_serde_from_string_or_integer, ser::FromStringOrInteger};
22use serde::{Deserialize, Serialize};
23use thiserror::Error;
24use time::{
25    format_description::well_known::Rfc3339 as TimestampFormat, macros::time, Date, OffsetDateTime,
26};
27use tokio::time::sleep;
28
29use crate::ChainConfig;
30
31pub fn upgrade_commitment_map<Types: NodeType>(
32    map: CommitmentMap<Leaf<Types>>,
33) -> CommitmentMap<Leaf2<Types>> {
34    map.into_values()
35        .map(|leaf| {
36            let leaf2: Leaf2<Types> = leaf.into();
37            (leaf2.commit(), leaf2)
38        })
39        .collect()
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
43pub enum Update<T> {
44    #[default]
45    #[serde(rename = "__skip")]
46    Skip,
47    #[serde(untagged)]
48    Set(T),
49}
50
51impl<T> Update<T> {
52    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
53        match self {
54            Update::Skip => Update::Skip,
55            Update::Set(v) => Update::Set(f(v)),
56        }
57    }
58}
59
60/// Information about the genesis state which feeds into the genesis block header.
61#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
62pub struct GenesisHeader {
63    pub timestamp: Timestamp,
64    pub chain_config: ChainConfig,
65}
66
67#[derive(Hash, Copy, Clone, Debug, derive_more::Display, PartialEq, Eq, From, Into)]
68#[display("{}", _0.format(&TimestampFormat).unwrap())]
69pub struct Timestamp(OffsetDateTime);
70
71impl_serde_from_string_or_integer!(Timestamp);
72
73impl Default for Timestamp {
74    fn default() -> Self {
75        Self::from_integer(0).unwrap()
76    }
77}
78
79impl Timestamp {
80    pub fn unix_timestamp(&self) -> u64 {
81        self.0.unix_timestamp() as u64
82    }
83
84    pub fn unix_timestamp_millis(&self) -> u64 {
85        (self.0.unix_timestamp_nanos() / 1_000_000) as u64
86    }
87
88    pub fn max() -> Self {
89        Self(OffsetDateTime::new_utc(Date::MAX, time!(23:59)))
90    }
91}
92
93impl FromStringOrInteger for Timestamp {
94    type Binary = u64;
95    type Integer = u64;
96
97    fn from_binary(b: Self::Binary) -> anyhow::Result<Self> {
98        Self::from_integer(b)
99    }
100
101    fn from_integer(i: Self::Integer) -> anyhow::Result<Self> {
102        let unix = i.try_into().context("timestamp out of range")?;
103        Ok(Self(
104            OffsetDateTime::from_unix_timestamp(unix).context("invalid timestamp")?,
105        ))
106    }
107
108    fn from_string(s: String) -> anyhow::Result<Self> {
109        Ok(Self(
110            OffsetDateTime::parse(&s, &TimestampFormat).context("invalid timestamp")?,
111        ))
112    }
113
114    fn to_binary(&self) -> anyhow::Result<Self::Binary> {
115        Ok(self.unix_timestamp())
116    }
117
118    fn to_string(&self) -> anyhow::Result<String> {
119        Ok(format!("{self}"))
120    }
121}
122
123#[derive(
124    Hash,
125    Copy,
126    Clone,
127    Debug,
128    derive_more::Display,
129    PartialEq,
130    Eq,
131    PartialOrd,
132    Ord,
133    Serialize,
134    Deserialize,
135)]
136#[display("{}", _0)]
137pub struct TimestampMillis(u64);
138
139impl TimestampMillis {
140    pub fn from_time(time: &OffsetDateTime) -> Self {
141        let timestamp = (time.unix_timestamp_nanos() / 1_000_000) as u64;
142
143        Self(timestamp)
144    }
145
146    pub fn from_millis(millis: u64) -> Self {
147        Self(millis)
148    }
149
150    pub fn u64(&self) -> u64 {
151        self.0
152    }
153}
154
155#[derive(Clone, Copy, Debug, PartialEq, Eq)]
156pub struct Ratio {
157    pub numerator: u64,
158    pub denominator: u64,
159}
160
161impl From<Ratio> for (u64, u64) {
162    fn from(r: Ratio) -> Self {
163        (r.numerator, r.denominator)
164    }
165}
166
167impl Display for Ratio {
168    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
169        write!(f, "{}:{}", self.numerator, self.denominator)
170    }
171}
172
173impl PartialOrd for Ratio {
174    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
175        Some(self.cmp(other))
176    }
177}
178
179impl Ord for Ratio {
180    fn cmp(&self, other: &Self) -> Ordering {
181        (self.numerator * other.denominator).cmp(&(other.numerator * self.denominator))
182    }
183}
184
185#[derive(Debug, Error)]
186pub enum ParseRatioError {
187    #[error("numerator and denominator must be separated by :")]
188    MissingDelimiter,
189    #[error("Invalid numerator {err:?}")]
190    InvalidNumerator { err: ParseIntError },
191    #[error("Invalid denominator {err:?}")]
192    InvalidDenominator { err: ParseIntError },
193}
194
195impl FromStr for Ratio {
196    type Err = ParseRatioError;
197
198    fn from_str(s: &str) -> Result<Self, Self::Err> {
199        let (num, den) = s.split_once(':').ok_or(ParseRatioError::MissingDelimiter)?;
200        Ok(Self {
201            numerator: num
202                .parse()
203                .map_err(|err| ParseRatioError::InvalidNumerator { err })?,
204            denominator: den
205                .parse()
206                .map_err(|err| ParseRatioError::InvalidDenominator { err })?,
207        })
208    }
209}
210
211#[derive(Clone, Debug, Error)]
212#[error("Failed to parse duration {reason}")]
213pub struct ParseDurationError {
214    reason: String,
215}
216
217pub fn parse_duration(s: &str) -> Result<Duration, ParseDurationError> {
218    cld::ClDuration::from_str(s)
219        .map(Duration::from)
220        .map_err(|err| ParseDurationError {
221            reason: err.to_string(),
222        })
223}
224
225#[derive(Clone, Debug, From, Error)]
226#[error("failed to parse ByteSize. {msg}")]
227pub struct ParseSizeError {
228    msg: String,
229}
230
231pub fn parse_size(s: &str) -> Result<u64, ParseSizeError> {
232    Ok(s.parse::<ByteSize>()?.0)
233}
234
235pub const MIN_RETRY_DELAY: Duration = Duration::from_millis(500);
236pub const MAX_RETRY_DELAY: Duration = Duration::from_secs(5);
237pub const BACKOFF_FACTOR: u32 = 2;
238// Exponential backoff jitter as a fraction of the backoff delay, (numerator, denominator).
239pub const BACKOFF_JITTER: (u64, u64) = (1, 10);
240
241#[derive(Clone, Copy, Debug, Parser, PartialEq, Eq, PartialOrd, Ord)]
242pub struct BackoffParams {
243    /// Exponential backoff exponent.
244    #[clap(
245        long = "catchup-backoff-factor",
246        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_FACTOR",
247        default_value = "4"
248    )]
249    factor: u32,
250
251    /// Exponential backoff base delay.
252    #[clap(
253        long = "catchup-base-retry-delay",
254        env = "ESPRESSO_SEQUENCER_CATCHUP_BASE_RETRY_DELAY",
255        default_value = "200ms",
256        value_parser = parse_duration
257    )]
258    base: Duration,
259
260    /// Exponential max delay.
261    #[clap(
262        long = "catchup-max-retry-delay",
263        env = "ESPRESSO_SEQUENCER_CATCHUP_MAX_RETRY_DELAY",
264        default_value = "5s",
265        value_parser = parse_duration
266    )]
267    max: Duration,
268
269    /// Exponential backoff jitter as a ratio of the backoff delay, numerator:denominator.
270    #[clap(
271        long = "catchup-backoff-jitter",
272        env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_JITTER",
273        default_value = "1:10"
274    )]
275    jitter: Ratio,
276
277    /// Disable retries and just fail after one failed attempt.
278    #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")]
279    disable: bool,
280}
281
282impl Default for BackoffParams {
283    fn default() -> Self {
284        Self::parse_from(std::iter::empty::<String>())
285    }
286}
287
288impl BackoffParams {
289    pub fn disabled() -> Self {
290        Self {
291            disable: true,
292            ..Default::default()
293        }
294    }
295
296    pub async fn retry<S, T>(
297        &self,
298        mut state: S,
299        f: impl for<'a> Fn(&'a mut S, usize) -> BoxFuture<'a, anyhow::Result<T>>,
300    ) -> anyhow::Result<T> {
301        let mut delay = self.base;
302        for i in 0.. {
303            match f(&mut state, i).await {
304                Ok(res) => return Ok(res),
305                Err(err) if self.disable => {
306                    return Err(err.context("Retryable operation failed; retries disabled"));
307                },
308                Err(err) => {
309                    tracing::warn!(
310                        "Retryable operation failed, will retry after {delay:?}: {err:#}"
311                    );
312                    sleep(delay).await;
313                    delay = self.backoff(delay);
314                },
315            }
316        }
317        unreachable!()
318    }
319
320    #[must_use]
321    pub fn backoff(&self, delay: Duration) -> Duration {
322        if delay >= self.max {
323            return self.max;
324        }
325
326        let mut rng = rand::thread_rng();
327
328        // Increase the backoff by the backoff factor.
329        let ms = (delay * self.factor).as_millis() as u64;
330
331        // Sample a random jitter factor in the range [0, self.jitter].
332        let jitter_num = rng.gen_range(0..self.jitter.numerator);
333        let jitter_den = self.jitter.denominator;
334
335        // Increase the delay by the jitter factor.
336        let jitter = ms * jitter_num / jitter_den;
337        let delay = Duration::from_millis(ms + jitter);
338
339        // Bound the delay by the maximum.
340        min(delay, self.max)
341    }
342}