hotshot_query_service/data_source/storage/
ledger_log.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18    AppendLog, AtomicStoreLoader, PersistenceError, append_log, load_store::BincodeLoadStore,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{debug, instrument, warn};
22
23use crate::node::{ResourceSyncStatus, SyncStatus, SyncStatusRange};
24
25/// A caching append log for ledger objects.
26#[derive(Debug)]
27pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
28    cache_start: usize,
29    cache_size: usize,
30    cache: VecDeque<Option<T>>,
31    store: AppendLog<BincodeLoadStore<Option<T>>>,
32    // Keep track of the number of appended objects which have not yet been committed. We need this
33    // to detect when we are inserting at the end of the log or in the middle, as the two casese are
34    // handled differently and `self.store.iter().len()` does not update until a new version is
35    // committed.
36    pending_inserts: usize,
37    // Track the number of missing objects, for health reporting.
38    missing: usize,
39}
40
41impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
42    pub(crate) fn create(
43        loader: &mut AtomicStoreLoader,
44        file_pattern: &str,
45        cache_size: usize,
46    ) -> Result<Self, PersistenceError> {
47        Ok(Self {
48            cache_start: 0,
49            cache_size,
50            cache: VecDeque::with_capacity(cache_size),
51            store: AppendLog::create(
52                loader,
53                Default::default(),
54                file_pattern,
55                10u64 << 20, // 10 MB
56            )?,
57            pending_inserts: 0,
58            missing: 0,
59        })
60    }
61
62    pub(crate) fn open(
63        loader: &mut AtomicStoreLoader,
64        file_pattern: &str,
65        cache_size: usize,
66    ) -> Result<Self, PersistenceError> {
67        let store = AppendLog::load(
68            loader,
69            Default::default(),
70            file_pattern,
71            1u64 << 20, // 1 MB
72        )?;
73        let len = store.iter().len();
74        tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
75
76        let cache_start = len.saturating_sub(cache_size);
77        let mut missing = 0;
78        let mut cache = store
79            .iter()
80            .skip(cache_start)
81            .map(|r| {
82                if let Err(e) = &r {
83                    warn!("failed to load object. Error: {}", e);
84                }
85                // We treat missing objects and failed-to-load objects the same:
86                // if we failed to load a object, it is now missing!
87                let obj = r.ok().flatten();
88                if obj.is_none() {
89                    missing += 1;
90                }
91                obj
92            })
93            .collect::<VecDeque<_>>();
94        cache.reserve_exact(cache_size - cache.len());
95
96        Ok(Self {
97            cache_start,
98            cache_size,
99            cache,
100            store,
101            pending_inserts: 0,
102            missing,
103        })
104    }
105
106    pub(crate) fn iter(&self) -> Iter<'_, T> {
107        Iter {
108            index: 0,
109            cache_start: self.cache_start,
110            cache: &self.cache,
111            store: self.store.iter(),
112        }
113    }
114
115    pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
116        let missing = resource.is_none();
117        self.store.store_resource(&resource)?;
118        self.pending_inserts += 1;
119        if missing {
120            self.missing += 1;
121        }
122        if self.cache.len() >= self.cache_size {
123            self.cache.pop_front();
124            self.cache_start += 1;
125        }
126        self.cache.push_back(resource);
127        Ok(())
128    }
129
130    /// Insert an object at position `index`.
131    ///
132    /// Returns whether the object was newly inserted; that is, returns `false` if and only if there
133    /// was already an object present at this index.
134    pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
135    where
136        T: Debug,
137    {
138        // If there are missing objects between what we currently have and `object`, pad with
139        // placeholders.
140        let len = self.iter().len();
141        let target_len = std::cmp::max(index, len);
142        for i in len..target_len {
143            debug!("storing placeholders for position {i}/{target_len}");
144            if let Err(err) = self.store_resource(None) {
145                warn!("Failed to store placeholder: {}", err);
146                return Err(err);
147            }
148        }
149        assert!(target_len >= index);
150        if target_len == index {
151            // This is the next object in the chain, append it to the log.
152            if let Err(err) = self.store_resource(Some(object)) {
153                warn!("Failed to store object at index {}: {}", index, err);
154                return Err(err);
155            }
156            Ok(true)
157        } else if matches!(self.iter().nth(index), Some(Some(_))) {
158            // This is a duplicate, we don't have to insert anything.
159            Ok(false)
160        } else {
161            // This is an object earlier in the chain that we are now receiving asynchronously.
162            // Update the placeholder with the actual contents of the object.
163            // TODO update persistent storage once AppendLog supports updates.
164            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
165            warn!(
166                index,
167                len, target_len, "skipping out-of-order object; random inserts not yet supported"
168            );
169
170            // TODO Update the object in cache if necessary. Note that we could do this now, even
171            // without support for storing the object persistently. But this makes the cache out of
172            // sync with persistent storage, and it means we have an object available that will
173            // become unavailable once it drops out of cache, which is not really what we want.
174            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
175            // if index >= self.cache_start {
176            //     self.cache[index - self.cache_start] = Some(object);
177            // }
178            Ok(true)
179        }
180    }
181
182    pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
183        tracing::debug!("committing new version of LedgerLog");
184        self.store.commit_version()?;
185        self.pending_inserts = 0;
186        Ok(())
187    }
188
189    pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
190        self.store.skip_version()
191    }
192
193    pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
194        self.store.revert_version()?;
195
196        // Remove objects which were inserted in cache but not committed to storage.
197        for _ in 0..self.pending_inserts {
198            self.cache.pop_back();
199        }
200
201        self.pending_inserts = 0;
202        Ok(())
203    }
204
205    /// Find the sync status of this resource.
206    ///
207    /// This function will find all consecutive ranges of a given [`SyncStatus`] in
208    /// `[0, block_height)`. The predicate `is_missing` can be used to consider an object missing
209    /// even if there is actually an object present in the database (for example if some field is
210    /// [`None`]).
211    #[instrument(skip(self, is_missing))]
212    pub(crate) fn sync_status(
213        &self,
214        start: usize,
215        end: usize,
216        is_missing: impl Fn(&T) -> bool,
217    ) -> ResourceSyncStatus {
218        let mut missing = 0;
219        let mut ranges = vec![];
220
221        // Iterate over all objects, finding ranges of consecutive present objects. In between each
222        // present range, we will interpolate a missing range.
223        let mut curr: Option<SyncStatusRange> = None;
224        for (i, obj) in self.iter().enumerate().skip(start) {
225            let Some(obj) = obj else {
226                tracing::debug!(i, "skipping placeholder object");
227                continue;
228            };
229            if is_missing(&obj) {
230                tracing::debug!(i, "skipping object which is considered missing");
231                continue;
232            }
233
234            let prev = if let Some(range) = &mut curr {
235                if i == range.end {
236                    // This object extends the current range of present objects.
237                    range.end += 1;
238                    continue;
239                }
240
241                // This object starts a new range of present objects. Save the previous range.
242                ranges.push(*range);
243                range.end
244            } else {
245                0
246            };
247
248            if i > prev {
249                // Insert a missing range between the previous present range and the newfound
250                // present object.
251                ranges.push(SyncStatusRange {
252                    start: prev,
253                    end: i,
254                    status: SyncStatus::Missing,
255                });
256                missing += i - prev;
257            }
258
259            // Start a new range of present objects.
260            curr = Some(SyncStatusRange {
261                start: i,
262                end: i + 1,
263                status: SyncStatus::Present,
264            });
265        }
266
267        // Save the last present range.
268        if let Some(range) = curr {
269            ranges.push(range);
270        }
271
272        // Insert a potential missing range at the end of the chain.
273        let prev = match ranges.last() {
274            Some(range) => range.end,
275            None => 0,
276        };
277        if prev < end {
278            ranges.push(SyncStatusRange {
279                start: prev,
280                end,
281                status: SyncStatus::Missing,
282            });
283            missing += end - prev;
284        }
285
286        ResourceSyncStatus { missing, ranges }
287    }
288}
289
/// Iterator over the objects in a [`LedgerLog`], yielding `None` for missing entries.
///
/// Reads from the in-memory cache where possible, falling back to persistent storage for
/// entries older than the cache window.
pub struct Iter<'a, T: Serialize + DeserializeOwned> {
    // Absolute index of the next object to yield.
    index: usize,
    // Absolute index of the first entry held in `cache`.
    cache_start: usize,
    cache: &'a VecDeque<Option<T>>,
    store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
}
296
297impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
298    type Item = Option<T>;
299
300    fn next(&mut self) -> Option<Self::Item> {
301        // False positive: clippy suggests `self.next()` instead of `self.nth(0)`, but that would be
302        // recursive.
303        #[allow(clippy::iter_nth_zero)]
304        self.nth(0)
305    }
306
307    fn size_hint(&self) -> (usize, Option<usize>) {
308        // Include objects in cache that haven't necessarily been committed to storage yet. This is
309        // consistent with `nth`, which will yield such objects.
310        let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
311        (len, Some(len))
312    }
313
314    fn nth(&mut self, n: usize) -> Option<Self::Item> {
315        self.index += n;
316        let res = if self.index >= self.cache_start {
317            // Get the object from cache if we can.
318            self.cache.get(self.index - self.cache_start).cloned()
319        } else {
320            // Otherwise load from storage.
321            self.store.nth(n).map(|res| {
322                if let Err(e) = &res {
323                    warn!("failed to load object at position {}: error {}", n, e);
324                }
325                // Both a failed load and a successful load of `None` are treated the same: as
326                // missing data, so we yield `None`. The latter case can happen if there was a
327                // previous failed load and we marked this entry as explicitly missing.
328                res.ok().flatten()
329            })
330        };
331
332        self.index += 1;
333        res
334    }
335
336    fn count(self) -> usize {
337        self.size_hint().0
338    }
339}
340
341impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
342
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_creation() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();

        // Create a log and commit five entries, one version per entry.
        {
            let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
            let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut store = AtomicStore::open(loader).unwrap();
            for i in 0..5 {
                log.store_resource(Some(i)).unwrap();
                log.commit_version().await.unwrap();
                store.commit_version().unwrap();
            }
        }

        // Reload from disk; the full contents should round-trip.
        {
            let mut loader = AtomicStoreLoader::load(dir.path(), "test_ledger_log").unwrap();
            let log = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();
            let expected = (0..5).map(Some).collect::<Vec<_>>();
            assert_eq!(log.iter().collect::<Vec<_>>(), expected);
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_insert() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), Vec::<Option<u64>>::new());

        // Inserting at the current end appends the object directly.
        log.insert(0, 1).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), vec![Some(1)]);

        // Inserting past the end pads the gap with placeholders.
        log.insert(4, 2).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );

        // Inserting into the middle (within the cache window) is currently skipped.
        log.insert(2, 3).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        // TODO re-enable this check once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
        // assert_eq!(
        //     log.iter().collect::<Vec<_>>(),
        //     vec![Some(1), None, Some(3), None, Some(2)]
        // );

        // Inserting into the middle (outside the cache window) is also skipped for now.
        log.insert(1, 4).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        // TODO check results once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_iter() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        for i in 0..5 {
            log.store_resource(Some(i)).unwrap();
            log.commit_version().await.unwrap();
            store.commit_version().unwrap();
        }

        assert_eq!(log.iter().len(), 5);
        for i in 0..5 {
            let mut iter = log.iter();
            assert_eq!(iter.nth(i as usize).unwrap(), Some(i), "{log:?}");

            // `nth` both returns the `n`th element and advances past it, so the remainder of
            // the iterator should pick up at the next index.
            let rest = iter.collect::<Vec<_>>();
            let expected = (i + 1..5).map(Some).collect::<Vec<_>>();
            assert_eq!(rest, expected);
        }
        assert_eq!(log.iter().nth(5), None);
    }
}