1use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22 data::VidShare,
23 simple_certificate::CertificatePair,
24 traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30 super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31 DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34 Header, MissingSnafu, QueryError, QueryResult,
35 availability::{NamespaceId, QueryableHeader},
36 data_source::storage::{
37 Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38 },
39 node::{
40 BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41 TimeWindowQueryData, WindowStart,
42 },
43 types::HeightIndexed,
44};
45
#[async_trait]
impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
where
    Mode: TransactionMode,
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// The height of the chain as known to this database: one past the greatest stored
    /// header height, or 0 when no headers have been stored yet.
    async fn block_height(&mut self) -> QueryResult<usize> {
        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
            .fetch_one(self.as_mut())
            .await?
        {
            (Some(height),) => {
                // Heights are 0-based, so the chain height is one more than the
                // greatest height stored.
                Ok(height as usize + 1)
            },
            (None,) => {
                // `max` over an empty table is NULL: no blocks are known yet.
                Ok(0)
            },
        }
    }

    /// Total number of transactions in blocks whose heights fall in `range`, restricted
    /// to `namespace` if given (`None` uses the catch-all namespace, stored as -1).
    ///
    /// The `aggregate` table stores cumulative counts per (height, namespace), so the
    /// result is `count(to) - count(from - 1)`. Returns 0 for an empty range or when no
    /// aggregates are available yet.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        // Cumulative transaction count as of the (inclusive) end of the range.
        let (count,) = query_as::<(i64,)>(
            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut count = count as usize;

        // Subtract the cumulative count just before the start of the range.
        // `saturating_sub` guards against inconsistent aggregates underflowing.
        if from > 0 {
            let (prev_count,) = query_as::<(i64,)>(
                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            count = count.saturating_sub(prev_count as usize);
        }

        Ok(count)
    }

    /// Total payload size (in bytes) of blocks whose heights fall in `range`, restricted
    /// to `namespace` if given (`None` uses the catch-all namespace, stored as -1).
    ///
    /// Same cumulative-difference scheme as [`Self::count_transactions_in_range`], but
    /// over the `payload_size` column of the `aggregate` table.
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        // Cumulative payload size as of the (inclusive) end of the range.
        let (size,) = query_as::<(i64,)>(
            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut size = size as usize;

        // Subtract the cumulative size just before the start of the range.
        if from > 0 {
            let (prev_size,) = query_as::<(i64,)>(
                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            size = size.saturating_sub(prev_size as usize);
        }

        Ok(size)
    }

    /// Fetch the VID share stored alongside the header identified by `id`.
    ///
    /// # Errors
    /// `Missing` if the matching header exists but its `vid_share` column is NULL; a
    /// decode error if the stored bytes are not valid bincode for a share.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut query = QueryBuilder::default();
        let where_clause = query.header_where_clause(id.into())?;
        // ORDER BY + LIMIT 1 picks a deterministic row in case the WHERE clause (e.g. a
        // payload hash) matches more than one header.
        let sql = format!(
            "SELECT vid_share FROM header AS h
              WHERE {where_clause}
              ORDER BY h.height
              LIMIT 1"
        );
        let (share_data,) = query
            .query_as::<(Option<Vec<u8>>,)>(&sql)
            .fetch_one(self.as_mut())
            .await?;
        let share_data = share_data.context(MissingSnafu)?;
        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
        Ok(share)
    }

    /// Report, for each resource type (leaves, blocks, VID common, VID shares), which
    /// heights in `[from, to)` are present in the database and which are missing.
    async fn sync_status_for_range(
        &mut self,
        from: usize,
        to: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        // A block is present when its header joins to a stored payload with the same
        // payload hash AND namespace table (the pair disambiguates distinct blocks that
        // happen to share a payload hash).
        let blocks = self
            .sync_status_ranges(
                "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
                 p.ns_table)",
                "height",
                from,
                to,
            )
            .await?;

        // When every block is present, reuse the block status for leaves instead of
        // scanning again. NOTE(review): this assumes a stored block implies its leaf is
        // stored; confirm against the insertion path.
        let leaves = if blocks.is_fully_synced() {
            blocks.clone()
        } else {
            self.sync_status_ranges("leaf2", "height", from, to).await?
        };

        // VID common data joins headers to the vid_common table by payload hash.
        let vid_common = self
            .sync_status_ranges(
                "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
                "height",
                from,
                to,
            )
            .await?;

        // VID shares live in a nullable column of the header table itself, so the
        // non-NULL indicator is the `vid_share` column.
        let vid_shares = self
            .sync_status_ranges("header", "vid_share", from, to)
            .await?;

        Ok(SyncStatusQueryData {
            leaves,
            blocks,
            vid_common,
            vid_shares,
            // Pruned height is not tracked at this layer.
            pruned_height: None,
        })
    }

    /// Load a window of headers starting at a time, height, or block hash, containing
    /// subsequent headers with timestamps strictly before `end`, up to `limit` entries.
    ///
    /// Also returns `prev` (the header just before the window, if any) and `next` (the
    /// first header at or after `end`, if found), which callers can use to check the
    /// window's completeness.
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Resolve the starting block height. A time-based start is handled entirely by
        // `time_window`, which searches by timestamp instead of height.
        let first_block = match start.into() {
            WindowStart::Time(t) => {
                return self.time_window::<Types>(t, end, limit).await;
            },
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
        };

        // All headers from the starting height whose timestamps are before `end`.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
              FROM header AS h
              WHERE h.height >= $1 AND h.timestamp < $2
              ORDER BY h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(first_block as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect::<Vec<_>>()
            .await?;

        // Header immediately preceding the window, unless the window starts at height 0.
        let prev = if first_block > 0 {
            Some(self.load_header::<Types>(first_block as usize - 1).await?)
        } else {
            None
        };

        // Only look for a successor if we did not hit the limit; a truncated window has
        // no meaningful `next`.
        let next = if window.len() < limit {
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
                  FROM header AS h
                  WHERE h.timestamp >= $1
                  ORDER BY h.timestamp, h.height
                  LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        Ok(TimeWindowQueryData { window, prev, next })
    }

    /// Load the most recently stored QC chain (a pair of certificates), if one has been
    /// saved.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        // `qcs` is a JSON-encoded pair of certificates; LIMIT 1 takes whichever row is
        // present.
        let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
            .fetch_optional(self.as_mut())
            .await?
        else {
            return Ok(None);
        };
        let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
        Ok(qcs)
    }
}
302
impl<Mode> Transaction<Mode>
where
    Mode: TransactionMode,
{
    /// Compute present/missing sync ranges for one resource over heights `[start, end)`.
    ///
    /// `table` is a table or join expression with a `height` column; a height counts as
    /// present when its row exists and `indicator_column` is non-NULL in that row. The
    /// result is a list of alternating `Present`/`Missing` ranges (end-exclusive)
    /// covering `[start, end)`, plus the total number of missing heights.
    #[instrument(skip(self))]
    async fn sync_status_ranges(
        &mut self,
        table: &str,
        indicator_column: &str,
        start: usize,
        end: usize,
    ) -> QueryResult<ResourceSyncStatus> {
        let mut ranges = vec![];
        tracing::debug!("searching for missing ranges");

        // First heights of each contiguous present range: heights whose indicator is
        // non-NULL but whose predecessor's is NULL. The RIGHT JOIN makes `start` itself
        // eligible when present, since its predecessor falls outside the CTE's filter
        // and therefore appears as NULL.
        let query = format!(
            "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
            WHERE height >= $1 AND height < $2)
            SELECT successor.height FROM range AS predecessor
            RIGHT JOIN range AS successor
            ON successor.height = predecessor.height + 1
            WHERE successor.indicator IS NOT NULL
            AND predecessor.indicator IS NULL
            ORDER BY successor.height"
        );
        let range_starts = query_as::<(i64,)>(&query)
            .bind(start as i64)
            .bind(end as i64)
            .fetch_all(self.as_mut())
            .await?;
        tracing::debug!(
            ?range_starts,
            "found {} starting heights for present ranges",
            range_starts.len()
        );

        // Matching last heights (inclusive) for each present range, via one of two
        // strategies depending on how many ranges there are.
        let range_ends = if range_starts.len() <= 10 {
            // Few ranges: one point query per range, taking the greatest present height
            // strictly below the next range's start (or below `end` for the last one).
            let mut ends = vec![];
            for (i, &(start,)) in range_starts.iter().enumerate() {
                let query = format!(
                    "SELECT max(height) from {table}
                    WHERE height < $1 AND {indicator_column} IS NOT NULL"
                );
                let upper_bound = if i + 1 < range_starts.len() {
                    range_starts[i + 1].0
                } else {
                    end as i64
                };
                let (end,) = query_as::<(i64,)>(&query)
                    .bind(upper_bound)
                    .fetch_one(self.as_mut())
                    .await?;
                tracing::debug!(start, end, "found end for present range");
                ends.push((end,));
            }
            ends
        } else {
            // Many ranges: a single scan mirroring the range-starts query. A present
            // height whose successor is missing (or outside the CTE, via LEFT JOIN)
            // ends a range.
            let query = format!(
                "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                WHERE height >= $1 AND height < $2)
                SELECT predecessor.height FROM range AS predecessor
                LEFT JOIN range AS successor
                ON successor.height = predecessor.height + 1
                WHERE predecessor.indicator IS NOT NULL
                AND successor.indicator IS NULL
                ORDER BY predecessor.height"
            );
            let ends = query_as::<(i64,)>(&query)
                .bind(start as i64)
                .bind(end as i64)
                .fetch_all(self.as_mut())
                .await?;
            tracing::debug!(
                ?ends,
                "found {} ending heights for present ranges",
                ends.len()
            );
            ends
        };

        // Every range start must pair with exactly one range end.
        if range_starts.len() != range_ends.len() {
            return Err(QueryError::Error {
                message: format!(
                    "number of present range starts ({}) does not match number of present range \
                     ends ({})",
                    range_starts.len(),
                    range_ends.len(),
                ),
            });
        }

        // Stitch the (start, end) pairs into alternating missing/present ranges.
        // `prev` tracks the first height not yet covered by an emitted range.
        let mut prev = start;
        for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
            let start = start as usize;
            let end = end as usize;

            // Sanity checks: ranges must be in order and well-formed.
            if start < prev {
                return Err(QueryError::Error {
                    message: format!(
                        "found present ranges out of order: range start {start} is before \
                         previous range end {prev}"
                    ),
                });
            }
            if end < start {
                return Err(QueryError::Error {
                    message: format!("malformed range: start={start}, end={end}"),
                });
            }

            // A gap before this present range is a missing range.
            if start != prev {
                tracing::debug!(start = prev, end = start, "found missing range");
                ranges.push(SyncStatusRange {
                    start: prev,
                    end: start,
                    status: SyncStatus::Missing,
                });
            }

            // Query results are inclusive of the last present height; stored ranges are
            // end-exclusive, hence `end + 1`.
            ranges.push(SyncStatusRange {
                start,
                end: end + 1,
                status: SyncStatus::Present,
            });
            prev = end + 1;
        }

        // Anything left over up to `end` (the function parameter again, now that the
        // loop's shadowing is out of scope) is a trailing missing range.
        if prev != end {
            tracing::debug!(start = prev, end, "found missing range");
            ranges.push(SyncStatusRange {
                start: prev,
                end,
                status: SyncStatus::Missing,
            });
        }

        // Total missing objects is the combined length of all missing ranges.
        let missing = ranges
            .iter()
            .filter_map(|range| {
                if range.status == SyncStatus::Missing {
                    Some(range.end - range.start)
                } else {
                    None
                }
            })
            .sum();
        tracing::debug!(
            missing,
            "found missing objects in {} total ranges",
            ranges.len()
        );

        Ok(ResourceSyncStatus { missing, ranges })
    }
}
513
514impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
515where
516 Types: NodeType,
517 Header<Types>: QueryableHeader<Types>,
518{
519 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
520 let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
521 .fetch_one(self.as_mut())
522 .await?;
523 Ok(height as usize)
524 }
525
526 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
527 let res: (Option<i64>,) =
530 query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
531 .fetch_one(self.as_mut())
532 .await?;
533
534 let (Some(max_height),) = res else {
535 return Ok(None);
536 };
537
538 let rows: Vec<(i64, i64, i64)> = query_as(
539 r#"
540 SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
541 "#,
542 )
543 .bind(max_height)
544 .fetch_all(self.as_mut())
545 .await?;
546
547 let mut num_transactions = HashMap::default();
548 let mut payload_size = HashMap::default();
549
550 for (namespace_id, num_tx, payload_sz) in rows {
551 let key = if namespace_id == -1 {
555 None
556 } else {
557 Some(namespace_id.into())
558 };
559 num_transactions.insert(key, num_tx as usize);
560 payload_size.insert(key, payload_sz as usize);
561 }
562
563 Ok(Some(Aggregate {
564 height: max_height,
565 num_transactions,
566 payload_size,
567 }))
568 }
569}
570
571impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
572where
573 Header<Types>: QueryableHeader<Types>,
574{
575 async fn update_aggregates(
576 &mut self,
577 prev: Aggregate<Types>,
578 blocks: &[PayloadMetadata<Types>],
579 ) -> anyhow::Result<Aggregate<Types>> {
580 let height = blocks[0].height();
581 let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
582
583 let mut rows = Vec::new();
584
585 let aggregates = blocks
587 .iter()
588 .scan(
589 (height, prev_tx_count, prev_size),
590 |(height, tx_count, size), block| {
591 if *height != block.height {
592 return Some(Err(anyhow!(
593 "blocks in update_aggregates are not sequential; expected {}, got {}",
594 *height,
595 block.height()
596 )));
597 }
598 *height += 1;
599
600 *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
605 *size.entry(None).or_insert(0) += block.size as usize;
606
607 rows.push((
610 block.height as i64,
611 -1,
612 tx_count[&None] as i64,
613 size[&None] as i64,
614 ));
615
616 for (&ns_id, info) in &block.namespaces {
618 let key = Some(ns_id);
619
620 *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
621 *size.entry(key).or_insert(0) += info.size as usize;
622 }
623
624 for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
628 let key = Some(*ns_id);
629 rows.push((
630 block.height as i64,
631 (*ns_id).into(),
632 tx_count[&key] as i64,
633 size[&key] as i64,
634 ));
635 }
636
637 Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
638 },
639 )
640 .collect::<anyhow::Result<Vec<_>>>()?;
641 let last_aggregate = aggregates.last().cloned();
642
643 let (height, num_transactions, payload_size) =
644 last_aggregate.ok_or_else(|| anyhow!("no row"))?;
645
646 self.upsert(
647 "aggregate",
648 ["height", "namespace", "num_transactions", "payload_size"],
649 ["height", "namespace"],
650 rows,
651 )
652 .await?;
653 Ok(Aggregate {
654 height,
655 num_transactions,
656 payload_size,
657 })
658 }
659}
660
impl<Mode: TransactionMode> Transaction<Mode> {
    /// Load a window of headers with timestamps in `[start, end)`, up to `limit` entries.
    ///
    /// Returns the window along with `prev` (the last header before the window, if any)
    /// and `next` (the first header with timestamp at or after `end`, if found), which
    /// allow the caller to check that the window is complete.
    ///
    /// # Errors
    /// `NotFound` when the window is empty and no header at or after `end` exists, since
    /// in that case an empty window cannot be distinguished from one that has not been
    /// produced or ingested yet.
    async fn time_window<Types: NodeType>(
        &mut self,
        start: u64,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Headers in the window, ordered by timestamp with height as a tie-breaker.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
              FROM header AS h
              WHERE h.timestamp >= $1 AND h.timestamp < $2
              ORDER BY h.timestamp, h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(start as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window: Vec<_> = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect()
            .await?;

        // Only look for a successor if we did not hit the limit; a truncated window has
        // no meaningful `next`.
        let next = if window.len() < limit {
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
                  FROM header AS h
                  WHERE h.timestamp >= $1
                  ORDER BY h.timestamp, h.height
                  LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        // Nothing in the window and nothing after it: report NotFound rather than an
        // unverifiable empty window.
        if window.is_empty() && next.is_none() {
            return Err(QueryError::NotFound);
        }

        // The header immediately preceding the window, if any.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
              FROM header AS h
              WHERE h.timestamp < $1
              ORDER BY h.timestamp DESC, h.height DESC
              LIMIT 1"
        );
        let prev = query(&sql)
            .bind(start as i64)
            .fetch_optional(self.as_mut())
            .await?
            .map(parse_header::<Types>)
            .transpose()?;

        Ok(TimeWindowQueryData { window, prev, next })
    }
}
748
749async fn aggregate_range_bounds<Types>(
754 tx: &mut Transaction<impl TransactionMode>,
755 range: impl RangeBounds<usize>,
756) -> QueryResult<Option<(usize, usize)>>
757where
758 Types: NodeType,
759 Header<Types>: QueryableHeader<Types>,
760{
761 let from = match range.start_bound() {
762 Bound::Included(from) => *from,
763 Bound::Excluded(from) => *from + 1,
764 Bound::Unbounded => 0,
765 };
766 let to = match range.end_bound() {
767 Bound::Included(to) => *to,
768 Bound::Excluded(0) => return Ok(None),
769 Bound::Excluded(to) => *to - 1,
770 Bound::Unbounded => {
771 let height = AggregatesStorage::<Types>::aggregates_height(tx)
772 .await
773 .map_err(|err| QueryError::Error {
774 message: format!("{err:#}"),
775 })?;
776 if height == 0 {
777 return Ok(None);
778 }
779 if height < from {
780 return Ok(None);
781 }
782 height - 1
783 },
784 };
785 Ok(Some((from, to)))
786}
787
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::vid::advz::advz_scheme;
    use itertools::Itertools;
    use jf_advz::VidScheme;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::{
        availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
        data_source::{
            Transaction as _, VersionedDataSource,
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
        },
        testing::mocks::MockTypes,
    };

    /// Insert leaves for each interval in `present_ranges` into a fresh database, then
    /// check that `sync_status_ranges` over `[start, end)` reports exactly those present
    /// ranges separated by missing gaps.
    async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // Build a chain of `end` leaves by cloning genesis and bumping the height.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..end {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Insert only the leaves within the requested present ranges.
        {
            let mut tx = db.write().await.unwrap();

            for &(start, end) in present_ranges {
                for leaf in leaves[start..end].iter() {
                    tx.insert_leaf(leaf.clone()).await.unwrap();
                }
            }

            tx.commit().await.unwrap();
        }

        let sync_status = db
            .read()
            .await
            .unwrap()
            .sync_status_ranges("leaf2", "height", start, end)
            .await
            .unwrap();

        // Missing count is everything in [start, end) not covered by a present range.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        let total = end - start;
        assert_eq!(sync_status.missing, total - present);

        // Walk the reported ranges in order: an optional missing gap before each present
        // range, then the present range itself.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = start;
        for &(start, end) in present_ranges {
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: SyncStatus::Missing,
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // Trailing missing gap after the last present range, if any.
        if prev != end {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end,
                    status: SyncStatus::Missing,
                }
            );
        }

        // No spurious extra ranges.
        assert_eq!(ranges.next(), None);
    }

    // Present ranges at both ends of the queried interval.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_present() {
        test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
    }

    // Missing ranges at both ends of the queried interval.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_missing() {
        test_sync_status_ranges(0, 6, &[(2, 4)]).await;
    }

    // Same as above but with a non-zero starting height.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_present() {
        test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_missing() {
        test_sync_status_ranges(1, 8, &[(4, 6)]).await;
    }

    // Single-height present ranges separated by a single-height gap.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_singleton_ranges() {
        test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
    }

    // More than 10 present ranges, exercising the single-scan strategy for finding
    // range ends instead of per-range point queries.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_present() {
        let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 201, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_missing() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 202, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 201, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
        let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 202, &ranges).await;
    }

    /// Two blocks at different heights sharing the same payload (and namespace table):
    /// sync status must attribute the shared payload to both heights.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Block 1 reuses the genesis payload at a new height.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert only the genesis leaf; its block/VID data is still missing.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Height 0: leaf present, everything else missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                vid_shares: missing,
                pruned_height: None,
            }
        );

        // Insert leaf 1, the duplicate-payload block, and its VID data (common + share).
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(
                VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // The shared payload now counts as present at both heights; only the share for
        // height 0 is missing (shares are stored per-header).
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: present.clone(),
                vid_common: present,
                vid_shares: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            status: SyncStatus::Missing,
                            start: 0,
                            end: 1,
                        },
                        SyncStatusRange {
                            status: SyncStatus::Present,
                            start: 1,
                            end: 2,
                        },
                    ]
                },
                pruned_height: None,
            }
        );
    }

    /// Two blocks sharing a payload hash but with DIFFERENT namespace tables: the block
    /// join matches on (payload_hash, ns_table), so the second block must still count as
    /// missing, while VID common (matched on payload hash alone) counts as present.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Block 1 reuses the genesis payload but mutates the header metadata so its
        // namespace table differs from genesis.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        leaf.leaf.block_header_mut().metadata.num_transactions += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert only the genesis leaf.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Height 0: leaf present, everything else missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                vid_shares: missing,
                pruned_height: None,
            }
        );

        // Insert leaf 1, its block, and VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(
                VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Leaves and VID common are present at both heights, but the genesis block
        // (height 0) stays missing: its ns_table does not match the stored payload row.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![
                SyncStatusRange {
                    status: SyncStatus::Missing,
                    start: 0,
                    end: 1,
                },
                SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 1,
                    end: 2,
                },
            ],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: missing.clone(),
                vid_common: present,
                vid_shares: missing,
                pruned_height: None,
            }
        );
    }
}