hotshot_query_service/data_source/storage/sql/queries/
availability.rs1use std::ops::RangeBounds;
16
17use async_trait::async_trait;
18use futures::stream::{StreamExt, TryStreamExt};
19use hotshot_types::traits::node_implementation::NodeType;
20use snafu::OptionExt;
21use sqlx::FromRow;
22
23use super::{
24 super::transaction::{query, Transaction, TransactionMode},
25 QueryBuilder, BLOCK_COLUMNS, LEAF_COLUMNS, PAYLOAD_COLUMNS, PAYLOAD_METADATA_COLUMNS,
26 STATE_CERT_COLUMNS, VID_COMMON_COLUMNS, VID_COMMON_METADATA_COLUMNS,
27};
28use crate::{
29 availability::{
30 BlockId, BlockQueryData, LeafId, LeafQueryData, NamespaceInfo, NamespaceMap,
31 PayloadQueryData, QueryableHeader, QueryablePayload, StateCertQueryData, TransactionHash,
32 TransactionQueryData, VidCommonQueryData,
33 },
34 data_source::storage::{
35 sql::sqlx::Row, AvailabilityStorage, PayloadMetadata, VidCommonMetadata,
36 },
37 types::HeightIndexed,
38 ErrorSnafu, Header, MissingSnafu, Payload, QueryError, QueryResult,
39};
40
41#[async_trait]
42impl<Mode, Types> AvailabilityStorage<Types> for Transaction<Mode>
43where
44 Types: NodeType,
45 Mode: TransactionMode,
46 Payload<Types>: QueryablePayload<Types>,
47 Header<Types>: QueryableHeader<Types>,
48{
49 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
50 let mut query = QueryBuilder::default();
51 let where_clause = match id {
52 LeafId::Number(n) => format!("height = {}", query.bind(n as i64)?),
53 LeafId::Hash(h) => format!("hash = {}", query.bind(h.to_string())?),
54 };
55 let row = query
56 .query(&format!(
57 "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE {where_clause} LIMIT 1"
58 ))
59 .fetch_one(self.as_mut())
60 .await?;
61 let leaf = LeafQueryData::from_row(&row)?;
62 Ok(leaf)
63 }
64
65 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
66 let mut query = QueryBuilder::default();
67 let where_clause = query.header_where_clause(id)?;
68 let sql = format!(
71 "SELECT {BLOCK_COLUMNS}
72 FROM header AS h
73 JOIN payload AS p ON h.height = p.height
74 WHERE {where_clause}
75 ORDER BY h.height
76 LIMIT 1"
77 );
78 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
79 let block = BlockQueryData::from_row(&row)?;
80 Ok(block)
81 }
82
83 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
84 self.load_header(id).await
85 }
86
87 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
88 let mut query = QueryBuilder::default();
89 let where_clause = query.header_where_clause(id)?;
90 let sql = format!(
93 "SELECT {PAYLOAD_COLUMNS}
94 FROM header AS h
95 JOIN payload AS p ON h.height = p.height
96 WHERE {where_clause}
97 ORDER BY h.height
98 LIMIT 1"
99 );
100 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
101 let payload = PayloadQueryData::from_row(&row)?;
102 Ok(payload)
103 }
104
105 async fn get_payload_metadata(
106 &mut self,
107 id: BlockId<Types>,
108 ) -> QueryResult<PayloadMetadata<Types>> {
109 let mut query = QueryBuilder::default();
110 let where_clause = query.header_where_clause(id)?;
111 let sql = format!(
114 "SELECT {PAYLOAD_METADATA_COLUMNS}
115 FROM header AS h
116 JOIN payload AS p ON h.height = p.height
117 WHERE {where_clause} AND p.num_transactions IS NOT NULL
118 ORDER BY h.height ASC
119 LIMIT 1"
120 );
121 let row = query
122 .query(&sql)
123 .fetch_optional(self.as_mut())
124 .await?
125 .context(MissingSnafu)?;
126 let mut payload = PayloadMetadata::from_row(&row)?;
127 payload.namespaces = self
128 .load_namespaces::<Types>(payload.height(), payload.size)
129 .await?;
130 Ok(payload)
131 }
132
133 async fn get_vid_common(
134 &mut self,
135 id: BlockId<Types>,
136 ) -> QueryResult<VidCommonQueryData<Types>> {
137 let mut query = QueryBuilder::default();
138 let where_clause = query.header_where_clause(id)?;
139 let sql = format!(
142 "SELECT {VID_COMMON_COLUMNS}
143 FROM header AS h
144 JOIN vid2 AS v ON h.height = v.height
145 WHERE {where_clause}
146 ORDER BY h.height
147 LIMIT 1"
148 );
149 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
150 let common = VidCommonQueryData::from_row(&row)?;
151 Ok(common)
152 }
153
154 async fn get_vid_common_metadata(
155 &mut self,
156 id: BlockId<Types>,
157 ) -> QueryResult<VidCommonMetadata<Types>> {
158 let mut query = QueryBuilder::default();
159 let where_clause = query.header_where_clause(id)?;
160 let sql = format!(
163 "SELECT {VID_COMMON_METADATA_COLUMNS}
164 FROM header AS h
165 JOIN vid2 AS v ON h.height = v.height
166 WHERE {where_clause}
167 ORDER BY h.height ASC
168 LIMIT 1"
169 );
170 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
171 let common = VidCommonMetadata::from_row(&row)?;
172 Ok(common)
173 }
174
175 async fn get_leaf_range<R>(
176 &mut self,
177 range: R,
178 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
179 where
180 R: RangeBounds<usize> + Send,
181 {
182 let mut query = QueryBuilder::default();
183 let where_clause = query.bounds_to_where_clause(range, "height")?;
184 let sql = format!("SELECT {LEAF_COLUMNS} FROM leaf2 {where_clause} ORDER BY height ASC");
185 Ok(query
186 .query(&sql)
187 .fetch(self.as_mut())
188 .map(|res| LeafQueryData::from_row(&res?))
189 .map_err(QueryError::from)
190 .collect()
191 .await)
192 }
193
194 async fn get_block_range<R>(
195 &mut self,
196 range: R,
197 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
198 where
199 R: RangeBounds<usize> + Send,
200 {
201 let mut query = QueryBuilder::default();
202 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
203 let sql = format!(
204 "SELECT {BLOCK_COLUMNS}
205 FROM header AS h
206 JOIN payload AS p ON h.height = p.height
207 {where_clause}
208 ORDER BY h.height"
209 );
210 Ok(query
211 .query(&sql)
212 .fetch(self.as_mut())
213 .map(|res| BlockQueryData::from_row(&res?))
214 .map_err(QueryError::from)
215 .collect()
216 .await)
217 }
218
219 async fn get_header_range<R>(
220 &mut self,
221 range: R,
222 ) -> QueryResult<Vec<QueryResult<Header<Types>>>>
223 where
224 R: RangeBounds<usize> + Send,
225 {
226 let mut query = QueryBuilder::default();
227 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
228
229 let headers = query
230 .query(&format!(
231 "SELECT data
232 FROM header AS h
233 {where_clause}
234 ORDER BY h.height"
235 ))
236 .fetch(self.as_mut())
237 .map(|res| serde_json::from_value(res?.get("data")).unwrap())
238 .collect()
239 .await;
240
241 Ok(headers)
242 }
243
244 async fn get_payload_range<R>(
245 &mut self,
246 range: R,
247 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
248 where
249 R: RangeBounds<usize> + Send,
250 {
251 let mut query = QueryBuilder::default();
252 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
253 let sql = format!(
254 "SELECT {PAYLOAD_COLUMNS}
255 FROM header AS h
256 JOIN payload AS p ON h.height = p.height
257 {where_clause}
258 ORDER BY h.height"
259 );
260 Ok(query
261 .query(&sql)
262 .fetch(self.as_mut())
263 .map(|res| PayloadQueryData::from_row(&res?))
264 .map_err(QueryError::from)
265 .collect()
266 .await)
267 }
268
269 async fn get_payload_metadata_range<R>(
270 &mut self,
271 range: R,
272 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
273 where
274 R: RangeBounds<usize> + Send + 'static,
275 {
276 let mut query = QueryBuilder::default();
277 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
278 let sql = format!(
279 "SELECT {PAYLOAD_METADATA_COLUMNS}
280 FROM header AS h
281 JOIN payload AS p ON h.height = p.height
282 {where_clause} AND p.num_transactions IS NOT NULL
283 ORDER BY h.height ASC"
284 );
285 let rows = query
286 .query(&sql)
287 .fetch(self.as_mut())
288 .collect::<Vec<_>>()
289 .await;
290 let mut payloads = vec![];
291 for row in rows {
292 let res = async {
293 let mut meta = PayloadMetadata::from_row(&row?)?;
294 meta.namespaces = self
295 .load_namespaces::<Types>(meta.height(), meta.size)
296 .await?;
297 Ok(meta)
298 }
299 .await;
300 payloads.push(res);
301 }
302 Ok(payloads)
303 }
304
305 async fn get_vid_common_range<R>(
306 &mut self,
307 range: R,
308 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
309 where
310 R: RangeBounds<usize> + Send,
311 {
312 let mut query = QueryBuilder::default();
313 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
314 let sql = format!(
315 "SELECT {VID_COMMON_COLUMNS}
316 FROM header AS h
317 JOIN vid2 AS v ON h.height = v.height
318 {where_clause}
319 ORDER BY h.height"
320 );
321 Ok(query
322 .query(&sql)
323 .fetch(self.as_mut())
324 .map(|res| VidCommonQueryData::from_row(&res?))
325 .map_err(QueryError::from)
326 .collect()
327 .await)
328 }
329
330 async fn get_vid_common_metadata_range<R>(
331 &mut self,
332 range: R,
333 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
334 where
335 R: RangeBounds<usize> + Send,
336 {
337 let mut query = QueryBuilder::default();
338 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
339 let sql = format!(
340 "SELECT {VID_COMMON_METADATA_COLUMNS}
341 FROM header AS h
342 JOIN vid2 AS v ON h.height = v.height
343 {where_clause}
344 ORDER BY h.height ASC"
345 );
346 Ok(query
347 .query(&sql)
348 .fetch(self.as_mut())
349 .map(|res| VidCommonMetadata::from_row(&res?))
350 .map_err(QueryError::from)
351 .collect()
352 .await)
353 }
354
355 async fn get_transaction(
356 &mut self,
357 hash: TransactionHash<Types>,
358 ) -> QueryResult<TransactionQueryData<Types>> {
359 let mut query = QueryBuilder::default();
360 let hash_param = query.bind(hash.to_string())?;
361
362 let sql = format!(
365 "SELECT {BLOCK_COLUMNS}
366 FROM header AS h
367 JOIN payload AS p ON h.height = p.height
368 JOIN transactions AS t ON t.block_height = h.height
369 WHERE t.hash = {hash_param}
370 ORDER BY t.block_height, t.ns_id, t.position
371 LIMIT 1"
372 );
373 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
374
375 let block = BlockQueryData::from_row(&row)?;
377
378 TransactionQueryData::with_hash(&block, hash).context(ErrorSnafu {
379 message: format!(
380 "transaction index inconsistent: block {} contains no transaction {hash}",
381 block.height()
382 ),
383 })
384 }
385
386 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
387 let row = query(&format!(
388 "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE height >= $1 ORDER BY height ASC LIMIT 1"
389 ))
390 .bind(from as i64)
391 .fetch_one(self.as_mut())
392 .await?;
393 let leaf = LeafQueryData::from_row(&row)?;
394 Ok(leaf)
395 }
396
397 async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryData<Types>> {
398 let row = query(&format!(
399 "SELECT {STATE_CERT_COLUMNS} FROM finalized_state_cert WHERE epoch = $1 LIMIT 1"
400 ))
401 .bind(epoch as i64)
402 .fetch_one(self.as_mut())
403 .await?;
404 Ok(StateCertQueryData::from_row(&row)?)
405 }
406}
407
408impl<Mode> Transaction<Mode>
409where
410 Mode: TransactionMode,
411{
412 async fn load_namespaces<Types>(
413 &mut self,
414 height: u64,
415 payload_size: u64,
416 ) -> QueryResult<NamespaceMap<Types>>
417 where
418 Types: NodeType,
419 Header<Types>: QueryableHeader<Types>,
420 Payload<Types>: QueryablePayload<Types>,
421 {
422 let header = self
423 .get_header(BlockId::<Types>::from(height as usize))
424 .await?;
425 let map = query(
426 "SELECT ns_id, ns_index, max(position) + 1 AS count
427 FROM transactions
428 WHERE block_height = $1
429 GROUP BY ns_id, ns_index",
430 )
431 .bind(height as i64)
432 .fetch(self.as_mut())
433 .map_ok(|row| {
434 let ns = row.get::<i64, _>("ns_index").into();
435 let id = row.get::<i64, _>("ns_id").into();
436 let num_transactions = row.get::<i64, _>("count") as u64;
437 let size = header.namespace_size(&ns, payload_size as usize);
438 (
439 id,
440 NamespaceInfo {
441 num_transactions,
442 size,
443 },
444 )
445 })
446 .try_collect()
447 .await?;
448 Ok(map)
449 }
450}