1mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50 use std::ops::{Bound, RangeBounds};
51
52 use futures::{
53 future,
54 stream::{BoxStream, StreamExt},
55 };
56
57 use crate::{
58 availability::{BlockQueryData, Fetch, LeafQueryData},
59 node::NodeDataSource,
60 testing::{consensus::TestableDataSource, mocks::MockTypes},
61 };
62
63 async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize>
65 where
66 D: TestableDataSource,
67 R: RangeBounds<usize>,
68 {
69 let start = range.start_bound().cloned();
70 let mut end = range.end_bound().cloned();
71 if end == Bound::Unbounded {
72 end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73 }
74 (start, end)
75 }
76
77 pub async fn block_range<R, D>(
79 ds: &D,
80 range: R,
81 ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82 where
83 D: TestableDataSource,
84 R: RangeBounds<usize> + Send + 'static,
85 {
86 ds.get_block_range(bound_range(ds, range).await)
87 .await
88 .then(Fetch::resolve)
89 .boxed()
90 }
91
92 pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94 where
95 D: TestableDataSource,
96 R: RangeBounds<usize> + Send + 'static,
97 {
98 ds.get_leaf_range(bound_range(ds, range).await)
99 .await
100 .then(Fetch::resolve)
101 .boxed()
102 }
103
104 pub async fn get_non_empty_blocks<D>(
105 ds: &D,
106 ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107 where
108 D: TestableDataSource,
109 {
110 leaf_range(ds, 1..)
112 .await
113 .zip(block_range(ds, 1..).await)
114 .filter(|(_, block)| future::ready(!block.is_empty()))
115 .collect()
116 .await
117 }
118}
119
120#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124 use std::{
125 collections::HashMap,
126 fmt::Debug,
127 ops::{Bound, RangeBounds},
128 };
129
130 use committable::Committable;
131 use futures::stream::StreamExt;
132 use hotshot_types::data::Leaf2;
133
134 use super::test_helpers::*;
135 use crate::{
136 availability::{payload_size, BlockId},
137 data_source::storage::NodeStorage,
138 node::NodeDataSource,
139 testing::{
140 consensus::{MockNetwork, TestableDataSource},
141 mocks::{mock_transaction, MockTypes, MockVersions},
142 },
143 types::HeightIndexed,
144 };
145
146 async fn validate(ds: &impl TestableDataSource) {
147 let mut seen_payloads = HashMap::new();
150 let mut seen_transactions = HashMap::new();
151 let mut leaves = leaf_range(ds, ..).await.enumerate();
152 while let Some((i, leaf)) = leaves.next().await {
153 assert_eq!(leaf.height(), i as u64);
154 assert_eq!(
155 leaf.hash(),
156 <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
157 );
158
159 tracing::info!("looking up leaf {i} various ways");
161 assert_eq!(leaf, ds.get_leaf(i).await.await);
162 assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);
163
164 tracing::info!("looking up block {i} various ways");
165 let block = ds.get_block(i).await.await;
166 assert_eq!(leaf.block_hash(), block.hash());
167 assert_eq!(block.height(), i as u64);
168 assert_eq!(block.hash(), block.header().commit());
169 assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));
170
171 assert_eq!(block, ds.get_block(i).await.await);
173 assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
174 let ix = seen_payloads
184 .entry(block.payload_hash())
185 .or_insert(i as u64);
186 if let Ok(block) = ds
187 .get_block(BlockId::PayloadHash(block.payload_hash()))
188 .await
189 .try_resolve()
190 {
191 assert_eq!(block.height(), *ix);
192 } else {
193 tracing::warn!(
194 "skipping block by payload index check for missing payload {:?}",
195 block.header()
196 );
197 ds.get_block(BlockId::PayloadHash(block.payload_hash()))
199 .await
200 .await;
201 }
202
203 tracing::info!("looking up payload {i} various ways");
205 let expected_payload = block.clone().into();
206 assert_eq!(ds.get_payload(i).await.await, expected_payload);
207 assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
208 if let Ok(payload) = ds
211 .get_payload(BlockId::PayloadHash(block.payload_hash()))
212 .await
213 .try_resolve()
214 {
215 if *ix == i as u64 {
216 assert_eq!(payload, expected_payload);
217 }
218 } else {
219 tracing::warn!(
220 "skipping payload index check for missing payload {:?}",
221 block.header()
222 );
223 ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
225 .await
226 .await;
227 }
228
229 tracing::info!("looking up VID common {i} various ways");
231 let common = ds.get_vid_common(block.height() as usize).await.await;
232 assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
233 if let Ok(res) = ds
236 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
237 .await
238 .try_resolve()
239 {
240 if *ix == i as u64 {
241 assert_eq!(res, common);
242 }
243 } else {
244 tracing::warn!(
245 "skipping VID common index check for missing data {:?}",
246 block.header()
247 );
248 let res = ds
250 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
251 .await
252 .await;
253 assert_eq!(res.payload_hash(), common.payload_hash());
254 }
255
256 for (j, txn) in block.enumerate() {
257 tracing::info!("looking up transaction {i},{j:?}");
258
259 let ix = seen_transactions
267 .entry(txn.commit())
268 .or_insert((i as u64, j.clone()));
269 if let Ok(tx_data) = ds
270 .get_block_containing_transaction(txn.commit())
271 .await
272 .try_resolve()
273 {
274 assert_eq!(tx_data.transaction.transaction(), &txn);
275 assert_eq!(tx_data.transaction.block_height(), ix.0);
276 assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
277 assert_eq!(tx_data.index, ix.1);
278 assert_eq!(tx_data.block, block);
279 } else {
280 tracing::warn!(
281 "skipping transaction index check for missing transaction {j:?} {txn:?}"
282 );
283 ds.get_block_containing_transaction(txn.commit())
285 .await
286 .await;
287 }
288 }
289 }
290 }
291
292 #[test_log::test(tokio::test(flavor = "multi_thread"))]
293 pub async fn test_update<D: TestableDataSource>()
294 where
295 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
296 {
297 let mut network = MockNetwork::<D, MockVersions>::init().await;
298 let ds = network.data_source();
299
300 network.start().await;
301 assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
302
303 let mut blocks = ds.subscribe_blocks(0).await.enumerate();
306 for nonce in 0..3 {
307 let txn = mock_transaction(vec![nonce]);
308 network.submit_transaction(txn).await;
309
310 let (i, block) = loop {
312 tracing::info!("waiting for tx {nonce}");
313 let (i, block) = blocks.next().await.unwrap();
314 if !block.is_empty() {
315 break (i, block);
316 }
317 tracing::info!("block {i} is empty");
318 };
319
320 tracing::info!("got tx {nonce} in block {i}");
321 assert_eq!(ds.get_block(i).await.await, block);
322 validate(&ds).await;
323 }
324
325 {
329 tracing::info!("checking persisted storage");
330 let storage = D::connect(network.storage()).await;
331
332 let block_height = NodeDataSource::block_height(&ds).await.unwrap();
336 assert_eq!(
337 ds.get_block_range(..block_height)
338 .await
339 .map(|fetch| fetch.try_resolve().ok())
340 .collect::<Vec<_>>()
341 .await,
342 storage
343 .get_block_range(..block_height)
344 .await
345 .map(|fetch| fetch.try_resolve().ok())
346 .collect::<Vec<_>>()
347 .await
348 );
349 assert_eq!(
350 ds.get_leaf_range(..block_height)
351 .await
352 .map(|fetch| fetch.try_resolve().ok())
353 .collect::<Vec<_>>()
354 .await,
355 storage
356 .get_leaf_range(..block_height)
357 .await
358 .map(|fetch| fetch.try_resolve().ok())
359 .collect::<Vec<_>>()
360 .await
361 );
362 }
363 }
364
365 #[test_log::test(tokio::test(flavor = "multi_thread"))]
366 pub async fn test_range<D: TestableDataSource>()
367 where
368 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
369 {
370 let mut network = MockNetwork::<D, MockVersions>::init().await;
371 let ds = network.data_source();
372 network.start().await;
373
374 let block_height = loop {
376 let mut tx = ds.read().await.unwrap();
377 let block_height = tx.block_height().await.unwrap();
378 if block_height >= 3 {
379 break block_height as u64;
380 }
381 };
382
383 do_range_test(&ds, 1..=2, 1..3).await; do_range_test(&ds, 1..3, 1..3).await; do_range_test(&ds, 1.., 1..block_height).await; do_range_test(&ds, ..=2, 0..3).await; do_range_test(&ds, ..3, 0..3).await; do_range_test(&ds, .., 0..block_height).await; do_range_test(&ds, ExRange(0..=2), 1..3).await; do_range_test(&ds, ExRange(0..3), 1..3).await; do_range_test(&ds, ExRange(0..), 1..block_height).await; }
395
396 async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
397 where
398 D: TestableDataSource,
399 R: RangeBounds<usize> + Clone + Debug + Send + 'static,
400 I: IntoIterator<Item = u64>,
401 {
402 tracing::info!("testing range {range:?}");
403
404 let mut leaves = ds.get_leaf_range(range.clone()).await;
405 let mut blocks = ds.get_block_range(range.clone()).await;
406 let mut payloads = ds.get_payload_range(range.clone()).await;
407 let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
408 let mut vid_common = ds.get_vid_common_range(range.clone()).await;
409 let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
410
411 for i in expected_indices {
412 tracing::info!(i, "check entries");
413 let leaf = leaves.next().await.unwrap().await;
414 let block = blocks.next().await.unwrap().await;
415 let payload = payloads.next().await.unwrap().await;
416 let payload_meta = payloads_meta.next().await.unwrap().await;
417 let common = vid_common.next().await.unwrap().await;
418 let common_meta = vid_common_meta.next().await.unwrap().await;
419 assert_eq!(leaf.height(), i);
420 assert_eq!(block.height(), i);
421 assert_eq!(payload, ds.get_payload(i as usize).await.await);
422 assert_eq!(payload_meta, block.into());
423 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
424 assert_eq!(common_meta, common.into());
425 }
426
427 if range.end_bound() == Bound::Unbounded {
428 loop {
431 let fetch_leaf = leaves.next().await.unwrap();
432 let fetch_block = blocks.next().await.unwrap();
433 let fetch_payload = payloads.next().await.unwrap();
434 let fetch_payload_meta = payloads_meta.next().await.unwrap();
435 let fetch_common = vid_common.next().await.unwrap();
436 let fetch_common_meta = vid_common_meta.next().await.unwrap();
437
438 if fetch_leaf.try_resolve().is_ok()
439 && fetch_block.try_resolve().is_ok()
440 && fetch_payload.try_resolve().is_ok()
441 && fetch_payload_meta.try_resolve().is_ok()
442 && fetch_common.try_resolve().is_ok()
443 && fetch_common_meta.try_resolve().is_ok()
444 {
445 tracing::info!("searching for end of available objects");
446 } else {
447 break;
448 }
449 }
450 } else {
451 assert!(leaves.next().await.is_none());
453 assert!(blocks.next().await.is_none());
454 assert!(payloads.next().await.is_none());
455 assert!(payloads_meta.next().await.is_none());
456 assert!(vid_common.next().await.is_none());
457 assert!(vid_common_meta.next().await.is_none());
458 }
459 }
460
461 #[test_log::test(tokio::test(flavor = "multi_thread"))]
462 pub async fn test_range_rev<D: TestableDataSource>()
463 where
464 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
465 {
466 let mut network = MockNetwork::<D, MockVersions>::init().await;
467 let ds = network.data_source();
468 network.start().await;
469
470 ds.subscribe_leaves(5).await.next().await.unwrap();
472
473 do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
475 do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
476 do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
477 }
478
479 async fn do_range_rev_test<D>(
480 ds: &D,
481 start: Bound<usize>,
482 end: usize,
483 expected_indices: impl DoubleEndedIterator<Item = u64>,
484 ) where
485 D: TestableDataSource,
486 {
487 tracing::info!("testing range {start:?}-{end}");
488
489 let mut leaves = ds.get_leaf_range_rev(start, end).await;
490 let mut blocks = ds.get_block_range_rev(start, end).await;
491 let mut payloads = ds.get_payload_range_rev(start, end).await;
492 let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
493 let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
494 let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
495
496 for i in expected_indices.rev() {
497 tracing::info!(i, "check entries");
498 let leaf = leaves.next().await.unwrap().await;
499 let block = blocks.next().await.unwrap().await;
500 let payload = payloads.next().await.unwrap().await;
501 let payload_meta = payloads_meta.next().await.unwrap().await;
502 let common = vid_common.next().await.unwrap().await;
503 let common_meta = vid_common_meta.next().await.unwrap().await;
504 assert_eq!(leaf.height(), i);
505 assert_eq!(block.height(), i);
506 assert_eq!(payload.height(), i);
507 assert_eq!(payload_meta.height(), i);
508 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
509 assert_eq!(
510 common_meta,
511 ds.get_vid_common_metadata(i as usize).await.await
512 );
513 }
514
515 assert!(leaves.next().await.is_none());
517 assert!(blocks.next().await.is_none());
518 assert!(payloads.next().await.is_none());
519 assert!(payloads_meta.next().await.is_none());
520 assert!(vid_common.next().await.is_none());
521 assert!(vid_common_meta.next().await.is_none());
522 }
523
524 #[derive(Clone, Copy, Debug)]
526 struct ExRange<R>(R);
527
528 impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
529 fn start_bound(&self) -> Bound<&usize> {
530 match self.0.start_bound() {
531 Bound::Included(x) => Bound::Excluded(x),
532 Bound::Excluded(x) => Bound::Excluded(x),
533 Bound::Unbounded => Bound::Excluded(&0),
534 }
535 }
536
537 fn end_bound(&self) -> Bound<&usize> {
538 self.0.end_bound()
539 }
540 }
541}
542
543#[cfg(any(test, feature = "testing"))]
545#[espresso_macros::generic_tests]
546pub mod persistence_tests {
547 use committable::Committable;
548 use hotshot_example_types::state_types::{TestInstanceState, TestValidatedState};
549 use hotshot_types::simple_certificate::QuorumCertificate2;
550
551 use crate::{
552 availability::{BlockQueryData, LeafQueryData},
553 data_source::{
554 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
555 Transaction,
556 },
557 node::NodeDataSource,
558 testing::{
559 consensus::TestableDataSource,
560 mocks::{MockPayload, MockTypes},
561 },
562 types::HeightIndexed,
563 Leaf2,
564 };
565
566 #[test_log::test(tokio::test(flavor = "multi_thread"))]
567 pub async fn test_revert<D: TestableDataSource>()
568 where
569 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
570 + AvailabilityStorage<MockTypes>
571 + NodeStorage<MockTypes>,
572 {
573 use hotshot_example_types::node_types::TestVersions;
574
575 let storage = D::create(0).await;
576 let ds = D::connect(&storage).await;
577
578 let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
580 &TestValidatedState::default(),
581 &TestInstanceState::default(),
582 )
583 .await;
584 let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
585 &TestValidatedState::default(),
586 &TestInstanceState::default(),
587 )
588 .await;
589 leaf.block_header_mut().block_number += 1;
592 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
593
594 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
595 let leaf = LeafQueryData::new(leaf, qc).unwrap();
596
597 let mut tx = ds.write().await.unwrap();
599 tx.insert_leaf(leaf.clone()).await.unwrap();
600 tx.insert_block(block.clone()).await.unwrap();
601
602 assert_eq!(tx.block_height().await.unwrap(), 2);
603 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
604 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
605
606 tx.revert().await;
608 assert_eq!(
609 NodeDataSource::<MockTypes>::block_height(&ds)
610 .await
611 .unwrap(),
612 0
613 );
614 ds.get_leaf(1).await.try_resolve().unwrap_err();
615 ds.get_block(1).await.try_resolve().unwrap_err();
616 }
617
618 #[test_log::test(tokio::test(flavor = "multi_thread"))]
619 pub async fn test_reset<D: TestableDataSource>()
620 where
621 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
622 {
623 use hotshot_example_types::node_types::TestVersions;
624
625 let storage = D::create(0).await;
626 let ds = D::connect(&storage).await;
627
628 let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
630 &TestValidatedState::default(),
631 &TestInstanceState::default(),
632 )
633 .await;
634 let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
635 &TestValidatedState::default(),
636 &TestInstanceState::default(),
637 )
638 .await;
639 leaf.block_header_mut().block_number += 1;
642 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
643
644 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
645 let leaf = LeafQueryData::new(leaf, qc).unwrap();
646
647 let mut tx = ds.write().await.unwrap();
649 tx.insert_leaf(leaf.clone()).await.unwrap();
650 tx.insert_block(block.clone()).await.unwrap();
651 tx.commit().await.unwrap();
652
653 assert_eq!(
654 NodeDataSource::<MockTypes>::block_height(&ds)
655 .await
656 .unwrap(),
657 2
658 );
659 assert_eq!(leaf, ds.get_leaf(1).await.await);
660 assert_eq!(block, ds.get_block(1).await.await);
661
662 drop(ds);
663
664 let ds = D::reset(&storage).await;
666 assert_eq!(
667 NodeDataSource::<MockTypes>::block_height(&ds)
668 .await
669 .unwrap(),
670 0
671 );
672 ds.get_leaf(1).await.try_resolve().unwrap_err();
673 ds.get_block(1).await.try_resolve().unwrap_err();
674 }
675
676 #[test_log::test(tokio::test(flavor = "multi_thread"))]
677 pub async fn test_drop_tx<D: TestableDataSource>()
678 where
679 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
680 + AvailabilityStorage<MockTypes>
681 + NodeStorage<MockTypes>,
682 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
683 {
684 use hotshot_example_types::node_types::TestVersions;
685
686 let storage = D::create(0).await;
687 let ds = D::connect(&storage).await;
688
689 let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
691 &TestValidatedState::default(),
692 &TestInstanceState::default(),
693 )
694 .await;
695 let mut mock_leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
696 &TestValidatedState::default(),
697 &TestInstanceState::default(),
698 )
699 .await;
700 mock_leaf.block_header_mut().block_number += 1;
703 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
704
705 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
706 let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
707
708 tracing::info!("write");
710 let mut tx = ds.write().await.unwrap();
711 tx.insert_leaf(leaf.clone()).await.unwrap();
712 tx.insert_block(block.clone()).await.unwrap();
713
714 assert_eq!(tx.block_height().await.unwrap(), 2);
715 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
716 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
717
718 drop(tx);
720
721 tracing::info!("read");
723 let mut tx = ds.read().await.unwrap();
724 assert_eq!(tx.block_height().await.unwrap(), 0);
725 drop(tx);
726
727 mock_leaf.block_header_mut().block_number += 1;
729 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
730 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
731 let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
732
733 tracing::info!("write again");
734 let mut tx = ds.write().await.unwrap();
735 tx.insert_leaf(leaf.clone()).await.unwrap();
736 tx.insert_block(block.clone()).await.unwrap();
737 tx.commit().await.unwrap();
738
739 tracing::info!("read again");
742 let height = leaf.height() as usize;
743 assert_eq!(
744 NodeDataSource::<MockTypes>::block_height(&ds)
745 .await
746 .unwrap(),
747 height + 1
748 );
749 assert_eq!(leaf, ds.get_leaf(height).await.await);
750 assert_eq!(block, ds.get_block(height).await.await);
751 ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
752 ds.get_block(height - 1).await.try_resolve().unwrap_err();
753 }
754}
755
756#[cfg(any(test, feature = "testing"))]
758#[espresso_macros::generic_tests]
759pub mod node_tests {
760 use std::time::Duration;
761
762 use committable::Committable;
763 use futures::{future::join_all, stream::StreamExt};
764 use hotshot::traits::BlockPayload;
765 use hotshot_example_types::{
766 block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
767 node_types::TestTypes,
768 state_types::{TestInstanceState, TestValidatedState},
769 };
770 use hotshot_types::{
771 data::{vid_commitment, VidCommitment, VidShare},
772 traits::{
773 block_contents::{BlockHeader, EncodeBytes},
774 node_implementation::Versions,
775 },
776 vid::advz::{advz_scheme, ADVZScheme},
777 };
778 use jf_vid::VidScheme;
779 use vbs::version::StaticVersionType;
780
781 use crate::{
782 availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
783 data_source::{
784 storage::{NodeStorage, UpdateAvailabilityStorage},
785 update::Transaction,
786 },
787 node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
788 testing::{
789 consensus::{MockNetwork, TestableDataSource},
790 mocks::{mock_transaction, MockPayload, MockTypes, MockVersions},
791 sleep,
792 },
793 types::HeightIndexed,
794 Header, VidCommon,
795 };
796
797 fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
798 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
799 }
800
801 #[test_log::test(tokio::test(flavor = "multi_thread"))]
802 pub async fn test_sync_status<D: TestableDataSource>()
803 where
804 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
805 {
806 use hotshot_example_types::node_types::TestVersions;
807
808 let storage = D::create(0).await;
809 let ds = D::connect(&storage).await;
810
811 let mut vid = advz_scheme(2);
813
814 let mut leaves = vec![
816 LeafQueryData::<MockTypes>::genesis::<TestVersions>(
817 &TestValidatedState::default(),
818 &TestInstanceState::default(),
819 )
820 .await,
821 ];
822 let mut blocks = vec![
823 BlockQueryData::<MockTypes>::genesis::<TestVersions>(
824 &TestValidatedState::default(),
825 &TestInstanceState::default(),
826 )
827 .await,
828 ];
829 for i in 0..2 {
830 let mut leaf = leaves[i].clone();
831 leaf.leaf.block_header_mut().block_number += 1;
832 leaves.push(leaf);
833
834 let mut block = blocks[i].clone();
835 block.header.block_number += 1;
836 blocks.push(block);
837 }
838 let disperse = vid.disperse([]).unwrap();
841 let vid = leaves
842 .iter()
843 .map(|leaf| {
844 (
845 VidCommonQueryData::new(
846 leaf.header().clone(),
847 VidCommon::V0(disperse.common.clone()),
848 ),
849 disperse.shares[0].clone(),
850 )
851 })
852 .collect::<Vec<_>>();
853
854 assert!(ds.sync_status().await.unwrap().is_fully_synced());
856
857 ds.append(leaves[0].clone().into()).await.unwrap();
860 assert_eq!(
861 ds.sync_status().await.unwrap(),
862 SyncStatus {
863 missing_blocks: 1,
864 missing_vid_common: 1,
865 missing_vid_shares: 1,
866 missing_leaves: 0,
867 pruned_height: None,
868 }
869 );
870
871 ds.append(leaves[2].clone().into()).await.unwrap();
874 assert_eq!(
875 ds.sync_status().await.unwrap(),
876 SyncStatus {
877 missing_blocks: 3,
878 missing_vid_common: 3,
879 missing_vid_shares: 3,
880 missing_leaves: 1,
881 pruned_height: None,
882 }
883 );
884
885 {
887 let mut tx = ds.write().await.unwrap();
888 tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
889 tx.commit().await.unwrap();
890 }
891 assert_eq!(
892 ds.sync_status().await.unwrap(),
893 SyncStatus {
894 missing_blocks: 3,
895 missing_vid_common: 2,
896 missing_vid_shares: 3,
897 missing_leaves: 1,
898 pruned_height: None,
899 }
900 );
901
902 {
904 let mut tx = ds.write().await.unwrap();
905 tx.insert_block(blocks[0].clone()).await.unwrap();
906 tx.insert_vid(vid[0].0.clone(), Some(VidShare::V0(vid[0].1.clone())))
907 .await
908 .unwrap();
909 tx.insert_leaf(leaves[1].clone()).await.unwrap();
910 tx.insert_block(blocks[1].clone()).await.unwrap();
911 tx.insert_vid(vid[1].0.clone(), Some(VidShare::V0(vid[1].1.clone())))
912 .await
913 .unwrap();
914 tx.insert_block(blocks[2].clone()).await.unwrap();
915 tx.insert_vid(vid[2].0.clone(), Some(VidShare::V0(vid[2].1.clone())))
916 .await
917 .unwrap();
918 tx.commit().await.unwrap();
919 }
920
921 let expected_missing = if ds.get_leaf(1).await.try_resolve().is_err() {
926 tracing::warn!(
927 "data source does not support out-of-order filling, allowing one missing leaf and \
928 VID share"
929 );
930 1
931 } else {
932 0
933 };
934 let expected_sync_status = SyncStatus {
935 missing_blocks: 0,
936 missing_leaves: expected_missing,
937 missing_vid_common: 0,
938 missing_vid_shares: expected_missing,
939 pruned_height: None,
940 };
941 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
942
943 {
946 let mut tx = ds.write().await.unwrap();
947 tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
948 tx.commit().await.unwrap();
949 }
950 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
951 }
952
953 #[test_log::test(tokio::test(flavor = "multi_thread"))]
954 pub async fn test_counters<D: TestableDataSource>() {
955 use hotshot_example_types::node_types::TestVersions;
956
957 let storage = D::create(0).await;
958 let ds = D::connect(&storage).await;
959
960 assert_eq!(ds.count_transactions().await.unwrap(), 0);
961 assert_eq!(ds.payload_size().await.unwrap(), 0);
962
963 let mut total_transactions = 0;
965 let mut total_size = 0;
966 'outer: for i in [0, 1, 2] {
967 let (payload, metadata) =
972 <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
973 [mock_transaction(vec![i as u8 % 2])],
974 &TestValidatedState::default(),
975 &TestInstanceState::default(),
976 )
977 .await
978 .unwrap();
979 let encoded = payload.encode();
980 let payload_commitment = vid_commitment::<TestVersions>(
981 &encoded,
982 &metadata.encode(),
983 1,
984 <TestVersions as Versions>::Base::VERSION,
985 );
986 let header = TestBlockHeader {
987 block_number: i,
988 payload_commitment,
989 timestamp: i,
990 timestamp_millis: i * 1_000,
991 builder_commitment:
992 <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
993 &payload, &metadata,
994 ),
995 metadata: TestMetadata {
996 num_transactions: 7, },
998 random: 1, };
1000
1001 let mut leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
1002 &TestValidatedState::default(),
1003 &TestInstanceState::default(),
1004 )
1005 .await;
1006 *leaf.leaf.block_header_mut() = header.clone();
1007 let block = BlockQueryData::new(header, payload);
1008 ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
1009 .await
1010 .unwrap();
1011 assert_eq!(
1012 NodeDataSource::<MockTypes>::block_height(&ds)
1013 .await
1014 .unwrap(),
1015 (i + 1) as usize,
1016 );
1017
1018 total_transactions += 1;
1019 total_size += encoded.len();
1020
1021 for retry in 0..5 {
1023 let ds_transactions = ds.count_transactions().await.unwrap();
1024 let ds_payload_size = ds.payload_size().await.unwrap();
1025 if ds_transactions != total_transactions || ds_payload_size != total_size {
1026 tracing::info!(
1027 i,
1028 retry,
1029 total_transactions,
1030 ds_transactions,
1031 total_size,
1032 ds_payload_size,
1033 "waiting for statistics to update"
1034 );
1035 sleep(Duration::from_secs(1)).await;
1036 } else {
1037 continue 'outer;
1038 }
1039 }
1040 panic!("counters did not update in time");
1041 }
1042 }
1043
1044 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1045 pub async fn test_vid_shares<D: TestableDataSource>()
1046 where
1047 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1048 {
1049 let mut network = MockNetwork::<D, MockVersions>::init().await;
1050 let ds = network.data_source();
1051
1052 network.start().await;
1053
1054 let mut leaves = ds.subscribe_leaves(0).await.take(3);
1056 while let Some(leaf) = leaves.next().await {
1057 tracing::info!("got leaf {}", leaf.height());
1058 let mut tx = ds.read().await.unwrap();
1059 let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1060 assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1061 assert_eq!(
1062 share,
1063 tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1064 .await
1065 .unwrap()
1066 );
1067 }
1068 }
1069
1070 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1071 pub async fn test_vid_monotonicity<D: TestableDataSource>()
1072 where
1073 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1074 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1075 {
1076 use hotshot_example_types::node_types::TestVersions;
1077
1078 let storage = D::create(0).await;
1079 let ds = D::connect(&storage).await;
1080
1081 let mut vid = advz_scheme(2);
1083 let disperse = vid.disperse([]).unwrap();
1084
1085 let leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
1087 &TestValidatedState::default(),
1088 &TestInstanceState::default(),
1089 )
1090 .await;
1091 let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1092 ds.append(BlockInfo::new(
1093 leaf,
1094 None,
1095 Some(common.clone()),
1096 Some(VidShare::V0(disperse.shares[0].clone())),
1097 None,
1098 ))
1099 .await
1100 .unwrap();
1101
1102 {
1103 assert_eq!(ds.get_vid_common(0).await.await, common);
1104 assert_eq!(
1105 ds.vid_share(0).await.unwrap(),
1106 VidShare::V0(disperse.shares[0].clone())
1107 );
1108 }
1109
1110 {
1113 let mut tx = ds.write().await.unwrap();
1114 tx.insert_vid(common.clone(), None).await.unwrap();
1115 tx.commit().await.unwrap();
1116 }
1117 {
1118 assert_eq!(ds.get_vid_common(0).await.await, common);
1119 assert_eq!(
1120 ds.vid_share(0).await.unwrap(),
1121 VidShare::V0(disperse.shares[0].clone())
1122 );
1123 }
1124 }
1125
1126 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1127 pub async fn test_vid_recovery<D: TestableDataSource>()
1128 where
1129 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1130 {
1131 let mut network = MockNetwork::<D, MockVersions>::init().await;
1132 let ds = network.data_source();
1133
1134 network.start().await;
1135
1136 let mut blocks = ds.subscribe_blocks(0).await;
1138 let txn = mock_transaction(vec![1, 2, 3]);
1139 network.submit_transaction(txn.clone()).await;
1140
1141 let block = loop {
1143 tracing::info!("waiting for transaction");
1144 let block = blocks.next().await.unwrap();
1145 if !block.is_empty() {
1146 tracing::info!(height = block.height(), "transaction sequenced");
1147 break block;
1148 }
1149 tracing::info!(height = block.height(), "empty block");
1150 };
1151 let height = block.height() as usize;
1152 let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
1153 commit
1154 } else {
1155 panic!("expect ADVZ commitment")
1156 };
1157
1158 let vid = advz_scheme(network.num_nodes());
1160
1161 tracing::info!("fetching common data");
1163 let common = ds.get_vid_common(height).await.await;
1164 let VidCommon::V0(common) = &common.common() else {
1165 panic!("expect ADVZ common");
1166 };
1167 ADVZScheme::is_consistent(&commit, common).unwrap();
1168
1169 tracing::info!("fetching shares");
1171 let network = &network;
1172 let vid = &vid;
1173 let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
1174 let ds = network.data_source_index(i);
1175
1176 let mut leaves = ds.subscribe_leaves(height).await;
1179 let leaf = leaves.next().await.unwrap();
1180 assert_eq!(leaf.height(), height as u64);
1181 assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));
1182
1183 let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
1184 share
1185 } else {
1186 panic!("expect ADVZ share")
1187 };
1188 vid.verify_share(&share, common, &commit).unwrap().unwrap();
1189 share
1190 }))
1191 .await;
1192
1193 tracing::info!("recovering payload");
1195 let bytes = vid.recover_payload(&shares, common).unwrap();
1196 let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
1197 &bytes,
1198 &TestMetadata {
1199 num_transactions: 7, },
1201 );
1202 assert_eq!(recovered, *block.payload());
1203 assert_eq!(recovered.transactions, vec![txn]);
1204 }
1205
1206 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1207 pub async fn test_timestamp_window<D: TestableDataSource>() {
1208 let mut network = MockNetwork::<D, MockVersions>::init().await;
1209 let ds = network.data_source();
1210
1211 network.start().await;
1212
1213 let mut leaves = ds.subscribe_leaves(0).await;
1216 let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1219 while test_blocks.len() < 3 {
1220 let leaf = leaves.next().await.unwrap();
1222 let header = leaf.header().clone();
1223 if let Some(last_timestamp) = test_blocks.last_mut() {
1224 if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1225 == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1226 {
1227 last_timestamp.push(header);
1228 } else {
1229 test_blocks.push(vec![header]);
1230 }
1231 } else {
1232 test_blocks.push(vec![header]);
1233 }
1234 }
1235 tracing::info!("blocks for testing: {test_blocks:#?}");
1236
1237 let check_invariants =
1239 |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1240 let mut prev = res.prev.as_ref();
1241 if let Some(prev) = prev {
1242 if check_prev {
1243 assert!(block_header_timestamp(prev) < start);
1244 }
1245 } else {
1246 assert_eq!(res.from().unwrap(), 0);
1249 };
1250 for header in &res.window {
1251 assert!(start <= block_header_timestamp(header));
1252 assert!(block_header_timestamp(header) < end);
1253 if let Some(prev) = prev {
1254 assert!(
1255 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1256 <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1257 );
1258 }
1259 prev = Some(header);
1260 }
1261 if let Some(next) = &res.next {
1262 assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1263 assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1266 }
1267 };
1268
1269 let get_window = |start, end| {
1270 let ds = ds.clone();
1271 async move {
1272 let window = ds
1273 .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1274 .await
1275 .unwrap();
1276 tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1277 check_invariants(&window, start, end, true);
1278 window
1279 }
1280 };
1281
1282 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1284 let end = start + 1;
1285 let res = get_window(start, end).await;
1286 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1287 assert_eq!(res.window, test_blocks[1]);
1288 assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1289
1290 let start = 0;
1292 let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1293 let res = get_window(start, end).await;
1294 assert_eq!(res.prev, None);
1295 assert_eq!(res.window, test_blocks[0]);
1296 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1297
1298 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1300 let end = i64::MAX as u64;
1301 let res = get_window(start, end).await;
1302 assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1303 assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1306 assert_eq!(res.next, None);
1307 let from = test_blocks.iter().flatten().count() - 1;
1311 let more = ds
1312 .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1313 .await
1314 .unwrap();
1315 check_invariants(&more, start, end, false);
1316 assert_eq!(
1317 more.prev.as_ref().unwrap(),
1318 test_blocks.iter().flatten().nth(from - 1).unwrap()
1319 );
1320 assert_eq!(
1321 more.window[..res.window.len() - test_blocks[2].len() + 1],
1322 res.window[test_blocks[2].len() - 1..]
1323 );
1324 assert_eq!(res.next, None);
1325 let more2 = ds
1327 .get_header_window(
1328 test_blocks[2].last().unwrap().commit(),
1329 end,
1330 i64::MAX as usize,
1331 )
1332 .await
1333 .unwrap();
1334 check_invariants(&more2, start, end, false);
1335 assert_eq!(more2.from().unwrap(), more.from().unwrap());
1336 assert_eq!(more2.prev, more.prev);
1337 assert_eq!(more2.next, more.next);
1338 assert_eq!(more2.window[..more.window.len()], more.window);
1339
1340 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1342 let end = start;
1343 let res = get_window(start, end).await;
1344 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1345 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1346 assert_eq!(res.window, vec![]);
1347
1348 ds.get_header_window(
1350 WindowStart::Time((i64::MAX - 1) as u64),
1351 i64::MAX as u64,
1352 i64::MAX as usize,
1353 )
1354 .await
1355 .unwrap_err();
1356
1357 let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1359 .into_iter()
1360 .flatten()
1361 .collect::<Vec<_>>();
1362 let start = block_header_timestamp(&blocks[0]);
1364 let end = block_header_timestamp(&test_blocks[2][0]);
1365 let res = ds
1366 .get_header_window(WindowStart::Time(start), end, 1)
1367 .await
1368 .unwrap();
1369 assert_eq!(res.prev, None);
1370 assert_eq!(res.window, [blocks[0].clone()]);
1371 assert_eq!(res.next, None);
1372 let res = ds
1374 .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1375 .await
1376 .unwrap();
1377 assert_eq!(res.window, [blocks[1].clone()]);
1378 assert_eq!(res.next, None);
1379 let res = ds
1381 .get_header_window(
1382 WindowStart::Height(blocks[1].height() + 1),
1383 end,
1384 blocks.len() - 1,
1385 )
1386 .await
1387 .unwrap();
1388 assert_eq!(res.window, blocks[2..].to_vec());
1389 assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1390 }
1391}
1392
1393#[cfg(any(test, feature = "testing"))]
1395#[espresso_macros::generic_tests]
1396pub mod status_tests {
1397 use std::time::Duration;
1398
1399 use crate::{
1400 status::StatusDataSource,
1401 testing::{
1402 consensus::{DataSourceLifeCycle, MockNetwork},
1403 mocks::{mock_transaction, MockVersions},
1404 sleep,
1405 },
1406 };
1407
1408 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1409 pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
1410 let mut network = MockNetwork::<D, MockVersions>::init().await;
1411 let ds = network.data_source();
1412
1413 {
1414 assert_eq!(ds.block_height().await.unwrap(), 0);
1416 assert!(ds.success_rate().await.unwrap().is_nan());
1419 assert!(
1422 (ds.elapsed_time_since_last_decide().await.unwrap() as i64
1423 - chrono::Utc::now().timestamp())
1424 .abs()
1425 <= 1,
1426 "time elapsed since last_decided_time is not within 1s"
1427 );
1428 }
1429
1430 let txn = mock_transaction(vec![1, 2, 3]);
1432 network.submit_transaction(txn.clone()).await;
1433
1434 network.start().await;
1436
1437 loop {
1439 let height = ds.block_height().await.unwrap();
1440 if height > 1 {
1441 break;
1442 }
1443 tracing::info!(height, "waiting for a block to be finalized");
1444 sleep(Duration::from_secs(1)).await;
1445 }
1446
1447 {
1448 let success_rate = ds.success_rate().await.unwrap();
1452 assert!(success_rate.is_finite(), "{success_rate}");
1453 assert!(success_rate > 0.0, "{success_rate}");
1454 }
1455
1456 {
1457 network.shut_down().await;
1460 sleep(Duration::from_secs(3)).await;
1461 assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
1463 }
1464 }
1465}
1466
1467#[macro_export]
1468macro_rules! instantiate_data_source_tests {
1469 ($t:ty) => {
1470 use $crate::data_source::{
1471 availability_tests, node_tests, persistence_tests, status_tests,
1472 };
1473
1474 instantiate_availability_tests!($t);
1475 instantiate_persistence_tests!($t);
1476 instantiate_node_tests!($t);
1477 instantiate_status_tests!($t);
1478 };
1479}