hotshot_query_service/data_source/storage/sql/queries/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Availability storage implementation for a database query engine.
14
15use std::ops::RangeBounds;
16
17use async_trait::async_trait;
18use futures::stream::{StreamExt, TryStreamExt};
19use hotshot_types::traits::node_implementation::NodeType;
20use snafu::OptionExt;
21use sqlx::FromRow;
22
23use super::{
24    super::transaction::{query, Transaction, TransactionMode},
25    QueryBuilder, BLOCK_COLUMNS, LEAF_COLUMNS, PAYLOAD_COLUMNS, PAYLOAD_METADATA_COLUMNS,
26    STATE_CERT_COLUMNS, VID_COMMON_COLUMNS, VID_COMMON_METADATA_COLUMNS,
27};
28use crate::{
29    availability::{
30        BlockId, BlockQueryData, LeafId, LeafQueryData, NamespaceInfo, NamespaceMap,
31        PayloadQueryData, QueryableHeader, QueryablePayload, StateCertQueryData, TransactionHash,
32        TransactionQueryData, VidCommonQueryData,
33    },
34    data_source::storage::{
35        sql::sqlx::Row, AvailabilityStorage, PayloadMetadata, VidCommonMetadata,
36    },
37    types::HeightIndexed,
38    ErrorSnafu, Header, MissingSnafu, Payload, QueryError, QueryResult,
39};
40
41#[async_trait]
42impl<Mode, Types> AvailabilityStorage<Types> for Transaction<Mode>
43where
44    Types: NodeType,
45    Mode: TransactionMode,
46    Payload<Types>: QueryablePayload<Types>,
47    Header<Types>: QueryableHeader<Types>,
48{
49    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
50        let mut query = QueryBuilder::default();
51        let where_clause = match id {
52            LeafId::Number(n) => format!("height = {}", query.bind(n as i64)?),
53            LeafId::Hash(h) => format!("hash = {}", query.bind(h.to_string())?),
54        };
55        let row = query
56            .query(&format!(
57                "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE {where_clause} LIMIT 1"
58            ))
59            .fetch_one(self.as_mut())
60            .await?;
61        let leaf = LeafQueryData::from_row(&row)?;
62        Ok(leaf)
63    }
64
65    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
66        let mut query = QueryBuilder::default();
67        let where_clause = query.header_where_clause(id)?;
68        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
69        // selecting by payload ID, as payloads are not unique), we return the first one.
70        let sql = format!(
71            "SELECT {BLOCK_COLUMNS}
72              FROM header AS h
73              JOIN payload AS p ON h.height = p.height
74              WHERE {where_clause}
75              ORDER BY h.height
76              LIMIT 1"
77        );
78        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
79        let block = BlockQueryData::from_row(&row)?;
80        Ok(block)
81    }
82
83    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
84        self.load_header(id).await
85    }
86
87    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
88        let mut query = QueryBuilder::default();
89        let where_clause = query.header_where_clause(id)?;
90        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
91        // selecting by payload ID, as payloads are not unique), we return the first one.
92        let sql = format!(
93            "SELECT {PAYLOAD_COLUMNS}
94              FROM header AS h
95              JOIN payload AS p ON h.height = p.height
96              WHERE {where_clause}
97              ORDER BY h.height
98              LIMIT 1"
99        );
100        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
101        let payload = PayloadQueryData::from_row(&row)?;
102        Ok(payload)
103    }
104
105    async fn get_payload_metadata(
106        &mut self,
107        id: BlockId<Types>,
108    ) -> QueryResult<PayloadMetadata<Types>> {
109        let mut query = QueryBuilder::default();
110        let where_clause = query.header_where_clause(id)?;
111        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
112        // selecting by payload ID, as payloads are not unique), we return the first one.
113        let sql = format!(
114            "SELECT {PAYLOAD_METADATA_COLUMNS}
115              FROM header AS h
116              JOIN payload AS p ON h.height = p.height
117              WHERE {where_clause} AND p.num_transactions IS NOT NULL
118              ORDER BY h.height ASC
119              LIMIT 1"
120        );
121        let row = query
122            .query(&sql)
123            .fetch_optional(self.as_mut())
124            .await?
125            .context(MissingSnafu)?;
126        let mut payload = PayloadMetadata::from_row(&row)?;
127        payload.namespaces = self
128            .load_namespaces::<Types>(payload.height(), payload.size)
129            .await?;
130        Ok(payload)
131    }
132
133    async fn get_vid_common(
134        &mut self,
135        id: BlockId<Types>,
136    ) -> QueryResult<VidCommonQueryData<Types>> {
137        let mut query = QueryBuilder::default();
138        let where_clause = query.header_where_clause(id)?;
139        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
140        // selecting by payload ID, as payloads are not unique), we return the first one.
141        let sql = format!(
142            "SELECT {VID_COMMON_COLUMNS}
143              FROM header AS h
144              JOIN vid2 AS v ON h.height = v.height
145              WHERE {where_clause}
146              ORDER BY h.height
147              LIMIT 1"
148        );
149        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
150        let common = VidCommonQueryData::from_row(&row)?;
151        Ok(common)
152    }
153
154    async fn get_vid_common_metadata(
155        &mut self,
156        id: BlockId<Types>,
157    ) -> QueryResult<VidCommonMetadata<Types>> {
158        let mut query = QueryBuilder::default();
159        let where_clause = query.header_where_clause(id)?;
160        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
161        // selecting by payload ID, as payloads are not unique), we return the first one.
162        let sql = format!(
163            "SELECT {VID_COMMON_METADATA_COLUMNS}
164              FROM header AS h
165              JOIN vid2 AS v ON h.height = v.height
166              WHERE {where_clause}
167              ORDER BY h.height ASC
168              LIMIT 1"
169        );
170        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
171        let common = VidCommonMetadata::from_row(&row)?;
172        Ok(common)
173    }
174
175    async fn get_leaf_range<R>(
176        &mut self,
177        range: R,
178    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
179    where
180        R: RangeBounds<usize> + Send,
181    {
182        let mut query = QueryBuilder::default();
183        let where_clause = query.bounds_to_where_clause(range, "height")?;
184        let sql = format!("SELECT {LEAF_COLUMNS} FROM leaf2 {where_clause} ORDER BY height ASC");
185        Ok(query
186            .query(&sql)
187            .fetch(self.as_mut())
188            .map(|res| LeafQueryData::from_row(&res?))
189            .map_err(QueryError::from)
190            .collect()
191            .await)
192    }
193
194    async fn get_block_range<R>(
195        &mut self,
196        range: R,
197    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
198    where
199        R: RangeBounds<usize> + Send,
200    {
201        let mut query = QueryBuilder::default();
202        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
203        let sql = format!(
204            "SELECT {BLOCK_COLUMNS}
205              FROM header AS h
206              JOIN payload AS p ON h.height = p.height
207              {where_clause}
208              ORDER BY h.height"
209        );
210        Ok(query
211            .query(&sql)
212            .fetch(self.as_mut())
213            .map(|res| BlockQueryData::from_row(&res?))
214            .map_err(QueryError::from)
215            .collect()
216            .await)
217    }
218
219    async fn get_header_range<R>(
220        &mut self,
221        range: R,
222    ) -> QueryResult<Vec<QueryResult<Header<Types>>>>
223    where
224        R: RangeBounds<usize> + Send,
225    {
226        let mut query = QueryBuilder::default();
227        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
228
229        let headers = query
230            .query(&format!(
231                "SELECT data
232                  FROM header AS h
233                  {where_clause}
234                  ORDER BY h.height"
235            ))
236            .fetch(self.as_mut())
237            .map(|res| serde_json::from_value(res?.get("data")).unwrap())
238            .collect()
239            .await;
240
241        Ok(headers)
242    }
243
244    async fn get_payload_range<R>(
245        &mut self,
246        range: R,
247    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
248    where
249        R: RangeBounds<usize> + Send,
250    {
251        let mut query = QueryBuilder::default();
252        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
253        let sql = format!(
254            "SELECT {PAYLOAD_COLUMNS}
255              FROM header AS h
256              JOIN payload AS p ON h.height = p.height
257              {where_clause}
258              ORDER BY h.height"
259        );
260        Ok(query
261            .query(&sql)
262            .fetch(self.as_mut())
263            .map(|res| PayloadQueryData::from_row(&res?))
264            .map_err(QueryError::from)
265            .collect()
266            .await)
267    }
268
269    async fn get_payload_metadata_range<R>(
270        &mut self,
271        range: R,
272    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
273    where
274        R: RangeBounds<usize> + Send + 'static,
275    {
276        let mut query = QueryBuilder::default();
277        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
278        let sql = format!(
279            "SELECT {PAYLOAD_METADATA_COLUMNS}
280              FROM header AS h
281              JOIN payload AS p ON h.height = p.height
282              {where_clause} AND p.num_transactions IS NOT NULL
283              ORDER BY h.height ASC"
284        );
285        let rows = query
286            .query(&sql)
287            .fetch(self.as_mut())
288            .collect::<Vec<_>>()
289            .await;
290        let mut payloads = vec![];
291        for row in rows {
292            let res = async {
293                let mut meta = PayloadMetadata::from_row(&row?)?;
294                meta.namespaces = self
295                    .load_namespaces::<Types>(meta.height(), meta.size)
296                    .await?;
297                Ok(meta)
298            }
299            .await;
300            payloads.push(res);
301        }
302        Ok(payloads)
303    }
304
305    async fn get_vid_common_range<R>(
306        &mut self,
307        range: R,
308    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
309    where
310        R: RangeBounds<usize> + Send,
311    {
312        let mut query = QueryBuilder::default();
313        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
314        let sql = format!(
315            "SELECT {VID_COMMON_COLUMNS}
316              FROM header AS h
317              JOIN vid2 AS v ON h.height = v.height
318              {where_clause}
319              ORDER BY h.height"
320        );
321        Ok(query
322            .query(&sql)
323            .fetch(self.as_mut())
324            .map(|res| VidCommonQueryData::from_row(&res?))
325            .map_err(QueryError::from)
326            .collect()
327            .await)
328    }
329
330    async fn get_vid_common_metadata_range<R>(
331        &mut self,
332        range: R,
333    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
334    where
335        R: RangeBounds<usize> + Send,
336    {
337        let mut query = QueryBuilder::default();
338        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
339        let sql = format!(
340            "SELECT {VID_COMMON_METADATA_COLUMNS}
341              FROM header AS h
342              JOIN vid2 AS v ON h.height = v.height
343              {where_clause}
344              ORDER BY h.height ASC"
345        );
346        Ok(query
347            .query(&sql)
348            .fetch(self.as_mut())
349            .map(|res| VidCommonMetadata::from_row(&res?))
350            .map_err(QueryError::from)
351            .collect()
352            .await)
353    }
354
355    async fn get_transaction(
356        &mut self,
357        hash: TransactionHash<Types>,
358    ) -> QueryResult<TransactionQueryData<Types>> {
359        let mut query = QueryBuilder::default();
360        let hash_param = query.bind(hash.to_string())?;
361
362        // ORDER BY ASC ensures that if there are duplicate transactions, we return the first
363        // one.
364        let sql = format!(
365            "SELECT {BLOCK_COLUMNS}
366                FROM header AS h
367                JOIN payload AS p ON h.height = p.height
368                JOIN transactions AS t ON t.block_height = h.height
369                WHERE t.hash = {hash_param}
370                ORDER BY t.block_height, t.ns_id, t.position
371                LIMIT 1"
372        );
373        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
374
375        // Extract the block.
376        let block = BlockQueryData::from_row(&row)?;
377
378        TransactionQueryData::with_hash(&block, hash).context(ErrorSnafu {
379            message: format!(
380                "transaction index inconsistent: block {} contains no transaction {hash}",
381                block.height()
382            ),
383        })
384    }
385
386    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
387        let row = query(&format!(
388            "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE height >= $1 ORDER BY height ASC LIMIT 1"
389        ))
390        .bind(from as i64)
391        .fetch_one(self.as_mut())
392        .await?;
393        let leaf = LeafQueryData::from_row(&row)?;
394        Ok(leaf)
395    }
396
397    async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryData<Types>> {
398        let row = query(&format!(
399            "SELECT {STATE_CERT_COLUMNS} FROM finalized_state_cert WHERE epoch = $1 LIMIT 1"
400        ))
401        .bind(epoch as i64)
402        .fetch_one(self.as_mut())
403        .await?;
404        Ok(StateCertQueryData::from_row(&row)?)
405    }
406}
407
408impl<Mode> Transaction<Mode>
409where
410    Mode: TransactionMode,
411{
412    async fn load_namespaces<Types>(
413        &mut self,
414        height: u64,
415        payload_size: u64,
416    ) -> QueryResult<NamespaceMap<Types>>
417    where
418        Types: NodeType,
419        Header<Types>: QueryableHeader<Types>,
420        Payload<Types>: QueryablePayload<Types>,
421    {
422        let header = self
423            .get_header(BlockId::<Types>::from(height as usize))
424            .await?;
425        let map = query(
426            "SELECT ns_id, ns_index, max(position) + 1 AS count
427               FROM  transactions
428               WHERE block_height = $1
429               GROUP BY ns_id, ns_index",
430        )
431        .bind(height as i64)
432        .fetch(self.as_mut())
433        .map_ok(|row| {
434            let ns = row.get::<i64, _>("ns_index").into();
435            let id = row.get::<i64, _>("ns_id").into();
436            let num_transactions = row.get::<i64, _>("count") as u64;
437            let size = header.namespace_size(&ns, payload_size as usize);
438            (
439                id,
440                NamespaceInfo {
441                    num_transactions,
442                    size,
443                },
444            )
445        })
446        .try_collect()
447        .await?;
448        Ok(map)
449    }
450}