hotshot_query_service/data_source/storage/sql/queries/
node.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Node storage implementation for a database query engine.
14
15use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22    data::VidShare,
23    traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25use snafu::OptionExt;
26use sqlx::Row;
27
28use super::{
29    super::transaction::{query, query_as, Transaction, TransactionMode, Write},
30    parse_header, DecodeError, QueryBuilder, HEADER_COLUMNS,
31};
32use crate::{
33    availability::{NamespaceId, QueryableHeader},
34    data_source::storage::{
35        Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
36    },
37    node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
38    types::HeightIndexed,
39    Header, MissingSnafu, NotFoundSnafu, QueryError, QueryResult,
40};
41
42#[async_trait]
43impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
44where
45    Mode: TransactionMode,
46    Types: NodeType,
47    Header<Types>: QueryableHeader<Types>,
48{
49    async fn block_height(&mut self) -> QueryResult<usize> {
50        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
51            .fetch_one(self.as_mut())
52            .await?
53        {
54            (Some(height),) => {
55                // The height of the block is the number of blocks below it, so the total number of
56                // blocks is one more than the height of the highest block.
57                Ok(height as usize + 1)
58            },
59            (None,) => {
60                // If there are no blocks yet, the height is 0.
61                Ok(0)
62            },
63        }
64    }
65
66    async fn count_transactions_in_range(
67        &mut self,
68        range: impl RangeBounds<usize> + Send,
69        namespace: Option<NamespaceId<Types>>,
70    ) -> QueryResult<usize> {
71        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
72        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
73            return Ok(0);
74        };
75        let (count,) = query_as::<(i64,)>(
76            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
77        )
78        .bind(to as i64)
79        .bind(namespace)
80        .fetch_one(self.as_mut())
81        .await?;
82        let mut count = count as usize;
83
84        if from > 0 {
85            let (prev_count,) = query_as::<(i64,)>(
86                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
87            )
88            .bind((from - 1) as i64)
89            .bind(namespace)
90            .fetch_one(self.as_mut())
91            .await?;
92            count = count.saturating_sub(prev_count as usize);
93        }
94
95        Ok(count)
96    }
97
98    async fn payload_size_in_range(
99        &mut self,
100        range: impl RangeBounds<usize> + Send,
101        namespace: Option<NamespaceId<Types>>,
102    ) -> QueryResult<usize> {
103        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
104        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
105            return Ok(0);
106        };
107        let (size,) = query_as::<(i64,)>(
108            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
109        )
110        .bind(to as i64)
111        .bind(namespace)
112        .fetch_one(self.as_mut())
113        .await?;
114        let mut size = size as usize;
115
116        if from > 0 {
117            let (prev_size,) = query_as::<(i64,)>(
118                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
119            )
120            .bind((from - 1) as i64)
121            .bind(namespace)
122            .fetch_one(self.as_mut())
123            .await?;
124            size = size.saturating_sub(prev_size as usize);
125        }
126
127        Ok(size)
128    }
129
130    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
131    where
132        ID: Into<BlockId<Types>> + Send + Sync,
133    {
134        let mut query = QueryBuilder::default();
135        let where_clause = query.header_where_clause(id.into())?;
136        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
137        // selecting by payload ID, as payloads are not unique), we return the first one.
138        let sql = format!(
139            "SELECT v.share AS share FROM vid2 AS v
140               JOIN header AS h ON v.height = h.height
141              WHERE {where_clause}
142              ORDER BY h.height
143              LIMIT 1"
144        );
145        let (share_data,) = query
146            .query_as::<(Option<Vec<u8>>,)>(&sql)
147            .fetch_one(self.as_mut())
148            .await?;
149        let share_data = share_data.context(MissingSnafu)?;
150        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
151        Ok(share)
152    }
153
154    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
155        // A leaf can only be missing if there is no row for it in the database (all its columns are
156        // non-nullable). A block can be missing if its corresponding leaf is missing or if the
157        // block's `data` field is `NULL`. We can find the number of missing leaves and blocks by
158        // getting the number of fully missing leaf rows and the number of present but null-payload
159        // block rows.
160        //
161        // Note that it should not be possible for a block's row to be missing (as opposed to
162        // present but having a `NULL` payload) if the corresponding leaf is present. The schema
163        // enforces this, since the payload table `REFERENCES` the corresponding leaf row. Thus,
164        // missing block rows should be a subset of missing leaf rows and do not need to be counted
165        // separately. This is very important: if we had to count the number of block rows that were
166        // missing whose corresponding leaf row was present, this would require an expensive join
167        // and table traversal.
168        //
169        // We can get the number of missing leaf rows very efficiently, by subtracting the total
170        // number of leaf rows from the block height (since the block height by definition is the
171        // height of the highest leaf we do have). We can also get the number of null payloads
172        // directly using an `IS NULL` filter.
173        //
174        // For VID, common data can only be missing if the entire row is missing. Shares can be
175        // missing in that case _or_ if the row is present but share data is NULL. Thus, we also
176        // need to select the total number of VID rows and the number of present VID rows with a
177        // NULL share.
178        let sql = "SELECT l.max_height, l.total_leaves, p.null_payloads, v.total_vid, \
179                   vn.null_vid, pruned_height FROM
180                (SELECT max(leaf2.height) AS max_height, count(*) AS total_leaves FROM leaf2) AS l,
181                (SELECT count(*) AS null_payloads FROM payload WHERE data IS NULL) AS p,
182                (SELECT count(*) AS total_vid FROM vid2) AS v,
183                (SELECT count(*) AS null_vid FROM vid2 WHERE share IS NULL) AS vn,
184                (SELECT(SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1) as \
185                   pruned_height)
186            ";
187        let row = query(sql)
188            .fetch_optional(self.as_mut())
189            .await?
190            .context(NotFoundSnafu)?;
191
192        let block_height = match row.get::<Option<i64>, _>("max_height") {
193            Some(height) => {
194                // The height of the block is the number of blocks below it, so the total number of
195                // blocks is one more than the height of the highest block.
196                height as usize + 1
197            },
198            None => {
199                // If there are no blocks yet, the height is 0.
200                0
201            },
202        };
203        let total_leaves = row.get::<i64, _>("total_leaves") as usize;
204        let null_payloads = row.get::<i64, _>("null_payloads") as usize;
205        let total_vid = row.get::<i64, _>("total_vid") as usize;
206        let null_vid = row.get::<i64, _>("null_vid") as usize;
207        let pruned_height = row
208            .get::<Option<i64>, _>("pruned_height")
209            .map(|h| h as usize);
210
211        let missing_leaves = block_height.saturating_sub(total_leaves);
212        let missing_blocks = missing_leaves + null_payloads;
213        let missing_vid_common = block_height.saturating_sub(total_vid);
214        let missing_vid_shares = missing_vid_common + null_vid;
215
216        Ok(SyncStatus {
217            missing_leaves,
218            missing_blocks,
219            missing_vid_common,
220            missing_vid_shares,
221            pruned_height,
222        })
223    }
224
225    async fn get_header_window(
226        &mut self,
227        start: impl Into<WindowStart<Types>> + Send + Sync,
228        end: u64,
229        limit: usize,
230    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
231        // Find the specific block that starts the requested window.
232        let first_block = match start.into() {
233            WindowStart::Time(t) => {
234                // If the request is not to start from a specific block, but from a timestamp, we
235                // use a different method to find the window, as detecting whether we have
236                // sufficient data to answer the query is not as simple as just trying `load_header`
237                // for a specific block ID.
238                return self.time_window::<Types>(t, end, limit).await;
239            },
240            WindowStart::Height(h) => h,
241            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
242        };
243
244        // Find all blocks starting from `first_block` with timestamps less than `end`. Block
245        // timestamps are monotonically increasing, so this query is guaranteed to return a
246        // contiguous range of blocks ordered by increasing height.
247        let sql = format!(
248            "SELECT {HEADER_COLUMNS}
249               FROM header AS h
250              WHERE h.height >= $1 AND h.timestamp < $2
251              ORDER BY h.height
252              LIMIT $3"
253        );
254        let rows = query(&sql)
255            .bind(first_block as i64)
256            .bind(end as i64)
257            .bind(limit as i64)
258            .fetch(self.as_mut());
259        let window = rows
260            .map(|row| parse_header::<Types>(row?))
261            .try_collect::<Vec<_>>()
262            .await?;
263
264        // Find the block just before the window.
265        let prev = if first_block > 0 {
266            Some(self.load_header::<Types>(first_block as usize - 1).await?)
267        } else {
268            None
269        };
270
271        let next = if window.len() < limit {
272            // If we are not limited, complete the window by finding the block just after the
273            // window. We order by timestamp _then_ height, because the timestamp order allows the
274            // query planner to use the index on timestamp to also efficiently solve the WHERE
275            // clause, but this process may turn up multiple results, due to the 1-second resolution
276            // of block timestamps. The final sort by height guarantees us a unique, deterministic
277            // result (the first block with a given timestamp). This sort may not be able to use an
278            // index, but it shouldn't be too expensive, since there will never be more than a
279            // handful of blocks with the same timestamp.
280            let sql = format!(
281                "SELECT {HEADER_COLUMNS}
282               FROM header AS h
283              WHERE h.timestamp >= $1
284              ORDER BY h.timestamp, h.height
285              LIMIT 1"
286            );
287            query(&sql)
288                .bind(end as i64)
289                .fetch_optional(self.as_mut())
290                .await?
291                .map(parse_header::<Types>)
292                .transpose()?
293        } else {
294            // If we have been limited, return a `null` next block indicating an incomplete window.
295            // The client will have to query again with an adjusted starting point to get subsequent
296            // results.
297            tracing::debug!(limit, "cutting off header window request due to limit");
298            None
299        };
300
301        Ok(TimeWindowQueryData { window, prev, next })
302    }
303}
304
305impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
306where
307    Types: NodeType,
308    Header<Types>: QueryableHeader<Types>,
309{
310    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
311        let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
312            .fetch_one(self.as_mut())
313            .await?;
314        Ok(height as usize)
315    }
316
317    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
318        // Get the maximum height for which we have stored aggregated results
319        // then query all the namespace info for that height
320        let res: (Option<i64>,) =
321            query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
322                .fetch_one(self.as_mut())
323                .await?;
324
325        let (Some(max_height),) = res else {
326            return Ok(None);
327        };
328
329        let rows: Vec<(i64, i64, i64)> = query_as(
330            r#"
331        SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
332        "#,
333        )
334        .bind(max_height)
335        .fetch_all(self.as_mut())
336        .await?;
337
338        let mut num_transactions = HashMap::default();
339        let mut payload_size = HashMap::default();
340
341        for (namespace_id, num_tx, payload_sz) in rows {
342            // Null namespace is represented as - 1 in database
343            // as it is part of primary key and primary key can not be NULL
344            // This namespace represents the cumulative sum of all the namespaces
345            let key = if namespace_id == -1 {
346                None
347            } else {
348                Some(namespace_id.into())
349            };
350            num_transactions.insert(key, num_tx as usize);
351            payload_size.insert(key, payload_sz as usize);
352        }
353
354        Ok(Some(Aggregate {
355            height: max_height,
356            num_transactions,
357            payload_size,
358        }))
359    }
360}
361
362impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
363where
364    Header<Types>: QueryableHeader<Types>,
365{
366    async fn update_aggregates(
367        &mut self,
368        prev: Aggregate<Types>,
369        blocks: &[PayloadMetadata<Types>],
370    ) -> anyhow::Result<Aggregate<Types>> {
371        let height = blocks[0].height();
372        let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
373
374        let mut rows = Vec::new();
375
376        // Cumulatively sum up new statistics for each block in this chunk.
377        let aggregates = blocks
378            .iter()
379            .scan(
380                (height, prev_tx_count, prev_size),
381                |(height, tx_count, size), block| {
382                    if *height != block.height {
383                        return Some(Err(anyhow!(
384                            "blocks in update_aggregates are not sequential; expected {}, got {}",
385                            *height,
386                            block.height()
387                        )));
388                    }
389                    *height += 1;
390
391                    //  Update total global stats
392                    // `None` represents stats across all namespaces.
393                    // It is represented as -1 in database
394
395                    *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
396                    *size.entry(None).or_insert(0) += block.size as usize;
397
398                    // Add row for global cumulative stats (namespace = -1)
399
400                    rows.push((
401                        block.height as i64,
402                        -1,
403                        tx_count[&None] as i64,
404                        size[&None] as i64,
405                    ));
406
407                    // Update per-namespace cumulative stats
408                    for (&ns_id, info) in &block.namespaces {
409                        let key = Some(ns_id);
410
411                        *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
412                        *size.entry(key).or_insert(0) += info.size as usize;
413                    }
414
415                    //  Insert cumulative stats for all known namespaces
416                    // Even if a namespace wasn't present in this block,
417                    // we still insert its latest cumulative stats at this height.
418                    for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
419                        let key = Some(*ns_id);
420                        rows.push((
421                            block.height as i64,
422                            (*ns_id).into(),
423                            tx_count[&key] as i64,
424                            size[&key] as i64,
425                        ));
426                    }
427
428                    Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
429                },
430            )
431            .collect::<anyhow::Result<Vec<_>>>()?;
432        let last_aggregate = aggregates.last().cloned();
433
434        let (height, num_transactions, payload_size) =
435            last_aggregate.ok_or_else(|| anyhow!("no row"))?;
436
437        self.upsert(
438            "aggregate",
439            ["height", "namespace", "num_transactions", "payload_size"],
440            ["height", "namespace"],
441            rows,
442        )
443        .await?;
444        Ok(Aggregate {
445            height,
446            num_transactions,
447            payload_size,
448        })
449    }
450}
451
452impl<Mode: TransactionMode> Transaction<Mode> {
453    async fn time_window<Types: NodeType>(
454        &mut self,
455        start: u64,
456        end: u64,
457        limit: usize,
458    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
459        // Find all blocks whose timestamps fall within the window [start, end). Block timestamps
460        // are monotonically increasing, so this query is guaranteed to return a contiguous range of
461        // blocks ordered by increasing height.
462        //
463        // We order by timestamp _then_ height, because the timestamp order allows the query planner
464        // to use the index on timestamp to also efficiently solve the WHERE clause, but this
465        // process may turn up multiple results, due to the 1-second resolution of block timestamps.
466        // The final sort by height guarantees us a unique, deterministic result (the first block
467        // with a given timestamp). This sort may not be able to use an index, but it shouldn't be
468        // too expensive, since there will never be more than a handful of blocks with the same
469        // timestamp.
470        let sql = format!(
471            "SELECT {HEADER_COLUMNS}
472               FROM header AS h
473              WHERE h.timestamp >= $1 AND h.timestamp < $2
474              ORDER BY h.timestamp, h.height
475              LIMIT $3"
476        );
477        let rows = query(&sql)
478            .bind(start as i64)
479            .bind(end as i64)
480            .bind(limit as i64)
481            .fetch(self.as_mut());
482        let window: Vec<_> = rows
483            .map(|row| parse_header::<Types>(row?))
484            .try_collect()
485            .await?;
486
487        let next = if window.len() < limit {
488            // If we are not limited, complete the window by finding the block just after.
489            let sql = format!(
490                "SELECT {HEADER_COLUMNS}
491               FROM header AS h
492              WHERE h.timestamp >= $1
493              ORDER BY h.timestamp, h.height
494              LIMIT 1"
495            );
496            query(&sql)
497                .bind(end as i64)
498                .fetch_optional(self.as_mut())
499                .await?
500                .map(parse_header::<Types>)
501                .transpose()?
502        } else {
503            // If we have been limited, return a `null` next block indicating an incomplete window.
504            // The client will have to query again with an adjusted starting point to get subsequent
505            // results.
506            tracing::debug!(limit, "cutting off header window request due to limit");
507            None
508        };
509
510        // If the `next` block exists, _or_ if any block in the window exists, we know we have
511        // enough information to definitively say at least where the window starts (we may or may
512        // not have where it ends, depending on how many blocks have thus far been produced).
513        // However, if we have neither a block in the window nor a block after it, we cannot say
514        // whether the next block produced will have a timestamp before or after the window start.
515        // In this case, we don't know what the `prev` field of the response should be, so we return
516        // an error: the caller must try again after more blocks have been produced.
517        if window.is_empty() && next.is_none() {
518            return Err(QueryError::NotFound);
519        }
520
521        // Find the block just before the window.
522        let sql = format!(
523            "SELECT {HEADER_COLUMNS}
524               FROM header AS h
525              WHERE h.timestamp < $1
526              ORDER BY h.timestamp DESC, h.height DESC
527              LIMIT 1"
528        );
529        let prev = query(&sql)
530            .bind(start as i64)
531            .fetch_optional(self.as_mut())
532            .await?
533            .map(parse_header::<Types>)
534            .transpose()?;
535
536        Ok(TimeWindowQueryData { window, prev, next })
537    }
538}
539
540/// Get inclusive start and end bounds for a range to pull aggregate statistics.
541///
542/// Returns [`None`] if there are no blocks in the given range, in which case the result should be
543/// the default value of the aggregate statistic.
544async fn aggregate_range_bounds<Types>(
545    tx: &mut Transaction<impl TransactionMode>,
546    range: impl RangeBounds<usize>,
547) -> QueryResult<Option<(usize, usize)>>
548where
549    Types: NodeType,
550    Header<Types>: QueryableHeader<Types>,
551{
552    let from = match range.start_bound() {
553        Bound::Included(from) => *from,
554        Bound::Excluded(from) => *from + 1,
555        Bound::Unbounded => 0,
556    };
557    let to = match range.end_bound() {
558        Bound::Included(to) => *to,
559        Bound::Excluded(0) => return Ok(None),
560        Bound::Excluded(to) => *to - 1,
561        Bound::Unbounded => {
562            let height = AggregatesStorage::<Types>::aggregates_height(tx)
563                .await
564                .map_err(|err| QueryError::Error {
565                    message: format!("{err:#}"),
566                })?;
567            if height == 0 {
568                return Ok(None);
569            }
570            if height < from {
571                return Ok(None);
572            }
573            height - 1
574        },
575    };
576    Ok(Some((from, to)))
577}