// hotshot_query_service/data_source.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Persistent storage and sources of data consumed by APIs.
14//!
15//! The APIs provided by this query service are generic over the implementation which actually
16//! retrieves data in answer to queries. We call this implementation a _data source_. This module
17//! defines a data source and provides several pre-built implementations:
18//! * [`FileSystemDataSource`]
19//! * [`SqlDataSource`]
20//! * [`FetchingDataSource`], a generalization of the above
21//! * [`MetricsDataSource`]
22//!
23//! The user can choose which data source to use when initializing the query service.
24//!
25//! We also provide combinators for modularly adding functionality to existing data sources:
26//! * [`ExtensibleDataSource`]
27//!
28
29mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
#[cfg(any(test, feature = "testing"))]
mod test_helpers {
    use std::ops::{Bound, RangeBounds};

    use futures::{
        future,
        stream::{BoxStream, StreamExt},
    };

    use crate::{
        availability::{BlockQueryData, Fetch, LeafQueryData},
        node::NodeDataSource,
        testing::{consensus::TestableDataSource, mocks::MockTypes},
    };

    /// Clamp the upper bound of `range` to the currently available block height.
    ///
    /// Ranges that already have an upper bound are passed through unchanged; an
    /// unbounded end is replaced with an exclusive bound at the data source's
    /// current block height.
    async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize> + use<R, D>
    where
        D: TestableDataSource,
        R: RangeBounds<usize>,
    {
        let start = range.start_bound().cloned();
        let end = match range.end_bound().cloned() {
            Bound::Unbounded => Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap()),
            bounded => bounded,
        };
        (start, end)
    }

    /// Get a stream of blocks, implicitly terminating at the current block height.
    pub async fn block_range<R, D>(
        ds: &D,
        range: R,
    ) -> BoxStream<'static, BlockQueryData<MockTypes>>
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Send + 'static,
    {
        let bounded = bound_range(ds, range).await;
        let fetches = ds.get_block_range(bounded).await;
        // Resolve each fetch into a concrete block before yielding it.
        fetches.then(Fetch::resolve).boxed()
    }

    /// Get a stream of leaves, implicitly terminating at the current block height.
    pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Send + 'static,
    {
        let bounded = bound_range(ds, range).await;
        let fetches = ds.get_leaf_range(bounded).await;
        fetches.then(Fetch::resolve).boxed()
    }

    /// Collect every currently available non-empty block, paired with its leaf,
    /// skipping the genesis block.
    pub async fn get_non_empty_blocks<D>(
        ds: &D,
    ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
    where
        D: TestableDataSource,
    {
        // Ignore the genesis block (start from height 1).
        let leaves = leaf_range(ds, 1..).await;
        let blocks = block_range(ds, 1..).await;
        leaves
            .zip(blocks)
            .filter(|(_, block)| future::ready(!block.is_empty()))
            .collect()
            .await
    }
}
119
120/// Generic tests we can instantiate for all the availability data sources.
121#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124    use std::{
125        collections::HashMap,
126        fmt::Debug,
127        ops::{Bound, RangeBounds},
128    };
129
130    use committable::Committable;
131    use futures::stream::StreamExt;
132    use hotshot_types::{data::Leaf2, vote::HasViewNumber};
133
134    use super::test_helpers::*;
135    use crate::{
136        availability::{BlockId, payload_size},
137        data_source::storage::{AvailabilityStorage, NodeStorage},
138        node::NodeDataSource,
139        testing::{
140            consensus::{MockNetwork, TestableDataSource},
141            mocks::{MockTypes, mock_transaction},
142        },
143        types::HeightIndexed,
144    };
145
    /// Check the integrity and cross-consistency of all data currently stored in `ds`.
    ///
    /// Walks every available leaf from genesis up to the current block height and
    /// verifies that each leaf, block, payload, and VID object is self-consistent and
    /// retrievable through every index the API supports (height, leaf hash, block
    /// hash, payload hash, transaction hash). Payload- and transaction-hash lookups
    /// resolve to the *first* occurrence of a duplicate, so previously seen hashes are
    /// tracked; lookups whose objects are not available locally are skipped via
    /// `try_resolve`, since fetch order is not guaranteed for missing data.
    async fn validate<D: TestableDataSource>(ds: &D)
    where
        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
    {
        // Check the consistency of every block/leaf pair. Keep track of payloads and transactions
        // we've seen so we can detect duplicates.
        let mut seen_payloads = HashMap::new();
        let mut seen_transactions = HashMap::new();
        // `leaf_range` with an unbounded range implicitly ends at the current block
        // height, so `i` is the expected height of each leaf.
        let mut leaves = leaf_range(ds, ..).await.enumerate();
        while let Some((i, leaf)) = leaves.next().await {
            assert_eq!(leaf.height(), i as u64);
            // The stored hash must match a freshly computed commitment over the leaf.
            assert_eq!(
                leaf.hash(),
                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
            );

            // Check indices.
            tracing::info!("looking up leaf {i} various ways");
            assert_eq!(leaf, ds.get_leaf(i).await.await);
            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);

            tracing::info!("looking up block {i} various ways");
            let block = ds.get_block(i).await.await;
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i as u64);
            assert_eq!(block.hash(), block.header().commit());
            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));

            // Check indices.
            assert_eq!(block, ds.get_block(i).await.await);
            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
            // We should be able to look up the block by payload hash unless its payload is a
            // duplicate. For duplicate payloads, this function returns the index of the first
            // duplicate.
            //
            // Note: this ordering is not a strict requirement. It should hold for payloads in local
            // storage, but we don't have a good way of enforcing it if the payload is missing, in
            // which case we will return the first matching payload we see, which could happen in
            // any order. We use `try_resolve` to skip this check if the object isn't available
            // locally.
            let ix = seen_payloads
                .entry(block.payload_hash())
                .or_insert(i as u64);
            if let Ok(block) = ds
                .get_block(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                assert_eq!(block.height(), *ix);
            } else {
                tracing::warn!(
                    "skipping block by payload index check for missing payload {:?}",
                    block.header()
                );
                // At least check that _some_ block can be fetched.
                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // Check payload lookup.
            tracing::info!("looking up payload {i} various ways");
            let expected_payload = block.clone().into();
            assert_eq!(ds.get_payload(i).await.await, expected_payload);
            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
            // Similar to the above, we can't guarantee which index we will get when passively
            // fetching this payload, so only check the index if the payload is available locally.
            if let Ok(payload) = ds
                .get_payload(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                // Only compare contents when this block is the first occurrence of its
                // payload; otherwise the lookup refers to an earlier block.
                if *ix == i as u64 {
                    assert_eq!(payload, expected_payload);
                }
            } else {
                tracing::warn!(
                    "skipping payload index check for missing payload {:?}",
                    block.header()
                );
                // At least check that _some_ payload can be fetched.
                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // Look up the common VID data.
            tracing::info!("looking up VID common {i} various ways");
            let common = ds.get_vid_common(block.height() as usize).await.await;
            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
            // Similar to the above, we can't guarantee which index we will get when passively
            // fetching this data, so only check the index if the data is available locally.
            if let Ok(res) = ds
                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                if *ix == i as u64 {
                    assert_eq!(res, common);
                }
            } else {
                tracing::warn!(
                    "skipping VID common index check for missing data {:?}",
                    block.header()
                );
                // At least check that _some_ data can be fetched.
                let res = ds
                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
                assert_eq!(res.payload_hash(), common.payload_hash());
            }

            for (j, txn) in block.enumerate() {
                tracing::info!("looking up transaction {i},{j:?}");

                // We should be able to look up the transaction by hash unless it is a duplicate.
                // For duplicate transactions, this function returns the index of the first
                // duplicate.
                //
                // Similar to the above, we can't guarantee which index we will get when passively
                // fetching this transaction, so only check the index if the transaction is
                // available locally.
                let ix = seen_transactions
                    .entry(txn.commit())
                    .or_insert((i as u64, j.clone()));
                if let Ok(tx_data) = ds
                    .get_block_containing_transaction(txn.commit())
                    .await
                    .try_resolve()
                {
                    assert_eq!(tx_data.transaction.transaction(), &txn);
                    assert_eq!(tx_data.transaction.block_height(), ix.0);
                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
                    assert_eq!(tx_data.index, ix.1);
                    assert_eq!(tx_data.block, block);
                } else {
                    tracing::warn!(
                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
                    );
                    // At least check that _some_ transaction can be fetched.
                    ds.get_block_containing_transaction(txn.commit())
                        .await
                        .await;
                }
            }
        }

        // Validate consistency of latest QC chain (only available after epoch upgrade).
        {
            let mut tx = ds.read().await.unwrap();
            let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
            let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();

            // The QC chain check only applies once leaves carry an epoch, so skip it
            // for pre-epoch data.
            if last_leaf.qc().data.epoch.is_some() {
                tracing::info!(block_height, "checking QC chain");
                let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();

                assert_eq!(last_leaf.height(), (block_height - 1) as u64);
                assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
                assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
                assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
            }
        }
    }
311
    /// End-to-end update test: run a mock consensus network, submit transactions, and
    /// check that each resulting non-empty block is reflected in the data source with
    /// fully consistent indices (via `validate`). Finally, connect a brand-new data
    /// source to the same underlying storage and check that all the data was actually
    /// persisted rather than merely held in memory.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_update<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D>::init().await;
        let ds = network.data_source();

        network.start().await;
        // Nothing has been submitted yet, so there should be no non-empty blocks.
        assert_eq!(get_non_empty_blocks(&ds).await, vec![]);

        // Submit a few blocks and make sure each one gets reflected in the query service and
        // preserves the consistency of the data and indices.
        let mut blocks = ds.subscribe_blocks(0).await.enumerate();
        for nonce in 0..3 {
            let txn = mock_transaction(vec![nonce]);
            network.submit_transaction(txn).await;

            // Wait for the transaction to be finalized: consensus may produce empty
            // blocks before the one containing our transaction.
            let (i, block) = loop {
                tracing::info!("waiting for tx {nonce}");
                let (i, block) = blocks.next().await.unwrap();
                if !block.is_empty() {
                    break (i, block);
                }
                tracing::info!("block {i} is empty");
            };

            tracing::info!("got tx {nonce} in block {i}");
            assert_eq!(ds.get_block(i).await.await, block);
            validate(&ds).await;
        }

        // Check that all the updates have been committed to storage, not simply held in memory: we
        // should be able to read the same data if we connect an entirely new data source to the
        // underlying storage.
        {
            tracing::info!("checking persisted storage");
            let storage = D::connect(network.storage()).await;

            // Ensure we have the same data in both data sources (if data was missing from the
            // original it is of course allowed to be missing from persistent storage and thus from
            // the latter).
            let block_height = NodeDataSource::block_height(&ds).await.unwrap();
            assert_eq!(
                ds.get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_block_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
            assert_eq!(
                ds.get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await,
                storage
                    .get_leaf_range(..block_height)
                    .await
                    .map(|fetch| fetch.try_resolve().ok())
                    .collect::<Vec<_>>()
                    .await
            );
        }
    }
384
385    #[test_log::test(tokio::test(flavor = "multi_thread"))]
386    pub async fn test_range<D: TestableDataSource>()
387    where
388        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
389    {
390        let mut network = MockNetwork::<D>::init().await;
391        let ds = network.data_source();
392        network.start().await;
393
394        // Wait for there to be at least 3 blocks.
395        let block_height = loop {
396            let mut tx = ds.read().await.unwrap();
397            let block_height = tx.block_height().await.unwrap();
398            if block_height >= 3 {
399                break block_height as u64;
400            }
401        };
402
403        // Query for a variety of ranges testing all cases of included, excluded, and unbounded
404        // starting and ending bounds
405        do_range_test(&ds, 1..=2, 1..3).await; // (inclusive, inclusive)
406        do_range_test(&ds, 1..3, 1..3).await; // (inclusive, exclusive)
407        do_range_test(&ds, 1.., 1..block_height).await; // (inclusive, unbounded)
408        do_range_test(&ds, ..=2, 0..3).await; // (unbounded, inclusive)
409        do_range_test(&ds, ..3, 0..3).await; // (unbounded, exclusive)
410        do_range_test(&ds, .., 0..block_height).await; // (unbounded, unbounded)
411        do_range_test(&ds, ExRange(0..=2), 1..3).await; // (exclusive, inclusive)
412        do_range_test(&ds, ExRange(0..3), 1..3).await; // (exclusive, exclusive)
413        do_range_test(&ds, ExRange(0..), 1..block_height).await; // (exclusive, unbounded)
414    }
415
    /// Check that all six range getters (leaves, blocks, payloads, payload metadata,
    /// VID common, VID common metadata) applied to `range` yield objects at exactly
    /// `expected_indices`, in order. Bounded ranges must then terminate; unbounded
    /// ranges must keep yielding fetches until objects stop being locally resolvable.
    async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
    where
        D: TestableDataSource,
        R: RangeBounds<usize> + Clone + Debug + Send + 'static,
        I: IntoIterator<Item = u64>,
    {
        tracing::info!("testing range {range:?}");

        // Open one stream per object type over the same range.
        let mut leaves = ds.get_leaf_range(range.clone()).await;
        let mut blocks = ds.get_block_range(range.clone()).await;
        let mut payloads = ds.get_payload_range(range.clone()).await;
        let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
        let mut vid_common = ds.get_vid_common_range(range.clone()).await;
        let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;

        for i in expected_indices {
            tracing::info!(i, "check entries");
            let leaf = leaves.next().await.unwrap().await;
            let block = blocks.next().await.unwrap().await;
            let payload = payloads.next().await.unwrap().await;
            let payload_meta = payloads_meta.next().await.unwrap().await;
            let common = vid_common.next().await.unwrap().await;
            let common_meta = vid_common_meta.next().await.unwrap().await;
            assert_eq!(leaf.height(), i);
            assert_eq!(block.height(), i);
            assert_eq!(payload, ds.get_payload(i as usize).await.await);
            // Metadata objects must agree with the full objects they summarize.
            assert_eq!(payload_meta, block.into());
            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
            assert_eq!(common_meta, common.into());
        }

        if range.end_bound() == Bound::Unbounded {
            // If the range is unbounded, the stream should continue, eventually reaching a point at
            // which further objects are not yet available, and yielding pending futures from there.
            loop {
                let fetch_leaf = leaves.next().await.unwrap();
                let fetch_block = blocks.next().await.unwrap();
                let fetch_payload = payloads.next().await.unwrap();
                let fetch_payload_meta = payloads_meta.next().await.unwrap();
                let fetch_common = vid_common.next().await.unwrap();
                let fetch_common_meta = vid_common_meta.next().await.unwrap();

                // Keep scanning until at least one stream yields an unresolved fetch,
                // which marks the edge of locally available data.
                if fetch_leaf.try_resolve().is_ok()
                    && fetch_block.try_resolve().is_ok()
                    && fetch_payload.try_resolve().is_ok()
                    && fetch_payload_meta.try_resolve().is_ok()
                    && fetch_common.try_resolve().is_ok()
                    && fetch_common_meta.try_resolve().is_ok()
                {
                    tracing::info!("searching for end of available objects");
                } else {
                    break;
                }
            }
        } else {
            // If the range is bounded, it should end where expected.
            assert!(leaves.next().await.is_none());
            assert!(blocks.next().await.is_none());
            assert!(payloads.next().await.is_none());
            assert!(payloads_meta.next().await.is_none());
            assert!(vid_common.next().await.is_none());
            assert!(vid_common_meta.next().await.is_none());
        }
    }
480
481    #[test_log::test(tokio::test(flavor = "multi_thread"))]
482    pub async fn test_range_rev<D: TestableDataSource>()
483    where
484        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
485    {
486        let mut network = MockNetwork::<D>::init().await;
487        let ds = network.data_source();
488        network.start().await;
489
490        // Wait for there to be at least 5 blocks.
491        ds.subscribe_leaves(5).await.next().await.unwrap();
492
493        // Test inclusive, exclusive and unbounded lower bound.
494        do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
495        do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
496        do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
497    }
498
499    async fn do_range_rev_test<D>(
500        ds: &D,
501        start: Bound<usize>,
502        end: usize,
503        expected_indices: impl DoubleEndedIterator<Item = u64>,
504    ) where
505        D: TestableDataSource,
506    {
507        tracing::info!("testing range {start:?}-{end}");
508
509        let mut leaves = ds.get_leaf_range_rev(start, end).await;
510        let mut blocks = ds.get_block_range_rev(start, end).await;
511        let mut payloads = ds.get_payload_range_rev(start, end).await;
512        let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
513        let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
514        let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
515
516        for i in expected_indices.rev() {
517            tracing::info!(i, "check entries");
518            let leaf = leaves.next().await.unwrap().await;
519            let block = blocks.next().await.unwrap().await;
520            let payload = payloads.next().await.unwrap().await;
521            let payload_meta = payloads_meta.next().await.unwrap().await;
522            let common = vid_common.next().await.unwrap().await;
523            let common_meta = vid_common_meta.next().await.unwrap().await;
524            assert_eq!(leaf.height(), i);
525            assert_eq!(block.height(), i);
526            assert_eq!(payload.height(), i);
527            assert_eq!(payload_meta.height(), i);
528            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
529            assert_eq!(
530                common_meta,
531                ds.get_vid_common_metadata(i as usize).await.await
532            );
533        }
534
535        // The range should end where expected.
536        assert!(leaves.next().await.is_none());
537        assert!(blocks.next().await.is_none());
538        assert!(payloads.next().await.is_none());
539        assert!(payloads_meta.next().await.is_none());
540        assert!(vid_common.next().await.is_none());
541        assert!(vid_common_meta.next().await.is_none());
542    }
543
544    // A wrapper around a range that turns the lower bound from inclusive to exclusive.
545    #[derive(Clone, Copy, Debug)]
546    struct ExRange<R>(R);
547
548    impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
549        fn start_bound(&self) -> Bound<&usize> {
550            match self.0.start_bound() {
551                Bound::Included(x) => Bound::Excluded(x),
552                Bound::Excluded(x) => Bound::Excluded(x),
553                Bound::Unbounded => Bound::Excluded(&0),
554            }
555        }
556
557        fn end_bound(&self) -> Bound<&usize> {
558            self.0.end_bound()
559        }
560    }
561}
562
563/// Generic tests we can instantiate for any data source with reliable, versioned persistent storage.
564#[cfg(any(test, feature = "testing"))]
565#[espresso_macros::generic_tests]
566pub mod persistence_tests {
567    use committable::Committable;
568    use hotshot_example_types::{
569        node_types::TEST_VERSIONS,
570        state_types::{TestInstanceState, TestValidatedState},
571    };
572    use hotshot_types::simple_certificate::QuorumCertificate2;
573
574    use crate::{
575        Leaf2,
576        availability::{BlockQueryData, LeafQueryData},
577        data_source::{
578            Transaction,
579            storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
580        },
581        node::NodeDataSource,
582        testing::{
583            consensus::TestableDataSource,
584            mocks::{MockPayload, MockTypes},
585        },
586        types::HeightIndexed,
587    };
588
    /// Check that reverting a write transaction undoes its uncommitted inserts: data
    /// written in an open transaction is readable from within that transaction, but
    /// after `revert` the data source reads as if nothing was ever written.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_revert<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
            + AvailabilityStorage<MockTypes>
            + NodeStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock up some consensus data.
        let mut qc = QuorumCertificate2::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let mut leaf = Leaf2::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test.base,
        )
        .await;
        // Increment the block number, to distinguish this block from the genesis block, which
        // already exists.
        leaf.block_header_mut().block_number += 1;
        // Keep the QC pointing at the modified leaf so the pair stays consistent.
        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);

        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(leaf, qc).unwrap();

        // Insert, but do not commit, some data and check that we can read it back.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();

        // The uncommitted writes are visible from within the same transaction.
        assert_eq!(tx.block_height().await.unwrap(), 2);
        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
        assert_eq!(block, tx.get_block(1.into()).await.unwrap());

        // Revert the changes.
        tx.revert().await;
        // After the revert the data source must look empty again.
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        // Lookups of the reverted objects must fail to resolve locally.
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }
640
    /// Check that resetting the persistent storage wipes committed data: after
    /// inserting and committing a leaf/block pair, dropping the data source, and
    /// reconnecting via `D::reset`, the storage reads as empty again.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_reset<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Mock up some consensus data.
        let mut qc = QuorumCertificate2::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let mut leaf = Leaf2::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test.base,
        )
        .await;
        // Increment the block number, to distinguish this block from the genesis block, which
        // already exists.
        leaf.block_header_mut().block_number += 1;
        // Keep the QC pointing at the modified leaf so the pair stays consistent.
        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);

        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
        let leaf = LeafQueryData::new(leaf, qc).unwrap();

        // Insert some data and check that we can read it back.
        let mut tx = ds.write().await.unwrap();
        tx.insert_leaf(leaf.clone()).await.unwrap();
        tx.insert_block(block.clone()).await.unwrap();
        tx.commit().await.unwrap();

        // The committed writes are visible from the data source.
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            2
        );
        assert_eq!(leaf, ds.get_leaf(1).await.await);
        assert_eq!(block, ds.get_block(1).await.await);

        drop(ds);

        // Reset and check that the changes are gone.
        let ds = D::reset(&storage).await;
        assert_eq!(
            NodeDataSource::<MockTypes>::block_height(&ds)
                .await
                .unwrap(),
            0
        );
        // Lookups of the wiped objects must fail to resolve locally.
        ds.get_leaf(1).await.try_resolve().unwrap_err();
        ds.get_block(1).await.try_resolve().unwrap_err();
    }
698
699    #[test_log::test(tokio::test(flavor = "multi_thread"))]
700    pub async fn test_drop_tx<D: TestableDataSource>()
701    where
702        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
703            + AvailabilityStorage<MockTypes>
704            + NodeStorage<MockTypes>,
705        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
706    {
707        let storage = D::create(0).await;
708        let ds = D::connect(&storage).await;
709
710        // Mock up some consensus data.
711        let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis(
712            &TestValidatedState::default(),
713            &TestInstanceState::default(),
714            TEST_VERSIONS.test,
715        )
716        .await;
717        let mut mock_leaf = Leaf2::<MockTypes>::genesis(
718            &TestValidatedState::default(),
719            &TestInstanceState::default(),
720            TEST_VERSIONS.test.base,
721        )
722        .await;
723        // Increment the block number, to distinguish this block from the genesis block, which
724        // already exists.
725        mock_leaf.block_header_mut().block_number += 1;
726        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
727
728        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
729        let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
730
731        // Insert, but do not commit, some data and check that we can read it back.
732        tracing::info!("write");
733        let mut tx = ds.write().await.unwrap();
734        tx.insert_leaf(leaf.clone()).await.unwrap();
735        tx.insert_block(block.clone()).await.unwrap();
736
737        assert_eq!(tx.block_height().await.unwrap(), 2);
738        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
739        assert_eq!(block, tx.get_block(1.into()).await.unwrap());
740
741        // Drop the transaction, causing a revert.
742        drop(tx);
743
744        // Open a new transaction and check that the changes are reverted.
745        tracing::info!("read");
746        let mut tx = ds.read().await.unwrap();
747        assert_eq!(tx.block_height().await.unwrap(), 0);
748        drop(tx);
749
750        // Get a mutable transaction again, insert different data.
751        mock_leaf.block_header_mut().block_number += 1;
752        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
753        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
754        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
755
756        tracing::info!("write again");
757        let mut tx = ds.write().await.unwrap();
758        tx.insert_leaf(leaf.clone()).await.unwrap();
759        tx.insert_block(block.clone()).await.unwrap();
760        tx.commit().await.unwrap();
761
762        // Read the data back. We should have _only_ the data that was written in the final
763        // transaction.
764        tracing::info!("read again");
765        let height = leaf.height() as usize;
766        assert_eq!(
767            NodeDataSource::<MockTypes>::block_height(&ds)
768                .await
769                .unwrap(),
770            height + 1
771        );
772        assert_eq!(leaf, ds.get_leaf(height).await.await);
773        assert_eq!(block, ds.get_block(height).await.await);
774        ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
775        ds.get_block(height - 1).await.try_resolve().unwrap_err();
776    }
777}
778
779/// Generic tests we can instantiate for all the node data sources.
780#[cfg(any(test, feature = "testing"))]
781#[espresso_macros::generic_tests]
782pub mod node_tests {
783    use std::time::Duration;
784
785    use committable::Committable;
786    use futures::{future::join_all, stream::StreamExt};
787    use hotshot::traits::BlockPayload;
788    use hotshot_example_types::{
789        block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
790        node_types::{TEST_VERSIONS, TestTypes},
791        state_types::{TestInstanceState, TestValidatedState},
792    };
793    use hotshot_types::{
794        data::{VidCommitment, VidCommon, VidShare, ViewNumber, vid_commitment},
795        simple_certificate::{CertificatePair, QuorumCertificate2},
796        traits::block_contents::{BlockHeader, EncodeBytes},
797        vid::advz::{ADVZScheme, advz_scheme},
798    };
799    use jf_advz::VidScheme;
800    use pretty_assertions::assert_eq;
801
802    use crate::{
803        Header, Leaf2,
804        availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
805        data_source::{
806            storage::{NodeStorage, UpdateAvailabilityStorage},
807            update::Transaction,
808        },
809        node::{
810            BlockId, NodeDataSource, ResourceSyncStatus, SyncStatus, SyncStatusQueryData,
811            SyncStatusRange, TimeWindowQueryData, WindowStart,
812        },
813        testing::{
814            consensus::{MockNetwork, TestableDataSource},
815            mocks::{MockPayload, MockTypes, mock_transaction},
816            sleep,
817        },
818        types::HeightIndexed,
819    };
820
821    fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
822        <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
823    }
824
    /// End-to-end check of the data source's sync-status reporting.
    ///
    /// Inserts leaves, blocks, and VID data in various incomplete combinations and
    /// asserts that the reported missing counts and presence ranges match at every
    /// step. Also checks that `insert_vid` is monotonic: re-inserting VID common
    /// data without a share must not erase a share that is already stored.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_sync_status<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Set up a mock VID scheme to use for generating test data.
        let mut vid = advz_scheme(2);

        // Generate some mock leaves and blocks to insert.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        // VID data for the (empty) genesis payload.
        let dispersal = vid.disperse([]).unwrap();
        let mut vid_data = vec![(
            VidCommonQueryData::new(
                leaves[0].header().clone(),
                VidCommon::V0(dispersal.common.clone()),
            ),
            dispersal.shares[0].clone(),
        )];
        for i in 0..2 {
            // Generate a unique payload and VID data, so that missing data is actually missing
            // (otherwise it could be borrowed from another block).
            let (payload, metadata) = <MockPayload as BlockPayload<MockTypes>>::from_transactions(
                vec![mock_transaction(vec![i as u8])],
                &Default::default(),
                &Default::default(),
            )
            .await
            .unwrap();
            let dispersal = vid.disperse(payload.encode()).unwrap();

            // Derive block `i + 1` from block `i` by bumping the height and swapping in
            // the new payload's commitment and metadata.
            let mut leaf = leaves[i].clone();
            leaf.leaf.block_header_mut().block_number += 1;
            leaf.leaf.block_header_mut().payload_commitment = VidCommitment::V0(dispersal.commit);
            leaf.leaf.block_header_mut().metadata = metadata;
            let block = BlockQueryData::new(leaf.header().clone(), payload);
            let vid_common = VidCommonQueryData::new(
                leaf.header().clone(),
                VidCommon::V0(dispersal.common.clone()),
            );
            let vid_share = dispersal.shares[0].clone();

            leaves.push(leaf);
            blocks.push(block);
            vid_data.push((vid_common, vid_share));
        }
        // `leaves`, `blocks`, and `vid_data` now hold entries for heights 0, 1, and 2.

        // At first, the node is fully synced.
        assert!(ds.sync_status().await.unwrap().is_fully_synced());

        // Insert a leaf without the corresponding block or VID info, make sure we detect that the
        // block and VID info are missing.
        ds.append(leaves[0].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatusQueryData {
                blocks: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 1,
                        status: SyncStatus::Missing,
                    }]
                },
                vid_common: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 1,
                        status: SyncStatus::Missing,
                    }]
                },
                vid_shares: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 1,
                        status: SyncStatus::Missing,
                    }]
                },
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 1,
                        status: SyncStatus::Present,
                    }]
                },
                pruned_height: None,
            }
        );

        // Insert a leaf whose height is not the successor of the previous leaf. We should now
        // detect that the leaf in between is missing (along with all _three_ corresponding blocks).
        ds.append(leaves[2].clone().into()).await.unwrap();
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatusQueryData {
                blocks: ResourceSyncStatus {
                    missing: 3,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Missing,
                    }]
                },
                vid_common: ResourceSyncStatus {
                    missing: 3,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Missing,
                    }]
                },
                vid_shares: ResourceSyncStatus {
                    missing: 3,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Missing,
                    }]
                },
                leaves: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            start: 0,
                            end: 1,
                            status: SyncStatus::Present,
                        },
                        SyncStatusRange {
                            start: 1,
                            end: 2,
                            status: SyncStatus::Missing,
                        },
                        SyncStatusRange {
                            start: 2,
                            end: 3,
                            status: SyncStatus::Present,
                        }
                    ]
                },
                pruned_height: None,
            }
        );

        // Insert VID common without a corresponding share.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid_data[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        // Only `vid_common` changes: height 0 becomes present, the share is still missing.
        assert_eq!(
            ds.sync_status().await.unwrap(),
            SyncStatusQueryData {
                blocks: ResourceSyncStatus {
                    missing: 3,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Missing,
                    }]
                },
                vid_common: ResourceSyncStatus {
                    missing: 2,
                    ranges: vec![
                        SyncStatusRange {
                            start: 0,
                            end: 1,
                            status: SyncStatus::Present,
                        },
                        SyncStatusRange {
                            start: 1,
                            end: 3,
                            status: SyncStatus::Missing,
                        },
                    ]
                },
                vid_shares: ResourceSyncStatus {
                    missing: 3,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Missing,
                    }]
                },
                leaves: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            start: 0,
                            end: 1,
                            status: SyncStatus::Present,
                        },
                        SyncStatusRange {
                            start: 1,
                            end: 2,
                            status: SyncStatus::Missing,
                        },
                        SyncStatusRange {
                            start: 2,
                            end: 3,
                            status: SyncStatus::Present,
                        }
                    ]
                },
                pruned_height: None,
            }
        );

        // Rectify the missing data.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_block(blocks[0].clone()).await.unwrap();
            tx.insert_vid(
                vid_data[0].0.clone(),
                Some(VidShare::V0(vid_data[0].1.clone())),
            )
            .await
            .unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(
                vid_data[1].0.clone(),
                Some(VidShare::V0(vid_data[1].1.clone())),
            )
            .await
            .unwrap();
            tx.insert_block(blocks[2].clone()).await.unwrap();
            tx.insert_vid(
                vid_data[2].0.clone(),
                Some(VidShare::V0(vid_data[2].1.clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Some data sources (e.g. file system) don't support out-of-order insertion of missing
        // data. These would have just ignored the insertion of `vid[0]` (the share) and
        // `leaves[1]`. Detect if this is the case; then we allow 1 missing leaf and 1 missing VID
        // share.
        // NOTE: `leaves` and `vid_shares` here shadow the earlier test-data `Vec`s with the
        // expected `ResourceSyncStatus` values.
        let (leaves, vid_shares) = if ds.get_leaf(1).await.try_resolve().is_err() {
            tracing::warn!(
                "data source does not support out-of-order filling, allowing one missing leaf and \
                 VID share"
            );
            (
                ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            start: 0,
                            end: 1,
                            status: SyncStatus::Present,
                        },
                        SyncStatusRange {
                            start: 1,
                            end: 2,
                            status: SyncStatus::Missing,
                        },
                        SyncStatusRange {
                            start: 2,
                            end: 3,
                            status: SyncStatus::Present,
                        },
                    ],
                },
                ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            start: 0,
                            end: 1,
                            status: SyncStatus::Missing,
                        },
                        SyncStatusRange {
                            start: 1,
                            end: 3,
                            status: SyncStatus::Present,
                        },
                    ],
                },
            )
        } else {
            (
                ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Present,
                    }],
                },
                ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        start: 0,
                        end: 3,
                        status: SyncStatus::Present,
                    }],
                },
            )
        };
        let expected_sync_status = SyncStatusQueryData {
            leaves,
            vid_shares,
            blocks: ResourceSyncStatus {
                missing: 0,
                ranges: vec![SyncStatusRange {
                    start: 0,
                    end: 3,
                    status: SyncStatus::Present,
                }],
            },
            vid_common: ResourceSyncStatus {
                missing: 0,
                ranges: vec![SyncStatusRange {
                    start: 0,
                    end: 3,
                    status: SyncStatus::Present,
                }],
            },
            pruned_height: None,
        };
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);

        // If we re-insert one of the VID entries without a share, it should not overwrite the share
        // that we already have; that is, `insert_vid` should be monotonic.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(vid_data[0].0.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
    }
1178
    /// Check the aggregate transaction-count and payload-size counters.
    ///
    /// Appends three single-transaction blocks (including a duplicate transaction
    /// payload) and asserts the counters eventually reflect the running totals,
    /// polling for up to five seconds per block to let the aggregator catch up.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_counters<D: TestableDataSource>() {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // An empty data source reports zero for both counters.
        assert_eq!(ds.count_transactions().await.unwrap(), 0);
        assert_eq!(ds.payload_size().await.unwrap(), 0);

        // Insert some transactions.
        let mut total_transactions = 0;
        let mut total_size = 0;
        'outer: for i in [0, 1, 2] {
            // Using `i % 2` as the transaction data ensures we insert a duplicate transaction
            // (since we insert more than 2 transactions total). The query service should still
            // count these as separate transactions and should include both duplicates when
            // computing the total size.
            let (payload, metadata) =
                <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
                    [mock_transaction(vec![i as u8 % 2])],
                    &TestValidatedState::default(),
                    &TestInstanceState::default(),
                )
                .await
                .unwrap();
            let encoded = payload.encode();
            let payload_commitment =
                vid_commitment(&encoded, &metadata.encode(), 1, TEST_VERSIONS.test.base);
            // Hand-build a header at height `i` committing to the generated payload.
            let header = TestBlockHeader {
                block_number: i,
                payload_commitment,
                timestamp: i,
                timestamp_millis: i * 1_000,
                builder_commitment:
                    <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
                        &payload, &metadata,
                    ),
                metadata: TestMetadata {
                    num_transactions: 7, // arbitrary
                },
                random: 1, // arbitrary
                version: TEST_VERSIONS.test.base,
            };

            // Graft the custom header onto a genesis leaf so we have a valid leaf to
            // append alongside the block.
            let mut leaf = LeafQueryData::<MockTypes>::genesis(
                &TestValidatedState::default(),
                &TestInstanceState::default(),
                TEST_VERSIONS.test,
            )
            .await;
            *leaf.leaf.block_header_mut() = header.clone();
            let block = BlockQueryData::new(header, payload);
            ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();
            assert_eq!(
                NodeDataSource::<MockTypes>::block_height(&ds)
                    .await
                    .unwrap(),
                (i + 1) as usize,
            );

            total_transactions += 1;
            total_size += encoded.len();

            // Allow some time for the aggregator to update.
            for retry in 0..5 {
                let ds_transactions = ds.count_transactions().await.unwrap();
                let ds_payload_size = ds.payload_size().await.unwrap();
                if ds_transactions != total_transactions || ds_payload_size != total_size {
                    tracing::info!(
                        i,
                        retry,
                        total_transactions,
                        ds_transactions,
                        total_size,
                        ds_payload_size,
                        "waiting for statistics to update"
                    );
                    sleep(Duration::from_secs(1)).await;
                } else {
                    // Counters match; move on to the next block.
                    continue 'outer;
                }
            }
            // Reached only if all five retries saw stale counters.
            panic!("counters did not update in time");
        }
    }
1265
1266    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1267    pub async fn test_vid_shares<D: TestableDataSource>()
1268    where
1269        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1270    {
1271        let mut network = MockNetwork::<D>::init().await;
1272        let ds = network.data_source();
1273
1274        network.start().await;
1275
1276        // Check VID shares for a few blocks.
1277        let mut leaves = ds.subscribe_leaves(0).await.take(3);
1278        while let Some(leaf) = leaves.next().await {
1279            tracing::info!("got leaf {}", leaf.height());
1280            let mut tx = ds.read().await.unwrap();
1281            let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1282            assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1283            assert_eq!(
1284                share,
1285                tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1286                    .await
1287                    .unwrap()
1288            );
1289        }
1290    }
1291
    /// Check that `insert_vid` never loses data: re-inserting VID common data with
    /// no share must not overwrite a share that was stored previously.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_monotonicity<D: TestableDataSource>()
    where
        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let storage = D::create(0).await;
        let ds = D::connect(&storage).await;

        // Generate some test VID data.
        let mut vid = advz_scheme(2);
        let disperse = vid.disperse([]).unwrap();

        // Insert test data with VID common and a share.
        let leaf = LeafQueryData::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
        ds.append(BlockInfo::new(
            leaf,
            None,
            Some(common.clone()),
            Some(VidShare::V0(disperse.shares[0].clone())),
        ))
        .await
        .unwrap();

        // Both the common data and the share are retrievable at height 0.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }

        // Re-insert the common data, without a share. This should not overwrite the share we
        // already have.
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_vid(common.clone(), None).await.unwrap();
            tx.commit().await.unwrap();
        }
        // The share must still be present after the share-less re-insert.
        {
            assert_eq!(ds.get_vid_common(0).await.await, common);
            assert_eq!(
                ds.vid_share(0).await.unwrap(),
                VidShare::V0(disperse.shares[0].clone())
            );
        }
    }
1345
    /// Full VID round trip against a live mock network: sequence a non-empty block,
    /// fetch and verify the VID common data, collect a verified share from every
    /// node, and reconstruct the original payload from those shares.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_vid_recovery<D: TestableDataSource>()
    where
        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
    {
        let mut network = MockNetwork::<D>::init().await;
        let ds = network.data_source();

        network.start().await;

        // Submit a transaction so we can try to recover a non-empty block.
        let mut blocks = ds.subscribe_blocks(0).await;
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Wait for the transaction to be finalized.
        let block = loop {
            tracing::info!("waiting for transaction");
            let block = blocks.next().await.unwrap();
            if !block.is_empty() {
                tracing::info!(height = block.height(), "transaction sequenced");
                break block;
            }
            tracing::info!(height = block.height(), "empty block");
        };
        let height = block.height() as usize;
        // This test only handles the ADVZ (V0) commitment scheme.
        let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
            commit
        } else {
            panic!("expect ADVZ commitment")
        };

        // Set up a test VID scheme.
        let vid = advz_scheme(network.num_nodes());

        // Get VID common data and verify it.
        tracing::info!("fetching common data");
        let common = ds.get_vid_common(height).await.await;
        let VidCommon::V0(common) = &common.common() else {
            panic!("expect ADVZ common");
        };
        ADVZScheme::is_consistent(&commit, common).unwrap();

        // Collect shares from each node.
        tracing::info!("fetching shares");
        // Reborrow so the `async move` closures below capture references rather than
        // moving the owned values.
        let network = &network;
        let vid = &vid;
        let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
            let ds = network.data_source_index(i);

            // Wait until the node has processed up to the desired block; since we have thus far
            // only interacted with node 0, it is possible other nodes are slightly behind.
            let mut leaves = ds.subscribe_leaves(height).await;
            let leaf = leaves.next().await.unwrap();
            assert_eq!(leaf.height(), height as u64);
            assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));

            // Each node's share must verify against the common data and commitment.
            let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
                share
            } else {
                panic!("expect ADVZ share")
            };
            vid.verify_share(&share, common, &commit).unwrap().unwrap();
            share
        }))
        .await;

        // Recover payload.
        tracing::info!("recovering payload");
        let bytes = vid.recover_payload(&shares, common).unwrap();
        // NOTE(review): `MockPayload` is cast through `BlockPayload<TestTypes>` here,
        // while most of this module uses `MockTypes` — presumably the payload type is
        // shared between the two; confirm against the mocks module.
        let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
            &bytes,
            &TestMetadata {
                num_transactions: 7, // arbitrary
            },
        );
        assert_eq!(recovered, *block.payload());
        assert_eq!(recovered.transactions, vec![txn]);
    }
1425
1426    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1427    pub async fn test_timestamp_window<D: TestableDataSource>() {
1428        let mut network = MockNetwork::<D>::init().await;
1429        let ds = network.data_source();
1430
1431        network.start().await;
1432
1433        // Wait for blocks with at least three different timestamps to be sequenced. This lets us
1434        // test all the edge cases.
1435        let mut leaves = ds.subscribe_leaves(0).await;
1436        // `test_blocks` is a list of lists of headers with the same timestamp. The flattened list
1437        // of headers is contiguous.
1438        let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1439        while test_blocks.len() < 3 {
1440            // Wait for the next block to be sequenced.
1441            let leaf = leaves.next().await.unwrap();
1442            let header = leaf.header().clone();
1443            if let Some(last_timestamp) = test_blocks.last_mut() {
1444                if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1445                    == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1446                {
1447                    last_timestamp.push(header);
1448                } else {
1449                    test_blocks.push(vec![header]);
1450                }
1451            } else {
1452                test_blocks.push(vec![header]);
1453            }
1454        }
1455        tracing::info!("blocks for testing: {test_blocks:#?}");
1456
1457        // Define invariants that every response should satisfy.
1458        let check_invariants =
1459            |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1460                let mut prev = res.prev.as_ref();
1461                if let Some(prev) = prev {
1462                    if check_prev {
1463                        assert!(block_header_timestamp(prev) < start);
1464                    }
1465                } else {
1466                    // `prev` can only be `None` if the first block in the window is the genesis
1467                    // block.
1468                    assert_eq!(res.from().unwrap(), 0);
1469                };
1470                for header in &res.window {
1471                    assert!(start <= block_header_timestamp(header));
1472                    assert!(block_header_timestamp(header) < end);
1473                    if let Some(prev) = prev {
1474                        assert!(
1475                            <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1476                                <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1477                        );
1478                    }
1479                    prev = Some(header);
1480                }
1481                if let Some(next) = &res.next {
1482                    assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1483                    // If there is a `next`, there must be at least one previous block (either `prev`
1484                    // itself or the last block if the window is nonempty), so we can `unwrap` here.
1485                    assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1486                }
1487            };
1488
1489        let get_window = |start, end| {
1490            let ds = ds.clone();
1491            async move {
1492                let window = ds
1493                    .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1494                    .await
1495                    .unwrap();
1496                tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1497                check_invariants(&window, start, end, true);
1498                window
1499            }
1500        };
1501
1502        // Case 0: happy path. All blocks are available, including prev and next.
1503        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1504        let end = start + 1;
1505        let res = get_window(start, end).await;
1506        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1507        assert_eq!(res.window, test_blocks[1]);
1508        assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1509
1510        // Case 1: no `prev`, start of window is before genesis.
1511        let start = 0;
1512        let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1513        let res = get_window(start, end).await;
1514        assert_eq!(res.prev, None);
1515        assert_eq!(res.window, test_blocks[0]);
1516        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1517
1518        // Case 2: no `next`, end of window is after the most recently sequenced block.
1519        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1520        let end = i64::MAX as u64;
1521        let res = get_window(start, end).await;
1522        assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1523        // There may have been more blocks sequenced since we grabbed `test_blocks`, so just check
1524        // that the prefix of the window is correct.
1525        assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1526        assert_eq!(res.next, None);
1527        // Fetch more blocks using the `from` form of the endpoint. Start from the last block we had
1528        // previously (ie fetch a slightly overlapping window) to ensure there is at least one block
1529        // in the new window.
1530        let from = test_blocks.iter().flatten().count() - 1;
1531        let more = ds
1532            .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1533            .await
1534            .unwrap();
1535        check_invariants(&more, start, end, false);
1536        assert_eq!(
1537            more.prev.as_ref().unwrap(),
1538            test_blocks.iter().flatten().nth(from - 1).unwrap()
1539        );
1540        assert_eq!(
1541            more.window[..res.window.len() - test_blocks[2].len() + 1],
1542            res.window[test_blocks[2].len() - 1..]
1543        );
1544        assert_eq!(res.next, None);
1545        // We should get the same result whether we query by block height or hash.
1546        let more2 = ds
1547            .get_header_window(
1548                test_blocks[2].last().unwrap().commit(),
1549                end,
1550                i64::MAX as usize,
1551            )
1552            .await
1553            .unwrap();
1554        check_invariants(&more2, start, end, false);
1555        assert_eq!(more2.from().unwrap(), more.from().unwrap());
1556        assert_eq!(more2.prev, more.prev);
1557        assert_eq!(more2.next, more.next);
1558        assert_eq!(more2.window[..more.window.len()], more.window);
1559
1560        // Case 3: the window is empty.
1561        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1562        let end = start;
1563        let res = get_window(start, end).await;
1564        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1565        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1566        assert_eq!(res.window, vec![]);
1567
1568        // Case 4: no relevant blocks are available yet.
1569        ds.get_header_window(
1570            WindowStart::Time((i64::MAX - 1) as u64),
1571            i64::MAX as u64,
1572            i64::MAX as usize,
1573        )
1574        .await
1575        .unwrap_err();
1576
1577        // Case 5: limits.
1578        let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1579            .into_iter()
1580            .flatten()
1581            .collect::<Vec<_>>();
1582        // Make a query that would return everything, but gets limited.
1583        let start = block_header_timestamp(&blocks[0]);
1584        let end = block_header_timestamp(&test_blocks[2][0]);
1585        let res = ds
1586            .get_header_window(WindowStart::Time(start), end, 1)
1587            .await
1588            .unwrap();
1589        assert_eq!(res.prev, None);
1590        assert_eq!(res.window, [blocks[0].clone()]);
1591        assert_eq!(res.next, None);
1592        // Query the next page of results, get limited again.
1593        let res = ds
1594            .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1595            .await
1596            .unwrap();
1597        assert_eq!(res.window, [blocks[1].clone()]);
1598        assert_eq!(res.next, None);
1599        // Get the rest of the results.
1600        let res = ds
1601            .get_header_window(
1602                WindowStart::Height(blocks[1].height() + 1),
1603                end,
1604                blocks.len() - 1,
1605            )
1606            .await
1607            .unwrap();
1608        assert_eq!(res.window, blocks[2..].to_vec());
1609        assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1610    }
1611
1612    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1613    pub async fn test_latest_qc_chain<D: TestableDataSource>()
1614    where
1615        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1616        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1617    {
1618        let storage = D::create(0).await;
1619        let ds = D::connect(&storage).await;
1620
1621        {
1622            let mut tx = ds.read().await.unwrap();
1623            assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1624        }
1625
1626        async fn leaf_with_qc_chain(
1627            number: u64,
1628        ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1629            let mut leaf = Leaf2::<MockTypes>::genesis(
1630                &Default::default(),
1631                &Default::default(),
1632                TEST_VERSIONS.test.base,
1633            )
1634            .await;
1635            leaf.block_header_mut().block_number = number;
1636
1637            let mut qc1 = QuorumCertificate2::<MockTypes>::genesis(
1638                &Default::default(),
1639                &Default::default(),
1640                TEST_VERSIONS.test,
1641            )
1642            .await;
1643            qc1.view_number = ViewNumber::new(1);
1644            qc1.data.leaf_commit = Committable::commit(&leaf);
1645
1646            let mut qc2 = qc1.clone();
1647            qc2.view_number += 1;
1648
1649            let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1650            (
1651                leaf,
1652                [
1653                    CertificatePair::non_epoch_change(qc1),
1654                    CertificatePair::non_epoch_change(qc2),
1655                ],
1656            )
1657        }
1658
1659        // Insert a leaf with QC chain.
1660        {
1661            let (leaf, qcs) = leaf_with_qc_chain(2).await;
1662            let mut tx = ds.write().await.unwrap();
1663            tx.insert_leaf_with_qc_chain(leaf, Some(qcs.clone()))
1664                .await
1665                .unwrap();
1666            tx.commit().await.unwrap();
1667
1668            assert_eq!(
1669                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1670                Some(qcs)
1671            );
1672        }
1673
1674        // Insert a later leaf without a QC chain. This should clear the previously saved QC chain,
1675        // which is no longer up to date.
1676        {
1677            let (leaf, _) = leaf_with_qc_chain(3).await;
1678            let mut tx = ds.write().await.unwrap();
1679            tx.insert_leaf_with_qc_chain(leaf, None).await.unwrap();
1680            tx.commit().await.unwrap();
1681
1682            assert_eq!(
1683                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1684                None
1685            );
1686        }
1687
1688        // Insert an earlier leaf with a QC chain. This should not be saved since it is not the
1689        // latest leaf.
1690        {
1691            let (leaf, qcs) = leaf_with_qc_chain(1).await;
1692            let mut tx = ds.write().await.unwrap();
1693            tx.insert_leaf_with_qc_chain(leaf, Some(qcs)).await.unwrap();
1694            tx.commit().await.unwrap();
1695
1696            assert_eq!(
1697                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1698                None
1699            );
1700        }
1701    }
1702}
1703
/// Generic tests we can instantiate for all the status data sources.
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod status_tests {
    use std::time::Duration;

    use crate::{
        status::StatusDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, MockNetwork},
            mocks::mock_transaction,
            sleep,
        },
    };

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D>::init().await;
        let ds = network.data_source();

        // With consensus paused, no blocks have been produced yet.
        assert_eq!(ds.block_height().await.unwrap(), 0);
        // The success rate is block height (0) over view number (0), i.e. NaN.
        assert!(ds.success_rate().await.unwrap().is_nan());
        // Since no block has been decided, the "last_decided_time" metric is 0,
        // so the reported elapsed time should be approximately the full time
        // since the Unix epoch.
        let elapsed = ds.elapsed_time_since_last_decide().await.unwrap() as i64;
        let now = chrono::Utc::now().timestamp();
        assert!(
            (elapsed - now).abs() <= 1,
            "time elapsed since last_decided_time is not within 1s"
        );

        // Submit a transaction
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        // Start consensus and wait for the transaction to be finalized.
        network.start().await;

        // Now wait for at least one non-genesis block to be finalized.
        let mut height = ds.block_height().await.unwrap();
        while height <= 1 {
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
            height = ds.block_height().await.unwrap();
        }

        // The success rate should now be positive and finite. We cannot assert
        // an exact value: the number of elapsed views is subject to races.
        let success_rate = ds.success_rate().await.unwrap();
        assert!(success_rate.is_finite(), "{success_rate}");
        assert!(success_rate > 0.0, "{success_rate}");

        // Shut down consensus to halt block production, then wait 3 seconds so
        // that the time since the last decide is at least 3 seconds.
        network.shut_down().await;
        sleep(Duration::from_secs(3)).await;
        assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
    }
}
1777
/// Instantiate every generic data source test suite (availability,
/// persistence, node, and status) for a concrete data source type `$t`.
///
/// The expansion imports the generic test modules and then invokes each
/// suite's own instantiation macro, so the caller only needs this one macro.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($t:ty) => {
        // Bring the generic test modules into scope for the instantiation
        // macros invoked below.
        use $crate::data_source::{
            availability_tests, node_tests, persistence_tests, status_tests,
        };

        instantiate_availability_tests!($t);
        instantiate_persistence_tests!($t);
        instantiate_node_tests!($t);
        instantiate_status_tests!($t);
    };
}