sequencer/api/
sql.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use anyhow::{bail, ensure, Context};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7    get_l1_deposits,
8    v0_1::IterableFeeInfo,
9    v0_3::{
10        ChainConfig, RewardAccountProofV1, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount,
11        RewardMerkleTreeV1, REWARD_MERKLE_TREE_V1_HEIGHT,
12    },
13    v0_4::{
14        RewardAccountProofV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2,
15        REWARD_MERKLE_TREE_V2_HEIGHT,
16    },
17    BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
18    NodeState, ValidatedState,
19};
20use hotshot::traits::ValidatedState as _;
21use hotshot_query_service::{
22    availability::{BlockId, LeafId},
23    data_source::{
24        sql::{Config, SqlDataSource, Transaction},
25        storage::{
26            sql::{query_as, Db, TransactionMode, Write},
27            AvailabilityStorage, MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage,
28            SqlStorage,
29        },
30        VersionedDataSource,
31    },
32    merklized_state::{MerklizedState, Snapshot},
33    Resolvable,
34};
35use hotshot_types::{
36    data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
37    message::Proposal,
38    traits::node_implementation::ConsensusTime,
39    utils::epoch_from_block_number,
40    vote::HasViewNumber,
41};
42use jf_merkle_tree_compat::{
43    prelude::MerkleNode, ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme,
44    LookupResult, MerkleTreeScheme,
45};
46use serde_json::Value;
47use sqlx::{Encode, Type};
48use vbs::version::StaticVersionType;
49
50use super::{
51    data_source::{Provider, SequencerDataSource},
52    BlocksFrontier,
53};
54use crate::{
55    api::RewardAccountProofDataSource,
56    catchup::{CatchupStorage, NullStateCatchup},
57    persistence::{sql::Options, ChainConfigPersistence},
58    state::compute_state_update,
59    util::BoundedJoinSet,
60    SeqTypes,
61};
62
/// SQL-backed data source used by the sequencer API: a [`SqlDataSource`] instantiated with
/// [`SeqTypes`] and [`Provider`] as its type parameters.
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
64
65#[async_trait]
66impl SequencerDataSource for DataSource {
67    type Options = Options;
68
69    async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
70        let fetch_limit = opt.fetch_rate_limit;
71        let active_fetch_delay = opt.active_fetch_delay;
72        let chunk_fetch_delay = opt.chunk_fetch_delay;
73        let mut cfg = Config::try_from(&opt)?;
74
75        if reset {
76            cfg = cfg.reset_schema();
77        }
78
79        let mut builder = cfg.builder(provider).await?;
80
81        if let Some(limit) = fetch_limit {
82            builder = builder.with_rate_limit(limit);
83        }
84
85        if opt.lightweight {
86            tracing::warn!("enabling light weight mode..");
87            builder = builder.leaf_only();
88        }
89
90        if let Some(delay) = active_fetch_delay {
91            builder = builder.with_active_fetch_delay(delay);
92        }
93        if let Some(delay) = chunk_fetch_delay {
94            builder = builder.with_chunk_fetch_delay(delay);
95        }
96
97        if let Some(batch_size) = opt.types_migration_batch_size {
98            builder = builder.with_types_migration_batch_size(batch_size);
99        }
100
101        builder.build().await
102    }
103}
104
105impl RewardAccountProofDataSource for SqlStorage {
106    async fn load_v1_reward_account_proof(
107        &self,
108        height: u64,
109        account: RewardAccountV1,
110    ) -> anyhow::Result<RewardAccountQueryDataV1> {
111        let mut tx = self.read().await.context(format!(
112            "opening transaction to fetch v1 reward account {account:?}; height {height}"
113        ))?;
114
115        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
116            .await
117            .context("getting block height")? as u64;
118        ensure!(
119            block_height > 0,
120            "cannot get accounts for height {height}: no blocks available"
121        );
122
123        // Check if we have the desired state snapshot. If so, we can load the desired accounts
124        // directly.
125        if height < block_height {
126            let (tree, _) = load_v1_reward_accounts(self, height, &[account])
127                .await
128                .with_context(|| {
129                    format!("failed to load v1 reward account {account:?} at height {height}")
130                })?;
131
132            let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into())
133                .with_context(|| {
134                    format!("reward account {account:?} not available at height {height}")
135                })?;
136
137            Ok(RewardAccountQueryDataV1 { balance, proof })
138        } else {
139            bail!(
140                "requested height {height} is not yet available (latest block height: \
141                 {block_height})"
142            );
143        }
144    }
145
146    async fn load_v2_reward_account_proof(
147        &self,
148        height: u64,
149        account: RewardAccountV2,
150    ) -> anyhow::Result<RewardAccountQueryDataV2> {
151        let mut tx = self.read().await.context(format!(
152            "opening transaction to fetch reward account {account:?}; height {height}"
153        ))?;
154
155        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
156            .await
157            .context("getting block height")? as u64;
158        ensure!(
159            block_height > 0,
160            "cannot get accounts for height {height}: no blocks available"
161        );
162
163        // Check if we have the desired state snapshot. If so, we can load the desired accounts
164        // directly.
165        if height < block_height {
166            let (tree, _) = load_v2_reward_accounts(self, height, &[account])
167                .await
168                .with_context(|| {
169                    format!("failed to load v2 reward account {account:?} at height {height}")
170                })?;
171
172            let (proof, balance) = RewardAccountProofV2::prove(&tree, account.into())
173                .with_context(|| {
174                    format!("reward account {account:?} not available at height {height}")
175                })?;
176
177            Ok(RewardAccountQueryDataV2 { balance, proof })
178        } else {
179            bail!(
180                "requested height {height} is not yet available (latest block height: \
181                 {block_height})"
182            );
183        }
184    }
185}
186impl CatchupStorage for SqlStorage {
187    async fn get_reward_accounts_v1(
188        &self,
189        instance: &NodeState,
190        height: u64,
191        view: ViewNumber,
192        accounts: &[RewardAccountV1],
193    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
194        let mut tx = self.read().await.context(format!(
195            "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
196        ))?;
197
198        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
199            .await
200            .context("getting block height")? as u64;
201        ensure!(
202            block_height > 0,
203            "cannot get accounts for height {height}: no blocks available"
204        );
205
206        // Check if we have the desired state snapshot. If so, we can load the desired accounts
207        // directly.
208        if height < block_height {
209            load_v1_reward_accounts(self, height, accounts).await
210        } else {
211            let accounts: Vec<_> = accounts
212                .iter()
213                .map(|acct| RewardAccountV2::from(*acct))
214                .collect();
215            // If we do not have the exact snapshot we need, we can try going back to the last
216            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
217            let (state, leaf) = reconstruct_state(
218                instance,
219                self,
220                &mut tx,
221                block_height - 1,
222                view,
223                &[],
224                &accounts,
225            )
226            .await?;
227            Ok((state.reward_merkle_tree_v1, leaf))
228        }
229    }
230
231    async fn get_reward_accounts_v2(
232        &self,
233        instance: &NodeState,
234        height: u64,
235        view: ViewNumber,
236        accounts: &[RewardAccountV2],
237    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
238        let mut tx = self.read().await.context(format!(
239            "opening transaction to fetch reward account {accounts:?}; height {height}"
240        ))?;
241
242        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
243            .await
244            .context("getting block height")? as u64;
245        ensure!(
246            block_height > 0,
247            "cannot get accounts for height {height}: no blocks available"
248        );
249
250        // Check if we have the desired state snapshot. If so, we can load the desired accounts
251        // directly.
252        if height < block_height {
253            load_v2_reward_accounts(self, height, accounts).await
254        } else {
255            // If we do not have the exact snapshot we need, we can try going back to the last
256            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
257            let (state, leaf) = reconstruct_state(
258                instance,
259                self,
260                &mut tx,
261                block_height - 1,
262                view,
263                &[],
264                accounts,
265            )
266            .await?;
267            Ok((state.reward_merkle_tree_v2, leaf))
268        }
269    }
270
271    async fn get_all_reward_accounts(
272        &self,
273        height: u64,
274        offset: u64,
275        limit: u64,
276    ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
277        let mut tx = self.read().await.context(format!(
278            "opening transaction to fetch all reward accounts; height {height}"
279        ))?;
280
281        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
282            .await
283            .context("getting block height")? as u64;
284        ensure!(
285            block_height > 0,
286            "cannot get accounts for height {height}: no blocks available"
287        );
288
289        ensure!(
290            height < block_height,
291            "requested height {height} is not yet available (latest block height: {block_height})"
292        );
293
294        let merklized_state_height = tx
295            .get_last_state_height()
296            .await
297            .context("getting merklized state height")? as u64;
298        ensure!(
299            height <= merklized_state_height,
300            "requested height {height} is not yet available. latest merklized state height: \
301             {merklized_state_height}"
302        );
303
304        let header = tx
305            .get_header(BlockId::<SeqTypes>::from(height as usize))
306            .await
307            .context(format!("header {height} not available"))?;
308
309        if header.version() < DrbAndHeaderUpgradeVersion::version() {
310            return Ok(Vec::new());
311        }
312
313        // get the latest balance for each account.
314        // use DISTINCT ON for Postgres
315        // use ROW_NUMBER() as DISTINCT ON is not supported for SQLite
316        #[cfg(not(feature = "embedded-db"))]
317        let query = format!(
318            "SELECT DISTINCT ON (idx) idx, entry
319               FROM {}
320              WHERE idx IS NOT NULL AND created <= $1
321              ORDER BY idx DESC, created DESC
322              LIMIT $2 OFFSET $3",
323            RewardMerkleTreeV2::state_type()
324        );
325
326        #[cfg(feature = "embedded-db")]
327        let query = format!(
328            "SELECT idx, entry FROM (
329                 SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) as \
330             rn
331                   FROM {}
332                  WHERE created <= $1 AND idx IS NOT NULL
333             ) sub
334             WHERE rn = 1
335             ORDER BY idx DESC
336             LIMIT $2 OFFSET $3",
337            RewardMerkleTreeV2::state_type()
338        );
339
340        let rows = query_as::<(Value, Value)>(&query)
341            .bind(height as i64)
342            .bind(limit as i64)
343            .bind(offset as i64)
344            .fetch_all(tx.as_mut())
345            .await
346            .context("loading reward accounts from storage")?;
347
348        let mut accounts = Vec::new();
349        for (idx, entry) in rows {
350            let account: RewardAccountV2 =
351                serde_json::from_value(idx).context("deserializing reward account")?;
352            let balance: RewardAmount = serde_json::from_value(entry).context(format!(
353                "deserializing reward balance for account {account}"
354            ))?;
355
356            accounts.push((account, balance));
357        }
358
359        Ok(accounts)
360    }
361
362    async fn get_accounts(
363        &self,
364        instance: &NodeState,
365        height: u64,
366        view: ViewNumber,
367        accounts: &[FeeAccount],
368    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
369        let mut tx = self.read().await.context(format!(
370            "opening transaction to fetch account {accounts:?}; height {height}"
371        ))?;
372
373        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
374            .await
375            .context("getting block height")? as u64;
376        ensure!(
377            block_height > 0,
378            "cannot get accounts for height {height}: no blocks available"
379        );
380
381        // Check if we have the desired state snapshot. If so, we can load the desired accounts
382        // directly.
383        if height < block_height {
384            load_accounts(&mut tx, height, accounts).await
385        } else {
386            // If we do not have the exact snapshot we need, we can try going back to the last
387            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
388            let (state, leaf) = reconstruct_state(
389                instance,
390                self,
391                &mut tx,
392                block_height - 1,
393                view,
394                accounts,
395                &[],
396            )
397            .await?;
398            Ok((state.fee_merkle_tree, leaf))
399        }
400    }
401
402    async fn get_frontier(
403        &self,
404        instance: &NodeState,
405        height: u64,
406        view: ViewNumber,
407    ) -> anyhow::Result<BlocksFrontier> {
408        let mut tx = self.read().await.context(format!(
409            "opening transaction to fetch frontier at height {height}"
410        ))?;
411
412        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
413            .await
414            .context("getting block height")? as u64;
415        ensure!(
416            block_height > 0,
417            "cannot get frontier for height {height}: no blocks available"
418        );
419
420        // Check if we have the desired state snapshot. If so, we can load the desired frontier
421        // directly.
422        if height < block_height {
423            load_frontier(&mut tx, height).await
424        } else {
425            // If we do not have the exact snapshot we need, we can try going back to the last
426            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
427            let (state, _) =
428                reconstruct_state(instance, self, &mut tx, block_height - 1, view, &[], &[])
429                    .await?;
430            match state.block_merkle_tree.lookup(height - 1) {
431                LookupResult::Ok(_, proof) => Ok(proof),
432                _ => {
433                    bail!(
434                        "state snapshot {view:?},{height} was found but does not contain frontier \
435                         at height {}; this should not be possible",
436                        height - 1
437                    );
438                },
439            }
440        }
441    }
442
443    async fn get_chain_config(
444        &self,
445        commitment: Commitment<ChainConfig>,
446    ) -> anyhow::Result<ChainConfig> {
447        let mut tx = self.read().await.context(format!(
448            "opening transaction to fetch chain config {commitment}"
449        ))?;
450        load_chain_config(&mut tx, commitment).await
451    }
452
453    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
454        let mut tx = self
455            .read()
456            .await
457            .context(format!("opening transaction to fetch leaf at {height}"))?;
458        let leaf = tx
459            .get_leaf((height as usize).into())
460            .await
461            .context(format!("leaf {height} not available"))?;
462        let mut last_leaf: Leaf2 = leaf.leaf().clone();
463        let mut chain = vec![last_leaf.clone()];
464        let mut h = height + 1;
465
466        loop {
467            let lqd = tx.get_leaf((h as usize).into()).await?;
468            let leaf = lqd.leaf();
469
470            if leaf.justify_qc().view_number() == last_leaf.view_number() {
471                chain.push(leaf.clone());
472            } else {
473                h += 1;
474                continue;
475            }
476
477            // just one away from deciding
478            if leaf.view_number() == last_leaf.view_number() + 1 {
479                last_leaf = leaf.clone();
480                h += 1;
481                break;
482            }
483            h += 1;
484            last_leaf = leaf.clone();
485        }
486
487        loop {
488            let lqd = tx.get_leaf((h as usize).into()).await?;
489            let leaf = lqd.leaf();
490            if leaf.justify_qc().view_number() == last_leaf.view_number() {
491                chain.push(leaf.clone());
492                break;
493            }
494            h += 1;
495        }
496
497        Ok(chain)
498    }
499}
500
/// Reward account proof queries for the full [`DataSource`].
///
/// Both methods simply forward to the value returned by `self.as_ref()`.
/// NOTE(review): presumably that is the underlying `SqlStorage`, whose implementation of this
/// trait is defined earlier in this file — confirm against the `AsRef` impl of `SqlDataSource`.
impl RewardAccountProofDataSource for DataSource {
    /// Forwards to `load_v1_reward_account_proof` on `self.as_ref()`.
    async fn load_v1_reward_account_proof(
        &self,
        height: u64,
        account: RewardAccountV1,
    ) -> anyhow::Result<RewardAccountQueryDataV1> {
        self.as_ref()
            .load_v1_reward_account_proof(height, account)
            .await
    }

    /// Forwards to `load_v2_reward_account_proof` on `self.as_ref()`.
    async fn load_v2_reward_account_proof(
        &self,
        height: u64,
        account: RewardAccountV2,
    ) -> anyhow::Result<RewardAccountQueryDataV2> {
        self.as_ref()
            .load_v2_reward_account_proof(height, account)
            .await
    }
}
522
/// Catchup queries for the full [`DataSource`].
///
/// Every method forwards its arguments unchanged to the same-named method on the value returned
/// by `self.as_ref()`.
/// NOTE(review): presumably that is the underlying `SqlStorage`, whose `CatchupStorage`
/// implementation is defined earlier in this file — confirm against the `AsRef` impl of
/// `SqlDataSource`.
impl CatchupStorage for DataSource {
    /// Forwards to `get_accounts` on `self.as_ref()`.
    async fn get_accounts(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[FeeAccount],
    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
        self.as_ref()
            .get_accounts(instance, height, view, accounts)
            .await
    }

    /// Forwards to `get_reward_accounts_v2` on `self.as_ref()`.
    async fn get_reward_accounts_v2(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV2],
    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
        self.as_ref()
            .get_reward_accounts_v2(instance, height, view, accounts)
            .await
    }

    /// Forwards to `get_reward_accounts_v1` on `self.as_ref()`.
    async fn get_reward_accounts_v1(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        accounts: &[RewardAccountV1],
    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
        self.as_ref()
            .get_reward_accounts_v1(instance, height, view, accounts)
            .await
    }

    /// Forwards to `get_frontier` on `self.as_ref()`.
    async fn get_frontier(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
    ) -> anyhow::Result<BlocksFrontier> {
        self.as_ref().get_frontier(instance, height, view).await
    }

    /// Forwards to `get_chain_config` on `self.as_ref()`.
    async fn get_chain_config(
        &self,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        self.as_ref().get_chain_config(commitment).await
    }
    /// Forwards to `get_leaf_chain` on `self.as_ref()`.
    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
        self.as_ref().get_leaf_chain(height).await
    }

    /// Forwards to `get_all_reward_accounts` on `self.as_ref()`.
    async fn get_all_reward_accounts(
        &self,
        height: u64,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
        self.as_ref()
            .get_all_reward_accounts(height, offset, limit)
            .await
    }
}
590
591#[async_trait]
592impl ChainConfigPersistence for Transaction<Write> {
593    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
594        let commitment = chain_config.commitment();
595        let data = bincode::serialize(&chain_config)?;
596        self.upsert(
597            "chain_config",
598            ["commitment", "data"],
599            ["commitment"],
600            [(commitment.to_string(), data)],
601        )
602        .await
603    }
604}
605
606async fn load_frontier<Mode: TransactionMode>(
607    tx: &mut Transaction<Mode>,
608    height: u64,
609) -> anyhow::Result<BlocksFrontier> {
610    tx.get_path(
611        Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
612        height
613            .checked_sub(1)
614            .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
615    )
616    .await
617    .context(format!("fetching frontier at height {height}"))
618}
619
620async fn load_v1_reward_accounts(
621    db: &SqlStorage,
622    height: u64,
623    accounts: &[RewardAccountV1],
624) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
625    // Open a new read transaction to get the leaf
626    let mut tx = db
627        .read()
628        .await
629        .with_context(|| "failed to open read transaction")?;
630
631    // Get the leaf from the database
632    let leaf = tx
633        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
634        .await
635        .context(format!("leaf {height} not available"))?;
636    let header = leaf.header();
637
638    if header.version() < EpochVersion::version()
639        || header.version() >= DrbAndHeaderUpgradeVersion::version()
640    {
641        return Ok((
642            RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
643            leaf.leaf().clone(),
644        ));
645    }
646
647    // Get the merkle root from the header and create a snapshot from it
648    let merkle_root = header.reward_merkle_tree_root().unwrap_left();
649    let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
650
651    // Create a bounded join set with 10 concurrent tasks
652    let mut join_set = BoundedJoinSet::new(10);
653
654    // Create a map from task ID to account
655    let mut task_id_to_account = HashMap::new();
656
657    // Loop through each account, spawning a task to get the path for the account
658    for account in accounts {
659        // Clone things we will need in the closure
660        let db_clone = db.clone();
661        let account_clone = *account;
662        let header_height = header.height();
663
664        // Create the closure that will get the path for the account
665        let func = async move {
666            // Open a new transaction
667            let mut tx = db_clone
668                .read()
669                .await
670                .with_context(|| "failed to open read transaction")?;
671
672            // Get the path for the account
673            let proof = tx
674                .get_path(
675                    Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
676                        header_height,
677                    ),
678                    account_clone,
679                )
680                .await
681                .with_context(|| {
682                    format!(
683                        "failed to get path for v1 reward account {account_clone:?}; height \
684                         {height}"
685                    )
686                })?;
687
688            Ok::<_, anyhow::Error>(proof)
689        };
690
691        // Spawn the task
692        let id = join_set.spawn(func).id();
693
694        // Add the task ID to the account map
695        task_id_to_account.insert(id, account);
696    }
697
698    // Wait for each task to complete
699    while let Some(result) = join_set.join_next_with_id().await {
700        // Get the inner result (past the join error)
701        let (id, result) = result.with_context(|| "failed to join task")?;
702
703        // Get the proof from the result
704        let proof = result?;
705
706        // Get the account from the task ID to account map
707        let account = task_id_to_account
708            .remove(&id)
709            .with_context(|| "task ID for spawned task not found")?;
710
711        match proof.proof.first().with_context(|| {
712            format!("empty proof for v1 reward account {account:?}; height {height}")
713        })? {
714            MerkleNode::Leaf { pos, elem, .. } => {
715                snapshot.remember(*pos, *elem, proof)?;
716            },
717            MerkleNode::Empty => {
718                snapshot.non_membership_remember(*account, proof)?;
719            },
720            _ => {
721                bail!("invalid proof for v1 reward account {account:?}; height {height}");
722            },
723        }
724    }
725
726    Ok((snapshot, leaf.leaf().clone()))
727}
728
729/// Loads reward accounts for new reward merkle tree (V4).
730async fn load_v2_reward_accounts(
731    db: &SqlStorage,
732    height: u64,
733    accounts: &[RewardAccountV2],
734) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
735    // Open a new read transaction to get the leaf
736    let mut tx = db
737        .read()
738        .await
739        .with_context(|| "failed to open read transaction")?;
740
741    // Get the leaf from the database
742    let leaf = tx
743        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
744        .await
745        .with_context(|| format!("leaf {height} not available"))?;
746    let header = leaf.header();
747
748    // If the header is before the epoch version, we can return the new reward merkle tree
749    if header.version() <= EpochVersion::version() {
750        return Ok((
751            RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT),
752            leaf.leaf().clone(),
753        ));
754    }
755
756    // Get the merkle root from the header and create a snapshot from it
757    let merkle_root = header.reward_merkle_tree_root().unwrap_right();
758    let mut snapshot = RewardMerkleTreeV2::from_commitment(merkle_root);
759
760    // Create a bounded join set with 10 concurrent tasks
761    let mut join_set = BoundedJoinSet::new(10);
762
763    // Create a map from task ID to account
764    let mut task_id_to_account = HashMap::new();
765
766    // Loop through each account, spawning a task to get the path for the account
767    for account in accounts {
768        // Clone things we will need in the closure
769        let db_clone = db.clone();
770        let account_clone = *account;
771        let header_height = header.height();
772
773        // Create the closure that will get the path for the account
774        let func = async move {
775            // Open a new transaction
776            let mut tx = db_clone
777                .read()
778                .await
779                .with_context(|| "failed to open read transaction")?;
780
781            // Get the path for the account
782            let proof = tx
783                .get_path(
784                    Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(
785                        header_height,
786                    ),
787                    account_clone,
788                )
789                .await
790                .with_context(|| {
791                    format!(
792                        "failed to get path for v2 reward account {account_clone:?}; height \
793                         {height}"
794                    )
795                })?;
796
797            Ok::<_, anyhow::Error>(proof)
798        };
799
800        // Spawn the task
801        let id = join_set.spawn(func).id();
802
803        // Add the task ID to the account map
804        task_id_to_account.insert(id, account);
805    }
806
807    // Wait for each task to complete
808    while let Some(result) = join_set.join_next_with_id().await {
809        // Get the inner result (past the join error)
810        let (id, result) = result.with_context(|| "failed to join task")?;
811
812        // Get the proof from the result
813        let proof = result?;
814
815        // Get the account from the task ID to account map
816        let account = task_id_to_account
817            .remove(&id)
818            .with_context(|| "task ID for spawned task not found")?;
819
820        match proof.proof.first().with_context(|| {
821            format!("empty proof for v2 reward account {account:?}; height {height}")
822        })? {
823            MerkleNode::Leaf { pos, elem, .. } => {
824                snapshot.remember(*pos, *elem, proof)?;
825            },
826            MerkleNode::Empty => {
827                snapshot.non_membership_remember(*account, proof)?;
828            },
829            _ => {
830                bail!("invalid proof for v2 reward account {account:?}; height {height}");
831            },
832        }
833    }
834
835    Ok((snapshot, leaf.leaf().clone()))
836}
837
838async fn load_accounts<Mode: TransactionMode>(
839    tx: &mut Transaction<Mode>,
840    height: u64,
841    accounts: &[FeeAccount],
842) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
843    let leaf = tx
844        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
845        .await
846        .context(format!("leaf {height} not available"))?;
847    let header = leaf.header();
848
849    let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
850    for account in accounts {
851        let proof = tx
852            .get_path(
853                Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
854                    header.height(),
855                ),
856                *account,
857            )
858            .await
859            .context(format!(
860                "fetching account {account}; height {}",
861                header.height()
862            ))?;
863        match proof.proof.first().context(format!(
864            "empty proof for account {account}; height {}",
865            header.height()
866        ))? {
867            MerkleNode::Leaf { pos, elem, .. } => {
868                snapshot.remember(*pos, *elem, proof)?;
869            },
870            MerkleNode::Empty => {
871                snapshot.non_membership_remember(*account, proof)?;
872            },
873            _ => {
874                bail!("Invalid proof");
875            },
876        }
877    }
878
879    Ok((snapshot, leaf.leaf().clone()))
880}
881
882async fn load_chain_config<Mode: TransactionMode>(
883    tx: &mut Transaction<Mode>,
884    commitment: Commitment<ChainConfig>,
885) -> anyhow::Result<ChainConfig> {
886    let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
887        .bind(commitment.to_string())
888        .fetch_one(tx.as_mut())
889        .await
890        .unwrap();
891
892    bincode::deserialize(&data[..]).context("failed to deserialize")
893}
894
895/// Reconstructs the `ValidatedState` from a specific block height up to a given view.
896///
897/// This loads all required fee and reward accounts into the Merkle tree before applying the
898/// State Transition Function (STF), preventing recursive catchup during STF replay.
899///
900/// Note: Even if the primary goal is to catch up the block Merkle tree,
901/// fee and reward header dependencies must still be present beforehand
902/// This is because reconstructing the `ValidatedState` involves replaying the STF over a
903/// range of leaves, and the STF requires all associated data to be present in the `ValidatedState`;
904/// otherwise, it will attempt to trigger catchup itself.
905#[tracing::instrument(skip(instance, tx))]
906pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
907    instance: &NodeState,
908    db: &SqlStorage,
909    tx: &mut Transaction<Mode>,
910    from_height: u64,
911    to_view: ViewNumber,
912    fee_accounts: &[FeeAccount],
913    reward_accounts: &[RewardAccountV2],
914) -> anyhow::Result<(ValidatedState, Leaf2)> {
915    tracing::info!("attempting to reconstruct fee state");
916    let from_leaf = tx
917        .get_leaf((from_height as usize).into())
918        .await
919        .context(format!("leaf {from_height} not available"))?;
920    let from_leaf: Leaf2 = from_leaf.leaf().clone();
921    ensure!(
922        from_leaf.view_number() < to_view,
923        "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
924        from_leaf.view_number(),
925    );
926
927    // Get the sequence of headers we will be applying to compute the latest state.
928    let mut leaves = VecDeque::new();
929    let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
930        .await
931        .context(format!(
932            "unable to reconstruct state because leaf {to_view:?} is not available"
933        ))?;
934    let mut parent = to_leaf.parent_commitment();
935    tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
936    leaves.push_front(to_leaf.clone());
937    while parent != Committable::commit(&from_leaf) {
938        let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
939            .await
940            .context(format!(
941                "unable to reconstruct state because leaf {parent} is not available"
942            ))?;
943        parent = leaf.parent_commitment();
944        tracing::debug!(?leaf, ?parent, "have required leaf");
945        leaves.push_front(leaf);
946    }
947
948    // Get the initial state.
949    let mut parent = from_leaf;
950    let mut state = ValidatedState::from_header(parent.block_header());
951
952    // Pre-load the state with the accounts we care about to ensure they will be present in the
953    // final state.
954    let mut catchup = NullStateCatchup::default();
955
956    let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
957    // Add in all the accounts we will need to replay any of the headers, to ensure that we don't
958    // need to do catchup recursively.
959    tracing::info!(
960        "reconstructing fee accounts state for from height {from_height} to view {to_view}"
961    );
962
963    let dependencies =
964        fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
965    fee_accounts.extend(dependencies);
966    let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
967    state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
968        .await
969        .context("unable to reconstruct state because accounts are not available at origin")?
970        .0;
971    ensure!(
972        state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
973        "loaded fee state does not match parent header"
974    );
975
976    tracing::info!(
977        "reconstructing reward accounts for from height {from_height} to view {to_view}"
978    );
979
980    let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
981
982    // Collect all reward account dependencies needed for replaying the STF.
983    // These accounts must be preloaded into the reward Merkle tree to prevent recursive catchups.
984    let dependencies = reward_header_dependencies(instance, &leaves).await?;
985    reward_accounts.extend(dependencies);
986    let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
987
988    // Load all required reward accounts and update the reward Merkle tree.
989    match parent.block_header().reward_merkle_tree_root() {
990        either::Either::Left(expected_root) => {
991            let accts = reward_accounts
992                .into_iter()
993                .map(RewardAccountV1::from)
994                .collect::<Vec<_>>();
995            state.reward_merkle_tree_v1 = load_v1_reward_accounts(db, from_height, &accts)
996                .await
997                .context(
998                    "unable to reconstruct state because v1 reward accounts are not available at \
999                     origin",
1000                )?
1001                .0;
1002            ensure!(
1003                state.reward_merkle_tree_v1.commitment() == expected_root,
1004                "loaded v1 reward state does not match parent header"
1005            );
1006        },
1007        either::Either::Right(expected_root) => {
1008            state.reward_merkle_tree_v2 =
1009                load_v2_reward_accounts(db, from_height, &reward_accounts)
1010                    .await
1011                    .context(
1012                        "unable to reconstruct state because v2 reward accounts are not available \
1013                         at origin",
1014                    )?
1015                    .0;
1016            ensure!(
1017                state.reward_merkle_tree_v2.commitment() == expected_root,
1018                "loaded reward state does not match parent header"
1019            );
1020        },
1021    }
1022
1023    // We need the blocks frontier as well, to apply the STF.
1024    let frontier = load_frontier(tx, from_height)
1025        .await
1026        .context("unable to reconstruct state because frontier is not available at origin")?;
1027    match frontier
1028        .proof
1029        .first()
1030        .context("empty proof for frontier at origin")?
1031    {
1032        MerkleNode::Leaf { pos, elem, .. } => state
1033            .block_merkle_tree
1034            .remember(*pos, *elem, frontier)
1035            .context("failed to remember frontier")?,
1036        _ => bail!("invalid frontier proof"),
1037    }
1038
1039    // Apply subsequent headers to compute the later state.
1040    for proposal in leaves {
1041        state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
1042            .await
1043            .context(format!(
1044                "unable to reconstruct state because state update {} failed",
1045                proposal.height(),
1046            ))?
1047            .0;
1048        parent = proposal;
1049    }
1050
1051    tracing::info!(from_height, ?to_view, "successfully reconstructed state");
1052    Ok((state, to_leaf))
1053}
1054
1055/// Get the dependencies needed to apply the STF to the given list of headers.
1056///
1057/// Returns
1058/// * A state catchup implementation seeded with all the chain configs required to apply the headers
1059///   in `leaves`
1060/// * The set of accounts that must be preloaded to apply these headers
1061async fn fee_header_dependencies<Mode: TransactionMode>(
1062    catchup: &mut NullStateCatchup,
1063    tx: &mut Transaction<Mode>,
1064    instance: &NodeState,
1065    mut parent: &Leaf2,
1066    leaves: impl IntoIterator<Item = &Leaf2>,
1067) -> anyhow::Result<HashSet<FeeAccount>> {
1068    let mut accounts = HashSet::default();
1069
1070    for proposal in leaves {
1071        let header = proposal.block_header();
1072        let height = header.height();
1073        let view = proposal.view_number();
1074        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1075
1076        let header_cf = header.chain_config();
1077        let chain_config = if header_cf.commit() == instance.chain_config.commit() {
1078            instance.chain_config
1079        } else {
1080            match header_cf.resolve() {
1081                Some(cf) => cf,
1082                None => {
1083                    tracing::info!(
1084                        height,
1085                        ?view,
1086                        commit = %header_cf.commit(),
1087                        "chain config not available, attempting to load from storage",
1088                    );
1089                    let cf = load_chain_config(tx, header_cf.commit())
1090                        .await
1091                        .context(format!(
1092                            "loading chain config {} for header {},{:?}",
1093                            header_cf.commit(),
1094                            header.height(),
1095                            proposal.view_number()
1096                        ))?;
1097
1098                    // If we had to fetch a chain config now, store it in the catchup implementation
1099                    // so the STF will be able to look it up later.
1100                    catchup.add_chain_config(cf);
1101                    cf
1102                },
1103            }
1104        };
1105
1106        accounts.insert(chain_config.fee_recipient);
1107        accounts.extend(
1108            get_l1_deposits(instance, header, parent, chain_config.fee_contract)
1109                .await
1110                .into_iter()
1111                .map(|fee| fee.account()),
1112        );
1113        accounts.extend(header.fee_info().accounts());
1114        parent = proposal;
1115    }
1116    Ok(accounts)
1117}
1118
1119/// Identifies all reward accounts required to replay the State Transition Function
1120/// for the given leaf proposals. These accounts should be present in the Merkle tree
1121/// *before* applying the STF to avoid recursive catchup (i.e., STF triggering another catchup).
1122async fn reward_header_dependencies(
1123    instance: &NodeState,
1124    leaves: impl IntoIterator<Item = &Leaf2>,
1125) -> anyhow::Result<HashSet<RewardAccountV2>> {
1126    let mut reward_accounts = HashSet::default();
1127    let epoch_height = instance.epoch_height;
1128
1129    let Some(epoch_height) = epoch_height else {
1130        tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
1131        return Ok(HashSet::new());
1132    };
1133
1134    let coordinator = instance.coordinator.clone();
1135    let membership_lock = coordinator.membership().read().await;
1136    let first_epoch = membership_lock.first_epoch();
1137    drop(membership_lock);
1138    // add all the chain configs needed to apply STF to headers to the catchup
1139    for proposal in leaves {
1140        let header = proposal.block_header();
1141
1142        let height = header.height();
1143        let view = proposal.view_number();
1144        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1145
1146        let version = header.version();
1147        // Skip if version is less than epoch version
1148        if version < EpochVersion::version() {
1149            continue;
1150        }
1151
1152        let first_epoch = first_epoch.context("first epoch not found")?;
1153
1154        let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
1155
1156        // reward distribution starts third epoch onwards
1157        if proposal_epoch <= first_epoch + 1 {
1158            continue;
1159        }
1160
1161        let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)).await {
1162            Ok(e) => e,
1163            Err(err) => {
1164                tracing::info!(
1165                    "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
1166                );
1167
1168                coordinator
1169                    .wait_for_catchup(proposal_epoch)
1170                    .await
1171                    .context(format!("failed to catchup for epoch={proposal_epoch}"))?
1172            },
1173        };
1174
1175        let leader = epoch_membership.leader(proposal.view_number()).await?;
1176        let membership_lock = coordinator.membership().read().await;
1177        let validator = membership_lock.get_validator_config(&proposal_epoch, leader)?;
1178        drop(membership_lock);
1179
1180        reward_accounts.insert(RewardAccountV2(validator.account));
1181
1182        let delegators: Vec<RewardAccountV2> = validator
1183            .delegators
1184            .keys()
1185            .map(|d| RewardAccountV2(*d))
1186            .collect();
1187
1188        reward_accounts.extend(delegators);
1189    }
1190    Ok(reward_accounts)
1191}
1192
1193async fn get_leaf_from_proposal<Mode, P>(
1194    tx: &mut Transaction<Mode>,
1195    where_clause: &str,
1196    param: P,
1197) -> anyhow::Result<Leaf2>
1198where
1199    P: Type<Db> + for<'q> Encode<'q, Db>,
1200{
1201    let (data,) = query_as::<(Vec<u8>,)>(&format!(
1202        "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
1203    ))
1204    .bind(param)
1205    .fetch_one(tx.as_mut())
1206    .await?;
1207    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1208        bincode::deserialize(&data)?;
1209    Ok(Leaf2::from_quorum_proposal(&proposal.data))
1210}
1211
#[cfg(any(test, feature = "testing"))]
pub(crate) mod impl_testable_data_source {
    //! Test-only wiring that lets the SQL `DataSource` be used as a
    //! `TestableSequencerDataSource`, backed by a temporary database.

    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::api::{self, data_source::testing::TestableSequencerDataSource};

    /// Builds persistence options pointing at the temporary Postgres database `db`.
    #[cfg(not(feature = "embedded-db"))]
    pub fn tmp_options(db: &TmpDb) -> Options {
        crate::persistence::sql::PostgresOptions {
            port: Some(db.port()),
            host: Some(db.host()),
            user: Some("postgres".into()),
            password: Some("password".into()),
            ..Default::default()
        }
        .into()
    }

    /// Builds persistence options pointing at the temporary SQLite database `db`.
    #[cfg(feature = "embedded-db")]
    pub fn tmp_options(db: &TmpDb) -> Options {
        crate::persistence::sql::SqliteOptions { path: db.path() }.into()
    }

    #[async_trait]
    impl TestableSequencerDataSource for DataSource {
        type Storage = TmpDb;

        /// Starts a fresh temporary database.
        async fn create_storage() -> Self::Storage {
            TmpDb::init().await
        }

        /// Persistence options that connect to `storage`.
        fn persistence_options(storage: &Self::Storage) -> Self::Options {
            tmp_options(storage)
        }

        /// Like [`Self::options`], but with the `lightweight` flag set on the SQL options.
        fn leaf_only_ds_options(
            storage: &Self::Storage,
            opt: api::Options,
        ) -> anyhow::Result<api::Options> {
            let mut lightweight_opts = tmp_options(storage);
            lightweight_opts.lightweight = true;

            let configured = opt.query_sql(Default::default(), lightweight_opts);
            Ok(configured)
        }

        /// Adds the SQL query module, connected to `storage`, to the API options.
        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
            let sql_opts = tmp_options(storage);
            opt.query_sql(Default::default(), sql_opts)
        }
    }
}
1267
1268#[cfg(test)]
1269mod tests {
1270    use alloy::primitives::Address;
1271    use espresso_types::{
1272        v0_3::RewardAmount,
1273        v0_4::{RewardAccountV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT},
1274    };
1275    use hotshot_query_service::{
1276        data_source::{
1277            sql::Config,
1278            storage::{
1279                sql::{
1280                    testing::TmpDb, SqlStorage, StorageConnectionType,
1281                    Transaction as SqlTransaction, Write,
1282                },
1283                MerklizedStateStorage,
1284            },
1285            Transaction, VersionedDataSource,
1286        },
1287        merklized_state::{MerklizedState, Snapshot, UpdateStateData},
1288    };
1289    use jf_merkle_tree_compat::{
1290        LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
1291    };
1292
1293    use super::impl_testable_data_source::tmp_options;
1294    use crate::SeqTypes;
1295
1296    fn make_reward_account(i: usize) -> RewardAccountV2 {
1297        let mut addr_bytes = [0u8; 20];
1298        addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes());
1299        RewardAccountV2(Address::from(addr_bytes))
1300    }
1301
1302    async fn insert_test_header(
1303        tx: &mut SqlTransaction<Write>,
1304        block_height: u64,
1305        reward_tree: &RewardMerkleTreeV2,
1306    ) {
1307        let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap();
1308        let test_data = serde_json::json!({
1309            "block_merkle_tree_root": format!("block_root_{}", block_height),
1310            "fee_merkle_tree_root": format!("fee_root_{}", block_height),
1311            "fields": {
1312                RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment
1313            }
1314        });
1315        tx.upsert(
1316            "header",
1317            ["height", "hash", "payload_hash", "timestamp", "data"],
1318            ["height"],
1319            [(
1320                block_height as i64,
1321                format!("hash_{}", block_height),
1322                format!("payload_{}", block_height),
1323                block_height as i64,
1324                test_data,
1325            )],
1326        )
1327        .await
1328        .unwrap();
1329    }
1330
1331    async fn batch_insert_proofs(
1332        tx: &mut SqlTransaction<Write>,
1333        reward_tree: &RewardMerkleTreeV2,
1334        accounts: &[RewardAccountV2],
1335        block_height: u64,
1336    ) {
1337        let proofs_and_paths: Vec<_> = accounts
1338            .iter()
1339            .map(|account| {
1340                let proof = match reward_tree.universal_lookup(*account) {
1341                    LookupResult::Ok(_, proof) => proof,
1342                    LookupResult::NotInMemory => panic!("account not in memory"),
1343                    LookupResult::NotFound(proof) => proof,
1344                };
1345                let traversal_path = <RewardAccountV2 as ToTraversalPath<
1346                    { RewardMerkleTreeV2::ARITY },
1347                >>::to_traversal_path(
1348                    account, reward_tree.height()
1349                );
1350                (proof, traversal_path)
1351            })
1352            .collect();
1353
1354        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
1355            tx,
1356            proofs_and_paths,
1357            block_height,
1358        )
1359        .await
1360        .expect("failed to batch insert proofs");
1361    }
1362
1363    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1364    async fn test_reward_accounts_batch_insertion() {
1365        // Batch insertion of 1000 accounts at height 1
1366        // Balance updates for some accounts at height 2
1367        // New accounts added at height 2
1368        // More balance updates at height 3
1369        // Querying correct balances at each height snapshot
1370
1371        let db = TmpDb::init().await;
1372        let opt = tmp_options(&db);
1373        let cfg = Config::try_from(&opt).expect("failed to create config from options");
1374        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
1375            .await
1376            .expect("failed to connect to storage");
1377
1378        let num_initial_accounts = 1000usize;
1379
1380        let initial_accounts: Vec<RewardAccountV2> =
1381            (0..num_initial_accounts).map(make_reward_account).collect();
1382
1383        tracing::info!(
1384            "Height 1: Inserting {} initial accounts",
1385            num_initial_accounts
1386        );
1387
1388        let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
1389
1390        for (i, account) in initial_accounts.iter().enumerate() {
1391            let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64);
1392            reward_tree_h1.update(*account, reward_amount).unwrap();
1393        }
1394
1395        let mut tx = storage.write().await.unwrap();
1396        insert_test_header(&mut tx, 1, &reward_tree_h1).await;
1397        batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await;
1398
1399        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 1)
1400            .await
1401            .unwrap();
1402        tx.commit().await.unwrap();
1403
1404        tracing::info!("Height 2: Updating balances and adding new accounts");
1405
1406        let mut reward_tree_h2 = reward_tree_h1.clone();
1407
1408        // Update balances for accounts 0-99
1409        let updated_accounts_h2: Vec<RewardAccountV2> = (0..100).map(make_reward_account).collect();
1410        for (i, account) in updated_accounts_h2.iter().enumerate() {
1411            let new_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1412            reward_tree_h2.update(*account, new_reward).unwrap();
1413        }
1414
1415        // Add 100 new accounts (1000..1099)
1416        let new_accounts_h2: Vec<RewardAccountV2> = (1000..1100).map(make_reward_account).collect();
1417        for (i, account) in new_accounts_h2.iter().enumerate() {
1418            let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64);
1419            reward_tree_h2.update(*account, reward_amount).unwrap();
1420        }
1421
1422        let mut changed_accounts_h2 = updated_accounts_h2.clone();
1423        changed_accounts_h2.extend(new_accounts_h2.clone());
1424
1425        let mut tx = storage.write().await.unwrap();
1426        insert_test_header(&mut tx, 2, &reward_tree_h2).await;
1427        batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await;
1428
1429        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 2)
1430            .await
1431            .unwrap();
1432        tx.commit().await.unwrap();
1433
1434        tracing::info!("Height 3: More balance updates");
1435
1436        let mut reward_tree_h3 = reward_tree_h2.clone();
1437
1438        // Update balances for accounts 500-599
1439        let updated_accounts_h3: Vec<RewardAccountV2> =
1440            (500..600).map(make_reward_account).collect();
1441        for (i, account) in updated_accounts_h3.iter().enumerate() {
1442            let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64);
1443            reward_tree_h3.update(*account, new_reward).unwrap();
1444        }
1445
1446        let mut tx = storage.write().await.unwrap();
1447        insert_test_header(&mut tx, 3, &reward_tree_h3).await;
1448        batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await;
1449
1450        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 3)
1451            .await
1452            .unwrap();
1453        tx.commit().await.unwrap();
1454
1455        tracing::info!("Verifying all account proofs at each height");
1456
1457        // Verify height=1
1458        // All 1000 initial accounts
1459        let snapshot_h1 =
1460            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(1);
1461        for i in 0..num_initial_accounts {
1462            let account = make_reward_account(i);
1463            let proof = storage
1464                .read()
1465                .await
1466                .unwrap()
1467                .get_path(snapshot_h1, account)
1468                .await
1469                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}"));
1470
1471            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1472            let actual_reward = proof.elem().expect("account should exist");
1473            assert_eq!(*actual_reward, expected_reward,);
1474
1475            assert!(
1476                RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof)
1477                    .unwrap()
1478                    .is_ok(),
1479            );
1480        }
1481        tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",);
1482
1483        // Verify accounts 1000-1099 don't exist at height 1
1484        for i in 1000..1100 {
1485            let account = make_reward_account(i);
1486            let proof = storage
1487                .read()
1488                .await
1489                .unwrap()
1490                .get_path(snapshot_h1, account)
1491                .await
1492                .unwrap();
1493            assert!(proof.elem().is_none(),);
1494
1495            // Verify non-membership proof
1496            assert!(RewardMerkleTreeV2::non_membership_verify(
1497                reward_tree_h1.commitment(),
1498                account,
1499                proof
1500            )
1501            .unwrap(),);
1502        }
1503        tracing::info!("Height 1: Verified 100 non-membership proofs");
1504
1505        // Verify height 2
1506        let snapshot_h2 =
1507            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(2);
1508
1509        // Accounts 0-99
1510        for i in 0..100 {
1511            let account = make_reward_account(i);
1512            let proof = storage
1513                .read()
1514                .await
1515                .unwrap()
1516                .get_path(snapshot_h2, account)
1517                .await
1518                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1519
1520            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1521            let actual_reward = proof.elem().expect("account should exist");
1522            assert_eq!(*actual_reward, expected_reward,);
1523
1524            assert!(
1525                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1526                    .unwrap()
1527                    .is_ok(),
1528            );
1529        }
1530
1531        // Accounts 100-999: original rewards
1532        for i in 100..1000 {
1533            let account = make_reward_account(i);
1534            let proof = storage
1535                .read()
1536                .await
1537                .unwrap()
1538                .get_path(snapshot_h2, account)
1539                .await
1540                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1541
1542            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1543            let actual_reward = proof.elem().expect("account should exist");
1544            assert_eq!(*actual_reward, expected_reward,);
1545
1546            assert!(
1547                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1548                    .unwrap()
1549                    .is_ok(),
1550            );
1551        }
1552
1553        // Accounts 1000-1099
1554        // new accounts
1555        for i in 1000..1100 {
1556            let account = make_reward_account(i);
1557            let proof = storage
1558                .read()
1559                .await
1560                .unwrap()
1561                .get_path(snapshot_h2, account)
1562                .await
1563                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1564
1565            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
1566            let actual_reward = proof.elem().expect("account should exist");
1567            assert_eq!(*actual_reward, expected_reward,);
1568
1569            assert!(
1570                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1571                    .unwrap()
1572                    .is_ok(),
1573            );
1574        }
1575        tracing::info!("Height 2: Verified all 1100 accounts with proofs");
1576
1577        // Verify HEIGHT 3: All accounts
1578        let snapshot_h3 =
1579            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(3);
1580
1581        // Accounts 0-99
1582        for i in 0..100 {
1583            let account = make_reward_account(i);
1584            let proof = storage
1585                .read()
1586                .await
1587                .unwrap()
1588                .get_path(snapshot_h3, account)
1589                .await
1590                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1591
1592            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1593            let actual_reward = proof.elem().expect("account should exist");
1594            assert_eq!(*actual_reward, expected_reward,);
1595
1596            assert!(
1597                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1598                    .unwrap()
1599                    .is_ok(),
1600            );
1601        }
1602
1603        for i in 100..500 {
1604            let account = make_reward_account(i);
1605            let proof = storage
1606                .read()
1607                .await
1608                .unwrap()
1609                .get_path(snapshot_h3, account)
1610                .await
1611                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1612
1613            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1614            let actual_reward = proof.elem().expect("account should exist");
1615            assert_eq!(*actual_reward, expected_reward,);
1616
1617            assert!(
1618                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1619                    .unwrap()
1620                    .is_ok(),
1621            );
1622        }
1623
1624        // Accounts 500-599
1625        for i in 500..600 {
1626            let account = make_reward_account(i);
1627            let proof = storage
1628                .read()
1629                .await
1630                .unwrap()
1631                .get_path(snapshot_h3, account)
1632                .await
1633                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1634
1635            let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64);
1636            let actual_reward = proof.elem().expect("account should exist");
1637            assert_eq!(*actual_reward, expected_reward,);
1638
1639            assert!(
1640                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1641                    .unwrap()
1642                    .is_ok(),
1643            );
1644        }
1645
1646        // Accounts 600-999
1647        for i in 600..1000 {
1648            let account = make_reward_account(i);
1649            let proof = storage
1650                .read()
1651                .await
1652                .unwrap()
1653                .get_path(snapshot_h3, account)
1654                .await
1655                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1656
1657            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1658            let actual_reward = proof.elem().expect("account should exist");
1659            assert_eq!(*actual_reward, expected_reward,);
1660
1661            assert!(
1662                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1663                    .unwrap()
1664                    .is_ok(),
1665            );
1666        }
1667
1668        // Accounts 1000-1099: new accounts (from h2)
1669        for i in 1000..1100 {
1670            let account = make_reward_account(i);
1671            let proof = storage
1672                .read()
1673                .await
1674                .unwrap()
1675                .get_path(snapshot_h3, account)
1676                .await
1677                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1678
1679            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
1680            let actual_reward = proof.elem().expect("account should exist");
1681            assert_eq!(*actual_reward, expected_reward,);
1682
1683            assert!(
1684                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1685                    .unwrap()
1686                    .is_ok(),
1687            );
1688        }
1689        tracing::info!("Height 3: Verified all 1100 accounts with proofs");
1690
1691        // Verify non-membership proofs for accounts that never existed
1692        for i in 1100..1110 {
1693            let account = make_reward_account(i);
1694            let proof = storage
1695                .read()
1696                .await
1697                .unwrap()
1698                .get_path(snapshot_h3, account)
1699                .await
1700                .unwrap();
1701
1702            assert!(
1703                proof.elem().is_none(),
1704                "Account {i} should not exist at height 3"
1705            );
1706
1707            assert!(RewardMerkleTreeV2::non_membership_verify(
1708                reward_tree_h3.commitment(),
1709                account,
1710                proof
1711            )
1712            .unwrap(),);
1713        }
1714        tracing::info!("Height 3: Verified 10 non-membership proofs");
1715    }
1716}