hotshot_query_service/data_source/storage/sql/queries/
node.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Node storage implementation for a database query engine.

15use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22    data::VidShare,
23    simple_certificate::CertificatePair,
24    traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30    super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31    DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34    Header, MissingSnafu, QueryError, QueryResult,
35    availability::{NamespaceId, QueryableHeader},
36    data_source::storage::{
37        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38    },
39    node::{
40        BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41        TimeWindowQueryData, WindowStart,
42    },
43    types::HeightIndexed,
44};
45
#[async_trait]
impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
where
    Mode: TransactionMode,
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// The height of the chain: the total number of blocks in storage, or 0 if there are none.
    async fn block_height(&mut self) -> QueryResult<usize> {
        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
            .fetch_one(self.as_mut())
            .await?
        {
            (Some(height),) => {
                // The height of the block is the number of blocks below it, so the total number of
                // blocks is one more than the height of the highest block.
                Ok(height as usize + 1)
            },
            (None,) => {
                // If there are no blocks yet, the height is 0.
                Ok(0)
            },
        }
    }

    /// Count transactions in the given block height range, optionally restricted to a single
    /// namespace.
    ///
    /// The `aggregate` table stores cumulative statistics at each height, so the count over a
    /// range is computed as a difference: the cumulative count at the end of the range minus the
    /// cumulative count just before the start.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // `None` (statistics across all namespaces) is encoded as -1 in the database, since the
        // namespace column is part of the primary key and cannot be NULL.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        // An empty range, or one outside the aggregated data, contains no transactions.
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        let (count,) = query_as::<(i64,)>(
            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut count = count as usize;

        if from > 0 {
            // Subtract the cumulative count just before the start of the range. (If the range
            // starts at 0, there is nothing to subtract.)
            let (prev_count,) = query_as::<(i64,)>(
                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            count = count.saturating_sub(prev_count as usize);
        }

        Ok(count)
    }

    /// Total payload size of blocks in the given height range, optionally restricted to a single
    /// namespace.
    ///
    /// Computed the same way as [`Self::count_transactions_in_range`]: a difference of two
    /// cumulative values from the `aggregate` table.
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // `None` (statistics across all namespaces) is encoded as -1 in the database.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        // An empty range, or one outside the aggregated data, has zero size.
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            return Ok(0);
        };
        let (size,) = query_as::<(i64,)>(
            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut size = size as usize;

        if from > 0 {
            // Subtract the cumulative size just before the start of the range.
            let (prev_size,) = query_as::<(i64,)>(
                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            size = size.saturating_sub(prev_size as usize);
        }

        Ok(size)
    }

    /// Load this node's VID share for the identified block.
    ///
    /// Returns a "missing" error if the header row exists but its `vid_share` column is NULL.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut query = QueryBuilder::default();
        let where_clause = query.header_where_clause(id.into())?;
        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
        // selecting by payload ID, as payloads are not unique), we return the first one.
        let sql = format!(
            "SELECT vid_share FROM header AS h
              WHERE {where_clause}
              ORDER BY h.height
              LIMIT 1"
        );
        let (share_data,) = query
            .query_as::<(Option<Vec<u8>>,)>(&sql)
            .fetch_one(self.as_mut())
            .await?;
        let share_data = share_data.context(MissingSnafu)?;
        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
        Ok(share)
    }

    /// Summarize which leaves, blocks, and VID data are present or missing in the height range
    /// `[from, to)`.
    async fn sync_status_for_range(
        &mut self,
        from: usize,
        to: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        // A block can be missing if its corresponding header is missing or if the block's payload
        // information is missing.
        let blocks = self
            .sync_status_ranges(
                "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
                 p.ns_table)",
                "height",
                from,
                to,
            )
            .await?;

        let leaves = if blocks.is_fully_synced() {
            // A common special case is that there are no missing blocks. In this case, we already
            // know there are no missing leaves either, since a block can only be present if we
            // already have the corresponding leaf. Just return the fully-synced status for leaves
            // without doing another expensive query.
            blocks.clone()
        } else {
            // A leaf can only be missing if there is no row for it in the database (all its columns
            // are non-nullable). We use `height` as an indicator for `NULL` rows in an inner join,
            // which allows an index-only scan.
            self.sync_status_ranges("leaf2", "height", from, to).await?
        };

        // VID common works just like blocks.
        let vid_common = self
            .sync_status_ranges(
                "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
                "height",
                from,
                to,
            )
            .await?;

        // VID shares can be missing if the corresponding header is missing _or_ if the share data
        // is NULL, so we use the `share` column as an indicator.
        let vid_shares = self
            .sync_status_ranges("header", "vid_share", from, to)
            .await?;

        Ok(SyncStatusQueryData {
            leaves,
            blocks,
            vid_common,
            vid_shares,
            pruned_height: None,
        })
    }

    /// Load headers in a time window ending at timestamp `end`, starting from a given block
    /// (by height, hash, or timestamp) and limited to `limit` results.
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Find the specific block that starts the requested window.
        let first_block = match start.into() {
            WindowStart::Time(t) => {
                // If the request is not to start from a specific block, but from a timestamp, we
                // use a different method to find the window, as detecting whether we have
                // sufficient data to answer the query is not as simple as just trying `load_header`
                // for a specific block ID.
                return self.time_window::<Types>(t, end, limit).await;
            },
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
        };

        // Find all blocks starting from `first_block` with timestamps less than `end`. Block
        // timestamps are monotonically increasing, so this query is guaranteed to return a
        // contiguous range of blocks ordered by increasing height.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.height >= $1 AND h.timestamp < $2
              ORDER BY h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(first_block as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect::<Vec<_>>()
            .await?;

        // Find the block just before the window.
        let prev = if first_block > 0 {
            Some(self.load_header::<Types>(first_block as usize - 1).await?)
        } else {
            None
        };

        let next = if window.len() < limit {
            // If we are not limited, complete the window by finding the block just after the
            // window. We order by timestamp _then_ height, because the timestamp order allows the
            // query planner to use the index on timestamp to also efficiently solve the WHERE
            // clause, but this process may turn up multiple results, due to the 1-second resolution
            // of block timestamps. The final sort by height guarantees us a unique, deterministic
            // result (the first block with a given timestamp). This sort may not be able to use an
            // index, but it shouldn't be too expensive, since there will never be more than a
            // handful of blocks with the same timestamp.
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1
              ORDER BY h.timestamp, h.height
              LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            // If we have been limited, return a `null` next block indicating an incomplete window.
            // The client will have to query again with an adjusted starting point to get subsequent
            // results.
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        Ok(TimeWindowQueryData { window, prev, next })
    }

    /// Load the most recently stored QC chain, if any.
    ///
    /// Returns `Ok(None)` when the `latest_qc_chain` table is empty.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
            .fetch_optional(self.as_mut())
            .await?
        else {
            return Ok(None);
        };
        let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
        Ok(qcs)
    }
}
302
impl<Mode> Transaction<Mode>
where
    Mode: TransactionMode,
{
    /// Characterize consecutive ranges of objects in the given `height`-indexed table by status.
    ///
    /// This function finds all maximal present and missing ranges with heights in `[start, end)`,
    /// returned in increasing order of height, along with the total number of missing objects.
    ///
    /// The search process uses an indexed outer self-join on `table`, which requires traversing
    /// the table twice. Thus, it can be fairly expensive on large tables, but it is still linear in
    /// the size of the table.
    ///
    /// The value of `indicator_column` in the outer join results is used to check for missing
    /// objects (indicated by a `NULL` value). If `indicator_column` is a `NOT NULL` column, such as
    /// `height`, then this function will only consider objects missing if there is no corresponding
    /// row in the database at all. However, `indicator_column` may also be a nullable column (such
    /// as `payload.data`), in which case objects are treated as missing if there is no
    /// corresponding row _or_ if there is a row but it has an explicit `NULL` value for
    /// `indicator_column`.
    #[instrument(skip(self))]
    async fn sync_status_ranges(
        &mut self,
        table: &str,
        indicator_column: &str,
        start: usize,
        end: usize,
    ) -> QueryResult<ResourceSyncStatus> {
        let mut ranges = vec![];
        tracing::debug!("searching for missing ranges");

        // Find every height in the range `[start, end)` which is the first height in a sequence of
        // present objects (i.e. the object just before it is missing).
        //
        // We do this by outer joining the table with itself, with height shifted by one, to get a
        // combined table where each row contains a successor object and its immediate predecessor,
        // if present. If the predecessor is missing, its height will be `NULL` (which is impossible
        // otherwise, because `height` is a `NOT NULL` column).
        //
        // For each table in the self-join, we _first_ sub-select just the range of interest (i.e.
        // [start, end) for the successor table and [start - 1, end - 1) for the predecessor table).
        // It is more efficient to do this first to reduce the number of rows involved in the join,
        // which is the expensive part of the operation. In fact, due to the nature of the outer
        // join, it is impossible to do this filtering after the join for the predecessor table,
        // since at that point the table will not necessarily be indexed and will contain some rows
        // with `NULL` height.
        let query = format!(
            "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                WHERE height >= $1 AND height < $2)
            SELECT successor.height FROM range AS predecessor
            RIGHT JOIN range AS successor
            ON successor.height = predecessor.height + 1
            WHERE successor.indicator IS NOT NULL
              AND predecessor.indicator IS NULL
            ORDER BY successor.height"
        );
        let range_starts = query_as::<(i64,)>(&query)
            .bind(start as i64)
            .bind(end as i64)
            .fetch_all(self.as_mut())
            .await?;
        tracing::debug!(
            ?range_starts,
            "found {} starting heights for present ranges",
            range_starts.len()
        );

        let range_ends = if range_starts.len() <= 10 {
            // In the common case, where we are mostly or entirely synced and only missing a few
            // objects, chopping the space into only a small number of present ranges, it is more
            // efficient to pick out each range's end individually with a specific, efficient,
            // height-indexed query, rather than execute another very expensive query which is the
            // mirror of the `range_starts` query to load all the range ends in bulk.
            let mut ends = vec![];
            for (i, &(start,)) in range_starts.iter().enumerate() {
                // We can easily find the end of the range from the start by finding the maximum
                // height which is still present between the start and the next range's start.
                let query = format!(
                    "SELECT max(height) from {table}
                      WHERE height < $1 AND {indicator_column} IS NOT NULL"
                );
                // The upper bound for this range's end is the next range's start, or the overall
                // end of the search space for the final range.
                let upper_bound = if i + 1 < range_starts.len() {
                    range_starts[i + 1].0
                } else {
                    end as i64
                };
                let (end,) = query_as::<(i64,)>(&query)
                    .bind(upper_bound)
                    // This query is guaranteed to return one result, since `start` satisfies the
                    // requirements even if nothing else does.
                    .fetch_one(self.as_mut())
                    .await?;
                tracing::debug!(start, end, "found end for present range");
                ends.push((end,));
            }
            ends
        } else {
            // When the number of distinct ranges becomes large, making many small queries to fetch
            // each specific range end becomes inefficient because it is dominated by overhead. In
            // this case, we fall back to fetching the range ends using a single moderately
            // expensive query, which is the mirror image of the query we used to fetch the range
            // starts.
            let query = format!(
                "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                    WHERE height >= $1 AND height < $2)
                SELECT predecessor.height FROM range AS predecessor
                LEFT JOIN range AS successor
                ON successor.height = predecessor.height + 1
                WHERE predecessor.indicator IS NOT NULL
                  AND successor.indicator IS NULL
                ORDER BY predecessor.height"
            );
            let ends = query_as::<(i64,)>(&query)
                .bind(start as i64)
                .bind(end as i64)
                .fetch_all(self.as_mut())
                .await?;
            tracing::debug!(
                ?ends,
                "found {} ending heights for present ranges",
                ends.len()
            );
            ends
        };

        // Sanity check: every range has a start and an end.
        if range_starts.len() != range_ends.len() {
            return Err(QueryError::Error {
                message: format!(
                    "number of present range starts ({}) does not match number of present range \
                     ends ({})",
                    range_starts.len(),
                    range_ends.len(),
                ),
            });
        }

        // Now we can simply zip `range_starts` and `range_ends` to find the full sequence of
        // [`SyncStatus::Present`] ranges. We can then interpolate [`SyncStatus::Missing`] ranges
        // between each present range.
        let mut prev = start;
        for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
            let start = start as usize;
            let end = end as usize;

            // Sanity check range bounds.
            if start < prev {
                return Err(QueryError::Error {
                    message: format!(
                        "found present ranges out of order: range start {start} is before \
                         previous range end {prev}"
                    ),
                });
            }
            if end < start {
                return Err(QueryError::Error {
                    message: format!("malformed range: start={start}, end={end}"),
                });
            }

            if start != prev {
                // There is a range in between this one and the previous one, which must correspond
                // to missing objects.
                tracing::debug!(start = prev, end = start, "found missing range");
                ranges.push(SyncStatusRange {
                    start: prev,
                    end: start,
                    status: SyncStatus::Missing,
                });
            }

            ranges.push(SyncStatusRange {
                start,
                end: end + 1, // convert inclusive range to exclusive
                status: SyncStatus::Present,
            });
            prev = end + 1;
        }

        // There is possibly one more missing range, between the final present range and the overall
        // block height.
        if prev != end {
            tracing::debug!(start = prev, end, "found missing range");
            ranges.push(SyncStatusRange {
                start: prev,
                end,
                status: SyncStatus::Missing,
            });
        }

        // Total up the missing objects across all missing ranges.
        let missing = ranges
            .iter()
            .filter_map(|range| {
                if range.status == SyncStatus::Missing {
                    Some(range.end - range.start)
                } else {
                    None
                }
            })
            .sum();
        tracing::debug!(
            missing,
            "found missing objects in {} total ranges",
            ranges.len()
        );

        Ok(ResourceSyncStatus { missing, ranges })
    }
}
513
514impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
515where
516    Types: NodeType,
517    Header<Types>: QueryableHeader<Types>,
518{
519    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
520        let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
521            .fetch_one(self.as_mut())
522            .await?;
523        Ok(height as usize)
524    }
525
526    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
527        // Get the maximum height for which we have stored aggregated results
528        // then query all the namespace info for that height
529        let res: (Option<i64>,) =
530            query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
531                .fetch_one(self.as_mut())
532                .await?;
533
534        let (Some(max_height),) = res else {
535            return Ok(None);
536        };
537
538        let rows: Vec<(i64, i64, i64)> = query_as(
539            r#"
540        SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
541        "#,
542        )
543        .bind(max_height)
544        .fetch_all(self.as_mut())
545        .await?;
546
547        let mut num_transactions = HashMap::default();
548        let mut payload_size = HashMap::default();
549
550        for (namespace_id, num_tx, payload_sz) in rows {
551            // Null namespace is represented as - 1 in database
552            // as it is part of primary key and primary key can not be NULL
553            // This namespace represents the cumulative sum of all the namespaces
554            let key = if namespace_id == -1 {
555                None
556            } else {
557                Some(namespace_id.into())
558            };
559            num_transactions.insert(key, num_tx as usize);
560            payload_size.insert(key, payload_sz as usize);
561        }
562
563        Ok(Some(Aggregate {
564            height: max_height,
565            num_transactions,
566            payload_size,
567        }))
568    }
569}
570
571impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
572where
573    Header<Types>: QueryableHeader<Types>,
574{
575    async fn update_aggregates(
576        &mut self,
577        prev: Aggregate<Types>,
578        blocks: &[PayloadMetadata<Types>],
579    ) -> anyhow::Result<Aggregate<Types>> {
580        let height = blocks[0].height();
581        let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
582
583        let mut rows = Vec::new();
584
585        // Cumulatively sum up new statistics for each block in this chunk.
586        let aggregates = blocks
587            .iter()
588            .scan(
589                (height, prev_tx_count, prev_size),
590                |(height, tx_count, size), block| {
591                    if *height != block.height {
592                        return Some(Err(anyhow!(
593                            "blocks in update_aggregates are not sequential; expected {}, got {}",
594                            *height,
595                            block.height()
596                        )));
597                    }
598                    *height += 1;
599
600                    //  Update total global stats
601                    // `None` represents stats across all namespaces.
602                    // It is represented as -1 in database
603
604                    *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
605                    *size.entry(None).or_insert(0) += block.size as usize;
606
607                    // Add row for global cumulative stats (namespace = -1)
608
609                    rows.push((
610                        block.height as i64,
611                        -1,
612                        tx_count[&None] as i64,
613                        size[&None] as i64,
614                    ));
615
616                    // Update per-namespace cumulative stats
617                    for (&ns_id, info) in &block.namespaces {
618                        let key = Some(ns_id);
619
620                        *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
621                        *size.entry(key).or_insert(0) += info.size as usize;
622                    }
623
624                    //  Insert cumulative stats for all known namespaces
625                    // Even if a namespace wasn't present in this block,
626                    // we still insert its latest cumulative stats at this height.
627                    for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
628                        let key = Some(*ns_id);
629                        rows.push((
630                            block.height as i64,
631                            (*ns_id).into(),
632                            tx_count[&key] as i64,
633                            size[&key] as i64,
634                        ));
635                    }
636
637                    Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
638                },
639            )
640            .collect::<anyhow::Result<Vec<_>>>()?;
641        let last_aggregate = aggregates.last().cloned();
642
643        let (height, num_transactions, payload_size) =
644            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
645
646        self.upsert(
647            "aggregate",
648            ["height", "namespace", "num_transactions", "payload_size"],
649            ["height", "namespace"],
650            rows,
651        )
652        .await?;
653        Ok(Aggregate {
654            height,
655            num_transactions,
656            payload_size,
657        })
658    }
659}
660
impl<Mode: TransactionMode> Transaction<Mode> {
    /// Load headers with timestamps in the window `[start, end)`, limited to `limit` results.
    ///
    /// Also returns the headers just before and just after the window (when known), so the caller
    /// can confirm the window boundaries. Errors with `NotFound` if there is not yet enough data
    /// to determine where the window starts.
    async fn time_window<Types: NodeType>(
        &mut self,
        start: u64,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Find all blocks whose timestamps fall within the window [start, end). Block timestamps
        // are monotonically increasing, so this query is guaranteed to return a contiguous range of
        // blocks ordered by increasing height.
        //
        // We order by timestamp _then_ height, because the timestamp order allows the query planner
        // to use the index on timestamp to also efficiently solve the WHERE clause, but this
        // process may turn up multiple results, due to the 1-second resolution of block timestamps.
        // The final sort by height guarantees us a unique, deterministic result (the first block
        // with a given timestamp). This sort may not be able to use an index, but it shouldn't be
        // too expensive, since there will never be more than a handful of blocks with the same
        // timestamp.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1 AND h.timestamp < $2
              ORDER BY h.timestamp, h.height
              LIMIT $3"
        );
        let rows = query(&sql)
            .bind(start as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window: Vec<_> = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect()
            .await?;

        let next = if window.len() < limit {
            // If we are not limited, complete the window by finding the block just after.
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp >= $1
              ORDER BY h.timestamp, h.height
              LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            // If we have been limited, return a `null` next block indicating an incomplete window.
            // The client will have to query again with an adjusted starting point to get subsequent
            // results.
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        // If the `next` block exists, _or_ if any block in the window exists, we know we have
        // enough information to definitively say at least where the window starts (we may or may
        // not have where it ends, depending on how many blocks have thus far been produced).
        // However, if we have neither a block in the window nor a block after it, we cannot say
        // whether the next block produced will have a timestamp before or after the window start.
        // In this case, we don't know what the `prev` field of the response should be, so we return
        // an error: the caller must try again after more blocks have been produced.
        if window.is_empty() && next.is_none() {
            return Err(QueryError::NotFound);
        }

        // Find the block just before the window: the latest block (highest timestamp, breaking
        // ties by highest height) with a timestamp before `start`.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
               FROM header AS h
              WHERE h.timestamp < $1
              ORDER BY h.timestamp DESC, h.height DESC
              LIMIT 1"
        );
        let prev = query(&sql)
            .bind(start as i64)
            .fetch_optional(self.as_mut())
            .await?
            .map(parse_header::<Types>)
            .transpose()?;

        Ok(TimeWindowQueryData { window, prev, next })
    }
}
748
749/// Get inclusive start and end bounds for a range to pull aggregate statistics.
750///
751/// Returns [`None`] if there are no blocks in the given range, in which case the result should be
752/// the default value of the aggregate statistic.
753async fn aggregate_range_bounds<Types>(
754    tx: &mut Transaction<impl TransactionMode>,
755    range: impl RangeBounds<usize>,
756) -> QueryResult<Option<(usize, usize)>>
757where
758    Types: NodeType,
759    Header<Types>: QueryableHeader<Types>,
760{
761    let from = match range.start_bound() {
762        Bound::Included(from) => *from,
763        Bound::Excluded(from) => *from + 1,
764        Bound::Unbounded => 0,
765    };
766    let to = match range.end_bound() {
767        Bound::Included(to) => *to,
768        Bound::Excluded(0) => return Ok(None),
769        Bound::Excluded(to) => *to - 1,
770        Bound::Unbounded => {
771            let height = AggregatesStorage::<Types>::aggregates_height(tx)
772                .await
773                .map_err(|err| QueryError::Error {
774                    message: format!("{err:#}"),
775                })?;
776            if height == 0 {
777                return Ok(None);
778            }
779            if height < from {
780                return Ok(None);
781            }
782            height - 1
783        },
784    };
785    Ok(Some((from, to)))
786}
787
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::vid::advz::advz_scheme;
    use itertools::Itertools;
    use jf_advz::VidScheme;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::{
        availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
        data_source::{
            Transaction as _, VersionedDataSource,
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
        },
        testing::mocks::MockTypes,
    };

    /// Shared driver for the `sync_status_ranges` tests.
    ///
    /// Inserts mock leaves covering each `[start, end)` interval in `present_ranges` (assumed
    /// sorted and non-overlapping), then queries `sync_status_ranges` over `[start, end)` and
    /// checks that:
    /// * the reported `missing` count equals the total window size minus the inserted leaves, and
    /// * the reported ranges alternate between `Missing` gaps and the given `Present` intervals,
    ///   in order, with no extra entries.
    async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // Generate some mock leaves to insert.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        // Derive each subsequent leaf from the previous one, bumping only the block number.
        for i in 1..end {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Set up.
        {
            let mut tx = db.write().await.unwrap();

            // Insert only the leaves that fall in one of the "present" intervals.
            for &(start, end) in present_ranges {
                for leaf in leaves[start..end].iter() {
                    tx.insert_leaf(leaf.clone()).await.unwrap();
                }
            }

            tx.commit().await.unwrap();
        }

        let sync_status = db
            .read()
            .await
            .unwrap()
            .sync_status_ranges("leaf2", "height", start, end)
            .await
            .unwrap();

        // Verify missing.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        let total = end - start;
        assert_eq!(sync_status.missing, total - present);

        // Verify ranges.
        let mut ranges = sync_status.ranges.into_iter();
        // `prev` tracks the end of the last range accounted for, so any gap before the next
        // present interval must show up as a `Missing` range.
        let mut prev = start;
        for &(start, end) in present_ranges {
            if start != prev {
                // There is a gap before this present interval; expect a `Missing` range for it.
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: SyncStatus::Missing,
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // If the queried window extends past the last present interval, the tail must be reported
        // as one final `Missing` range.
        if prev != end {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }

    // Present intervals at both edges of the window, gap in the middle.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_present() {
        test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
    }

    // Missing gaps at both edges of the window, present interval in the middle.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_missing() {
        test_sync_status_ranges(0, 6, &[(2, 4)]).await;
    }

    // As above, but the queried window starts at a nonzero height.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_present() {
        test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_missing() {
        test_sync_status_ranges(1, 8, &[(4, 6)]).await;
    }

    // Single-element present intervals, adjacent to the window edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_singleton_ranges() {
        test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
    }

    // Many alternating singleton present/missing intervals, present at both window edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_present() {
        let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 201, &ranges).await;
    }

    // Many alternating singleton intervals, missing at both window edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_missing() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 202, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 201, &ranges).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
        let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 202, &ranges).await;
    }

    /// Two consecutive blocks share a byte-identical payload. After inserting payload and VID data
    /// only for the second block, payload and VID common data are reported present for both
    /// heights (the data is shared between the leaves), while the VID share — stored per block —
    /// remains missing for the first.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Create two blocks with the same (empty) payload.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // The second leaf/block is identical to the first except for the height.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert the first leaf without payload or VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // The block and VID data are missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                vid_shares: missing,
                pruned_height: None,
            }
        );

        // Insert the second block with all data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(
                VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // The payload data is shared by both leaves, and nothing is missing.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: present.clone(),
                vid_common: present,
                // The VID share was only stored for the second block, so height 0 is still
                // missing a share.
                vid_shares: ResourceSyncStatus {
                    missing: 1,
                    ranges: vec![
                        SyncStatusRange {
                            status: SyncStatus::Missing,
                            start: 0,
                            end: 1,
                        },
                        SyncStatusRange {
                            status: SyncStatus::Present,
                            start: 1,
                            end: 2,
                        },
                    ]
                },
                pruned_height: None,
            }
        );
    }

    /// Like [`test_sync_status_duplicate_payload`], but the two blocks have different namespace
    /// tables (here, differing `num_transactions` metadata). The byte-identical payload can no
    /// longer be shared between them, so the first block stays missing; VID common data, which is
    /// independent of payload interpretation, is still shared.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Create two blocks with byte-identical payloads, but different namespace tables (meaning
        // the interpretation of the payload is different).
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // The second leaf differs in height *and* in metadata, which changes the namespace table.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        leaf.leaf.block_header_mut().metadata.num_transactions += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Insert the first leaf without payload or VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // The block and VID data are missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                vid_shares: missing,
                pruned_height: None,
            }
        );

        // Insert the second block with all data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_block(blocks[1].clone()).await.unwrap();
            tx.insert_vid(
                VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // The payload data cannot be shared because metadata (e.g. num_transactions) differs. VID,
        // on the other hand, is independent of the interpretation of the payload and is shared by
        // both leaves.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![
                SyncStatusRange {
                    status: SyncStatus::Missing,
                    start: 0,
                    end: 1,
                },
                SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 1,
                    end: 2,
                },
            ],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: missing.clone(),
                vid_common: present,
                vid_shares: missing,
                pruned_height: None,
            }
        );
    }
}