1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 hash_map::{Entry, HashMap},
18 BTreeMap,
19 },
20 hash::Hash,
21 ops::{Bound, Deref, RangeBounds},
22 path::Path,
23};
24
25use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use async_trait::async_trait;
27use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
28use committable::Committable;
29use futures::future::Future;
30use hotshot_types::{
31 data::{VidCommitment, VidShare},
32 simple_certificate::CertificatePair,
33 traits::{block_contents::BlockHeader, node_implementation::NodeType},
34};
35use serde::{de::DeserializeOwned, Serialize};
36use snafu::OptionExt;
37
38use super::{
39 ledger_log::{Iter, LedgerLog},
40 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
41 sql::MigrateTypes,
42 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
43 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
44};
45use crate::{
46 availability::{
47 data_source::{BlockId, LeafId},
48 query_data::{
49 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
50 QueryablePayload, TransactionHash, VidCommonQueryData,
51 },
52 NamespaceId,
53 },
54 data_source::{update, VersionedDataSource},
55 metrics::PrometheusMetrics,
56 node::{SyncStatus, TimeWindowQueryData, WindowStart},
57 status::HasMetrics,
58 types::HeightIndexed,
59 Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
60};
61
/// Third argument given to `LedgerLog::create`/`LedgerLog::open` for the `"leaves"` log.
///
/// NOTE(review): presumably the number of entries the log keeps cached in memory — confirm
/// against `LedgerLog`.
const CACHED_LEAVES_COUNT: usize = 100;

/// Third argument given to `LedgerLog::create`/`LedgerLog::open` for the `"blocks"` log.
///
/// NOTE(review): presumably the number of entries the log keeps cached in memory — confirm
/// against `LedgerLog`.
const CACHED_BLOCKS_COUNT: usize = 100;

/// Third argument given to `LedgerLog::create`/`LedgerLog::open` for the `"vid_common"` log.
///
/// NOTE(review): presumably the number of entries the log keeps cached in memory — confirm
/// against `LedgerLog`.
const CACHED_VID_COMMON_COUNT: usize = 100;
65
66#[derive(custom_debug::Debug)]
67pub struct FileSystemStorageInner<Types>
68where
69 Types: NodeType,
70 Header<Types>: QueryableHeader<Types>,
71 Payload<Types>: QueryablePayload<Types>,
72{
73 index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
74 index_by_block_hash: HashMap<BlockHash<Types>, u64>,
75 index_by_payload_hash: HashMap<VidCommitment, u64>,
76 index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
77 index_by_time: BTreeMap<u64, Vec<u64>>,
78 num_transactions: usize,
79 payload_size: usize,
80 #[debug(skip)]
81 top_storage: Option<AtomicStore>,
82 leaf_storage: LedgerLog<LeafQueryData<Types>>,
83 block_storage: LedgerLog<BlockQueryData<Types>>,
84 vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
85 latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
86}
87
88impl<Types> FileSystemStorageInner<Types>
89where
90 Types: NodeType,
91 Header<Types>: QueryableHeader<Types>,
92 Payload<Types>: QueryablePayload<Types>,
93{
94 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
95 match id {
96 BlockId::Number(n) => Ok(n),
97 BlockId::Hash(h) => {
98 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
99 },
100 BlockId::PayloadHash(h) => {
101 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
102 },
103 }
104 }
105
106 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
107 self.block_storage
108 .iter()
109 .nth(self.get_block_index(id)?)
110 .context(NotFoundSnafu)?
111 .context(MissingSnafu)
112 }
113
114 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
115 self.get_block(id).map(|block| block.header)
116 }
117
118 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
119 where
120 R: RangeBounds<usize> + Send,
121 {
122 Ok(range_iter(self.block_storage.iter(), range).collect())
123 }
124}
125
126#[derive(Debug)]
128pub struct FileSystemStorage<Types: NodeType>
129where
130 Header<Types>: QueryableHeader<Types>,
131 Payload<Types>: QueryablePayload<Types>,
132{
133 inner: RwLock<FileSystemStorageInner<Types>>,
134 metrics: PrometheusMetrics,
135}
136
137impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
138where
139 Header<Types>: QueryableHeader<Types>,
140 Payload<Types>: QueryablePayload<Types>,
141{
142}
143impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
144where
145 Header<Types>: QueryableHeader<Types>,
146 Payload<Types>: QueryablePayload<Types>,
147{
148 type Pruner = ();
149}
150
151#[async_trait]
152impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
153where
154 Header<Types>: QueryableHeader<Types>,
155 Payload<Types>: QueryablePayload<Types>,
156{
157 async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
158 Ok(())
159 }
160}
161
162impl<Types: NodeType> FileSystemStorage<Types>
163where
164 Payload<Types>: QueryablePayload<Types>,
165 Header<Types>: QueryableHeader<Types>,
166{
167 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
173 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
174 loader.retain_archives(1);
175 let data_source = Self::create_with_store(&mut loader).await?;
176 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
177 Ok(data_source)
178 }
179
180 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
186 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
187 loader.retain_archives(1);
188 let data_source = Self::open_with_store(&mut loader).await?;
189 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
190 Ok(data_source)
191 }
192
193 pub async fn create_with_store(
202 loader: &mut AtomicStoreLoader,
203 ) -> Result<Self, PersistenceError> {
204 Ok(Self {
205 inner: RwLock::new(FileSystemStorageInner {
206 index_by_leaf_hash: Default::default(),
207 index_by_block_hash: Default::default(),
208 index_by_payload_hash: Default::default(),
209 index_by_txn_hash: Default::default(),
210 index_by_time: Default::default(),
211 num_transactions: 0,
212 payload_size: 0,
213 top_storage: None,
214 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
215 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
216 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
217 latest_qc_chain: None,
218 }),
219 metrics: Default::default(),
220 })
221 }
222
223 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
232 let leaf_storage =
233 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
234 let block_storage =
235 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
236 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
237 loader,
238 "vid_common",
239 CACHED_VID_COMMON_COUNT,
240 )?;
241
242 let mut index_by_block_hash = HashMap::new();
243 let mut index_by_payload_hash = HashMap::new();
244 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
245 let index_by_leaf_hash = leaf_storage
246 .iter()
247 .flatten()
248 .map(|leaf| {
249 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
250 update_index_by_hash(
251 &mut index_by_payload_hash,
252 leaf.payload_hash(),
253 leaf.height(),
254 );
255 index_by_time
256 .entry(leaf.header().timestamp())
257 .or_default()
258 .push(leaf.height());
259 (leaf.hash(), leaf.height())
260 })
261 .collect();
262
263 let mut index_by_txn_hash = HashMap::new();
264 let mut num_transactions = 0;
265 let mut payload_size = 0;
266 for block in block_storage.iter().flatten() {
267 num_transactions += block.len();
268 payload_size += block.size() as usize;
269
270 let height = block.height();
271 for (_, txn) in block.enumerate() {
272 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
273 }
274 }
275
276 Ok(Self {
277 inner: RwLock::new(FileSystemStorageInner {
278 index_by_leaf_hash,
279 index_by_block_hash,
280 index_by_payload_hash,
281 index_by_txn_hash,
282 index_by_time,
283 num_transactions,
284 payload_size,
285 leaf_storage,
286 block_storage,
287 vid_storage,
288 top_storage: None,
289 latest_qc_chain: None,
290 }),
291 metrics: Default::default(),
292 })
293 }
294
295 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
297 let mut inner = self.inner.write().await;
298 inner.leaf_storage.skip_version()?;
299 inner.block_storage.skip_version()?;
300 inner.vid_storage.skip_version()?;
301 if let Some(store) = &mut inner.top_storage {
302 store.commit_version()?;
303 }
304 Ok(())
305 }
306
307 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
309 let mut tx = self.read().await.map_err(|err| QueryError::Error {
310 message: err.to_string(),
311 })?;
312 let share = tx.vid_share(block_id).await?;
313 Ok(share)
314 }
315
316 pub async fn get_vid_common(
318 &self,
319 block_id: BlockId<Types>,
320 ) -> QueryResult<VidCommonQueryData<Types>> {
321 let mut tx = self.read().await.map_err(|err| QueryError::Error {
322 message: err.to_string(),
323 })?;
324 let share = tx.get_vid_common(block_id).await?;
325 Ok(share)
326 }
327
328 pub async fn get_vid_common_metadata(
330 &self,
331 block_id: BlockId<Types>,
332 ) -> QueryResult<VidCommonMetadata<Types>> {
333 let mut tx = self.read().await.map_err(|err| QueryError::Error {
334 message: err.to_string(),
335 })?;
336 let share = tx.get_vid_common_metadata(block_id).await?;
337 Ok(share)
338 }
339}
340
/// Hook invoked on the state held by a [`Transaction`] when that transaction is dropped.
pub trait Revert {
    /// Undo whatever changes are still pending on `self`.
    fn revert(&mut self);
}
344
345impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
346where
347 Types: NodeType,
348 Header<Types>: QueryableHeader<Types>,
349 Payload<Types>: QueryablePayload<Types>,
350{
351 fn revert(&mut self) {
352 self.leaf_storage.revert_version().unwrap();
353 self.block_storage.revert_version().unwrap();
354 self.vid_storage.revert_version().unwrap();
355 }
356}
357
358impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
359where
360 Types: NodeType,
361 Header<Types>: QueryableHeader<Types>,
362 Payload<Types>: QueryablePayload<Types>,
363{
364 fn revert(&mut self) {
365 }
367}
368
369#[derive(Debug)]
370pub struct Transaction<T: Revert> {
371 inner: T,
372}
373
374impl<T: Revert> Drop for Transaction<T> {
375 fn drop(&mut self) {
376 self.inner.revert();
377 }
378}
379impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
380where
381 Types: NodeType,
382 Header<Types>: QueryableHeader<Types>,
383 Payload<Types>: QueryablePayload<Types>,
384{
385 async fn commit(mut self) -> anyhow::Result<()> {
386 self.inner.leaf_storage.commit_version().await?;
387 self.inner.block_storage.commit_version().await?;
388 self.inner.vid_storage.commit_version().await?;
389 if let Some(store) = &mut self.inner.top_storage {
390 store.commit_version()?;
391 }
392 Ok(())
393 }
394
395 fn revert(self) -> impl Future + Send {
396 async move {}
398 }
399}
400
401impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
402where
403 Types: NodeType,
404 Header<Types>: QueryableHeader<Types>,
405 Payload<Types>: QueryablePayload<Types>,
406{
407 async fn commit(self) -> anyhow::Result<()> {
408 Ok(())
410 }
411
412 fn revert(self) -> impl Future + Send {
413 async move {}
415 }
416}
417
418impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
419where
420 Header<Types>: QueryableHeader<Types>,
421 Payload<Types>: QueryablePayload<Types>,
422{
423 type Transaction<'a>
424 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
425 where
426 Self: 'a;
427 type ReadOnly<'a>
428 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
429 where
430 Self: 'a;
431
432 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
433 Ok(Transaction {
434 inner: self.inner.write().await,
435 })
436 }
437
438 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
439 Ok(Transaction {
440 inner: self.inner.read().await,
441 })
442 }
443}
444fn range_iter<T>(
445 mut iter: Iter<'_, T>,
446 range: impl RangeBounds<usize>,
447) -> impl '_ + Iterator<Item = QueryResult<T>>
448where
449 T: Clone + Serialize + DeserializeOwned,
450{
451 let start = range.start_bound().cloned();
452 let end = range.end_bound().cloned();
453
454 let pos = match start {
456 Bound::Included(n) => {
457 if n > 0 {
458 iter.nth(n - 1);
459 }
460 n
461 },
462 Bound::Excluded(n) => {
463 iter.nth(n);
464 n + 1
465 },
466 Bound::Unbounded => 0,
467 };
468
469 itertools::unfold((iter, end, pos), |(iter, end, pos)| {
470 let reached_end = match end {
472 Bound::Included(n) => pos > n,
473 Bound::Excluded(n) => pos >= n,
474 Bound::Unbounded => false,
475 };
476 if reached_end {
477 return None;
478 }
479 let opt = iter.next()?;
480 *pos += 1;
481 Some(opt.context(MissingSnafu))
482 })
483}
484
485#[async_trait]
486impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
487where
488 Types: NodeType,
489 Payload<Types>: QueryablePayload<Types>,
490 Header<Types>: QueryableHeader<Types>,
491 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
492{
493 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
494 let n = match id {
495 LeafId::Number(n) => n,
496 LeafId::Hash(h) => *self
497 .inner
498 .index_by_leaf_hash
499 .get(&h)
500 .context(NotFoundSnafu)? as usize,
501 };
502 self.inner
503 .leaf_storage
504 .iter()
505 .nth(n)
506 .context(NotFoundSnafu)?
507 .context(MissingSnafu)
508 }
509
510 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
511 self.inner.get_block(id)
512 }
513
514 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
515 self.inner.get_header(id)
516 }
517
518 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
519 self.get_block(id).await.map(PayloadQueryData::from)
520 }
521
522 async fn get_payload_metadata(
523 &mut self,
524 id: BlockId<Types>,
525 ) -> QueryResult<PayloadMetadata<Types>> {
526 self.get_block(id).await.map(PayloadMetadata::from)
527 }
528
529 async fn get_vid_common(
530 &mut self,
531 id: BlockId<Types>,
532 ) -> QueryResult<VidCommonQueryData<Types>> {
533 Ok(self
534 .inner
535 .vid_storage
536 .iter()
537 .nth(self.inner.get_block_index(id)?)
538 .context(NotFoundSnafu)?
539 .context(MissingSnafu)?
540 .0)
541 }
542
543 async fn get_vid_common_metadata(
544 &mut self,
545 id: BlockId<Types>,
546 ) -> QueryResult<VidCommonMetadata<Types>> {
547 self.get_vid_common(id).await.map(VidCommonMetadata::from)
548 }
549
550 async fn get_leaf_range<R>(
551 &mut self,
552 range: R,
553 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
554 where
555 R: RangeBounds<usize> + Send,
556 {
557 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
558 }
559
560 async fn get_block_range<R>(
561 &mut self,
562 range: R,
563 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
564 where
565 R: RangeBounds<usize> + Send,
566 {
567 self.inner.get_block_range(range)
568 }
569
570 async fn get_payload_range<R>(
571 &mut self,
572 range: R,
573 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
574 where
575 R: RangeBounds<usize> + Send,
576 {
577 Ok(range_iter(self.inner.block_storage.iter(), range)
578 .map(|res| res.map(PayloadQueryData::from))
579 .collect())
580 }
581
582 async fn get_payload_metadata_range<R>(
583 &mut self,
584 range: R,
585 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
586 where
587 R: RangeBounds<usize> + Send + 'static,
588 {
589 Ok(range_iter(self.inner.block_storage.iter(), range)
590 .map(|res| res.map(PayloadMetadata::from))
591 .collect())
592 }
593
594 async fn get_vid_common_range<R>(
595 &mut self,
596 range: R,
597 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
598 where
599 R: RangeBounds<usize> + Send,
600 {
601 Ok(range_iter(self.inner.vid_storage.iter(), range)
602 .map(|res| res.map(|(common, _)| common))
603 .collect())
604 }
605
606 async fn get_vid_common_metadata_range<R>(
607 &mut self,
608 range: R,
609 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
610 where
611 R: RangeBounds<usize> + Send,
612 {
613 Ok(range_iter(self.inner.vid_storage.iter(), range)
614 .map(|res| res.map(|(common, _)| common.into()))
615 .collect())
616 }
617
618 async fn get_block_with_transaction(
619 &mut self,
620 hash: TransactionHash<Types>,
621 ) -> QueryResult<BlockQueryData<Types>> {
622 let height = self
623 .inner
624 .index_by_txn_hash
625 .get(&hash)
626 .context(NotFoundSnafu)?;
627 self.inner.get_block((*height as usize).into())
628 }
629
630 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
631 self.get_leaf((from as usize).into()).await
635 }
636}
637
638impl<Types: NodeType> UpdateAvailabilityStorage<Types>
639 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
640where
641 Payload<Types>: QueryablePayload<Types>,
642 Header<Types>: QueryableHeader<Types>,
643{
644 async fn insert_leaf_with_qc_chain(
645 &mut self,
646 leaf: LeafQueryData<Types>,
647 qc_chain: Option<[CertificatePair<Types>; 2]>,
648 ) -> anyhow::Result<()> {
649 self.inner
650 .leaf_storage
651 .insert(leaf.height() as usize, leaf.clone())?;
652 self.inner
653 .index_by_leaf_hash
654 .insert(leaf.hash(), leaf.height());
655 update_index_by_hash(
656 &mut self.inner.index_by_block_hash,
657 leaf.block_hash(),
658 leaf.height(),
659 );
660 update_index_by_hash(
661 &mut self.inner.index_by_payload_hash,
662 leaf.payload_hash(),
663 leaf.height(),
664 );
665 self.inner
666 .index_by_time
667 .entry(leaf.header().timestamp())
668 .or_default()
669 .push(leaf.height());
670
671 if leaf.height() + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
672 if let Some(qc_chain) = qc_chain {
677 self.inner.latest_qc_chain = Some(qc_chain);
678 } else {
679 self.inner.latest_qc_chain = None;
682 }
683 }
684
685 Ok(())
686 }
687
688 async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
689 if !self
690 .inner
691 .block_storage
692 .insert(block.height() as usize, block.clone())?
693 {
694 return Ok(());
696 }
697 self.inner.num_transactions += block.len();
698 self.inner.payload_size += block.size() as usize;
699 for (_, txn) in block.enumerate() {
700 update_index_by_hash(
701 &mut self.inner.index_by_txn_hash,
702 txn.commit(),
703 block.height(),
704 );
705 }
706 Ok(())
707 }
708
709 async fn insert_vid(
710 &mut self,
711 common: VidCommonQueryData<Types>,
712 share: Option<VidShare>,
713 ) -> anyhow::Result<()> {
714 self.inner
715 .vid_storage
716 .insert(common.height() as usize, (common, share))?;
717 Ok(())
718 }
719}
720
/// Record `pos` for `hash`, keeping the smallest position ever recorded for that hash.
///
/// A hash that is not yet in `index` is inserted with `pos`. For a hash that is already
/// present, the stored position is only replaced when `pos` is strictly smaller.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Vacant(slot) => {
            slot.insert(pos);
        },
        Entry::Occupied(mut slot) if pos < *slot.get() => {
            slot.insert(pos);
        },
        // An equal or smaller position is already recorded; keep it.
        Entry::Occupied(_) => {},
    }
}
738
739#[async_trait]
740impl<Types, T> NodeStorage<Types> for Transaction<T>
741where
742 Types: NodeType,
743 Payload<Types>: QueryablePayload<Types>,
744 Header<Types>: QueryableHeader<Types>,
745 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
746{
747 async fn block_height(&mut self) -> QueryResult<usize> {
748 Ok(self.inner.leaf_storage.iter().len())
749 }
750
751 async fn count_transactions_in_range(
752 &mut self,
753 range: impl RangeBounds<usize> + Send,
754 namespace: Option<NamespaceId<Types>>,
755 ) -> QueryResult<usize> {
756 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
757 || !matches!(range.end_bound(), Bound::Unbounded)
758 {
759 return Err(QueryError::Error {
760 message: "partial aggregates are not supported with file system backend".into(),
761 });
762 }
763
764 if namespace.is_some() {
765 return Err(QueryError::Error {
766 message: "file system does not support per-namespace stats".into(),
767 });
768 }
769
770 Ok(self.inner.num_transactions)
771 }
772
773 async fn payload_size_in_range(
774 &mut self,
775 range: impl RangeBounds<usize> + Send,
776 namespace: Option<NamespaceId<Types>>,
777 ) -> QueryResult<usize> {
778 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
779 || !matches!(range.end_bound(), Bound::Unbounded)
780 {
781 return Err(QueryError::Error {
782 message: "partial aggregates are not supported with file system backend".into(),
783 });
784 }
785
786 if namespace.is_some() {
787 return Err(QueryError::Error {
788 message: "file system does not support per-namespace stats".into(),
789 });
790 }
791
792 Ok(self.inner.payload_size)
793 }
794
795 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
796 where
797 ID: Into<BlockId<Types>> + Send + Sync,
798 {
799 self.inner
800 .vid_storage
801 .iter()
802 .nth(self.inner.get_block_index(id.into())?)
803 .context(NotFoundSnafu)?
804 .context(MissingSnafu)?
805 .1
806 .context(MissingSnafu)
807 }
808
809 async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
810 let height = self.inner.leaf_storage.iter().len();
811
812 let missing_vid = self.inner.vid_storage.missing(height);
815 let null_vid_shares: usize = self
818 .inner
819 .vid_storage
820 .iter()
821 .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
822 .sum();
823 Ok(SyncStatus {
824 missing_blocks: self.inner.block_storage.missing(height),
825 missing_leaves: self.inner.leaf_storage.missing(height),
826 missing_vid_common: missing_vid,
827 missing_vid_shares: missing_vid + null_vid_shares,
828 pruned_height: None,
829 })
830 }
831
832 async fn get_header_window(
833 &mut self,
834 start: impl Into<WindowStart<Types>> + Send + Sync,
835 end: u64,
836 limit: usize,
837 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
838 let first_block = match start.into() {
839 WindowStart::Height(h) => h,
840 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
841 WindowStart::Time(t) => {
842 let blocks = self
845 .inner
846 .index_by_time
847 .range(t..)
848 .next()
849 .context(NotFoundSnafu)?
850 .1;
851 blocks[0]
856 },
857 } as usize;
858
859 let mut res = TimeWindowQueryData::default();
860
861 if first_block > 0 {
863 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
864 }
865
866 for block in self.inner.get_block_range(first_block..)? {
869 let header = block?.header().clone();
870 if header.timestamp() >= end {
871 res.next = Some(header);
872 break;
873 }
874 res.window.push(header);
875 if res.window.len() >= limit {
876 break;
877 }
878 }
879
880 Ok(res)
881 }
882
883 async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
884 Ok(self.inner.latest_qc_chain.clone())
885 }
886}
887
888impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
889where
890 Types: NodeType,
891 Header<Types>: QueryableHeader<Types>,
892{
893 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
894 Ok(0)
895 }
896
897 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
898 Ok(None)
899 }
900}
901
902impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
903where
904 Types: NodeType,
905 Header<Types>: QueryableHeader<Types>,
906{
907 async fn update_aggregates(
908 &mut self,
909 _prev: Aggregate<Types>,
910 _blocks: &[PayloadMetadata<Types>],
911 ) -> anyhow::Result<Aggregate<Types>> {
912 Ok(Aggregate::default())
913 }
914}
915
916impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
917
918impl<Types> HasMetrics for FileSystemStorage<Types>
919where
920 Types: NodeType,
921 Header<Types>: QueryableHeader<Types>,
922 Payload<Types>: QueryablePayload<Types>,
923{
924 fn metrics(&self) -> &PrometheusMetrics {
925 &self.metrics
926 }
927}