1use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22 data::VidShare,
23 traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25use snafu::OptionExt;
26use sqlx::Row;
27
28use super::{
29 super::transaction::{query, query_as, Transaction, TransactionMode, Write},
30 parse_header, DecodeError, QueryBuilder, HEADER_COLUMNS,
31};
32use crate::{
33 availability::{NamespaceId, QueryableHeader},
34 data_source::storage::{
35 Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
36 },
37 node::{BlockId, SyncStatus, TimeWindowQueryData, WindowStart},
38 types::HeightIndexed,
39 Header, MissingSnafu, NotFoundSnafu, QueryError, QueryResult,
40};
41
42#[async_trait]
43impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
44where
45 Mode: TransactionMode,
46 Types: NodeType,
47 Header<Types>: QueryableHeader<Types>,
48{
49 async fn block_height(&mut self) -> QueryResult<usize> {
50 match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
51 .fetch_one(self.as_mut())
52 .await?
53 {
54 (Some(height),) => {
55 Ok(height as usize + 1)
58 },
59 (None,) => {
60 Ok(0)
62 },
63 }
64 }
65
66 async fn count_transactions_in_range(
67 &mut self,
68 range: impl RangeBounds<usize> + Send,
69 namespace: Option<NamespaceId<Types>>,
70 ) -> QueryResult<usize> {
71 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
72 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
73 return Ok(0);
74 };
75 let (count,) = query_as::<(i64,)>(
76 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
77 )
78 .bind(to as i64)
79 .bind(namespace)
80 .fetch_one(self.as_mut())
81 .await?;
82 let mut count = count as usize;
83
84 if from > 0 {
85 let (prev_count,) = query_as::<(i64,)>(
86 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
87 )
88 .bind((from - 1) as i64)
89 .bind(namespace)
90 .fetch_one(self.as_mut())
91 .await?;
92 count = count.saturating_sub(prev_count as usize);
93 }
94
95 Ok(count)
96 }
97
98 async fn payload_size_in_range(
99 &mut self,
100 range: impl RangeBounds<usize> + Send,
101 namespace: Option<NamespaceId<Types>>,
102 ) -> QueryResult<usize> {
103 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
104 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
105 return Ok(0);
106 };
107 let (size,) = query_as::<(i64,)>(
108 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
109 )
110 .bind(to as i64)
111 .bind(namespace)
112 .fetch_one(self.as_mut())
113 .await?;
114 let mut size = size as usize;
115
116 if from > 0 {
117 let (prev_size,) = query_as::<(i64,)>(
118 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
119 )
120 .bind((from - 1) as i64)
121 .bind(namespace)
122 .fetch_one(self.as_mut())
123 .await?;
124 size = size.saturating_sub(prev_size as usize);
125 }
126
127 Ok(size)
128 }
129
130 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
131 where
132 ID: Into<BlockId<Types>> + Send + Sync,
133 {
134 let mut query = QueryBuilder::default();
135 let where_clause = query.header_where_clause(id.into())?;
136 let sql = format!(
139 "SELECT v.share AS share FROM vid2 AS v
140 JOIN header AS h ON v.height = h.height
141 WHERE {where_clause}
142 ORDER BY h.height
143 LIMIT 1"
144 );
145 let (share_data,) = query
146 .query_as::<(Option<Vec<u8>>,)>(&sql)
147 .fetch_one(self.as_mut())
148 .await?;
149 let share_data = share_data.context(MissingSnafu)?;
150 let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
151 Ok(share)
152 }
153
154 async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
155 let sql = "SELECT l.max_height, l.total_leaves, p.null_payloads, v.total_vid, \
179 vn.null_vid, pruned_height FROM
180 (SELECT max(leaf2.height) AS max_height, count(*) AS total_leaves FROM leaf2) AS l,
181 (SELECT count(*) AS null_payloads FROM payload WHERE data IS NULL) AS p,
182 (SELECT count(*) AS total_vid FROM vid2) AS v,
183 (SELECT count(*) AS null_vid FROM vid2 WHERE share IS NULL) AS vn,
184 (SELECT(SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1) as \
185 pruned_height)
186 ";
187 let row = query(sql)
188 .fetch_optional(self.as_mut())
189 .await?
190 .context(NotFoundSnafu)?;
191
192 let block_height = match row.get::<Option<i64>, _>("max_height") {
193 Some(height) => {
194 height as usize + 1
197 },
198 None => {
199 0
201 },
202 };
203 let total_leaves = row.get::<i64, _>("total_leaves") as usize;
204 let null_payloads = row.get::<i64, _>("null_payloads") as usize;
205 let total_vid = row.get::<i64, _>("total_vid") as usize;
206 let null_vid = row.get::<i64, _>("null_vid") as usize;
207 let pruned_height = row
208 .get::<Option<i64>, _>("pruned_height")
209 .map(|h| h as usize);
210
211 let missing_leaves = block_height.saturating_sub(total_leaves);
212 let missing_blocks = missing_leaves + null_payloads;
213 let missing_vid_common = block_height.saturating_sub(total_vid);
214 let missing_vid_shares = missing_vid_common + null_vid;
215
216 Ok(SyncStatus {
217 missing_leaves,
218 missing_blocks,
219 missing_vid_common,
220 missing_vid_shares,
221 pruned_height,
222 })
223 }
224
225 async fn get_header_window(
226 &mut self,
227 start: impl Into<WindowStart<Types>> + Send + Sync,
228 end: u64,
229 limit: usize,
230 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
231 let first_block = match start.into() {
233 WindowStart::Time(t) => {
234 return self.time_window::<Types>(t, end, limit).await;
239 },
240 WindowStart::Height(h) => h,
241 WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
242 };
243
244 let sql = format!(
248 "SELECT {HEADER_COLUMNS}
249 FROM header AS h
250 WHERE h.height >= $1 AND h.timestamp < $2
251 ORDER BY h.height
252 LIMIT $3"
253 );
254 let rows = query(&sql)
255 .bind(first_block as i64)
256 .bind(end as i64)
257 .bind(limit as i64)
258 .fetch(self.as_mut());
259 let window = rows
260 .map(|row| parse_header::<Types>(row?))
261 .try_collect::<Vec<_>>()
262 .await?;
263
264 let prev = if first_block > 0 {
266 Some(self.load_header::<Types>(first_block as usize - 1).await?)
267 } else {
268 None
269 };
270
271 let next = if window.len() < limit {
272 let sql = format!(
281 "SELECT {HEADER_COLUMNS}
282 FROM header AS h
283 WHERE h.timestamp >= $1
284 ORDER BY h.timestamp, h.height
285 LIMIT 1"
286 );
287 query(&sql)
288 .bind(end as i64)
289 .fetch_optional(self.as_mut())
290 .await?
291 .map(parse_header::<Types>)
292 .transpose()?
293 } else {
294 tracing::debug!(limit, "cutting off header window request due to limit");
298 None
299 };
300
301 Ok(TimeWindowQueryData { window, prev, next })
302 }
303}
304
305impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
306where
307 Types: NodeType,
308 Header<Types>: QueryableHeader<Types>,
309{
310 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
311 let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
312 .fetch_one(self.as_mut())
313 .await?;
314 Ok(height as usize)
315 }
316
317 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
318 let res: (Option<i64>,) =
321 query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
322 .fetch_one(self.as_mut())
323 .await?;
324
325 let (Some(max_height),) = res else {
326 return Ok(None);
327 };
328
329 let rows: Vec<(i64, i64, i64)> = query_as(
330 r#"
331 SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
332 "#,
333 )
334 .bind(max_height)
335 .fetch_all(self.as_mut())
336 .await?;
337
338 let mut num_transactions = HashMap::default();
339 let mut payload_size = HashMap::default();
340
341 for (namespace_id, num_tx, payload_sz) in rows {
342 let key = if namespace_id == -1 {
346 None
347 } else {
348 Some(namespace_id.into())
349 };
350 num_transactions.insert(key, num_tx as usize);
351 payload_size.insert(key, payload_sz as usize);
352 }
353
354 Ok(Some(Aggregate {
355 height: max_height,
356 num_transactions,
357 payload_size,
358 }))
359 }
360}
361
362impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
363where
364 Header<Types>: QueryableHeader<Types>,
365{
366 async fn update_aggregates(
367 &mut self,
368 prev: Aggregate<Types>,
369 blocks: &[PayloadMetadata<Types>],
370 ) -> anyhow::Result<Aggregate<Types>> {
371 let height = blocks[0].height();
372 let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
373
374 let mut rows = Vec::new();
375
376 let aggregates = blocks
378 .iter()
379 .scan(
380 (height, prev_tx_count, prev_size),
381 |(height, tx_count, size), block| {
382 if *height != block.height {
383 return Some(Err(anyhow!(
384 "blocks in update_aggregates are not sequential; expected {}, got {}",
385 *height,
386 block.height()
387 )));
388 }
389 *height += 1;
390
391 *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
396 *size.entry(None).or_insert(0) += block.size as usize;
397
398 rows.push((
401 block.height as i64,
402 -1,
403 tx_count[&None] as i64,
404 size[&None] as i64,
405 ));
406
407 for (&ns_id, info) in &block.namespaces {
409 let key = Some(ns_id);
410
411 *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
412 *size.entry(key).or_insert(0) += info.size as usize;
413 }
414
415 for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
419 let key = Some(*ns_id);
420 rows.push((
421 block.height as i64,
422 (*ns_id).into(),
423 tx_count[&key] as i64,
424 size[&key] as i64,
425 ));
426 }
427
428 Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
429 },
430 )
431 .collect::<anyhow::Result<Vec<_>>>()?;
432 let last_aggregate = aggregates.last().cloned();
433
434 let (height, num_transactions, payload_size) =
435 last_aggregate.ok_or_else(|| anyhow!("no row"))?;
436
437 self.upsert(
438 "aggregate",
439 ["height", "namespace", "num_transactions", "payload_size"],
440 ["height", "namespace"],
441 rows,
442 )
443 .await?;
444 Ok(Aggregate {
445 height,
446 num_transactions,
447 payload_size,
448 })
449 }
450}
451
452impl<Mode: TransactionMode> Transaction<Mode> {
453 async fn time_window<Types: NodeType>(
454 &mut self,
455 start: u64,
456 end: u64,
457 limit: usize,
458 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
459 let sql = format!(
471 "SELECT {HEADER_COLUMNS}
472 FROM header AS h
473 WHERE h.timestamp >= $1 AND h.timestamp < $2
474 ORDER BY h.timestamp, h.height
475 LIMIT $3"
476 );
477 let rows = query(&sql)
478 .bind(start as i64)
479 .bind(end as i64)
480 .bind(limit as i64)
481 .fetch(self.as_mut());
482 let window: Vec<_> = rows
483 .map(|row| parse_header::<Types>(row?))
484 .try_collect()
485 .await?;
486
487 let next = if window.len() < limit {
488 let sql = format!(
490 "SELECT {HEADER_COLUMNS}
491 FROM header AS h
492 WHERE h.timestamp >= $1
493 ORDER BY h.timestamp, h.height
494 LIMIT 1"
495 );
496 query(&sql)
497 .bind(end as i64)
498 .fetch_optional(self.as_mut())
499 .await?
500 .map(parse_header::<Types>)
501 .transpose()?
502 } else {
503 tracing::debug!(limit, "cutting off header window request due to limit");
507 None
508 };
509
510 if window.is_empty() && next.is_none() {
518 return Err(QueryError::NotFound);
519 }
520
521 let sql = format!(
523 "SELECT {HEADER_COLUMNS}
524 FROM header AS h
525 WHERE h.timestamp < $1
526 ORDER BY h.timestamp DESC, h.height DESC
527 LIMIT 1"
528 );
529 let prev = query(&sql)
530 .bind(start as i64)
531 .fetch_optional(self.as_mut())
532 .await?
533 .map(parse_header::<Types>)
534 .transpose()?;
535
536 Ok(TimeWindowQueryData { window, prev, next })
537 }
538}
539
540async fn aggregate_range_bounds<Types>(
545 tx: &mut Transaction<impl TransactionMode>,
546 range: impl RangeBounds<usize>,
547) -> QueryResult<Option<(usize, usize)>>
548where
549 Types: NodeType,
550 Header<Types>: QueryableHeader<Types>,
551{
552 let from = match range.start_bound() {
553 Bound::Included(from) => *from,
554 Bound::Excluded(from) => *from + 1,
555 Bound::Unbounded => 0,
556 };
557 let to = match range.end_bound() {
558 Bound::Included(to) => *to,
559 Bound::Excluded(0) => return Ok(None),
560 Bound::Excluded(to) => *to - 1,
561 Bound::Unbounded => {
562 let height = AggregatesStorage::<Types>::aggregates_height(tx)
563 .await
564 .map_err(|err| QueryError::Error {
565 message: format!("{err:#}"),
566 })?;
567 if height == 0 {
568 return Ok(None);
569 }
570 if height < from {
571 return Ok(None);
572 }
573 height - 1
574 },
575 };
576 Ok(Some((from, to)))
577}