// hotshot_example_types/storage_types.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use anyhow::{anyhow, bail, Result};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19    data::{
20        vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
21        DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
22        VidCommitment,
23    },
24    drb::{DrbInput, DrbResult},
25    event::HotShotAction,
26    message::{convert_proposal, Proposal},
27    simple_certificate::{
28        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
29        UpgradeCertificate,
30    },
31    traits::{
32        node_implementation::{ConsensusTime, NodeType},
33        storage::Storage,
34    },
35    vote::HasViewNumber,
36};
37
38use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
39
40type VidShares<TYPES> = BTreeMap<
41    <TYPES as NodeType>::View,
42    HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, ADVZDisperseShare<TYPES>>>,
43>;
44type VidShares2<TYPES> = BTreeMap<
45    <TYPES as NodeType>::View,
46    HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare2<TYPES>>>,
47>;
48
49#[derive(Clone, Debug)]
50pub struct TestStorageState<TYPES: NodeType> {
51    vids: VidShares<TYPES>,
52    vid2: VidShares2<TYPES>,
53    das: HashMap<TYPES::View, Proposal<TYPES, DaProposal<TYPES>>>,
54    da2s: HashMap<TYPES::View, Proposal<TYPES, DaProposal2<TYPES>>>,
55    pub proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal<TYPES>>>,
56    pub proposals2: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
57    pub proposals_wrapper: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
58    high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
59    high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
60    eqc: Option<(
61        hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
62        hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
63    )>,
64    next_epoch_high_qc2:
65        Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
66    action: TYPES::View,
67    epoch: Option<TYPES::Epoch>,
68    state_certs: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
69    drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
70    drb_inputs: BTreeMap<u64, DrbInput>,
71    epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
72    restart_view: TYPES::View,
73}
74
75impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
76    fn default() -> Self {
77        Self {
78            vids: BTreeMap::new(),
79            vid2: BTreeMap::new(),
80            das: HashMap::new(),
81            da2s: HashMap::new(),
82            proposals: BTreeMap::new(),
83            proposals2: BTreeMap::new(),
84            proposals_wrapper: BTreeMap::new(),
85            high_qc: None,
86            high_qc2: None,
87            eqc: None,
88            next_epoch_high_qc2: None,
89            action: TYPES::View::genesis(),
90            epoch: None,
91            state_certs: BTreeMap::new(),
92            drb_results: BTreeMap::new(),
93            drb_inputs: BTreeMap::new(),
94            epoch_roots: BTreeMap::new(),
95            restart_view: TYPES::View::genesis(),
96        }
97    }
98}
99
100#[derive(Clone, Debug)]
101pub struct TestStorage<TYPES: NodeType> {
102    pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
103    /// `should_return_err` is a testing utility to validate negative cases.
104    pub should_return_err: Arc<AtomicBool>,
105    pub delay_config: DelayConfig,
106    pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
107}
108
109impl<TYPES: NodeType> Default for TestStorage<TYPES> {
110    fn default() -> Self {
111        Self {
112            inner: Arc::new(RwLock::new(TestStorageState::default())),
113            should_return_err: Arc::new(AtomicBool::new(false)),
114            delay_config: DelayConfig::default(),
115            decided_upgrade_certificate: Arc::new(RwLock::new(None)),
116        }
117    }
118}
119
120#[async_trait]
121impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
122    async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
123        if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
124        {
125            Self::handle_async_delay(settings).await;
126        }
127    }
128}
129
130impl<TYPES: NodeType> TestStorage<TYPES> {
131    pub async fn proposals_cloned(
132        &self,
133    ) -> BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
134        self.inner.read().await.proposals_wrapper.clone()
135    }
136
137    pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
138        self.inner.read().await.high_qc2.clone()
139    }
140
141    pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
142        self.inner.read().await.next_epoch_high_qc2.clone()
143    }
144
145    pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
146        self.decided_upgrade_certificate.read().await.clone()
147    }
148
149    pub async fn last_actioned_view(&self) -> TYPES::View {
150        self.inner.read().await.action
151    }
152
153    pub async fn restart_view(&self) -> TYPES::View {
154        self.inner.read().await.restart_view
155    }
156
157    pub async fn last_actioned_epoch(&self) -> Option<TYPES::Epoch> {
158        self.inner.read().await.epoch
159    }
160    pub async fn vids_cloned(&self) -> VidShares2<TYPES> {
161        self.inner.read().await.vid2.clone()
162    }
163
164    pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
165        self.inner
166            .read()
167            .await
168            .state_certs
169            .iter()
170            .next_back()
171            .map(|(_, cert)| cert.clone())
172    }
173}
174
175#[async_trait]
176impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
177    async fn append_vid(&self, proposal: &Proposal<TYPES, ADVZDisperseShare<TYPES>>) -> Result<()> {
178        if self.should_return_err.load(Ordering::Relaxed) {
179            bail!("Failed to append VID proposal to storage");
180        }
181        Self::run_delay_settings_from_config(&self.delay_config).await;
182        let mut inner = self.inner.write().await;
183        inner
184            .vids
185            .entry(proposal.data.view_number)
186            .or_default()
187            .insert(proposal.data.recipient_key.clone(), proposal.clone());
188        Ok(())
189    }
190
191    async fn append_vid2(
192        &self,
193        proposal: &Proposal<TYPES, VidDisperseShare2<TYPES>>,
194    ) -> Result<()> {
195        if self.should_return_err.load(Ordering::Relaxed) {
196            bail!("Failed to append VID proposal to storage");
197        }
198        Self::run_delay_settings_from_config(&self.delay_config).await;
199        let mut inner = self.inner.write().await;
200        inner
201            .vid2
202            .entry(proposal.data.view_number)
203            .or_default()
204            .insert(proposal.data.recipient_key.clone(), proposal.clone());
205        Ok(())
206    }
207
208    async fn append_da(
209        &self,
210        proposal: &Proposal<TYPES, DaProposal<TYPES>>,
211        _vid_commit: VidCommitment,
212    ) -> Result<()> {
213        if self.should_return_err.load(Ordering::Relaxed) {
214            bail!("Failed to append DA proposal to storage");
215        }
216        Self::run_delay_settings_from_config(&self.delay_config).await;
217        let mut inner = self.inner.write().await;
218        inner
219            .das
220            .insert(proposal.data.view_number, proposal.clone());
221        Ok(())
222    }
223
224    async fn append_da2(
225        &self,
226        proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
227        _vid_commit: VidCommitment,
228    ) -> Result<()> {
229        if self.should_return_err.load(Ordering::Relaxed) {
230            bail!("Failed to append DA proposal (2) to storage");
231        }
232        Self::run_delay_settings_from_config(&self.delay_config).await;
233        let mut inner = self.inner.write().await;
234        inner
235            .da2s
236            .insert(proposal.data.view_number, proposal.clone());
237        Ok(())
238    }
239
240    async fn append_proposal(
241        &self,
242        proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
243    ) -> Result<()> {
244        if self.should_return_err.load(Ordering::Relaxed) {
245            bail!("Failed to append Quorum proposal (1) to storage");
246        }
247        Self::run_delay_settings_from_config(&self.delay_config).await;
248        let mut inner = self.inner.write().await;
249        inner
250            .proposals
251            .insert(proposal.data.view_number, proposal.clone());
252        Ok(())
253    }
254
255    async fn append_proposal2(
256        &self,
257        proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
258    ) -> Result<()> {
259        if self.should_return_err.load(Ordering::Relaxed) {
260            bail!("Failed to append Quorum proposal (2) to storage");
261        }
262        Self::run_delay_settings_from_config(&self.delay_config).await;
263        let mut inner = self.inner.write().await;
264        inner
265            .proposals2
266            .insert(proposal.data.view_number, proposal.clone());
267        Ok(())
268    }
269
270    async fn append_proposal_wrapper(
271        &self,
272        proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
273    ) -> Result<()> {
274        if self.should_return_err.load(Ordering::Relaxed) {
275            bail!("Failed to append Quorum proposal (wrapped) to storage");
276        }
277        Self::run_delay_settings_from_config(&self.delay_config).await;
278        let mut inner = self.inner.write().await;
279        inner
280            .proposals_wrapper
281            .insert(proposal.data.view_number(), proposal.clone());
282        Ok(())
283    }
284
285    async fn record_action(
286        &self,
287        view: <TYPES as NodeType>::View,
288        epoch: Option<TYPES::Epoch>,
289        action: hotshot_types::event::HotShotAction,
290    ) -> Result<()> {
291        if self.should_return_err.load(Ordering::Relaxed) {
292            bail!("Failed to append Action to storage");
293        }
294        let mut inner = self.inner.write().await;
295        if matches!(
296            action,
297            HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
298        ) {
299            if view > inner.action {
300                inner.action = view;
301            }
302            if epoch > inner.epoch {
303                inner.epoch = epoch;
304            }
305        }
306        if matches!(action, HotShotAction::Vote) {
307            inner.restart_view = view + 1;
308        }
309        Self::run_delay_settings_from_config(&self.delay_config).await;
310        Ok(())
311    }
312
313    async fn update_high_qc(
314        &self,
315        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
316    ) -> Result<()> {
317        if self.should_return_err.load(Ordering::Relaxed) {
318            bail!("Failed to update high qc to storage");
319        }
320        Self::run_delay_settings_from_config(&self.delay_config).await;
321        let mut inner = self.inner.write().await;
322        if let Some(ref current_high_qc) = inner.high_qc {
323            if new_high_qc.view_number() > current_high_qc.view_number() {
324                inner.high_qc = Some(new_high_qc);
325            }
326        } else {
327            inner.high_qc = Some(new_high_qc);
328        }
329        Ok(())
330    }
331
332    /// Update the current high QC in storage.
333    async fn update_eqc(
334        &self,
335        high_qc: QuorumCertificate2<TYPES>,
336        next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
337    ) -> Result<()> {
338        if self.should_return_err.load(Ordering::Relaxed) {
339            bail!("Failed to update eqc in storage");
340        }
341        Self::run_delay_settings_from_config(&self.delay_config).await;
342        let mut inner = self.inner.write().await;
343        if let Some((ref current_high_qc, _)) = inner.eqc {
344            if high_qc.view_number() > current_high_qc.view_number() {
345                inner.eqc = Some((high_qc, next_epoch_high_qc));
346            }
347        } else {
348            inner.eqc = Some((high_qc, next_epoch_high_qc));
349        }
350        Ok(())
351    }
352
353    async fn update_high_qc2(
354        &self,
355        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
356    ) -> Result<()> {
357        if self.should_return_err.load(Ordering::Relaxed) {
358            bail!("Failed to update high qc to storage");
359        }
360        Self::run_delay_settings_from_config(&self.delay_config).await;
361        let mut inner = self.inner.write().await;
362        if let Some(ref current_high_qc) = inner.high_qc2 {
363            if new_high_qc.view_number() > current_high_qc.view_number() {
364                inner.high_qc2 = Some(new_high_qc);
365            }
366        } else {
367            inner.high_qc2 = Some(new_high_qc);
368        }
369        Ok(())
370    }
371
372    async fn update_state_cert(
373        &self,
374        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
375    ) -> Result<()> {
376        if self.should_return_err.load(Ordering::Relaxed) {
377            bail!("Failed to update state_cert to storage");
378        }
379        Self::run_delay_settings_from_config(&self.delay_config).await;
380        self.inner
381            .write()
382            .await
383            .state_certs
384            .insert(state_cert.epoch, state_cert);
385        Ok(())
386    }
387
388    async fn update_next_epoch_high_qc2(
389        &self,
390        new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
391            TYPES,
392        >,
393    ) -> Result<()> {
394        if self.should_return_err.load(Ordering::Relaxed) {
395            bail!("Failed to update next epoch high qc to storage");
396        }
397        Self::run_delay_settings_from_config(&self.delay_config).await;
398        let mut inner = self.inner.write().await;
399        if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
400            if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
401                inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
402            }
403        } else {
404            inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
405        }
406        Ok(())
407    }
408
409    async fn update_decided_upgrade_certificate(
410        &self,
411        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
412    ) -> Result<()> {
413        *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
414
415        Ok(())
416    }
417
418    async fn migrate_storage(&self) -> Result<()> {
419        let mut storage_writer = self.inner.write().await;
420
421        for (view, proposal) in storage_writer.proposals.clone().iter() {
422            storage_writer
423                .proposals2
424                .insert(*view, convert_proposal(proposal.clone()));
425        }
426
427        Ok(())
428    }
429
430    async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
431        let mut inner = self.inner.write().await;
432
433        inner.drb_results.insert(epoch, drb_result);
434
435        Ok(())
436    }
437
438    async fn store_epoch_root(
439        &self,
440        epoch: TYPES::Epoch,
441        block_header: TYPES::BlockHeader,
442    ) -> Result<()> {
443        let mut inner = self.inner.write().await;
444
445        inner.epoch_roots.insert(epoch, block_header);
446
447        Ok(())
448    }
449
450    async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
451        let mut inner = self.inner.write().await;
452
453        inner.drb_inputs.insert(drb_input.epoch, drb_input);
454
455        Ok(())
456    }
457
458    async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
459        let inner = self.inner.read().await;
460
461        match inner.drb_inputs.get(&epoch) {
462            Some(drb_input) => Ok(drb_input.clone()),
463            None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
464        }
465    }
466}