hotshot_query_service/data_source/storage/sql/queries/node.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Node storage implementation for a database query engine.
14
15use std::{
16    collections::HashMap,
17    ops::{Bound, RangeBounds},
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use futures::stream::{StreamExt, TryStreamExt};
23use hotshot_types::{
24    data::VidShare,
25    traits::{block_contents::BlockHeader, node_implementation::NodeType},
26};
27use snafu::OptionExt;
28use sqlx::Row;
29
30use super::{
31    super::transaction::{query, query_as, Transaction, TransactionMode, Write},
32    parse_header, DecodeError, QueryBuilder, HEADER_COLUMNS,
33};
34use crate::{
35    availability::{NamespaceId, QueryableHeader},
36    data_source::storage::{
37        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38    },
39    node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
40    types::HeightIndexed,
41    Header, MissingSnafu, NotFoundSnafu, QueryError, QueryResult,
42};
43
44#[async_trait]
45impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
46where
47    Mode: TransactionMode,
48    Types: NodeType,
49    Header<Types>: QueryableHeader<Types>,
50{
51    async fn block_height(&mut self) -> QueryResult<usize> {
52        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
53            .fetch_one(self.as_mut())
54            .await?
55        {
56            (Some(height),) => {
57                // The height of the block is the number of blocks below it, so the total number of
58                // blocks is one more than the height of the highest block.
59                Ok(height as usize + 1)
60            },
61            (None,) => {
62                // If there are no blocks yet, the height is 0.
63                Ok(0)
64            },
65        }
66    }
67
68    async fn count_transactions_in_range(
69        &mut self,
70        range: impl RangeBounds<usize> + Send,
71        namespace: Option<NamespaceId<Types>>,
72    ) -> QueryResult<usize> {
73        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
74        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
75            return Ok(0);
76        };
77        let (count,) = query_as::<(i64,)>(
78            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
79        )
80        .bind(to as i64)
81        .bind(namespace)
82        .fetch_one(self.as_mut())
83        .await?;
84        let mut count = count as usize;
85
86        if from > 0 {
87            let (prev_count,) = query_as::<(i64,)>(
88                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
89            )
90            .bind((from - 1) as i64)
91            .bind(namespace)
92            .fetch_one(self.as_mut())
93            .await?;
94            count = count.saturating_sub(prev_count as usize);
95        }
96
97        Ok(count)
98    }
99
100    async fn payload_size_in_range(
101        &mut self,
102        range: impl RangeBounds<usize> + Send,
103        namespace: Option<NamespaceId<Types>>,
104    ) -> QueryResult<usize> {
105        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
106        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
107            return Ok(0);
108        };
109        let (size,) = query_as::<(i64,)>(
110            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
111        )
112        .bind(to as i64)
113        .bind(namespace)
114        .fetch_one(self.as_mut())
115        .await?;
116        let mut size = size as usize;
117
118        if from > 0 {
119            let (prev_size,) = query_as::<(i64,)>(
120                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
121            )
122            .bind((from - 1) as i64)
123            .bind(namespace)
124            .fetch_one(self.as_mut())
125            .await?;
126            size = size.saturating_sub(prev_size as usize);
127        }
128
129        Ok(size)
130    }
131
132    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
133    where
134        ID: Into<BlockId<Types>> + Send + Sync,
135    {
136        let mut query = QueryBuilder::default();
137        let where_clause = query.header_where_clause(id.into())?;
138        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
139        // selecting by payload ID, as payloads are not unique), we return the first one.
140        let sql = format!(
141            "SELECT v.share AS share FROM vid2 AS v
142               JOIN header AS h ON v.height = h.height
143              WHERE {where_clause}
144              ORDER BY h.height
145              LIMIT 1"
146        );
147        let (share_data,) = query
148            .query_as::<(Option<Vec<u8>>,)>(&sql)
149            .fetch_one(self.as_mut())
150            .await?;
151        let share_data = share_data.context(MissingSnafu)?;
152        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
153        Ok(share)
154    }
155
156    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
157        // A leaf can only be missing if there is no row for it in the database (all its columns are
158        // non-nullable). A block can be missing if its corresponding leaf is missing or if the
159        // block's `data` field is `NULL`. We can find the number of missing leaves and blocks by
160        // getting the number of fully missing leaf rows and the number of present but null-payload
161        // block rows.
162        //
163        // Note that it should not be possible for a block's row to be missing (as opposed to
164        // present but having a `NULL` payload) if the corresponding leaf is present. The schema
165        // enforces this, since the payload table `REFERENCES` the corresponding leaf row. Thus,
166        // missing block rows should be a subset of missing leaf rows and do not need to be counted
167        // separately. This is very important: if we had to count the number of block rows that were
168        // missing whose corresponding leaf row was present, this would require an expensive join
169        // and table traversal.
170        //
171        // We can get the number of missing leaf rows very efficiently, by subtracting the total
172        // number of leaf rows from the block height (since the block height by definition is the
173        // height of the highest leaf we do have). We can also get the number of null payloads
174        // directly using an `IS NULL` filter.
175        //
176        // For VID, common data can only be missing if the entire row is missing. Shares can be
177        // missing in that case _or_ if the row is present but share data is NULL. Thus, we also
178        // need to select the total number of VID rows and the number of present VID rows with a
179        // NULL share.
180        let sql = "SELECT l.max_height, l.total_leaves, p.null_payloads, v.total_vid, \
181                   vn.null_vid, pruned_height FROM
182                (SELECT max(leaf2.height) AS max_height, count(*) AS total_leaves FROM leaf2) AS l,
183                (SELECT count(*) AS null_payloads FROM payload WHERE data IS NULL) AS p,
184                (SELECT count(*) AS total_vid FROM vid2) AS v,
185                (SELECT count(*) AS null_vid FROM vid2 WHERE share IS NULL) AS vn,
186                (SELECT(SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1) as \
187                   pruned_height)
188            ";
189        let row = query(sql)
190            .fetch_optional(self.as_mut())
191            .await?
192            .context(NotFoundSnafu)?;
193
194        let block_height = match row.get::<Option<i64>, _>("max_height") {
195            Some(height) => {
196                // The height of the block is the number of blocks below it, so the total number of
197                // blocks is one more than the height of the highest block.
198                height as usize + 1
199            },
200            None => {
201                // If there are no blocks yet, the height is 0.
202                0
203            },
204        };
205        let total_leaves = row.get::<i64, _>("total_leaves") as usize;
206        let null_payloads = row.get::<i64, _>("null_payloads") as usize;
207        let total_vid = row.get::<i64, _>("total_vid") as usize;
208        let null_vid = row.get::<i64, _>("null_vid") as usize;
209        let pruned_height = row
210            .get::<Option<i64>, _>("pruned_height")
211            .map(|h| h as usize);
212
213        let missing_leaves = block_height.saturating_sub(total_leaves);
214        let missing_blocks = missing_leaves + null_payloads;
215        let missing_vid_common = block_height.saturating_sub(total_vid);
216        let missing_vid_shares = missing_vid_common + null_vid;
217
218        Ok(SyncStatus {
219            missing_leaves,
220            missing_blocks,
221            missing_vid_common,
222            missing_vid_shares,
223            pruned_height,
224        })
225    }
226
227    async fn get_header_window(
228        &mut self,
229        start: impl Into<WindowStart<Types>> + Send + Sync,
230        end: u64,
231        limit: usize,
232    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
233        // Find the specific block that starts the requested window.
234        let first_block = match start.into() {
235            WindowStart::Time(t) => {
236                // If the request is not to start from a specific block, but from a timestamp, we
237                // use a different method to find the window, as detecting whether we have
238                // sufficient data to answer the query is not as simple as just trying `load_header`
239                // for a specific block ID.
240                return self.time_window::<Types>(t, end, limit).await;
241            },
242            WindowStart::Height(h) => h,
243            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
244        };
245
246        // Find all blocks starting from `first_block` with timestamps less than `end`. Block
247        // timestamps are monotonically increasing, so this query is guaranteed to return a
248        // contiguous range of blocks ordered by increasing height.
249        let sql = format!(
250            "SELECT {HEADER_COLUMNS}
251               FROM header AS h
252              WHERE h.height >= $1 AND h.timestamp < $2
253              ORDER BY h.height
254              LIMIT $3"
255        );
256        let rows = query(&sql)
257            .bind(first_block as i64)
258            .bind(end as i64)
259            .bind(limit as i64)
260            .fetch(self.as_mut());
261        let window = rows
262            .map(|row| parse_header::<Types>(row?))
263            .try_collect::<Vec<_>>()
264            .await?;
265
266        // Find the block just before the window.
267        let prev = if first_block > 0 {
268            Some(self.load_header::<Types>(first_block as usize - 1).await?)
269        } else {
270            None
271        };
272
273        let next = if window.len() < limit {
274            // If we are not limited, complete the window by finding the block just after the
275            // window. We order by timestamp _then_ height, because the timestamp order allows the
276            // query planner to use the index on timestamp to also efficiently solve the WHERE
277            // clause, but this process may turn up multiple results, due to the 1-second resolution
278            // of block timestamps. The final sort by height guarantees us a unique, deterministic
279            // result (the first block with a given timestamp). This sort may not be able to use an
280            // index, but it shouldn't be too expensive, since there will never be more than a
281            // handful of blocks with the same timestamp.
282            let sql = format!(
283                "SELECT {HEADER_COLUMNS}
284               FROM header AS h
285              WHERE h.timestamp >= $1
286              ORDER BY h.timestamp, h.height
287              LIMIT 1"
288            );
289            query(&sql)
290                .bind(end as i64)
291                .fetch_optional(self.as_mut())
292                .await?
293                .map(parse_header::<Types>)
294                .transpose()?
295        } else {
296            // If we have been limited, return a `null` next block indicating an incomplete window.
297            // The client will have to query again with an adjusted starting point to get subsequent
298            // results.
299            tracing::debug!(limit, "cutting off header window request due to limit");
300            None
301        };
302
303        Ok(TimeWindowQueryData { window, prev, next })
304    }
305}
306
307impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
308where
309    Types: NodeType,
310    Header<Types>: QueryableHeader<Types>,
311{
312    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
313        let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
314            .fetch_one(self.as_mut())
315            .await?;
316        Ok(height as usize)
317    }
318
319    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
320        // Get the maximum height for which we have stored aggregated results
321        // then query all the namespace info for that height
322        let res: (Option<i64>,) =
323            query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
324                .fetch_one(self.as_mut())
325                .await?;
326
327        let (Some(max_height),) = res else {
328            return Ok(None);
329        };
330
331        let rows: Vec<(i64, i64, i64)> = query_as(
332            r#"
333        SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
334        "#,
335        )
336        .bind(max_height)
337        .fetch_all(self.as_mut())
338        .await?;
339
340        let mut num_transactions = HashMap::new();
341        let mut payload_size = HashMap::new();
342
343        for (namespace_id, num_tx, payload_sz) in rows {
344            // Null namespace is represented as - 1 in database
345            // as it is part of primary key and primary key can not be NULL
346            // This namespace represents the cumulative sum of all the namespaces
347            let key = if namespace_id == -1 {
348                None
349            } else {
350                Some(namespace_id.into())
351            };
352            num_transactions.insert(key, num_tx as usize);
353            payload_size.insert(key, payload_sz as usize);
354        }
355
356        Ok(Some(Aggregate {
357            height: max_height,
358            num_transactions,
359            payload_size,
360        }))
361    }
362}
363
364impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
365where
366    Header<Types>: QueryableHeader<Types>,
367{
368    async fn update_aggregates(
369        &mut self,
370        prev: Aggregate<Types>,
371        blocks: &[PayloadMetadata<Types>],
372    ) -> anyhow::Result<Aggregate<Types>> {
373        let height = blocks[0].height();
374        let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
375
376        let mut rows = Vec::new();
377
378        // Cumulatively sum up new statistics for each block in this chunk.
379        let aggregates = blocks
380            .iter()
381            .scan(
382                (height, prev_tx_count, prev_size),
383                |(height, tx_count, size), block| {
384                    if *height != block.height {
385                        return Some(Err(anyhow!(
386                            "blocks in update_aggregates are not sequential; expected {}, got {}",
387                            *height,
388                            block.height()
389                        )));
390                    }
391                    *height += 1;
392
393                    //  Update total global stats
394                    // `None` represents stats across all namespaces.
395                    // It is represented as -1 in database
396
397                    *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
398                    *size.entry(None).or_insert(0) += block.size as usize;
399
400                    // Add row for global cumulative stats (namespace = -1)
401
402                    rows.push((
403                        block.height as i64,
404                        -1,
405                        tx_count[&None] as i64,
406                        size[&None] as i64,
407                    ));
408
409                    // Update per-namespace cumulative stats
410                    for (&ns_id, info) in &block.namespaces {
411                        let key = Some(ns_id);
412
413                        *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
414                        *size.entry(key).or_insert(0) += info.size as usize;
415                    }
416
417                    //  Insert cumulative stats for all known namespaces
418                    // Even if a namespace wasn't present in this block,
419                    // we still insert its latest cumulative stats at this height.
420                    for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
421                        let key = Some(*ns_id);
422                        rows.push((
423                            block.height as i64,
424                            (*ns_id).into(),
425                            tx_count[&key] as i64,
426                            size[&key] as i64,
427                        ));
428                    }
429
430                    Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
431                },
432            )
433            .collect::<anyhow::Result<Vec<_>>>()?;
434        let last_aggregate = aggregates.last().cloned();
435
436        let (height, num_transactions, payload_size) =
437            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
438
439        self.upsert(
440            "aggregate",
441            ["height", "namespace", "num_transactions", "payload_size"],
442            ["height", "namespace"],
443            rows,
444        )
445        .await?;
446        Ok(Aggregate {
447            height,
448            num_transactions,
449            payload_size,
450        })
451    }
452}
453
454impl<Mode: TransactionMode> Transaction<Mode> {
455    async fn time_window<Types: NodeType>(
456        &mut self,
457        start: u64,
458        end: u64,
459        limit: usize,
460    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
461        // Find all blocks whose timestamps fall within the window [start, end). Block timestamps
462        // are monotonically increasing, so this query is guaranteed to return a contiguous range of
463        // blocks ordered by increasing height.
464        //
465        // We order by timestamp _then_ height, because the timestamp order allows the query planner
466        // to use the index on timestamp to also efficiently solve the WHERE clause, but this
467        // process may turn up multiple results, due to the 1-second resolution of block timestamps.
468        // The final sort by height guarantees us a unique, deterministic result (the first block
469        // with a given timestamp). This sort may not be able to use an index, but it shouldn't be
470        // too expensive, since there will never be more than a handful of blocks with the same
471        // timestamp.
472        let sql = format!(
473            "SELECT {HEADER_COLUMNS}
474               FROM header AS h
475              WHERE h.timestamp >= $1 AND h.timestamp < $2
476              ORDER BY h.timestamp, h.height
477              LIMIT $3"
478        );
479        let rows = query(&sql)
480            .bind(start as i64)
481            .bind(end as i64)
482            .bind(limit as i64)
483            .fetch(self.as_mut());
484        let window: Vec<_> = rows
485            .map(|row| parse_header::<Types>(row?))
486            .try_collect()
487            .await?;
488
489        let next = if window.len() < limit {
490            // If we are not limited, complete the window by finding the block just after.
491            let sql = format!(
492                "SELECT {HEADER_COLUMNS}
493               FROM header AS h
494              WHERE h.timestamp >= $1
495              ORDER BY h.timestamp, h.height
496              LIMIT 1"
497            );
498            query(&sql)
499                .bind(end as i64)
500                .fetch_optional(self.as_mut())
501                .await?
502                .map(parse_header::<Types>)
503                .transpose()?
504        } else {
505            // If we have been limited, return a `null` next block indicating an incomplete window.
506            // The client will have to query again with an adjusted starting point to get subsequent
507            // results.
508            tracing::debug!(limit, "cutting off header window request due to limit");
509            None
510        };
511
512        // If the `next` block exists, _or_ if any block in the window exists, we know we have
513        // enough information to definitively say at least where the window starts (we may or may
514        // not have where it ends, depending on how many blocks have thus far been produced).
515        // However, if we have neither a block in the window nor a block after it, we cannot say
516        // whether the next block produced will have a timestamp before or after the window start.
517        // In this case, we don't know what the `prev` field of the response should be, so we return
518        // an error: the caller must try again after more blocks have been produced.
519        if window.is_empty() && next.is_none() {
520            return Err(QueryError::NotFound);
521        }
522
523        // Find the block just before the window.
524        let sql = format!(
525            "SELECT {HEADER_COLUMNS}
526               FROM header AS h
527              WHERE h.timestamp < $1
528              ORDER BY h.timestamp DESC, h.height DESC
529              LIMIT 1"
530        );
531        let prev = query(&sql)
532            .bind(start as i64)
533            .fetch_optional(self.as_mut())
534            .await?
535            .map(parse_header::<Types>)
536            .transpose()?;
537
538        Ok(TimeWindowQueryData { window, prev, next })
539    }
540}
541
542/// Get inclusive start and end bounds for a range to pull aggregate statistics.
543///
544/// Returns [`None`] if there are no blocks in the given range, in which case the result should be
545/// the default value of the aggregate statistic.
546async fn aggregate_range_bounds<Types>(
547    tx: &mut Transaction<impl TransactionMode>,
548    range: impl RangeBounds<usize>,
549) -> QueryResult<Option<(usize, usize)>>
550where
551    Types: NodeType,
552    Header<Types>: QueryableHeader<Types>,
553{
554    let from = match range.start_bound() {
555        Bound::Included(from) => *from,
556        Bound::Excluded(from) => *from + 1,
557        Bound::Unbounded => 0,
558    };
559    let to = match range.end_bound() {
560        Bound::Included(to) => *to,
561        Bound::Excluded(0) => return Ok(None),
562        Bound::Excluded(to) => *to - 1,
563        Bound::Unbounded => {
564            let height = AggregatesStorage::<Types>::aggregates_height(tx)
565                .await
566                .map_err(|err| QueryError::Error {
567                    message: format!("{err:#}"),
568                })?;
569            if height == 0 {
570                return Ok(None);
571            }
572            if height < from {
573                return Ok(None);
574            }
575            height - 1
576        },
577    };
578    Ok(Some((from, to)))
579}