1use std::{
2 collections::{BTreeMap, HashSet},
3 fs::{self, File, OpenOptions},
4 io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
5 ops::RangeInclusive,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Instant,
9};
10
11use anyhow::{anyhow, Context};
12use async_lock::RwLock;
13use async_trait::async_trait;
14use clap::Parser;
15use espresso_types::{
16 traits::{EventsPersistenceRead, MembershipPersistence},
17 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
18 v0_3::{EventKey, IndexedStake, RewardAmount, StakeTableEvent},
19 Leaf, Leaf2, NetworkConfig, Payload, SeqTypes, StakeTableHash, ValidatorMap,
20};
21use hotshot::InitializerEpochInfo;
22use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
23 DhtPersistentStorage, SerializableRecord,
24};
25use hotshot_types::{
26 data::{
27 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
28 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
29 QuorumProposalWrapperLegacy, VidCommitment, VidDisperseShare,
30 },
31 drb::{DrbInput, DrbResult},
32 event::{Event, EventType, HotShotAction, LeafInfo},
33 message::{convert_proposal, Proposal},
34 simple_certificate::{
35 LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
36 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
37 },
38 traits::{
39 block_contents::{BlockHeader, BlockPayload},
40 metrics::Metrics,
41 node_implementation::{ConsensusTime, NodeType},
42 },
43 vote::HasViewNumber,
44};
45use itertools::Itertools;
46
47use crate::{
48 persistence::persistence_metrics::PersistenceMetricsValue, ViewNumber,
49 RECENT_STAKE_TABLES_LIMIT,
50};
51
52#[derive(Parser, Clone, Debug)]
54pub struct Options {
55 #[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
57 path: PathBuf,
58
59 #[clap(
71 long,
72 env = "ESPRESSO_SEQUENCER_CONSENSUS_VIEW_RETENTION",
73 default_value = "130000"
74 )]
75 pub(crate) consensus_view_retention: u64,
76}
77
78impl Default for Options {
79 fn default() -> Self {
80 Self::parse_from(std::iter::empty::<String>())
81 }
82}
83
84impl Options {
85 pub fn new(path: PathBuf) -> Self {
86 Self {
87 path,
88 consensus_view_retention: 130000,
89 }
90 }
91
92 pub(crate) fn path(&self) -> &Path {
93 &self.path
94 }
95}
96
97#[async_trait]
98impl PersistenceOptions for Options {
99 type Persistence = Persistence;
100
101 fn set_view_retention(&mut self, view_retention: u64) {
102 self.consensus_view_retention = view_retention;
103 }
104
105 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
106 let path = self.path.clone();
107 let view_retention = self.consensus_view_retention;
108
109 let migration_path = path.join("migration");
110 let migrated = if migration_path.is_file() {
111 let bytes = fs::read(&migration_path).context(format!(
112 "unable to read migration from {}",
113 migration_path.display()
114 ))?;
115 bincode::deserialize(&bytes).context("malformed migration file")?
116 } else {
117 HashSet::new()
118 };
119
120 Ok(Persistence {
121 inner: Arc::new(RwLock::new(Inner {
122 path,
123 migrated,
124 view_retention,
125 })),
126 metrics: Arc::new(PersistenceMetricsValue::default()),
127 })
128 }
129
130 async fn reset(self) -> anyhow::Result<()> {
131 todo!()
132 }
133}
134
135#[derive(Clone, Debug)]
137pub struct Persistence {
138 inner: Arc<RwLock<Inner>>,
142 metrics: Arc<PersistenceMetricsValue>,
144}
145
/// State guarded by the lock inside [`Persistence`].
#[derive(Debug)]
struct Inner {
    /// Root directory; every file/directory helper joins a fixed name onto this path.
    path: PathBuf,
    /// Number of views below the decided view that garbage collection keeps.
    view_retention: u64,
    /// Names of migrations that have already completed (persisted in the `migration` file).
    migrated: HashSet<String>,
}
152
153impl Inner {
154 fn config_path(&self) -> PathBuf {
155 self.path.join("hotshot.cfg")
156 }
157
158 fn migration(&self) -> PathBuf {
159 self.path.join("migration")
160 }
161
162 fn voted_view_path(&self) -> PathBuf {
163 self.path.join("highest_voted_view")
164 }
165
166 fn restart_view_path(&self) -> PathBuf {
167 self.path.join("restart_view")
168 }
169
170 fn decided_leaf_path(&self) -> PathBuf {
172 self.path.join("decided_leaves")
173 }
174
175 fn decided_leaf2_path(&self) -> PathBuf {
176 self.path.join("decided_leaves2")
177 }
178
179 fn legacy_anchor_leaf_path(&self) -> PathBuf {
181 self.path.join("anchor_leaf")
182 }
183
184 fn vid_dir_path(&self) -> PathBuf {
185 self.path.join("vid")
186 }
187
188 fn vid2_dir_path(&self) -> PathBuf {
189 self.path.join("vid2")
190 }
191
192 fn da_dir_path(&self) -> PathBuf {
193 self.path.join("da")
194 }
195
196 fn drb_dir_path(&self) -> PathBuf {
197 self.path.join("drb")
198 }
199
200 fn da2_dir_path(&self) -> PathBuf {
201 self.path.join("da2")
202 }
203
204 fn quorum_proposals_dir_path(&self) -> PathBuf {
205 self.path.join("quorum_proposals")
206 }
207
208 fn quorum_proposals2_dir_path(&self) -> PathBuf {
209 self.path.join("quorum_proposals2")
210 }
211
212 fn upgrade_certificate_dir_path(&self) -> PathBuf {
213 self.path.join("upgrade_certificate")
214 }
215
216 fn stake_table_dir_path(&self) -> PathBuf {
217 self.path.join("stake_table")
218 }
219
220 fn next_epoch_qc(&self) -> PathBuf {
221 self.path.join("next_epoch_quorum_certificate")
222 }
223
224 fn libp2p_dht_path(&self) -> PathBuf {
225 self.path.join("libp2p_dht")
226 }
227 fn epoch_drb_result_dir_path(&self) -> PathBuf {
228 self.path.join("epoch_drb_result")
229 }
230
231 fn epoch_root_block_header_dir_path(&self) -> PathBuf {
232 self.path.join("epoch_root_block_header")
233 }
234
235 fn finalized_state_cert_dir_path(&self) -> PathBuf {
236 self.path.join("finalized_state_cert")
237 }
238
239 fn state_cert_dir_path(&self) -> PathBuf {
240 self.path.join("state_cert")
241 }
242
243 fn update_migration(&mut self) -> anyhow::Result<()> {
244 let path = self.migration();
245 let bytes = bincode::serialize(&self.migrated)?;
246
247 self.replace(
248 &path,
249 |_| Ok(true),
250 |mut file| {
251 file.write_all(&bytes)?;
252 Ok(())
253 },
254 )
255 }
256
257 fn replace(
267 &mut self,
268 path: &Path,
269 pred: impl FnOnce(File) -> anyhow::Result<bool>,
270 write: impl FnOnce(File) -> anyhow::Result<()>,
271 ) -> anyhow::Result<()> {
272 if path.is_file() {
273 if !pred(File::open(path)?)? {
278 return Ok(());
281 }
282 }
283
284 let mut swap_path = path.to_owned();
287 swap_path.set_extension("swp");
288 let swap = OpenOptions::new()
289 .write(true)
290 .truncate(true)
291 .create(true)
292 .open(&swap_path)?;
293 write(swap)?;
294
295 fs::rename(swap_path, path)?;
297
298 Ok(())
299 }
300
301 fn collect_garbage(
302 &mut self,
303 decided_view: ViewNumber,
304 prune_intervals: &[RangeInclusive<ViewNumber>],
305 ) -> anyhow::Result<()> {
306 let prune_view = ViewNumber::new(decided_view.saturating_sub(self.view_retention));
307
308 self.prune_files(self.da2_dir_path(), prune_view, None, prune_intervals)?;
309 self.prune_files(self.vid2_dir_path(), prune_view, None, prune_intervals)?;
310 self.prune_files(
311 self.quorum_proposals2_dir_path(),
312 prune_view,
313 None,
314 prune_intervals,
315 )?;
316 self.prune_files(
317 self.state_cert_dir_path(),
318 prune_view,
319 None,
320 prune_intervals,
321 )?;
322
323 self.prune_files(
325 self.decided_leaf2_path(),
326 prune_view,
327 Some(decided_view),
328 prune_intervals,
329 )?;
330
331 Ok(())
332 }
333
334 fn prune_files(
335 &mut self,
336 dir_path: PathBuf,
337 prune_view: ViewNumber,
338 keep_decided_view: Option<ViewNumber>,
339 prune_intervals: &[RangeInclusive<ViewNumber>],
340 ) -> anyhow::Result<()> {
341 if !dir_path.is_dir() {
342 return Ok(());
343 }
344
345 for (file_view, path) in view_files(dir_path)? {
346 if let Some(decided_view) = keep_decided_view {
348 if decided_view == file_view {
349 continue;
350 }
351 }
352 if file_view < prune_view || prune_intervals.iter().any(|i| i.contains(&file_view)) {
356 fs::remove_file(&path)?;
357 }
358 }
359
360 Ok(())
361 }
362
363 async fn generate_decide_events(
368 &self,
369 view: ViewNumber,
370 consumer: &impl EventConsumer,
371 ) -> anyhow::Result<Vec<RangeInclusive<ViewNumber>>> {
372 let mut leaves = BTreeMap::new();
376 for (v, path) in view_files(self.decided_leaf2_path())? {
377 if v > view {
378 continue;
379 }
380
381 let bytes =
382 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
383 let (mut leaf, qc) =
384 bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(&bytes)
385 .context(format!("parsing decided leaf {}", path.display()))?;
386
387 let vid_share = self.load_vid_share(v)?.map(|proposal| proposal.data);
389 if vid_share.is_none() {
390 tracing::debug!(?v, "VID share not available at decide");
391 }
392
393 let state_cert = self.finalized_state_cert(v)?;
395
396 if let Some(proposal) = self.load_da_proposal(v)? {
398 let payload = Payload::from_bytes(
399 &proposal.data.encoded_transactions,
400 &proposal.data.metadata,
401 );
402 leaf.fill_block_payload_unchecked(payload);
403 } else {
404 tracing::debug!(?v, "DA proposal not available at decide");
405 }
406
407 let info = LeafInfo {
408 leaf,
409 vid_share,
410 state_cert,
411 state: Default::default(),
414 delta: Default::default(),
415 };
416
417 leaves.insert(v, (info, qc));
418 }
419
420 if let Some((oldest_view, _)) = leaves.first_key_value() {
424 if *oldest_view > ViewNumber::genesis() {
427 leaves.pop_first();
428 }
429 }
430
431 let mut intervals = vec![];
432 let mut current_interval = None;
433 for (view, (leaf, qc)) in leaves {
434 let height = leaf.leaf.block_header().block_number();
435 consumer
436 .handle_event(&Event {
437 view_number: view,
438 event: EventType::Decide {
439 qc: Arc::new(qc),
440 leaf_chain: Arc::new(vec![leaf]),
441 block_size: None,
442 },
443 })
444 .await?;
445 if let Some((start, end, current_height)) = current_interval.as_mut() {
446 if height == *current_height + 1 {
447 *current_height += 1;
450 *end = view;
451 } else {
452 intervals.push(*start..=*end);
454 current_interval = Some((view, view, height));
455 }
456 } else {
457 current_interval = Some((view, view, height));
459 }
460 }
461 if let Some((start, end, _)) = current_interval {
462 intervals.push(start..=end);
463 }
464
465 Ok(intervals)
466 }
467
468 fn load_da_proposal(
469 &self,
470 view: ViewNumber,
471 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
472 let dir_path = self.da2_dir_path();
473
474 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
475
476 if !file_path.exists() {
477 return Ok(None);
478 }
479
480 let da_bytes = fs::read(file_path)?;
481
482 let da_proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
483 bincode::deserialize(&da_bytes)?;
484 Ok(Some(da_proposal))
485 }
486
487 fn load_vid_share(
488 &self,
489 view: ViewNumber,
490 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
491 let dir_path = self.vid2_dir_path();
492
493 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
494
495 if !file_path.exists() {
496 return Ok(None);
497 }
498
499 let vid_share_bytes = fs::read(file_path)?;
500 let vid_share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
501 bincode::deserialize(&vid_share_bytes)?;
502 Ok(Some(vid_share))
503 }
504
505 fn load_anchor_leaf(&self) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
506 tracing::info!("Checking `Leaf2` to load the anchor leaf.");
507 if self.decided_leaf2_path().is_dir() {
508 let mut anchor: Option<(Leaf2, QuorumCertificate2<SeqTypes>)> = None;
509
510 for (_, path) in view_files(self.decided_leaf2_path())? {
512 let bytes =
513 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
514 let (leaf2, qc2) =
515 bincode::deserialize::<(Leaf2, QuorumCertificate2<SeqTypes>)>(&bytes)
516 .context(format!("parsing decided leaf {}", path.display()))?;
517 if let Some((anchor_leaf, _)) = &anchor {
518 if leaf2.view_number() > anchor_leaf.view_number() {
519 anchor = Some((leaf2, qc2));
520 }
521 } else {
522 anchor = Some((leaf2, qc2));
523 }
524 }
525
526 return Ok(anchor);
527 }
528
529 tracing::warn!(
530 "Failed to find an anchor leaf in `Leaf2` storage. Checking legacy `Leaf` storage. \
531 This is very likely to fail."
532 );
533 if self.legacy_anchor_leaf_path().is_file() {
534 let mut file = BufReader::new(File::open(self.legacy_anchor_leaf_path())?);
537
538 file.seek(SeekFrom::Start(8)).context("seek")?;
540 let bytes = file
541 .bytes()
542 .collect::<Result<Vec<_>, _>>()
543 .context("read")?;
544 return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?));
545 }
546
547 Ok(None)
548 }
549
550 fn finalized_state_cert(
551 &self,
552 view: ViewNumber,
553 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
554 let dir_path = self.state_cert_dir_path();
555 let file_path = dir_path.join(view.u64().to_string()).with_extension("txt");
556
557 if !file_path.exists() {
558 return Ok(None);
559 }
560
561 let bytes = fs::read(&file_path)?;
562
563 let state_cert: LightClientStateUpdateCertificateV2<SeqTypes> =
564 bincode::deserialize(&bytes).or_else(|err_v2| {
565 tracing::info!(
566 error = %err_v2,
567 path = %file_path.display(),
568 "Failed to deserialize state certificate, attempting with v1"
569 );
570
571 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
572 .map(Into::into)
573 .with_context(|| {
574 format!(
575 "Failed to deserialize with both v2 and v1 from file '{}'. error: \
576 {err_v2}",
577 file_path.display()
578 )
579 })
580 })?;
581
582 let epoch = state_cert.epoch.u64();
583 let finalized_dir_path = self.finalized_state_cert_dir_path();
584 fs::create_dir_all(&finalized_dir_path).context("creating finalized state cert dir")?;
585
586 let finalized_file_path = finalized_dir_path
587 .join(epoch.to_string())
588 .with_extension("txt");
589
590 fs::write(&finalized_file_path, &bytes).context(format!(
591 "finalizing light client state update certificate file for epoch {epoch:?}"
592 ))?;
593
594 Ok(Some(state_cert))
595 }
596}
597
598#[async_trait]
599impl SequencerPersistence for Persistence {
600 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
601 let inner = self.inner.read().await;
602 let path = inner.config_path();
603 if !path.is_file() {
604 tracing::info!("config not found at {}", path.display());
605 return Ok(None);
606 }
607 tracing::info!("loading config from {}", path.display());
608
609 let bytes =
610 fs::read(&path).context(format!("unable to read config from {}", path.display()))?;
611 let json = serde_json::from_slice(&bytes).context("config file is not valid JSON")?;
612 let json = migrate_network_config(json).context("migration of network config failed")?;
613 let config = serde_json::from_value(json).context("malformed config file")?;
614 Ok(Some(config))
615 }
616
617 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
618 let inner = self.inner.write().await;
619 let path = inner.config_path();
620 tracing::info!("saving config to {}", path.display());
621 Ok(cfg.to_file(path.display().to_string())?)
622 }
623
624 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
625 let inner = self.inner.read().await;
626 let path = inner.voted_view_path();
627 if !path.is_file() {
628 return Ok(None);
629 }
630 let bytes = fs::read(inner.voted_view_path())?
631 .try_into()
632 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
633 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
634 }
635
636 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
637 let inner = self.inner.read().await;
638 let path = inner.restart_view_path();
639 if !path.is_file() {
640 return Ok(None);
641 }
642 let bytes = fs::read(path)?
643 .try_into()
644 .map_err(|bytes| anyhow!("malformed restart view file: {bytes:?}"))?;
645 Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes))))
646 }
647
648 async fn append_decided_leaves(
649 &self,
650 view: ViewNumber,
651 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, QuorumCertificate2<SeqTypes>)> + Send,
652 consumer: &impl EventConsumer,
653 ) -> anyhow::Result<()> {
654 let mut inner = self.inner.write().await;
655 let path = inner.decided_leaf2_path();
656
657 fs::create_dir_all(&path).context("creating anchor leaf directory")?;
659
660 let legacy_path = inner.legacy_anchor_leaf_path();
663 if !path.is_dir() && legacy_path.is_file() {
664 tracing::info!("migrating to multi-leaf storage");
665
666 let (leaf, qc) = inner
668 .load_anchor_leaf()?
669 .context("anchor leaf file exists but unable to load contents")?;
670 let view = leaf.view_number().u64();
671 let bytes = bincode::serialize(&(leaf, qc))?;
672 let new_file = path.join(view.to_string()).with_extension("txt");
673 fs::write(new_file, bytes).context(format!("writing anchor leaf file {view}"))?;
674
675 fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?;
677 }
678
679 for (info, qc2) in leaf_chain {
680 let view = info.leaf.view_number().u64();
681 let file_path = path.join(view.to_string()).with_extension("txt");
682 inner.replace(
683 &file_path,
684 |_| {
685 tracing::warn!(view, "duplicate decided leaf");
688 Ok(false)
689 },
690 |mut file| {
691 let bytes = bincode::serialize(&(&info.leaf.clone(), qc2))?;
692 file.write_all(&bytes)?;
693 Ok(())
694 },
695 )?;
696 }
697
698 match inner.generate_decide_events(view, consumer).await {
699 Err(err) => {
700 tracing::warn!(?view, "event processing failed: {err:#}");
704 },
705 Ok(intervals) => {
706 if let Err(err) = inner.collect_garbage(view, &intervals) {
707 tracing::warn!(?view, "GC failed: {err:#}");
711 }
712 },
713 }
714
715 Ok(())
716 }
717
718 async fn load_anchor_leaf(
719 &self,
720 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
721 self.inner.read().await.load_anchor_leaf()
722 }
723
724 async fn load_da_proposal(
725 &self,
726 view: ViewNumber,
727 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
728 self.inner.read().await.load_da_proposal(view)
729 }
730
731 async fn load_vid_share(
732 &self,
733 view: ViewNumber,
734 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
735 self.inner.read().await.load_vid_share(view)
736 }
737
738 async fn append_vid(
739 &self,
740 proposal: &Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>,
741 ) -> anyhow::Result<()> {
742 let mut inner = self.inner.write().await;
743 let view_number = proposal.data.view_number().u64();
744 let dir_path = inner.vid2_dir_path();
745
746 fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
747
748 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
749 inner.replace(
750 &file_path,
751 |_| {
752 tracing::warn!(view_number, "duplicate VID share");
755 Ok(false)
756 },
757 |mut file| {
758 let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
759 convert_proposal(proposal.clone());
760 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
761 let now = Instant::now();
762 file.write_all(&proposal_bytes)?;
763 self.metrics
764 .internal_append_vid_duration
765 .add_point(now.elapsed().as_secs_f64());
766 Ok(())
767 },
768 )
769 }
770 async fn append_vid2(
771 &self,
772 proposal: &Proposal<SeqTypes, VidDisperseShare2<SeqTypes>>,
773 ) -> anyhow::Result<()> {
774 let mut inner = self.inner.write().await;
775 let view_number = proposal.data.view_number().u64();
776
777 let dir_path = inner.vid2_dir_path();
778
779 fs::create_dir_all(dir_path.clone()).context("failed to create vid dir")?;
780
781 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
782
783 inner.replace(
784 &file_path,
785 |_| {
786 tracing::warn!(view_number, "duplicate VID share");
789 Ok(false)
790 },
791 |mut file| {
792 let proposal: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
793 convert_proposal(proposal.clone());
794 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
795 let now = Instant::now();
796 file.write_all(&proposal_bytes)?;
797 self.metrics
798 .internal_append_vid2_duration
799 .add_point(now.elapsed().as_secs_f64());
800 Ok(())
801 },
802 )
803 }
804 async fn append_da(
805 &self,
806 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
807 _vid_commit: VidCommitment,
808 ) -> anyhow::Result<()> {
809 let mut inner = self.inner.write().await;
810 let view_number = proposal.data.view_number().u64();
811 let dir_path = inner.da_dir_path();
812
813 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
814
815 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
816 inner.replace(
817 &file_path,
818 |_| {
819 tracing::warn!(view_number, "duplicate DA proposal");
822 Ok(false)
823 },
824 |mut file| {
825 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
826 let now = Instant::now();
827 file.write_all(&proposal_bytes)?;
828 self.metrics
829 .internal_append_da_duration
830 .add_point(now.elapsed().as_secs_f64());
831 Ok(())
832 },
833 )
834 }
835 async fn record_action(
836 &self,
837 view: ViewNumber,
838 _epoch: Option<EpochNumber>,
839 action: HotShotAction,
840 ) -> anyhow::Result<()> {
841 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
843 return Ok(());
844 }
845 let mut inner = self.inner.write().await;
846 let path = &inner.voted_view_path();
847 inner.replace(
848 path,
849 |mut file| {
850 let mut bytes = vec![];
851 file.read_to_end(&mut bytes)?;
852 let bytes = bytes
853 .try_into()
854 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
855 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
856
857 Ok(saved_view < view)
859 },
860 |mut file| {
861 file.write_all(&view.u64().to_le_bytes())?;
862 Ok(())
863 },
864 )?;
865
866 if matches!(action, HotShotAction::Vote) {
867 let restart_view_path = &inner.restart_view_path();
868 let restart_view = view + 1;
869 inner.replace(
870 restart_view_path,
871 |mut file| {
872 let mut bytes = vec![];
873 file.read_to_end(&mut bytes)?;
874 let bytes = bytes
875 .try_into()
876 .map_err(|bytes| anyhow!("malformed voted view file: {bytes:?}"))?;
877 let saved_view = ViewNumber::new(u64::from_le_bytes(bytes));
878
879 Ok(saved_view < restart_view)
881 },
882 |mut file| {
883 file.write_all(&restart_view.u64().to_le_bytes())?;
884 Ok(())
885 },
886 )?;
887 }
888 Ok(())
889 }
890
891 async fn append_quorum_proposal2(
892 &self,
893 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
894 ) -> anyhow::Result<()> {
895 let mut inner = self.inner.write().await;
896 let view_number = proposal.data.view_number().u64();
897 let dir_path = inner.quorum_proposals2_dir_path();
898
899 fs::create_dir_all(dir_path.clone()).context("failed to create proposals dir")?;
900
901 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
902 inner.replace(
903 &file_path,
904 |_| {
905 Ok(true)
907 },
908 |mut file| {
909 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
910 let now = Instant::now();
911 file.write_all(&proposal_bytes)?;
912 self.metrics
913 .internal_append_quorum2_duration
914 .add_point(now.elapsed().as_secs_f64());
915 Ok(())
916 },
917 )
918 }
919 async fn load_quorum_proposals(
920 &self,
921 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
922 {
923 let inner = self.inner.read().await;
924
925 let dir_path = inner.quorum_proposals2_dir_path();
927 if !dir_path.is_dir() {
928 return Ok(Default::default());
929 }
930
931 let mut map = BTreeMap::new();
933 for (view, path) in view_files(&dir_path)? {
934 let proposal_bytes = fs::read(path)?;
935 let Some(proposal) = bincode::deserialize::<
936 Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
937 >(&proposal_bytes)
938 .or_else(|error| {
939 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
940 &proposal_bytes,
941 )
942 .map(convert_proposal)
943 .inspect_err(|err_v3| {
944 tracing::warn!(
952 ?view,
953 %error,
954 error_v3 = %err_v3,
955 "ignoring malformed quorum proposal file"
956 );
957 })
958 })
959 .ok() else {
960 continue;
961 };
962
963 let proposal2 = convert_proposal(proposal);
964
965 map.insert(view, proposal2);
967 }
968
969 Ok(map)
970 }
971
972 async fn load_quorum_proposal(
973 &self,
974 view: ViewNumber,
975 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
976 let inner = self.inner.read().await;
977 let dir_path = inner.quorum_proposals2_dir_path();
978 let file_path = dir_path.join(view.to_string()).with_extension("txt");
979 let bytes = fs::read(file_path)?;
980 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
981 bincode::deserialize(&bytes).or_else(|error| {
982 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
983 &bytes,
984 )
985 .map(convert_proposal)
986 .context(format!(
987 "Failed to deserialize quorum proposal for view {view:?}: {error}."
988 ))
989 })?;
990 Ok(proposal)
991 }
992
993 async fn load_upgrade_certificate(
994 &self,
995 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
996 let inner = self.inner.read().await;
997 let path = inner.upgrade_certificate_dir_path();
998 if !path.is_file() {
999 return Ok(None);
1000 }
1001 let bytes = fs::read(&path).context("read")?;
1002 Ok(Some(
1003 bincode::deserialize(&bytes).context("deserialize upgrade certificate")?,
1004 ))
1005 }
1006
1007 async fn store_upgrade_certificate(
1008 &self,
1009 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1010 ) -> anyhow::Result<()> {
1011 let mut inner = self.inner.write().await;
1012 let path = &inner.upgrade_certificate_dir_path();
1013 let certificate = match decided_upgrade_certificate {
1014 Some(cert) => cert,
1015 None => return Ok(()),
1016 };
1017 inner.replace(
1018 path,
1019 |_| {
1020 Ok(true)
1022 },
1023 |mut file| {
1024 let bytes =
1025 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1026 file.write_all(&bytes)?;
1027 Ok(())
1028 },
1029 )
1030 }
1031
1032 async fn store_next_epoch_quorum_certificate(
1033 &self,
1034 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1035 ) -> anyhow::Result<()> {
1036 let mut inner = self.inner.write().await;
1037 let path = &inner.next_epoch_qc();
1038
1039 inner.replace(
1040 path,
1041 |_| {
1042 Ok(true)
1044 },
1045 |mut file| {
1046 let bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
1047 file.write_all(&bytes)?;
1048 Ok(())
1049 },
1050 )
1051 }
1052
1053 async fn load_next_epoch_quorum_certificate(
1054 &self,
1055 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
1056 let inner = self.inner.read().await;
1057 let path = inner.next_epoch_qc();
1058 if !path.is_file() {
1059 return Ok(None);
1060 }
1061 let bytes = fs::read(&path).context("read")?;
1062 Ok(Some(
1063 bincode::deserialize(&bytes).context("deserialize next epoch qc")?,
1064 ))
1065 }
1066
1067 async fn append_da2(
1068 &self,
1069 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1070 _vid_commit: VidCommitment,
1071 ) -> anyhow::Result<()> {
1072 let mut inner = self.inner.write().await;
1073 let view_number = proposal.data.view_number().u64();
1074 let dir_path = inner.da2_dir_path();
1075
1076 fs::create_dir_all(dir_path.clone()).context("failed to create da dir")?;
1077
1078 let file_path = dir_path.join(view_number.to_string()).with_extension("txt");
1079 inner.replace(
1080 &file_path,
1081 |_| {
1082 tracing::warn!(view_number, "duplicate DA proposal");
1085 Ok(false)
1086 },
1087 |mut file| {
1088 let proposal_bytes = bincode::serialize(&proposal).context("serialize proposal")?;
1089 let now = Instant::now();
1090 file.write_all(&proposal_bytes)?;
1091 self.metrics
1092 .internal_append_da2_duration
1093 .add_point(now.elapsed().as_secs_f64());
1094 Ok(())
1095 },
1096 )
1097 }
1098
1099 async fn append_proposal2(
1100 &self,
1101 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1102 ) -> anyhow::Result<()> {
1103 self.append_quorum_proposal2(proposal).await
1104 }
1105
1106 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
1107 let mut inner = self.inner.write().await;
1108
1109 if inner.migrated.contains("anchor_leaf") {
1110 tracing::info!("decided leaves already migrated");
1111 return Ok(());
1112 }
1113
1114 let new_leaf_dir = inner.decided_leaf2_path();
1115
1116 fs::create_dir_all(new_leaf_dir.clone()).context("failed to create anchor leaf 2 dir")?;
1117
1118 let old_leaf_dir = inner.decided_leaf_path();
1119 if !old_leaf_dir.is_dir() {
1120 return Ok(());
1121 }
1122
1123 tracing::warn!("migrating decided leaves..");
1124 for entry in fs::read_dir(old_leaf_dir)? {
1125 let entry = entry?;
1126 let path = entry.path();
1127
1128 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1129 continue;
1130 };
1131 let Ok(view) = file.parse::<u64>() else {
1132 continue;
1133 };
1134
1135 let bytes =
1136 fs::read(&path).context(format!("reading decided leaf {}", path.display()))?;
1137 let (leaf, qc) = bincode::deserialize::<(Leaf, QuorumCertificate<SeqTypes>)>(&bytes)
1138 .context(format!("parsing decided leaf {}", path.display()))?;
1139
1140 let leaf2: Leaf2 = leaf.into();
1141 let qc2 = qc.to_qc2();
1142
1143 let new_leaf_path = new_leaf_dir.join(view.to_string()).with_extension("txt");
1144
1145 inner.replace(
1146 &new_leaf_path,
1147 |_| {
1148 tracing::warn!(view, "duplicate decided leaf");
1149 Ok(false)
1150 },
1151 |mut file| {
1152 let bytes = bincode::serialize(&(&leaf2.clone(), qc2))?;
1153 file.write_all(&bytes)?;
1154 Ok(())
1155 },
1156 )?;
1157
1158 if view % 100 == 0 {
1159 tracing::info!(view, "decided leaves migration progress");
1160 }
1161 }
1162
1163 inner.migrated.insert("anchor_leaf".to_string());
1164 inner.update_migration()?;
1165 tracing::warn!("successfully migrated decided leaves");
1166 Ok(())
1167 }
1168 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
1169 let mut inner = self.inner.write().await;
1170
1171 if inner.migrated.contains("da_proposal") {
1172 tracing::info!("da proposals already migrated");
1173 return Ok(());
1174 }
1175
1176 let new_da_dir = inner.da2_dir_path();
1177
1178 fs::create_dir_all(new_da_dir.clone()).context("failed to create da proposals 2 dir")?;
1179
1180 let old_da_dir = inner.da_dir_path();
1181 if !old_da_dir.is_dir() {
1182 return Ok(());
1183 }
1184
1185 tracing::warn!("migrating da proposals..");
1186
1187 for entry in fs::read_dir(old_da_dir)? {
1188 let entry = entry?;
1189 let path = entry.path();
1190
1191 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1192 continue;
1193 };
1194 let Ok(view) = file.parse::<u64>() else {
1195 continue;
1196 };
1197
1198 let bytes =
1199 fs::read(&path).context(format!("reading da proposal {}", path.display()))?;
1200 let proposal = bincode::deserialize::<Proposal<SeqTypes, DaProposal<SeqTypes>>>(&bytes)
1201 .context(format!("parsing da proposal {}", path.display()))?;
1202
1203 let new_da_path = new_da_dir.join(view.to_string()).with_extension("txt");
1204
1205 let proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> = convert_proposal(proposal);
1206
1207 inner.replace(
1208 &new_da_path,
1209 |_| {
1210 tracing::warn!(view, "duplicate DA proposal 2");
1211 Ok(false)
1212 },
1213 |mut file| {
1214 let bytes = bincode::serialize(&proposal2)?;
1215 file.write_all(&bytes)?;
1216 Ok(())
1217 },
1218 )?;
1219
1220 if view % 100 == 0 {
1221 tracing::info!(view, "DA proposals migration progress");
1222 }
1223 }
1224
1225 inner.migrated.insert("da_proposal".to_string());
1226 inner.update_migration()?;
1227 tracing::warn!("successfully migrated da proposals");
1228 Ok(())
1229 }
1230 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1231 let mut inner = self.inner.write().await;
1232
1233 if inner.migrated.contains("vid_share") {
1234 tracing::info!("vid shares already migrated");
1235 return Ok(());
1236 }
1237
1238 let new_vid_dir = inner.vid2_dir_path();
1239
1240 fs::create_dir_all(new_vid_dir.clone()).context("failed to create vid shares 2 dir")?;
1241
1242 let old_vid_dir = inner.vid_dir_path();
1243 if !old_vid_dir.is_dir() {
1244 return Ok(());
1245 }
1246
1247 tracing::warn!("migrating vid shares..");
1248
1249 for entry in fs::read_dir(old_vid_dir)? {
1250 let entry = entry?;
1251 let path = entry.path();
1252
1253 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1254 continue;
1255 };
1256 let Ok(view) = file.parse::<u64>() else {
1257 continue;
1258 };
1259
1260 let bytes = fs::read(&path).context(format!("reading vid share {}", path.display()))?;
1261 let proposal =
1262 bincode::deserialize::<Proposal<SeqTypes, ADVZDisperseShare<SeqTypes>>>(&bytes)
1263 .context(format!("parsing vid share {}", path.display()))?;
1264
1265 let new_vid_path = new_vid_dir.join(view.to_string()).with_extension("txt");
1266
1267 let proposal2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
1268 convert_proposal(proposal);
1269
1270 inner.replace(
1271 &new_vid_path,
1272 |_| {
1273 tracing::warn!(view, "duplicate VID share ");
1274 Ok(false)
1275 },
1276 |mut file| {
1277 let bytes = bincode::serialize(&proposal2)?;
1278 file.write_all(&bytes)?;
1279 Ok(())
1280 },
1281 )?;
1282
1283 if view % 100 == 0 {
1284 tracing::info!(view, "VID shares migration progress");
1285 }
1286 }
1287
1288 inner.migrated.insert("vid_share".to_string());
1289 inner.update_migration()?;
1290 tracing::warn!("successfully migrated vid shares");
1291 Ok(())
1292 }
1293
1294 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
1295 let mut inner = self.inner.write().await;
1296
1297 if inner.migrated.contains("quorum_proposals") {
1298 tracing::info!("quorum proposals already migrated");
1299 return Ok(());
1300 }
1301
1302 let new_quorum_proposals_dir = inner.quorum_proposals2_dir_path();
1303
1304 fs::create_dir_all(new_quorum_proposals_dir.clone())
1305 .context("failed to create quorum proposals 2 dir")?;
1306
1307 let old_quorum_proposals_dir = inner.quorum_proposals_dir_path();
1308 if !old_quorum_proposals_dir.is_dir() {
1309 tracing::info!("no existing quorum proposals found for migration");
1310 return Ok(());
1311 }
1312
1313 tracing::warn!("migrating quorum proposals..");
1314 for entry in fs::read_dir(old_quorum_proposals_dir)? {
1315 let entry = entry?;
1316 let path = entry.path();
1317
1318 let Some(file) = path.file_stem().and_then(|n| n.to_str()) else {
1319 continue;
1320 };
1321 let Ok(view) = file.parse::<u64>() else {
1322 continue;
1323 };
1324
1325 let bytes =
1326 fs::read(&path).context(format!("reading quorum proposal {}", path.display()))?;
1327 let proposal =
1328 bincode::deserialize::<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>(&bytes)
1329 .context(format!("parsing quorum proposal {}", path.display()))?;
1330
1331 let new_file_path = new_quorum_proposals_dir
1332 .join(view.to_string())
1333 .with_extension("txt");
1334
1335 let proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1336 convert_proposal(proposal);
1337
1338 inner.replace(
1339 &new_file_path,
1340 |_| {
1341 tracing::warn!(view, "duplicate Quorum proposal2 ");
1342 Ok(false)
1343 },
1344 |mut file| {
1345 let bytes = bincode::serialize(&proposal2)?;
1346 file.write_all(&bytes)?;
1347 Ok(())
1348 },
1349 )?;
1350
1351 if view % 100 == 0 {
1352 tracing::info!(view, "Quorum proposals migration progress");
1353 }
1354 }
1355
1356 inner.migrated.insert("quorum_proposals".to_string());
1357 inner.update_migration()?;
1358 tracing::warn!("successfully migrated quorum proposals");
1359 Ok(())
1360 }
1361 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
1362 Ok(())
1363 }
1364
1365 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1366 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
1367 if loaded_drb_input.iteration >= drb_input.iteration {
1368 anyhow::bail!(
1369 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
1370 loaded_drb_input,
1371 drb_input
1372 )
1373 }
1374 }
1375
1376 let inner = self.inner.write().await;
1377 let dir_path = inner.drb_dir_path();
1378
1379 fs::create_dir_all(dir_path.clone()).context("failed to create drb dir")?;
1380
1381 let drb_input_bytes =
1382 bincode::serialize(&drb_input).context("failed to serialize drb_input")?;
1383
1384 let file_path = dir_path
1385 .join(drb_input.epoch.to_string())
1386 .with_extension("bin");
1387 fs::write(&file_path, drb_input_bytes).context(format!(
1388 "writing epoch drb_input file for epoch {:?} at {:?}",
1389 drb_input.epoch, file_path
1390 ))
1391 }
1392
1393 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1394 let inner = self.inner.read().await;
1395 let path = &inner.drb_dir_path();
1396 let file_path = path.join(epoch.to_string()).with_extension("bin");
1397 let bytes = fs::read(&file_path).context("read")?;
1398 Ok(bincode::deserialize(&bytes)
1399 .context(format!("failed to deserialize DrbInput for epoch {epoch}"))?)
1400 }
1401
1402 async fn store_drb_result(
1403 &self,
1404 epoch: EpochNumber,
1405 drb_result: DrbResult,
1406 ) -> anyhow::Result<()> {
1407 let inner = self.inner.write().await;
1408 let dir_path = inner.epoch_drb_result_dir_path();
1409
1410 fs::create_dir_all(dir_path.clone()).context("failed to create epoch drb result dir")?;
1411
1412 let drb_result_bytes = bincode::serialize(&drb_result).context("serialize drb result")?;
1413
1414 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1415 fs::write(file_path, drb_result_bytes)
1416 .context(format!("writing epoch drb result file for epoch {epoch:?}"))?;
1417
1418 Ok(())
1419 }
1420
1421 async fn store_epoch_root(
1422 &self,
1423 epoch: EpochNumber,
1424 block_header: <SeqTypes as NodeType>::BlockHeader,
1425 ) -> anyhow::Result<()> {
1426 let inner = self.inner.write().await;
1427 let dir_path = inner.epoch_root_block_header_dir_path();
1428
1429 fs::create_dir_all(dir_path.clone())
1430 .context("failed to create epoch root block header dir")?;
1431
1432 let block_header_bytes =
1433 bincode::serialize(&block_header).context("serialize block header")?;
1434
1435 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1436 fs::write(file_path, block_header_bytes).context(format!(
1437 "writing epoch root block header file for epoch {epoch:?}"
1438 ))?;
1439
1440 Ok(())
1441 }
1442
1443 async fn add_state_cert(
1444 &self,
1445 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1446 ) -> anyhow::Result<()> {
1447 let inner = self.inner.write().await;
1448 let view = state_cert.light_client_state.view_number;
1450 let dir_path = inner.state_cert_dir_path();
1451
1452 fs::create_dir_all(dir_path.clone())
1453 .context("failed to create light client state update certificate dir")?;
1454
1455 let bytes = bincode::serialize(&state_cert)
1456 .context("serialize light client state update certificate")?;
1457
1458 let file_path = dir_path.join(view.to_string()).with_extension("txt");
1459 fs::write(file_path, bytes).context(format!(
1460 "writing light client state update certificate file for view {view:?}"
1461 ))?;
1462
1463 Ok(())
1464 }
1465
1466 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
1467 let inner = self.inner.read().await;
1468 let drb_dir_path = inner.epoch_drb_result_dir_path();
1469 let block_header_dir_path = inner.epoch_root_block_header_dir_path();
1470
1471 let mut result = Vec::new();
1472
1473 if drb_dir_path.is_dir() {
1474 for (epoch, path) in epoch_files(drb_dir_path)? {
1475 let bytes = fs::read(&path)
1476 .context(format!("reading epoch drb result {}", path.display()))?;
1477 let drb_result = bincode::deserialize::<DrbResult>(&bytes)
1478 .context(format!("parsing epoch drb result {}", path.display()))?;
1479
1480 let block_header_path = block_header_dir_path
1481 .join(epoch.to_string())
1482 .with_extension("txt");
1483 let block_header = if block_header_path.is_file() {
1484 let bytes = fs::read(&block_header_path).context(format!(
1485 "reading epoch root block header {}",
1486 block_header_path.display()
1487 ))?;
1488 Some(
1489 bincode::deserialize::<<SeqTypes as NodeType>::BlockHeader>(&bytes)
1490 .context(format!(
1491 "parsing epoch root block header {}",
1492 block_header_path.display()
1493 ))?,
1494 )
1495 } else {
1496 None
1497 };
1498
1499 result.push(InitializerEpochInfo::<SeqTypes> {
1500 epoch,
1501 drb_result,
1502 block_header,
1503 });
1504 }
1505 }
1506
1507 result.sort_by(|a, b| a.epoch.cmp(&b.epoch));
1508
1509 let start = result
1511 .len()
1512 .saturating_sub(RECENT_STAKE_TABLES_LIMIT as usize);
1513 let recent = result[start..].to_vec();
1514
1515 Ok(recent)
1516 }
1517
1518 async fn load_state_cert(
1519 &self,
1520 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
1521 let inner = self.inner.read().await;
1522 let dir_path = inner.finalized_state_cert_dir_path();
1523
1524 if !dir_path.is_dir() {
1525 return Ok(None);
1526 }
1527
1528 let mut result: Option<LightClientStateUpdateCertificateV2<SeqTypes>> = None;
1529
1530 for (epoch, path) in epoch_files(dir_path)? {
1531 if result.as_ref().is_some_and(|cert| epoch <= cert.epoch) {
1532 continue;
1533 }
1534 let bytes = fs::read(&path).context(format!(
1535 "reading light client state update certificate {}",
1536 path.display()
1537 ))?;
1538 let cert =
1539 bincode::deserialize::<LightClientStateUpdateCertificateV2<SeqTypes>>(&bytes)
1540 .or_else(|error| {
1541 tracing::info!(
1542 %error,
1543 path = %path.display(),
1544 "Failed to deserialize LightClientStateUpdateCertificateV2"
1545 );
1546
1547 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(
1548 &bytes,
1549 )
1550 .map(Into::into)
1551 .with_context(|| {
1552 format!(
1553 "Failed to deserialize with v1 and v2. path='{}'. error: {error}",
1554 path.display()
1555 )
1556 })
1557 })?;
1558
1559 result = Some(cert);
1560 }
1561
1562 Ok(result)
1563 }
1564
1565 fn enable_metrics(&mut self, _metrics: &dyn Metrics) {
1566 }
1568}
1569
1570#[async_trait]
1571impl MembershipPersistence for Persistence {
1572 async fn load_stake(
1573 &self,
1574 epoch: EpochNumber,
1575 ) -> anyhow::Result<Option<(ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>)>> {
1576 let inner = self.inner.read().await;
1577 let path = &inner.stake_table_dir_path();
1578 let file_path = path.join(epoch.to_string()).with_extension("txt");
1579
1580 if !file_path.exists() {
1581 return Ok(None);
1582 }
1583
1584 let bytes = fs::read(&file_path).with_context(|| {
1585 format!("failed to read stake table file at {}", file_path.display())
1586 })?;
1587
1588 let stake = match bincode::deserialize(&bytes) {
1589 Ok(res) => res,
1590 Err(err) => {
1591 let map = bincode::deserialize::<ValidatorMap>(&bytes).with_context(|| {
1592 format!(
1593 "fallback deserialization of legacy stake table at {} failed after \
1594 initial error: {}",
1595 file_path.display(),
1596 err
1597 )
1598 })?;
1599 (map, None, None)
1600 },
1601 };
1602
1603 Ok(Some(stake))
1604 }
1605
1606 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
1607 let limit = limit as usize;
1608 let inner = self.inner.read().await;
1609 let path = &inner.stake_table_dir_path();
1610 let sorted_files = epoch_files(&path)?
1611 .sorted_by(|(e1, _), (e2, _)| e2.cmp(e1))
1612 .take(limit);
1613 let mut validator_sets: Vec<IndexedStake> = Vec::new();
1614
1615 for (epoch, file_path) in sorted_files {
1616 let bytes = fs::read(&file_path).with_context(|| {
1617 format!("failed to read stake table file at {}", file_path.display())
1618 })?;
1619
1620 let stake: (ValidatorMap, Option<RewardAmount>, Option<StakeTableHash>) =
1621 match bincode::deserialize::<(
1622 ValidatorMap,
1623 Option<RewardAmount>,
1624 Option<StakeTableHash>,
1625 )>(&bytes)
1626 {
1627 Ok(res) => res,
1628 Err(err) => {
1629 let validatormap = bincode::deserialize::<ValidatorMap>(&bytes)
1630 .with_context(|| {
1631 format!(
1632 "failed to deserialize legacy stake table at {}: fallback \
1633 also failed after initial error: {}",
1634 file_path.display(),
1635 err
1636 )
1637 })?;
1638
1639 (validatormap, None, None)
1640 },
1641 };
1642
1643 validator_sets.push((epoch, (stake.0, stake.1), stake.2));
1644 }
1645
1646 Ok(Some(validator_sets))
1647 }
1648
1649 async fn store_stake(
1650 &self,
1651 epoch: EpochNumber,
1652 stake: ValidatorMap,
1653 block_reward: Option<RewardAmount>,
1654 stake_table_hash: Option<StakeTableHash>,
1655 ) -> anyhow::Result<()> {
1656 let mut inner = self.inner.write().await;
1657 let dir_path = &inner.stake_table_dir_path();
1658
1659 fs::create_dir_all(dir_path.clone()).context("failed to create stake table dir")?;
1660
1661 let file_path = dir_path.join(epoch.to_string()).with_extension("txt");
1662
1663 inner.replace(
1664 &file_path,
1665 |_| {
1666 Ok(true)
1668 },
1669 |mut file| {
1670 let bytes = bincode::serialize(&(stake, block_reward, stake_table_hash))
1671 .context("serializing combined stake table")?;
1672 file.write_all(&bytes)?;
1673 Ok(())
1674 },
1675 )
1676 }
1677
1678 async fn store_events(
1680 &self,
1681 to_l1_block: u64,
1682 events: Vec<(EventKey, StakeTableEvent)>,
1683 ) -> anyhow::Result<()> {
1684 if events.is_empty() {
1685 return Ok(());
1686 }
1687
1688 let mut inner = self.inner.write().await;
1689 let dir_path = &inner.stake_table_dir_path();
1690 let events_dir = dir_path.join("events");
1691
1692 fs::create_dir_all(events_dir.clone()).context("failed to create events dir")?;
1693 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1695
1696 if last_l1_finalized_path.exists() {
1698 let bytes = fs::read(&last_l1_finalized_path).with_context(|| {
1699 format!("Failed to read file at path: {last_l1_finalized_path:?}")
1700 })?;
1701 let mut buf = [0; 8];
1702 bytes
1703 .as_slice()
1704 .read_exact(&mut buf[..8])
1705 .with_context(|| {
1706 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1707 })?;
1708 let persisted_l1_block = u64::from_le_bytes(buf);
1709 if persisted_l1_block > to_l1_block {
1710 tracing::debug!(?persisted_l1_block, ?to_l1_block, "stored l1 is greater");
1711 return Ok(());
1712 }
1713 }
1714
1715 for (event_key, event) in events {
1719 let (block_number, event_index) = event_key;
1720 let filename = format!("{block_number}_{event_index}");
1722 let file_path = events_dir.join(filename).with_extension("json");
1723
1724 if file_path.exists() {
1725 continue;
1726 }
1727
1728 let file = File::create(&file_path).context("Failed to create event file")?;
1729 let writer = BufWriter::new(file);
1730
1731 serde_json::to_writer_pretty(writer, &event)
1732 .context("Failed to write event to file")?;
1733 }
1734
1735 inner.replace(
1737 &last_l1_finalized_path,
1738 |_| Ok(true),
1739 |mut file| {
1740 let bytes = to_l1_block.to_le_bytes();
1741
1742 file.write_all(&bytes)?;
1743 tracing::debug!("updated l1 finalized ={to_l1_block:?}");
1744 Ok(())
1745 },
1746 )
1747 }
1748
1749 async fn load_events(
1760 &self,
1761 to_l1_block: u64,
1762 ) -> anyhow::Result<(
1763 Option<EventsPersistenceRead>,
1764 Vec<(EventKey, StakeTableEvent)>,
1765 )> {
1766 let inner = self.inner.read().await;
1767 let dir_path = inner.stake_table_dir_path();
1768 let events_dir = dir_path.join("events");
1769
1770 let last_l1_finalized_path = events_dir.join("last_l1_finalized").with_extension("bin");
1773
1774 if !last_l1_finalized_path.exists() || !events_dir.exists() {
1775 return Ok((None, Vec::new()));
1776 }
1777
1778 let mut events = Vec::new();
1779
1780 let bytes = fs::read(&last_l1_finalized_path)
1781 .with_context(|| format!("Failed to read file at path: {last_l1_finalized_path:?}"))?;
1782 let mut buf = [0; 8];
1783 bytes
1784 .as_slice()
1785 .read_exact(&mut buf[..8])
1786 .with_context(|| {
1787 format!("Failed to read 8 bytes from file at path: {last_l1_finalized_path:?}")
1788 })?;
1789
1790 let last_processed_l1_block = u64::from_le_bytes(buf);
1791
1792 let query_l1_block = if last_processed_l1_block > to_l1_block {
1796 to_l1_block
1797 } else {
1798 last_processed_l1_block
1799 };
1800
1801 for entry in fs::read_dir(&events_dir).context("events directory")? {
1802 let entry = entry?;
1803 let path = entry.path();
1804
1805 if !entry.file_type()?.is_file() {
1806 continue;
1807 }
1808
1809 if path
1810 .extension()
1811 .context(format!("extension for path={path:?}"))?
1812 != "json"
1813 {
1814 continue;
1815 }
1816
1817 let filename = path
1818 .file_stem()
1819 .and_then(|f| f.to_str())
1820 .unwrap_or_default();
1821
1822 let parts: Vec<&str> = filename.split('_').collect();
1823 if parts.len() != 2 {
1824 continue;
1825 }
1826
1827 let block_number = parts[0].parse::<u64>()?;
1828 let log_index = parts[1].parse::<u64>()?;
1829
1830 if block_number > query_l1_block {
1831 continue;
1832 }
1833
1834 let file =
1835 File::open(&path).context(format!("Failed to open event file. path={path:?}"))?;
1836 let reader = BufReader::new(file);
1837
1838 let event: StakeTableEvent = serde_json::from_reader(reader)
1839 .context(format!("Failed to deserialize event at path={path:?}"))?;
1840
1841 events.push(((block_number, log_index), event));
1842 }
1843
1844 events.sort_by_key(|(key, _)| *key);
1845
1846 if query_l1_block == to_l1_block {
1847 Ok((Some(EventsPersistenceRead::Complete), events))
1848 } else {
1849 Ok((
1850 Some(EventsPersistenceRead::UntilL1Block(query_l1_block)),
1851 events,
1852 ))
1853 }
1854 }
1855}
1856
1857#[async_trait]
1858impl DhtPersistentStorage for Persistence {
1859 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
1865 let to_save =
1867 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
1868
1869 let path = self.inner.read().await.libp2p_dht_path();
1871
1872 fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
1874 .with_context(|| "failed to create directory")?;
1875
1876 let mut inner = self.inner.write().await;
1878
1879 inner
1881 .replace(
1882 &path,
1883 |_| {
1884 Ok(true)
1886 },
1887 |mut file| {
1888 file.write_all(&to_save)
1889 .with_context(|| "failed to write records to file")?;
1890 Ok(())
1891 },
1892 )
1893 .with_context(|| "failed to save records to file")?;
1894
1895 Ok(())
1896 }
1897
1898 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
1904 let contents = std::fs::read(self.inner.read().await.libp2p_dht_path())
1906 .with_context(|| "Failed to read records from file")?;
1907
1908 let records: Vec<SerializableRecord> =
1910 bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;
1911
1912 Ok(records)
1913 }
1914}
1915fn migrate_network_config(
1917 mut network_config: serde_json::Value,
1918) -> anyhow::Result<serde_json::Value> {
1919 let config = network_config
1920 .get_mut("config")
1921 .context("missing field `config`")?
1922 .as_object_mut()
1923 .context("`config` must be an object")?;
1924
1925 if !config.contains_key("builder_urls") {
1926 let url = config
1931 .remove("builder_url")
1932 .context("missing field `builder_url`")?;
1933 config.insert("builder_urls".into(), vec![url].into());
1934 }
1935
1936 if !config.contains_key("start_proposing_view") {
1942 config.insert("start_proposing_view".into(), 9007199254740991u64.into());
1943 }
1944 if !config.contains_key("stop_proposing_view") {
1945 config.insert("stop_proposing_view".into(), 0.into());
1946 }
1947 if !config.contains_key("start_voting_view") {
1948 config.insert("start_voting_view".into(), 9007199254740991u64.into());
1949 }
1950 if !config.contains_key("stop_voting_view") {
1951 config.insert("stop_voting_view".into(), 0.into());
1952 }
1953 if !config.contains_key("start_proposing_time") {
1954 config.insert("start_proposing_time".into(), 9007199254740991u64.into());
1955 }
1956 if !config.contains_key("stop_proposing_time") {
1957 config.insert("stop_proposing_time".into(), 0.into());
1958 }
1959 if !config.contains_key("start_voting_time") {
1960 config.insert("start_voting_time".into(), 9007199254740991u64.into());
1961 }
1962 if !config.contains_key("stop_voting_time") {
1963 config.insert("stop_voting_time".into(), 0.into());
1964 }
1965
1966 if !config.contains_key("epoch_height") {
1969 config.insert("epoch_height".into(), 0.into());
1970 }
1971
1972 if !config.contains_key("drb_difficulty") {
1975 config.insert("drb_difficulty".into(), 0.into());
1976 }
1977 if !config.contains_key("drb_upgrade_difficulty") {
1978 config.insert("drb_upgrade_difficulty".into(), 0.into());
1979 }
1980
1981 Ok(network_config)
1982}
1983
1984fn view_files(
1986 dir: impl AsRef<Path>,
1987) -> anyhow::Result<impl Iterator<Item = (ViewNumber, PathBuf)>> {
1988 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
1989 let dir = dir.as_ref().display();
1990 let entry = entry.ok()?;
1991 if !entry.file_type().ok()?.is_file() {
1992 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
1993 return None;
1994 }
1995 let path = entry.path();
1996 if path.extension()? != "txt" {
1997 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
1998 return None;
1999 }
2000 let file_name = path.file_stem()?;
2001 let Ok(view_number) = file_name.to_string_lossy().parse::<u64>() else {
2002 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2003 return None;
2004 };
2005 Some((ViewNumber::new(view_number), entry.path().to_owned()))
2006 }))
2007}
2008
2009fn epoch_files(
2012 dir: impl AsRef<Path>,
2013) -> anyhow::Result<impl Iterator<Item = (EpochNumber, PathBuf)>> {
2014 Ok(fs::read_dir(dir.as_ref())?.filter_map(move |entry| {
2015 let dir = dir.as_ref().display();
2016 let entry = entry.ok()?;
2017 if !entry.file_type().ok()?.is_file() {
2018 tracing::debug!(%dir, ?entry, "ignoring non-file in data directory");
2019 return None;
2020 }
2021 let path = entry.path();
2022 if path.extension()? != "txt" {
2023 tracing::debug!(%dir, ?entry, "ignoring non-text file in data directory");
2024 return None;
2025 }
2026 let file_name = path.file_stem()?;
2027 let Ok(epoch_number) = file_name.to_string_lossy().parse::<u64>() else {
2028 tracing::debug!(%dir, ?file_name, "ignoring extraneous file in data directory");
2029 return None;
2030 };
2031 Some((EpochNumber::new(epoch_number), entry.path().to_owned()))
2032 }))
2033}
2034
2035#[cfg(test)]
2036mod test {
2037 use std::marker::PhantomData;
2038
2039 use committable::{Commitment, CommitmentBoundsArkless, Committable};
2040 use espresso_types::{Header, Leaf, NodeState, PubKey, ValidatedState};
2041 use hotshot::types::SignatureKey;
2042 use hotshot_example_types::node_types::TestVersions;
2043 use hotshot_query_service::testing::mocks::MockVersions;
2044 use hotshot_types::{
2045 data::QuorumProposal2,
2046 light_client::LightClientState,
2047 simple_certificate::QuorumCertificate,
2048 simple_vote::QuorumData,
2049 traits::{
2050 block_contents::GENESIS_VID_NUM_STORAGE_NODES, node_implementation::Versions,
2051 EncodeBytes,
2052 },
2053 vid::advz::advz_scheme,
2054 };
2055 use jf_vid::VidScheme;
2056 use serde_json::json;
2057 use tempfile::TempDir;
2058 use vbs::version::StaticVersionType;
2059
2060 use super::*;
2061 use crate::{persistence::tests::TestablePersistence, BLSPubKey};
2062
2063 #[async_trait]
2064 impl TestablePersistence for Persistence {
2065 type Storage = TempDir;
2066
2067 async fn tmp_storage() -> Self::Storage {
2068 TempDir::new().unwrap()
2069 }
2070
2071 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self> {
2072 Options::new(storage.path().into())
2073 }
2074 }
2075
/// A config with only the legacy `builder_url` gains `builder_urls` plus the epoch and DRB
/// defaults, while the upgrade fields it already has are left untouched.
#[test]
fn test_config_migrations_add_builder_urls() {
    let legacy = json!({
        "config": {
            "builder_url": "https://test:8080",
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
        }
    });

    let migrated = migrate_network_config(legacy).unwrap();
    assert_eq!(migrated, expected);
}
2110
/// A config that already has `builder_urls` and every other field is returned unchanged.
#[test]
fn test_config_migrations_existing_builder_urls() {
    let current = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
        }
    });

    let migrated = migrate_network_config(current.clone()).unwrap();
    assert_eq!(migrated, current);
}
2132
/// A config lacking the upgrade, epoch and DRB fields has all of them filled with defaults.
#[test]
fn test_config_migrations_add_upgrade_params() {
    let minimal = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"]
        }
    });
    let expected = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 9007199254740991u64,
            "stop_proposing_view": 0,
            "start_voting_view": 9007199254740991u64,
            "stop_voting_view": 0,
            "start_proposing_time": 9007199254740991u64,
            "stop_proposing_time": 0,
            "start_voting_time": 9007199254740991u64,
            "stop_voting_time": 0,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
        }
    });

    let migrated = migrate_network_config(minimal).unwrap();
    assert_eq!(migrated, expected);
}
2159
/// Upgrade parameters that are already present are not overwritten by the migration.
#[test]
fn test_config_migrations_existing_upgrade_params() {
    let current = json!({
        "config": {
            "builder_urls": ["https://test:8080", "https://test:8081"],
            "start_proposing_view": 1,
            "stop_proposing_view": 2,
            "start_voting_view": 1,
            "stop_voting_view": 2,
            "start_proposing_time": 1,
            "stop_proposing_time": 2,
            "start_voting_time": 1,
            "stop_voting_time": 2,
            "epoch_height": 0,
            "drb_difficulty": 0,
            "drb_upgrade_difficulty": 0,
        }
    });

    let migrated = migrate_network_config(current.clone()).unwrap();
    assert_eq!(migrated, current);
}
2181
2182 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2183 pub async fn test_consensus_migration() {
2184 let rows = 300;
2185 let tmp = Persistence::tmp_storage().await;
2186 let mut opt = Persistence::options(&tmp);
2187 let storage = opt.create().await.unwrap();
2188
2189 let inner = storage.inner.read().await;
2190
2191 let decided_leaves_path = inner.decided_leaf_path();
2192 fs::create_dir_all(decided_leaves_path.clone()).expect("failed to create proposals dir");
2193
2194 let qp_dir_path = inner.quorum_proposals_dir_path();
2195 fs::create_dir_all(qp_dir_path.clone()).expect("failed to create proposals dir");
2196
2197 let state_cert_dir_path = inner.state_cert_dir_path();
2198 fs::create_dir_all(state_cert_dir_path.clone()).expect("failed to create state cert dir");
2199 drop(inner);
2200
2201 assert!(storage.load_state_cert().await.unwrap().is_none());
2202
2203 for i in 0..rows {
2204 let view = ViewNumber::new(i);
2205 let validated_state = ValidatedState::default();
2206 let instance_state = NodeState::default();
2207
2208 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
2209 let (payload, metadata) =
2210 Payload::from_transactions([], &validated_state, &instance_state)
2211 .await
2212 .unwrap();
2213
2214 let payload_bytes = payload.encode();
2215
2216 let block_header =
2217 Header::genesis::<TestVersions>(&instance_state, payload.clone(), &metadata);
2218
2219 let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
2220 epoch: EpochNumber::new(i),
2221 light_client_state: LightClientState {
2222 view_number: i,
2223 block_height: i,
2224 block_comm_root: Default::default(),
2225 },
2226 next_stake_table_state: Default::default(),
2227 signatures: vec![], auth_root: Default::default(),
2229 };
2230 assert!(storage.add_state_cert(state_cert).await.is_ok());
2231
2232 let null_quorum_data = QuorumData {
2233 leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
2234 };
2235
2236 let justify_qc = QuorumCertificate::new(
2237 null_quorum_data.clone(),
2238 null_quorum_data.commit(),
2239 view,
2240 None,
2241 PhantomData,
2242 );
2243
2244 let quorum_proposal = QuorumProposal {
2245 block_header,
2246 view_number: view,
2247 justify_qc: justify_qc.clone(),
2248 upgrade_certificate: None,
2249 proposal_certificate: None,
2250 };
2251
2252 let quorum_proposal_signature =
2253 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2254 .expect("Failed to sign quorum proposal");
2255
2256 let proposal = Proposal {
2257 data: quorum_proposal.clone(),
2258 signature: quorum_proposal_signature,
2259 _pd: PhantomData,
2260 };
2261
2262 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
2263 leaf.fill_block_payload::<TestVersions>(
2264 payload,
2265 GENESIS_VID_NUM_STORAGE_NODES,
2266 <TestVersions as Versions>::Base::VERSION,
2267 )
2268 .unwrap();
2269
2270 let mut inner = storage.inner.write().await;
2271
2272 tracing::debug!("inserting decided leaves");
2273 let file_path = decided_leaves_path
2274 .join(view.to_string())
2275 .with_extension("txt");
2276
2277 tracing::debug!("inserting decided leaves");
2278
2279 inner
2280 .replace(
2281 &file_path,
2282 |_| Ok(true),
2283 |mut file| {
2284 let bytes = bincode::serialize(&(&leaf.clone(), justify_qc))?;
2285 file.write_all(&bytes)?;
2286 Ok(())
2287 },
2288 )
2289 .expect("replace decided leaves");
2290
2291 let file_path = qp_dir_path.join(view.to_string()).with_extension("txt");
2292
2293 tracing::debug!("inserting qc for {view}");
2294
2295 inner
2296 .replace(
2297 &file_path,
2298 |_| Ok(true),
2299 |mut file| {
2300 let proposal_bytes =
2301 bincode::serialize(&proposal).context("serialize proposal")?;
2302
2303 file.write_all(&proposal_bytes)?;
2304 Ok(())
2305 },
2306 )
2307 .unwrap();
2308
2309 drop(inner);
2310 let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
2311 .disperse(payload_bytes.clone())
2312 .unwrap();
2313
2314 let vid = ADVZDisperseShare::<SeqTypes> {
2315 view_number: ViewNumber::new(i),
2316 payload_commitment: Default::default(),
2317 share: disperse.shares[0].clone(),
2318 common: disperse.common,
2319 recipient_key: pubkey,
2320 };
2321
2322 let (payload, metadata) =
2323 Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
2324 .await
2325 .unwrap();
2326
2327 let da = DaProposal::<SeqTypes> {
2328 encoded_transactions: payload.encode(),
2329 metadata,
2330 view_number: ViewNumber::new(i),
2331 };
2332
2333 let block_payload_signature =
2334 BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
2335
2336 let da_proposal = Proposal {
2337 data: da,
2338 signature: block_payload_signature,
2339 _pd: Default::default(),
2340 };
2341
2342 tracing::debug!("inserting vid for {view}");
2343 storage
2344 .append_vid(&vid.to_proposal(&privkey).unwrap())
2345 .await
2346 .unwrap();
2347
2348 tracing::debug!("inserting da for {view}");
2349 storage
2350 .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
2351 .await
2352 .unwrap();
2353 }
2354
2355 storage.migrate_consensus().await.unwrap();
2356 let inner = storage.inner.read().await;
2357 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2358 let decided_leaves_count = decided_leaves
2359 .filter_map(Result::ok)
2360 .filter(|e| e.path().is_file())
2361 .count();
2362 assert_eq!(
2363 decided_leaves_count, rows as usize,
2364 "decided leaves count does not match",
2365 );
2366
2367 let da_proposals = fs::read_dir(inner.da2_dir_path()).unwrap();
2368 let da_proposals_count = da_proposals
2369 .filter_map(Result::ok)
2370 .filter(|e| e.path().is_file())
2371 .count();
2372 assert_eq!(
2373 da_proposals_count, rows as usize,
2374 "da proposals does not match",
2375 );
2376
2377 let vids = fs::read_dir(inner.vid2_dir_path()).unwrap();
2378 let vids_count = vids
2379 .filter_map(Result::ok)
2380 .filter(|e| e.path().is_file())
2381 .count();
2382 assert_eq!(vids_count, rows as usize, "vid shares count does not match",);
2383
2384 let qps = fs::read_dir(inner.quorum_proposals2_dir_path()).unwrap();
2385 let qps_count = qps
2386 .filter_map(Result::ok)
2387 .filter(|e| e.path().is_file())
2388 .count();
2389 assert_eq!(
2390 qps_count, rows as usize,
2391 "quorum proposals count does not match",
2392 );
2393
2394 let state_certs = fs::read_dir(inner.state_cert_dir_path()).unwrap();
2395 let state_cert_count = state_certs
2396 .filter_map(Result::ok)
2397 .filter(|e| e.path().is_file())
2398 .count();
2399 assert_eq!(
2400 state_cert_count, rows as usize,
2401 "light client state update certificate count does not match",
2402 );
2403
2404 let storage = opt.create().await.unwrap();
2408 storage.migrate_consensus().await.unwrap();
2409
2410 let inner = storage.inner.read().await;
2411 let decided_leaves = fs::read_dir(inner.decided_leaf2_path()).unwrap();
2412 let decided_leaves_count = decided_leaves
2413 .filter_map(Result::ok)
2414 .filter(|e| e.path().is_file())
2415 .count();
2416 assert_eq!(
2417 decided_leaves_count, rows as usize,
2418 "decided leaves count does not match",
2419 );
2420 }
2421
2422 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2423 async fn test_load_quorum_proposals_invalid_extension() {
2424 let tmp = Persistence::tmp_storage().await;
2425 let storage = Persistence::connect(&tmp).await;
2426
2427 let leaf = Leaf2::genesis::<MockVersions>(&Default::default(), &NodeState::mock()).await;
2429 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2430 let signature = PubKey::sign(&privkey, &[]).unwrap();
2431 let mut quorum_proposal = Proposal {
2432 data: QuorumProposalWrapper::<SeqTypes> {
2433 proposal: QuorumProposal2::<SeqTypes> {
2434 epoch: None,
2435 block_header: leaf.block_header().clone(),
2436 view_number: ViewNumber::genesis(),
2437 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2438 &Default::default(),
2439 &NodeState::mock(),
2440 )
2441 .await,
2442 upgrade_certificate: None,
2443 view_change_evidence: None,
2444 next_drb_result: None,
2445 next_epoch_justify_qc: None,
2446 state_cert: None,
2447 },
2448 },
2449 signature,
2450 _pd: Default::default(),
2451 };
2452
2453 let quorum_proposal1 = quorum_proposal.clone();
2455 storage
2456 .append_quorum_proposal2(&quorum_proposal1)
2457 .await
2458 .unwrap();
2459 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
2460 let quorum_proposal2 = quorum_proposal.clone();
2461 storage
2462 .append_quorum_proposal2(&quorum_proposal2)
2463 .await
2464 .unwrap();
2465
2466 fs::rename(
2469 tmp.path().join("quorum_proposals2/1.txt"),
2470 tmp.path().join("quorum_proposals2/1.swp"),
2471 )
2472 .unwrap();
2473
2474 assert_eq!(
2476 storage.load_quorum_proposals().await.unwrap(),
2477 [(ViewNumber::genesis(), quorum_proposal1)]
2478 .into_iter()
2479 .collect::<BTreeMap<_, _>>()
2480 );
2481 }
2482
2483 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2484 async fn test_load_quorum_proposals_malformed_data() {
2485 let tmp = Persistence::tmp_storage().await;
2486 let storage = Persistence::connect(&tmp).await;
2487
2488 let leaf: Leaf2 = Leaf::genesis::<MockVersions>(&Default::default(), &NodeState::mock())
2490 .await
2491 .into();
2492 let privkey = PubKey::generated_from_seed_indexed([0; 32], 1).1;
2493 let signature = PubKey::sign(&privkey, &[]).unwrap();
2494 let quorum_proposal = Proposal {
2495 data: QuorumProposalWrapper::<SeqTypes> {
2496 proposal: QuorumProposal2::<SeqTypes> {
2497 epoch: None,
2498 block_header: leaf.block_header().clone(),
2499 view_number: ViewNumber::new(1),
2500 justify_qc: QuorumCertificate2::genesis::<TestVersions>(
2501 &Default::default(),
2502 &NodeState::mock(),
2503 )
2504 .await,
2505 upgrade_certificate: None,
2506 view_change_evidence: None,
2507 next_drb_result: None,
2508 next_epoch_justify_qc: None,
2509 state_cert: None,
2510 },
2511 },
2512 signature,
2513 _pd: Default::default(),
2514 };
2515
2516 fs::create_dir_all(tmp.path().join("quorum_proposals2")).unwrap();
2518 fs::write(
2519 tmp.path().join("quorum_proposals2/0.txt"),
2520 "invalid data".as_bytes(),
2521 )
2522 .unwrap();
2523
2524 storage
2526 .append_quorum_proposal2(&quorum_proposal)
2527 .await
2528 .unwrap();
2529
2530 assert_eq!(
2532 storage.load_quorum_proposals().await.unwrap(),
2533 [(ViewNumber::new(1), quorum_proposal)]
2534 .into_iter()
2535 .collect::<BTreeMap<_, _>>()
2536 );
2537 }
2538}