1mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50 use std::ops::{Bound, RangeBounds};
51
52 use futures::{
53 future,
54 stream::{BoxStream, StreamExt},
55 };
56
57 use crate::{
58 availability::{BlockQueryData, Fetch, LeafQueryData},
59 node::NodeDataSource,
60 testing::{consensus::TestableDataSource, mocks::MockTypes},
61 };
62
63 async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize> + use<R, D>
65 where
66 D: TestableDataSource,
67 R: RangeBounds<usize>,
68 {
69 let start = range.start_bound().cloned();
70 let mut end = range.end_bound().cloned();
71 if end == Bound::Unbounded {
72 end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73 }
74 (start, end)
75 }
76
77 pub async fn block_range<R, D>(
79 ds: &D,
80 range: R,
81 ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82 where
83 D: TestableDataSource,
84 R: RangeBounds<usize> + Send + 'static,
85 {
86 ds.get_block_range(bound_range(ds, range).await)
87 .await
88 .then(Fetch::resolve)
89 .boxed()
90 }
91
92 pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94 where
95 D: TestableDataSource,
96 R: RangeBounds<usize> + Send + 'static,
97 {
98 ds.get_leaf_range(bound_range(ds, range).await)
99 .await
100 .then(Fetch::resolve)
101 .boxed()
102 }
103
104 pub async fn get_non_empty_blocks<D>(
105 ds: &D,
106 ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107 where
108 D: TestableDataSource,
109 {
110 leaf_range(ds, 1..)
112 .await
113 .zip(block_range(ds, 1..).await)
114 .filter(|(_, block)| future::ready(!block.is_empty()))
115 .collect()
116 .await
117 }
118}
119
120#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124 use std::{
125 collections::HashMap,
126 fmt::Debug,
127 ops::{Bound, RangeBounds},
128 };
129
130 use committable::Committable;
131 use futures::stream::StreamExt;
132 use hotshot_types::{data::Leaf2, vote::HasViewNumber};
133
134 use super::test_helpers::*;
135 use crate::{
136 availability::{BlockId, payload_size},
137 data_source::storage::{AvailabilityStorage, NodeStorage},
138 node::NodeDataSource,
139 testing::{
140 consensus::{MockNetwork, TestableDataSource},
141 mocks::{MockTypes, mock_transaction},
142 },
143 types::HeightIndexed,
144 };
145
146 async fn validate<D: TestableDataSource>(ds: &D)
147 where
148 for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
149 {
150 let mut seen_payloads = HashMap::new();
153 let mut seen_transactions = HashMap::new();
154 let mut leaves = leaf_range(ds, ..).await.enumerate();
155 while let Some((i, leaf)) = leaves.next().await {
156 assert_eq!(leaf.height(), i as u64);
157 assert_eq!(
158 leaf.hash(),
159 <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
160 );
161
162 tracing::info!("looking up leaf {i} various ways");
164 assert_eq!(leaf, ds.get_leaf(i).await.await);
165 assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);
166
167 tracing::info!("looking up block {i} various ways");
168 let block = ds.get_block(i).await.await;
169 assert_eq!(leaf.block_hash(), block.hash());
170 assert_eq!(block.height(), i as u64);
171 assert_eq!(block.hash(), block.header().commit());
172 assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));
173
174 assert_eq!(block, ds.get_block(i).await.await);
176 assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
177 let ix = seen_payloads
187 .entry(block.payload_hash())
188 .or_insert(i as u64);
189 if let Ok(block) = ds
190 .get_block(BlockId::PayloadHash(block.payload_hash()))
191 .await
192 .try_resolve()
193 {
194 assert_eq!(block.height(), *ix);
195 } else {
196 tracing::warn!(
197 "skipping block by payload index check for missing payload {:?}",
198 block.header()
199 );
200 ds.get_block(BlockId::PayloadHash(block.payload_hash()))
202 .await
203 .await;
204 }
205
206 tracing::info!("looking up payload {i} various ways");
208 let expected_payload = block.clone().into();
209 assert_eq!(ds.get_payload(i).await.await, expected_payload);
210 assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
211 if let Ok(payload) = ds
214 .get_payload(BlockId::PayloadHash(block.payload_hash()))
215 .await
216 .try_resolve()
217 {
218 if *ix == i as u64 {
219 assert_eq!(payload, expected_payload);
220 }
221 } else {
222 tracing::warn!(
223 "skipping payload index check for missing payload {:?}",
224 block.header()
225 );
226 ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
228 .await
229 .await;
230 }
231
232 tracing::info!("looking up VID common {i} various ways");
234 let common = ds.get_vid_common(block.height() as usize).await.await;
235 assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
236 if let Ok(res) = ds
239 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
240 .await
241 .try_resolve()
242 {
243 if *ix == i as u64 {
244 assert_eq!(res, common);
245 }
246 } else {
247 tracing::warn!(
248 "skipping VID common index check for missing data {:?}",
249 block.header()
250 );
251 let res = ds
253 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
254 .await
255 .await;
256 assert_eq!(res.payload_hash(), common.payload_hash());
257 }
258
259 for (j, txn) in block.enumerate() {
260 tracing::info!("looking up transaction {i},{j:?}");
261
262 let ix = seen_transactions
270 .entry(txn.commit())
271 .or_insert((i as u64, j.clone()));
272 if let Ok(tx_data) = ds
273 .get_block_containing_transaction(txn.commit())
274 .await
275 .try_resolve()
276 {
277 assert_eq!(tx_data.transaction.transaction(), &txn);
278 assert_eq!(tx_data.transaction.block_height(), ix.0);
279 assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
280 assert_eq!(tx_data.index, ix.1);
281 assert_eq!(tx_data.block, block);
282 } else {
283 tracing::warn!(
284 "skipping transaction index check for missing transaction {j:?} {txn:?}"
285 );
286 ds.get_block_containing_transaction(txn.commit())
288 .await
289 .await;
290 }
291 }
292 }
293
294 {
296 let mut tx = ds.read().await.unwrap();
297 let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
298 let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();
299
300 if last_leaf.qc().data.epoch.is_some() {
301 tracing::info!(block_height, "checking QC chain");
302 let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();
303
304 assert_eq!(last_leaf.height(), (block_height - 1) as u64);
305 assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
306 assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
307 assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
308 }
309 }
310 }
311
312 #[test_log::test(tokio::test(flavor = "multi_thread"))]
313 pub async fn test_update<D: TestableDataSource>()
314 where
315 for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
316 {
317 let mut network = MockNetwork::<D>::init().await;
318 let ds = network.data_source();
319
320 network.start().await;
321 assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
322
323 let mut blocks = ds.subscribe_blocks(0).await.enumerate();
326 for nonce in 0..3 {
327 let txn = mock_transaction(vec![nonce]);
328 network.submit_transaction(txn).await;
329
330 let (i, block) = loop {
332 tracing::info!("waiting for tx {nonce}");
333 let (i, block) = blocks.next().await.unwrap();
334 if !block.is_empty() {
335 break (i, block);
336 }
337 tracing::info!("block {i} is empty");
338 };
339
340 tracing::info!("got tx {nonce} in block {i}");
341 assert_eq!(ds.get_block(i).await.await, block);
342 validate(&ds).await;
343 }
344
345 {
349 tracing::info!("checking persisted storage");
350 let storage = D::connect(network.storage()).await;
351
352 let block_height = NodeDataSource::block_height(&ds).await.unwrap();
356 assert_eq!(
357 ds.get_block_range(..block_height)
358 .await
359 .map(|fetch| fetch.try_resolve().ok())
360 .collect::<Vec<_>>()
361 .await,
362 storage
363 .get_block_range(..block_height)
364 .await
365 .map(|fetch| fetch.try_resolve().ok())
366 .collect::<Vec<_>>()
367 .await
368 );
369 assert_eq!(
370 ds.get_leaf_range(..block_height)
371 .await
372 .map(|fetch| fetch.try_resolve().ok())
373 .collect::<Vec<_>>()
374 .await,
375 storage
376 .get_leaf_range(..block_height)
377 .await
378 .map(|fetch| fetch.try_resolve().ok())
379 .collect::<Vec<_>>()
380 .await
381 );
382 }
383 }
384
385 #[test_log::test(tokio::test(flavor = "multi_thread"))]
386 pub async fn test_range<D: TestableDataSource>()
387 where
388 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
389 {
390 let mut network = MockNetwork::<D>::init().await;
391 let ds = network.data_source();
392 network.start().await;
393
394 let block_height = loop {
396 let mut tx = ds.read().await.unwrap();
397 let block_height = tx.block_height().await.unwrap();
398 if block_height >= 3 {
399 break block_height as u64;
400 }
401 };
402
403 do_range_test(&ds, 1..=2, 1..3).await; do_range_test(&ds, 1..3, 1..3).await; do_range_test(&ds, 1.., 1..block_height).await; do_range_test(&ds, ..=2, 0..3).await; do_range_test(&ds, ..3, 0..3).await; do_range_test(&ds, .., 0..block_height).await; do_range_test(&ds, ExRange(0..=2), 1..3).await; do_range_test(&ds, ExRange(0..3), 1..3).await; do_range_test(&ds, ExRange(0..), 1..block_height).await; }
415
416 async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
417 where
418 D: TestableDataSource,
419 R: RangeBounds<usize> + Clone + Debug + Send + 'static,
420 I: IntoIterator<Item = u64>,
421 {
422 tracing::info!("testing range {range:?}");
423
424 let mut leaves = ds.get_leaf_range(range.clone()).await;
425 let mut blocks = ds.get_block_range(range.clone()).await;
426 let mut payloads = ds.get_payload_range(range.clone()).await;
427 let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
428 let mut vid_common = ds.get_vid_common_range(range.clone()).await;
429 let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
430
431 for i in expected_indices {
432 tracing::info!(i, "check entries");
433 let leaf = leaves.next().await.unwrap().await;
434 let block = blocks.next().await.unwrap().await;
435 let payload = payloads.next().await.unwrap().await;
436 let payload_meta = payloads_meta.next().await.unwrap().await;
437 let common = vid_common.next().await.unwrap().await;
438 let common_meta = vid_common_meta.next().await.unwrap().await;
439 assert_eq!(leaf.height(), i);
440 assert_eq!(block.height(), i);
441 assert_eq!(payload, ds.get_payload(i as usize).await.await);
442 assert_eq!(payload_meta, block.into());
443 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
444 assert_eq!(common_meta, common.into());
445 }
446
447 if range.end_bound() == Bound::Unbounded {
448 loop {
451 let fetch_leaf = leaves.next().await.unwrap();
452 let fetch_block = blocks.next().await.unwrap();
453 let fetch_payload = payloads.next().await.unwrap();
454 let fetch_payload_meta = payloads_meta.next().await.unwrap();
455 let fetch_common = vid_common.next().await.unwrap();
456 let fetch_common_meta = vid_common_meta.next().await.unwrap();
457
458 if fetch_leaf.try_resolve().is_ok()
459 && fetch_block.try_resolve().is_ok()
460 && fetch_payload.try_resolve().is_ok()
461 && fetch_payload_meta.try_resolve().is_ok()
462 && fetch_common.try_resolve().is_ok()
463 && fetch_common_meta.try_resolve().is_ok()
464 {
465 tracing::info!("searching for end of available objects");
466 } else {
467 break;
468 }
469 }
470 } else {
471 assert!(leaves.next().await.is_none());
473 assert!(blocks.next().await.is_none());
474 assert!(payloads.next().await.is_none());
475 assert!(payloads_meta.next().await.is_none());
476 assert!(vid_common.next().await.is_none());
477 assert!(vid_common_meta.next().await.is_none());
478 }
479 }
480
481 #[test_log::test(tokio::test(flavor = "multi_thread"))]
482 pub async fn test_range_rev<D: TestableDataSource>()
483 where
484 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
485 {
486 let mut network = MockNetwork::<D>::init().await;
487 let ds = network.data_source();
488 network.start().await;
489
490 ds.subscribe_leaves(5).await.next().await.unwrap();
492
493 do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
495 do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
496 do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
497 }
498
499 async fn do_range_rev_test<D>(
500 ds: &D,
501 start: Bound<usize>,
502 end: usize,
503 expected_indices: impl DoubleEndedIterator<Item = u64>,
504 ) where
505 D: TestableDataSource,
506 {
507 tracing::info!("testing range {start:?}-{end}");
508
509 let mut leaves = ds.get_leaf_range_rev(start, end).await;
510 let mut blocks = ds.get_block_range_rev(start, end).await;
511 let mut payloads = ds.get_payload_range_rev(start, end).await;
512 let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
513 let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
514 let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
515
516 for i in expected_indices.rev() {
517 tracing::info!(i, "check entries");
518 let leaf = leaves.next().await.unwrap().await;
519 let block = blocks.next().await.unwrap().await;
520 let payload = payloads.next().await.unwrap().await;
521 let payload_meta = payloads_meta.next().await.unwrap().await;
522 let common = vid_common.next().await.unwrap().await;
523 let common_meta = vid_common_meta.next().await.unwrap().await;
524 assert_eq!(leaf.height(), i);
525 assert_eq!(block.height(), i);
526 assert_eq!(payload.height(), i);
527 assert_eq!(payload_meta.height(), i);
528 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
529 assert_eq!(
530 common_meta,
531 ds.get_vid_common_metadata(i as usize).await.await
532 );
533 }
534
535 assert!(leaves.next().await.is_none());
537 assert!(blocks.next().await.is_none());
538 assert!(payloads.next().await.is_none());
539 assert!(payloads_meta.next().await.is_none());
540 assert!(vid_common.next().await.is_none());
541 assert!(vid_common_meta.next().await.is_none());
542 }
543
/// Range adaptor whose start bound is always exclusive.
///
/// Wraps any `RangeBounds<usize>` and reports its start as `Bound::Excluded`: an included or
/// excluded start keeps its value, and an unbounded start becomes `Excluded(0)`. The end bound
/// is forwarded unchanged.
#[derive(Clone, Copy, Debug)]
struct ExRange<R>(R);

impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
    fn start_bound(&self) -> Bound<&usize> {
        let floor = match self.0.start_bound() {
            Bound::Included(x) | Bound::Excluded(x) => x,
            Bound::Unbounded => &0,
        };
        Bound::Excluded(floor)
    }

    fn end_bound(&self) -> Bound<&usize> {
        self.0.end_bound()
    }
}
561}
562
563#[cfg(any(test, feature = "testing"))]
565#[espresso_macros::generic_tests]
566pub mod persistence_tests {
567 use committable::Committable;
568 use hotshot_example_types::{
569 node_types::TEST_VERSIONS,
570 state_types::{TestInstanceState, TestValidatedState},
571 };
572 use hotshot_types::simple_certificate::QuorumCertificate2;
573
574 use crate::{
575 Leaf2,
576 availability::{BlockQueryData, LeafQueryData},
577 data_source::{
578 Transaction,
579 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
580 },
581 node::NodeDataSource,
582 testing::{
583 consensus::TestableDataSource,
584 mocks::{MockPayload, MockTypes},
585 },
586 types::HeightIndexed,
587 };
588
589 #[test_log::test(tokio::test(flavor = "multi_thread"))]
590 pub async fn test_revert<D: TestableDataSource>()
591 where
592 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
593 + AvailabilityStorage<MockTypes>
594 + NodeStorage<MockTypes>,
595 {
596 let storage = D::create(0).await;
597 let ds = D::connect(&storage).await;
598
599 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
601 &TestValidatedState::default(),
602 &TestInstanceState::default(),
603 TEST_VERSIONS.test,
604 )
605 .await;
606 let mut leaf = Leaf2::<MockTypes>::genesis(
607 &TestValidatedState::default(),
608 &TestInstanceState::default(),
609 TEST_VERSIONS.test.base,
610 )
611 .await;
612 leaf.block_header_mut().block_number += 1;
615 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
616
617 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
618 let leaf = LeafQueryData::new(leaf, qc).unwrap();
619
620 let mut tx = ds.write().await.unwrap();
622 tx.insert_leaf(leaf.clone()).await.unwrap();
623 tx.insert_block(block.clone()).await.unwrap();
624
625 assert_eq!(tx.block_height().await.unwrap(), 2);
626 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
627 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
628
629 tx.revert().await;
631 assert_eq!(
632 NodeDataSource::<MockTypes>::block_height(&ds)
633 .await
634 .unwrap(),
635 0
636 );
637 ds.get_leaf(1).await.try_resolve().unwrap_err();
638 ds.get_block(1).await.try_resolve().unwrap_err();
639 }
640
641 #[test_log::test(tokio::test(flavor = "multi_thread"))]
642 pub async fn test_reset<D: TestableDataSource>()
643 where
644 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
645 {
646 let storage = D::create(0).await;
647 let ds = D::connect(&storage).await;
648
649 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
651 &TestValidatedState::default(),
652 &TestInstanceState::default(),
653 TEST_VERSIONS.test,
654 )
655 .await;
656 let mut leaf = Leaf2::<MockTypes>::genesis(
657 &TestValidatedState::default(),
658 &TestInstanceState::default(),
659 TEST_VERSIONS.test.base,
660 )
661 .await;
662 leaf.block_header_mut().block_number += 1;
665 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
666
667 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
668 let leaf = LeafQueryData::new(leaf, qc).unwrap();
669
670 let mut tx = ds.write().await.unwrap();
672 tx.insert_leaf(leaf.clone()).await.unwrap();
673 tx.insert_block(block.clone()).await.unwrap();
674 tx.commit().await.unwrap();
675
676 assert_eq!(
677 NodeDataSource::<MockTypes>::block_height(&ds)
678 .await
679 .unwrap(),
680 2
681 );
682 assert_eq!(leaf, ds.get_leaf(1).await.await);
683 assert_eq!(block, ds.get_block(1).await.await);
684
685 drop(ds);
686
687 let ds = D::reset(&storage).await;
689 assert_eq!(
690 NodeDataSource::<MockTypes>::block_height(&ds)
691 .await
692 .unwrap(),
693 0
694 );
695 ds.get_leaf(1).await.try_resolve().unwrap_err();
696 ds.get_block(1).await.try_resolve().unwrap_err();
697 }
698
699 #[test_log::test(tokio::test(flavor = "multi_thread"))]
700 pub async fn test_drop_tx<D: TestableDataSource>()
701 where
702 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
703 + AvailabilityStorage<MockTypes>
704 + NodeStorage<MockTypes>,
705 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
706 {
707 let storage = D::create(0).await;
708 let ds = D::connect(&storage).await;
709
710 let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis(
712 &TestValidatedState::default(),
713 &TestInstanceState::default(),
714 TEST_VERSIONS.test,
715 )
716 .await;
717 let mut mock_leaf = Leaf2::<MockTypes>::genesis(
718 &TestValidatedState::default(),
719 &TestInstanceState::default(),
720 TEST_VERSIONS.test.base,
721 )
722 .await;
723 mock_leaf.block_header_mut().block_number += 1;
726 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
727
728 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
729 let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
730
731 tracing::info!("write");
733 let mut tx = ds.write().await.unwrap();
734 tx.insert_leaf(leaf.clone()).await.unwrap();
735 tx.insert_block(block.clone()).await.unwrap();
736
737 assert_eq!(tx.block_height().await.unwrap(), 2);
738 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
739 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
740
741 drop(tx);
743
744 tracing::info!("read");
746 let mut tx = ds.read().await.unwrap();
747 assert_eq!(tx.block_height().await.unwrap(), 0);
748 drop(tx);
749
750 mock_leaf.block_header_mut().block_number += 1;
752 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
753 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
754 let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
755
756 tracing::info!("write again");
757 let mut tx = ds.write().await.unwrap();
758 tx.insert_leaf(leaf.clone()).await.unwrap();
759 tx.insert_block(block.clone()).await.unwrap();
760 tx.commit().await.unwrap();
761
762 tracing::info!("read again");
765 let height = leaf.height() as usize;
766 assert_eq!(
767 NodeDataSource::<MockTypes>::block_height(&ds)
768 .await
769 .unwrap(),
770 height + 1
771 );
772 assert_eq!(leaf, ds.get_leaf(height).await.await);
773 assert_eq!(block, ds.get_block(height).await.await);
774 ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
775 ds.get_block(height - 1).await.try_resolve().unwrap_err();
776 }
777}
778
779#[cfg(any(test, feature = "testing"))]
781#[espresso_macros::generic_tests]
782pub mod node_tests {
783 use std::time::Duration;
784
785 use committable::Committable;
786 use futures::{future::join_all, stream::StreamExt};
787 use hotshot::traits::BlockPayload;
788 use hotshot_example_types::{
789 block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
790 node_types::{TEST_VERSIONS, TestTypes},
791 state_types::{TestInstanceState, TestValidatedState},
792 };
793 use hotshot_types::{
794 data::{VidCommitment, VidCommon, VidShare, ViewNumber, vid_commitment},
795 simple_certificate::{CertificatePair, QuorumCertificate2},
796 traits::block_contents::{BlockHeader, EncodeBytes},
797 vid::advz::{ADVZScheme, advz_scheme},
798 };
799 use jf_advz::VidScheme;
800 use pretty_assertions::assert_eq;
801
802 use crate::{
803 Header, Leaf2,
804 availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
805 data_source::{
806 storage::{NodeStorage, UpdateAvailabilityStorage},
807 update::Transaction,
808 },
809 node::{
810 BlockId, NodeDataSource, ResourceSyncStatus, SyncStatus, SyncStatusQueryData,
811 SyncStatusRange, TimeWindowQueryData, WindowStart,
812 },
813 testing::{
814 consensus::{MockNetwork, TestableDataSource},
815 mocks::{MockPayload, MockTypes, mock_transaction},
816 sleep,
817 },
818 types::HeightIndexed,
819 };
820
821 fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
822 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
823 }
824
825 #[test_log::test(tokio::test(flavor = "multi_thread"))]
826 pub async fn test_sync_status<D: TestableDataSource>()
827 where
828 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
829 {
830 let storage = D::create(0).await;
831 let ds = D::connect(&storage).await;
832
833 let mut vid = advz_scheme(2);
835
836 let mut leaves = vec![
838 LeafQueryData::<MockTypes>::genesis(
839 &TestValidatedState::default(),
840 &TestInstanceState::default(),
841 TEST_VERSIONS.test,
842 )
843 .await,
844 ];
845 let mut blocks = vec![
846 BlockQueryData::<MockTypes>::genesis(
847 &TestValidatedState::default(),
848 &TestInstanceState::default(),
849 TEST_VERSIONS.test.base,
850 )
851 .await,
852 ];
853 let dispersal = vid.disperse([]).unwrap();
854 let mut vid_data = vec![(
855 VidCommonQueryData::new(
856 leaves[0].header().clone(),
857 VidCommon::V0(dispersal.common.clone()),
858 ),
859 dispersal.shares[0].clone(),
860 )];
861 for i in 0..2 {
862 let (payload, metadata) = <MockPayload as BlockPayload<MockTypes>>::from_transactions(
865 vec![mock_transaction(vec![i as u8])],
866 &Default::default(),
867 &Default::default(),
868 )
869 .await
870 .unwrap();
871 let dispersal = vid.disperse(payload.encode()).unwrap();
872
873 let mut leaf = leaves[i].clone();
874 leaf.leaf.block_header_mut().block_number += 1;
875 leaf.leaf.block_header_mut().payload_commitment = VidCommitment::V0(dispersal.commit);
876 leaf.leaf.block_header_mut().metadata = metadata;
877 let block = BlockQueryData::new(leaf.header().clone(), payload);
878 let vid_common = VidCommonQueryData::new(
879 leaf.header().clone(),
880 VidCommon::V0(dispersal.common.clone()),
881 );
882 let vid_share = dispersal.shares[0].clone();
883
884 leaves.push(leaf);
885 blocks.push(block);
886 vid_data.push((vid_common, vid_share));
887 }
888
889 assert!(ds.sync_status().await.unwrap().is_fully_synced());
891
892 ds.append(leaves[0].clone().into()).await.unwrap();
895 assert_eq!(
896 ds.sync_status().await.unwrap(),
897 SyncStatusQueryData {
898 blocks: ResourceSyncStatus {
899 missing: 1,
900 ranges: vec![SyncStatusRange {
901 start: 0,
902 end: 1,
903 status: SyncStatus::Missing,
904 }]
905 },
906 vid_common: ResourceSyncStatus {
907 missing: 1,
908 ranges: vec![SyncStatusRange {
909 start: 0,
910 end: 1,
911 status: SyncStatus::Missing,
912 }]
913 },
914 vid_shares: ResourceSyncStatus {
915 missing: 1,
916 ranges: vec![SyncStatusRange {
917 start: 0,
918 end: 1,
919 status: SyncStatus::Missing,
920 }]
921 },
922 leaves: ResourceSyncStatus {
923 missing: 0,
924 ranges: vec![SyncStatusRange {
925 start: 0,
926 end: 1,
927 status: SyncStatus::Present,
928 }]
929 },
930 pruned_height: None,
931 }
932 );
933
934 ds.append(leaves[2].clone().into()).await.unwrap();
937 assert_eq!(
938 ds.sync_status().await.unwrap(),
939 SyncStatusQueryData {
940 blocks: ResourceSyncStatus {
941 missing: 3,
942 ranges: vec![SyncStatusRange {
943 start: 0,
944 end: 3,
945 status: SyncStatus::Missing,
946 }]
947 },
948 vid_common: ResourceSyncStatus {
949 missing: 3,
950 ranges: vec![SyncStatusRange {
951 start: 0,
952 end: 3,
953 status: SyncStatus::Missing,
954 }]
955 },
956 vid_shares: ResourceSyncStatus {
957 missing: 3,
958 ranges: vec![SyncStatusRange {
959 start: 0,
960 end: 3,
961 status: SyncStatus::Missing,
962 }]
963 },
964 leaves: ResourceSyncStatus {
965 missing: 1,
966 ranges: vec![
967 SyncStatusRange {
968 start: 0,
969 end: 1,
970 status: SyncStatus::Present,
971 },
972 SyncStatusRange {
973 start: 1,
974 end: 2,
975 status: SyncStatus::Missing,
976 },
977 SyncStatusRange {
978 start: 2,
979 end: 3,
980 status: SyncStatus::Present,
981 }
982 ]
983 },
984 pruned_height: None,
985 }
986 );
987
988 {
990 let mut tx = ds.write().await.unwrap();
991 tx.insert_vid(vid_data[0].0.clone(), None).await.unwrap();
992 tx.commit().await.unwrap();
993 }
994 assert_eq!(
995 ds.sync_status().await.unwrap(),
996 SyncStatusQueryData {
997 blocks: ResourceSyncStatus {
998 missing: 3,
999 ranges: vec![SyncStatusRange {
1000 start: 0,
1001 end: 3,
1002 status: SyncStatus::Missing,
1003 }]
1004 },
1005 vid_common: ResourceSyncStatus {
1006 missing: 2,
1007 ranges: vec![
1008 SyncStatusRange {
1009 start: 0,
1010 end: 1,
1011 status: SyncStatus::Present,
1012 },
1013 SyncStatusRange {
1014 start: 1,
1015 end: 3,
1016 status: SyncStatus::Missing,
1017 },
1018 ]
1019 },
1020 vid_shares: ResourceSyncStatus {
1021 missing: 3,
1022 ranges: vec![SyncStatusRange {
1023 start: 0,
1024 end: 3,
1025 status: SyncStatus::Missing,
1026 }]
1027 },
1028 leaves: ResourceSyncStatus {
1029 missing: 1,
1030 ranges: vec![
1031 SyncStatusRange {
1032 start: 0,
1033 end: 1,
1034 status: SyncStatus::Present,
1035 },
1036 SyncStatusRange {
1037 start: 1,
1038 end: 2,
1039 status: SyncStatus::Missing,
1040 },
1041 SyncStatusRange {
1042 start: 2,
1043 end: 3,
1044 status: SyncStatus::Present,
1045 }
1046 ]
1047 },
1048 pruned_height: None,
1049 }
1050 );
1051
1052 {
1054 let mut tx = ds.write().await.unwrap();
1055 tx.insert_block(blocks[0].clone()).await.unwrap();
1056 tx.insert_vid(
1057 vid_data[0].0.clone(),
1058 Some(VidShare::V0(vid_data[0].1.clone())),
1059 )
1060 .await
1061 .unwrap();
1062 tx.insert_leaf(leaves[1].clone()).await.unwrap();
1063 tx.insert_block(blocks[1].clone()).await.unwrap();
1064 tx.insert_vid(
1065 vid_data[1].0.clone(),
1066 Some(VidShare::V0(vid_data[1].1.clone())),
1067 )
1068 .await
1069 .unwrap();
1070 tx.insert_block(blocks[2].clone()).await.unwrap();
1071 tx.insert_vid(
1072 vid_data[2].0.clone(),
1073 Some(VidShare::V0(vid_data[2].1.clone())),
1074 )
1075 .await
1076 .unwrap();
1077 tx.commit().await.unwrap();
1078 }
1079
1080 let (leaves, vid_shares) = if ds.get_leaf(1).await.try_resolve().is_err() {
1085 tracing::warn!(
1086 "data source does not support out-of-order filling, allowing one missing leaf and \
1087 VID share"
1088 );
1089 (
1090 ResourceSyncStatus {
1091 missing: 1,
1092 ranges: vec![
1093 SyncStatusRange {
1094 start: 0,
1095 end: 1,
1096 status: SyncStatus::Present,
1097 },
1098 SyncStatusRange {
1099 start: 1,
1100 end: 2,
1101 status: SyncStatus::Missing,
1102 },
1103 SyncStatusRange {
1104 start: 2,
1105 end: 3,
1106 status: SyncStatus::Present,
1107 },
1108 ],
1109 },
1110 ResourceSyncStatus {
1111 missing: 1,
1112 ranges: vec![
1113 SyncStatusRange {
1114 start: 0,
1115 end: 1,
1116 status: SyncStatus::Missing,
1117 },
1118 SyncStatusRange {
1119 start: 1,
1120 end: 3,
1121 status: SyncStatus::Present,
1122 },
1123 ],
1124 },
1125 )
1126 } else {
1127 (
1128 ResourceSyncStatus {
1129 missing: 0,
1130 ranges: vec![SyncStatusRange {
1131 start: 0,
1132 end: 3,
1133 status: SyncStatus::Present,
1134 }],
1135 },
1136 ResourceSyncStatus {
1137 missing: 0,
1138 ranges: vec![SyncStatusRange {
1139 start: 0,
1140 end: 3,
1141 status: SyncStatus::Present,
1142 }],
1143 },
1144 )
1145 };
1146 let expected_sync_status = SyncStatusQueryData {
1147 leaves,
1148 vid_shares,
1149 blocks: ResourceSyncStatus {
1150 missing: 0,
1151 ranges: vec![SyncStatusRange {
1152 start: 0,
1153 end: 3,
1154 status: SyncStatus::Present,
1155 }],
1156 },
1157 vid_common: ResourceSyncStatus {
1158 missing: 0,
1159 ranges: vec![SyncStatusRange {
1160 start: 0,
1161 end: 3,
1162 status: SyncStatus::Present,
1163 }],
1164 },
1165 pruned_height: None,
1166 };
1167 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
1168
1169 {
1172 let mut tx = ds.write().await.unwrap();
1173 tx.insert_vid(vid_data[0].0.clone(), None).await.unwrap();
1174 tx.commit().await.unwrap();
1175 }
1176 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
1177 }
1178
1179 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1180 pub async fn test_counters<D: TestableDataSource>() {
1181 let storage = D::create(0).await;
1182 let ds = D::connect(&storage).await;
1183
1184 assert_eq!(ds.count_transactions().await.unwrap(), 0);
1185 assert_eq!(ds.payload_size().await.unwrap(), 0);
1186
1187 let mut total_transactions = 0;
1189 let mut total_size = 0;
1190 'outer: for i in [0, 1, 2] {
1191 let (payload, metadata) =
1196 <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
1197 [mock_transaction(vec![i as u8 % 2])],
1198 &TestValidatedState::default(),
1199 &TestInstanceState::default(),
1200 )
1201 .await
1202 .unwrap();
1203 let encoded = payload.encode();
1204 let payload_commitment =
1205 vid_commitment(&encoded, &metadata.encode(), 1, TEST_VERSIONS.test.base);
1206 let header = TestBlockHeader {
1207 block_number: i,
1208 payload_commitment,
1209 timestamp: i,
1210 timestamp_millis: i * 1_000,
1211 builder_commitment:
1212 <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
1213 &payload, &metadata,
1214 ),
1215 metadata: TestMetadata {
1216 num_transactions: 7, },
1218 random: 1, version: TEST_VERSIONS.test.base,
1220 };
1221
1222 let mut leaf = LeafQueryData::<MockTypes>::genesis(
1223 &TestValidatedState::default(),
1224 &TestInstanceState::default(),
1225 TEST_VERSIONS.test,
1226 )
1227 .await;
1228 *leaf.leaf.block_header_mut() = header.clone();
1229 let block = BlockQueryData::new(header, payload);
1230 ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1231 .await
1232 .unwrap();
1233 assert_eq!(
1234 NodeDataSource::<MockTypes>::block_height(&ds)
1235 .await
1236 .unwrap(),
1237 (i + 1) as usize,
1238 );
1239
1240 total_transactions += 1;
1241 total_size += encoded.len();
1242
1243 for retry in 0..5 {
1245 let ds_transactions = ds.count_transactions().await.unwrap();
1246 let ds_payload_size = ds.payload_size().await.unwrap();
1247 if ds_transactions != total_transactions || ds_payload_size != total_size {
1248 tracing::info!(
1249 i,
1250 retry,
1251 total_transactions,
1252 ds_transactions,
1253 total_size,
1254 ds_payload_size,
1255 "waiting for statistics to update"
1256 );
1257 sleep(Duration::from_secs(1)).await;
1258 } else {
1259 continue 'outer;
1260 }
1261 }
1262 panic!("counters did not update in time");
1263 }
1264 }
1265
1266 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1267 pub async fn test_vid_shares<D: TestableDataSource>()
1268 where
1269 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1270 {
1271 let mut network = MockNetwork::<D>::init().await;
1272 let ds = network.data_source();
1273
1274 network.start().await;
1275
1276 let mut leaves = ds.subscribe_leaves(0).await.take(3);
1278 while let Some(leaf) = leaves.next().await {
1279 tracing::info!("got leaf {}", leaf.height());
1280 let mut tx = ds.read().await.unwrap();
1281 let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1282 assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1283 assert_eq!(
1284 share,
1285 tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1286 .await
1287 .unwrap()
1288 );
1289 }
1290 }
1291
1292 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1293 pub async fn test_vid_monotonicity<D: TestableDataSource>()
1294 where
1295 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1296 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1297 {
1298 let storage = D::create(0).await;
1299 let ds = D::connect(&storage).await;
1300
1301 let mut vid = advz_scheme(2);
1303 let disperse = vid.disperse([]).unwrap();
1304
1305 let leaf = LeafQueryData::<MockTypes>::genesis(
1307 &TestValidatedState::default(),
1308 &TestInstanceState::default(),
1309 TEST_VERSIONS.test,
1310 )
1311 .await;
1312 let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1313 ds.append(BlockInfo::new(
1314 leaf,
1315 None,
1316 Some(common.clone()),
1317 Some(VidShare::V0(disperse.shares[0].clone())),
1318 ))
1319 .await
1320 .unwrap();
1321
1322 {
1323 assert_eq!(ds.get_vid_common(0).await.await, common);
1324 assert_eq!(
1325 ds.vid_share(0).await.unwrap(),
1326 VidShare::V0(disperse.shares[0].clone())
1327 );
1328 }
1329
1330 {
1333 let mut tx = ds.write().await.unwrap();
1334 tx.insert_vid(common.clone(), None).await.unwrap();
1335 tx.commit().await.unwrap();
1336 }
1337 {
1338 assert_eq!(ds.get_vid_common(0).await.await, common);
1339 assert_eq!(
1340 ds.vid_share(0).await.unwrap(),
1341 VidShare::V0(disperse.shares[0].clone())
1342 );
1343 }
1344 }
1345
1346 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1347 pub async fn test_vid_recovery<D: TestableDataSource>()
1348 where
1349 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1350 {
1351 let mut network = MockNetwork::<D>::init().await;
1352 let ds = network.data_source();
1353
1354 network.start().await;
1355
1356 let mut blocks = ds.subscribe_blocks(0).await;
1358 let txn = mock_transaction(vec![1, 2, 3]);
1359 network.submit_transaction(txn.clone()).await;
1360
1361 let block = loop {
1363 tracing::info!("waiting for transaction");
1364 let block = blocks.next().await.unwrap();
1365 if !block.is_empty() {
1366 tracing::info!(height = block.height(), "transaction sequenced");
1367 break block;
1368 }
1369 tracing::info!(height = block.height(), "empty block");
1370 };
1371 let height = block.height() as usize;
1372 let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
1373 commit
1374 } else {
1375 panic!("expect ADVZ commitment")
1376 };
1377
1378 let vid = advz_scheme(network.num_nodes());
1380
1381 tracing::info!("fetching common data");
1383 let common = ds.get_vid_common(height).await.await;
1384 let VidCommon::V0(common) = &common.common() else {
1385 panic!("expect ADVZ common");
1386 };
1387 ADVZScheme::is_consistent(&commit, common).unwrap();
1388
1389 tracing::info!("fetching shares");
1391 let network = &network;
1392 let vid = &vid;
1393 let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
1394 let ds = network.data_source_index(i);
1395
1396 let mut leaves = ds.subscribe_leaves(height).await;
1399 let leaf = leaves.next().await.unwrap();
1400 assert_eq!(leaf.height(), height as u64);
1401 assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));
1402
1403 let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
1404 share
1405 } else {
1406 panic!("expect ADVZ share")
1407 };
1408 vid.verify_share(&share, common, &commit).unwrap().unwrap();
1409 share
1410 }))
1411 .await;
1412
1413 tracing::info!("recovering payload");
1415 let bytes = vid.recover_payload(&shares, common).unwrap();
1416 let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
1417 &bytes,
1418 &TestMetadata {
1419 num_transactions: 7, },
1421 );
1422 assert_eq!(recovered, *block.payload());
1423 assert_eq!(recovered.transactions, vec![txn]);
1424 }
1425
1426 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1427 pub async fn test_timestamp_window<D: TestableDataSource>() {
1428 let mut network = MockNetwork::<D>::init().await;
1429 let ds = network.data_source();
1430
1431 network.start().await;
1432
1433 let mut leaves = ds.subscribe_leaves(0).await;
1436 let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1439 while test_blocks.len() < 3 {
1440 let leaf = leaves.next().await.unwrap();
1442 let header = leaf.header().clone();
1443 if let Some(last_timestamp) = test_blocks.last_mut() {
1444 if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1445 == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1446 {
1447 last_timestamp.push(header);
1448 } else {
1449 test_blocks.push(vec![header]);
1450 }
1451 } else {
1452 test_blocks.push(vec![header]);
1453 }
1454 }
1455 tracing::info!("blocks for testing: {test_blocks:#?}");
1456
1457 let check_invariants =
1459 |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1460 let mut prev = res.prev.as_ref();
1461 if let Some(prev) = prev {
1462 if check_prev {
1463 assert!(block_header_timestamp(prev) < start);
1464 }
1465 } else {
1466 assert_eq!(res.from().unwrap(), 0);
1469 };
1470 for header in &res.window {
1471 assert!(start <= block_header_timestamp(header));
1472 assert!(block_header_timestamp(header) < end);
1473 if let Some(prev) = prev {
1474 assert!(
1475 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1476 <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1477 );
1478 }
1479 prev = Some(header);
1480 }
1481 if let Some(next) = &res.next {
1482 assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1483 assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1486 }
1487 };
1488
1489 let get_window = |start, end| {
1490 let ds = ds.clone();
1491 async move {
1492 let window = ds
1493 .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1494 .await
1495 .unwrap();
1496 tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1497 check_invariants(&window, start, end, true);
1498 window
1499 }
1500 };
1501
1502 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1504 let end = start + 1;
1505 let res = get_window(start, end).await;
1506 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1507 assert_eq!(res.window, test_blocks[1]);
1508 assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1509
1510 let start = 0;
1512 let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1513 let res = get_window(start, end).await;
1514 assert_eq!(res.prev, None);
1515 assert_eq!(res.window, test_blocks[0]);
1516 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1517
1518 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1520 let end = i64::MAX as u64;
1521 let res = get_window(start, end).await;
1522 assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1523 assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1526 assert_eq!(res.next, None);
1527 let from = test_blocks.iter().flatten().count() - 1;
1531 let more = ds
1532 .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1533 .await
1534 .unwrap();
1535 check_invariants(&more, start, end, false);
1536 assert_eq!(
1537 more.prev.as_ref().unwrap(),
1538 test_blocks.iter().flatten().nth(from - 1).unwrap()
1539 );
1540 assert_eq!(
1541 more.window[..res.window.len() - test_blocks[2].len() + 1],
1542 res.window[test_blocks[2].len() - 1..]
1543 );
1544 assert_eq!(res.next, None);
1545 let more2 = ds
1547 .get_header_window(
1548 test_blocks[2].last().unwrap().commit(),
1549 end,
1550 i64::MAX as usize,
1551 )
1552 .await
1553 .unwrap();
1554 check_invariants(&more2, start, end, false);
1555 assert_eq!(more2.from().unwrap(), more.from().unwrap());
1556 assert_eq!(more2.prev, more.prev);
1557 assert_eq!(more2.next, more.next);
1558 assert_eq!(more2.window[..more.window.len()], more.window);
1559
1560 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1562 let end = start;
1563 let res = get_window(start, end).await;
1564 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1565 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1566 assert_eq!(res.window, vec![]);
1567
1568 ds.get_header_window(
1570 WindowStart::Time((i64::MAX - 1) as u64),
1571 i64::MAX as u64,
1572 i64::MAX as usize,
1573 )
1574 .await
1575 .unwrap_err();
1576
1577 let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1579 .into_iter()
1580 .flatten()
1581 .collect::<Vec<_>>();
1582 let start = block_header_timestamp(&blocks[0]);
1584 let end = block_header_timestamp(&test_blocks[2][0]);
1585 let res = ds
1586 .get_header_window(WindowStart::Time(start), end, 1)
1587 .await
1588 .unwrap();
1589 assert_eq!(res.prev, None);
1590 assert_eq!(res.window, [blocks[0].clone()]);
1591 assert_eq!(res.next, None);
1592 let res = ds
1594 .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1595 .await
1596 .unwrap();
1597 assert_eq!(res.window, [blocks[1].clone()]);
1598 assert_eq!(res.next, None);
1599 let res = ds
1601 .get_header_window(
1602 WindowStart::Height(blocks[1].height() + 1),
1603 end,
1604 blocks.len() - 1,
1605 )
1606 .await
1607 .unwrap();
1608 assert_eq!(res.window, blocks[2..].to_vec());
1609 assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1610 }
1611
1612 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1613 pub async fn test_latest_qc_chain<D: TestableDataSource>()
1614 where
1615 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1616 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1617 {
1618 let storage = D::create(0).await;
1619 let ds = D::connect(&storage).await;
1620
1621 {
1622 let mut tx = ds.read().await.unwrap();
1623 assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1624 }
1625
1626 async fn leaf_with_qc_chain(
1627 number: u64,
1628 ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1629 let mut leaf = Leaf2::<MockTypes>::genesis(
1630 &Default::default(),
1631 &Default::default(),
1632 TEST_VERSIONS.test.base,
1633 )
1634 .await;
1635 leaf.block_header_mut().block_number = number;
1636
1637 let mut qc1 = QuorumCertificate2::<MockTypes>::genesis(
1638 &Default::default(),
1639 &Default::default(),
1640 TEST_VERSIONS.test,
1641 )
1642 .await;
1643 qc1.view_number = ViewNumber::new(1);
1644 qc1.data.leaf_commit = Committable::commit(&leaf);
1645
1646 let mut qc2 = qc1.clone();
1647 qc2.view_number += 1;
1648
1649 let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1650 (
1651 leaf,
1652 [
1653 CertificatePair::non_epoch_change(qc1),
1654 CertificatePair::non_epoch_change(qc2),
1655 ],
1656 )
1657 }
1658
1659 {
1661 let (leaf, qcs) = leaf_with_qc_chain(2).await;
1662 let mut tx = ds.write().await.unwrap();
1663 tx.insert_leaf_with_qc_chain(leaf, Some(qcs.clone()))
1664 .await
1665 .unwrap();
1666 tx.commit().await.unwrap();
1667
1668 assert_eq!(
1669 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1670 Some(qcs)
1671 );
1672 }
1673
1674 {
1677 let (leaf, _) = leaf_with_qc_chain(3).await;
1678 let mut tx = ds.write().await.unwrap();
1679 tx.insert_leaf_with_qc_chain(leaf, None).await.unwrap();
1680 tx.commit().await.unwrap();
1681
1682 assert_eq!(
1683 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1684 None
1685 );
1686 }
1687
1688 {
1691 let (leaf, qcs) = leaf_with_qc_chain(1).await;
1692 let mut tx = ds.write().await.unwrap();
1693 tx.insert_leaf_with_qc_chain(leaf, Some(qcs)).await.unwrap();
1694 tx.commit().await.unwrap();
1695
1696 assert_eq!(
1697 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1698 None
1699 );
1700 }
1701 }
1702}
1703
/// Generic tests for data sources that implement [`StatusDataSource`](crate::status::StatusDataSource).
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod status_tests {
    use std::time::Duration;

    use crate::{
        status::StatusDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, MockNetwork},
            mocks::mock_transaction,
            sleep,
        },
    };

    /// Checks the status metrics before the network starts, once blocks are being produced, and
    /// after the network has been shut down.
    ///
    /// * Before start: block height is 0, the success rate is NaN, and the time elapsed since
    ///   the last decide is within one second of the current Unix timestamp.
    /// * After the block height exceeds 1: the success rate is finite and positive.
    /// * Three seconds after shutdown: at least 3 seconds have elapsed since the last decide.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D>::init().await;
        let ds = network.data_source();

        // Initial state, before the network has been started.
        assert_eq!(ds.block_height().await.unwrap(), 0);
        assert!(ds.success_rate().await.unwrap().is_nan());
        let elapsed = ds.elapsed_time_since_last_decide().await.unwrap() as i64;
        let now = chrono::Utc::now().timestamp();
        assert!(
            (elapsed - now).abs() <= 1,
            "time elapsed since last_decided_time is not within 1s"
        );

        // Queue a transaction, then start the network.
        network
            .submit_transaction(mock_transaction(vec![1, 2, 3]))
            .await;
        network.start().await;

        // Poll once per second until the block height exceeds 1.
        let mut height = ds.block_height().await.unwrap();
        while height <= 1 {
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
            height = ds.block_height().await.unwrap();
        }

        // With blocks decided, the success rate must be a positive, finite number.
        let success_rate = ds.success_rate().await.unwrap();
        assert!(success_rate.is_finite(), "{success_rate}");
        assert!(success_rate > 0.0, "{success_rate}");

        // After shutdown, the time since the last decide keeps growing.
        network.shut_down().await;
        sleep(Duration::from_secs(3)).await;
        assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
    }
}
1777
/// Instantiates all of the generic data source test suites for the data source type `$t`.
///
/// Expands to a `use` of the `availability_tests`, `node_tests`, `persistence_tests` and
/// `status_tests` modules from `$crate::data_source`, followed by one invocation of each of the
/// matching `instantiate_*_tests!` macros with `$t` as the argument.
// NOTE(review): the `instantiate_*_tests!` macros are presumably the ones generated by
// `#[espresso_macros::generic_tests]` on the corresponding modules, and they are invoked
// unqualified, so they must be resolvable at the call site -- confirm.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($t:ty) => {
        // Bring the generic test modules into scope at the expansion site.
        use $crate::data_source::{
            availability_tests, node_tests, persistence_tests, status_tests,
        };

        instantiate_availability_tests!($t);
        instantiate_persistence_tests!($t);
        instantiate_node_tests!($t);
        instantiate_status_tests!($t);
    };
}