sequencer/request_response/catchup/
state.rs

1use alloy::primitives::U256;
2use anyhow::Context;
3use async_trait::async_trait;
4use committable::{Commitment, Committable};
5use espresso_types::{
6    traits::{SequencerPersistence, StateCatchup},
7    v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
8    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
9    BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
10    Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions,
11};
12use hotshot::traits::NodeImplementation;
13use hotshot_types::{
14    data::ViewNumber,
15    message::UpgradeLock,
16    simple_certificate::LightClientStateUpdateCertificateV2,
17    stake_table::HSStakeTable,
18    traits::{network::ConnectedNetwork, node_implementation::Versions},
19    utils::verify_leaf_chain,
20};
21use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
22use request_response::RequestType;
23use tokio::time::timeout;
24
25use crate::request_response::{
26    request::{Request, Response},
27    RequestResponseProtocol,
28};
29
30#[async_trait]
31impl<
32        I: NodeImplementation<SeqTypes>,
33        V: Versions,
34        N: ConnectedNetwork<PubKey>,
35        P: SequencerPersistence,
36    > StateCatchup for RequestResponseProtocol<I, V, N, P>
37{
38    async fn try_fetch_leaf(
39        &self,
40        _retry: usize,
41        height: u64,
42        stake_table: HSStakeTable<SeqTypes>,
43        success_threshold: U256,
44    ) -> anyhow::Result<Leaf2> {
45        // Timeout after a few batches
46        let timeout_duration = self.config.request_batch_interval * 3;
47
48        // Fetch the leaf
49        timeout(
50            timeout_duration,
51            self.fetch_leaf(height, stake_table, success_threshold),
52        )
53        .await
54        .with_context(|| "timed out while fetching leaf")?
55    }
56
57    async fn try_fetch_accounts(
58        &self,
59        _retry: usize,
60        instance: &NodeState,
61        height: u64,
62        view: ViewNumber,
63        fee_merkle_tree_root: FeeMerkleCommitment,
64        accounts: &[FeeAccount],
65    ) -> anyhow::Result<Vec<FeeAccountProof>> {
66        // Timeout after a few batches
67        let timeout_duration = self.config.request_batch_interval * 3;
68
69        // Fetch the accounts
70        timeout(
71            timeout_duration,
72            self.fetch_accounts(
73                instance,
74                height,
75                view,
76                fee_merkle_tree_root,
77                accounts.to_vec(),
78            ),
79        )
80        .await
81        .with_context(|| "timed out while fetching accounts")?
82    }
83
84    async fn try_remember_blocks_merkle_tree(
85        &self,
86        _retry: usize,
87        instance: &NodeState,
88        height: u64,
89        view: ViewNumber,
90        mt: &mut BlockMerkleTree,
91    ) -> anyhow::Result<()> {
92        // Timeout after a few batches
93        let timeout_duration = self.config.request_batch_interval * 3;
94
95        // Remember the blocks merkle tree
96        timeout(
97            timeout_duration,
98            self.remember_blocks_merkle_tree(instance, height, view, mt),
99        )
100        .await
101        .with_context(|| "timed out while remembering blocks merkle tree")?
102    }
103
104    async fn try_fetch_chain_config(
105        &self,
106        _retry: usize,
107        commitment: Commitment<ChainConfig>,
108    ) -> anyhow::Result<ChainConfig> {
109        // Timeout after a few batches
110        let timeout_duration = self.config.request_batch_interval * 3;
111
112        // Fetch the chain config
113        timeout(timeout_duration, self.fetch_chain_config(commitment))
114            .await
115            .with_context(|| "timed out while fetching chain config")?
116    }
117
118    async fn try_fetch_reward_accounts_v2(
119        &self,
120        _retry: usize,
121        instance: &NodeState,
122        height: u64,
123        view: ViewNumber,
124        reward_merkle_tree_root: RewardMerkleCommitmentV2,
125        accounts: &[RewardAccountV2],
126    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
127        // Timeout after a few batches
128        let timeout_duration = self.config.request_batch_interval * 3;
129
130        // Fetch the reward accounts
131        timeout(
132            timeout_duration,
133            self.fetch_reward_accounts_v2(
134                instance,
135                height,
136                view,
137                reward_merkle_tree_root,
138                accounts.to_vec(),
139            ),
140        )
141        .await
142        .with_context(|| "timed out while fetching reward accounts")?
143    }
144
145    async fn try_fetch_reward_accounts_v1(
146        &self,
147        _retry: usize,
148        instance: &NodeState,
149        height: u64,
150        view: ViewNumber,
151        reward_merkle_tree_root: RewardMerkleCommitmentV1,
152        accounts: &[RewardAccountV1],
153    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
154        // Timeout after a few batches
155        let timeout_duration = self.config.request_batch_interval * 3;
156
157        // Fetch the reward accounts
158        timeout(
159            timeout_duration,
160            self.fetch_reward_accounts_v1(
161                instance,
162                height,
163                view,
164                reward_merkle_tree_root,
165                accounts.to_vec(),
166            ),
167        )
168        .await
169        .with_context(|| "timed out while fetching reward accounts")?
170    }
171
172    async fn try_fetch_state_cert(
173        &self,
174        _retry: usize,
175        epoch: u64,
176    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
177        let timeout_duration = self.config.request_batch_interval * 3;
178
179        timeout(timeout_duration, self.fetch_state_cert(epoch))
180            .await
181            .with_context(|| "timed out while fetching state cert")?
182    }
183
184    fn backoff(&self) -> &BackoffParams {
185        unreachable!()
186    }
187
188    fn name(&self) -> String {
189        "request-response".to_string()
190    }
191
192    async fn fetch_accounts(
193        &self,
194        _instance: &NodeState,
195        height: u64,
196        view: ViewNumber,
197        fee_merkle_tree_root: FeeMerkleCommitment,
198        accounts: Vec<FeeAccount>,
199    ) -> anyhow::Result<Vec<FeeAccountProof>> {
200        tracing::info!("Fetching accounts for height: {height}, view: {view}");
201
202        // Clone things we need in the first closure
203        let accounts_clone = accounts.clone();
204        let response_validation_fn = move |_request: &Request, response: Response| {
205            // Clone again
206            let accounts_clone = accounts_clone.clone();
207
208            async move {
209                // Make sure the response is an accounts response
210                let Response::Accounts(fee_merkle_tree) = response else {
211                    return Err(anyhow::anyhow!("expected accounts response"));
212                };
213
214                // Verify the merkle proofs
215                let mut proofs = Vec::new();
216                for account in accounts_clone {
217                    let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
218                        .with_context(|| format!("response was missing account {account}"))?;
219                    proof.verify(&fee_merkle_tree_root).with_context(|| {
220                        format!(
221                            "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
222                        )
223                    })?;
224                    proofs.push(proof);
225                }
226
227                Ok(proofs)
228            }
229        };
230
231        // Wait for the protocol to send us the accounts
232        let response = self
233            .request_indefinitely(
234                Request::Accounts(height, *view, accounts),
235                RequestType::Batched,
236                response_validation_fn,
237            )
238            .await
239            .with_context(|| "failed to request accounts")?;
240
241        tracing::info!("Fetched accounts for height: {height}, view: {view}");
242
243        Ok(response)
244    }
245
246    async fn fetch_leaf(
247        &self,
248        height: u64,
249        stake_table: HSStakeTable<SeqTypes>,
250        success_threshold: U256,
251    ) -> anyhow::Result<Leaf2> {
252        tracing::info!("Fetching leaf for height: {height}");
253
254        // Clone things we need in the first closure
255        let stake_table_clone = stake_table.clone();
256        let response_validation_fn = move |_request: &Request, response: Response| {
257            // Clone again
258            let stake_table_clone = stake_table_clone.clone();
259
260            async move {
261                // Make sure the response is a leaf response
262                let Response::Leaf(leaf_chain) = response else {
263                    return Err(anyhow::anyhow!("expected leaf response"));
264                };
265
266                // Verify the leaf chain
267                let leaf = verify_leaf_chain(
268                    leaf_chain,
269                    &stake_table_clone,
270                    success_threshold,
271                    height,
272                    &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
273                )
274                .await
275                .with_context(|| "leaf chain verification failed")?;
276
277                Ok(leaf)
278            }
279        };
280
281        // Wait for the protocol to send us the accounts
282        let response = self
283            .request_indefinitely(
284                Request::Leaf(height),
285                RequestType::Batched,
286                response_validation_fn,
287            )
288            .await
289            .with_context(|| "failed to request leaf")?;
290
291        tracing::info!("Fetched leaf for height: {height}");
292
293        Ok(response)
294    }
295
296    async fn fetch_chain_config(
297        &self,
298        commitment: Commitment<ChainConfig>,
299    ) -> anyhow::Result<ChainConfig> {
300        tracing::info!("Fetching chain config with commitment: {commitment}");
301
302        // Create the response validation function
303        let response_validation_fn = move |_request: &Request, response: Response| {
304            async move {
305                // Make sure the response is a chain config response
306                let Response::ChainConfig(chain_config) = response else {
307                    return Err(anyhow::anyhow!("expected chain config response"));
308                };
309
310                // Make sure the commitments match
311                if commitment != chain_config.commit() {
312                    return Err(anyhow::anyhow!("chain config commitment mismatch"));
313                }
314
315                Ok(chain_config)
316            }
317        };
318
319        // Wait for the protocol to send us the chain config
320        let response = self
321            .request_indefinitely(
322                Request::ChainConfig(commitment),
323                RequestType::Batched,
324                response_validation_fn,
325            )
326            .await
327            .with_context(|| "failed to request chain config")?;
328
329        tracing::info!("Fetched chain config with commitment: {commitment}");
330
331        Ok(response)
332    }
333
334    async fn remember_blocks_merkle_tree(
335        &self,
336        _instance: &NodeState,
337        height: u64,
338        view: ViewNumber,
339        mt: &mut BlockMerkleTree,
340    ) -> anyhow::Result<()> {
341        tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
342
343        // Clone the merkle tree
344        let mt_clone = mt.clone();
345
346        // Create the response validation function
347        let response_validation_fn = move |_request: &Request, response: Response| {
348            // Clone the merkle tree
349            let mut block_merkle_tree = mt_clone.clone();
350
351            async move {
352                // Make sure the response is a blocks frontier response
353                let Response::BlocksFrontier(blocks_frontier) = response else {
354                    return Err(anyhow::anyhow!("expected blocks frontier response"));
355                };
356
357                // Get the leaf element associated with the proof
358                let leaf_elem = blocks_frontier
359                    .elem()
360                    .with_context(|| "provided frontier is missing leaf element")?;
361
362                // Verify the block proof
363                block_merkle_tree
364                    .remember(
365                        block_merkle_tree.num_leaves() - 1,
366                        *leaf_elem,
367                        blocks_frontier,
368                    )
369                    .with_context(|| "merkle tree verification failed")?;
370
371                // Return the verified merkle tree
372                Ok(block_merkle_tree)
373            }
374        };
375
376        // Wait for the protocol to send us the blocks frontier
377        let response = self
378            .request_indefinitely(
379                Request::BlocksFrontier(height, *view),
380                RequestType::Batched,
381                response_validation_fn,
382            )
383            .await
384            .with_context(|| "failed to request blocks frontier")?;
385
386        // Replace the merkle tree
387        *mt = response;
388
389        tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
390
391        Ok(())
392    }
393
394    async fn fetch_reward_accounts_v2(
395        &self,
396        _instance: &NodeState,
397        height: u64,
398        view: ViewNumber,
399        reward_merkle_tree_root: RewardMerkleCommitmentV2,
400        accounts: Vec<RewardAccountV2>,
401    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
402        tracing::info!("Fetching reward accounts for height: {height}, view: {view}");
403
404        // Clone things we need in the first closure
405        let accounts_clone = accounts.clone();
406
407        // Create the response validation function
408        let response_validation_fn = move |_request: &Request, response: Response| {
409            // Clone again
410            let accounts_clone = accounts_clone.clone();
411
412            async move {
413                // Make sure the response is a reward accounts response
414                let Response::RewardAccountsV2(reward_merkle_tree) = response else {
415                    return Err(anyhow::anyhow!("expected reward accounts response"));
416                };
417
418                // Verify the merkle proofs
419                let mut proofs = Vec::new();
420                for account in accounts_clone {
421                    let (proof, _) =
422                        RewardAccountProofV2::prove(&reward_merkle_tree, account.into())
423                            .with_context(|| format!("response was missing account {account}"))?;
424                    proof.verify(&reward_merkle_tree_root).with_context(|| {
425                        format!(
426                            "invalid proof for v2 reward account {account}, root: \
427                             {reward_merkle_tree_root}"
428                        )
429                    })?;
430                    proofs.push(proof);
431                }
432
433                Ok(proofs)
434            }
435        };
436
437        // Wait for the protocol to send us the reward accounts
438        let response = self
439            .request_indefinitely(
440                Request::RewardAccountsV2(height, *view, accounts),
441                RequestType::Batched,
442                response_validation_fn,
443            )
444            .await
445            .with_context(|| "failed to request reward accounts")?;
446
447        tracing::info!("Fetched reward accounts for height: {height}, view: {view}");
448
449        Ok(response)
450    }
451
452    async fn fetch_reward_accounts_v1(
453        &self,
454        _instance: &NodeState,
455        height: u64,
456        view: ViewNumber,
457        reward_merkle_tree_root: RewardMerkleCommitmentV1,
458        accounts: Vec<RewardAccountV1>,
459    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
460        tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
461
462        // Clone things we need in the first closure
463        let accounts_clone = accounts.clone();
464
465        // Create the response validation function
466        let response_validation_fn = move |_request: &Request, response: Response| {
467            // Clone again
468            let accounts_clone = accounts_clone.clone();
469
470            async move {
471                // Make sure the response is a reward accounts response
472                let Response::RewardAccountsV1(reward_merkle_tree) = response else {
473                    return Err(anyhow::anyhow!("expected v1 reward accounts response"));
474                };
475
476                // Verify the merkle proofs
477                let mut proofs = Vec::new();
478                for account in accounts_clone {
479                    let (proof, _) =
480                        RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
481                            .with_context(|| format!("response was missing account {account}"))?;
482                    proof.verify(&reward_merkle_tree_root).with_context(|| {
483                        format!(
484                            "invalid proof for v1 reward account {account}, root: \
485                             {reward_merkle_tree_root}"
486                        )
487                    })?;
488                    proofs.push(proof);
489                }
490
491                Ok(proofs)
492            }
493        };
494
495        // Wait for the protocol to send us the reward accounts
496        let response = self
497            .request_indefinitely(
498                Request::RewardAccountsV1(height, *view, accounts),
499                RequestType::Batched,
500                response_validation_fn,
501            )
502            .await
503            .with_context(|| "failed to request v1 reward accounts")?;
504
505        tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
506
507        Ok(response)
508    }
509
510    async fn fetch_state_cert(
511        &self,
512        epoch: u64,
513    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
514        tracing::info!("Fetching state cert for epoch: {epoch}");
515
516        // Create the response validation function
517        let response_validation_fn = move |_request: &Request, response: Response| async move {
518            // Make sure the response is a state cert response
519            let Response::StateCert(state_cert) = response else {
520                return Err(anyhow::anyhow!("expected state cert response"));
521            };
522
523            Ok(state_cert)
524        };
525
526        // Wait for the protocol to send us the state cert
527        let response = self
528            .request_indefinitely(
529                Request::StateCert(epoch),
530                RequestType::Batched,
531                response_validation_fn,
532            )
533            .await
534            .with_context(|| "failed to request state cert")?;
535
536        tracing::info!("Fetched state cert for epoch: {epoch}");
537
538        Ok(response)
539    }
540
541    fn is_local(&self) -> bool {
542        false
543    }
544}