hotshot_query_service/data_source/storage/
fs.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

#![cfg(feature = "file-system-data-source")]
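
//! File system backed persistent storage for the availability and node query APIs.
//!
//! Data is stored in persistent [LedgerLog]s managed by an [AtomicStore], with in-memory indexes
//! that are rebuilt from the logs when the store is opened.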

use std::{
    collections::{
        hash_map::{Entry, HashMap},
        BTreeMap,
    },
    hash::Hash,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
};

use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
use committable::Committable;
use futures::future::Future;
use hotshot_types::{
    data::{VidCommitment, VidShare},
    traits::{
        block_contents::BlockHeader,
        node_implementation::{ConsensusTime, NodeType},
    },
};
use serde::{de::DeserializeOwned, Serialize};
use snafu::OptionExt;

use super::{
    ledger_log::{Iter, LedgerLog},
    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
    sql::MigrateTypes,
    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
};
use crate::{
    availability::{
        data_source::{BlockId, LeafId},
        query_data::{
            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
            QueryablePayload, TransactionHash, TransactionQueryData, VidCommonQueryData,
        },
        NamespaceId, StateCertQueryData,
    },
    data_source::{update, VersionedDataSource},
    metrics::PrometheusMetrics,
    node::{SyncStatus, TimeWindowQueryData, WindowStart},
    status::HasMetrics,
    types::HeightIndexed,
    ErrorSnafu, Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
};

const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
const CACHED_STATE_CERT_COUNT: usize = 5;

#[derive(custom_debug::Debug)]
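/// In-memory indexes and the persistent [LedgerLog]s backing a [FileSystemStorage].
///
/// The indexes and counters are not persisted directly; they are rebuilt from the logs when the
/// store is opened.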
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    index_by_time: BTreeMap<u64, Vec<u64>>,
    num_transactions: usize,
    payload_size: usize,
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    block_storage: LedgerLog<BlockQueryData<Types>>,
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    state_cert_storage: LedgerLog<StateCertQueryData<Types>>,
}

impl<Types> FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
        match id {
            BlockId::Number(n) => Ok(n),
            BlockId::Hash(h) => {
                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
            },
            BlockId::PayloadHash(h) => {
                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
            },
        }
    }

    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.block_storage
            .iter()
            .nth(self.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.get_block(id).map(|block| block.header)
    }

    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.block_storage.iter(), range).collect())
    }
}

/// Storage for the APIs provided in this crate, backed by the local file system.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    inner: RwLock<FileSystemStorageInner<Types>>,
    metrics: PrometheusMetrics,
}

impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}

#[async_trait]
impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
        Ok(())
    }
}

impl<Types: NodeType> FileSystemStorage<Types>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Create a new [FileSystemStorage] with storage at `path`.
    ///
    /// If there is already data at `path`, it will be archived.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
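    ///
    /// # Example
    ///
    /// A minimal sketch, not compiled as a doctest because it assumes an application-defined
    /// `NodeType` implementation (called `AppTypes` here) and a writable data directory:
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// // Archives any data already at the path; use `open` to resume from existing data.
    /// let storage = FileSystemStorage::<AppTypes>::create(Path::new("data/query")).await?;
    /// ```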
    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::create_with_store(&mut loader).await?;
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Open an existing [FileSystemStorage] from storage at `path`.
    ///
    /// If there is no data at `path`, a new store will be created.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::open_with_store(&mut loader).await?;
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Create a new [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
    /// be archived.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
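    ///
    /// # Example
    ///
    /// A sketch of the loader pattern, not compiled as a doctest; `AppTypes` stands in for an
    /// application-defined `NodeType` and the store name is arbitrary:
    ///
    /// ```ignore
    /// let mut loader = AtomicStoreLoader::create(Path::new("data/query"), "my_store")?;
    /// let storage = FileSystemStorage::<AppTypes>::create_with_store(&mut loader).await?;
    /// // The caller owns the `AtomicStore` and is responsible for committing its versions.
    /// let store = AtomicStore::open(loader)?;
    /// ```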
    pub async fn create_with_store(
        loader: &mut AtomicStoreLoader,
    ) -> Result<Self, PersistenceError> {
        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash: Default::default(),
                index_by_block_hash: Default::default(),
                index_by_payload_hash: Default::default(),
                index_by_txn_hash: Default::default(),
                index_by_time: Default::default(),
                num_transactions: 0,
                payload_size: 0,
                top_storage: None,
                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
                state_cert_storage: LedgerLog::create(
                    loader,
                    "state_cert",
                    CACHED_STATE_CERT_COUNT,
                )?,
            }),
            metrics: Default::default(),
        })
    }

    /// Open an existing [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
    /// store will be created.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
        let leaf_storage =
            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
        let block_storage =
            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
            loader,
            "vid_common",
            CACHED_VID_COMMON_COUNT,
        )?;
        let state_cert_storage = LedgerLog::<StateCertQueryData<Types>>::open(
            loader,
            "state_cert",
            CACHED_STATE_CERT_COUNT,
        )?;

        let mut index_by_block_hash = HashMap::new();
        let mut index_by_payload_hash = HashMap::new();
        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
        let index_by_leaf_hash = leaf_storage
            .iter()
            .flatten()
            .map(|leaf| {
                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
                update_index_by_hash(
                    &mut index_by_payload_hash,
                    leaf.payload_hash(),
                    leaf.height(),
                );
                index_by_time
                    .entry(leaf.header().timestamp())
                    .or_default()
                    .push(leaf.height());
                (leaf.hash(), leaf.height())
            })
            .collect();

        let mut index_by_txn_hash = HashMap::new();
        let mut num_transactions = 0;
        let mut payload_size = 0;
        for block in block_storage.iter().flatten() {
            num_transactions += block.len();
            payload_size += block.size() as usize;

            let height = block.height();
            for (_, txn) in block.enumerate() {
                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
            }
        }

        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash,
                index_by_block_hash,
                index_by_payload_hash,
                index_by_txn_hash,
                index_by_time,
                num_transactions,
                payload_size,
                leaf_storage,
                block_storage,
                vid_storage,
                state_cert_storage,
                top_storage: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Advance the version of the persistent store without committing changes to persistent state.
    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
        let mut inner = self.inner.write().await;
        inner.leaf_storage.skip_version()?;
        inner.block_storage.skip_version()?;
        inner.vid_storage.skip_version()?;
        inner.state_cert_storage.skip_version()?;
        if let Some(store) = &mut inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Get the stored VID share for a given block, if one exists.
    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.vid_share(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common data for a given block, if one exists.
    pub async fn get_vid_common(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let common = tx.get_vid_common(block_id).await?;
        Ok(common)
    }

    /// Get the stored VID common metadata for a given block, if one exists.
    pub async fn get_vid_common_metadata(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let metadata = tx.get_vid_common_metadata(block_id).await?;
        Ok(metadata)
    }
}

pub trait Revert {
    fn revert(&mut self);
}

impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
        self.state_cert_storage.revert_version().unwrap();
    }
}

impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Nothing to revert for a read-only transaction.
    }
}

#[derive(Debug)]
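/// A transaction against a [FileSystemStorage].
///
/// A transaction holds either a shared (read-only) or exclusive (read-write) lock on the
/// underlying storage. Uncommitted changes are reverted when the transaction is dropped.
///
/// # Example
///
/// A sketch of the write path, not compiled as a doctest; `storage` and `leaf` are assumed to
/// come from the application:
///
/// ```ignore
/// let mut tx = storage.write().await?;
/// tx.insert_leaf(leaf).await?;
/// // Nothing is persisted until `commit`; dropping `tx` instead would revert the new version.
/// tx.commit().await?;
/// ```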
pub struct Transaction<T: Revert> {
    inner: T,
}

impl<T: Revert> Drop for Transaction<T> {
    fn drop(&mut self) {
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        self.inner.state_cert_storage.commit_version().await?;
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}

impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(self) -> anyhow::Result<()> {
        // Nothing to commit for a read-only transaction.
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
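
// A write transaction holds the storage `RwLock` exclusively for its lifetime; a read-only
// transaction holds a shared read lock, so multiple reads can proceed concurrently.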

impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Transaction<'a>
        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;
    type ReadOnly<'a>
        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        Ok(Transaction {
            inner: self.inner.write().await,
        })
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        Ok(Transaction {
            inner: self.inner.read().await,
        })
    }
}
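
/// Iterate over the portion of `iter` selected by `range`, yielding an error for any position in
/// the range whose object is not currently available in storage.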
fn range_iter<T>(
    mut iter: Iter<'_, T>,
    range: impl RangeBounds<usize>,
) -> impl '_ + Iterator<Item = QueryResult<T>>
where
    T: Clone + Serialize + DeserializeOwned,
{
    let start = range.start_bound().cloned();
    let end = range.end_bound().cloned();

    // Advance the underlying iterator to the start of the range.
    let pos = match start {
        Bound::Included(n) => {
            if n > 0 {
                iter.nth(n - 1);
            }
            n
        },
        Bound::Excluded(n) => {
            iter.nth(n);
            n + 1
        },
        Bound::Unbounded => 0,
    };

    itertools::unfold((iter, end, pos), |(iter, end, pos)| {
        // Check if we have reached the end of the range.
        let reached_end = match end {
            Bound::Included(n) => pos > n,
            Bound::Excluded(n) => pos >= n,
            Bound::Unbounded => false,
        };
        if reached_end {
            return None;
        }
        let opt = iter.next()?;
        *pos += 1;
        Some(opt.context(MissingSnafu))
    })
}

#[async_trait]
impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
        let n = match id {
            LeafId::Number(n) => n,
            LeafId::Hash(h) => *self
                .inner
                .index_by_leaf_hash
                .get(&h)
                .context(NotFoundSnafu)? as usize,
        };
        self.inner
            .leaf_storage
            .iter()
            .nth(n)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.inner.get_block(id)
    }

    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.inner.get_header(id)
    }

    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
        self.get_block(id).await.map(PayloadQueryData::from)
    }

    async fn get_payload_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<PayloadMetadata<Types>> {
        self.get_block(id).await.map(PayloadMetadata::from)
    }

    async fn get_vid_common(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        Ok(self
            .inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .0)
    }

    async fn get_vid_common_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        self.get_vid_common(id).await.map(VidCommonMetadata::from)
    }

    async fn get_leaf_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
    }

    async fn get_block_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        self.inner.get_block_range(range)
    }

    async fn get_payload_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadQueryData::from))
            .collect())
    }

    async fn get_payload_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadMetadata::from))
            .collect())
    }

    async fn get_vid_common_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common))
            .collect())
    }

    async fn get_vid_common_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common.into()))
            .collect())
    }

    async fn get_transaction(
        &mut self,
        hash: TransactionHash<Types>,
    ) -> QueryResult<TransactionQueryData<Types>> {
        let height = self
            .inner
            .index_by_txn_hash
            .get(&hash)
            .context(NotFoundSnafu)?;
        let block = self.inner.get_block((*height as usize).into())?;
        TransactionQueryData::with_hash(&block, hash).context(ErrorSnafu {
            message: format!(
                "transaction index inconsistent: block {height} contains no transaction {hash}"
            ),
        })
    }

    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
        // The file system backend doesn't index by whether a leaf is present, so we can't
        // efficiently seek to the first leaf with height >= `from`. Our best effort is to return
        // `from` itself if we can, or fail.
        self.get_leaf((from as usize).into()).await
    }

    async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryData<Types>> {
        self.inner
            .state_cert_storage
            .iter()
            .nth(epoch as usize)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }
}

impl<Types: NodeType> UpdateAvailabilityStorage<Types>
    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
        self.inner
            .leaf_storage
            .insert(leaf.height() as usize, leaf.clone())?;
        self.inner
            .index_by_leaf_hash
            .insert(leaf.hash(), leaf.height());
        update_index_by_hash(
            &mut self.inner.index_by_block_hash,
            leaf.block_hash(),
            leaf.height(),
        );
        update_index_by_hash(
            &mut self.inner.index_by_payload_hash,
            leaf.payload_hash(),
            leaf.height(),
        );
        self.inner
            .index_by_time
            .entry(leaf.header().timestamp())
            .or_default()
            .push(leaf.height());
        Ok(())
    }

    async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
        if !self
            .inner
            .block_storage
            .insert(block.height() as usize, block.clone())?
        {
            // The block was already present.
            return Ok(());
        }
        self.inner.num_transactions += block.len();
        self.inner.payload_size += block.size() as usize;
        for (_, txn) in block.enumerate() {
            update_index_by_hash(
                &mut self.inner.index_by_txn_hash,
                txn.commit(),
                block.height(),
            );
        }
        Ok(())
    }

    async fn insert_vid(
        &mut self,
        common: VidCommonQueryData<Types>,
        share: Option<VidShare>,
    ) -> anyhow::Result<()> {
        self.inner
            .vid_storage
            .insert(common.height() as usize, (common, share))?;
        Ok(())
    }

    async fn insert_state_cert(
        &mut self,
        state_cert: StateCertQueryData<Types>,
    ) -> anyhow::Result<()> {
        self.inner
            .state_cert_storage
            .insert(state_cert.0.epoch.u64() as usize, state_cert)?;
        Ok(())
    }
}

/// Update an index mapping hashes of objects to their positions in the ledger.
///
/// This function will insert the mapping from `hash` to `pos` into `index`, _unless_ there is
/// already an entry for `hash` at an earlier position in the ledger.
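///
/// # Example
///
/// An illustrative sketch (not run as a doctest, since this helper is private):
///
/// ```ignore
/// let mut index = HashMap::new();
/// update_index_by_hash(&mut index, "hash", 5u64);
/// update_index_by_hash(&mut index, "hash", 9u64); // ignored: height 5 was sequenced first
/// update_index_by_hash(&mut index, "hash", 2u64); // overwrites: height 2 is even earlier
/// assert_eq!(index["hash"], 2);
/// ```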
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Occupied(mut e) => {
            if &pos < e.get() {
                // Overwrite the existing entry if the new object was sequenced first.
                e.insert(pos);
            }
        },
        Entry::Vacant(e) => {
            e.insert(pos);
        },
    }
}

#[async_trait]
impl<Types, T> NodeStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send,
{
    async fn block_height(&mut self) -> QueryResult<usize> {
        Ok(self.inner.leaf_storage.iter().len())
    }

    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.num_transactions)
    }

    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.payload_size)
    }

    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id.into())?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .1
            .context(MissingSnafu)
    }

    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
        let height = self.inner.leaf_storage.iter().len();

        // The number of missing VID common is just the number of completely missing VID
        // entries, since every entry we have is guaranteed to have the common data.
        let missing_vid = self.inner.vid_storage.missing(height);
        // Missing shares include the completely missing VID entries, plus any entry which
        // is _not_ missing but which has a null share.
        let null_vid_shares: usize = self
            .inner
            .vid_storage
            .iter()
            .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
            .sum();
        Ok(SyncStatus {
            missing_blocks: self.inner.block_storage.missing(height),
            missing_leaves: self.inner.leaf_storage.missing(height),
            missing_vid_common: missing_vid,
            missing_vid_shares: missing_vid + null_vid_shares,
            pruned_height: None,
        })
    }

    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let first_block = match start.into() {
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
            WindowStart::Time(t) => {
                // Find the minimum timestamp which is at least `t`, and all the blocks with
                // that timestamp.
                let blocks = self
                    .inner
                    .index_by_time
                    .range(t..)
                    .next()
                    .context(NotFoundSnafu)?
                    .1;
                // Multiple blocks can have the same timestamp (when truncated to seconds);
                // we want the first one. It is an invariant that any timestamp which has an
                // entry in `index_by_time` has a non-empty list associated with it, so this
                // indexing is safe.
                blocks[0]
            },
        } as usize;

        let mut res = TimeWindowQueryData::default();

        // Include the block just before the start of the window, if there is one.
        if first_block > 0 {
            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
        }

        // Add blocks to the window, starting from `first_block`, until we reach the end of
        // the requested time window.
        for block in self.inner.get_block_range(first_block..)? {
            let header = block?.header().clone();
            if header.timestamp() >= end {
                res.next = Some(header);
                break;
            }
            res.window.push(header);
            if res.window.len() >= limit {
                break;
            }
        }

        Ok(res)
    }
}
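
// The file system backend does not maintain incremental aggregates: these impls report an empty
// aggregate history, and the supported statistics queries are answered from the counters kept in
// `FileSystemStorageInner` instead.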

impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}

impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}

impl<T: Revert> PrunedHeightStorage for Transaction<T> {}

impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}