1use std::{
16 collections::{HashMap, HashSet, VecDeque},
17 sync::Arc,
18};
19
20use ark_serialize::CanonicalDeserialize;
21use async_trait::async_trait;
22use futures::stream::TryStreamExt;
23use hotshot_types::traits::node_implementation::NodeType;
24use jf_merkle_tree::{
25 prelude::{MerkleNode, MerkleProof},
26 DigestAlgorithm, MerkleCommitment, ToTraversalPath,
27};
28use sqlx::types::{BitVec, JsonValue};
29
30use super::{
31 super::transaction::{query_as, Transaction, TransactionMode, Write},
32 DecodeError, QueryBuilder,
33};
34use crate::{
35 data_source::storage::{
36 pruning::PrunedHeightStorage,
37 sql::{build_where_in, sqlx::Row},
38 MerklizedStateHeightStorage, MerklizedStateStorage,
39 },
40 merklized_state::{MerklizedState, Snapshot},
41 QueryError, QueryResult,
42};
43
44#[async_trait]
45impl<Mode, Types, State, const ARITY: usize> MerklizedStateStorage<Types, State, ARITY>
46 for Transaction<Mode>
47where
48 Mode: TransactionMode,
49 Types: NodeType,
50 State: MerklizedState<Types, ARITY> + 'static,
51{
52 async fn get_path(
54 &mut self,
55 snapshot: Snapshot<Types, State, ARITY>,
56 key: State::Key,
57 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
58 let state_type = State::state_type();
59 let tree_height = State::tree_height();
60
61 let traversal_path = State::Key::to_traversal_path(&key, tree_height);
63 let (created, merkle_commitment) = self.snapshot_info(snapshot).await?;
64
65 let (query, sql) = build_get_path_query(state_type, traversal_path.clone(), created)?;
68 let rows = query.query(&sql).fetch_all(self.as_mut()).await?;
69
70 let nodes: Vec<Node> = rows.into_iter().map(|r| r.into()).collect();
71
72 let mut hash_ids = HashSet::new();
75 for node in nodes.iter() {
76 hash_ids.insert(node.hash_id);
77 if let Some(children) = &node.children {
78 let children: Vec<i32> =
79 serde_json::from_value(children.clone()).map_err(|e| QueryError::Error {
80 message: format!("Error deserializing 'children' into Vec<i32>: {e}"),
81 })?;
82 hash_ids.extend(children);
83 }
84 }
85
86 let hashes = if !hash_ids.is_empty() {
89 let (query, sql) = build_where_in("SELECT id, value FROM hash", "id", hash_ids)?;
90 query
91 .query_as(&sql)
92 .fetch(self.as_mut())
93 .try_collect::<HashMap<i32, Vec<u8>>>()
94 .await?
95 } else {
96 HashMap::new()
97 };
98
99 let mut proof_path = VecDeque::with_capacity(State::tree_height());
100 for Node {
101 hash_id,
102 children,
103 children_bitvec,
104 idx,
105 entry,
106 ..
107 } in nodes.iter()
108 {
109 {
110 let value = hashes.get(hash_id).ok_or(QueryError::Error {
111 message: format!("node's value references non-existent hash {hash_id}"),
112 })?;
113
114 match (children, children_bitvec, idx, entry) {
115 (Some(children), Some(children_bitvec), None, None) => {
117 let children: Vec<i32> =
118 serde_json::from_value(children.clone()).map_err(|e| {
119 QueryError::Error {
120 message: format!(
121 "Error deserializing 'children' into Vec<i32>: {e}"
122 ),
123 }
124 })?;
125 let mut children = children.iter();
126
127 let child_nodes = children_bitvec
130 .iter()
131 .map(|bit| {
132 if bit {
133 let hash_id = children.next().ok_or(QueryError::Error {
134 message: "node has fewer children than set bits".into(),
135 })?;
136 let value = hashes.get(hash_id).ok_or(QueryError::Error {
137 message: format!(
138 "node's child references non-existent hash {hash_id}"
139 ),
140 })?;
141 Ok(Arc::new(MerkleNode::ForgettenSubtree {
142 value: State::T::deserialize_compressed(value.as_slice())
143 .decode_error("malformed merkle node value")?,
144 }))
145 } else {
146 Ok(Arc::new(MerkleNode::Empty))
147 }
148 })
149 .collect::<QueryResult<Vec<_>>>()?;
150 proof_path.push_back(MerkleNode::Branch {
152 value: State::T::deserialize_compressed(value.as_slice())
153 .decode_error("malformed merkle node value")?,
154 children: child_nodes,
155 });
156 },
157 (None, None, Some(index), Some(entry)) => {
159 proof_path.push_back(MerkleNode::Leaf {
160 value: State::T::deserialize_compressed(value.as_slice())
161 .decode_error("malformed merkle node value")?,
162 pos: serde_json::from_value(index.clone())
163 .decode_error("malformed merkle node index")?,
164 elem: serde_json::from_value(entry.clone())
165 .decode_error("malformed merkle element")?,
166 });
167 },
168 (None, None, Some(_), None) => {
170 proof_path.push_back(MerkleNode::Empty);
171 },
172 _ => {
173 return Err(QueryError::Error {
174 message: "Invalid type of merkle node found".to_string(),
175 });
176 },
177 }
178 }
179 }
180
181 let init = if let Some(MerkleNode::Leaf { value, .. }) = proof_path.front() {
183 *value
184 } else {
185 while proof_path.len() <= State::tree_height() {
191 proof_path.push_front(MerkleNode::Empty);
192 }
193 State::T::default()
194 };
195 let commitment_from_path = traversal_path
196 .iter()
197 .zip(proof_path.iter().skip(1))
198 .try_fold(init, |val, (branch, node)| -> QueryResult<State::T> {
199 match node {
200 MerkleNode::Branch { value: _, children } => {
201 let data = children
202 .iter()
203 .map(|node| match node.as_ref() {
204 MerkleNode::ForgettenSubtree { value } => Ok(*value),
205 MerkleNode::Empty => Ok(State::T::default()),
206 _ => Err(QueryError::Error {
207 message: "Invalid child node".to_string(),
208 }),
209 })
210 .collect::<QueryResult<Vec<_>>>()?;
211
212 if data[*branch] != val {
213 tracing::warn!(
217 ?key,
218 parent = ?data[*branch],
219 child = ?val,
220 branch = %*branch,
221 %created,
222 %merkle_commitment,
223 "missing data in merklized state; parent-child mismatch",
224 );
225 return Err(QueryError::Missing);
226 }
227
228 State::Digest::digest(&data).map_err(|err| QueryError::Error {
229 message: format!("failed to update digest: {err:#}"),
230 })
231 },
232 MerkleNode::Empty => Ok(init),
233 _ => Err(QueryError::Error {
234 message: "Invalid type of Node in the proof".to_string(),
235 }),
236 }
237 })?;
238
239 if commitment_from_path != merkle_commitment.digest() {
240 return Err(QueryError::Error {
241 message: format!(
242 "Commitment calculated from merkle path ({commitment_from_path:?}) does not \
243 match the commitment in the header ({:?})",
244 merkle_commitment.digest()
245 ),
246 });
247 }
248
249 Ok(MerkleProof {
250 pos: key,
251 proof: proof_path.into(),
252 })
253 }
254}
255
256#[async_trait]
257impl<Mode: TransactionMode> MerklizedStateHeightStorage for Transaction<Mode> {
258 async fn get_last_state_height(&mut self) -> QueryResult<usize> {
259 let Some((height,)) = query_as::<(i64,)>("SELECT height from last_merklized_state_height")
260 .fetch_optional(self.as_mut())
261 .await?
262 else {
263 return Ok(0);
264 };
265 Ok(height as usize)
266 }
267}
268
269impl<Mode: TransactionMode> Transaction<Mode> {
270 async fn snapshot_info<Types, State, const ARITY: usize>(
276 &mut self,
277 snapshot: Snapshot<Types, State, ARITY>,
278 ) -> QueryResult<(i64, State::Commit)>
279 where
280 Types: NodeType,
281 State: MerklizedState<Types, ARITY>,
282 {
283 let header_state_commitment_field = State::header_state_commitment_field();
284
285 let (created, commit) = match snapshot {
286 Snapshot::Commit(commit) => {
287 let (height,) = query_as(&format!(
293 "SELECT height
294 FROM header
295 WHERE {header_state_commitment_field} = $1
296 LIMIT 1"
297 ))
298 .bind(commit.to_string())
299 .fetch_one(self.as_mut())
300 .await?;
301
302 (height, commit)
303 },
304 Snapshot::Index(created) => {
305 let created = created as i64;
306 let (commit,) = query_as::<(String,)>(&format!(
307 "SELECT {header_state_commitment_field} AS root_commitment
308 FROM header
309 WHERE height = $1
310 LIMIT 1"
311 ))
312 .bind(created)
313 .fetch_one(self.as_mut())
314 .await?;
315 let commit = serde_json::from_value(commit.into())
316 .decode_error("malformed state commitment")?;
317 (created, commit)
318 },
319 };
320
321 let height = self.get_last_state_height().await?;
323
324 if height < (created as usize) {
325 return Err(QueryError::NotFound);
326 }
327
328 let pruned_height = self
329 .load_pruned_height()
330 .await
331 .map_err(|e| QueryError::Error {
332 message: format!("failed to load pruned height: {e}"),
333 })?;
334
335 if pruned_height.is_some_and(|h| height <= h as usize) {
336 return Err(QueryError::NotFound);
337 }
338
339 Ok((created, commit))
340 }
341}
342
343pub(crate) fn build_hash_batch_insert(
345 hashes: &[Vec<u8>],
346) -> QueryResult<(QueryBuilder<'_>, String)> {
347 let mut query = QueryBuilder::default();
348 let params = hashes
349 .iter()
350 .map(|hash| Ok(format!("({})", query.bind(hash)?)))
351 .collect::<QueryResult<Vec<String>>>()?;
352 let sql = format!(
353 "INSERT INTO hash(value) values {} ON CONFLICT (value) DO UPDATE SET value = \
354 EXCLUDED.value returning value, id",
355 params.join(",")
356 );
357 Ok((query, sql))
358}
359
/// One stored version of a Merkle tree node, as a row of a merklized state table.
///
/// Depending on which optional columns are set, a row is interpreted as a branch
/// (`children` + `children_bitvec`), a leaf (`idx` + `entry`), or an empty node (`idx` only).
#[derive(Debug, Default, Clone)]
pub(crate) struct Node {
    /// Position of the node in the tree, as a JSON array of branch indices.
    pub(crate) path: JsonValue,
    /// Block height at which this version of the node was created.
    pub(crate) created: i64,
    /// ID of the row in the `hash` table holding this node's value.
    pub(crate) hash_id: i32,
    /// For branch nodes: JSON array of `hash` table IDs of the non-empty children.
    pub(crate) children: Option<JsonValue>,
    /// For branch nodes: one bit per child slot; each set bit corresponds to the next ID in
    /// `children`, each unset bit to an empty child.
    pub(crate) children_bitvec: Option<BitVec>,
    /// For leaf and empty nodes: JSON-encoded index (position) of the node.
    pub(crate) idx: Option<JsonValue>,
    /// For leaf nodes: JSON-encoded element stored at the leaf.
    pub(crate) entry: Option<JsonValue>,
}
371
#[cfg(feature = "embedded-db")]
impl From<sqlx::sqlite::SqliteRow> for Node {
    /// Decodes a SQLite row into a [`Node`].
    ///
    /// In SQLite the `children_bitvec` column is read as a string of characters, where `'1'`
    /// is a set bit and any other character is an unset bit.
    fn from(row: sqlx::sqlite::SqliteRow) -> Self {
        let encoded_bits: Option<String> = row.get_unchecked("children_bitvec");
        let children_bitvec: Option<BitVec> = match encoded_bits {
            Some(bits) => Some(bits.chars().map(|c| c == '1').collect()),
            None => None,
        };

        Self {
            path: row.get_unchecked("path"),
            created: row.get_unchecked("created"),
            hash_id: row.get_unchecked("hash_id"),
            children: row.get_unchecked("children"),
            children_bitvec,
            idx: row.get_unchecked("idx"),
            entry: row.get_unchecked("entry"),
        }
    }
}
390
#[cfg(not(feature = "embedded-db"))]
impl From<sqlx::postgres::PgRow> for Node {
    /// Decodes a Postgres row into a [`Node`], reading every field directly from the column
    /// of the same name.
    fn from(row: sqlx::postgres::PgRow) -> Self {
        Self {
            path: row.get_unchecked("path"),
            created: row.get_unchecked("created"),
            hash_id: row.get_unchecked("hash_id"),
            children: row.get_unchecked("children"),
            // Unlike the SQLite conversion, the bit vector is decoded directly from the column.
            children_bitvec: row.get_unchecked("children_bitvec"),
            idx: row.get_unchecked("idx"),
            entry: row.get_unchecked("entry"),
        }
    }
}
405
406impl Node {
407 pub(crate) async fn upsert(
408 name: &str,
409 nodes: impl IntoIterator<Item = Self>,
410 tx: &mut Transaction<Write>,
411 ) -> anyhow::Result<()> {
412 tx.upsert(
413 name,
414 [
415 "path",
416 "created",
417 "hash_id",
418 "children",
419 "children_bitvec",
420 "idx",
421 "entry",
422 ],
423 ["path", "created"],
424 nodes.into_iter().map(|n| {
425 #[cfg(feature = "embedded-db")]
426 let children_bitvec: Option<String> = n
427 .children_bitvec
428 .clone()
429 .map(|b| b.iter().map(|bit| if bit { '1' } else { '0' }).collect());
430
431 #[cfg(not(feature = "embedded-db"))]
432 let children_bitvec = n.children_bitvec.clone();
433
434 (
435 n.path.clone(),
436 n.created,
437 n.hash_id,
438 n.children.clone(),
439 children_bitvec,
440 n.idx.clone(),
441 n.entry.clone(),
442 )
443 }),
444 )
445 .await
446 }
447}
448
449fn build_get_path_query<'q>(
450 table: &'static str,
451 traversal_path: Vec<usize>,
452 created: i64,
453) -> QueryResult<(QueryBuilder<'q>, String)> {
454 let mut query = QueryBuilder::default();
455 let mut traversal_path = traversal_path.into_iter().map(|x| x as i32);
456
457 let len = traversal_path.len();
459 let mut sub_queries = Vec::new();
460
461 query.bind(created)?;
462
463 for _ in 0..=len {
464 let path = traversal_path.clone().rev().collect::<Vec<_>>();
465 let path: serde_json::Value = path.into();
466 let node_path = query.bind(path)?;
467
468 let sub_query = format!(
469 "SELECT * FROM (SELECT * FROM {table} WHERE path = {node_path} AND created <= $1 \
470 ORDER BY created DESC LIMIT 1)",
471 );
472
473 sub_queries.push(sub_query);
474 traversal_path.next();
475 }
476
477 let mut sql: String = sub_queries.join(" UNION ");
478
479 sql = format!("SELECT * FROM ({sql}) as t ");
480
481 if cfg!(feature = "embedded-db") {
484 sql.push_str("ORDER BY length(t.path) DESC");
485 } else {
486 sql.push_str("ORDER BY t.path DESC");
487 }
488
489 Ok((query, sql))
490}
491
492#[cfg(test)]
493mod test {
494 use futures::stream::StreamExt;
495 use jf_merkle_tree::{
496 universal_merkle_tree::UniversalMerkleTree, LookupResult, MerkleTreeScheme,
497 UniversalMerkleTreeScheme,
498 };
499 use rand::{seq::IteratorRandom, RngCore};
500
501 use super::*;
502 use crate::{
503 data_source::{
504 storage::sql::{testing::TmpDb, *},
505 VersionedDataSource,
506 },
507 merklized_state::UpdateStateData,
508 testing::mocks::{MockMerkleTree, MockTypes},
509 };
510
    /// Stores Merkle paths for 27 entries at block height 1, reads them back, then updates one
    /// entry at block height 2 and checks that both versions of the path can still be fetched.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_storage() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        // In-memory reference tree that the stored state is compared against.
        let mut test_tree: UniversalMerkleTree<_, _, _, 8, _> =
            MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;

        let mut tx = storage.write().await.unwrap();
        // Insert 27 entries, all at block height 1.
        for i in 0..27 {
            test_tree.update(i, i).unwrap();

            // Header payload carrying the current commitment of the reference tree.
            let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
            // The header row for height 1 is upserted on every iteration, so it ends up
            // carrying the commitment of the complete tree.
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    block_height as i64,
                    format!("randomHash{i}"),
                    "t".to_string(),
                    0,
                    test_data,
                )],
            )
            .await
            .unwrap();
            // Proof for the entry that was just inserted.
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            let traversal_path =
                <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());

            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof.clone(),
                traversal_path.clone(),
                block_height as u64,
            )
            .await
            .expect("failed to insert nodes");
        }
        // Record block height 1 as the last height with stored state.
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Every stored path must equal the proof produced by the reference tree.
        for i in 0..27 {
            let mut tx = storage.read().await.unwrap();
            let merkle_path = tx
                .get_path(
                    Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                    i,
                )
                .await
                .unwrap();

            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();

            tracing::info!("merkle path {:?}", merkle_path);

            assert_eq!(merkle_path, proof.clone(), "merkle paths mismatch");
        }

        // Keep the height-1 proof for key 0, then change the entry for block height 2.
        let (_, proof_bh_1) = test_tree.lookup(0).expect_ok().unwrap();
        test_tree.update(0, 99).unwrap();
        let mut tx = storage.write().await.unwrap();
        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                2i64,
                "randomstring".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        let (_, proof_bh_2) = test_tree.lookup(0).expect_ok().unwrap();
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        // Store the updated path with created = 2.
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_bh_2.clone(),
            traversal_path.clone(),
            2,
        )
        .await
        .expect("failed to insert nodes");
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Stored paths are the traversal path reversed.
        let node_path = traversal_path
            .into_iter()
            .rev()
            .map(|n| n as i32)
            .collect::<Vec<_>>();

        // The node at the full path of key 0 must now exist in two versions.
        let mut tx = storage.read().await.unwrap();
        let rows = query("SELECT * from test_tree where path = $1 ORDER BY created")
            .bind(serde_json::to_value(node_path).unwrap())
            .fetch(tx.as_mut());

        let nodes: Vec<Node> = rows.map(|res| res.unwrap().into()).collect().await;
        assert!(nodes.len() == 2, "incorrect number of nodes");
        assert_eq!(nodes[0].created, 1, "wrong block height");
        assert_eq!(nodes[1].created, 2, "wrong block height");

        // The snapshot at height 2 returns the updated proof.
        let path_with_bh_2 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2), 0)
            .await
            .unwrap();

        assert_eq!(path_with_bh_2, proof_bh_2);
        // The snapshot at height 1 still returns the original proof.
        let path_with_bh_1 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1), 0)
            .await
            .unwrap();
        assert_eq!(path_with_bh_1, proof_bh_1);
    }
672
    /// Inserts an entry at block height 1, removes it at block height 2, and checks that the
    /// database returns the membership proof for height 1 and the non-membership proof for
    /// height 2.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_non_membership_proof() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        // In-memory reference tree with a single entry at key 0.
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        test_tree.update(0, 0).unwrap();
        let commitment = test_tree.commitment();

        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
        let mut tx = storage.write().await.unwrap();
        // Header for height 1 carrying the tree commitment.
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomString".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        // Proof for key 0 while the entry is still present.
        let (_, proof_before_remove) = test_tree.lookup(0).expect_ok().unwrap();
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_before_remove.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();
        // The stored path at height 1 must equal the membership proof.
        let merkle_path = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                0,
            )
            .await
            .unwrap();

        assert_eq!(
            merkle_path,
            proof_before_remove.clone(),
            "merkle paths mismatch"
        );

        // Remove the entry from the reference tree.
        test_tree.remove(0).expect("failed to delete index 0 ");

        // Non-membership proof for key 0 after the removal.
        let proof_after_remove = test_tree.universal_lookup(0).expect_not_found().unwrap();

        let mut tx = storage.write().await.unwrap();
        // Store the non-membership path with created = 2.
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof_after_remove.clone(),
            traversal_path.clone(),
            2_u64,
        )
        .await
        .expect("failed to insert nodes");
        // Header for height 2 carrying the commitment of the tree after the removal.
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                2i64,
                "randomString2".to_string(),
                "t".to_string(),
                0,
                serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
            )],
        )
        .await
        .unwrap();
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 2)
            .await
            .unwrap();
        tx.commit().await.unwrap();
        // The snapshot at height 2 returns the non-membership proof.
        let non_membership_path = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 0)
            .await
            .unwrap();
        assert_eq!(
            non_membership_path, proof_after_remove,
            "merkle paths dont match"
        );

        // The snapshot at height 1 still returns the membership proof.
        let proof_bh_1 = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(1_u64), 0)
            .await
            .unwrap();
        assert_eq!(proof_bh_1, proof_before_remove, "merkle paths dont match");
    }
809
    /// Checks that a non-membership proof can be fetched for a key (100) that was never
    /// inserted, for trees holding 0, 1 and 2 entries.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_non_membership_proof_unseen_entry() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());

        // In iteration `i` the tree holds the entries 0..i.
        for i in 0..=2 {
            tracing::info!(i, ?test_tree, "testing non-membership proof");
            let mut tx = storage.write().await.unwrap();

            // Header for height `i` carrying the current tree commitment.
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    i as i64,
                    format!("hash{i}"),
                    "t".to_string(),
                    0,
                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()})
                )],
            )
            .await
            .unwrap();
            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, i)
                .await
                .unwrap();
            tx.commit().await.unwrap();

            // Key 100 is never inserted, so the proof must have no element.
            let proof = storage
                .read()
                .await
                .unwrap()
                .get_path(
                    Snapshot::<MockTypes, MockMerkleTree, 8>::Index(i as u64),
                    100,
                )
                .await
                .unwrap();
            assert_eq!(proof.elem(), None);

            assert!(
                MockMerkleTree::non_membership_verify(test_tree.commitment(), 100, proof).unwrap()
            );

            // Insert entry `i` and store its path with created = i + 1, the height used by the
            // next iteration.
            test_tree.update(i, i).unwrap();
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            let traversal_path = ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
            let mut tx = storage.write().await.unwrap();
            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof,
                traversal_path,
                (i + 1) as u64,
            )
            .await
            .expect("failed to insert nodes");
            tx.commit().await.unwrap();
        }
    }
878
    /// Checks that a Merkle path can be fetched when the snapshot is identified by its state
    /// commitment instead of its block height.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_storage_with_commit() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        // In-memory reference tree with a single entry at key 0.
        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        test_tree.update(0, 0).unwrap();
        let commitment = test_tree.commitment();

        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(commitment).unwrap()});
        let mut tx = storage.write().await.unwrap();
        // Header for height 1 carrying the tree commitment.
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomString".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&0, test_tree.height());
        // Store the path for key 0 with created = 1.
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Look the path up by commitment rather than by height.
        let merkle_proof = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Commit(commitment), 0)
            .await
            .unwrap();

        let (_, proof) = test_tree.lookup(0).expect_ok().unwrap();

        assert_eq!(merkle_proof, proof.clone(), "merkle paths mismatch");
    }
    /// Checks that `get_path` fails when the stored nodes do not match the commitment in the
    /// header, and when the state for the requested snapshot is incomplete.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_missing_state() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
        let block_height = 1;
        let mut tx = storage.write().await.unwrap();
        // Insert 27 entries, all at block height 1.
        for i in 0..27 {
            test_tree.update(i, i).unwrap();
            let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
            // The header row for height 1 is upserted on every iteration.
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    block_height as i64,
                    format!("rarndomString{i}"),
                    "t".to_string(),
                    0,
                    test_data,
                )],
            )
            .await
            .unwrap();
            let (_, proof) = test_tree.lookup(i).expect_ok().unwrap();
            let traversal_path =
                <usize as ToTraversalPath<8>>::to_traversal_path(&i, test_tree.height());
            UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                &mut tx,
                proof.clone(),
                traversal_path.clone(),
                block_height as u64,
            )
            .await
            .expect("failed to insert nodes");
            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, block_height)
                .await
                .unwrap();
        }

        // Change the entry at key 1 and store its new path, still with created = 1, but WITHOUT
        // updating the header: the header keeps the commitment of the tree before this update.
        test_tree.update(1, 100).unwrap();
        let traversal_path =
            <usize as ToTraversalPath<8>>::to_traversal_path(&1, test_tree.height());
        let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();

        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof.clone(),
            traversal_path.clone(),
            block_height as u64,
        )
        .await
        .expect("failed to insert nodes");
        tx.commit().await.unwrap();

        // The stored nodes no longer match the header commitment, so the lookup must fail.
        let merkle_path = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                1,
            )
            .await;
        assert!(merkle_path.is_err());

        // Update the header for height 1 with the new commitment; the lookup now succeeds.
        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});
        let mut tx = storage.write().await.unwrap();
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                block_height as i64,
                "randomStringgg".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        tx.commit().await.unwrap();
        let merkle_proof = storage
            .read()
            .await
            .unwrap()
            .get_path(
                Snapshot::<_, MockMerkleTree, 8>::Index(block_height as u64),
                1,
            )
            .await
            .unwrap();
        assert_eq!(merkle_proof, proof, "path dont match");

        // Change the entry at key 1 again, this time for block height 2.
        test_tree.update(1, 200).unwrap();

        let (_, proof) = test_tree.lookup(1).expect_ok().unwrap();
        let test_data = serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()});

        let mut tx = storage.write().await.unwrap();
        tx.upsert(
            "header",
            ["height", "hash", "payload_hash", "timestamp", "data"],
            ["height"],
            [(
                2i64,
                "randomHashString".to_string(),
                "t".to_string(),
                0,
                test_data,
            )],
        )
        .await
        .unwrap();
        UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
            &mut tx,
            proof.clone(),
            traversal_path.clone(),
            2_u64,
        )
        .await
        .expect("failed to insert nodes");

        // Stored path of the node one level above the leaf: the traversal path without its
        // first branch, reversed.
        let node_path = traversal_path
            .iter()
            .skip(1)
            .rev()
            .map(|n| *n as i32)
            .collect::<Vec<_>>();
        // Delete the height-2 version of that internal node.
        tx.execute(
            query(&format!(
                "DELETE FROM {} WHERE created = 2 and path = $1",
                MockMerkleTree::state_type()
            ))
            .bind(serde_json::to_value(node_path).unwrap()),
        )
        .await
        .expect("failed to delete internal node");
        tx.commit().await.unwrap();

        // NOTE(review): the last state height is never set to 2 in this test, so this lookup
        // presumably already fails in `snapshot_info` (height 1 < snapshot 2) rather than
        // because of the deleted node — confirm whether `set_last_state_height(.., 2)` is
        // missing above.
        let merkle_path = storage
            .read()
            .await
            .unwrap()
            .get_path(Snapshot::<_, MockMerkleTree, 8>::Index(2_u64), 1)
            .await;

        assert!(merkle_path.is_err());
    }
1116
    /// Applies random insertions and deletions to a tree, stores two successive snapshots, and
    /// checks that every key (plus one key that is never inserted) yields the expected proof in
    /// both snapshots.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_snapshot() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config()).await.unwrap();

        let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());

        // Random keys are generated from `u32` values, so this key is never inserted.
        const RESERVED_KEY: usize = (u32::MAX as usize) + 1;

        // Applies 50 random operations to `tree` and records, per touched key, the expected
        // value (`Some`) or that the key was deleted (`None`).
        #[tracing::instrument(skip(tree, expected))]
        fn randomize(tree: &mut MockMerkleTree, expected: &mut HashMap<usize, Option<usize>>) {
            let mut rng = rand::thread_rng();
            tracing::info!("randomizing tree");

            for _ in 0..50 {
                // Insert when there is nothing left to delete, otherwise with probability 1/2.
                if !expected.values().any(|v| v.is_some()) || rng.next_u32() % 2 == 0 {
                    let key = rng.next_u32() as usize;
                    let val = rng.next_u32() as usize;
                    tracing::info!(key, val, "inserting");

                    tree.update(key, val).unwrap();
                    expected.insert(key, Some(val));
                } else {
                    // Delete a random key that is currently present.
                    let key = expected
                        .iter()
                        .filter_map(|(k, v)| if v.is_some() { Some(k) } else { None })
                        .choose(&mut rng)
                        .unwrap();
                    tracing::info!(key, "deleting");

                    tree.remove(key).unwrap();
                    expected.insert(*key, None);
                }
            }
        }

        // Stores the paths of all keys in `expected`, the header and the last state height for
        // `block_height`.
        #[tracing::instrument(skip(storage, tree, expected))]
        async fn store(
            storage: &SqlStorage,
            tree: &MockMerkleTree,
            expected: &HashMap<usize, Option<usize>>,
            block_height: u64,
        ) {
            tracing::info!("persisting tree");
            let mut tx = storage.write().await.unwrap();

            for key in expected.keys() {
                // Membership and non-membership proofs are stored alike.
                let proof = match tree.universal_lookup(key) {
                    LookupResult::Ok(_, proof) => proof,
                    LookupResult::NotFound(proof) => proof,
                    LookupResult::NotInMemory => panic!("failed to find key {key}"),
                };
                let traversal_path = ToTraversalPath::<8>::to_traversal_path(key, tree.height());
                UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                    &mut tx,
                    proof,
                    traversal_path,
                    block_height,
                )
                .await
                .unwrap();
            }
            // Header for this height carrying the tree commitment.
            tx
            .upsert("header", ["height", "hash", "payload_hash", "timestamp", "data"], ["height"],
                [(
                    block_height as i64,
                    format!("hash{block_height}"),
                    "hash".to_string(),
                    0i64,
                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(tree.commitment()).unwrap()}),
                )],
            )
            .await
            .unwrap();
            UpdateStateData::<MockTypes, MockMerkleTree, 8>::set_last_state_height(
                &mut tx,
                block_height as usize,
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Checks that the snapshot stored at `block_height` yields, for every key, the same
        // proof as `tree`, and that the proof verifies against the tree commitment.
        #[tracing::instrument(skip(storage, tree, expected))]
        async fn validate(
            storage: &SqlStorage,
            tree: &MockMerkleTree,
            expected: &HashMap<usize, Option<usize>>,
            block_height: u64,
        ) {
            tracing::info!("validating snapshot");

            let snapshot = Snapshot::<_, MockMerkleTree, 8>::Index(block_height);

            for (key, val) in expected {
                let proof = match tree.universal_lookup(key) {
                    LookupResult::Ok(_, proof) => proof,
                    LookupResult::NotFound(proof) => proof,
                    LookupResult::NotInMemory => panic!("failed to find key {key}"),
                };
                assert_eq!(
                    proof,
                    storage
                        .read()
                        .await
                        .unwrap()
                        .get_path(snapshot, *key)
                        .await
                        .unwrap()
                );
                assert_eq!(val.as_ref(), proof.elem());
                if val.is_some() {
                    // Present key: the membership proof must verify.
                    MockMerkleTree::verify(tree.commitment(), key, proof)
                        .unwrap()
                        .unwrap();
                } else {
                    // Deleted key: the non-membership proof must verify.
                    assert!(
                        MockMerkleTree::non_membership_verify(tree.commitment(), key, proof)
                            .unwrap()
                    );
                }
            }

            // The reserved key was never inserted, so it must yield a non-membership proof.
            let proof = match tree.universal_lookup(RESERVED_KEY) {
                LookupResult::Ok(_, proof) => proof,
                LookupResult::NotFound(proof) => proof,
                LookupResult::NotInMemory => panic!("failed to find reserved key {RESERVED_KEY}"),
            };
            assert_eq!(
                proof,
                storage
                    .read()
                    .await
                    .unwrap()
                    .get_path(snapshot, RESERVED_KEY)
                    .await
                    .unwrap()
            );
            assert_eq!(proof.elem(), None);
            assert!(
                MockMerkleTree::non_membership_verify(tree.commitment(), RESERVED_KEY, proof)
                    .unwrap()
            );
        }

        // First snapshot at block height 1.
        let mut expected = HashMap::<usize, Option<usize>>::new();
        randomize(&mut test_tree, &mut expected);

        store(&storage, &test_tree, &expected, 1).await;
        validate(&storage, &test_tree, &expected, 1).await;

        // Second snapshot at block height 2, built on a copy of the first tree.
        let mut expected2 = expected.clone();
        let mut test_tree2 = test_tree.clone();
        randomize(&mut test_tree2, &mut expected2);
        store(&storage, &test_tree2, &expected2, 2).await;
        validate(&storage, &test_tree2, &expected2, 2).await;

        // The first snapshot must still be intact after the second one was stored.
        validate(&storage, &test_tree, &expected, 1).await;
    }
1295
    /// Checks that deleting a single leaf row makes the lookup of that entry fail with
    /// `QueryError::Missing`, while the remaining entries can still be fetched. The check is
    /// repeated for trees with 1, 2 and 3 entries.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_merklized_state_missing_leaf() {
        for tree_size in 1..=3 {
            // Each tree size gets a fresh database.
            let db = TmpDb::init().await;
            let storage = SqlStorage::connect(db.config()).await.unwrap();

            let mut test_tree = MockMerkleTree::new(MockMerkleTree::tree_height());
            for i in 0..tree_size {
                test_tree.update(i, i).unwrap();
            }

            let mut tx = storage.write().await.unwrap();

            // Header for height 0 carrying the commitment of the complete tree.
            tx.upsert(
                "header",
                ["height", "hash", "payload_hash", "timestamp", "data"],
                ["height"],
                [(
                    0i64,
                    "hash".to_string(),
                    "hash".to_string(),
                    0,
                    serde_json::json!({ MockMerkleTree::header_state_commitment_field() : serde_json::to_value(test_tree.commitment()).unwrap()}),
                )],
            )
            .await
            .unwrap();

            // Store the path of every entry with created = 0.
            for i in 0..tree_size {
                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
                let traversal_path =
                    ToTraversalPath::<8>::to_traversal_path(&i, test_tree.height());
                UpdateStateData::<_, MockMerkleTree, 8>::insert_merkle_nodes(
                    &mut tx,
                    proof,
                    traversal_path,
                    0,
                )
                .await
                .unwrap();
            }
            UpdateStateData::<_, MockMerkleTree, 8>::set_last_state_height(&mut tx, 0)
                .await
                .unwrap();
            tx.commit().await.unwrap();

            // Before the deletion every entry can be fetched.
            let snapshot = Snapshot::<MockTypes, MockMerkleTree, 8>::Index(0);
            for i in 0..tree_size {
                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
                assert_eq!(
                    proof,
                    storage
                        .read()
                        .await
                        .unwrap()
                        .get_path(snapshot, i)
                        .await
                        .unwrap()
                );
                assert_eq!(*proof.elem().unwrap(), i);
            }

            // Delete the leaf row of the last entry.
            let index = serde_json::to_value(tree_size - 1).unwrap();
            let mut tx = storage.write().await.unwrap();

            tx.execute(
                query(&format!(
                    "DELETE FROM {} WHERE idx = $1",
                    MockMerkleTree::state_type()
                ))
                .bind(serde_json::to_value(index).unwrap()),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();

            // All other entries are unaffected by the deletion.
            for i in 0..tree_size - 1 {
                let proof = test_tree.lookup(i).expect_ok().unwrap().1;
                assert_eq!(
                    proof,
                    storage
                        .read()
                        .await
                        .unwrap()
                        .get_path(snapshot, i)
                        .await
                        .unwrap()
                );
                assert_eq!(*proof.elem().unwrap(), i);
            }

            // Looking up the entry whose leaf was deleted must report missing data.
            let err = storage
                .read()
                .await
                .unwrap()
                .get_path(snapshot, tree_size - 1)
                .await
                .unwrap_err();
            assert!(matches!(err, QueryError::Missing));
        }
    }
1410}