hotshot_query_service/data_source/storage/
fs.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{
16    collections::{
17        hash_map::{Entry, HashMap},
18        BTreeMap,
19    },
20    hash::Hash,
21    ops::{Bound, Deref, RangeBounds},
22    path::Path,
23};
24
25use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use async_trait::async_trait;
27use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
28use committable::Committable;
29use futures::future::Future;
30use hotshot_types::{
31    data::{VidCommitment, VidShare},
32    traits::{
33        block_contents::BlockHeader,
34        node_implementation::{ConsensusTime, NodeType},
35    },
36};
37use serde::{de::DeserializeOwned, Serialize};
38use snafu::OptionExt;
39
40use super::{
41    ledger_log::{Iter, LedgerLog},
42    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
43    sql::MigrateTypes,
44    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
45    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
46};
47use crate::{
48    availability::{
49        data_source::{BlockId, LeafId},
50        query_data::{
51            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52            QueryablePayload, TransactionHash, VidCommonQueryData,
53        },
54        NamespaceId, StateCertQueryDataV2,
55    },
56    data_source::{update, VersionedDataSource},
57    metrics::PrometheusMetrics,
58    node::{SyncStatus, TimeWindowQueryData, WindowStart},
59    status::HasMetrics,
60    types::HeightIndexed,
61    Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
62};
63
// Cache sizes handed to each [`LedgerLog`] when it is created or opened (see
// `create_with_store` / `open_with_store` below).
const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
// State certs are fetched far less often, so keep only a handful cached.
const CACHED_STATE_CERT_COUNT: usize = 5;
68
/// In-memory indexes plus the persistent [`LedgerLog`]s backing a [`FileSystemStorage`].
///
/// The `index_by_*` maps are rebuilt from the persisted logs on startup (see
/// [`FileSystemStorage::open_with_store`]) and updated on every insert.
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Maps from various object identifiers to the block height where the object lives.
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    // Maps a header timestamp to the heights of all blocks carrying that timestamp.
    index_by_time: BTreeMap<u64, Vec<u64>>,
    // Running totals over all stored blocks, maintained by `insert_block`.
    num_transactions: usize,
    payload_size: usize,
    // Present only when this storage manages its own persistence synchronization (via
    // `create`/`open`); `None` when the caller owns the [AtomicStore] (`*_with_store`).
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    block_storage: LedgerLog<BlockQueryData<Types>>,
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    state_cert_storage: LedgerLog<StateCertQueryDataV2<Types>>,
}
90
91impl<Types> FileSystemStorageInner<Types>
92where
93    Types: NodeType,
94    Header<Types>: QueryableHeader<Types>,
95    Payload<Types>: QueryablePayload<Types>,
96{
97    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
98        match id {
99            BlockId::Number(n) => Ok(n),
100            BlockId::Hash(h) => {
101                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
102            },
103            BlockId::PayloadHash(h) => {
104                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
105            },
106        }
107    }
108
109    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
110        self.block_storage
111            .iter()
112            .nth(self.get_block_index(id)?)
113            .context(NotFoundSnafu)?
114            .context(MissingSnafu)
115    }
116
117    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
118        self.get_block(id).map(|block| block.header)
119    }
120
121    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
122    where
123        R: RangeBounds<usize> + Send,
124    {
125        Ok(range_iter(self.block_storage.iter(), range).collect())
126    }
127}
128
/// Storage for the APIs provided in this crate, backed by the local file system.
///
/// Data is persisted through [`LedgerLog`]s registered with an [`AtomicStore`], with in-memory
/// indexes (in [`FileSystemStorageInner`]) for hash- and time-based lookups.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    inner: RwLock<FileSystemStorageInner<Types>>,
    metrics: PrometheusMetrics,
}
139
// The file system backend does not support pruning; use the default (no-op) pruner
// configuration.
impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
// Pruning is a no-op for the file system backend; the unit pruner satisfies the trait.
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}
153
#[async_trait]
impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Type migration is a no-op for the file system backend.
    async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
        Ok(())
    }
}
164
impl<Types: NodeType> FileSystemStorage<Types>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Create a new [FileSystemStorage] with storage at `path`.
    ///
    /// If there is already data at `path`, it will be archived.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::create_with_store(&mut loader).await?;
        // Since this storage manages its own synchronization, it owns the top-level
        // [AtomicStore] as well.
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Open an existing [FileSystemStorage] from storage at `path`.
    ///
    /// If there is no data at `path`, a new store will be created.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::open_with_store(&mut loader).await?;
        // Since this storage manages its own synchronization, it owns the top-level
        // [AtomicStore] as well.
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Create a new [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
    /// be archived.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn create_with_store(
        loader: &mut AtomicStoreLoader,
    ) -> Result<Self, PersistenceError> {
        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                // All indexes and aggregate counters start empty; they are populated as
                // objects are inserted.
                index_by_leaf_hash: Default::default(),
                index_by_block_hash: Default::default(),
                index_by_payload_hash: Default::default(),
                index_by_txn_hash: Default::default(),
                index_by_time: Default::default(),
                num_transactions: 0,
                payload_size: 0,
                // `None` until the caller (or `create`) opens the top-level store.
                top_storage: None,
                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
                state_cert_storage: LedgerLog::create(
                    loader,
                    "state_cert",
                    CACHED_STATE_CERT_COUNT,
                )?,
            }),
            metrics: Default::default(),
        })
    }

    /// Open an existing [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
    /// store will be created.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
        let leaf_storage =
            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
        let block_storage =
            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
            loader,
            "vid_common",
            CACHED_VID_COMMON_COUNT,
        )?;
        let state_cert_storage = LedgerLog::<StateCertQueryDataV2<Types>>::open(
            loader,
            "state_cert",
            CACHED_STATE_CERT_COUNT,
        )?;

        // Rebuild the in-memory indexes by scanning the persisted leaves. Entries missing
        // from storage (`None` in the log) are skipped by `flatten`.
        let mut index_by_block_hash = HashMap::new();
        let mut index_by_payload_hash = HashMap::new();
        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
        let index_by_leaf_hash = leaf_storage
            .iter()
            .flatten()
            .map(|leaf| {
                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
                update_index_by_hash(
                    &mut index_by_payload_hash,
                    leaf.payload_hash(),
                    leaf.height(),
                );
                index_by_time
                    .entry(leaf.header().timestamp())
                    .or_default()
                    .push(leaf.height());
                (leaf.hash(), leaf.height())
            })
            .collect();

        // Rebuild the transaction index and the aggregate statistics from the persisted
        // blocks, again skipping missing entries.
        let mut index_by_txn_hash = HashMap::new();
        let mut num_transactions = 0;
        let mut payload_size = 0;
        for block in block_storage.iter().flatten() {
            num_transactions += block.len();
            payload_size += block.size() as usize;

            let height = block.height();
            for (_, txn) in block.enumerate() {
                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
            }
        }

        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash,
                index_by_block_hash,
                index_by_payload_hash,
                index_by_txn_hash,
                index_by_time,
                num_transactions,
                payload_size,
                leaf_storage,
                block_storage,
                vid_storage,
                state_cert_storage,
                top_storage: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Advance the version of the persistent store without committing changes to persistent state.
    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
        let mut inner = self.inner.write().await;
        inner.leaf_storage.skip_version()?;
        inner.block_storage.skip_version()?;
        inner.vid_storage.skip_version()?;
        inner.state_cert_storage.skip_version()?;
        // If we own the top-level store, advance its version as well.
        if let Some(store) = &mut inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Get the stored VID share for a given block, if one exists.
    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
        // Open a short-lived read transaction and delegate to the `NodeStorage` impl.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.vid_share(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common data for a given block, if one exists.
    pub async fn get_vid_common(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        // Open a short-lived read transaction and delegate to the `AvailabilityStorage` impl.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common metadata for a given block, if one exists.
    pub async fn get_vid_common_metadata(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        // Open a short-lived read transaction and delegate to the `AvailabilityStorage` impl.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common_metadata(block_id).await?;
        Ok(share)
    }
}
353
/// Roll back any uncommitted changes held by a transaction's lock guard.
pub trait Revert {
    fn revert(&mut self);
}
357
impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Roll every [`LedgerLog`] back to its last committed version, discarding uncommitted
    /// inserts.
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
        self.state_cert_storage.revert_version().unwrap();
    }
}
371
// Read guards cannot have made changes, so reverting is trivially a no-op.
impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Nothing to revert for a read-only transaction.
    }
}
382
/// A transaction over a [`FileSystemStorage`], wrapping a read or write lock guard.
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    inner: T,
}
387
impl<T: Revert> Drop for Transaction<T> {
    fn drop(&mut self) {
        // Roll back any changes that were not committed. NOTE(review): this also runs after
        // `commit` consumes `self`; presumably `revert_version` finds nothing uncommitted at
        // that point — confirm in `LedgerLog`.
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Commit the current version of every [`LedgerLog`], then the top-level [`AtomicStore`]
    /// (only present when this storage manages its own synchronization).
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        self.inner.state_cert_storage.commit_version().await?;
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
415
impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(self) -> anyhow::Result<()> {
        // Nothing to commit for a read-only transaction.
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
432
impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // A read-write transaction holds the write lock on the inner state; a read-only
    // transaction holds the read lock.
    type Transaction<'a>
        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;
    type ReadOnly<'a>
        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;

    /// Begin a read-write transaction, waiting for exclusive access to the inner state.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        Ok(Transaction {
            inner: self.inner.write().await,
        })
    }

    /// Begin a read-only transaction, waiting for shared access to the inner state.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        Ok(Transaction {
            inner: self.inner.read().await,
        })
    }
}
459fn range_iter<T>(
460    mut iter: Iter<'_, T>,
461    range: impl RangeBounds<usize>,
462) -> impl '_ + Iterator<Item = QueryResult<T>>
463where
464    T: Clone + Serialize + DeserializeOwned,
465{
466    let start = range.start_bound().cloned();
467    let end = range.end_bound().cloned();
468
469    // Advance the underlying iterator to the start of the range.
470    let pos = match start {
471        Bound::Included(n) => {
472            if n > 0 {
473                iter.nth(n - 1);
474            }
475            n
476        },
477        Bound::Excluded(n) => {
478            iter.nth(n);
479            n + 1
480        },
481        Bound::Unbounded => 0,
482    };
483
484    itertools::unfold((iter, end, pos), |(iter, end, pos)| {
485        // Check if we have reached the end of the range.
486        let reached_end = match end {
487            Bound::Included(n) => pos > n,
488            Bound::Excluded(n) => pos >= n,
489            Bound::Unbounded => false,
490        };
491        if reached_end {
492            return None;
493        }
494        let opt = iter.next()?;
495        *pos += 1;
496        Some(opt.context(MissingSnafu))
497    })
498}
499
#[async_trait]
impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
        // Resolve the leaf ID to a height, either directly or via the leaf hash index.
        let n = match id {
            LeafId::Number(n) => n,
            LeafId::Hash(h) => *self
                .inner
                .index_by_leaf_hash
                .get(&h)
                .context(NotFoundSnafu)? as usize,
        };
        self.inner
            .leaf_storage
            .iter()
            .nth(n)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.inner.get_block(id)
    }

    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.inner.get_header(id)
    }

    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
        // Payloads are not stored separately; derive from the full block.
        self.get_block(id).await.map(PayloadQueryData::from)
    }

    async fn get_payload_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<PayloadMetadata<Types>> {
        self.get_block(id).await.map(PayloadMetadata::from)
    }

    async fn get_vid_common(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        // VID entries are stored as `(common, share)`; return only the common component.
        Ok(self
            .inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .0)
    }

    async fn get_vid_common_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        self.get_vid_common(id).await.map(VidCommonMetadata::from)
    }

    async fn get_leaf_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
    }

    async fn get_block_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        self.inner.get_block_range(range)
    }

    async fn get_payload_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadQueryData::from))
            .collect())
    }

    async fn get_payload_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadMetadata::from))
            .collect())
    }

    async fn get_vid_common_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common))
            .collect())
    }

    async fn get_vid_common_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common.into()))
            .collect())
    }

    async fn get_block_with_transaction(
        &mut self,
        hash: TransactionHash<Types>,
    ) -> QueryResult<BlockQueryData<Types>> {
        // The transaction index maps a hash to the height of the first block containing it.
        let height = self
            .inner
            .index_by_txn_hash
            .get(&hash)
            .context(NotFoundSnafu)?;
        self.inner.get_block((*height as usize).into())
    }

    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
        // The file system backend doesn't index by whether a leaf is present, so we can't
        // efficiently seek to the first leaf with height >= `from`. Our best effort is to return
        // `from` itself if we can, or fail.
        self.get_leaf((from as usize).into()).await
    }

    async fn get_state_cert(&mut self, epoch: u64) -> QueryResult<StateCertQueryDataV2<Types>> {
        // State certs are logged by epoch, so the epoch number doubles as the log position.
        self.inner
            .state_cert_storage
            .iter()
            .nth(epoch as usize)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }
}
661
impl<Types: NodeType> UpdateAvailabilityStorage<Types>
    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Insert a leaf at its height and update all leaf-derived in-memory indexes.
    async fn insert_leaf(&mut self, leaf: LeafQueryData<Types>) -> anyhow::Result<()> {
        self.inner
            .leaf_storage
            .insert(leaf.height() as usize, leaf.clone())?;
        self.inner
            .index_by_leaf_hash
            .insert(leaf.hash(), leaf.height());
        update_index_by_hash(
            &mut self.inner.index_by_block_hash,
            leaf.block_hash(),
            leaf.height(),
        );
        update_index_by_hash(
            &mut self.inner.index_by_payload_hash,
            leaf.payload_hash(),
            leaf.height(),
        );
        self.inner
            .index_by_time
            .entry(leaf.header().timestamp())
            .or_default()
            .push(leaf.height());
        Ok(())
    }

    /// Insert a block at its height, updating the transaction index and aggregate counters
    /// only when the block was newly inserted.
    async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
        if !self
            .inner
            .block_storage
            .insert(block.height() as usize, block.clone())?
        {
            // The block was already present.
            return Ok(());
        }
        // Only reached for a new block, so the totals are never double-counted.
        self.inner.num_transactions += block.len();
        self.inner.payload_size += block.size() as usize;
        for (_, txn) in block.enumerate() {
            update_index_by_hash(
                &mut self.inner.index_by_txn_hash,
                txn.commit(),
                block.height(),
            );
        }
        Ok(())
    }

    /// Insert VID common data (and optionally this node's share) at the block's height.
    async fn insert_vid(
        &mut self,
        common: VidCommonQueryData<Types>,
        share: Option<VidShare>,
    ) -> anyhow::Result<()> {
        self.inner
            .vid_storage
            .insert(common.height() as usize, (common, share))?;
        Ok(())
    }

    /// Insert a state cert, positioned in the log by its epoch number.
    async fn insert_state_cert(
        &mut self,
        state_cert: StateCertQueryDataV2<Types>,
    ) -> anyhow::Result<()> {
        self.inner
            .state_cert_storage
            .insert(state_cert.0.epoch.u64() as usize, state_cert)?;
        Ok(())
    }
}
735
/// Update an index mapping hashes of objects to their positions in the ledger.
///
/// Inserts the mapping from `hash` to `pos` into `index`. If `hash` is already indexed, the
/// entry is only overwritten when `pos` is earlier in the ledger, so the index always points
/// at the first occurrence of each hash.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Vacant(slot) => {
            slot.insert(pos);
        },
        Entry::Occupied(mut slot) => {
            let existing = slot.get_mut();
            if pos < *existing {
                // The new object was sequenced first; keep the earlier position.
                *existing = pos;
            }
        },
    }
}
753
#[async_trait]
impl<Types, T> NodeStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send,
{
    async fn block_height(&mut self) -> QueryResult<usize> {
        // The leaf log has one entry slot per block height, so its length is the height.
        Ok(self.inner.leaf_storage.iter().len())
    }

    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Only a single global counter is maintained, so only the full range (`..` or `0..`)
        // can be answered.
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.num_transactions)
    }

    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Same restriction as `count_transactions_in_range`: only the full, un-namespaced
        // total is tracked.
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.payload_size)
    }

    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // VID entries are `(common, Option<share>)`; a present entry may still lack a share.
        self.inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id.into())?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .1
            .context(MissingSnafu)
    }

    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
        let height = self.inner.leaf_storage.iter().len();

        // The number of missing VID common is just the number of completely missing VID
        // entries, since every entry we have is guaranteed to have the common data.
        let missing_vid = self.inner.vid_storage.missing(height);
        // Missing shares includes the completely missing VID entries, plus any entry which
        // is _not_ missing but which has a null share.
        let null_vid_shares: usize = self
            .inner
            .vid_storage
            .iter()
            .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
            .sum();
        Ok(SyncStatus {
            missing_blocks: self.inner.block_storage.missing(height),
            missing_leaves: self.inner.leaf_storage.missing(height),
            missing_vid_common: missing_vid,
            missing_vid_shares: missing_vid + null_vid_shares,
            // The file system backend never prunes.
            pruned_height: None,
        })
    }

    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        // Resolve the window start to a concrete block height.
        let first_block = match start.into() {
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
            WindowStart::Time(t) => {
                // Find the minimum timestamp which is at least `t`, and all the blocks with
                // that timestamp.
                let blocks = self
                    .inner
                    .index_by_time
                    .range(t..)
                    .next()
                    .context(NotFoundSnafu)?
                    .1;
                // Multiple blocks can have the same timestamp (when truncated to seconds);
                // we want the first one. It is an invariant that any timestamp which has an
                // entry in `index_by_time` has a non-empty list associated with it, so this
                // indexing is safe.
                blocks[0]
            },
        } as usize;

        let mut res = TimeWindowQueryData::default();

        // Include the block just before the start of the window, if there is one.
        if first_block > 0 {
            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
        }

        // Add blocks to the window, starting from `first_block`, until we reach the end of
        // the requested time window.
        for block in self.inner.get_block_range(first_block..)? {
            let header = block?.header().clone();
            if header.timestamp() >= end {
                res.next = Some(header);
                break;
            }
            res.window.push(header);
            if res.window.len() >= limit {
                break;
            }
        }

        Ok(res)
    }
}
898
// The file system backend does not maintain incremental aggregates, so the aggregator is
// permanently at height 0 with no previous aggregate.
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}
912
// Updating aggregates is a no-op for the file system backend; a default aggregate is
// returned without persisting anything.
impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}
926
// The file system backend never prunes, so the trait's default implementation suffices.
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
928
impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Expose the Prometheus metrics registry owned by this storage.
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}