hotshot_query_service/data_source/storage/
ledger_log.rs1#![cfg(feature = "file-system-data-source")]
14
15use std::{collections::VecDeque, fmt::Debug};
16
17use atomic_store::{
18 AppendLog, AtomicStoreLoader, PersistenceError, append_log, load_store::BincodeLoadStore,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{debug, instrument, warn};
22
23use crate::node::{ResourceSyncStatus, SyncStatus, SyncStatusRange};
24
25#[derive(Debug)]
27pub(crate) struct LedgerLog<T: Serialize + DeserializeOwned> {
28 cache_start: usize,
29 cache_size: usize,
30 cache: VecDeque<Option<T>>,
31 store: AppendLog<BincodeLoadStore<Option<T>>>,
32 pending_inserts: usize,
37 missing: usize,
39}
40
41impl<T: Serialize + DeserializeOwned + Clone> LedgerLog<T> {
42 pub(crate) fn create(
43 loader: &mut AtomicStoreLoader,
44 file_pattern: &str,
45 cache_size: usize,
46 ) -> Result<Self, PersistenceError> {
47 Ok(Self {
48 cache_start: 0,
49 cache_size,
50 cache: VecDeque::with_capacity(cache_size),
51 store: AppendLog::create(
52 loader,
53 Default::default(),
54 file_pattern,
55 10u64 << 20, )?,
57 pending_inserts: 0,
58 missing: 0,
59 })
60 }
61
62 pub(crate) fn open(
63 loader: &mut AtomicStoreLoader,
64 file_pattern: &str,
65 cache_size: usize,
66 ) -> Result<Self, PersistenceError> {
67 let store = AppendLog::load(
68 loader,
69 Default::default(),
70 file_pattern,
71 1u64 << 20, )?;
73 let len = store.iter().len();
74 tracing::info!("loading LedgerLog {}, len={}", file_pattern, len);
75
76 let cache_start = len.saturating_sub(cache_size);
77 let mut missing = 0;
78 let mut cache = store
79 .iter()
80 .skip(cache_start)
81 .map(|r| {
82 if let Err(e) = &r {
83 warn!("failed to load object. Error: {}", e);
84 }
85 let obj = r.ok().flatten();
88 if obj.is_none() {
89 missing += 1;
90 }
91 obj
92 })
93 .collect::<VecDeque<_>>();
94 cache.reserve_exact(cache_size - cache.len());
95
96 Ok(Self {
97 cache_start,
98 cache_size,
99 cache,
100 store,
101 pending_inserts: 0,
102 missing,
103 })
104 }
105
106 pub(crate) fn iter(&self) -> Iter<'_, T> {
107 Iter {
108 index: 0,
109 cache_start: self.cache_start,
110 cache: &self.cache,
111 store: self.store.iter(),
112 }
113 }
114
115 pub(crate) fn store_resource(&mut self, resource: Option<T>) -> Result<(), PersistenceError> {
116 let missing = resource.is_none();
117 self.store.store_resource(&resource)?;
118 self.pending_inserts += 1;
119 if missing {
120 self.missing += 1;
121 }
122 if self.cache.len() >= self.cache_size {
123 self.cache.pop_front();
124 self.cache_start += 1;
125 }
126 self.cache.push_back(resource);
127 Ok(())
128 }
129
130 pub(crate) fn insert(&mut self, index: usize, object: T) -> Result<bool, PersistenceError>
135 where
136 T: Debug,
137 {
138 let len = self.iter().len();
141 let target_len = std::cmp::max(index, len);
142 for i in len..target_len {
143 debug!("storing placeholders for position {i}/{target_len}");
144 if let Err(err) = self.store_resource(None) {
145 warn!("Failed to store placeholder: {}", err);
146 return Err(err);
147 }
148 }
149 assert!(target_len >= index);
150 if target_len == index {
151 if let Err(err) = self.store_resource(Some(object)) {
153 warn!("Failed to store object at index {}: {}", index, err);
154 return Err(err);
155 }
156 Ok(true)
157 } else if matches!(self.iter().nth(index), Some(Some(_))) {
158 Ok(false)
160 } else {
161 warn!(
166 index,
167 len, target_len, "skipping out-of-order object; random inserts not yet supported"
168 );
169
170 Ok(true)
179 }
180 }
181
182 pub(crate) async fn commit_version(&mut self) -> Result<(), PersistenceError> {
183 tracing::debug!("committing new version of LedgerLog");
184 self.store.commit_version()?;
185 self.pending_inserts = 0;
186 Ok(())
187 }
188
189 pub(crate) fn skip_version(&mut self) -> Result<(), PersistenceError> {
190 self.store.skip_version()
191 }
192
193 pub(crate) fn revert_version(&mut self) -> Result<(), PersistenceError> {
194 self.store.revert_version()?;
195
196 for _ in 0..self.pending_inserts {
198 self.cache.pop_back();
199 }
200
201 self.pending_inserts = 0;
202 Ok(())
203 }
204
205 #[instrument(skip(self, is_missing))]
212 pub(crate) fn sync_status(
213 &self,
214 start: usize,
215 end: usize,
216 is_missing: impl Fn(&T) -> bool,
217 ) -> ResourceSyncStatus {
218 let mut missing = 0;
219 let mut ranges = vec![];
220
221 let mut curr: Option<SyncStatusRange> = None;
224 for (i, obj) in self.iter().enumerate().skip(start) {
225 let Some(obj) = obj else {
226 tracing::debug!(i, "skipping placeholder object");
227 continue;
228 };
229 if is_missing(&obj) {
230 tracing::debug!(i, "skipping object which is considered missing");
231 continue;
232 }
233
234 let prev = if let Some(range) = &mut curr {
235 if i == range.end {
236 range.end += 1;
238 continue;
239 }
240
241 ranges.push(*range);
243 range.end
244 } else {
245 0
246 };
247
248 if i > prev {
249 ranges.push(SyncStatusRange {
252 start: prev,
253 end: i,
254 status: SyncStatus::Missing,
255 });
256 missing += i - prev;
257 }
258
259 curr = Some(SyncStatusRange {
261 start: i,
262 end: i + 1,
263 status: SyncStatus::Present,
264 });
265 }
266
267 if let Some(range) = curr {
269 ranges.push(range);
270 }
271
272 let prev = match ranges.last() {
274 Some(range) => range.end,
275 None => 0,
276 };
277 if prev < end {
278 ranges.push(SyncStatusRange {
279 start: prev,
280 end,
281 status: SyncStatus::Missing,
282 });
283 missing += end - prev;
284 }
285
286 ResourceSyncStatus { missing, ranges }
287 }
288}
289
290pub struct Iter<'a, T: Serialize + DeserializeOwned> {
291 index: usize,
292 cache_start: usize,
293 cache: &'a VecDeque<Option<T>>,
294 store: append_log::Iter<'a, BincodeLoadStore<Option<T>>>,
295}
296
297impl<T: Serialize + DeserializeOwned + Clone> Iterator for Iter<'_, T> {
298 type Item = Option<T>;
299
300 fn next(&mut self) -> Option<Self::Item> {
301 #[allow(clippy::iter_nth_zero)]
304 self.nth(0)
305 }
306
307 fn size_hint(&self) -> (usize, Option<usize>) {
308 let len = (self.cache_start + self.cache.len()).saturating_sub(self.index);
311 (len, Some(len))
312 }
313
314 fn nth(&mut self, n: usize) -> Option<Self::Item> {
315 self.index += n;
316 let res = if self.index >= self.cache_start {
317 self.cache.get(self.index - self.cache_start).cloned()
319 } else {
320 self.store.nth(n).map(|res| {
322 if let Err(e) = &res {
323 warn!("failed to load object at position {}: error {}", n, e);
324 }
325 res.ok().flatten()
329 })
330 };
331
332 self.index += 1;
333 res
334 }
335
336 fn count(self) -> usize {
337 self.size_hint().0
338 }
339}
340
341impl<T: Serialize + DeserializeOwned + Clone> ExactSizeIterator for Iter<'_, T> {}
342
#[cfg(test)]
mod test {
    use atomic_store::AtomicStore;
    use tempfile::TempDir;

    use super::*;

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_creation() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();

        // Create a log with a 3-entry cache and append 5 entries.
        {
            let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
            let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
            let mut store = AtomicStore::open(loader).unwrap();
            for i in 0..5 {
                log.store_resource(Some(i)).unwrap();
                log.commit_version().await.unwrap();
                store.commit_version().unwrap();
            }
        }

        // Reopen it. All entries must be readable, including those outside the cached window.
        {
            let mut loader = AtomicStoreLoader::load(dir.path(), "test_ledger_log").unwrap();
            let log = LedgerLog::<u64>::open(&mut loader, "ledger", 3).unwrap();
            AtomicStore::open(loader).unwrap();
            assert_eq!(
                log.iter().collect::<Vec<_>>(),
                (0..5).map(Some).collect::<Vec<_>>()
            );
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_insert() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), Vec::<Option<u64>>::new());

        // Append at the end of the log.
        assert!(log.insert(0, 1).unwrap());
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(log.iter().collect::<Vec<_>>(), vec![Some(1)]);

        // Insert past the end: the gap is padded with placeholders.
        assert!(log.insert(4, 2).unwrap());
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );

        // Out-of-order inserts into placeholder slots report `true` but are not applied yet
        // (random inserts are unsupported).
        assert!(log.insert(2, 3).unwrap());
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert!(log.insert(1, 4).unwrap());
        log.commit_version().await.unwrap();
        store.commit_version().unwrap();
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );

        // Inserting at an occupied position is a duplicate. This holds whether the existing
        // object is cached (position 4) or only in persistent storage (position 0).
        assert!(!log.insert(4, 2).unwrap());
        assert!(!log.insert(0, 1).unwrap());
        assert_eq!(
            log.iter().collect::<Vec<_>>(),
            vec![Some(1), None, None, None, Some(2)]
        );
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_ledger_log_iter() {
        let dir = TempDir::with_prefix("test_ledger_log").unwrap();
        let mut loader = AtomicStoreLoader::create(dir.path(), "test_ledger_log").unwrap();
        let mut log = LedgerLog::<u64>::create(&mut loader, "ledger", 3).unwrap();
        let mut store = AtomicStore::open(loader).unwrap();
        for i in 0..5 {
            log.store_resource(Some(i)).unwrap();
            log.commit_version().await.unwrap();
            store.commit_version().unwrap();
        }

        assert_eq!(log.iter().len(), 5);
        for i in 0..5 {
            let mut iter = log.iter();
            assert_eq!(iter.nth(i as usize).unwrap(), Some(i), "{log:?}");
            // The exact length accounts for the entries skipped by `nth`.
            assert_eq!(iter.len(), 4 - i as usize);

            // After `nth`, iteration continues from the following position, whether it lies in
            // storage or in the cache.
            assert_eq!(
                iter.collect::<Vec<_>>(),
                (i + 1..5).map(Some).collect::<Vec<_>>()
            );
        }
        assert_eq!(log.iter().nth(5), None);
    }
}