1use std::{
16 collections::{HashMap, HashSet, VecDeque},
17 sync::Arc,
18};
19
20use ark_serialize::CanonicalDeserialize;
21use async_trait::async_trait;
22use futures::stream::TryStreamExt;
23use hotshot_types::traits::node_implementation::NodeType;
24use jf_merkle_tree::{
25 prelude::{MerkleNode, MerkleProof},
26 DigestAlgorithm, MerkleCommitment, ToTraversalPath,
27};
28use sqlx::types::{BitVec, JsonValue};
29
30use super::{
31 super::transaction::{query_as, Transaction, TransactionMode, Write},
32 DecodeError, QueryBuilder,
33};
34use crate::{
35 data_source::storage::{
36 pruning::PrunedHeightStorage,
37 sql::{build_where_in, sqlx::Row},
38 MerklizedStateHeightStorage, MerklizedStateStorage,
39 },
40 merklized_state::{MerklizedState, Snapshot},
41 QueryError, QueryResult,
42};
43
44#[async_trait]
45impl<Mode, Types, State, const ARITY: usize> MerklizedStateStorage<Types, State, ARITY>
46 for Transaction<Mode>
47where
48 Mode: TransactionMode,
49 Types: NodeType,
50 State: MerklizedState<Types, ARITY> + 'static,
51{
52 async fn get_path(
54 &mut self,
55 snapshot: Snapshot<Types, State, ARITY>,
56 key: State::Key,
57 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
58 let state_type = State::state_type();
59 let tree_height = State::tree_height();
60
61 let traversal_path = State::Key::to_traversal_path(&key, tree_height);
63 let (created, merkle_commitment) = self.snapshot_info(snapshot).await?;
64
65 let (query, sql) = build_get_path_query(state_type, traversal_path.clone(), created)?;
68 let rows = query.query(&sql).fetch_all(self.as_mut()).await?;
69
70 let nodes: Vec<Node> = rows.into_iter().map(|r| r.into()).collect();
71
72 let mut hash_ids = HashSet::new();
75 for node in nodes.iter() {
76 hash_ids.insert(node.hash_id);
77 if let Some(children) = &node.children {
78 let children: Vec<i32> =
79 serde_json::from_value(children.clone()).map_err(|e| QueryError::Error {
80 message: format!("Error deserializing 'children' into Vec<i32>: {e}"),
81 })?;
82 hash_ids.extend(children);
83 }
84 }
85
86 let hashes = if !hash_ids.is_empty() {
89 let (query, sql) = build_where_in("SELECT id, value FROM hash", "id", hash_ids)?;
90 query
91 .query_as(&sql)
92 .fetch(self.as_mut())
93 .try_collect::<HashMap<i32, Vec<u8>>>()
94 .await?
95 } else {
96 HashMap::new()
97 };
98
99 let mut proof_path = VecDeque::with_capacity(State::tree_height());
100 for Node {
101 hash_id,
102 children,
103 children_bitvec,
104 idx,
105 entry,
106 ..
107 } in nodes.iter()
108 {
109 {
110 let value = hashes.get(hash_id).ok_or(QueryError::Error {
111 message: format!("node's value references non-existent hash {hash_id}"),
112 })?;
113
114 match (children, children_bitvec, idx, entry) {
115 (Some(children), Some(children_bitvec), None, None) => {
117 let children: Vec<i32> =
118 serde_json::from_value(children.clone()).map_err(|e| {
119 QueryError::Error {
120 message: format!(
121 "Error deserializing 'children' into Vec<i32>: {e}"
122 ),
123 }
124 })?;
125 let mut children = children.iter();
126
127 let child_nodes = children_bitvec
130 .iter()
131 .map(|bit| {
132 if bit {
133 let hash_id = children.next().ok_or(QueryError::Error {
134 message: "node has fewer children than set bits".into(),
135 })?;
136 let value = hashes.get(hash_id).ok_or(QueryError::Error {
137 message: format!(
138 "node's child references non-existent hash {hash_id}"
139 ),
140 })?;
141 Ok(Arc::new(MerkleNode::ForgettenSubtree {
142 value: State::T::deserialize_compressed(value.as_slice())
143 .decode_error("malformed merkle node value")?,
144 }))
145 } else {
146 Ok(Arc::new(MerkleNode::Empty))
147 }
148 })
149 .collect::<QueryResult<Vec<_>>>()?;
150 proof_path.push_back(MerkleNode::Branch {
152 value: State::T::deserialize_compressed(value.as_slice())
153 .decode_error("malformed merkle node value")?,
154 children: child_nodes,
155 });
156 },
157 (None, None, Some(index), Some(entry)) => {
159 proof_path.push_back(MerkleNode::Leaf {
160 value: State::T::deserialize_compressed(value.as_slice())
161 .decode_error("malformed merkle node value")?,
162 pos: serde_json::from_value(index.clone())
163 .decode_error("malformed merkle node index")?,
164 elem: serde_json::from_value(entry.clone())
165 .decode_error("malformed merkle element")?,
166 });
167 },
168 (None, None, Some(_), None) => {
170 proof_path.push_back(MerkleNode::Empty);
171 },
172 _ => {
173 return Err(QueryError::Error {
174 message: "Invalid type of merkle node found".to_string(),
175 });
176 },
177 }
178 }
179 }
180
181 let init = if let Some(MerkleNode::Leaf { value, .. }) = proof_path.front() {
183 *value
184 } else {
185 while proof_path.len() <= State::tree_height() {
191 proof_path.push_front(MerkleNode::Empty);
192 }
193 State::T::default()
194 };
195 let commitment_from_path = traversal_path
196 .iter()
197 .zip(proof_path.iter().skip(1))
198 .try_fold(init, |val, (branch, node)| -> QueryResult<State::T> {
199 match node {
200 MerkleNode::Branch { value: _, children } => {
201 let data = children
202 .iter()
203 .map(|node| match node.as_ref() {
204 MerkleNode::ForgettenSubtree { value } => Ok(*value),
205 MerkleNode::Empty => Ok(State::T::default()),
206 _ => Err(QueryError::Error {
207 message: "Invalid child node".to_string(),
208 }),
209 })
210 .collect::<QueryResult<Vec<_>>>()?;
211
212 if data[*branch] != val {
213 tracing::warn!(
217 ?key,
218 parent = ?data[*branch],
219 child = ?val,
220 branch = %*branch,
221 %created,
222 %merkle_commitment,
223 "missing data in merklized state; parent-child mismatch",
224 );
225 return Err(QueryError::Missing);
226 }
227
228 State::Digest::digest(&data).map_err(|err| QueryError::Error {
229 message: format!("failed to update digest: {err:#}"),
230 })
231 },
232 MerkleNode::Empty => Ok(init),
233 _ => Err(QueryError::Error {
234 message: "Invalid type of Node in the proof".to_string(),
235 }),
236 }
237 })?;
238
239 if commitment_from_path != merkle_commitment.digest() {
240 return Err(QueryError::Error {
241 message:
242 format!("Commitment calculated from merkle path ({commitment_from_path:?}) does not match the commitment in the header ({:?})", merkle_commitment.digest()),
243 });
244 }
245
246 Ok(MerkleProof {
247 pos: key,
248 proof: proof_path.into(),
249 })
250 }
251}
252
253#[async_trait]
254impl<Mode: TransactionMode> MerklizedStateHeightStorage for Transaction<Mode> {
255 async fn get_last_state_height(&mut self) -> QueryResult<usize> {
256 let Some((height,)) = query_as::<(i64,)>("SELECT height from last_merklized_state_height")
257 .fetch_optional(self.as_mut())
258 .await?
259 else {
260 return Ok(0);
261 };
262 Ok(height as usize)
263 }
264}
265
266impl<Mode: TransactionMode> Transaction<Mode> {
267 async fn snapshot_info<Types, State, const ARITY: usize>(
273 &mut self,
274 snapshot: Snapshot<Types, State, ARITY>,
275 ) -> QueryResult<(i64, State::Commit)>
276 where
277 Types: NodeType,
278 State: MerklizedState<Types, ARITY>,
279 {
280 let header_state_commitment_field = State::header_state_commitment_field();
281
282 let (created, commit) = match snapshot {
283 Snapshot::Commit(commit) => {
284 let (height,) = query_as(&format!(
290 "SELECT height
291 FROM header
292 WHERE {header_state_commitment_field} = $1
293 LIMIT 1"
294 ))
295 .bind(commit.to_string())
296 .fetch_one(self.as_mut())
297 .await?;
298
299 (height, commit)
300 },
301 Snapshot::Index(created) => {
302 let created = created as i64;
303 let (commit,) = query_as::<(String,)>(&format!(
304 "SELECT {header_state_commitment_field} AS root_commitment
305 FROM header
306 WHERE height = $1
307 LIMIT 1"
308 ))
309 .bind(created)
310 .fetch_one(self.as_mut())
311 .await?;
312 let commit = serde_json::from_value(commit.into())
313 .decode_error("malformed state commitment")?;
314 (created, commit)
315 },
316 };
317
318 let height = self.get_last_state_height().await?;
320
321 if height < (created as usize) {
322 return Err(QueryError::NotFound);
323 }
324
325 let pruned_height = self
326 .load_pruned_height()
327 .await
328 .map_err(|e| QueryError::Error {
329 message: format!("failed to load pruned height: {e}"),
330 })?;
331
332 if pruned_height.is_some_and(|h| height <= h as usize) {
333 return Err(QueryError::NotFound);
334 }
335
336 Ok((created, commit))
337 }
338}
339
340pub(crate) fn build_hash_batch_insert(
342 hashes: &[Vec<u8>],
343) -> QueryResult<(QueryBuilder<'_>, String)> {
344 let mut query = QueryBuilder::default();
345 let params = hashes
346 .iter()
347 .map(|hash| Ok(format!("({})", query.bind(hash)?)))
348 .collect::<QueryResult<Vec<String>>>()?;
349 let sql = format!(
350 "INSERT INTO hash(value) values {} ON CONFLICT (value) DO UPDATE SET value = EXCLUDED.value returning value, id",
351 params.join(",")
352 );
353 Ok((query, sql))
354}
355
356#[derive(Debug, Default, Clone)]
358pub(crate) struct Node {
359 pub(crate) path: JsonValue,
360 pub(crate) created: i64,
361 pub(crate) hash_id: i32,
362 pub(crate) children: Option<JsonValue>,
363 pub(crate) children_bitvec: Option<BitVec>,
364 pub(crate) idx: Option<JsonValue>,
365 pub(crate) entry: Option<JsonValue>,
366}
367
368#[cfg(feature = "embedded-db")]
369impl From<sqlx::sqlite::SqliteRow> for Node {
370 fn from(row: sqlx::sqlite::SqliteRow) -> Self {
371 let bit_string: Option<String> = row.get_unchecked("children_bitvec");
372 let children_bitvec: Option<BitVec> =
373 bit_string.map(|b| b.chars().map(|c| c == '1').collect());
374
375 Self {
376 path: row.get_unchecked("path"),
377 created: row.get_unchecked("created"),
378 hash_id: row.get_unchecked("hash_id"),
379 children: row.get_unchecked("children"),
380 children_bitvec,
381 idx: row.get_unchecked("idx"),
382 entry: row.get_unchecked("entry"),
383 }
384 }
385}
386
387#[cfg(not(feature = "embedded-db"))]
388impl From<sqlx::postgres::PgRow> for Node {
389 fn from(row: sqlx::postgres::PgRow) -> Self {
390 Self {
391 path: row.get_unchecked("path"),
392 created: row.get_unchecked("created"),
393 hash_id: row.get_unchecked("hash_id"),
394 children: row.get_unchecked("children"),
395 children_bitvec: row.get_unchecked("children_bitvec"),
396 idx: row.get_unchecked("idx"),
397 entry: row.get_unchecked("entry"),
398 }
399 }
400}
401
402impl Node {
403 pub(crate) async fn upsert(
404 name: &str,
405 nodes: impl IntoIterator<Item = Self>,
406 tx: &mut Transaction<Write>,
407 ) -> anyhow::Result<()> {
408 tx.upsert(
409 name,
410 [
411 "path",
412 "created",
413 "hash_id",
414 "children",
415 "children_bitvec",
416 "idx",
417 "entry",
418 ],
419 ["path", "created"],
420 nodes.into_iter().map(|n| {
421 #[cfg(feature = "embedded-db")]
422 let children_bitvec: Option<String> = n
423 .children_bitvec
424 .clone()
425 .map(|b| b.iter().map(|bit| if bit { '1' } else { '0' }).collect());
426
427 #[cfg(not(feature = "embedded-db"))]
428 let children_bitvec = n.children_bitvec.clone();
429
430 (
431 n.path.clone(),
432 n.created,
433 n.hash_id,
434 n.children.clone(),
435 children_bitvec,
436 n.idx.clone(),
437 n.entry.clone(),
438 )
439 }),
440 )
441 .await
442 }
443}
444
445fn build_get_path_query<'q>(
446 table: &'static str,
447 traversal_path: Vec<usize>,
448 created: i64,
449) -> QueryResult<(QueryBuilder<'q>, String)> {
450 let mut query = QueryBuilder::default();
451 let mut traversal_path = traversal_path.into_iter().map(|x| x as i32);
452
453 let len = traversal_path.len();
455 let mut sub_queries = Vec::new();
456
457 query.bind(created)?;
458
459 for _ in 0..=len {
460 let path = traversal_path.clone().rev().collect::<Vec<_>>();
461 let path: serde_json::Value = path.into();
462 let node_path = query.bind(path)?;
463
464 let sub_query = format!(
465 "SELECT * FROM (SELECT * FROM {table} WHERE path = {node_path} AND created <= $1 ORDER BY created DESC LIMIT 1)",
466 );
467
468 sub_queries.push(sub_query);
469 traversal_path.next();
470 }
471
472 let mut sql: String = sub_queries.join(" UNION ");
473
474 sql = format!("SELECT * FROM ({sql}) as t ");
475
476 if cfg!(feature = "embedded-db") {
479 sql.push_str("ORDER BY length(t.path) DESC");
480 } else {
481 sql.push_str("ORDER BY t.path DESC");
482 }
483
484 Ok((query, sql))
485}
486
487#[cfg(test)]
488mod test {
489 use futures::stream::StreamExt;
490 use jf_merkle_tree::{
491 universal_merkle_tree::UniversalMerkleTree, LookupResult, MerkleTreeScheme,
492 UniversalMerkleTreeScheme,
493 };
494 use rand::{seq::IteratorRandom, RngCore};
495
496 use super::*;
497 use crate::{
498 data_source::{
499 storage::sql::{testing::TmpDb, *},
500 VersionedDataSource,
501 },
502 merklized_state::UpdateStateData,
503 testing::{
504 mocks::{MockMerkleTree, MockTypes},
505 setup_test,
506 },
507 };
508
509 #[tokio::test(flavor = "multi_thread")]
510 async fn test_merklized_state_storage() {
511 setup_test();
514
515 let db = TmpDb::init().await;
516 let storage = SqlStorage::connect(db.config()).await.unwrap();
517
518 let mut test_tree: UniversalMerkleTree<_, _, _, 8, _> =
520 MockMerkleTree::new(MockMerkleTree::tree_height());
521 let block_height = 1;
522
523 let mut tx = storage.write().await.unwrap();
526 for i in 0..27 {
527 test_tree.update(i, i).unwrap();
528
529 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
531 tx.upsert(
532 "header",
533 ["height", "hash", "payload_hash", "timestamp", "data"],
534 ["height"],
535 [(
536 block_height as i64,
537 format!("randomHash{i}"),
538 "t".to_string(),
539 0,
540 test_data,
541 )],
542 )
543 .await
544 .unwrap();
545 let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
547 let traversal_path =
549 <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());
550
551 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
552 &mut tx,
553 proof.clone(),
554 traversal_path.clone(),
555 block_height as u64,
556 )
557 .await
558 .expect("failed to insert nodes");
559 }
560 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
562 .await
563 .unwrap();
564 tx.commit().await.unwrap();
565
566 for i in 0..27 {
568 let mut tx = storage.read().await.unwrap();
570 let merkle_path = tx
571 .get_path(
572 Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
573 i,
574 )
575 .await
576 .unwrap();
577
578 let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
579
580 tracing::info!("merkle path {:?}", merkle_path);
581
582 assert_eq!(merkle_path, proof.clone(), "merkle paths mismatch");
584 }
585
586 let (_, proof_bh_1) = test_tree.lookup(0).expect_ok().unwrap();
588 test_tree.update(0, 99).unwrap();
592 let mut tx = storage.write().await.unwrap();
596 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
597 tx.upsert(
598 "header",
599 ["height", "hash", "payload_hash", "timestamp", "data"],
600 ["height"],
601 [(
602 2i64,
603 "randomstring".to_string(),
604 "t".to_string(),
605 0,
606 test_data,
607 )],
608 )
609 .await
610 .unwrap();
611 let (_, proof_bh_2) = test_tree.lookup(0).expect_ok().unwrap();
612 let traversal_path =
614 <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
615 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
618 &mut tx,
619 proof_bh_2.clone(),
620 traversal_path.clone(),
621 2,
622 )
623 .await
624 .expect("failed to insert nodes");
625 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
627 .await
628 .unwrap();
629 tx.commit().await.unwrap();
630
631 let node_path = traversal_path
632 .into_iter()
633 .rev()
634 .map(|n| n as i32)
635 .collect::<Vec<_>>();
636
637 let mut tx = storage.read().await.unwrap();
639 let rows = query("SELECT * from test_tree where path = $1 ORDER BY created")
640 .bind(serde_json::to_value(node_path).unwrap())
641 .fetch(tx.as_mut());
642
643 let nodes: Vec<Node> = rows.map(|res| res.unwrap().into()).collect().await;
644 assert!(nodes.len() == 2, "incorrect number of nodes");
646 assert_eq!(nodes[0].created, 1, "wrong block height");
647 assert_eq!(nodes[1].created, 2, "wrong block height");
648
649 let path_with_bh_2 = storage
654 .read()
655 .await
656 .unwrap()
657 .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2), 0)
658 .await
659 .unwrap();
660
661 assert_eq!(path_with_bh_2, proof_bh_2);
662 let path_with_bh_1 = storage
663 .read()
664 .await
665 .unwrap()
666 .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1), 0)
667 .await
668 .unwrap();
669 assert_eq!(path_with_bh_1, proof_bh_1);
670 }
671
672 #[tokio::test(flavor = "multi_thread")]
673 async fn test_merklized_state_non_membership_proof() {
674 setup_test();
680
681 let db = TmpDb::init().await;
682 let storage = SqlStorage::connect(db.config()).await.unwrap();
683
684 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
686 let block_height = 1;
687 test_tree.update(0, 0).unwrap();
689 let commitment = test_tree.commitment();
690
691 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
692 let mut tx = storage.write().await.unwrap();
694 tx.upsert(
695 "header",
696 ["height", "hash", "payload_hash", "timestamp", "data"],
697 ["height"],
698 [(
699 block_height as i64,
700 "randomString".to_string(),
701 "t".to_string(),
702 0,
703 test_data,
704 )],
705 )
706 .await
707 .unwrap();
708 let (_, proof_before_remove) = test_tree.lookup(0).expect_ok().unwrap();
710 let traversal_path =
712 <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
713 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
715 &mut tx,
716 proof_before_remove.clone(),
717 traversal_path.clone(),
718 block_height as u64,
719 )
720 .await
721 .expect("failed to insert nodes");
722 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
724 .await
725 .unwrap();
726 tx.commit().await.unwrap();
727 let merkle_path = storage
729 .read()
730 .await
731 .unwrap()
732 .get_path(
733 Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
734 0,
735 )
736 .await
737 .unwrap();
738
739 assert_eq!(
741 merkle_path,
742 proof_before_remove.clone(),
743 "merkle paths mismatch"
744 );
745
746 test_tree.remove(0).expect("failed to delete index 0 ");
748
749 let proof_after_remove = test_tree.universal_lookup(0).expect_not_found().unwrap();
752
753 let mut tx = storage.write().await.unwrap();
754 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
755 &mut tx,
756 proof_after_remove.clone(),
757 traversal_path.clone(),
758 2_u64,
759 )
760 .await
761 .expect("failed to insert nodes");
762 tx.upsert(
764 "header",
765 ["height", "hash", "payload_hash", "timestamp", "data"],
766 ["height"],
767 [(
768 2i64,
769 "randomString2".to_string(),
770 "t".to_string(),
771 0,
772 serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
773 )],
774 )
775 .await
776 .unwrap();
777 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
779 .await
780 .unwrap();
781 tx.commit().await.unwrap();
782 let non_membership_path = storage
784 .read()
785 .await
786 .unwrap()
787 .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 0)
788 .await
789 .unwrap();
790 assert_eq!(
792 non_membership_path, proof_after_remove,
793 "merkle paths dont match"
794 );
795
796 let proof_bh_1 = storage
801 .read()
802 .await
803 .unwrap()
804 .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1_u64), 0)
805 .await
806 .unwrap();
807 assert_eq!(proof_bh_1, proof_before_remove, "merkle paths dont match");
808 }
809
810 #[tokio::test(flavor = "multi_thread")]
811 async fn test_merklized_state_non_membership_proof_unseen_entry() {
812 setup_test();
813
814 let db = TmpDb::init().await;
815 let storage = SqlStorage::connect(db.config()).await.unwrap();
816
817 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
819
820 for i in 0..=2 {
823 tracing::info!(i, ?test_tree, "testing non-membership proof");
824 let mut tx = storage.write().await.unwrap();
825
826 tx.upsert(
828 "header",
829 ["height", "hash", "payload_hash", "timestamp", "data"],
830 ["height"],
831 [(
832 i as i64,
833 format!("hash{i}"),
834 "t".to_string(),
835 0,
836 serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()})
837 )],
838 )
839 .await
840 .unwrap();
841 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, i)
843 .await
844 .unwrap();
845 tx.commit().await.unwrap();
846
847 let proof = storage
849 .read()
850 .await
851 .unwrap()
852 .get_path(
853 Snapshot::<MockTypes, MockMerkleTree, 8>::Index(i as u64),
854 100,
855 )
856 .await
857 .unwrap();
858 assert_eq!(proof.elem(), None);
859 assert!(test_tree.non_membership_verify(100, proof).unwrap());
860
861 test_tree.update(i, i).unwrap();
863 let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
864 let traversal_path = ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
865 let mut tx = storage.write().await.unwrap();
866 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
867 &mut tx,
868 proof,
869 traversal_path,
870 (i + 1) as u64,
871 )
872 .await
873 .expect("failed to insert nodes");
874 tx.commit().await.unwrap();
875 }
876 }
877
878 #[tokio::test(flavor = "multi_thread")]
879 async fn test_merklized_storage_with_commit() {
880 setup_test();
882
883 let db = TmpDb::init().await;
884 let storage = SqlStorage::connect(db.config()).await.unwrap();
885
886 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
888 let block_height = 1;
889 test_tree.update(0, 0).unwrap();
891 let commitment = test_tree.commitment();
892
893 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
894 let mut tx = storage.write().await.unwrap();
896 tx.upsert(
897 "header",
898 ["height", "hash", "payload_hash", "timestamp", "data"],
899 ["height"],
900 [(
901 block_height as i64,
902 "randomString".to_string(),
903 "t".to_string(),
904 0,
905 test_data,
906 )],
907 )
908 .await
909 .unwrap();
910 let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();
912 let traversal_path =
914 <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
915 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
917 &mut tx,
918 proof.clone(),
919 traversal_path.clone(),
920 block_height as u64,
921 )
922 .await
923 .expect("failed to insert nodes");
924 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
926 .await
927 .unwrap();
928 tx.commit().await.unwrap();
929
930 let merkle_proof = storage
931 .read()
932 .await
933 .unwrap()
934 .get_path(Snapshot::<_, MockMerkleTree, 8>::Commit(commitment), 0)
935 .await
936 .unwrap();
937
938 let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();
939
940 assert_eq!(merkle_proof, proof.clone(), "merkle paths mismatch");
941 }
942 #[tokio::test(flavor = "multi_thread")]
943 async fn test_merklized_state_missing_state() {
944 setup_test();
951
952 let db = TmpDb::init().await;
953 let storage = SqlStorage::connect(db.config()).await.unwrap();
954
955 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
957 let block_height = 1;
958 let mut tx = storage.write().await.unwrap();
961 for i in 0..27 {
962 test_tree.update(i, i).unwrap();
963 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
964 tx.upsert(
966 "header",
967 ["height", "hash", "payload_hash", "timestamp", "data"],
968 ["height"],
969 [(
970 block_height as i64,
971 format!("rarndomString{i}"),
972 "t".to_string(),
973 0,
974 test_data,
975 )],
976 )
977 .await
978 .unwrap();
979 let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
981 let traversal_path =
983 <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());
984 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
986 &mut tx,
987 proof.clone(),
988 traversal_path.clone(),
989 block_height as u64,
990 )
991 .await
992 .expect("failed to insert nodes");
993 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
995 .await
996 .unwrap();
997 }
998
999 test_tree.update(1, 100).unwrap();
1000 let traversal_path =
1002 <usize as ToTraversalPath<8>>::to_traversal_path(&1, test_tree.height());
1003 let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();
1004
1005 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1007 &mut tx,
1008 proof.clone(),
1009 traversal_path.clone(),
1010 block_height as u64,
1011 )
1012 .await
1013 .expect("failed to insert nodes");
1014 tx.commit().await.unwrap();
1015
1016 let merkle_path = storage
1017 .read()
1018 .await
1019 .unwrap()
1020 .get_path(
1021 Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
1022 1,
1023 )
1024 .await;
1025 assert!(merkle_path.is_err());
1026
1027 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
1028 let mut tx = storage.write().await.unwrap();
1030 tx.upsert(
1031 "header",
1032 ["height", "hash", "payload_hash", "timestamp", "data"],
1033 ["height"],
1034 [(
1035 block_height as i64,
1036 "randomStringgg".to_string(),
1037 "t".to_string(),
1038 0,
1039 test_data,
1040 )],
1041 )
1042 .await
1043 .unwrap();
1044 tx.commit().await.unwrap();
1045 let merkle_proof = storage
1047 .read()
1048 .await
1049 .unwrap()
1050 .get_path(
1051 Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
1052 1,
1053 )
1054 .await
1055 .unwrap();
1056 assert_eq!(merkle_proof, proof, "path dont match");
1057
1058 test_tree.update(1, 200).unwrap();
1061
1062 let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();
1063 let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
1064
1065 let mut tx = storage.write().await.unwrap();
1067 tx.upsert(
1068 "header",
1069 ["height", "hash", "payload_hash", "timestamp", "data"],
1070 ["height"],
1071 [(
1072 2i64,
1073 "randomHashString".to_string(),
1074 "t".to_string(),
1075 0,
1076 test_data,
1077 )],
1078 )
1079 .await
1080 .unwrap();
1081 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1082 &mut tx,
1083 proof.clone(),
1084 traversal_path.clone(),
1085 2_u64,
1086 )
1087 .await
1088 .expect("failed to insert nodes");
1089
1090 let node_path = traversal_path
1092 .iter()
1093 .skip(1)
1094 .rev()
1095 .map(|n| *n as i32)
1096 .collect::<Vec<_>>();
1097 tx.execute(
1098 query(&format!(
1099 "DELETE FROM {} WHERE created = 2 and path = $1",
1100 MockMerkleTree::state_type()
1101 ))
1102 .bind(serde_json::to_value(node_path).unwrap()),
1103 )
1104 .await
1105 .expect("failed to delete internal node");
1106 tx.commit().await.unwrap();
1107
1108 let merkle_path = storage
1109 .read()
1110 .await
1111 .unwrap()
1112 .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 1)
1113 .await;
1114
1115 assert!(merkle_path.is_err());
1116 }
1117
1118 #[tokio::test(flavor = "multi_thread")]
1119 async fn test_merklized_state_snapshot() {
1120 setup_test();
1121
1122 let db = TmpDb::init().await;
1123 let storage = SqlStorage::connect(db.config()).await.unwrap();
1124
1125 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
1127
1128 const RESERVED_KEY: usize = (u32::MAX as usize) + 1;
1131
1132 #[tracing::instrument(skip(tree, expected))]
1135 fn randomize(tree: &mut MockMerkleTree, expected: &mut HashMap<usize, Option<usize>>) {
1136 let mut rng = rand::thread_rng();
1137 tracing::info!("randomizing tree");
1138
1139 for _ in 0..50 {
1140 if !expected.values().any(|v| v.is_some()) || rng.next_u32() % 2 == 0 {
1143 let key = rng.next_u32() as usize;
1145 let val = rng.next_u32() as usize;
1146 tracing::info!(key, val, "inserting");
1147
1148 tree.update(key, val).unwrap();
1149 expected.insert(key, Some(val));
1150 } else {
1151 let key = expected
1153 .iter()
1154 .filter_map(|(k, v)| if v.is_some() { Some(k) } else { None })
1155 .choose(&mut rng)
1156 .unwrap();
1157 tracing::info!(key, "deleting");
1158
1159 tree.remove(key).unwrap();
1160 expected.insert(*key, None);
1161 }
1162 }
1163 }
1164
1165 #[tracing::instrument(skip(storage, tree, expected))]
1167 async fn store(
1168 storage: &SqlStorage,
1169 tree: &MockMerkleTree,
1170 expected: &HashMap<usize, Option<usize>>,
1171 block_height: u64,
1172 ) {
1173 tracing::info!("persisting tree");
1174 let mut tx = storage.write().await.unwrap();
1175
1176 for key in expected.keys() {
1177 let proof = match tree.universal_lookup(key) {
1178 LookupResult::Ok(_, proof) => proof,
1179 LookupResult::NotFound(proof) => proof,
1180 LookupResult::NotInMemory => panic!("failed to find key {key}"),
1181 };
1182 let traversal_path = ToTraversalPath::<8>::to_traversal_path(key, tree.height());
1183 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1184 &mut tx,
1185 proof,
1186 traversal_path,
1187 block_height,
1188 )
1189 .await
1190 .unwrap();
1191 }
1192 tx
1194 .upsert("header", ["height", "hash", "payload_hash", "timestamp", "data"], ["height"],
1195 [(
1196 block_height as i64,
1197 format!("hash{block_height}"),
1198 "hash".to_string(),
1199 0i64,
1200 serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(tree.commitment()).unwrap()}),
1201 )],
1202 )
1203 .await
1204 .unwrap();
1205 UpdateStateData::<MockTypes, MockMerkleTree, 8>::set_last_state_height(
1206 &mut tx,
1207 block_height as usize,
1208 )
1209 .await
1210 .unwrap();
1211 tx.commit().await.unwrap();
1212 }
1213
1214 #[tracing::instrument(skip(storage, tree, expected))]
1215 async fn validate(
1216 storage: &SqlStorage,
1217 tree: &MockMerkleTree,
1218 expected: &HashMap<usize, Option<usize>>,
1219 block_height: u64,
1220 ) {
1221 tracing::info!("validating snapshot");
1222
1223 let snapshot = Snapshot::<_, MockMerkleTree, 8>::Index(block_height);
1225
1226 for (key, val) in expected {
1227 let proof = match tree.universal_lookup(key) {
1228 LookupResult::Ok(_, proof) => proof,
1229 LookupResult::NotFound(proof) => proof,
1230 LookupResult::NotInMemory => panic!("failed to find key {key}"),
1231 };
1232 assert_eq!(
1233 proof,
1234 storage
1235 .read()
1236 .await
1237 .unwrap()
1238 .get_path(snapshot, *key)
1239 .await
1240 .unwrap()
1241 );
1242 assert_eq!(val.as_ref(), proof.elem());
1243 if val.is_some() {
1245 MockMerkleTree::verify(tree.commitment().digest(), key, proof)
1246 .unwrap()
1247 .unwrap();
1248 } else {
1249 assert!(tree.non_membership_verify(key, proof).unwrap());
1250 }
1251 }
1252
1253 let proof = match tree.universal_lookup(RESERVED_KEY) {
1255 LookupResult::Ok(_, proof) => proof,
1256 LookupResult::NotFound(proof) => proof,
1257 LookupResult::NotInMemory => panic!("failed to find reserved key {RESERVED_KEY}"),
1258 };
1259 assert_eq!(
1260 proof,
1261 storage
1262 .read()
1263 .await
1264 .unwrap()
1265 .get_path(snapshot, RESERVED_KEY)
1266 .await
1267 .unwrap()
1268 );
1269 assert_eq!(proof.elem(), None);
1270 assert!(tree.non_membership_verify(RESERVED_KEY, proof).unwrap());
1272 }
1273
1274 let mut expected = HashMap::<usize, Option<usize>>::new();
1276 randomize(&mut test_tree, &mut expected);
1277
1278 store(&storage, &test_tree, &expected, 1).await;
1280 validate(&storage, &test_tree, &expected, 1).await;
1281
1282 let mut expected2 = expected.clone();
1284 let mut test_tree2 = test_tree.clone();
1285 randomize(&mut test_tree2, &mut expected2);
1286 store(&storage, &test_tree2, &expected2, 2).await;
1287 validate(&storage, &test_tree2, &expected2, 2).await;
1288
1289 validate(&storage, &test_tree, &expected, 1).await;
1291 }
1292
1293 #[tokio::test(flavor = "multi_thread")]
1294 async fn test_merklized_state_missing_leaf() {
1295 setup_test();
1301
1302 for tree_size in 1..=3 {
1303 let db = TmpDb::init().await;
1304 let storage = SqlStorage::connect(db.config()).await.unwrap();
1305
1306 let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
1308 for i in 0..tree_size {
1309 test_tree.update(i, i).unwrap();
1310 }
1311
1312 let mut tx = storage.write().await.unwrap();
1313
1314 tx.upsert(
1316 "header",
1317 ["height", "hash", "payload_hash", "timestamp", "data"],
1318 ["height"],
1319 [(
1320 0i64,
1321 "hash".to_string(),
1322 "hash".to_string(),
1323 0,
1324 serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
1325 )],
1326 )
1327 .await
1328 .unwrap();
1329
1330 for i in 0..tree_size {
1332 let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1333 let traversal_path =
1334 ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
1335 UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
1336 &mut tx,
1337 proof,
1338 traversal_path,
1339 0,
1340 )
1341 .await
1342 .unwrap();
1343 }
1344 UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 0)
1345 .await
1346 .unwrap();
1347 tx.commit().await.unwrap();
1348
1349 let snapshot = Snapshot::<MockTypes, MockMerkleTree, 8>::Index(0);
1351 for i in 0..tree_size {
1352 let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1353 assert_eq!(
1354 proof,
1355 storage
1356 .read()
1357 .await
1358 .unwrap()
1359 .get_path(snapshot, i)
1360 .await
1361 .unwrap()
1362 );
1363 assert_eq!(*proof.elem().unwrap(), i);
1364 }
1365
1366 let index = serde_json::to_value(tree_size - 1).unwrap();
1368 let mut tx = storage.write().await.unwrap();
1369
1370 tx.execute(
1371 query(&format!(
1372 "DELETE FROM {} WHERE idx = $1",
1373 MockMerkleTree::state_type()
1374 ))
1375 .bind(serde_json::to_value(index).unwrap()),
1376 )
1377 .await
1378 .unwrap();
1379 tx.commit().await.unwrap();
1380
1381 for i in 0..tree_size - 1 {
1383 let proof = test_tree.lookup(i).expect_ok().unwrap().1;
1384 assert_eq!(
1385 proof,
1386 storage
1387 .read()
1388 .await
1389 .unwrap()
1390 .get_path(snapshot, i)
1391 .await
1392 .unwrap()
1393 );
1394 assert_eq!(*proof.elem().unwrap(), i);
1395 }
1396
1397 let err = storage
1399 .read()
1400 .await
1401 .unwrap()
1402 .get_path(snapshot, tree_size - 1)
1403 .await
1404 .unwrap_err();
1405 assert!(matches!(err, QueryError::Missing));
1406 }
1407 }
1408}