1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 hash_map::{Entry, HashMap},
18 BTreeMap,
19 },
20 hash::Hash,
21 ops::{Bound, Deref, RangeBounds},
22 path::Path,
23};
24
25use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use async_trait::async_trait;
27use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
28use committable::Committable;
29use futures::future::Future;
30use hotshot_types::{
31 data::{VidCommitment, VidShare},
32 traits::{
33 block_contents::BlockHeader,
34 node_implementation::{ConsensusTime, NodeType},
35 },
36};
37use serde::{de::DeserializeOwned, Serialize};
38use snafu::OptionExt;
39
40use super::{
41 ledger_log::{Iter, LedgerLog},
42 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
43 sql::MigrateTypes,
44 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
45 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
46};
47use crate::{
48 availability::{
49 data_source::{BlockId, LeafId},
50 query_data::{
51 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52 QueryablePayload, TransactionHash, TransactionQueryData, VidCommonQueryData,
53 },
54 NamespaceId, StateCertQueryData,
55 },
56 data_source::{update, VersionedDataSource},
57 metrics::PrometheusMetrics,
58 node::{SyncStatus, TimeWindowQueryData, WindowStart},
59 status::HasMetrics,
60 types::HeightIndexed,
61 ErrorSnafu, Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
62};
63
// Cache-size arguments passed to `LedgerLog::create` / `LedgerLog::open` for the leaf, block,
// VID common and state certificate logs, respectively.
// NOTE(review): presumably the number of entries each log keeps cached in memory -- confirm
// against `LedgerLog`.
const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
const CACHED_STATE_CERT_COUNT: usize = 5;
68
/// The state of a [`FileSystemStorage`]: in-memory indexes and counters plus the persistent logs.
///
/// The indexes and counters are rebuilt from the logs in `open_with_store` and updated by the
/// insert methods of the write transaction.
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Leaf hash -> height of the leaf with that hash.
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    // Block hash -> lowest height seen for that block hash.
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    // Payload hash -> lowest height seen for that payload hash.
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    // Transaction hash -> lowest height of a block containing that transaction.
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    // Header timestamp -> heights of the leaves with that timestamp, in the order in which they
    // were added to the index.
    index_by_time: BTreeMap<u64, Vec<u64>>,
    // Running sum of `block.len()` over the blocks counted so far.
    num_transactions: usize,
    // Running sum of `block.size()` over the blocks counted so far.
    payload_size: usize,
    // `Some` when the storage was built by `create` or `open`; left as `None` by
    // `create_with_store` and `open_with_store`. When present, it is committed after the logs.
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    // Log of leaves, indexed by height.
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    // Log of blocks, indexed by height.
    block_storage: LedgerLog<BlockQueryData<Types>>,
    // Log of VID common data, each with an optional VID share, indexed by height.
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    // Log of state certificates, indexed by epoch number.
    state_cert_storage: LedgerLog<StateCertQueryData<Types>>,
}
90
91impl<Types> FileSystemStorageInner<Types>
92where
93 Types: NodeType,
94 Header<Types>: QueryableHeader<Types>,
95 Payload<Types>: QueryablePayload<Types>,
96{
97 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
98 match id {
99 BlockId::Number(n) => Ok(n),
100 BlockId::Hash(h) => {
101 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
102 },
103 BlockId::PayloadHash(h) => {
104 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
105 },
106 }
107 }
108
109 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
110 self.block_storage
111 .iter()
112 .nth(self.get_block_index(id)?)
113 .context(NotFoundSnafu)?
114 .context(MissingSnafu)
115 }
116
117 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
118 self.get_block(id).map(|block| block.header)
119 }
120
121 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
122 where
123 R: RangeBounds<usize> + Send,
124 {
125 Ok(range_iter(self.block_storage.iter(), range).collect())
126 }
127}
128
/// Storage backed by the file system.
///
/// All state lives in a [`FileSystemStorageInner`] behind an async read-write lock; transactions
/// are wrappers around a read or write guard of that lock.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Indexes, counters and logs, guarded by the lock.
    inner: RwLock<FileSystemStorageInner<Types>>,
    // Metrics object handed out by the `HasMetrics` implementation.
    metrics: PrometheusMetrics,
}
139
// This implementation overrides nothing from `PrunerConfig`.
impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
// The pruner type is the unit type, and no `PruneStorage` methods are overridden.
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}
153
#[async_trait]
impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Does nothing and always succeeds; the batch size is ignored.
    async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
        Ok(())
    }
}
164
165impl<Types: NodeType> FileSystemStorage<Types>
166where
167 Payload<Types>: QueryablePayload<Types>,
168 Header<Types>: QueryableHeader<Types>,
169{
170 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
176 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
177 loader.retain_archives(1);
178 let data_source = Self::create_with_store(&mut loader).await?;
179 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
180 Ok(data_source)
181 }
182
183 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
189 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
190 loader.retain_archives(1);
191 let data_source = Self::open_with_store(&mut loader).await?;
192 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
193 Ok(data_source)
194 }
195
196 pub async fn create_with_store(
205 loader: &mut AtomicStoreLoader,
206 ) -> Result<Self, PersistenceError> {
207 Ok(Self {
208 inner: RwLock::new(FileSystemStorageInner {
209 index_by_leaf_hash: Default::default(),
210 index_by_block_hash: Default::default(),
211 index_by_payload_hash: Default::default(),
212 index_by_txn_hash: Default::default(),
213 index_by_time: Default::default(),
214 num_transactions: 0,
215 payload_size: 0,
216 top_storage: None,
217 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
218 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
219 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
220 state_cert_storage: LedgerLog::create(
221 loader,
222 "state_cert",
223 CACHED_STATE_CERT_COUNT,
224 )?,
225 }),
226 metrics: Default::default(),
227 })
228 }
229
230 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
239 let leaf_storage =
240 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
241 let block_storage =
242 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
243 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
244 loader,
245 "vid_common",
246 CACHED_VID_COMMON_COUNT,
247 )?;
248 let state_cert_storage = LedgerLog::<StateCertQueryData<Types>>::open(
249 loader,
250 "state_cert",
251 CACHED_STATE_CERT_COUNT,
252 )?;
253
254 let mut index_by_block_hash = HashMap::new();
255 let mut index_by_payload_hash = HashMap::new();
256 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
257 let index_by_leaf_hash = leaf_storage
258 .iter()
259 .flatten()
260 .map(|leaf| {
261 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
262 update_index_by_hash(
263 &mut index_by_payload_hash,
264 leaf.payload_hash(),
265 leaf.height(),
266 );
267 index_by_time
268 .entry(leaf.header().timestamp())
269 .or_default()
270 .push(leaf.height());
271 (leaf.hash(), leaf.height())
272 })
273 .collect();
274
275 let mut index_by_txn_hash = HashMap::new();
276 let mut num_transactions = 0;
277 let mut payload_size = 0;
278 for block in block_storage.iter().flatten() {
279 num_transactions += block.len();
280 payload_size += block.size() as usize;
281
282 let height = block.height();
283 for (_, txn) in block.enumerate() {
284 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
285 }
286 }
287
288 Ok(Self {
289 inner: RwLock::new(FileSystemStorageInner {
290 index_by_leaf_hash,
291 index_by_block_hash,
292 index_by_payload_hash,
293 index_by_txn_hash,
294 index_by_time,
295 num_transactions,
296 payload_size,
297 leaf_storage,
298 block_storage,
299 vid_storage,
300 state_cert_storage,
301 top_storage: None,
302 }),
303 metrics: Default::default(),
304 })
305 }
306
307 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
309 let mut inner = self.inner.write().await;
310 inner.leaf_storage.skip_version()?;
311 inner.block_storage.skip_version()?;
312 inner.vid_storage.skip_version()?;
313 inner.state_cert_storage.skip_version()?;
314 if let Some(store) = &mut inner.top_storage {
315 store.commit_version()?;
316 }
317 Ok(())
318 }
319
320 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
322 let mut tx = self.read().await.map_err(|err| QueryError::Error {
323 message: err.to_string(),
324 })?;
325 let share = tx.vid_share(block_id).await?;
326 Ok(share)
327 }
328
329 pub async fn get_vid_common(
331 &self,
332 block_id: BlockId<Types>,
333 ) -> QueryResult<VidCommonQueryData<Types>> {
334 let mut tx = self.read().await.map_err(|err| QueryError::Error {
335 message: err.to_string(),
336 })?;
337 let share = tx.get_vid_common(block_id).await?;
338 Ok(share)
339 }
340
341 pub async fn get_vid_common_metadata(
343 &self,
344 block_id: BlockId<Types>,
345 ) -> QueryResult<VidCommonMetadata<Types>> {
346 let mut tx = self.read().await.map_err(|err| QueryError::Error {
347 message: err.to_string(),
348 })?;
349 let share = tx.get_vid_common_metadata(block_id).await?;
350 Ok(share)
351 }
352}
353
/// Something whose uncommitted changes can be undone in place.
///
/// [`Transaction`] calls `revert` on the value it wraps when it is dropped.
pub trait Revert {
    /// Undo any uncommitted changes.
    fn revert(&mut self);
}
357
impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Revert the current version of each of the four logs.
    ///
    /// # Panics
    ///
    /// Panics if any of the logs fails to revert.
    // NOTE(review): only the logs are reverted here; the in-memory indexes and counters updated
    // by inserts are not touched -- confirm that this is intended.
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
        self.state_cert_storage.revert_version().unwrap();
    }
}
371
impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Does nothing: a read guard cannot have modified the storage.
    fn revert(&mut self) {
    }
}
382
/// A transaction over the storage, wrapping a read or write guard of the storage lock.
///
/// The wrapped value is reverted when the transaction is dropped.
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    // The lock guard (or other revertible handle) this transaction operates on.
    inner: T,
}
387
impl<T: Revert> Drop for Transaction<T> {
    /// Revert the wrapped value whenever the transaction goes out of scope.
    // NOTE(review): this also runs after a successful `commit`; presumably reverting a freshly
    // committed log is a no-op -- confirm against `LedgerLog::revert_version`.
    fn drop(&mut self) {
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Commit the current version of each log, then commit the top-level store if there is one.
    ///
    /// The logs are committed in a fixed order (leaves, blocks, VID, state certificates). The
    /// first failure is returned immediately, and the remaining commits are skipped.
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        self.inner.state_cert_storage.commit_version().await?;
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Revert the transaction.
    ///
    /// The returned future does nothing. `self` is dropped when this function returns, and the
    /// `Drop` implementation of [`Transaction`] performs the actual revert.
    fn revert(self) -> impl Future + Send {
        async move {}
    }
}
415
impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Always succeeds without doing any work.
    async fn commit(self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Returns a future that does nothing.
    fn revert(self) -> impl Future + Send {
        async move {}
    }
}
432
433impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
434where
435 Header<Types>: QueryableHeader<Types>,
436 Payload<Types>: QueryablePayload<Types>,
437{
438 type Transaction<'a>
439 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
440 where
441 Self: 'a;
442 type ReadOnly<'a>
443 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
444 where
445 Self: 'a;
446
447 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
448 Ok(Transaction {
449 inner: self.inner.write().await,
450 })
451 }
452
453 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
454 Ok(Transaction {
455 inner: self.inner.read().await,
456 })
457 }
458}
459fn range_iter<T>(
460 mut iter: Iter<'_, T>,
461 range: impl RangeBounds<usize>,
462) -> impl '_ + Iterator<Item = QueryResult<T>>
463where
464 T: Clone + Serialize + DeserializeOwned,
465{
466 let start = range.start_bound().cloned();
467 let end = range.end_bound().cloned();
468
469 let pos = match start {
471 Bound::Included(n) => {
472 if n > 0 {
473 iter.nth(n - 1);
474 }
475 n
476 },
477 Bound::Excluded(n) => {
478 iter.nth(n);
479 n + 1
480 },
481 Bound::Unbounded => 0,
482 };
483
484 itertools::unfold((iter, end, pos), |(iter, end, pos)| {
485 let reached_end = match end {
487 Bound::Included(n) => pos > n,
488 Bound::Excluded(n) => pos >= n,
489 Bound::Unbounded => false,
490 };
491 if reached_end {
492 return None;
493 }
494 let opt = iter.next()?;
495 *pos += 1;
496 Some(opt.context(MissingSnafu))
497 })
498}
499
500#[async_trait]
501impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
502where
503 Types: NodeType,
504 Payload<Types>: QueryablePayload<Types>,
505 Header<Types>: QueryableHeader<Types>,
506 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
507{
508 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
509 let n = match id {
510 LeafId::Number(n) => n,
511 LeafId::Hash(h) => *self
512 .inner
513 .index_by_leaf_hash
514 .get(&h)
515 .context(NotFoundSnafu)? as usize,
516 };
517 self.inner
518 .leaf_storage
519 .iter()
520 .nth(n)
521 .context(NotFoundSnafu)?
522 .context(MissingSnafu)
523 }
524
525 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
526 self.inner.get_block(id)
527 }
528
529 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
530 self.inner.get_header(id)
531 }
532
533 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
534 self.get_block(id).await.map(PayloadQueryData::from)
535 }
536
537 async fn get_payload_metadata(
538 &mut self,
539 id: BlockId<Types>,
540 ) -> QueryResult<PayloadMetadata<Types>> {
541 self.get_block(id).await.map(PayloadMetadata::from)
542 }
543
544 async fn get_vid_common(
545 &mut self,
546 id: BlockId<Types>,
547 ) -> QueryResult<VidCommonQueryData<Types>> {
548 Ok(self
549 .inner
550 .vid_storage
551 .iter()
552 .nth(self.inner.get_block_index(id)?)
553 .context(NotFoundSnafu)?
554 .context(MissingSnafu)?
555 .0)
556 }
557
558 async fn get_vid_common_metadata(
559 &mut self,
560 id: BlockId<Types>,
561 ) -> QueryResult<VidCommonMetadata<Types>> {
562 self.get_vid_common(id).await.map(VidCommonMetadata::from)
563 }
564
565 async fn get_leaf_range<R>(
566 &mut self,
567 range: R,
568 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
569 where
570 R: RangeBounds<usize> + Send,
571 {
572 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
573 }
574
575 async fn get_block_range<R>(
576 &mut self,
577 range: R,
578 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
579 where
580 R: RangeBounds<usize> + Send,
581 {
582 self.inner.get_block_range(range)
583 }
584
585 async fn get_payload_range<R>(
586 &mut self,
587 range: R,
588 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
589 where
590 R: RangeBounds<usize> + Send,
591 {
592 Ok(range_iter(self.inner.block_storage.iter(), range)
593 .map(|res| res.map(PayloadQueryData::from))
594 .collect())
595 }
596
597 async fn get_payload_metadata_range<R>(
598 &mut self,
599 range: R,
600 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
601 where
602 R: RangeBounds<usize> + Send + 'static,
603 {
604 Ok(range_iter(self.inner.block_storage.iter(), range)
605 .map(|res| res.map(PayloadMetadata::from))
606 .collect())
607 }
608
609 async fn get_vid_common_range<R>(
610 &mut self,
611 range: R,
612 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
613 where
614 R: RangeBounds<usize> + Send,
615 {
616 Ok(range_iter(self.inner.vid_storage.iter(), range)
617 .map(|res| res.map(|(common, _)| common))
618 .collect())
619 }
620
621 async fn get_vid_common_metadata_range<R>(
622 &mut self,
623 range: R,
624 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
625 where
626 R: RangeBounds<usize> + Send,
627 {
628 Ok(range_iter(self.inner.vid_storage.iter(), range)
629 .map(|res| res.map(|(common, _)| common.into()))
630 .collect())
631 }
632
633 async fn get_transaction(
634 &mut self,
635 hash: TransactionHash<Types>,
636 ) -> QueryResult<TransactionQueryData<Types>> {
637 let height = self
638 .inner
639 .index_by_txn_hash
640 .get(&hash)
641 .context(NotFoundSnafu)?;
642 let block = self.inner.get_block((*height as usize).into())?;
643 TransactionQueryData::with_hash(&block, hash).context(ErrorSnafu {
644 message: format!(
645 "transaction index inconsistent: block {height} contains no transaction {hash}"
646 ),
647 })
648 }
649
650 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
651 self.get_leaf((from as usize).into()).await
655 }
656
657 async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryData<Types>> {
658 self.inner
659 .state_cert_storage
660 .iter()
661 .nth(epoch as usize)
662 .context(NotFoundSnafu)?
663 .context(MissingSnafu)
664 }
665}
666
667impl<Types: NodeType> UpdateAvailabilityStorage<Types>
668 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
669where
670 Payload<Types>: QueryablePayload<Types>,
671 Header<Types>: QueryableHeader<Types>,
672{
673 async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
674 self.inner
675 .leaf_storage
676 .insert(leaf.height() as usize, leaf.clone())?;
677 self.inner
678 .index_by_leaf_hash
679 .insert(leaf.hash(), leaf.height());
680 update_index_by_hash(
681 &mut self.inner.index_by_block_hash,
682 leaf.block_hash(),
683 leaf.height(),
684 );
685 update_index_by_hash(
686 &mut self.inner.index_by_payload_hash,
687 leaf.payload_hash(),
688 leaf.height(),
689 );
690 self.inner
691 .index_by_time
692 .entry(leaf.header().timestamp())
693 .or_default()
694 .push(leaf.height());
695 Ok(())
696 }
697
698 async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
699 if !self
700 .inner
701 .block_storage
702 .insert(block.height() as usize, block.clone())?
703 {
704 return Ok(());
706 }
707 self.inner.num_transactions += block.len();
708 self.inner.payload_size += block.size() as usize;
709 for (_, txn) in block.enumerate() {
710 update_index_by_hash(
711 &mut self.inner.index_by_txn_hash,
712 txn.commit(),
713 block.height(),
714 );
715 }
716 Ok(())
717 }
718
719 async fn insert_vid(
720 &mut self,
721 common: VidCommonQueryData<Types>,
722 share: Option<VidShare>,
723 ) -> anyhow::Result<()> {
724 self.inner
725 .vid_storage
726 .insert(common.height() as usize, (common, share))?;
727 Ok(())
728 }
729
730 async fn insert_state_cert(
731 &mut self,
732 state_cert: StateCertQueryData<Types>,
733 ) -> anyhow::Result<()> {
734 self.inner
735 .state_cert_storage
736 .insert(state_cert.0.epoch.u64() as usize, state_cert)?;
737 Ok(())
738 }
739}
740
/// Record that the object with the given `hash` was seen at position `pos`.
///
/// If the hash is already indexed, the smaller of the existing and the new position is kept, so
/// the index always points at the lowest position seen for each hash.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Vacant(slot) => {
            slot.insert(pos);
        },
        // Only overwrite an existing entry with a strictly smaller position.
        Entry::Occupied(mut slot) if pos < *slot.get() => {
            slot.insert(pos);
        },
        Entry::Occupied(_) => {},
    }
}
758
759#[async_trait]
760impl<Types, T> NodeStorage<Types> for Transaction<T>
761where
762 Types: NodeType,
763 Payload<Types>: QueryablePayload<Types>,
764 Header<Types>: QueryableHeader<Types>,
765 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send,
766{
767 async fn block_height(&mut self) -> QueryResult<usize> {
768 Ok(self.inner.leaf_storage.iter().len())
769 }
770
771 async fn count_transactions_in_range(
772 &mut self,
773 range: impl RangeBounds<usize> + Send,
774 namespace: Option<NamespaceId<Types>>,
775 ) -> QueryResult<usize> {
776 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
777 || !matches!(range.end_bound(), Bound::Unbounded)
778 {
779 return Err(QueryError::Error {
780 message: "partial aggregates are not supported with file system backend".into(),
781 });
782 }
783
784 if namespace.is_some() {
785 return Err(QueryError::Error {
786 message: "file system does not support per-namespace stats".into(),
787 });
788 }
789
790 Ok(self.inner.num_transactions)
791 }
792
793 async fn payload_size_in_range(
794 &mut self,
795 range: impl RangeBounds<usize> + Send,
796 namespace: Option<NamespaceId<Types>>,
797 ) -> QueryResult<usize> {
798 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
799 || !matches!(range.end_bound(), Bound::Unbounded)
800 {
801 return Err(QueryError::Error {
802 message: "partial aggregates are not supported with file system backend".into(),
803 });
804 }
805
806 if namespace.is_some() {
807 return Err(QueryError::Error {
808 message: "file system does not support per-namespace stats".into(),
809 });
810 }
811
812 Ok(self.inner.payload_size)
813 }
814
815 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
816 where
817 ID: Into<BlockId<Types>> + Send + Sync,
818 {
819 self.inner
820 .vid_storage
821 .iter()
822 .nth(self.inner.get_block_index(id.into())?)
823 .context(NotFoundSnafu)?
824 .context(MissingSnafu)?
825 .1
826 .context(MissingSnafu)
827 }
828
829 async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
830 let height = self.inner.leaf_storage.iter().len();
831
832 let missing_vid = self.inner.vid_storage.missing(height);
835 let null_vid_shares: usize = self
838 .inner
839 .vid_storage
840 .iter()
841 .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
842 .sum();
843 Ok(SyncStatus {
844 missing_blocks: self.inner.block_storage.missing(height),
845 missing_leaves: self.inner.leaf_storage.missing(height),
846 missing_vid_common: missing_vid,
847 missing_vid_shares: missing_vid + null_vid_shares,
848 pruned_height: None,
849 })
850 }
851
852 async fn get_header_window(
853 &mut self,
854 start: impl Into<WindowStart<Types>> + Send + Sync,
855 end: u64,
856 limit: usize,
857 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
858 let first_block = match start.into() {
859 WindowStart::Height(h) => h,
860 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
861 WindowStart::Time(t) => {
862 let blocks = self
865 .inner
866 .index_by_time
867 .range(t..)
868 .next()
869 .context(NotFoundSnafu)?
870 .1;
871 blocks[0]
876 },
877 } as usize;
878
879 let mut res = TimeWindowQueryData::default();
880
881 if first_block > 0 {
883 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
884 }
885
886 for block in self.inner.get_block_range(first_block..)? {
889 let header = block?.header().clone();
890 if header.timestamp() >= end {
891 res.next = Some(header);
892 break;
893 }
894 res.window.push(header);
895 if res.window.len() >= limit {
896 break;
897 }
898 }
899
900 Ok(res)
901 }
902}
903
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// Always reports an aggregates height of zero.
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    /// Always reports that there is no previous aggregate.
    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}
917
impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// Ignores both arguments, stores nothing, and returns the default aggregate.
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}
931
// This implementation overrides nothing from `PrunedHeightStorage`.
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
933
impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Borrow the metrics object owned by this storage.
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}