1use std::{
16 collections::HashMap,
17 ops::{Bound, RangeBounds},
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use futures::stream::{StreamExt, TryStreamExt};
23use hotshot_types::{
24 data::VidShare,
25 traits::{block_contents::BlockHeader, node_implementation::NodeType},
26};
27use snafu::OptionExt;
28use sqlx::Row;
29
30use super::{
31 super::transaction::{query, query_as, Transaction, TransactionMode, Write},
32 parse_header, DecodeError, QueryBuilder, HEADER_COLUMNS,
33};
34use crate::{
35 availability::{NamespaceId, QueryableHeader},
36 data_source::storage::{
37 Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38 },
39 node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
40 types::HeightIndexed,
41 Header, MissingSnafu, NotFoundSnafu, QueryError, QueryResult,
42};
43
44#[async_trait]
45impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
46where
47 Mode: TransactionMode,
48 Types: NodeType,
49 Header<Types>: QueryableHeader<Types>,
50{
51 async fn block_height(&mut self) -> QueryResult<usize> {
52 match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
53 .fetch_one(self.as_mut())
54 .await?
55 {
56 (Some(height),) => {
57 Ok(height as usize + 1)
60 },
61 (None,) => {
62 Ok(0)
64 },
65 }
66 }
67
68 async fn count_transactions_in_range(
69 &mut self,
70 range: impl RangeBounds<usize> + Send,
71 namespace: Option<NamespaceId<Types>>,
72 ) -> QueryResult<usize> {
73 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
74 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
75 return Ok(0);
76 };
77 let (count,) = query_as::<(i64,)>(
78 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
79 )
80 .bind(to as i64)
81 .bind(namespace)
82 .fetch_one(self.as_mut())
83 .await?;
84 let mut count = count as usize;
85
86 if from > 0 {
87 let (prev_count,) = query_as::<(i64,)>(
88 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
89 )
90 .bind((from - 1) as i64)
91 .bind(namespace)
92 .fetch_one(self.as_mut())
93 .await?;
94 count = count.saturating_sub(prev_count as usize);
95 }
96
97 Ok(count)
98 }
99
100 async fn payload_size_in_range(
101 &mut self,
102 range: impl RangeBounds<usize> + Send,
103 namespace: Option<NamespaceId<Types>>,
104 ) -> QueryResult<usize> {
105 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
106 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
107 return Ok(0);
108 };
109 let (size,) = query_as::<(i64,)>(
110 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
111 )
112 .bind(to as i64)
113 .bind(namespace)
114 .fetch_one(self.as_mut())
115 .await?;
116 let mut size = size as usize;
117
118 if from > 0 {
119 let (prev_size,) = query_as::<(i64,)>(
120 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
121 )
122 .bind((from - 1) as i64)
123 .bind(namespace)
124 .fetch_one(self.as_mut())
125 .await?;
126 size = size.saturating_sub(prev_size as usize);
127 }
128
129 Ok(size)
130 }
131
132 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
133 where
134 ID: Into<BlockId<Types>> + Send + Sync,
135 {
136 let mut query = QueryBuilder::default();
137 let where_clause = query.header_where_clause(id.into())?;
138 let sql = format!(
141 "SELECT v.share AS share FROM vid2 AS v
142 JOIN header AS h ON v.height = h.height
143 WHERE {where_clause}
144 ORDER BY h.height
145 LIMIT 1"
146 );
147 let (share_data,) = query
148 .query_as::<(Option<Vec<u8>>,)>(&sql)
149 .fetch_one(self.as_mut())
150 .await?;
151 let share_data = share_data.context(MissingSnafu)?;
152 let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
153 Ok(share)
154 }
155
156 async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
157 let sql = "SELECT l.max_height, l.total_leaves, p.null_payloads, v.total_vid, \
181 vn.null_vid, pruned_height FROM
182 (SELECT max(leaf2.height) AS max_height, count(*) AS total_leaves FROM leaf2) AS l,
183 (SELECT count(*) AS null_payloads FROM payload WHERE data IS NULL) AS p,
184 (SELECT count(*) AS total_vid FROM vid2) AS v,
185 (SELECT count(*) AS null_vid FROM vid2 WHERE share IS NULL) AS vn,
186 (SELECT(SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1) as \
187 pruned_height)
188 ";
189 let row = query(sql)
190 .fetch_optional(self.as_mut())
191 .await?
192 .context(NotFoundSnafu)?;
193
194 let block_height = match row.get::<Option<i64>, _>("max_height") {
195 Some(height) => {
196 height as usize + 1
199 },
200 None => {
201 0
203 },
204 };
205 let total_leaves = row.get::<i64, _>("total_leaves") as usize;
206 let null_payloads = row.get::<i64, _>("null_payloads") as usize;
207 let total_vid = row.get::<i64, _>("total_vid") as usize;
208 let null_vid = row.get::<i64, _>("null_vid") as usize;
209 let pruned_height = row
210 .get::<Option<i64>, _>("pruned_height")
211 .map(|h| h as usize);
212
213 let missing_leaves = block_height.saturating_sub(total_leaves);
214 let missing_blocks = missing_leaves + null_payloads;
215 let missing_vid_common = block_height.saturating_sub(total_vid);
216 let missing_vid_shares = missing_vid_common + null_vid;
217
218 Ok(SyncStatus {
219 missing_leaves,
220 missing_blocks,
221 missing_vid_common,
222 missing_vid_shares,
223 pruned_height,
224 })
225 }
226
227 async fn get_header_window(
228 &mut self,
229 start: impl Into<WindowStart<Types>> + Send + Sync,
230 end: u64,
231 limit: usize,
232 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
233 let first_block = match start.into() {
235 WindowStart::Time(t) => {
236 return self.time_window::<Types>(t, end, limit).await;
241 },
242 WindowStart::Height(h) => h,
243 WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
244 };
245
246 let sql = format!(
250 "SELECT {HEADER_COLUMNS}
251 FROM header AS h
252 WHERE h.height >= $1 AND h.timestamp < $2
253 ORDER BY h.height
254 LIMIT $3"
255 );
256 let rows = query(&sql)
257 .bind(first_block as i64)
258 .bind(end as i64)
259 .bind(limit as i64)
260 .fetch(self.as_mut());
261 let window = rows
262 .map(|row| parse_header::<Types>(row?))
263 .try_collect::<Vec<_>>()
264 .await?;
265
266 let prev = if first_block > 0 {
268 Some(self.load_header::<Types>(first_block as usize - 1).await?)
269 } else {
270 None
271 };
272
273 let next = if window.len() < limit {
274 let sql = format!(
283 "SELECT {HEADER_COLUMNS}
284 FROM header AS h
285 WHERE h.timestamp >= $1
286 ORDER BY h.timestamp, h.height
287 LIMIT 1"
288 );
289 query(&sql)
290 .bind(end as i64)
291 .fetch_optional(self.as_mut())
292 .await?
293 .map(parse_header::<Types>)
294 .transpose()?
295 } else {
296 tracing::debug!(limit, "cutting off header window request due to limit");
300 None
301 };
302
303 Ok(TimeWindowQueryData { window, prev, next })
304 }
305}
306
307impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
308where
309 Types: NodeType,
310 Header<Types>: QueryableHeader<Types>,
311{
312 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
313 let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
314 .fetch_one(self.as_mut())
315 .await?;
316 Ok(height as usize)
317 }
318
319 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
320 let res: (Option<i64>,) =
323 query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
324 .fetch_one(self.as_mut())
325 .await?;
326
327 let (Some(max_height),) = res else {
328 return Ok(None);
329 };
330
331 let rows: Vec<(i64, i64, i64)> = query_as(
332 r#"
333 SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
334 "#,
335 )
336 .bind(max_height)
337 .fetch_all(self.as_mut())
338 .await?;
339
340 let mut num_transactions = HashMap::new();
341 let mut payload_size = HashMap::new();
342
343 for (namespace_id, num_tx, payload_sz) in rows {
344 let key = if namespace_id == -1 {
348 None
349 } else {
350 Some(namespace_id.into())
351 };
352 num_transactions.insert(key, num_tx as usize);
353 payload_size.insert(key, payload_sz as usize);
354 }
355
356 Ok(Some(Aggregate {
357 height: max_height,
358 num_transactions,
359 payload_size,
360 }))
361 }
362}
363
364impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
365where
366 Header<Types>: QueryableHeader<Types>,
367{
368 async fn update_aggregates(
369 &mut self,
370 prev: Aggregate<Types>,
371 blocks: &[PayloadMetadata<Types>],
372 ) -> anyhow::Result<Aggregate<Types>> {
373 let height = blocks[0].height();
374 let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
375
376 let mut rows = Vec::new();
377
378 let aggregates = blocks
380 .iter()
381 .scan(
382 (height, prev_tx_count, prev_size),
383 |(height, tx_count, size), block| {
384 if *height != block.height {
385 return Some(Err(anyhow!(
386 "blocks in update_aggregates are not sequential; expected {}, got {}",
387 *height,
388 block.height()
389 )));
390 }
391 *height += 1;
392
393 *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
398 *size.entry(None).or_insert(0) += block.size as usize;
399
400 rows.push((
403 block.height as i64,
404 -1,
405 tx_count[&None] as i64,
406 size[&None] as i64,
407 ));
408
409 for (&ns_id, info) in &block.namespaces {
411 let key = Some(ns_id);
412
413 *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
414 *size.entry(key).or_insert(0) += info.size as usize;
415 }
416
417 for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
421 let key = Some(*ns_id);
422 rows.push((
423 block.height as i64,
424 (*ns_id).into(),
425 tx_count[&key] as i64,
426 size[&key] as i64,
427 ));
428 }
429
430 Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
431 },
432 )
433 .collect::<anyhow::Result<Vec<_>>>()?;
434 let last_aggregate = aggregates.last().cloned();
435
436 let (height, num_transactions, payload_size) =
437 last_aggregate.ok_or_else(|| anyhow!("no row"))?;
438
439 self.upsert(
440 "aggregate",
441 ["height", "namespace", "num_transactions", "payload_size"],
442 ["height", "namespace"],
443 rows,
444 )
445 .await?;
446 Ok(Aggregate {
447 height,
448 num_transactions,
449 payload_size,
450 })
451 }
452}
453
454impl<Mode: TransactionMode> Transaction<Mode> {
455 async fn time_window<Types: NodeType>(
456 &mut self,
457 start: u64,
458 end: u64,
459 limit: usize,
460 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
461 let sql = format!(
473 "SELECT {HEADER_COLUMNS}
474 FROM header AS h
475 WHERE h.timestamp >= $1 AND h.timestamp < $2
476 ORDER BY h.timestamp, h.height
477 LIMIT $3"
478 );
479 let rows = query(&sql)
480 .bind(start as i64)
481 .bind(end as i64)
482 .bind(limit as i64)
483 .fetch(self.as_mut());
484 let window: Vec<_> = rows
485 .map(|row| parse_header::<Types>(row?))
486 .try_collect()
487 .await?;
488
489 let next = if window.len() < limit {
490 let sql = format!(
492 "SELECT {HEADER_COLUMNS}
493 FROM header AS h
494 WHERE h.timestamp >= $1
495 ORDER BY h.timestamp, h.height
496 LIMIT 1"
497 );
498 query(&sql)
499 .bind(end as i64)
500 .fetch_optional(self.as_mut())
501 .await?
502 .map(parse_header::<Types>)
503 .transpose()?
504 } else {
505 tracing::debug!(limit, "cutting off header window request due to limit");
509 None
510 };
511
512 if window.is_empty() && next.is_none() {
520 return Err(QueryError::NotFound);
521 }
522
523 let sql = format!(
525 "SELECT {HEADER_COLUMNS}
526 FROM header AS h
527 WHERE h.timestamp < $1
528 ORDER BY h.timestamp DESC, h.height DESC
529 LIMIT 1"
530 );
531 let prev = query(&sql)
532 .bind(start as i64)
533 .fetch_optional(self.as_mut())
534 .await?
535 .map(parse_header::<Types>)
536 .transpose()?;
537
538 Ok(TimeWindowQueryData { window, prev, next })
539 }
540}
541
542async fn aggregate_range_bounds<Types>(
547 tx: &mut Transaction<impl TransactionMode>,
548 range: impl RangeBounds<usize>,
549) -> QueryResult<Option<(usize, usize)>>
550where
551 Types: NodeType,
552 Header<Types>: QueryableHeader<Types>,
553{
554 let from = match range.start_bound() {
555 Bound::Included(from) => *from,
556 Bound::Excluded(from) => *from + 1,
557 Bound::Unbounded => 0,
558 };
559 let to = match range.end_bound() {
560 Bound::Included(to) => *to,
561 Bound::Excluded(0) => return Ok(None),
562 Bound::Excluded(to) => *to - 1,
563 Bound::Unbounded => {
564 let height = AggregatesStorage::<Types>::aggregates_height(tx)
565 .await
566 .map_err(|err| QueryError::Error {
567 message: format!("{err:#}"),
568 })?;
569 if height == 0 {
570 return Ok(None);
571 }
572 if height < from {
573 return Ok(None);
574 }
575 height - 1
576 },
577 };
578 Ok(Some((from, to)))
579}