sequencer/persistence/
fs.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    fs::{self, File, OpenOptions},
4    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5    ops::RangeInclusive,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Instant,
9};
10
11use anyhow::{anyhow, Context};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use clap::Parser;
15use espresso_types::{
16    traits::{EventsPersistenceRead, MembershipPersistence},
17    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
18    v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent},
19    Leaf, Leaf2, NetworkConfig, Payload, SeqTypes, StakeTableHash, ValidatorMap,
20};
21use hotshot::InitializerEpochInfo;
22use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
23    DhtPersistentStorage, SerializableRecord,
24};
25use hotshot_types::{
26    data::{
27        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
28        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
29        QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare,
30    },
31    drb::{DrbInput, DrbResult},
32    event::{Event, EventType, HotShotAction, LeafInfo},
33    message::{convert_proposal, Proposal},
34    simple_certificate::{
35        LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
36        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
37    },
38    traits::{
39        block_contents::{BlockHeader, BlockPayload},
40        metrics::Metrics,
41        node_implementation::{ConsensusTime, NodeType},
42    },
43    vote::HasViewNumber,
44};
45use itertools::Itertools;
46
47use crate::{
48    persistence::persistence_metrics::PersistenceMetricsValue, ViewNumber,
49    RECENT_STAKE_TABLES_LIMIT,
50};
51
52/// Options for file system backed persistence.
53#[derive(Parser, Clone, Debug)]
54pub struct Options {
55    /// Storage path for persistent data.
56    #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
57    path: PathBuf,
58
59    /// Number of views to retain in consensus storage before data that hasn't been archived is
60    /// garbage collected.
61    ///
62    /// The longer this is, the more certain that all data will eventually be archived, even if
63    /// there are temporary problems with archive storage or partially missing data. This can be set
64    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
65    /// setting only applies to views which never get decided (ie forks in consensus) and views for
66    /// which this node is partially offline. These should be exceptionally rare.
67    ///
68    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
69    /// view time of 2s.
70    #[clap(
71        long,
72        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
73        default_value = "130000"
74    )]
75    pub(crate) consensus_view_retention: u64,
76}
77
impl Default for Options {
    /// Build options by parsing an empty argument list, so all values come from clap defaults
    /// and environment variables.
    ///
    /// NOTE(review): `path` has no `default_value`, so this relies on
    /// `ESPRESSO_SEQUENCER_STORAGE_PATH` being set; if it is not, clap's error handling will
    /// terminate the process rather than return an error — confirm this is intended for all
    /// call sites of `Options::default()`.
    fn default() -> Self {
        Self::parse_from(std::iter::empty::<String>())
    }
}
83
84impl Options {
85    pub fn new(path: PathBuf) -> Self {
86        Self {
87            path,
88            consensus_view_retention: 130000,
89        }
90    }
91
92    pub(crate) fn path(&self) -> &Path {
93        &self.path
94    }
95}
96
#[async_trait]
impl PersistenceOptions for Options {
    type Persistence = Persistence;

    /// Override the consensus view retention window (used by callers that tune GC behavior).
    fn set_view_retention(&mut self, view_retention: u64) {
        self.consensus_view_retention = view_retention;
    }

    /// Open (or initialize) file system persistence rooted at `self.path`.
    ///
    /// Loads the set of completed data migrations from the `migration` file if present; a
    /// missing file is treated as "no migrations have run yet".
    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
        let path = self.path.clone();
        let view_retention = self.consensus_view_retention;

        let migration_path = path.join("migration");
        let migrated = if migration_path.is_file() {
            let bytes = fs::read(&migration_path).context(format!(
                "unable to read migration from {}",
                migration_path.display()
            ))?;
            bincode::deserialize(&bytes).context("malformed migration file")?
        } else {
            HashSet::new()
        };

        Ok(Persistence {
            inner: Arc::new(RwLock::new(Inner {
                path,
                migrated,
                view_retention,
            })),
            metrics: Arc::new(PersistenceMetricsValue::default()),
        })
    }

    /// Not implemented for the file system backend.
    async fn reset(self) -> anyhow::Result<()> {
        todo!()
    }
}
134
/// File system backed persistence.
///
/// Cheap to clone: both fields are behind `Arc`, so clones share the same storage and metrics.
#[derive(Clone, Debug)]
pub struct Persistence {
    // We enforce mutual exclusion on access to the data source, as the current file system
    // implementation does not support transaction isolation for concurrent reads and writes. We can
    // improve this in the future by switching to a SQLite-based file system implementation.
    inner: Arc<RwLock<Inner>>,
    /// A reference to the metrics trait
    metrics: Arc<PersistenceMetricsValue>,
}
145
/// State guarded by the `Persistence` lock: the storage root plus GC and migration bookkeeping.
#[derive(Debug)]
struct Inner {
    /// Root directory under which all persistence files and subdirectories live.
    path: PathBuf,
    /// Number of views to keep before undecided data is garbage collected.
    view_retention: u64,
    /// Names of data migrations that have already completed (persisted in the `migration` file).
    migrated: HashSet<String>,
}
152
impl Inner {
    // Path helpers: each kind of persisted data lives in its own file or directory directly
    // under `self.path`. The `*2` variants are the current formats; the unsuffixed `da`/`vid`
    // directories and the single-file anchor leaf are legacy layouts kept for migration.

    fn config_path(&self) -> PathBuf {
        self.path.join("hotshot.cfg")
    }

    /// File recording which data migrations have completed.
    fn migration(&self) -> PathBuf {
        self.path.join("migration")
    }

    fn voted_view_path(&self) -> PathBuf {
        self.path.join("highest_voted_view")
    }

    fn restart_view_path(&self) -> PathBuf {
        self.path.join("restart_view")
    }

    /// Path to a directory containing decided leaves.
    fn decided_leaf_path(&self) -> PathBuf {
        self.path.join("decided_leaves")
    }

    /// Current-format directory of decided leaves (one file per view).
    fn decided_leaf2_path(&self) -> PathBuf {
        self.path.join("decided_leaves2")
    }

    /// The path from previous versions where there was only a single file for anchor leaves.
    fn legacy_anchor_leaf_path(&self) -> PathBuf {
        self.path.join("anchor_leaf")
    }

    fn vid_dir_path(&self) -> PathBuf {
        self.path.join("vid")
    }

    fn vid2_dir_path(&self) -> PathBuf {
        self.path.join("vid2")
    }

    fn da_dir_path(&self) -> PathBuf {
        self.path.join("da")
    }

    fn drb_dir_path(&self) -> PathBuf {
        self.path.join("drb")
    }

    fn da2_dir_path(&self) -> PathBuf {
        self.path.join("da2")
    }

    fn quorum_proposals_dir_path(&self) -> PathBuf {
        self.path.join("quorum_proposals")
    }

    fn quorum_proposals2_dir_path(&self) -> PathBuf {
        self.path.join("quorum_proposals2")
    }

    fn upgrade_certificate_dir_path(&self) -> PathBuf {
        self.path.join("upgrade_certificate")
    }

    fn stake_table_dir_path(&self) -> PathBuf {
        self.path.join("stake_table")
    }

    fn next_epoch_qc(&self) -> PathBuf {
        self.path.join("next_epoch_quorum_certificate")
    }

    fn libp2p_dht_path(&self) -> PathBuf {
        self.path.join("libp2p_dht")
    }
    fn epoch_drb_result_dir_path(&self) -> PathBuf {
        self.path.join("epoch_drb_result")
    }

    fn epoch_root_block_header_dir_path(&self) -> PathBuf {
        self.path.join("epoch_root_block_header")
    }

    /// Directory of state certs that have been finalized (keyed by epoch, not view).
    fn finalized_state_cert_dir_path(&self) -> PathBuf {
        self.path.join("finalized_state_cert")
    }

    fn state_cert_dir_path(&self) -> PathBuf {
        self.path.join("state_cert")
    }
242
    /// Persist the current set of completed migrations to the `migration` file.
    ///
    /// Unconditionally overwrites the existing file (the predicate always returns `true`),
    /// using the atomic `replace` helper.
    fn update_migration(&mut self) -> anyhow::Result<()> {
        let path = self.migration();
        let bytes = bincode::serialize(&self.migrated)?;

        self.replace(
            &path,
            |_| Ok(true),
            |mut file| {
                file.write_all(&bytes)?;
                Ok(())
            },
        )
    }
256
257    /// Overwrite a file if a condition is met.
258    ///
259    /// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
260    /// returns `true`, or if there was no existing file, then `write` is called to update the
261    /// contents of the file. `write` receives a truncated file open in write mode and sets the
262    /// contents of the file.
263    ///
264    /// The final replacement of the original file is atomic; that is, `path` will be modified only
265    /// if the entire update succeeds.
266    fn replace(
267        &mut self,
268        path: &Path,
269        pred: impl FnOnce(File) -> anyhow::Result<bool>,
270        write: impl FnOnce(File) -> anyhow::Result<()>,
271    ) -> anyhow::Result<()> {
272        if path.is_file() {
273            // If there is an existing file, check if it is suitable to replace. Note that this
274            // check is not atomic with respect to the subsequent write at the file system level,
275            // but this object is the only one which writes to this file, and we have a mutable
276            // reference, so this should be safe.
277            if !pred(File::open(path)?)? {
278                // If we are not overwriting the file, we are done and consider the whole operation
279                // successful.
280                return Ok(());
281            }
282        }
283
284        // Either there is no existing file or we have decided to overwrite the file. Write the new
285        // contents into a temporary file so we can update `path` atomically using `rename`.
286        let mut swap_path = path.to_owned();
287        swap_path.set_extension("swp");
288        let swap = OpenOptions::new()
289            .write(true)
290            .truncate(true)
291            .create(true)
292            .open(&swap_path)?;
293        write(swap)?;
294
295        // Now we can replace the original file.
296        fs::rename(swap_path, path)?;
297
298        Ok(())
299    }
300
    /// Delete per-view files that are no longer needed.
    ///
    /// Anything older than `decided_view` minus the retention window is pruned, as is anything
    /// inside `prune_intervals` (views already fully processed by the event consumer).
    fn collect_garbage(
        &mut self,
        decided_view: ViewNumber,
        prune_intervals: &[RangeInclusive<ViewNumber>],
    ) -> anyhow::Result<()> {
        // Views strictly below this are old enough to be unconditionally pruned.
        let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));

        self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
        self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
        self.prune_files(
            self.quorum_proposals2_dir_path(),
            prune_view,
            None,
            prune_intervals,
        )?;
        self.prune_files(
            self.state_cert_dir_path(),
            prune_view,
            None,
            prune_intervals,
        )?;

        // Save the most recent leaf as it will be our anchor point if the node restarts.
        self.prune_files(
            self.decided_leaf2_path(),
            prune_view,
            Some(decided_view),
            prune_intervals,
        )?;

        Ok(())
    }
333
334    fn prune_files(
335        &mut self,
336        dir_path: PathBuf,
337        prune_view: ViewNumber,
338        keep_decided_view: Option<ViewNumber>,
339        prune_intervals: &[RangeInclusive<ViewNumber>],
340    ) -> anyhow::Result<()> {
341        if !dir_path.is_dir() {
342            return Ok(());
343        }
344
345        for (file_view, path) in view_files(dir_path)? {
346            // If the view is the anchor view, keep it no matter what.
347            if let Some(decided_view) = keep_decided_view {
348                if decided_view == file_view {
349                    continue;
350                }
351            }
352            // Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
353            // which we've already successfully processed, contain the view; in this case we simply
354            // don't need it anymore.
355            if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
356                fs::remove_file(&path)?;
357            }
358        }
359
360        Ok(())
361    }
362
    /// Generate events based on persisted decided leaves.
    ///
    /// Returns a list of closed intervals of views which can be safely deleted, as all leaves
    /// within these view ranges have been processed by the event consumer.
    async fn generate_decide_events(
        &self,
        view: ViewNumber,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
        // Generate a decide event for each leaf, to be processed by the event consumer. We make a
        // separate event for each leaf because it is possible we have non-consecutive leaves in our
        // storage, which would not be valid as a single decide with a single leaf chain.
        let mut leaves = BTreeMap::new();
        for (v, path) in view_files(self.decided_leaf2_path())? {
            // Only process leaves up to and including the requested decide view.
            if v > view {
                continue;
            }

            let bytes =
                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
            let (mut leaf, qc) =
                bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(&bytes)
                    .context(format!("parsing decided leaf {}", path.display()))?;

            // Include the VID share if available.
            let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
            if vid_share.is_none() {
                tracing::debug!(?v, "VID share not available at decide");
            }

            // Move the state cert to the finalized dir if it exists.
            let state_cert = self.finalized_state_cert(v)?;

            // Fill in the full block payload using the DA proposals we had persisted.
            if let Some(proposal) = self.load_da_proposal(v)? {
                let payload = Payload::from_bytes(
                    &proposal.data.encoded_transactions,
                    &proposal.data.metadata,
                );
                leaf.fill_block_payload_unchecked(payload);
            } else {
                tracing::debug!(?v, "DA proposal not available at decide");
            }

            let info = LeafInfo {
                leaf,
                vid_share,
                state_cert,
                // Note: the following fields are not used in Decide event processing, and should be
                // removed. For now, we just default them.
                state: Default::default(),
                delta: Default::default(),
            };

            leaves.insert(v, (info, qc));
        }

        // The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is
        // one -- was always included in the _previous_ decide event...but not removed from the
        // database, because we always persist the most recent anchor leaf.
        if let Some((oldest_view, _)) = leaves.first_key_value() {
            // The only exception is when the oldest leaf is the genesis leaf; then there was no
            // previous decide event.
            if *oldest_view > ViewNumber::genesis() {
                leaves.pop_first();
            }
        }

        // Emit one decide event per leaf, tracking runs of consecutive block heights as
        // candidate deletion intervals: `current_interval` is (start view, end view, latest
        // block height seen in the run).
        let mut intervals = vec![];
        let mut current_interval = None;
        for (view, (leaf, qc)) in leaves {
            let height = leaf.leaf.block_header().block_number();
            consumer
                .handle_event(&Event {
                    view_number: view,
                    event: EventType::Decide {
                        qc: Arc::new(qc),
                        leaf_chain: Arc::new(vec![leaf]),
                        block_size: None,
                    },
                })
                .await?;
            if let Some((start, end, current_height)) = current_interval.as_mut() {
                if height == *current_height + 1 {
                    // If we have a chain of consecutive leaves, extend the current interval of
                    // views which are safe to delete.
                    *current_height += 1;
                    *end = view;
                } else {
                    // Otherwise, end the current interval and start a new one.
                    intervals.push(*start..=*end);
                    current_interval = Some((view, view, height));
                }
            } else {
                // Start a new interval.
                current_interval = Some((view, view, height));
            }
        }
        // Flush the trailing interval, if any.
        if let Some((start, end, _)) = current_interval {
            intervals.push(start..=end);
        }

        Ok(intervals)
    }
467
468    fn load_da_proposal(
469        &self,
470        view: ViewNumber,
471    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
472        let dir_path = self.da2_dir_path();
473
474        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
475
476        if !file_path.exists() {
477            return Ok(None);
478        }
479
480        let da_bytes = fs::read(file_path)?;
481
482        let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
483            bincode::deserialize(&da_bytes)?;
484        Ok(Some(da_proposal))
485    }
486
487    fn load_vid_share(
488        &self,
489        view: ViewNumber,
490    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
491        let dir_path = self.vid2_dir_path();
492
493        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
494
495        if !file_path.exists() {
496            return Ok(None);
497        }
498
499        let vid_share_bytes = fs::read(file_path)?;
500        let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
501            bincode::deserialize(&vid_share_bytes)?;
502        Ok(Some(vid_share))
503    }
504
505    fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
506        tracing::info!("Checking `Leaf2` to load the anchor leaf.");
507        if self.decided_leaf2_path().is_dir() {
508            let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
509
510            // Return the latest decided leaf.
511            for (_, path) in view_files(self.decided_leaf2_path())? {
512                let bytes =
513                    fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
514                let (leaf2, qc2) =
515                    bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(&bytes)
516                        .context(format!("parsing decided leaf {}", path.display()))?;
517                if let Some((anchor_leaf, _)) = &anchor {
518                    if leaf2.view_number() > anchor_leaf.view_number() {
519                        anchor = Some((leaf2, qc2));
520                    }
521                } else {
522                    anchor = Some((leaf2, qc2));
523                }
524            }
525
526            return Ok(anchor);
527        }
528
529        tracing::warn!(
530            "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
531             This is very likely to fail."
532        );
533        if self.legacy_anchor_leaf_path().is_file() {
534            // We may have an old version of storage, where there is just a single file for the
535            // anchor leaf. Read it and return the contents.
536            let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
537
538            // The first 8 bytes just contain the height of the leaf. We can skip this.
539            file.seek(SeekFrom::Start(8)).context("seek")?;
540            let bytes = file
541                .bytes()
542                .collect::<Result<Vec<_>, _>>()
543                .context("read")?;
544            return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
545        }
546
547        Ok(None)
548    }
549
    /// Load the light client state update certificate for `view` and copy it into the
    /// finalized directory, keyed by its epoch.
    ///
    /// Deserialization first tries the V2 format, then falls back to V1 (converting into V2).
    /// Side effect: on success, the raw bytes are also written to
    /// `finalized_state_cert/<epoch>.txt`; the original per-view file is left in place (it is
    /// pruned later by garbage collection).
    fn finalized_state_cert(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let dir_path = self.state_cert_dir_path();
        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");

        if !file_path.exists() {
            return Ok(None);
        }

        let bytes = fs::read(&file_path)?;

        let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
            bincode::deserialize(&bytes).or_else(|err_v2| {
                tracing::info!(
                    error = %err_v2,
                    path = %file_path.display(),
                    "Failed to deserialize state certificate, attempting with v1"
                );

                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                    .map(Into::into)
                    .with_context(|| {
                        format!(
                            "Failed to deserialize with both v2 and v1 from file '{}'. error: \
                             {err_v2}",
                            file_path.display()
                        )
                    })
            })?;

        // Copy the raw certificate bytes into the finalized directory, keyed by epoch.
        let epoch = state_cert.epoch.u64();
        let finalized_dir_path = self.finalized_state_cert_dir_path();
        fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;

        let finalized_file_path = finalized_dir_path
            .join(epoch.to_string())
            .with_extension("txt");

        fs::write(&finalized_file_path, &bytes).context(format!(
            "finalizing light client state update certificate file for epoch {epoch:?}"
        ))?;

        Ok(Some(state_cert))
    }
596}
597
#[async_trait]
impl SequencerPersistence for Persistence {
    /// Load the persisted network config (`hotshot.cfg`), if present.
    ///
    /// The raw JSON is passed through `migrate_network_config` so configs written by older
    /// versions can still be parsed.
    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
        let inner = self.inner.read().await;
        let path = inner.config_path();
        if !path.is_file() {
            tracing::info!("config not found at {}", path.display());
            return Ok(None);
        }
        tracing::info!("loading config from {}", path.display());

        let bytes =
            fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
        let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
        let json = migrate_network_config(json).context("migration of network config failed")?;
        let config = serde_json::from_value(json).context("malformed config file")?;
        Ok(Some(config))
    }
616
    /// Write the network config to `hotshot.cfg`, holding the write lock to serialize with
    /// other storage mutations.
    ///
    /// NOTE(review): the path is converted via `display().to_string()`, which is lossy for
    /// non-UTF-8 paths — presumably acceptable since storage paths are operator-provided;
    /// confirm `NetworkConfig::to_file`'s expectations.
    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
        let inner = self.inner.write().await;
        let path = inner.config_path();
        tracing::info!("saving config to {}", path.display());
        Ok(cfg.to_file(path.display().to_string())?)
    }
623
624    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
625        let inner = self.inner.read().await;
626        let path = inner.voted_view_path();
627        if !path.is_file() {
628            return Ok(None);
629        }
630        let bytes = fs::read(inner.voted_view_path())?
631            .try_into()
632            .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
633        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
634    }
635
636    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
637        let inner = self.inner.read().await;
638        let path = inner.restart_view_path();
639        if !path.is_file() {
640            return Ok(None);
641        }
642        let bytes = fs::read(path)?
643            .try_into()
644            .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
645        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
646    }
647
648    async fn append_decided_leaves(
649        &self,
650        view: ViewNumber,
651        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
652        consumer: &impl EventConsumer,
653    ) -> anyhow::Result<()> {
654        let mut inner = self.inner.write().await;
655        let path = inner.decided_leaf2_path();
656
657        // Ensure the anchor leaf directory exists.
658        fs::create_dir_all(&path).context("creating anchor leaf directory")?;
659
660        // Earlier versions stored only a single decided leaf in a regular file. If our storage is
661        // still on this version, migrate to a directory structure storing (possibly) many leaves.
662        let legacy_path = inner.legacy_anchor_leaf_path();
663        if !path.is_dir() && legacy_path.is_file() {
664            tracing::info!("migrating to multi-leaf storage");
665
666            // Move the existing data into the new directory.
667            let (leaf, qc) = inner
668                .load_anchor_leaf()?
669                .context("anchor leaf file exists but unable to load contents")?;
670            let view = leaf.view_number().u64();
671            let bytes = bincode::serialize(&(leaf, qc))?;
672            let new_file = path.join(view.to_string()).with_extension("txt");
673            fs::write(new_file, bytes).context(format!("writing anchor leaf file {view}"))?;
674
675            // Now we can remove the old file.
676            fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
677        }
678
679        for (info, qc2) in leaf_chain {
680            let view = info.leaf.view_number().u64();
681            let file_path = path.join(view.to_string()).with_extension("txt");
682            inner.replace(
683                &file_path,
684                |_| {
685                    // Don't overwrite an existing leaf, but warn about it as this is likely not
686                    // intended behavior from HotShot.
687                    tracing::warn!(view, "duplicate decided leaf");
688                    Ok(false)
689                },
690                |mut file| {
691                    let bytes = bincode::serialize(&(&info.leaf.clone(), qc2))?;
692                    file.write_all(&bytes)?;
693                    Ok(())
694                },
695            )?;
696        }
697
698        match inner.generate_decide_events(view, consumer).await {
699            Err(err) => {
700                // Event processing failure is not an error, since by this point we have at least
701                // managed to persist the decided leaves successfully, and the event processing will
702                // just run again at the next decide.
703                tracing::warn!(?view, "event processing failed: {err:#}");
704            },
705            Ok(intervals) => {
706                if let Err(err) = inner.collect_garbage(view, &intervals) {
707                    // Similarly, garbage collection is not an error. We have done everything we
708                    // strictly needed to do, and GC will run again at the next decide. Log the
709                    // error but do not return it.
710                    tracing::warn!(?view, "GC failed: {err:#}");
711                }
712            },
713        }
714
715        Ok(())
716    }
717
    /// Delegate to [`Inner::load_anchor_leaf`] under the read lock.
    async fn load_anchor_leaf(
        &self,
    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
        self.inner.read().await.load_anchor_leaf()
    }

    /// Delegate to [`Inner::load_da_proposal`] under the read lock.
    async fn load_da_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
        self.inner.read().await.load_da_proposal(view)
    }

    /// Delegate to [`Inner::load_vid_share`] under the read lock.
    async fn load_vid_share(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
        self.inner.read().await.load_vid_share(view)
    }
737
    /// Persist a legacy (ADVZ) VID share for its view.
    ///
    /// The share is converted to the unified `VidDisperseShare` wrapper and written into the
    /// `vid2` directory, so readers only ever deal with the current format. An existing file
    /// for the view is never overwritten.
    async fn append_vid(
        &self,
        proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;
        let view_number = proposal.data.view_number().u64();
        let dir_path = inner.vid2_dir_path();

        fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;

        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
        inner.replace(
            &file_path,
            |_| {
                // Don't overwrite an existing share, but warn about it as this is likely not intended
                // behavior from HotShot.
                tracing::warn!(view_number, "duplicate VID share");
                Ok(false)
            },
            |mut file| {
                // Wrap the legacy share in the version-agnostic enum before serializing.
                let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
                    convert_proposal(proposal.clone());
                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
                // Only the file write itself is timed for the metric.
                let now = Instant::now();
                file.write_all(&proposal_bytes)?;
                self.metrics
                    .internal_append_vid_duration
                    .add_point(now.elapsed().as_secs_f64());
                Ok(())
            },
        )
    }
    /// Persist a current-format (V2) VID share for its view.
    ///
    /// Mirrors `append_vid`: the share is wrapped in the unified `VidDisperseShare` type and
    /// written to the `vid2` directory; an existing file for the view is never overwritten.
    async fn append_vid2(
        &self,
        proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;
        let view_number = proposal.data.view_number().u64();

        let dir_path = inner.vid2_dir_path();

        fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;

        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");

        inner.replace(
            &file_path,
            |_| {
                // Don't overwrite an existing share, but warn about it as this is likely not intended
                // behavior from HotShot.
                tracing::warn!(view_number, "duplicate VID share");
                Ok(false)
            },
            |mut file| {
                // Wrap the V2 share in the version-agnostic enum before serializing.
                let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
                    convert_proposal(proposal.clone());
                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
                // Only the file write itself is timed for the metric.
                let now = Instant::now();
                file.write_all(&proposal_bytes)?;
                self.metrics
                    .internal_append_vid2_duration
                    .add_point(now.elapsed().as_secs_f64());
                Ok(())
            },
        )
    }
    /// Persist a legacy DA proposal for its view into the legacy `da` directory.
    ///
    /// Note this writes to `da_dir_path` (legacy layout), not `da2`; the VID commitment is
    /// currently unused. An existing file for the view is never overwritten.
    async fn append_da(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
        _vid_commit: VidCommitment,
    ) -> anyhow::Result<()> {
        let mut inner = self.inner.write().await;
        let view_number = proposal.data.view_number().u64();
        let dir_path = inner.da_dir_path();

        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;

        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
        inner.replace(
            &file_path,
            |_| {
                // Don't overwrite an existing proposal, but warn about it as this is likely not
                // intended behavior from HotShot.
                tracing::warn!(view_number, "duplicate DA proposal");
                Ok(false)
            },
            |mut file| {
                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
                // Only the file write itself is timed for the metric.
                let now = Instant::now();
                file.write_all(&proposal_bytes)?;
                self.metrics
                    .internal_append_da_duration
                    .add_point(now.elapsed().as_secs_f64());
                Ok(())
            },
        )
    }
835    async fn record_action(
836        &self,
837        view: ViewNumber,
838        _epoch: Option<EpochNumber>,
839        action: HotShotAction,
840    ) -> anyhow::Result<()> {
841        // Todo Remove this after https://github.com/EspressoSystems/espresso-sequencer/issues/1931
842        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
843            return Ok(());
844        }
845        let mut inner = self.inner.write().await;
846        let path = &inner.voted_view_path();
847        inner.replace(
848            path,
849            |mut file| {
850                let mut bytes = vec![];
851                file.read_to_end(&mut bytes)?;
852                let bytes = bytes
853                    .try_into()
854                    .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
855                let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
856
857                // Overwrite the file if the saved view is older than the new view.
858                Ok(saved_view < view)
859            },
860            |mut file| {
861                file.write_all(&view.u64().to_le_bytes())?;
862                Ok(())
863            },
864        )?;
865
866        if matches!(action, HotShotAction::Vote) {
867            let restart_view_path = &inner.restart_view_path();
868            let restart_view = view + 1;
869            inner.replace(
870                restart_view_path,
871                |mut file| {
872                    let mut bytes = vec![];
873                    file.read_to_end(&mut bytes)?;
874                    let bytes = bytes
875                        .try_into()
876                        .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
877                    let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
878
879                    // Overwrite the file if the saved view is older than the new view.
880                    Ok(saved_view < restart_view)
881                },
882                |mut file| {
883                    file.write_all(&restart_view.u64().to_le_bytes())?;
884                    Ok(())
885                },
886            )?;
887        }
888        Ok(())
889    }
890
891    async fn append_quorum_proposal2(
892        &self,
893        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
894    ) -> anyhow::Result<()> {
895        let mut inner = self.inner.write().await;
896        let view_number = proposal.data.view_number().u64();
897        let dir_path = inner.quorum_proposals2_dir_path();
898
899        fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
900
901        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
902        inner.replace(
903            &file_path,
904            |_| {
905                // Always overwrite the previous file
906                Ok(true)
907            },
908            |mut file| {
909                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
910                let now = Instant::now();
911                file.write_all(&proposal_bytes)?;
912                self.metrics
913                    .internal_append_quorum2_duration
914                    .add_point(now.elapsed().as_secs_f64());
915                Ok(())
916            },
917        )
918    }
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
    {
        // Load every stored quorum proposal, keyed by view number. Files that
        // fail to decode are logged and skipped so one bad file cannot fail
        // the whole load.
        let inner = self.inner.read().await;

        // First, get the proposal directory.
        let dir_path = inner.quorum_proposals2_dir_path();
        if !dir_path.is_dir() {
            return Ok(Default::default());
        }

        // Read quorum proposals from every data file in this directory.
        let mut map = BTreeMap::new();
        for (view, path) in view_files(&dir_path)? {
            let proposal_bytes = fs::read(path)?;
            // Try the current encoding first, then fall back to the legacy
            // wrapper format for files written by older versions.
            let Some(proposal) = bincode::deserialize::<
                Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
            >(&proposal_bytes)
            .or_else(|error| {
                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
                    &proposal_bytes,
                )
                .map(convert_proposal)
                .inspect_err(|err_v3| {
                    // At this point, if the file contents are invalid, it is most likely an
                    // error rather than a miscellaneous file somehow ending up in the
                    // directory. However, we continue on, because it is better to collect as
                    // many proposals as we can rather than letting one bad proposal cause the
                    // entire operation to fail, and it is still possible that this was just
                    // some unintended file whose name happened to match the naming convention.

                    tracing::warn!(
                        ?view,
                        %error,
                        error_v3 = %err_v3,
                        "ignoring malformed quorum proposal file"
                    );
                })
            })
            .ok() else {
                continue;
            };

            let proposal2 = convert_proposal(proposal);

            // Push to the map and we're done.
            map.insert(view, proposal2);
        }

        Ok(map)
    }
971
972    async fn load_quorum_proposal(
973        &self,
974        view: ViewNumber,
975    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
976        let inner = self.inner.read().await;
977        let dir_path = inner.quorum_proposals2_dir_path();
978        let file_path = dir_path.join(view.to_string()).with_extension("txt");
979        let bytes = fs::read(file_path)?;
980        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
981            bincode::deserialize(&bytes).or_else(|error| {
982                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
983                    &bytes,
984                )
985                .map(convert_proposal)
986                .context(format!(
987                    "Failed to deserialize quorum proposal for view {view:?}: {error}."
988                ))
989            })?;
990        Ok(proposal)
991    }
992
993    async fn load_upgrade_certificate(
994        &self,
995    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
996        let inner = self.inner.read().await;
997        let path = inner.upgrade_certificate_dir_path();
998        if !path.is_file() {
999            return Ok(None);
1000        }
1001        let bytes = fs::read(&path).context("read")?;
1002        Ok(Some(
1003            bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1004        ))
1005    }
1006
1007    async fn store_upgrade_certificate(
1008        &self,
1009        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1010    ) -> anyhow::Result<()> {
1011        let mut inner = self.inner.write().await;
1012        let path = &inner.upgrade_certificate_dir_path();
1013        let certificate = match decided_upgrade_certificate {
1014            Some(cert) => cert,
1015            None => return Ok(()),
1016        };
1017        inner.replace(
1018            path,
1019            |_| {
1020                // Always overwrite the previous file.
1021                Ok(true)
1022            },
1023            |mut file| {
1024                let bytes =
1025                    bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1026                file.write_all(&bytes)?;
1027                Ok(())
1028            },
1029        )
1030    }
1031
1032    async fn store_next_epoch_quorum_certificate(
1033        &self,
1034        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1035    ) -> anyhow::Result<()> {
1036        let mut inner = self.inner.write().await;
1037        let path = &inner.next_epoch_qc();
1038
1039        inner.replace(
1040            path,
1041            |_| {
1042                // Always overwrite the previous file.
1043                Ok(true)
1044            },
1045            |mut file| {
1046                let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1047                file.write_all(&bytes)?;
1048                Ok(())
1049            },
1050        )
1051    }
1052
1053    async fn load_next_epoch_quorum_certificate(
1054        &self,
1055    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1056        let inner = self.inner.read().await;
1057        let path = inner.next_epoch_qc();
1058        if !path.is_file() {
1059            return Ok(None);
1060        }
1061        let bytes = fs::read(&path).context("read")?;
1062        Ok(Some(
1063            bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1064        ))
1065    }
1066
    async fn append_da2(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
        _vid_commit: VidCommitment,
    ) -> anyhow::Result<()> {
        // Persist a DA proposal (current format) under `<da2_dir>/<view>.txt`.
        // The VID commitment is unused by this file-system backend.
        let mut inner = self.inner.write().await;
        let view_number = proposal.data.view_number().u64();
        let dir_path = inner.da2_dir_path();

        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;

        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
        inner.replace(
            &file_path,
            |_| {
                // Don't overwrite an existing proposal, but warn about it as this is likely not
                // intended behavior from HotShot.
                tracing::warn!(view_number, "duplicate DA proposal");
                Ok(false)
            },
            |mut file| {
                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
                // Time only the write itself for the append-duration metric.
                let now = Instant::now();
                file.write_all(&proposal_bytes)?;
                self.metrics
                    .internal_append_da2_duration
                    .add_point(now.elapsed().as_secs_f64());
                Ok(())
            },
        )
    }
1098
    async fn append_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        // Thin delegate: identical semantics to `append_quorum_proposal2`.
        self.append_quorum_proposal2(proposal).await
    }
1105
1106    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1107        let mut inner = self.inner.write().await;
1108
1109        if inner.migrated.contains("anchor_leaf") {
1110            tracing::info!("decided leaves already migrated");
1111            return Ok(());
1112        }
1113
1114        let new_leaf_dir = inner.decided_leaf2_path();
1115
1116        fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2  dir")?;
1117
1118        let old_leaf_dir = inner.decided_leaf_path();
1119        if !old_leaf_dir.is_dir() {
1120            return Ok(());
1121        }
1122
1123        tracing::warn!("migrating decided leaves..");
1124        for entry in fs::read_dir(old_leaf_dir)? {
1125            let entry = entry?;
1126            let path = entry.path();
1127
1128            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1129                continue;
1130            };
1131            let Ok(view) = file.parse::<u64>() else {
1132                continue;
1133            };
1134
1135            let bytes =
1136                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1137            let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1138                .context(format!("parsing decided leaf {}", path.display()))?;
1139
1140            let leaf2: Leaf2 = leaf.into();
1141            let qc2 = qc.to_qc2();
1142
1143            let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1144
1145            inner.replace(
1146                &new_leaf_path,
1147                |_| {
1148                    tracing::warn!(view, "duplicate decided leaf");
1149                    Ok(false)
1150                },
1151                |mut file| {
1152                    let bytes = bincode::serialize(&(&leaf2.clone(), qc2))?;
1153                    file.write_all(&bytes)?;
1154                    Ok(())
1155                },
1156            )?;
1157
1158            if view % 100 == 0 {
1159                tracing::info!(view, "decided leaves migration progress");
1160            }
1161        }
1162
1163        inner.migrated.insert("anchor_leaf".to_string());
1164        inner.update_migration()?;
1165        tracing::warn!("successfully migrated decided leaves");
1166        Ok(())
1167    }
    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
        // One-time migration of DA proposals from the legacy `DaProposal`
        // encoding to `DaProposal2`, re-encoding each file into the new dir.
        let mut inner = self.inner.write().await;

        // Skip if a previous run already completed this migration.
        if inner.migrated.contains("da_proposal") {
            tracing::info!("da proposals already migrated");
            return Ok(());
        }

        let new_da_dir = inner.da2_dir_path();

        fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;

        let old_da_dir = inner.da_dir_path();
        if !old_da_dir.is_dir() {
            // No legacy data to migrate.
            return Ok(());
        }

        tracing::warn!("migrating da proposals..");

        for entry in fs::read_dir(old_da_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only files whose stem parses as a view number are proposals;
            // skip anything else in the directory.
            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
                continue;
            };
            let Ok(view) = file.parse::<u64>() else {
                continue;
            };

            let bytes =
                fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
            let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
                .context(format!("parsing da proposal {}", path.display()))?;

            let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");

            let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);

            inner.replace(
                &new_da_path,
                |_| {
                    // Never clobber a file already present in the new directory.
                    tracing::warn!(view, "duplicate DA proposal 2");
                    Ok(false)
                },
                |mut file| {
                    let bytes = bincode::serialize(&proposal2)?;
                    file.write_all(&bytes)?;
                    Ok(())
                },
            )?;

            if view % 100 == 0 {
                tracing::info!(view, "DA proposals migration progress");
            }
        }

        // Persist the completion marker so future startups skip this work.
        inner.migrated.insert("da_proposal".to_string());
        inner.update_migration()?;
        tracing::warn!("successfully migrated da proposals");
        Ok(())
    }
    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
        // One-time migration of VID shares from the legacy `ADVZDisperseShare`
        // encoding to the general `VidDisperseShare` type in the new directory.
        let mut inner = self.inner.write().await;

        // Skip if a previous run already completed this migration.
        if inner.migrated.contains("vid_share") {
            tracing::info!("vid shares already migrated");
            return Ok(());
        }

        let new_vid_dir = inner.vid2_dir_path();

        fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;

        let old_vid_dir = inner.vid_dir_path();
        if !old_vid_dir.is_dir() {
            // No legacy data to migrate.
            return Ok(());
        }

        tracing::warn!("migrating vid shares..");

        for entry in fs::read_dir(old_vid_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only files whose stem parses as a view number are shares; skip
            // anything else in the directory.
            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
                continue;
            };
            let Ok(view) = file.parse::<u64>() else {
                continue;
            };

            let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
            let proposal =
                bincode::deserialize::<Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>>(&bytes)
                    .context(format!("parsing vid share {}", path.display()))?;

            let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");

            let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
                convert_proposal(proposal);

            inner.replace(
                &new_vid_path,
                |_| {
                    // Never clobber a file already present in the new directory.
                    tracing::warn!(view, "duplicate VID share ");
                    Ok(false)
                },
                |mut file| {
                    let bytes = bincode::serialize(&proposal2)?;
                    file.write_all(&bytes)?;
                    Ok(())
                },
            )?;

            if view % 100 == 0 {
                tracing::info!(view, "VID shares migration progress");
            }
        }

        // Persist the completion marker so future startups skip this work.
        inner.migrated.insert("vid_share".to_string());
        inner.update_migration()?;
        tracing::warn!("successfully migrated vid shares");
        Ok(())
    }
1293
    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
        // One-time migration of quorum proposals from the legacy
        // `QuorumProposal` encoding to `QuorumProposalWrapper` in the new dir.
        let mut inner = self.inner.write().await;

        // Skip if a previous run already completed this migration.
        if inner.migrated.contains("quorum_proposals") {
            tracing::info!("quorum proposals already migrated");
            return Ok(());
        }

        let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();

        fs::create_dir_all(new_quorum_proposals_dir.clone())
            .context("failed to create quorum proposals 2 dir")?;

        let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
        if !old_quorum_proposals_dir.is_dir() {
            tracing::info!("no existing quorum proposals found for migration");
            return Ok(());
        }

        tracing::warn!("migrating quorum proposals..");
        for entry in fs::read_dir(old_quorum_proposals_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only files whose stem parses as a view number are proposals;
            // skip anything else in the directory.
            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
                continue;
            };
            let Ok(view) = file.parse::<u64>() else {
                continue;
            };

            let bytes =
                fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
            let proposal =
                bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
                    .context(format!("parsing quorum proposal {}", path.display()))?;

            let new_file_path = new_quorum_proposals_dir
                .join(view.to_string())
                .with_extension("txt");

            let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                convert_proposal(proposal);

            inner.replace(
                &new_file_path,
                |_| {
                    // Never clobber a file already present in the new directory.
                    tracing::warn!(view, "duplicate Quorum proposal2 ");
                    Ok(false)
                },
                |mut file| {
                    let bytes = bincode::serialize(&proposal2)?;
                    file.write_all(&bytes)?;
                    Ok(())
                },
            )?;

            if view % 100 == 0 {
                tracing::info!(view, "Quorum proposals migration progress");
            }
        }

        // Persist the completion marker so future startups skip this work.
        inner.migrated.insert("quorum_proposals".to_string());
        inner.update_migration()?;
        tracing::warn!("successfully migrated quorum proposals");
        Ok(())
    }
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
        // Intentional no-op for this backend: decided-leaf files bundle the QC
        // with the leaf (see the (Leaf, QuorumCertificate) tuples handled by
        // the anchor-leaf migration), so there is no separate QC store to
        // migrate here.
        Ok(())
    }
1364
1365    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1366        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1367            if loaded_drb_input.iteration >= drb_input.iteration {
1368                anyhow::bail!(
1369                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1370                    loaded_drb_input,
1371                    drb_input
1372                )
1373            }
1374        }
1375
1376        let inner = self.inner.write().await;
1377        let dir_path = inner.drb_dir_path();
1378
1379        fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1380
1381        let drb_input_bytes =
1382            bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1383
1384        let file_path = dir_path
1385            .join(drb_input.epoch.to_string())
1386            .with_extension("bin");
1387        fs::write(&file_path, drb_input_bytes).context(format!(
1388            "writing epoch drb_input file for epoch {:?} at {:?}",
1389            drb_input.epoch, file_path
1390        ))
1391    }
1392
1393    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1394        let inner = self.inner.read().await;
1395        let path = &inner.drb_dir_path();
1396        let file_path = path.join(epoch.to_string()).with_extension("bin");
1397        let bytes = fs::read(&file_path).context("read")?;
1398        Ok(bincode::deserialize(&bytes)
1399            .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1400    }
1401
1402    async fn store_drb_result(
1403        &self,
1404        epoch: EpochNumber,
1405        drb_result: DrbResult,
1406    ) -> anyhow::Result<()> {
1407        let inner = self.inner.write().await;
1408        let dir_path = inner.epoch_drb_result_dir_path();
1409
1410        fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1411
1412        let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1413
1414        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1415        fs::write(file_path, drb_result_bytes)
1416            .context(format!("writing epoch drb result file for epoch {epoch:?}"))?;
1417
1418        Ok(())
1419    }
1420
1421    async fn store_epoch_root(
1422        &self,
1423        epoch: EpochNumber,
1424        block_header: <SeqTypes as NodeType>::BlockHeader,
1425    ) -> anyhow::Result<()> {
1426        let inner = self.inner.write().await;
1427        let dir_path = inner.epoch_root_block_header_dir_path();
1428
1429        fs::create_dir_all(dir_path.clone())
1430            .context("failed to create epoch root block header dir")?;
1431
1432        let block_header_bytes =
1433            bincode::serialize(&block_header).context("serialize block header")?;
1434
1435        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1436        fs::write(file_path, block_header_bytes).context(format!(
1437            "writing epoch root block header file for epoch {epoch:?}"
1438        ))?;
1439
1440        Ok(())
1441    }
1442
1443    async fn add_state_cert(
1444        &self,
1445        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1446    ) -> anyhow::Result<()> {
1447        let inner = self.inner.write().await;
1448        // let epoch = state_cert.epoch;
1449        let view = state_cert.light_client_state.view_number;
1450        let dir_path = inner.state_cert_dir_path();
1451
1452        fs::create_dir_all(dir_path.clone())
1453            .context("failed to create light client state update certificate dir")?;
1454
1455        let bytes = bincode::serialize(&state_cert)
1456            .context("serialize light client state update certificate")?;
1457
1458        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1459        fs::write(file_path, bytes).context(format!(
1460            "writing light client state update certificate file for view {view:?}"
1461        ))?;
1462
1463        Ok(())
1464    }
1465
1466    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1467        let inner = self.inner.read().await;
1468        let drb_dir_path = inner.epoch_drb_result_dir_path();
1469        let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1470
1471        let mut result = Vec::new();
1472
1473        if drb_dir_path.is_dir() {
1474            for (epoch, path) in epoch_files(drb_dir_path)? {
1475                let bytes = fs::read(&path)
1476                    .context(format!("reading epoch drb result {}", path.display()))?;
1477                let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1478                    .context(format!("parsing epoch drb result {}", path.display()))?;
1479
1480                let block_header_path = block_header_dir_path
1481                    .join(epoch.to_string())
1482                    .with_extension("txt");
1483                let block_header = if block_header_path.is_file() {
1484                    let bytes = fs::read(&block_header_path).context(format!(
1485                        "reading epoch root block header {}",
1486                        block_header_path.display()
1487                    ))?;
1488                    Some(
1489                        bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes)
1490                            .context(format!(
1491                                "parsing epoch root block header {}",
1492                                block_header_path.display()
1493                            ))?,
1494                    )
1495                } else {
1496                    None
1497                };
1498
1499                result.push(InitializerEpochInfo::<SeqTypes> {
1500                    epoch,
1501                    drb_result,
1502                    block_header,
1503                });
1504            }
1505        }
1506
1507        result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1508
1509        // Keep only the most recent epochs
1510        let start = result
1511            .len()
1512            .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1513        let recent = result[start..].to_vec();
1514
1515        Ok(recent)
1516    }
1517
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        // Return the stored light client state update certificate with the
        // highest epoch, decoding each file as V2 with a V1 fallback.
        let inner = self.inner.read().await;
        let dir_path = inner.finalized_state_cert_dir_path();

        if !dir_path.is_dir() {
            return Ok(None);
        }

        let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;

        for (epoch, path) in epoch_files(dir_path)? {
            // Skip files that cannot beat the best epoch found so far.
            if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
                continue;
            }
            let bytes = fs::read(&path).context(format!(
                "reading light client state update certificate {}",
                path.display()
            ))?;
            // Try the current V2 encoding first; on failure, log and attempt
            // the legacy V1 encoding. Failing both is a hard error.
            let cert =
                bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
                    .or_else(|error| {
                        tracing::info!(
                            %error,
                            path = %path.display(),
                            "Failed to deserialize LightClientStateUpdateCertificateV2"
                        );

                        bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
                            &bytes,
                        )
                        .map(Into::into)
                        .with_context(|| {
                            format!(
                                "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
                                path.display()
                            )
                        })
                    })?;

            result = Some(cert);
        }

        Ok(result)
    }
1564
    fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
        // Intentional no-op: this backend already records its timing metrics
        // through `self.metrics` (see the append methods), so the passed-in
        // registry is unused here.
        // todo!()
    }
1568}
1569
1570#[async_trait]
1571impl MembershipPersistence for Persistence {
    async fn load_stake(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
        // Load the stake table stored for `epoch`, if any. Files written by
        // older versions contain a bare `ValidatorMap`; the fallback below
        // decodes those with no block reward and no stake-table hash.
        let inner = self.inner.read().await;
        let path = &inner.stake_table_dir_path();
        let file_path = path.join(epoch.to_string()).with_extension("txt");

        if !file_path.exists() {
            // Nothing stored for this epoch.
            return Ok(None);
        }

        let bytes = fs::read(&file_path).with_context(|| {
            format!("failed to read stake table file at {}", file_path.display())
        })?;

        let stake = match bincode::deserialize(&bytes) {
            Ok(res) => res,
            Err(err) => {
                // Legacy format fallback: bare ValidatorMap only.
                let map = bincode::deserialize::<ValidatorMap>(&bytes).with_context(|| {
                    format!(
                        "fallback deserialization of legacy stake table at {} failed after \
                         initial error: {}",
                        file_path.display(),
                        err
                    )
                })?;
                (map, None, None)
            },
        };

        Ok(Some(stake))
    }
1605
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
        // Load up to `limit` of the most recent stake tables, newest epoch
        // first, decoding legacy bare-`ValidatorMap` files via the fallback.
        //
        // NOTE(review): if the stake table directory does not exist yet,
        // `epoch_files` presumably errors instead of yielding an empty list —
        // confirm callers expect an error (rather than `Ok`) before any stake
        // has been stored.
        let limit = limit as usize;
        let inner = self.inner.read().await;
        let path = &inner.stake_table_dir_path();
        // Sort descending by epoch so `take(limit)` keeps the newest tables.
        let sorted_files = epoch_files(&path)?
            .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
            .take(limit);
        let mut validator_sets: Vec<IndexedStake> = Vec::new();

        for (epoch, file_path) in sorted_files {
            let bytes = fs::read(&file_path).with_context(|| {
                format!("failed to read stake table file at {}", file_path.display())
            })?;

            let stake: (ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>) =
                match bincode::deserialize::<(
                    ValidatorMap,
                    Option<RewardAmount>,
                    Option<StakeTableHash>,
                )>(&bytes)
                {
                    Ok(res) => res,
                    Err(err) => {
                        // Legacy format fallback: bare ValidatorMap only.
                        let validatormap = bincode::deserialize::<ValidatorMap>(&bytes)
                            .with_context(|| {
                                format!(
                                    "failed to deserialize legacy stake table at {}: fallback \
                                     also failed after initial error: {}",
                                    file_path.display(),
                                    err
                                )
                            })?;

                        (validatormap, None, None)
                    },
                };

            validator_sets.push((epoch, (stake.0, stake.1), stake.2));
        }

        Ok(Some(validator_sets))
    }
1648
1649    async fn store_stake(
1650        &self,
1651        epoch: EpochNumber,
1652        stake: ValidatorMap,
1653        block_reward: Option<RewardAmount>,
1654        stake_table_hash: Option<StakeTableHash>,
1655    ) -> anyhow::Result<()> {
1656        let mut inner = self.inner.write().await;
1657        let dir_path = &inner.stake_table_dir_path();
1658
1659        fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1660
1661        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1662
1663        inner.replace(
1664            &file_path,
1665            |_| {
1666                // Always overwrite the previous file.
1667                Ok(true)
1668            },
1669            |mut file| {
1670                let bytes = bincode::serialize(&(stake, block_reward, stake_table_hash))
1671                    .context("serializing combined stake table")?;
1672                file.write_all(&bytes)?;
1673                Ok(())
1674            },
1675        )
1676    }
1677
1678    /// store stake table events upto the l1 block
1679    async fn store_events(
1680        &self,
1681        to_l1_block: u64,
1682        events: Vec<(EventKey, StakeTableEvent)>,
1683    ) -> anyhow::Result<()> {
1684        if events.is_empty() {
1685            return Ok(());
1686        }
1687
1688        let mut inner = self.inner.write().await;
1689        let dir_path = &inner.stake_table_dir_path();
1690        let events_dir = dir_path.join("events");
1691
1692        fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
1693        // Read the last l1 finalized for which events has been stored
1694        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1695
1696        // check if the last l1 events is higher than the incoming one
1697        if last_l1_finalized_path.exists() {
1698            let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
1699                format!("Failed to read file at path: {last_l1_finalized_path:?}")
1700            })?;
1701            let mut buf = [0; 8];
1702            bytes
1703                .as_slice()
1704                .read_exact(&mut buf[..8])
1705                .with_context(|| {
1706                    format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1707                })?;
1708            let persisted_l1_block = u64::from_le_bytes(buf);
1709            if persisted_l1_block > to_l1_block {
1710                tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
1711                return Ok(());
1712            }
1713        }
1714
1715        // stores each event in a separate file
1716        // this can cause performance issue when, for example, reading all the files
1717        // However, the plan is to remove file system completely in future
1718        for (event_key, event) in events {
1719            let (block_number, event_index) = event_key;
1720            // file name is like block_index.json
1721            let filename = format!("{block_number}_{event_index}");
1722            let file_path = events_dir.join(filename).with_extension("json");
1723
1724            if file_path.exists() {
1725                continue;
1726            }
1727
1728            let file = File::create(&file_path).context("Failed to create event file")?;
1729            let writer = BufWriter::new(file);
1730
1731            serde_json::to_writer_pretty(writer, &event)
1732                .context("Failed to write event to file")?;
1733        }
1734
1735        // update the l1 block for which we have processed events
1736        inner.replace(
1737            &last_l1_finalized_path,
1738            |_| Ok(true),
1739            |mut file| {
1740                let bytes = to_l1_block.to_le_bytes();
1741
1742                file.write_all(&bytes)?;
1743                tracing::debug!("updated l1 finalized ={to_l1_block:?}");
1744                Ok(())
1745            },
1746        )
1747    }
1748
1749    /// Loads all events from persistent storage up to the specified L1 block.
1750    ///
1751    /// # Returns
1752    ///
1753    /// Returns a tuple containing:
1754    /// - `Option<u64>` - The queried L1 block for which all events have been successfully fetched.
1755    /// - `Vec<(EventKey, StakeTableEvent)>` - A list of events, where each entry is a tuple of the event key
1756    /// event key is (l1 block number, log index)
1757    ///   and the corresponding StakeTable event.
1758    ///
1759    async fn load_events(
1760        &self,
1761        to_l1_block: u64,
1762    ) -> anyhow::Result<(
1763        Option<EventsPersistenceRead>,
1764        Vec<(EventKey, StakeTableEvent)>,
1765    )> {
1766        let inner = self.inner.read().await;
1767        let dir_path = inner.stake_table_dir_path();
1768        let events_dir = dir_path.join("events");
1769
1770        // check if we have any events in storage
1771        // we can do this by checking last l1 finalized block for which we processed events
1772        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1773
1774        if !last_l1_finalized_path.exists() || !events_dir.exists() {
1775            return Ok((None, Vec::new()));
1776        }
1777
1778        let mut events = Vec::new();
1779
1780        let bytes = fs::read(&last_l1_finalized_path)
1781            .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
1782        let mut buf = [0; 8];
1783        bytes
1784            .as_slice()
1785            .read_exact(&mut buf[..8])
1786            .with_context(|| {
1787                format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1788            })?;
1789
1790        let last_processed_l1_block = u64::from_le_bytes(buf);
1791
1792        // Determine the L1 block for querying events.
1793        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
1794        // Otherwise, query up to the last stored block.
1795        let query_l1_block = if last_processed_l1_block > to_l1_block {
1796            to_l1_block
1797        } else {
1798            last_processed_l1_block
1799        };
1800
1801        for entry in fs::read_dir(&events_dir).context("events directory")? {
1802            let entry = entry?;
1803            let path = entry.path();
1804
1805            if !entry.file_type()?.is_file() {
1806                continue;
1807            }
1808
1809            if path
1810                .extension()
1811                .context(format!("extension for path={path:?}"))?
1812                != "json"
1813            {
1814                continue;
1815            }
1816
1817            let filename = path
1818                .file_stem()
1819                .and_then(|f| f.to_str())
1820                .unwrap_or_default();
1821
1822            let parts: Vec<&str> = filename.split('_').collect();
1823            if parts.len() != 2 {
1824                continue;
1825            }
1826
1827            let block_number = parts[0].parse::<u64>()?;
1828            let log_index = parts[1].parse::<u64>()?;
1829
1830            if block_number > query_l1_block {
1831                continue;
1832            }
1833
1834            let file =
1835                File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
1836            let reader = BufReader::new(file);
1837
1838            let event: StakeTableEvent = serde_json::from_reader(reader)
1839                .context(format!("Failed to deserialize event at path={path:?}"))?;
1840
1841            events.push(((block_number, log_index), event));
1842        }
1843
1844        events.sort_by_key(|(key, _)| *key);
1845
1846        if query_l1_block == to_l1_block {
1847            Ok((Some(EventsPersistenceRead::Complete), events))
1848        } else {
1849            Ok((
1850                Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
1851                events,
1852            ))
1853        }
1854    }
1855}
1856
1857#[async_trait]
1858impl DhtPersistentStorage for Persistence {
1859    /// Save the DHT to the file on disk
1860    ///
1861    /// # Errors
1862    /// - If we fail to serialize the records
1863    /// - If we fail to write the serialized records to the file
1864    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
1865        // Bincode-serialize the records
1866        let to_save =
1867            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
1868
1869        // Get the path to save the file to
1870        let path = self.inner.read().await.libp2p_dht_path();
1871
1872        // Create the directory if it doesn't exist
1873        fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
1874            .with_context(|| "failed to create directory")?;
1875
1876        // Get a write lock on the inner struct
1877        let mut inner = self.inner.write().await;
1878
1879        // Save the file, replacing the previous one if it exists
1880        inner
1881            .replace(
1882                &path,
1883                |_| {
1884                    // Always overwrite the previous file
1885                    Ok(true)
1886                },
1887                |mut file| {
1888                    file.write_all(&to_save)
1889                        .with_context(|| "failed to write records to file")?;
1890                    Ok(())
1891                },
1892            )
1893            .with_context(|| "failed to save records to file")?;
1894
1895        Ok(())
1896    }
1897
1898    /// Load the DHT from the file on disk
1899    ///
1900    /// # Errors
1901    /// - If we fail to read the file
1902    /// - If we fail to deserialize the records
1903    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
1904        // Read the contents of the file
1905        let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
1906            .with_context(|| "Failed to read records from file")?;
1907
1908        // Deserialize the contents
1909        let records: Vec<SerializableRecord> =
1910            bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
1911
1912        Ok(records)
1913    }
1914}
1915/// Update a `NetworkConfig` that may have originally been persisted with an old version.
1916fn migrate_network_config(
1917    mut network_config: serde_json::Value,
1918) -> anyhow::Result<serde_json::Value> {
1919    let config = network_config
1920        .get_mut("config")
1921        .context("missing field `config`")?
1922        .as_object_mut()
1923        .context("`config` must be an object")?;
1924
1925    if !config.contains_key("builder_urls") {
1926        // When multi-builder support was added, the configuration field `builder_url: Url` was
1927        // replaced by an array `builder_urls: Vec<Url>`. If the saved config has no `builder_urls`
1928        // field, it is older than this change. Populate `builder_urls` with a singleton array
1929        // formed from the old value of `builder_url`, and delete the no longer used `builder_url`.
1930        let url = config
1931            .remove("builder_url")
1932            .context("missing field `builder_url`")?;
1933        config.insert("builder_urls".into(), vec![url].into());
1934    }
1935
1936    // HotShotConfig was upgraded to include parameters for proposing and voting on upgrades.
1937    // Configs which were persisted before this upgrade may be missing these parameters. This
1938    // migration initializes them with a default. By default, we use JS MAX_SAFE_INTEGER for the
1939    // start parameters so that nodes will never do an upgrade, unless explicitly configured
1940    // otherwise.
1941    if !config.contains_key("start_proposing_view") {
1942        config.insert("start_proposing_view".into(), 9007199254740991u64.into());
1943    }
1944    if !config.contains_key("stop_proposing_view") {
1945        config.insert("stop_proposing_view".into(), 0.into());
1946    }
1947    if !config.contains_key("start_voting_view") {
1948        config.insert("start_voting_view".into(), 9007199254740991u64.into());
1949    }
1950    if !config.contains_key("stop_voting_view") {
1951        config.insert("stop_voting_view".into(), 0.into());
1952    }
1953    if !config.contains_key("start_proposing_time") {
1954        config.insert("start_proposing_time".into(), 9007199254740991u64.into());
1955    }
1956    if !config.contains_key("stop_proposing_time") {
1957        config.insert("stop_proposing_time".into(), 0.into());
1958    }
1959    if !config.contains_key("start_voting_time") {
1960        config.insert("start_voting_time".into(), 9007199254740991u64.into());
1961    }
1962    if !config.contains_key("stop_voting_time") {
1963        config.insert("stop_voting_time".into(), 0.into());
1964    }
1965
1966    // HotShotConfig was upgraded to include an `epoch_height` parameter. Initialize with a default
1967    // if missing.
1968    if !config.contains_key("epoch_height") {
1969        config.insert("epoch_height".into(), 0.into());
1970    }
1971
1972    // HotShotConfig was upgraded to include `drb_difficulty` and `drb_upgrade_difficulty` parameters. Initialize with a default
1973    // if missing.
1974    if !config.contains_key("drb_difficulty") {
1975        config.insert("drb_difficulty".into(), 0.into());
1976    }
1977    if !config.contains_key("drb_upgrade_difficulty") {
1978        config.insert("drb_upgrade_difficulty".into(), 0.into());
1979    }
1980
1981    Ok(network_config)
1982}
1983
1984/// Get all paths under `dir` whose name is of the form <view number>.txt.
1985fn view_files(
1986    dir: impl AsRef<Path>,
1987) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
1988    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
1989        let dir = dir.as_ref().display();
1990        let entry = entry.ok()?;
1991        if !entry.file_type().ok()?.is_file() {
1992            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
1993            return None;
1994        }
1995        let path = entry.path();
1996        if path.extension()? != "txt" {
1997            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
1998            return None;
1999        }
2000        let file_name = path.file_stem()?;
2001        let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2002            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2003            return None;
2004        };
2005        Some((ViewNumber::new(view_number), entry.path().to_owned()))
2006    }))
2007}
2008
2009/// Get all paths under `dir` whose name is of the form <epoch number>.txt.
2010/// Should probably be made generic and merged with view_files.
2011fn epoch_files(
2012    dir: impl AsRef<Path>,
2013) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2014    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2015        let dir = dir.as_ref().display();
2016        let entry = entry.ok()?;
2017        if !entry.file_type().ok()?.is_file() {
2018            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2019            return None;
2020        }
2021        let path = entry.path();
2022        if path.extension()? != "txt" {
2023            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2024            return None;
2025        }
2026        let file_name = path.file_stem()?;
2027        let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2028            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2029            return None;
2030        };
2031        Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2032    }))
2033}
2034
2035#[cfg(test)]
2036mod test {
2037    use std::marker::PhantomData;
2038
2039    use committable::{Commitment, CommitmentBoundsArkless, Committable};
2040    use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2041    use hotshot::types::SignatureKey;
2042    use hotshot_example_types::node_types::TestVersions;
2043    use hotshot_query_service::testing::mocks::MockVersions;
2044    use hotshot_types::{
2045        data::QuorumProposal2,
2046        light_client::LightClientState,
2047        simple_certificate::QuorumCertificate,
2048        simple_vote::QuorumData,
2049        traits::{
2050            block_contents::GENESIS_VID_NUM_STORAGE_NODES, node_implementation::Versions,
2051            EncodeBytes,
2052        },
2053        vid::advz::advz_scheme,
2054    };
2055    use jf_vid::VidScheme;
2056    use serde_json::json;
2057    use tempfile::TempDir;
2058    use vbs::version::StaticVersionType;
2059
2060    use super::*;
2061    use crate::{persistence::tests::TestablePersistence, BLSPubKey};
2062
    #[async_trait]
    impl TestablePersistence for Persistence {
        // Tests back the store with a fresh temp directory, deleted when dropped.
        type Storage = TempDir;

        async fn tmp_storage() -> Self::Storage {
            TempDir::new().unwrap()
        }

        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
            // Point the file-system persistence options at the temp directory.
            Options::new(storage.path().into())
        }
    }
2075
2076    #[test]
2077    fn test_config_migrations_add_builder_urls() {
2078        let before = json!({
2079            "config": {
2080                "builder_url": "https://test:8080",
2081                "start_proposing_view": 1,
2082                "stop_proposing_view": 2,
2083                "start_voting_view": 1,
2084                "stop_voting_view": 2,
2085                "start_proposing_time": 1,
2086                "stop_proposing_time": 2,
2087                "start_voting_time": 1,
2088                "stop_voting_time": 2
2089            }
2090        });
2091        let after = json!({
2092            "config": {
2093                "builder_urls": ["https://test:8080"],
2094                "start_proposing_view": 1,
2095                "stop_proposing_view": 2,
2096                "start_voting_view": 1,
2097                "stop_voting_view": 2,
2098                "start_proposing_time": 1,
2099                "stop_proposing_time": 2,
2100                "start_voting_time": 1,
2101                "stop_voting_time": 2,
2102                "epoch_height": 0,
2103                "drb_difficulty": 0,
2104                "drb_upgrade_difficulty": 0,
2105            }
2106        });
2107
2108        assert_eq!(migrate_network_config(before).unwrap(), after);
2109    }
2110
2111    #[test]
2112    fn test_config_migrations_existing_builder_urls() {
2113        let before = json!({
2114            "config": {
2115                "builder_urls": ["https://test:8080", "https://test:8081"],
2116                "start_proposing_view": 1,
2117                "stop_proposing_view": 2,
2118                "start_voting_view": 1,
2119                "stop_voting_view": 2,
2120                "start_proposing_time": 1,
2121                "stop_proposing_time": 2,
2122                "start_voting_time": 1,
2123                "stop_voting_time": 2,
2124                "epoch_height": 0,
2125                "drb_difficulty": 0,
2126                "drb_upgrade_difficulty": 0,
2127            }
2128        });
2129
2130        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2131    }
2132
2133    #[test]
2134    fn test_config_migrations_add_upgrade_params() {
2135        let before = json!({
2136            "config": {
2137                "builder_urls": ["https://test:8080", "https://test:8081"]
2138            }
2139        });
2140        let after = json!({
2141            "config": {
2142                "builder_urls": ["https://test:8080", "https://test:8081"],
2143                "start_proposing_view": 9007199254740991u64,
2144                "stop_proposing_view": 0,
2145                "start_voting_view": 9007199254740991u64,
2146                "stop_voting_view": 0,
2147                "start_proposing_time": 9007199254740991u64,
2148                "stop_proposing_time": 0,
2149                "start_voting_time": 9007199254740991u64,
2150                "stop_voting_time": 0,
2151                "epoch_height": 0,
2152                "drb_difficulty": 0,
2153                "drb_upgrade_difficulty": 0,
2154            }
2155        });
2156
2157        assert_eq!(migrate_network_config(before).unwrap(), after);
2158    }
2159
2160    #[test]
2161    fn test_config_migrations_existing_upgrade_params() {
2162        let before = json!({
2163            "config": {
2164                "builder_urls": ["https://test:8080", "https://test:8081"],
2165                "start_proposing_view": 1,
2166                "stop_proposing_view": 2,
2167                "start_voting_view": 1,
2168                "stop_voting_view": 2,
2169                "start_proposing_time": 1,
2170                "stop_proposing_time": 2,
2171                "start_voting_time": 1,
2172                "stop_voting_time": 2,
2173                "epoch_height": 0,
2174                "drb_difficulty": 0,
2175                "drb_upgrade_difficulty": 0,
2176            }
2177        });
2178
2179        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2180    }
2181
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_consensus_migration() {
        // Number of views/epochs worth of legacy (v1) data to generate before migrating.
        let rows = 300;
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        let inner = storage.inner.read().await;

        // Pre-create the legacy storage directories the migration reads from.
        let decided_leaves_path = inner.decided_leaf_path();
        fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");

        let qp_dir_path = inner.quorum_proposals_dir_path();
        fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");

        let state_cert_dir_path = inner.state_cert_dir_path();
        fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
        drop(inner);

        // Fresh storage must have no state cert yet.
        assert!(storage.load_state_cert().await.unwrap().is_none());

        // Populate `rows` views of legacy-format data: decided leaves, quorum proposals,
        // light client state certs, VID shares, and DA proposals.
        for i in 0..rows {
            let view = ViewNumber::new(i);
            let validated_state = ValidatedState::default();
            let instance_state = NodeState::default();

            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
            let (payload, metadata) =
                Payload::from_transactions([], &validated_state, &instance_state)
                    .await
                    .unwrap();

            let payload_bytes = payload.encode();

            let block_header =
                Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);

            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(i),
                light_client_state: LightClientState {
                    view_number: i,
                    block_height: i,
                    block_comm_root: Default::default(),
                },
                next_stake_table_state: Default::default(),
                signatures: vec![], // filling arbitrary value
                auth_root: Default::default(),
            };
            assert!(storage.add_state_cert(state_cert).await.is_ok());

            // Build a placeholder QC chain; the contents don't matter for migration.
            let null_quorum_data = QuorumData {
                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
            };

            let justify_qc = QuorumCertificate::new(
                null_quorum_data.clone(),
                null_quorum_data.commit(),
                view,
                None,
                PhantomData,
            );

            let quorum_proposal = QuorumProposal {
                block_header,
                view_number: view,
                justify_qc: justify_qc.clone(),
                upgrade_certificate: None,
                proposal_certificate: None,
            };

            let quorum_proposal_signature =
                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum proposal");

            let proposal = Proposal {
                data: quorum_proposal.clone(),
                signature: quorum_proposal_signature,
                _pd: PhantomData,
            };

            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
            leaf.fill_block_payload::<TestVersions>(
                payload,
                GENESIS_VID_NUM_STORAGE_NODES,
                <TestVersions as Versions>::Base::VERSION,
            )
            .unwrap();

            let mut inner = storage.inner.write().await;

            // Write the legacy decided-leaf file (`<view>.txt`) directly.
            tracing::debug!("inserting decided leaves");
            let file_path = decided_leaves_path
                .join(view.to_string())
                .with_extension("txt");

            tracing::debug!("inserting decided leaves");

            inner
                .replace(
                    &file_path,
                    |_| Ok(true),
                    |mut file| {
                        let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
                        file.write_all(&bytes)?;
                        Ok(())
                    },
                )
                .expect("replace decided leaves");

            // Write the legacy quorum-proposal file for this view.
            let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");

            tracing::debug!("inserting qc for {view}");

            inner
                .replace(
                    &file_path,
                    |_| Ok(true),
                    |mut file| {
                        let proposal_bytes =
                            bincode::serialize(&proposal).context("serialize proposal")?;

                        file.write_all(&proposal_bytes)?;
                        Ok(())
                    },
                )
                .unwrap();

            drop(inner);
            // Generate a VID disperse share for the payload via the ADVZ scheme.
            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
                .disperse(payload_bytes.clone())
                .unwrap();

            let vid = ADVZDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(i),
                payload_commitment: Default::default(),
                share: disperse.shares[0].clone(),
                common: disperse.common,
                recipient_key: pubkey,
            };

            let (payload, metadata) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
                    .await
                    .unwrap();

            let da = DaProposal::<SeqTypes> {
                encoded_transactions: payload.encode(),
                metadata,
                view_number: ViewNumber::new(i),
            };

            let block_payload_signature =
                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");

            let da_proposal = Proposal {
                data: da,
                signature: block_payload_signature,
                _pd: Default::default(),
            };

            // VID and DA go through the regular persistence API (stored in legacy form).
            tracing::debug!("inserting vid for {view}");
            storage
                .append_vid(&vid.to_proposal(&privkey).unwrap())
                .await
                .unwrap();

            tracing::debug!("inserting da for {view}");
            storage
                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
                .await
                .unwrap();
        }

        // Run the migration, then verify every legacy row has a v2 counterpart.
        storage.migrate_consensus().await.unwrap();
        let inner = storage.inner.read().await;
        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
        let decided_leaves_count = decided_leaves
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(
            decided_leaves_count, rows as usize,
            "decided leaves count does not match",
        );

        let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
        let da_proposals_count = da_proposals
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(
            da_proposals_count, rows as usize,
            "da proposals does not match",
        );

        let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
        let vids_count = vids
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(vids_count, rows as usize, "vid shares count does not match",);

        let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
        let qps_count = qps
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(
            qps_count, rows as usize,
            "quorum proposals count does not match",
        );

        let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
        let state_cert_count = state_certs
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(
            state_cert_count, rows as usize,
            "light client state update certificate count does not match",
        );

        // Reinitialize the file system persistence using the same path.
        // re run the consensus migration.
        // No changes will occur, as the migration has already been completed.
        let storage = opt.create().await.unwrap();
        storage.migrate_consensus().await.unwrap();

        let inner = storage.inner.read().await;
        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
        let decided_leaves_count = decided_leaves
            .filter_map(Result::ok)
            .filter(|e| e.path().is_file())
            .count();
        assert_eq!(
            decided_leaves_count, rows as usize,
            "decided leaves count does not match",
        );
    }
2421
2422    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2423    async fn test_load_quorum_proposals_invalid_extension() {
2424        let tmp = Persistence::tmp_storage().await;
2425        let storage = Persistence::connect(&tmp).await;
2426
2427        // Generate a couple of valid quorum proposals.
2428        let leaf = Leaf2::genesis::<MockVersions>(&Default::default(), &NodeState::mock()).await;
2429        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2430        let signature = PubKey::sign(&privkey, &[]).unwrap();
2431        let mut quorum_proposal = Proposal {
2432            data: QuorumProposalWrapper::<SeqTypes> {
2433                proposal: QuorumProposal2::<SeqTypes> {
2434                    epoch: None,
2435                    block_header: leaf.block_header().clone(),
2436                    view_number: ViewNumber::genesis(),
2437                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2438                        &Default::default(),
2439                        &NodeState::mock(),
2440                    )
2441                    .await,
2442                    upgrade_certificate: None,
2443                    view_change_evidence: None,
2444                    next_drb_result: None,
2445                    next_epoch_justify_qc: None,
2446                    state_cert: None,
2447                },
2448            },
2449            signature,
2450            _pd: Default::default(),
2451        };
2452
2453        // Store quorum proposals.
2454        let quorum_proposal1 = quorum_proposal.clone();
2455        storage
2456            .append_quorum_proposal2(&quorum_proposal1)
2457            .await
2458            .unwrap();
2459        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2460        let quorum_proposal2 = quorum_proposal.clone();
2461        storage
2462            .append_quorum_proposal2(&quorum_proposal2)
2463            .await
2464            .unwrap();
2465
2466        // Change one of the file extensions. It can happen that we end up with files with the wrong
2467        // extension if, for example, the node is killed before cleaning up a swap file.
2468        fs::rename(
2469            tmp.path().join("quorum_proposals2/1.txt"),
2470            tmp.path().join("quorum_proposals2/1.swp"),
2471        )
2472        .unwrap();
2473
2474        // Loading should simply ignore the unrecognized extension.
2475        assert_eq!(
2476            storage.load_quorum_proposals().await.unwrap(),
2477            [(ViewNumber::genesis(), quorum_proposal1)]
2478                .into_iter()
2479                .collect::<BTreeMap<_, _>>()
2480        );
2481    }
2482
2483    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2484    async fn test_load_quorum_proposals_malformed_data() {
2485        let tmp = Persistence::tmp_storage().await;
2486        let storage = Persistence::connect(&tmp).await;
2487
2488        // Generate a valid quorum proposal.
2489        let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&Default::default(), &NodeState::mock())
2490            .await
2491            .into();
2492        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2493        let signature = PubKey::sign(&privkey, &[]).unwrap();
2494        let quorum_proposal = Proposal {
2495            data: QuorumProposalWrapper::<SeqTypes> {
2496                proposal: QuorumProposal2::<SeqTypes> {
2497                    epoch: None,
2498                    block_header: leaf.block_header().clone(),
2499                    view_number: ViewNumber::new(1),
2500                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2501                        &Default::default(),
2502                        &NodeState::mock(),
2503                    )
2504                    .await,
2505                    upgrade_certificate: None,
2506                    view_change_evidence: None,
2507                    next_drb_result: None,
2508                    next_epoch_justify_qc: None,
2509                    state_cert: None,
2510                },
2511            },
2512            signature,
2513            _pd: Default::default(),
2514        };
2515
2516        // First store an invalid quorum proposal.
2517        fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2518        fs::write(
2519            tmp.path().join("quorum_proposals2/0.txt"),
2520            "invalid data".as_bytes(),
2521        )
2522        .unwrap();
2523
2524        // Store valid quorum proposal.
2525        storage
2526            .append_quorum_proposal2(&quorum_proposal)
2527            .await
2528            .unwrap();
2529
2530        // Loading should ignore the invalid data and return the valid proposal.
2531        assert_eq!(
2532            storage.load_quorum_proposals().await.unwrap(),
2533            [(ViewNumber::new(1), quorum_proposal)]
2534                .into_iter()
2535                .collect::<BTreeMap<_, _>>()
2536        );
2537    }
2538}