sequencer/catchup.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    fmt::{Debug, Display},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{anyhow, bail, ensure, Context};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15    config::PublicNetworkConfig,
16    traits::SequencerPersistence,
17    v0::traits::StateCatchup,
18    v0_3::{
19        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
20        RewardMerkleTreeV1,
21    },
22    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2, RewardMerkleTreeV2},
23    BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
24    FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions, ValidatedState,
25};
26use futures::{
27    future::{Future, FutureExt, TryFuture, TryFutureExt},
28    stream::FuturesUnordered,
29    StreamExt,
30};
31use hotshot_types::{
32    consensus::Consensus,
33    data::ViewNumber,
34    message::UpgradeLock,
35    network::NetworkConfig,
36    simple_certificate::LightClientStateUpdateCertificateV2,
37    stake_table::HSStakeTable,
38    traits::{
39        metrics::{Counter, CounterFamily, Metrics},
40        network::ConnectedNetwork,
41        node_implementation::{ConsensusTime as _, NodeType, Versions},
42        ValidatedState as ValidatedStateTrait,
43    },
44    utils::{verify_leaf_chain, View, ViewInner},
45    ValidatorConfig,
46};
47use itertools::Itertools;
48use jf_merkle_tree_compat::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
49use parking_lot::Mutex;
50use priority_queue::PriorityQueue;
51use serde::de::DeserializeOwned;
52use surf_disco::Request;
53use tide_disco::error::ServerError;
54use tokio::time::timeout;
55use tokio_util::task::AbortOnDropHandle;
56use url::Url;
57use vbs::version::StaticVersionType;
58
59use crate::api::BlocksFrontier;
60
61// This wrapper struct is probably not worth having. It's only used to log request
62// URLs before making requests and to track per-peer request/failure metrics.
63#[derive(Debug, Clone)]
64struct Client<ServerError, ApiVer: StaticVersionType> {
65    inner: surf_disco::Client<ServerError, ApiVer>,
66    url: Url,
67    requests: Arc<Box<dyn Counter>>,
68    failures: Arc<Box<dyn Counter>>,
69}
70
71impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
72    pub fn new(
73        url: Url,
74        requests: &(impl CounterFamily + ?Sized),
75        failures: &(impl CounterFamily + ?Sized),
76    ) -> Self {
77        Self {
78            inner: surf_disco::Client::new(url.clone()),
79            requests: Arc::new(requests.create(vec![url.to_string()])),
80            failures: Arc::new(failures.create(vec![url.to_string()])),
81            url,
82        }
83    }
84
85    pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
86        self.inner.get(route)
87    }
88}
89
90/// A score of a catchup peer, based on our interactions with that peer.
91///
92/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a
93/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at
94/// all. The score has a comparison function where higher is better, or in other words `p1 > p2`
95/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it
96/// convenient and efficient to collect peers in a priority queue which we can easily convert to a
97/// list sorted by reliability.
98#[derive(Clone, Copy, Debug, Default)]
99struct PeerScore {
100    requests: usize,
101    failures: usize,
102}
103
104impl Ord for PeerScore {
105    fn cmp(&self, other: &Self) -> Ordering {
106        // Compare failure rates: `self` is better than `other` if
107        //      self.failures / self.requests < other.failures / other.requests
108        // or equivalently
109        //      other.failures * self.requests > self.failures * other.requests
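        // For example (hypothetical numbers): a peer with 1 failure out of 4 requests compares
        // greater (better) than a peer with 2 failures out of 5, since 2 * 4 = 8 > 1 * 5 = 5,
        // matching the lower failure rate (0.25 vs 0.4). Two peers that have not been queried
        // yet compare as equal, since both products are 0.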
110        (other.failures * self.requests).cmp(&(self.failures * other.requests))
111    }
112}
113
114impl PartialOrd for PeerScore {
115    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
116        Some(self.cmp(other))
117    }
118}
119
120impl PartialEq for PeerScore {
121    fn eq(&self, other: &Self) -> bool {
122        self.cmp(other).is_eq()
123    }
124}
125
126impl Eq for PeerScore {}
127
128#[derive(Debug, Clone, Default)]
129pub struct StatePeers<ApiVer: StaticVersionType> {
130    // Peer IDs, ordered by reliability score. Each ID is an index into `clients`.
131    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
132    clients: Vec<Client<ServerError, ApiVer>>,
133    backoff: BackoffParams,
134}
135
136impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
137    async fn fetch<Fut>(
138        &self,
139        retry: usize,
140        f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
141    ) -> anyhow::Result<Fut::Ok>
142    where
143        Fut: TryFuture<Error: Display>,
144    {
145        // Since we generally have multiple peers we can catch up from, we want a fairly
146        // aggressive timeout for requests: if a peer is not responding quickly, we're better off
147        // just trying the next one rather than waiting, and this prevents a malicious peer from
148        // delaying catchup for a long time.
149        //
150        // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an
151        // honest peer, and thus never make progress. Thus, we start with a timeout of 500ms, which
152        // is aggressive but still very reasonable for an HTTP request. If that fails with all of
153        // our peers, we increase the timeout by another 500ms for each successive retry, until we
154        // eventually succeed.
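        // E.g., retry 0 uses a 500ms timeout per peer, retry 1 uses 1s, retry 2 uses 1.5s,
        // and so on.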
155        let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1);
156
157        // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`),
158        // so we can update reliability scores at the end.
159        let mut requests = HashMap::new();
160        let mut res = Err(anyhow!("failed fetching from every peer"));
161
162        // Try each peer in order of reliability score, until we succeed. We clone out of
163        // `self.scores` because it is small (contains only numeric IDs and scores), so this clone
164        // is a lot cheaper than holding the read lock the entire time we are making requests (which
165        // could be a while).
166        let mut scores = { (*self.scores.read().await).clone() };
167        let mut logs = vec![format!("Fetching failed.\n")];
168        while let Some((id, score)) = scores.pop() {
169            let client = &self.clients[id];
170            tracing::info!("fetching from {}", client.url);
171            match timeout(timeout_dur, f(client.clone()).into_future()).await {
172                Ok(Ok(t)) => {
173                    requests.insert(id, true);
174                    res = Ok(t);
175                    logs = Vec::new();
176                    break;
177                },
178                Ok(Err(err)) => {
179                    tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
180                    logs.push(format!(
181                        "Error from peer {} with id {id} and score {score:?}: {err:#}",
182                        client.url
183                    ));
184                    requests.insert(id, false);
185                },
186                Err(_) => {
187                    tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
188                    logs.push(format!(
189                        "Error from peer {} with id {id} and score {score:?}: request timed out",
190                        client.url
191                    ));
192                    requests.insert(id, false);
193                },
194            }
195        }
196
197        if !logs.is_empty() {
198            tracing::warn!("{}", logs.join("\n"));
199        }
200
201        // Update client scores.
202        let mut scores = self.scores.write().await;
203        for (id, success) in requests {
204            scores.change_priority_by(&id, |score| {
205                score.requests += 1;
206                self.clients[id].requests.add(1);
207                if !success {
208                    score.failures += 1;
209                    self.clients[id].failures.add(1);
210                }
211            });
212        }
213
214        res
215    }
216
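    /// Construct a catchup client that fetches state from the given peer URLs.
    ///
    /// Panics if `urls` is empty.
    ///
    /// A minimal usage sketch (assuming `urls: Vec<Url>`, `backoff: BackoffParams`, and some
    /// `Metrics` implementation `metrics` are in scope; the API version type is illustrative):
    ///
    /// ```ignore
    /// let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(urls, backoff, &metrics);
    /// let config = peers.fetch_config(my_validator_config).await?;
    /// ```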
217    pub fn from_urls(
218        urls: Vec<Url>,
219        backoff: BackoffParams,
220        metrics: &(impl Metrics + ?Sized),
221    ) -> Self {
222        if urls.is_empty() {
223            panic!("Cannot create StatePeers with no peers");
224        }
225
226        let metrics = metrics.subgroup("catchup".into());
227        let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
228        let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
229
230        let scores = urls
231            .iter()
232            .enumerate()
233            .map(|(i, _)| (i, PeerScore::default()))
234            .collect();
235        let clients = urls
236            .into_iter()
237            .map(|url| Client::new(url, &*requests, &*failures))
238            .collect();
239
240        Self {
241            clients,
242            scores: Arc::new(RwLock::new(scores)),
243            backoff,
244        }
245    }
246
247    #[tracing::instrument(skip(self, my_own_validator_config))]
248    pub async fn fetch_config(
249        &self,
250        my_own_validator_config: ValidatorConfig<SeqTypes>,
251    ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
252        self.backoff()
253            .retry(self, move |provider, retry| {
254                let my_own_validator_config = my_own_validator_config.clone();
255                async move {
256                    let cfg: PublicNetworkConfig = provider
257                        .fetch(retry, |client| {
258                            let url = client.url.join("config/hotshot").unwrap();
259
260                            reqwest::get(url.clone())
261                        })
262                        .await?
263                        .json()
264                        .await?;
265                    cfg.into_network_config(my_own_validator_config)
266                        .context("fetched config, but failed to convert to private config")
267                }
268                .boxed()
269            })
270            .await
271    }
272}
273
274#[async_trait]
275impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
276    #[tracing::instrument(skip(self, _instance))]
277    async fn try_fetch_accounts(
278        &self,
279        retry: usize,
280        _instance: &NodeState,
281        height: u64,
282        view: ViewNumber,
283        fee_merkle_tree_root: FeeMerkleCommitment,
284        accounts: &[FeeAccount],
285    ) -> anyhow::Result<Vec<FeeAccountProof>> {
286        self.fetch(retry, |client| async move {
287            let tree = client
288                .inner
289                .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
290                .body_binary(&accounts.to_vec())?
291                .send()
292                .await?;
293
294            // Verify proofs.
295            let mut proofs = Vec::new();
296            for account in accounts {
297                let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
298                    .context(format!("response missing fee account {account}"))?;
299                proof.verify(&fee_merkle_tree_root).context(format!(
300                    "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
301                ))?;
302                proofs.push(proof);
303            }
304
305            anyhow::Ok(proofs)
306        })
307        .await
308    }
309
310    #[tracing::instrument(skip(self, _instance, mt))]
311    async fn try_remember_blocks_merkle_tree(
312        &self,
313        retry: usize,
314        _instance: &NodeState,
315        height: u64,
316        view: ViewNumber,
317        mt: &mut BlockMerkleTree,
318    ) -> anyhow::Result<()> {
319        *mt = self
320            .fetch(retry, |client| {
321                let mut mt = mt.clone();
322                async move {
323                    let frontier = client
324                        .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
325                        .send()
326                        .await?;
327                    let elem = frontier
328                        .elem()
329                        .context("provided frontier is missing leaf element")?;
330                    mt.remember(mt.num_leaves() - 1, *elem, &frontier)
331                        .context("verifying block proof")?;
332                    anyhow::Ok(mt)
333                }
334            })
335            .await?;
336        Ok(())
337    }
338
339    async fn try_fetch_chain_config(
340        &self,
341        retry: usize,
342        commitment: Commitment<ChainConfig>,
343    ) -> anyhow::Result<ChainConfig> {
344        self.fetch(retry, |client| async move {
345            let cf = client
346                .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
347                .send()
348                .await?;
349            ensure!(
350                cf.commit() == commitment,
351                "received chain config with mismatched commitment: expected {commitment}, got {}",
352                cf.commit()
353            );
354            Ok(cf)
355        })
356        .await
357    }
358
359    async fn try_fetch_leaf(
360        &self,
361        retry: usize,
362        height: u64,
363        stake_table: HSStakeTable<SeqTypes>,
364        success_threshold: U256,
365    ) -> anyhow::Result<Leaf2> {
366        // Get the leaf chain
367        let leaf_chain = self
368            .fetch(retry, |client| async move {
369                let leaf = client
370                    .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
371                    .send()
372                    .await?;
373                anyhow::Ok(leaf)
374            })
375            .await
376            .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
377
378        // Verify it, returning the leaf at the given height
379        verify_leaf_chain(
380            leaf_chain,
381            &stake_table,
382            success_threshold,
383            height,
384            &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
385        )
386        .await
387        .with_context(|| format!("failed to verify leaf chain at height {height}"))
388    }
389
390    #[tracing::instrument(skip(self, _instance))]
391    async fn try_fetch_reward_accounts_v2(
392        &self,
393        retry: usize,
394        _instance: &NodeState,
395        height: u64,
396        view: ViewNumber,
397        reward_merkle_tree_root: RewardMerkleCommitmentV2,
398        accounts: &[RewardAccountV2],
399    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
400        self.fetch(retry, |client| async move {
401            let tree = client
402                .inner
403                .post::<RewardMerkleTreeV2>(&format!(
404                    "catchup/{height}/{}/reward-accounts-v2",
405                    view.u64()
406                ))
407                .body_binary(&accounts.to_vec())?
408                .send()
409                .await?;
410
411            // Verify proofs.
413            let mut proofs = Vec::new();
414            for account in accounts {
415                let (proof, _) = RewardAccountProofV2::prove(&tree, (*account).into())
416                    .context(format!("response missing reward account {account}"))?;
417                proof.verify(&reward_merkle_tree_root).context(format!(
418                    "invalid proof for v2 reward account {account}, root: \
419                     {reward_merkle_tree_root} height {height} view {view}"
420                ))?;
421                proofs.push(proof);
422            }
423
424            anyhow::Ok(proofs)
425        })
426        .await
427    }
428
429    #[tracing::instrument(skip(self, _instance))]
430    async fn try_fetch_reward_accounts_v1(
431        &self,
432        retry: usize,
433        _instance: &NodeState,
434        height: u64,
435        view: ViewNumber,
436        reward_merkle_tree_root: RewardMerkleCommitmentV1,
437        accounts: &[RewardAccountV1],
438    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
439        self.fetch(retry, |client| async move {
440            let tree = client
441                .inner
442                .post::<RewardMerkleTreeV1>(&format!(
443                    "catchup/{height}/{}/reward-accounts",
444                    view.u64()
445                ))
446                .body_binary(&accounts.to_vec())?
447                .send()
448                .await?;
449
450            // Verify proofs.
451            let mut proofs = Vec::new();
452            for account in accounts {
453                let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
454                    .context(format!("response missing reward account {account}"))?;
455                proof.verify(&reward_merkle_tree_root).context(format!(
456                    "invalid proof for v1 reward account {account}, root: \
457                     {reward_merkle_tree_root} height {height} view {view}"
458                ))?;
459                proofs.push(proof);
460            }
461
462            anyhow::Ok(proofs)
463        })
464        .await
465    }
466
467    async fn try_fetch_state_cert(
468        &self,
469        retry: usize,
470        epoch: u64,
471    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
472        self.fetch(retry, |client| async move {
473            client
474                .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
475                    "catchup/{epoch}/state-cert"
476                ))
477                .send()
478                .await
479        })
480        .await
481    }
482
483    fn backoff(&self) -> &BackoffParams {
484        &self.backoff
485    }
486
487    fn name(&self) -> String {
488        format!(
489            "StatePeers({})",
490            self.clients
491                .iter()
492                .map(|client| client.url.to_string())
493                .join(",")
494        )
495    }
496
497    fn is_local(&self) -> bool {
498        false
499    }
500}
501
502pub(crate) trait CatchupStorage: Sync {
503    /// Get the state of the requested `accounts`.
504    ///
505    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
506    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
507    /// This function is intended to be used for catchup, so `view` should be no older than the last
508    /// decided view.
509    ///
510    /// If successful, this function also returns the leaf from `view`, if it is available. This can
511    /// be used to add the recovered state to HotShot's state map, so that future requests can get
512    /// the state from memory rather than storage.
513    fn get_accounts(
514        &self,
515        _instance: &NodeState,
516        _height: u64,
517        _view: ViewNumber,
518        _accounts: &[FeeAccount],
519    ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
520        // Merklized state catchup is only supported by persistence backends that provide merklized
521        // state storage. This default implementation is overridden for those that do. Otherwise,
522        // catchup can still be provided by fetching undecided merklized state from consensus
523        // memory.
524        async {
525            bail!("merklized state catchup is not supported for this data source");
526        }
527    }
528
529    fn get_reward_accounts_v1(
530        &self,
531        _instance: &NodeState,
532        _height: u64,
533        _view: ViewNumber,
534        _accounts: &[RewardAccountV1],
535    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
536        async {
537            bail!("merklized state catchup is not supported for this data source");
538        }
539    }
540
541    fn get_reward_accounts_v2(
542        &self,
543        _instance: &NodeState,
544        _height: u64,
545        _view: ViewNumber,
546        _accounts: &[RewardAccountV2],
547    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
548        async {
549            bail!("merklized state catchup is not supported for this data source");
550        }
551    }
552
553    fn get_all_reward_accounts(
554        &self,
555        _height: u64,
556        _offset: u64,
557        _limit: u64,
558    ) -> impl Send + Future<Output = anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>>> {
559        async { bail!("merklized state catchup is not supported for this data source") }
560    }
561
562    /// Get the blocks Merkle tree frontier.
563    ///
564    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
565    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
566    /// This function is intended to be used for catchup, so `view` should be no older than the last
567    /// decided view.
568    fn get_frontier(
569        &self,
570        _instance: &NodeState,
571        _height: u64,
572        _view: ViewNumber,
573    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
574        // Merklized state catchup is only supported by persistence backends that provide merklized
575        // state storage. This default implementation is overridden for those that do. Otherwise,
576        // catchup can still be provided by fetching undecided merklized state from consensus
577        // memory.
578        async {
579            bail!("merklized state catchup is not supported for this data source");
580        }
581    }
582
583    fn get_chain_config(
584        &self,
585        _commitment: Commitment<ChainConfig>,
586    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
587        async {
588            bail!("chain config catchup is not supported for this data source");
589        }
590    }
591
592    fn get_leaf_chain(
593        &self,
594        _height: u64,
595    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
596        async {
597            bail!("leaf chain catchup is not supported for this data source");
598        }
599    }
600}
601
602impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
603
604impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
605where
606    T: CatchupStorage,
607    S: Sync,
608{
609    async fn get_accounts(
610        &self,
611        instance: &NodeState,
612        height: u64,
613        view: ViewNumber,
614        accounts: &[FeeAccount],
615    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
616        self.inner()
617            .get_accounts(instance, height, view, accounts)
618            .await
619    }
620
621    async fn get_reward_accounts_v2(
622        &self,
623        instance: &NodeState,
624        height: u64,
625        view: ViewNumber,
626        accounts: &[RewardAccountV2],
627    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
628        self.inner()
629            .get_reward_accounts_v2(instance, height, view, accounts)
630            .await
631    }
632
633    async fn get_all_reward_accounts(
634        &self,
635        height: u64,
636        offset: u64,
637        limit: u64,
638    ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
639        self.inner()
640            .get_all_reward_accounts(height, offset, limit)
641            .await
642    }
643
644    async fn get_reward_accounts_v1(
645        &self,
646        instance: &NodeState,
647        height: u64,
648        view: ViewNumber,
649        accounts: &[RewardAccountV1],
650    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
651        self.inner()
652            .get_reward_accounts_v1(instance, height, view, accounts)
653            .await
654    }
655
656    async fn get_frontier(
657        &self,
658        instance: &NodeState,
659        height: u64,
660        view: ViewNumber,
661    ) -> anyhow::Result<BlocksFrontier> {
662        self.inner().get_frontier(instance, height, view).await
663    }
664
665    async fn get_chain_config(
666        &self,
667        commitment: Commitment<ChainConfig>,
668    ) -> anyhow::Result<ChainConfig> {
669        self.inner().get_chain_config(commitment).await
670    }
671    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
672        self.inner().get_leaf_chain(height).await
673    }
674}
675
676#[derive(Debug)]
677pub(crate) struct SqlStateCatchup<T> {
678    db: Arc<T>,
679    backoff: BackoffParams,
680}
681
682impl<T> SqlStateCatchup<T> {
683    pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
684        Self { db, backoff }
685    }
686}
687
688#[async_trait]
689impl<T> StateCatchup for SqlStateCatchup<T>
690where
691    T: CatchupStorage + Send + Sync,
692{
693    async fn try_fetch_leaf(
694        &self,
695        _retry: usize,
696        height: u64,
697        stake_table: HSStakeTable<SeqTypes>,
698        success_threshold: U256,
699    ) -> anyhow::Result<Leaf2> {
700        // Get the leaf chain
701        let leaf_chain = self
702            .db
703            .get_leaf_chain(height)
704            .await
705            .with_context(|| "failed to get leaf chain from DB")?;
706
707        // Verify the leaf chain
708        let leaf = verify_leaf_chain(
709            leaf_chain,
710            &stake_table,
711            success_threshold,
712            height,
713            &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
714        )
715        .await
716        .with_context(|| "failed to verify leaf chain")?;
717
718        Ok(leaf)
719    }
720    // TODO: add a test for the account proof validation
721    // issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102)
722    #[tracing::instrument(skip(self, _retry, instance))]
723    async fn try_fetch_accounts(
724        &self,
725        _retry: usize,
726        instance: &NodeState,
727        block_height: u64,
728        view: ViewNumber,
729        fee_merkle_tree_root: FeeMerkleCommitment,
730        accounts: &[FeeAccount],
731    ) -> anyhow::Result<Vec<FeeAccountProof>> {
732        // Get the accounts
733        let (fee_merkle_tree_from_db, _) = self
734            .db
735            .get_accounts(instance, block_height, view, accounts)
736            .await
737            .with_context(|| "failed to get fee accounts from DB")?;
738
739        // Verify the accounts
740        let mut proofs = Vec::new();
741        for account in accounts {
742            let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
743                .context(format!("response missing account {account}"))?;
744            proof.verify(&fee_merkle_tree_root).context(format!(
745                "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
746            ))?;
747            proofs.push(proof);
748        }
749
750        Ok(proofs)
751    }
752
753    #[tracing::instrument(skip(self, _retry, instance, mt))]
754    async fn try_remember_blocks_merkle_tree(
755        &self,
756        _retry: usize,
757        instance: &NodeState,
758        bh: u64,
759        view: ViewNumber,
760        mt: &mut BlockMerkleTree,
761    ) -> anyhow::Result<()> {
762        if bh == 0 {
763            return Ok(());
764        }
765
766        let proof = self.db.get_frontier(instance, bh, view).await?;
767        match proof
768            .proof
769            .first()
770            .context(format!("empty proof for frontier at height {bh}"))?
771        {
772            MerkleNode::Leaf { pos, elem, .. } => mt
773                .remember(pos, elem, proof.clone())
774                .context("failed to remember proof"),
775            _ => bail!("invalid proof"),
776        }
777    }
778
779    async fn try_fetch_chain_config(
780        &self,
781        _retry: usize,
782        commitment: Commitment<ChainConfig>,
783    ) -> anyhow::Result<ChainConfig> {
784        let cf = self.db.get_chain_config(commitment).await?;
785
786        if cf.commit() != commitment {
787            panic!(
788                "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
789                 but got: {:?}.
790                This may indicate a compromised database",
791                commitment,
792                cf.commit()
793            )
794        }
795
796        Ok(cf)
797    }
798
799    #[tracing::instrument(skip(self, _retry, instance))]
800    async fn try_fetch_reward_accounts_v2(
801        &self,
802        _retry: usize,
803        instance: &NodeState,
804        block_height: u64,
805        view: ViewNumber,
806        reward_merkle_tree_root: RewardMerkleCommitmentV2,
807        accounts: &[RewardAccountV2],
808    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
809        // Get the accounts
810        let (reward_merkle_tree_from_db, _) = self
811            .db
812            .get_reward_accounts_v2(instance, block_height, view, accounts)
813            .await
814            .with_context(|| "failed to get reward accounts from DB")?;
815        // Verify the accounts
816        let mut proofs = Vec::new();
817        for account in accounts {
818            let (proof, _) =
819                RewardAccountProofV2::prove(&reward_merkle_tree_from_db, (*account).into())
820                    .context(format!("response missing account {account}"))?;
821            proof.verify(&reward_merkle_tree_root).context(format!(
822                "invalid proof for v2 reward account {account}, root: {reward_merkle_tree_root}"
823            ))?;
824            proofs.push(proof);
825        }
826
827        Ok(proofs)
828    }
829
830    #[tracing::instrument(skip(self, _retry, instance))]
831    async fn try_fetch_reward_accounts_v1(
832        &self,
833        _retry: usize,
834        instance: &NodeState,
835        block_height: u64,
836        view: ViewNumber,
837        reward_merkle_tree_root: RewardMerkleCommitmentV1,
838        accounts: &[RewardAccountV1],
839    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
840        // Get the accounts
841        let (reward_merkle_tree_from_db, _) = self
842            .db
843            .get_reward_accounts_v1(instance, block_height, view, accounts)
844            .await
845            .with_context(|| "failed to get reward accounts from DB")?;
846        // Verify the accounts
847        let mut proofs = Vec::new();
848        for account in accounts {
849            let (proof, _) =
850                RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
851                    .context(format!("response missing account {account}"))?;
852            proof.verify(&reward_merkle_tree_root).context(format!(
853                "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
854            ))?;
855            proofs.push(proof);
856        }
857
858        Ok(proofs)
859    }
860
861    async fn try_fetch_state_cert(
862        &self,
863        _retry: usize,
864        _epoch: u64,
865    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
866        bail!("state cert catchup not supported for SqlStateCatchup");
867    }
868
869    fn backoff(&self) -> &BackoffParams {
870        &self.backoff
871    }
872
873    fn name(&self) -> String {
874        "SqlStateCatchup".into()
875    }
876
877    fn is_local(&self) -> bool {
878        true
879    }
880}
881
882/// Disable catchup entirely.
883#[derive(Clone, Debug)]
884pub struct NullStateCatchup {
885    backoff: BackoffParams,
886    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
887}
888
889impl Default for NullStateCatchup {
890    fn default() -> Self {
891        Self {
892            backoff: BackoffParams::disabled(),
893            chain_configs: Default::default(),
894        }
895    }
896}
897
898impl NullStateCatchup {
899    /// Add a chain config preimage which can be fetched by hash during STF evaluation.
900    ///
901    /// [`NullStateCatchup`] is used to disable catchup entirely when evaluating the STF, which
902    /// requires the [`ValidatedState`](espresso_types::ValidatedState) to be pre-seeded with all
903    /// the dependencies of STF evaluation. However, the STF also depends on having the preimage of
904    /// various [`ChainConfig`] commitments, which are not stored in the
905    /// [`ValidatedState`](espresso_types::ValidatedState), but which instead must be supplied by a
906    /// separate preimage oracle. Thus, [`NullStateCatchup`] may be populated with a set of
907    /// [`ChainConfig`]s, which it can feed to the STF during evaluation.
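    ///
    /// A minimal sketch (assuming a `ChainConfig` value `cf` is in scope):
    ///
    /// ```ignore
    /// let mut catchup = NullStateCatchup::default();
    /// catchup.add_chain_config(cf);
    /// // Chain config lookups by commitment now succeed; all other catchup requests still fail.
    /// let fetched = catchup.try_fetch_chain_config(0, cf.commit()).await?;
    /// ```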
908    pub fn add_chain_config(&mut self, cf: ChainConfig) {
909        self.chain_configs.insert(cf.commit(), cf);
910    }
911}
912
913#[async_trait]
914impl StateCatchup for NullStateCatchup {
915    async fn try_fetch_leaf(
916        &self,
917        _retry: usize,
918        _height: u64,
919        _stake_table: HSStakeTable<SeqTypes>,
920        _success_threshold: U256,
921    ) -> anyhow::Result<Leaf2> {
922        bail!("state catchup is disabled")
923    }
924
925    async fn try_fetch_accounts(
926        &self,
927        _retry: usize,
928        _instance: &NodeState,
929        _height: u64,
930        _view: ViewNumber,
931        _fee_merkle_tree_root: FeeMerkleCommitment,
932        _account: &[FeeAccount],
933    ) -> anyhow::Result<Vec<FeeAccountProof>> {
934        bail!("state catchup is disabled");
935    }
936
937    async fn try_fetch_reward_accounts_v2(
938        &self,
939        _retry: usize,
940        _instance: &NodeState,
941        _height: u64,
942        _view: ViewNumber,
943        _fee_merkle_tree_root: RewardMerkleCommitmentV2,
944        _account: &[RewardAccountV2],
945    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
946        bail!("state catchup is disabled");
947    }
948
949    async fn try_remember_blocks_merkle_tree(
950        &self,
951        _retry: usize,
952        _instance: &NodeState,
953        _height: u64,
954        _view: ViewNumber,
955        _mt: &mut BlockMerkleTree,
956    ) -> anyhow::Result<()> {
957        bail!("state catchup is disabled");
958    }
959
960    async fn try_fetch_chain_config(
961        &self,
962        _retry: usize,
963        commitment: Commitment<ChainConfig>,
964    ) -> anyhow::Result<ChainConfig> {
965        self.chain_configs
966            .get(&commitment)
967            .copied()
968            .context(format!("chain config {commitment} not available"))
969    }
970
971    async fn try_fetch_reward_accounts_v1(
972        &self,
973        _retry: usize,
974        _instance: &NodeState,
975        _height: u64,
976        _view: ViewNumber,
977        _fee_merkle_tree_root: RewardMerkleCommitmentV1,
978        _account: &[RewardAccountV1],
979    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
980        bail!("state catchup is disabled");
981    }
982
983    async fn try_fetch_state_cert(
984        &self,
985        _retry: usize,
986        _epoch: u64,
987    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
988        bail!("state catchup is disabled");
989    }
990
991    fn backoff(&self) -> &BackoffParams {
992        &self.backoff
993    }
994
995    fn name(&self) -> String {
996        "NullStateCatchup".into()
997    }
998
999    fn is_local(&self) -> bool {
1000        true
1001    }
1002}
1003
1004/// A catchup implementation that parallelizes requests to many providers.
1005/// It returns the result of the first non-erroring provider to complete.
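///
/// A minimal usage sketch (assuming `local_catchup` and `remote_catchup` are
/// `Arc<dyn StateCatchup>` values, e.g. a database-backed provider and a `StatePeers` client):
///
/// ```ignore
/// let catchup = ParallelStateCatchup::new(&[local_catchup]);
/// catchup.add_provider(remote_catchup);
/// // Local providers are tried first; remote providers are only used as a fallback.
/// let config = catchup.try_fetch_chain_config(0, commitment).await?;
/// ```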
1006#[derive(Clone)]
1007pub struct ParallelStateCatchup {
1008    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
1009    backoff: BackoffParams,
1010}
1011
1012impl ParallelStateCatchup {
1013    /// Create a new [`ParallelStateCatchup`] from the given list of providers.
1014    pub fn new(providers: &[Arc<dyn StateCatchup>]) -> Self {
1015        Self {
1016            providers: Arc::new(Mutex::new(providers.to_vec())),
1017            backoff: BackoffParams::disabled(),
1018        }
1019    }
1020
1021    /// Add a provider to the list of providers
1022    pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
1023        self.providers.lock().push(provider);
1024    }
1025
1026    /// Perform an async operation on all local providers, returning the first result to succeed
1027    pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1028    where
1029        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1030        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1031        RT: Send + Sync + 'static,
1032    {
1033        self.on_providers(|provider| provider.is_local(), closure)
1034            .await
1035    }
1036
1037    /// Perform an async operation on all remote providers, returning the first result to succeed
1038    pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1039    where
1040        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1041        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1042        RT: Send + Sync + 'static,
1043    {
1044        self.on_providers(|provider| !provider.is_local(), closure)
1045            .await
1046    }
1047
1048    /// Perform an async operation on all providers matching the given predicate, returning the first result to succeed
1049    pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1050    where
1051        P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1052        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1053        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1054        RT: Send + Sync + 'static,
1055    {
1056        // Make sure we have at least one provider
1057        let providers = self.providers.lock().clone();
1058        if providers.is_empty() {
1059            return Err(anyhow::anyhow!("no providers were initialized"));
1060        }
1061
1062        // Filter the providers by the predicate
1063        let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1064        if providers.is_empty() {
1065            return Err(anyhow::anyhow!("no providers matched the given predicate"));
1066        }
1067
1068        // Spawn futures for each provider
1069        let mut futures = FuturesUnordered::new();
1070        for provider in providers {
1071            let closure = closure.clone();
1072            futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1073        }
1074
1075        let mut logs = vec![format!("No providers returned a successful result.\n")];
1076        // Return the first successful result
1077        while let Some(result) = futures.next().await {
1078            // Unwrap the inner (join) result
1079            let result = match result {
1080                Ok(res) => res,
1081                Err(err) => {
1082                    tracing::debug!("Failed to join on provider: {err:#}.");
1083                    logs.push(format!("Failed to join on provider: {err:#}."));
1084                    continue;
1085                },
1086            };
1087
1088            // If a provider fails, print why
1089            let result = match result {
1090                Ok(res) => res,
1091                Err(err) => {
1092                    tracing::debug!("Failed to fetch data: {err:#}.");
1093                    logs.push(format!("Failed to fetch data: {err:#}."));
1094                    continue;
1095                },
1096            };
1097
1098            return Ok(result);
1099        }
1100
1101        Err(anyhow::anyhow!(logs.join("\n")))
1102    }
1103}
1104
1105macro_rules! clone {
1106    ( ($( $x:ident ),*) $y:expr ) => {
1107        {
1108            $(let $x = $x.clone();)*
1109            $y
1110        }
1111    };
1112}
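// For example, `clone! {(mt) async move { mt.num_leaves() }}` expands to
// `{ let mt = mt.clone(); async move { mt.num_leaves() } }`, so each invocation of the
// enclosing `Fn` closure works on its own clone instead of moving the captured value.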
1113
1114/// A catchup implementation that tries local providers first and falls back to remote
1115/// providers only if no local provider succeeds, returning the first successful result.
1116#[async_trait]
1117impl StateCatchup for ParallelStateCatchup {
1118    async fn try_fetch_leaf(
1119        &self,
1120        retry: usize,
1121        height: u64,
1122        stake_table: HSStakeTable<SeqTypes>,
1123        success_threshold: U256,
1124    ) -> anyhow::Result<Leaf2> {
1125        // Try fetching the leaf on the local providers first
1126        let local_result = self
1127            .on_local_providers(clone! {(stake_table) move |provider| {
1128                clone!{(stake_table) async move {
1129                    provider
1130                        .try_fetch_leaf(retry, height, stake_table, success_threshold)
1131                        .await
1132                }}
1133            }})
1134            .await;
1135
1136        // Check if we were successful locally
1137        if local_result.is_ok() {
1138            return local_result;
1139        }
1140
1141        // If that fails, try the remote ones
1142        self.on_remote_providers(clone! {(stake_table) move |provider| {
1143            clone!{(stake_table) async move {
1144                provider
1145                    .try_fetch_leaf(retry, height, stake_table, success_threshold)
1146                    .await
1147            }}
1148        }})
1149        .await
1150    }
1151
1152    async fn try_fetch_accounts(
1153        &self,
1154        retry: usize,
1155        instance: &NodeState,
1156        height: u64,
1157        view: ViewNumber,
1158        fee_merkle_tree_root: FeeMerkleCommitment,
1159        accounts: &[FeeAccount],
1160    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1161        // Try to get the accounts on local providers first
1162        let accounts_vec = accounts.to_vec();
1163        let local_result = self
1164            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1165                clone! {(instance, accounts_vec) async move {
1166                    provider
1167                        .try_fetch_accounts(
1168                            retry,
1169                            &instance,
1170                            height,
1171                            view,
1172                            fee_merkle_tree_root,
1173                            &accounts_vec,
1174                        )
1175                        .await
1176                }}
1177            }})
1178            .await;
1179
1180        // Check if we were successful locally
1181        if local_result.is_ok() {
1182            return local_result;
1183        }
1184
1185        // If that fails, try the remote ones
1186        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1187            clone!{(instance, accounts_vec) async move {
1188                provider
1189                .try_fetch_accounts(
1190                    retry,
1191                    &instance,
1192                    height,
1193                    view,
1194                    fee_merkle_tree_root,
1195                    &accounts_vec,
1196                ).await
1197            }}
1198        }})
1199        .await
1200    }
1201
1202    async fn try_remember_blocks_merkle_tree(
1203        &self,
1204        retry: usize,
1205        instance: &NodeState,
1206        height: u64,
1207        view: ViewNumber,
1208        mt: &mut BlockMerkleTree,
1209    ) -> anyhow::Result<()> {
1210        // Try to remember the blocks merkle tree on local providers first
1211        let local_result = self
1212            .on_local_providers(clone! {(mt, instance) move |provider| {
1213                let mut mt = mt.clone();
1214                clone! {(instance) async move {
1215                    // Perform the call
1216                    provider
1217                        .try_remember_blocks_merkle_tree(
1218                            retry,
1219                            &instance,
1220                            height,
1221                            view,
1222                            &mut mt,
1223                        )
1224                        .await?;
1225
1226                    // Return the merkle tree so we can modify it
1227                    Ok(mt)
1228                }}
1229            }})
1230            .await;
1231
1232        // Check if we were successful locally
1233        if let Ok(modified_mt) = local_result {
1234            // Set the merkle tree to the output of the successful local call
1235            *mt = modified_mt;
1236
1237            return Ok(());
1238        }
1239
1240        // If that fails, try the remote ones
1241        let remote_result = self
1242            .on_remote_providers(clone! {(mt, instance) move |provider| {
1243                let mut mt = mt.clone();
1244                clone!{(instance) async move {
1245                    // Perform the call
1246                    provider
1247                    .try_remember_blocks_merkle_tree(
1248                        retry,
1249                        &instance,
1250                        height,
1251                        view,
1252                        &mut mt,
1253                    )
1254                    .await?;
1255
1256                    // Return the merkle tree
1257                    Ok(mt)
1258                }}
1259            }})
1260            .await?;
1261
1262        // Update the original, local merkle tree
1263        *mt = remote_result;
1264
1265        Ok(())
1266    }
1267
1268    async fn try_fetch_chain_config(
1269        &self,
1270        retry: usize,
1271        commitment: Commitment<ChainConfig>,
1272    ) -> anyhow::Result<ChainConfig> {
1273        // Try fetching the chain config on the local providers first
1274        let local_result = self
1275            .on_local_providers(move |provider| async move {
1276                provider.try_fetch_chain_config(retry, commitment).await
1277            })
1278            .await;
1279
1280        // Check if we were successful locally
1281        if local_result.is_ok() {
1282            return local_result;
1283        }
1284
1285        // If that fails, try the remote ones
1286        self.on_remote_providers(move |provider| async move {
1287            provider.try_fetch_chain_config(retry, commitment).await
1288        })
1289        .await
1290    }
1291
1292    async fn try_fetch_reward_accounts_v2(
1293        &self,
1294        retry: usize,
1295        instance: &NodeState,
1296        height: u64,
1297        view: ViewNumber,
1298        reward_merkle_tree_root: RewardMerkleCommitmentV2,
1299        accounts: &[RewardAccountV2],
1300    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1301        // Try to get the accounts on local providers first
1302        let accounts_vec = accounts.to_vec();
1303        let local_result = self
1304            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1305                clone! {(instance, accounts_vec) async move {
1306                    provider
1307                        .try_fetch_reward_accounts_v2(
1308                            retry,
1309                            &instance,
1310                            height,
1311                            view,
1312                            reward_merkle_tree_root,
1313                            &accounts_vec,
1314                        )
1315                        .await
1316                }}
1317            }})
1318            .await;
1319
1320        // Check if we were successful locally
1321        if local_result.is_ok() {
1322            return local_result;
1323        }
1324
1325        // If that fails, try the remote ones
1326        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1327            clone!{(instance, accounts_vec) async move {
1328                provider
1329                .try_fetch_reward_accounts_v2(
1330                    retry,
1331                    &instance,
1332                    height,
1333                    view,
1334                    reward_merkle_tree_root,
1335                    &accounts_vec,
1336                ).await
1337            }}
1338        }})
1339        .await
1340    }
1341
1342    async fn try_fetch_reward_accounts_v1(
1343        &self,
1344        retry: usize,
1345        instance: &NodeState,
1346        height: u64,
1347        view: ViewNumber,
1348        reward_merkle_tree_root: RewardMerkleCommitmentV1,
1349        accounts: &[RewardAccountV1],
1350    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1351        // Try to get the accounts on local providers first
1352        let accounts_vec = accounts.to_vec();
1353        let local_result = self
1354            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1355                clone! {(instance, accounts_vec) async move {
1356                    provider
1357                        .try_fetch_reward_accounts_v1(
1358                            retry,
1359                            &instance,
1360                            height,
1361                            view,
1362                            reward_merkle_tree_root,
1363                            &accounts_vec,
1364                        )
1365                        .await
1366                }}
1367            }})
1368            .await;
1369
1370        // Check if we were successful locally
1371        if local_result.is_ok() {
1372            return local_result;
1373        }
1374
1375        // If that fails, try the remote ones
1376        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1377            clone!{(instance, accounts_vec) async move {
1378                provider
1379                .try_fetch_reward_accounts_v1(
1380                    retry,
1381                    &instance,
1382                    height,
1383                    view,
1384                    reward_merkle_tree_root,
1385                    &accounts_vec,
1386                ).await
1387            }}
1388        }})
1389        .await
1390    }
1391
1392    async fn try_fetch_state_cert(
1393        &self,
1394        retry: usize,
1395        epoch: u64,
1396    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1397        // Try fetching the state cert on the local providers first
1398        let local_result = self
1399            .on_local_providers(move |provider| async move {
1400                provider.try_fetch_state_cert(retry, epoch).await
1401            })
1402            .await;
1403
1404        // Check if we were successful locally
1405        if local_result.is_ok() {
1406            return local_result;
1407        }
1408
1409        // If that fails, try the remote ones
1410        self.on_remote_providers(move |provider| async move {
1411            provider.try_fetch_state_cert(retry, epoch).await
1412        })
1413        .await
1414    }
1415
1416    fn backoff(&self) -> &BackoffParams {
1417        &self.backoff
1418    }
1419
1420    fn name(&self) -> String {
1421        format!(
1422            "[{}]",
1423            self.providers
1424                .lock()
1425                .iter()
1426                .map(|p| p.name())
1427                .collect::<Vec<_>>()
1428                .join(", ")
1429        )
1430    }
1431
1432    async fn fetch_accounts(
1433        &self,
1434        instance: &NodeState,
1435        height: u64,
1436        view: ViewNumber,
1437        fee_merkle_tree_root: FeeMerkleCommitment,
1438        accounts: Vec<FeeAccount>,
1439    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1440        // Try to get the accounts on local providers first
1441        let accounts_vec = accounts.to_vec();
1442        let local_result = self
1443            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1444                clone! {(instance, accounts_vec) async move {
1445                    provider
1446                        .try_fetch_accounts(
1447                            0,
1448                            &instance,
1449                            height,
1450                            view,
1451                            fee_merkle_tree_root,
1452                            &accounts_vec,
1453                        )
1454                        .await
1455                }}
1456            }})
1457            .await;
1458
1459        // Check if we were successful locally
1460        if local_result.is_ok() {
1461            return local_result;
1462        }
1463
1464        // If that fails, try the remote ones (with retry)
1465        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1466            clone!{(instance, accounts_vec) async move {
1467                provider
1468                .fetch_accounts(
1469                    &instance,
1470                    height,
1471                    view,
1472                    fee_merkle_tree_root,
1473                    accounts_vec,
1474                ).await
1475            }}
1476        }})
1477        .await
1478    }
1479
1480    async fn fetch_leaf(
1481        &self,
1482        height: u64,
1483        stake_table: HSStakeTable<SeqTypes>,
1484        success_threshold: U256,
1485    ) -> anyhow::Result<Leaf2> {
1486        // Try fetching the leaf on the local providers first
1487        let local_result = self
1488            .on_local_providers(clone! {(stake_table) move |provider| {
1489                clone!{(stake_table) async move {
1490                    provider
1491                        .try_fetch_leaf(0, height, stake_table, success_threshold)
1492                        .await
1493                }}
1494            }})
1495            .await;
1496
1497        // Check if we were successful locally
1498        if local_result.is_ok() {
1499            return local_result;
1500        }
1501
1502        // If that fails, try the remote ones (with retry)
1503        self.on_remote_providers(clone! {(stake_table) move |provider| {
1504         clone!{(stake_table) async move {
1505             provider
1506                 .fetch_leaf(height, stake_table, success_threshold)
1507                 .await
1508         }}
1509        }})
1510        .await
1511    }
1512
    async fn fetch_chain_config(
        &self,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        // Try fetching the chain config on the local providers first
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_chain_config(0, commitment).await
            })
            .await;

        // Check if we were successful locally
        if local_result.is_ok() {
            return local_result;
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(move |provider| async move {
            provider.fetch_chain_config(commitment).await
        })
        .await
    }

    async fn fetch_reward_accounts_v2(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV2,
        accounts: Vec<RewardAccountV2>,
    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
        // Try to get the accounts on local providers first
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_reward_accounts_v2(
                            0,
                            &instance,
                            height,
                            view,
                            reward_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        if local_result.is_ok() {
            return local_result;
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .fetch_reward_accounts_v2(
                    &instance,
                    height,
                    view,
                    reward_merkle_tree_root,
                    accounts_vec,
                ).await
            }}
        }})
        .await
    }

    async fn fetch_reward_accounts_v1(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV1,
        accounts: Vec<RewardAccountV1>,
    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
        // Try to get the accounts on local providers first
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_reward_accounts_v1(
                            0,
                            &instance,
                            height,
                            view,
                            reward_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        if local_result.is_ok() {
            return local_result;
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .fetch_reward_accounts_v1(
                    &instance,
                    height,
                    view,
                    reward_merkle_tree_root,
                    accounts_vec,
                ).await
            }}
        }})
        .await
    }

    async fn fetch_state_cert(
        &self,
        epoch: u64,
    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
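        // Try fetching the state cert on the local providers first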
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_state_cert(0, epoch).await
            })
            .await;

        // Check if we were successful locally
        if local_result.is_ok() {
            return local_result;
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(
            move |provider| async move { provider.fetch_state_cert(epoch).await },
        )
        .await
    }

    async fn remember_blocks_merkle_tree(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        mt: &mut BlockMerkleTree,
    ) -> anyhow::Result<()> {
        // Try to remember the blocks merkle tree on local providers first
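        // The closure runs once per provider, so it cannot capture `mt` by mutable
        // reference. Each attempt instead clones the tree, mutates the clone, and
        // returns it; the caller's tree is only overwritten on success.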
        let local_result = self
            .on_local_providers(clone! {(mt, instance) move |provider| {
                let mut mt = mt.clone();
                clone! {(instance) async move {
                    // Perform the call
                    provider
                        .try_remember_blocks_merkle_tree(
                            0,
                            &instance,
                            height,
                            view,
                            &mut mt,
                        )
                        .await?;

                    // Return the modified merkle tree
                    Ok(mt)
                }}
            }})
            .await;

        // Check if we were successful locally
        if let Ok(modified_mt) = local_result {
            // Replace the caller's merkle tree with the one from the successful call
            *mt = modified_mt;

            return Ok(());
        }

        // If that fails, try the remote ones (with retry)
        let remote_result = self
            .on_remote_providers(clone! {(mt, instance) move |provider| {
                let mut mt = mt.clone();
                clone!{(instance) async move {
                    // Perform the call
                    provider
                    .remember_blocks_merkle_tree(
                        &instance,
                        height,
                        view,
                        &mut mt,
                    )
                    .await?;

                    // Return the modified merkle tree
                    Ok(mt)
                }}
            }})
            .await?;

        // Update the original, local merkle tree
        *mt = remote_result;

        Ok(())
    }

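    // This catchup implementation only counts as local if every underlying
    // provider is local.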
    fn is_local(&self) -> bool {
        self.providers.lock().iter().all(|p| p.is_local())
    }
}

/// Add fee accounts to the in-memory consensus state.
/// We use this during catchup after receiving verified accounts.
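///
/// The generic parameters `N`, `V`, and `P` are not used in the body; since they
/// cannot be inferred from the arguments, callers must supply them explicitly.
///
/// A minimal usage sketch (the variable and type names here are illustrative, not
/// part of this module):
///
/// ```rust,ignore
/// // `consensus`, `view`, `accounts`, `fee_merkle_tree`, and `leaf` are assumed to
/// // come from a catchup task that has already fetched and verified the account
/// // proofs against the fee merkle root of `leaf`.
/// add_fee_accounts_to_state::<MyNetwork, MyVersions, MyPersistence>(
///     &consensus,
///     &view,
///     &accounts,
///     &fee_merkle_tree,
///     leaf,
/// )
/// .await?;
/// ```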
#[allow(clippy::type_complexity)]
pub async fn add_fee_accounts_to_state<
    N: ConnectedNetwork<PubKey>,
    V: Versions,
    P: SequencerPersistence,
>(
    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
    view: &<SeqTypes as NodeType>::View,
    accounts: &[FeeAccount],
    tree: &FeeMerkleTree,
    leaf: Leaf2,
) -> anyhow::Result<()> {
    // Acquire a write lock on the consensus state
    let mut consensus = consensus.write().await;

    let (state, delta) = match consensus.validated_state_map().get(view) {
        Some(View {
            view_inner: ViewInner::Leaf { state, delta, .. },
        }) => {
            let mut state = (**state).clone();

            // Add the fetched accounts to the state.
            for account in accounts {
                if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
                    if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
                        tracing::warn!(
                            ?view,
                            %account,
                            "cannot update fetched account state: {err:#}"
                        );
                    }
                } else {
                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
                };
            }

            (Arc::new(state), delta.clone())
        },
        _ => {
            // If we don't already have a leaf for this view, or if we don't have the view
            // at all, we can create a new view based on the recovered leaf and add it to
            // our state map. In this case, we must also add the leaf to the saved leaves
            // map to ensure consistency.
            let mut state = ValidatedState::from_header(leaf.block_header());
            state.fee_merkle_tree = tree.clone();
            (Arc::new(state), None)
        },
    };

    consensus
        .update_leaf(leaf, Arc::clone(&state), delta)
        .with_context(|| "failed to update leaf")?;

    Ok(())
}

/// Add v2 reward accounts to the in-memory consensus state.
/// We use this during catchup after receiving verified accounts.
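///
/// This is the v2-reward analogue of [`add_fee_accounts_to_state`]; it proves and
/// remembers entries in `state.reward_merkle_tree_v2` instead of the fee tree.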
#[allow(clippy::type_complexity)]
pub async fn add_v2_reward_accounts_to_state<
    N: ConnectedNetwork<PubKey>,
    V: Versions,
    P: SequencerPersistence,
>(
    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
    view: &<SeqTypes as NodeType>::View,
    accounts: &[RewardAccountV2],
    tree: &RewardMerkleTreeV2,
    leaf: Leaf2,
) -> anyhow::Result<()> {
    // Acquire a write lock on the consensus state
    let mut consensus = consensus.write().await;

    let (state, delta) = match consensus.validated_state_map().get(view) {
        Some(View {
            view_inner: ViewInner::Leaf { state, delta, .. },
        }) => {
            let mut state = (**state).clone();

            // Add the fetched accounts to the state.
            for account in accounts {
                if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
                        tracing::warn!(
                            ?view,
                            %account,
                            "cannot update fetched account state: {err:#}"
                        );
                    }
                } else {
                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
                };
            }

            (Arc::new(state), delta.clone())
        },
        _ => {
            // If we don't already have a leaf for this view, or if we don't have the view
            // at all, we can create a new view based on the recovered leaf and add it to
            // our state map. In this case, we must also add the leaf to the saved leaves
            // map to ensure consistency.
            let mut state = ValidatedState::from_header(leaf.block_header());
            state.reward_merkle_tree_v2 = tree.clone();
            (Arc::new(state), None)
        },
    };

    consensus
        .update_leaf(leaf, Arc::clone(&state), delta)
        .with_context(|| "failed to update leaf")?;

    Ok(())
}

/// Add v1 reward accounts to the in-memory consensus state.
/// We use this during catchup after receiving verified accounts.
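///
/// Same as [`add_v2_reward_accounts_to_state`], but for the v1 reward merkle tree
/// (`state.reward_merkle_tree_v1`).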
#[allow(clippy::type_complexity)]
pub async fn add_v1_reward_accounts_to_state<
    N: ConnectedNetwork<PubKey>,
    V: Versions,
    P: SequencerPersistence,
>(
    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
    view: &<SeqTypes as NodeType>::View,
    accounts: &[RewardAccountV1],
    tree: &RewardMerkleTreeV1,
    leaf: Leaf2,
) -> anyhow::Result<()> {
    // Acquire a write lock on the consensus state
    let mut consensus = consensus.write().await;

    let (state, delta) = match consensus.validated_state_map().get(view) {
        Some(View {
            view_inner: ViewInner::Leaf { state, delta, .. },
        }) => {
            let mut state = (**state).clone();

            // Add the fetched accounts to the state.
            for account in accounts {
                if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
                        tracing::warn!(
                            ?view,
                            %account,
                            "cannot update fetched account state: {err:#}"
                        );
                    }
                } else {
                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
                };
            }

            (Arc::new(state), delta.clone())
        },
        _ => {
            // If we don't already have a leaf for this view, or if we don't have the view
            // at all, we can create a new view based on the recovered leaf and add it to
            // our state map. In this case, we must also add the leaf to the saved leaves
            // map to ensure consistency.
            let mut state = ValidatedState::from_header(leaf.block_header());
            state.reward_merkle_tree_v1 = tree.clone();
            (Arc::new(state), None)
        },
    };

    consensus
        .update_leaf(leaf, Arc::clone(&state), delta)
        .with_context(|| "failed to update leaf")?;

    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_peer_priority() {
        let good_peer = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let bad_peer = PeerScore {
            requests: 10,
            failures: 1,
        };
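        // The good peer has a much lower failure rate (2/1000 vs 1/10), so it
        // should compare as the better peer and come off the priority queue first.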
        assert!(good_peer > bad_peer);

        let mut peers: PriorityQueue<_, _> = [(0, good_peer), (1, bad_peer)].into_iter().collect();
        assert_eq!(peers.pop(), Some((0, good_peer)));
        assert_eq!(peers.pop(), Some((1, bad_peer)));
    }
}