sequencer/persistence/fs.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    fs::{self, File, OpenOptions},
4    io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5    ops::RangeInclusive,
6    path::{Path, PathBuf},
7    sync::Arc,
8    time::Instant,
9};
10
11use anyhow::{anyhow, bail, Context};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use clap::Parser;
15use espresso_types::{
16    traits::{EventsPersistenceRead, MembershipPersistence},
17    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
18    v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent, Validator},
19    Leaf, Leaf2, NetworkConfig, Payload, PubKey, SeqTypes, StakeTableHash, ValidatorMap,
20};
21use hotshot::InitializerEpochInfo;
22use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
23    DhtPersistentStorage, SerializableRecord,
24};
25use hotshot_types::{
26    data::{
27        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
28        QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
29    },
30    drb::{DrbInput, DrbResult},
31    event::{Event, EventType, HotShotAction, LeafInfo},
32    message::{convert_proposal, Proposal},
33    simple_certificate::{
34        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
35        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
36    },
37    traits::{
38        block_contents::{BlockHeader, BlockPayload},
39        metrics::Metrics,
40        node_implementation::{ConsensusTime, NodeType},
41    },
42    vote::HasViewNumber,
43};
44use itertools::Itertools;
45
46use crate::{
47    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
48    ViewNumber, RECENT_STAKE_TABLES_LIMIT,
49};
50
51/// Options for file system backed persistence.
52#[derive(Parser, Clone, Debug)]
53pub struct Options {
54    /// Storage path for persistent data.
55    #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
56    path: PathBuf,
57
58    /// Number of views to retain in consensus storage before data that hasn't been archived is
59    /// garbage collected.
60    ///
61    /// The longer this is, the more certain that all data will eventually be archived, even if
62    /// there are temporary problems with archive storage or partially missing data. This can be set
63    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
64    /// setting only applies to views which never get decided (i.e., forks in consensus) and views for
65    /// which this node is partially offline. These should be exceptionally rare.
66    ///
67    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
68    /// view time of 2s.
69    #[clap(
70        long,
71        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
72        default_value = "130000"
73    )]
74    pub(crate) consensus_view_retention: u64,
75}
76
77impl Default for Options {
78    fn default() -> Self {
79        Self::parse_from(std::iter::empty::<String>())
80    }
81}
82
83impl Options {
84    pub fn new(path: PathBuf) -> Self {
85        Self {
86            path,
87            consensus_view_retention: 130000,
88        }
89    }
90
91    pub(crate) fn path(&self) -> &Path {
92        &self.path
93    }
94}
95
96#[async_trait]
97impl PersistenceOptions for Options {
98    type Persistence = Persistence;
99
100    fn set_view_retention(&mut self, view_retention: u64) {
101        self.consensus_view_retention = view_retention;
102    }
103
104    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
105        let path = self.path.clone();
106        let view_retention = self.consensus_view_retention;
107
108        let migration_path = path.join("migration");
109        let migrated = if migration_path.is_file() {
110            let bytes = fs::read(&migration_path).context(format!(
111                "unable to read migration from {}",
112                migration_path.display()
113            ))?;
114            bincode::deserialize(&bytes).context("malformed migration file")?
115        } else {
116            HashSet::new()
117        };
118
119        Ok(Persistence {
120            inner: Arc::new(RwLock::new(Inner {
121                path,
122                migrated,
123                view_retention,
124            })),
125            metrics: Arc::new(PersistenceMetricsValue::default()),
126        })
127    }
128
129    async fn reset(self) -> anyhow::Result<()> {
130        todo!()
131    }
132}
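// Editorial sketch (illustrative values, not part of this module): `Options` derives
// `clap::Parser`, so it can be built either with `Options::new` or from CLI/env-style flags,
// and `PersistenceOptions::create` then yields the `Persistence` handle used by the node.
//
// ```rust
// use std::path::PathBuf;
// use clap::Parser;
//
// async fn open_storage() -> anyhow::Result<Persistence> {
//     // Construct directly with the default retention window...
//     let _opts = Options::new(PathBuf::from("/var/lib/espresso"));
//
//     // ...or parse CLI-style flags, here overriding the retention window.
//     let mut opts = Options::parse_from([
//         "sequencer",
//         "--path",
//         "/var/lib/espresso",
//         "--consensus-view-retention",
//         "200000",
//     ]);
//     opts.create().await
// }
// ```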
133
134/// File system backed persistence.
135#[derive(Clone, Debug)]
136pub struct Persistence {
137    // We enforce mutual exclusion on access to the data source, as the current file system
138    // implementation does not support transaction isolation for concurrent reads and writes. We can
139    // improve this in the future by switching to a SQLite-based file system implementation.
140    inner: Arc<RwLock<Inner>>,
141    /// A reference to the metrics trait
142    metrics: Arc<PersistenceMetricsValue>,
143}
144
145#[derive(Debug)]
146struct Inner {
147    path: PathBuf,
148    view_retention: u64,
149    migrated: HashSet<String>,
150}
151
152impl Inner {
153    fn config_path(&self) -> PathBuf {
154        self.path.join("hotshot.cfg")
155    }
156
157    fn migration(&self) -> PathBuf {
158        self.path.join("migration")
159    }
160
161    fn voted_view_path(&self) -> PathBuf {
162        self.path.join("highest_voted_view")
163    }
164
165    fn restart_view_path(&self) -> PathBuf {
166        self.path.join("restart_view")
167    }
168
169    /// Path to a directory containing decided leaves.
170    fn decided_leaf_path(&self) -> PathBuf {
171        self.path.join("decided_leaves")
172    }
173
174    fn decided_leaf2_path(&self) -> PathBuf {
175        self.path.join("decided_leaves2")
176    }
177
178    /// The path from previous versions where there was only a single file for anchor leaves.
179    fn legacy_anchor_leaf_path(&self) -> PathBuf {
180        self.path.join("anchor_leaf")
181    }
182
183    fn vid_dir_path(&self) -> PathBuf {
184        self.path.join("vid")
185    }
186
187    fn vid2_dir_path(&self) -> PathBuf {
188        self.path.join("vid2")
189    }
190
191    fn da_dir_path(&self) -> PathBuf {
192        self.path.join("da")
193    }
194
195    fn drb_dir_path(&self) -> PathBuf {
196        self.path.join("drb")
197    }
198
199    fn da2_dir_path(&self) -> PathBuf {
200        self.path.join("da2")
201    }
202
203    fn quorum_proposals_dir_path(&self) -> PathBuf {
204        self.path.join("quorum_proposals")
205    }
206
207    fn quorum_proposals2_dir_path(&self) -> PathBuf {
208        self.path.join("quorum_proposals2")
209    }
210
211    fn upgrade_certificate_dir_path(&self) -> PathBuf {
212        self.path.join("upgrade_certificate")
213    }
214
215    fn stake_table_dir_path(&self) -> PathBuf {
216        self.path.join("stake_table")
217    }
218
219    fn next_epoch_qc(&self) -> PathBuf {
220        self.path.join("next_epoch_quorum_certificate")
221    }
222
223    fn eqc(&self) -> PathBuf {
224        self.path.join("eqc")
225    }
226
227    fn libp2p_dht_path(&self) -> PathBuf {
228        self.path.join("libp2p_dht")
229    }
230    fn epoch_drb_result_dir_path(&self) -> PathBuf {
231        self.path.join("epoch_drb_result")
232    }
233
234    fn epoch_root_block_header_dir_path(&self) -> PathBuf {
235        self.path.join("epoch_root_block_header")
236    }
237
238    fn finalized_state_cert_dir_path(&self) -> PathBuf {
239        self.path.join("finalized_state_cert")
240    }
241
242    fn state_cert_dir_path(&self) -> PathBuf {
243        self.path.join("state_cert")
244    }
245
246    fn update_migration(&mut self) -> anyhow::Result<()> {
247        let path = self.migration();
248        let bytes = bincode::serialize(&self.migrated)?;
249
250        self.replace(
251            &path,
252            |_| Ok(true),
253            |mut file| {
254                file.write_all(&bytes)?;
255                Ok(())
256            },
257        )
258    }
259
260    /// Overwrite a file if a condition is met.
261    ///
262    /// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
263    /// returns `true`, or if there was no existing file, then `write` is called to update the
264    /// contents of the file. `write` receives a truncated file open in write mode and sets the
265    /// contents of the file.
266    ///
267    /// The final replacement of the original file is atomic; that is, `path` will be modified only
268    /// if the entire update succeeds.
269    fn replace(
270        &mut self,
271        path: &Path,
272        pred: impl FnOnce(File) -> anyhow::Result<bool>,
273        write: impl FnOnce(File) -> anyhow::Result<()>,
274    ) -> anyhow::Result<()> {
275        if path.is_file() {
276            // If there is an existing file, check if it is suitable to replace. Note that this
277            // check is not atomic with respect to the subsequent write at the file system level,
278            // but this object is the only one which writes to this file, and we have a mutable
279            // reference, so this should be safe.
280            if !pred(File::open(path)?)? {
281                // If we are not overwriting the file, we are done and consider the whole operation
282                // successful.
283                return Ok(());
284            }
285        }
286
287        // Either there is no existing file or we have decided to overwrite the file. Write the new
288        // contents into a temporary file so we can update `path` atomically using `rename`.
289        let mut swap_path = path.to_owned();
290        swap_path.set_extension("swp");
291        let swap = OpenOptions::new()
292            .write(true)
293            .truncate(true)
294            .create(true)
295            .open(&swap_path)?;
296        write(swap)?;
297
298        // Now we can replace the original file.
299        fs::rename(swap_path, path)?;
300
301        Ok(())
302    }
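    // Editorial note: the write-to-`.swp`-then-`fs::rename` dance above is the standard way to
    // make the final replacement atomic on a single filesystem. A self-contained sketch of the
    // same technique, using only `std` (names here are illustrative):
    //
    // ```rust
    // use std::{fs, io::Write, path::Path};
    //
    // fn atomic_write(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    //     let swap = path.with_extension("swp");
    //     let mut file = fs::File::create(&swap)?; // create/truncate the temporary file
    //     file.write_all(contents)?;
    //     file.sync_all()?; // extra durability step; not strictly required by the pattern above
    //     fs::rename(&swap, path) // atomically replace the original
    // }
    // ```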
303
304    fn collect_garbage(
305        &mut self,
306        decided_view: ViewNumber,
307        prune_intervals: &[RangeInclusive<ViewNumber>],
308    ) -> anyhow::Result<()> {
309        let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
310
311        self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
312        self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
313        self.prune_files(
314            self.quorum_proposals2_dir_path(),
315            prune_view,
316            None,
317            prune_intervals,
318        )?;
319        self.prune_files(
320            self.state_cert_dir_path(),
321            prune_view,
322            None,
323            prune_intervals,
324        )?;
325
326        // Save the most recent leaf as it will be our anchor point if the node restarts.
327        self.prune_files(
328            self.decided_leaf2_path(),
329            prune_view,
330            Some(decided_view),
331            prune_intervals,
332        )?;
333
334        Ok(())
335    }
336
337    fn prune_files(
338        &mut self,
339        dir_path: PathBuf,
340        prune_view: ViewNumber,
341        keep_decided_view: Option<ViewNumber>,
342        prune_intervals: &[RangeInclusive<ViewNumber>],
343    ) -> anyhow::Result<()> {
344        if !dir_path.is_dir() {
345            return Ok(());
346        }
347
348        for (file_view, path) in view_files(dir_path)? {
349            // If the view is the anchor view, keep it no matter what.
350            if let Some(decided_view) = keep_decided_view {
351                if decided_view == file_view {
352                    continue;
353                }
354            }
355            // Otherwise, delete it if it is time to prune this view _or_ if the given intervals,
356            // which we've already successfully processed, contain the view; in this case we simply
357            // don't need it anymore.
358            if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
359                fs::remove_file(&path)?;
360            }
361        }
362
363        Ok(())
364    }
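    // Editorial sketch of the pruning rule above, with plain integers standing in for view
    // numbers (values are illustrative): a file is deleted when its view is older than the
    // retention cutoff, or when it falls inside an interval that has already been processed.
    //
    // ```rust
    // let decided_view: u64 = 200_000;
    // let view_retention: u64 = 130_000;
    // let prune_view = decided_view.saturating_sub(view_retention); // 70_000
    //
    // let prune_intervals = [150_000u64..=180_000];
    // let file_view = 160_000u64;
    // let delete =
    //     file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view));
    // assert!(delete); // inside an already-processed interval
    // ```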
365
366    fn parse_decided_leaf(
367        &self,
368        bytes: &[u8],
369    ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
370        // Old versions of the software did not store the next epoch QC. Without knowing which
371        // version this file was created with, we can simply try parsing both ways and then
372        // reconstruct a certificate pair with or without the next epoch QC.
373        match bincode::deserialize(bytes) {
374            Ok((leaf, cert)) => Ok((leaf, cert)),
375            Err(err) => {
376                tracing::info!(
377                    "error parsing decided leaf, maybe file was created without next epoch QC? \
378                     {err}"
379                );
380                let (leaf, qc) =
381                    bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
382                        .context("parsing decided leaf")?;
383                Ok((leaf, CertificatePair::non_epoch_change(qc)))
384            },
385        }
386    }
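    // Editorial sketch of the "try the new layout, then fall back to the old one" pattern used
    // above, with stand-in types rather than the real leaf/certificate types:
    //
    // ```rust
    // use anyhow::Context;
    //
    // fn parse_versioned(bytes: &[u8]) -> anyhow::Result<(u64, Option<u64>)> {
    //     // Newer files store a pair of values.
    //     if let Ok((a, b)) = bincode::deserialize::<(u64, u64)>(bytes) {
    //         return Ok((a, Some(b)));
    //     }
    //     // Older files store a single value; synthesize the missing half.
    //     let a: u64 = bincode::deserialize(bytes).context("parsing legacy format")?;
    //     Ok((a, None))
    // }
    // ```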
387
388    /// Generate events based on persisted decided leaves.
389    ///
390    /// Returns a list of closed intervals of views which can be safely deleted, as all leaves
391    /// within these view ranges have been processed by the event consumer.
392    async fn generate_decide_events(
393        &mut self,
394        view: ViewNumber,
395        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
396        consumer: &impl EventConsumer,
397    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
398        // Generate a decide event for each leaf, to be processed by the event consumer. We make a
399        // separate event for each leaf because it is possible we have non-consecutive leaves in our
400        // storage, which would not be valid as a single decide with a single leaf chain.
401        let mut leaves = BTreeMap::new();
402        for (v, path) in view_files(self.decided_leaf2_path())? {
403            if v > view {
404                continue;
405            }
406
407            let bytes =
408                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
409            let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;
410
411            // Include the VID share if available.
412            let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
413            if vid_share.is_none() {
414                tracing::debug!(?v, "VID share not available at decide");
415            }
416
417            // Move the state cert to the finalized dir if it exists.
418            let state_cert = self.store_finalized_state_cert(v)?;
419
420            // Fill in the full block payload using the DA proposals we had persisted.
421            if let Some(proposal) = self.load_da_proposal(v)? {
422                let payload = Payload::from_bytes(
423                    &proposal.data.encoded_transactions,
424                    &proposal.data.metadata,
425                );
426                leaf.fill_block_payload_unchecked(payload);
427            } else {
428                tracing::debug!(?v, "DA proposal not available at decide");
429            }
430
431            let info = LeafInfo {
432                leaf,
433                vid_share,
434                state_cert,
435                // Note: the following fields are not used in Decide event processing, and should be
436                // removed. For now, we just default them.
437                state: Default::default(),
438                delta: Default::default(),
439            };
440
441            leaves.insert(v, (info, cert));
442        }
443
444        // The invariant is that the oldest existing leaf in `anchor_leaf` storage -- if there is
445        // one -- was always included in the _previous_ decide event, but not removed from
446        // storage, because we always persist the most recent anchor leaf.
447        if let Some((oldest_view, _)) = leaves.first_key_value() {
448            // The only exception is when the oldest leaf is the genesis leaf; then there was no
449            // previous decide event.
450            if *oldest_view > ViewNumber::genesis() {
451                leaves.pop_first();
452            }
453        }
454
455        let mut intervals = vec![];
456        let mut current_interval = None;
457        for (view, (leaf, cert)) in leaves {
458            let height = leaf.leaf.block_header().block_number();
459
460            {
461                // Insert the deciding QC at the appropriate position, with the last decide event in the
462                // chain.
463                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
464                    (deciding_qc.view_number() == cert.view_number() + 1)
465                        .then_some(deciding_qc.clone())
466                } else {
467                    None
468                };
469
470                consumer
471                    .handle_event(&Event {
472                        view_number: view,
473                        event: EventType::Decide {
474                            committing_qc: Arc::new(cert),
475                            deciding_qc,
476                            leaf_chain: Arc::new(vec![leaf]),
477                            block_size: None,
478                        },
479                    })
480                    .await?;
481            }
482            if let Some((start, end, current_height)) = current_interval.as_mut() {
483                if height == *current_height + 1 {
484                    // If we have a chain of consecutive leaves, extend the current interval of
485                    // views which are safe to delete.
486                    *current_height += 1;
487                    *end = view;
488                } else {
489                    // Otherwise, end the current interval and start a new one.
490                    intervals.push(*start..=*end);
491                    current_interval = Some((view, view, height));
492                }
493            } else {
494                // Start a new interval.
495                current_interval = Some((view, view, height));
496            }
497        }
498        if let Some((start, end, _)) = current_interval {
499            intervals.push(start..=end);
500        }
501
502        Ok(intervals)
503    }
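    // Editorial sketch of the interval bookkeeping above: consecutive block heights are folded
    // into closed view ranges, and a gap in heights starts a new range. Stand-in function with
    // plain integers for (view, height) pairs, assumed sorted by view:
    //
    // ```rust
    // use std::ops::RangeInclusive;
    //
    // fn consecutive_intervals(leaves: &[(u64, u64)]) -> Vec<RangeInclusive<u64>> {
    //     let mut intervals = Vec::new();
    //     let mut current: Option<(u64, u64, u64)> = None; // (start_view, end_view, last_height)
    //     for &(view, height) in leaves {
    //         if let Some((start, end, last)) = current.as_mut() {
    //             if height == *last + 1 {
    //                 *last += 1;
    //                 *end = view;
    //             } else {
    //                 intervals.push(*start..=*end);
    //                 current = Some((view, view, height));
    //             }
    //         } else {
    //             current = Some((view, view, height));
    //         }
    //     }
    //     if let Some((start, end, _)) = current {
    //         intervals.push(start..=end);
    //     }
    //     intervals
    // }
    //
    // // Views 10..=11 have consecutive heights; view 20 starts a new range after a gap.
    // assert_eq!(
    //     consecutive_intervals(&[(10, 100), (11, 101), (20, 105)]),
    //     vec![10..=11, 20..=20],
    // );
    // ```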
504
505    fn load_da_proposal(
506        &self,
507        view: ViewNumber,
508    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
509        let dir_path = self.da2_dir_path();
510
511        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
512
513        if !file_path.exists() {
514            return Ok(None);
515        }
516
517        let da_bytes = fs::read(file_path)?;
518
519        let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
520            bincode::deserialize(&da_bytes)?;
521        Ok(Some(da_proposal))
522    }
523
524    fn load_vid_share(
525        &self,
526        view: ViewNumber,
527    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
528        let dir_path = self.vid2_dir_path();
529
530        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
531
532        if !file_path.exists() {
533            return Ok(None);
534        }
535
536        let vid_share_bytes = fs::read(file_path)?;
537        let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
538            bincode::deserialize(&vid_share_bytes)?;
539        Ok(Some(vid_share))
540    }
541
542    fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
543        tracing::info!("Checking `Leaf2` storage for the anchor leaf.");
544        if self.decided_leaf2_path().is_dir() {
545            let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
546
547            // Return the latest decided leaf.
548            for (_, path) in view_files(self.decided_leaf2_path())? {
549                let bytes =
550                    fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
551                let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
552                if let Some((anchor_leaf, _)) = &anchor {
553                    if leaf.view_number() > anchor_leaf.view_number() {
554                        anchor = Some((leaf, cert.qc().clone()));
555                    }
556                } else {
557                    anchor = Some((leaf, cert.qc().clone()));
558                }
559            }
560
561            return Ok(anchor);
562        }
563
564        tracing::warn!(
565            "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
566             This is very likely to fail."
567        );
568        if self.legacy_anchor_leaf_path().is_file() {
569            // We may have an old version of storage, where there is just a single file for the
570            // anchor leaf. Read it and return the contents.
571            let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
572
573            // The first 8 bytes just contain the height of the leaf. We can skip this.
574            file.seek(SeekFrom::Start(8)).context("seek")?;
575            let bytes = file
576                .bytes()
577                .collect::<Result<Vec<_>, _>>()
578                .context("read")?;
579            return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
580        }
581
582        Ok(None)
583    }
584
585    fn store_finalized_state_cert(
586        &mut self,
587        view: ViewNumber,
588    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
589        let dir_path = self.state_cert_dir_path();
590        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
591
592        if !file_path.exists() {
593            return Ok(None);
594        }
595
596        let bytes = fs::read(&file_path)?;
597
598        let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
599            bincode::deserialize(&bytes).or_else(|err_v2| {
600                tracing::info!(
601                    error = %err_v2,
602                    path = %file_path.display(),
603                    "Failed to deserialize state certificate, attempting with v1"
604                );
605
606                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
607                    .map(Into::into)
608                    .with_context(|| {
609                        format!(
610                            "Failed to deserialize with both v2 and v1 from file '{}'. error: \
611                             {err_v2}",
612                            file_path.display()
613                        )
614                    })
615            })?;
616
617        let epoch = state_cert.epoch.u64();
618        let finalized_dir_path = self.finalized_state_cert_dir_path();
619        fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
620
621        let finalized_file_path = finalized_dir_path
622            .join(epoch.to_string())
623            .with_extension("txt");
624
625        self.replace(
626            &finalized_file_path,
627            |_| Ok(true),
628            |mut file| {
629                file.write_all(&bytes)?;
630                Ok(())
631            },
632        )
633        .context(format!(
634            "finalizing light client state update certificate file for epoch {epoch:?}"
635        ))?;
636
637        Ok(Some(state_cert))
638    }
639}
640
641#[async_trait]
642impl SequencerPersistence for Persistence {
643    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
644        let inner = self.inner.read().await;
645        let path = inner.config_path();
646        if !path.is_file() {
647            tracing::info!("config not found at {}", path.display());
648            return Ok(None);
649        }
650        tracing::info!("loading config from {}", path.display());
651
652        let bytes =
653            fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
654        let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
655        let json = migrate_network_config(json).context("migration of network config failed")?;
656        let config = serde_json::from_value(json).context("malformed config file")?;
657        Ok(Some(config))
658    }
659
660    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
661        let inner = self.inner.write().await;
662        let path = inner.config_path();
663        tracing::info!("saving config to {}", path.display());
664        Ok(cfg.to_file(path.display().to_string())?)
665    }
666
667    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
668        let inner = self.inner.read().await;
669        let path = inner.voted_view_path();
670        if !path.is_file() {
671            return Ok(None);
672        }
673        let bytes = fs::read(inner.voted_view_path())?
674            .try_into()
675            .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
676        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
677    }
678
679    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
680        let inner = self.inner.read().await;
681        let path = inner.restart_view_path();
682        if !path.is_file() {
683            return Ok(None);
684        }
685        let bytes = fs::read(path)?
686            .try_into()
687            .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
688        Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
689    }
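    // Editorial sketch: both view files above hold exactly 8 little-endian bytes, which is why
    // the loaders convert with `u64::from_le_bytes` and the writes in `record_action` use
    // `to_le_bytes`. Round trip with illustrative values:
    //
    // ```rust
    // let view: u64 = 1_234;
    // let bytes = view.to_le_bytes(); // what gets written to the file
    // assert_eq!(bytes.len(), 8);
    // assert_eq!(u64::from_le_bytes(bytes), view); // what the loaders recover
    // ```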
690
691    async fn append_decided_leaves(
692        &self,
693        view: ViewNumber,
694        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
695        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
696        consumer: &impl EventConsumer,
697    ) -> anyhow::Result<()> {
698        let mut inner = self.inner.write().await;
699        let path = inner.decided_leaf2_path();
700
701        // Ensure the anchor leaf directory exists.
702        fs::create_dir_all(&path).context("creating anchor leaf directory")?;
703
704        // Earlier versions stored only a single decided leaf in a regular file. If our storage is
705        // still on this version, migrate to a directory structure storing (possibly) many leaves.
706        let legacy_path = inner.legacy_anchor_leaf_path();
707        if !path.is_dir() && legacy_path.is_file() {
708            tracing::info!("migrating to multi-leaf storage");
709
710            // Move the existing data into the new directory.
711            let (leaf, qc) = inner
712                .load_anchor_leaf()?
713                .context("anchor leaf file exists but unable to load contents")?;
714            let view = leaf.view_number().u64();
715            let bytes = bincode::serialize(&(leaf, qc))?;
716            let new_file = path.join(view.to_string()).with_extension("txt");
717            inner
718                .replace(
719                    &new_file,
720                    |_| Ok(true),
721                    |mut file| {
722                        file.write_all(&bytes)?;
723                        Ok(())
724                    },
725                )
726                .context(format!("writing anchor leaf file {view}"))?;
727
728            // Now we can remove the old file.
729            fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
730        }
731
732        for (info, cert) in leaf_chain {
733            let view = info.leaf.view_number().u64();
734            let file_path = path.join(view.to_string()).with_extension("txt");
735            inner.replace(
736                &file_path,
737                |_| {
738                    // Don't overwrite an existing leaf, but warn about it as this is likely not
739                    // intended behavior from HotShot.
740                    tracing::warn!(view, "duplicate decided leaf");
741                    Ok(false)
742                },
743                |mut file| {
744                    let bytes = bincode::serialize(&(&info.leaf, cert))?;
745                    file.write_all(&bytes)?;
746                    Ok(())
747                },
748            )?;
749        }
750
751        match inner
752            .generate_decide_events(view, deciding_qc, consumer)
753            .await
754        {
755            Err(err) => {
756                // Event processing failure is not an error, since by this point we have at least
757                // managed to persist the decided leaves successfully, and the event processing will
758                // just run again at the next decide.
759                tracing::warn!(?view, "event processing failed: {err:#}");
760            },
761            Ok(intervals) => {
762                if let Err(err) = inner.collect_garbage(view, &intervals) {
763                    // Similarly, garbage collection is not an error. We have done everything we
764                    // strictly needed to do, and GC will run again at the next decide. Log the
765                    // error but do not return it.
766                    tracing::warn!(?view, "GC failed: {err:#}");
767                }
768            },
769        }
770
771        Ok(())
772    }
773
774    async fn load_anchor_leaf(
775        &self,
776    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
777        self.inner.read().await.load_anchor_leaf()
778    }
779
780    async fn load_da_proposal(
781        &self,
782        view: ViewNumber,
783    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
784        self.inner.read().await.load_da_proposal(view)
785    }
786
787    async fn load_vid_share(
788        &self,
789        view: ViewNumber,
790    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
791        self.inner.read().await.load_vid_share(view)
792    }
793
794    async fn append_vid(
795        &self,
796        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
797    ) -> anyhow::Result<()> {
798        let mut inner = self.inner.write().await;
799        let view_number = proposal.data.view_number().u64();
800        let dir_path = inner.vid2_dir_path();
801
802        fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
803
804        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
805        inner.replace(
806            &file_path,
807            |_| {
808                // Don't overwrite an existing share, but warn about it as this is likely not intended
809                // behavior from HotShot.
810                tracing::warn!(view_number, "duplicate VID share");
811                Ok(false)
812            },
813            |mut file| {
814                let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
815                let now = Instant::now();
816                file.write_all(&proposal_bytes)?;
817                self.metrics
818                    .internal_append_vid_duration
819                    .add_point(now.elapsed().as_secs_f64());
820                Ok(())
821            },
822        )
823    }
824
825    async fn append_da(
826        &self,
827        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
828        _vid_commit: VidCommitment,
829    ) -> anyhow::Result<()> {
830        let mut inner = self.inner.write().await;
831        let view_number = proposal.data.view_number().u64();
832        let dir_path = inner.da_dir_path();
833
834        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
835
836        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
837        inner.replace(
838            &file_path,
839            |_| {
840                // Don't overwrite an existing proposal, but warn about it as this is likely not
841                // intended behavior from HotShot.
842                tracing::warn!(view_number, "duplicate DA proposal");
843                Ok(false)
844            },
845            |mut file| {
846                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
847                let now = Instant::now();
848                file.write_all(&proposal_bytes)?;
849                self.metrics
850                    .internal_append_da_duration
851                    .add_point(now.elapsed().as_secs_f64());
852                Ok(())
853            },
854        )
855    }
856    async fn record_action(
857        &self,
858        view: ViewNumber,
859        _epoch: Option<EpochNumber>,
860        action: HotShotAction,
861    ) -> anyhow::Result<()> {
862        // TODO: Remove this after https://github.com/EspressoSystems/espresso-sequencer/issues/1931
863        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
864            return Ok(());
865        }
866        let mut inner = self.inner.write().await;
867        let path = &inner.voted_view_path();
868        inner.replace(
869            path,
870            |mut file| {
871                let mut bytes = vec![];
872                file.read_to_end(&mut bytes)?;
873                let bytes = bytes
874                    .try_into()
875                    .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
876                let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
877
878                // Overwrite the file if the saved view is older than the new view.
879                Ok(saved_view < view)
880            },
881            |mut file| {
882                file.write_all(&view.u64().to_le_bytes())?;
883                Ok(())
884            },
885        )?;
886
887        if matches!(action, HotShotAction::Vote) {
888            let restart_view_path = &inner.restart_view_path();
889            let restart_view = view + 1;
890            inner.replace(
891                restart_view_path,
892                |mut file| {
893                    let mut bytes = vec![];
894                    file.read_to_end(&mut bytes)?;
895                    let bytes = bytes
896                        .try_into()
897                        .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
898                    let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
899
900                    // Overwrite the file if the saved view is older than the new view.
901                    Ok(saved_view < restart_view)
902                },
903                |mut file| {
904                    file.write_all(&restart_view.u64().to_le_bytes())?;
905                    Ok(())
906                },
907            )?;
908        }
909        Ok(())
910    }
911
912    async fn append_quorum_proposal2(
913        &self,
914        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
915    ) -> anyhow::Result<()> {
916        let mut inner = self.inner.write().await;
917        let view_number = proposal.data.view_number().u64();
918        let dir_path = inner.quorum_proposals2_dir_path();
919
920        fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
921
922        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
923        inner.replace(
924            &file_path,
925            |_| {
926                // Always overwrite the previous file
927                Ok(true)
928            },
929            |mut file| {
930                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
931                let now = Instant::now();
932                file.write_all(&proposal_bytes)?;
933                self.metrics
934                    .internal_append_quorum2_duration
935                    .add_point(now.elapsed().as_secs_f64());
936                Ok(())
937            },
938        )
939    }
940    async fn load_quorum_proposals(
941        &self,
942    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
943    {
944        let inner = self.inner.read().await;
945
946        // First, get the proposal directory.
947        let dir_path = inner.quorum_proposals2_dir_path();
948        if !dir_path.is_dir() {
949            return Ok(Default::default());
950        }
951
952        // Read quorum proposals from every data file in this directory.
953        let mut map = BTreeMap::new();
954        for (view, path) in view_files(&dir_path)? {
955            let proposal_bytes = fs::read(path)?;
956            let Some(proposal) = bincode::deserialize::<
957                Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
958            >(&proposal_bytes)
959            .or_else(|error| {
960                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
961                    &proposal_bytes,
962                )
963                .map(convert_proposal)
964                .inspect_err(|err_v3| {
965                    // At this point, if the file contents are invalid, it is most likely an
966                    // error rather than a miscellaneous file somehow ending up in the
967                    // directory. However, we continue on, because it is better to collect as
968                    // many proposals as we can rather than letting one bad proposal cause the
969                    // entire operation to fail, and it is still possible that this was just
970                    // some unintended file whose name happened to match the naming convention.
971
972                    tracing::warn!(
973                        ?view,
974                        %error,
975                        error_v3 = %err_v3,
976                        "ignoring malformed quorum proposal file"
977                    );
978                })
979            })
980            .ok() else {
981                continue;
982            };
983
984            let proposal2 = convert_proposal(proposal);
985
986            // Push to the map and we're done.
987            map.insert(view, proposal2);
988        }
989
990        Ok(map)
991    }
992
993    async fn load_quorum_proposal(
994        &self,
995        view: ViewNumber,
996    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
997        let inner = self.inner.read().await;
998        let dir_path = inner.quorum_proposals2_dir_path();
999        let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
1000        let bytes = fs::read(file_path)?;
1001        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1002            bincode::deserialize(&bytes).or_else(|error| {
1003                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1004                    &bytes,
1005                )
1006                .map(convert_proposal)
1007                .context(format!(
1008                    "Failed to deserialize quorum proposal for view {view:?}: {error}."
1009                ))
1010            })?;
1011        Ok(proposal)
1012    }
1013
1014    async fn load_upgrade_certificate(
1015        &self,
1016    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1017        let inner = self.inner.read().await;
1018        let path = inner.upgrade_certificate_dir_path();
1019        if !path.is_file() {
1020            return Ok(None);
1021        }
1022        let bytes = fs::read(&path).context("read")?;
1023        Ok(Some(
1024            bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1025        ))
1026    }
1027
1028    async fn store_upgrade_certificate(
1029        &self,
1030        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1031    ) -> anyhow::Result<()> {
1032        let mut inner = self.inner.write().await;
1033        let path = &inner.upgrade_certificate_dir_path();
1034        let certificate = match decided_upgrade_certificate {
1035            Some(cert) => cert,
1036            None => return Ok(()),
1037        };
1038        inner.replace(
1039            path,
1040            |_| {
1041                // Always overwrite the previous file.
1042                Ok(true)
1043            },
1044            |mut file| {
1045                let bytes =
1046                    bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1047                file.write_all(&bytes)?;
1048                Ok(())
1049            },
1050        )
1051    }
1052
1053    async fn store_next_epoch_quorum_certificate(
1054        &self,
1055        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1056    ) -> anyhow::Result<()> {
1057        let mut inner = self.inner.write().await;
1058        let path = &inner.next_epoch_qc();
1059
1060        inner.replace(
1061            path,
1062            |_| {
1063                // Always overwrite the previous file.
1064                Ok(true)
1065            },
1066            |mut file| {
1067                let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1068                file.write_all(&bytes)?;
1069                Ok(())
1070            },
1071        )
1072    }
1073
1074    async fn load_next_epoch_quorum_certificate(
1075        &self,
1076    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1077        let inner = self.inner.read().await;
1078        let path = inner.next_epoch_qc();
1079        if !path.is_file() {
1080            return Ok(None);
1081        }
1082        let bytes = fs::read(&path).context("read")?;
1083        Ok(Some(
1084            bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1085        ))
1086    }
1087
1088    async fn store_eqc(
1089        &self,
1090        high_qc: QuorumCertificate2<SeqTypes>,
1091        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1092    ) -> anyhow::Result<()> {
1093        let mut inner = self.inner.write().await;
1094        let path = &inner.eqc();
1095
1096        inner.replace(
1097            path,
1098            |_| {
1099                // Always overwrite the previous file.
1100                Ok(true)
1101            },
1102            |mut file| {
1103                let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1104                    .context("serializing eqc")?;
1105                file.write_all(&bytes)?;
1106                Ok(())
1107            },
1108        )
1109    }
1110
1111    async fn load_eqc(
1112        &self,
1113    ) -> Option<(
1114        QuorumCertificate2<SeqTypes>,
1115        NextEpochQuorumCertificate2<SeqTypes>,
1116    )> {
1117        let inner = self.inner.read().await;
1118        let path = inner.eqc();
1119        if !path.is_file() {
1120            return None;
1121        }
1122        let bytes = fs::read(&path).ok()?;
1123
1124        bincode::deserialize(&bytes).ok()
1125    }
1126
1127    async fn append_da2(
1128        &self,
1129        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1130        _vid_commit: VidCommitment,
1131    ) -> anyhow::Result<()> {
1132        let mut inner = self.inner.write().await;
1133        let view_number = proposal.data.view_number().u64();
1134        let dir_path = inner.da2_dir_path();
1135
1136        fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1137
1138        let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1139        inner.replace(
1140            &file_path,
1141            |_| {
1142                // Don't overwrite an existing proposal, but warn about it as this is likely not
1143                // intended behavior from HotShot.
1144                tracing::warn!(view_number, "duplicate DA proposal");
1145                Ok(false)
1146            },
1147            |mut file| {
1148                let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1149                let now = Instant::now();
1150                file.write_all(&proposal_bytes)?;
1151                self.metrics
1152                    .internal_append_da2_duration
1153                    .add_point(now.elapsed().as_secs_f64());
1154                Ok(())
1155            },
1156        )
1157    }
1158
1159    async fn append_proposal2(
1160        &self,
1161        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1162    ) -> anyhow::Result<()> {
1163        self.append_quorum_proposal2(proposal).await
1164    }
1165
1166    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1167        let mut inner = self.inner.write().await;
1168
1169        if inner.migrated.contains("anchor_leaf") {
1170            tracing::info!("decided leaves already migrated");
1171            return Ok(());
1172        }
1173
1174        let new_leaf_dir = inner.decided_leaf2_path();
1175
1176        fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2 dir")?;
1177
1178        let old_leaf_dir = inner.decided_leaf_path();
1179        if !old_leaf_dir.is_dir() {
1180            return Ok(());
1181        }
1182
1183        tracing::warn!("migrating decided leaves..");
1184        for entry in fs::read_dir(old_leaf_dir)? {
1185            let entry = entry?;
1186            let path = entry.path();
1187
1188            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1189                continue;
1190            };
1191            let Ok(view) = file.parse::<u64>() else {
1192                continue;
1193            };
1194
1195            let bytes =
1196                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1197            let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1198                .context(format!("parsing decided leaf {}", path.display()))?;
1199
1200            let leaf2: Leaf2 = leaf.into();
1201            let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1202
1203            let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1204
1205            inner.replace(
1206                &new_leaf_path,
1207                |_| {
1208                    tracing::warn!(view, "duplicate decided leaf");
1209                    Ok(false)
1210                },
1211                |mut file| {
1212                    let bytes = bincode::serialize(&(&leaf2, cert))?;
1213                    file.write_all(&bytes)?;
1214                    Ok(())
1215                },
1216            )?;
1217
1218            if view % 100 == 0 {
1219                tracing::info!(view, "decided leaves migration progress");
1220            }
1221        }
1222
1223        inner.migrated.insert("anchor_leaf".to_string());
1224        inner.update_migration()?;
1225        tracing::warn!("successfully migrated decided leaves");
1226        Ok(())
1227    }
1228    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1229        let mut inner = self.inner.write().await;
1230
1231        if inner.migrated.contains("da_proposal") {
1232            tracing::info!("da proposals already migrated");
1233            return Ok(());
1234        }
1235
1236        let new_da_dir = inner.da2_dir_path();
1237
1238        fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1239
1240        let old_da_dir = inner.da_dir_path();
1241        if !old_da_dir.is_dir() {
1242            return Ok(());
1243        }
1244
1245        tracing::warn!("migrating da proposals..");
1246
1247        for entry in fs::read_dir(old_da_dir)? {
1248            let entry = entry?;
1249            let path = entry.path();
1250
1251            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1252                continue;
1253            };
1254            let Ok(view) = file.parse::<u64>() else {
1255                continue;
1256            };
1257
1258            let bytes =
1259                fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1260            let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1261                .context(format!("parsing da proposal {}", path.display()))?;
1262
1263            let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1264
1265            let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1266
1267            inner.replace(
1268                &new_da_path,
1269                |_| {
1270                    tracing::warn!(view, "duplicate DA proposal 2");
1271                    Ok(false)
1272                },
1273                |mut file| {
1274                    let bytes = bincode::serialize(&proposal2)?;
1275                    file.write_all(&bytes)?;
1276                    Ok(())
1277                },
1278            )?;
1279
1280            if view % 100 == 0 {
1281                tracing::info!(view, "DA proposals migration progress");
1282            }
1283        }
1284
1285        inner.migrated.insert("da_proposal".to_string());
1286        inner.update_migration()?;
1287        tracing::warn!("successfully migrated da proposals");
1288        Ok(())
1289    }
1290    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1291        let mut inner = self.inner.write().await;
1292
1293        if inner.migrated.contains("vid_share") {
1294            tracing::info!("vid shares already migrated");
1295            return Ok(());
1296        }
1297
1298        let new_vid_dir = inner.vid2_dir_path();
1299
1300        fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1301
1302        let old_vid_dir = inner.vid_dir_path();
1303        if !old_vid_dir.is_dir() {
1304            return Ok(());
1305        }
1306
1307        tracing::warn!("migrating vid shares..");
1308
1309        for entry in fs::read_dir(old_vid_dir)? {
1310            let entry = entry?;
1311            let path = entry.path();
1312
1313            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1314                continue;
1315            };
1316            let Ok(view) = file.parse::<u64>() else {
1317                continue;
1318            };
1319
1320            let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1321            let proposal =
1322                bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1323                    .context(format!("parsing vid share {}", path.display()))?;
1324
1325            let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1326
1327            let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1328                convert_proposal(proposal);
1329
1330            inner.replace(
1331                &new_vid_path,
1332                |_| {
1333                    tracing::warn!(view, "duplicate VID share");
1334                    Ok(false)
1335                },
1336                |mut file| {
1337                    let bytes = bincode::serialize(&proposal2)?;
1338                    file.write_all(&bytes)?;
1339                    Ok(())
1340                },
1341            )?;
1342
1343            if view % 100 == 0 {
1344                tracing::info!(view, "VID shares migration progress");
1345            }
1346        }
1347
1348        inner.migrated.insert("vid_share".to_string());
1349        inner.update_migration()?;
1350        tracing::warn!("successfully migrated vid shares");
1351        Ok(())
1352    }
1353
1354    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1355        let mut inner = self.inner.write().await;
1356
1357        if inner.migrated.contains("quorum_proposals") {
1358            tracing::info!("quorum proposals already migrated");
1359            return Ok(());
1360        }
1361
1362        let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1363
1364        fs::create_dir_all(new_quorum_proposals_dir.clone())
1365            .context("failed to create quorum proposals 2 dir")?;
1366
1367        let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1368        if !old_quorum_proposals_dir.is_dir() {
1369            tracing::info!("no existing quorum proposals found for migration");
1370            return Ok(());
1371        }
1372
1373        tracing::warn!("migrating quorum proposals..");
1374        for entry in fs::read_dir(old_quorum_proposals_dir)? {
1375            let entry = entry?;
1376            let path = entry.path();
1377
1378            let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1379                continue;
1380            };
1381            let Ok(view) = file.parse::<u64>() else {
1382                continue;
1383            };
1384
1385            let bytes =
1386                fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1387            let proposal =
1388                bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1389                    .context(format!("parsing quorum proposal {}", path.display()))?;
1390
1391            let new_file_path = new_quorum_proposals_dir
1392                .join(view.to_string())
1393                .with_extension("txt");
1394
1395            let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1396                convert_proposal(proposal);
1397
1398            inner.replace(
1399                &new_file_path,
1400                |_| {
1401                    tracing::warn!(view, "duplicate quorum proposal 2");
1402                    Ok(false)
1403                },
1404                |mut file| {
1405                    let bytes = bincode::serialize(&proposal2)?;
1406                    file.write_all(&bytes)?;
1407                    Ok(())
1408                },
1409            )?;
1410
1411            if view % 100 == 0 {
1412                tracing::info!(view, "Quorum proposals migration progress");
1413            }
1414        }
1415
1416        inner.migrated.insert("quorum_proposals".to_string());
1417        inner.update_migration()?;
1418        tracing::warn!("successfully migrated quorum proposals");
1419        Ok(())
1420    }
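    // Editorial sketch: every migration above (and `view_files`, used throughout this module)
    // keys files by view number, with the decimal view as the file stem and a `txt` extension.
    // Parsing that convention back out of a path looks like this (paths are illustrative):
    //
    // ```rust
    // use std::path::Path;
    //
    // fn view_of(path: &Path) -> Option<u64> {
    //     path.file_stem()?.to_str()?.parse().ok()
    // }
    //
    // assert_eq!(view_of(Path::new("quorum_proposals2/42.txt")), Some(42));
    // assert_eq!(view_of(Path::new("quorum_proposals2/notes.md")), None);
    // ```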
1421    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1422        Ok(())
1423    }
1424
1425    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1426        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1427            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1428                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1429            } else if loaded_drb_input.iteration >= drb_input.iteration {
1430                anyhow::bail!(
1431                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1432                    loaded_drb_input,
1433                    drb_input
1434                )
1435            }
1436        }
1437
1438        let mut inner = self.inner.write().await;
1439        let dir_path = inner.drb_dir_path();
1440
1441        fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1442
1443        let drb_input_bytes =
1444            bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1445
1446        let file_path = dir_path
1447            .join(drb_input.epoch.to_string())
1448            .with_extension("bin");
1449
1450        inner.replace(
1451            &file_path,
1452            |_| {
1453                // Always overwrite the previous file.
1454                Ok(true)
1455            },
1456            |mut file| {
1457                file.write_all(&drb_input_bytes).context(format!(
1458                    "writing epoch drb_input file for epoch {:?} at {:?}",
1459                    drb_input.epoch, file_path
1460                ))
1461            },
1462        )
1463    }
1464
1465    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1466        let inner = self.inner.read().await;
1467        let path = &inner.drb_dir_path();
1468        let file_path = path.join(epoch.to_string()).with_extension("bin");
1469        let bytes = fs::read(&file_path).context("reading drb input file")?;
1470        bincode::deserialize(&bytes)
1471            .context(format!("failed to deserialize DrbInput for epoch {epoch}"))
1472    }
1473
1474    async fn store_drb_result(
1475        &self,
1476        epoch: EpochNumber,
1477        drb_result: DrbResult,
1478    ) -> anyhow::Result<()> {
1479        let mut inner = self.inner.write().await;
1480        let dir_path = inner.epoch_drb_result_dir_path();
1481
1482        fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1483
1484        let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1485
1486        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1487
1488        inner.replace(
1489            &file_path,
1490            |_| {
1491                // Always overwrite the previous file.
1492                Ok(true)
1493            },
1494            |mut file| {
1495                file.write_all(&drb_result_bytes)
1496                    .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1497            },
1498        )
1499    }
1500
1501    async fn store_epoch_root(
1502        &self,
1503        epoch: EpochNumber,
1504        block_header: <SeqTypes as NodeType>::BlockHeader,
1505    ) -> anyhow::Result<()> {
1506        let mut inner = self.inner.write().await;
1507        let dir_path = inner.epoch_root_block_header_dir_path();
1508
1509        fs::create_dir_all(dir_path.clone())
1510            .context("failed to create epoch root block header dir")?;
1511
1512        let block_header_bytes =
1513            bincode::serialize(&block_header).context("serialize block header")?;
1514
1515        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1516        inner
1517            .replace(
1518                &file_path,
1519                |_| Ok(true),
1520                |mut file| {
1521                    file.write_all(&block_header_bytes)?;
1522                    Ok(())
1523                },
1524            )
1525            .context(format!(
1526                "writing epoch root block header file for epoch {epoch:?}"
1527            ))?;
1528
1529        Ok(())
1530    }
1531
1532    async fn add_state_cert(
1533        &self,
1534        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1535    ) -> anyhow::Result<()> {
1536        let mut inner = self.inner.write().await;
1537        // let epoch = state_cert.epoch;
1538        let view = state_cert.light_client_state.view_number;
1539        let dir_path = inner.state_cert_dir_path();
1540
1541        fs::create_dir_all(dir_path.clone())
1542            .context("failed to create light client state update certificate dir")?;
1543
1544        let bytes = bincode::serialize(&state_cert)
1545            .context("serialize light client state update certificate")?;
1546
1547        let file_path = dir_path.join(view.to_string()).with_extension("txt");
1548        inner
1549            .replace(
1550                &file_path,
1551                |_| Ok(true),
1552                |mut file| {
1553                    file.write_all(&bytes)?;
1554                    Ok(())
1555                },
1556            )
1557            .context(format!(
1558                "writing light client state update certificate file for view {view:?}"
1559            ))?;
1560
1561        Ok(())
1562    }
1563
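    /// Pair each stored DRB result with its epoch root block header (when one exists) and
    /// return the newest `RECENT_STAKE_TABLES_LIMIT` epochs in ascending epoch order. A
    /// minimal sketch of consuming the result (`persistence` is a hypothetical handle):
    ///
    /// ```ignore
    /// for info in persistence.load_start_epoch_info().await? {
    ///     println!("epoch {:?} has header: {}", info.epoch, info.block_header.is_some());
    /// }
    /// ```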
1564    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1565        let inner = self.inner.read().await;
1566        let drb_dir_path = inner.epoch_drb_result_dir_path();
1567        let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1568
1569        let mut result = Vec::new();
1570
1571        if !drb_dir_path.is_dir() {
1572            return Ok(Vec::new());
1573        }
1574        for (epoch, path) in epoch_files(drb_dir_path)? {
1575            let bytes =
1576                fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1577            let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1578                .context(format!("parsing epoch drb result {}", path.display()))?;
1579
1580            let block_header_path = block_header_dir_path
1581                .join(epoch.to_string())
1582                .with_extension("txt");
1583            let block_header = if block_header_path.is_file() {
1584                let bytes = fs::read(&block_header_path).context(format!(
1585                    "reading epoch root block header {}",
1586                    block_header_path.display()
1587                ))?;
1588                Some(
1589                    bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1590                        format!(
1591                            "parsing epoch root block header {}",
1592                            block_header_path.display()
1593                        ),
1594                    )?,
1595                )
1596            } else {
1597                None
1598            };
1599
1600            result.push(InitializerEpochInfo::<SeqTypes> {
1601                epoch,
1602                drb_result,
1603                block_header,
1604            });
1605        }
1606
1607        result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1608
1609        // Keep only the most recent epochs
1610        let start = result
1611            .len()
1612            .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1613        let recent = result[start..].to_vec();
1614
1615        Ok(recent)
1616    }
1617
1618    async fn load_state_cert(
1619        &self,
1620    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1621        let inner = self.inner.read().await;
1622        let dir_path = inner.finalized_state_cert_dir_path();
1623
1624        if !dir_path.is_dir() {
1625            return Ok(None);
1626        }
1627
1628        let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1629
1630        for (epoch, path) in epoch_files(dir_path)? {
1631            if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1632                continue;
1633            }
1634            let bytes = fs::read(&path).context(format!(
1635                "reading light client state update certificate {}",
1636                path.display()
1637            ))?;
1638            let cert =
1639                bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1640                    .or_else(|error| {
1641                        tracing::info!(
1642                            %error,
1643                            path = %path.display(),
1644                            "Failed to deserialize LightClientStateUpdateCertificateV2"
1645                        );
1646
1647                        bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1648                            &bytes,
1649                        )
1650                        .map(Into::into)
1651                        .with_context(|| {
1652                            format!(
1653                                "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1654                                path.display()
1655                            )
1656                        })
1657                    })?;
1658
1659            result = Some(cert);
1660        }
1661
1662        Ok(result)
1663    }
1664
1665    async fn get_state_cert_by_epoch(
1666        &self,
1667        epoch: u64,
1668    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1669        let inner = self.inner.read().await;
1670        let dir_path = inner.finalized_state_cert_dir_path();
1671
1672        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1673
1674        if !file_path.exists() {
1675            return Ok(None);
1676        }
1677
1678        let bytes = fs::read(&file_path).context(format!(
1679            "reading light client state update certificate {}",
1680            file_path.display()
1681        ))?;
1682
1683        let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1684            .or_else(|error| {
1685                tracing::info!(
1686                    %error,
1687                    path = %file_path.display(),
1688                    "Failed to deserialize LightClientStateUpdateCertificateV2"
1689                );
1690
1691                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
1692                    .map(Into::into)
1693                    .with_context(|| {
1694                        format!(
1695                            "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1696                            file_path.display()
1697                        )
1698                    })
1699            })?;
1700
1701        Ok(Some(cert))
1702    }
1703
1704    async fn insert_state_cert(
1705        &self,
1706        epoch: u64,
1707        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1708    ) -> anyhow::Result<()> {
1709        let inner = self.inner.read().await;
1710        let dir_path = inner.finalized_state_cert_dir_path();
1711
1712        fs::create_dir_all(&dir_path)
1713            .context(format!("creating state cert dir {}", dir_path.display()))?;
1714
1715        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1716        let bytes = bincode::serialize(&cert)
1717            .context("serializing light client state update certificate")?;
1718
1719        fs::write(&file_path, bytes).context(format!(
1720            "writing light client state update certificate {}",
1721            file_path.display()
1722        ))?;
1723
1724        Ok(())
1725    }
1726
1727    fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1728        // todo!()
1729    }
1730}
1731
1732#[async_trait]
1733impl MembershipPersistence for Persistence {
1734    async fn load_stake(
1735        &self,
1736        epoch: EpochNumber,
1737    ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
1738        let inner = self.inner.read().await;
1739        let path = &inner.stake_table_dir_path();
1740        let file_path = path.join(epoch.to_string()).with_extension("txt");
1741
1742        if !file_path.exists() {
1743            return Ok(None);
1744        }
1745
1746        let bytes = fs::read(&file_path).with_context(|| {
1747            format!("failed to read stake table file at {}", file_path.display())
1748        })?;
1749
1750        let stake = match bincode::deserialize(&bytes) {
1751            Ok(res) => res,
1752            Err(err) => {
1753                let map = bincode::deserialize::<ValidatorMap>(&bytes).with_context(|| {
1754                    format!(
1755                        "fallback deserialization of legacy stake table at {} failed after \
1756                         initial error: {}",
1757                        file_path.display(),
1758                        err
1759                    )
1760                })?;
1761                (map, None, None)
1762            },
1763        };
1764
1765        Ok(Some(stake))
1766    }
1767
1768    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1769        let limit = limit as usize;
1770        let inner = self.inner.read().await;
1771        let path = &inner.stake_table_dir_path();
1772        let sorted_files = epoch_files(&path)?
1773            .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1774            .take(limit);
1775        let mut validator_sets: Vec<IndexedStake> = Vec::new();
1776
1777        for (epoch, file_path) in sorted_files {
1778            let bytes = fs::read(&file_path).with_context(|| {
1779                format!("failed to read stake table file at {}", file_path.display())
1780            })?;
1781
1782            let stake: (ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>) =
1783                match bincode::deserialize::<(
1784                    ValidatorMap,
1785                    Option<RewardAmount>,
1786                    Option<StakeTableHash>,
1787                )>(&bytes)
1788                {
1789                    Ok(res) => res,
1790                    Err(err) => {
1791                        let validatormap = bincode::deserialize::<ValidatorMap>(&bytes)
1792                            .with_context(|| {
1793                                format!(
1794                                    "failed to deserialize legacy stake table at {}: fallback \
1795                                     also failed after initial error: {}",
1796                                    file_path.display(),
1797                                    err
1798                                )
1799                            })?;
1800
1801                        (validatormap, None, None)
1802                    },
1803                };
1804
1805            validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1806        }
1807
1808        Ok(Some(validator_sets))
1809    }
1810
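    /// Persist the stake table for an epoch as a single bincode-encoded tuple of
    /// `(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)`. The corresponding
    /// loaders fall back to decoding a bare `ValidatorMap` for files written by older
    /// versions. A minimal sketch of the round trip (all values are hypothetical):
    ///
    /// ```ignore
    /// persistence
    ///     .store_stake(EpochNumber::new(7), validator_map, Some(block_reward), Some(hash))
    ///     .await?;
    /// assert!(persistence.load_stake(EpochNumber::new(7)).await?.is_some());
    /// ```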
1811    async fn store_stake(
1812        &self,
1813        epoch: EpochNumber,
1814        stake: ValidatorMap,
1815        block_reward: Option<RewardAmount>,
1816        stake_table_hash: Option<StakeTableHash>,
1817    ) -> anyhow::Result<()> {
1818        let mut inner = self.inner.write().await;
1819        let dir_path = &inner.stake_table_dir_path();
1820
1821        fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1822
1823        let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1824
1825        inner.replace(
1826            &file_path,
1827            |_| {
1828                // Always overwrite the previous file.
1829                Ok(true)
1830            },
1831            |mut file| {
1832                let bytes = bincode::serialize(&(stake, block_reward, stake_table_hash))
1833                    .context("serializing combined stake table")?;
1834                file.write_all(&bytes)?;
1835                Ok(())
1836            },
1837        )
1838    }
1839
1840    /// Store stake table events up to the given L1 block.
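    ///
    /// Each event is written to its own JSON file under `<stake table dir>/events/`, named
    /// `<l1 block number>_<log index>.json`, while `last_l1_finalized.bin` records, as a
    /// little-endian `u64`, the highest L1 block whose events have been persisted. A small
    /// sketch of how a key maps to a file name (illustrative only):
    ///
    /// ```ignore
    /// let (block_number, event_index) = (1_234_567u64, 3u64);
    /// let filename = format!("{block_number}_{event_index}"); // "1234567_3"
    /// let file_path = events_dir.join(filename).with_extension("json");
    /// ```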
1841    async fn store_events(
1842        &self,
1843        to_l1_block: u64,
1844        events: Vec<(EventKey, StakeTableEvent)>,
1845    ) -> anyhow::Result<()> {
1846        if events.is_empty() {
1847            return Ok(());
1848        }
1849
1850        let mut inner = self.inner.write().await;
1851        let dir_path = &inner.stake_table_dir_path();
1852        let events_dir = dir_path.join("events");
1853
1854        fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
1855        // Read the last finalized L1 block for which events have been stored.
1856        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1857
1858        // Check whether the persisted L1 block is already higher than the incoming one.
1859        if last_l1_finalized_path.exists() {
1860            let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
1861                format!("Failed to read file at path: {last_l1_finalized_path:?}")
1862            })?;
1863            let mut buf = [0; 8];
1864            bytes
1865                .as_slice()
1866                .read_exact(&mut buf[..8])
1867                .with_context(|| {
1868                    format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1869                })?;
1870            let persisted_l1_block = u64::from_le_bytes(buf);
1871            if persisted_l1_block > to_l1_block {
1872                tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
1873                return Ok(());
1874            }
1875        }
1876
1877        // Store each event in a separate file.
1878        // This can cause performance issues when, for example, reading all the files back.
1879        // However, the plan is to remove the file system backend entirely in the future.
1880        for (event_key, event) in events {
1881            let (block_number, event_index) = event_key;
1882            // File names have the form <block number>_<event index>.json.
1883            let filename = format!("{block_number}_{event_index}");
1884            let file_path = events_dir.join(filename).with_extension("json");
1885
1886            if file_path.exists() {
1887                continue;
1888            }
1889
1890            inner
1891                .replace(
1892                    &file_path,
1893                    |_| Ok(true),
1894                    |file| {
1895                        let writer = BufWriter::new(file);
1896
1897                        serde_json::to_writer_pretty(writer, &event)?;
1898                        Ok(())
1899                    },
1900                )
1901                .context("Failed to write event to file")?;
1902        }
1903
1904        // update the l1 block for which we have processed events
1905        inner.replace(
1906            &last_l1_finalized_path,
1907            |_| Ok(true),
1908            |mut file| {
1909                let bytes = to_l1_block.to_le_bytes();
1910
1911                file.write_all(&bytes)?;
1912                tracing::debug!("updated last finalized L1 block to {to_l1_block}");
1913                Ok(())
1914            },
1915        )
1916    }
1917
1918    /// Load events from persistent storage between `from_l1_block` and `to_l1_block` (inclusive).
1919    ///
1920    /// # Returns
1921    ///
1922    /// Returns a tuple containing:
1923    /// - `Option<EventsPersistenceRead>` - Whether the read is complete up to `to_l1_block`,
1924    ///   or only covers events up to the last L1 block persisted so far.
1925    /// - `Vec<(EventKey, StakeTableEvent)>` - The events in range, where each event key is
1926    ///   `(l1 block number, log index)` and the value is the corresponding stake table event.
1927    ///
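    /// A minimal usage sketch (`persistence` and the block numbers are hypothetical):
    ///
    /// ```ignore
    /// let (read, events) = persistence.load_events(0, 2_000_000).await?;
    /// match read {
    ///     Some(EventsPersistenceRead::Complete) => { /* all events up to 2_000_000 returned */ },
    ///     Some(EventsPersistenceRead::UntilL1Block(l1)) => { /* persisted only up to `l1` */ },
    ///     None => { /* nothing persisted yet */ },
    /// }
    /// ```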
1928    async fn load_events(
1929        &self,
1930        from_l1_block: u64,
1931        to_l1_block: u64,
1932    ) -> anyhow::Result<(
1933        Option<EventsPersistenceRead>,
1934        Vec<(EventKey, StakeTableEvent)>,
1935    )> {
1936        let inner = self.inner.read().await;
1937        let dir_path = inner.stake_table_dir_path();
1938        let events_dir = dir_path.join("events");
1939
1940        // Check whether any events are in storage by looking for the marker that records the
1941        // last finalized L1 block for which events were processed.
1942        let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1943
1944        if !last_l1_finalized_path.exists() || !events_dir.exists() {
1945            return Ok((None, Vec::new()));
1946        }
1947
1948        let mut events = Vec::new();
1949
1950        let bytes = fs::read(&last_l1_finalized_path)
1951            .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
1952        let mut buf = [0; 8];
1953        bytes
1954            .as_slice()
1955            .read_exact(&mut buf[..8])
1956            .with_context(|| {
1957                format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1958            })?;
1959
1960        let last_processed_l1_block = u64::from_le_bytes(buf);
1961
1962        // Determine the L1 block for querying events.
1963        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
1964        // Otherwise, query up to the last stored block.
1965        let query_l1_block = if last_processed_l1_block > to_l1_block {
1966            to_l1_block
1967        } else {
1968            last_processed_l1_block
1969        };
1970
1971        for entry in fs::read_dir(&events_dir).context("events directory")? {
1972            let entry = entry?;
1973            let path = entry.path();
1974
1975            if !entry.file_type()?.is_file() {
1976                continue;
1977            }
1978
1979            // Skip anything that is not a .json event file (e.g. the last_l1_finalized marker).
1980            if path.extension().and_then(|e| e.to_str()) != Some("json") {
1981                continue;
1982            }
1986
1987            let filename = path
1988                .file_stem()
1989                .and_then(|f| f.to_str())
1990                .unwrap_or_default();
1991
1992            let parts: Vec<&str> = filename.split('_').collect();
1993            if parts.len() != 2 {
1994                continue;
1995            }
1996
1997            let block_number = parts[0].parse::<u64>()?;
1998            let log_index = parts[1].parse::<u64>()?;
1999
2000            if block_number < from_l1_block || block_number > query_l1_block {
2001                continue;
2002            }
2003
2004            let file =
2005                File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
2006            let reader = BufReader::new(file);
2007
2008            let event: StakeTableEvent = serde_json::from_reader(reader)
2009                .context(format!("Failed to deserialize event at path={path:?}"))?;
2010
2011            events.push(((block_number, log_index), event));
2012        }
2013
2014        events.sort_by_key(|(key, _)| *key);
2015
2016        if query_l1_block == to_l1_block {
2017            Ok((Some(EventsPersistenceRead::Complete), events))
2018        } else {
2019            Ok((
2020                Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
2021                events,
2022            ))
2023        }
2024    }
2025
2026    async fn store_all_validators(
2027        &self,
2028        epoch: EpochNumber,
2029        all_validators: ValidatorMap,
2030    ) -> anyhow::Result<()> {
2031        let mut inner = self.inner.write().await;
2032        let dir_path = inner.stake_table_dir_path();
2033        let validators_dir = dir_path.join("validators");
2034
2035        // Ensure validators directory exists
2036        fs::create_dir_all(&validators_dir)
2037            .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2038
2039        // Path = validators/epoch_<number>.json
2040        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2041
2042        inner
2043            .replace(
2044                &file_path,
2045                |_| Ok(true),
2046                |file| {
2047                    let writer = BufWriter::new(file);
2048
2049                    serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2050                        format!("Failed to serialize validators for epoch {epoch}")
2051                    })?;
2052                    Ok(())
2053                },
2054            )
2055            .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2056
2057        Ok(())
2058    }
2059
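    /// Load the validators stored for `epoch`, sorted by account address, returning a page of
    /// at most `limit` entries starting at `offset`. A minimal paging sketch (the epoch handle
    /// and page size are hypothetical):
    ///
    /// ```ignore
    /// let mut offset = 0;
    /// loop {
    ///     let page = persistence.load_all_validators(epoch, offset, 100).await?;
    ///     if page.is_empty() {
    ///         break;
    ///     }
    ///     offset += page.len() as u64;
    /// }
    /// ```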
2060    async fn load_all_validators(
2061        &self,
2062        epoch: EpochNumber,
2063        offset: u64,
2064        limit: u64,
2065    ) -> anyhow::Result<Vec<Validator<PubKey>>> {
2066        let inner = self.inner.read().await;
2067        let dir_path = inner.stake_table_dir_path();
2068        let validators_dir = dir_path.join("validators");
2069        let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2070
2071        if !file_path.exists() {
2072            bail!("Validator file not found for epoch {epoch}");
2073        }
2074
2075        let file = File::open(&file_path)
2076            .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2077        let reader = BufReader::new(file);
2078
2079        let map: ValidatorMap = serde_json::from_reader(reader).with_context(|| {
2080            format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2081        })?;
2082
2083        let mut values: Vec<Validator<PubKey>> = map.into_values().collect();
2084        values.sort_by_key(|v| v.account);
2085
2086        let start = offset as usize;
2087        let end = (start + limit as usize).min(values.len());
2088
2089        if start >= values.len() {
2090            return Ok(vec![]);
2091        }
2092
2093        Ok(values[start..end].to_vec())
2094    }
2095}
2096
2097#[async_trait]
2098impl DhtPersistentStorage for Persistence {
2099    /// Save the DHT to the file on disk
2100    ///
2101    /// # Errors
2102    /// - If we fail to serialize the records
2103    /// - If we fail to write the serialized records to the file
2104    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2105        // Bincode-serialize the records
2106        let to_save =
2107            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2108
2109        // Get the path to save the file to
2110        let path = self.inner.read().await.libp2p_dht_path();
2111
2112        // Create the directory if it doesn't exist
2113        fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2114            .with_context(|| "failed to create directory")?;
2115
2116        // Get a write lock on the inner struct
2117        let mut inner = self.inner.write().await;
2118
2119        // Save the file, replacing the previous one if it exists
2120        inner
2121            .replace(
2122                &path,
2123                |_| {
2124                    // Always overwrite the previous file
2125                    Ok(true)
2126                },
2127                |mut file| {
2128                    file.write_all(&to_save)
2129                        .with_context(|| "failed to write records to file")?;
2130                    Ok(())
2131                },
2132            )
2133            .with_context(|| "failed to save records to file")?;
2134
2135        Ok(())
2136    }
2137
2138    /// Load the DHT from the file on disk
2139    ///
2140    /// # Errors
2141    /// - If we fail to read the file
2142    /// - If we fail to deserialize the records
2143    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2144        // Read the contents of the file
2145        let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2146            .with_context(|| "Failed to read records from file")?;
2147
2148        // Deserialize the contents
2149        let records: Vec<SerializableRecord> =
2150            bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2151
2152        Ok(records)
2153    }
2154}
2155
2156/// Get all paths under `dir` whose name is of the form <view number>.txt.
2157fn view_files(
2158    dir: impl AsRef<Path>,
2159) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2160    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2161        let dir = dir.as_ref().display();
2162        let entry = entry.ok()?;
2163        if !entry.file_type().ok()?.is_file() {
2164            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2165            return None;
2166        }
2167        let path = entry.path();
2168        if path.extension()? != "txt" {
2169            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2170            return None;
2171        }
2172        let file_name = path.file_stem()?;
2173        let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2174            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2175            return None;
2176        };
2177        Some((ViewNumber::new(view_number), entry.path().to_owned()))
2178    }))
2179}
2180
2181/// Get all paths under `dir` whose name is of the form <epoch number>.txt.
2182/// Should probably be made generic and merged with view_files.
2183fn epoch_files(
2184    dir: impl AsRef<Path>,
2185) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2186    Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2187        let dir = dir.as_ref().display();
2188        let entry = entry.ok()?;
2189        if !entry.file_type().ok()?.is_file() {
2190            tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2191            return None;
2192        }
2193        let path = entry.path();
2194        if path.extension()? != "txt" {
2195            tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2196            return None;
2197        }
2198        let file_name = path.file_stem()?;
2199        let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2200            tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2201            return None;
2202        };
2203        Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2204    }))
2205}
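
// As the comment on `epoch_files` suggests, `view_files` and `epoch_files` could share one
// generic helper. A minimal sketch of what that could look like is left here as a comment
// (the name `numbered_files` and its signature are illustrative, not part of the current API):
//
// fn numbered_files<T>(
//     dir: impl AsRef<Path>,
//     make: impl Fn(u64) -> T,
// ) -> anyhow::Result<impl Iterator<Item = (T, PathBuf)>> {
//     Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
//         let path = entry.ok()?.path();
//         if !path.is_file() || path.extension()? != "txt" {
//             return None;
//         }
//         let n = path.file_stem()?.to_string_lossy().parse::<u64>().ok()?;
//         Some((make(n), path))
//     }))
// }
//
// Callers would then write `numbered_files(dir, ViewNumber::new)` or
// `numbered_files(dir, EpochNumber::new)`.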
2206
2207#[cfg(test)]
2208mod test {
2209    use std::marker::PhantomData;
2210
2211    use committable::{Commitment, CommitmentBoundsArkless, Committable};
2212    use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2213    use hotshot::types::SignatureKey;
2214    use hotshot_example_types::node_types::TestVersions;
2215    use hotshot_query_service::testing::mocks::MockVersions;
2216    use hotshot_types::{
2217        data::QuorumProposal2,
2218        light_client::LightClientState,
2219        simple_certificate::QuorumCertificate,
2220        simple_vote::QuorumData,
2221        traits::{
2222            block_contents::GENESIS_VID_NUM_STORAGE_NODES, node_implementation::Versions,
2223            EncodeBytes,
2224        },
2225        vid::advz::advz_scheme,
2226    };
2227    use jf_advz::VidScheme;
2228    use serde_json::json;
2229    use tempfile::TempDir;
2230    use vbs::version::StaticVersionType;
2231
2232    use super::*;
2233    use crate::{persistence::tests::TestablePersistence, BLSPubKey};
2234
2235    #[async_trait]
2236    impl TestablePersistence for Persistence {
2237        type Storage = TempDir;
2238
2239        async fn tmp_storage() -> Self::Storage {
2240            TempDir::new().unwrap()
2241        }
2242
2243        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2244            Options::new(storage.path().into())
2245        }
2246    }
2247
2248    #[test]
2249    fn test_config_migrations_add_builder_urls() {
2250        let before = json!({
2251            "config": {
2252                "builder_url": "https://test:8080",
2253                "start_proposing_view": 1,
2254                "stop_proposing_view": 2,
2255                "start_voting_view": 1,
2256                "stop_voting_view": 2,
2257                "start_proposing_time": 1,
2258                "stop_proposing_time": 2,
2259                "start_voting_time": 1,
2260                "stop_voting_time": 2
2261            }
2262        });
2263        let after = json!({
2264            "config": {
2265                "builder_urls": ["https://test:8080"],
2266                "start_proposing_view": 1,
2267                "stop_proposing_view": 2,
2268                "start_voting_view": 1,
2269                "stop_voting_view": 2,
2270                "start_proposing_time": 1,
2271                "stop_proposing_time": 2,
2272                "start_voting_time": 1,
2273                "stop_voting_time": 2,
2274                "epoch_height": 0,
2275                "drb_difficulty": 0,
2276                "drb_upgrade_difficulty": 0,
2277                "da_committees": [],
2278            }
2279        });
2280
2281        assert_eq!(migrate_network_config(before).unwrap(), after);
2282    }
2283
2284    #[test]
2285    fn test_config_migrations_existing_builder_urls() {
2286        let before = json!({
2287            "config": {
2288                "builder_urls": ["https://test:8080", "https://test:8081"],
2289                "start_proposing_view": 1,
2290                "stop_proposing_view": 2,
2291                "start_voting_view": 1,
2292                "stop_voting_view": 2,
2293                "start_proposing_time": 1,
2294                "stop_proposing_time": 2,
2295                "start_voting_time": 1,
2296                "stop_voting_time": 2,
2297                "epoch_height": 0,
2298                "drb_difficulty": 0,
2299                "drb_upgrade_difficulty": 0,
2300                "da_committees": [],
2301            }
2302        });
2303
2304        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2305    }
2306
2307    #[test]
2308    fn test_config_migrations_add_upgrade_params() {
2309        let before = json!({
2310            "config": {
2311                "builder_urls": ["https://test:8080", "https://test:8081"]
2312            }
2313        });
2314        let after = json!({
2315            "config": {
2316                "builder_urls": ["https://test:8080", "https://test:8081"],
2317                "start_proposing_view": 9007199254740991u64,
2318                "stop_proposing_view": 0,
2319                "start_voting_view": 9007199254740991u64,
2320                "stop_voting_view": 0,
2321                "start_proposing_time": 9007199254740991u64,
2322                "stop_proposing_time": 0,
2323                "start_voting_time": 9007199254740991u64,
2324                "stop_voting_time": 0,
2325                "epoch_height": 0,
2326                "drb_difficulty": 0,
2327                "drb_upgrade_difficulty": 0,
2328                "da_committees": [],
2329            }
2330        });
2331
2332        assert_eq!(migrate_network_config(before).unwrap(), after);
2333    }
2334
2335    #[test]
2336    fn test_config_migrations_existing_upgrade_params() {
2337        let before = json!({
2338            "config": {
2339                "builder_urls": ["https://test:8080", "https://test:8081"],
2340                "start_proposing_view": 1,
2341                "stop_proposing_view": 2,
2342                "start_voting_view": 1,
2343                "stop_voting_view": 2,
2344                "start_proposing_time": 1,
2345                "stop_proposing_time": 2,
2346                "start_voting_time": 1,
2347                "stop_voting_time": 2,
2348                "epoch_height": 0,
2349                "drb_difficulty": 0,
2350                "drb_upgrade_difficulty": 0,
2351                "da_committees": [],
2352            }
2353        });
2354
2355        assert_eq!(migrate_network_config(before.clone()).unwrap(), before);
2356    }
2357
2358    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2359    pub async fn test_consensus_migration() {
2360        let rows = 300;
2361        let tmp = Persistence::tmp_storage().await;
2362        let mut opt = Persistence::options(&tmp);
2363        let storage = opt.create().await.unwrap();
2364
2365        let inner = storage.inner.read().await;
2366
2367        let decided_leaves_path = inner.decided_leaf_path();
2368        fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create leaf dir");
2369
2370        let qp_dir_path = inner.quorum_proposals_dir_path();
2371        fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2372
2373        let state_cert_dir_path = inner.state_cert_dir_path();
2374        fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2375        drop(inner);
2376
2377        assert!(storage.load_state_cert().await.unwrap().is_none());
2378
2379        for i in 0..rows {
2380            let view = ViewNumber::new(i);
2381            let validated_state = ValidatedState::default();
2382            let instance_state = NodeState::default();
2383
2384            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2385            let (payload, metadata) =
2386                Payload::from_transactions([], &validated_state, &instance_state)
2387                    .await
2388                    .unwrap();
2389
2390            let payload_bytes = payload.encode();
2391
2392            let block_header =
2393                Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);
2394
2395            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2396                epoch: EpochNumber::new(i),
2397                light_client_state: LightClientState {
2398                    view_number: i,
2399                    block_height: i,
2400                    block_comm_root: Default::default(),
2401                },
2402                next_stake_table_state: Default::default(),
2403                signatures: vec![], // arbitrary placeholder value
2404                auth_root: Default::default(),
2405            };
2406            assert!(storage.add_state_cert(state_cert).await.is_ok());
2407
2408            let null_quorum_data = QuorumData {
2409                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2410            };
2411
2412            let justify_qc = QuorumCertificate::new(
2413                null_quorum_data.clone(),
2414                null_quorum_data.commit(),
2415                view,
2416                None,
2417                PhantomData,
2418            );
2419
2420            let quorum_proposal = QuorumProposal {
2421                block_header,
2422                view_number: view,
2423                justify_qc: justify_qc.clone(),
2424                upgrade_certificate: None,
2425                proposal_certificate: None,
2426            };
2427
2428            let quorum_proposal_signature =
2429                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2430                    .expect("Failed to sign quorum proposal");
2431
2432            let proposal = Proposal {
2433                data: quorum_proposal.clone(),
2434                signature: quorum_proposal_signature,
2435                _pd: PhantomData,
2436            };
2437
2438            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2439            leaf.fill_block_payload::<TestVersions>(
2440                payload,
2441                GENESIS_VID_NUM_STORAGE_NODES,
2442                <TestVersions as Versions>::Base::VERSION,
2443            )
2444            .unwrap();
2445
2446            let mut inner = storage.inner.write().await;
2447
2448            tracing::debug!("inserting decided leaves");
2449            let file_path = decided_leaves_path
2450                .join(view.to_string())
2451                .with_extension("txt");
2452
2455            inner
2456                .replace(
2457                    &file_path,
2458                    |_| Ok(true),
2459                    |mut file| {
2460                        let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2461                        file.write_all(&bytes)?;
2462                        Ok(())
2463                    },
2464                )
2465                .expect("replace decided leaves");
2466
2467            let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2468
2469            tracing::debug!("inserting quorum proposal for {view}");
2470
2471            inner
2472                .replace(
2473                    &file_path,
2474                    |_| Ok(true),
2475                    |mut file| {
2476                        let proposal_bytes =
2477                            bincode::serialize(&proposal).context("serialize proposal")?;
2478
2479                        file.write_all(&proposal_bytes)?;
2480                        Ok(())
2481                    },
2482                )
2483                .unwrap();
2484
2485            drop(inner);
2486            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2487                .disperse(payload_bytes.clone())
2488                .unwrap();
2489
2490            let vid = VidDisperseShare0::<SeqTypes> {
2491                view_number: ViewNumber::new(i),
2492                payload_commitment: Default::default(),
2493                share: disperse.shares[0].clone(),
2494                common: disperse.common,
2495                recipient_key: pubkey,
2496            };
2497
2498            let (payload, metadata) =
2499                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2500                    .await
2501                    .unwrap();
2502
2503            let da = DaProposal::<SeqTypes> {
2504                encoded_transactions: payload.encode(),
2505                metadata,
2506                view_number: ViewNumber::new(i),
2507            };
2508
2509            let block_payload_signature =
2510                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2511
2512            let da_proposal = Proposal {
2513                data: da,
2514                signature: block_payload_signature,
2515                _pd: Default::default(),
2516            };
2517
2518            tracing::debug!("inserting vid for {view}");
2519            storage
2520                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2521                .await
2522                .unwrap();
2523
2524            tracing::debug!("inserting da for {view}");
2525            storage
2526                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2527                .await
2528                .unwrap();
2529        }
2530
2531        storage.migrate_storage().await.unwrap();
2532        let inner = storage.inner.read().await;
2533        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2534        let decided_leaves_count = decided_leaves
2535            .filter_map(Result::ok)
2536            .filter(|e| e.path().is_file())
2537            .count();
2538        assert_eq!(
2539            decided_leaves_count, rows as usize,
2540            "decided leaves count does not match",
2541        );
2542
2543        let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2544        let da_proposals_count = da_proposals
2545            .filter_map(Result::ok)
2546            .filter(|e| e.path().is_file())
2547            .count();
2548        assert_eq!(
2549            da_proposals_count, rows as usize,
2550            "da proposals count does not match",
2551        );
2552
2553        let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2554        let vids_count = vids
2555            .filter_map(Result::ok)
2556            .filter(|e| e.path().is_file())
2557            .count();
2558        assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2559
2560        let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2561        let qps_count = qps
2562            .filter_map(Result::ok)
2563            .filter(|e| e.path().is_file())
2564            .count();
2565        assert_eq!(
2566            qps_count, rows as usize,
2567            "quorum proposals count does not match",
2568        );
2569
2570        let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2571        let state_cert_count = state_certs
2572            .filter_map(Result::ok)
2573            .filter(|e| e.path().is_file())
2574            .count();
2575        assert_eq!(
2576            state_cert_count, rows as usize,
2577            "light client state update certificate count does not match",
2578        );
2579
2580        // Reinitialize the file system persistence using the same path.
2581        // Re-run the consensus migration.
2582        // No changes will occur, as the migration has already been completed.
2583        let storage = opt.create().await.unwrap();
2584        storage.migrate_storage().await.unwrap();
2585
2586        let inner = storage.inner.read().await;
2587        let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2588        let decided_leaves_count = decided_leaves
2589            .filter_map(Result::ok)
2590            .filter(|e| e.path().is_file())
2591            .count();
2592        assert_eq!(
2593            decided_leaves_count, rows as usize,
2594            "decided leaves count does not match",
2595        );
2596    }
2597
2598    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2599    async fn test_load_quorum_proposals_invalid_extension() {
2600        let tmp = Persistence::tmp_storage().await;
2601        let storage = Persistence::connect(&tmp).await;
2602
2603        // Generate a couple of valid quorum proposals.
2604        let leaf = Leaf2::genesis::<MockVersions>(&Default::default(), &NodeState::mock()).await;
2605        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2606        let signature = PubKey::sign(&privkey, &[]).unwrap();
2607        let mut quorum_proposal = Proposal {
2608            data: QuorumProposalWrapper::<SeqTypes> {
2609                proposal: QuorumProposal2::<SeqTypes> {
2610                    epoch: None,
2611                    block_header: leaf.block_header().clone(),
2612                    view_number: ViewNumber::genesis(),
2613                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2614                        &Default::default(),
2615                        &NodeState::mock(),
2616                    )
2617                    .await,
2618                    upgrade_certificate: None,
2619                    view_change_evidence: None,
2620                    next_drb_result: None,
2621                    next_epoch_justify_qc: None,
2622                    state_cert: None,
2623                },
2624            },
2625            signature,
2626            _pd: Default::default(),
2627        };
2628
2629        // Store quorum proposals.
2630        let quorum_proposal1 = quorum_proposal.clone();
2631        storage
2632            .append_quorum_proposal2(&quorum_proposal1)
2633            .await
2634            .unwrap();
2635        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2636        let quorum_proposal2 = quorum_proposal.clone();
2637        storage
2638            .append_quorum_proposal2(&quorum_proposal2)
2639            .await
2640            .unwrap();
2641
2642        // Change one of the file extensions. It can happen that we end up with files with the wrong
2643        // extension if, for example, the node is killed before cleaning up a swap file.
2644        fs::rename(
2645            tmp.path().join("quorum_proposals2/1.txt"),
2646            tmp.path().join("quorum_proposals2/1.swp"),
2647        )
2648        .unwrap();
2649
2650        // Loading should simply ignore the unrecognized extension.
2651        assert_eq!(
2652            storage.load_quorum_proposals().await.unwrap(),
2653            [(ViewNumber::genesis(), quorum_proposal1)]
2654                .into_iter()
2655                .collect::<BTreeMap<_, _>>()
2656        );
2657    }
2658
2659    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2660    async fn test_load_quorum_proposals_malformed_data() {
2661        let tmp = Persistence::tmp_storage().await;
2662        let storage = Persistence::connect(&tmp).await;
2663
2664        // Generate a valid quorum proposal.
2665        let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&Default::default(), &NodeState::mock())
2666            .await
2667            .into();
2668        let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2669        let signature = PubKey::sign(&privkey, &[]).unwrap();
2670        let quorum_proposal = Proposal {
2671            data: QuorumProposalWrapper::<SeqTypes> {
2672                proposal: QuorumProposal2::<SeqTypes> {
2673                    epoch: None,
2674                    block_header: leaf.block_header().clone(),
2675                    view_number: ViewNumber::new(1),
2676                    justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2677                        &Default::default(),
2678                        &NodeState::mock(),
2679                    )
2680                    .await,
2681                    upgrade_certificate: None,
2682                    view_change_evidence: None,
2683                    next_drb_result: None,
2684                    next_epoch_justify_qc: None,
2685                    state_cert: None,
2686                },
2687            },
2688            signature,
2689            _pd: Default::default(),
2690        };
2691
2692        // First store an invalid quorum proposal.
2693        fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2694        fs::write(
2695            tmp.path().join("quorum_proposals2/0.txt"),
2696            "invalid data".as_bytes(),
2697        )
2698        .unwrap();
2699
2700        // Store valid quorum proposal.
2701        storage
2702            .append_quorum_proposal2(&quorum_proposal)
2703            .await
2704            .unwrap();
2705
2706        // Loading should ignore the invalid data and return the valid proposal.
2707        assert_eq!(
2708            storage.load_quorum_proposals().await.unwrap(),
2709            [(ViewNumber::new(1), quorum_proposal)]
2710                .into_iter()
2711                .collect::<BTreeMap<_, _>>()
2712        );
2713    }
2714}