1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 hash_map::{Entry, HashMap},
18 BTreeMap,
19 },
20 hash::Hash,
21 ops::{Bound, Deref, RangeBounds},
22 path::Path,
23};
24
25use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use async_trait::async_trait;
27use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
28use committable::Committable;
29use futures::future::Future;
30use hotshot_types::{
31 data::{VidCommitment, VidShare},
32 traits::{
33 block_contents::BlockHeader,
34 node_implementation::{ConsensusTime, NodeType},
35 },
36};
37use serde::{de::DeserializeOwned, Serialize};
38use snafu::OptionExt;
39
40use super::{
41 ledger_log::{Iter, LedgerLog},
42 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
43 sql::MigrateTypes,
44 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
45 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
46};
47use crate::{
48 availability::{
49 data_source::{BlockId, LeafId},
50 query_data::{
51 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52 QueryablePayload, TransactionHash, VidCommonQueryData,
53 },
54 NamespaceId, StateCertQueryDataV2,
55 },
56 data_source::{update, VersionedDataSource},
57 metrics::PrometheusMetrics,
58 node::{SyncStatus, TimeWindowQueryData, WindowStart},
59 status::HasMetrics,
60 types::HeightIndexed,
61 Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
62};
63
// Per-log cache sizes, passed as the last argument to `LedgerLog::create` / `LedgerLog::open`
// for the "leaves", "blocks", "vid_common" and "state_cert" logs respectively.
// NOTE(review): presumably the number of entries each log keeps cached in memory — confirm
// against `LedgerLog`.
const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
const CACHED_STATE_CERT_COUNT: usize = 5;
68
/// The lock-protected state of a [`FileSystemStorage`]: the persistent logs plus the in-memory
/// indexes and counters derived from them.
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Leaf hash -> height of the leaf with that hash.
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    /// Block hash -> lowest height recorded for that hash (see `update_index_by_hash`).
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    /// Payload hash -> lowest height recorded for that hash.
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    /// Transaction hash -> lowest height of a block containing that transaction.
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    /// Header timestamp -> heights of the leaves whose header carries that timestamp.
    index_by_time: BTreeMap<u64, Vec<u64>>,
    /// Running sum of `block.len()` over every block recorded so far.
    num_transactions: usize,
    /// Running sum of `block.size()` over every block recorded so far.
    payload_size: usize,
    /// Top-level atomic store. Set by `FileSystemStorage::create` / `open`; left as `None` by
    /// the `*_with_store` constructors.
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    /// Log of leaves, indexed by height.
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    /// Log of blocks, indexed by height.
    block_storage: LedgerLog<BlockQueryData<Types>>,
    /// Log of VID common data together with an optional VID share, indexed by height.
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    /// Log of state certificates, indexed by epoch number.
    state_cert_storage: LedgerLog<StateCertQueryDataV2<Types>>,
}
90
91impl<Types> FileSystemStorageInner<Types>
92where
93 Types: NodeType,
94 Header<Types>: QueryableHeader<Types>,
95 Payload<Types>: QueryablePayload<Types>,
96{
97 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
98 match id {
99 BlockId::Number(n) => Ok(n),
100 BlockId::Hash(h) => {
101 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
102 },
103 BlockId::PayloadHash(h) => {
104 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
105 },
106 }
107 }
108
109 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
110 self.block_storage
111 .iter()
112 .nth(self.get_block_index(id)?)
113 .context(NotFoundSnafu)?
114 .context(MissingSnafu)
115 }
116
117 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
118 self.get_block(id).map(|block| block.header)
119 }
120
121 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
122 where
123 R: RangeBounds<usize> + Send,
124 {
125 Ok(range_iter(self.block_storage.iter(), range).collect())
126 }
127}
128
/// Storage built on `atomic_store` ledger logs, with in-memory indexes for lookups by hash
/// and by timestamp.
///
/// All state lives in a [`FileSystemStorageInner`] behind an async read-write lock;
/// transactions obtained through `VersionedDataSource` hold the corresponding lock guard.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Logs, indexes and counters, guarded by the lock.
    inner: RwLock<FileSystemStorageInner<Types>>,
    /// Metrics handle returned by `HasMetrics::metrics`.
    metrics: PrometheusMetrics,
}
139
// No `PrunerConfig` items are overridden, so this backend uses whatever defaults the trait
// provides.
impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
// The pruner state is the unit type and no pruning methods are overridden.
// NOTE(review): presumably this means the file system backend never prunes — confirm against
// the defaults of `PruneStorage`.
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}
153
#[async_trait]
impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Type migration is a no-op for this backend: the batch size is ignored and the call
    /// always succeeds.
    async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
        Ok(())
    }
}
164
165impl<Types: NodeType> FileSystemStorage<Types>
166where
167 Payload<Types>: QueryablePayload<Types>,
168 Header<Types>: QueryableHeader<Types>,
169{
170 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
176 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
177 loader.retain_archives(1);
178 let data_source = Self::create_with_store(&mut loader).await?;
179 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
180 Ok(data_source)
181 }
182
183 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
189 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
190 loader.retain_archives(1);
191 let data_source = Self::open_with_store(&mut loader).await?;
192 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
193 Ok(data_source)
194 }
195
196 pub async fn create_with_store(
205 loader: &mut AtomicStoreLoader,
206 ) -> Result<Self, PersistenceError> {
207 Ok(Self {
208 inner: RwLock::new(FileSystemStorageInner {
209 index_by_leaf_hash: Default::default(),
210 index_by_block_hash: Default::default(),
211 index_by_payload_hash: Default::default(),
212 index_by_txn_hash: Default::default(),
213 index_by_time: Default::default(),
214 num_transactions: 0,
215 payload_size: 0,
216 top_storage: None,
217 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
218 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
219 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
220 state_cert_storage: LedgerLog::create(
221 loader,
222 "state_cert",
223 CACHED_STATE_CERT_COUNT,
224 )?,
225 }),
226 metrics: Default::default(),
227 })
228 }
229
230 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
239 let leaf_storage =
240 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
241 let block_storage =
242 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
243 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
244 loader,
245 "vid_common",
246 CACHED_VID_COMMON_COUNT,
247 )?;
248 let state_cert_storage = LedgerLog::<StateCertQueryDataV2<Types>>::open(
249 loader,
250 "state_cert",
251 CACHED_STATE_CERT_COUNT,
252 )?;
253
254 let mut index_by_block_hash = HashMap::new();
255 let mut index_by_payload_hash = HashMap::new();
256 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
257 let index_by_leaf_hash = leaf_storage
258 .iter()
259 .flatten()
260 .map(|leaf| {
261 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
262 update_index_by_hash(
263 &mut index_by_payload_hash,
264 leaf.payload_hash(),
265 leaf.height(),
266 );
267 index_by_time
268 .entry(leaf.header().timestamp())
269 .or_default()
270 .push(leaf.height());
271 (leaf.hash(), leaf.height())
272 })
273 .collect();
274
275 let mut index_by_txn_hash = HashMap::new();
276 let mut num_transactions = 0;
277 let mut payload_size = 0;
278 for block in block_storage.iter().flatten() {
279 num_transactions += block.len();
280 payload_size += block.size() as usize;
281
282 let height = block.height();
283 for (_, txn) in block.enumerate() {
284 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
285 }
286 }
287
288 Ok(Self {
289 inner: RwLock::new(FileSystemStorageInner {
290 index_by_leaf_hash,
291 index_by_block_hash,
292 index_by_payload_hash,
293 index_by_txn_hash,
294 index_by_time,
295 num_transactions,
296 payload_size,
297 leaf_storage,
298 block_storage,
299 vid_storage,
300 state_cert_storage,
301 top_storage: None,
302 }),
303 metrics: Default::default(),
304 })
305 }
306
307 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
309 let mut inner = self.inner.write().await;
310 inner.leaf_storage.skip_version()?;
311 inner.block_storage.skip_version()?;
312 inner.vid_storage.skip_version()?;
313 inner.state_cert_storage.skip_version()?;
314 if let Some(store) = &mut inner.top_storage {
315 store.commit_version()?;
316 }
317 Ok(())
318 }
319
320 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
322 let mut tx = self.read().await.map_err(|err| QueryError::Error {
323 message: err.to_string(),
324 })?;
325 let share = tx.vid_share(block_id).await?;
326 Ok(share)
327 }
328
329 pub async fn get_vid_common(
331 &self,
332 block_id: BlockId<Types>,
333 ) -> QueryResult<VidCommonQueryData<Types>> {
334 let mut tx = self.read().await.map_err(|err| QueryError::Error {
335 message: err.to_string(),
336 })?;
337 let share = tx.get_vid_common(block_id).await?;
338 Ok(share)
339 }
340
341 pub async fn get_vid_common_metadata(
343 &self,
344 block_id: BlockId<Types>,
345 ) -> QueryResult<VidCommonMetadata<Types>> {
346 let mut tx = self.read().await.map_err(|err| QueryError::Error {
347 message: err.to_string(),
348 })?;
349 let share = tx.get_vid_common_metadata(block_id).await?;
350 Ok(share)
351 }
352}
353
/// Something whose uncommitted changes can be discarded.
///
/// Implemented for the lock guards wrapped by [`Transaction`], which calls `revert` when it is
/// dropped.
pub trait Revert {
    /// Discard any changes that have not been committed.
    fn revert(&mut self);
}
357
/// Reverting a write guard reverts the current version of each of the four logs.
impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Revert each log in turn.
    ///
    /// # Panics
    /// Panics if any `revert_version` call returns an error; logs after the failing one are
    /// then not reverted. Only the logs are touched here — the in-memory indexes and counters
    /// are not modified by this method.
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
        self.state_cert_storage.revert_version().unwrap();
    }
}
371
/// A read guard cannot have made changes, so reverting it does nothing.
impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// No-op.
    fn revert(&mut self) {
    }
}
382
/// A transaction over the storage, wrapping the lock guard that gives access to its state.
///
/// `T` is a read guard for read-only transactions and a write guard for read-write ones.
/// Dropping the transaction calls [`Revert::revert`] on the guard.
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    /// The lock guard held for the lifetime of the transaction.
    inner: T,
}
387
impl<T: Revert> Drop for Transaction<T> {
    /// Revert the guard whenever the transaction goes out of scope. This runs
    /// unconditionally, including after `commit` has consumed the transaction.
    // NOTE(review): presumably reverting a log that has just been committed is a no-op —
    // confirm against `LedgerLog::revert_version`.
    fn drop(&mut self) {
        self.inner.revert();
    }
}
/// Read-write transactions: committing persists the logs, reverting discards pending changes.
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Commit the current version of each log in turn and then, if this storage has a
    /// top-level store, commit that store as well.
    ///
    /// Stops at the first error; logs committed before the failure stay committed.
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        self.inner.state_cert_storage.commit_version().await?;
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Revert the transaction.
    ///
    /// The returned future does nothing: the actual revert happens in `Drop for Transaction`
    /// when `self`, taken by value here, is dropped.
    fn revert(self) -> impl Future + Send {
        async move {}
    }
}
415
/// Read-only transactions have nothing to persist or discard, so both operations are no-ops.
impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Always succeeds without doing anything.
    async fn commit(self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Returns a future that does nothing.
    fn revert(self) -> impl Future + Send {
        async move {}
    }
}
432
433impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
434where
435 Header<Types>: QueryableHeader<Types>,
436 Payload<Types>: QueryablePayload<Types>,
437{
438 type Transaction<'a>
439 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
440 where
441 Self: 'a;
442 type ReadOnly<'a>
443 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
444 where
445 Self: 'a;
446
447 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
448 Ok(Transaction {
449 inner: self.inner.write().await,
450 })
451 }
452
453 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
454 Ok(Transaction {
455 inner: self.inner.read().await,
456 })
457 }
458}
459fn range_iter<T>(
460 mut iter: Iter<'_, T>,
461 range: impl RangeBounds<usize>,
462) -> impl '_ + Iterator<Item = QueryResult<T>>
463where
464 T: Clone + Serialize + DeserializeOwned,
465{
466 let start = range.start_bound().cloned();
467 let end = range.end_bound().cloned();
468
469 let pos = match start {
471 Bound::Included(n) => {
472 if n > 0 {
473 iter.nth(n - 1);
474 }
475 n
476 },
477 Bound::Excluded(n) => {
478 iter.nth(n);
479 n + 1
480 },
481 Bound::Unbounded => 0,
482 };
483
484 itertools::unfold((iter, end, pos), |(iter, end, pos)| {
485 let reached_end = match end {
487 Bound::Included(n) => pos > n,
488 Bound::Excluded(n) => pos >= n,
489 Bound::Unbounded => false,
490 };
491 if reached_end {
492 return None;
493 }
494 let opt = iter.next()?;
495 *pos += 1;
496 Some(opt.context(MissingSnafu))
497 })
498}
499
500#[async_trait]
501impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
502where
503 Types: NodeType,
504 Payload<Types>: QueryablePayload<Types>,
505 Header<Types>: QueryableHeader<Types>,
506 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
507{
508 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
509 let n = match id {
510 LeafId::Number(n) => n,
511 LeafId::Hash(h) => *self
512 .inner
513 .index_by_leaf_hash
514 .get(&h)
515 .context(NotFoundSnafu)? as usize,
516 };
517 self.inner
518 .leaf_storage
519 .iter()
520 .nth(n)
521 .context(NotFoundSnafu)?
522 .context(MissingSnafu)
523 }
524
525 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
526 self.inner.get_block(id)
527 }
528
529 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
530 self.inner.get_header(id)
531 }
532
533 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
534 self.get_block(id).await.map(PayloadQueryData::from)
535 }
536
537 async fn get_payload_metadata(
538 &mut self,
539 id: BlockId<Types>,
540 ) -> QueryResult<PayloadMetadata<Types>> {
541 self.get_block(id).await.map(PayloadMetadata::from)
542 }
543
544 async fn get_vid_common(
545 &mut self,
546 id: BlockId<Types>,
547 ) -> QueryResult<VidCommonQueryData<Types>> {
548 Ok(self
549 .inner
550 .vid_storage
551 .iter()
552 .nth(self.inner.get_block_index(id)?)
553 .context(NotFoundSnafu)?
554 .context(MissingSnafu)?
555 .0)
556 }
557
558 async fn get_vid_common_metadata(
559 &mut self,
560 id: BlockId<Types>,
561 ) -> QueryResult<VidCommonMetadata<Types>> {
562 self.get_vid_common(id).await.map(VidCommonMetadata::from)
563 }
564
565 async fn get_leaf_range<R>(
566 &mut self,
567 range: R,
568 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
569 where
570 R: RangeBounds<usize> + Send,
571 {
572 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
573 }
574
575 async fn get_block_range<R>(
576 &mut self,
577 range: R,
578 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
579 where
580 R: RangeBounds<usize> + Send,
581 {
582 self.inner.get_block_range(range)
583 }
584
585 async fn get_payload_range<R>(
586 &mut self,
587 range: R,
588 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
589 where
590 R: RangeBounds<usize> + Send,
591 {
592 Ok(range_iter(self.inner.block_storage.iter(), range)
593 .map(|res| res.map(PayloadQueryData::from))
594 .collect())
595 }
596
597 async fn get_payload_metadata_range<R>(
598 &mut self,
599 range: R,
600 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
601 where
602 R: RangeBounds<usize> + Send + 'static,
603 {
604 Ok(range_iter(self.inner.block_storage.iter(), range)
605 .map(|res| res.map(PayloadMetadata::from))
606 .collect())
607 }
608
609 async fn get_vid_common_range<R>(
610 &mut self,
611 range: R,
612 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
613 where
614 R: RangeBounds<usize> + Send,
615 {
616 Ok(range_iter(self.inner.vid_storage.iter(), range)
617 .map(|res| res.map(|(common, _)| common))
618 .collect())
619 }
620
621 async fn get_vid_common_metadata_range<R>(
622 &mut self,
623 range: R,
624 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
625 where
626 R: RangeBounds<usize> + Send,
627 {
628 Ok(range_iter(self.inner.vid_storage.iter(), range)
629 .map(|res| res.map(|(common, _)| common.into()))
630 .collect())
631 }
632
633 async fn get_block_with_transaction(
634 &mut self,
635 hash: TransactionHash<Types>,
636 ) -> QueryResult<BlockQueryData<Types>> {
637 let height = self
638 .inner
639 .index_by_txn_hash
640 .get(&hash)
641 .context(NotFoundSnafu)?;
642 self.inner.get_block((*height as usize).into())
643 }
644
645 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
646 self.get_leaf((from as usize).into()).await
650 }
651
652 async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryDataV2<Types>> {
653 self.inner
654 .state_cert_storage
655 .iter()
656 .nth(epoch as usize)
657 .context(NotFoundSnafu)?
658 .context(MissingSnafu)
659 }
660}
661
662impl<Types: NodeType> UpdateAvailabilityStorage<Types>
663 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
664where
665 Payload<Types>: QueryablePayload<Types>,
666 Header<Types>: QueryableHeader<Types>,
667{
668 async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
669 self.inner
670 .leaf_storage
671 .insert(leaf.height() as usize, leaf.clone())?;
672 self.inner
673 .index_by_leaf_hash
674 .insert(leaf.hash(), leaf.height());
675 update_index_by_hash(
676 &mut self.inner.index_by_block_hash,
677 leaf.block_hash(),
678 leaf.height(),
679 );
680 update_index_by_hash(
681 &mut self.inner.index_by_payload_hash,
682 leaf.payload_hash(),
683 leaf.height(),
684 );
685 self.inner
686 .index_by_time
687 .entry(leaf.header().timestamp())
688 .or_default()
689 .push(leaf.height());
690 Ok(())
691 }
692
693 async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
694 if !self
695 .inner
696 .block_storage
697 .insert(block.height() as usize, block.clone())?
698 {
699 return Ok(());
701 }
702 self.inner.num_transactions += block.len();
703 self.inner.payload_size += block.size() as usize;
704 for (_, txn) in block.enumerate() {
705 update_index_by_hash(
706 &mut self.inner.index_by_txn_hash,
707 txn.commit(),
708 block.height(),
709 );
710 }
711 Ok(())
712 }
713
714 async fn insert_vid(
715 &mut self,
716 common: VidCommonQueryData<Types>,
717 share: Option<VidShare>,
718 ) -> anyhow::Result<()> {
719 self.inner
720 .vid_storage
721 .insert(common.height() as usize, (common, share))?;
722 Ok(())
723 }
724
725 async fn insert_state_cert(
726 &mut self,
727 state_cert: StateCertQueryDataV2<Types>,
728 ) -> anyhow::Result<()> {
729 self.inner
730 .state_cert_storage
731 .insert(state_cert.0.epoch.u64() as usize, state_cert)?;
732 Ok(())
733 }
734}
735
/// Record that the object with the given `hash` lives at position `pos`, keeping the smallest
/// position seen so far.
///
/// A hash that is not yet indexed is inserted. A hash that is already indexed is only
/// overwritten when `pos` is strictly smaller than the stored position, so the index always
/// points at the earliest object with that hash.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Vacant(slot) => {
            slot.insert(pos);
        },
        Entry::Occupied(mut slot) if pos < *slot.get() => {
            slot.insert(pos);
        },
        // Already indexed at an equal or earlier position: leave it alone.
        Entry::Occupied(_) => {},
    }
}
753
754#[async_trait]
755impl<Types, T> NodeStorage<Types> for Transaction<T>
756where
757 Types: NodeType,
758 Payload<Types>: QueryablePayload<Types>,
759 Header<Types>: QueryableHeader<Types>,
760 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send,
761{
762 async fn block_height(&mut self) -> QueryResult<usize> {
763 Ok(self.inner.leaf_storage.iter().len())
764 }
765
766 async fn count_transactions_in_range(
767 &mut self,
768 range: impl RangeBounds<usize> + Send,
769 namespace: Option<NamespaceId<Types>>,
770 ) -> QueryResult<usize> {
771 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
772 || !matches!(range.end_bound(), Bound::Unbounded)
773 {
774 return Err(QueryError::Error {
775 message: "partial aggregates are not supported with file system backend".into(),
776 });
777 }
778
779 if namespace.is_some() {
780 return Err(QueryError::Error {
781 message: "file system does not support per-namespace stats".into(),
782 });
783 }
784
785 Ok(self.inner.num_transactions)
786 }
787
788 async fn payload_size_in_range(
789 &mut self,
790 range: impl RangeBounds<usize> + Send,
791 namespace: Option<NamespaceId<Types>>,
792 ) -> QueryResult<usize> {
793 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
794 || !matches!(range.end_bound(), Bound::Unbounded)
795 {
796 return Err(QueryError::Error {
797 message: "partial aggregates are not supported with file system backend".into(),
798 });
799 }
800
801 if namespace.is_some() {
802 return Err(QueryError::Error {
803 message: "file system does not support per-namespace stats".into(),
804 });
805 }
806
807 Ok(self.inner.payload_size)
808 }
809
810 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
811 where
812 ID: Into<BlockId<Types>> + Send + Sync,
813 {
814 self.inner
815 .vid_storage
816 .iter()
817 .nth(self.inner.get_block_index(id.into())?)
818 .context(NotFoundSnafu)?
819 .context(MissingSnafu)?
820 .1
821 .context(MissingSnafu)
822 }
823
824 async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
825 let height = self.inner.leaf_storage.iter().len();
826
827 let missing_vid = self.inner.vid_storage.missing(height);
830 let null_vid_shares: usize = self
833 .inner
834 .vid_storage
835 .iter()
836 .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
837 .sum();
838 Ok(SyncStatus {
839 missing_blocks: self.inner.block_storage.missing(height),
840 missing_leaves: self.inner.leaf_storage.missing(height),
841 missing_vid_common: missing_vid,
842 missing_vid_shares: missing_vid + null_vid_shares,
843 pruned_height: None,
844 })
845 }
846
847 async fn get_header_window(
848 &mut self,
849 start: impl Into<WindowStart<Types>> + Send + Sync,
850 end: u64,
851 limit: usize,
852 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
853 let first_block = match start.into() {
854 WindowStart::Height(h) => h,
855 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
856 WindowStart::Time(t) => {
857 let blocks = self
860 .inner
861 .index_by_time
862 .range(t..)
863 .next()
864 .context(NotFoundSnafu)?
865 .1;
866 blocks[0]
871 },
872 } as usize;
873
874 let mut res = TimeWindowQueryData::default();
875
876 if first_block > 0 {
878 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
879 }
880
881 for block in self.inner.get_block_range(first_block..)? {
884 let header = block?.header().clone();
885 if header.timestamp() >= end {
886 res.next = Some(header);
887 break;
888 }
889 res.window.push(header);
890 if res.window.len() >= limit {
891 break;
892 }
893 }
894
895 Ok(res)
896 }
897}
898
/// This backend keeps no persisted aggregates, so both queries return fixed "nothing yet"
/// answers.
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// Always reports an aggregates height of 0.
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    /// Always reports that there is no previous aggregate.
    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}
912
impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// Ignores both arguments, stores nothing, and returns a default aggregate.
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}
926
// No methods are overridden, so transactions use the default `PrunedHeightStorage` behavior.
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
928
impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Borrow the metrics handle owned by this storage.
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}