1mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50 use std::ops::{Bound, RangeBounds};
51
52 use futures::{
53 future,
54 stream::{BoxStream, StreamExt},
55 };
56
57 use crate::{
58 availability::{BlockQueryData, Fetch, LeafQueryData},
59 node::NodeDataSource,
60 testing::{consensus::TestableDataSource, mocks::MockTypes},
61 };
62
63 async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize>
65 where
66 D: TestableDataSource,
67 R: RangeBounds<usize>,
68 {
69 let start = range.start_bound().cloned();
70 let mut end = range.end_bound().cloned();
71 if end == Bound::Unbounded {
72 end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73 }
74 (start, end)
75 }
76
77 pub async fn block_range<R, D>(
79 ds: &D,
80 range: R,
81 ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82 where
83 D: TestableDataSource,
84 R: RangeBounds<usize> + Send + 'static,
85 {
86 ds.get_block_range(bound_range(ds, range).await)
87 .await
88 .then(Fetch::resolve)
89 .boxed()
90 }
91
92 pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94 where
95 D: TestableDataSource,
96 R: RangeBounds<usize> + Send + 'static,
97 {
98 ds.get_leaf_range(bound_range(ds, range).await)
99 .await
100 .then(Fetch::resolve)
101 .boxed()
102 }
103
104 pub async fn get_non_empty_blocks<D>(
105 ds: &D,
106 ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107 where
108 D: TestableDataSource,
109 {
110 leaf_range(ds, 1..)
112 .await
113 .zip(block_range(ds, 1..).await)
114 .filter(|(_, block)| future::ready(!block.is_empty()))
115 .collect()
116 .await
117 }
118}
119
120#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124 use std::{
125 collections::HashMap,
126 fmt::Debug,
127 ops::{Bound, RangeBounds},
128 };
129
130 use committable::Committable;
131 use futures::stream::StreamExt;
132 use hotshot_types::{data::Leaf2, vote::HasViewNumber};
133
134 use super::test_helpers::*;
135 use crate::{
136 availability::{payload_size, BlockId},
137 data_source::storage::{AvailabilityStorage, NodeStorage},
138 node::NodeDataSource,
139 testing::{
140 consensus::{MockNetwork, TestableDataSource},
141 mocks::{mock_transaction, MockTypes, MockVersions},
142 },
143 types::HeightIndexed,
144 };
145
    /// Check that everything currently visible through `ds` is internally consistent.
    ///
    /// Walks every leaf yielded by `leaf_range(ds, ..)` and asserts that the leaf, its block,
    /// payload, VID common data and transactions agree with each other regardless of which
    /// identifier (height, hash, payload hash, transaction commitment) is used to look them up.
    /// Afterwards, if the last stored leaf's QC carries an epoch, the latest QC chain is checked
    /// against that leaf.
    ///
    /// # Panics
    ///
    /// Panics on the first failed assertion or failed storage call.
    async fn validate<D: TestableDataSource>(ds: &D)
    where
        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
    {
        // Height (resp. height and in-block index) at which each payload hash / transaction
        // commitment was first observed. Lookups by these possibly-duplicated keys are compared
        // against the first occurrence.
        let mut seen_payloads = HashMap::new();
        let mut seen_transactions = HashMap::new();
        let mut leaves = leaf_range(ds, ..).await.enumerate();
        while let Some((i, leaf)) = leaves.next().await {
            // The position in the stream must equal the leaf height, and the stored hash must
            // equal the commitment recomputed from the leaf itself.
            assert_eq!(leaf.height(), i as u64);
            assert_eq!(
                leaf.hash(),
                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
            );

            // Leaf lookup by height and by hash must return the same object.
            tracing::info!("looking up leaf {i} various ways");
            assert_eq!(leaf, ds.get_leaf(i).await.await);
            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);

            // The block at the same height must match the leaf and its own header.
            tracing::info!("looking up block {i} various ways");
            let block = ds.get_block(i).await.await;
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i as u64);
            assert_eq!(block.hash(), block.header().commit());
            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));

            // Repeated lookup by height and lookup by block hash.
            assert_eq!(block, ds.get_block(i).await.await);
            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
            // `ix` is the height of the first block seen with this payload hash (this block's
            // own height if the payload has not been seen before).
            let ix = seen_payloads
                .entry(block.payload_hash())
                .or_insert(i as u64);
            if let Ok(block) = ds
                .get_block(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                // Resolved immediately: lookup by payload hash must yield the first occurrence.
                assert_eq!(block.height(), *ix);
            } else {
                tracing::warn!(
                    "skipping block by payload index check for missing payload {:?}",
                    block.header()
                );
                // Not resolved immediately: issue the query again and wait for it to complete,
                // without asserting anything about the result.
                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // Payload lookup by height and block hash must match the payload derived from the
            // block.
            tracing::info!("looking up payload {i} various ways");
            let expected_payload = block.clone().into();
            assert_eq!(ds.get_payload(i).await.await, expected_payload);
            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
            if let Ok(payload) = ds
                .get_payload(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                // Only compare when this block is the first one seen with this payload hash.
                if *ix == i as u64 {
                    assert_eq!(payload, expected_payload);
                }
            } else {
                tracing::warn!(
                    "skipping payload index check for missing payload {:?}",
                    block.header()
                );
                // As above: wait for the query to complete but do not check the result.
                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // VID common lookup by height and block hash must agree.
            tracing::info!("looking up VID common {i} various ways");
            let common = ds.get_vid_common(block.height() as usize).await.await;
            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
            if let Ok(res) = ds
                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                // Only compare when this block is the first one seen with this payload hash.
                if *ix == i as u64 {
                    assert_eq!(res, common);
                }
            } else {
                tracing::warn!(
                    "skipping VID common index check for missing data {:?}",
                    block.header()
                );
                // After waiting, only the payload hash is compared rather than the whole object.
                let res = ds
                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
                assert_eq!(res.payload_hash(), common.payload_hash());
            }

            for (j, txn) in block.enumerate() {
                tracing::info!("looking up transaction {i},{j:?}");

                // `ix` is the (height, index) of the first occurrence of this transaction
                // commitment; it shadows the payload `ix` above for the rest of this iteration.
                let ix = seen_transactions
                    .entry(txn.commit())
                    .or_insert((i as u64, j.clone()));
                if let Ok(tx_data) = ds
                    .get_block_containing_transaction(txn.commit())
                    .await
                    .try_resolve()
                {
                    assert_eq!(tx_data.transaction.transaction(), &txn);
                    assert_eq!(tx_data.transaction.block_height(), ix.0);
                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
                    assert_eq!(tx_data.index, ix.1);
                    assert_eq!(tx_data.block, block);
                } else {
                    tracing::warn!(
                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
                    );
                    // Wait for the query to complete but do not check the result.
                    ds.get_block_containing_transaction(txn.commit())
                        .await
                        .await;
                }
            }
        }

        // Check the latest QC chain against the last leaf, using a single read transaction.
        {
            let mut tx = ds.read().await.unwrap();
            let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
            // NOTE(review): `block_height - 1` assumes at least one leaf is stored — confirm all
            // callers run this after data has been produced.
            let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();

            // The QC chain is only checked when the last leaf's QC has an epoch set.
            if last_leaf.qc().data.epoch.is_some() {
                tracing::info!(block_height, "checking QC chain");
                let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();

                // First QC refers to the last leaf; second QC is for the immediately following
                // view.
                assert_eq!(last_leaf.height(), (block_height - 1) as u64);
                assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
                assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
                assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
            }
        }
    }
311
    /// Submit transactions to a running mock network, check that each resulting non-empty block
    /// is retrievable and consistent, and finally check that a fresh connection to the same
    /// storage reads back the same block and leaf ranges.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_update<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        network.start().await;
        // No transactions have been submitted yet, so no non-empty blocks are expected.
        assert_eq!(get_non_empty_blocks(&ds).await, vec![]);

        // Subscribe from height 0; the enumeration index is used as the block height below.
        let mut blocks = ds.subscribe_blocks(0).await.enumerate();
        for nonce in 0..3 {
            let txn = mock_transaction(vec![nonce]);
            network.submit_transaction(txn).await;

            // Skip empty blocks until the first non-empty block arrives.
            let (i, block) = loop {
                tracing::info!("waiting for tx {nonce}");
                let (i, block) = blocks.next().await.unwrap();
                if !block.is_empty() {
                    break (i, block);
                }
                tracing::info!("block {i} is empty");
            };

            tracing::info!("got tx {nonce} in block {i}");
            // The block from the subscription must match a direct lookup, and the whole data
            // source must still be consistent.
            assert_eq!(ds.get_block(i).await.await, block);
            validate(&ds).await;
        }

        {
            tracing::info!("checking persisted storage");
            // A second data source connected to the same underlying storage.
            let storage = D::connect(network.storage()).await;

            // Compare only what is immediately available (`try_resolve().ok()`), so objects
            // missing from either side show up as `None` rather than blocking the test.
            let block_height = NodeDataSource::block_height(&ds).await.unwrap();
            assert_eq!(
                ds.get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
            assert_eq!(
                ds.get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
        }
    }
384
385 #[test_log::test(tokio::test(flavor = "multi_thread"))]
386 pub async fn test_range<D: TestableDataSource>()
387 where
388 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
389 {
390 let mut network = MockNetwork::<D, MockVersions>::init().await;
391 let ds = network.data_source();
392 network.start().await;
393
394 let block_height = loop {
396 let mut tx = ds.read().await.unwrap();
397 let block_height = tx.block_height().await.unwrap();
398 if block_height >= 3 {
399 break block_height as u64;
400 }
401 };
402
403 do_range_test(&ds, 1..=2, 1..3).await; do_range_test(&ds, 1..3, 1..3).await; do_range_test(&ds, 1.., 1..block_height).await; do_range_test(&ds, ..=2, 0..3).await; do_range_test(&ds, ..3, 0..3).await; do_range_test(&ds, .., 0..block_height).await; do_range_test(&ds, ExRange(0..=2), 1..3).await; do_range_test(&ds, ExRange(0..3), 1..3).await; do_range_test(&ds, ExRange(0..), 1..block_height).await; }
415
    /// Query all six forward range streams (leaves, blocks, payloads, payload metadata, VID
    /// common, VID common metadata) over `range` and check they yield exactly the objects at
    /// `expected_indices`, in order.
    ///
    /// For a range with an unbounded end, `expected_indices` only needs to cover the objects
    /// known to exist; the streams are then drained until some object is not immediately
    /// available. For a bounded range, every stream must end right after the expected objects.
    ///
    /// # Panics
    ///
    /// Panics if any stream ends early, yields an unexpected object, or (for bounded ranges)
    /// yields extra objects.
    async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Clone + Debug + Send + 'static,
        I: IntoIterator<Item = u64>,
    {
        tracing::info!("testing range {range:?}");

        let mut leaves = ds.get_leaf_range(range.clone()).await;
        let mut blocks = ds.get_block_range(range.clone()).await;
        let mut payloads = ds.get_payload_range(range.clone()).await;
        let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
        let mut vid_common = ds.get_vid_common_range(range.clone()).await;
        let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;

        // Advance all streams in lock-step, one expected height at a time.
        for i in expected_indices {
            tracing::info!(i, "check entries");
            let leaf = leaves.next().await.unwrap().await;
            let block = blocks.next().await.unwrap().await;
            let payload = payloads.next().await.unwrap().await;
            let payload_meta = payloads_meta.next().await.unwrap().await;
            let common = vid_common.next().await.unwrap().await;
            let common_meta = vid_common_meta.next().await.unwrap().await;
            assert_eq!(leaf.height(), i);
            assert_eq!(block.height(), i);
            // Streamed payload / VID common must equal a direct lookup by height; the metadata
            // variants must equal the conversion of the corresponding full object.
            assert_eq!(payload, ds.get_payload(i as usize).await.await);
            assert_eq!(payload_meta, block.into());
            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
            assert_eq!(common_meta, common.into());
        }

        if range.end_bound() == Bound::Unbounded {
            // Open-ended range: keep pulling from every stream until at least one of the six
            // fetches does not resolve immediately. The streams must not end (`unwrap`).
            loop {
                let fetch_leaf = leaves.next().await.unwrap();
                let fetch_block = blocks.next().await.unwrap();
                let fetch_payload = payloads.next().await.unwrap();
                let fetch_payload_meta = payloads_meta.next().await.unwrap();
                let fetch_common = vid_common.next().await.unwrap();
                let fetch_common_meta = vid_common_meta.next().await.unwrap();

                if fetch_leaf.try_resolve().is_ok()
                    && fetch_block.try_resolve().is_ok()
                    && fetch_payload.try_resolve().is_ok()
                    && fetch_payload_meta.try_resolve().is_ok()
                    && fetch_common.try_resolve().is_ok()
                    && fetch_common_meta.try_resolve().is_ok()
                {
                    tracing::info!("searching for end of available objects");
                } else {
                    break;
                }
            }
        } else {
            // Bounded range: every stream must be exhausted after the expected objects.
            assert!(leaves.next().await.is_none());
            assert!(blocks.next().await.is_none());
            assert!(payloads.next().await.is_none());
            assert!(payloads_meta.next().await.is_none());
            assert!(vid_common.next().await.is_none());
            assert!(vid_common_meta.next().await.is_none());
        }
    }
480
481 #[test_log::test(tokio::test(flavor = "multi_thread"))]
482 pub async fn test_range_rev<D: TestableDataSource>()
483 where
484 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
485 {
486 let mut network = MockNetwork::<D, MockVersions>::init().await;
487 let ds = network.data_source();
488 network.start().await;
489
490 ds.subscribe_leaves(5).await.next().await.unwrap();
492
493 do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
495 do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
496 do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
497 }
498
    /// Query all six reverse range streams (leaves, blocks, payloads, payload metadata, VID
    /// common, VID common metadata) from `end` down to `start` and check they yield exactly the
    /// objects at `expected_indices`, in descending order.
    ///
    /// `expected_indices` is given in ascending order and reversed internally.
    ///
    /// # Panics
    ///
    /// Panics if any stream ends early, yields an unexpected object, or yields extra objects.
    async fn do_range_rev_test<D>(
        ds: &D,
        start: Bound<usize>,
        end: usize,
        expected_indices: impl DoubleEndedIterator<Item = u64>,
    ) where
        D: TestableDataSource,
    {
        tracing::info!("testing range {start:?}-{end}");

        let mut leaves = ds.get_leaf_range_rev(start, end).await;
        let mut blocks = ds.get_block_range_rev(start, end).await;
        let mut payloads = ds.get_payload_range_rev(start, end).await;
        let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
        let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
        let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;

        // Advance all streams in lock-step, highest expected height first.
        for i in expected_indices.rev() {
            tracing::info!(i, "check entries");
            let leaf = leaves.next().await.unwrap().await;
            let block = blocks.next().await.unwrap().await;
            let payload = payloads.next().await.unwrap().await;
            let payload_meta = payloads_meta.next().await.unwrap().await;
            let common = vid_common.next().await.unwrap().await;
            let common_meta = vid_common_meta.next().await.unwrap().await;
            // Leaves, blocks and payloads are checked by height only; VID common data and its
            // metadata are compared against direct lookups by height.
            assert_eq!(leaf.height(), i);
            assert_eq!(block.height(), i);
            assert_eq!(payload.height(), i);
            assert_eq!(payload_meta.height(), i);
            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
            assert_eq!(
                common_meta,
                ds.get_vid_common_metadata(i as usize).await.await
            );
        }

        // Every stream must be exhausted after the expected objects.
        assert!(leaves.next().await.is_none());
        assert!(blocks.next().await.is_none());
        assert!(payloads.next().await.is_none());
        assert!(payloads_meta.next().await.is_none());
        assert!(vid_common.next().await.is_none());
        assert!(vid_common_meta.next().await.is_none());
    }
543
544 #[derive(Clone, Copy, Debug)]
546 struct ExRange<R>(R);
547
548 impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
549 fn start_bound(&self) -> Bound<&usize> {
550 match self.0.start_bound() {
551 Bound::Included(x) => Bound::Excluded(x),
552 Bound::Excluded(x) => Bound::Excluded(x),
553 Bound::Unbounded => Bound::Excluded(&0),
554 }
555 }
556
557 fn end_bound(&self) -> Bound<&usize> {
558 self.0.end_bound()
559 }
560 }
561}
562
563#[cfg(any(test, feature = "testing"))]
565#[espresso_macros::generic_tests]
566pub mod persistence_tests {
567 use committable::Committable;
568 use hotshot_example_types::state_types::{TestInstanceState, TestValidatedState};
569 use hotshot_types::simple_certificate::QuorumCertificate2;
570
571 use crate::{
572 availability::{BlockQueryData, LeafQueryData},
573 data_source::{
574 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
575 Transaction,
576 },
577 node::NodeDataSource,
578 testing::{
579 consensus::TestableDataSource,
580 mocks::{MockPayload, MockTypes},
581 },
582 types::HeightIndexed,
583 Leaf2,
584 };
585
    /// Check that explicitly reverting a write transaction discards everything inserted in it.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_revert<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
            + AvailabilityStorage<MockTypes>
            + NodeStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Build a mock leaf/QC pair starting from the genesis values.
        let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        // Bump the block number by one and point the QC at the modified leaf so the pair stays
        // consistent. The lookups below assume the resulting height is 1.
        leaf.block_header_mut().block_number += 1;
        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);

        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(leaf, qc).unwrap();

        // Insert the data in a write transaction without committing it.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();

        // The open transaction sees its own uncommitted writes.
        assert_eq!(tx.block_height().await.unwrap(), 2);
        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
        assert_eq!(block, tx.get_block(1.into()).await.unwrap());

        // After reverting, the data source reports height 0 and the objects are not available.
        tx.revert().await;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }
637
    /// Check that `D::reset` on storage containing committed data yields an empty data source.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_reset<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Build a mock leaf/QC pair starting from the genesis values.
        let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        // Bump the block number by one and point the QC at the modified leaf so the pair stays
        // consistent. The lookups below assume the resulting height is 1.
        leaf.block_header_mut().block_number += 1;
        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);

        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(leaf, qc).unwrap();

        // Insert and commit the data.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // The committed data is visible through the data source.
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            2
        );
        assert_eq!(leaf, ds.get_leaf(1).await.await);
        assert_eq!(block, ds.get_block(1).await.await);

        // Release the first connection before resetting the storage.
        drop(ds);

        // After the reset, height is 0 and the previously committed objects are not available.
        let ds = D::reset(&storage).await;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }
695
    /// Check that dropping a write transaction without committing discards its changes, and
    /// that a later committed transaction makes only its own data visible.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_drop_tx<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
            + AvailabilityStorage<MockTypes>
            + NodeStorage<MockTypes>,
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Build a mock leaf/QC pair starting from the genesis values. Both are kept mutable so
        // a second, different leaf can be derived from them later.
        let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let mut mock_leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        // Bump the block number by one and point the QC at the modified leaf so the pair stays
        // consistent. The lookups below assume the resulting height is 1.
        mock_leaf.block_header_mut().block_number += 1;
        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);

        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();

        // Insert the data in a write transaction without committing it.
        tracing::info!("write");
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();

        // The open transaction sees its own uncommitted writes.
        assert_eq!(tx.block_height().await.unwrap(), 2);
        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
        assert_eq!(block, tx.get_block(1.into()).await.unwrap());

        // Drop the transaction without committing or explicitly reverting.
        drop(tx);

        // A fresh read transaction must not see the dropped writes.
        tracing::info!("read");
        let mut tx = ds.read().await.unwrap();
        assert_eq!(tx.block_height().await.unwrap(), 0);
        drop(tx);

        // Derive a different leaf/block pair, one block number higher than the first.
        mock_leaf.block_header_mut().block_number += 1;
        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();

        // This time commit the transaction.
        tracing::info!("write again");
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Only the committed objects are visible; the objects one height below, written in the
        // dropped transaction, must not be available.
        tracing::info!("read again");
        let height = leaf.height() as usize;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            height + 1
        );
        assert_eq!(leaf, ds.get_leaf(height).await.await);
        assert_eq!(block, ds.get_block(height).await.await);
        ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
        ds.get_block(height - 1).await.try_resolve().unwrap_err();
    }
774}
775
776#[cfg(any(test, feature = "testing"))]
778#[espresso_macros::generic_tests]
779pub mod node_tests {
780 use std::time::Duration;
781
782 use committable::Committable;
783 use futures::{future::join_all, stream::StreamExt};
784 use hotshot::traits::BlockPayload;
785 use hotshot_example_types::{
786 block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
787 node_types::{TestTypes, TestVersions},
788 state_types::{TestInstanceState, TestValidatedState},
789 };
790 use hotshot_types::{
791 data::{vid_commitment, VidCommitment, VidShare, ViewNumber},
792 simple_certificate::{CertificatePair, QuorumCertificate2},
793 traits::{
794 block_contents::{BlockHeader, EncodeBytes},
795 node_implementation::{ConsensusTime, Versions},
796 },
797 vid::advz::{advz_scheme, ADVZScheme},
798 };
799 use jf_advz::VidScheme;
800 use vbs::version::StaticVersionType;
801
802 use crate::{
803 availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
804 data_source::{
805 storage::{NodeStorage, UpdateAvailabilityStorage},
806 update::Transaction,
807 },
808 node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
809 testing::{
810 consensus::{MockNetwork, TestableDataSource},
811 mocks::{mock_transaction, MockPayload, MockTypes, MockVersions},
812 sleep,
813 },
814 types::HeightIndexed,
815 Header, Leaf2, VidCommon,
816 };
817
818 fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
819 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
820 }
821
    /// Check that `sync_status` reports the expected number of missing leaves, blocks, VID
    /// common data and VID shares as data is inserted out of order and then filled in.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_sync_status<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // VID scheme used only to generate mock dispersal data below.
        let mut vid = advz_scheme(2);

        // Three mock leaves and blocks: the genesis objects plus two copies, each with the block
        // number bumped by one relative to the previous entry.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        for i in 0..2 {
            let mut leaf = leaves[i].clone();
            leaf.leaf.block_header_mut().block_number += 1;
            leaves.push(leaf);

            let mut block = blocks[i].clone();
            block.header.block_number += 1;
            blocks.push(block);
        }
        // One dispersal of an empty payload is reused for every leaf; only the header attached
        // to the VID common data differs. `vid` is rebound from the scheme to the list of
        // (common, share) pairs.
        let disperse = vid.disperse([]).unwrap();
        let vid = leaves
            .iter()
            .map(|leaf| {
                (
                    VidCommonQueryData::new(
                        leaf.header().clone(),
                        VidCommon::V0(disperse.common.clone()),
                    ),
                    disperse.shares[0].clone(),
                )
            })
            .collect::<Vec<_>>();

        // With nothing inserted, the data source counts as fully synced.
        assert!(ds.sync_status().await.unwrap().is_fully_synced());

        // Append leaf 0 alone: its block, VID common and VID share are reported missing.
        ds.append(leaves[0].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 1,
                missing_vid_common: 1,
                missing_vid_shares: 1,
                missing_leaves: 0,
                pruned_height: None,
            }
        );

        // Append leaf 2, skipping leaf 1: one leaf is missing, and block/VID data is missing for
        // all three heights.
        ds.append(leaves[2].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 3,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Insert VID common for height 0 without a share: only the common count drops.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 2,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Fill in everything that is still missing in a single transaction.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_block(blocks[0].clone()).await.unwrap();
            tx.insert_vid(vid[0].0.clone(), Some(VidShare::V0(vid[0].1.clone())))
                .await
                .unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(vid[1].0.clone(), Some(VidShare::V0(vid[1].1.clone())))
                .await
                .unwrap();
            tx.insert_block(blocks[2].clone()).await.unwrap();
            tx.insert_vid(vid[2].0.clone(), Some(VidShare::V0(vid[2].1.clone())))
                .await
                .unwrap();
            tx.commit().await.unwrap();
        }

        // If leaf 1 is still unavailable after the back-fill, the data source is treated as not
        // supporting out-of-order filling, and one missing leaf and one missing VID share are
        // tolerated.
        let expected_missing = if ds.get_leaf(1).await.try_resolve().is_err() {
            tracing::warn!(
                "data source does not support out-of-order filling, allowing one missing leaf and \
                 VID share"
            );
            1
        } else {
            0
        };
        let expected_sync_status = SyncStatus {
            missing_blocks: 0,
            missing_leaves: expected_missing,
            missing_vid_common: 0,
            missing_vid_shares: expected_missing,
            pruned_height: None,
        };
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);

        // Re-inserting VID common for height 0 *without* a share must not change the sync
        // status, i.e. must not make a previously stored share count as missing.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
    }
973
    /// Check that the transaction count and total payload size reported by the data source
    /// track the blocks appended to it, including blocks with duplicate transactions.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_counters<D: TestableDataSource>() {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // An empty data source reports zero for both counters.
        assert_eq!(ds.count_transactions().await.unwrap(), 0);
        assert_eq!(ds.payload_size().await.unwrap(), 0);

        // Running totals the data source is expected to converge to.
        let mut total_transactions = 0;
        let mut total_size = 0;
        'outer: for i in [0, 1, 2] {
            // The transaction data is `i % 2`, so blocks 0 and 2 carry identical transactions.
            // The totals below still count each one separately.
            let (payload, metadata) =
                <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
                    [mock_transaction(vec![i as u8 % 2])],
                    &TestValidatedState::default(),
                    &TestInstanceState::default(),
                )
                .await
                .unwrap();
            let encoded = payload.encode();
            let payload_commitment = vid_commitment::<TestVersions>(
                &encoded,
                &metadata.encode(),
                1,
                <TestVersions as Versions>::Base::VERSION,
            );
            // Header for height `i`. NOTE(review): `num_transactions: 7` and `random: 1` look
            // like arbitrary placeholder values — confirm nothing in this test depends on them.
            let header = TestBlockHeader {
                block_number: i,
                payload_commitment,
                timestamp: i,
                timestamp_millis: i * 1_000,
                builder_commitment:
                    <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
                        &payload, &metadata,
                    ),
                metadata: TestMetadata {
                    num_transactions: 7,
                },
                random: 1,
                version: <TestVersions as Versions>::Base::VERSION,
            };

            // Start from the genesis leaf and swap in the header built above.
            let mut leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await;
            *leaf.leaf.block_header_mut() = header.clone();
            let block = BlockQueryData::new(header, payload);
            // Append the leaf together with its block, without VID data.
            ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();
            assert_eq!(
                NodeDataSource::<MockTypes>::block_height(&ds)
                    .await
                    .unwrap(),
                (i + 1) as usize,
            );

            total_transactions += 1;
            total_size += encoded.len();

            // The counters are not required to update immediately: poll up to 5 times, one
            // second apart, and move on to the next block as soon as both match.
            for retry in 0..5 {
                let ds_transactions = ds.count_transactions().await.unwrap();
                let ds_payload_size = ds.payload_size().await.unwrap();
                if ds_transactions != total_transactions || ds_payload_size != total_size {
                    tracing::info!(
                        i,
                        retry,
                        total_transactions,
                        ds_transactions,
                        total_size,
                        ds_payload_size,
                        "waiting for statistics to update"
                    );
                    sleep(Duration::from_secs(1)).await;
                } else {
                    continue 'outer;
                }
            }
            // Reached only when all retries were exhausted without the counters matching.
            panic!("counters did not update in time");
        }
    }
1065
1066 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1067 pub async fn test_vid_shares<D: TestableDataSource>()
1068 where
1069 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1070 {
1071 let mut network = MockNetwork::<D, MockVersions>::init().await;
1072 let ds = network.data_source();
1073
1074 network.start().await;
1075
1076 let mut leaves = ds.subscribe_leaves(0).await.take(3);
1078 while let Some(leaf) = leaves.next().await {
1079 tracing::info!("got leaf {}", leaf.height());
1080 let mut tx = ds.read().await.unwrap();
1081 let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1082 assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1083 assert_eq!(
1084 share,
1085 tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1086 .await
1087 .unwrap()
1088 );
1089 }
1090 }
1091
    /// Check that re-inserting VID common data *without* a share does not erase a share that
    /// was stored earlier for the same block.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_monotonicity<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock VID data: the dispersal of an empty payload.
        let mut vid = advz_scheme(2);
        let disperse = vid.disperse([]).unwrap();

        // Append the genesis leaf together with VID common data and the first share; no block
        // is supplied.
        let leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
        ds.append(BlockInfo::new(
            leaf,
            None,
            Some(common.clone()),
            Some(VidShare::V0(disperse.shares[0].clone())),
        ))
        .await
        .unwrap();

        // Both the common data and the share are retrievable at height 0.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }

        // Insert the same common data again, this time with no share.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(common.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        // The previously stored share must still be there.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }
    }
1146
    /// Sequence a transaction on a running network, collect the VID share for the resulting
    /// block from every node, and check that the payload recovered from those shares equals the
    /// original block payload.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_recovery<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        network.start().await;

        // Subscribe before submitting so the block containing the transaction cannot be missed.
        let mut blocks = ds.subscribe_blocks(0).await;
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Skip empty blocks until the first non-empty block arrives.
        let block = loop {
            tracing::info!("waiting for transaction");
            let block = blocks.next().await.unwrap();
            if !block.is_empty() {
                tracing::info!(height = block.height(), "transaction sequenced");
                break block;
            }
            tracing::info!(height = block.height(), "empty block");
        };
        let height = block.height() as usize;
        // This test only handles the V0 (ADVZ) commitment variant.
        let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
            commit
        } else {
            panic!("expect ADVZ commitment")
        };

        // VID scheme sized by the number of nodes in the network.
        let vid = advz_scheme(network.num_nodes());

        // The common data must be consistent with the block's payload commitment.
        tracing::info!("fetching common data");
        let common = ds.get_vid_common(height).await.await;
        let VidCommon::V0(common) = &common.common() else {
            panic!("expect ADVZ common");
        };
        ADVZScheme::is_consistent(&commit, common).unwrap();

        // Shadow `network` and `vid` with references so the `async move` closures below can
        // copy them instead of taking ownership.
        tracing::info!("fetching shares");
        let network = &network;
        let vid = &vid;
        let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
            let ds = network.data_source_index(i);

            // Wait until node `i` has produced the leaf at `height` before asking for its
            // share.
            let mut leaves = ds.subscribe_leaves(height).await;
            let leaf = leaves.next().await.unwrap();
            assert_eq!(leaf.height(), height as u64);
            assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));

            let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
                share
            } else {
                panic!("expect ADVZ share")
            };
            // Each share must verify against the common data and commitment.
            vid.verify_share(&share, common, &commit).unwrap().unwrap();
            share
        }))
        .await;

        // Recover the payload bytes from the collected shares and decode them.
        // NOTE(review): `num_transactions: 7` looks like an arbitrary placeholder — confirm
        // `from_bytes` does not depend on it.
        tracing::info!("recovering payload");
        let bytes = vid.recover_payload(&shares, common).unwrap();
        let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
            &bytes,
            &TestMetadata {
                num_transactions: 7,
            },
        );
        assert_eq!(recovered, *block.payload());
        assert_eq!(recovered.transactions, vec![txn]);
    }
1226
1227 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1228 pub async fn test_timestamp_window<D: TestableDataSource>() {
1229 let mut network = MockNetwork::<D, MockVersions>::init().await;
1230 let ds = network.data_source();
1231
1232 network.start().await;
1233
1234 let mut leaves = ds.subscribe_leaves(0).await;
1237 let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1240 while test_blocks.len() < 3 {
1241 let leaf = leaves.next().await.unwrap();
1243 let header = leaf.header().clone();
1244 if let Some(last_timestamp) = test_blocks.last_mut() {
1245 if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1246 == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1247 {
1248 last_timestamp.push(header);
1249 } else {
1250 test_blocks.push(vec![header]);
1251 }
1252 } else {
1253 test_blocks.push(vec![header]);
1254 }
1255 }
1256 tracing::info!("blocks for testing: {test_blocks:#?}");
1257
1258 let check_invariants =
1260 |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1261 let mut prev = res.prev.as_ref();
1262 if let Some(prev) = prev {
1263 if check_prev {
1264 assert!(block_header_timestamp(prev) < start);
1265 }
1266 } else {
1267 assert_eq!(res.from().unwrap(), 0);
1270 };
1271 for header in &res.window {
1272 assert!(start <= block_header_timestamp(header));
1273 assert!(block_header_timestamp(header) < end);
1274 if let Some(prev) = prev {
1275 assert!(
1276 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1277 <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1278 );
1279 }
1280 prev = Some(header);
1281 }
1282 if let Some(next) = &res.next {
1283 assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1284 assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1287 }
1288 };
1289
1290 let get_window = |start, end| {
1291 let ds = ds.clone();
1292 async move {
1293 let window = ds
1294 .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1295 .await
1296 .unwrap();
1297 tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1298 check_invariants(&window, start, end, true);
1299 window
1300 }
1301 };
1302
1303 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1305 let end = start + 1;
1306 let res = get_window(start, end).await;
1307 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1308 assert_eq!(res.window, test_blocks[1]);
1309 assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1310
1311 let start = 0;
1313 let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1314 let res = get_window(start, end).await;
1315 assert_eq!(res.prev, None);
1316 assert_eq!(res.window, test_blocks[0]);
1317 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1318
1319 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1321 let end = i64::MAX as u64;
1322 let res = get_window(start, end).await;
1323 assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1324 assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1327 assert_eq!(res.next, None);
1328 let from = test_blocks.iter().flatten().count() - 1;
1332 let more = ds
1333 .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1334 .await
1335 .unwrap();
1336 check_invariants(&more, start, end, false);
1337 assert_eq!(
1338 more.prev.as_ref().unwrap(),
1339 test_blocks.iter().flatten().nth(from - 1).unwrap()
1340 );
1341 assert_eq!(
1342 more.window[..res.window.len() - test_blocks[2].len() + 1],
1343 res.window[test_blocks[2].len() - 1..]
1344 );
1345 assert_eq!(res.next, None);
1346 let more2 = ds
1348 .get_header_window(
1349 test_blocks[2].last().unwrap().commit(),
1350 end,
1351 i64::MAX as usize,
1352 )
1353 .await
1354 .unwrap();
1355 check_invariants(&more2, start, end, false);
1356 assert_eq!(more2.from().unwrap(), more.from().unwrap());
1357 assert_eq!(more2.prev, more.prev);
1358 assert_eq!(more2.next, more.next);
1359 assert_eq!(more2.window[..more.window.len()], more.window);
1360
1361 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1363 let end = start;
1364 let res = get_window(start, end).await;
1365 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1366 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1367 assert_eq!(res.window, vec![]);
1368
1369 ds.get_header_window(
1371 WindowStart::Time((i64::MAX - 1) as u64),
1372 i64::MAX as u64,
1373 i64::MAX as usize,
1374 )
1375 .await
1376 .unwrap_err();
1377
1378 let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1380 .into_iter()
1381 .flatten()
1382 .collect::<Vec<_>>();
1383 let start = block_header_timestamp(&blocks[0]);
1385 let end = block_header_timestamp(&test_blocks[2][0]);
1386 let res = ds
1387 .get_header_window(WindowStart::Time(start), end, 1)
1388 .await
1389 .unwrap();
1390 assert_eq!(res.prev, None);
1391 assert_eq!(res.window, [blocks[0].clone()]);
1392 assert_eq!(res.next, None);
1393 let res = ds
1395 .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1396 .await
1397 .unwrap();
1398 assert_eq!(res.window, [blocks[1].clone()]);
1399 assert_eq!(res.next, None);
1400 let res = ds
1402 .get_header_window(
1403 WindowStart::Height(blocks[1].height() + 1),
1404 end,
1405 blocks.len() - 1,
1406 )
1407 .await
1408 .unwrap();
1409 assert_eq!(res.window, blocks[2..].to_vec());
1410 assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1411 }
1412
1413 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1414 pub async fn test_latest_qc_chain<D: TestableDataSource>()
1415 where
1416 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1417 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1418 {
1419 let storage = D::create(0).await;
1420 let ds = D::connect(&storage).await;
1421
1422 {
1423 let mut tx = ds.read().await.unwrap();
1424 assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1425 }
1426
1427 async fn leaf_with_qc_chain(
1428 number: u64,
1429 ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1430 let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
1431 &Default::default(),
1432 &Default::default(),
1433 )
1434 .await;
1435 leaf.block_header_mut().block_number = number;
1436
1437 let mut qc1 = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
1438 &Default::default(),
1439 &Default::default(),
1440 )
1441 .await;
1442 qc1.view_number = ViewNumber::new(1);
1443 qc1.data.leaf_commit = Committable::commit(&leaf);
1444
1445 let mut qc2 = qc1.clone();
1446 qc2.view_number += 1;
1447
1448 let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1449 (
1450 leaf,
1451 [
1452 CertificatePair::non_epoch_change(qc1),
1453 CertificatePair::non_epoch_change(qc2),
1454 ],
1455 )
1456 }
1457
1458 {
1460 let (leaf, qcs) = leaf_with_qc_chain(2).await;
1461 let mut tx = ds.write().await.unwrap();
1462 tx.insert_leaf_with_qc_chain(leaf, Some(qcs.clone()))
1463 .await
1464 .unwrap();
1465 tx.commit().await.unwrap();
1466
1467 assert_eq!(
1468 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1469 Some(qcs)
1470 );
1471 }
1472
1473 {
1476 let (leaf, _) = leaf_with_qc_chain(3).await;
1477 let mut tx = ds.write().await.unwrap();
1478 tx.insert_leaf_with_qc_chain(leaf, None).await.unwrap();
1479 tx.commit().await.unwrap();
1480
1481 assert_eq!(
1482 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1483 None
1484 );
1485 }
1486
1487 {
1490 let (leaf, qcs) = leaf_with_qc_chain(1).await;
1491 let mut tx = ds.write().await.unwrap();
1492 tx.insert_leaf_with_qc_chain(leaf, Some(qcs)).await.unwrap();
1493 tx.commit().await.unwrap();
1494
1495 assert_eq!(
1496 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1497 None
1498 );
1499 }
1500 }
1501}
1502
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod status_tests {
    use std::time::Duration;

    use crate::{
        status::StatusDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, MockNetwork},
            mocks::{mock_transaction, MockVersions},
            sleep,
        },
    };

    // Checks the status metrics of a data source before consensus starts, once blocks are being
    // produced, and after the network has been shut down.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        // Before the network is started: height is 0 and the success rate is NaN.
        assert_eq!(ds.block_height().await.unwrap(), 0);
        assert!(ds.success_rate().await.unwrap().is_nan());
        // The reported elapsed time must be within one second of the current Unix timestamp.
        // NOTE(review): presumably the last-decide time is still 0 at this point - confirm.
        let since_decide = ds.elapsed_time_since_last_decide().await.unwrap() as i64;
        let drift = (since_decide - chrono::Utc::now().timestamp()).abs();
        assert!(
            drift <= 1,
            "time elapsed since last_decided_time is not within 1s"
        );

        // Queue a transaction, then start the network.
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        network.start().await;

        // Poll once per second until the block height exceeds 1.
        let mut height = ds.block_height().await.unwrap();
        while height <= 1 {
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
            height = ds.block_height().await.unwrap();
        }

        // Only finiteness and positivity of the success rate are checked, not an exact value.
        let success_rate = ds.success_rate().await.unwrap();
        assert!(success_rate.is_finite(), "{success_rate}");
        assert!(success_rate > 0.0, "{success_rate}");

        // After shutting the network down and sleeping for three seconds, at least three
        // seconds must be reported as elapsed since the last decide.
        network.shut_down().await;
        sleep(Duration::from_secs(3)).await;
        let idle_secs = ds.elapsed_time_since_last_decide().await.unwrap();
        assert!(idle_secs >= 3);
    }
}
1576
/// Instantiates the generic data source test suites for a concrete data source type.
///
/// Imports the `availability_tests`, `node_tests`, `persistence_tests` and `status_tests`
/// modules from `$crate::data_source`, then invokes `instantiate_availability_tests!`,
/// `instantiate_persistence_tests!`, `instantiate_node_tests!` and
/// `instantiate_status_tests!` with the given type.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($data_source:ty) => {
        use $crate::data_source::availability_tests;
        use $crate::data_source::node_tests;
        use $crate::data_source::persistence_tests;
        use $crate::data_source::status_tests;

        instantiate_availability_tests!($data_source);
        instantiate_persistence_tests!($data_source);
        instantiate_node_tests!($data_source);
        instantiate_status_tests!($data_source);
    };
}