1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 fmt::{Debug, Display},
5 sync::Arc,
6 time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{anyhow, bail, ensure, Context};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15 config::PublicNetworkConfig,
16 traits::SequencerPersistence,
17 v0::traits::StateCatchup,
18 v0_3::{
19 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1,
20 RewardMerkleTreeV1,
21 },
22 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2, RewardMerkleTreeV2},
23 BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
24 FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions, ValidatedState,
25};
26use futures::{
27 future::{Future, FutureExt, TryFuture, TryFutureExt},
28 stream::FuturesUnordered,
29 StreamExt,
30};
31use hotshot_types::{
32 consensus::Consensus,
33 data::ViewNumber,
34 message::UpgradeLock,
35 network::NetworkConfig,
36 simple_certificate::LightClientStateUpdateCertificateV2,
37 stake_table::HSStakeTable,
38 traits::{
39 metrics::{Counter, CounterFamily, Metrics},
40 network::ConnectedNetwork,
41 node_implementation::{ConsensusTime as _, NodeType, Versions},
42 ValidatedState as ValidatedStateTrait,
43 },
44 utils::{verify_leaf_chain, View, ViewInner},
45 ValidatorConfig,
46};
47use itertools::Itertools;
48use jf_merkle_tree_compat::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme};
49use parking_lot::Mutex;
50use priority_queue::PriorityQueue;
51use serde::de::DeserializeOwned;
52use surf_disco::Request;
53use tide_disco::error::ServerError;
54use tokio::time::timeout;
55use tokio_util::task::AbortOnDropHandle;
56use url::Url;
57use vbs::version::StaticVersionType;
58
59use crate::api::BlocksFrontier;
60
61#[derive(Debug, Clone)]
64struct Client<ServerError, ApiVer: StaticVersionType> {
65 inner: surf_disco::Client<ServerError, ApiVer>,
66 url: Url,
67 requests: Arc<Box<dyn Counter>>,
68 failures: Arc<Box<dyn Counter>>,
69}
70
71impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
72 pub fn new(
73 url: Url,
74 requests: &(impl CounterFamily + ?Sized),
75 failures: &(impl CounterFamily + ?Sized),
76 ) -> Self {
77 Self {
78 inner: surf_disco::Client::new(url.clone()),
79 requests: Arc::new(requests.create(vec![url.to_string()])),
80 failures: Arc::new(failures.create(vec![url.to_string()])),
81 url,
82 }
83 }
84
85 pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
86 self.inner.get(route)
87 }
88}
89
90#[derive(Clone, Copy, Debug, Default)]
99struct PeerScore {
100 requests: usize,
101 failures: usize,
102}
103
104impl Ord for PeerScore {
105 fn cmp(&self, other: &Self) -> Ordering {
106 (other.failures * self.requests).cmp(&(self.failures * other.requests))
111 }
112}
113
114impl PartialOrd for PeerScore {
115 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
116 Some(self.cmp(other))
117 }
118}
119
120impl PartialEq for PeerScore {
121 fn eq(&self, other: &Self) -> bool {
122 self.cmp(other).is_eq()
123 }
124}
125
126impl Eq for PeerScore {}
127
128#[derive(Debug, Clone, Default)]
129pub struct StatePeers<ApiVer: StaticVersionType> {
130 scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
132 clients: Vec<Client<ServerError, ApiVer>>,
133 backoff: BackoffParams,
134}
135
136impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
137 async fn fetch<Fut>(
138 &self,
139 retry: usize,
140 f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
141 ) -> anyhow::Result<Fut::Ok>
142 where
143 Fut: TryFuture<Error: Display>,
144 {
145 let timeout_dur = Duration::from_millis(500) * (retry as u32 + 1);
156
157 let mut requests = HashMap::new();
160 let mut res = Err(anyhow!("failed fetching from every peer"));
161
162 let mut scores = { (*self.scores.read().await).clone() };
167 let mut logs = vec![format!("Fetching failed.\n")];
168 while let Some((id, score)) = scores.pop() {
169 let client = &self.clients[id];
170 tracing::info!("fetching from {}", client.url);
171 match timeout(timeout_dur, f(client.clone()).into_future()).await {
172 Ok(Ok(t)) => {
173 requests.insert(id, true);
174 res = Ok(t);
175 logs = Vec::new();
176 break;
177 },
178 Ok(Err(err)) => {
179 tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
180 logs.push(format!(
181 "Error from peer {} with id {id} and score {score:?}: {err:#}",
182 client.url
183 ));
184 requests.insert(id, false);
185 },
186 Err(_) => {
187 tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
188 logs.push(format!(
189 "Error from peer {} with id {id} and score {score:?}: request timed out",
190 client.url
191 ));
192 requests.insert(id, false);
193 },
194 }
195 }
196
197 if !logs.is_empty() {
198 tracing::warn!("{}", logs.join("\n"));
199 }
200
201 let mut scores = self.scores.write().await;
203 for (id, success) in requests {
204 scores.change_priority_by(&id, |score| {
205 score.requests += 1;
206 self.clients[id].requests.add(1);
207 if !success {
208 score.failures += 1;
209 self.clients[id].failures.add(1);
210 }
211 });
212 }
213
214 res
215 }
216
217 pub fn from_urls(
218 urls: Vec<Url>,
219 backoff: BackoffParams,
220 metrics: &(impl Metrics + ?Sized),
221 ) -> Self {
222 if urls.is_empty() {
223 panic!("Cannot create StatePeers with no peers");
224 }
225
226 let metrics = metrics.subgroup("catchup".into());
227 let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
228 let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
229
230 let scores = urls
231 .iter()
232 .enumerate()
233 .map(|(i, _)| (i, PeerScore::default()))
234 .collect();
235 let clients = urls
236 .into_iter()
237 .map(|url| Client::new(url, &*requests, &*failures))
238 .collect();
239
240 Self {
241 clients,
242 scores: Arc::new(RwLock::new(scores)),
243 backoff,
244 }
245 }
246
247 #[tracing::instrument(skip(self, my_own_validator_config))]
248 pub async fn fetch_config(
249 &self,
250 my_own_validator_config: ValidatorConfig<SeqTypes>,
251 ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
252 self.backoff()
253 .retry(self, move |provider, retry| {
254 let my_own_validator_config = my_own_validator_config.clone();
255 async move {
256 let cfg: PublicNetworkConfig = provider
257 .fetch(retry, |client| {
258 let url = client.url.join("config/hotshot").unwrap();
259
260 reqwest::get(url.clone())
261 })
262 .await?
263 .json()
264 .await?;
265 cfg.into_network_config(my_own_validator_config)
266 .context("fetched config, but failed to convert to private config")
267 }
268 .boxed()
269 })
270 .await
271 }
272}
273
274#[async_trait]
275impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
276 #[tracing::instrument(skip(self, _instance))]
277 async fn try_fetch_accounts(
278 &self,
279 retry: usize,
280 _instance: &NodeState,
281 height: u64,
282 view: ViewNumber,
283 fee_merkle_tree_root: FeeMerkleCommitment,
284 accounts: &[FeeAccount],
285 ) -> anyhow::Result<Vec<FeeAccountProof>> {
286 self.fetch(retry, |client| async move {
287 let tree = client
288 .inner
289 .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
290 .body_binary(&accounts.to_vec())?
291 .send()
292 .await?;
293
294 let mut proofs = Vec::new();
296 for account in accounts {
297 let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
298 .context(format!("response missing fee account {account}"))?;
299 proof.verify(&fee_merkle_tree_root).context(format!(
300 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
301 ))?;
302 proofs.push(proof);
303 }
304
305 anyhow::Ok(proofs)
306 })
307 .await
308 }
309
310 #[tracing::instrument(skip(self, _instance, mt))]
311 async fn try_remember_blocks_merkle_tree(
312 &self,
313 retry: usize,
314 _instance: &NodeState,
315 height: u64,
316 view: ViewNumber,
317 mt: &mut BlockMerkleTree,
318 ) -> anyhow::Result<()> {
319 *mt = self
320 .fetch(retry, |client| {
321 let mut mt = mt.clone();
322 async move {
323 let frontier = client
324 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
325 .send()
326 .await?;
327 let elem = frontier
328 .elem()
329 .context("provided frontier is missing leaf element")?;
330 mt.remember(mt.num_leaves() - 1, *elem, &frontier)
331 .context("verifying block proof")?;
332 anyhow::Ok(mt)
333 }
334 })
335 .await?;
336 Ok(())
337 }
338
339 async fn try_fetch_chain_config(
340 &self,
341 retry: usize,
342 commitment: Commitment<ChainConfig>,
343 ) -> anyhow::Result<ChainConfig> {
344 self.fetch(retry, |client| async move {
345 let cf = client
346 .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
347 .send()
348 .await?;
349 ensure!(
350 cf.commit() == commitment,
351 "received chain config with mismatched commitment: expected {commitment}, got {}",
352 cf.commit()
353 );
354 Ok(cf)
355 })
356 .await
357 }
358
359 async fn try_fetch_leaf(
360 &self,
361 retry: usize,
362 height: u64,
363 stake_table: HSStakeTable<SeqTypes>,
364 success_threshold: U256,
365 ) -> anyhow::Result<Leaf2> {
366 let leaf_chain = self
368 .fetch(retry, |client| async move {
369 let leaf = client
370 .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
371 .send()
372 .await?;
373 anyhow::Ok(leaf)
374 })
375 .await
376 .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
377
378 verify_leaf_chain(
380 leaf_chain,
381 &stake_table,
382 success_threshold,
383 height,
384 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
385 )
386 .await
387 .with_context(|| format!("failed to verify leaf chain at height {height}"))
388 }
389
390 #[tracing::instrument(skip(self, _instance))]
391 async fn try_fetch_reward_accounts_v2(
392 &self,
393 retry: usize,
394 _instance: &NodeState,
395 height: u64,
396 view: ViewNumber,
397 reward_merkle_tree_root: RewardMerkleCommitmentV2,
398 accounts: &[RewardAccountV2],
399 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
400 self.fetch(retry, |client| async move {
401 let tree = client
402 .inner
403 .post::<RewardMerkleTreeV2>(&format!(
404 "catchup/{height}/{}/reward-accounts-v2",
405 view.u64()
406 ))
407 .body_binary(&accounts.to_vec())?
408 .send()
409 .await?;
410
411 let mut proofs = Vec::new();
414 for account in accounts {
415 let (proof, _) = RewardAccountProofV2::prove(&tree, (*account).into())
416 .context(format!("response missing reward account {account}"))?;
417 proof.verify(&reward_merkle_tree_root).context(format!(
418 "invalid proof for v2 reward account {account}, root: \
419 {reward_merkle_tree_root} height {height} view {view}"
420 ))?;
421 proofs.push(proof);
422 }
423
424 anyhow::Ok(proofs)
425 })
426 .await
427 }
428
429 #[tracing::instrument(skip(self, _instance))]
430 async fn try_fetch_reward_accounts_v1(
431 &self,
432 retry: usize,
433 _instance: &NodeState,
434 height: u64,
435 view: ViewNumber,
436 reward_merkle_tree_root: RewardMerkleCommitmentV1,
437 accounts: &[RewardAccountV1],
438 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
439 self.fetch(retry, |client| async move {
440 let tree = client
441 .inner
442 .post::<RewardMerkleTreeV1>(&format!(
443 "catchup/{height}/{}/reward-accounts",
444 view.u64()
445 ))
446 .body_binary(&accounts.to_vec())?
447 .send()
448 .await?;
449
450 let mut proofs = Vec::new();
452 for account in accounts {
453 let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
454 .context(format!("response missing reward account {account}"))?;
455 proof.verify(&reward_merkle_tree_root).context(format!(
456 "invalid proof for v1 reward account {account}, root: \
457 {reward_merkle_tree_root} height {height} view {view}"
458 ))?;
459 proofs.push(proof);
460 }
461
462 anyhow::Ok(proofs)
463 })
464 .await
465 }
466
467 async fn try_fetch_state_cert(
468 &self,
469 retry: usize,
470 epoch: u64,
471 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
472 self.fetch(retry, |client| async move {
473 client
474 .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
475 "catchup/{epoch}/state-cert"
476 ))
477 .send()
478 .await
479 })
480 .await
481 }
482
483 fn backoff(&self) -> &BackoffParams {
484 &self.backoff
485 }
486
487 fn name(&self) -> String {
488 format!(
489 "StatePeers({})",
490 self.clients
491 .iter()
492 .map(|client| client.url.to_string())
493 .join(",")
494 )
495 }
496
497 fn is_local(&self) -> bool {
498 false
499 }
500}
501
502pub(crate) trait CatchupStorage: Sync {
503 fn get_accounts(
514 &self,
515 _instance: &NodeState,
516 _height: u64,
517 _view: ViewNumber,
518 _accounts: &[FeeAccount],
519 ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
520 async {
525 bail!("merklized state catchup is not supported for this data source");
526 }
527 }
528
529 fn get_reward_accounts_v1(
530 &self,
531 _instance: &NodeState,
532 _height: u64,
533 _view: ViewNumber,
534 _accounts: &[RewardAccountV1],
535 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
536 async {
537 bail!("merklized state catchup is not supported for this data source");
538 }
539 }
540
541 fn get_reward_accounts_v2(
542 &self,
543 _instance: &NodeState,
544 _height: u64,
545 _view: ViewNumber,
546 _accounts: &[RewardAccountV2],
547 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
548 async {
549 bail!("merklized state catchup is not supported for this data source");
550 }
551 }
552
553 fn get_all_reward_accounts(
554 &self,
555 _height: u64,
556 _offset: u64,
557 _limit: u64,
558 ) -> impl Send + Future<Output = anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>>> {
559 async { bail!("merklized state catchup is not supported for this data source") }
560 }
561
562 fn get_frontier(
569 &self,
570 _instance: &NodeState,
571 _height: u64,
572 _view: ViewNumber,
573 ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
574 async {
579 bail!("merklized state catchup is not supported for this data source");
580 }
581 }
582
583 fn get_chain_config(
584 &self,
585 _commitment: Commitment<ChainConfig>,
586 ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
587 async {
588 bail!("chain config catchup is not supported for this data source");
589 }
590 }
591
592 fn get_leaf_chain(
593 &self,
594 _height: u64,
595 ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
596 async {
597 bail!("leaf chain catchup is not supported for this data source");
598 }
599 }
600}
601
602impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
603
604impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
605where
606 T: CatchupStorage,
607 S: Sync,
608{
609 async fn get_accounts(
610 &self,
611 instance: &NodeState,
612 height: u64,
613 view: ViewNumber,
614 accounts: &[FeeAccount],
615 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
616 self.inner()
617 .get_accounts(instance, height, view, accounts)
618 .await
619 }
620
621 async fn get_reward_accounts_v2(
622 &self,
623 instance: &NodeState,
624 height: u64,
625 view: ViewNumber,
626 accounts: &[RewardAccountV2],
627 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
628 self.inner()
629 .get_reward_accounts_v2(instance, height, view, accounts)
630 .await
631 }
632
633 async fn get_all_reward_accounts(
634 &self,
635 height: u64,
636 offset: u64,
637 limit: u64,
638 ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
639 self.inner()
640 .get_all_reward_accounts(height, offset, limit)
641 .await
642 }
643
644 async fn get_reward_accounts_v1(
645 &self,
646 instance: &NodeState,
647 height: u64,
648 view: ViewNumber,
649 accounts: &[RewardAccountV1],
650 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
651 self.inner()
652 .get_reward_accounts_v1(instance, height, view, accounts)
653 .await
654 }
655
656 async fn get_frontier(
657 &self,
658 instance: &NodeState,
659 height: u64,
660 view: ViewNumber,
661 ) -> anyhow::Result<BlocksFrontier> {
662 self.inner().get_frontier(instance, height, view).await
663 }
664
665 async fn get_chain_config(
666 &self,
667 commitment: Commitment<ChainConfig>,
668 ) -> anyhow::Result<ChainConfig> {
669 self.inner().get_chain_config(commitment).await
670 }
671 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
672 self.inner().get_leaf_chain(height).await
673 }
674}
675
676#[derive(Debug)]
677pub(crate) struct SqlStateCatchup<T> {
678 db: Arc<T>,
679 backoff: BackoffParams,
680}
681
682impl<T> SqlStateCatchup<T> {
683 pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
684 Self { db, backoff }
685 }
686}
687
688#[async_trait]
689impl<T> StateCatchup for SqlStateCatchup<T>
690where
691 T: CatchupStorage + Send + Sync,
692{
693 async fn try_fetch_leaf(
694 &self,
695 _retry: usize,
696 height: u64,
697 stake_table: HSStakeTable<SeqTypes>,
698 success_threshold: U256,
699 ) -> anyhow::Result<Leaf2> {
700 let leaf_chain = self
702 .db
703 .get_leaf_chain(height)
704 .await
705 .with_context(|| "failed to get leaf chain from DB")?;
706
707 let leaf = verify_leaf_chain(
709 leaf_chain,
710 &stake_table,
711 success_threshold,
712 height,
713 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
714 )
715 .await
716 .with_context(|| "failed to verify leaf chain")?;
717
718 Ok(leaf)
719 }
720 #[tracing::instrument(skip(self, _retry, instance))]
723 async fn try_fetch_accounts(
724 &self,
725 _retry: usize,
726 instance: &NodeState,
727 block_height: u64,
728 view: ViewNumber,
729 fee_merkle_tree_root: FeeMerkleCommitment,
730 accounts: &[FeeAccount],
731 ) -> anyhow::Result<Vec<FeeAccountProof>> {
732 let (fee_merkle_tree_from_db, _) = self
734 .db
735 .get_accounts(instance, block_height, view, accounts)
736 .await
737 .with_context(|| "failed to get fee accounts from DB")?;
738
739 let mut proofs = Vec::new();
741 for account in accounts {
742 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
743 .context(format!("response missing account {account}"))?;
744 proof.verify(&fee_merkle_tree_root).context(format!(
745 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
746 ))?;
747 proofs.push(proof);
748 }
749
750 Ok(proofs)
751 }
752
753 #[tracing::instrument(skip(self, _retry, instance, mt))]
754 async fn try_remember_blocks_merkle_tree(
755 &self,
756 _retry: usize,
757 instance: &NodeState,
758 bh: u64,
759 view: ViewNumber,
760 mt: &mut BlockMerkleTree,
761 ) -> anyhow::Result<()> {
762 if bh == 0 {
763 return Ok(());
764 }
765
766 let proof = self.db.get_frontier(instance, bh, view).await?;
767 match proof
768 .proof
769 .first()
770 .context(format!("empty proof for frontier at height {bh}"))?
771 {
772 MerkleNode::Leaf { pos, elem, .. } => mt
773 .remember(pos, elem, proof.clone())
774 .context("failed to remember proof"),
775 _ => bail!("invalid proof"),
776 }
777 }
778
779 async fn try_fetch_chain_config(
780 &self,
781 _retry: usize,
782 commitment: Commitment<ChainConfig>,
783 ) -> anyhow::Result<ChainConfig> {
784 let cf = self.db.get_chain_config(commitment).await?;
785
786 if cf.commit() != commitment {
787 panic!(
788 "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
789 but got: {:?}.
790 This may indicate a compromised database",
791 commitment,
792 cf.commit()
793 )
794 }
795
796 Ok(cf)
797 }
798
799 #[tracing::instrument(skip(self, _retry, instance))]
800 async fn try_fetch_reward_accounts_v2(
801 &self,
802 _retry: usize,
803 instance: &NodeState,
804 block_height: u64,
805 view: ViewNumber,
806 reward_merkle_tree_root: RewardMerkleCommitmentV2,
807 accounts: &[RewardAccountV2],
808 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
809 let (reward_merkle_tree_from_db, _) = self
811 .db
812 .get_reward_accounts_v2(instance, block_height, view, accounts)
813 .await
814 .with_context(|| "failed to get reward accounts from DB")?;
815 let mut proofs = Vec::new();
817 for account in accounts {
818 let (proof, _) =
819 RewardAccountProofV2::prove(&reward_merkle_tree_from_db, (*account).into())
820 .context(format!("response missing account {account}"))?;
821 proof.verify(&reward_merkle_tree_root).context(format!(
822 "invalid proof for v2 reward account {account}, root: {reward_merkle_tree_root}"
823 ))?;
824 proofs.push(proof);
825 }
826
827 Ok(proofs)
828 }
829
830 #[tracing::instrument(skip(self, _retry, instance))]
831 async fn try_fetch_reward_accounts_v1(
832 &self,
833 _retry: usize,
834 instance: &NodeState,
835 block_height: u64,
836 view: ViewNumber,
837 reward_merkle_tree_root: RewardMerkleCommitmentV1,
838 accounts: &[RewardAccountV1],
839 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
840 let (reward_merkle_tree_from_db, _) = self
842 .db
843 .get_reward_accounts_v1(instance, block_height, view, accounts)
844 .await
845 .with_context(|| "failed to get reward accounts from DB")?;
846 let mut proofs = Vec::new();
848 for account in accounts {
849 let (proof, _) =
850 RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
851 .context(format!("response missing account {account}"))?;
852 proof.verify(&reward_merkle_tree_root).context(format!(
853 "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
854 ))?;
855 proofs.push(proof);
856 }
857
858 Ok(proofs)
859 }
860
861 async fn try_fetch_state_cert(
862 &self,
863 _retry: usize,
864 _epoch: u64,
865 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
866 bail!("state cert catchup not supported for SqlStateCatchup");
867 }
868
869 fn backoff(&self) -> &BackoffParams {
870 &self.backoff
871 }
872
873 fn name(&self) -> String {
874 "SqlStateCatchup".into()
875 }
876
877 fn is_local(&self) -> bool {
878 true
879 }
880}
881
882#[derive(Clone, Debug)]
884pub struct NullStateCatchup {
885 backoff: BackoffParams,
886 chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
887}
888
889impl Default for NullStateCatchup {
890 fn default() -> Self {
891 Self {
892 backoff: BackoffParams::disabled(),
893 chain_configs: Default::default(),
894 }
895 }
896}
897
898impl NullStateCatchup {
899 pub fn add_chain_config(&mut self, cf: ChainConfig) {
909 self.chain_configs.insert(cf.commit(), cf);
910 }
911}
912
913#[async_trait]
914impl StateCatchup for NullStateCatchup {
915 async fn try_fetch_leaf(
916 &self,
917 _retry: usize,
918 _height: u64,
919 _stake_table: HSStakeTable<SeqTypes>,
920 _success_threshold: U256,
921 ) -> anyhow::Result<Leaf2> {
922 bail!("state catchup is disabled")
923 }
924
925 async fn try_fetch_accounts(
926 &self,
927 _retry: usize,
928 _instance: &NodeState,
929 _height: u64,
930 _view: ViewNumber,
931 _fee_merkle_tree_root: FeeMerkleCommitment,
932 _account: &[FeeAccount],
933 ) -> anyhow::Result<Vec<FeeAccountProof>> {
934 bail!("state catchup is disabled");
935 }
936
937 async fn try_fetch_reward_accounts_v2(
938 &self,
939 _retry: usize,
940 _instance: &NodeState,
941 _height: u64,
942 _view: ViewNumber,
943 _fee_merkle_tree_root: RewardMerkleCommitmentV2,
944 _account: &[RewardAccountV2],
945 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
946 bail!("state catchup is disabled");
947 }
948
949 async fn try_remember_blocks_merkle_tree(
950 &self,
951 _retry: usize,
952 _instance: &NodeState,
953 _height: u64,
954 _view: ViewNumber,
955 _mt: &mut BlockMerkleTree,
956 ) -> anyhow::Result<()> {
957 bail!("state catchup is disabled");
958 }
959
960 async fn try_fetch_chain_config(
961 &self,
962 _retry: usize,
963 commitment: Commitment<ChainConfig>,
964 ) -> anyhow::Result<ChainConfig> {
965 self.chain_configs
966 .get(&commitment)
967 .copied()
968 .context(format!("chain config {commitment} not available"))
969 }
970
971 async fn try_fetch_reward_accounts_v1(
972 &self,
973 _retry: usize,
974 _instance: &NodeState,
975 _height: u64,
976 _view: ViewNumber,
977 _fee_merkle_tree_root: RewardMerkleCommitmentV1,
978 _account: &[RewardAccountV1],
979 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
980 bail!("state catchup is disabled");
981 }
982
983 async fn try_fetch_state_cert(
984 &self,
985 _retry: usize,
986 _epoch: u64,
987 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
988 bail!("state catchup is disabled");
989 }
990
991 fn backoff(&self) -> &BackoffParams {
992 &self.backoff
993 }
994
995 fn name(&self) -> String {
996 "NullStateCatchup".into()
997 }
998
999 fn is_local(&self) -> bool {
1000 true
1001 }
1002}
1003
1004#[derive(Clone)]
1007pub struct ParallelStateCatchup {
1008 providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
1009 backoff: BackoffParams,
1010}
1011
1012impl ParallelStateCatchup {
1013 pub fn new(providers: &[Arc<dyn StateCatchup>]) -> Self {
1015 Self {
1016 providers: Arc::new(Mutex::new(providers.to_vec())),
1017 backoff: BackoffParams::disabled(),
1018 }
1019 }
1020
1021 pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
1023 self.providers.lock().push(provider);
1024 }
1025
1026 pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1028 where
1029 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1030 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1031 RT: Send + Sync + 'static,
1032 {
1033 self.on_providers(|provider| provider.is_local(), closure)
1034 .await
1035 }
1036
1037 pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1039 where
1040 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1041 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1042 RT: Send + Sync + 'static,
1043 {
1044 self.on_providers(|provider| !provider.is_local(), closure)
1045 .await
1046 }
1047
1048 pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1050 where
1051 P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1052 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1053 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1054 RT: Send + Sync + 'static,
1055 {
1056 let providers = self.providers.lock().clone();
1058 if providers.is_empty() {
1059 return Err(anyhow::anyhow!("no providers were initialized"));
1060 }
1061
1062 let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1064 if providers.is_empty() {
1065 return Err(anyhow::anyhow!("no providers matched the given predicate"));
1066 }
1067
1068 let mut futures = FuturesUnordered::new();
1070 for provider in providers {
1071 let closure = closure.clone();
1072 futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1073 }
1074
1075 let mut logs = vec![format!("No providers returned a successful result.\n")];
1076 while let Some(result) = futures.next().await {
1078 let result = match result {
1080 Ok(res) => res,
1081 Err(err) => {
1082 tracing::debug!("Failed to join on provider: {err:#}.");
1083 logs.push(format!("Failed to join on provider: {err:#}."));
1084 continue;
1085 },
1086 };
1087
1088 let result = match result {
1090 Ok(res) => res,
1091 Err(err) => {
1092 tracing::debug!("Failed to fetch data: {err:#}.");
1093 logs.push(format!("Failed to fetch data: {err:#}."));
1094 continue;
1095 },
1096 };
1097
1098 return Ok(result);
1099 }
1100
1101 Err(anyhow::anyhow!(logs.join("\n")))
1102 }
1103}
1104
1105macro_rules! clone {
1106 ( ($( $x:ident ),*) $y:expr ) => {
1107 {
1108 $(let $x = $x.clone();)*
1109 $y
1110 }
1111 };
1112}
1113
1114#[async_trait]
1117impl StateCatchup for ParallelStateCatchup {
1118 async fn try_fetch_leaf(
1119 &self,
1120 retry: usize,
1121 height: u64,
1122 stake_table: HSStakeTable<SeqTypes>,
1123 success_threshold: U256,
1124 ) -> anyhow::Result<Leaf2> {
1125 let local_result = self
1127 .on_local_providers(clone! {(stake_table) move |provider| {
1128 clone!{(stake_table) async move {
1129 provider
1130 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1131 .await
1132 }}
1133 }})
1134 .await;
1135
1136 if local_result.is_ok() {
1138 return local_result;
1139 }
1140
1141 self.on_remote_providers(clone! {(stake_table) move |provider| {
1143 clone!{(stake_table) async move {
1144 provider
1145 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1146 .await
1147 }}
1148 }})
1149 .await
1150 }
1151
1152 async fn try_fetch_accounts(
1153 &self,
1154 retry: usize,
1155 instance: &NodeState,
1156 height: u64,
1157 view: ViewNumber,
1158 fee_merkle_tree_root: FeeMerkleCommitment,
1159 accounts: &[FeeAccount],
1160 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1161 let accounts_vec = accounts.to_vec();
1163 let local_result = self
1164 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1165 clone! {(instance, accounts_vec) async move {
1166 provider
1167 .try_fetch_accounts(
1168 retry,
1169 &instance,
1170 height,
1171 view,
1172 fee_merkle_tree_root,
1173 &accounts_vec,
1174 )
1175 .await
1176 }}
1177 }})
1178 .await;
1179
1180 if local_result.is_ok() {
1182 return local_result;
1183 }
1184
1185 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1187 clone!{(instance, accounts_vec) async move {
1188 provider
1189 .try_fetch_accounts(
1190 retry,
1191 &instance,
1192 height,
1193 view,
1194 fee_merkle_tree_root,
1195 &accounts_vec,
1196 ).await
1197 }}
1198 }})
1199 .await
1200 }
1201
1202 async fn try_remember_blocks_merkle_tree(
1203 &self,
1204 retry: usize,
1205 instance: &NodeState,
1206 height: u64,
1207 view: ViewNumber,
1208 mt: &mut BlockMerkleTree,
1209 ) -> anyhow::Result<()> {
1210 let local_result = self
1212 .on_local_providers(clone! {(mt, instance) move |provider| {
1213 let mut mt = mt.clone();
1214 clone! {(instance) async move {
1215 provider
1217 .try_remember_blocks_merkle_tree(
1218 retry,
1219 &instance,
1220 height,
1221 view,
1222 &mut mt,
1223 )
1224 .await?;
1225
1226 Ok(mt)
1228 }}
1229 }})
1230 .await;
1231
1232 if let Ok(modified_mt) = local_result {
1234 *mt = modified_mt;
1236
1237 return Ok(());
1238 }
1239
1240 let remote_result = self
1242 .on_remote_providers(clone! {(mt, instance) move |provider| {
1243 let mut mt = mt.clone();
1244 clone!{(instance) async move {
1245 provider
1247 .try_remember_blocks_merkle_tree(
1248 retry,
1249 &instance,
1250 height,
1251 view,
1252 &mut mt,
1253 )
1254 .await?;
1255
1256 Ok(mt)
1258 }}
1259 }})
1260 .await?;
1261
1262 *mt = remote_result;
1264
1265 Ok(())
1266 }
1267
1268 async fn try_fetch_chain_config(
1269 &self,
1270 retry: usize,
1271 commitment: Commitment<ChainConfig>,
1272 ) -> anyhow::Result<ChainConfig> {
1273 let local_result = self
1275 .on_local_providers(move |provider| async move {
1276 provider.try_fetch_chain_config(retry, commitment).await
1277 })
1278 .await;
1279
1280 if local_result.is_ok() {
1282 return local_result;
1283 }
1284
1285 self.on_remote_providers(move |provider| async move {
1287 provider.try_fetch_chain_config(retry, commitment).await
1288 })
1289 .await
1290 }
1291
1292 async fn try_fetch_reward_accounts_v2(
1293 &self,
1294 retry: usize,
1295 instance: &NodeState,
1296 height: u64,
1297 view: ViewNumber,
1298 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1299 accounts: &[RewardAccountV2],
1300 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1301 let accounts_vec = accounts.to_vec();
1303 let local_result = self
1304 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1305 clone! {(instance, accounts_vec) async move {
1306 provider
1307 .try_fetch_reward_accounts_v2(
1308 retry,
1309 &instance,
1310 height,
1311 view,
1312 reward_merkle_tree_root,
1313 &accounts_vec,
1314 )
1315 .await
1316 }}
1317 }})
1318 .await;
1319
1320 if local_result.is_ok() {
1322 return local_result;
1323 }
1324
1325 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1327 clone!{(instance, accounts_vec) async move {
1328 provider
1329 .try_fetch_reward_accounts_v2(
1330 retry,
1331 &instance,
1332 height,
1333 view,
1334 reward_merkle_tree_root,
1335 &accounts_vec,
1336 ).await
1337 }}
1338 }})
1339 .await
1340 }
1341
1342 async fn try_fetch_reward_accounts_v1(
1343 &self,
1344 retry: usize,
1345 instance: &NodeState,
1346 height: u64,
1347 view: ViewNumber,
1348 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1349 accounts: &[RewardAccountV1],
1350 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1351 let accounts_vec = accounts.to_vec();
1353 let local_result = self
1354 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1355 clone! {(instance, accounts_vec) async move {
1356 provider
1357 .try_fetch_reward_accounts_v1(
1358 retry,
1359 &instance,
1360 height,
1361 view,
1362 reward_merkle_tree_root,
1363 &accounts_vec,
1364 )
1365 .await
1366 }}
1367 }})
1368 .await;
1369
1370 if local_result.is_ok() {
1372 return local_result;
1373 }
1374
1375 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1377 clone!{(instance, accounts_vec) async move {
1378 provider
1379 .try_fetch_reward_accounts_v1(
1380 retry,
1381 &instance,
1382 height,
1383 view,
1384 reward_merkle_tree_root,
1385 &accounts_vec,
1386 ).await
1387 }}
1388 }})
1389 .await
1390 }
1391
1392 async fn try_fetch_state_cert(
1393 &self,
1394 retry: usize,
1395 epoch: u64,
1396 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1397 let local_result = self
1399 .on_local_providers(move |provider| async move {
1400 provider.try_fetch_state_cert(retry, epoch).await
1401 })
1402 .await;
1403
1404 if local_result.is_ok() {
1406 return local_result;
1407 }
1408
1409 self.on_remote_providers(move |provider| async move {
1411 provider.try_fetch_state_cert(retry, epoch).await
1412 })
1413 .await
1414 }
1415
1416 fn backoff(&self) -> &BackoffParams {
1417 &self.backoff
1418 }
1419
1420 fn name(&self) -> String {
1421 format!(
1422 "[{}]",
1423 self.providers
1424 .lock()
1425 .iter()
1426 .map(|p| p.name())
1427 .collect::<Vec<_>>()
1428 .join(", ")
1429 )
1430 }
1431
1432 async fn fetch_accounts(
1433 &self,
1434 instance: &NodeState,
1435 height: u64,
1436 view: ViewNumber,
1437 fee_merkle_tree_root: FeeMerkleCommitment,
1438 accounts: Vec<FeeAccount>,
1439 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1440 let accounts_vec = accounts.to_vec();
1442 let local_result = self
1443 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1444 clone! {(instance, accounts_vec) async move {
1445 provider
1446 .try_fetch_accounts(
1447 0,
1448 &instance,
1449 height,
1450 view,
1451 fee_merkle_tree_root,
1452 &accounts_vec,
1453 )
1454 .await
1455 }}
1456 }})
1457 .await;
1458
1459 if local_result.is_ok() {
1461 return local_result;
1462 }
1463
1464 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1466 clone!{(instance, accounts_vec) async move {
1467 provider
1468 .fetch_accounts(
1469 &instance,
1470 height,
1471 view,
1472 fee_merkle_tree_root,
1473 accounts_vec,
1474 ).await
1475 }}
1476 }})
1477 .await
1478 }
1479
1480 async fn fetch_leaf(
1481 &self,
1482 height: u64,
1483 stake_table: HSStakeTable<SeqTypes>,
1484 success_threshold: U256,
1485 ) -> anyhow::Result<Leaf2> {
1486 let local_result = self
1488 .on_local_providers(clone! {(stake_table) move |provider| {
1489 clone!{(stake_table) async move {
1490 provider
1491 .try_fetch_leaf(0, height, stake_table, success_threshold)
1492 .await
1493 }}
1494 }})
1495 .await;
1496
1497 if local_result.is_ok() {
1499 return local_result;
1500 }
1501
1502 self.on_remote_providers(clone! {(stake_table) move |provider| {
1504 clone!{(stake_table) async move {
1505 provider
1506 .fetch_leaf(height, stake_table, success_threshold)
1507 .await
1508 }}
1509 }})
1510 .await
1511 }
1512
1513 async fn fetch_chain_config(
1514 &self,
1515 commitment: Commitment<ChainConfig>,
1516 ) -> anyhow::Result<ChainConfig> {
1517 let local_result = self
1519 .on_local_providers(move |provider| async move {
1520 provider.try_fetch_chain_config(0, commitment).await
1521 })
1522 .await;
1523
1524 if local_result.is_ok() {
1526 return local_result;
1527 }
1528
1529 self.on_remote_providers(move |provider| async move {
1531 provider.fetch_chain_config(commitment).await
1532 })
1533 .await
1534 }
1535
1536 async fn fetch_reward_accounts_v2(
1537 &self,
1538 instance: &NodeState,
1539 height: u64,
1540 view: ViewNumber,
1541 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1542 accounts: Vec<RewardAccountV2>,
1543 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
1544 let accounts_vec = accounts.to_vec();
1546 let local_result = self
1547 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1548 clone! {(instance, accounts_vec) async move {
1549 provider
1550 .try_fetch_reward_accounts_v2(
1551 0,
1552 &instance,
1553 height,
1554 view,
1555 reward_merkle_tree_root,
1556 &accounts_vec,
1557 )
1558 .await
1559 }}
1560 }})
1561 .await;
1562
1563 if local_result.is_ok() {
1565 return local_result;
1566 }
1567
1568 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1570 clone!{(instance, accounts_vec) async move {
1571 provider
1572 .fetch_reward_accounts_v2(
1573 &instance,
1574 height,
1575 view,
1576 reward_merkle_tree_root,
1577 accounts_vec,
1578 ).await
1579 }}
1580 }})
1581 .await
1582 }
1583
1584 async fn fetch_reward_accounts_v1(
1585 &self,
1586 instance: &NodeState,
1587 height: u64,
1588 view: ViewNumber,
1589 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1590 accounts: Vec<RewardAccountV1>,
1591 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1592 let accounts_vec = accounts.to_vec();
1594 let local_result = self
1595 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1596 clone! {(instance, accounts_vec) async move {
1597 provider
1598 .try_fetch_reward_accounts_v1(
1599 0,
1600 &instance,
1601 height,
1602 view,
1603 reward_merkle_tree_root,
1604 &accounts_vec,
1605 )
1606 .await
1607 }}
1608 }})
1609 .await;
1610
1611 if local_result.is_ok() {
1613 return local_result;
1614 }
1615
1616 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1618 clone!{(instance, accounts_vec) async move {
1619 provider
1620 .fetch_reward_accounts_v1(
1621 &instance,
1622 height,
1623 view,
1624 reward_merkle_tree_root,
1625 accounts_vec,
1626 ).await
1627 }}
1628 }})
1629 .await
1630 }
1631
1632 async fn fetch_state_cert(
1633 &self,
1634 epoch: u64,
1635 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1636 let local_result = self
1637 .on_local_providers(move |provider| async move {
1638 provider.try_fetch_state_cert(0, epoch).await
1639 })
1640 .await;
1641
1642 if local_result.is_ok() {
1644 return local_result;
1645 }
1646
1647 self.on_remote_providers(
1649 move |provider| async move { provider.fetch_state_cert(epoch).await },
1650 )
1651 .await
1652 }
1653
1654 async fn remember_blocks_merkle_tree(
1655 &self,
1656 instance: &NodeState,
1657 height: u64,
1658 view: ViewNumber,
1659 mt: &mut BlockMerkleTree,
1660 ) -> anyhow::Result<()> {
1661 let local_result = self
1663 .on_local_providers(clone! {(mt, instance) move |provider| {
1664 let mut mt = mt.clone();
1665 clone! {(instance) async move {
1666 provider
1668 .try_remember_blocks_merkle_tree(
1669 0,
1670 &instance,
1671 height,
1672 view,
1673 &mut mt,
1674 )
1675 .await?;
1676
1677 Ok(mt)
1679 }}
1680 }})
1681 .await;
1682
1683 if let Ok(modified_mt) = local_result {
1685 *mt = modified_mt;
1688
1689 return Ok(());
1690 }
1691
1692 let remote_result = self
1694 .on_remote_providers(clone! {(mt, instance) move |provider| {
1695 let mut mt = mt.clone();
1696 clone!{(instance) async move {
1697 provider
1699 .remember_blocks_merkle_tree(
1700 &instance,
1701 height,
1702 view,
1703 &mut mt,
1704 )
1705 .await?;
1706
1707 Ok(mt)
1709 }}
1710 }})
1711 .await?;
1712
1713 *mt = remote_result;
1715
1716 Ok(())
1717 }
1718
1719 fn is_local(&self) -> bool {
1720 self.providers.lock().iter().all(|p| p.is_local())
1721 }
1722}
1723
1724#[allow(clippy::type_complexity)]
1727pub async fn add_fee_accounts_to_state<
1728 N: ConnectedNetwork<PubKey>,
1729 V: Versions,
1730 P: SequencerPersistence,
1731>(
1732 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1733 view: &<SeqTypes as NodeType>::View,
1734 accounts: &[FeeAccount],
1735 tree: &FeeMerkleTree,
1736 leaf: Leaf2,
1737) -> anyhow::Result<()> {
1738 let mut consensus = consensus.write().await;
1740
1741 let (state, delta) = match consensus.validated_state_map().get(view) {
1742 Some(View {
1743 view_inner: ViewInner::Leaf { state, delta, .. },
1744 }) => {
1745 let mut state = (**state).clone();
1746
1747 for account in accounts {
1749 if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1750 if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1751 tracing::warn!(
1752 ?view,
1753 %account,
1754 "cannot update fetched account state: {err:#}"
1755 );
1756 }
1757 } else {
1758 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1759 };
1760 }
1761
1762 (Arc::new(state), delta.clone())
1763 },
1764 _ => {
1765 let mut state = ValidatedState::from_header(leaf.block_header());
1770 state.fee_merkle_tree = tree.clone();
1771 (Arc::new(state), None)
1772 },
1773 };
1774
1775 consensus
1776 .update_leaf(leaf, Arc::clone(&state), delta)
1777 .with_context(|| "failed to update leaf")?;
1778
1779 Ok(())
1780}
1781
1782#[allow(clippy::type_complexity)]
1785pub async fn add_v2_reward_accounts_to_state<
1786 N: ConnectedNetwork<PubKey>,
1787 V: Versions,
1788 P: SequencerPersistence,
1789>(
1790 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1791 view: &<SeqTypes as NodeType>::View,
1792 accounts: &[RewardAccountV2],
1793 tree: &RewardMerkleTreeV2,
1794 leaf: Leaf2,
1795) -> anyhow::Result<()> {
1796 let mut consensus = consensus.write().await;
1798
1799 let (state, delta) = match consensus.validated_state_map().get(view) {
1800 Some(View {
1801 view_inner: ViewInner::Leaf { state, delta, .. },
1802 }) => {
1803 let mut state = (**state).clone();
1804
1805 for account in accounts {
1807 if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1808 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1809 tracing::warn!(
1810 ?view,
1811 %account,
1812 "cannot update fetched account state: {err:#}"
1813 );
1814 }
1815 } else {
1816 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1817 };
1818 }
1819
1820 (Arc::new(state), delta.clone())
1821 },
1822 _ => {
1823 let mut state = ValidatedState::from_header(leaf.block_header());
1828 state.reward_merkle_tree_v2 = tree.clone();
1829 (Arc::new(state), None)
1830 },
1831 };
1832
1833 consensus
1834 .update_leaf(leaf, Arc::clone(&state), delta)
1835 .with_context(|| "failed to update leaf")?;
1836
1837 Ok(())
1838}
1839
1840#[allow(clippy::type_complexity)]
1843pub async fn add_v1_reward_accounts_to_state<
1844 N: ConnectedNetwork<PubKey>,
1845 V: Versions,
1846 P: SequencerPersistence,
1847>(
1848 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1849 view: &<SeqTypes as NodeType>::View,
1850 accounts: &[RewardAccountV1],
1851 tree: &RewardMerkleTreeV1,
1852 leaf: Leaf2,
1853) -> anyhow::Result<()> {
1854 let mut consensus = consensus.write().await;
1856
1857 let (state, delta) = match consensus.validated_state_map().get(view) {
1858 Some(View {
1859 view_inner: ViewInner::Leaf { state, delta, .. },
1860 }) => {
1861 let mut state = (**state).clone();
1862
1863 for account in accounts {
1865 if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1866 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1867 tracing::warn!(
1868 ?view,
1869 %account,
1870 "cannot update fetched account state: {err:#}"
1871 );
1872 }
1873 } else {
1874 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1875 };
1876 }
1877
1878 (Arc::new(state), delta.clone())
1879 },
1880 _ => {
1881 let mut state = ValidatedState::from_header(leaf.block_header());
1886 state.reward_merkle_tree_v1 = tree.clone();
1887 (Arc::new(state), None)
1888 },
1889 };
1890
1891 consensus
1892 .update_leaf(leaf, Arc::clone(&state), delta)
1893 .with_context(|| "failed to update leaf")?;
1894
1895 Ok(())
1896}
1897
1898#[cfg(test)]
1899mod test {
1900 use super::*;
1901
1902 #[test]
1903 fn test_peer_priority() {
1904 let good_peer = PeerScore {
1905 requests: 1000,
1906 failures: 2,
1907 };
1908 let bad_peer = PeerScore {
1909 requests: 10,
1910 failures: 1,
1911 };
1912 assert!(good_peer > bad_peer);
1913
1914 let mut peers: PriorityQueue<_, _> = [(0, good_peer), (1, bad_peer)].into_iter().collect();
1915 assert_eq!(peers.pop(), Some((0, good_peer)));
1916 assert_eq!(peers.pop(), Some((1, bad_peer)));
1917 }
1918}