1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 fmt::{Debug, Display},
5 sync::Arc,
6 time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{anyhow, bail, ensure, Context};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15 config::PublicNetworkConfig,
16 traits::SequencerPersistence,
17 v0::traits::StateCatchup,
18 v0_3::{
19 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
20 RewardMerkleTreeV1,
21 },
22 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2, RewardMerkleTreeV2},
23 BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
24 FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions, ValidatedState,
25};
26use futures::{
27 future::{Future, FutureExt, TryFuture, TryFutureExt},
28 stream::FuturesUnordered,
29 StreamExt,
30};
31use hotshot_types::{
32 consensus::Consensus,
33 data::ViewNumber,
34 message::UpgradeLock,
35 network::NetworkConfig,
36 stake_table::HSStakeTable,
37 traits::{
38 metrics::{Counter, CounterFamily, Metrics},
39 network::ConnectedNetwork,
40 node_implementation::{ConsensusTime as _, NodeType, Versions},
41 ValidatedState as ValidatedStateTrait,
42 },
43 utils::{verify_leaf_chain, View, ViewInner},
44 ValidatorConfig,
45};
46use itertools::Itertools;
47use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
48use parking_lot::Mutex;
49use priority_queue::PriorityQueue;
50use serde::de::DeserializeOwned;
51use surf_disco::Request;
52use tide_disco::error::ServerError;
53use tokio::time::timeout;
54use tokio_util::task::AbortOnDropHandle;
55use tracing::warn;
56use url::Url;
57use vbs::version::StaticVersionType;
58
59use crate::api::BlocksFrontier;
60
/// A catchup client for a single peer.
///
/// Bundles the HTTP client used to talk to the peer with the peer's URL and the two
/// per-peer counters (labelled with that URL) used to report request statistics.
#[derive(Debug, Clone)]
struct Client<ServerError, ApiVer: StaticVersionType> {
    /// Underlying `surf_disco` client used to issue requests to the peer.
    inner: surf_disco::Client<ServerError, ApiVer>,
    /// Base URL of the peer; also used as the label of the counters below.
    url: Url,
    /// Counter of requests recorded for this peer.
    requests: Arc<Box<dyn Counter>>,
    /// Counter of failed requests recorded for this peer.
    failures: Arc<Box<dyn Counter>>,
}
70
71impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
72 pub fn new(
73 url: Url,
74 requests: &(impl CounterFamily + ?Sized),
75 failures: &(impl CounterFamily + ?Sized),
76 ) -> Self {
77 Self {
78 inner: surf_disco::Client::new(url.clone()),
79 requests: Arc::new(requests.create(vec![url.to_string()])),
80 failures: Arc::new(failures.create(vec![url.to_string()])),
81 url,
82 }
83 }
84
85 pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
86 self.inner.get(route)
87 }
88}
89
/// Request statistics for a single peer, used to rank peers by reliability.
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Total number of requests recorded for the peer.
    requests: usize,
    /// Number of those requests that failed.
    failures: usize,
}

impl Ord for PeerScore {
    /// Orders scores by failure rate; a lower failure rate compares as *greater*.
    ///
    /// `self` is better than `other` when
    /// `self.failures / self.requests < other.failures / other.requests`, which is
    /// evaluated without division as
    /// `other.failures * self.requests > self.failures * other.requests`.
    ///
    /// Note that a score with zero requests and zero failures compares equal to every
    /// other score, because both products are then zero.
    fn cmp(&self, other: &Self) -> Ordering {
        // Widen to u128 before multiplying: the product of two `usize` values can
        // overflow `usize`, but always fits in 128 bits.
        let lhs = other.failures as u128 * self.requests as u128;
        let rhs = self.failures as u128 * other.requests as u128;
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for PeerScore {
    /// Total order; always `Some`, delegating to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for PeerScore {
    /// Two scores are equal when they have the same failure rate, so equality agrees
    /// with [`Ord::cmp`] rather than comparing the raw counters.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for PeerScore {}
127
/// A catchup provider that fetches state from a set of remote peers over HTTP.
#[derive(Debug, Clone, Default)]
pub struct StatePeers<ApiVer: StaticVersionType> {
    /// Priority queue of peers, keyed by index into `clients` and prioritised by
    /// [`PeerScore`]; shared between clones of this provider.
    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
    /// One client per peer; the position in this vector is the peer's id in `scores`.
    clients: Vec<Client<ServerError, ApiVer>>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
}
135
136impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
137 async fn fetch<Fut>(
138 &self,
139 retry: usize,
140 f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
141 ) -> anyhow::Result<Fut::Ok>
142 where
143 Fut: TryFuture<Error: Display>,
144 {
145 let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1);
156
157 let mut requests = HashMap::new();
160 let mut res = Err(anyhow!("failed fetching from every peer"));
161
162 let mut scores = { (*self.scores.read().await).clone() };
167 while let Some((id, score)) = scores.pop() {
168 let client = &self.clients[id];
169 tracing::info!("fetching from {}", client.url);
170 match timeout(timeout_dur, f(client.clone()).into_future()).await {
171 Ok(Ok(t)) => {
172 requests.insert(id, true);
173 res = Ok(t);
174 break;
175 },
176 Ok(Err(err)) => {
177 tracing::warn!(id, ?score, peer = %client.url, "error from peer: {err:#}");
178 requests.insert(id, false);
179 },
180 Err(_) => {
181 tracing::warn!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
182 requests.insert(id, false);
183 },
184 }
185 }
186
187 let mut scores = self.scores.write().await;
189 for (id, success) in requests {
190 scores.change_priority_by(&id, |score| {
191 score.requests += 1;
192 self.clients[id].requests.add(1);
193 if !success {
194 score.failures += 1;
195 self.clients[id].failures.add(1);
196 }
197 });
198 }
199
200 res
201 }
202
203 pub fn from_urls(
204 urls: Vec<Url>,
205 backoff: BackoffParams,
206 metrics: &(impl Metrics + ?Sized),
207 ) -> Self {
208 if urls.is_empty() {
209 panic!("Cannot create StatePeers with no peers");
210 }
211
212 let metrics = metrics.subgroup("catchup".into());
213 let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
214 let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
215
216 let scores = urls
217 .iter()
218 .enumerate()
219 .map(|(i, _)| (i, PeerScore::default()))
220 .collect();
221 let clients = urls
222 .into_iter()
223 .map(|url| Client::new(url, &*requests, &*failures))
224 .collect();
225
226 Self {
227 clients,
228 scores: Arc::new(RwLock::new(scores)),
229 backoff,
230 }
231 }
232
233 #[tracing::instrument(skip(self, my_own_validator_config))]
234 pub async fn fetch_config(
235 &self,
236 my_own_validator_config: ValidatorConfig<SeqTypes>,
237 ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
238 self.backoff()
239 .retry(self, move |provider, retry| {
240 let my_own_validator_config = my_own_validator_config.clone();
241 async move {
242 let cfg: PublicNetworkConfig = provider
243 .fetch(retry, |client| {
244 let url = client.url.join("config/hotshot").unwrap();
245
246 reqwest::get(url.clone())
247 })
248 .await?
249 .json()
250 .await?;
251 cfg.into_network_config(my_own_validator_config)
252 .context("fetched config, but failed to convert to private config")
253 }
254 .boxed()
255 })
256 .await
257 }
258}
259
260#[async_trait]
261impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
262 #[tracing::instrument(skip(self, _instance))]
263 async fn try_fetch_accounts(
264 &self,
265 retry: usize,
266 _instance: &NodeState,
267 height: u64,
268 view: ViewNumber,
269 fee_merkle_tree_root: FeeMerkleCommitment,
270 accounts: &[FeeAccount],
271 ) -> anyhow::Result<Vec<FeeAccountProof>> {
272 self.fetch(retry, |client| async move {
273 let tree = client
274 .inner
275 .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
276 .body_binary(&accounts.to_vec())?
277 .send()
278 .await?;
279
280 let mut proofs = Vec::new();
282 for account in accounts {
283 let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
284 .context(format!("response missing fee account {account}"))?;
285 proof
286 .verify(&fee_merkle_tree_root)
287 .context(format!("invalid proof for fee account {account}"))?;
288 proofs.push(proof);
289 }
290
291 anyhow::Ok(proofs)
292 })
293 .await
294 }
295
296 #[tracing::instrument(skip(self, _instance, mt))]
297 async fn try_remember_blocks_merkle_tree(
298 &self,
299 retry: usize,
300 _instance: &NodeState,
301 height: u64,
302 view: ViewNumber,
303 mt: &mut BlockMerkleTree,
304 ) -> anyhow::Result<()> {
305 *mt = self
306 .fetch(retry, |client| {
307 let mut mt = mt.clone();
308 async move {
309 let frontier = client
310 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
311 .send()
312 .await?;
313 let elem = frontier
314 .elem()
315 .context("provided frontier is missing leaf element")?;
316 mt.remember(mt.num_leaves() - 1, *elem, &frontier)
317 .context("verifying block proof")?;
318 anyhow::Ok(mt)
319 }
320 })
321 .await?;
322 Ok(())
323 }
324
325 async fn try_fetch_chain_config(
326 &self,
327 retry: usize,
328 commitment: Commitment<ChainConfig>,
329 ) -> anyhow::Result<ChainConfig> {
330 self.fetch(retry, |client| async move {
331 let cf = client
332 .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
333 .send()
334 .await?;
335 ensure!(
336 cf.commit() == commitment,
337 "received chain config with mismatched commitment: expected {commitment}, got {}",
338 cf.commit()
339 );
340 Ok(cf)
341 })
342 .await
343 }
344
345 async fn try_fetch_leaf(
346 &self,
347 retry: usize,
348 height: u64,
349 stake_table: HSStakeTable<SeqTypes>,
350 success_threshold: U256,
351 ) -> anyhow::Result<Leaf2> {
352 let leaf_chain = self
354 .fetch(retry, |client| async move {
355 let leaf = client
356 .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
357 .send()
358 .await?;
359 anyhow::Ok(leaf)
360 })
361 .await
362 .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
363
364 verify_leaf_chain(
366 leaf_chain,
367 &stake_table,
368 success_threshold,
369 height,
370 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
371 )
372 .await
373 .with_context(|| format!("failed to verify leaf chain at height {height}"))
374 }
375
376 #[tracing::instrument(skip(self, _instance))]
377 async fn try_fetch_reward_accounts_v2(
378 &self,
379 retry: usize,
380 _instance: &NodeState,
381 height: u64,
382 view: ViewNumber,
383 reward_merkle_tree_root: RewardMerkleCommitmentV2,
384 accounts: &[RewardAccountV2],
385 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
386 self.fetch(retry, |client| async move {
387 let tree = client
388 .inner
389 .post::<RewardMerkleTreeV2>(&format!(
390 "catchup/{height}/{}/reward-accounts-v2",
391 view.u64()
392 ))
393 .body_binary(&accounts.to_vec())?
394 .send()
395 .await?;
396
397 let mut proofs = Vec::new();
400 for account in accounts {
401 let (proof, _) = RewardAccountProofV2::prove(&tree, (*account).into())
402 .context(format!("response missing reward account {account}"))?;
403 proof
404 .verify(&reward_merkle_tree_root)
405 .context(format!("invalid proof for reward account {account}"))?;
406 proofs.push(proof);
407 }
408
409 anyhow::Ok(proofs)
410 })
411 .await
412 }
413
414 #[tracing::instrument(skip(self, _instance))]
415 async fn try_fetch_reward_accounts_v1(
416 &self,
417 retry: usize,
418 _instance: &NodeState,
419 height: u64,
420 view: ViewNumber,
421 reward_merkle_tree_root: RewardMerkleCommitmentV1,
422 accounts: &[RewardAccountV1],
423 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
424 self.fetch(retry, |client| async move {
425 let tree = client
426 .inner
427 .post::<RewardMerkleTreeV1>(&format!(
428 "catchup/{height}/{}/reward-accounts",
429 view.u64()
430 ))
431 .body_binary(&accounts.to_vec())?
432 .send()
433 .await?;
434
435 let mut proofs = Vec::new();
437 for account in accounts {
438 let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
439 .context(format!("response missing reward account {account}"))?;
440 proof
441 .verify(&reward_merkle_tree_root)
442 .context(format!("invalid proof for reward account {account}"))?;
443 proofs.push(proof);
444 }
445
446 anyhow::Ok(proofs)
447 })
448 .await
449 }
450
    /// Returns the backoff parameters this provider was constructed with.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
454
455 fn name(&self) -> String {
456 format!(
457 "StatePeers({})",
458 self.clients
459 .iter()
460 .map(|client| client.url.to_string())
461 .join(",")
462 )
463 }
464
    /// Always `false`: this provider fetches from remote peers.
    fn is_local(&self) -> bool {
        false
    }
468}
469
/// Storage-side interface for serving catchup data.
///
/// Every method has a default implementation that fails with a "not supported" error,
/// so a data source only needs to override the kinds of catchup it can actually serve.
pub(crate) trait CatchupStorage: Sync {
    /// Looks up the fee Merkle tree covering `_accounts` at the given height and view,
    /// together with a leaf.
    ///
    /// The default implementation always fails.
    fn get_accounts(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[FeeAccount],
    ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Looks up the V1 reward Merkle tree covering `_accounts` at the given height and
    /// view, together with a leaf.
    ///
    /// The default implementation always fails.
    fn get_reward_accounts_v1(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[RewardAccountV1],
    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Looks up the V2 reward Merkle tree covering `_accounts` at the given height and
    /// view, together with a leaf.
    ///
    /// The default implementation always fails.
    fn get_reward_accounts_v2(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[RewardAccountV2],
    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Looks up the blocks frontier at the given height and view.
    ///
    /// The default implementation always fails.
    fn get_frontier(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Looks up the chain config with the given commitment.
    ///
    /// The default implementation always fails.
    fn get_chain_config(
        &self,
        _commitment: Commitment<ChainConfig>,
    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
        async {
            bail!("chain config catchup is not supported for this data source");
        }
    }

    /// Looks up the leaf chain for the given height.
    ///
    /// The default implementation always fails.
    fn get_leaf_chain(
        &self,
        _height: u64,
    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
        async {
            bail!("leaf chain catchup is not supported for this data source");
        }
    }
}
560
// The metrics data source overrides nothing, so every catchup request on it uses the
// failing default implementations of `CatchupStorage`.
impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
562
/// An extensible data source serves catchup by forwarding every request unchanged to
/// the data source it wraps (`self.inner()`).
impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
where
    T: CatchupStorage,
    S: Sync,
{
    /// Forwards to the inner data source.
    async fn get_accounts(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[FeeAccount],
    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
        self.inner()
            .get_accounts(instance, height, view, accounts)
            .await
    }

    /// Forwards to the inner data source.
    async fn get_reward_accounts_v2(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV2],
    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
        self.inner()
            .get_reward_accounts_v2(instance, height, view, accounts)
            .await
    }

    /// Forwards to the inner data source.
    async fn get_reward_accounts_v1(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV1],
    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
        self.inner()
            .get_reward_accounts_v1(instance, height, view, accounts)
            .await
    }

    /// Forwards to the inner data source.
    async fn get_frontier(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
    ) -> anyhow::Result<BlocksFrontier> {
        self.inner().get_frontier(instance, height, view).await
    }

    /// Forwards to the inner data source.
    async fn get_chain_config(
        &self,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        self.inner().get_chain_config(commitment).await
    }
    /// Forwards to the inner data source.
    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
        self.inner().get_leaf_chain(height).await
    }
}
623
/// A catchup provider backed by a local data source `T`.
#[derive(Debug)]
pub(crate) struct SqlStateCatchup<T> {
    /// Shared handle to the data source that serves the catchup queries.
    db: Arc<T>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
}
629
impl<T> SqlStateCatchup<T> {
    /// Creates a provider that reads from `db` and reports `backoff` as its backoff
    /// parameters.
    pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
        Self { db, backoff }
    }
}
635
636#[async_trait]
637impl<T> StateCatchup for SqlStateCatchup<T>
638where
639 T: CatchupStorage + Send + Sync,
640{
641 async fn try_fetch_leaf(
642 &self,
643 _retry: usize,
644 height: u64,
645 stake_table: HSStakeTable<SeqTypes>,
646 success_threshold: U256,
647 ) -> anyhow::Result<Leaf2> {
648 let leaf_chain = self
650 .db
651 .get_leaf_chain(height)
652 .await
653 .with_context(|| "failed to get leaf chain from DB")?;
654
655 let leaf = verify_leaf_chain(
657 leaf_chain,
658 &stake_table,
659 success_threshold,
660 height,
661 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
662 )
663 .await
664 .with_context(|| "failed to verify leaf chain")?;
665
666 Ok(leaf)
667 }
668 #[tracing::instrument(skip(self, _retry, instance))]
671 async fn try_fetch_accounts(
672 &self,
673 _retry: usize,
674 instance: &NodeState,
675 block_height: u64,
676 view: ViewNumber,
677 fee_merkle_tree_root: FeeMerkleCommitment,
678 accounts: &[FeeAccount],
679 ) -> anyhow::Result<Vec<FeeAccountProof>> {
680 let (fee_merkle_tree_from_db, _) = self
682 .db
683 .get_accounts(instance, block_height, view, accounts)
684 .await
685 .with_context(|| "failed to get fee accounts from DB")?;
686
687 let mut proofs = Vec::new();
689 for account in accounts {
690 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
691 .context(format!("response missing account {account}"))?;
692 proof
693 .verify(&fee_merkle_tree_root)
694 .context(format!("invalid proof for account {account}"))?;
695 proofs.push(proof);
696 }
697
698 Ok(proofs)
699 }
700
701 #[tracing::instrument(skip(self, _retry, instance, mt))]
702 async fn try_remember_blocks_merkle_tree(
703 &self,
704 _retry: usize,
705 instance: &NodeState,
706 bh: u64,
707 view: ViewNumber,
708 mt: &mut BlockMerkleTree,
709 ) -> anyhow::Result<()> {
710 if bh == 0 {
711 return Ok(());
712 }
713
714 let proof = self.db.get_frontier(instance, bh, view).await?;
715 match proof
716 .proof
717 .first()
718 .context(format!("empty proof for frontier at height {bh}"))?
719 {
720 MerkleNode::Leaf { pos, elem, .. } => mt
721 .remember(pos, elem, proof.clone())
722 .context("failed to remember proof"),
723 _ => bail!("invalid proof"),
724 }
725 }
726
    /// Loads the chain config with the given `commitment` from the database.
    ///
    /// `_retry` is unused.
    ///
    /// # Panics
    ///
    /// Panics if the config returned by the database does not match `commitment`.
    /// This is a deliberate hard stop (the message says it may indicate a compromised
    /// database), unlike the lookup failure, which is returned as an error.
    async fn try_fetch_chain_config(
        &self,
        _retry: usize,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        let cf = self.db.get_chain_config(commitment).await?;

        if cf.commit() != commitment {
            panic!(
                "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
                 but got: {:?}.
                This may indicate a compromised database",
                commitment,
                cf.commit()
            )
        }

        Ok(cf)
    }
746
747 #[tracing::instrument(skip(self, _retry, instance))]
748 async fn try_fetch_reward_accounts_v2(
749 &self,
750 _retry: usize,
751 instance: &NodeState,
752 block_height: u64,
753 view: ViewNumber,
754 reward_merkle_tree_root: RewardMerkleCommitmentV2,
755 accounts: &[RewardAccountV2],
756 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
757 let (reward_merkle_tree_from_db, _) = self
759 .db
760 .get_reward_accounts_v2(instance, block_height, view, accounts)
761 .await
762 .with_context(|| "failed to get reward accounts from DB")?;
763 let mut proofs = Vec::new();
765 for account in accounts {
766 let (proof, _) =
767 RewardAccountProofV2::prove(&reward_merkle_tree_from_db, (*account).into())
768 .context(format!("response missing account {account}"))?;
769 proof
770 .verify(&reward_merkle_tree_root)
771 .context(format!("invalid proof for account {account}"))?;
772 proofs.push(proof);
773 }
774
775 Ok(proofs)
776 }
777
778 #[tracing::instrument(skip(self, _retry, instance))]
779 async fn try_fetch_reward_accounts_v1(
780 &self,
781 _retry: usize,
782 instance: &NodeState,
783 block_height: u64,
784 view: ViewNumber,
785 reward_merkle_tree_root: RewardMerkleCommitmentV1,
786 accounts: &[RewardAccountV1],
787 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
788 let (reward_merkle_tree_from_db, _) = self
790 .db
791 .get_reward_accounts_v1(instance, block_height, view, accounts)
792 .await
793 .with_context(|| "failed to get reward accounts from DB")?;
794 let mut proofs = Vec::new();
796 for account in accounts {
797 let (proof, _) =
798 RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
799 .context(format!("response missing account {account}"))?;
800 proof
801 .verify(&reward_merkle_tree_root)
802 .context(format!("invalid proof for account {account}"))?;
803 proofs.push(proof);
804 }
805
806 Ok(proofs)
807 }
808
    /// Returns the backoff parameters this provider was constructed with.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
812
    /// Returns the fixed provider name `SqlStateCatchup`.
    fn name(&self) -> String {
        "SqlStateCatchup".into()
    }
816
    /// Always `true`: this provider reads from the local database handle.
    fn is_local(&self) -> bool {
        true
    }
820}
821
/// A catchup provider with catchup disabled.
///
/// Every request fails, except chain config lookups for configs that were registered
/// in advance with `add_chain_config`.
#[derive(Clone, Debug)]
pub struct NullStateCatchup {
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Chain configs that can be served, keyed by their commitment.
    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
}
828
829impl Default for NullStateCatchup {
830 fn default() -> Self {
831 Self {
832 backoff: BackoffParams::disabled(),
833 chain_configs: Default::default(),
834 }
835 }
836}
837
838impl NullStateCatchup {
839 pub fn add_chain_config(&mut self, cf: ChainConfig) {
849 self.chain_configs.insert(cf.commit(), cf);
850 }
851}
852
853#[async_trait]
854impl StateCatchup for NullStateCatchup {
855 async fn try_fetch_leaf(
856 &self,
857 _retry: usize,
858 _height: u64,
859 _stake_table: HSStakeTable<SeqTypes>,
860 _success_threshold: U256,
861 ) -> anyhow::Result<Leaf2> {
862 bail!("state catchup is disabled")
863 }
864
865 async fn try_fetch_accounts(
866 &self,
867 _retry: usize,
868 _instance: &NodeState,
869 _height: u64,
870 _view: ViewNumber,
871 _fee_merkle_tree_root: FeeMerkleCommitment,
872 _account: &[FeeAccount],
873 ) -> anyhow::Result<Vec<FeeAccountProof>> {
874 bail!("state catchup is disabled");
875 }
876
877 async fn try_fetch_reward_accounts_v2(
878 &self,
879 _retry: usize,
880 _instance: &NodeState,
881 _height: u64,
882 _view: ViewNumber,
883 _fee_merkle_tree_root: RewardMerkleCommitmentV2,
884 _account: &[RewardAccountV2],
885 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
886 bail!("state catchup is disabled");
887 }
888
889 async fn try_remember_blocks_merkle_tree(
890 &self,
891 _retry: usize,
892 _instance: &NodeState,
893 _height: u64,
894 _view: ViewNumber,
895 _mt: &mut BlockMerkleTree,
896 ) -> anyhow::Result<()> {
897 bail!("state catchup is disabled");
898 }
899
900 async fn try_fetch_chain_config(
901 &self,
902 _retry: usize,
903 commitment: Commitment<ChainConfig>,
904 ) -> anyhow::Result<ChainConfig> {
905 self.chain_configs
906 .get(&commitment)
907 .copied()
908 .context(format!("chain config {commitment} not available"))
909 }
910
911 async fn try_fetch_reward_accounts_v1(
912 &self,
913 _retry: usize,
914 _instance: &NodeState,
915 _height: u64,
916 _view: ViewNumber,
917 _fee_merkle_tree_root: RewardMerkleCommitmentV1,
918 _account: &[RewardAccountV1],
919 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
920 bail!("state catchup is disabled");
921 }
922
923 fn backoff(&self) -> &BackoffParams {
924 &self.backoff
925 }
926
927 fn name(&self) -> String {
928 "NullStateCatchup".into()
929 }
930
931 fn is_local(&self) -> bool {
932 true
933 }
934}
935
/// A catchup provider that fans a request out to several inner providers at once.
///
/// Clones share the same provider list.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    /// The inner providers; more can be appended with `add_provider`.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
}
942
943impl ParallelStateCatchup {
944 pub fn new(providers: &[Arc<dyn StateCatchup>]) -> Self {
946 Self {
947 providers: Arc::new(Mutex::new(providers.to_vec())),
948 }
949 }
950
951 pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
953 self.providers.lock().push(provider);
954 }
955
956 pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
958 where
959 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
960 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
961 RT: Send + Sync + 'static,
962 {
963 self.on_providers(|provider| provider.is_local(), closure)
964 .await
965 }
966
967 pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
969 where
970 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
971 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
972 RT: Send + Sync + 'static,
973 {
974 self.on_providers(|provider| !provider.is_local(), closure)
975 .await
976 }
977
978 pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
980 where
981 P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
982 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
983 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
984 RT: Send + Sync + 'static,
985 {
986 let providers = self.providers.lock().clone();
988 if providers.is_empty() {
989 return Err(anyhow::anyhow!("no providers were initialized"));
990 }
991
992 let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
994 if providers.is_empty() {
995 return Err(anyhow::anyhow!("no providers matched the given predicate"));
996 }
997
998 let mut futures = FuturesUnordered::new();
1000 for provider in providers {
1001 let closure = closure.clone();
1002 futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1003 }
1004
1005 while let Some(result) = futures.next().await {
1007 let result = match result {
1009 Ok(res) => res,
1010 Err(err) => {
1011 warn!("Failed to join on provider: {err:#}. Trying next provider...");
1012 continue;
1013 },
1014 };
1015
1016 let result = match result {
1018 Ok(res) => res,
1019 Err(err) => {
1020 warn!("Failed to fetch data: {err:#}. Trying next provider...");
1021 continue;
1022 },
1023 };
1024
1025 return Ok(result);
1026 }
1027
1028 Err(anyhow::anyhow!("no providers returned a successful result"))
1029 }
1030}
1031
/// Evaluates an expression with the listed variables shadowed by clones of themselves.
///
/// `clone! {(a, b) expr}` expands to a block that runs `let a = a.clone();` and
/// `let b = b.clone();` and then evaluates `expr`, leaving the outer `a` and `b`
/// untouched. This is handy for building `move` closures that must not consume the
/// originals.
macro_rules! clone {
    ( ( $( $binding:ident ),* ) $body:expr ) => {{
        $( let $binding = $binding.clone(); )*
        $body
    }};
}
1040
1041#[async_trait]
1044impl StateCatchup for ParallelStateCatchup {
1045 async fn try_fetch_leaf(
1046 &self,
1047 retry: usize,
1048 height: u64,
1049 stake_table: HSStakeTable<SeqTypes>,
1050 success_threshold: U256,
1051 ) -> anyhow::Result<Leaf2> {
1052 let local_result = self
1054 .on_local_providers(clone! {(stake_table) move |provider| {
1055 clone!{(stake_table) async move {
1056 provider
1057 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1058 .await
1059 }}
1060 }})
1061 .await;
1062
1063 if local_result.is_ok() {
1065 return local_result;
1066 }
1067
1068 self.on_remote_providers(clone! {(stake_table) move |provider| {
1070 clone!{(stake_table) async move {
1071 provider
1072 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1073 .await
1074 }}
1075 }})
1076 .await
1077 }
1078
1079 async fn try_fetch_accounts(
1080 &self,
1081 retry: usize,
1082 instance: &NodeState,
1083 height: u64,
1084 view: ViewNumber,
1085 fee_merkle_tree_root: FeeMerkleCommitment,
1086 accounts: &[FeeAccount],
1087 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1088 let accounts_vec = accounts.to_vec();
1090 let local_result = self
1091 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1092 clone! {(instance, accounts_vec) async move {
1093 provider
1094 .try_fetch_accounts(
1095 retry,
1096 &instance,
1097 height,
1098 view,
1099 fee_merkle_tree_root,
1100 &accounts_vec,
1101 )
1102 .await
1103 }}
1104 }})
1105 .await;
1106
1107 if local_result.is_ok() {
1109 return local_result;
1110 }
1111
1112 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1114 clone!{(instance, accounts_vec) async move {
1115 provider
1116 .try_fetch_accounts(
1117 retry,
1118 &instance,
1119 height,
1120 view,
1121 fee_merkle_tree_root,
1122 &accounts_vec,
1123 ).await
1124 }}
1125 }})
1126 .await
1127 }
1128
1129 async fn try_remember_blocks_merkle_tree(
1130 &self,
1131 retry: usize,
1132 instance: &NodeState,
1133 height: u64,
1134 view: ViewNumber,
1135 mt: &mut BlockMerkleTree,
1136 ) -> anyhow::Result<()> {
1137 let local_result = self
1139 .on_local_providers(clone! {(mt, instance) move |provider| {
1140 let mut mt = mt.clone();
1141 clone! {(instance) async move {
1142 provider
1144 .try_remember_blocks_merkle_tree(
1145 retry,
1146 &instance,
1147 height,
1148 view,
1149 &mut mt,
1150 )
1151 .await?;
1152
1153 Ok(mt)
1155 }}
1156 }})
1157 .await;
1158
1159 if let Ok(modified_mt) = local_result {
1161 *mt = modified_mt;
1163
1164 return Ok(());
1165 }
1166
1167 let remote_result = self
1169 .on_remote_providers(clone! {(mt, instance) move |provider| {
1170 let mut mt = mt.clone();
1171 clone!{(instance) async move {
1172 provider
1174 .try_remember_blocks_merkle_tree(
1175 retry,
1176 &instance,
1177 height,
1178 view,
1179 &mut mt,
1180 )
1181 .await?;
1182
1183 Ok(mt)
1185 }}
1186 }})
1187 .await?;
1188
1189 *mt = remote_result;
1191
1192 Ok(())
1193 }
1194
1195 async fn try_fetch_chain_config(
1196 &self,
1197 retry: usize,
1198 commitment: Commitment<ChainConfig>,
1199 ) -> anyhow::Result<ChainConfig> {
1200 let local_result = self
1202 .on_local_providers(move |provider| async move {
1203 provider.try_fetch_chain_config(retry, commitment).await
1204 })
1205 .await;
1206
1207 if local_result.is_ok() {
1209 return local_result;
1210 }
1211
1212 self.on_remote_providers(move |provider| async move {
1214 provider.try_fetch_chain_config(retry, commitment).await
1215 })
1216 .await
1217 }
1218
1219 async fn try_fetch_reward_accounts_v2(
1220 &self,
1221 retry: usize,
1222 instance: &NodeState,
1223 height: u64,
1224 view: ViewNumber,
1225 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1226 accounts: &[RewardAccountV2],
1227 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1228 let accounts_vec = accounts.to_vec();
1230 let local_result = self
1231 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1232 clone! {(instance, accounts_vec) async move {
1233 provider
1234 .try_fetch_reward_accounts_v2(
1235 retry,
1236 &instance,
1237 height,
1238 view,
1239 reward_merkle_tree_root,
1240 &accounts_vec,
1241 )
1242 .await
1243 }}
1244 }})
1245 .await;
1246
1247 if local_result.is_ok() {
1249 return local_result;
1250 }
1251
1252 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1254 clone!{(instance, accounts_vec) async move {
1255 provider
1256 .try_fetch_reward_accounts_v2(
1257 retry,
1258 &instance,
1259 height,
1260 view,
1261 reward_merkle_tree_root,
1262 &accounts_vec,
1263 ).await
1264 }}
1265 }})
1266 .await
1267 }
1268
1269 async fn try_fetch_reward_accounts_v1(
1270 &self,
1271 retry: usize,
1272 instance: &NodeState,
1273 height: u64,
1274 view: ViewNumber,
1275 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1276 accounts: &[RewardAccountV1],
1277 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1278 let accounts_vec = accounts.to_vec();
1280 let local_result = self
1281 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1282 clone! {(instance, accounts_vec) async move {
1283 provider
1284 .try_fetch_reward_accounts_v1(
1285 retry,
1286 &instance,
1287 height,
1288 view,
1289 reward_merkle_tree_root,
1290 &accounts_vec,
1291 )
1292 .await
1293 }}
1294 }})
1295 .await;
1296
1297 if local_result.is_ok() {
1299 return local_result;
1300 }
1301
1302 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1304 clone!{(instance, accounts_vec) async move {
1305 provider
1306 .try_fetch_reward_accounts_v1(
1307 retry,
1308 &instance,
1309 height,
1310 view,
1311 reward_merkle_tree_root,
1312 &accounts_vec,
1313 ).await
1314 }}
1315 }})
1316 .await
1317 }
1318
    /// Not supported on the parallel provider.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!()`.
    // NOTE(review): presumably never called because the `fetch_*` overrides in this
    // impl delegate retries to the inner providers; confirm against the trait's
    // default methods.
    fn backoff(&self) -> &BackoffParams {
        unreachable!()
    }
1322
1323 fn name(&self) -> String {
1324 format!(
1325 "[{}]",
1326 self.providers
1327 .lock()
1328 .iter()
1329 .map(|p| p.name())
1330 .collect::<Vec<_>>()
1331 .join(", ")
1332 )
1333 }
1334
1335 async fn fetch_accounts(
1336 &self,
1337 instance: &NodeState,
1338 height: u64,
1339 view: ViewNumber,
1340 fee_merkle_tree_root: FeeMerkleCommitment,
1341 accounts: Vec<FeeAccount>,
1342 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1343 let accounts_vec = accounts.to_vec();
1345 let local_result = self
1346 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1347 clone! {(instance, accounts_vec) async move {
1348 provider
1349 .try_fetch_accounts(
1350 0,
1351 &instance,
1352 height,
1353 view,
1354 fee_merkle_tree_root,
1355 &accounts_vec,
1356 )
1357 .await
1358 }}
1359 }})
1360 .await;
1361
1362 if local_result.is_ok() {
1364 return local_result;
1365 }
1366
1367 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1369 clone!{(instance, accounts_vec) async move {
1370 provider
1371 .fetch_accounts(
1372 &instance,
1373 height,
1374 view,
1375 fee_merkle_tree_root,
1376 accounts_vec,
1377 ).await
1378 }}
1379 }})
1380 .await
1381 }
1382
1383 async fn fetch_leaf(
1384 &self,
1385 height: u64,
1386 stake_table: HSStakeTable<SeqTypes>,
1387 success_threshold: U256,
1388 ) -> anyhow::Result<Leaf2> {
1389 let local_result = self
1391 .on_local_providers(clone! {(stake_table) move |provider| {
1392 clone!{(stake_table) async move {
1393 provider
1394 .try_fetch_leaf(0, height, stake_table, success_threshold)
1395 .await
1396 }}
1397 }})
1398 .await;
1399
1400 if local_result.is_ok() {
1402 return local_result;
1403 }
1404
1405 self.on_remote_providers(clone! {(stake_table) move |provider| {
1407 clone!{(stake_table) async move {
1408 provider
1409 .fetch_leaf(height, stake_table, success_threshold)
1410 .await
1411 }}
1412 }})
1413 .await
1414 }
1415
1416 async fn fetch_chain_config(
1417 &self,
1418 commitment: Commitment<ChainConfig>,
1419 ) -> anyhow::Result<ChainConfig> {
1420 let local_result = self
1422 .on_local_providers(move |provider| async move {
1423 provider.try_fetch_chain_config(0, commitment).await
1424 })
1425 .await;
1426
1427 if local_result.is_ok() {
1429 return local_result;
1430 }
1431
1432 self.on_remote_providers(move |provider| async move {
1434 provider.fetch_chain_config(commitment).await
1435 })
1436 .await
1437 }
1438
1439 async fn fetch_reward_accounts_v2(
1440 &self,
1441 instance: &NodeState,
1442 height: u64,
1443 view: ViewNumber,
1444 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1445 accounts: Vec<RewardAccountV2>,
1446 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1447 let accounts_vec = accounts.to_vec();
1449 let local_result = self
1450 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1451 clone! {(instance, accounts_vec) async move {
1452 provider
1453 .try_fetch_reward_accounts_v2(
1454 0,
1455 &instance,
1456 height,
1457 view,
1458 reward_merkle_tree_root,
1459 &accounts_vec,
1460 )
1461 .await
1462 }}
1463 }})
1464 .await;
1465
1466 if local_result.is_ok() {
1468 return local_result;
1469 }
1470
1471 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1473 clone!{(instance, accounts_vec) async move {
1474 provider
1475 .fetch_reward_accounts_v2(
1476 &instance,
1477 height,
1478 view,
1479 reward_merkle_tree_root,
1480 accounts_vec,
1481 ).await
1482 }}
1483 }})
1484 .await
1485 }
1486
1487 async fn fetch_reward_accounts_v1(
1488 &self,
1489 instance: &NodeState,
1490 height: u64,
1491 view: ViewNumber,
1492 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1493 accounts: Vec<RewardAccountV1>,
1494 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1495 let accounts_vec = accounts.to_vec();
1497 let local_result = self
1498 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1499 clone! {(instance, accounts_vec) async move {
1500 provider
1501 .try_fetch_reward_accounts_v1(
1502 0,
1503 &instance,
1504 height,
1505 view,
1506 reward_merkle_tree_root,
1507 &accounts_vec,
1508 )
1509 .await
1510 }}
1511 }})
1512 .await;
1513
1514 if local_result.is_ok() {
1516 return local_result;
1517 }
1518
1519 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1521 clone!{(instance, accounts_vec) async move {
1522 provider
1523 .fetch_reward_accounts_v1(
1524 &instance,
1525 height,
1526 view,
1527 reward_merkle_tree_root,
1528 accounts_vec,
1529 ).await
1530 }}
1531 }})
1532 .await
1533 }
1534 async fn remember_blocks_merkle_tree(
1535 &self,
1536 instance: &NodeState,
1537 height: u64,
1538 view: ViewNumber,
1539 mt: &mut BlockMerkleTree,
1540 ) -> anyhow::Result<()> {
1541 let local_result = self
1543 .on_local_providers(clone! {(mt, instance) move |provider| {
1544 let mut mt = mt.clone();
1545 clone! {(instance) async move {
1546 provider
1548 .try_remember_blocks_merkle_tree(
1549 0,
1550 &instance,
1551 height,
1552 view,
1553 &mut mt,
1554 )
1555 .await?;
1556
1557 Ok(mt)
1559 }}
1560 }})
1561 .await;
1562
1563 if let Ok(modified_mt) = local_result {
1565 *mt = modified_mt;
1568
1569 return Ok(());
1570 }
1571
1572 let remote_result = self
1574 .on_remote_providers(clone! {(mt, instance) move |provider| {
1575 let mut mt = mt.clone();
1576 clone!{(instance) async move {
1577 provider
1579 .remember_blocks_merkle_tree(
1580 &instance,
1581 height,
1582 view,
1583 &mut mt,
1584 )
1585 .await?;
1586
1587 Ok(mt)
1589 }}
1590 }})
1591 .await?;
1592
1593 *mt = remote_result;
1595
1596 Ok(())
1597 }
1598
1599 fn is_local(&self) -> bool {
1600 self.providers.lock().iter().all(|p| p.is_local())
1601 }
1602}
1603
1604#[allow(clippy::type_complexity)]
1607pub async fn add_fee_accounts_to_state<
1608 N: ConnectedNetwork<PubKey>,
1609 V: Versions,
1610 P: SequencerPersistence,
1611>(
1612 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1613 view: &<SeqTypes as NodeType>::View,
1614 accounts: &[FeeAccount],
1615 tree: &FeeMerkleTree,
1616 leaf: Leaf2,
1617) -> anyhow::Result<()> {
1618 let mut consensus = consensus.write().await;
1620
1621 let (state, delta) = match consensus.validated_state_map().get(view) {
1622 Some(View {
1623 view_inner: ViewInner::Leaf { state, delta, .. },
1624 }) => {
1625 let mut state = (**state).clone();
1626
1627 for account in accounts {
1629 if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1630 if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1631 tracing::warn!(
1632 ?view,
1633 %account,
1634 "cannot update fetched account state: {err:#}"
1635 );
1636 }
1637 } else {
1638 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1639 };
1640 }
1641
1642 (Arc::new(state), delta.clone())
1643 },
1644 _ => {
1645 let mut state = ValidatedState::from_header(leaf.block_header());
1650 state.fee_merkle_tree = tree.clone();
1651 (Arc::new(state), None)
1652 },
1653 };
1654
1655 consensus
1656 .update_leaf(leaf, Arc::clone(&state), delta)
1657 .with_context(|| "failed to update leaf")?;
1658
1659 Ok(())
1660}
1661
1662#[allow(clippy::type_complexity)]
1665pub async fn add_v2_reward_accounts_to_state<
1666 N: ConnectedNetwork<PubKey>,
1667 V: Versions,
1668 P: SequencerPersistence,
1669>(
1670 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1671 view: &<SeqTypes as NodeType>::View,
1672 accounts: &[RewardAccountV2],
1673 tree: &RewardMerkleTreeV2,
1674 leaf: Leaf2,
1675) -> anyhow::Result<()> {
1676 let mut consensus = consensus.write().await;
1678
1679 let (state, delta) = match consensus.validated_state_map().get(view) {
1680 Some(View {
1681 view_inner: ViewInner::Leaf { state, delta, .. },
1682 }) => {
1683 let mut state = (**state).clone();
1684
1685 for account in accounts {
1687 if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1688 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1689 tracing::warn!(
1690 ?view,
1691 %account,
1692 "cannot update fetched account state: {err:#}"
1693 );
1694 }
1695 } else {
1696 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1697 };
1698 }
1699
1700 (Arc::new(state), delta.clone())
1701 },
1702 _ => {
1703 let mut state = ValidatedState::from_header(leaf.block_header());
1708 state.reward_merkle_tree_v2 = tree.clone();
1709 (Arc::new(state), None)
1710 },
1711 };
1712
1713 consensus
1714 .update_leaf(leaf, Arc::clone(&state), delta)
1715 .with_context(|| "failed to update leaf")?;
1716
1717 Ok(())
1718}
1719
1720#[allow(clippy::type_complexity)]
1723pub async fn add_v1_reward_accounts_to_state<
1724 N: ConnectedNetwork<PubKey>,
1725 V: Versions,
1726 P: SequencerPersistence,
1727>(
1728 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1729 view: &<SeqTypes as NodeType>::View,
1730 accounts: &[RewardAccountV1],
1731 tree: &RewardMerkleTreeV1,
1732 leaf: Leaf2,
1733) -> anyhow::Result<()> {
1734 let mut consensus = consensus.write().await;
1736
1737 let (state, delta) = match consensus.validated_state_map().get(view) {
1738 Some(View {
1739 view_inner: ViewInner::Leaf { state, delta, .. },
1740 }) => {
1741 let mut state = (**state).clone();
1742
1743 for account in accounts {
1745 if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1746 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1747 tracing::warn!(
1748 ?view,
1749 %account,
1750 "cannot update fetched account state: {err:#}"
1751 );
1752 }
1753 } else {
1754 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1755 };
1756 }
1757
1758 (Arc::new(state), delta.clone())
1759 },
1760 _ => {
1761 let mut state = ValidatedState::from_header(leaf.block_header());
1766 state.reward_merkle_tree_v1 = tree.clone();
1767 (Arc::new(state), None)
1768 },
1769 };
1770
1771 consensus
1772 .update_leaf(leaf, Arc::clone(&state), delta)
1773 .with_context(|| "failed to update leaf")?;
1774
1775 Ok(())
1776}
1777
#[cfg(test)]
mod test {
    use super::*;

    /// A peer with 2 failures in 1000 requests must rank above a peer with 1 failure in 10
    /// requests, and a priority queue must pop them in that order.
    #[test]
    fn test_peer_priority() {
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        let mut queue: PriorityQueue<_, _> = [(0, reliable), (1, flaky)].into_iter().collect();

        // The highest-scoring peer comes out first.
        for expected in [(0, reliable), (1, flaky)] {
            assert_eq!(queue.pop(), Some(expected));
        }
    }
}