sequencer/
catchup.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    fmt::{Debug, Display},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{anyhow, bail, ensure, Context};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15    config::PublicNetworkConfig,
16    traits::SequencerPersistence,
17    v0::traits::StateCatchup,
18    v0_3::{
19        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
20        RewardMerkleTreeV1,
21    },
22    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2, RewardMerkleTreeV2},
23    BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
24    FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions, ValidatedState,
25};
26use futures::{
27    future::{Future, FutureExt, TryFuture, TryFutureExt},
28    stream::FuturesUnordered,
29    StreamExt,
30};
31use hotshot_types::{
32    consensus::Consensus,
33    data::ViewNumber,
34    message::UpgradeLock,
35    network::NetworkConfig,
36    stake_table::HSStakeTable,
37    traits::{
38        metrics::{Counter, CounterFamily, Metrics},
39        network::ConnectedNetwork,
40        node_implementation::{ConsensusTime as _, NodeType, Versions},
41        ValidatedState as ValidatedStateTrait,
42    },
43    utils::{verify_leaf_chain, View, ViewInner},
44    ValidatorConfig,
45};
46use itertools::Itertools;
47use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
48use parking_lot::Mutex;
49use priority_queue::PriorityQueue;
50use serde::de::DeserializeOwned;
51use surf_disco::Request;
52use tide_disco::error::ServerError;
53use tokio::time::timeout;
54use tokio_util::task::AbortOnDropHandle;
55use tracing::warn;
56use url::Url;
57use vbs::version::StaticVersionType;
58
59use crate::api::BlocksFrontier;
60
61// This newtype is probably not worth having. It's only used to be able to log
62// URLs before doing requests.
63#[derive(Debug, Clone)]
64struct Client<ServerError, ApiVer: StaticVersionType> {
65    inner: surf_disco::Client<ServerError, ApiVer>,
66    url: Url,
67    requests: Arc<Box<dyn Counter>>,
68    failures: Arc<Box<dyn Counter>>,
69}
70
71impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
72    pub fn new(
73        url: Url,
74        requests: &(impl CounterFamily + ?Sized),
75        failures: &(impl CounterFamily + ?Sized),
76    ) -> Self {
77        Self {
78            inner: surf_disco::Client::new(url.clone()),
79            requests: Arc::new(requests.create(vec![url.to_string()])),
80            failures: Arc::new(failures.create(vec![url.to_string()])),
81            url,
82        }
83    }
84
85    pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
86        self.inner.get(route)
87    }
88}
89
/// A score of a catchup peer, based on our interactions with that peer.
///
/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a
/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at
/// all. The score has a comparison function where higher is better, or in other words `p1 > p2`
/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it
/// convenient and efficient to collect peers in a priority queue which we can easily convert to a
/// list sorted by reliability.
///
/// Scores are compared by failure rate (`failures / requests`). A peer that has never been
/// contacted is treated as having a failure rate of zero, so it ranks equal to a peer that has
/// never failed and above any peer with at least one failure. This keeps the comparison a proper
/// total order, as required by [`Ord`].
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Number of requests made to this peer.
    requests: usize,
    /// Number of those requests that failed.
    failures: usize,
}

impl PeerScore {
    /// The failure rate as a `(numerator, denominator)` fraction with a non-zero denominator.
    ///
    /// A peer without any requests has no track record and is given a rate of `0 / 1`. The values
    /// are widened to `u128` so that cross-multiplying two rates can never overflow.
    fn failure_rate(&self) -> (u128, u128) {
        if self.requests == 0 {
            (0, 1)
        } else {
            // Lossless widening: `usize` is at most 64 bits.
            (self.failures as u128, self.requests as u128)
        }
    }
}

impl Ord for PeerScore {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare failure rates: `self` is better than `other` if
        //      self.failures / self.requests < other.failures / other.requests
        // or equivalently (both denominators are positive)
        //      other.failures * self.requests > self.failures * other.requests
        let (self_failures, self_requests) = self.failure_rate();
        let (other_failures, other_requests) = other.failure_rate();
        (other_failures * self_requests).cmp(&(self_failures * other_requests))
    }
}

impl PartialOrd for PeerScore {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for PeerScore {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for PeerScore {}
127
128#[derive(Debug, Clone, Default)]
129pub struct StatePeers<ApiVer: StaticVersionType> {
130    // Peer IDs, ordered by reliability score. Each ID is an index into `clients`.
131    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
132    clients: Vec<Client<ServerError, ApiVer>>,
133    backoff: BackoffParams,
134}
135
136impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
137    async fn fetch<Fut>(
138        &self,
139        retry: usize,
140        f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
141    ) -> anyhow::Result<Fut::Ok>
142    where
143        Fut: TryFuture<Error: Display>,
144    {
145        // Since we have generally have multiple peers we can catch up from, we want a fairly
146        // aggressive timeout for requests: if a peer is not responding quickly, we're better off
147        // just trying the next one rather than waiting, and this prevents a malicious peer from
148        // delaying catchup for a long time.
149        //
150        // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an
151        // honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which
152        // is aggressive but still very reasonable for an HTTP request. If that fails with all of
153        // our peers, we increase the timeout by 1 second for each successive retry, until we
154        // eventually succeed.
155        let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1);
156
157        // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`),
158        // so we can update reliability scores at the end.
159        let mut requests = HashMap::new();
160        let mut res = Err(anyhow!("failed fetching from every peer"));
161
162        // Try each peer in order of reliability score, until we succeed. We clone out of
163        // `self.scores` because it is small (contains only numeric IDs and scores), so this clone
164        // is a lot cheaper than holding the read lock the entire time we are making requests (which
165        // could be a while).
166        let mut scores = { (*self.scores.read().await).clone() };
167        while let Some((id, score)) = scores.pop() {
168            let client = &self.clients[id];
169            tracing::info!("fetching from {}", client.url);
170            match timeout(timeout_dur, f(client.clone()).into_future()).await {
171                Ok(Ok(t)) => {
172                    requests.insert(id, true);
173                    res = Ok(t);
174                    break;
175                },
176                Ok(Err(err)) => {
177                    tracing::warn!(id, ?score, peer = %client.url, "error from peer: {err:#}");
178                    requests.insert(id, false);
179                },
180                Err(_) => {
181                    tracing::warn!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
182                    requests.insert(id, false);
183                },
184            }
185        }
186
187        // Update client scores.
188        let mut scores = self.scores.write().await;
189        for (id, success) in requests {
190            scores.change_priority_by(&id, |score| {
191                score.requests += 1;
192                self.clients[id].requests.add(1);
193                if !success {
194                    score.failures += 1;
195                    self.clients[id].failures.add(1);
196                }
197            });
198        }
199
200        res
201    }
202
203    pub fn from_urls(
204        urls: Vec<Url>,
205        backoff: BackoffParams,
206        metrics: &(impl Metrics + ?Sized),
207    ) -> Self {
208        if urls.is_empty() {
209            panic!("Cannot create StatePeers with no peers");
210        }
211
212        let metrics = metrics.subgroup("catchup".into());
213        let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
214        let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
215
216        let scores = urls
217            .iter()
218            .enumerate()
219            .map(|(i, _)| (i, PeerScore::default()))
220            .collect();
221        let clients = urls
222            .into_iter()
223            .map(|url| Client::new(url, &*requests, &*failures))
224            .collect();
225
226        Self {
227            clients,
228            scores: Arc::new(RwLock::new(scores)),
229            backoff,
230        }
231    }
232
233    #[tracing::instrument(skip(self, my_own_validator_config))]
234    pub async fn fetch_config(
235        &self,
236        my_own_validator_config: ValidatorConfig<SeqTypes>,
237    ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
238        self.backoff()
239            .retry(self, move |provider, retry| {
240                let my_own_validator_config = my_own_validator_config.clone();
241                async move {
242                    let cfg: PublicNetworkConfig = provider
243                        .fetch(retry, |client| {
244                            let url = client.url.join("config/hotshot").unwrap();
245
246                            reqwest::get(url.clone())
247                        })
248                        .await?
249                        .json()
250                        .await?;
251                    cfg.into_network_config(my_own_validator_config)
252                        .context("fetched config, but failed to convert to private config")
253                }
254                .boxed()
255            })
256            .await
257    }
258}
259
260#[async_trait]
261impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
262    #[tracing::instrument(skip(self, _instance))]
263    async fn try_fetch_accounts(
264        &self,
265        retry: usize,
266        _instance: &NodeState,
267        height: u64,
268        view: ViewNumber,
269        fee_merkle_tree_root: FeeMerkleCommitment,
270        accounts: &[FeeAccount],
271    ) -> anyhow::Result<Vec<FeeAccountProof>> {
272        self.fetch(retry, |client| async move {
273            let tree = client
274                .inner
275                .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
276                .body_binary(&accounts.to_vec())?
277                .send()
278                .await?;
279
280            // Verify proofs.
281            let mut proofs = Vec::new();
282            for account in accounts {
283                let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
284                    .context(format!("response missing fee account {account}"))?;
285                proof
286                    .verify(&fee_merkle_tree_root)
287                    .context(format!("invalid proof for fee account {account}"))?;
288                proofs.push(proof);
289            }
290
291            anyhow::Ok(proofs)
292        })
293        .await
294    }
295
296    #[tracing::instrument(skip(self, _instance, mt))]
297    async fn try_remember_blocks_merkle_tree(
298        &self,
299        retry: usize,
300        _instance: &NodeState,
301        height: u64,
302        view: ViewNumber,
303        mt: &mut BlockMerkleTree,
304    ) -> anyhow::Result<()> {
305        *mt = self
306            .fetch(retry, |client| {
307                let mut mt = mt.clone();
308                async move {
309                    let frontier = client
310                        .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
311                        .send()
312                        .await?;
313                    let elem = frontier
314                        .elem()
315                        .context("provided frontier is missing leaf element")?;
316                    mt.remember(mt.num_leaves() - 1, *elem, &frontier)
317                        .context("verifying block proof")?;
318                    anyhow::Ok(mt)
319                }
320            })
321            .await?;
322        Ok(())
323    }
324
325    async fn try_fetch_chain_config(
326        &self,
327        retry: usize,
328        commitment: Commitment<ChainConfig>,
329    ) -> anyhow::Result<ChainConfig> {
330        self.fetch(retry, |client| async move {
331            let cf = client
332                .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
333                .send()
334                .await?;
335            ensure!(
336                cf.commit() == commitment,
337                "received chain config with mismatched commitment: expected {commitment}, got {}",
338                cf.commit()
339            );
340            Ok(cf)
341        })
342        .await
343    }
344
345    async fn try_fetch_leaf(
346        &self,
347        retry: usize,
348        height: u64,
349        stake_table: HSStakeTable<SeqTypes>,
350        success_threshold: U256,
351    ) -> anyhow::Result<Leaf2> {
352        // Get the leaf chain
353        let leaf_chain = self
354            .fetch(retry, |client| async move {
355                let leaf = client
356                    .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
357                    .send()
358                    .await?;
359                anyhow::Ok(leaf)
360            })
361            .await
362            .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
363
364        // Verify it, returning the leaf at the given height
365        verify_leaf_chain(
366            leaf_chain,
367            &stake_table,
368            success_threshold,
369            height,
370            &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
371        )
372        .await
373        .with_context(|| format!("failed to verify leaf chain at height {height}"))
374    }
375
376    #[tracing::instrument(skip(self, _instance))]
377    async fn try_fetch_reward_accounts_v2(
378        &self,
379        retry: usize,
380        _instance: &NodeState,
381        height: u64,
382        view: ViewNumber,
383        reward_merkle_tree_root: RewardMerkleCommitmentV2,
384        accounts: &[RewardAccountV2],
385    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
386        self.fetch(retry, |client| async move {
387            let tree = client
388                .inner
389                .post::<RewardMerkleTreeV2>(&format!(
390                    "catchup/{height}/{}/reward-accounts-v2",
391                    view.u64()
392                ))
393                .body_binary(&accounts.to_vec())?
394                .send()
395                .await?;
396
397            // Verify proofs.
398            // Verify proofs.
399            let mut proofs = Vec::new();
400            for account in accounts {
401                let (proof, _) = RewardAccountProofV2::prove(&tree, (*account).into())
402                    .context(format!("response missing reward account {account}"))?;
403                proof
404                    .verify(&reward_merkle_tree_root)
405                    .context(format!("invalid proof for reward account {account}"))?;
406                proofs.push(proof);
407            }
408
409            anyhow::Ok(proofs)
410        })
411        .await
412    }
413
414    #[tracing::instrument(skip(self, _instance))]
415    async fn try_fetch_reward_accounts_v1(
416        &self,
417        retry: usize,
418        _instance: &NodeState,
419        height: u64,
420        view: ViewNumber,
421        reward_merkle_tree_root: RewardMerkleCommitmentV1,
422        accounts: &[RewardAccountV1],
423    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
424        self.fetch(retry, |client| async move {
425            let tree = client
426                .inner
427                .post::<RewardMerkleTreeV1>(&format!(
428                    "catchup/{height}/{}/reward-accounts",
429                    view.u64()
430                ))
431                .body_binary(&accounts.to_vec())?
432                .send()
433                .await?;
434
435            // Verify proofs.
436            let mut proofs = Vec::new();
437            for account in accounts {
438                let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
439                    .context(format!("response missing reward account {account}"))?;
440                proof
441                    .verify(&reward_merkle_tree_root)
442                    .context(format!("invalid proof for reward account {account}"))?;
443                proofs.push(proof);
444            }
445
446            anyhow::Ok(proofs)
447        })
448        .await
449    }
450
451    fn backoff(&self) -> &BackoffParams {
452        &self.backoff
453    }
454
455    fn name(&self) -> String {
456        format!(
457            "StatePeers({})",
458            self.clients
459                .iter()
460                .map(|client| client.url.to_string())
461                .join(",")
462        )
463    }
464
465    fn is_local(&self) -> bool {
466        false
467    }
468}
469
470pub(crate) trait CatchupStorage: Sync {
471    /// Get the state of the requested `accounts`.
472    ///
473    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
474    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
475    /// This function is intended to be used for catchup, so `view` should be no older than the last
476    /// decided view.
477    ///
478    /// If successful, this function also returns the leaf from `view`, if it is available. This can
479    /// be used to add the recovered state to HotShot's state map, so that future requests can get
480    /// the state from memory rather than storage.
481    fn get_accounts(
482        &self,
483        _instance: &NodeState,
484        _height: u64,
485        _view: ViewNumber,
486        _accounts: &[FeeAccount],
487    ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
488        // Merklized state catchup is only supported by persistence backends that provide merklized
489        // state storage. This default implementation is overridden for those that do. Otherwise,
490        // catchup can still be provided by fetching undecided merklized state from consensus
491        // memory.
492        async {
493            bail!("merklized state catchup is not supported for this data source");
494        }
495    }
496
497    fn get_reward_accounts_v1(
498        &self,
499        _instance: &NodeState,
500        _height: u64,
501        _view: ViewNumber,
502        _accounts: &[RewardAccountV1],
503    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
504        async {
505            bail!("merklized state catchup is not supported for this data source");
506        }
507    }
508
509    fn get_reward_accounts_v2(
510        &self,
511        _instance: &NodeState,
512        _height: u64,
513        _view: ViewNumber,
514        _accounts: &[RewardAccountV2],
515    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
516        async {
517            bail!("merklized state catchup is not supported for this data source");
518        }
519    }
520
521    /// Get the blocks Merkle tree frontier.
522    ///
523    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
524    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
525    /// This function is intended to be used for catchup, so `view` should be no older than the last
526    /// decided view.
527    fn get_frontier(
528        &self,
529        _instance: &NodeState,
530        _height: u64,
531        _view: ViewNumber,
532    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
533        // Merklized state catchup is only supported by persistence backends that provide merklized
534        // state storage. This default implementation is overridden for those that do. Otherwise,
535        // catchup can still be provided by fetching undecided merklized state from consensus
536        // memory.
537        async {
538            bail!("merklized state catchup is not supported for this data source");
539        }
540    }
541
542    fn get_chain_config(
543        &self,
544        _commitment: Commitment<ChainConfig>,
545    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
546        async {
547            bail!("chain config catchup is not supported for this data source");
548        }
549    }
550
551    fn get_leaf_chain(
552        &self,
553        _height: u64,
554    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
555        async {
556            bail!("leaf chain catchup is not supported for this data source");
557        }
558    }
559}
560
561impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
562
563impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
564where
565    T: CatchupStorage,
566    S: Sync,
567{
568    async fn get_accounts(
569        &self,
570        instance: &NodeState,
571        height: u64,
572        view: ViewNumber,
573        accounts: &[FeeAccount],
574    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
575        self.inner()
576            .get_accounts(instance, height, view, accounts)
577            .await
578    }
579
580    async fn get_reward_accounts_v2(
581        &self,
582        instance: &NodeState,
583        height: u64,
584        view: ViewNumber,
585        accounts: &[RewardAccountV2],
586    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
587        self.inner()
588            .get_reward_accounts_v2(instance, height, view, accounts)
589            .await
590    }
591
592    async fn get_reward_accounts_v1(
593        &self,
594        instance: &NodeState,
595        height: u64,
596        view: ViewNumber,
597        accounts: &[RewardAccountV1],
598    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
599        self.inner()
600            .get_reward_accounts_v1(instance, height, view, accounts)
601            .await
602    }
603
604    async fn get_frontier(
605        &self,
606        instance: &NodeState,
607        height: u64,
608        view: ViewNumber,
609    ) -> anyhow::Result<BlocksFrontier> {
610        self.inner().get_frontier(instance, height, view).await
611    }
612
613    async fn get_chain_config(
614        &self,
615        commitment: Commitment<ChainConfig>,
616    ) -> anyhow::Result<ChainConfig> {
617        self.inner().get_chain_config(commitment).await
618    }
619    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
620        self.inner().get_leaf_chain(height).await
621    }
622}
623
624#[derive(Debug)]
625pub(crate) struct SqlStateCatchup<T> {
626    db: Arc<T>,
627    backoff: BackoffParams,
628}
629
630impl<T> SqlStateCatchup<T> {
631    pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
632        Self { db, backoff }
633    }
634}
635
636#[async_trait]
637impl<T> StateCatchup for SqlStateCatchup<T>
638where
639    T: CatchupStorage + Send + Sync,
640{
641    async fn try_fetch_leaf(
642        &self,
643        _retry: usize,
644        height: u64,
645        stake_table: HSStakeTable<SeqTypes>,
646        success_threshold: U256,
647    ) -> anyhow::Result<Leaf2> {
648        // Get the leaf chain
649        let leaf_chain = self
650            .db
651            .get_leaf_chain(height)
652            .await
653            .with_context(|| "failed to get leaf chain from DB")?;
654
655        // Verify the leaf chain
656        let leaf = verify_leaf_chain(
657            leaf_chain,
658            &stake_table,
659            success_threshold,
660            height,
661            &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
662        )
663        .await
664        .with_context(|| "failed to verify leaf chain")?;
665
666        Ok(leaf)
667    }
668    // TODO: add a test for the account proof validation
669    // issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102)
670    #[tracing::instrument(skip(self, _retry, instance))]
671    async fn try_fetch_accounts(
672        &self,
673        _retry: usize,
674        instance: &NodeState,
675        block_height: u64,
676        view: ViewNumber,
677        fee_merkle_tree_root: FeeMerkleCommitment,
678        accounts: &[FeeAccount],
679    ) -> anyhow::Result<Vec<FeeAccountProof>> {
680        // Get the accounts
681        let (fee_merkle_tree_from_db, _) = self
682            .db
683            .get_accounts(instance, block_height, view, accounts)
684            .await
685            .with_context(|| "failed to get fee accounts from DB")?;
686
687        // Verify the accounts
688        let mut proofs = Vec::new();
689        for account in accounts {
690            let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
691                .context(format!("response missing account {account}"))?;
692            proof
693                .verify(&fee_merkle_tree_root)
694                .context(format!("invalid proof for account {account}"))?;
695            proofs.push(proof);
696        }
697
698        Ok(proofs)
699    }
700
701    #[tracing::instrument(skip(self, _retry, instance, mt))]
702    async fn try_remember_blocks_merkle_tree(
703        &self,
704        _retry: usize,
705        instance: &NodeState,
706        bh: u64,
707        view: ViewNumber,
708        mt: &mut BlockMerkleTree,
709    ) -> anyhow::Result<()> {
710        if bh == 0 {
711            return Ok(());
712        }
713
714        let proof = self.db.get_frontier(instance, bh, view).await?;
715        match proof
716            .proof
717            .first()
718            .context(format!("empty proof for frontier at height {bh}"))?
719        {
720            MerkleNode::Leaf { pos, elem, .. } => mt
721                .remember(pos, elem, proof.clone())
722                .context("failed to remember proof"),
723            _ => bail!("invalid proof"),
724        }
725    }
726
727    async fn try_fetch_chain_config(
728        &self,
729        _retry: usize,
730        commitment: Commitment<ChainConfig>,
731    ) -> anyhow::Result<ChainConfig> {
732        let cf = self.db.get_chain_config(commitment).await?;
733
734        if cf.commit() != commitment {
735            panic!(
736                "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
737                 but got: {:?}.
738                This may indicate a compromised database",
739                commitment,
740                cf.commit()
741            )
742        }
743
744        Ok(cf)
745    }
746
747    #[tracing::instrument(skip(self, _retry, instance))]
748    async fn try_fetch_reward_accounts_v2(
749        &self,
750        _retry: usize,
751        instance: &NodeState,
752        block_height: u64,
753        view: ViewNumber,
754        reward_merkle_tree_root: RewardMerkleCommitmentV2,
755        accounts: &[RewardAccountV2],
756    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
757        // Get the accounts
758        let (reward_merkle_tree_from_db, _) = self
759            .db
760            .get_reward_accounts_v2(instance, block_height, view, accounts)
761            .await
762            .with_context(|| "failed to get reward accounts from DB")?;
763        // Verify the accounts
764        let mut proofs = Vec::new();
765        for account in accounts {
766            let (proof, _) =
767                RewardAccountProofV2::prove(&reward_merkle_tree_from_db, (*account).into())
768                    .context(format!("response missing account {account}"))?;
769            proof
770                .verify(&reward_merkle_tree_root)
771                .context(format!("invalid proof for account {account}"))?;
772            proofs.push(proof);
773        }
774
775        Ok(proofs)
776    }
777
778    #[tracing::instrument(skip(self, _retry, instance))]
779    async fn try_fetch_reward_accounts_v1(
780        &self,
781        _retry: usize,
782        instance: &NodeState,
783        block_height: u64,
784        view: ViewNumber,
785        reward_merkle_tree_root: RewardMerkleCommitmentV1,
786        accounts: &[RewardAccountV1],
787    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
788        // Get the accounts
789        let (reward_merkle_tree_from_db, _) = self
790            .db
791            .get_reward_accounts_v1(instance, block_height, view, accounts)
792            .await
793            .with_context(|| "failed to get reward accounts from DB")?;
794        // Verify the accounts
795        let mut proofs = Vec::new();
796        for account in accounts {
797            let (proof, _) =
798                RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
799                    .context(format!("response missing account {account}"))?;
800            proof
801                .verify(&reward_merkle_tree_root)
802                .context(format!("invalid proof for account {account}"))?;
803            proofs.push(proof);
804        }
805
806        Ok(proofs)
807    }
808
809    fn backoff(&self) -> &BackoffParams {
810        &self.backoff
811    }
812
813    fn name(&self) -> String {
814        "SqlStateCatchup".into()
815    }
816
817    fn is_local(&self) -> bool {
818        true
819    }
820}
821
822/// Disable catchup entirely.
823#[derive(Clone, Debug)]
824pub struct NullStateCatchup {
825    backoff: BackoffParams,
826    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
827}
828
829impl Default for NullStateCatchup {
830    fn default() -> Self {
831        Self {
832            backoff: BackoffParams::disabled(),
833            chain_configs: Default::default(),
834        }
835    }
836}
837
838impl NullStateCatchup {
839    /// Add a chain config preimage which can be fetched by hash during STF evaluation.
840    ///
841    /// [`NullStateCatchup`] is used to disable catchup entirely when evaluating the STF, which
842    /// requires the [`ValidatedState`](espresso_types::ValidatedState) to be pre-seeded with all
843    /// the dependencies of STF evaluation. However, the STF also depends on having the preimage of
844    /// various [`ChainConfig`] commitments, which are not stored in the
845    /// [`ValidatedState`](espresso_types::ValidatedState), but which instead must be supplied by a
846    /// separate preimage oracle. Thus, [`NullStateCatchup`] may be populated with a set of
847    /// [`ChainConfig`]s, which it can feed to the STF during evaluation.
848    pub fn add_chain_config(&mut self, cf: ChainConfig) {
849        self.chain_configs.insert(cf.commit(), cf);
850    }
851}
852
853#[async_trait]
854impl StateCatchup for NullStateCatchup {
855    async fn try_fetch_leaf(
856        &self,
857        _retry: usize,
858        _height: u64,
859        _stake_table: HSStakeTable<SeqTypes>,
860        _success_threshold: U256,
861    ) -> anyhow::Result<Leaf2> {
862        bail!("state catchup is disabled")
863    }
864
865    async fn try_fetch_accounts(
866        &self,
867        _retry: usize,
868        _instance: &NodeState,
869        _height: u64,
870        _view: ViewNumber,
871        _fee_merkle_tree_root: FeeMerkleCommitment,
872        _account: &[FeeAccount],
873    ) -> anyhow::Result<Vec<FeeAccountProof>> {
874        bail!("state catchup is disabled");
875    }
876
877    async fn try_fetch_reward_accounts_v2(
878        &self,
879        _retry: usize,
880        _instance: &NodeState,
881        _height: u64,
882        _view: ViewNumber,
883        _fee_merkle_tree_root: RewardMerkleCommitmentV2,
884        _account: &[RewardAccountV2],
885    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
886        bail!("state catchup is disabled");
887    }
888
889    async fn try_remember_blocks_merkle_tree(
890        &self,
891        _retry: usize,
892        _instance: &NodeState,
893        _height: u64,
894        _view: ViewNumber,
895        _mt: &mut BlockMerkleTree,
896    ) -> anyhow::Result<()> {
897        bail!("state catchup is disabled");
898    }
899
900    async fn try_fetch_chain_config(
901        &self,
902        _retry: usize,
903        commitment: Commitment<ChainConfig>,
904    ) -> anyhow::Result<ChainConfig> {
905        self.chain_configs
906            .get(&commitment)
907            .copied()
908            .context(format!("chain config {commitment} not available"))
909    }
910
911    async fn try_fetch_reward_accounts_v1(
912        &self,
913        _retry: usize,
914        _instance: &NodeState,
915        _height: u64,
916        _view: ViewNumber,
917        _fee_merkle_tree_root: RewardMerkleCommitmentV1,
918        _account: &[RewardAccountV1],
919    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
920        bail!("state catchup is disabled");
921    }
922
923    fn backoff(&self) -> &BackoffParams {
924        &self.backoff
925    }
926
927    fn name(&self) -> String {
928        "NullStateCatchup".into()
929    }
930
931    fn is_local(&self) -> bool {
932        true
933    }
934}
935
/// A catchup implementation that parallelizes requests to many providers.
/// It returns the result of the first non-erroring provider to complete.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    /// The registered providers. The list sits behind a shared `Arc<Mutex<..>>`, so clones of
    /// this struct observe providers that are added later via `add_provider`.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
}
942
943impl ParallelStateCatchup {
944    /// Create a new [`ParallelStateCatchup`] with two providers.
945    pub fn new(providers: &[Arc<dyn StateCatchup>]) -> Self {
946        Self {
947            providers: Arc::new(Mutex::new(providers.to_vec())),
948        }
949    }
950
951    /// Add a provider to the list of providers
952    pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
953        self.providers.lock().push(provider);
954    }
955
956    /// Perform an async operation on all local providers, returning the first result to succeed
957    pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
958    where
959        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
960        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
961        RT: Send + Sync + 'static,
962    {
963        self.on_providers(|provider| provider.is_local(), closure)
964            .await
965    }
966
967    /// Perform an async operation on all remote providers, returning the first result to succeed
968    pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
969    where
970        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
971        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
972        RT: Send + Sync + 'static,
973    {
974        self.on_providers(|provider| !provider.is_local(), closure)
975            .await
976    }
977
978    /// Perform an async operation on all providers matching the given predicate, returning the first result to succeed
979    pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
980    where
981        P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
982        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
983        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
984        RT: Send + Sync + 'static,
985    {
986        // Make sure we have at least one provider
987        let providers = self.providers.lock().clone();
988        if providers.is_empty() {
989            return Err(anyhow::anyhow!("no providers were initialized"));
990        }
991
992        // Filter the providers by the predicate
993        let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
994        if providers.is_empty() {
995            return Err(anyhow::anyhow!("no providers matched the given predicate"));
996        }
997
998        // Spawn futures for each provider
999        let mut futures = FuturesUnordered::new();
1000        for provider in providers {
1001            let closure = closure.clone();
1002            futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1003        }
1004
1005        // Return the first successful result
1006        while let Some(result) = futures.next().await {
1007            // Unwrap the inner (join) result
1008            let result = match result {
1009                Ok(res) => res,
1010                Err(err) => {
1011                    warn!("Failed to join on provider: {err:#}. Trying next provider...");
1012                    continue;
1013                },
1014            };
1015
1016            // If a provider fails, print why
1017            let result = match result {
1018                Ok(res) => res,
1019                Err(err) => {
1020                    warn!("Failed to fetch data: {err:#}. Trying next provider...");
1021                    continue;
1022                },
1023            };
1024
1025            return Ok(result);
1026        }
1027
1028        Err(anyhow::anyhow!("no providers returned a successful result"))
1029    }
1030}
1031
/// Clone each listed identifier into a same-named local binding, then evaluate the expression.
///
/// `clone! {(a, b) expr}` expands to `{ let a = a.clone(); let b = b.clone(); expr }`, so a
/// `move` closure or `async move` block passed as `expr` captures the clones rather than the
/// original bindings.
macro_rules! clone {
    ( ($( $x:ident ),*) $y:expr ) => {
        {
            // Shadow every identifier with its clone for the duration of this block.
            $(let $x = $x.clone();)*
            $y
        }
    };
}
1040
/// Every request is first sent to the local providers and only falls back to the remote
/// providers if no local provider succeeds. Within each group the requests run in parallel and
/// the first successful result is returned.
1043#[async_trait]
1044impl StateCatchup for ParallelStateCatchup {
1045    async fn try_fetch_leaf(
1046        &self,
1047        retry: usize,
1048        height: u64,
1049        stake_table: HSStakeTable<SeqTypes>,
1050        success_threshold: U256,
1051    ) -> anyhow::Result<Leaf2> {
1052        // Try fetching the leaf on the local providers first
1053        let local_result = self
1054            .on_local_providers(clone! {(stake_table) move |provider| {
1055                clone!{(stake_table) async move {
1056                    provider
1057                        .try_fetch_leaf(retry, height, stake_table, success_threshold)
1058                        .await
1059                }}
1060            }})
1061            .await;
1062
1063        // Check if we were successful locally
1064        if local_result.is_ok() {
1065            return local_result;
1066        }
1067
1068        // If that fails, try the remote ones
1069        self.on_remote_providers(clone! {(stake_table) move |provider| {
1070            clone!{(stake_table) async move {
1071                provider
1072                    .try_fetch_leaf(retry, height, stake_table, success_threshold)
1073                    .await
1074            }}
1075        }})
1076        .await
1077    }
1078
1079    async fn try_fetch_accounts(
1080        &self,
1081        retry: usize,
1082        instance: &NodeState,
1083        height: u64,
1084        view: ViewNumber,
1085        fee_merkle_tree_root: FeeMerkleCommitment,
1086        accounts: &[FeeAccount],
1087    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1088        // Try to get the accounts on local providers first
1089        let accounts_vec = accounts.to_vec();
1090        let local_result = self
1091            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1092                clone! {(instance, accounts_vec) async move {
1093                    provider
1094                        .try_fetch_accounts(
1095                            retry,
1096                            &instance,
1097                            height,
1098                            view,
1099                            fee_merkle_tree_root,
1100                            &accounts_vec,
1101                        )
1102                        .await
1103                }}
1104            }})
1105            .await;
1106
1107        // Check if we were successful locally
1108        if local_result.is_ok() {
1109            return local_result;
1110        }
1111
1112        // If that fails, try the remote ones
1113        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1114            clone!{(instance, accounts_vec) async move {
1115                provider
1116                .try_fetch_accounts(
1117                    retry,
1118                    &instance,
1119                    height,
1120                    view,
1121                    fee_merkle_tree_root,
1122                    &accounts_vec,
1123                ).await
1124            }}
1125        }})
1126        .await
1127    }
1128
1129    async fn try_remember_blocks_merkle_tree(
1130        &self,
1131        retry: usize,
1132        instance: &NodeState,
1133        height: u64,
1134        view: ViewNumber,
1135        mt: &mut BlockMerkleTree,
1136    ) -> anyhow::Result<()> {
1137        // Try to remember the blocks merkle tree on local providers first
1138        let local_result = self
1139            .on_local_providers(clone! {(mt, instance) move |provider| {
1140                let mut mt = mt.clone();
1141                clone! {(instance) async move {
1142                    // Perform the call
1143                    provider
1144                        .try_remember_blocks_merkle_tree(
1145                            retry,
1146                            &instance,
1147                            height,
1148                            view,
1149                            &mut mt,
1150                        )
1151                        .await?;
1152
1153                    // Return the merkle tree so we can modify it
1154                    Ok(mt)
1155                }}
1156            }})
1157            .await;
1158
1159        // Check if we were successful locally
1160        if let Ok(modified_mt) = local_result {
1161            // Set the merkle tree to the output of the successful local call
1162            *mt = modified_mt;
1163
1164            return Ok(());
1165        }
1166
1167        // If that fails, try the remote ones
1168        let remote_result = self
1169            .on_remote_providers(clone! {(mt, instance) move |provider| {
1170                let mut mt = mt.clone();
1171                clone!{(instance) async move {
1172                    // Perform the call
1173                    provider
1174                    .try_remember_blocks_merkle_tree(
1175                        retry,
1176                        &instance,
1177                        height,
1178                        view,
1179                        &mut mt,
1180                    )
1181                    .await?;
1182
1183                    // Return the merkle tree
1184                    Ok(mt)
1185                }}
1186            }})
1187            .await?;
1188
1189        // Update the original, local merkle tree
1190        *mt = remote_result;
1191
1192        Ok(())
1193    }
1194
1195    async fn try_fetch_chain_config(
1196        &self,
1197        retry: usize,
1198        commitment: Commitment<ChainConfig>,
1199    ) -> anyhow::Result<ChainConfig> {
1200        // Try fetching the chain config on the local providers first
1201        let local_result = self
1202            .on_local_providers(move |provider| async move {
1203                provider.try_fetch_chain_config(retry, commitment).await
1204            })
1205            .await;
1206
1207        // Check if we were successful locally
1208        if local_result.is_ok() {
1209            return local_result;
1210        }
1211
1212        // If that fails, try the remote ones
1213        self.on_remote_providers(move |provider| async move {
1214            provider.try_fetch_chain_config(retry, commitment).await
1215        })
1216        .await
1217    }
1218
1219    async fn try_fetch_reward_accounts_v2(
1220        &self,
1221        retry: usize,
1222        instance: &NodeState,
1223        height: u64,
1224        view: ViewNumber,
1225        reward_merkle_tree_root: RewardMerkleCommitmentV2,
1226        accounts: &[RewardAccountV2],
1227    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1228        // Try to get the accounts on local providers first
1229        let accounts_vec = accounts.to_vec();
1230        let local_result = self
1231            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1232                clone! {(instance, accounts_vec) async move {
1233                    provider
1234                        .try_fetch_reward_accounts_v2(
1235                            retry,
1236                            &instance,
1237                            height,
1238                            view,
1239                            reward_merkle_tree_root,
1240                            &accounts_vec,
1241                        )
1242                        .await
1243                }}
1244            }})
1245            .await;
1246
1247        // Check if we were successful locally
1248        if local_result.is_ok() {
1249            return local_result;
1250        }
1251
1252        // If that fails, try the remote ones
1253        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1254            clone!{(instance, accounts_vec) async move {
1255                provider
1256                .try_fetch_reward_accounts_v2(
1257                    retry,
1258                    &instance,
1259                    height,
1260                    view,
1261                    reward_merkle_tree_root,
1262                    &accounts_vec,
1263                ).await
1264            }}
1265        }})
1266        .await
1267    }
1268
1269    async fn try_fetch_reward_accounts_v1(
1270        &self,
1271        retry: usize,
1272        instance: &NodeState,
1273        height: u64,
1274        view: ViewNumber,
1275        reward_merkle_tree_root: RewardMerkleCommitmentV1,
1276        accounts: &[RewardAccountV1],
1277    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1278        // Try to get the accounts on local providers first
1279        let accounts_vec = accounts.to_vec();
1280        let local_result = self
1281            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1282                clone! {(instance, accounts_vec) async move {
1283                    provider
1284                        .try_fetch_reward_accounts_v1(
1285                            retry,
1286                            &instance,
1287                            height,
1288                            view,
1289                            reward_merkle_tree_root,
1290                            &accounts_vec,
1291                        )
1292                        .await
1293                }}
1294            }})
1295            .await;
1296
1297        // Check if we were successful locally
1298        if local_result.is_ok() {
1299            return local_result;
1300        }
1301
1302        // If that fails, try the remote ones
1303        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1304            clone!{(instance, accounts_vec) async move {
1305                provider
1306                .try_fetch_reward_accounts_v1(
1307                    retry,
1308                    &instance,
1309                    height,
1310                    view,
1311                    reward_merkle_tree_root,
1312                    &accounts_vec,
1313                ).await
1314            }}
1315        }})
1316        .await
1317    }
1318
    /// Not supported for [`ParallelStateCatchup`]: calling this panics via `unreachable!()`.
    ///
    /// NOTE(review): presumably safe because this impl overrides the fetch methods itself
    /// instead of relying on trait-provided retry logic that would consult the backoff
    /// parameters — confirm against the `StateCatchup` trait definition.
    fn backoff(&self) -> &BackoffParams {
        unreachable!()
    }
1322
1323    fn name(&self) -> String {
1324        format!(
1325            "[{}]",
1326            self.providers
1327                .lock()
1328                .iter()
1329                .map(|p| p.name())
1330                .collect::<Vec<_>>()
1331                .join(", ")
1332        )
1333    }
1334
1335    async fn fetch_accounts(
1336        &self,
1337        instance: &NodeState,
1338        height: u64,
1339        view: ViewNumber,
1340        fee_merkle_tree_root: FeeMerkleCommitment,
1341        accounts: Vec<FeeAccount>,
1342    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1343        // Try to get the accounts on local providers first
1344        let accounts_vec = accounts.to_vec();
1345        let local_result = self
1346            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1347                clone! {(instance, accounts_vec) async move {
1348                    provider
1349                        .try_fetch_accounts(
1350                            0,
1351                            &instance,
1352                            height,
1353                            view,
1354                            fee_merkle_tree_root,
1355                            &accounts_vec,
1356                        )
1357                        .await
1358                }}
1359            }})
1360            .await;
1361
1362        // Check if we were successful locally
1363        if local_result.is_ok() {
1364            return local_result;
1365        }
1366
1367        // If that fails, try the remote ones (with retry)
1368        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1369            clone!{(instance, accounts_vec) async move {
1370                provider
1371                .fetch_accounts(
1372                    &instance,
1373                    height,
1374                    view,
1375                    fee_merkle_tree_root,
1376                    accounts_vec,
1377                ).await
1378            }}
1379        }})
1380        .await
1381    }
1382
1383    async fn fetch_leaf(
1384        &self,
1385        height: u64,
1386        stake_table: HSStakeTable<SeqTypes>,
1387        success_threshold: U256,
1388    ) -> anyhow::Result<Leaf2> {
1389        // Try fetching the leaf on the local providers first
1390        let local_result = self
1391            .on_local_providers(clone! {(stake_table) move |provider| {
1392                clone!{(stake_table) async move {
1393                    provider
1394                        .try_fetch_leaf(0, height, stake_table, success_threshold)
1395                        .await
1396                }}
1397            }})
1398            .await;
1399
1400        // Check if we were successful locally
1401        if local_result.is_ok() {
1402            return local_result;
1403        }
1404
1405        // If that fails, try the remote ones (with retry)
1406        self.on_remote_providers(clone! {(stake_table) move |provider| {
1407         clone!{(stake_table) async move {
1408             provider
1409                 .fetch_leaf(height, stake_table, success_threshold)
1410                 .await
1411         }}
1412        }})
1413        .await
1414    }
1415
1416    async fn fetch_chain_config(
1417        &self,
1418        commitment: Commitment<ChainConfig>,
1419    ) -> anyhow::Result<ChainConfig> {
1420        // Try fetching the chain config on the local providers first
1421        let local_result = self
1422            .on_local_providers(move |provider| async move {
1423                provider.try_fetch_chain_config(0, commitment).await
1424            })
1425            .await;
1426
1427        // Check if we were successful locally
1428        if local_result.is_ok() {
1429            return local_result;
1430        }
1431
1432        // If that fails, try the remote ones (with retry)
1433        self.on_remote_providers(move |provider| async move {
1434            provider.fetch_chain_config(commitment).await
1435        })
1436        .await
1437    }
1438
1439    async fn fetch_reward_accounts_v2(
1440        &self,
1441        instance: &NodeState,
1442        height: u64,
1443        view: ViewNumber,
1444        reward_merkle_tree_root: RewardMerkleCommitmentV2,
1445        accounts: Vec<RewardAccountV2>,
1446    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1447        // Try to get the accounts on local providers first
1448        let accounts_vec = accounts.to_vec();
1449        let local_result = self
1450            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1451                clone! {(instance, accounts_vec) async move {
1452                    provider
1453                        .try_fetch_reward_accounts_v2(
1454                            0,
1455                            &instance,
1456                            height,
1457                            view,
1458                            reward_merkle_tree_root,
1459                            &accounts_vec,
1460                        )
1461                        .await
1462                }}
1463            }})
1464            .await;
1465
1466        // Check if we were successful locally
1467        if local_result.is_ok() {
1468            return local_result;
1469        }
1470
1471        // If that fails, try the remote ones (with retry)
1472        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1473            clone!{(instance, accounts_vec) async move {
1474                provider
1475                .fetch_reward_accounts_v2(
1476                    &instance,
1477                    height,
1478                    view,
1479                    reward_merkle_tree_root,
1480                    accounts_vec,
1481                ).await
1482            }}
1483        }})
1484        .await
1485    }
1486
1487    async fn fetch_reward_accounts_v1(
1488        &self,
1489        instance: &NodeState,
1490        height: u64,
1491        view: ViewNumber,
1492        reward_merkle_tree_root: RewardMerkleCommitmentV1,
1493        accounts: Vec<RewardAccountV1>,
1494    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1495        // Try to get the accounts on local providers first
1496        let accounts_vec = accounts.to_vec();
1497        let local_result = self
1498            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1499                clone! {(instance, accounts_vec) async move {
1500                    provider
1501                        .try_fetch_reward_accounts_v1(
1502                            0,
1503                            &instance,
1504                            height,
1505                            view,
1506                            reward_merkle_tree_root,
1507                            &accounts_vec,
1508                        )
1509                        .await
1510                }}
1511            }})
1512            .await;
1513
1514        // Check if we were successful locally
1515        if local_result.is_ok() {
1516            return local_result;
1517        }
1518
1519        // If that fails, try the remote ones (with retry)
1520        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1521            clone!{(instance, accounts_vec) async move {
1522                provider
1523                .fetch_reward_accounts_v1(
1524                    &instance,
1525                    height,
1526                    view,
1527                    reward_merkle_tree_root,
1528                    accounts_vec,
1529                ).await
1530            }}
1531        }})
1532        .await
1533    }
1534    async fn remember_blocks_merkle_tree(
1535        &self,
1536        instance: &NodeState,
1537        height: u64,
1538        view: ViewNumber,
1539        mt: &mut BlockMerkleTree,
1540    ) -> anyhow::Result<()> {
1541        // Try to remember the blocks merkle tree on local providers first
1542        let local_result = self
1543            .on_local_providers(clone! {(mt, instance) move |provider| {
1544                let mut mt = mt.clone();
1545                clone! {(instance) async move {
1546                    // Perform the call
1547                    provider
1548                        .try_remember_blocks_merkle_tree(
1549                            0,
1550                            &instance,
1551                            height,
1552                            view,
1553                            &mut mt,
1554                        )
1555                        .await?;
1556
1557                    // Return the merkle tree so we can modify it
1558                    Ok(mt)
1559                }}
1560            }})
1561            .await;
1562
1563        // Check if we were successful locally
1564        if let Ok(modified_mt) = local_result {
1565            // Set the merkle tree to the one with the
1566            // successful call
1567            *mt = modified_mt;
1568
1569            return Ok(());
1570        }
1571
1572        // If that fails, try the remote ones (with retry)
1573        let remote_result = self
1574            .on_remote_providers(clone! {(mt, instance) move |provider| {
1575                let mut mt = mt.clone();
1576                clone!{(instance) async move {
1577                    // Perform the call
1578                    provider
1579                    .remember_blocks_merkle_tree(
1580                        &instance,
1581                        height,
1582                        view,
1583                        &mut mt,
1584                    )
1585                    .await?;
1586
1587                    // Return the merkle tree
1588                    Ok(mt)
1589                }}
1590            }})
1591            .await?;
1592
1593        // Update the original, local merkle tree
1594        *mt = remote_result;
1595
1596        Ok(())
1597    }
1598
1599    fn is_local(&self) -> bool {
1600        self.providers.lock().iter().all(|p| p.is_local())
1601    }
1602}
1603
1604/// Add accounts to the in-memory consensus state.
1605/// We use this during catchup after receiving verified accounts.
1606#[allow(clippy::type_complexity)]
1607pub async fn add_fee_accounts_to_state<
1608    N: ConnectedNetwork<PubKey>,
1609    V: Versions,
1610    P: SequencerPersistence,
1611>(
1612    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1613    view: &<SeqTypes as NodeType>::View,
1614    accounts: &[FeeAccount],
1615    tree: &FeeMerkleTree,
1616    leaf: Leaf2,
1617) -> anyhow::Result<()> {
1618    // Get the consensus handle
1619    let mut consensus = consensus.write().await;
1620
1621    let (state, delta) = match consensus.validated_state_map().get(view) {
1622        Some(View {
1623            view_inner: ViewInner::Leaf { state, delta, .. },
1624        }) => {
1625            let mut state = (**state).clone();
1626
1627            // Add the fetched accounts to the state.
1628            for account in accounts {
1629                if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1630                    if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1631                        tracing::warn!(
1632                            ?view,
1633                            %account,
1634                            "cannot update fetched account state: {err:#}"
1635                        );
1636                    }
1637                } else {
1638                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1639                };
1640            }
1641
1642            (Arc::new(state), delta.clone())
1643        },
1644        _ => {
1645            // If we don't already have a leaf for this view, or if we don't have the view
1646            // at all, we can create a new view based on the recovered leaf and add it to
1647            // our state map. In this case, we must also add the leaf to the saved leaves
1648            // map to ensure consistency.
1649            let mut state = ValidatedState::from_header(leaf.block_header());
1650            state.fee_merkle_tree = tree.clone();
1651            (Arc::new(state), None)
1652        },
1653    };
1654
1655    consensus
1656        .update_leaf(leaf, Arc::clone(&state), delta)
1657        .with_context(|| "failed to update leaf")?;
1658
1659    Ok(())
1660}
1661
1662/// Add accounts to the in-memory consensus state.
1663/// We use this during catchup after receiving verified accounts.
1664#[allow(clippy::type_complexity)]
1665pub async fn add_v2_reward_accounts_to_state<
1666    N: ConnectedNetwork<PubKey>,
1667    V: Versions,
1668    P: SequencerPersistence,
1669>(
1670    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1671    view: &<SeqTypes as NodeType>::View,
1672    accounts: &[RewardAccountV2],
1673    tree: &RewardMerkleTreeV2,
1674    leaf: Leaf2,
1675) -> anyhow::Result<()> {
1676    // Get the consensus handle
1677    let mut consensus = consensus.write().await;
1678
1679    let (state, delta) = match consensus.validated_state_map().get(view) {
1680        Some(View {
1681            view_inner: ViewInner::Leaf { state, delta, .. },
1682        }) => {
1683            let mut state = (**state).clone();
1684
1685            // Add the fetched accounts to the state.
1686            for account in accounts {
1687                if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1688                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1689                        tracing::warn!(
1690                            ?view,
1691                            %account,
1692                            "cannot update fetched account state: {err:#}"
1693                        );
1694                    }
1695                } else {
1696                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1697                };
1698            }
1699
1700            (Arc::new(state), delta.clone())
1701        },
1702        _ => {
1703            // If we don't already have a leaf for this view, or if we don't have the view
1704            // at all, we can create a new view based on the recovered leaf and add it to
1705            // our state map. In this case, we must also add the leaf to the saved leaves
1706            // map to ensure consistency.
1707            let mut state = ValidatedState::from_header(leaf.block_header());
1708            state.reward_merkle_tree_v2 = tree.clone();
1709            (Arc::new(state), None)
1710        },
1711    };
1712
1713    consensus
1714        .update_leaf(leaf, Arc::clone(&state), delta)
1715        .with_context(|| "failed to update leaf")?;
1716
1717    Ok(())
1718}
1719
1720/// Add accounts to the in-memory consensus state.
1721/// We use this during catchup after receiving verified accounts.
1722#[allow(clippy::type_complexity)]
1723pub async fn add_v1_reward_accounts_to_state<
1724    N: ConnectedNetwork<PubKey>,
1725    V: Versions,
1726    P: SequencerPersistence,
1727>(
1728    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1729    view: &<SeqTypes as NodeType>::View,
1730    accounts: &[RewardAccountV1],
1731    tree: &RewardMerkleTreeV1,
1732    leaf: Leaf2,
1733) -> anyhow::Result<()> {
1734    // Get the consensus handle
1735    let mut consensus = consensus.write().await;
1736
1737    let (state, delta) = match consensus.validated_state_map().get(view) {
1738        Some(View {
1739            view_inner: ViewInner::Leaf { state, delta, .. },
1740        }) => {
1741            let mut state = (**state).clone();
1742
1743            // Add the fetched accounts to the state.
1744            for account in accounts {
1745                if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1746                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1747                        tracing::warn!(
1748                            ?view,
1749                            %account,
1750                            "cannot update fetched account state: {err:#}"
1751                        );
1752                    }
1753                } else {
1754                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1755                };
1756            }
1757
1758            (Arc::new(state), delta.clone())
1759        },
1760        _ => {
1761            // If we don't already have a leaf for this view, or if we don't have the view
1762            // at all, we can create a new view based on the recovered leaf and add it to
1763            // our state map. In this case, we must also add the leaf to the saved leaves
1764            // map to ensure consistency.
1765            let mut state = ValidatedState::from_header(leaf.block_header());
1766            state.reward_merkle_tree_v1 = tree.clone();
1767            (Arc::new(state), None)
1768        },
1769    };
1770
1771    consensus
1772        .update_leaf(leaf, Arc::clone(&state), delta)
1773        .with_context(|| "failed to update leaf")?;
1774
1775    Ok(())
1776}
1777
#[cfg(test)]
mod test {
    use super::*;

    /// A peer with 2 failures in 1000 requests must compare greater than a
    /// peer with 1 failure in 10 requests, and a priority queue built from
    /// both must pop them in that order.
    #[test]
    fn test_peer_priority() {
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        // Items are keyed by index; the score is the priority.
        let ranked = [(0, reliable), (1, flaky)];
        let mut queue = ranked.into_iter().collect::<PriorityQueue<_, _>>();

        // Highest priority comes out first.
        for expected in ranked {
            assert_eq!(queue.pop(), Some(expected));
        }
    }
}