hotshot_query_service/
data_source.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Persistent storage and sources of data consumed by APIs.
14//!
15//! The APIs provided by this query service are generic over the implementation which actually
16//! retrieves data in answer to queries. We call this implementation a _data source_. This module
17//! defines a data source and provides several pre-built implementations:
18//! * [`FileSystemDataSource`]
19//! * [`SqlDataSource`]
20//! * [`FetchingDataSource`], a generalization of the above
21//! * [`MetricsDataSource`]
22//!
23//! The user can choose which data source to use when initializing the query service.
24//!
25//! We also provide combinators for modularly adding functionality to existing data sources:
26//! * [`ExtensibleDataSource`]
27//!
28
29mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50    use std::ops::{Bound, RangeBounds};
51
52    use futures::{
53        future,
54        stream::{BoxStream, StreamExt},
55    };
56
57    use crate::{
58        availability::{BlockQueryData, Fetch, LeafQueryData},
59        node::NodeDataSource,
60        testing::{consensus::TestableDataSource, mocks::MockTypes},
61    };
62
63    /// Apply an upper bound to a range based on the currently available block height.
64    async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize>
65    where
66        D: TestableDataSource,
67        R: RangeBounds<usize>,
68    {
69        let start = range.start_bound().cloned();
70        let mut end = range.end_bound().cloned();
71        if end == Bound::Unbounded {
72            end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73        }
74        (start, end)
75    }
76
77    /// Get a stream of blocks, implicitly terminating at the current block height.
78    pub async fn block_range<R, D>(
79        ds: &D,
80        range: R,
81    ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82    where
83        D: TestableDataSource,
84        R: RangeBounds<usize> + Send + 'static,
85    {
86        ds.get_block_range(bound_range(ds, range).await)
87            .await
88            .then(Fetch::resolve)
89            .boxed()
90    }
91
92    /// Get a stream of leaves, implicitly terminating at the current block height.
93    pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94    where
95        D: TestableDataSource,
96        R: RangeBounds<usize> + Send + 'static,
97    {
98        ds.get_leaf_range(bound_range(ds, range).await)
99            .await
100            .then(Fetch::resolve)
101            .boxed()
102    }
103
104    pub async fn get_non_empty_blocks<D>(
105        ds: &D,
106    ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107    where
108        D: TestableDataSource,
109    {
110        // Ignore the genesis block (start from height 1).
111        leaf_range(ds, 1..)
112            .await
113            .zip(block_range(ds, 1..).await)
114            .filter(|(_, block)| future::ready(!block.is_empty()))
115            .collect()
116            .await
117    }
118}
119
120/// Generic tests we can instantiate for all the availability data sources.
121#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124    use std::{
125        collections::HashMap,
126        fmt::Debug,
127        ops::{Bound, RangeBounds},
128    };
129
130    use committable::Committable;
131    use futures::stream::StreamExt;
132    use hotshot_types::{data::Leaf2, vote::HasViewNumber};
133
134    use super::test_helpers::*;
135    use crate::{
136        availability::{payload_size, BlockId},
137        data_source::storage::{AvailabilityStorage, NodeStorage},
138        node::NodeDataSource,
139        testing::{
140            consensus::{MockNetwork, TestableDataSource},
141            mocks::{mock_transaction, MockTypes, MockVersions},
142        },
143        types::HeightIndexed,
144    };
145
146    async fn validate<D: TestableDataSource>(ds: &D)
147    where
148        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
149    {
150        // Check the consistency of every block/leaf pair. Keep track of payloads and transactions
151        // we've seen so we can detect duplicates.
152        let mut seen_payloads = HashMap::new();
153        let mut seen_transactions = HashMap::new();
154        let mut leaves = leaf_range(ds, ..).await.enumerate();
155        while let Some((i, leaf)) = leaves.next().await {
156            assert_eq!(leaf.height(), i as u64);
157            assert_eq!(
158                leaf.hash(),
159                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
160            );
161
162            // Check indices.
163            tracing::info!("looking up leaf {i} various ways");
164            assert_eq!(leaf, ds.get_leaf(i).await.await);
165            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);
166
167            tracing::info!("looking up block {i} various ways");
168            let block = ds.get_block(i).await.await;
169            assert_eq!(leaf.block_hash(), block.hash());
170            assert_eq!(block.height(), i as u64);
171            assert_eq!(block.hash(), block.header().commit());
172            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));
173
174            // Check indices.
175            assert_eq!(block, ds.get_block(i).await.await);
176            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
177            // We should be able to look up the block by payload hash unless its payload is a
178            // duplicate. For duplicate payloads, this function returns the index of the first
179            // duplicate.
180            //
181            // Note: this ordering is not a strict requirement. It should hold for payloads in local
182            // storage, but we don't have a good way of enforcing it if the payload is missing, in
183            // which case we will return the first matching payload we see, which could happen in
184            // any order. We use `try_resolve` to skip this check if the object isn't available
185            // locally.
186            let ix = seen_payloads
187                .entry(block.payload_hash())
188                .or_insert(i as u64);
189            if let Ok(block) = ds
190                .get_block(BlockId::PayloadHash(block.payload_hash()))
191                .await
192                .try_resolve()
193            {
194                assert_eq!(block.height(), *ix);
195            } else {
196                tracing::warn!(
197                    "skipping block by payload index check for missing payload {:?}",
198                    block.header()
199                );
200                // At least check that _some_ block can be fetched.
201                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
202                    .await
203                    .await;
204            }
205
206            // Check payload lookup.
207            tracing::info!("looking up payload {i} various ways");
208            let expected_payload = block.clone().into();
209            assert_eq!(ds.get_payload(i).await.await, expected_payload);
210            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
211            // Similar to the above, we can't guarantee which index we will get when passively
212            // fetching this payload, so only check the index if the payload is available locally.
213            if let Ok(payload) = ds
214                .get_payload(BlockId::PayloadHash(block.payload_hash()))
215                .await
216                .try_resolve()
217            {
218                if *ix == i as u64 {
219                    assert_eq!(payload, expected_payload);
220                }
221            } else {
222                tracing::warn!(
223                    "skipping payload index check for missing payload {:?}",
224                    block.header()
225                );
226                // At least check that _some_ payload can be fetched.
227                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
228                    .await
229                    .await;
230            }
231
232            // Look up the common VID data.
233            tracing::info!("looking up VID common {i} various ways");
234            let common = ds.get_vid_common(block.height() as usize).await.await;
235            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
236            // Similar to the above, we can't guarantee which index we will get when passively
237            // fetching this data, so only check the index if the data is available locally.
238            if let Ok(res) = ds
239                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
240                .await
241                .try_resolve()
242            {
243                if *ix == i as u64 {
244                    assert_eq!(res, common);
245                }
246            } else {
247                tracing::warn!(
248                    "skipping VID common index check for missing data {:?}",
249                    block.header()
250                );
251                // At least check that _some_ data can be fetched.
252                let res = ds
253                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
254                    .await
255                    .await;
256                assert_eq!(res.payload_hash(), common.payload_hash());
257            }
258
259            for (j, txn) in block.enumerate() {
260                tracing::info!("looking up transaction {i},{j:?}");
261
262                // We should be able to look up the transaction by hash unless it is a duplicate.
263                // For duplicate transactions, this function returns the index of the first
264                // duplicate.
265                //
266                // Similar to the above, we can't guarantee which index we will get when passively
267                // fetching this transaction, so only check the index if the transaction is
268                // available locally.
269                let ix = seen_transactions
270                    .entry(txn.commit())
271                    .or_insert((i as u64, j.clone()));
272                if let Ok(tx_data) = ds
273                    .get_block_containing_transaction(txn.commit())
274                    .await
275                    .try_resolve()
276                {
277                    assert_eq!(tx_data.transaction.transaction(), &txn);
278                    assert_eq!(tx_data.transaction.block_height(), ix.0);
279                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
280                    assert_eq!(tx_data.index, ix.1);
281                    assert_eq!(tx_data.block, block);
282                } else {
283                    tracing::warn!(
284                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
285                    );
286                    // At least check that _some_ transaction can be fetched.
287                    ds.get_block_containing_transaction(txn.commit())
288                        .await
289                        .await;
290                }
291            }
292        }
293
294        // Validate consistency of latest QC chain (only available after epoch upgrade).
295        {
296            let mut tx = ds.read().await.unwrap();
297            let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
298            let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();
299
300            if last_leaf.qc().data.epoch.is_some() {
301                tracing::info!(block_height, "checking QC chain");
302                let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();
303
304                assert_eq!(last_leaf.height(), (block_height - 1) as u64);
305                assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
306                assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
307                assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
308            }
309        }
310    }
311
312    #[test_log::test(tokio::test(flavor = "multi_thread"))]
313    pub async fn test_update<D: TestableDataSource>()
314    where
315        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
316    {
317        let mut network = MockNetwork::<D, MockVersions>::init().await;
318        let ds = network.data_source();
319
320        network.start().await;
321        assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
322
323        // Submit a few blocks and make sure each one gets reflected in the query service and
324        // preserves the consistency of the data and indices.
325        let mut blocks = ds.subscribe_blocks(0).await.enumerate();
326        for nonce in 0..3 {
327            let txn = mock_transaction(vec![nonce]);
328            network.submit_transaction(txn).await;
329
330            // Wait for the transaction to be finalized.
331            let (i, block) = loop {
332                tracing::info!("waiting for tx {nonce}");
333                let (i, block) = blocks.next().await.unwrap();
334                if !block.is_empty() {
335                    break (i, block);
336                }
337                tracing::info!("block {i} is empty");
338            };
339
340            tracing::info!("got tx {nonce} in block {i}");
341            assert_eq!(ds.get_block(i).await.await, block);
342            validate(&ds).await;
343        }
344
345        // Check that all the updates have been committed to storage, not simply held in memory: we
346        // should be able to read the same data if we connect an entirely new data source to the
347        // underlying storage.
348        {
349            tracing::info!("checking persisted storage");
350            let storage = D::connect(network.storage()).await;
351
352            // Ensure we have the same data in both data sources (if data was missing from the
353            // original it is of course allowed to be missing from persistent storage and thus from
354            // the latter).
355            let block_height = NodeDataSource::block_height(&ds).await.unwrap();
356            assert_eq!(
357                ds.get_block_range(..block_height)
358                    .await
359                    .map(|fetch| fetch.try_resolve().ok())
360                    .collect::<Vec<_>>()
361                    .await,
362                storage
363                    .get_block_range(..block_height)
364                    .await
365                    .map(|fetch| fetch.try_resolve().ok())
366                    .collect::<Vec<_>>()
367                    .await
368            );
369            assert_eq!(
370                ds.get_leaf_range(..block_height)
371                    .await
372                    .map(|fetch| fetch.try_resolve().ok())
373                    .collect::<Vec<_>>()
374                    .await,
375                storage
376                    .get_leaf_range(..block_height)
377                    .await
378                    .map(|fetch| fetch.try_resolve().ok())
379                    .collect::<Vec<_>>()
380                    .await
381            );
382        }
383    }
384
385    #[test_log::test(tokio::test(flavor = "multi_thread"))]
386    pub async fn test_range<D: TestableDataSource>()
387    where
388        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
389    {
390        let mut network = MockNetwork::<D, MockVersions>::init().await;
391        let ds = network.data_source();
392        network.start().await;
393
394        // Wait for there to be at least 3 blocks.
395        let block_height = loop {
396            let mut tx = ds.read().await.unwrap();
397            let block_height = tx.block_height().await.unwrap();
398            if block_height >= 3 {
399                break block_height as u64;
400            }
401        };
402
403        // Query for a variety of ranges testing all cases of included, excluded, and unbounded
404        // starting and ending bounds
405        do_range_test(&ds, 1..=2, 1..3).await; // (inclusive, inclusive)
406        do_range_test(&ds, 1..3, 1..3).await; // (inclusive, exclusive)
407        do_range_test(&ds, 1.., 1..block_height).await; // (inclusive, unbounded)
408        do_range_test(&ds, ..=2, 0..3).await; // (unbounded, inclusive)
409        do_range_test(&ds, ..3, 0..3).await; // (unbounded, exclusive)
410        do_range_test(&ds, .., 0..block_height).await; // (unbounded, unbounded)
411        do_range_test(&ds, ExRange(0..=2), 1..3).await; // (exclusive, inclusive)
412        do_range_test(&ds, ExRange(0..3), 1..3).await; // (exclusive, exclusive)
413        do_range_test(&ds, ExRange(0..), 1..block_height).await; // (exclusive, unbounded)
414    }
415
416    async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
417    where
418        D: TestableDataSource,
419        R: RangeBounds<usize> + Clone + Debug + Send + 'static,
420        I: IntoIterator<Item = u64>,
421    {
422        tracing::info!("testing range {range:?}");
423
424        let mut leaves = ds.get_leaf_range(range.clone()).await;
425        let mut blocks = ds.get_block_range(range.clone()).await;
426        let mut payloads = ds.get_payload_range(range.clone()).await;
427        let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
428        let mut vid_common = ds.get_vid_common_range(range.clone()).await;
429        let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
430
431        for i in expected_indices {
432            tracing::info!(i, "check entries");
433            let leaf = leaves.next().await.unwrap().await;
434            let block = blocks.next().await.unwrap().await;
435            let payload = payloads.next().await.unwrap().await;
436            let payload_meta = payloads_meta.next().await.unwrap().await;
437            let common = vid_common.next().await.unwrap().await;
438            let common_meta = vid_common_meta.next().await.unwrap().await;
439            assert_eq!(leaf.height(), i);
440            assert_eq!(block.height(), i);
441            assert_eq!(payload, ds.get_payload(i as usize).await.await);
442            assert_eq!(payload_meta, block.into());
443            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
444            assert_eq!(common_meta, common.into());
445        }
446
447        if range.end_bound() == Bound::Unbounded {
448            // If the range is unbounded, the stream should continue, eventually reaching a point at
449            // which further objects are not yet available, and yielding pending futures from there.
450            loop {
451                let fetch_leaf = leaves.next().await.unwrap();
452                let fetch_block = blocks.next().await.unwrap();
453                let fetch_payload = payloads.next().await.unwrap();
454                let fetch_payload_meta = payloads_meta.next().await.unwrap();
455                let fetch_common = vid_common.next().await.unwrap();
456                let fetch_common_meta = vid_common_meta.next().await.unwrap();
457
458                if fetch_leaf.try_resolve().is_ok()
459                    && fetch_block.try_resolve().is_ok()
460                    && fetch_payload.try_resolve().is_ok()
461                    && fetch_payload_meta.try_resolve().is_ok()
462                    && fetch_common.try_resolve().is_ok()
463                    && fetch_common_meta.try_resolve().is_ok()
464                {
465                    tracing::info!("searching for end of available objects");
466                } else {
467                    break;
468                }
469            }
470        } else {
471            // If the range is bounded, it should end where expected.
472            assert!(leaves.next().await.is_none());
473            assert!(blocks.next().await.is_none());
474            assert!(payloads.next().await.is_none());
475            assert!(payloads_meta.next().await.is_none());
476            assert!(vid_common.next().await.is_none());
477            assert!(vid_common_meta.next().await.is_none());
478        }
479    }
480
481    #[test_log::test(tokio::test(flavor = "multi_thread"))]
482    pub async fn test_range_rev<D: TestableDataSource>()
483    where
484        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
485    {
486        let mut network = MockNetwork::<D, MockVersions>::init().await;
487        let ds = network.data_source();
488        network.start().await;
489
490        // Wait for there to be at least 5 blocks.
491        ds.subscribe_leaves(5).await.next().await.unwrap();
492
493        // Test inclusive, exclusive and unbounded lower bound.
494        do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
495        do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
496        do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
497    }
498
499    async fn do_range_rev_test<D>(
500        ds: &D,
501        start: Bound<usize>,
502        end: usize,
503        expected_indices: impl DoubleEndedIterator<Item = u64>,
504    ) where
505        D: TestableDataSource,
506    {
507        tracing::info!("testing range {start:?}-{end}");
508
509        let mut leaves = ds.get_leaf_range_rev(start, end).await;
510        let mut blocks = ds.get_block_range_rev(start, end).await;
511        let mut payloads = ds.get_payload_range_rev(start, end).await;
512        let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
513        let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
514        let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
515
516        for i in expected_indices.rev() {
517            tracing::info!(i, "check entries");
518            let leaf = leaves.next().await.unwrap().await;
519            let block = blocks.next().await.unwrap().await;
520            let payload = payloads.next().await.unwrap().await;
521            let payload_meta = payloads_meta.next().await.unwrap().await;
522            let common = vid_common.next().await.unwrap().await;
523            let common_meta = vid_common_meta.next().await.unwrap().await;
524            assert_eq!(leaf.height(), i);
525            assert_eq!(block.height(), i);
526            assert_eq!(payload.height(), i);
527            assert_eq!(payload_meta.height(), i);
528            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
529            assert_eq!(
530                common_meta,
531                ds.get_vid_common_metadata(i as usize).await.await
532            );
533        }
534
535        // The range should end where expected.
536        assert!(leaves.next().await.is_none());
537        assert!(blocks.next().await.is_none());
538        assert!(payloads.next().await.is_none());
539        assert!(payloads_meta.next().await.is_none());
540        assert!(vid_common.next().await.is_none());
541        assert!(vid_common_meta.next().await.is_none());
542    }
543
544    // A wrapper around a range that turns the lower bound from inclusive to exclusive.
545    #[derive(Clone, Copy, Debug)]
546    struct ExRange<R>(R);
547
548    impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
549        fn start_bound(&self) -> Bound<&usize> {
550            match self.0.start_bound() {
551                Bound::Included(x) => Bound::Excluded(x),
552                Bound::Excluded(x) => Bound::Excluded(x),
553                Bound::Unbounded => Bound::Excluded(&0),
554            }
555        }
556
557        fn end_bound(&self) -> Bound<&usize> {
558            self.0.end_bound()
559        }
560    }
561}
562
563/// Generic tests we can instantiate for any data source with reliable, versioned persistent storage.
564#[cfg(any(test, feature = "testing"))]
565#[espresso_macros::generic_tests]
566pub mod persistence_tests {
567    use committable::Committable;
568    use hotshot_example_types::state_types::{TestInstanceState, TestValidatedState};
569    use hotshot_types::simple_certificate::QuorumCertificate2;
570
571    use crate::{
572        availability::{BlockQueryData, LeafQueryData},
573        data_source::{
574            storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
575            Transaction,
576        },
577        node::NodeDataSource,
578        testing::{
579            consensus::TestableDataSource,
580            mocks::{MockPayload, MockTypes},
581        },
582        types::HeightIndexed,
583        Leaf2,
584    };
585
586    #[test_log::test(tokio::test(flavor = "multi_thread"))]
587    pub async fn test_revert<D: TestableDataSource>()
588    where
589        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
590            + AvailabilityStorage<MockTypes>
591            + NodeStorage<MockTypes>,
592    {
593        use hotshot_example_types::node_types::TestVersions;
594
595        let storage = D::create(0).await;
596        let ds = D::connect(&storage).await;
597
598        // Mock up some consensus data.
599        let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
600            &TestValidatedState::default(),
601            &TestInstanceState::default(),
602        )
603        .await;
604        let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
605            &TestValidatedState::default(),
606            &TestInstanceState::default(),
607        )
608        .await;
609        // Increment the block number, to distinguish this block from the genesis block, which
610        // already exists.
611        leaf.block_header_mut().block_number += 1;
612        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
613
614        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
615        let leaf = LeafQueryData::new(leaf, qc).unwrap();
616
617        // Insert, but do not commit, some data and check that we can read it back.
618        let mut tx = ds.write().await.unwrap();
619        tx.insert_leaf(leaf.clone()).await.unwrap();
620        tx.insert_block(block.clone()).await.unwrap();
621
622        assert_eq!(tx.block_height().await.unwrap(), 2);
623        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
624        assert_eq!(block, tx.get_block(1.into()).await.unwrap());
625
626        // Revert the changes.
627        tx.revert().await;
628        assert_eq!(
629            NodeDataSource::<MockTypes>::block_height(&ds)
630                .await
631                .unwrap(),
632            0
633        );
634        ds.get_leaf(1).await.try_resolve().unwrap_err();
635        ds.get_block(1).await.try_resolve().unwrap_err();
636    }
637
638    #[test_log::test(tokio::test(flavor = "multi_thread"))]
639    pub async fn test_reset<D: TestableDataSource>()
640    where
641        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
642    {
643        use hotshot_example_types::node_types::TestVersions;
644
645        let storage = D::create(0).await;
646        let ds = D::connect(&storage).await;
647
648        // Mock up some consensus data.
649        let mut qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
650            &TestValidatedState::default(),
651            &TestInstanceState::default(),
652        )
653        .await;
654        let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
655            &TestValidatedState::default(),
656            &TestInstanceState::default(),
657        )
658        .await;
659        // Increment the block number, to distinguish this block from the genesis block, which
660        // already exists.
661        leaf.block_header_mut().block_number += 1;
662        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
663
664        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
665        let leaf = LeafQueryData::new(leaf, qc).unwrap();
666
667        // Insert some data and check that we can read it back.
668        let mut tx = ds.write().await.unwrap();
669        tx.insert_leaf(leaf.clone()).await.unwrap();
670        tx.insert_block(block.clone()).await.unwrap();
671        tx.commit().await.unwrap();
672
673        assert_eq!(
674            NodeDataSource::<MockTypes>::block_height(&ds)
675                .await
676                .unwrap(),
677            2
678        );
679        assert_eq!(leaf, ds.get_leaf(1).await.await);
680        assert_eq!(block, ds.get_block(1).await.await);
681
682        drop(ds);
683
684        // Reset and check that the changes are gone.
685        let ds = D::reset(&storage).await;
686        assert_eq!(
687            NodeDataSource::<MockTypes>::block_height(&ds)
688                .await
689                .unwrap(),
690            0
691        );
692        ds.get_leaf(1).await.try_resolve().unwrap_err();
693        ds.get_block(1).await.try_resolve().unwrap_err();
694    }
695
696    #[test_log::test(tokio::test(flavor = "multi_thread"))]
697    pub async fn test_drop_tx<D: TestableDataSource>()
698    where
699        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
700            + AvailabilityStorage<MockTypes>
701            + NodeStorage<MockTypes>,
702        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
703    {
704        use hotshot_example_types::node_types::TestVersions;
705
706        let storage = D::create(0).await;
707        let ds = D::connect(&storage).await;
708
709        // Mock up some consensus data.
710        let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
711            &TestValidatedState::default(),
712            &TestInstanceState::default(),
713        )
714        .await;
715        let mut mock_leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
716            &TestValidatedState::default(),
717            &TestInstanceState::default(),
718        )
719        .await;
720        // Increment the block number, to distinguish this block from the genesis block, which
721        // already exists.
722        mock_leaf.block_header_mut().block_number += 1;
723        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
724
725        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
726        let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
727
728        // Insert, but do not commit, some data and check that we can read it back.
729        tracing::info!("write");
730        let mut tx = ds.write().await.unwrap();
731        tx.insert_leaf(leaf.clone()).await.unwrap();
732        tx.insert_block(block.clone()).await.unwrap();
733
734        assert_eq!(tx.block_height().await.unwrap(), 2);
735        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
736        assert_eq!(block, tx.get_block(1.into()).await.unwrap());
737
738        // Drop the transaction, causing a revert.
739        drop(tx);
740
741        // Open a new transaction and check that the changes are reverted.
742        tracing::info!("read");
743        let mut tx = ds.read().await.unwrap();
744        assert_eq!(tx.block_height().await.unwrap(), 0);
745        drop(tx);
746
747        // Get a mutable transaction again, insert different data.
748        mock_leaf.block_header_mut().block_number += 1;
749        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
750        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
751        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
752
753        tracing::info!("write again");
754        let mut tx = ds.write().await.unwrap();
755        tx.insert_leaf(leaf.clone()).await.unwrap();
756        tx.insert_block(block.clone()).await.unwrap();
757        tx.commit().await.unwrap();
758
759        // Read the data back. We should have _only_ the data that was written in the final
760        // transaction.
761        tracing::info!("read again");
762        let height = leaf.height() as usize;
763        assert_eq!(
764            NodeDataSource::<MockTypes>::block_height(&ds)
765                .await
766                .unwrap(),
767            height + 1
768        );
769        assert_eq!(leaf, ds.get_leaf(height).await.await);
770        assert_eq!(block, ds.get_block(height).await.await);
771        ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
772        ds.get_block(height - 1).await.try_resolve().unwrap_err();
773    }
774}
775
776/// Generic tests we can instantiate for all the node data sources.
777#[cfg(any(test, feature = "testing"))]
778#[espresso_macros::generic_tests]
779pub mod node_tests {
780    use std::time::Duration;
781
782    use committable::Committable;
783    use futures::{future::join_all, stream::StreamExt};
784    use hotshot::traits::BlockPayload;
785    use hotshot_example_types::{
786        block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
787        node_types::{TestTypes, TestVersions},
788        state_types::{TestInstanceState, TestValidatedState},
789    };
790    use hotshot_types::{
791        data::{vid_commitment, VidCommitment, VidShare, ViewNumber},
792        simple_certificate::{CertificatePair, QuorumCertificate2},
793        traits::{
794            block_contents::{BlockHeader, EncodeBytes},
795            node_implementation::{ConsensusTime, Versions},
796        },
797        vid::advz::{advz_scheme, ADVZScheme},
798    };
799    use jf_advz::VidScheme;
800    use vbs::version::StaticVersionType;
801
802    use crate::{
803        availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
804        data_source::{
805            storage::{NodeStorage, UpdateAvailabilityStorage},
806            update::Transaction,
807        },
808        node::{BlockId, NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
809        testing::{
810            consensus::{MockNetwork, TestableDataSource},
811            mocks::{mock_transaction, MockPayload, MockTypes, MockVersions},
812            sleep,
813        },
814        types::HeightIndexed,
815        Header, Leaf2, VidCommon,
816    };
817
818    fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
819        <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
820    }
821
    /// Checks that `sync_status` tracks missing leaves, blocks, VID common data and VID shares.
    ///
    /// The test appends leaves out of order, then inserts VID data, blocks and the skipped leaf
    /// through write transactions, asserting the expected [`SyncStatus`] counts after each step.
    /// It finishes by re-inserting VID common data without a share and checking that the
    /// reported status does not change.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_sync_status<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        use hotshot_example_types::node_types::TestVersions;

        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Set up a mock VID scheme to use for generating test data.
        let mut vid = advz_scheme(2);

        // Generate some mock leaves and blocks to insert.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis::<TestVersions>(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
            )
            .await,
        ];
        // Extend both lists to three entries. Each new entry is a clone of the previous one with
        // its block number incremented by one.
        for i in 0..2 {
            let mut leaf = leaves[i].clone();
            leaf.leaf.block_header_mut().block_number += 1;
            leaves.push(leaf);

            let mut block = blocks[i].clone();
            block.header.block_number += 1;
            blocks.push(block);
        }
        // Generate mock VID data. We reuse the same (empty) payload for each block, but with
        // different metadata.
        let disperse = vid.disperse([]).unwrap();
        // `vid` is shadowed here: from now on it is a list with one
        // `(VidCommonQueryData, share)` pair per leaf, all built from the same dispersal.
        let vid = leaves
            .iter()
            .map(|leaf| {
                (
                    VidCommonQueryData::new(
                        leaf.header().clone(),
                        VidCommon::V0(disperse.common.clone()),
                    ),
                    disperse.shares[0].clone(),
                )
            })
            .collect::<Vec<_>>();

        // At first, the node is fully synced.
        assert!(ds.sync_status().await.unwrap().is_fully_synced());

        // Insert a leaf without the corresponding block or VID info, make sure we detect that the
        // block and VID info are missing.
        ds.append(leaves[0].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 1,
                missing_vid_common: 1,
                missing_vid_shares: 1,
                missing_leaves: 0,
                pruned_height: None,
            }
        );

        // Insert a leaf whose height is not the successor of the previous leaf. We should now
        // detect that the leaf in between is missing (along with all _three_ corresponding blocks).
        ds.append(leaves[2].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 3,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Insert VID common without a corresponding share.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        // Only the count of missing VID common data goes down; the share is still missing.
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatus {
                missing_blocks: 3,
                missing_vid_common: 2,
                missing_vid_shares: 3,
                missing_leaves: 1,
                pruned_height: None,
            }
        );

        // Rectify the missing data.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_block(blocks[0].clone()).await.unwrap();
            tx.insert_vid(vid[0].0.clone(), Some(VidShare::V0(vid[0].1.clone())))
                .await
                .unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(vid[1].0.clone(), Some(VidShare::V0(vid[1].1.clone())))
                .await
                .unwrap();
            tx.insert_block(blocks[2].clone()).await.unwrap();
            tx.insert_vid(vid[2].0.clone(), Some(VidShare::V0(vid[2].1.clone())))
                .await
                .unwrap();
            tx.commit().await.unwrap();
        }

        // Some data sources (e.g. file system) don't support out-of-order insertion of missing
        // data. These would have just ignored the insertion of `vid[0]` (the share) and
        // `leaves[1]`. Detect if this is the case; then we allow 1 missing leaf and 1 missing VID
        // share.
        let expected_missing = if ds.get_leaf(1).await.try_resolve().is_err() {
            tracing::warn!(
                "data source does not support out-of-order filling, allowing one missing leaf and \
                 VID share"
            );
            1
        } else {
            0
        };
        let expected_sync_status = SyncStatus {
            missing_blocks: 0,
            missing_leaves: expected_missing,
            missing_vid_common: 0,
            missing_vid_shares: expected_missing,
            pruned_height: None,
        };
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);

        // If we re-insert one of the VID entries without a share, it should not overwrite the share
        // that we already have; that is, `insert_vid` should be monotonic.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
    }
973
974    #[test_log::test(tokio::test(flavor = "multi_thread"))]
975    pub async fn test_counters<D: TestableDataSource>() {
976        use hotshot_example_types::node_types::TestVersions;
977
978        let storage = D::create(0).await;
979        let ds = D::connect(&storage).await;
980
981        assert_eq!(ds.count_transactions().await.unwrap(), 0);
982        assert_eq!(ds.payload_size().await.unwrap(), 0);
983
984        // Insert some transactions.
985        let mut total_transactions = 0;
986        let mut total_size = 0;
987        'outer: for i in [0, 1, 2] {
988            // Using `i % 2` as the transaction data ensures we insert a duplicate transaction
989            // (since we insert more than 2 transactions total). The query service should still
990            // count these as separate transactions and should include both duplicates when
991            // computing the total size.
992            let (payload, metadata) =
993                <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
994                    [mock_transaction(vec![i as u8 % 2])],
995                    &TestValidatedState::default(),
996                    &TestInstanceState::default(),
997                )
998                .await
999                .unwrap();
1000            let encoded = payload.encode();
1001            let payload_commitment = vid_commitment::<TestVersions>(
1002                &encoded,
1003                &metadata.encode(),
1004                1,
1005                <TestVersions as Versions>::Base::VERSION,
1006            );
1007            let header = TestBlockHeader {
1008                block_number: i,
1009                payload_commitment,
1010                timestamp: i,
1011                timestamp_millis: i * 1_000,
1012                builder_commitment:
1013                    <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
1014                        &payload, &metadata,
1015                    ),
1016                metadata: TestMetadata {
1017                    num_transactions: 7, // arbitrary
1018                },
1019                random: 1, // arbitrary
1020                version: <TestVersions as Versions>::Base::VERSION,
1021            };
1022
1023            let mut leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
1024                &TestValidatedState::default(),
1025                &TestInstanceState::default(),
1026            )
1027            .await;
1028            *leaf.leaf.block_header_mut() = header.clone();
1029            let block = BlockQueryData::new(header, payload);
1030            ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1031                .await
1032                .unwrap();
1033            assert_eq!(
1034                NodeDataSource::<MockTypes>::block_height(&ds)
1035                    .await
1036                    .unwrap(),
1037                (i + 1) as usize,
1038            );
1039
1040            total_transactions += 1;
1041            total_size += encoded.len();
1042
1043            // Allow some time for the aggregator to update.
1044            for retry in 0..5 {
1045                let ds_transactions = ds.count_transactions().await.unwrap();
1046                let ds_payload_size = ds.payload_size().await.unwrap();
1047                if ds_transactions != total_transactions || ds_payload_size != total_size {
1048                    tracing::info!(
1049                        i,
1050                        retry,
1051                        total_transactions,
1052                        ds_transactions,
1053                        total_size,
1054                        ds_payload_size,
1055                        "waiting for statistics to update"
1056                    );
1057                    sleep(Duration::from_secs(1)).await;
1058                } else {
1059                    continue 'outer;
1060                }
1061            }
1062            panic!("counters did not update in time");
1063        }
1064    }
1065
1066    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1067    pub async fn test_vid_shares<D: TestableDataSource>()
1068    where
1069        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1070    {
1071        let mut network = MockNetwork::<D, MockVersions>::init().await;
1072        let ds = network.data_source();
1073
1074        network.start().await;
1075
1076        // Check VID shares for a few blocks.
1077        let mut leaves = ds.subscribe_leaves(0).await.take(3);
1078        while let Some(leaf) = leaves.next().await {
1079            tracing::info!("got leaf {}", leaf.height());
1080            let mut tx = ds.read().await.unwrap();
1081            let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1082            assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1083            assert_eq!(
1084                share,
1085                tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1086                    .await
1087                    .unwrap()
1088            );
1089        }
1090    }
1091
1092    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1093    pub async fn test_vid_monotonicity<D: TestableDataSource>()
1094    where
1095        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1096        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1097    {
1098        use hotshot_example_types::node_types::TestVersions;
1099
1100        let storage = D::create(0).await;
1101        let ds = D::connect(&storage).await;
1102
1103        // Generate some test VID data.
1104        let mut vid = advz_scheme(2);
1105        let disperse = vid.disperse([]).unwrap();
1106
1107        // Insert test data with VID common and a share.
1108        let leaf = LeafQueryData::<MockTypes>::genesis::<TestVersions>(
1109            &TestValidatedState::default(),
1110            &TestInstanceState::default(),
1111        )
1112        .await;
1113        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1114        ds.append(BlockInfo::new(
1115            leaf,
1116            None,
1117            Some(common.clone()),
1118            Some(VidShare::V0(disperse.shares[0].clone())),
1119        ))
1120        .await
1121        .unwrap();
1122
1123        {
1124            assert_eq!(ds.get_vid_common(0).await.await, common);
1125            assert_eq!(
1126                ds.vid_share(0).await.unwrap(),
1127                VidShare::V0(disperse.shares[0].clone())
1128            );
1129        }
1130
1131        // Re-insert the common data, without a share. This should not overwrite the share we
1132        // already have.
1133        {
1134            let mut tx = ds.write().await.unwrap();
1135            tx.insert_vid(common.clone(), None).await.unwrap();
1136            tx.commit().await.unwrap();
1137        }
1138        {
1139            assert_eq!(ds.get_vid_common(0).await.await, common);
1140            assert_eq!(
1141                ds.vid_share(0).await.unwrap(),
1142                VidShare::V0(disperse.shares[0].clone())
1143            );
1144        }
1145    }
1146
    /// Checks that a block payload can be recovered from the VID shares held by the nodes.
    ///
    /// The test starts a mock network, submits a transaction and waits for a non-empty block.
    /// It then fetches the VID common data for that block, collects one share from every node's
    /// data source (verifying each share against the common data and commitment), recovers the
    /// payload from the shares, and compares it to the block's payload and the submitted
    /// transaction.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_recovery<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        network.start().await;

        // Submit a transaction so we can try to recover a non-empty block.
        let mut blocks = ds.subscribe_blocks(0).await;
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Wait for the transaction to be finalized.
        let block = loop {
            tracing::info!("waiting for transaction");
            let block = blocks.next().await.unwrap();
            if !block.is_empty() {
                tracing::info!(height = block.height(), "transaction sequenced");
                break block;
            }
            tracing::info!(height = block.height(), "empty block");
        };
        let height = block.height() as usize;
        // The rest of the test works with the ADVZ (`V0`) commitment; any other variant is a
        // test failure.
        let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
            commit
        } else {
            panic!("expect ADVZ commitment")
        };

        // Set up a test VID scheme.
        let vid = advz_scheme(network.num_nodes());

        // Get VID common data and verify it.
        tracing::info!("fetching common data");
        let common = ds.get_vid_common(height).await.await;
        let VidCommon::V0(common) = &common.common() else {
            panic!("expect ADVZ common");
        };
        ADVZScheme::is_consistent(&commit, common).unwrap();

        // Collect shares from each node.
        tracing::info!("fetching shares");
        // Shadow `network` and `vid` with references so that the `async move` blocks below
        // capture the references rather than moving the values themselves.
        let network = &network;
        let vid = &vid;
        let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
            let ds = network.data_source_index(i);

            // Wait until the node has processed up to the desired block; since we have thus far
            // only interacted with node 0, it is possible other nodes are slightly behind.
            let mut leaves = ds.subscribe_leaves(height).await;
            let leaf = leaves.next().await.unwrap();
            assert_eq!(leaf.height(), height as u64);
            assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));

            let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
                share
            } else {
                panic!("expect ADVZ share")
            };
            // Each share must verify against the common data and the block's commitment.
            vid.verify_share(&share, common, &commit).unwrap().unwrap();
            share
        }))
        .await;

        // Recover payload.
        tracing::info!("recovering payload");
        let bytes = vid.recover_payload(&shares, common).unwrap();
        let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
            &bytes,
            &TestMetadata {
                num_transactions: 7, // arbitrary
            },
        );
        assert_eq!(recovered, *block.payload());
        assert_eq!(recovered.transactions, vec![txn]);
    }
1226
1227    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1228    pub async fn test_timestamp_window<D: TestableDataSource>() {
1229        let mut network = MockNetwork::<D, MockVersions>::init().await;
1230        let ds = network.data_source();
1231
1232        network.start().await;
1233
1234        // Wait for blocks with at least three different timestamps to be sequenced. This lets us
1235        // test all the edge cases.
1236        let mut leaves = ds.subscribe_leaves(0).await;
1237        // `test_blocks` is a list of lists of headers with the same timestamp. The flattened list
1238        // of headers is contiguous.
1239        let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1240        while test_blocks.len() < 3 {
1241            // Wait for the next block to be sequenced.
1242            let leaf = leaves.next().await.unwrap();
1243            let header = leaf.header().clone();
1244            if let Some(last_timestamp) = test_blocks.last_mut() {
1245                if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1246                    == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1247                {
1248                    last_timestamp.push(header);
1249                } else {
1250                    test_blocks.push(vec![header]);
1251                }
1252            } else {
1253                test_blocks.push(vec![header]);
1254            }
1255        }
1256        tracing::info!("blocks for testing: {test_blocks:#?}");
1257
1258        // Define invariants that every response should satisfy.
1259        let check_invariants =
1260            |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1261                let mut prev = res.prev.as_ref();
1262                if let Some(prev) = prev {
1263                    if check_prev {
1264                        assert!(block_header_timestamp(prev) < start);
1265                    }
1266                } else {
1267                    // `prev` can only be `None` if the first block in the window is the genesis
1268                    // block.
1269                    assert_eq!(res.from().unwrap(), 0);
1270                };
1271                for header in &res.window {
1272                    assert!(start <= block_header_timestamp(header));
1273                    assert!(block_header_timestamp(header) < end);
1274                    if let Some(prev) = prev {
1275                        assert!(
1276                            <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1277                                <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1278                        );
1279                    }
1280                    prev = Some(header);
1281                }
1282                if let Some(next) = &res.next {
1283                    assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1284                    // If there is a `next`, there must be at least one previous block (either `prev`
1285                    // itself or the last block if the window is nonempty), so we can `unwrap` here.
1286                    assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1287                }
1288            };
1289
1290        let get_window = |start, end| {
1291            let ds = ds.clone();
1292            async move {
1293                let window = ds
1294                    .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1295                    .await
1296                    .unwrap();
1297                tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1298                check_invariants(&window, start, end, true);
1299                window
1300            }
1301        };
1302
1303        // Case 0: happy path. All blocks are available, including prev and next.
1304        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1305        let end = start + 1;
1306        let res = get_window(start, end).await;
1307        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1308        assert_eq!(res.window, test_blocks[1]);
1309        assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1310
1311        // Case 1: no `prev`, start of window is before genesis.
1312        let start = 0;
1313        let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1314        let res = get_window(start, end).await;
1315        assert_eq!(res.prev, None);
1316        assert_eq!(res.window, test_blocks[0]);
1317        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1318
1319        // Case 2: no `next`, end of window is after the most recently sequenced block.
1320        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1321        let end = i64::MAX as u64;
1322        let res = get_window(start, end).await;
1323        assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1324        // There may have been more blocks sequenced since we grabbed `test_blocks`, so just check
1325        // that the prefix of the window is correct.
1326        assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1327        assert_eq!(res.next, None);
1328        // Fetch more blocks using the `from` form of the endpoint. Start from the last block we had
1329        // previously (ie fetch a slightly overlapping window) to ensure there is at least one block
1330        // in the new window.
1331        let from = test_blocks.iter().flatten().count() - 1;
1332        let more = ds
1333            .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1334            .await
1335            .unwrap();
1336        check_invariants(&more, start, end, false);
1337        assert_eq!(
1338            more.prev.as_ref().unwrap(),
1339            test_blocks.iter().flatten().nth(from - 1).unwrap()
1340        );
1341        assert_eq!(
1342            more.window[..res.window.len() - test_blocks[2].len() + 1],
1343            res.window[test_blocks[2].len() - 1..]
1344        );
1345        assert_eq!(res.next, None);
1346        // We should get the same result whether we query by block height or hash.
1347        let more2 = ds
1348            .get_header_window(
1349                test_blocks[2].last().unwrap().commit(),
1350                end,
1351                i64::MAX as usize,
1352            )
1353            .await
1354            .unwrap();
1355        check_invariants(&more2, start, end, false);
1356        assert_eq!(more2.from().unwrap(), more.from().unwrap());
1357        assert_eq!(more2.prev, more.prev);
1358        assert_eq!(more2.next, more.next);
1359        assert_eq!(more2.window[..more.window.len()], more.window);
1360
1361        // Case 3: the window is empty.
1362        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1363        let end = start;
1364        let res = get_window(start, end).await;
1365        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1366        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1367        assert_eq!(res.window, vec![]);
1368
1369        // Case 4: no relevant blocks are available yet.
1370        ds.get_header_window(
1371            WindowStart::Time((i64::MAX - 1) as u64),
1372            i64::MAX as u64,
1373            i64::MAX as usize,
1374        )
1375        .await
1376        .unwrap_err();
1377
1378        // Case 5: limits.
1379        let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1380            .into_iter()
1381            .flatten()
1382            .collect::<Vec<_>>();
1383        // Make a query that would return everything, but gets limited.
1384        let start = block_header_timestamp(&blocks[0]);
1385        let end = block_header_timestamp(&test_blocks[2][0]);
1386        let res = ds
1387            .get_header_window(WindowStart::Time(start), end, 1)
1388            .await
1389            .unwrap();
1390        assert_eq!(res.prev, None);
1391        assert_eq!(res.window, [blocks[0].clone()]);
1392        assert_eq!(res.next, None);
1393        // Query the next page of results, get limited again.
1394        let res = ds
1395            .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1396            .await
1397            .unwrap();
1398        assert_eq!(res.window, [blocks[1].clone()]);
1399        assert_eq!(res.next, None);
1400        // Get the rest of the results.
1401        let res = ds
1402            .get_header_window(
1403                WindowStart::Height(blocks[1].height() + 1),
1404                end,
1405                blocks.len() - 1,
1406            )
1407            .await
1408            .unwrap();
1409        assert_eq!(res.window, blocks[2..].to_vec());
1410        assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1411    }
1412
1413    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1414    pub async fn test_latest_qc_chain<D: TestableDataSource>()
1415    where
1416        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1417        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1418    {
1419        let storage = D::create(0).await;
1420        let ds = D::connect(&storage).await;
1421
1422        {
1423            let mut tx = ds.read().await.unwrap();
1424            assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1425        }
1426
1427        async fn leaf_with_qc_chain(
1428            number: u64,
1429        ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1430            let mut leaf = Leaf2::<MockTypes>::genesis::<TestVersions>(
1431                &Default::default(),
1432                &Default::default(),
1433            )
1434            .await;
1435            leaf.block_header_mut().block_number = number;
1436
1437            let mut qc1 = QuorumCertificate2::<MockTypes>::genesis::<TestVersions>(
1438                &Default::default(),
1439                &Default::default(),
1440            )
1441            .await;
1442            qc1.view_number = ViewNumber::new(1);
1443            qc1.data.leaf_commit = Committable::commit(&leaf);
1444
1445            let mut qc2 = qc1.clone();
1446            qc2.view_number += 1;
1447
1448            let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1449            (
1450                leaf,
1451                [
1452                    CertificatePair::non_epoch_change(qc1),
1453                    CertificatePair::non_epoch_change(qc2),
1454                ],
1455            )
1456        }
1457
1458        // Insert a leaf with QC chain.
1459        {
1460            let (leaf, qcs) = leaf_with_qc_chain(2).await;
1461            let mut tx = ds.write().await.unwrap();
1462            tx.insert_leaf_with_qc_chain(leaf, Some(qcs.clone()))
1463                .await
1464                .unwrap();
1465            tx.commit().await.unwrap();
1466
1467            assert_eq!(
1468                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1469                Some(qcs)
1470            );
1471        }
1472
1473        // Insert a later leaf without a QC chain. This should clear the previously saved QC chain,
1474        // which is no longer up to date.
1475        {
1476            let (leaf, _) = leaf_with_qc_chain(3).await;
1477            let mut tx = ds.write().await.unwrap();
1478            tx.insert_leaf_with_qc_chain(leaf, None).await.unwrap();
1479            tx.commit().await.unwrap();
1480
1481            assert_eq!(
1482                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1483                None
1484            );
1485        }
1486
1487        // Insert an earlier leaf with a QC chain. This should not be saved since it is not the
1488        // latest leaf.
1489        {
1490            let (leaf, qcs) = leaf_with_qc_chain(1).await;
1491            let mut tx = ds.write().await.unwrap();
1492            tx.insert_leaf_with_qc_chain(leaf, Some(qcs)).await.unwrap();
1493            tx.commit().await.unwrap();
1494
1495            assert_eq!(
1496                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1497                None
1498            );
1499        }
1500    }
1501}
1502
/// Generic tests we can instantiate for all the status data sources.
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod status_tests {
    use std::time::Duration;

    use crate::{
        status::StatusDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, MockNetwork},
            mocks::{mock_transaction, MockVersions},
            sleep,
        },
    };

    // Checks the status metrics of a data source in three phases: before consensus has started,
    // after at least one non-genesis block has been finalized, and after the network has been
    // shut down.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D, MockVersions>::init().await;
        let ds = network.data_source();

        // Phase 1: consensus has not been started yet.
        //
        // The block height is initially zero.
        assert_eq!(ds.block_height().await.unwrap(), 0);
        // The success rate is NAN, since both the block height (the numerator) and the view
        // number (the denominator) are 0.
        assert!(ds.success_rate().await.unwrap().is_nan());
        // No block has been produced, so the "last_decided_time" metric is 0 and the elapsed time
        // since the last decide should be close to the time elapsed since the Unix epoch.
        let elapsed = ds.elapsed_time_since_last_decide().await.unwrap() as i64;
        let now = chrono::Utc::now().timestamp();
        assert!(
            (elapsed - now).abs() <= 1,
            "time elapsed since last_decided_time is not within 1s"
        );

        // Phase 2: submit a transaction, start consensus, and poll once per second until at
        // least one non-genesis block has been finalized.
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;
        network.start().await;

        let mut height = ds.block_height().await.unwrap();
        while height <= 1 {
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
            height = ds.block_height().await.unwrap();
        }

        // The success rate must have been updated by now. Only its sign can be checked: the exact
        // value depends on how many views have elapsed, which cannot be known without racing
        // against consensus.
        let success_rate = ds.success_rate().await.unwrap();
        assert!(success_rate.is_finite(), "{success_rate}");
        assert!(success_rate > 0.0, "{success_rate}");

        // Phase 3: shut down consensus to halt block production, then wait 3 seconds so that the
        // elapsed time since the last decide is at least 3 seconds.
        network.shut_down().await;
        sleep(Duration::from_secs(3)).await;
        assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
    }
}
1576
/// Instantiates all of the generic data source test suites for a single data source type.
///
/// The expansion imports the `availability_tests`, `node_tests`, `persistence_tests` and
/// `status_tests` modules from this crate's `data_source` module into the calling scope, and then
/// invokes the matching `instantiate_*_tests!` macro for each of them with the given type.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($data_source:ty) => {
        use $crate::data_source::availability_tests;
        use $crate::data_source::node_tests;
        use $crate::data_source::persistence_tests;
        use $crate::data_source::status_tests;

        instantiate_availability_tests!($data_source);
        instantiate_persistence_tests!($data_source);
        instantiate_node_tests!($data_source);
        instantiate_status_tests!($data_source);
    };
}