hotshot_query_service/data_source/storage/fs.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

#![cfg(feature = "file-system-data-source")]

use std::{
    collections::{
        hash_map::{Entry, HashMap},
        BTreeMap,
    },
    hash::Hash,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
};

use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
use committable::Committable;
use futures::future::Future;
use hotshot_types::{
    data::{VidCommitment, VidShare},
    simple_certificate::CertificatePair,
    traits::{block_contents::BlockHeader, node_implementation::NodeType},
};
use serde::{de::DeserializeOwned, Serialize};
use snafu::OptionExt;

use super::{
    ledger_log::{Iter, LedgerLog},
    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
    sql::MigrateTypes,
    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
};
use crate::{
    availability::{
        data_source::{BlockId, LeafId},
        query_data::{
            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
            QueryablePayload, TransactionHash, VidCommonQueryData,
        },
        NamespaceId,
    },
    data_source::{update, VersionedDataSource},
    metrics::PrometheusMetrics,
    node::{SyncStatus, TimeWindowQueryData, WindowStart},
    status::HasMetrics,
    types::HeightIndexed,
    Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
};

const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;

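/// The state behind a [FileSystemStorage]: persistent [LedgerLog]s of leaves, blocks, and VID
/// data, plus in-memory indexes which are rebuilt from those logs when the store is opened.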
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    index_by_time: BTreeMap<u64, Vec<u64>>,
    num_transactions: usize,
    payload_size: usize,
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    block_storage: LedgerLog<BlockQueryData<Types>>,
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
}

impl<Types> FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
        match id {
            BlockId::Number(n) => Ok(n),
            BlockId::Hash(h) => {
                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
            },
            BlockId::PayloadHash(h) => {
                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
            },
        }
    }

    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.block_storage
            .iter()
            .nth(self.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.get_block(id).map(|block| block.header)
    }

    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.block_storage.iter(), range).collect())
    }
}

/// Storage for the APIs provided in this crate, backed by the local file system.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    inner: RwLock<FileSystemStorageInner<Types>>,
    metrics: PrometheusMetrics,
}

impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}

#[async_trait]
impl<Types: NodeType> MigrateTypes<Types> for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn migrate_types(&self, _batch_size: u64) -> anyhow::Result<()> {
        Ok(())
    }
}

impl<Types: NodeType> FileSystemStorage<Types>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Create a new [FileSystemStorage] with storage at `path`.
    ///
    /// If there is already data at `path`, it will be archived.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
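    ///
    /// # Example
    ///
    /// A minimal sketch of standalone usage. `MyTypes` stands in for an application-defined
    /// [NodeType] satisfying the header and payload bounds; it is not part of this crate.
    ///
    /// ```ignore
    /// let storage = FileSystemStorage::<MyTypes>::create(Path::new("./query-storage")).await?;
    /// ```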
    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::create_with_store(&mut loader).await?;
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Open an existing [FileSystemStorage] from storage at `path`.
    ///
    /// If there is no data at `path`, a new store will be created.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::open_with_store(&mut loader).await?;
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Create a new [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
    /// be archived.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
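    ///
    /// # Example
    ///
    /// A minimal sketch of the caller-managed flow, assuming a hypothetical `MyTypes` [NodeType]
    /// and a `path` chosen by the caller (typically shared with other persistent resources):
    ///
    /// ```ignore
    /// let mut loader = AtomicStoreLoader::create(path, "my_store")?;
    /// let storage = FileSystemStorage::<MyTypes>::create_with_store(&mut loader).await?;
    /// // The caller owns the resulting [AtomicStore] and must commit its versions.
    /// let store = AtomicStore::open(loader)?;
    /// ```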
    pub async fn create_with_store(
        loader: &mut AtomicStoreLoader,
    ) -> Result<Self, PersistenceError> {
        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash: Default::default(),
                index_by_block_hash: Default::default(),
                index_by_payload_hash: Default::default(),
                index_by_txn_hash: Default::default(),
                index_by_time: Default::default(),
                num_transactions: 0,
                payload_size: 0,
                top_storage: None,
                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Open an existing [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
    /// store will be created.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
        let leaf_storage =
            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
        let block_storage =
            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
            loader,
            "vid_common",
            CACHED_VID_COMMON_COUNT,
        )?;

        let mut index_by_block_hash = HashMap::new();
        let mut index_by_payload_hash = HashMap::new();
        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
        let index_by_leaf_hash = leaf_storage
            .iter()
            .flatten()
            .map(|leaf| {
                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
                update_index_by_hash(
                    &mut index_by_payload_hash,
                    leaf.payload_hash(),
                    leaf.height(),
                );
                index_by_time
                    .entry(leaf.header().timestamp())
                    .or_default()
                    .push(leaf.height());
                (leaf.hash(), leaf.height())
            })
            .collect();

        let mut index_by_txn_hash = HashMap::new();
        let mut num_transactions = 0;
        let mut payload_size = 0;
        for block in block_storage.iter().flatten() {
            num_transactions += block.len();
            payload_size += block.size() as usize;

            let height = block.height();
            for (_, txn) in block.enumerate() {
                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
            }
        }

        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash,
                index_by_block_hash,
                index_by_payload_hash,
                index_by_txn_hash,
                index_by_time,
                num_transactions,
                payload_size,
                leaf_storage,
                block_storage,
                vid_storage,
                top_storage: None,
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Advance the version of the persistent store without committing changes to persistent state.
    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
        let mut inner = self.inner.write().await;
        inner.leaf_storage.skip_version()?;
        inner.block_storage.skip_version()?;
        inner.vid_storage.skip_version()?;
        if let Some(store) = &mut inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Get the stored VID share for a given block, if one exists.
    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.vid_share(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common data for a given block, if one exists.
    pub async fn get_vid_common(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common metadata for a given block, if one exists.
    pub async fn get_vid_common_metadata(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common_metadata(block_id).await?;
        Ok(share)
    }
}

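/// Roll back uncommitted changes held by a storage lock guard.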
pub trait Revert {
    fn revert(&mut self);
}

impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
    }
}

impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Nothing to revert for a read-only transaction.
    }
}

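/// A storage transaction backed by a lock guard on [FileSystemStorageInner].
///
/// Dropping a [Transaction] calls [Revert::revert] on the underlying guard, so a write
/// transaction which is never committed is rolled back automatically.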
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    inner: T,
}

impl<T: Revert> Drop for Transaction<T> {
    fn drop(&mut self) {
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}

impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(self) -> anyhow::Result<()> {
        // Nothing to commit for a read-only transaction.
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}

impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Transaction<'a>
        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;
    type ReadOnly<'a>
        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        Ok(Transaction {
            inner: self.inner.write().await,
        })
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        Ok(Transaction {
            inner: self.inner.read().await,
        })
    }
}
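
/// Iterate over the entries of a [LedgerLog], restricted to `range`.
///
/// Positions inside the range for which no object is available locally are yielded as
/// missing-data errors; iteration stops at the end of the range or the end of the log.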
fn range_iter<T>(
    mut iter: Iter<'_, T>,
    range: impl RangeBounds<usize>,
) -> impl '_ + Iterator<Item = QueryResult<T>>
where
    T: Clone + Serialize + DeserializeOwned,
{
    let start = range.start_bound().cloned();
    let end = range.end_bound().cloned();

    // Advance the underlying iterator to the start of the range.
    let pos = match start {
        Bound::Included(n) => {
            if n > 0 {
                iter.nth(n - 1);
            }
            n
        },
        Bound::Excluded(n) => {
            iter.nth(n);
            n + 1
        },
        Bound::Unbounded => 0,
    };

    itertools::unfold((iter, end, pos), |(iter, end, pos)| {
        // Check if we have reached the end of the range.
        let reached_end = match end {
            Bound::Included(n) => pos > n,
            Bound::Excluded(n) => pos >= n,
            Bound::Unbounded => false,
        };
        if reached_end {
            return None;
        }
        let opt = iter.next()?;
        *pos += 1;
        Some(opt.context(MissingSnafu))
    })
}

#[async_trait]
impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
        let n = match id {
            LeafId::Number(n) => n,
            LeafId::Hash(h) => *self
                .inner
                .index_by_leaf_hash
                .get(&h)
                .context(NotFoundSnafu)? as usize,
        };
        self.inner
            .leaf_storage
            .iter()
            .nth(n)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.inner.get_block(id)
    }

    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.inner.get_header(id)
    }

    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
        self.get_block(id).await.map(PayloadQueryData::from)
    }

    async fn get_payload_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<PayloadMetadata<Types>> {
        self.get_block(id).await.map(PayloadMetadata::from)
    }

    async fn get_vid_common(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        Ok(self
            .inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .0)
    }

    async fn get_vid_common_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        self.get_vid_common(id).await.map(VidCommonMetadata::from)
    }

    async fn get_leaf_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
    }

    async fn get_block_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        self.inner.get_block_range(range)
    }

    async fn get_payload_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadQueryData::from))
            .collect())
    }

    async fn get_payload_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadMetadata::from))
            .collect())
    }

    async fn get_vid_common_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common))
            .collect())
    }

    async fn get_vid_common_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common.into()))
            .collect())
    }

    async fn get_block_with_transaction(
        &mut self,
        hash: TransactionHash<Types>,
    ) -> QueryResult<BlockQueryData<Types>> {
        let height = self
            .inner
            .index_by_txn_hash
            .get(&hash)
            .context(NotFoundSnafu)?;
        self.inner.get_block((*height as usize).into())
    }

    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
        // The file system backend doesn't index by whether a leaf is present, so we can't
        // efficiently seek to the first leaf with height >= `from`. Our best effort is to return
        // `from` itself if we can, or fail.
        self.get_leaf((from as usize).into()).await
    }
}

impl<Types: NodeType> UpdateAvailabilityStorage<Types>
    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    async fn insert_leaf_with_qc_chain(
        &mut self,
        leaf: LeafQueryData<Types>,
        qc_chain: Option<[CertificatePair<Types>; 2]>,
    ) -> anyhow::Result<()> {
        self.inner
            .leaf_storage
            .insert(leaf.height() as usize, leaf.clone())?;
        self.inner
            .index_by_leaf_hash
            .insert(leaf.hash(), leaf.height());
        update_index_by_hash(
            &mut self.inner.index_by_block_hash,
            leaf.block_hash(),
            leaf.height(),
        );
        update_index_by_hash(
            &mut self.inner.index_by_payload_hash,
            leaf.payload_hash(),
            leaf.height(),
        );
        self.inner
            .index_by_time
            .entry(leaf.header().timestamp())
            .or_default()
            .push(leaf.height());

        if leaf.height() + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
            // If this is the latest leaf we know about, also store its QC chain so that we can
            // prove to clients that this leaf is finalized. (If it is not the latest leaf, this
            // is unnecessary, since we can prove it is an ancestor of some later, finalized
            // leaf.)
            if let Some(qc_chain) = qc_chain {
                self.inner.latest_qc_chain = Some(qc_chain);
            } else {
                // Since we have a new latest leaf, we have to update the latest QC chain even if
                // we don't actually have a QC chain to store.
                self.inner.latest_qc_chain = None;
            }
        }

        Ok(())
    }

    async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
        if !self
            .inner
            .block_storage
            .insert(block.height() as usize, block.clone())?
        {
            // The block was already present.
            return Ok(());
        }
        self.inner.num_transactions += block.len();
        self.inner.payload_size += block.size() as usize;
        for (_, txn) in block.enumerate() {
            update_index_by_hash(
                &mut self.inner.index_by_txn_hash,
                txn.commit(),
                block.height(),
            );
        }
        Ok(())
    }

    async fn insert_vid(
        &mut self,
        common: VidCommonQueryData<Types>,
        share: Option<VidShare>,
    ) -> anyhow::Result<()> {
        self.inner
            .vid_storage
            .insert(common.height() as usize, (common, share))?;
        Ok(())
    }
}

/// Update an index mapping hashes of objects to their positions in the ledger.
///
/// This function will insert the mapping from `hash` to `pos` into `index`, _unless_ there is
/// already an entry for `hash` at an earlier position in the ledger.
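///
/// For example, if `index` already maps a hash `h` to position 5, calling this with `(h, 3)`
/// lowers the entry to 3, while calling it with `(h, 7)` leaves the entry at 5.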
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Occupied(mut e) => {
            if &pos < e.get() {
                // Overwrite the existing entry if the new object was sequenced first.
                e.insert(pos);
            }
        },
        Entry::Vacant(e) => {
            e.insert(pos);
        },
    }
}

#[async_trait]
impl<Types, T> NodeStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    async fn block_height(&mut self) -> QueryResult<usize> {
        Ok(self.inner.leaf_storage.iter().len())
    }

    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.num_transactions)
    }

    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.payload_size)
    }

    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id.into())?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .1
            .context(MissingSnafu)
    }

    async fn sync_status(&mut self) -> QueryResult<SyncStatus> {
        let height = self.inner.leaf_storage.iter().len();

        // The number of missing VID common is just the number of completely missing VID
        // entries, since every entry we have is guaranteed to have the common data.
        let missing_vid = self.inner.vid_storage.missing(height);
        // The count of missing shares includes the completely missing VID entries, plus any
        // entry which is _not_ missing but which has a null share.
        let null_vid_shares: usize = self
            .inner
            .vid_storage
            .iter()
            .map(|res| if matches!(res, Some((_, None))) { 1 } else { 0 })
            .sum();
        Ok(SyncStatus {
            missing_blocks: self.inner.block_storage.missing(height),
            missing_leaves: self.inner.leaf_storage.missing(height),
            missing_vid_common: missing_vid,
            missing_vid_shares: missing_vid + null_vid_shares,
            pruned_height: None,
        })
    }

    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let first_block = match start.into() {
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
            WindowStart::Time(t) => {
                // Find the minimum timestamp which is at least `t`, and all the blocks with
                // that timestamp.
                let blocks = self
                    .inner
                    .index_by_time
                    .range(t..)
                    .next()
                    .context(NotFoundSnafu)?
                    .1;
                // Multiple blocks can have the same timestamp (when truncated to seconds);
                // we want the first one. It is an invariant that any timestamp which has an
                // entry in `index_by_time` has a non-empty list associated with it, so this
                // indexing is safe.
                blocks[0]
            },
        } as usize;

        let mut res = TimeWindowQueryData::default();

        // Include the block just before the start of the window, if there is one.
        if first_block > 0 {
            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
        }

        // Add blocks to the window, starting from `first_block`, until we reach the end of
        // the requested time window.
        for block in self.inner.get_block_range(first_block..)? {
            let header = block?.header().clone();
            if header.timestamp() >= end {
                res.next = Some(header);
                break;
            }
            res.window.push(header);
            if res.window.len() >= limit {
                break;
            }
        }

        Ok(res)
    }

    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        Ok(self.inner.latest_qc_chain.clone())
    }
}

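// The file system backend does not maintain precomputed aggregate statistics, so the aggregate
// storage traits below are implemented as no-ops.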
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}

impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}

impl<T: Revert> PrunedHeightStorage for Transaction<T> {}

impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}