1use alloy::primitives::U256;
2use anyhow::Context;
3use async_trait::async_trait;
4use committable::{Commitment, Committable};
5use espresso_types::{
6 traits::{SequencerPersistence, StateCatchup},
7 v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
8 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
9 BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
10 Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions,
11};
12use hotshot::traits::NodeImplementation;
13use hotshot_types::{
14 data::ViewNumber,
15 message::UpgradeLock,
16 simple_certificate::LightClientStateUpdateCertificateV2,
17 stake_table::HSStakeTable,
18 traits::{network::ConnectedNetwork, node_implementation::Versions},
19 utils::verify_leaf_chain,
20};
21use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
22use request_response::RequestType;
23use tokio::time::timeout;
24
25use crate::request_response::{
26 request::{Request, Response},
27 RequestResponseProtocol,
28};
29
30#[async_trait]
31impl<
32 I: NodeImplementation<SeqTypes>,
33 V: Versions,
34 N: ConnectedNetwork<PubKey>,
35 P: SequencerPersistence,
36 > StateCatchup for RequestResponseProtocol<I, V, N, P>
37{
38 async fn try_fetch_leaf(
39 &self,
40 _retry: usize,
41 height: u64,
42 stake_table: HSStakeTable<SeqTypes>,
43 success_threshold: U256,
44 ) -> anyhow::Result<Leaf2> {
45 let timeout_duration = self.config.request_batch_interval * 3;
47
48 timeout(
50 timeout_duration,
51 self.fetch_leaf(height, stake_table, success_threshold),
52 )
53 .await
54 .with_context(|| "timed out while fetching leaf")?
55 }
56
57 async fn try_fetch_accounts(
58 &self,
59 _retry: usize,
60 instance: &NodeState,
61 height: u64,
62 view: ViewNumber,
63 fee_merkle_tree_root: FeeMerkleCommitment,
64 accounts: &[FeeAccount],
65 ) -> anyhow::Result<Vec<FeeAccountProof>> {
66 let timeout_duration = self.config.request_batch_interval * 3;
68
69 timeout(
71 timeout_duration,
72 self.fetch_accounts(
73 instance,
74 height,
75 view,
76 fee_merkle_tree_root,
77 accounts.to_vec(),
78 ),
79 )
80 .await
81 .with_context(|| "timed out while fetching accounts")?
82 }
83
84 async fn try_remember_blocks_merkle_tree(
85 &self,
86 _retry: usize,
87 instance: &NodeState,
88 height: u64,
89 view: ViewNumber,
90 mt: &mut BlockMerkleTree,
91 ) -> anyhow::Result<()> {
92 let timeout_duration = self.config.request_batch_interval * 3;
94
95 timeout(
97 timeout_duration,
98 self.remember_blocks_merkle_tree(instance, height, view, mt),
99 )
100 .await
101 .with_context(|| "timed out while remembering blocks merkle tree")?
102 }
103
104 async fn try_fetch_chain_config(
105 &self,
106 _retry: usize,
107 commitment: Commitment<ChainConfig>,
108 ) -> anyhow::Result<ChainConfig> {
109 let timeout_duration = self.config.request_batch_interval * 3;
111
112 timeout(timeout_duration, self.fetch_chain_config(commitment))
114 .await
115 .with_context(|| "timed out while fetching chain config")?
116 }
117
118 async fn try_fetch_reward_accounts_v2(
119 &self,
120 _retry: usize,
121 instance: &NodeState,
122 height: u64,
123 view: ViewNumber,
124 reward_merkle_tree_root: RewardMerkleCommitmentV2,
125 accounts: &[RewardAccountV2],
126 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
127 let timeout_duration = self.config.request_batch_interval * 3;
129
130 timeout(
132 timeout_duration,
133 self.fetch_reward_accounts_v2(
134 instance,
135 height,
136 view,
137 reward_merkle_tree_root,
138 accounts.to_vec(),
139 ),
140 )
141 .await
142 .with_context(|| "timed out while fetching reward accounts")?
143 }
144
145 async fn try_fetch_reward_accounts_v1(
146 &self,
147 _retry: usize,
148 instance: &NodeState,
149 height: u64,
150 view: ViewNumber,
151 reward_merkle_tree_root: RewardMerkleCommitmentV1,
152 accounts: &[RewardAccountV1],
153 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
154 let timeout_duration = self.config.request_batch_interval * 3;
156
157 timeout(
159 timeout_duration,
160 self.fetch_reward_accounts_v1(
161 instance,
162 height,
163 view,
164 reward_merkle_tree_root,
165 accounts.to_vec(),
166 ),
167 )
168 .await
169 .with_context(|| "timed out while fetching reward accounts")?
170 }
171
172 async fn try_fetch_state_cert(
173 &self,
174 _retry: usize,
175 epoch: u64,
176 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
177 let timeout_duration = self.config.request_batch_interval * 3;
178
179 timeout(timeout_duration, self.fetch_state_cert(epoch))
180 .await
181 .with_context(|| "timed out while fetching state cert")?
182 }
183
184 fn backoff(&self) -> &BackoffParams {
185 unreachable!()
186 }
187
188 fn name(&self) -> String {
189 "request-response".to_string()
190 }
191
192 async fn fetch_accounts(
193 &self,
194 _instance: &NodeState,
195 height: u64,
196 view: ViewNumber,
197 fee_merkle_tree_root: FeeMerkleCommitment,
198 accounts: Vec<FeeAccount>,
199 ) -> anyhow::Result<Vec<FeeAccountProof>> {
200 tracing::info!("Fetching accounts for height: {height}, view: {view}");
201
202 let accounts_clone = accounts.clone();
204 let response_validation_fn = move |_request: &Request, response: Response| {
205 let accounts_clone = accounts_clone.clone();
207
208 async move {
209 let Response::Accounts(fee_merkle_tree) = response else {
211 return Err(anyhow::anyhow!("expected accounts response"));
212 };
213
214 let mut proofs = Vec::new();
216 for account in accounts_clone {
217 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
218 .with_context(|| format!("response was missing account {account}"))?;
219 proof.verify(&fee_merkle_tree_root).with_context(|| {
220 format!(
221 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
222 )
223 })?;
224 proofs.push(proof);
225 }
226
227 Ok(proofs)
228 }
229 };
230
231 let response = self
233 .request_indefinitely(
234 Request::Accounts(height, *view, accounts),
235 RequestType::Batched,
236 response_validation_fn,
237 )
238 .await
239 .with_context(|| "failed to request accounts")?;
240
241 tracing::info!("Fetched accounts for height: {height}, view: {view}");
242
243 Ok(response)
244 }
245
246 async fn fetch_leaf(
247 &self,
248 height: u64,
249 stake_table: HSStakeTable<SeqTypes>,
250 success_threshold: U256,
251 ) -> anyhow::Result<Leaf2> {
252 tracing::info!("Fetching leaf for height: {height}");
253
254 let stake_table_clone = stake_table.clone();
256 let response_validation_fn = move |_request: &Request, response: Response| {
257 let stake_table_clone = stake_table_clone.clone();
259
260 async move {
261 let Response::Leaf(leaf_chain) = response else {
263 return Err(anyhow::anyhow!("expected leaf response"));
264 };
265
266 let leaf = verify_leaf_chain(
268 leaf_chain,
269 &stake_table_clone,
270 success_threshold,
271 height,
272 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
273 )
274 .await
275 .with_context(|| "leaf chain verification failed")?;
276
277 Ok(leaf)
278 }
279 };
280
281 let response = self
283 .request_indefinitely(
284 Request::Leaf(height),
285 RequestType::Batched,
286 response_validation_fn,
287 )
288 .await
289 .with_context(|| "failed to request leaf")?;
290
291 tracing::info!("Fetched leaf for height: {height}");
292
293 Ok(response)
294 }
295
296 async fn fetch_chain_config(
297 &self,
298 commitment: Commitment<ChainConfig>,
299 ) -> anyhow::Result<ChainConfig> {
300 tracing::info!("Fetching chain config with commitment: {commitment}");
301
302 let response_validation_fn = move |_request: &Request, response: Response| {
304 async move {
305 let Response::ChainConfig(chain_config) = response else {
307 return Err(anyhow::anyhow!("expected chain config response"));
308 };
309
310 if commitment != chain_config.commit() {
312 return Err(anyhow::anyhow!("chain config commitment mismatch"));
313 }
314
315 Ok(chain_config)
316 }
317 };
318
319 let response = self
321 .request_indefinitely(
322 Request::ChainConfig(commitment),
323 RequestType::Batched,
324 response_validation_fn,
325 )
326 .await
327 .with_context(|| "failed to request chain config")?;
328
329 tracing::info!("Fetched chain config with commitment: {commitment}");
330
331 Ok(response)
332 }
333
334 async fn remember_blocks_merkle_tree(
335 &self,
336 _instance: &NodeState,
337 height: u64,
338 view: ViewNumber,
339 mt: &mut BlockMerkleTree,
340 ) -> anyhow::Result<()> {
341 tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
342
343 let mt_clone = mt.clone();
345
346 let response_validation_fn = move |_request: &Request, response: Response| {
348 let mut block_merkle_tree = mt_clone.clone();
350
351 async move {
352 let Response::BlocksFrontier(blocks_frontier) = response else {
354 return Err(anyhow::anyhow!("expected blocks frontier response"));
355 };
356
357 let leaf_elem = blocks_frontier
359 .elem()
360 .with_context(|| "provided frontier is missing leaf element")?;
361
362 block_merkle_tree
364 .remember(
365 block_merkle_tree.num_leaves() - 1,
366 *leaf_elem,
367 blocks_frontier,
368 )
369 .with_context(|| "merkle tree verification failed")?;
370
371 Ok(block_merkle_tree)
373 }
374 };
375
376 let response = self
378 .request_indefinitely(
379 Request::BlocksFrontier(height, *view),
380 RequestType::Batched,
381 response_validation_fn,
382 )
383 .await
384 .with_context(|| "failed to request blocks frontier")?;
385
386 *mt = response;
388
389 tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
390
391 Ok(())
392 }
393
394 async fn fetch_reward_accounts_v2(
395 &self,
396 _instance: &NodeState,
397 height: u64,
398 view: ViewNumber,
399 reward_merkle_tree_root: RewardMerkleCommitmentV2,
400 accounts: Vec<RewardAccountV2>,
401 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
402 tracing::info!("Fetching reward accounts for height: {height}, view: {view}");
403
404 let accounts_clone = accounts.clone();
406
407 let response_validation_fn = move |_request: &Request, response: Response| {
409 let accounts_clone = accounts_clone.clone();
411
412 async move {
413 let Response::RewardAccountsV2(reward_merkle_tree) = response else {
415 return Err(anyhow::anyhow!("expected reward accounts response"));
416 };
417
418 let mut proofs = Vec::new();
420 for account in accounts_clone {
421 let (proof, _) =
422 RewardAccountProofV2::prove(&reward_merkle_tree, account.into())
423 .with_context(|| format!("response was missing account {account}"))?;
424 proof.verify(&reward_merkle_tree_root).with_context(|| {
425 format!(
426 "invalid proof for v2 reward account {account}, root: \
427 {reward_merkle_tree_root}"
428 )
429 })?;
430 proofs.push(proof);
431 }
432
433 Ok(proofs)
434 }
435 };
436
437 let response = self
439 .request_indefinitely(
440 Request::RewardAccountsV2(height, *view, accounts),
441 RequestType::Batched,
442 response_validation_fn,
443 )
444 .await
445 .with_context(|| "failed to request reward accounts")?;
446
447 tracing::info!("Fetched reward accounts for height: {height}, view: {view}");
448
449 Ok(response)
450 }
451
452 async fn fetch_reward_accounts_v1(
453 &self,
454 _instance: &NodeState,
455 height: u64,
456 view: ViewNumber,
457 reward_merkle_tree_root: RewardMerkleCommitmentV1,
458 accounts: Vec<RewardAccountV1>,
459 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
460 tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
461
462 let accounts_clone = accounts.clone();
464
465 let response_validation_fn = move |_request: &Request, response: Response| {
467 let accounts_clone = accounts_clone.clone();
469
470 async move {
471 let Response::RewardAccountsV1(reward_merkle_tree) = response else {
473 return Err(anyhow::anyhow!("expected v1 reward accounts response"));
474 };
475
476 let mut proofs = Vec::new();
478 for account in accounts_clone {
479 let (proof, _) =
480 RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
481 .with_context(|| format!("response was missing account {account}"))?;
482 proof.verify(&reward_merkle_tree_root).with_context(|| {
483 format!(
484 "invalid proof for v1 reward account {account}, root: \
485 {reward_merkle_tree_root}"
486 )
487 })?;
488 proofs.push(proof);
489 }
490
491 Ok(proofs)
492 }
493 };
494
495 let response = self
497 .request_indefinitely(
498 Request::RewardAccountsV1(height, *view, accounts),
499 RequestType::Batched,
500 response_validation_fn,
501 )
502 .await
503 .with_context(|| "failed to request v1 reward accounts")?;
504
505 tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
506
507 Ok(response)
508 }
509
510 async fn fetch_state_cert(
511 &self,
512 epoch: u64,
513 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
514 tracing::info!("Fetching state cert for epoch: {epoch}");
515
516 let response_validation_fn = move |_request: &Request, response: Response| async move {
518 let Response::StateCert(state_cert) = response else {
520 return Err(anyhow::anyhow!("expected state cert response"));
521 };
522
523 Ok(state_cert)
524 };
525
526 let response = self
528 .request_indefinitely(
529 Request::StateCert(epoch),
530 RequestType::Batched,
531 response_validation_fn,
532 )
533 .await
534 .with_context(|| "failed to request state cert")?;
535
536 tracing::info!("Fetched state cert for epoch: {epoch}");
537
538 Ok(response)
539 }
540
541 fn is_local(&self) -> bool {
542 false
543 }
544}