hotshot_query_service/data_source/storage/sql/queries/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Availability storage implementation for a database query engine.
14
15use std::ops::RangeBounds;
16
17use async_trait::async_trait;
18use futures::stream::{StreamExt, TryStreamExt};
19use hotshot_types::traits::node_implementation::NodeType;
20use snafu::OptionExt;
21use sqlx::FromRow;
22
23use super::{
24    super::transaction::{query, Transaction, TransactionMode},
25    QueryBuilder, BLOCK_COLUMNS, LEAF_COLUMNS, PAYLOAD_COLUMNS, PAYLOAD_METADATA_COLUMNS,
26    VID_COMMON_COLUMNS, VID_COMMON_METADATA_COLUMNS,
27};
28use crate::{
29    availability::{
30        BlockId, BlockQueryData, LeafId, LeafQueryData, NamespaceInfo, NamespaceMap,
31        PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash, VidCommonQueryData,
32    },
33    data_source::storage::{
34        sql::sqlx::Row, AvailabilityStorage, PayloadMetadata, VidCommonMetadata,
35    },
36    types::HeightIndexed,
37    Header, MissingSnafu, Payload, QueryError, QueryResult,
38};
39
40#[async_trait]
41impl<Mode, Types> AvailabilityStorage<Types> for Transaction<Mode>
42where
43    Types: NodeType,
44    Mode: TransactionMode,
45    Payload<Types>: QueryablePayload<Types>,
46    Header<Types>: QueryableHeader<Types>,
47{
48    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
49        let mut query = QueryBuilder::default();
50        let where_clause = match id {
51            LeafId::Number(n) => format!("height = {}", query.bind(n as i64)?),
52            LeafId::Hash(h) => format!("hash = {}", query.bind(h.to_string())?),
53        };
54        let row = query
55            .query(&format!(
56                "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE {where_clause} LIMIT 1"
57            ))
58            .fetch_one(self.as_mut())
59            .await?;
60        let leaf = LeafQueryData::from_row(&row)?;
61        Ok(leaf)
62    }
63
64    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
65        let mut query = QueryBuilder::default();
66        let where_clause = query.header_where_clause(id)?;
67        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
68        // selecting by payload ID, as payloads are not unique), we return the first one.
69        let sql = format!(
70            "SELECT {BLOCK_COLUMNS}
71              FROM header AS h
72              JOIN payload AS p ON h.height = p.height
73              WHERE {where_clause}
74              ORDER BY h.height
75              LIMIT 1"
76        );
77        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
78        let block = BlockQueryData::from_row(&row)?;
79        Ok(block)
80    }
81
82    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
83        self.load_header(id).await
84    }
85
86    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
87        let mut query = QueryBuilder::default();
88        let where_clause = query.header_where_clause(id)?;
89        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
90        // selecting by payload ID, as payloads are not unique), we return the first one.
91        let sql = format!(
92            "SELECT {PAYLOAD_COLUMNS}
93              FROM header AS h
94              JOIN payload AS p ON h.height = p.height
95              WHERE {where_clause}
96              ORDER BY h.height
97              LIMIT 1"
98        );
99        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
100        let payload = PayloadQueryData::from_row(&row)?;
101        Ok(payload)
102    }
103
104    async fn get_payload_metadata(
105        &mut self,
106        id: BlockId<Types>,
107    ) -> QueryResult<PayloadMetadata<Types>> {
108        let mut query = QueryBuilder::default();
109        let where_clause = query.header_where_clause(id)?;
110        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
111        // selecting by payload ID, as payloads are not unique), we return the first one.
112        let sql = format!(
113            "SELECT {PAYLOAD_METADATA_COLUMNS}
114              FROM header AS h
115              JOIN payload AS p ON h.height = p.height
116              WHERE {where_clause} AND p.num_transactions IS NOT NULL
117              ORDER BY h.height ASC
118              LIMIT 1"
119        );
120        let row = query
121            .query(&sql)
122            .fetch_optional(self.as_mut())
123            .await?
124            .context(MissingSnafu)?;
125        let mut payload = PayloadMetadata::from_row(&row)?;
126        payload.namespaces = self
127            .load_namespaces::<Types>(payload.height(), payload.size)
128            .await?;
129        Ok(payload)
130    }
131
132    async fn get_vid_common(
133        &mut self,
134        id: BlockId<Types>,
135    ) -> QueryResult<VidCommonQueryData<Types>> {
136        let mut query = QueryBuilder::default();
137        let where_clause = query.header_where_clause(id)?;
138        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
139        // selecting by payload ID, as payloads are not unique), we return the first one.
140        let sql = format!(
141            "SELECT {VID_COMMON_COLUMNS}
142              FROM header AS h
143              JOIN vid2 AS v ON h.height = v.height
144              WHERE {where_clause}
145              ORDER BY h.height
146              LIMIT 1"
147        );
148        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
149        let common = VidCommonQueryData::from_row(&row)?;
150        Ok(common)
151    }
152
153    async fn get_vid_common_metadata(
154        &mut self,
155        id: BlockId<Types>,
156    ) -> QueryResult<VidCommonMetadata<Types>> {
157        let mut query = QueryBuilder::default();
158        let where_clause = query.header_where_clause(id)?;
159        // ORDER BY h.height ASC ensures that if there are duplicate blocks (this can happen when
160        // selecting by payload ID, as payloads are not unique), we return the first one.
161        let sql = format!(
162            "SELECT {VID_COMMON_METADATA_COLUMNS}
163              FROM header AS h
164              JOIN vid2 AS v ON h.height = v.height
165              WHERE {where_clause}
166              ORDER BY h.height ASC
167              LIMIT 1"
168        );
169        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
170        let common = VidCommonMetadata::from_row(&row)?;
171        Ok(common)
172    }
173
174    async fn get_leaf_range<R>(
175        &mut self,
176        range: R,
177    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
178    where
179        R: RangeBounds<usize> + Send,
180    {
181        let mut query = QueryBuilder::default();
182        let where_clause = query.bounds_to_where_clause(range, "height")?;
183        let sql = format!("SELECT {LEAF_COLUMNS} FROM leaf2 {where_clause} ORDER BY height ASC");
184        Ok(query
185            .query(&sql)
186            .fetch(self.as_mut())
187            .map(|res| LeafQueryData::from_row(&res?))
188            .map_err(QueryError::from)
189            .collect()
190            .await)
191    }
192
193    async fn get_block_range<R>(
194        &mut self,
195        range: R,
196    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
197    where
198        R: RangeBounds<usize> + Send,
199    {
200        let mut query = QueryBuilder::default();
201        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
202        let sql = format!(
203            "SELECT {BLOCK_COLUMNS}
204              FROM header AS h
205              JOIN payload AS p ON h.height = p.height
206              {where_clause}
207              ORDER BY h.height"
208        );
209        Ok(query
210            .query(&sql)
211            .fetch(self.as_mut())
212            .map(|res| BlockQueryData::from_row(&res?))
213            .map_err(QueryError::from)
214            .collect()
215            .await)
216    }
217
218    async fn get_header_range<R>(
219        &mut self,
220        range: R,
221    ) -> QueryResult<Vec<QueryResult<Header<Types>>>>
222    where
223        R: RangeBounds<usize> + Send,
224    {
225        let mut query = QueryBuilder::default();
226        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
227
228        let headers = query
229            .query(&format!(
230                "SELECT data
231                  FROM header AS h
232                  {where_clause}
233                  ORDER BY h.height"
234            ))
235            .fetch(self.as_mut())
236            .map(|res| serde_json::from_value(res?.get("data")).unwrap())
237            .collect()
238            .await;
239
240        Ok(headers)
241    }
242
243    async fn get_payload_range<R>(
244        &mut self,
245        range: R,
246    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
247    where
248        R: RangeBounds<usize> + Send,
249    {
250        let mut query = QueryBuilder::default();
251        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
252        let sql = format!(
253            "SELECT {PAYLOAD_COLUMNS}
254              FROM header AS h
255              JOIN payload AS p ON h.height = p.height
256              {where_clause}
257              ORDER BY h.height"
258        );
259        Ok(query
260            .query(&sql)
261            .fetch(self.as_mut())
262            .map(|res| PayloadQueryData::from_row(&res?))
263            .map_err(QueryError::from)
264            .collect()
265            .await)
266    }
267
268    async fn get_payload_metadata_range<R>(
269        &mut self,
270        range: R,
271    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
272    where
273        R: RangeBounds<usize> + Send + 'static,
274    {
275        let mut query = QueryBuilder::default();
276        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
277        let sql = format!(
278            "SELECT {PAYLOAD_METADATA_COLUMNS}
279              FROM header AS h
280              JOIN payload AS p ON h.height = p.height
281              {where_clause} AND p.num_transactions IS NOT NULL
282              ORDER BY h.height ASC"
283        );
284        let rows = query
285            .query(&sql)
286            .fetch(self.as_mut())
287            .collect::<Vec<_>>()
288            .await;
289        let mut payloads = vec![];
290        for row in rows {
291            let res = async {
292                let mut meta = PayloadMetadata::from_row(&row?)?;
293                meta.namespaces = self
294                    .load_namespaces::<Types>(meta.height(), meta.size)
295                    .await?;
296                Ok(meta)
297            }
298            .await;
299            payloads.push(res);
300        }
301        Ok(payloads)
302    }
303
304    async fn get_vid_common_range<R>(
305        &mut self,
306        range: R,
307    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
308    where
309        R: RangeBounds<usize> + Send,
310    {
311        let mut query = QueryBuilder::default();
312        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
313        let sql = format!(
314            "SELECT {VID_COMMON_COLUMNS}
315              FROM header AS h
316              JOIN vid2 AS v ON h.height = v.height
317              {where_clause}
318              ORDER BY h.height"
319        );
320        Ok(query
321            .query(&sql)
322            .fetch(self.as_mut())
323            .map(|res| VidCommonQueryData::from_row(&res?))
324            .map_err(QueryError::from)
325            .collect()
326            .await)
327    }
328
329    async fn get_vid_common_metadata_range<R>(
330        &mut self,
331        range: R,
332    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
333    where
334        R: RangeBounds<usize> + Send,
335    {
336        let mut query = QueryBuilder::default();
337        let where_clause = query.bounds_to_where_clause(range, "h.height")?;
338        let sql = format!(
339            "SELECT {VID_COMMON_METADATA_COLUMNS}
340              FROM header AS h
341              JOIN vid2 AS v ON h.height = v.height
342              {where_clause}
343              ORDER BY h.height ASC"
344        );
345        Ok(query
346            .query(&sql)
347            .fetch(self.as_mut())
348            .map(|res| VidCommonMetadata::from_row(&res?))
349            .map_err(QueryError::from)
350            .collect()
351            .await)
352    }
353
354    async fn get_block_with_transaction(
355        &mut self,
356        hash: TransactionHash<Types>,
357    ) -> QueryResult<BlockQueryData<Types>> {
358        let mut query = QueryBuilder::default();
359        let hash_param = query.bind(hash.to_string())?;
360
361        // ORDER BY ASC ensures that if there are duplicate transactions, we return the first
362        // one.
363        let sql = format!(
364            "SELECT {BLOCK_COLUMNS}
365                FROM header AS h
366                JOIN payload AS p ON h.height = p.height
367                JOIN transactions AS t ON t.block_height = h.height
368                WHERE t.hash = {hash_param}
369                ORDER BY t.block_height, t.ns_id, t.position
370                LIMIT 1"
371        );
372        let row = query.query(&sql).fetch_one(self.as_mut()).await?;
373        Ok(BlockQueryData::from_row(&row)?)
374    }
375
376    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
377        let row = query(&format!(
378            "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE height >= $1 ORDER BY height ASC LIMIT 1"
379        ))
380        .bind(from as i64)
381        .fetch_one(self.as_mut())
382        .await?;
383        let leaf = LeafQueryData::from_row(&row)?;
384        Ok(leaf)
385    }
386}
387
388impl<Mode> Transaction<Mode>
389where
390    Mode: TransactionMode,
391{
392    async fn load_namespaces<Types>(
393        &mut self,
394        height: u64,
395        payload_size: u64,
396    ) -> QueryResult<NamespaceMap<Types>>
397    where
398        Types: NodeType,
399        Header<Types>: QueryableHeader<Types>,
400        Payload<Types>: QueryablePayload<Types>,
401    {
402        let header = self
403            .get_header(BlockId::<Types>::from(height as usize))
404            .await?;
405        let map = query(
406            "SELECT ns_id, ns_index, max(position) + 1 AS count
407               FROM  transactions
408               WHERE block_height = $1
409               GROUP BY ns_id, ns_index",
410        )
411        .bind(height as i64)
412        .fetch(self.as_mut())
413        .map_ok(|row| {
414            let ns = row.get::<i64, _>("ns_index").into();
415            let id = row.get::<i64, _>("ns_id").into();
416            let num_transactions = row.get::<i64, _>("count") as u64;
417            let size = header.namespace_size(&ns, payload_size as usize);
418            (
419                id,
420                NamespaceInfo {
421                    num_transactions,
422                    size,
423                },
424            )
425        })
426        .try_collect()
427        .await?;
428        Ok(map)
429    }
430}