1use std::{
2 collections::{BTreeMap, HashSet},
3 fs::{self, File, OpenOptions},
4 io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5 ops::RangeInclusive,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Instant,
9};
10
11use anyhow::{anyhow, bail, Context};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use clap::Parser;
15use espresso_types::{
16 traits::{EventsPersistenceRead, MembershipPersistence},
17 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
18 v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent, Validator},
19 Leaf, Leaf2, NetworkConfig, Payload, PubKey, SeqTypes, StakeTableHash, ValidatorMap,
20};
21use hotshot::InitializerEpochInfo;
22use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
23 DhtPersistentStorage, SerializableRecord,
24};
25use hotshot_types::{
26 data::{
27 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
28 QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare, VidDisperseShare0,
29 },
30 drb::{DrbInput, DrbResult},
31 event::{Event, EventType, HotShotAction, LeafInfo},
32 message::{convert_proposal, Proposal},
33 simple_certificate::{
34 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
35 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
36 },
37 traits::{
38 block_contents::{BlockHeader, BlockPayload},
39 metrics::Metrics,
40 node_implementation::{ConsensusTime, NodeType},
41 },
42 vote::HasViewNumber,
43};
44use itertools::Itertools;
45
46use crate::{
47 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
48 ViewNumber, RECENT_STAKE_TABLES_LIMIT,
49};
50
// Options for the file system backed persistence layer, parsed from command line
// flags and/or environment variables.
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// `clap::Parser` type and its fields are picked up by clap as help text, so adding them
// would change the program's `--help` output.
#[derive(Parser, Clone, Debug)]
pub struct Options {
    // Root directory of the storage; every path helper on `Inner` joins a fixed name onto
    // this directory. Read from `ESPRESSO_SEQUENCER_STORAGE_PATH` or the `--path` flag.
    #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
    path: PathBuf,

    // Number of views of consensus data to keep behind the decided view. Garbage
    // collection removes per-view files whose view is lower than
    // `decided_view - consensus_view_retention` (saturating at zero).
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
        default_value = "130000"
    )]
    pub(crate) consensus_view_retention: u64,
}
76
77impl Default for Options {
78 fn default() -> Self {
79 Self::parse_from(std::iter::empty::<String>())
80 }
81}
82
83impl Options {
84 pub fn new(path: PathBuf) -> Self {
85 Self {
86 path,
87 consensus_view_retention: 130000,
88 }
89 }
90
91 pub(crate) fn path(&self) -> &Path {
92 &self.path
93 }
94}
95
96#[async_trait]
97impl PersistenceOptions for Options {
98 type Persistence = Persistence;
99
100 fn set_view_retention(&mut self, view_retention: u64) {
101 self.consensus_view_retention = view_retention;
102 }
103
104 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
105 let path = self.path.clone();
106 let view_retention = self.consensus_view_retention;
107
108 let migration_path = path.join("migration");
109 let migrated = if migration_path.is_file() {
110 let bytes = fs::read(&migration_path).context(format!(
111 "unable to read migration from {}",
112 migration_path.display()
113 ))?;
114 bincode::deserialize(&bytes).context("malformed migration file")?
115 } else {
116 HashSet::new()
117 };
118
119 Ok(Persistence {
120 inner: Arc::new(RwLock::new(Inner {
121 path,
122 migrated,
123 view_retention,
124 })),
125 metrics: Arc::new(PersistenceMetricsValue::default()),
126 })
127 }
128
129 async fn reset(self) -> anyhow::Result<()> {
130 todo!()
131 }
132}
133
/// File system backed persistence handle.
///
/// Cloning is cheap: clones share the same [`Inner`] state and metrics through `Arc`s.
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Storage state. Methods that write files take the write lock; loaders take the read
    /// lock.
    inner: Arc<RwLock<Inner>>,
    /// Metrics; the `append_*` methods record how long their file write took.
    metrics: Arc<PersistenceMetricsValue>,
}
144
/// State behind the lock of [`Persistence`]: where the data lives and how it is managed.
#[derive(Debug)]
struct Inner {
    /// Root directory; every stored file or directory is a child of it.
    path: PathBuf,
    /// Number of views to keep behind the decided view when collecting garbage.
    view_retention: u64,
    /// Names of the migrations that have completed (e.g. `"anchor_leaf"`, `"da_proposal"`),
    /// mirrored on disk in the `migration` file.
    migrated: HashSet<String>,
}
151
152impl Inner {
153 fn config_path(&self) -> PathBuf {
154 self.path.join("hotshot.cfg")
155 }
156
157 fn migration(&self) -> PathBuf {
158 self.path.join("migration")
159 }
160
161 fn voted_view_path(&self) -> PathBuf {
162 self.path.join("highest_voted_view")
163 }
164
165 fn restart_view_path(&self) -> PathBuf {
166 self.path.join("restart_view")
167 }
168
169 fn decided_leaf_path(&self) -> PathBuf {
171 self.path.join("decided_leaves")
172 }
173
174 fn decided_leaf2_path(&self) -> PathBuf {
175 self.path.join("decided_leaves2")
176 }
177
178 fn legacy_anchor_leaf_path(&self) -> PathBuf {
180 self.path.join("anchor_leaf")
181 }
182
183 fn vid_dir_path(&self) -> PathBuf {
184 self.path.join("vid")
185 }
186
187 fn vid2_dir_path(&self) -> PathBuf {
188 self.path.join("vid2")
189 }
190
191 fn da_dir_path(&self) -> PathBuf {
192 self.path.join("da")
193 }
194
195 fn drb_dir_path(&self) -> PathBuf {
196 self.path.join("drb")
197 }
198
199 fn da2_dir_path(&self) -> PathBuf {
200 self.path.join("da2")
201 }
202
203 fn quorum_proposals_dir_path(&self) -> PathBuf {
204 self.path.join("quorum_proposals")
205 }
206
207 fn quorum_proposals2_dir_path(&self) -> PathBuf {
208 self.path.join("quorum_proposals2")
209 }
210
211 fn upgrade_certificate_dir_path(&self) -> PathBuf {
212 self.path.join("upgrade_certificate")
213 }
214
215 fn stake_table_dir_path(&self) -> PathBuf {
216 self.path.join("stake_table")
217 }
218
219 fn next_epoch_qc(&self) -> PathBuf {
220 self.path.join("next_epoch_quorum_certificate")
221 }
222
223 fn eqc(&self) -> PathBuf {
224 self.path.join("eqc")
225 }
226
227 fn libp2p_dht_path(&self) -> PathBuf {
228 self.path.join("libp2p_dht")
229 }
230 fn epoch_drb_result_dir_path(&self) -> PathBuf {
231 self.path.join("epoch_drb_result")
232 }
233
234 fn epoch_root_block_header_dir_path(&self) -> PathBuf {
235 self.path.join("epoch_root_block_header")
236 }
237
238 fn finalized_state_cert_dir_path(&self) -> PathBuf {
239 self.path.join("finalized_state_cert")
240 }
241
242 fn state_cert_dir_path(&self) -> PathBuf {
243 self.path.join("state_cert")
244 }
245
246 fn update_migration(&mut self) -> anyhow::Result<()> {
247 let path = self.migration();
248 let bytes = bincode::serialize(&self.migrated)?;
249
250 self.replace(
251 &path,
252 |_| Ok(true),
253 |mut file| {
254 file.write_all(&bytes)?;
255 Ok(())
256 },
257 )
258 }
259
260 fn replace(
270 &mut self,
271 path: &Path,
272 pred: impl FnOnce(File) -> anyhow::Result<bool>,
273 write: impl FnOnce(File) -> anyhow::Result<()>,
274 ) -> anyhow::Result<()> {
275 if path.is_file() {
276 if !pred(File::open(path)?)? {
281 return Ok(());
284 }
285 }
286
287 let mut swap_path = path.to_owned();
290 swap_path.set_extension("swp");
291 let swap = OpenOptions::new()
292 .write(true)
293 .truncate(true)
294 .create(true)
295 .open(&swap_path)?;
296 write(swap)?;
297
298 fs::rename(swap_path, path)?;
300
301 Ok(())
302 }
303
304 fn collect_garbage(
305 &mut self,
306 decided_view: ViewNumber,
307 prune_intervals: &[RangeInclusive<ViewNumber>],
308 ) -> anyhow::Result<()> {
309 let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
310
311 self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
312 self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
313 self.prune_files(
314 self.quorum_proposals2_dir_path(),
315 prune_view,
316 None,
317 prune_intervals,
318 )?;
319 self.prune_files(
320 self.state_cert_dir_path(),
321 prune_view,
322 None,
323 prune_intervals,
324 )?;
325
326 self.prune_files(
328 self.decided_leaf2_path(),
329 prune_view,
330 Some(decided_view),
331 prune_intervals,
332 )?;
333
334 Ok(())
335 }
336
337 fn prune_files(
338 &mut self,
339 dir_path: PathBuf,
340 prune_view: ViewNumber,
341 keep_decided_view: Option<ViewNumber>,
342 prune_intervals: &[RangeInclusive<ViewNumber>],
343 ) -> anyhow::Result<()> {
344 if !dir_path.is_dir() {
345 return Ok(());
346 }
347
348 for (file_view, path) in view_files(dir_path)? {
349 if let Some(decided_view) = keep_decided_view {
351 if decided_view == file_view {
352 continue;
353 }
354 }
355 if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
359 fs::remove_file(&path)?;
360 }
361 }
362
363 Ok(())
364 }
365
366 fn parse_decided_leaf(
367 &self,
368 bytes: &[u8],
369 ) -> anyhow::Result<(Leaf2, CertificatePair<SeqTypes>)> {
370 match bincode::deserialize(bytes) {
374 Ok((leaf, cert)) => Ok((leaf, cert)),
375 Err(err) => {
376 tracing::info!(
377 "error parsing decided leaf, maybe file was created without next epoch QC? \
378 {err}"
379 );
380 let (leaf, qc) =
381 bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(bytes)
382 .context("parsing decided leaf")?;
383 Ok((leaf, CertificatePair::non_epoch_change(qc)))
384 },
385 }
386 }
387
    /// Replays the decided leaves stored on disk as `Decide` events.
    ///
    /// Every stored leaf with a view up to and including `view` is loaded and enriched with
    /// its VID share, its state certificate and its block payload (from the stored DA
    /// proposal) when those are available. One `Decide` event per leaf is then passed to
    /// `consumer`, in ascending view order.
    ///
    /// `deciding_qc` is attached only to the event whose committing certificate is for the
    /// view directly before the deciding QC's view.
    ///
    /// Returns the inclusive view intervals covering runs of processed leaves whose block
    /// heights are consecutive; the caller passes these to garbage collection.
    ///
    /// # Errors
    /// Fails if a leaf or one of its companion files cannot be read or decoded, or if the
    /// consumer rejects an event. In the latter case the intervals collected so far are
    /// dropped.
    async fn generate_decide_events(
        &mut self,
        view: ViewNumber,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
        // Keyed by view so that events are emitted in ascending view order below.
        let mut leaves = BTreeMap::new();
        for (v, path) in view_files(self.decided_leaf2_path())? {
            // Leaves newer than the decided view are not part of this decide.
            if v > view {
                continue;
            }

            let bytes =
                fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
            let (mut leaf, cert) = self.parse_decided_leaf(&bytes)?;

            // Attach the VID share for this view if we have one.
            let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
            if vid_share.is_none() {
                tracing::debug!(?v, "VID share not available at decide");
            }

            // Loads the state certificate for this view (if any); as a side effect it is
            // also copied into the finalized state cert directory.
            let state_cert = self.store_finalized_state_cert(v)?;

            // Reconstruct the block payload from the stored DA proposal, if present.
            if let Some(proposal) = self.load_da_proposal(v)? {
                let payload = Payload::from_bytes(
                    &proposal.data.encoded_transactions,
                    &proposal.data.metadata,
                );
                leaf.fill_block_payload_unchecked(payload);
            } else {
                tracing::debug!(?v, "DA proposal not available at decide");
            }

            let info = LeafInfo {
                leaf,
                vid_share,
                state_cert,
                // `state` and `delta` are not loaded from storage; they are defaulted.
                state: Default::default(),
                delta: Default::default(),
            };

            leaves.insert(v, (info, cert));
        }

        // Skip the oldest stored leaf unless it is the genesis leaf.
        // NOTE(review): presumably the oldest leaf was already delivered by the previous
        // decide and only remains on disk because garbage collection always keeps the
        // leaf of the decided view -- confirm.
        if let Some((oldest_view, _)) = leaves.first_key_value() {
            if *oldest_view > ViewNumber::genesis() {
                leaves.pop_first();
            }
        }

        // Intervals of views that were processed successfully and have consecutive block
        // heights. `current_interval` is `(first view, last view, last height)`.
        let mut intervals = vec![];
        let mut current_interval = None;
        for (view, (leaf, cert)) in leaves {
            let height = leaf.leaf.block_header().block_number();

            {
                // Only the leaf whose certificate directly precedes the deciding QC gets
                // the deciding QC attached.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == cert.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };

                consumer
                    .handle_event(&Event {
                        view_number: view,
                        event: EventType::Decide {
                            committing_qc: Arc::new(cert),
                            deciding_qc,
                            leaf_chain: Arc::new(vec![leaf]),
                            block_size: None,
                        },
                    })
                    .await?;
            }
            if let Some((start, end, current_height)) = current_interval.as_mut() {
                if height == *current_height + 1 {
                    // Consecutive height: extend the current interval to this view.
                    *current_height += 1;
                    *end = view;
                } else {
                    // Gap in heights: close the current interval and start a new one.
                    intervals.push(*start..=*end);
                    current_interval = Some((view, view, height));
                }
            } else {
                // First processed leaf starts the first interval.
                current_interval = Some((view, view, height));
            }
        }
        // Close the interval that was still open when the loop ended.
        if let Some((start, end, _)) = current_interval {
            intervals.push(start..=end);
        }

        Ok(intervals)
    }
504
505 fn load_da_proposal(
506 &self,
507 view: ViewNumber,
508 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
509 let dir_path = self.da2_dir_path();
510
511 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
512
513 if !file_path.exists() {
514 return Ok(None);
515 }
516
517 let da_bytes = fs::read(file_path)?;
518
519 let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
520 bincode::deserialize(&da_bytes)?;
521 Ok(Some(da_proposal))
522 }
523
524 fn load_vid_share(
525 &self,
526 view: ViewNumber,
527 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
528 let dir_path = self.vid2_dir_path();
529
530 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
531
532 if !file_path.exists() {
533 return Ok(None);
534 }
535
536 let vid_share_bytes = fs::read(file_path)?;
537 let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
538 bincode::deserialize(&vid_share_bytes)?;
539 Ok(Some(vid_share))
540 }
541
542 fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
543 tracing::info!("Checking `Leaf2` to load the anchor leaf.");
544 if self.decided_leaf2_path().is_dir() {
545 let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
546
547 for (_, path) in view_files(self.decided_leaf2_path())? {
549 let bytes =
550 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
551 let (leaf, cert) = self.parse_decided_leaf(&bytes)?;
552 if let Some((anchor_leaf, _)) = &anchor {
553 if leaf.view_number() > anchor_leaf.view_number() {
554 anchor = Some((leaf, cert.qc().clone()));
555 }
556 } else {
557 anchor = Some((leaf, cert.qc().clone()));
558 }
559 }
560
561 return Ok(anchor);
562 }
563
564 tracing::warn!(
565 "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
566 This is very likely to fail."
567 );
568 if self.legacy_anchor_leaf_path().is_file() {
569 let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
572
573 file.seek(SeekFrom::Start(8)).context("seek")?;
575 let bytes = file
576 .bytes()
577 .collect::<Result<Vec<_>, _>>()
578 .context("read")?;
579 return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
580 }
581
582 Ok(None)
583 }
584
585 fn store_finalized_state_cert(
586 &mut self,
587 view: ViewNumber,
588 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
589 let dir_path = self.state_cert_dir_path();
590 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
591
592 if !file_path.exists() {
593 return Ok(None);
594 }
595
596 let bytes = fs::read(&file_path)?;
597
598 let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
599 bincode::deserialize(&bytes).or_else(|err_v2| {
600 tracing::info!(
601 error = %err_v2,
602 path = %file_path.display(),
603 "Failed to deserialize state certificate, attempting with v1"
604 );
605
606 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
607 .map(Into::into)
608 .with_context(|| {
609 format!(
610 "Failed to deserialize with both v2 and v1 from file '{}'. error: \
611 {err_v2}",
612 file_path.display()
613 )
614 })
615 })?;
616
617 let epoch = state_cert.epoch.u64();
618 let finalized_dir_path = self.finalized_state_cert_dir_path();
619 fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
620
621 let finalized_file_path = finalized_dir_path
622 .join(epoch.to_string())
623 .with_extension("txt");
624
625 self.replace(
626 &finalized_file_path,
627 |_| Ok(true),
628 |mut file| {
629 file.write_all(&bytes)?;
630 Ok(())
631 },
632 )
633 .context(format!(
634 "finalizing light client state update certificate file for epoch {epoch:?}"
635 ))?;
636
637 Ok(Some(state_cert))
638 }
639}
640
641#[async_trait]
642impl SequencerPersistence for Persistence {
643 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
644 let inner = self.inner.read().await;
645 let path = inner.config_path();
646 if !path.is_file() {
647 tracing::info!("config not found at {}", path.display());
648 return Ok(None);
649 }
650 tracing::info!("loading config from {}", path.display());
651
652 let bytes =
653 fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
654 let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
655 let json = migrate_network_config(json).context("migration of network config failed")?;
656 let config = serde_json::from_value(json).context("malformed config file")?;
657 Ok(Some(config))
658 }
659
660 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
661 let inner = self.inner.write().await;
662 let path = inner.config_path();
663 tracing::info!("saving config to {}", path.display());
664 Ok(cfg.to_file(path.display().to_string())?)
665 }
666
667 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
668 let inner = self.inner.read().await;
669 let path = inner.voted_view_path();
670 if !path.is_file() {
671 return Ok(None);
672 }
673 let bytes = fs::read(inner.voted_view_path())?
674 .try_into()
675 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
676 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
677 }
678
679 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
680 let inner = self.inner.read().await;
681 let path = inner.restart_view_path();
682 if !path.is_file() {
683 return Ok(None);
684 }
685 let bytes = fs::read(path)?
686 .try_into()
687 .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
688 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
689 }
690
691 async fn append_decided_leaves(
692 &self,
693 view: ViewNumber,
694 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
695 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
696 consumer: &impl EventConsumer,
697 ) -> anyhow::Result<()> {
698 let mut inner = self.inner.write().await;
699 let path = inner.decided_leaf2_path();
700
701 fs::create_dir_all(&path).context("creating anchor leaf directory")?;
703
704 let legacy_path = inner.legacy_anchor_leaf_path();
707 if !path.is_dir() && legacy_path.is_file() {
708 tracing::info!("migrating to multi-leaf storage");
709
710 let (leaf, qc) = inner
712 .load_anchor_leaf()?
713 .context("anchor leaf file exists but unable to load contents")?;
714 let view = leaf.view_number().u64();
715 let bytes = bincode::serialize(&(leaf, qc))?;
716 let new_file = path.join(view.to_string()).with_extension("txt");
717 inner
718 .replace(
719 &new_file,
720 |_| Ok(true),
721 |mut file| {
722 file.write_all(&bytes)?;
723 Ok(())
724 },
725 )
726 .context(format!("writing anchor leaf file {view}"))?;
727
728 fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
730 }
731
732 for (info, cert) in leaf_chain {
733 let view = info.leaf.view_number().u64();
734 let file_path = path.join(view.to_string()).with_extension("txt");
735 inner.replace(
736 &file_path,
737 |_| {
738 tracing::warn!(view, "duplicate decided leaf");
741 Ok(false)
742 },
743 |mut file| {
744 let bytes = bincode::serialize(&(&info.leaf, cert))?;
745 file.write_all(&bytes)?;
746 Ok(())
747 },
748 )?;
749 }
750
751 match inner
752 .generate_decide_events(view, deciding_qc, consumer)
753 .await
754 {
755 Err(err) => {
756 tracing::warn!(?view, "event processing failed: {err:#}");
760 },
761 Ok(intervals) => {
762 if let Err(err) = inner.collect_garbage(view, &intervals) {
763 tracing::warn!(?view, "GC failed: {err:#}");
767 }
768 },
769 }
770
771 Ok(())
772 }
773
774 async fn load_anchor_leaf(
775 &self,
776 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
777 self.inner.read().await.load_anchor_leaf()
778 }
779
780 async fn load_da_proposal(
781 &self,
782 view: ViewNumber,
783 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
784 self.inner.read().await.load_da_proposal(view)
785 }
786
787 async fn load_vid_share(
788 &self,
789 view: ViewNumber,
790 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
791 self.inner.read().await.load_vid_share(view)
792 }
793
794 async fn append_vid(
795 &self,
796 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
797 ) -> anyhow::Result<()> {
798 let mut inner = self.inner.write().await;
799 let view_number = proposal.data.view_number().u64();
800 let dir_path = inner.vid2_dir_path();
801
802 fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
803
804 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
805 inner.replace(
806 &file_path,
807 |_| {
808 tracing::warn!(view_number, "duplicate VID share");
811 Ok(false)
812 },
813 |mut file| {
814 let proposal_bytes = bincode::serialize(proposal).context("serialize proposal")?;
815 let now = Instant::now();
816 file.write_all(&proposal_bytes)?;
817 self.metrics
818 .internal_append_vid_duration
819 .add_point(now.elapsed().as_secs_f64());
820 Ok(())
821 },
822 )
823 }
824
825 async fn append_da(
826 &self,
827 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
828 _vid_commit: VidCommitment,
829 ) -> anyhow::Result<()> {
830 let mut inner = self.inner.write().await;
831 let view_number = proposal.data.view_number().u64();
832 let dir_path = inner.da_dir_path();
833
834 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
835
836 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
837 inner.replace(
838 &file_path,
839 |_| {
840 tracing::warn!(view_number, "duplicate DA proposal");
843 Ok(false)
844 },
845 |mut file| {
846 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
847 let now = Instant::now();
848 file.write_all(&proposal_bytes)?;
849 self.metrics
850 .internal_append_da_duration
851 .add_point(now.elapsed().as_secs_f64());
852 Ok(())
853 },
854 )
855 }
856 async fn record_action(
857 &self,
858 view: ViewNumber,
859 _epoch: Option<EpochNumber>,
860 action: HotShotAction,
861 ) -> anyhow::Result<()> {
862 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
864 return Ok(());
865 }
866 let mut inner = self.inner.write().await;
867 let path = &inner.voted_view_path();
868 inner.replace(
869 path,
870 |mut file| {
871 let mut bytes = vec![];
872 file.read_to_end(&mut bytes)?;
873 let bytes = bytes
874 .try_into()
875 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
876 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
877
878 Ok(saved_view < view)
880 },
881 |mut file| {
882 file.write_all(&view.u64().to_le_bytes())?;
883 Ok(())
884 },
885 )?;
886
887 if matches!(action, HotShotAction::Vote) {
888 let restart_view_path = &inner.restart_view_path();
889 let restart_view = view + 1;
890 inner.replace(
891 restart_view_path,
892 |mut file| {
893 let mut bytes = vec![];
894 file.read_to_end(&mut bytes)?;
895 let bytes = bytes
896 .try_into()
897 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
898 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
899
900 Ok(saved_view < restart_view)
902 },
903 |mut file| {
904 file.write_all(&restart_view.u64().to_le_bytes())?;
905 Ok(())
906 },
907 )?;
908 }
909 Ok(())
910 }
911
912 async fn append_quorum_proposal2(
913 &self,
914 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
915 ) -> anyhow::Result<()> {
916 let mut inner = self.inner.write().await;
917 let view_number = proposal.data.view_number().u64();
918 let dir_path = inner.quorum_proposals2_dir_path();
919
920 fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
921
922 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
923 inner.replace(
924 &file_path,
925 |_| {
926 Ok(true)
928 },
929 |mut file| {
930 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
931 let now = Instant::now();
932 file.write_all(&proposal_bytes)?;
933 self.metrics
934 .internal_append_quorum2_duration
935 .add_point(now.elapsed().as_secs_f64());
936 Ok(())
937 },
938 )
939 }
940 async fn load_quorum_proposals(
941 &self,
942 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
943 {
944 let inner = self.inner.read().await;
945
946 let dir_path = inner.quorum_proposals2_dir_path();
948 if !dir_path.is_dir() {
949 return Ok(Default::default());
950 }
951
952 let mut map = BTreeMap::new();
954 for (view, path) in view_files(&dir_path)? {
955 let proposal_bytes = fs::read(path)?;
956 let Some(proposal) = bincode::deserialize::<
957 Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
958 >(&proposal_bytes)
959 .or_else(|error| {
960 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
961 &proposal_bytes,
962 )
963 .map(convert_proposal)
964 .inspect_err(|err_v3| {
965 tracing::warn!(
973 ?view,
974 %error,
975 error_v3 = %err_v3,
976 "ignoring malformed quorum proposal file"
977 );
978 })
979 })
980 .ok() else {
981 continue;
982 };
983
984 let proposal2 = convert_proposal(proposal);
985
986 map.insert(view, proposal2);
988 }
989
990 Ok(map)
991 }
992
993 async fn load_quorum_proposal(
994 &self,
995 view: ViewNumber,
996 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
997 let inner = self.inner.read().await;
998 let dir_path = inner.quorum_proposals2_dir_path();
999 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1000 let bytes = fs::read(file_path)?;
1001 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1002 bincode::deserialize(&bytes).or_else(|error| {
1003 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1004 &bytes,
1005 )
1006 .map(convert_proposal)
1007 .context(format!(
1008 "Failed to deserialize quorum proposal for view {view:?}: {error}."
1009 ))
1010 })?;
1011 Ok(proposal)
1012 }
1013
1014 async fn load_upgrade_certificate(
1015 &self,
1016 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1017 let inner = self.inner.read().await;
1018 let path = inner.upgrade_certificate_dir_path();
1019 if !path.is_file() {
1020 return Ok(None);
1021 }
1022 let bytes = fs::read(&path).context("read")?;
1023 Ok(Some(
1024 bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1025 ))
1026 }
1027
1028 async fn store_upgrade_certificate(
1029 &self,
1030 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1031 ) -> anyhow::Result<()> {
1032 let mut inner = self.inner.write().await;
1033 let path = &inner.upgrade_certificate_dir_path();
1034 let certificate = match decided_upgrade_certificate {
1035 Some(cert) => cert,
1036 None => return Ok(()),
1037 };
1038 inner.replace(
1039 path,
1040 |_| {
1041 Ok(true)
1043 },
1044 |mut file| {
1045 let bytes =
1046 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1047 file.write_all(&bytes)?;
1048 Ok(())
1049 },
1050 )
1051 }
1052
1053 async fn store_next_epoch_quorum_certificate(
1054 &self,
1055 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1056 ) -> anyhow::Result<()> {
1057 let mut inner = self.inner.write().await;
1058 let path = &inner.next_epoch_qc();
1059
1060 inner.replace(
1061 path,
1062 |_| {
1063 Ok(true)
1065 },
1066 |mut file| {
1067 let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1068 file.write_all(&bytes)?;
1069 Ok(())
1070 },
1071 )
1072 }
1073
1074 async fn load_next_epoch_quorum_certificate(
1075 &self,
1076 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1077 let inner = self.inner.read().await;
1078 let path = inner.next_epoch_qc();
1079 if !path.is_file() {
1080 return Ok(None);
1081 }
1082 let bytes = fs::read(&path).context("read")?;
1083 Ok(Some(
1084 bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1085 ))
1086 }
1087
1088 async fn store_eqc(
1089 &self,
1090 high_qc: QuorumCertificate2<SeqTypes>,
1091 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1092 ) -> anyhow::Result<()> {
1093 let mut inner = self.inner.write().await;
1094 let path = &inner.eqc();
1095
1096 inner.replace(
1097 path,
1098 |_| {
1099 Ok(true)
1101 },
1102 |mut file| {
1103 let bytes = bincode::serialize(&(high_qc, next_epoch_high_qc))
1104 .context("serializing next epoch qc")?;
1105 file.write_all(&bytes)?;
1106 Ok(())
1107 },
1108 )
1109 }
1110
1111 async fn load_eqc(
1112 &self,
1113 ) -> Option<(
1114 QuorumCertificate2<SeqTypes>,
1115 NextEpochQuorumCertificate2<SeqTypes>,
1116 )> {
1117 let inner = self.inner.read().await;
1118 let path = inner.eqc();
1119 if !path.is_file() {
1120 return None;
1121 }
1122 let bytes = fs::read(&path).ok()?;
1123
1124 bincode::deserialize(&bytes).ok()
1125 }
1126
1127 async fn append_da2(
1128 &self,
1129 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1130 _vid_commit: VidCommitment,
1131 ) -> anyhow::Result<()> {
1132 let mut inner = self.inner.write().await;
1133 let view_number = proposal.data.view_number().u64();
1134 let dir_path = inner.da2_dir_path();
1135
1136 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1137
1138 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1139 inner.replace(
1140 &file_path,
1141 |_| {
1142 tracing::warn!(view_number, "duplicate DA proposal");
1145 Ok(false)
1146 },
1147 |mut file| {
1148 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1149 let now = Instant::now();
1150 file.write_all(&proposal_bytes)?;
1151 self.metrics
1152 .internal_append_da2_duration
1153 .add_point(now.elapsed().as_secs_f64());
1154 Ok(())
1155 },
1156 )
1157 }
1158
1159 async fn append_proposal2(
1160 &self,
1161 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1162 ) -> anyhow::Result<()> {
1163 self.append_quorum_proposal2(proposal).await
1164 }
1165
1166 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1167 let mut inner = self.inner.write().await;
1168
1169 if inner.migrated.contains("anchor_leaf") {
1170 tracing::info!("decided leaves already migrated");
1171 return Ok(());
1172 }
1173
1174 let new_leaf_dir = inner.decided_leaf2_path();
1175
1176 fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2 dir")?;
1177
1178 let old_leaf_dir = inner.decided_leaf_path();
1179 if !old_leaf_dir.is_dir() {
1180 return Ok(());
1181 }
1182
1183 tracing::warn!("migrating decided leaves..");
1184 for entry in fs::read_dir(old_leaf_dir)? {
1185 let entry = entry?;
1186 let path = entry.path();
1187
1188 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1189 continue;
1190 };
1191 let Ok(view) = file.parse::<u64>() else {
1192 continue;
1193 };
1194
1195 let bytes =
1196 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1197 let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1198 .context(format!("parsing decided leaf {}", path.display()))?;
1199
1200 let leaf2: Leaf2 = leaf.into();
1201 let cert = CertificatePair::non_epoch_change(qc.to_qc2());
1202
1203 let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1204
1205 inner.replace(
1206 &new_leaf_path,
1207 |_| {
1208 tracing::warn!(view, "duplicate decided leaf");
1209 Ok(false)
1210 },
1211 |mut file| {
1212 let bytes = bincode::serialize(&(&leaf2.clone(), cert))?;
1213 file.write_all(&bytes)?;
1214 Ok(())
1215 },
1216 )?;
1217
1218 if view % 100 == 0 {
1219 tracing::info!(view, "decided leaves migration progress");
1220 }
1221 }
1222
1223 inner.migrated.insert("anchor_leaf".to_string());
1224 inner.update_migration()?;
1225 tracing::warn!("successfully migrated decided leaves");
1226 Ok(())
1227 }
1228 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1229 let mut inner = self.inner.write().await;
1230
1231 if inner.migrated.contains("da_proposal") {
1232 tracing::info!("da proposals already migrated");
1233 return Ok(());
1234 }
1235
1236 let new_da_dir = inner.da2_dir_path();
1237
1238 fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1239
1240 let old_da_dir = inner.da_dir_path();
1241 if !old_da_dir.is_dir() {
1242 return Ok(());
1243 }
1244
1245 tracing::warn!("migrating da proposals..");
1246
1247 for entry in fs::read_dir(old_da_dir)? {
1248 let entry = entry?;
1249 let path = entry.path();
1250
1251 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1252 continue;
1253 };
1254 let Ok(view) = file.parse::<u64>() else {
1255 continue;
1256 };
1257
1258 let bytes =
1259 fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1260 let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1261 .context(format!("parsing da proposal {}", path.display()))?;
1262
1263 let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1264
1265 let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1266
1267 inner.replace(
1268 &new_da_path,
1269 |_| {
1270 tracing::warn!(view, "duplicate DA proposal 2");
1271 Ok(false)
1272 },
1273 |mut file| {
1274 let bytes = bincode::serialize(&proposal2)?;
1275 file.write_all(&bytes)?;
1276 Ok(())
1277 },
1278 )?;
1279
1280 if view % 100 == 0 {
1281 tracing::info!(view, "DA proposals migration progress");
1282 }
1283 }
1284
1285 inner.migrated.insert("da_proposal".to_string());
1286 inner.update_migration()?;
1287 tracing::warn!("successfully migrated da proposals");
1288 Ok(())
1289 }
1290 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1291 let mut inner = self.inner.write().await;
1292
1293 if inner.migrated.contains("vid_share") {
1294 tracing::info!("vid shares already migrated");
1295 return Ok(());
1296 }
1297
1298 let new_vid_dir = inner.vid2_dir_path();
1299
1300 fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1301
1302 let old_vid_dir = inner.vid_dir_path();
1303 if !old_vid_dir.is_dir() {
1304 return Ok(());
1305 }
1306
1307 tracing::warn!("migrating vid shares..");
1308
1309 for entry in fs::read_dir(old_vid_dir)? {
1310 let entry = entry?;
1311 let path = entry.path();
1312
1313 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1314 continue;
1315 };
1316 let Ok(view) = file.parse::<u64>() else {
1317 continue;
1318 };
1319
1320 let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1321 let proposal =
1322 bincode::deserialize::<Proposal<SeqTypes, VidDisperseShare0<SeqTypes>>>(&bytes)
1323 .context(format!("parsing vid share {}", path.display()))?;
1324
1325 let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1326
1327 let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1328 convert_proposal(proposal);
1329
1330 inner.replace(
1331 &new_vid_path,
1332 |_| {
1333 tracing::warn!(view, "duplicate VID share ");
1334 Ok(false)
1335 },
1336 |mut file| {
1337 let bytes = bincode::serialize(&proposal2)?;
1338 file.write_all(&bytes)?;
1339 Ok(())
1340 },
1341 )?;
1342
1343 if view % 100 == 0 {
1344 tracing::info!(view, "VID shares migration progress");
1345 }
1346 }
1347
1348 inner.migrated.insert("vid_share".to_string());
1349 inner.update_migration()?;
1350 tracing::warn!("successfully migrated vid shares");
1351 Ok(())
1352 }
1353
1354 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1355 let mut inner = self.inner.write().await;
1356
1357 if inner.migrated.contains("quorum_proposals") {
1358 tracing::info!("quorum proposals already migrated");
1359 return Ok(());
1360 }
1361
1362 let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1363
1364 fs::create_dir_all(new_quorum_proposals_dir.clone())
1365 .context("failed to create quorum proposals 2 dir")?;
1366
1367 let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1368 if !old_quorum_proposals_dir.is_dir() {
1369 tracing::info!("no existing quorum proposals found for migration");
1370 return Ok(());
1371 }
1372
1373 tracing::warn!("migrating quorum proposals..");
1374 for entry in fs::read_dir(old_quorum_proposals_dir)? {
1375 let entry = entry?;
1376 let path = entry.path();
1377
1378 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1379 continue;
1380 };
1381 let Ok(view) = file.parse::<u64>() else {
1382 continue;
1383 };
1384
1385 let bytes =
1386 fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1387 let proposal =
1388 bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1389 .context(format!("parsing quorum proposal {}", path.display()))?;
1390
1391 let new_file_path = new_quorum_proposals_dir
1392 .join(view.to_string())
1393 .with_extension("txt");
1394
1395 let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1396 convert_proposal(proposal);
1397
1398 inner.replace(
1399 &new_file_path,
1400 |_| {
1401 tracing::warn!(view, "duplicate Quorum proposal2 ");
1402 Ok(false)
1403 },
1404 |mut file| {
1405 let bytes = bincode::serialize(&proposal2)?;
1406 file.write_all(&bytes)?;
1407 Ok(())
1408 },
1409 )?;
1410
1411 if view % 100 == 0 {
1412 tracing::info!(view, "Quorum proposals migration progress");
1413 }
1414 }
1415
1416 inner.migrated.insert("quorum_proposals".to_string());
1417 inner.update_migration()?;
1418 tracing::warn!("successfully migrated quorum proposals");
1419 Ok(())
1420 }
1421 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1422 Ok(())
1423 }
1424
1425 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1426 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1427 if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
1428 tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
1429 } else if loaded_drb_input.iteration >= drb_input.iteration {
1430 anyhow::bail!(
1431 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1432 loaded_drb_input,
1433 drb_input
1434 )
1435 }
1436 }
1437
1438 let mut inner = self.inner.write().await;
1439 let dir_path = inner.drb_dir_path();
1440
1441 fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1442
1443 let drb_input_bytes =
1444 bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1445
1446 let file_path = dir_path
1447 .join(drb_input.epoch.to_string())
1448 .with_extension("bin");
1449
1450 inner.replace(
1451 &file_path,
1452 |_| {
1453 Ok(true)
1455 },
1456 |mut file| {
1457 file.write_all(&drb_input_bytes).context(format!(
1458 "writing epoch drb_input file for epoch {:?} at {:?}",
1459 drb_input.epoch, file_path
1460 ))
1461 },
1462 )
1463 }
1464
1465 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1466 let inner = self.inner.read().await;
1467 let path = &inner.drb_dir_path();
1468 let file_path = path.join(epoch.to_string()).with_extension("bin");
1469 let bytes = fs::read(&file_path).context("read")?;
1470 Ok(bincode::deserialize(&bytes)
1471 .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1472 }
1473
1474 async fn store_drb_result(
1475 &self,
1476 epoch: EpochNumber,
1477 drb_result: DrbResult,
1478 ) -> anyhow::Result<()> {
1479 let mut inner = self.inner.write().await;
1480 let dir_path = inner.epoch_drb_result_dir_path();
1481
1482 fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1483
1484 let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1485
1486 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1487
1488 inner.replace(
1489 &file_path,
1490 |_| {
1491 Ok(true)
1493 },
1494 |mut file| {
1495 file.write_all(&drb_result_bytes)
1496 .context(format!("writing epoch drb result file for epoch {epoch:?}"))
1497 },
1498 )
1499 }
1500
1501 async fn store_epoch_root(
1502 &self,
1503 epoch: EpochNumber,
1504 block_header: <SeqTypes as NodeType>::BlockHeader,
1505 ) -> anyhow::Result<()> {
1506 let mut inner = self.inner.write().await;
1507 let dir_path = inner.epoch_root_block_header_dir_path();
1508
1509 fs::create_dir_all(dir_path.clone())
1510 .context("failed to create epoch root block header dir")?;
1511
1512 let block_header_bytes =
1513 bincode::serialize(&block_header).context("serialize block header")?;
1514
1515 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1516 inner
1517 .replace(
1518 &file_path,
1519 |_| Ok(true),
1520 |mut file| {
1521 file.write_all(&block_header_bytes)?;
1522 Ok(())
1523 },
1524 )
1525 .context(format!(
1526 "writing epoch root block header file for epoch {epoch:?}"
1527 ))?;
1528
1529 Ok(())
1530 }
1531
1532 async fn add_state_cert(
1533 &self,
1534 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1535 ) -> anyhow::Result<()> {
1536 let mut inner = self.inner.write().await;
1537 let view = state_cert.light_client_state.view_number;
1539 let dir_path = inner.state_cert_dir_path();
1540
1541 fs::create_dir_all(dir_path.clone())
1542 .context("failed to create light client state update certificate dir")?;
1543
1544 let bytes = bincode::serialize(&state_cert)
1545 .context("serialize light client state update certificate")?;
1546
1547 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1548 inner
1549 .replace(
1550 &file_path,
1551 |_| Ok(true),
1552 |mut file| {
1553 file.write_all(&bytes)?;
1554 Ok(())
1555 },
1556 )
1557 .context(format!(
1558 "writing light client state update certificate file for view {view:?}"
1559 ))?;
1560
1561 Ok(())
1562 }
1563
1564 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1565 let inner = self.inner.read().await;
1566 let drb_dir_path = inner.epoch_drb_result_dir_path();
1567 let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1568
1569 let mut result = Vec::new();
1570
1571 if !drb_dir_path.is_dir() {
1572 return Ok(Vec::new());
1573 }
1574 for (epoch, path) in epoch_files(drb_dir_path)? {
1575 let bytes =
1576 fs::read(&path).context(format!("reading epoch drb result {}", path.display()))?;
1577 let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1578 .context(format!("parsing epoch drb result {}", path.display()))?;
1579
1580 let block_header_path = block_header_dir_path
1581 .join(epoch.to_string())
1582 .with_extension("txt");
1583 let block_header = if block_header_path.is_file() {
1584 let bytes = fs::read(&block_header_path).context(format!(
1585 "reading epoch root block header {}",
1586 block_header_path.display()
1587 ))?;
1588 Some(
1589 bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes).context(
1590 format!(
1591 "parsing epoch root block header {}",
1592 block_header_path.display()
1593 ),
1594 )?,
1595 )
1596 } else {
1597 None
1598 };
1599
1600 result.push(InitializerEpochInfo::<SeqTypes> {
1601 epoch,
1602 drb_result,
1603 block_header,
1604 });
1605 }
1606
1607 result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1608
1609 let start = result
1611 .len()
1612 .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1613 let recent = result[start..].to_vec();
1614
1615 Ok(recent)
1616 }
1617
1618 async fn load_state_cert(
1619 &self,
1620 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1621 let inner = self.inner.read().await;
1622 let dir_path = inner.finalized_state_cert_dir_path();
1623
1624 if !dir_path.is_dir() {
1625 return Ok(None);
1626 }
1627
1628 let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1629
1630 for (epoch, path) in epoch_files(dir_path)? {
1631 if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1632 continue;
1633 }
1634 let bytes = fs::read(&path).context(format!(
1635 "reading light client state update certificate {}",
1636 path.display()
1637 ))?;
1638 let cert =
1639 bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1640 .or_else(|error| {
1641 tracing::info!(
1642 %error,
1643 path = %path.display(),
1644 "Failed to deserialize LightClientStateUpdateCertificateV2"
1645 );
1646
1647 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1648 &bytes,
1649 )
1650 .map(Into::into)
1651 .with_context(|| {
1652 format!(
1653 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1654 path.display()
1655 )
1656 })
1657 })?;
1658
1659 result = Some(cert);
1660 }
1661
1662 Ok(result)
1663 }
1664
1665 async fn get_state_cert_by_epoch(
1666 &self,
1667 epoch: u64,
1668 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1669 let inner = self.inner.read().await;
1670 let dir_path = inner.finalized_state_cert_dir_path();
1671
1672 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1673
1674 if !file_path.exists() {
1675 return Ok(None);
1676 }
1677
1678 let bytes = fs::read(&file_path).context(format!(
1679 "reading light client state update certificate {}",
1680 file_path.display()
1681 ))?;
1682
1683 let cert = bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1684 .or_else(|error| {
1685 tracing::info!(
1686 %error,
1687 path = %file_path.display(),
1688 "Failed to deserialize LightClientStateUpdateCertificateV2"
1689 );
1690
1691 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
1692 .map(Into::into)
1693 .with_context(|| {
1694 format!(
1695 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1696 file_path.display()
1697 )
1698 })
1699 })?;
1700
1701 Ok(Some(cert))
1702 }
1703
1704 async fn insert_state_cert(
1705 &self,
1706 epoch: u64,
1707 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1708 ) -> anyhow::Result<()> {
1709 let inner = self.inner.read().await;
1710 let dir_path = inner.finalized_state_cert_dir_path();
1711
1712 fs::create_dir_all(&dir_path)
1713 .context(format!("creating state cert dir {}", dir_path.display()))?;
1714
1715 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1716 let bytes = bincode::serialize(&cert)
1717 .context("serializing light client state update certificate")?;
1718
1719 fs::write(&file_path, bytes).context(format!(
1720 "writing light client state update certificate {}",
1721 file_path.display()
1722 ))?;
1723
1724 Ok(())
1725 }
1726
1727 fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1728 }
1730}
1731
1732#[async_trait]
1733impl MembershipPersistence for Persistence {
1734 async fn load_stake(
1735 &self,
1736 epoch: EpochNumber,
1737 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
1738 let inner = self.inner.read().await;
1739 let path = &inner.stake_table_dir_path();
1740 let file_path = path.join(epoch.to_string()).with_extension("txt");
1741
1742 if !file_path.exists() {
1743 return Ok(None);
1744 }
1745
1746 let bytes = fs::read(&file_path).with_context(|| {
1747 format!("failed to read stake table file at {}", file_path.display())
1748 })?;
1749
1750 let stake = match bincode::deserialize(&bytes) {
1751 Ok(res) => res,
1752 Err(err) => {
1753 let map = bincode::deserialize::<ValidatorMap>(&bytes).with_context(|| {
1754 format!(
1755 "fallback deserialization of legacy stake table at {} failed after \
1756 initial error: {}",
1757 file_path.display(),
1758 err
1759 )
1760 })?;
1761 (map, None, None)
1762 },
1763 };
1764
1765 Ok(Some(stake))
1766 }
1767
1768 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1769 let limit = limit as usize;
1770 let inner = self.inner.read().await;
1771 let path = &inner.stake_table_dir_path();
1772 let sorted_files = epoch_files(&path)?
1773 .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1774 .take(limit);
1775 let mut validator_sets: Vec<IndexedStake> = Vec::new();
1776
1777 for (epoch, file_path) in sorted_files {
1778 let bytes = fs::read(&file_path).with_context(|| {
1779 format!("failed to read stake table file at {}", file_path.display())
1780 })?;
1781
1782 let stake: (ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>) =
1783 match bincode::deserialize::<(
1784 ValidatorMap,
1785 Option<RewardAmount>,
1786 Option<StakeTableHash>,
1787 )>(&bytes)
1788 {
1789 Ok(res) => res,
1790 Err(err) => {
1791 let validatormap = bincode::deserialize::<ValidatorMap>(&bytes)
1792 .with_context(|| {
1793 format!(
1794 "failed to deserialize legacy stake table at {}: fallback \
1795 also failed after initial error: {}",
1796 file_path.display(),
1797 err
1798 )
1799 })?;
1800
1801 (validatormap, None, None)
1802 },
1803 };
1804
1805 validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1806 }
1807
1808 Ok(Some(validator_sets))
1809 }
1810
1811 async fn store_stake(
1812 &self,
1813 epoch: EpochNumber,
1814 stake: ValidatorMap,
1815 block_reward: Option<RewardAmount>,
1816 stake_table_hash: Option<StakeTableHash>,
1817 ) -> anyhow::Result<()> {
1818 let mut inner = self.inner.write().await;
1819 let dir_path = &inner.stake_table_dir_path();
1820
1821 fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1822
1823 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1824
1825 inner.replace(
1826 &file_path,
1827 |_| {
1828 Ok(true)
1830 },
1831 |mut file| {
1832 let bytes = bincode::serialize(&(stake, block_reward, stake_table_hash))
1833 .context("serializing combined stake table")?;
1834 file.write_all(&bytes)?;
1835 Ok(())
1836 },
1837 )
1838 }
1839
1840 async fn store_events(
1842 &self,
1843 to_l1_block: u64,
1844 events: Vec<(EventKey, StakeTableEvent)>,
1845 ) -> anyhow::Result<()> {
1846 if events.is_empty() {
1847 return Ok(());
1848 }
1849
1850 let mut inner = self.inner.write().await;
1851 let dir_path = &inner.stake_table_dir_path();
1852 let events_dir = dir_path.join("events");
1853
1854 fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
1855 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1857
1858 if last_l1_finalized_path.exists() {
1860 let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
1861 format!("Failed to read file at path: {last_l1_finalized_path:?}")
1862 })?;
1863 let mut buf = [0; 8];
1864 bytes
1865 .as_slice()
1866 .read_exact(&mut buf[..8])
1867 .with_context(|| {
1868 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1869 })?;
1870 let persisted_l1_block = u64::from_le_bytes(buf);
1871 if persisted_l1_block > to_l1_block {
1872 tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
1873 return Ok(());
1874 }
1875 }
1876
1877 for (event_key, event) in events {
1881 let (block_number, event_index) = event_key;
1882 let filename = format!("{block_number}_{event_index}");
1884 let file_path = events_dir.join(filename).with_extension("json");
1885
1886 if file_path.exists() {
1887 continue;
1888 }
1889
1890 inner
1891 .replace(
1892 &file_path,
1893 |_| Ok(true),
1894 |file| {
1895 let writer = BufWriter::new(file);
1896
1897 serde_json::to_writer_pretty(writer, &event)?;
1898 Ok(())
1899 },
1900 )
1901 .context("Failed to write event to file")?;
1902 }
1903
1904 inner.replace(
1906 &last_l1_finalized_path,
1907 |_| Ok(true),
1908 |mut file| {
1909 let bytes = to_l1_block.to_le_bytes();
1910
1911 file.write_all(&bytes)?;
1912 tracing::debug!("updated l1 finalized ={to_l1_block:?}");
1913 Ok(())
1914 },
1915 )
1916 }
1917
1918 async fn load_events(
1929 &self,
1930 from_l1_block: u64,
1931 to_l1_block: u64,
1932 ) -> anyhow::Result<(
1933 Option<EventsPersistenceRead>,
1934 Vec<(EventKey, StakeTableEvent)>,
1935 )> {
1936 let inner = self.inner.read().await;
1937 let dir_path = inner.stake_table_dir_path();
1938 let events_dir = dir_path.join("events");
1939
1940 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1943
1944 if !last_l1_finalized_path.exists() || !events_dir.exists() {
1945 return Ok((None, Vec::new()));
1946 }
1947
1948 let mut events = Vec::new();
1949
1950 let bytes = fs::read(&last_l1_finalized_path)
1951 .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
1952 let mut buf = [0; 8];
1953 bytes
1954 .as_slice()
1955 .read_exact(&mut buf[..8])
1956 .with_context(|| {
1957 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1958 })?;
1959
1960 let last_processed_l1_block = u64::from_le_bytes(buf);
1961
1962 let query_l1_block = if last_processed_l1_block > to_l1_block {
1966 to_l1_block
1967 } else {
1968 last_processed_l1_block
1969 };
1970
1971 for entry in fs::read_dir(&events_dir).context("events directory")? {
1972 let entry = entry?;
1973 let path = entry.path();
1974
1975 if !entry.file_type()?.is_file() {
1976 continue;
1977 }
1978
1979 if path
1980 .extension()
1981 .context(format!("extension for path={path:?}"))?
1982 != "json"
1983 {
1984 continue;
1985 }
1986
1987 let filename = path
1988 .file_stem()
1989 .and_then(|f| f.to_str())
1990 .unwrap_or_default();
1991
1992 let parts: Vec<&str> = filename.split('_').collect();
1993 if parts.len() != 2 {
1994 continue;
1995 }
1996
1997 let block_number = parts[0].parse::<u64>()?;
1998 let log_index = parts[1].parse::<u64>()?;
1999
2000 if block_number < from_l1_block || block_number > query_l1_block {
2001 continue;
2002 }
2003
2004 let file =
2005 File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
2006 let reader = BufReader::new(file);
2007
2008 let event: StakeTableEvent = serde_json::from_reader(reader)
2009 .context(format!("Failed to deserialize event at path={path:?}"))?;
2010
2011 events.push(((block_number, log_index), event));
2012 }
2013
2014 events.sort_by_key(|(key, _)| *key);
2015
2016 if query_l1_block == to_l1_block {
2017 Ok((Some(EventsPersistenceRead::Complete), events))
2018 } else {
2019 Ok((
2020 Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
2021 events,
2022 ))
2023 }
2024 }
2025
2026 async fn store_all_validators(
2027 &self,
2028 epoch: EpochNumber,
2029 all_validators: ValidatorMap,
2030 ) -> anyhow::Result<()> {
2031 let mut inner = self.inner.write().await;
2032 let dir_path = inner.stake_table_dir_path();
2033 let validators_dir = dir_path.join("validators");
2034
2035 fs::create_dir_all(&validators_dir)
2037 .with_context(|| format!("Failed to create validators dir: {validators_dir:?}"))?;
2038
2039 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2041
2042 inner
2043 .replace(
2044 &file_path,
2045 |_| Ok(true),
2046 |file| {
2047 let writer = BufWriter::new(file);
2048
2049 serde_json::to_writer_pretty(writer, &all_validators).with_context(|| {
2050 format!("Failed to serialize validators for epoch {epoch}")
2051 })?;
2052 Ok(())
2053 },
2054 )
2055 .with_context(|| format!("Failed to write validator file: {file_path:?}"))?;
2056
2057 Ok(())
2058 }
2059
2060 async fn load_all_validators(
2061 &self,
2062 epoch: EpochNumber,
2063 offset: u64,
2064 limit: u64,
2065 ) -> anyhow::Result<Vec<Validator<PubKey>>> {
2066 let inner = self.inner.read().await;
2067 let dir_path = inner.stake_table_dir_path();
2068 let validators_dir = dir_path.join("validators");
2069 let file_path = validators_dir.join(format!("epoch_{epoch}.json"));
2070
2071 if !file_path.exists() {
2072 bail!("Validator file not found for epoch {epoch}");
2073 }
2074
2075 let file = File::open(&file_path)
2076 .with_context(|| format!("Failed to open validator file: {file_path:?}"))?;
2077 let reader = BufReader::new(file);
2078
2079 let map: ValidatorMap = serde_json::from_reader(reader).with_context(|| {
2080 format!("Failed to deserialize validators at {file_path:?}. epoch = {epoch}")
2081 })?;
2082
2083 let mut values: Vec<Validator<PubKey>> = map.into_values().collect();
2084 values.sort_by_key(|v| v.account);
2085
2086 let start = offset as usize;
2087 let end = (start + limit as usize).min(values.len());
2088
2089 if start >= values.len() {
2090 return Ok(vec![]);
2091 }
2092
2093 Ok(values[start..end].to_vec())
2094 }
2095}
2096
2097#[async_trait]
2098impl DhtPersistentStorage for Persistence {
2099 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
2105 let to_save =
2107 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
2108
2109 let path = self.inner.read().await.libp2p_dht_path();
2111
2112 fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
2114 .with_context(|| "failed to create directory")?;
2115
2116 let mut inner = self.inner.write().await;
2118
2119 inner
2121 .replace(
2122 &path,
2123 |_| {
2124 Ok(true)
2126 },
2127 |mut file| {
2128 file.write_all(&to_save)
2129 .with_context(|| "failed to write records to file")?;
2130 Ok(())
2131 },
2132 )
2133 .with_context(|| "failed to save records to file")?;
2134
2135 Ok(())
2136 }
2137
2138 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
2144 let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
2146 .with_context(|| "Failed to read records from file")?;
2147
2148 let records: Vec<SerializableRecord> =
2150 bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
2151
2152 Ok(records)
2153 }
2154}
2155
2156fn view_files(
2158 dir: impl AsRef<Path>,
2159) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
2160 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2161 let dir = dir.as_ref().display();
2162 let entry = entry.ok()?;
2163 if !entry.file_type().ok()?.is_file() {
2164 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2165 return None;
2166 }
2167 let path = entry.path();
2168 if path.extension()? != "txt" {
2169 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2170 return None;
2171 }
2172 let file_name = path.file_stem()?;
2173 let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2174 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2175 return None;
2176 };
2177 Some((ViewNumber::new(view_number), entry.path().to_owned()))
2178 }))
2179}
2180
2181fn epoch_files(
2184 dir: impl AsRef<Path>,
2185) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2186 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2187 let dir = dir.as_ref().display();
2188 let entry = entry.ok()?;
2189 if !entry.file_type().ok()?.is_file() {
2190 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2191 return None;
2192 }
2193 let path = entry.path();
2194 if path.extension()? != "txt" {
2195 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2196 return None;
2197 }
2198 let file_name = path.file_stem()?;
2199 let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2200 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2201 return None;
2202 };
2203 Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2204 }))
2205}
2206
2207#[cfg(test)]
2208mod test {
2209 use std::marker::PhantomData;
2210
2211 use committable::{Commitment, CommitmentBoundsArkless, Committable};
2212 use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2213 use hotshot::types::SignatureKey;
2214 use hotshot_example_types::node_types::TestVersions;
2215 use hotshot_query_service::testing::mocks::MockVersions;
2216 use hotshot_types::{
2217 data::QuorumProposal2,
2218 light_client::LightClientState,
2219 simple_certificate::QuorumCertificate,
2220 simple_vote::QuorumData,
2221 traits::{
2222 block_contents::GENESIS_VID_NUM_STORAGE_NODES, node_implementation::Versions,
2223 EncodeBytes,
2224 },
2225 vid::advz::advz_scheme,
2226 };
2227 use jf_advz::VidScheme;
2228 use serde_json::json;
2229 use tempfile::TempDir;
2230 use vbs::version::StaticVersionType;
2231
2232 use super::*;
2233 use crate::{persistence::tests::TestablePersistence, BLSPubKey};
2234
2235 #[async_trait]
2236 impl TestablePersistence for Persistence {
2237 type Storage = TempDir;
2238
2239 async fn tmp_storage() -> Self::Storage {
2240 TempDir::new().unwrap()
2241 }
2242
2243 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2244 Options::new(storage.path().into())
2245 }
2246 }
2247
/// A config with a single `builder_url` is expected to come out of the migration with a
/// one-element `builder_urls` list and the `epoch_height`, `drb_difficulty`,
/// `drb_upgrade_difficulty` and `da_committees` fields added.
#[test]
fn test_config_migrations_add_builder_urls() {
    let legacy = json!({
        "config": {
            "builder_url": "https://test:8080",
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(legacy).unwrap();
    assert_eq!(migrated, expected);
}
2283
/// A config that already has `builder_urls` and all other fields is expected to pass
/// through the migration unchanged.
#[test]
fn test_config_migrations_existing_builder_urls() {
    let config = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(config.clone()).unwrap();
    assert_eq!(migrated, config);
}
2306
/// A config that only has `builder_urls` is expected to gain the upgrade parameters: every
/// `start_*` field set to `9007199254740991`, every `stop_*` field set to `0`, plus the
/// `epoch_height`, `drb_difficulty`, `drb_upgrade_difficulty` and `da_committees` fields.
#[test]
fn test_config_migrations_add_upgrade_params() {
    let legacy = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"]
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 9007199254740991u64,
            "stop_proposing_view": 0,
            "start_voting_view": 9007199254740991u64,
            "stop_voting_view": 0,
            "start_proposing_time": 9007199254740991u64,
            "stop_proposing_time": 0,
            "start_voting_time": 9007199254740991u64,
            "stop_voting_time": 0,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(legacy).unwrap();
    assert_eq!(migrated, expected);
}
2334
/// A config that already carries explicit upgrade parameters is expected to pass through
/// the migration unchanged.
#[test]
fn test_config_migrations_existing_upgrade_params() {
    let config = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
            "da_committees": [],
        }
    });

    let migrated = migrate_network_config(config.clone()).unwrap();
    assert_eq!(migrated, config);
}
2357
2358 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2359 pub async fn test_consensus_migration() {
2360 let rows = 300;
2361 let tmp = Persistence::tmp_storage().await;
2362 let mut opt = Persistence::options(&tmp);
2363 let storage = opt.create().await.unwrap();
2364
2365 let inner = storage.inner.read().await;
2366
2367 let decided_leaves_path = inner.decided_leaf_path();
2368 fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2369
2370 let qp_dir_path = inner.quorum_proposals_dir_path();
2371 fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2372
2373 let state_cert_dir_path = inner.state_cert_dir_path();
2374 fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2375 drop(inner);
2376
2377 assert!(storage.load_state_cert().await.unwrap().is_none());
2378
2379 for i in 0..rows {
2380 let view = ViewNumber::new(i);
2381 let validated_state = ValidatedState::default();
2382 let instance_state = NodeState::default();
2383
2384 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2385 let (payload, metadata) =
2386 Payload::from_transactions([], &validated_state, &instance_state)
2387 .await
2388 .unwrap();
2389
2390 let payload_bytes = payload.encode();
2391
2392 let block_header =
2393 Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);
2394
2395 let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2396 epoch: EpochNumber::new(i),
2397 light_client_state: LightClientState {
2398 view_number: i,
2399 block_height: i,
2400 block_comm_root: Default::default(),
2401 },
2402 next_stake_table_state: Default::default(),
2403 signatures: vec![], auth_root: Default::default(),
2405 };
2406 assert!(storage.add_state_cert(state_cert).await.is_ok());
2407
2408 let null_quorum_data = QuorumData {
2409 leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2410 };
2411
2412 let justify_qc = QuorumCertificate::new(
2413 null_quorum_data.clone(),
2414 null_quorum_data.commit(),
2415 view,
2416 None,
2417 PhantomData,
2418 );
2419
2420 let quorum_proposal = QuorumProposal {
2421 block_header,
2422 view_number: view,
2423 justify_qc: justify_qc.clone(),
2424 upgrade_certificate: None,
2425 proposal_certificate: None,
2426 };
2427
2428 let quorum_proposal_signature =
2429 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2430 .expect("Failed to sign quorum proposal");
2431
2432 let proposal = Proposal {
2433 data: quorum_proposal.clone(),
2434 signature: quorum_proposal_signature,
2435 _pd: PhantomData,
2436 };
2437
2438 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2439 leaf.fill_block_payload::<TestVersions>(
2440 payload,
2441 GENESIS_VID_NUM_STORAGE_NODES,
2442 <TestVersions as Versions>::Base::VERSION,
2443 )
2444 .unwrap();
2445
2446 let mut inner = storage.inner.write().await;
2447
2448 tracing::debug!("inserting decided leaves");
2449 let file_path = decided_leaves_path
2450 .join(view.to_string())
2451 .with_extension("txt");
2452
2453 tracing::debug!("inserting decided leaves");
2454
2455 inner
2456 .replace(
2457 &file_path,
2458 |_| Ok(true),
2459 |mut file| {
2460 let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2461 file.write_all(&bytes)?;
2462 Ok(())
2463 },
2464 )
2465 .expect("replace decided leaves");
2466
2467 let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2468
2469 tracing::debug!("inserting qc for {view}");
2470
2471 inner
2472 .replace(
2473 &file_path,
2474 |_| Ok(true),
2475 |mut file| {
2476 let proposal_bytes =
2477 bincode::serialize(&proposal).context("serialize proposal")?;
2478
2479 file.write_all(&proposal_bytes)?;
2480 Ok(())
2481 },
2482 )
2483 .unwrap();
2484
2485 drop(inner);
2486 let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2487 .disperse(payload_bytes.clone())
2488 .unwrap();
2489
2490 let vid = VidDisperseShare0::<SeqTypes> {
2491 view_number: ViewNumber::new(i),
2492 payload_commitment: Default::default(),
2493 share: disperse.shares[0].clone(),
2494 common: disperse.common,
2495 recipient_key: pubkey,
2496 };
2497
2498 let (payload, metadata) =
2499 Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2500 .await
2501 .unwrap();
2502
2503 let da = DaProposal::<SeqTypes> {
2504 encoded_transactions: payload.encode(),
2505 metadata,
2506 view_number: ViewNumber::new(i),
2507 };
2508
2509 let block_payload_signature =
2510 BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2511
2512 let da_proposal = Proposal {
2513 data: da,
2514 signature: block_payload_signature,
2515 _pd: Default::default(),
2516 };
2517
2518 tracing::debug!("inserting vid for {view}");
2519 storage
2520 .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
2521 .await
2522 .unwrap();
2523
2524 tracing::debug!("inserting da for {view}");
2525 storage
2526 .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2527 .await
2528 .unwrap();
2529 }
2530
2531 storage.migrate_storage().await.unwrap();
2532 let inner = storage.inner.read().await;
2533 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2534 let decided_leaves_count = decided_leaves
2535 .filter_map(Result::ok)
2536 .filter(|e| e.path().is_file())
2537 .count();
2538 assert_eq!(
2539 decided_leaves_count, rows as usize,
2540 "decided leaves count does not match",
2541 );
2542
2543 let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2544 let da_proposals_count = da_proposals
2545 .filter_map(Result::ok)
2546 .filter(|e| e.path().is_file())
2547 .count();
2548 assert_eq!(
2549 da_proposals_count, rows as usize,
2550 "da proposals does not match",
2551 );
2552
2553 let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2554 let vids_count = vids
2555 .filter_map(Result::ok)
2556 .filter(|e| e.path().is_file())
2557 .count();
2558 assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2559
2560 let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2561 let qps_count = qps
2562 .filter_map(Result::ok)
2563 .filter(|e| e.path().is_file())
2564 .count();
2565 assert_eq!(
2566 qps_count, rows as usize,
2567 "quorum proposals count does not match",
2568 );
2569
2570 let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2571 let state_cert_count = state_certs
2572 .filter_map(Result::ok)
2573 .filter(|e| e.path().is_file())
2574 .count();
2575 assert_eq!(
2576 state_cert_count, rows as usize,
2577 "light client state update certificate count does not match",
2578 );
2579
2580 let storage = opt.create().await.unwrap();
2584 storage.migrate_storage().await.unwrap();
2585
2586 let inner = storage.inner.read().await;
2587 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2588 let decided_leaves_count = decided_leaves
2589 .filter_map(Result::ok)
2590 .filter(|e| e.path().is_file())
2591 .count();
2592 assert_eq!(
2593 decided_leaves_count, rows as usize,
2594 "decided leaves count does not match",
2595 );
2596 }
2597
2598 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2599 async fn test_load_quorum_proposals_invalid_extension() {
2600 let tmp = Persistence::tmp_storage().await;
2601 let storage = Persistence::connect(&tmp).await;
2602
2603 let leaf = Leaf2::genesis::<MockVersions>(&Default::default(), &NodeState::mock()).await;
2605 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2606 let signature = PubKey::sign(&privkey, &[]).unwrap();
2607 let mut quorum_proposal = Proposal {
2608 data: QuorumProposalWrapper::<SeqTypes> {
2609 proposal: QuorumProposal2::<SeqTypes> {
2610 epoch: None,
2611 block_header: leaf.block_header().clone(),
2612 view_number: ViewNumber::genesis(),
2613 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2614 &Default::default(),
2615 &NodeState::mock(),
2616 )
2617 .await,
2618 upgrade_certificate: None,
2619 view_change_evidence: None,
2620 next_drb_result: None,
2621 next_epoch_justify_qc: None,
2622 state_cert: None,
2623 },
2624 },
2625 signature,
2626 _pd: Default::default(),
2627 };
2628
2629 let quorum_proposal1 = quorum_proposal.clone();
2631 storage
2632 .append_quorum_proposal2(&quorum_proposal1)
2633 .await
2634 .unwrap();
2635 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2636 let quorum_proposal2 = quorum_proposal.clone();
2637 storage
2638 .append_quorum_proposal2(&quorum_proposal2)
2639 .await
2640 .unwrap();
2641
2642 fs::rename(
2645 tmp.path().join("quorum_proposals2/1.txt"),
2646 tmp.path().join("quorum_proposals2/1.swp"),
2647 )
2648 .unwrap();
2649
2650 assert_eq!(
2652 storage.load_quorum_proposals().await.unwrap(),
2653 [(ViewNumber::genesis(), quorum_proposal1)]
2654 .into_iter()
2655 .collect::<BTreeMap<_, _>>()
2656 );
2657 }
2658
2659 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2660 async fn test_load_quorum_proposals_malformed_data() {
2661 let tmp = Persistence::tmp_storage().await;
2662 let storage = Persistence::connect(&tmp).await;
2663
2664 let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&Default::default(), &NodeState::mock())
2666 .await
2667 .into();
2668 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2669 let signature = PubKey::sign(&privkey, &[]).unwrap();
2670 let quorum_proposal = Proposal {
2671 data: QuorumProposalWrapper::<SeqTypes> {
2672 proposal: QuorumProposal2::<SeqTypes> {
2673 epoch: None,
2674 block_header: leaf.block_header().clone(),
2675 view_number: ViewNumber::new(1),
2676 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2677 &Default::default(),
2678 &NodeState::mock(),
2679 )
2680 .await,
2681 upgrade_certificate: None,
2682 view_change_evidence: None,
2683 next_drb_result: None,
2684 next_epoch_justify_qc: None,
2685 state_cert: None,
2686 },
2687 },
2688 signature,
2689 _pd: Default::default(),
2690 };
2691
2692 fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2694 fs::write(
2695 tmp.path().join("quorum_proposals2/0.txt"),
2696 "invalid data".as_bytes(),
2697 )
2698 .unwrap();
2699
2700 storage
2702 .append_quorum_proposal2(&quorum_proposal)
2703 .await
2704 .unwrap();
2705
2706 assert_eq!(
2708 storage.load_quorum_proposals().await.unwrap(),
2709 [(ViewNumber::new(1), quorum_proposal)]
2710 .into_iter()
2711 .collect::<BTreeMap<_, _>>()
2712 );
2713 }
2714}