hotshot_query_service/
data_source.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Persistent storage and sources of data consumed by APIs.
14//!
15//! The APIs provided by this query service are generic over the implementation which actually
16//! retrieves data in answer to queries. We call this implementation a _data source_. This module
17//! defines a data source and provides several pre-built implementations:
18//! * [`FileSystemDataSource`]
19//! * [`SqlDataSource`]
20//! * [`FetchingDataSource`], a generalization of the above
21//! * [`MetricsDataSource`]
22//!
23//! The user can choose which data source to use when initializing the query service.
24//!
25//! We also provide combinators for modularly adding functionality to existing data sources:
26//! * [`ExtensibleDataSource`]
27//!
28
29mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
#[cfg(any(test, feature = "testing"))]
mod test_helpers {
    use std::ops::{Bound, RangeBounds};

    use futures::{
        future,
        stream::{BoxStream, StreamExt},
    };

    use crate::{
        availability::{BlockQueryData, Fetch, LeafQueryData},
        node::NodeDataSource,
        testing::{consensus::TestableDataSource, mocks::MockTypes},
    };

    /// Clamp a range's upper end to the currently available block height.
    ///
    /// A bounded upper end passes through unchanged; an unbounded one is replaced with an
    /// exclusive bound at the data source's current block height, so streams built from the
    /// result terminate instead of waiting for future blocks.
    async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize>
    where
        D: TestableDataSource,
        R: RangeBounds<usize>,
    {
        let lower = range.start_bound().cloned();
        let upper = match range.end_bound().cloned() {
            Bound::Unbounded => Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap()),
            bounded => bounded,
        };
        (lower, upper)
    }

    /// Get a stream of blocks, implicitly terminating at the current block height.
    pub async fn block_range<R, D>(
        ds: &D,
        range: R,
    ) -> BoxStream<'static, BlockQueryData<MockTypes>>
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Send + 'static,
    {
        let bounded = bound_range(ds, range).await;
        ds.get_block_range(bounded)
            .await
            .then(Fetch::resolve)
            .boxed()
    }

    /// Get a stream of leaves, implicitly terminating at the current block height.
    pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Send + 'static,
    {
        let bounded = bound_range(ds, range).await;
        ds.get_leaf_range(bounded)
            .await
            .then(Fetch::resolve)
            .boxed()
    }

    /// Collect every (leaf, block) pair whose block carries at least one transaction.
    pub async fn get_non_empty_blocks<D>(
        ds: &D,
    ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
    where
        D: TestableDataSource,
    {
        // Ignore the genesis block (start from height 1).
        let leaves = leaf_range(ds, 1..).await;
        let blocks = block_range(ds, 1..).await;
        leaves
            .zip(blocks)
            .filter(|(_, block)| future::ready(!block.is_empty()))
            .collect()
            .await
    }
}
119
/// Generic tests we can instantiate for all the availability data sources.
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod availability_tests {
    use std::{
        collections::HashMap,
        fmt::Debug,
        ops::{Bound, RangeBounds},
    };

    use committable::Committable;
    use futures::stream::StreamExt;
    use hotshot_types::data::Leaf2;

    use super::test_helpers::*;
    use crate::{
        availability::{payload_size, BlockId},
        data_source::storage::NodeStorage,
        node::NodeDataSource,
        testing::{
            consensus::{MockNetwork, TestableDataSource},
            mocks::{mock_transaction, MockTypes, MockVersions},
        },
        types::HeightIndexed,
    };

    /// Check the consistency of all data currently available from `ds`.
    ///
    /// Iterates every leaf from height 0 up to the current block height and, for each height,
    /// verifies that the corresponding block, payload, and VID common data agree with the leaf
    /// and can be looked up by each supported index (height, leaf hash, block hash, payload
    /// hash, transaction hash). Index checks for payload-hash and transaction-hash lookups are
    /// skipped when the object is not available locally, since a passive fetch of a duplicated
    /// payload/transaction may resolve to any matching instance.
    async fn validate(ds: &impl TestableDataSource) {
        // Check the consistency of every block/leaf pair. Keep track of payloads and transactions
        // we've seen so we can detect duplicates.
        let mut seen_payloads = HashMap::new();
        let mut seen_transactions = HashMap::new();
        let mut leaves = leaf_range(ds, ..).await.enumerate();
        while let Some((i, leaf)) = leaves.next().await {
            // Leaves stream in height order from 0, so the stream position is the expected
            // height, and the stored hash must match a freshly computed commitment.
            assert_eq!(leaf.height(), i as u64);
            assert_eq!(
                leaf.hash(),
                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
            );

            // Check indices.
            tracing::info!("looking up leaf {i} various ways");
            assert_eq!(leaf, ds.get_leaf(i).await.await);
            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);

            tracing::info!("looking up block {i} various ways");
            let block = ds.get_block(i).await.await;
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i as u64);
            assert_eq!(block.hash(), block.header().commit());
            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));

            // Check indices.
            assert_eq!(block, ds.get_block(i).await.await);
            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
            // We should be able to look up the block by payload hash unless its payload is a
            // duplicate. For duplicate payloads, this function returns the index of the first
            // duplicate.
            //
            // Note: this ordering is not a strict requirement. It should hold for payloads in local
            // storage, but we don't have a good way of enforcing it if the payload is missing, in
            // which case we will return the first matching payload we see, which could happen in
            // any order. We use `try_resolve` to skip this check if the object isn't available
            // locally.
            let ix = seen_payloads
                .entry(block.payload_hash())
                .or_insert(i as u64);
            if let Ok(block) = ds
                .get_block(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                assert_eq!(block.height(), *ix);
            } else {
                tracing::warn!(
                    "skipping block by payload index check for missing payload {:?}",
                    block.header()
                );
                // At least check that _some_ block can be fetched.
                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // Check payload lookup.
            tracing::info!("looking up payload {i} various ways");
            let expected_payload = block.clone().into();
            assert_eq!(ds.get_payload(i).await.await, expected_payload);
            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
            // Similar to the above, we can't guarantee which index we will get when passively
            // fetching this payload, so only check the index if the payload is available locally.
            if let Ok(payload) = ds
                .get_payload(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                // Only compare contents when this height is the first occurrence of the payload;
                // otherwise the lookup legitimately resolves to the earlier duplicate.
                if *ix == i as u64 {
                    assert_eq!(payload, expected_payload);
                }
            } else {
                tracing::warn!(
                    "skipping payload index check for missing payload {:?}",
                    block.header()
                );
                // At least check that _some_ payload can be fetched.
                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // Look up the common VID data.
            tracing::info!("looking up VID common {i} various ways");
            let common = ds.get_vid_common(block.height() as usize).await.await;
            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
            // Similar to the above, we can't guarantee which index we will get when passively
            // fetching this data, so only check the index if the data is available locally.
            if let Ok(res) = ds
                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                if *ix == i as u64 {
                    assert_eq!(res, common);
                }
            } else {
                tracing::warn!(
                    "skipping VID common index check for missing data {:?}",
                    block.header()
                );
                // At least check that _some_ data can be fetched.
                let res = ds
                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
                assert_eq!(res.payload_hash(), common.payload_hash());
            }

            for (j, txn) in block.enumerate() {
                tracing::info!("looking up transaction {i},{j:?}");

                // We should be able to look up the transaction by hash unless it is a duplicate.
                // For duplicate transactions, this function returns the index of the first
                // duplicate.
                //
                // Similar to the above, we can't guarantee which index we will get when passively
                // fetching this transaction, so only check the index if the transaction is
                // available locally.
                let ix = seen_transactions
                    .entry(txn.commit())
                    .or_insert((i as u64, j.clone()));
                if let Ok(tx_data) = ds
                    .get_block_containing_transaction(txn.commit())
                    .await
                    .try_resolve()
                {
                    assert_eq!(tx_data.transaction.transaction(), &txn);
                    assert_eq!(tx_data.transaction.block_height(), ix.0);
                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
                    assert_eq!(tx_data.index, ix.1);
                    assert_eq!(tx_data.block, block);
                } else {
                    tracing::warn!(
                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
                    );
                    // At least check that _some_ transaction can be fetched.
                    ds.get_block_containing_transaction(txn.commit())
                        .await
                        .await;
                }
            }
        }
    }

    /// Submit transactions through a mock network and check that each finalized block is
    /// reflected consistently in the data source, then verify that the same data is readable
    /// through a fresh data source connected to the same underlying storage.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_update<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        network.start().await;
        assert_eq!(get_non_empty_blocks(&ds).await, vec![]);

        // Submit a few blocks and make sure each one gets reflected in the query service and
        // preserves the consistency of the data and indices.
        let mut blocks = ds.subscribe_blocks(0).await.enumerate();
        for nonce in 0..3 {
            let txn = mock_transaction(vec![nonce]);
            network.submit_transaction(txn).await;

            // Wait for the transaction to be finalized.
            let (i, block) = loop {
                tracing::info!("waiting for tx {nonce}");
                let (i, block) = blocks.next().await.unwrap();
                if !block.is_empty() {
                    break (i, block);
                }
                tracing::info!("block {i} is empty");
            };

            tracing::info!("got tx {nonce} in block {i}");
            assert_eq!(ds.get_block(i).await.await, block);
            validate(&ds).await;
        }

        // Check that all the updates have been committed to storage, not simply held in memory: we
        // should be able to read the same data if we connect an entirely new data source to the
        // underlying storage.
        {
            tracing::info!("checking persisted storage");
            let storage = D::connect(network.storage()).await;

            // Ensure we have the same data in both data sources (if data was missing from the
            // original it is of course allowed to be missing from persistent storage and thus from
            // the latter).
            let block_height = NodeDataSource::block_height(&ds).await.unwrap();
            assert_eq!(
                ds.get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
            assert_eq!(
                ds.get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
        }
    }

    /// Exercise every combination of inclusive/exclusive/unbounded start and end bounds on the
    /// range-query APIs (leaves, blocks, payloads, payload metadata, VID common, VID metadata).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_range<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();
        network.start().await;

        // Wait for there to be at least 3 blocks.
        // NOTE(review): this polls block height in a tight loop with no delay between reads;
        // tolerable in a test, but a subscription-based wait would avoid the busy loop.
        let block_height = loop {
            let mut tx = ds.read().await.unwrap();
            let block_height = tx.block_height().await.unwrap();
            if block_height >= 3 {
                break block_height as u64;
            }
        };

        // Query for a variety of ranges testing all cases of included, excluded, and unbounded
        // starting and ending bounds
        do_range_test(&ds, 1..=2, 1..3).await; // (inclusive, inclusive)
        do_range_test(&ds, 1..3, 1..3).await; // (inclusive, exclusive)
        do_range_test(&ds, 1.., 1..block_height).await; // (inclusive, unbounded)
        do_range_test(&ds, ..=2, 0..3).await; // (unbounded, inclusive)
        do_range_test(&ds, ..3, 0..3).await; // (unbounded, exclusive)
        do_range_test(&ds, .., 0..block_height).await; // (unbounded, unbounded)
        do_range_test(&ds, ExRange(0..=2), 1..3).await; // (exclusive, inclusive)
        do_range_test(&ds, ExRange(0..3), 1..3).await; // (exclusive, exclusive)
        do_range_test(&ds, ExRange(0..), 1..block_height).await; // (exclusive, unbounded)
    }

    /// Run all six range-query APIs over `range` and check that each stream yields exactly the
    /// heights in `expected_indices`, in order. Bounded ranges must end afterwards; unbounded
    /// ranges must keep yielding until objects stop being locally available.
    async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Clone + Debug + Send + 'static,
        I: IntoIterator<Item = u64>,
    {
        tracing::info!("testing range {range:?}");

        let mut leaves = ds.get_leaf_range(range.clone()).await;
        let mut blocks = ds.get_block_range(range.clone()).await;
        let mut payloads = ds.get_payload_range(range.clone()).await;
        let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
        let mut vid_common = ds.get_vid_common_range(range.clone()).await;
        let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;

        for i in expected_indices {
            tracing::info!(i, "check entries");
            let leaf = leaves.next().await.unwrap().await;
            let block = blocks.next().await.unwrap().await;
            let payload = payloads.next().await.unwrap().await;
            let payload_meta = payloads_meta.next().await.unwrap().await;
            let common = vid_common.next().await.unwrap().await;
            let common_meta = vid_common_meta.next().await.unwrap().await;
            assert_eq!(leaf.height(), i);
            assert_eq!(block.height(), i);
            assert_eq!(payload, ds.get_payload(i as usize).await.await);
            assert_eq!(payload_meta, block.into());
            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
            assert_eq!(common_meta, common.into());
        }

        if range.end_bound() == Bound::Unbounded {
            // If the range is unbounded, the stream should continue, eventually reaching a point at
            // which further objects are not yet available, and yielding pending futures from there.
            loop {
                let fetch_leaf = leaves.next().await.unwrap();
                let fetch_block = blocks.next().await.unwrap();
                let fetch_payload = payloads.next().await.unwrap();
                let fetch_payload_meta = payloads_meta.next().await.unwrap();
                let fetch_common = vid_common.next().await.unwrap();
                let fetch_common_meta = vid_common_meta.next().await.unwrap();

                // Stop as soon as any of the streams yields an unresolved (pending) fetch.
                if fetch_leaf.try_resolve().is_ok()
                    && fetch_block.try_resolve().is_ok()
                    && fetch_payload.try_resolve().is_ok()
                    && fetch_payload_meta.try_resolve().is_ok()
                    && fetch_common.try_resolve().is_ok()
                    && fetch_common_meta.try_resolve().is_ok()
                {
                    tracing::info!("searching for end of available objects");
                } else {
                    break;
                }
            }
        } else {
            // If the range is bounded, it should end where expected.
            assert!(leaves.next().await.is_none());
            assert!(blocks.next().await.is_none());
            assert!(payloads.next().await.is_none());
            assert!(payloads_meta.next().await.is_none());
            assert!(vid_common.next().await.is_none());
            assert!(vid_common_meta.next().await.is_none());
        }
    }

    /// Exercise the reversed range-query APIs with inclusive, exclusive, and unbounded lower
    /// bounds.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_range_rev<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();
        network.start().await;

        // Wait for there to be at least 5 blocks.
        ds.subscribe_leaves(5).await.next().await.unwrap();

        // Test inclusive, exclusive and unbounded lower bound.
        do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
        do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
        do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
    }

    /// Run all six reversed range-query APIs over `[start, end]` and check that each stream
    /// yields exactly the heights in `expected_indices` in descending order, then terminates.
    async fn do_range_rev_test<D>(
        ds: &D,
        start: Bound<usize>,
        end: usize,
        expected_indices: impl DoubleEndedIterator<Item = u64>,
    ) where
        D: TestableDataSource,
    {
        tracing::info!("testing range {start:?}-{end}");

        let mut leaves = ds.get_leaf_range_rev(start, end).await;
        let mut blocks = ds.get_block_range_rev(start, end).await;
        let mut payloads = ds.get_payload_range_rev(start, end).await;
        let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
        let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
        let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;

        // The streams are reversed, so walk the expected indices from the back.
        for i in expected_indices.rev() {
            tracing::info!(i, "check entries");
            let leaf = leaves.next().await.unwrap().await;
            let block = blocks.next().await.unwrap().await;
            let payload = payloads.next().await.unwrap().await;
            let payload_meta = payloads_meta.next().await.unwrap().await;
            let common = vid_common.next().await.unwrap().await;
            let common_meta = vid_common_meta.next().await.unwrap().await;
            assert_eq!(leaf.height(), i);
            assert_eq!(block.height(), i);
            assert_eq!(payload.height(), i);
            assert_eq!(payload_meta.height(), i);
            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
            assert_eq!(
                common_meta,
                ds.get_vid_common_metadata(i as usize).await.await
            );
        }

        // The range should end where expected.
        assert!(leaves.next().await.is_none());
        assert!(blocks.next().await.is_none());
        assert!(payloads.next().await.is_none());
        assert!(payloads_meta.next().await.is_none());
        assert!(vid_common.next().await.is_none());
        assert!(vid_common_meta.next().await.is_none());
    }

    /// A wrapper around a range that turns the lower bound from inclusive to exclusive.
    #[derive(Clone, Copy, Debug)]
    struct ExRange<R>(R);

    impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
        fn start_bound(&self) -> Bound<&usize> {
            match self.0.start_bound() {
                Bound::Included(x) => Bound::Excluded(x),
                Bound::Excluded(x) => Bound::Excluded(x),
                // An unbounded start becomes "exclusive of 0", i.e. starting from 1.
                Bound::Unbounded => Bound::Excluded(&0),
            }
        }

        fn end_bound(&self) -> Bound<&usize> {
            self.0.end_bound()
        }
    }
}
542
/// Generic tests we can instantiate for any data source with reliable, versioned persistent storage.
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod persistence_tests {
    use committable::Committable;
    use hotshot_example_types::state_types::{TestInstanceState, TestValidatedState};
    use hotshot_types::simple_certificate::QuorumCertificate2;

    use crate::{
        availability::{BlockQueryData, LeafQueryData},
        data_source::{
            storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
            Transaction,
        },
        node::NodeDataSource,
        testing::{
            consensus::TestableDataSource,
            mocks::{MockPayload, MockTypes},
        },
        types::HeightIndexed,
        Leaf2,
    };

    /// Mock up a non-genesis leaf and a QC that certifies it.
    ///
    /// Builds the genesis leaf and QC, then increments the block number (to distinguish the
    /// result from the genesis block, which already exists in a fresh data source) and
    /// recomputes the QC's leaf commitment so it still matches the modified leaf.
    async fn mock_leaf_and_qc() -> (Leaf2<MockTypes>, QuorumCertificate2<MockTypes>) {
        use hotshot_example_types::node_types::TestVersions;

        let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        // Increment the block number, to distinguish this block from the genesis block, which
        // already exists.
        leaf.block_header_mut().block_number += 1;
        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);

        (leaf, qc)
    }

    /// Insert data in an uncommitted transaction, verify it is readable within the transaction,
    /// then `revert` and verify nothing was persisted.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_revert<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
            + AvailabilityStorage<MockTypes>
            + NodeStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock up some consensus data.
        let (mock_leaf, mock_qc) = mock_leaf_and_qc().await;
        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();

        // Insert, but do not commit, some data and check that we can read it back.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();

        // Within the open transaction, the new objects at height 1 are visible.
        assert_eq!(tx.block_height().await.unwrap(), 2);
        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
        assert_eq!(block, tx.get_block(1.into()).await.unwrap());

        // Revert the changes.
        tx.revert().await;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        // After the revert, neither object is available locally.
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }

    /// Commit data, verify it is readable, then `reset` the storage and verify it is gone.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_reset<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock up some consensus data.
        let (mock_leaf, mock_qc) = mock_leaf_and_qc().await;
        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();

        // Insert some data and check that we can read it back.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();
        tx.commit().await.unwrap();

        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            2
        );
        assert_eq!(leaf, ds.get_leaf(1).await.await);
        assert_eq!(block, ds.get_block(1).await.await);

        drop(ds);

        // Reset and check that the changes are gone.
        let ds = D::reset(&storage).await;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }

    /// Check that dropping an uncommitted transaction reverts it, and that a subsequent
    /// committed transaction persists only its own data.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_drop_tx<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
            + AvailabilityStorage<MockTypes>
            + NodeStorage<MockTypes>,
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock up some consensus data. Keep the raw leaf/QC mutable so we can derive a second,
        // distinct block from them below.
        let (mut mock_leaf, mut mock_qc) = mock_leaf_and_qc().await;
        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();

        // Insert, but do not commit, some data and check that we can read it back.
        tracing::info!("write");
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();

        assert_eq!(tx.block_height().await.unwrap(), 2);
        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
        assert_eq!(block, tx.get_block(1.into()).await.unwrap());

        // Drop the transaction, causing a revert.
        drop(tx);

        // Open a new transaction and check that the changes are reverted.
        tracing::info!("read");
        let mut tx = ds.read().await.unwrap();
        assert_eq!(tx.block_height().await.unwrap(), 0);
        drop(tx);

        // Get a mutable transaction again, insert different data: bump the height once more and
        // re-sign so the new leaf/block are distinct from the dropped ones.
        mock_leaf.block_header_mut().block_number += 1;
        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();

        tracing::info!("write again");
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // Read the data back. We should have _only_ the data that was written in the final
        // transaction.
        tracing::info!("read again");
        let height = leaf.height() as usize;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            height + 1
        );
        assert_eq!(leaf, ds.get_leaf(height).await.await);
        assert_eq!(block, ds.get_block(height).await.await);
        // The data from the dropped transaction (one height below) must not be present.
        ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
        ds.get_block(height - 1).await.try_resolve().unwrap_err();
    }
}
755
756/// Generic tests we can instantiate for all the node data sources.
757#[cfg(any(test, feature = "testing"))]
758#[espresso_macros::generic_tests]
759pub mod node_tests {
760    use std::time::Duration;
761
762    use committable::Committable;
763    use futures::{future::join_all, stream::StreamExt};
764    use hotshot::traits::BlockPayload;
765    use hotshot_example_types::{
766        block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
767        node_types::TestTypes,
768        state_types::{TestInstanceState, TestValidatedState},
769    };
770    use hotshot_types::{
771        data::{vid_commitment, VidCommitment, VidShare},
772        traits::{
773            block_contents::{BlockHeader, EncodeBytes},
774            node_implementation::Versions,
775        },
776        vid::advz::{advz_scheme, ADVZScheme},
777    };
778    use jf_vid::VidScheme;
779    use vbs::version::StaticVersionType;
780
781    use crate::{
782        availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
783        data_source::{
784            storage::{NodeStorage, UpdateAvailabilityStorage},
785            update::Transaction,
786        },
787        node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
788        testing::{
789            consensus::{MockNetwork, TestableDataSource},
790            mocks::{mock_transaction, MockPayload, MockTypes, MockVersions},
791            sleep,
792        },
793        types::HeightIndexed,
794        Header, VidCommon,
795    };
796
    /// Read the timestamp (in seconds) out of a mock block header.
    ///
    /// Fully-qualified syntax disambiguates the [`BlockHeader`] trait's `timestamp`
    /// method from the `timestamp` field on `TestBlockHeader`.
    fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
        <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
    }
800
    /// Test that [`SyncStatus`] accurately reports missing leaves, blocks, and VID data.
    ///
    /// Inserts data with deliberate gaps — a leaf without its block or VID info, a leaf
    /// whose height skips its predecessor, and VID common data without a share — and
    /// checks the reported counts after every step. Finally verifies that `insert_vid`
    /// is monotonic: re-inserting common data without a share must not erase a share
    /// that is already stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_sync_status<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Set up a mock VID scheme to use for generating test data.
        let mut vid = advz_scheme(2);

        // Generate some mock leaves and blocks to insert. Start from genesis and derive
        // two more entries at successive heights by bumping the block number.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        for i in 0..2 {
            let mut leaf = leaves[i].clone();
            leaf.leaf.block_header_mut().block_number += 1;
            leaves.push(leaf);

            let mut block = blocks[i].clone();
            block.header.block_number += 1;
            blocks.push(block);
        }
        // Generate mock VID data. We reuse the same (empty) payload for each block, but with
        // different metadata.
        let disperse = vid.disperse([]).unwrap();
        let vid = leaves
            .iter()
            .map(|leaf| {
                (
                    VidCommonQueryData::new(
                        leaf.header().clone(),
                        VidCommon::V0(disperse.common.clone()),
                    ),
                    disperse.shares[0].clone(),
                )
            })
            .collect::<Vec<_>>();

        // At first, the node is fully synced.
        assert!(ds.sync_status().await.unwrap().is_fully_synced());

        // Insert a leaf without the corresponding block or VID info, make sure we detect that the
        // block and VID info are missing.
        ds.append(leaves[0].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 1,
                missing_vid_common: 1,
                missing_vid_shares: 1,
                missing_leaves: 0,
                pruned_height: None,
            }
        );

        // Insert a leaf whose height is not the successor of the previous leaf. We should now
        // detect that the leaf in between is missing (along with all _three_ corresponding blocks).
        ds.append(leaves[2].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 3,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Insert VID common without a corresponding share. Only the common count should drop.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 2,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Rectify the missing data.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_block(blocks[0].clone()).await.unwrap();
            tx.insert_vid(vid[0].0.clone(), Some(VidShare::V0(vid[0].1.clone())))
                .await
                .unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(vid[1].0.clone(), Some(VidShare::V0(vid[1].1.clone())))
                .await
                .unwrap();
            tx.insert_block(blocks[2].clone()).await.unwrap();
            tx.insert_vid(vid[2].0.clone(), Some(VidShare::V0(vid[2].1.clone())))
                .await
                .unwrap();
            tx.commit().await.unwrap();
        }

        // Some data sources (e.g. file system) don't support out-of-order insertion of missing
        // data. These would have just ignored the insertion of `vid[0]` (the share) and
        // `leaves[1]`. Detect if this is the case; then we allow 1 missing leaf and 1 missing VID
        // share.
        let expected_missing = if ds.get_leaf(1).await.try_resolve().is_err() {
            tracing::warn!(
                "data source does not support out-of-order filling, allowing one missing leaf and \
                 VID share"
            );
            1
        } else {
            0
        };
        let expected_sync_status = SyncStatus {
            missing_blocks: 0,
            missing_leaves: expected_missing,
            missing_vid_common: 0,
            missing_vid_shares: expected_missing,
            pruned_height: None,
        };
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);

        // If we re-insert one of the VID entries without a share, it should not overwrite the share
        // that we already have; that is, `insert_vid` should be monotonic.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
    }
952
    /// Test the aggregate transaction count and payload size statistics.
    ///
    /// Inserts three single-transaction blocks (including a duplicate transaction) and
    /// checks after each insertion that `count_transactions` and `payload_size`
    /// eventually reflect the running totals. Since the aggregator updates
    /// asynchronously, each check polls with a bounded retry loop before failing.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_counters<D: TestableDataSource>() {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Both statistics start at zero on an empty data source.
        assert_eq!(ds.count_transactions().await.unwrap(), 0);
        assert_eq!(ds.payload_size().await.unwrap(), 0);

        // Insert some transactions.
        let mut total_transactions = 0;
        let mut total_size = 0;
        'outer: for i in [0, 1, 2] {
            // Using `i % 2` as the transaction data ensures we insert a duplicate transaction
            // (since we insert more than 2 transactions total). The query service should still
            // count these as separate transactions and should include both duplicates when
            // computing the total size.
            let (payload, metadata) =
                <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
                    [mock_transaction(vec![i as u8 % 2])],
                    &TestValidatedState::default(),
                    &TestInstanceState::default(),
                )
                .await
                .unwrap();
            let encoded = payload.encode();
            let payload_commitment = vid_commitment::<TestVersions>(
                &encoded,
                &metadata.encode(),
                1,
                <TestVersions as Versions>::Base::VERSION,
            );
            // Build a header at height `i` committing to the payload we just created.
            let header = TestBlockHeader {
                block_number: i,
                payload_commitment,
                timestamp: i,
                timestamp_millis: i * 1_000,
                builder_commitment:
                    <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
                        &payload, &metadata,
                    ),
                metadata: TestMetadata {
                    num_transactions: 7, // arbitrary
                },
                random: 1, // arbitrary
            };

            // Start from a genesis leaf and swap in our custom header so the leaf and
            // block agree on height and payload commitment.
            let mut leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await;
            *leaf.leaf.block_header_mut() = header.clone();
            let block = BlockQueryData::new(header, payload);
            ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None, None))
                .await
                .unwrap();
            assert_eq!(
                NodeDataSource::<MockTypes>::block_height(&ds)
                    .await
                    .unwrap(),
                (i + 1) as usize,
            );

            total_transactions += 1;
            total_size += encoded.len();

            // Allow some time for the aggregator to update. If the counters match the
            // expected totals we move on to the next block; otherwise retry up to 5 times.
            for retry in 0..5 {
                let ds_transactions = ds.count_transactions().await.unwrap();
                let ds_payload_size = ds.payload_size().await.unwrap();
                if ds_transactions != total_transactions || ds_payload_size != total_size {
                    tracing::info!(
                        i,
                        retry,
                        total_transactions,
                        ds_transactions,
                        total_size,
                        ds_payload_size,
                        "waiting for statistics to update"
                    );
                    sleep(Duration::from_secs(1)).await;
                } else {
                    continue 'outer;
                }
            }
            panic!("counters did not update in time");
        }
    }
1043
1044    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1045    pub async fn test_vid_shares<D: TestableDataSource>()
1046    where
1047        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1048    {
1049        let mut network = MockNetwork::<D, MockVersions>::init().await;
1050        let ds = network.data_source();
1051
1052        network.start().await;
1053
1054        // Check VID shares for a few blocks.
1055        let mut leaves = ds.subscribe_leaves(0).await.take(3);
1056        while let Some(leaf) = leaves.next().await {
1057            tracing::info!("got leaf {}", leaf.height());
1058            let mut tx = ds.read().await.unwrap();
1059            let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1060            assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1061            assert_eq!(
1062                share,
1063                tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1064                    .await
1065                    .unwrap()
1066            );
1067        }
1068    }
1069
    /// Test that VID storage is monotonic: once a share has been stored, re-inserting
    /// the corresponding common data *without* a share must not erase the stored share.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_monotonicity<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Generate some test VID data.
        let mut vid = advz_scheme(2);
        let disperse = vid.disperse([]).unwrap();

        // Insert test data with VID common and a share.
        let leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
        )
        .await;
        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
        ds.append(BlockInfo::new(
            leaf,
            None,
            Some(common.clone()),
            Some(VidShare::V0(disperse.shares[0].clone())),
            None,
        ))
        .await
        .unwrap();

        // Sanity check: both the common data and the share are retrievable at height 0.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }

        // Re-insert the common data, without a share. This should not overwrite the share we
        // already have.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(common.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        // The previously stored share must still be present.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }
    }
1125
    /// End-to-end test of VID payload recovery.
    ///
    /// Sequences a non-empty block, collects the VID share from every node in the mock
    /// network, verifies each share against the common data and payload commitment, and
    /// finally reconstructs the payload from the shares, checking that it matches the
    /// sequenced block and contains the submitted transaction.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_recovery<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        network.start().await;

        // Submit a transaction so we can try to recover a non-empty block.
        let mut blocks = ds.subscribe_blocks(0).await;
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Wait for the transaction to be finalized.
        let block = loop {
            tracing::info!("waiting for transaction");
            let block = blocks.next().await.unwrap();
            if !block.is_empty() {
                tracing::info!(height = block.height(), "transaction sequenced");
                break block;
            }
            tracing::info!(height = block.height(), "empty block");
        };
        let height = block.height() as usize;
        // This test exercises the legacy ADVZ scheme, so we expect the V0 commitment variant.
        let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
            commit
        } else {
            panic!("expect ADVZ commitment")
        };

        // Set up a test VID scheme, sized for the number of nodes in the network.
        let vid = advz_scheme(network.num_nodes());

        // Get VID common data and verify it against the payload commitment.
        tracing::info!("fetching common data");
        let common = ds.get_vid_common(height).await.await;
        let VidCommon::V0(common) = &common.common() else {
            panic!("expect ADVZ common");
        };
        ADVZScheme::is_consistent(&commit, common).unwrap();

        // Collect shares from each node, concurrently.
        tracing::info!("fetching shares");
        let network = &network;
        let vid = &vid;
        let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
            let ds = network.data_source_index(i);

            // Wait until the node has processed up to the desired block; since we have thus far
            // only interacted with node 0, it is possible other nodes are slightly behind.
            let mut leaves = ds.subscribe_leaves(height).await;
            let leaf = leaves.next().await.unwrap();
            assert_eq!(leaf.height(), height as u64);
            assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));

            let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
                share
            } else {
                panic!("expect ADVZ share")
            };
            // Each individual share must verify against the common data and commitment.
            vid.verify_share(&share, common, &commit).unwrap().unwrap();
            share
        }))
        .await;

        // Recover payload from the collected shares.
        tracing::info!("recovering payload");
        let bytes = vid.recover_payload(&shares, common).unwrap();
        let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
            &bytes,
            &TestMetadata {
                num_transactions: 7, // arbitrary
            },
        );
        assert_eq!(recovered, *block.payload());
        assert_eq!(recovered.transactions, vec![txn]);
    }
1205
1206    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1207    pub async fn test_timestamp_window<D: TestableDataSource>() {
1208        let mut network = MockNetwork::<D, MockVersions>::init().await;
1209        let ds = network.data_source();
1210
1211        network.start().await;
1212
1213        // Wait for blocks with at least three different timestamps to be sequenced. This lets us
1214        // test all the edge cases.
1215        let mut leaves = ds.subscribe_leaves(0).await;
1216        // `test_blocks` is a list of lists of headers with the same timestamp. The flattened list
1217        // of headers is contiguous.
1218        let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1219        while test_blocks.len() < 3 {
1220            // Wait for the next block to be sequenced.
1221            let leaf = leaves.next().await.unwrap();
1222            let header = leaf.header().clone();
1223            if let Some(last_timestamp) = test_blocks.last_mut() {
1224                if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1225                    == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1226                {
1227                    last_timestamp.push(header);
1228                } else {
1229                    test_blocks.push(vec![header]);
1230                }
1231            } else {
1232                test_blocks.push(vec![header]);
1233            }
1234        }
1235        tracing::info!("blocks for testing: {test_blocks:#?}");
1236
1237        // Define invariants that every response should satisfy.
1238        let check_invariants =
1239            |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1240                let mut prev = res.prev.as_ref();
1241                if let Some(prev) = prev {
1242                    if check_prev {
1243                        assert!(block_header_timestamp(prev) < start);
1244                    }
1245                } else {
1246                    // `prev` can only be `None` if the first block in the window is the genesis
1247                    // block.
1248                    assert_eq!(res.from().unwrap(), 0);
1249                };
1250                for header in &res.window {
1251                    assert!(start <= block_header_timestamp(header));
1252                    assert!(block_header_timestamp(header) < end);
1253                    if let Some(prev) = prev {
1254                        assert!(
1255                            <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1256                                <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1257                        );
1258                    }
1259                    prev = Some(header);
1260                }
1261                if let Some(next) = &res.next {
1262                    assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1263                    // If there is a `next`, there must be at least one previous block (either `prev`
1264                    // itself or the last block if the window is nonempty), so we can `unwrap` here.
1265                    assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1266                }
1267            };
1268
1269        let get_window = |start, end| {
1270            let ds = ds.clone();
1271            async move {
1272                let window = ds
1273                    .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1274                    .await
1275                    .unwrap();
1276                tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1277                check_invariants(&window, start, end, true);
1278                window
1279            }
1280        };
1281
1282        // Case 0: happy path. All blocks are available, including prev and next.
1283        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1284        let end = start + 1;
1285        let res = get_window(start, end).await;
1286        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1287        assert_eq!(res.window, test_blocks[1]);
1288        assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1289
1290        // Case 1: no `prev`, start of window is before genesis.
1291        let start = 0;
1292        let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1293        let res = get_window(start, end).await;
1294        assert_eq!(res.prev, None);
1295        assert_eq!(res.window, test_blocks[0]);
1296        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1297
1298        // Case 2: no `next`, end of window is after the most recently sequenced block.
1299        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1300        let end = i64::MAX as u64;
1301        let res = get_window(start, end).await;
1302        assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1303        // There may have been more blocks sequenced since we grabbed `test_blocks`, so just check
1304        // that the prefix of the window is correct.
1305        assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1306        assert_eq!(res.next, None);
1307        // Fetch more blocks using the `from` form of the endpoint. Start from the last block we had
1308        // previously (ie fetch a slightly overlapping window) to ensure there is at least one block
1309        // in the new window.
1310        let from = test_blocks.iter().flatten().count() - 1;
1311        let more = ds
1312            .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1313            .await
1314            .unwrap();
1315        check_invariants(&more, start, end, false);
1316        assert_eq!(
1317            more.prev.as_ref().unwrap(),
1318            test_blocks.iter().flatten().nth(from - 1).unwrap()
1319        );
1320        assert_eq!(
1321            more.window[..res.window.len() - test_blocks[2].len() + 1],
1322            res.window[test_blocks[2].len() - 1..]
1323        );
1324        assert_eq!(res.next, None);
1325        // We should get the same result whether we query by block height or hash.
1326        let more2 = ds
1327            .get_header_window(
1328                test_blocks[2].last().unwrap().commit(),
1329                end,
1330                i64::MAX as usize,
1331            )
1332            .await
1333            .unwrap();
1334        check_invariants(&more2, start, end, false);
1335        assert_eq!(more2.from().unwrap(), more.from().unwrap());
1336        assert_eq!(more2.prev, more.prev);
1337        assert_eq!(more2.next, more.next);
1338        assert_eq!(more2.window[..more.window.len()], more.window);
1339
1340        // Case 3: the window is empty.
1341        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1342        let end = start;
1343        let res = get_window(start, end).await;
1344        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1345        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1346        assert_eq!(res.window, vec![]);
1347
1348        // Case 4: no relevant blocks are available yet.
1349        ds.get_header_window(
1350            WindowStart::Time((i64::MAX - 1) as u64),
1351            i64::MAX as u64,
1352            i64::MAX as usize,
1353        )
1354        .await
1355        .unwrap_err();
1356
1357        // Case 5: limits.
1358        let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1359            .into_iter()
1360            .flatten()
1361            .collect::<Vec<_>>();
1362        // Make a query that would return everything, but gets limited.
1363        let start = block_header_timestamp(&blocks[0]);
1364        let end = block_header_timestamp(&test_blocks[2][0]);
1365        let res = ds
1366            .get_header_window(WindowStart::Time(start), end, 1)
1367            .await
1368            .unwrap();
1369        assert_eq!(res.prev, None);
1370        assert_eq!(res.window, [blocks[0].clone()]);
1371        assert_eq!(res.next, None);
1372        // Query the next page of results, get limited again.
1373        let res = ds
1374            .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1375            .await
1376            .unwrap();
1377        assert_eq!(res.window, [blocks[1].clone()]);
1378        assert_eq!(res.next, None);
1379        // Get the rest of the results.
1380        let res = ds
1381            .get_header_window(
1382                WindowStart::Height(blocks[1].height() + 1),
1383                end,
1384                blocks.len() - 1,
1385            )
1386            .await
1387            .unwrap();
1388        assert_eq!(res.window, blocks[2..].to_vec());
1389        assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1390    }
1391}
1392
1393/// Generic tests we can instantiate for all the status data sources.
1394#[cfg(any(test, feature = "testing"))]
1395#[espresso_macros::generic_tests]
1396pub mod status_tests {
1397    use std::time::Duration;
1398
1399    use crate::{
1400        status::StatusDataSource,
1401        testing::{
1402            consensus::{DataSourceLifeCycle, MockNetwork},
1403            mocks::{mock_transaction, MockVersions},
1404            sleep,
1405        },
1406    };
1407
    /// Test the status metrics exposed by [`StatusDataSource`].
    ///
    /// Checks the metrics before consensus starts (zero block height, NaN success rate,
    /// elapsed-time-since-decide measured from a zero timestamp), then starts consensus
    /// and checks that the success rate becomes finite and positive, and finally shuts
    /// consensus down and checks that the elapsed time since the last decide grows.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        {
            // Check that block height is initially zero.
            assert_eq!(ds.block_height().await.unwrap(), 0);
            // With consensus paused, check that the success rate returns NAN (since the block
            // height, the numerator, is 0, and the view number, the denominator, is 0).
            assert!(ds.success_rate().await.unwrap().is_nan());
            // Since no block has been produced, the "last_decided_time" metric is 0.
            // Therefore, the elapsed time since the last block should be close to the time
            // elapsed since the Unix epoch (allowing 1s of slack for the two clock reads).
            assert!(
                (ds.elapsed_time_since_last_decide().await.unwrap() as i64
                    - chrono::Utc::now().timestamp())
                .abs()
                    <= 1,
                "time elapsed since last_decided_time is not within 1s"
            );
        }

        // Submit a transaction.
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Start consensus and wait for the transaction to be finalized.
        network.start().await;

        // Now wait for at least one non-genesis block to be finalized.
        loop {
            let height = ds.block_height().await.unwrap();
            if height > 1 {
                break;
            }
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
        }

        {
            // Check that the success rate has been updated. Note that we can only check if success
            // rate is positive. We don't know exactly what it is because we can't know how many
            // views have elapsed without race conditions.
            let success_rate = ds.success_rate().await.unwrap();
            assert!(success_rate.is_finite(), "{success_rate}");
            assert!(success_rate > 0.0, "{success_rate}");
        }

        {
            // Shut down consensus to halt block production, then wait 3 seconds so that the
            // elapsed time since the last block is at least 3 seconds.
            network.shut_down().await;
            sleep(Duration::from_secs(3)).await;
            // Assert that the elapsed time since the last block is at least 3 seconds.
            assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
        }
    }
1465}
1466
/// Instantiate every generic data-source test suite for a concrete data source type.
///
/// Expands to `use` statements bringing the generic test modules into scope, followed
/// by an invocation of each suite's own instantiation macro for the given type `$t`.
/// Those macros (e.g. `instantiate_availability_tests!`) must be in scope at the call
/// site.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($t:ty) => {
        use $crate::data_source::{
            availability_tests, node_tests, persistence_tests, status_tests,
        };

        instantiate_availability_tests!($t);
        instantiate_persistence_tests!($t);
        instantiate_node_tests!($t);
        instantiate_status_tests!($t);
    };
}