1use alloy::primitives::U256;
2use anyhow::Context;
3use async_trait::async_trait;
4use committable::{Commitment, Committable};
5use espresso_types::{
6 traits::{SequencerPersistence, StateCatchup},
7 v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
8 v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
9 BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
10 Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions,
11};
12use hotshot::traits::NodeImplementation;
13use hotshot_types::{
14 data::ViewNumber,
15 message::UpgradeLock,
16 stake_table::HSStakeTable,
17 traits::{network::ConnectedNetwork, node_implementation::Versions},
18 utils::verify_leaf_chain,
19};
20use jf_merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
21use request_response::RequestType;
22use tokio::time::timeout;
23
24use crate::request_response::{
25 request::{Request, Response},
26 RequestResponseProtocol,
27};
28
29#[async_trait]
30impl<
31 I: NodeImplementation<SeqTypes>,
32 V: Versions,
33 N: ConnectedNetwork<PubKey>,
34 P: SequencerPersistence,
35 > StateCatchup for RequestResponseProtocol<I, V, N, P>
36{
37 async fn try_fetch_leaf(
38 &self,
39 _retry: usize,
40 height: u64,
41 stake_table: HSStakeTable<SeqTypes>,
42 success_threshold: U256,
43 ) -> anyhow::Result<Leaf2> {
44 let timeout_duration = self.config.request_batch_interval * 3;
46
47 timeout(
49 timeout_duration,
50 self.fetch_leaf(height, stake_table, success_threshold),
51 )
52 .await
53 .with_context(|| "timed out while fetching leaf")?
54 }
55
56 async fn try_fetch_accounts(
57 &self,
58 _retry: usize,
59 instance: &NodeState,
60 height: u64,
61 view: ViewNumber,
62 fee_merkle_tree_root: FeeMerkleCommitment,
63 accounts: &[FeeAccount],
64 ) -> anyhow::Result<Vec<FeeAccountProof>> {
65 let timeout_duration = self.config.request_batch_interval * 3;
67
68 timeout(
70 timeout_duration,
71 self.fetch_accounts(
72 instance,
73 height,
74 view,
75 fee_merkle_tree_root,
76 accounts.to_vec(),
77 ),
78 )
79 .await
80 .with_context(|| "timed out while fetching accounts")?
81 }
82
83 async fn try_remember_blocks_merkle_tree(
84 &self,
85 _retry: usize,
86 instance: &NodeState,
87 height: u64,
88 view: ViewNumber,
89 mt: &mut BlockMerkleTree,
90 ) -> anyhow::Result<()> {
91 let timeout_duration = self.config.request_batch_interval * 3;
93
94 timeout(
96 timeout_duration,
97 self.remember_blocks_merkle_tree(instance, height, view, mt),
98 )
99 .await
100 .with_context(|| "timed out while remembering blocks merkle tree")?
101 }
102
103 async fn try_fetch_chain_config(
104 &self,
105 _retry: usize,
106 commitment: Commitment<ChainConfig>,
107 ) -> anyhow::Result<ChainConfig> {
108 let timeout_duration = self.config.request_batch_interval * 3;
110
111 timeout(timeout_duration, self.fetch_chain_config(commitment))
113 .await
114 .with_context(|| "timed out while fetching chain config")?
115 }
116
117 async fn try_fetch_reward_accounts_v2(
118 &self,
119 _retry: usize,
120 instance: &NodeState,
121 height: u64,
122 view: ViewNumber,
123 reward_merkle_tree_root: RewardMerkleCommitmentV2,
124 accounts: &[RewardAccountV2],
125 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
126 let timeout_duration = self.config.request_batch_interval * 3;
128
129 timeout(
131 timeout_duration,
132 self.fetch_reward_accounts_v2(
133 instance,
134 height,
135 view,
136 reward_merkle_tree_root,
137 accounts.to_vec(),
138 ),
139 )
140 .await
141 .with_context(|| "timed out while fetching reward accounts")?
142 }
143
144 async fn try_fetch_reward_accounts_v1(
145 &self,
146 _retry: usize,
147 instance: &NodeState,
148 height: u64,
149 view: ViewNumber,
150 reward_merkle_tree_root: RewardMerkleCommitmentV1,
151 accounts: &[RewardAccountV1],
152 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
153 let timeout_duration = self.config.request_batch_interval * 3;
155
156 timeout(
158 timeout_duration,
159 self.fetch_reward_accounts_v1(
160 instance,
161 height,
162 view,
163 reward_merkle_tree_root,
164 accounts.to_vec(),
165 ),
166 )
167 .await
168 .with_context(|| "timed out while fetching reward accounts")?
169 }
170
171 fn backoff(&self) -> &BackoffParams {
172 unreachable!()
173 }
174
175 fn name(&self) -> String {
176 "request-response".to_string()
177 }
178
179 async fn fetch_accounts(
180 &self,
181 _instance: &NodeState,
182 height: u64,
183 view: ViewNumber,
184 fee_merkle_tree_root: FeeMerkleCommitment,
185 accounts: Vec<FeeAccount>,
186 ) -> anyhow::Result<Vec<FeeAccountProof>> {
187 tracing::info!("Fetching accounts for height: {height}, view: {view}");
188
189 let accounts_clone = accounts.clone();
191 let response_validation_fn = move |_request: &Request, response: Response| {
192 let accounts_clone = accounts_clone.clone();
194
195 async move {
196 let Response::Accounts(fee_merkle_tree) = response else {
198 return Err(anyhow::anyhow!("expected accounts response"));
199 };
200
201 let mut proofs = Vec::new();
203 for account in accounts_clone {
204 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
205 .with_context(|| format!("response was missing account {account}"))?;
206 proof
207 .verify(&fee_merkle_tree_root)
208 .with_context(|| format!("invalid proof for account {account}"))?;
209 proofs.push(proof);
210 }
211
212 Ok(proofs)
213 }
214 };
215
216 let response = self
218 .request_indefinitely(
219 Request::Accounts(height, *view, accounts),
220 RequestType::Batched,
221 response_validation_fn,
222 )
223 .await
224 .with_context(|| "failed to request accounts")?;
225
226 tracing::info!("Fetched accounts for height: {height}, view: {view}");
227
228 Ok(response)
229 }
230
231 async fn fetch_leaf(
232 &self,
233 height: u64,
234 stake_table: HSStakeTable<SeqTypes>,
235 success_threshold: U256,
236 ) -> anyhow::Result<Leaf2> {
237 tracing::info!("Fetching leaf for height: {height}");
238
239 let stake_table_clone = stake_table.clone();
241 let response_validation_fn = move |_request: &Request, response: Response| {
242 let stake_table_clone = stake_table_clone.clone();
244
245 async move {
246 let Response::Leaf(leaf_chain) = response else {
248 return Err(anyhow::anyhow!("expected leaf response"));
249 };
250
251 let leaf = verify_leaf_chain(
253 leaf_chain,
254 &stake_table_clone,
255 success_threshold,
256 height,
257 &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
258 )
259 .await
260 .with_context(|| "leaf chain verification failed")?;
261
262 Ok(leaf)
263 }
264 };
265
266 let response = self
268 .request_indefinitely(
269 Request::Leaf(height),
270 RequestType::Batched,
271 response_validation_fn,
272 )
273 .await
274 .with_context(|| "failed to request leaf")?;
275
276 tracing::info!("Fetched leaf for height: {height}");
277
278 Ok(response)
279 }
280
281 async fn fetch_chain_config(
282 &self,
283 commitment: Commitment<ChainConfig>,
284 ) -> anyhow::Result<ChainConfig> {
285 tracing::info!("Fetching chain config with commitment: {commitment}");
286
287 let response_validation_fn = move |_request: &Request, response: Response| {
289 async move {
290 let Response::ChainConfig(chain_config) = response else {
292 return Err(anyhow::anyhow!("expected chain config response"));
293 };
294
295 if commitment != chain_config.commit() {
297 return Err(anyhow::anyhow!("chain config commitment mismatch"));
298 }
299
300 Ok(chain_config)
301 }
302 };
303
304 let response = self
306 .request_indefinitely(
307 Request::ChainConfig(commitment),
308 RequestType::Batched,
309 response_validation_fn,
310 )
311 .await
312 .with_context(|| "failed to request chain config")?;
313
314 tracing::info!("Fetched chain config with commitment: {commitment}");
315
316 Ok(response)
317 }
318
319 async fn remember_blocks_merkle_tree(
320 &self,
321 _instance: &NodeState,
322 height: u64,
323 view: ViewNumber,
324 mt: &mut BlockMerkleTree,
325 ) -> anyhow::Result<()> {
326 tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
327
328 let mt_clone = mt.clone();
330
331 let response_validation_fn = move |_request: &Request, response: Response| {
333 let mut block_merkle_tree = mt_clone.clone();
335
336 async move {
337 let Response::BlocksFrontier(blocks_frontier) = response else {
339 return Err(anyhow::anyhow!("expected blocks frontier response"));
340 };
341
342 let leaf_elem = blocks_frontier
344 .elem()
345 .with_context(|| "provided frontier is missing leaf element")?;
346
347 block_merkle_tree
349 .remember(
350 block_merkle_tree.num_leaves() - 1,
351 *leaf_elem,
352 blocks_frontier,
353 )
354 .with_context(|| "merkle tree verification failed")?;
355
356 Ok(block_merkle_tree)
358 }
359 };
360
361 let response = self
363 .request_indefinitely(
364 Request::BlocksFrontier(height, *view),
365 RequestType::Batched,
366 response_validation_fn,
367 )
368 .await
369 .with_context(|| "failed to request blocks frontier")?;
370
371 *mt = response;
373
374 tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
375
376 Ok(())
377 }
378
379 async fn fetch_reward_accounts_v2(
380 &self,
381 _instance: &NodeState,
382 height: u64,
383 view: ViewNumber,
384 reward_merkle_tree_root: RewardMerkleCommitmentV2,
385 accounts: Vec<RewardAccountV2>,
386 ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
387 tracing::info!("Fetching reward accounts for height: {height}, view: {view}");
388
389 let accounts_clone = accounts.clone();
391
392 let response_validation_fn = move |_request: &Request, response: Response| {
394 let accounts_clone = accounts_clone.clone();
396
397 async move {
398 let Response::RewardAccountsV2(reward_merkle_tree) = response else {
400 return Err(anyhow::anyhow!("expected reward accounts response"));
401 };
402
403 let mut proofs = Vec::new();
405 for account in accounts_clone {
406 let (proof, _) =
407 RewardAccountProofV2::prove(&reward_merkle_tree, account.into())
408 .with_context(|| format!("response was missing account {account}"))?;
409 proof
410 .verify(&reward_merkle_tree_root)
411 .with_context(|| format!("invalid proof for account {account}"))?;
412 proofs.push(proof);
413 }
414
415 Ok(proofs)
416 }
417 };
418
419 let response = self
421 .request_indefinitely(
422 Request::RewardAccountsV2(height, *view, accounts),
423 RequestType::Batched,
424 response_validation_fn,
425 )
426 .await
427 .with_context(|| "failed to request reward accounts")?;
428
429 tracing::info!("Fetched reward accounts for height: {height}, view: {view}");
430
431 Ok(response)
432 }
433
434 async fn fetch_reward_accounts_v1(
435 &self,
436 _instance: &NodeState,
437 height: u64,
438 view: ViewNumber,
439 reward_merkle_tree_root: RewardMerkleCommitmentV1,
440 accounts: Vec<RewardAccountV1>,
441 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
442 tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
443
444 let accounts_clone = accounts.clone();
446
447 let response_validation_fn = move |_request: &Request, response: Response| {
449 let accounts_clone = accounts_clone.clone();
451
452 async move {
453 let Response::RewardAccountsV1(reward_merkle_tree) = response else {
455 return Err(anyhow::anyhow!("expected v1 reward accounts response"));
456 };
457
458 let mut proofs = Vec::new();
460 for account in accounts_clone {
461 let (proof, _) =
462 RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
463 .with_context(|| format!("response was missing account {account}"))?;
464 proof.verify(&reward_merkle_tree_root).with_context(|| {
465 format!("invalid proof for v1 reward account {account}")
466 })?;
467 proofs.push(proof);
468 }
469
470 Ok(proofs)
471 }
472 };
473
474 let response = self
476 .request_indefinitely(
477 Request::RewardAccountsV1(height, *view, accounts),
478 RequestType::Batched,
479 response_validation_fn,
480 )
481 .await
482 .with_context(|| "failed to request v1 reward accounts")?;
483
484 tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
485
486 Ok(response)
487 }
488
489 fn is_local(&self) -> bool {
490 false
491 }
492}