hotshot_query_service/data_source/storage/sql/queries/
availability.rs1use std::ops::RangeBounds;
16
17use async_trait::async_trait;
18use futures::stream::{StreamExt, TryStreamExt};
19use hotshot_types::traits::node_implementation::NodeType;
20use snafu::OptionExt;
21use sqlx::FromRow;
22
23use super::{
24 super::transaction::{query, Transaction, TransactionMode},
25 QueryBuilder, BLOCK_COLUMNS, LEAF_COLUMNS, PAYLOAD_COLUMNS, PAYLOAD_METADATA_COLUMNS,
26 VID_COMMON_COLUMNS, VID_COMMON_METADATA_COLUMNS,
27};
28use crate::{
29 availability::{
30 BlockId, BlockQueryData, LeafId, LeafQueryData, NamespaceInfo, NamespaceMap,
31 PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash, VidCommonQueryData,
32 },
33 data_source::storage::{
34 sql::sqlx::Row, AvailabilityStorage, PayloadMetadata, VidCommonMetadata,
35 },
36 types::HeightIndexed,
37 Header, MissingSnafu, Payload, QueryError, QueryResult,
38};
39
40#[async_trait]
41impl<Mode, Types> AvailabilityStorage<Types> for Transaction<Mode>
42where
43 Types: NodeType,
44 Mode: TransactionMode,
45 Payload<Types>: QueryablePayload<Types>,
46 Header<Types>: QueryableHeader<Types>,
47{
48 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
49 let mut query = QueryBuilder::default();
50 let where_clause = match id {
51 LeafId::Number(n) => format!("height = {}", query.bind(n as i64)?),
52 LeafId::Hash(h) => format!("hash = {}", query.bind(h.to_string())?),
53 };
54 let row = query
55 .query(&format!(
56 "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE {where_clause} LIMIT 1"
57 ))
58 .fetch_one(self.as_mut())
59 .await?;
60 let leaf = LeafQueryData::from_row(&row)?;
61 Ok(leaf)
62 }
63
64 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
65 let mut query = QueryBuilder::default();
66 let where_clause = query.header_where_clause(id)?;
67 let sql = format!(
70 "SELECT {BLOCK_COLUMNS}
71 FROM header AS h
72 JOIN payload AS p ON h.height = p.height
73 WHERE {where_clause}
74 ORDER BY h.height
75 LIMIT 1"
76 );
77 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
78 let block = BlockQueryData::from_row(&row)?;
79 Ok(block)
80 }
81
82 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
83 self.load_header(id).await
84 }
85
86 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
87 let mut query = QueryBuilder::default();
88 let where_clause = query.header_where_clause(id)?;
89 let sql = format!(
92 "SELECT {PAYLOAD_COLUMNS}
93 FROM header AS h
94 JOIN payload AS p ON h.height = p.height
95 WHERE {where_clause}
96 ORDER BY h.height
97 LIMIT 1"
98 );
99 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
100 let payload = PayloadQueryData::from_row(&row)?;
101 Ok(payload)
102 }
103
104 async fn get_payload_metadata(
105 &mut self,
106 id: BlockId<Types>,
107 ) -> QueryResult<PayloadMetadata<Types>> {
108 let mut query = QueryBuilder::default();
109 let where_clause = query.header_where_clause(id)?;
110 let sql = format!(
113 "SELECT {PAYLOAD_METADATA_COLUMNS}
114 FROM header AS h
115 JOIN payload AS p ON h.height = p.height
116 WHERE {where_clause} AND p.num_transactions IS NOT NULL
117 ORDER BY h.height ASC
118 LIMIT 1"
119 );
120 let row = query
121 .query(&sql)
122 .fetch_optional(self.as_mut())
123 .await?
124 .context(MissingSnafu)?;
125 let mut payload = PayloadMetadata::from_row(&row)?;
126 payload.namespaces = self
127 .load_namespaces::<Types>(payload.height(), payload.size)
128 .await?;
129 Ok(payload)
130 }
131
132 async fn get_vid_common(
133 &mut self,
134 id: BlockId<Types>,
135 ) -> QueryResult<VidCommonQueryData<Types>> {
136 let mut query = QueryBuilder::default();
137 let where_clause = query.header_where_clause(id)?;
138 let sql = format!(
141 "SELECT {VID_COMMON_COLUMNS}
142 FROM header AS h
143 JOIN vid2 AS v ON h.height = v.height
144 WHERE {where_clause}
145 ORDER BY h.height
146 LIMIT 1"
147 );
148 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
149 let common = VidCommonQueryData::from_row(&row)?;
150 Ok(common)
151 }
152
153 async fn get_vid_common_metadata(
154 &mut self,
155 id: BlockId<Types>,
156 ) -> QueryResult<VidCommonMetadata<Types>> {
157 let mut query = QueryBuilder::default();
158 let where_clause = query.header_where_clause(id)?;
159 let sql = format!(
162 "SELECT {VID_COMMON_METADATA_COLUMNS}
163 FROM header AS h
164 JOIN vid2 AS v ON h.height = v.height
165 WHERE {where_clause}
166 ORDER BY h.height ASC
167 LIMIT 1"
168 );
169 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
170 let common = VidCommonMetadata::from_row(&row)?;
171 Ok(common)
172 }
173
174 async fn get_leaf_range<R>(
175 &mut self,
176 range: R,
177 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
178 where
179 R: RangeBounds<usize> + Send,
180 {
181 let mut query = QueryBuilder::default();
182 let where_clause = query.bounds_to_where_clause(range, "height")?;
183 let sql = format!("SELECT {LEAF_COLUMNS} FROM leaf2 {where_clause} ORDER BY height ASC");
184 Ok(query
185 .query(&sql)
186 .fetch(self.as_mut())
187 .map(|res| LeafQueryData::from_row(&res?))
188 .map_err(QueryError::from)
189 .collect()
190 .await)
191 }
192
193 async fn get_block_range<R>(
194 &mut self,
195 range: R,
196 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
197 where
198 R: RangeBounds<usize> + Send,
199 {
200 let mut query = QueryBuilder::default();
201 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
202 let sql = format!(
203 "SELECT {BLOCK_COLUMNS}
204 FROM header AS h
205 JOIN payload AS p ON h.height = p.height
206 {where_clause}
207 ORDER BY h.height"
208 );
209 Ok(query
210 .query(&sql)
211 .fetch(self.as_mut())
212 .map(|res| BlockQueryData::from_row(&res?))
213 .map_err(QueryError::from)
214 .collect()
215 .await)
216 }
217
218 async fn get_header_range<R>(
219 &mut self,
220 range: R,
221 ) -> QueryResult<Vec<QueryResult<Header<Types>>>>
222 where
223 R: RangeBounds<usize> + Send,
224 {
225 let mut query = QueryBuilder::default();
226 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
227
228 let headers = query
229 .query(&format!(
230 "SELECT data
231 FROM header AS h
232 {where_clause}
233 ORDER BY h.height"
234 ))
235 .fetch(self.as_mut())
236 .map(|res| serde_json::from_value(res?.get("data")).unwrap())
237 .collect()
238 .await;
239
240 Ok(headers)
241 }
242
243 async fn get_payload_range<R>(
244 &mut self,
245 range: R,
246 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
247 where
248 R: RangeBounds<usize> + Send,
249 {
250 let mut query = QueryBuilder::default();
251 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
252 let sql = format!(
253 "SELECT {PAYLOAD_COLUMNS}
254 FROM header AS h
255 JOIN payload AS p ON h.height = p.height
256 {where_clause}
257 ORDER BY h.height"
258 );
259 Ok(query
260 .query(&sql)
261 .fetch(self.as_mut())
262 .map(|res| PayloadQueryData::from_row(&res?))
263 .map_err(QueryError::from)
264 .collect()
265 .await)
266 }
267
268 async fn get_payload_metadata_range<R>(
269 &mut self,
270 range: R,
271 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
272 where
273 R: RangeBounds<usize> + Send + 'static,
274 {
275 let mut query = QueryBuilder::default();
276 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
277 let sql = format!(
278 "SELECT {PAYLOAD_METADATA_COLUMNS}
279 FROM header AS h
280 JOIN payload AS p ON h.height = p.height
281 {where_clause} AND p.num_transactions IS NOT NULL
282 ORDER BY h.height ASC"
283 );
284 let rows = query
285 .query(&sql)
286 .fetch(self.as_mut())
287 .collect::<Vec<_>>()
288 .await;
289 let mut payloads = vec![];
290 for row in rows {
291 let res = async {
292 let mut meta = PayloadMetadata::from_row(&row?)?;
293 meta.namespaces = self
294 .load_namespaces::<Types>(meta.height(), meta.size)
295 .await?;
296 Ok(meta)
297 }
298 .await;
299 payloads.push(res);
300 }
301 Ok(payloads)
302 }
303
304 async fn get_vid_common_range<R>(
305 &mut self,
306 range: R,
307 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
308 where
309 R: RangeBounds<usize> + Send,
310 {
311 let mut query = QueryBuilder::default();
312 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
313 let sql = format!(
314 "SELECT {VID_COMMON_COLUMNS}
315 FROM header AS h
316 JOIN vid2 AS v ON h.height = v.height
317 {where_clause}
318 ORDER BY h.height"
319 );
320 Ok(query
321 .query(&sql)
322 .fetch(self.as_mut())
323 .map(|res| VidCommonQueryData::from_row(&res?))
324 .map_err(QueryError::from)
325 .collect()
326 .await)
327 }
328
329 async fn get_vid_common_metadata_range<R>(
330 &mut self,
331 range: R,
332 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
333 where
334 R: RangeBounds<usize> + Send,
335 {
336 let mut query = QueryBuilder::default();
337 let where_clause = query.bounds_to_where_clause(range, "h.height")?;
338 let sql = format!(
339 "SELECT {VID_COMMON_METADATA_COLUMNS}
340 FROM header AS h
341 JOIN vid2 AS v ON h.height = v.height
342 {where_clause}
343 ORDER BY h.height ASC"
344 );
345 Ok(query
346 .query(&sql)
347 .fetch(self.as_mut())
348 .map(|res| VidCommonMetadata::from_row(&res?))
349 .map_err(QueryError::from)
350 .collect()
351 .await)
352 }
353
354 async fn get_block_with_transaction(
355 &mut self,
356 hash: TransactionHash<Types>,
357 ) -> QueryResult<BlockQueryData<Types>> {
358 let mut query = QueryBuilder::default();
359 let hash_param = query.bind(hash.to_string())?;
360
361 let sql = format!(
364 "SELECT {BLOCK_COLUMNS}
365 FROM header AS h
366 JOIN payload AS p ON h.height = p.height
367 JOIN transactions AS t ON t.block_height = h.height
368 WHERE t.hash = {hash_param}
369 ORDER BY t.block_height, t.ns_id, t.position
370 LIMIT 1"
371 );
372 let row = query.query(&sql).fetch_one(self.as_mut()).await?;
373 Ok(BlockQueryData::from_row(&row)?)
374 }
375
376 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
377 let row = query(&format!(
378 "SELECT {LEAF_COLUMNS} FROM leaf2 WHERE height >= $1 ORDER BY height ASC LIMIT 1"
379 ))
380 .bind(from as i64)
381 .fetch_one(self.as_mut())
382 .await?;
383 let leaf = LeafQueryData::from_row(&row)?;
384 Ok(leaf)
385 }
386}
387
388impl<Mode> Transaction<Mode>
389where
390 Mode: TransactionMode,
391{
392 async fn load_namespaces<Types>(
393 &mut self,
394 height: u64,
395 payload_size: u64,
396 ) -> QueryResult<NamespaceMap<Types>>
397 where
398 Types: NodeType,
399 Header<Types>: QueryableHeader<Types>,
400 Payload<Types>: QueryablePayload<Types>,
401 {
402 let header = self
403 .get_header(BlockId::<Types>::from(height as usize))
404 .await?;
405 let map = query(
406 "SELECT ns_id, ns_index, max(position) + 1 AS count
407 FROM transactions
408 WHERE block_height = $1
409 GROUP BY ns_id, ns_index",
410 )
411 .bind(height as i64)
412 .fetch(self.as_mut())
413 .map_ok(|row| {
414 let ns = row.get::<i64, _>("ns_index").into();
415 let id = row.get::<i64, _>("ns_id").into();
416 let num_transactions = row.get::<i64, _>("count") as u64;
417 let size = header.namespace_size(&ns, payload_size as usize);
418 (
419 id,
420 NamespaceInfo {
421 num_transactions,
422 size,
423 },
424 )
425 })
426 .try_collect()
427 .await?;
428 Ok(map)
429 }
430}