hotshot_query_service/data_source/storage/
ledger_log.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18    append_log, load_store::BincodeLoadStore, AppendLog, AtomicStoreLoader, PersistenceError,
19};
20use serde::{de::DeserializeOwned, Serialize};
21use tracing::{debug, warn};
22
/// A caching append log for ledger objects.
///
/// Entries are `Option<T>`: `Some` is a stored object, `None` is a placeholder for an object
/// which is known to exist at that position but is not (yet) available. The most recent entries
/// are mirrored in an in-memory cache; older entries are read back from the [`AppendLog`].
#[derive(Debug)]
pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
    // Index (within the overall log) of the first entry currently held in `cache`.
    cache_start: usize,
    // Maximum number of entries to keep in `cache`.
    cache_size: usize,
    // In-memory suffix of the log; `None` entries are placeholders for missing objects.
    cache: VecDeque<Option<T>>,
    // Durable, append-only backing store holding the full log.
    store: AppendLog<BincodeLoadStore<Option<T>>>,
    // Keep track of the number of appended objects which have not yet been committed. We need this
    // to detect when we are inserting at the end of the log or in the middle, as the two cases are
    // handled differently and `self.store.iter().len()` does not update until a new version is
    // committed.
    pending_inserts: usize,
    // Track the number of missing objects, for health reporting.
    missing: usize,
}
38
39impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
40    pub(crate) fn create(
41        loader: &mut AtomicStoreLoader,
42        file_pattern: &str,
43        cache_size: usize,
44    ) -> Result<Self, PersistenceError> {
45        Ok(Self {
46            cache_start: 0,
47            cache_size,
48            cache: VecDeque::with_capacity(cache_size),
49            store: AppendLog::create(
50                loader,
51                Default::default(),
52                file_pattern,
53                10u64 << 20, // 10 MB
54            )?,
55            pending_inserts: 0,
56            missing: 0,
57        })
58    }
59
60    pub(crate) fn open(
61        loader: &mut AtomicStoreLoader,
62        file_pattern: &str,
63        cache_size: usize,
64    ) -> Result<Self, PersistenceError> {
65        let store = AppendLog::load(
66            loader,
67            Default::default(),
68            file_pattern,
69            1u64 << 20, // 1 MB
70        )?;
71        let len = store.iter().len();
72        tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
73
74        let cache_start = len.saturating_sub(cache_size);
75        let mut missing = 0;
76        let mut cache = store
77            .iter()
78            .skip(cache_start)
79            .map(|r| {
80                if let Err(e) = &r {
81                    warn!("failed to load object. Error: {}", e);
82                }
83                // We treat missing objects and failed-to-load objects the same:
84                // if we failed to load a object, it is now missing!
85                let obj = r.ok().flatten();
86                if obj.is_none() {
87                    missing += 1;
88                }
89                obj
90            })
91            .collect::<VecDeque<_>>();
92        cache.reserve_exact(cache_size - cache.len());
93
94        Ok(Self {
95            cache_start,
96            cache_size,
97            cache,
98            store,
99            pending_inserts: 0,
100            missing,
101        })
102    }
103
104    pub(crate) fn iter(&self) -> Iter<T> {
105        Iter {
106            index: 0,
107            cache_start: self.cache_start,
108            cache: &self.cache,
109            store: self.store.iter(),
110        }
111    }
112
113    pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
114        let missing = resource.is_none();
115        self.store.store_resource(&resource)?;
116        self.pending_inserts += 1;
117        if missing {
118            self.missing += 1;
119        }
120        if self.cache.len() >= self.cache_size {
121            self.cache.pop_front();
122            self.cache_start += 1;
123        }
124        self.cache.push_back(resource);
125        Ok(())
126    }
127
128    /// Insert an object at position `index`.
129    ///
130    /// Returns whether the object was newly inserted; that is, returns `false` if and only if there
131    /// was already an object present at this index.
132    pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
133    where
134        T: Debug,
135    {
136        // If there are missing objects between what we currently have and `object`, pad with
137        // placeholders.
138        let len = self.iter().len();
139        let target_len = std::cmp::max(index, len);
140        for i in len..target_len {
141            debug!("storing placeholders for position {i}/{target_len}");
142            if let Err(err) = self.store_resource(None) {
143                warn!("Failed to store placeholder: {}", err);
144                return Err(err);
145            }
146        }
147        assert!(target_len >= index);
148        if target_len == index {
149            // This is the next object in the chain, append it to the log.
150            if let Err(err) = self.store_resource(Some(object)) {
151                warn!("Failed to store object at index {}: {}", index, err);
152                return Err(err);
153            }
154            Ok(true)
155        } else if matches!(self.iter().nth(index), Some(Some(_))) {
156            // This is a duplicate, we don't have to insert anything.
157            Ok(false)
158        } else {
159            // This is an object earlier in the chain that we are now receiving asynchronously.
160            // Update the placeholder with the actual contents of the object.
161            // TODO update persistent storage once AppendLog supports updates.
162            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
163            warn!(
164                index,
165                len, target_len, "skipping out-of-order object; random inserts not yet supported"
166            );
167
168            // TODO Update the object in cache if necessary. Note that we could do this now, even
169            // without support for storing the object persistently. But this makes the cache out of
170            // sync with persistent storage, and it means we have an object available that will
171            // become unavailable once it drops out of cache, which is not really what we want.
172            // See: https://github.com/EspressoSystems/hotshot-query-service/issues/16
173            // if index >= self.cache_start {
174            //     self.cache[index - self.cache_start] = Some(object);
175            // }
176            Ok(true)
177        }
178    }
179
180    pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
181        tracing::debug!("committing new version of LedgerLog");
182        self.store.commit_version()?;
183        self.pending_inserts = 0;
184        Ok(())
185    }
186
187    pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
188        self.store.skip_version()
189    }
190
191    pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
192        self.store.revert_version()?;
193
194        // Remove objects which were inserted in cache but not committed to storage.
195        for _ in 0..self.pending_inserts {
196            self.cache.pop_back();
197        }
198
199        self.pending_inserts = 0;
200        Ok(())
201    }
202
203    pub(crate) fn missing(&self, to_height: usize) -> usize {
204        // The number of missing objects is the number missing from the sequence we currently have,
205        // plus any extra objects at the end if this sequence is shorter than `to_height`.
206        self.missing + to_height.saturating_sub(self.iter().len())
207    }
208}
209
/// Iterator over the entries of a [`LedgerLog`], yielding `None` for missing entries.
pub struct Iter<'a, T: Serialize + DeserializeOwned> {
    // Absolute position (within the whole log) of the next entry to yield.
    index: usize,
    // Absolute position of the first cached entry.
    cache_start: usize,
    // In-memory suffix of the log, borrowed from the `LedgerLog`.
    cache: &'a VecDeque<Option<T>>,
    // Iterator over the persistent store; kept in lockstep with `index` while `index` is below
    // `cache_start`, and unused once iteration reaches the cached suffix.
    store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
}
216
217impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
218    type Item = Option<T>;
219
220    fn next(&mut self) -> Option<Self::Item> {
221        // False positive: clippy suggests `self.next()` instead of `self.nth(0)`, but that would be
222        // recursive.
223        #[allow(clippy::iter_nth_zero)]
224        self.nth(0)
225    }
226
227    fn size_hint(&self) -> (usize, Option<usize>) {
228        // Include objects in cache that haven't necessarily been committed to storage yet. This is
229        // consistent with `nth`, which will yield such objects.
230        let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
231        (len, Some(len))
232    }
233
234    fn nth(&mut self, n: usize) -> Option<Self::Item> {
235        self.index += n;
236        let res = if self.index >= self.cache_start {
237            // Get the object from cache if we can.
238            self.cache.get(self.index - self.cache_start).cloned()
239        } else {
240            // Otherwise load from storage.
241            self.store.nth(n).map(|res| {
242                if let Err(e) = &res {
243                    warn!("failed to load object at position {}: error {}", n, e);
244                }
245                // Both a failed load and a successful load of `None` are treated the same: as
246                // missing data, so we yield `None`. The latter case can happen if there was a
247                // previous failed load and we marked this entry as explicitly missing.
248                res.ok().flatten()
249            })
250        };
251
252        self.index += 1;
253        res
254    }
255
256    fn count(self) -> usize {
257        self.size_hint().0
258    }
259}
260
// Sound because `Iter::size_hint` returns identical, exact lower and upper bounds.
impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
262
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;
    use crate::testing::setup_test;

    // Round-trip test: objects appended and committed must be readable after the log is reopened
    // from the same directory.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_creation() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();

        // Create and populate a log.
        {
            let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
            let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut store = AtomicStore::open(loader).unwrap();
            for i in 0..5 {
                log.store_resource(Some(i)).unwrap();
                // Commit both the log and the enclosing atomic store so the data is durable.
                log.commit_version().await.unwrap();
                store.commit_version().unwrap();
            }
        }

        // Load the log from storage and check that we get the correct contents.
        {
            let mut loader = AtomicStoreLoader::load(dir.path(), "test_ledger_log").unwrap();
            let log = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();
            assert_eq!(
                log.iter().collect::<Vec<_>>(),
                (0..5).map(Some).collect::<Vec<_>>()
            );
        }
    }

    // Exercises the `insert` cases: append at the end, append past the end (placeholder padding),
    // and out-of-order insertion into the middle (persistence currently unsupported; see TODOs).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_insert() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), Vec::<Option<u64>>::new());

        // Insert at end.
        log.insert(0, 1).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), vec![Some(1)]);

        // Insert past end.
        log.insert(4, 2).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );

        // Insert in middle (in cache).
        log.insert(2, 3).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        // TODO re-enable this check once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
        // assert_eq!(
        //     log.iter().collect::<Vec<_>>(),
        //     vec![Some(1), None, Some(3), None, Some(2)]
        // );

        // Insert in middle (out of cache).
        log.insert(1, 4).unwrap();
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        // TODO check results once AppendLog supports random access updates.
        // See https://github.com/EspressoSystems/hotshot-query-service/issues/16
    }

    // Checks `Iter`'s `len` and `nth` semantics, including that `nth` advances the iterator past
    // the returned element.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_iter() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        for i in 0..5 {
            log.store_resource(Some(i)).unwrap();
            log.commit_version().await.unwrap();
            store.commit_version().unwrap();
        }

        assert_eq!(log.iter().len(), 5);
        for i in 0..5 {
            let mut iter = log.iter();
            assert_eq!(iter.nth(i as usize).unwrap(), Some(i), "{log:?}");

            // `nth` should not only have returned the `n`th element, but also advanced the iterator.
            assert_eq!(
                iter.collect::<Vec<_>>(),
                (i + 1..5).map(Some).collect::<Vec<_>>()
            );
        }
        assert_eq!(log.iter().nth(5), None);
    }
}