hotshot_query_service/data_source/storage/
ledger_log.rs1#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18 append_log, load_store::BincodeLoadStore, AppendLog, AtomicStoreLoader, PersistenceError,
19};
20use serde::{de::DeserializeOwned, Serialize};
21use tracing::{debug, warn};
22
23#[derive(Debug)]
25pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
26 cache_start: usize,
27 cache_size: usize,
28 cache: VecDeque<Option<T>>,
29 store: AppendLog<BincodeLoadStore<Option<T>>>,
30 pending_inserts: usize,
35 missing: usize,
37}
38
39impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
40 pub(crate) fn create(
41 loader: &mut AtomicStoreLoader,
42 file_pattern: &str,
43 cache_size: usize,
44 ) -> Result<Self, PersistenceError> {
45 Ok(Self {
46 cache_start: 0,
47 cache_size,
48 cache: VecDeque::with_capacity(cache_size),
49 store: AppendLog::create(
50 loader,
51 Default::default(),
52 file_pattern,
53 10u64 << 20, )?,
55 pending_inserts: 0,
56 missing: 0,
57 })
58 }
59
60 pub(crate) fn open(
61 loader: &mut AtomicStoreLoader,
62 file_pattern: &str,
63 cache_size: usize,
64 ) -> Result<Self, PersistenceError> {
65 let store = AppendLog::load(
66 loader,
67 Default::default(),
68 file_pattern,
69 1u64 << 20, )?;
71 let len = store.iter().len();
72 tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
73
74 let cache_start = len.saturating_sub(cache_size);
75 let mut missing = 0;
76 let mut cache = store
77 .iter()
78 .skip(cache_start)
79 .map(|r| {
80 if let Err(e) = &r {
81 warn!("failed to load object. Error: {}", e);
82 }
83 let obj = r.ok().flatten();
86 if obj.is_none() {
87 missing += 1;
88 }
89 obj
90 })
91 .collect::<VecDeque<_>>();
92 cache.reserve_exact(cache_size - cache.len());
93
94 Ok(Self {
95 cache_start,
96 cache_size,
97 cache,
98 store,
99 pending_inserts: 0,
100 missing,
101 })
102 }
103
104 pub(crate) fn iter(&self) -> Iter<T> {
105 Iter {
106 index: 0,
107 cache_start: self.cache_start,
108 cache: &self.cache,
109 store: self.store.iter(),
110 }
111 }
112
113 pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
114 let missing = resource.is_none();
115 self.store.store_resource(&resource)?;
116 self.pending_inserts += 1;
117 if missing {
118 self.missing += 1;
119 }
120 if self.cache.len() >= self.cache_size {
121 self.cache.pop_front();
122 self.cache_start += 1;
123 }
124 self.cache.push_back(resource);
125 Ok(())
126 }
127
128 pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
133 where
134 T: Debug,
135 {
136 let len = self.iter().len();
139 let target_len = std::cmp::max(index, len);
140 for i in len..target_len {
141 debug!("storing placeholders for position {i}/{target_len}");
142 if let Err(err) = self.store_resource(None) {
143 warn!("Failed to store placeholder: {}", err);
144 return Err(err);
145 }
146 }
147 assert!(target_len >= index);
148 if target_len == index {
149 if let Err(err) = self.store_resource(Some(object)) {
151 warn!("Failed to store object at index {}: {}", index, err);
152 return Err(err);
153 }
154 Ok(true)
155 } else if matches!(self.iter().nth(index), Some(Some(_))) {
156 Ok(false)
158 } else {
159 warn!(
164 index,
165 len, target_len, "skipping out-of-order object; random inserts not yet supported"
166 );
167
168 Ok(true)
177 }
178 }
179
180 pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
181 tracing::debug!("committing new version of LedgerLog");
182 self.store.commit_version()?;
183 self.pending_inserts = 0;
184 Ok(())
185 }
186
187 pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
188 self.store.skip_version()
189 }
190
191 pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
192 self.store.revert_version()?;
193
194 for _ in 0..self.pending_inserts {
196 self.cache.pop_back();
197 }
198
199 self.pending_inserts = 0;
200 Ok(())
201 }
202
203 pub(crate) fn missing(&self, to_height: usize) -> usize {
204 self.missing + to_height.saturating_sub(self.iter().len())
207 }
208}
209
210pub struct Iter<'a, T: Serialize + DeserializeOwned> {
211 index: usize,
212 cache_start: usize,
213 cache: &'a VecDeque<Option<T>>,
214 store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
215}
216
217impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
218 type Item = Option<T>;
219
220 fn next(&mut self) -> Option<Self::Item> {
221 #[allow(clippy::iter_nth_zero)]
224 self.nth(0)
225 }
226
227 fn size_hint(&self) -> (usize, Option<usize>) {
228 let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
231 (len, Some(len))
232 }
233
234 fn nth(&mut self, n: usize) -> Option<Self::Item> {
235 self.index += n;
236 let res = if self.index >= self.cache_start {
237 self.cache.get(self.index - self.cache_start).cloned()
239 } else {
240 self.store.nth(n).map(|res| {
242 if let Err(e) = &res {
243 warn!("failed to load object at position {}: error {}", n, e);
244 }
245 res.ok().flatten()
249 })
250 };
251
252 self.index += 1;
253 res
254 }
255
256 fn count(self) -> usize {
257 self.size_hint().0
258 }
259}
260
261impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
262
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;
    use crate::testing::setup_test;

    /// Commit the log's pending changes, then the enclosing atomic store.
    async fn commit(log: &mut LedgerLog<u64>, store: &mut AtomicStore) {
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
    }

    /// Entries written through a freshly created log are all visible after reopening it.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_creation() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();

        // Create a log and fill it with more entries than fit in the cache.
        {
            let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
            let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut store = AtomicStore::open(loader).unwrap();
            for value in 0..5 {
                log.store_resource(Some(value)).unwrap();
                commit(&mut log, &mut store).await;
            }
        }

        // Reopen the log and check that every entry can be read back.
        {
            let mut loader = AtomicStoreLoader::load(dir.path(), "test_ledger_log").unwrap();
            let log = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();

            let expected: Vec<Option<u64>> = (0..5).map(Some).collect();
            let actual: Vec<Option<u64>> = log.iter().collect();
            assert_eq!(actual, expected);
        }
    }

    /// Inserting at the end, past the end, and inside the log.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_insert() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), Vec::<Option<u64>>::new());

        // Insert at the end of the (empty) log.
        log.insert(0, 1).unwrap();
        commit(&mut log, &mut store).await;
        assert_eq!(log.iter().collect::<Vec<_>>(), vec![Some(1)]);

        // Insert past the end; the gap is filled with placeholders.
        log.insert(4, 2).unwrap();
        commit(&mut log, &mut store).await;
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );

        // Insert into the middle of the log, first inside and then outside the cached range.
        // Only successful completion is checked here.
        log.insert(2, 3).unwrap();
        commit(&mut log, &mut store).await;
        log.insert(1, 4).unwrap();
        commit(&mut log, &mut store).await;
    }

    /// `nth` and continued iteration agree with the log contents at every starting offset.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ledger_log_iter() {
        setup_test();

        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        for value in 0..5 {
            log.store_resource(Some(value)).unwrap();
            commit(&mut log, &mut store).await;
        }

        assert_eq!(log.iter().len(), 5);
        for (position, value) in (0..5u64).enumerate() {
            let mut iter = log.iter();
            assert_eq!(iter.nth(position).unwrap(), Some(value), "{log:?}");

            // The rest of the iterator continues right after the entry `nth` returned.
            let rest: Vec<Option<u64>> = iter.collect();
            let expected_rest: Vec<Option<u64>> = (value + 1..5).map(Some).collect();
            assert_eq!(rest, expected_rest);
        }
        assert_eq!(log.iter().nth(5), None);
    }
}