sequencer/request_response/catchup/
state.rs

1use alloy::primitives::U256;
2use anyhow::Context;
3use async_trait::async_trait;
4use committable::{Commitment, Committable};
5use espresso_types::{
6    traits::{SequencerPersistence, StateCatchup},
7    v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
8    v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2},
9    BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
10    Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions,
11};
12use hotshot::traits::NodeImplementation;
13use hotshot_types::{
14    data::ViewNumber,
15    message::UpgradeLock,
16    stake_table::HSStakeTable,
17    traits::{network::ConnectedNetwork, node_implementation::Versions},
18    utils::verify_leaf_chain,
19};
20use jf_merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
21use request_response::RequestType;
22use tokio::time::timeout;
23
24use crate::request_response::{
25    request::{Request, Response},
26    RequestResponseProtocol,
27};
28
29#[async_trait]
30impl<
31        I: NodeImplementation<SeqTypes>,
32        V: Versions,
33        N: ConnectedNetwork<PubKey>,
34        P: SequencerPersistence,
35    > StateCatchup for RequestResponseProtocol<I, V, N, P>
36{
37    async fn try_fetch_leaf(
38        &self,
39        _retry: usize,
40        height: u64,
41        stake_table: HSStakeTable<SeqTypes>,
42        success_threshold: U256,
43    ) -> anyhow::Result<Leaf2> {
44        // Timeout after a few batches
45        let timeout_duration = self.config.request_batch_interval * 3;
46
47        // Fetch the leaf
48        timeout(
49            timeout_duration,
50            self.fetch_leaf(height, stake_table, success_threshold),
51        )
52        .await
53        .with_context(|| "timed out while fetching leaf")?
54    }
55
56    async fn try_fetch_accounts(
57        &self,
58        _retry: usize,
59        instance: &NodeState,
60        height: u64,
61        view: ViewNumber,
62        fee_merkle_tree_root: FeeMerkleCommitment,
63        accounts: &[FeeAccount],
64    ) -> anyhow::Result<Vec<FeeAccountProof>> {
65        // Timeout after a few batches
66        let timeout_duration = self.config.request_batch_interval * 3;
67
68        // Fetch the accounts
69        timeout(
70            timeout_duration,
71            self.fetch_accounts(
72                instance,
73                height,
74                view,
75                fee_merkle_tree_root,
76                accounts.to_vec(),
77            ),
78        )
79        .await
80        .with_context(|| "timed out while fetching accounts")?
81    }
82
83    async fn try_remember_blocks_merkle_tree(
84        &self,
85        _retry: usize,
86        instance: &NodeState,
87        height: u64,
88        view: ViewNumber,
89        mt: &mut BlockMerkleTree,
90    ) -> anyhow::Result<()> {
91        // Timeout after a few batches
92        let timeout_duration = self.config.request_batch_interval * 3;
93
94        // Remember the blocks merkle tree
95        timeout(
96            timeout_duration,
97            self.remember_blocks_merkle_tree(instance, height, view, mt),
98        )
99        .await
100        .with_context(|| "timed out while remembering blocks merkle tree")?
101    }
102
103    async fn try_fetch_chain_config(
104        &self,
105        _retry: usize,
106        commitment: Commitment<ChainConfig>,
107    ) -> anyhow::Result<ChainConfig> {
108        // Timeout after a few batches
109        let timeout_duration = self.config.request_batch_interval * 3;
110
111        // Fetch the chain config
112        timeout(timeout_duration, self.fetch_chain_config(commitment))
113            .await
114            .with_context(|| "timed out while fetching chain config")?
115    }
116
117    async fn try_fetch_reward_accounts_v2(
118        &self,
119        _retry: usize,
120        instance: &NodeState,
121        height: u64,
122        view: ViewNumber,
123        reward_merkle_tree_root: RewardMerkleCommitmentV2,
124        accounts: &[RewardAccountV2],
125    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
126        // Timeout after a few batches
127        let timeout_duration = self.config.request_batch_interval * 3;
128
129        // Fetch the reward accounts
130        timeout(
131            timeout_duration,
132            self.fetch_reward_accounts_v2(
133                instance,
134                height,
135                view,
136                reward_merkle_tree_root,
137                accounts.to_vec(),
138            ),
139        )
140        .await
141        .with_context(|| "timed out while fetching reward accounts")?
142    }
143
144    async fn try_fetch_reward_accounts_v1(
145        &self,
146        _retry: usize,
147        instance: &NodeState,
148        height: u64,
149        view: ViewNumber,
150        reward_merkle_tree_root: RewardMerkleCommitmentV1,
151        accounts: &[RewardAccountV1],
152    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
153        // Timeout after a few batches
154        let timeout_duration = self.config.request_batch_interval * 3;
155
156        // Fetch the reward accounts
157        timeout(
158            timeout_duration,
159            self.fetch_reward_accounts_v1(
160                instance,
161                height,
162                view,
163                reward_merkle_tree_root,
164                accounts.to_vec(),
165            ),
166        )
167        .await
168        .with_context(|| "timed out while fetching reward accounts")?
169    }
170
171    fn backoff(&self) -> &BackoffParams {
172        unreachable!()
173    }
174
175    fn name(&self) -> String {
176        "request-response".to_string()
177    }
178
179    async fn fetch_accounts(
180        &self,
181        _instance: &NodeState,
182        height: u64,
183        view: ViewNumber,
184        fee_merkle_tree_root: FeeMerkleCommitment,
185        accounts: Vec<FeeAccount>,
186    ) -> anyhow::Result<Vec<FeeAccountProof>> {
187        tracing::info!("Fetching accounts for height: {height}, view: {view}");
188
189        // Clone things we need in the first closure
190        let accounts_clone = accounts.clone();
191        let response_validation_fn = move |_request: &Request, response: Response| {
192            // Clone again
193            let accounts_clone = accounts_clone.clone();
194
195            async move {
196                // Make sure the response is an accounts response
197                let Response::Accounts(fee_merkle_tree) = response else {
198                    return Err(anyhow::anyhow!("expected accounts response"));
199                };
200
201                // Verify the merkle proofs
202                let mut proofs = Vec::new();
203                for account in accounts_clone {
204                    let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
205                        .with_context(|| format!("response was missing account {account}"))?;
206                    proof
207                        .verify(&fee_merkle_tree_root)
208                        .with_context(|| format!("invalid proof for account {account}"))?;
209                    proofs.push(proof);
210                }
211
212                Ok(proofs)
213            }
214        };
215
216        // Wait for the protocol to send us the accounts
217        let response = self
218            .request_indefinitely(
219                Request::Accounts(height, *view, accounts),
220                RequestType::Batched,
221                response_validation_fn,
222            )
223            .await
224            .with_context(|| "failed to request accounts")?;
225
226        tracing::info!("Fetched accounts for height: {height}, view: {view}");
227
228        Ok(response)
229    }
230
231    async fn fetch_leaf(
232        &self,
233        height: u64,
234        stake_table: HSStakeTable<SeqTypes>,
235        success_threshold: U256,
236    ) -> anyhow::Result<Leaf2> {
237        tracing::info!("Fetching leaf for height: {height}");
238
239        // Clone things we need in the first closure
240        let stake_table_clone = stake_table.clone();
241        let response_validation_fn = move |_request: &Request, response: Response| {
242            // Clone again
243            let stake_table_clone = stake_table_clone.clone();
244
245            async move {
246                // Make sure the response is a leaf response
247                let Response::Leaf(leaf_chain) = response else {
248                    return Err(anyhow::anyhow!("expected leaf response"));
249                };
250
251                // Verify the leaf chain
252                let leaf = verify_leaf_chain(
253                    leaf_chain,
254                    &stake_table_clone,
255                    success_threshold,
256                    height,
257                    &UpgradeLock::<SeqTypes, SequencerVersions<EpochVersion, EpochVersion>>::new(),
258                )
259                .await
260                .with_context(|| "leaf chain verification failed")?;
261
262                Ok(leaf)
263            }
264        };
265
266        // Wait for the protocol to send us the accounts
267        let response = self
268            .request_indefinitely(
269                Request::Leaf(height),
270                RequestType::Batched,
271                response_validation_fn,
272            )
273            .await
274            .with_context(|| "failed to request leaf")?;
275
276        tracing::info!("Fetched leaf for height: {height}");
277
278        Ok(response)
279    }
280
281    async fn fetch_chain_config(
282        &self,
283        commitment: Commitment<ChainConfig>,
284    ) -> anyhow::Result<ChainConfig> {
285        tracing::info!("Fetching chain config with commitment: {commitment}");
286
287        // Create the response validation function
288        let response_validation_fn = move |_request: &Request, response: Response| {
289            async move {
290                // Make sure the response is a chain config response
291                let Response::ChainConfig(chain_config) = response else {
292                    return Err(anyhow::anyhow!("expected chain config response"));
293                };
294
295                // Make sure the commitments match
296                if commitment != chain_config.commit() {
297                    return Err(anyhow::anyhow!("chain config commitment mismatch"));
298                }
299
300                Ok(chain_config)
301            }
302        };
303
304        // Wait for the protocol to send us the chain config
305        let response = self
306            .request_indefinitely(
307                Request::ChainConfig(commitment),
308                RequestType::Batched,
309                response_validation_fn,
310            )
311            .await
312            .with_context(|| "failed to request chain config")?;
313
314        tracing::info!("Fetched chain config with commitment: {commitment}");
315
316        Ok(response)
317    }
318
319    async fn remember_blocks_merkle_tree(
320        &self,
321        _instance: &NodeState,
322        height: u64,
323        view: ViewNumber,
324        mt: &mut BlockMerkleTree,
325    ) -> anyhow::Result<()> {
326        tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
327
328        // Clone the merkle tree
329        let mt_clone = mt.clone();
330
331        // Create the response validation function
332        let response_validation_fn = move |_request: &Request, response: Response| {
333            // Clone the merkle tree
334            let mut block_merkle_tree = mt_clone.clone();
335
336            async move {
337                // Make sure the response is a blocks frontier response
338                let Response::BlocksFrontier(blocks_frontier) = response else {
339                    return Err(anyhow::anyhow!("expected blocks frontier response"));
340                };
341
342                // Get the leaf element associated with the proof
343                let leaf_elem = blocks_frontier
344                    .elem()
345                    .with_context(|| "provided frontier is missing leaf element")?;
346
347                // Verify the block proof
348                block_merkle_tree
349                    .remember(
350                        block_merkle_tree.num_leaves() - 1,
351                        *leaf_elem,
352                        blocks_frontier,
353                    )
354                    .with_context(|| "merkle tree verification failed")?;
355
356                // Return the verified merkle tree
357                Ok(block_merkle_tree)
358            }
359        };
360
361        // Wait for the protocol to send us the blocks frontier
362        let response = self
363            .request_indefinitely(
364                Request::BlocksFrontier(height, *view),
365                RequestType::Batched,
366                response_validation_fn,
367            )
368            .await
369            .with_context(|| "failed to request blocks frontier")?;
370
371        // Replace the merkle tree
372        *mt = response;
373
374        tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
375
376        Ok(())
377    }
378
379    async fn fetch_reward_accounts_v2(
380        &self,
381        _instance: &NodeState,
382        height: u64,
383        view: ViewNumber,
384        reward_merkle_tree_root: RewardMerkleCommitmentV2,
385        accounts: Vec<RewardAccountV2>,
386    ) -> anyhow::Result<Vec<RewardAccountProofV2>> {
387        tracing::info!("Fetching reward accounts for height: {height}, view: {view}");
388
389        // Clone things we need in the first closure
390        let accounts_clone = accounts.clone();
391
392        // Create the response validation function
393        let response_validation_fn = move |_request: &Request, response: Response| {
394            // Clone again
395            let accounts_clone = accounts_clone.clone();
396
397            async move {
398                // Make sure the response is a reward accounts response
399                let Response::RewardAccountsV2(reward_merkle_tree) = response else {
400                    return Err(anyhow::anyhow!("expected reward accounts response"));
401                };
402
403                // Verify the merkle proofs
404                let mut proofs = Vec::new();
405                for account in accounts_clone {
406                    let (proof, _) =
407                        RewardAccountProofV2::prove(&reward_merkle_tree, account.into())
408                            .with_context(|| format!("response was missing account {account}"))?;
409                    proof
410                        .verify(&reward_merkle_tree_root)
411                        .with_context(|| format!("invalid proof for account {account}"))?;
412                    proofs.push(proof);
413                }
414
415                Ok(proofs)
416            }
417        };
418
419        // Wait for the protocol to send us the reward accounts
420        let response = self
421            .request_indefinitely(
422                Request::RewardAccountsV2(height, *view, accounts),
423                RequestType::Batched,
424                response_validation_fn,
425            )
426            .await
427            .with_context(|| "failed to request reward accounts")?;
428
429        tracing::info!("Fetched reward accounts for height: {height}, view: {view}");
430
431        Ok(response)
432    }
433
434    async fn fetch_reward_accounts_v1(
435        &self,
436        _instance: &NodeState,
437        height: u64,
438        view: ViewNumber,
439        reward_merkle_tree_root: RewardMerkleCommitmentV1,
440        accounts: Vec<RewardAccountV1>,
441    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
442        tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
443
444        // Clone things we need in the first closure
445        let accounts_clone = accounts.clone();
446
447        // Create the response validation function
448        let response_validation_fn = move |_request: &Request, response: Response| {
449            // Clone again
450            let accounts_clone = accounts_clone.clone();
451
452            async move {
453                // Make sure the response is a reward accounts response
454                let Response::RewardAccountsV1(reward_merkle_tree) = response else {
455                    return Err(anyhow::anyhow!("expected v1 reward accounts response"));
456                };
457
458                // Verify the merkle proofs
459                let mut proofs = Vec::new();
460                for account in accounts_clone {
461                    let (proof, _) =
462                        RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
463                            .with_context(|| format!("response was missing account {account}"))?;
464                    proof.verify(&reward_merkle_tree_root).with_context(|| {
465                        format!("invalid proof for v1 reward account {account}")
466                    })?;
467                    proofs.push(proof);
468                }
469
470                Ok(proofs)
471            }
472        };
473
474        // Wait for the protocol to send us the reward accounts
475        let response = self
476            .request_indefinitely(
477                Request::RewardAccountsV1(height, *view, accounts),
478                RequestType::Batched,
479                response_validation_fn,
480            )
481            .await
482            .with_context(|| "failed to request v1 reward accounts")?;
483
484        tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
485
486        Ok(response)
487    }
488
489    fn is_local(&self) -> bool {
490        false
491    }
492}