1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 BTreeMap,
18 hash_map::{Entry, HashMap},
19 },
20 hash::Hash,
21 iter,
22 ops::{Bound, Deref, RangeBounds},
23 path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32 data::{VidCommitment, VidShare},
33 simple_certificate::CertificatePair,
34 traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42 ledger_log::{Iter, LedgerLog},
43 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46 Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47 availability::{
48 NamespaceId,
49 data_source::{BlockId, LeafId},
50 query_data::{
51 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52 QueryablePayload, TransactionHash, VidCommonQueryData,
53 },
54 },
55 data_source::{VersionedDataSource, update},
56 metrics::PrometheusMetrics,
57 node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58 status::HasMetrics,
59 types::HeightIndexed,
60};
61
62const CACHED_LEAVES_COUNT: usize = 100;
63const CACHED_BLOCKS_COUNT: usize = 100;
64const CACHED_VID_COMMON_COUNT: usize = 100;
65
66#[derive(custom_debug::Debug)]
67pub struct FileSystemStorageInner<Types>
68where
69 Types: NodeType,
70 Header<Types>: QueryableHeader<Types>,
71 Payload<Types>: QueryablePayload<Types>,
72{
73 index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
74 index_by_block_hash: HashMap<BlockHash<Types>, u64>,
75 index_by_payload_hash: HashMap<VidCommitment, u64>,
76 index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
77 index_by_time: BTreeMap<u64, Vec<u64>>,
78 num_transactions: usize,
79 payload_size: usize,
80 #[debug(skip)]
81 top_storage: Option<AtomicStore>,
82 leaf_storage: LedgerLog<LeafQueryData<Types>>,
83 block_storage: LedgerLog<BlockQueryData<Types>>,
84 vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
85 latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
86}
87
88impl<Types> FileSystemStorageInner<Types>
89where
90 Types: NodeType,
91 Header<Types>: QueryableHeader<Types>,
92 Payload<Types>: QueryablePayload<Types>,
93{
94 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
95 match id {
96 BlockId::Number(n) => Ok(n),
97 BlockId::Hash(h) => {
98 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
99 },
100 BlockId::PayloadHash(h) => {
101 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
102 },
103 }
104 }
105
106 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
107 self.block_storage
108 .iter()
109 .nth(self.get_block_index(id)?)
110 .context(NotFoundSnafu)?
111 .context(MissingSnafu)
112 }
113
114 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
115 self.get_block(id).map(|block| block.header)
116 }
117
118 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
119 where
120 R: RangeBounds<usize> + Send,
121 {
122 Ok(range_iter(self.block_storage.iter(), range).collect())
123 }
124}
125
126#[derive(Debug)]
128pub struct FileSystemStorage<Types: NodeType>
129where
130 Header<Types>: QueryableHeader<Types>,
131 Payload<Types>: QueryablePayload<Types>,
132{
133 inner: RwLock<FileSystemStorageInner<Types>>,
134 metrics: PrometheusMetrics,
135}
136
137impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
138where
139 Header<Types>: QueryableHeader<Types>,
140 Payload<Types>: QueryablePayload<Types>,
141{
142}
143impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
144where
145 Header<Types>: QueryableHeader<Types>,
146 Payload<Types>: QueryablePayload<Types>,
147{
148 type Pruner = ();
149}
150
151impl<Types: NodeType> FileSystemStorage<Types>
152where
153 Payload<Types>: QueryablePayload<Types>,
154 Header<Types>: QueryableHeader<Types>,
155{
156 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
162 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
163 loader.retain_archives(1);
164 let data_source = Self::create_with_store(&mut loader).await?;
165 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
166 Ok(data_source)
167 }
168
169 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
175 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
176 loader.retain_archives(1);
177 let data_source = Self::open_with_store(&mut loader).await?;
178 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
179 Ok(data_source)
180 }
181
182 pub async fn create_with_store(
191 loader: &mut AtomicStoreLoader,
192 ) -> Result<Self, PersistenceError> {
193 Ok(Self {
194 inner: RwLock::new(FileSystemStorageInner {
195 index_by_leaf_hash: Default::default(),
196 index_by_block_hash: Default::default(),
197 index_by_payload_hash: Default::default(),
198 index_by_txn_hash: Default::default(),
199 index_by_time: Default::default(),
200 num_transactions: 0,
201 payload_size: 0,
202 top_storage: None,
203 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
204 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
205 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
206 latest_qc_chain: None,
207 }),
208 metrics: Default::default(),
209 })
210 }
211
212 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
221 let leaf_storage =
222 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
223 let block_storage =
224 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
225 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
226 loader,
227 "vid_common",
228 CACHED_VID_COMMON_COUNT,
229 )?;
230
231 let mut index_by_block_hash = HashMap::new();
232 let mut index_by_payload_hash = HashMap::new();
233 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
234 let index_by_leaf_hash = leaf_storage
235 .iter()
236 .flatten()
237 .map(|leaf| {
238 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
239 update_index_by_hash(
240 &mut index_by_payload_hash,
241 leaf.payload_hash(),
242 leaf.height(),
243 );
244 index_by_time
245 .entry(leaf.header().timestamp())
246 .or_default()
247 .push(leaf.height());
248 (leaf.hash(), leaf.height())
249 })
250 .collect();
251
252 let mut index_by_txn_hash = HashMap::new();
253 let mut num_transactions = 0;
254 let mut payload_size = 0;
255 for block in block_storage.iter().flatten() {
256 num_transactions += block.len();
257 payload_size += block.size() as usize;
258
259 let height = block.height();
260 for (_, txn) in block.enumerate() {
261 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
262 }
263 }
264
265 Ok(Self {
266 inner: RwLock::new(FileSystemStorageInner {
267 index_by_leaf_hash,
268 index_by_block_hash,
269 index_by_payload_hash,
270 index_by_txn_hash,
271 index_by_time,
272 num_transactions,
273 payload_size,
274 leaf_storage,
275 block_storage,
276 vid_storage,
277 top_storage: None,
278 latest_qc_chain: None,
279 }),
280 metrics: Default::default(),
281 })
282 }
283
284 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
286 let mut inner = self.inner.write().await;
287 inner.leaf_storage.skip_version()?;
288 inner.block_storage.skip_version()?;
289 inner.vid_storage.skip_version()?;
290 if let Some(store) = &mut inner.top_storage {
291 store.commit_version()?;
292 }
293 Ok(())
294 }
295
296 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
298 let mut tx = self.read().await.map_err(|err| QueryError::Error {
299 message: err.to_string(),
300 })?;
301 let share = tx.vid_share(block_id).await?;
302 Ok(share)
303 }
304
305 pub async fn get_vid_common(
307 &self,
308 block_id: BlockId<Types>,
309 ) -> QueryResult<VidCommonQueryData<Types>> {
310 let mut tx = self.read().await.map_err(|err| QueryError::Error {
311 message: err.to_string(),
312 })?;
313 let share = tx.get_vid_common(block_id).await?;
314 Ok(share)
315 }
316
317 pub async fn get_vid_common_metadata(
319 &self,
320 block_id: BlockId<Types>,
321 ) -> QueryResult<VidCommonMetadata<Types>> {
322 let mut tx = self.read().await.map_err(|err| QueryError::Error {
323 message: err.to_string(),
324 })?;
325 let share = tx.get_vid_common_metadata(block_id).await?;
326 Ok(share)
327 }
328}
329
330pub trait Revert {
331 fn revert(&mut self);
332}
333
334impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
335where
336 Types: NodeType,
337 Header<Types>: QueryableHeader<Types>,
338 Payload<Types>: QueryablePayload<Types>,
339{
340 fn revert(&mut self) {
341 self.leaf_storage.revert_version().unwrap();
342 self.block_storage.revert_version().unwrap();
343 self.vid_storage.revert_version().unwrap();
344 }
345}
346
347impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
348where
349 Types: NodeType,
350 Header<Types>: QueryableHeader<Types>,
351 Payload<Types>: QueryablePayload<Types>,
352{
353 fn revert(&mut self) {
354 }
356}
357
358#[derive(Debug)]
359pub struct Transaction<T: Revert> {
360 inner: T,
361}
362
363impl<T: Revert> Drop for Transaction<T> {
364 fn drop(&mut self) {
365 self.inner.revert();
366 }
367}
368impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
369where
370 Types: NodeType,
371 Header<Types>: QueryableHeader<Types>,
372 Payload<Types>: QueryablePayload<Types>,
373{
374 async fn commit(mut self) -> anyhow::Result<()> {
375 self.inner.leaf_storage.commit_version().await?;
376 self.inner.block_storage.commit_version().await?;
377 self.inner.vid_storage.commit_version().await?;
378 if let Some(store) = &mut self.inner.top_storage {
379 store.commit_version()?;
380 }
381 Ok(())
382 }
383
384 fn revert(self) -> impl Future + Send {
385 async move {}
387 }
388}
389
390impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
391where
392 Types: NodeType,
393 Header<Types>: QueryableHeader<Types>,
394 Payload<Types>: QueryablePayload<Types>,
395{
396 async fn commit(self) -> anyhow::Result<()> {
397 Ok(())
399 }
400
401 fn revert(self) -> impl Future + Send {
402 async move {}
404 }
405}
406
407impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
408where
409 Header<Types>: QueryableHeader<Types>,
410 Payload<Types>: QueryablePayload<Types>,
411{
412 type Transaction<'a>
413 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
414 where
415 Self: 'a;
416 type ReadOnly<'a>
417 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
418 where
419 Self: 'a;
420
421 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
422 Ok(Transaction {
423 inner: self.inner.write().await,
424 })
425 }
426
427 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
428 Ok(Transaction {
429 inner: self.inner.read().await,
430 })
431 }
432}
433fn range_iter<T>(
434 mut iter: Iter<'_, T>,
435 range: impl RangeBounds<usize>,
436) -> impl '_ + Iterator<Item = QueryResult<T>>
437where
438 T: Clone + Serialize + DeserializeOwned,
439{
440 let start = range.start_bound().cloned();
441 let end = range.end_bound().cloned();
442
443 let mut pos = match start {
445 Bound::Included(n) => {
446 if n > 0 {
447 iter.nth(n - 1);
448 }
449 n
450 },
451 Bound::Excluded(n) => {
452 iter.nth(n);
453 n + 1
454 },
455 Bound::Unbounded => 0,
456 };
457
458 iter::from_fn(move || {
459 let reached_end = match end {
461 Bound::Included(n) => pos > n,
462 Bound::Excluded(n) => pos >= n,
463 Bound::Unbounded => false,
464 };
465 if reached_end {
466 return None;
467 }
468 let opt = iter.next()?;
469 pos += 1;
470 Some(opt.context(MissingSnafu))
471 })
472}
473
474#[async_trait]
475impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
476where
477 Types: NodeType,
478 Payload<Types>: QueryablePayload<Types>,
479 Header<Types>: QueryableHeader<Types>,
480 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
481{
482 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
483 let n = match id {
484 LeafId::Number(n) => n,
485 LeafId::Hash(h) => *self
486 .inner
487 .index_by_leaf_hash
488 .get(&h)
489 .context(NotFoundSnafu)? as usize,
490 };
491 self.inner
492 .leaf_storage
493 .iter()
494 .nth(n)
495 .context(NotFoundSnafu)?
496 .context(MissingSnafu)
497 }
498
499 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
500 self.inner.get_block(id)
501 }
502
503 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
504 self.inner.get_header(id)
505 }
506
507 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
508 self.get_block(id).await.map(PayloadQueryData::from)
509 }
510
511 async fn get_payload_metadata(
512 &mut self,
513 id: BlockId<Types>,
514 ) -> QueryResult<PayloadMetadata<Types>> {
515 self.get_block(id).await.map(PayloadMetadata::from)
516 }
517
518 async fn get_vid_common(
519 &mut self,
520 id: BlockId<Types>,
521 ) -> QueryResult<VidCommonQueryData<Types>> {
522 Ok(self
523 .inner
524 .vid_storage
525 .iter()
526 .nth(self.inner.get_block_index(id)?)
527 .context(NotFoundSnafu)?
528 .context(MissingSnafu)?
529 .0)
530 }
531
532 async fn get_vid_common_metadata(
533 &mut self,
534 id: BlockId<Types>,
535 ) -> QueryResult<VidCommonMetadata<Types>> {
536 self.get_vid_common(id).await.map(VidCommonMetadata::from)
537 }
538
539 async fn get_leaf_range<R>(
540 &mut self,
541 range: R,
542 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
543 where
544 R: RangeBounds<usize> + Send,
545 {
546 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
547 }
548
549 async fn get_block_range<R>(
550 &mut self,
551 range: R,
552 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
553 where
554 R: RangeBounds<usize> + Send,
555 {
556 self.inner.get_block_range(range)
557 }
558
559 async fn get_payload_range<R>(
560 &mut self,
561 range: R,
562 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
563 where
564 R: RangeBounds<usize> + Send,
565 {
566 Ok(range_iter(self.inner.block_storage.iter(), range)
567 .map(|res| res.map(PayloadQueryData::from))
568 .collect())
569 }
570
571 async fn get_payload_metadata_range<R>(
572 &mut self,
573 range: R,
574 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
575 where
576 R: RangeBounds<usize> + Send + 'static,
577 {
578 Ok(range_iter(self.inner.block_storage.iter(), range)
579 .map(|res| res.map(PayloadMetadata::from))
580 .collect())
581 }
582
583 async fn get_vid_common_range<R>(
584 &mut self,
585 range: R,
586 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
587 where
588 R: RangeBounds<usize> + Send,
589 {
590 Ok(range_iter(self.inner.vid_storage.iter(), range)
591 .map(|res| res.map(|(common, _)| common))
592 .collect())
593 }
594
595 async fn get_vid_common_metadata_range<R>(
596 &mut self,
597 range: R,
598 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
599 where
600 R: RangeBounds<usize> + Send,
601 {
602 Ok(range_iter(self.inner.vid_storage.iter(), range)
603 .map(|res| res.map(|(common, _)| common.into()))
604 .collect())
605 }
606
607 async fn get_block_with_transaction(
608 &mut self,
609 hash: TransactionHash<Types>,
610 ) -> QueryResult<BlockQueryData<Types>> {
611 let height = self
612 .inner
613 .index_by_txn_hash
614 .get(&hash)
615 .context(NotFoundSnafu)?;
616 self.inner.get_block((*height as usize).into())
617 }
618
619 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
620 self.get_leaf((from as usize).into()).await
624 }
625}
626
627impl<Types: NodeType> UpdateAvailabilityStorage<Types>
628 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
629where
630 Payload<Types>: QueryablePayload<Types>,
631 Header<Types>: QueryableHeader<Types>,
632{
633 async fn insert_leaf_with_qc_chain(
634 &mut self,
635 leaf: LeafQueryData<Types>,
636 qc_chain: Option<[CertificatePair<Types>; 2]>,
637 ) -> anyhow::Result<()> {
638 self.inner
639 .leaf_storage
640 .insert(leaf.height() as usize, leaf.clone())?;
641 self.inner
642 .index_by_leaf_hash
643 .insert(leaf.hash(), leaf.height());
644 update_index_by_hash(
645 &mut self.inner.index_by_block_hash,
646 leaf.block_hash(),
647 leaf.height(),
648 );
649 update_index_by_hash(
650 &mut self.inner.index_by_payload_hash,
651 leaf.payload_hash(),
652 leaf.height(),
653 );
654 self.inner
655 .index_by_time
656 .entry(leaf.header().timestamp())
657 .or_default()
658 .push(leaf.height());
659
660 if leaf.height() + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
661 if let Some(qc_chain) = qc_chain {
666 self.inner.latest_qc_chain = Some(qc_chain);
667 } else {
668 self.inner.latest_qc_chain = None;
671 }
672 }
673
674 Ok(())
675 }
676
677 async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
678 if !self
679 .inner
680 .block_storage
681 .insert(block.height() as usize, block.clone())?
682 {
683 return Ok(());
685 }
686 self.inner.num_transactions += block.len();
687 self.inner.payload_size += block.size() as usize;
688 for (_, txn) in block.enumerate() {
689 update_index_by_hash(
690 &mut self.inner.index_by_txn_hash,
691 txn.commit(),
692 block.height(),
693 );
694 }
695 Ok(())
696 }
697
698 async fn insert_vid(
699 &mut self,
700 common: VidCommonQueryData<Types>,
701 share: Option<VidShare>,
702 ) -> anyhow::Result<()> {
703 self.inner
704 .vid_storage
705 .insert(common.height() as usize, (common, share))?;
706 Ok(())
707 }
708}
709
710fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
715 match index.entry(hash) {
716 Entry::Occupied(mut e) => {
717 if &pos < e.get() {
718 e.insert(pos);
720 }
721 },
722 Entry::Vacant(e) => {
723 e.insert(pos);
724 },
725 }
726}
727
728#[async_trait]
729impl<Types, T> NodeStorage<Types> for Transaction<T>
730where
731 Types: NodeType,
732 Payload<Types>: QueryablePayload<Types>,
733 Header<Types>: QueryableHeader<Types>,
734 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
735{
736 async fn block_height(&mut self) -> QueryResult<usize> {
737 Ok(self.inner.leaf_storage.iter().len())
738 }
739
740 async fn count_transactions_in_range(
741 &mut self,
742 range: impl RangeBounds<usize> + Send,
743 namespace: Option<NamespaceId<Types>>,
744 ) -> QueryResult<usize> {
745 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
746 || !matches!(range.end_bound(), Bound::Unbounded)
747 {
748 return Err(QueryError::Error {
749 message: "partial aggregates are not supported with file system backend".into(),
750 });
751 }
752
753 if namespace.is_some() {
754 return Err(QueryError::Error {
755 message: "file system does not support per-namespace stats".into(),
756 });
757 }
758
759 Ok(self.inner.num_transactions)
760 }
761
762 async fn payload_size_in_range(
763 &mut self,
764 range: impl RangeBounds<usize> + Send,
765 namespace: Option<NamespaceId<Types>>,
766 ) -> QueryResult<usize> {
767 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
768 || !matches!(range.end_bound(), Bound::Unbounded)
769 {
770 return Err(QueryError::Error {
771 message: "partial aggregates are not supported with file system backend".into(),
772 });
773 }
774
775 if namespace.is_some() {
776 return Err(QueryError::Error {
777 message: "file system does not support per-namespace stats".into(),
778 });
779 }
780
781 Ok(self.inner.payload_size)
782 }
783
784 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
785 where
786 ID: Into<BlockId<Types>> + Send + Sync,
787 {
788 self.inner
789 .vid_storage
790 .iter()
791 .nth(self.inner.get_block_index(id.into())?)
792 .context(NotFoundSnafu)?
793 .context(MissingSnafu)?
794 .1
795 .context(MissingSnafu)
796 }
797
798 async fn sync_status_for_range(
799 &mut self,
800 start: usize,
801 end: usize,
802 ) -> QueryResult<SyncStatusQueryData> {
803 Ok(SyncStatusQueryData {
804 leaves: self.inner.leaf_storage.sync_status(start, end, |_| false),
805 blocks: self.inner.block_storage.sync_status(start, end, |_| false),
806 vid_common: self.inner.vid_storage.sync_status(start, end, |_| false),
807 vid_shares: self
808 .inner
809 .vid_storage
810 .sync_status(start, end, |(_, share)| share.is_none()),
813 pruned_height: None,
814 })
815 }
816
817 async fn get_header_window(
818 &mut self,
819 start: impl Into<WindowStart<Types>> + Send + Sync,
820 end: u64,
821 limit: usize,
822 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
823 let first_block = match start.into() {
824 WindowStart::Height(h) => h,
825 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
826 WindowStart::Time(t) => {
827 let blocks = self
830 .inner
831 .index_by_time
832 .range(t..)
833 .next()
834 .context(NotFoundSnafu)?
835 .1;
836 blocks[0]
841 },
842 } as usize;
843
844 let mut res = TimeWindowQueryData::default();
845
846 if first_block > 0 {
848 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
849 }
850
851 for block in self.inner.get_block_range(first_block..)? {
854 let header = block?.header().clone();
855 if header.timestamp() >= end {
856 res.next = Some(header);
857 break;
858 }
859 res.window.push(header);
860 if res.window.len() >= limit {
861 break;
862 }
863 }
864
865 Ok(res)
866 }
867
868 async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
869 Ok(self.inner.latest_qc_chain.clone())
870 }
871}
872
873impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
874where
875 Types: NodeType,
876 Header<Types>: QueryableHeader<Types>,
877{
878 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
879 Ok(0)
880 }
881
882 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
883 Ok(None)
884 }
885}
886
887impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
888where
889 Types: NodeType,
890 Header<Types>: QueryableHeader<Types>,
891{
892 async fn update_aggregates(
893 &mut self,
894 _prev: Aggregate<Types>,
895 _blocks: &[PayloadMetadata<Types>],
896 ) -> anyhow::Result<Aggregate<Types>> {
897 Ok(Aggregate::default())
898 }
899}
900
901impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
902
903impl<Types> HasMetrics for FileSystemStorage<Types>
904where
905 Types: NodeType,
906 Header<Types>: QueryableHeader<Types>,
907 Payload<Types>: QueryablePayload<Types>,
908{
909 fn metrics(&self) -> &PrometheusMetrics {
910 &self.metrics
911 }
912}