hotshot_example_types/
storage_types.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::{
10        Arc,
11        atomic::{AtomicBool, Ordering},
12    },
13};
14
15use anyhow::{Result, anyhow, bail};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19    data::{
20        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
21        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
22    },
23    drb::{DrbInput, DrbResult},
24    event::HotShotAction,
25    message::{Proposal, convert_proposal},
26    simple_certificate::{
27        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28        UpgradeCertificate,
29    },
30    traits::{node_implementation::NodeType, storage::Storage},
31    vote::HasViewNumber,
32};
33
34use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
35
/// VID share proposals indexed first by view number and then by the signature key of the
/// share's recipient.
type VidShares<TYPES> = BTreeMap<
    ViewNumber,
    HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
>;
40#[derive(Clone, Debug)]
41pub struct TestStorageState<TYPES: NodeType> {
42    vids: VidShares<TYPES>,
43    das: HashMap<ViewNumber, Proposal<TYPES, DaProposal<TYPES>>>,
44    da2s: HashMap<ViewNumber, Proposal<TYPES, DaProposal2<TYPES>>>,
45    pub proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposal<TYPES>>>,
46    pub proposals2: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposal2<TYPES>>>,
47    pub proposals_wrapper: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
48    high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
49    high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
50    eqc: Option<(
51        hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
52        hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
53    )>,
54    next_epoch_high_qc2:
55        Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
56    action: ViewNumber,
57    epoch: Option<EpochNumber>,
58    state_certs: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<TYPES>>,
59    drb_results: BTreeMap<EpochNumber, DrbResult>,
60    drb_inputs: BTreeMap<u64, DrbInput>,
61    epoch_roots: BTreeMap<EpochNumber, TYPES::BlockHeader>,
62    restart_view: ViewNumber,
63}
64
65impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
66    fn default() -> Self {
67        Self {
68            vids: BTreeMap::new(),
69            das: HashMap::new(),
70            da2s: HashMap::new(),
71            proposals: BTreeMap::new(),
72            proposals2: BTreeMap::new(),
73            proposals_wrapper: BTreeMap::new(),
74            high_qc: None,
75            high_qc2: None,
76            eqc: None,
77            next_epoch_high_qc2: None,
78            action: ViewNumber::genesis(),
79            epoch: None,
80            state_certs: BTreeMap::new(),
81            drb_results: BTreeMap::new(),
82            drb_inputs: BTreeMap::new(),
83            epoch_roots: BTreeMap::new(),
84            restart_view: ViewNumber::genesis(),
85        }
86    }
87}
88
/// Test implementation of the `Storage` trait that keeps everything in memory.
///
/// All fields are either `Arc`-wrapped or a plain configuration value.
// NOTE(review): presumably a clone shares `inner`, `should_return_err` and
// `decided_upgrade_certificate` with the original through the `Arc`s — confirm.
#[derive(Clone, Debug)]
pub struct TestStorage<TYPES: NodeType> {
    /// The stored data, guarded by an async read-write lock.
    pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
    /// `should_return_err` is a testing utility to validate negative cases. While it is `true`,
    /// the `append_*`, `record_action` and QC / state-certificate `update_*` methods return an
    /// error instead of storing anything.
    pub should_return_err: Arc<AtomicBool>,
    /// Delay configuration handed to `run_delay_settings_from_config` by the storage methods.
    pub delay_config: DelayConfig,
    /// The value most recently passed to `update_decided_upgrade_certificate`, if any.
    pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
}
97
98impl<TYPES: NodeType> Default for TestStorage<TYPES> {
99    fn default() -> Self {
100        Self {
101            inner: Arc::new(RwLock::new(TestStorageState::default())),
102            should_return_err: Arc::new(AtomicBool::new(false)),
103            delay_config: DelayConfig::default(),
104            decided_upgrade_certificate: Arc::new(RwLock::new(None)),
105        }
106    }
107}
108
109#[async_trait]
110impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
111    async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
112        if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
113        {
114            Self::handle_async_delay(settings).await;
115        }
116    }
117}
118
119impl<TYPES: NodeType> TestStorage<TYPES> {
120    pub async fn proposals_cloned(
121        &self,
122    ) -> BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
123        self.inner.read().await.proposals_wrapper.clone()
124    }
125
126    pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
127        self.inner.read().await.high_qc2.clone()
128    }
129
130    pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
131        self.inner.read().await.next_epoch_high_qc2.clone()
132    }
133
134    pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
135        self.decided_upgrade_certificate.read().await.clone()
136    }
137
138    pub async fn last_actioned_view(&self) -> ViewNumber {
139        self.inner.read().await.action
140    }
141
142    pub async fn restart_view(&self) -> ViewNumber {
143        self.inner.read().await.restart_view
144    }
145
146    pub async fn last_actioned_epoch(&self) -> Option<EpochNumber> {
147        self.inner.read().await.epoch
148    }
149    pub async fn vids_cloned(&self) -> VidShares<TYPES> {
150        self.inner.read().await.vids.clone()
151    }
152
153    pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
154        self.inner
155            .read()
156            .await
157            .state_certs
158            .iter()
159            .next_back()
160            .map(|(_, cert)| cert.clone())
161    }
162}
163
164#[async_trait]
165impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
166    async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()> {
167        if self.should_return_err.load(Ordering::Relaxed) {
168            bail!("Failed to append VID proposal to storage");
169        }
170        Self::run_delay_settings_from_config(&self.delay_config).await;
171        let mut inner = self.inner.write().await;
172        inner
173            .vids
174            .entry(proposal.data.view_number())
175            .or_default()
176            .insert(proposal.data.recipient_key().clone(), proposal.clone());
177        Ok(())
178    }
179
180    async fn append_da(
181        &self,
182        proposal: &Proposal<TYPES, DaProposal<TYPES>>,
183        _vid_commit: VidCommitment,
184    ) -> Result<()> {
185        if self.should_return_err.load(Ordering::Relaxed) {
186            bail!("Failed to append DA proposal to storage");
187        }
188        Self::run_delay_settings_from_config(&self.delay_config).await;
189        let mut inner = self.inner.write().await;
190        inner
191            .das
192            .insert(proposal.data.view_number, proposal.clone());
193        Ok(())
194    }
195
196    async fn append_da2(
197        &self,
198        proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
199        _vid_commit: VidCommitment,
200    ) -> Result<()> {
201        if self.should_return_err.load(Ordering::Relaxed) {
202            bail!("Failed to append DA proposal (2) to storage");
203        }
204        Self::run_delay_settings_from_config(&self.delay_config).await;
205        let mut inner = self.inner.write().await;
206        inner
207            .da2s
208            .insert(proposal.data.view_number, proposal.clone());
209        Ok(())
210    }
211
212    async fn append_proposal(
213        &self,
214        proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
215    ) -> Result<()> {
216        if self.should_return_err.load(Ordering::Relaxed) {
217            bail!("Failed to append Quorum proposal (1) to storage");
218        }
219        Self::run_delay_settings_from_config(&self.delay_config).await;
220        let mut inner = self.inner.write().await;
221        inner
222            .proposals
223            .insert(proposal.data.view_number, proposal.clone());
224        Ok(())
225    }
226
227    async fn append_proposal2(
228        &self,
229        proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
230    ) -> Result<()> {
231        if self.should_return_err.load(Ordering::Relaxed) {
232            bail!("Failed to append Quorum proposal (2) to storage");
233        }
234        Self::run_delay_settings_from_config(&self.delay_config).await;
235        let mut inner = self.inner.write().await;
236        inner
237            .proposals2
238            .insert(proposal.data.view_number, proposal.clone());
239        Ok(())
240    }
241
242    async fn append_proposal_wrapper(
243        &self,
244        proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
245    ) -> Result<()> {
246        if self.should_return_err.load(Ordering::Relaxed) {
247            bail!("Failed to append Quorum proposal (wrapped) to storage");
248        }
249        Self::run_delay_settings_from_config(&self.delay_config).await;
250        let mut inner = self.inner.write().await;
251        inner
252            .proposals_wrapper
253            .insert(proposal.data.view_number(), proposal.clone());
254        Ok(())
255    }
256
257    async fn record_action(
258        &self,
259        view: ViewNumber,
260        epoch: Option<EpochNumber>,
261        action: hotshot_types::event::HotShotAction,
262    ) -> Result<()> {
263        if self.should_return_err.load(Ordering::Relaxed) {
264            bail!("Failed to append Action to storage");
265        }
266        let mut inner = self.inner.write().await;
267        if matches!(
268            action,
269            HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
270        ) {
271            if view > inner.action {
272                inner.action = view;
273            }
274            if epoch > inner.epoch {
275                inner.epoch = epoch;
276            }
277        }
278        if matches!(action, HotShotAction::Vote) {
279            inner.restart_view = view + 1;
280        }
281        Self::run_delay_settings_from_config(&self.delay_config).await;
282        Ok(())
283    }
284
285    async fn update_high_qc(
286        &self,
287        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
288    ) -> Result<()> {
289        if self.should_return_err.load(Ordering::Relaxed) {
290            bail!("Failed to update high qc to storage");
291        }
292        Self::run_delay_settings_from_config(&self.delay_config).await;
293        let mut inner = self.inner.write().await;
294        if let Some(ref current_high_qc) = inner.high_qc {
295            if new_high_qc.view_number() > current_high_qc.view_number() {
296                inner.high_qc = Some(new_high_qc);
297            }
298        } else {
299            inner.high_qc = Some(new_high_qc);
300        }
301        Ok(())
302    }
303
304    /// Update the current high QC in storage.
305    async fn update_eqc(
306        &self,
307        high_qc: QuorumCertificate2<TYPES>,
308        next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
309    ) -> Result<()> {
310        if self.should_return_err.load(Ordering::Relaxed) {
311            bail!("Failed to update eqc in storage");
312        }
313        Self::run_delay_settings_from_config(&self.delay_config).await;
314        let mut inner = self.inner.write().await;
315        if let Some((ref current_high_qc, _)) = inner.eqc {
316            if high_qc.view_number() > current_high_qc.view_number() {
317                inner.eqc = Some((high_qc, next_epoch_high_qc));
318            }
319        } else {
320            inner.eqc = Some((high_qc, next_epoch_high_qc));
321        }
322        Ok(())
323    }
324
325    async fn update_high_qc2(
326        &self,
327        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
328    ) -> Result<()> {
329        if self.should_return_err.load(Ordering::Relaxed) {
330            bail!("Failed to update high qc to storage");
331        }
332        Self::run_delay_settings_from_config(&self.delay_config).await;
333        let mut inner = self.inner.write().await;
334        if let Some(ref current_high_qc) = inner.high_qc2 {
335            if new_high_qc.view_number() > current_high_qc.view_number() {
336                inner.high_qc2 = Some(new_high_qc);
337            }
338        } else {
339            inner.high_qc2 = Some(new_high_qc);
340        }
341        Ok(())
342    }
343
344    async fn update_state_cert(
345        &self,
346        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
347    ) -> Result<()> {
348        if self.should_return_err.load(Ordering::Relaxed) {
349            bail!("Failed to update state_cert to storage");
350        }
351        Self::run_delay_settings_from_config(&self.delay_config).await;
352        self.inner
353            .write()
354            .await
355            .state_certs
356            .insert(state_cert.epoch, state_cert);
357        Ok(())
358    }
359
360    async fn update_next_epoch_high_qc2(
361        &self,
362        new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
363            TYPES,
364        >,
365    ) -> Result<()> {
366        if self.should_return_err.load(Ordering::Relaxed) {
367            bail!("Failed to update next epoch high qc to storage");
368        }
369        Self::run_delay_settings_from_config(&self.delay_config).await;
370        let mut inner = self.inner.write().await;
371        if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
372            if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
373                inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
374            }
375        } else {
376            inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
377        }
378        Ok(())
379    }
380
381    async fn update_decided_upgrade_certificate(
382        &self,
383        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
384    ) -> Result<()> {
385        *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
386
387        Ok(())
388    }
389
390    async fn migrate_storage(&self) -> Result<()> {
391        let mut storage_writer = self.inner.write().await;
392
393        for (view, proposal) in storage_writer.proposals.clone().iter() {
394            storage_writer
395                .proposals2
396                .insert(*view, convert_proposal(proposal.clone()));
397        }
398
399        Ok(())
400    }
401
402    async fn store_drb_result(&self, epoch: EpochNumber, drb_result: DrbResult) -> Result<()> {
403        let mut inner = self.inner.write().await;
404
405        inner.drb_results.insert(epoch, drb_result);
406
407        Ok(())
408    }
409
410    async fn store_epoch_root(
411        &self,
412        epoch: EpochNumber,
413        block_header: TYPES::BlockHeader,
414    ) -> Result<()> {
415        let mut inner = self.inner.write().await;
416
417        inner.epoch_roots.insert(epoch, block_header);
418
419        Ok(())
420    }
421
422    async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
423        let mut inner = self.inner.write().await;
424
425        inner.drb_inputs.insert(drb_input.epoch, drb_input);
426
427        Ok(())
428    }
429
430    async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
431        let inner = self.inner.read().await;
432
433        match inner.drb_inputs.get(&epoch) {
434            Some(drb_input) => Ok(drb_input.clone()),
435            None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
436        }
437    }
438}