hotshot_query_service/data_source/storage/
fs.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{
16    collections::{
17        BTreeMap,
18        hash_map::{Entry, HashMap},
19    },
20    hash::Hash,
21    iter,
22    ops::{Bound, Deref, RangeBounds},
23    path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32    data::{VidCommitment, VidShare},
33    simple_certificate::CertificatePair,
34    traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42    ledger_log::{Iter, LedgerLog},
43    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46    Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47    availability::{
48        NamespaceId,
49        data_source::{BlockId, LeafId},
50        query_data::{
51            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52            QueryablePayload, TransactionHash, VidCommonQueryData,
53        },
54    },
55    data_source::{VersionedDataSource, update},
56    metrics::PrometheusMetrics,
57    node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58    status::HasMetrics,
59    types::HeightIndexed,
60};
61
// Number of most recent entries of each object type to keep cached in memory by the backing
// [LedgerLog]s; older entries are read back from disk on demand.
const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
65
/// In-memory indexes and persistent log handles backing a [FileSystemStorage].
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // In-memory indexes from various identifiers to block heights. These are rebuilt from the
    // persistent logs when an existing store is opened (see `open_with_store`).
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    // Maps a timestamp to the heights of all blocks with that timestamp.
    index_by_time: BTreeMap<u64, Vec<u64>>,
    // Running aggregates over all stored blocks.
    num_transactions: usize,
    payload_size: usize,
    // Set only when this storage manages its own [AtomicStore] (see `create`/`open`); `None`
    // when the caller owns the store (see `create_with_store`/`open_with_store`).
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    // Persistent, height-indexed logs of the stored objects.
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    block_storage: LedgerLog<BlockQueryData<Types>>,
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    // QC chain certifying the most recently inserted leaf, if known.
    latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
}
87
88impl<Types> FileSystemStorageInner<Types>
89where
90    Types: NodeType,
91    Header<Types>: QueryableHeader<Types>,
92    Payload<Types>: QueryablePayload<Types>,
93{
94    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
95        match id {
96            BlockId::Number(n) => Ok(n),
97            BlockId::Hash(h) => {
98                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
99            },
100            BlockId::PayloadHash(h) => {
101                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
102            },
103        }
104    }
105
106    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
107        self.block_storage
108            .iter()
109            .nth(self.get_block_index(id)?)
110            .context(NotFoundSnafu)?
111            .context(MissingSnafu)
112    }
113
114    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
115        self.get_block(id).map(|block| block.header)
116    }
117
118    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
119    where
120        R: RangeBounds<usize> + Send,
121    {
122        Ok(range_iter(self.block_storage.iter(), range).collect())
123    }
124}
125
/// Storage for the APIs provided in this crate, backed by the local file system.
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // All state lives behind a single lock: reads share it, writes take it exclusively.
    inner: RwLock<FileSystemStorageInner<Types>>,
    metrics: PrometheusMetrics,
}
136
// The file system backend does not support pruning; use the trait's default configuration.
impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
// Pruning is a no-op for the file system backend; the unit pruner carries no state.
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}
150
impl<Types: NodeType> FileSystemStorage<Types>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Create a new [FileSystemStorage] with storage at `path`.
    ///
    /// If there is already data at `path`, it will be archived.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::create_with_store(&mut loader).await?;
        // Since we created the loader ourselves, we also own the top-level store and are
        // responsible for committing it on each version change (see `Transaction::commit`).
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Open an existing [FileSystemStorage] from storage at `path`.
    ///
    /// If there is no data at `path`, a new store will be created.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
        loader.retain_archives(1);
        let data_source = Self::open_with_store(&mut loader).await?;
        // As in `create`, we own the top-level store and must commit it ourselves.
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Create a new [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
    /// be archived.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn create_with_store(
        loader: &mut AtomicStoreLoader,
    ) -> Result<Self, PersistenceError> {
        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                // A fresh store starts with empty indexes and zeroed aggregates.
                index_by_leaf_hash: Default::default(),
                index_by_block_hash: Default::default(),
                index_by_payload_hash: Default::default(),
                index_by_txn_hash: Default::default(),
                index_by_time: Default::default(),
                num_transactions: 0,
                payload_size: 0,
                // `None` until the caller (or `create`) installs a top-level store.
                top_storage: None,
                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Open an existing [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
    /// store will be created.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
        let leaf_storage =
            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
        let block_storage =
            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
            loader,
            "vid_common",
            CACHED_VID_COMMON_COUNT,
        )?;

        // Rebuild the in-memory indexes (hash -> height and timestamp -> heights) from the
        // persisted leaves; only indexes are kept in memory, not the objects themselves.
        let mut index_by_block_hash = HashMap::new();
        let mut index_by_payload_hash = HashMap::new();
        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
        let index_by_leaf_hash = leaf_storage
            .iter()
            .flatten()
            .map(|leaf| {
                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
                update_index_by_hash(
                    &mut index_by_payload_hash,
                    leaf.payload_hash(),
                    leaf.height(),
                );
                index_by_time
                    .entry(leaf.header().timestamp())
                    .or_default()
                    .push(leaf.height());
                (leaf.hash(), leaf.height())
            })
            .collect();

        // Rebuild the transaction index and the running aggregates from the persisted blocks.
        let mut index_by_txn_hash = HashMap::new();
        let mut num_transactions = 0;
        let mut payload_size = 0;
        for block in block_storage.iter().flatten() {
            num_transactions += block.len();
            payload_size += block.size() as usize;

            let height = block.height();
            for (_, txn) in block.enumerate() {
                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
            }
        }

        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash,
                index_by_block_hash,
                index_by_payload_hash,
                index_by_txn_hash,
                index_by_time,
                num_transactions,
                payload_size,
                leaf_storage,
                block_storage,
                vid_storage,
                top_storage: None,
                // NOTE(review): the latest QC chain is not persisted, so it starts out unknown
                // after a restart even if leaves are present.
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Advance the version of the persistent store without committing changes to persistent state.
    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
        let mut inner = self.inner.write().await;
        inner.leaf_storage.skip_version()?;
        inner.block_storage.skip_version()?;
        inner.vid_storage.skip_version()?;
        // Only commit the top-level store if we manage it ourselves.
        if let Some(store) = &mut inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Get the stored VID share for a given block, if one exists.
    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
        // Delegate to the read-only transaction implementation of `NodeStorage::vid_share`.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.vid_share(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common data for a given block, if one exists.
    pub async fn get_vid_common(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        // Delegate to the `AvailabilityStorage` implementation on a read-only transaction.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common metadata for a given block, if one exists.
    pub async fn get_vid_common_metadata(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        // Delegate to the `AvailabilityStorage` implementation on a read-only transaction.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common_metadata(block_id).await?;
        Ok(share)
    }
}
329
/// Roll back any uncommitted changes held by a transaction guard.
pub trait Revert {
    fn revert(&mut self);
}
333
impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Discard the uncommitted version of each persistent log. These `unwrap`s panic on
        // persistence errors; this is called from `Transaction::drop`, where there is no way
        // to surface an error to the caller.
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
    }
}
346
impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Nothing to revert for a read-only transaction.
    }
}
357
/// A storage transaction wrapping a lock guard on the storage state.
///
/// Dropping the transaction reverts any uncommitted changes (see the [Drop] impl).
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    inner: T,
}
362
impl<T: Revert> Drop for Transaction<T> {
    fn drop(&mut self) {
        // Discard uncommitted changes. This also runs after `commit` consumes `self`;
        // presumably reverting a just-committed version is a no-op — verify against
        // `LedgerLog::revert_version` semantics.
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Persist pending changes: commit each log, then the top-level store if we manage one.
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        // `top_storage` is only set when this storage manages its own `AtomicStore`.
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
389
impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(self) -> anyhow::Result<()> {
        // Nothing to commit for a read-only transaction.
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
406
impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Read/write transactions are lock guards on the shared inner state, so a write
    // transaction excludes all other transactions for its lifetime.
    type Transaction<'a>
        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;
    type ReadOnly<'a>
        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;

    /// Begin a read-write transaction by taking the lock exclusively.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        Ok(Transaction {
            inner: self.inner.write().await,
        })
    }

    /// Begin a read-only transaction by taking a shared lock.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        Ok(Transaction {
            inner: self.inner.read().await,
        })
    }
}
433fn range_iter<T>(
434    mut iter: Iter<'_, T>,
435    range: impl RangeBounds<usize>,
436) -> impl '_ + Iterator<Item = QueryResult<T>>
437where
438    T: Clone + Serialize + DeserializeOwned,
439{
440    let start = range.start_bound().cloned();
441    let end = range.end_bound().cloned();
442
443    // Advance the underlying iterator to the start of the range.
444    let mut pos = match start {
445        Bound::Included(n) => {
446            if n > 0 {
447                iter.nth(n - 1);
448            }
449            n
450        },
451        Bound::Excluded(n) => {
452            iter.nth(n);
453            n + 1
454        },
455        Bound::Unbounded => 0,
456    };
457
458    iter::from_fn(move || {
459        // Check if we have reached the end of the range.
460        let reached_end = match end {
461            Bound::Included(n) => pos > n,
462            Bound::Excluded(n) => pos >= n,
463            Bound::Unbounded => false,
464        };
465        if reached_end {
466            return None;
467        }
468        let opt = iter.next()?;
469        pos += 1;
470        Some(opt.context(MissingSnafu))
471    })
472}
473
#[async_trait]
impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    /// Look up a leaf by height or hash.
    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
        // Resolve the ID to a height, which is also the index into `leaf_storage`.
        let n = match id {
            LeafId::Number(n) => n,
            LeafId::Hash(h) => *self
                .inner
                .index_by_leaf_hash
                .get(&h)
                .context(NotFoundSnafu)? as usize,
        };
        self.inner
            .leaf_storage
            .iter()
            .nth(n)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    /// Look up a block by height, block hash, or payload hash.
    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.inner.get_block(id)
    }

    /// Look up just the header of a block.
    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.inner.get_header(id)
    }

    /// Look up a block's payload, derived from the full block.
    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
        self.get_block(id).await.map(PayloadQueryData::from)
    }

    /// Look up a block's payload metadata, derived from the full block.
    async fn get_payload_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<PayloadMetadata<Types>> {
        self.get_block(id).await.map(PayloadMetadata::from)
    }

    /// Look up the VID common data for a block, discarding any stored share.
    async fn get_vid_common(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        Ok(self
            .inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .0)
    }

    /// Look up the VID common metadata for a block, derived from the common data.
    async fn get_vid_common_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        self.get_vid_common(id).await.map(VidCommonMetadata::from)
    }

    /// Enumerate a range of leaves; missing leaves yield per-entry errors.
    async fn get_leaf_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
    }

    /// Enumerate a range of blocks; missing blocks yield per-entry errors.
    async fn get_block_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        self.inner.get_block_range(range)
    }

    /// Enumerate a range of payloads, derived from the corresponding blocks.
    async fn get_payload_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadQueryData::from))
            .collect())
    }

    /// Enumerate a range of payload metadata, derived from the corresponding blocks.
    async fn get_payload_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadMetadata::from))
            .collect())
    }

    /// Enumerate a range of VID common data, discarding any stored shares.
    async fn get_vid_common_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common))
            .collect())
    }

    /// Enumerate a range of VID common metadata, derived from the common data.
    async fn get_vid_common_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common.into()))
            .collect())
    }

    /// Look up the block containing the transaction with the given hash.
    async fn get_block_with_transaction(
        &mut self,
        hash: TransactionHash<Types>,
    ) -> QueryResult<BlockQueryData<Types>> {
        let height = self
            .inner
            .index_by_txn_hash
            .get(&hash)
            .context(NotFoundSnafu)?;
        self.inner.get_block((*height as usize).into())
    }

    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
        // The file system backend doesn't index by whether a leaf is present, so we can't
        // efficiently seek to the first leaf with height >= `from`. Our best effort is to return
        // `from` itself if we can, or fail.
        self.get_leaf((from as usize).into()).await
    }
}
626
627impl<Types: NodeType> UpdateAvailabilityStorage<Types>
628    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
629where
630    Payload<Types>: QueryablePayload<Types>,
631    Header<Types>: QueryableHeader<Types>,
632{
633    async fn insert_leaf_with_qc_chain(
634        &mut self,
635        leaf: LeafQueryData<Types>,
636        qc_chain: Option<[CertificatePair<Types>; 2]>,
637    ) -> anyhow::Result<()> {
638        self.inner
639            .leaf_storage
640            .insert(leaf.height() as usize, leaf.clone())?;
641        self.inner
642            .index_by_leaf_hash
643            .insert(leaf.hash(), leaf.height());
644        update_index_by_hash(
645            &mut self.inner.index_by_block_hash,
646            leaf.block_hash(),
647            leaf.height(),
648        );
649        update_index_by_hash(
650            &mut self.inner.index_by_payload_hash,
651            leaf.payload_hash(),
652            leaf.height(),
653        );
654        self.inner
655            .index_by_time
656            .entry(leaf.header().timestamp())
657            .or_default()
658            .push(leaf.height());
659
660        if leaf.height() + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
661            // If this is the latest leaf we know about, also store it's QC chain so that we can
662            // prove to clients that this leaf is finalized. (If it is not the latest leaf, this
663            // is unnecessary, since we can prove it is an ancestor of some later, finalized
664            // leaf.)
665            if let Some(qc_chain) = qc_chain {
666                self.inner.latest_qc_chain = Some(qc_chain);
667            } else {
668                // Since we have a new latest leaf, we have to updated latest QC chain even if we
669                // don't actually have a QC chain to store.
670                self.inner.latest_qc_chain = None;
671            }
672        }
673
674        Ok(())
675    }
676
677    async fn insert_block(&mut self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
678        if !self
679            .inner
680            .block_storage
681            .insert(block.height() as usize, block.clone())?
682        {
683            // The block was already present.
684            return Ok(());
685        }
686        self.inner.num_transactions += block.len();
687        self.inner.payload_size += block.size() as usize;
688        for (_, txn) in block.enumerate() {
689            update_index_by_hash(
690                &mut self.inner.index_by_txn_hash,
691                txn.commit(),
692                block.height(),
693            );
694        }
695        Ok(())
696    }
697
698    async fn insert_vid(
699        &mut self,
700        common: VidCommonQueryData<Types>,
701        share: Option<VidShare>,
702    ) -> anyhow::Result<()> {
703        self.inner
704            .vid_storage
705            .insert(common.height() as usize, (common, share))?;
706        Ok(())
707    }
708}
709
/// Update an index mapping hashes of objects to their positions in the ledger.
///
/// Inserts the mapping from `hash` to `pos` into `index`, keeping whichever position is
/// earliest in the ledger when `hash` is already indexed.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        // Only replace an existing mapping if the new object was sequenced earlier.
        Entry::Occupied(mut existing) if pos < *existing.get() => {
            existing.insert(pos);
        },
        // An existing, earlier (or equal) position wins; leave it alone.
        Entry::Occupied(_) => {},
        Entry::Vacant(slot) => {
            slot.insert(pos);
        },
    }
}
727
#[async_trait]
impl<Types, T> NodeStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    /// The chain height, taken as the length of the leaf log.
    async fn block_height(&mut self) -> QueryResult<usize> {
        Ok(self.inner.leaf_storage.iter().len())
    }

    /// Count transactions. The file system backend only maintains a global running total, so
    /// this fails unless the range covers the whole history and no namespace is given.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Only the full range (start at 0 or unbounded, unbounded end) is supported.
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.num_transactions)
    }

    /// Total payload size. Like `count_transactions_in_range`, only the full range with no
    /// namespace is supported, since only a global running total is maintained.
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Only the full range (start at 0 or unbounded, unbounded end) is supported.
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.payload_size)
    }

    /// Look up the VID share for a block; fails if the entry exists but the share is null.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id.into())?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .1
            .context(MissingSnafu)
    }

    /// Report how many objects of each type are present in `[start, end)`.
    async fn sync_status_for_range(
        &mut self,
        start: usize,
        end: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        Ok(SyncStatusQueryData {
            leaves: self.inner.leaf_storage.sync_status(start, end, |_| false),
            blocks: self.inner.block_storage.sync_status(start, end, |_| false),
            vid_common: self.inner.vid_storage.sync_status(start, end, |_| false),
            vid_shares: self
                .inner
                .vid_storage
                // Missing shares includes the completely missing VID entries, plus any entry which
                // is _not_ missing but which has a null share.
                .sync_status(start, end, |(_, share)| share.is_none()),
            // Pruning is not supported by this backend, so there is no pruned height.
            pruned_height: None,
        })
    }

    /// Collect the headers whose timestamps fall in `[start, end)`, up to `limit` of them,
    /// along with the neighboring headers just outside the window.
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let first_block = match start.into() {
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
            WindowStart::Time(t) => {
                // Find the minimum timestamp which is at least `t`, and all the blocks with
                // that timestamp.
                let blocks = self
                    .inner
                    .index_by_time
                    .range(t..)
                    .next()
                    .context(NotFoundSnafu)?
                    .1;
                // Multiple blocks can have the same timestamp (when truncated to seconds);
                // we want the first one. It is an invariant that any timestamp which has an
                // entry in `index_by_time` has a non-empty list associated with it, so this
                // indexing is safe.
                blocks[0]
            },
        } as usize;

        let mut res = TimeWindowQueryData::default();

        // Include the block just before the start of the window, if there is one.
        if first_block > 0 {
            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
        }

        // Add blocks to the window, starting from `first_block`, until we reach the end of
        // the requested time window.
        for block in self.inner.get_block_range(first_block..)? {
            let header = block?.header().clone();
            if header.timestamp() >= end {
                res.next = Some(header);
                break;
            }
            res.window.push(header);
            if res.window.len() >= limit {
                break;
            }
        }

        Ok(res)
    }

    /// The QC chain certifying the most recently inserted leaf, if one was stored.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        Ok(self.inner.latest_qc_chain.clone())
    }
}
872
// The file system backend does not compute incremental aggregates; it reports height 0 and no
// previous aggregate so the aggregator has nothing to resume from.
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}
886
// Aggregate updates are a no-op for this backend: the inputs are ignored and a default
// aggregate is returned.
impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}
900
// This backend never prunes, so the trait's default implementation suffices.
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
902
impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Expose the Prometheus metrics registry owned by this storage.
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}