hotshot_query_service/data_source/storage/sql/queries/
state.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Merklized state storage implementation for a database query engine.
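//!
//! Merkle nodes are stored in one table per state type, keyed by the node's traversal `path`
//! (a JSON array) together with the block height (`created`) at which that version of the node
//! was produced. Node hashes are interned in a separate `hash` table and referenced by id.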

use std::{
    collections::{HashMap, HashSet, VecDeque},
    sync::Arc,
};

#[cfg(not(feature = "embedded-db"))]
use anyhow::Context;
use ark_serialize::CanonicalDeserialize;
use async_trait::async_trait;
use futures::stream::TryStreamExt;
use hotshot_types::traits::node_implementation::NodeType;
use jf_merkle_tree_compat::{
    prelude::{MerkleNode, MerkleProof},
    DigestAlgorithm, MerkleCommitment, ToTraversalPath,
};
use sqlx::types::{BitVec, JsonValue};

use super::{
    super::transaction::{query_as, Transaction, TransactionMode, Write},
    DecodeError, QueryBuilder,
};
use crate::{
    data_source::storage::{
        pruning::PrunedHeightStorage,
        sql::{build_where_in, sqlx::Row},
        MerklizedStateHeightStorage, MerklizedStateStorage,
    },
    merklized_state::{MerklizedState, Snapshot},
    QueryError, QueryResult,
};

#[async_trait]
impl<Mode, Types, State, const ARITY: usize> MerklizedStateStorage<Types, State, ARITY>
    for Transaction<Mode>
where
    Mode: TransactionMode,
    Types: NodeType,
    State: MerklizedState<Types, ARITY> + 'static,
{
    /// Retrieves a Merkle path from the database
    async fn get_path(
        &mut self,
        snapshot: Snapshot<Types, State, ARITY>,
        key: State::Key,
    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
        let state_type = State::state_type();
        let tree_height = State::tree_height();
        // Get the traversal path of the index
        let traversal_path = State::Key::to_traversal_path(&key, tree_height);
        let (created, merkle_commitment) = self.snapshot_info(snapshot).await?;

        // Get all the nodes in the path to the index.
        // The rows are ordered by path length (descending) so nodes are returned from the leaf to the root.
        let (query, sql) = build_get_path_query(state_type, traversal_path.clone(), created)?;
        let rows = query.query(&sql).fetch_all(self.as_mut()).await?;

        let nodes: Vec<Node> = rows.into_iter().map(|r| r.into()).collect();

        // Collect all the hash ids into a HashSet for the lookup query below;
        // a HashSet is used to avoid duplicates.
        let mut hash_ids = HashSet::new();
        for node in nodes.iter() {
            hash_ids.insert(node.hash_id);
            if let Some(children) = &node.children {
                let children: Vec<i32> =
                    serde_json::from_value(children.clone()).map_err(|e| QueryError::Error {
                        message: format!("Error deserializing 'children' into Vec<i32>: {e}"),
                    })?;
                hash_ids.extend(children);
            }
        }

        // Find all the hash values and create a hashmap.
        // The hashmap is used to look up the hash value of each node and of its children.
        let hashes = if !hash_ids.is_empty() {
            let (query, sql) = build_where_in("SELECT id, value FROM hash", "id", hash_ids)?;
            query
                .query_as(&sql)
                .fetch(self.as_mut())
                .try_collect::<HashMap<i32, Vec<u8>>>()
                .await?
        } else {
            HashMap::new()
        };

        let mut proof_path = VecDeque::with_capacity(State::tree_height());
        for Node {
            hash_id,
            children,
            children_bitvec,
            idx,
            entry,
            ..
        } in nodes.iter()
        {
            {
                let value = hashes.get(hash_id).ok_or(QueryError::Error {
                    message: format!("node's value references non-existent hash {hash_id}"),
                })?;

                match (children, children_bitvec, idx, entry) {
                    // If the row has children then it's a branch
                    (Some(children), Some(children_bitvec), None, None) => {
                        let children: Vec<i32> =
                            serde_json::from_value(children.clone()).map_err(|e| {
                                QueryError::Error {
                                    message: format!(
                                        "Error deserializing 'children' into Vec<i32>: {e}"
                                    ),
                                }
                            })?;
                        let mut children = children.iter();

                        // Reconstruct the children MerkleNodes from storage.
                        // The children bitvec determines whether each child is a forgotten or an empty node.
                        let child_nodes = children_bitvec
                            .iter()
                            .map(|bit| {
                                if bit {
                                    let hash_id = children.next().ok_or(QueryError::Error {
                                        message: "node has fewer children than set bits".into(),
                                    })?;
                                    let value = hashes.get(hash_id).ok_or(QueryError::Error {
                                        message: format!(
                                            "node's child references non-existent hash {hash_id}"
                                        ),
                                    })?;
                                    Ok(Arc::new(MerkleNode::ForgettenSubtree {
                                        value: State::T::deserialize_compressed(value.as_slice())
                                            .decode_error("malformed merkle node value")?,
                                    }))
                                } else {
                                    Ok(Arc::new(MerkleNode::Empty))
                                }
                            })
                            .collect::<QueryResult<Vec<_>>>()?;
                        // Use the Children merkle nodes to reconstruct the branch node
                        proof_path.push_back(MerkleNode::Branch {
                            value: State::T::deserialize_compressed(value.as_slice())
                                .decode_error("malformed merkle node value")?,
                            children: child_nodes,
                        });
                    },
                    // If it has an entry, it's a leaf
                    (None, None, Some(index), Some(entry)) => {
                        proof_path.push_back(MerkleNode::Leaf {
                            value: State::T::deserialize_compressed(value.as_slice())
                                .decode_error("malformed merkle node value")?,
                            pos: serde_json::from_value(index.clone())
                                .decode_error("malformed merkle node index")?,
                            elem: serde_json::from_value(entry.clone())
                                .decode_error("malformed merkle element")?,
                        });
                    },
                    // Otherwise, it's empty.
                    (None, None, Some(_), None) => {
                        proof_path.push_back(MerkleNode::Empty);
                    },
                    _ => {
                        return Err(QueryError::Error {
                            message: "Invalid type of merkle node found".to_string(),
                        });
                    },
                }
            }
        }

        // Reconstruct the merkle commitment from the path
        let init = if let Some(MerkleNode::Leaf { value, .. }) = proof_path.front() {
            *value
        } else {
            // If the path ends in a branch (or, as a special case, if the path and thus the entire
            // tree is empty), we are looking up an entry that is not present in the tree. We always
            // store all the nodes on all the paths to all the entries in the tree, so the only
            // nodes we could be missing are empty nodes from unseen entries. Thus, we can
            // reconstruct what the path should be by prepending empty nodes.
            while proof_path.len() <= State::tree_height() {
                proof_path.push_front(MerkleNode::Empty);
            }
            State::T::default()
        };
        let commitment_from_path = traversal_path
            .iter()
            .zip(proof_path.iter().skip(1))
            .try_fold(init, |val, (branch, node)| -> QueryResult<State::T> {
                match node {
                    MerkleNode::Branch { value: _, children } => {
                        let data = children
                            .iter()
                            .map(|node| match node.as_ref() {
                                MerkleNode::ForgettenSubtree { value } => Ok(*value),
                                MerkleNode::Empty => Ok(State::T::default()),
                                _ => Err(QueryError::Error {
                                    message: "Invalid child node".to_string(),
                                }),
                            })
                            .collect::<QueryResult<Vec<_>>>()?;

                        if data[*branch] != val {
                            // This can only happen if data is missing: we have an old version of
                            // one of the nodes in the path, which is why it is not matching up with
                            // its parent.
                            tracing::warn!(
                                ?key,
                                parent = ?data[*branch],
                                child = ?val,
                                branch = %*branch,
                                %created,
                                %merkle_commitment,
                                "missing data in merklized state; parent-child mismatch",
                            );
                            return Err(QueryError::Missing);
                        }

                        State::Digest::digest(&data).map_err(|err| QueryError::Error {
                            message: format!("failed to update digest: {err:#}"),
                        })
                    },
                    MerkleNode::Empty => Ok(init),
                    _ => Err(QueryError::Error {
                        message: "Invalid type of Node in the proof".to_string(),
                    }),
                }
            })?;

        if commitment_from_path != merkle_commitment.digest() {
            return Err(QueryError::Error {
                message: format!(
                    "Commitment calculated from merkle path ({commitment_from_path:?}) does not \
                     match the commitment in the header ({:?})",
                    merkle_commitment.digest()
                ),
            });
        }

        Ok(MerkleProof {
            pos: key,
            proof: proof_path.into(),
        })
    }
}

#[async_trait]
impl<Mode: TransactionMode> MerklizedStateHeightStorage for Transaction<Mode> {
    async fn get_last_state_height(&mut self) -> QueryResult<usize> {
        let Some((height,)) = query_as::<(i64,)>("SELECT height from last_merklized_state_height")
            .fetch_optional(self.as_mut())
            .await?
        else {
            return Ok(0);
        };
        Ok(height as usize)
    }
}

impl<Mode: TransactionMode> Transaction<Mode> {
    /// Get information identifying a [`Snapshot`].
    ///
    /// If the given snapshot is known to the database, this function returns
    /// * The block height at which the snapshot was created
    /// * A digest of the Merkle commitment to the snapshotted state
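    ///
    /// For example, `Snapshot::Index(10)` resolves to `(10, commit)`, where `commit` is parsed
    /// from the header stored at height 10, while `Snapshot::Commit(c)` resolves to `(height, c)`
    /// for some header whose state commitment column equals `c`.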
    async fn snapshot_info<Types, State, const ARITY: usize>(
        &mut self,
        snapshot: Snapshot<Types, State, ARITY>,
    ) -> QueryResult<(i64, State::Commit)>
    where
        Types: NodeType,
        State: MerklizedState<Types, ARITY>,
    {
        let header_state_commitment_field = State::header_state_commitment_field();

        let (created, commit) = match snapshot {
            Snapshot::Commit(commit) => {
                // Get the block height using the merkle commitment. It is possible that multiple
                // headers will have the same state commitment. In this case we don't care which
                // height we get, since any query against equivalent states will yield equivalent
                // results, regardless of which block the state is from. Thus, we can make this
                // query fast with `LIMIT 1` and no `ORDER BY`.
                let (height,) = query_as(&format!(
                    "SELECT height
                       FROM header
                      WHERE {header_state_commitment_field} = $1
                      LIMIT 1"
                ))
                .bind(commit.to_string())
                .fetch_one(self.as_mut())
                .await?;

                (height, commit)
            },
            Snapshot::Index(created) => {
                let created = created as i64;
                let (commit,) = query_as::<(String,)>(&format!(
                    "SELECT {header_state_commitment_field} AS root_commitment
                       FROM header
                      WHERE height = $1
                      LIMIT 1"
                ))
                .bind(created)
                .fetch_one(self.as_mut())
                .await?;
                let commit = serde_json::from_value(commit.into())
                    .decode_error("malformed state commitment")?;
                (created, commit)
            },
        };

        // Make sure the requested snapshot is up to date.
        let height = self.get_last_state_height().await?;

        if height < (created as usize) {
            return Err(QueryError::NotFound);
        }

        let pruned_height = self
            .load_pruned_height()
            .await
            .map_err(|e| QueryError::Error {
                message: format!("failed to load pruned height: {e}"),
            })?;

        if pruned_height.is_some_and(|h| height <= h as usize) {
            return Err(QueryError::NotFound);
        }

        Ok((created, commit))
    }
}

// TODO: create a generic upsert function with retries that returns the column
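// `build_hash_batch_insert` below generates a statement of roughly this shape (illustrative;
// with two hashes bound the parameters would be $1 and $2):
//
//   INSERT INTO hash(value) values ($1),($2)
//   ON CONFLICT (value) DO UPDATE SET value = EXCLUDED.value returning value, id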
#[cfg(feature = "embedded-db")]
pub(crate) fn build_hash_batch_insert(
    hashes: &[Vec<u8>],
) -> QueryResult<(QueryBuilder<'_>, String)> {
    let mut query = QueryBuilder::default();
    let params = hashes
        .iter()
        .map(|hash| Ok(format!("({})", query.bind(hash)?)))
        .collect::<QueryResult<Vec<String>>>()?;
    let sql = format!(
        "INSERT INTO hash(value) values {} ON CONFLICT (value) DO UPDATE SET value = \
         EXCLUDED.value returning value, id",
        params.join(",")
    );
    Ok((query, sql))
}

/// Batch insert hashes using UNNEST for large batches (postgres only).
/// Returns a map from hash bytes to their database IDs.
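///
/// A rough usage sketch (not compiled; `leaf_hash` and `branch_hash` are placeholder byte
/// vectors and `tx` is assumed to be an open write transaction):
///
/// ```ignore
/// let ids = batch_insert_hashes(vec![leaf_hash.clone(), branch_hash], &mut tx).await?;
/// let leaf_id = ids[&leaf_hash];
/// ```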
#[cfg(not(feature = "embedded-db"))]
pub(crate) async fn batch_insert_hashes(
    hashes: Vec<Vec<u8>>,
    tx: &mut Transaction<Write>,
) -> QueryResult<HashMap<Vec<u8>, i32>> {
    if hashes.is_empty() {
        return Ok(HashMap::new());
    }

    // Use UNNEST-based batch insert (more efficient and avoids parameter limits)
    let sql = "INSERT INTO hash(value) SELECT * FROM UNNEST($1::bytea[]) ON CONFLICT (value) DO \
               UPDATE SET value = EXCLUDED.value RETURNING value, id";

    let result: HashMap<Vec<u8>, i32> = sqlx::query_as(sql)
        .bind(&hashes)
        .fetch(tx.as_mut())
        .try_collect()
        .await
        .map_err(|e| QueryError::Error {
            message: format!("batch hash insert failed: {e}"),
        })?;

    Ok(result)
}

/// Type alias for a merkle proof with its traversal path.
pub(crate) type ProofWithPath<Entry, Key, T, const ARITY: usize> =
    (MerkleProof<Entry, Key, T, ARITY>, Vec<usize>);

/// Collects nodes and hashes from merkle proofs.
/// Returns (nodes, hashes) for batch insertion.
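///
/// Each returned [`NodeWithHashes`] pairs the [`Node`] row to insert with the optional hashes of
/// its children (for branch nodes) and the serialized hash of the node itself; the accompanying
/// `HashSet` collects every distinct hash value referenced by those nodes.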
pub(crate) fn collect_nodes_from_proofs<Entry, Key, T, const ARITY: usize>(
    proofs: &[ProofWithPath<Entry, Key, T, ARITY>],
) -> QueryResult<(Vec<NodeWithHashes>, HashSet<Vec<u8>>)>
where
    Entry: jf_merkle_tree_compat::Element + serde::Serialize,
    Key: jf_merkle_tree_compat::Index + serde::Serialize,
    T: jf_merkle_tree_compat::NodeValue,
{
    let mut nodes = Vec::new();
    let mut hashes = HashSet::new();

    for (proof, traversal_path) in proofs {
        let pos = &proof.pos;
        let path = &proof.proof;
        let mut trav_path = traversal_path.iter().map(|n| *n as i32);

        for node in path.iter() {
            match node {
                MerkleNode::Empty => {
                    let index =
                        serde_json::to_value(pos.clone()).map_err(|e| QueryError::Error {
                            message: format!("malformed merkle position: {e}"),
                        })?;
                    let node_path: Vec<i32> = trav_path.clone().rev().collect();
                    nodes.push((
                        Node {
                            path: node_path.into(),
                            idx: Some(index),
                            ..Default::default()
                        },
                        None,
                        [0_u8; 32].to_vec(),
                    ));
                    hashes.insert([0_u8; 32].to_vec());
                },
                MerkleNode::ForgettenSubtree { .. } => {
                    return Err(QueryError::Error {
                        message: "Node in the Merkle path contains a forgotten subtree".into(),
                    });
                },
                MerkleNode::Leaf { value, pos, elem } => {
                    let mut leaf_commit = Vec::new();
                    value.serialize_compressed(&mut leaf_commit).map_err(|e| {
                        QueryError::Error {
                            message: format!("malformed merkle leaf commitment: {e}"),
                        }
                    })?;

                    let node_path: Vec<i32> = trav_path.clone().rev().collect();

                    let index =
                        serde_json::to_value(pos.clone()).map_err(|e| QueryError::Error {
                            message: format!("malformed merkle position: {e}"),
                        })?;
                    let entry = serde_json::to_value(elem).map_err(|e| QueryError::Error {
                        message: format!("malformed merkle element: {e}"),
                    })?;

                    nodes.push((
                        Node {
                            path: node_path.into(),
                            idx: Some(index),
                            entry: Some(entry),
                            ..Default::default()
                        },
                        None,
                        leaf_commit.clone(),
                    ));

                    hashes.insert(leaf_commit);
                },
                MerkleNode::Branch { value, children } => {
                    let mut branch_hash = Vec::new();
                    value.serialize_compressed(&mut branch_hash).map_err(|e| {
                        QueryError::Error {
                            message: format!("malformed merkle branch hash: {e}"),
                        }
                    })?;

                    let mut children_bitvec = BitVec::new();
                    let mut children_values = Vec::new();
                    for child in children {
                        let child = child.as_ref();
                        match child {
                            MerkleNode::Empty => {
                                children_bitvec.push(false);
                            },
                            MerkleNode::Branch { value, .. }
                            | MerkleNode::Leaf { value, .. }
                            | MerkleNode::ForgettenSubtree { value } => {
                                let mut hash = Vec::new();
                                value.serialize_compressed(&mut hash).map_err(|e| {
                                    QueryError::Error {
                                        message: format!("malformed merkle node hash: {e}"),
                                    }
                                })?;

                                children_values.push(hash);
                                children_bitvec.push(true);
                            },
                        }
                    }

                    let node_path: Vec<i32> = trav_path.clone().rev().collect();
                    nodes.push((
                        Node {
                            path: node_path.into(),
                            children: None,
                            children_bitvec: Some(children_bitvec),
                            ..Default::default()
                        },
                        Some(children_values.clone()),
                        branch_hash.clone(),
                    ));
                    hashes.insert(branch_hash);
                    hashes.extend(children_values);
                },
            }

            trav_path.next();
        }
    }

    Ok((nodes, hashes))
}

// Represents a row in a state table
#[derive(Debug, Default, Clone)]
pub(crate) struct Node {
    pub(crate) path: JsonValue,
    pub(crate) created: i64,
    pub(crate) hash_id: i32,
    pub(crate) children: Option<JsonValue>,
    pub(crate) children_bitvec: Option<BitVec>,
    pub(crate) idx: Option<JsonValue>,
    pub(crate) entry: Option<JsonValue>,
}

/// Type alias for node data with optional children hashes and node hash.
/// Used during batch collection before database insertion.
pub(crate) type NodeWithHashes = (Node, Option<Vec<Vec<u8>>>, Vec<u8>);

#[cfg(feature = "embedded-db")]
impl From<sqlx::sqlite::SqliteRow> for Node {
    fn from(row: sqlx::sqlite::SqliteRow) -> Self {
        let bit_string: Option<String> = row.get_unchecked("children_bitvec");
        let children_bitvec: Option<BitVec> =
            bit_string.map(|b| b.chars().map(|c| c == '1').collect());

        Self {
            path: row.get_unchecked("path"),
            created: row.get_unchecked("created"),
            hash_id: row.get_unchecked("hash_id"),
            children: row.get_unchecked("children"),
            children_bitvec,
            idx: row.get_unchecked("idx"),
            entry: row.get_unchecked("entry"),
        }
    }
}

#[cfg(not(feature = "embedded-db"))]
impl From<sqlx::postgres::PgRow> for Node {
    fn from(row: sqlx::postgres::PgRow) -> Self {
        Self {
            path: row.get_unchecked("path"),
            created: row.get_unchecked("created"),
            hash_id: row.get_unchecked("hash_id"),
            children: row.get_unchecked("children"),
            children_bitvec: row.get_unchecked("children_bitvec"),
            idx: row.get_unchecked("idx"),
            entry: row.get_unchecked("entry"),
        }
    }
}

impl Node {
    pub(crate) async fn upsert(
        name: &str,
        nodes: impl IntoIterator<Item = Self>,
        tx: &mut Transaction<Write>,
    ) -> anyhow::Result<()> {
        let nodes: Vec<_> = nodes.into_iter().collect();

        // Use UNNEST-based batch insert for postgres (more efficient and avoids parameter limits)
        #[cfg(not(feature = "embedded-db"))]
        return Self::upsert_batch_unnest(name, nodes, tx).await;

        #[cfg(feature = "embedded-db")]
        {
            for node_chunk in nodes.chunks(20) {
                let rows: Vec<_> = node_chunk
                    .iter()
                    .map(|n| {
                        let children_bitvec: Option<String> = n
                            .children_bitvec
                            .clone()
                            .map(|b| b.iter().map(|bit| if bit { '1' } else { '0' }).collect());

                        (
                            n.path.clone(),
                            n.created,
                            n.hash_id,
                            n.children.clone(),
                            children_bitvec,
                            n.idx.clone(),
                            n.entry.clone(),
                        )
                    })
                    .collect();

                tx.upsert(
                    name,
                    [
                        "path",
                        "created",
                        "hash_id",
                        "children",
                        "children_bitvec",
                        "idx",
                        "entry",
                    ],
                    ["path", "created"],
                    rows,
                )
                .await?;
            }
            Ok(())
        }
    }

    #[cfg(not(feature = "embedded-db"))]
    async fn upsert_batch_unnest(
        name: &str,
        nodes: Vec<Self>,
        tx: &mut Transaction<Write>,
    ) -> anyhow::Result<()> {
        if nodes.is_empty() {
            return Ok(());
        }

        // Deduplicate nodes by (path, created) - keep the last occurrence
        // This is required because UNNEST + ON CONFLICT cannot handle duplicates in the same batch
        let mut deduped = HashMap::new();
        for node in nodes {
            deduped.insert((node.path.to_string(), node.created), node);
        }

        let mut paths = Vec::with_capacity(deduped.len());
        let mut createds = Vec::with_capacity(deduped.len());
        let mut hash_ids = Vec::with_capacity(deduped.len());
        let mut childrens = Vec::with_capacity(deduped.len());
        let mut children_bitvecs = Vec::with_capacity(deduped.len());
        let mut idxs = Vec::with_capacity(deduped.len());
        let mut entries = Vec::with_capacity(deduped.len());

        for node in deduped.into_values() {
            paths.push(node.path);
            createds.push(node.created);
            hash_ids.push(node.hash_id);
            childrens.push(node.children);
            children_bitvecs.push(node.children_bitvec);
            idxs.push(node.idx);
            entries.push(node.entry);
        }

        let sql = format!(
            r#"
            INSERT INTO "{name}" (path, created, hash_id, children, children_bitvec, idx, entry)
            SELECT * FROM UNNEST($1::jsonb[], $2::bigint[], $3::int[], $4::jsonb[], $5::bit varying[], $6::jsonb[], $7::jsonb[])
            ON CONFLICT (path, created) DO UPDATE SET
                hash_id = EXCLUDED.hash_id,
                children = EXCLUDED.children,
                children_bitvec = EXCLUDED.children_bitvec,
                idx = EXCLUDED.idx,
                entry = EXCLUDED.entry
            "#
        );

        sqlx::query(&sql)
            .bind(&paths)
            .bind(&createds)
            .bind(&hash_ids)
            .bind(&childrens)
            .bind(&children_bitvecs)
            .bind(&idxs)
            .bind(&entries)
            .execute(tx.as_mut())
            .await
            .context("batch upsert with UNNEST failed")?;

        Ok(())
    }
}

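/// Build the query that fetches the node at `traversal_path` and each of its ancestors up to the
/// root, each at its latest version with `created <=` the given block height.
///
/// For a traversal path of length 2 the generated SQL looks roughly like this (illustrative;
/// `$2`..`$4` are the bound path prefixes, longest first, and the outer `ORDER BY` returns rows
/// from leaf to root on Postgres; SQLite orders by `length(t.path)` instead):
///
/// ```text
/// SELECT * FROM (
///     SELECT * FROM (SELECT * FROM <table> WHERE path = $2 AND created <= $1
///                    ORDER BY created DESC LIMIT 1)
///     UNION
///     SELECT * FROM (SELECT * FROM <table> WHERE path = $3 AND created <= $1
///                    ORDER BY created DESC LIMIT 1)
///     UNION
///     SELECT * FROM (SELECT * FROM <table> WHERE path = $4 AND created <= $1
///                    ORDER BY created DESC LIMIT 1)
/// ) as t ORDER BY t.path DESC
/// ```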
fn build_get_path_query<'q>(
    table: &'static str,
    traversal_path: Vec<usize>,
    created: i64,
) -> QueryResult<(QueryBuilder<'q>, String)> {
    let mut query = QueryBuilder::default();
    let mut traversal_path = traversal_path.into_iter().map(|x| x as i32);

    // We iterate through the path vector, skipping the first element after each iteration
    let len = traversal_path.len();
    let mut sub_queries = Vec::new();

    query.bind(created)?;

    for _ in 0..=len {
        let path = traversal_path.clone().rev().collect::<Vec<_>>();
        let path: serde_json::Value = path.into();
        let node_path = query.bind(path)?;

        let sub_query = format!(
            "SELECT * FROM (SELECT * FROM {table} WHERE path = {node_path} AND created <= $1 \
             ORDER BY created DESC LIMIT 1)",
        );

        sub_queries.push(sub_query);
        traversal_path.next();
    }

    let mut sql: String = sub_queries.join(" UNION ");

    sql = format!("SELECT * FROM ({sql}) as t ");

    // PostgreSQL already orders JSON arrays by length, so no additional function is needed
    // For SQLite, `length()` is used to sort by length.
    if cfg!(feature = "embedded-db") {
        sql.push_str("ORDER BY length(t.path) DESC");
    } else {
        sql.push_str("ORDER BY t.path DESC");
    }

    Ok((query, sql))
}

#[cfg(test)]
mod test {
    use futures::stream::StreamExt;
    use jf_merkle_tree_compat::{
        universal_merkle_tree::UniversalMerkleTree, LookupResult, MerkleTreeScheme,
        UniversalMerkleTreeScheme,
    };
    use rand::{seq::IteratorRandom, RngCore};

    use super::*;
    use crate::{
        data_source::{
            storage::sql::{testing::TmpDb, *},
            VersionedDataSource,
        },
        merklized_state::UpdateStateData,
        testing::mocks::{MockMerkleTree, MockTypes},
    };

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_storage() {
        // In this test we insert some entries into the tree and update the database
        // Each entry's merkle path is compared with the path from the tree

        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // define a test tree
        let mut test_tree: UniversalMerkleTree<_, _, _, 8, _> =
            MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;

        // insert some entries into the tree and the header table
        // The header table is used by the get_path query to check that a header exists for the block height.
        let mut tx = storage.write().await.unwrap();
        for i in 0..27 {
            test_tree.update(i, i).unwrap();

            // data field of the header
            let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    block_height as i64,
                    format!("randomHash{i}"),
                    "t".to_string(),
                    0,
                    test_data,
                )],
            )
            .await
            .unwrap();
            // proof for the index from the tree
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            // traversal path for the index.
            let traversal_path =
                <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());

            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof.clone(),
                traversal_path.clone(),
                block_height as u64,
            )
            .await
            .expect("failed to insert nodes");
        }
        // update saved state height
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Get the path and check if it matches the lookup
        for i in 0..27 {
            // Query the path for the index
            let mut tx = storage.read().await.unwrap();
            let merkle_path = tx
                .get_path(
                    Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                    i,
                )
                .await
                .unwrap();

            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();

            tracing::info!("merkle path {:?}", merkle_path);

            // merkle path from the storage should match the path from test tree
            assert_eq!(merkle_path, proof.clone(), "merkle paths mismatch");
        }

        // Get the proof of index 0 with bh = 1
        let (_, proof_bh_1) = test_tree.lookup(0).expect_ok().unwrap();
        // Inserting Index 0 again with created (bh) = 2
        // Our database should then have 2 versions of this leaf node
        // Update the node so that proof is also updated
        test_tree.update(0, 99).unwrap();
        // Also update the merkle commitment in the header

        // data field of the header
        let mut tx = storage.write().await.unwrap();
        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                2i64,
                "randomstring".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        let (_, proof_bh_2) = test_tree.lookup(0).expect_ok().unwrap();
        // traversal path for the index.
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        // Update storage to insert a new version of this node

        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_bh_2.clone(),
            traversal_path.clone(),
            2,
        )
        .await
        .expect("failed to insert nodes");
        // update saved state height
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        let node_path = traversal_path
            .into_iter()
            .rev()
            .map(|n| n as i32)
            .collect::<Vec<_>>();

        // Find all the nodes of Index 0 in the table
        let mut tx = storage.read().await.unwrap();
        let rows = query("SELECT * from test_tree where path = $1 ORDER BY created")
            .bind(serde_json::to_value(node_path).unwrap())
            .fetch(tx.as_mut());

        let nodes: Vec<Node> = rows.map(|res| res.unwrap().into()).collect().await;
        // There should be only 2 versions of this node
        assert!(nodes.len() == 2, "incorrect number of nodes");
        assert_eq!(nodes[0].created, 1, "wrong block height");
        assert_eq!(nodes[1].created, 2, "wrong block height");

        // Now we can have two snapshots for Index 0
        // One with created = 1 and other with 2
        // Query snapshot with created = 2

        let path_with_bh_2 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2), 0)
            .await
            .unwrap();

        assert_eq!(path_with_bh_2, proof_bh_2);
        let path_with_bh_1 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1), 0)
            .await
            .unwrap();
        assert_eq!(path_with_bh_1, proof_bh_1);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_non_membership_proof() {
        // This test updates the Merkle tree with a new entry and inserts the corresponding Merkle nodes into the database with created = 1.
        // A Merkle node is then deleted from the tree.
        // The database is then updated to reflect the deletion of the entry with a created (block height) of 2
        // As the leaf node becomes a non-member, we do a universal lookup to obtain its non-membership proof path.
        // It is expected that the path retrieved from the tree matches the path obtained from the database.

        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // define a test tree
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        // insert an entry into the tree
        test_tree.update(0, 0).unwrap();
        let commitment = test_tree.commitment();

        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
        // insert the header with merkle commitment
        let mut tx = storage.write().await.unwrap();
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomString".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        // proof for the index from the tree
        let (_, proof_before_remove) = test_tree.lookup(0).expect_ok().unwrap();
        // traversal path for the index.
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        // insert merkle nodes
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_before_remove.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        // update saved state height
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();
        // the path from the db and the tree should match
        let merkle_path = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                0,
            )
            .await
            .unwrap();

        // merkle path from the storage should match the path from test tree
        assert_eq!(
            merkle_path,
            proof_before_remove.clone(),
            "merkle paths mismatch"
        );

        // Delete index 0
        test_tree.remove(0).expect("failed to delete index 0 ");

        // Update the database with the proof
        // Created = 2 in this case
        let proof_after_remove = test_tree.universal_lookup(0).expect_not_found().unwrap();

        let mut tx = storage.write().await.unwrap();
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_after_remove.clone(),
            traversal_path.clone(),
            2_u64,
        )
        .await
        .expect("failed to insert nodes");
        // Insert the new header
        tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    2i64,
                    "randomString2".to_string(),
                    "t".to_string(),
                    0,
                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
                )],
            )
            .await
            .unwrap();
        // update saved state height
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
            .await
            .unwrap();
        tx.commit().await.unwrap();
        // Get non membership proof
        let non_membership_path = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 0)
            .await
            .unwrap();
        // Assert that the paths from the db and the tree are equal
        assert_eq!(
            non_membership_path, proof_after_remove,
            "merkle paths dont match"
        );

        // Query the membership proof, i.e. the proof with created = 1
        // This proof should be equal to the proof before deletion
        // Assert that the paths from the db and the tree are equal

        let proof_bh_1 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1_u64), 0)
            .await
            .unwrap();
        assert_eq!(proof_bh_1, proof_before_remove, "merkle paths dont match");
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_non_membership_proof_unseen_entry() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // define a test tree
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());

        // For each case (where the root is empty, a leaf, and a branch) test getting a
        // non-membership proof for an entry node the database has never seen.
        for i in 0..=2 {
            tracing::info!(i, ?test_tree, "testing non-membership proof");
            let mut tx = storage.write().await.unwrap();

            // Insert a dummy header
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    i as i64,
                    format!("hash{i}"),
                    "t".to_string(),
                    0,
                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()})
                )],
            )
            .await
            .unwrap();
            // update saved state height
            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, i)
                .await
                .unwrap();
            tx.commit().await.unwrap();

            // get a non-membership proof for a never-before-seen node.
            let proof = storage
                .read()
                .await
                .unwrap()
                .get_path(
                    Snapshot::<MockTypes, MockMerkleTree, 8>::Index(i as u64),
                    100,
                )
                .await
                .unwrap();
            assert_eq!(proof.elem(), None);

            assert!(
                MockMerkleTree::non_membership_verify(test_tree.commitment(), 100, proof).unwrap()
            );

            // insert an additional node into the tree.
            test_tree.update(i, i).unwrap();
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            let traversal_path = ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
            let mut tx = storage.write().await.unwrap();
            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof,
                traversal_path,
                (i + 1) as u64,
            )
            .await
            .expect("failed to insert nodes");
            tx.commit().await.unwrap();
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_storage_with_commit() {
        // This test inserts a merkle path into the database and queries the path using the merkle commitment

        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // define a test tree
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        // insert an entry into the tree
        test_tree.update(0, 0).unwrap();
        let commitment = test_tree.commitment();

        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
        // insert the header with merkle commitment
        let mut tx = storage.write().await.unwrap();
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomString".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        // proof for the index from the tree
        let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();
        // traversal path for the index.
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        // insert merkle nodes
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        // update saved state height
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        let merkle_proof = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Commit(commitment), 0)
            .await
            .unwrap();

        let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();

        assert_eq!(merkle_proof, proof.clone(), "merkle paths mismatch");
    }
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_missing_state() {
        // This test checks that the header commitment matches the recomputed root hash.
        // First, the header merkle root commitment field is not updated, which should result in an error.
        // The full merkle path verification is also done by recomputing the root hash.
        // An index and its corresponding merkle nodes with created (bh) = 1 are inserted.
        // The entry of the index is then updated, and the updated nodes are inserted with created (bh) = 2.
        // A node in the traversal path with bh = 2 is deleted, so get_path should return an error because an older version of one of the nodes is used.

        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // define a test tree
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        // insert an entry into the tree

        let mut tx = storage.write().await.unwrap();
        for i in 0..27 {
            test_tree.update(i, i).unwrap();
            let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
            // insert the header with merkle commitment
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    block_height as i64,
                    format!("rarndomString{i}"),
                    "t".to_string(),
                    0,
                    test_data,
                )],
            )
            .await
            .unwrap();
            // proof for the index from the tree
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            // traversal path for the index.
            let traversal_path =
                <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());
            // insert merkle nodes
            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof.clone(),
                traversal_path.clone(),
                block_height as u64,
            )
            .await
            .expect("failed to insert nodes");
            // update saved state height
            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
                .await
                .unwrap();
        }

        test_tree.update(1, 100).unwrap();
        // insert updated merkle path without updating the header
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&1, test_tree.height());
        let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();

        // insert merkle nodes
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        tx.commit().await.unwrap();

        let merkle_path = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                1,
            )
            .await;
        assert!(merkle_path.is_err());

        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
        // insert the header with merkle commitment
        let mut tx = storage.write().await.unwrap();
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomStringgg".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        tx.commit().await.unwrap();
        // Querying the path again
        let merkle_proof = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                1,
            )
            .await
            .unwrap();
        assert_eq!(merkle_proof, proof, "path dont match");

        // Update the tree again for index 1 with created (bh) = 2
        // Delete one of the nodes in the traversal path
1311        test_tree.update(1, 200).unwrap();
1312
1313        let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();
1314        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
1315
1316        // insert the header with merkle commitment
1317        let mut tx = storage.write().await.unwrap();
1318        tx.upsert(
1319            "header",
1320            ["height", "hash", "payload_hash", "timestamp", "data"],
1321            ["height"],
1322            [(
1323                2i64,
1324                "randomHashString".to_string(),
1325                "t".to_string(),
1326                0,
1327                test_data,
1328            )],
1329        )
1330        .await
1331        .unwrap();
1332        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1333            &mut tx,
1334            proof.clone(),
1335            traversal_path.clone(),
1336            2_u64,
1337        )
1338        .await
1339        .expect("failed to insert nodes");
1340
1341        // Deleting one internal node
1342        let node_path = traversal_path
1343            .iter()
1344            .skip(1)
1345            .rev()
1346            .map(|n| *n as i32)
1347            .collect::<Vec<_>>();
1348        tx.execute(
1349            query(&format!(
1350                "DELETE FROM {} WHERE created = 2 and path = $1",
1351                MockMerkleTree::state_type()
1352            ))
1353            .bind(serde_json::to_value(node_path).unwrap()),
1354        )
1355        .await
1356        .expect("failed to delete internal node");
1357        tx.commit().await.unwrap();
1358
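            // With an internal node on the traversal path deleted, the stored path for this
            // snapshot is incomplete, so the query should fail rather than return an invalid proof.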
1359        let merkle_path = storage
1360            .read()
1361            .await
1362            .unwrap()
1363            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 1)
1364            .await;
1365
1366        assert!(merkle_path.is_err());
1367    }
1368
1369    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1370    async fn test_merklized_state_snapshot() {
1371        let db = TmpDb::init().await;
1372        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
1373            .await
1374            .unwrap();
1375
1376        // Define a test tree
1377        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
1378
1379        // We will sample random keys as u32. RESERVED_KEY is not a valid u32 value, so it is a
1380        // key that we will never insert into the tree.
1381        const RESERVED_KEY: usize = (u32::MAX as usize) + 1;
1382
1383        // Randomly insert and delete some entries. For each entry we insert, we also keep track of
1384        // whether the entry should be in the tree using a HashMap.
1385        #[tracing::instrument(skip(tree, expected))]
1386        fn randomize(tree: &mut MockMerkleTree, expected: &mut HashMap<usize, Option<usize>>) {
1387            let mut rng = rand::thread_rng();
1388            tracing::info!("randomizing tree");
1389
1390            for _ in 0..50 {
1391                // We flip a coin to decide whether to insert or delete, unless the tree is empty,
1392                // in which case we can only insert.
1393                if !expected.values().any(|v| v.is_some()) || rng.next_u32().is_multiple_of(2) {
1394                    // Insert.
1395                    let key = rng.next_u32() as usize;
1396                    let val = rng.next_u32() as usize;
1397                    tracing::info!(key, val, "inserting");
1398
1399                    tree.update(key, val).unwrap();
1400                    expected.insert(key, Some(val));
1401                } else {
1402                    // Delete.
1403                    let key = expected
1404                        .iter()
1405                        .filter_map(|(k, v)| if v.is_some() { Some(k) } else { None })
1406                        .choose(&mut rng)
1407                        .unwrap();
1408                    tracing::info!(key, "deleting");
1409
1410                    tree.remove(key).unwrap();
1411                    expected.insert(*key, None);
1412                }
1413            }
1414        }
1415
1416        // Commit the tree to storage.
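            // For every key we have touched, store its (non-)membership proof at `block_height`,
            // then insert a header row carrying the tree commitment and record `block_height` as
            // the last state height.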
1417        #[tracing::instrument(skip(storage, tree, expected))]
1418        async fn store(
1419            storage: &SqlStorage,
1420            tree: &MockMerkleTree,
1421            expected: &HashMap<usize, Option<usize>>,
1422            block_height: u64,
1423        ) {
1424            tracing::info!("persisting tree");
1425            let mut tx = storage.write().await.unwrap();
1426
1427            for key in expected.keys() {
1428                let proof = match tree.universal_lookup(key) {
1429                    LookupResult::Ok(_, proof) => proof,
1430                    LookupResult::NotFound(proof) => proof,
1431                    LookupResult::NotInMemory => panic!("failed to find key {key}"),
1432                };
1433                let traversal_path = ToTraversalPath::<8>::to_traversal_path(key, tree.height());
1434                UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1435                    &mut tx,
1436                    proof,
1437                    traversal_path,
1438                    block_height,
1439                )
1440                .await
1441                .unwrap();
1442            }
1443            // insert the header with merkle commitment
1444            tx.upsert(
1445                "header", ["height", "hash", "payload_hash", "timestamp", "data"], ["height"],
1446                [(
1447                    block_height as i64,
1448                    format!("hash{block_height}"),
1449                    "hash".to_string(),
1450                    0i64,
1451                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(tree.commitment()).unwrap()}),
1452                )],
1453            )
1454            .await
1455            .unwrap();
1456            UpdateStateData::<MockTypes, MockMerkleTree, 8>::set_last_state_height(
1457                &mut tx,
1458                block_height as usize,
1459            )
1460            .await
1461            .unwrap();
1462            tx.commit().await.unwrap();
1463        }
1464
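            // Check that the snapshot stored for `block_height` reproduces the same proofs as the
            // in-memory tree: a correct (non-)membership proof for every key we touched, and a
            // non-membership proof for RESERVED_KEY, which was never inserted.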
1465        #[tracing::instrument(skip(storage, tree, expected))]
1466        async fn validate(
1467            storage: &SqlStorage,
1468            tree: &MockMerkleTree,
1469            expected: &HashMap<usize, Option<usize>>,
1470            block_height: u64,
1471        ) {
1472            tracing::info!("validating snapshot");
1473
1474            // Check that we can get a correct path for each key that we touched.
1475            let snapshot = Snapshot::<_, MockMerkleTree, 8>::Index(block_height);
1476
1477            for (key, val) in expected {
1478                let proof = match tree.universal_lookup(key) {
1479                    LookupResult::Ok(_, proof) => proof,
1480                    LookupResult::NotFound(proof) => proof,
1481                    LookupResult::NotInMemory => panic!("failed to find key {key}"),
1482                };
1483                assert_eq!(
1484                    proof,
1485                    storage
1486                        .read()
1487                        .await
1488                        .unwrap()
1489                        .get_path(snapshot, *key)
1490                        .await
1491                        .unwrap()
1492                );
1493                assert_eq!(val.as_ref(), proof.elem());
1494                // Check path is valid for test_tree
1495                if val.is_some() {
1496                    MockMerkleTree::verify(tree.commitment(), key, proof)
1497                        .unwrap()
1498                        .unwrap();
1499                } else {
1500                    assert!(
1501                        MockMerkleTree::non_membership_verify(tree.commitment(), key, proof)
1502                            .unwrap()
1503                    );
1504                }
1505            }
1506
1507            // Check that we can even get a non-membership proof for a key that we never touched.
1508            let proof = match tree.universal_lookup(RESERVED_KEY) {
1509                LookupResult::Ok(_, proof) => proof,
1510                LookupResult::NotFound(proof) => proof,
1511                LookupResult::NotInMemory => panic!("failed to find reserved key {RESERVED_KEY}"),
1512            };
1513            assert_eq!(
1514                proof,
1515                storage
1516                    .read()
1517                    .await
1518                    .unwrap()
1519                    .get_path(snapshot, RESERVED_KEY)
1520                    .await
1521                    .unwrap()
1522            );
1523            assert_eq!(proof.elem(), None);
1524            // Check path is valid for test_tree
1525            assert!(
1526                MockMerkleTree::non_membership_verify(tree.commitment(), RESERVED_KEY, proof)
1527                    .unwrap()
1528            );
1529        }
1530
1531        // Create a randomized Merkle tree.
1532        let mut expected = HashMap::<usize, Option<usize>>::new();
1533        randomize(&mut test_tree, &mut expected);
1534
1535        // Commit the randomized tree to storage.
1536        store(&storage, &test_tree, &expected, 1).await;
1537        validate(&storage, &test_tree, &expected, 1).await;
1538
1539        // Make random edits and commit another snapshot.
1540        let mut expected2 = expected.clone();
1541        let mut test_tree2 = test_tree.clone();
1542        randomize(&mut test_tree2, &mut expected2);
1543        store(&storage, &test_tree2, &expected2, 2).await;
1544        validate(&storage, &test_tree2, &expected2, 2).await;
1545
1546        // Ensure the original snapshot is still valid.
1547        validate(&storage, &test_tree, &expected, 1).await;
1548    }
1549
1550    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1551    async fn test_merklized_state_missing_leaf() {
1552        // Check that if a leaf is missing but its ancestors are present (i.e., the key is in
1553        // the tree), we catch it rather than interpreting the entry as an empty node by default.
1554        // Note that this scenario should be impossible in normal usage, since we never store or
1555        // delete partial paths. But we should never return an invalid proof even in extreme
1556        // cases such as database corruption.
1557
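            // For a few small trees: store complete paths and a header, then delete a single leaf
            // row directly from the database and check that only the corrupted key fails with
            // `QueryError::Missing` while the other entries can still be retrieved.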
1558        for tree_size in 1..=3 {
1559            let db = TmpDb::init().await;
1560            let storage = SqlStorage::connect(db.config(), StorageConnectionType::Query)
1561                .await
1562                .unwrap();
1563
1564            // Define a test tree
1565            let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
1566            for i in 0..tree_size {
1567                test_tree.update(i, i).unwrap();
1568            }
1569
1570            let mut tx = storage.write().await.unwrap();
1571
1572            // Insert a header with the tree commitment.
1573            tx.upsert(
1574                "header",
1575                ["height", "hash", "payload_hash", "timestamp", "data"],
1576                ["height"],
1577                [(
1578                    0i64,
1579                    "hash".to_string(),
1580                    "hash".to_string(),
1581                    0,
1582                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
1583                )],
1584            )
1585            .await
1586            .unwrap();
1587
1588            // Insert Merkle nodes.
1589            for i in 0..tree_size {
1590                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1591                let traversal_path =
1592                    ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
1593                UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1594                    &mut tx,
1595                    proof,
1596                    traversal_path,
1597                    0,
1598                )
1599                .await
1600                .unwrap();
1601            }
1602            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 0)
1603                .await
1604                .unwrap();
1605            tx.commit().await.unwrap();
1606
1607            // Test that we can get all the entries.
1608            let snapshot = Snapshot::<MockTypes, MockMerkleTree, 8>::Index(0);
1609            for i in 0..tree_size {
1610                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1611                assert_eq!(
1612                    proof,
1613                    storage
1614                        .read()
1615                        .await
1616                        .unwrap()
1617                        .get_path(snapshot, i)
1618                        .await
1619                        .unwrap()
1620                );
1621                assert_eq!(*proof.elem().unwrap(), i);
1622            }
1623
1624            // Now delete the leaf node for the last entry we inserted, corrupting the database.
1625            let index = serde_json::to_value(tree_size - 1).unwrap();
1626            let mut tx = storage.write().await.unwrap();
1627
1628            tx.execute(
1629                query(&format!(
1630                    "DELETE FROM {} WHERE idx = $1",
1631                    MockMerkleTree::state_type()
1632                ))
1633                .bind(index),
1634            )
1635            .await
1636            .unwrap();
1637            tx.commit().await.unwrap();
1638
1639            // Test that we can still get the entries we didn't delete.
1640            for i in 0..tree_size - 1 {
1641                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1642                assert_eq!(
1643                    proof,
1644                    storage
1645                        .read()
1646                        .await
1647                        .unwrap()
1648                        .get_path(snapshot, i)
1649                        .await
1650                        .unwrap()
1651                );
1652                assert_eq!(*proof.elem().unwrap(), i);
1653            }
1654
1655            // Looking up the entry we deleted fails, rather than returning an invalid path.
1656            let err = storage
1657                .read()
1658                .await
1659                .unwrap()
1660                .get_path(snapshot, tree_size - 1)
1661                .await
1662                .unwrap_err();
1663            assert!(matches!(err, QueryError::Missing));
1664        }
1665    }
1666}