hotshot_example_types/
storage_types.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13};
14
15use anyhow::{anyhow, bail, Result};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19    data::{
20        DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
21        VidCommitment, VidDisperseShare,
22    },
23    drb::{DrbInput, DrbResult},
24    event::HotShotAction,
25    message::{convert_proposal, Proposal},
26    simple_certificate::{
27        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28        UpgradeCertificate,
29    },
30    traits::{
31        node_implementation::{ConsensusTime, NodeType},
32        storage::Storage,
33    },
34    vote::HasViewNumber,
35};
36
37use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
38
39type VidShares<TYPES> = BTreeMap<
40    <TYPES as NodeType>::View,
41    HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
42>;
43#[derive(Clone, Debug)]
44pub struct TestStorageState<TYPES: NodeType> {
45    vids: VidShares<TYPES>,
46    das: HashMap<TYPES::View, Proposal<TYPES, DaProposal<TYPES>>>,
47    da2s: HashMap<TYPES::View, Proposal<TYPES, DaProposal2<TYPES>>>,
48    pub proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal<TYPES>>>,
49    pub proposals2: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
50    pub proposals_wrapper: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
51    high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
52    high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
53    eqc: Option<(
54        hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
55        hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
56    )>,
57    next_epoch_high_qc2:
58        Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
59    action: TYPES::View,
60    epoch: Option<TYPES::Epoch>,
61    state_certs: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
62    drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
63    drb_inputs: BTreeMap<u64, DrbInput>,
64    epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
65    restart_view: TYPES::View,
66}
67
68impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
69    fn default() -> Self {
70        Self {
71            vids: BTreeMap::new(),
72            das: HashMap::new(),
73            da2s: HashMap::new(),
74            proposals: BTreeMap::new(),
75            proposals2: BTreeMap::new(),
76            proposals_wrapper: BTreeMap::new(),
77            high_qc: None,
78            high_qc2: None,
79            eqc: None,
80            next_epoch_high_qc2: None,
81            action: TYPES::View::genesis(),
82            epoch: None,
83            state_certs: BTreeMap::new(),
84            drb_results: BTreeMap::new(),
85            drb_inputs: BTreeMap::new(),
86            epoch_roots: BTreeMap::new(),
87            restart_view: TYPES::View::genesis(),
88        }
89    }
90}
91
92#[derive(Clone, Debug)]
93pub struct TestStorage<TYPES: NodeType> {
94    pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
95    /// `should_return_err` is a testing utility to validate negative cases.
96    pub should_return_err: Arc<AtomicBool>,
97    pub delay_config: DelayConfig,
98    pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
99}
100
101impl<TYPES: NodeType> Default for TestStorage<TYPES> {
102    fn default() -> Self {
103        Self {
104            inner: Arc::new(RwLock::new(TestStorageState::default())),
105            should_return_err: Arc::new(AtomicBool::new(false)),
106            delay_config: DelayConfig::default(),
107            decided_upgrade_certificate: Arc::new(RwLock::new(None)),
108        }
109    }
110}
111
112#[async_trait]
113impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
114    async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
115        if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
116        {
117            Self::handle_async_delay(settings).await;
118        }
119    }
120}
121
122impl<TYPES: NodeType> TestStorage<TYPES> {
123    pub async fn proposals_cloned(
124        &self,
125    ) -> BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
126        self.inner.read().await.proposals_wrapper.clone()
127    }
128
129    pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
130        self.inner.read().await.high_qc2.clone()
131    }
132
133    pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
134        self.inner.read().await.next_epoch_high_qc2.clone()
135    }
136
137    pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
138        self.decided_upgrade_certificate.read().await.clone()
139    }
140
141    pub async fn last_actioned_view(&self) -> TYPES::View {
142        self.inner.read().await.action
143    }
144
145    pub async fn restart_view(&self) -> TYPES::View {
146        self.inner.read().await.restart_view
147    }
148
149    pub async fn last_actioned_epoch(&self) -> Option<TYPES::Epoch> {
150        self.inner.read().await.epoch
151    }
152    pub async fn vids_cloned(&self) -> VidShares<TYPES> {
153        self.inner.read().await.vids.clone()
154    }
155
156    pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
157        self.inner
158            .read()
159            .await
160            .state_certs
161            .iter()
162            .next_back()
163            .map(|(_, cert)| cert.clone())
164    }
165}
166
167#[async_trait]
168impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
169    async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()> {
170        if self.should_return_err.load(Ordering::Relaxed) {
171            bail!("Failed to append VID proposal to storage");
172        }
173        Self::run_delay_settings_from_config(&self.delay_config).await;
174        let mut inner = self.inner.write().await;
175        inner
176            .vids
177            .entry(proposal.data.view_number())
178            .or_default()
179            .insert(proposal.data.recipient_key().clone(), proposal.clone());
180        Ok(())
181    }
182
183    async fn append_da(
184        &self,
185        proposal: &Proposal<TYPES, DaProposal<TYPES>>,
186        _vid_commit: VidCommitment,
187    ) -> Result<()> {
188        if self.should_return_err.load(Ordering::Relaxed) {
189            bail!("Failed to append DA proposal to storage");
190        }
191        Self::run_delay_settings_from_config(&self.delay_config).await;
192        let mut inner = self.inner.write().await;
193        inner
194            .das
195            .insert(proposal.data.view_number, proposal.clone());
196        Ok(())
197    }
198
199    async fn append_da2(
200        &self,
201        proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
202        _vid_commit: VidCommitment,
203    ) -> Result<()> {
204        if self.should_return_err.load(Ordering::Relaxed) {
205            bail!("Failed to append DA proposal (2) to storage");
206        }
207        Self::run_delay_settings_from_config(&self.delay_config).await;
208        let mut inner = self.inner.write().await;
209        inner
210            .da2s
211            .insert(proposal.data.view_number, proposal.clone());
212        Ok(())
213    }
214
215    async fn append_proposal(
216        &self,
217        proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
218    ) -> Result<()> {
219        if self.should_return_err.load(Ordering::Relaxed) {
220            bail!("Failed to append Quorum proposal (1) to storage");
221        }
222        Self::run_delay_settings_from_config(&self.delay_config).await;
223        let mut inner = self.inner.write().await;
224        inner
225            .proposals
226            .insert(proposal.data.view_number, proposal.clone());
227        Ok(())
228    }
229
230    async fn append_proposal2(
231        &self,
232        proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
233    ) -> Result<()> {
234        if self.should_return_err.load(Ordering::Relaxed) {
235            bail!("Failed to append Quorum proposal (2) to storage");
236        }
237        Self::run_delay_settings_from_config(&self.delay_config).await;
238        let mut inner = self.inner.write().await;
239        inner
240            .proposals2
241            .insert(proposal.data.view_number, proposal.clone());
242        Ok(())
243    }
244
245    async fn append_proposal_wrapper(
246        &self,
247        proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
248    ) -> Result<()> {
249        if self.should_return_err.load(Ordering::Relaxed) {
250            bail!("Failed to append Quorum proposal (wrapped) to storage");
251        }
252        Self::run_delay_settings_from_config(&self.delay_config).await;
253        let mut inner = self.inner.write().await;
254        inner
255            .proposals_wrapper
256            .insert(proposal.data.view_number(), proposal.clone());
257        Ok(())
258    }
259
260    async fn record_action(
261        &self,
262        view: <TYPES as NodeType>::View,
263        epoch: Option<TYPES::Epoch>,
264        action: hotshot_types::event::HotShotAction,
265    ) -> Result<()> {
266        if self.should_return_err.load(Ordering::Relaxed) {
267            bail!("Failed to append Action to storage");
268        }
269        let mut inner = self.inner.write().await;
270        if matches!(
271            action,
272            HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
273        ) {
274            if view > inner.action {
275                inner.action = view;
276            }
277            if epoch > inner.epoch {
278                inner.epoch = epoch;
279            }
280        }
281        if matches!(action, HotShotAction::Vote) {
282            inner.restart_view = view + 1;
283        }
284        Self::run_delay_settings_from_config(&self.delay_config).await;
285        Ok(())
286    }
287
288    async fn update_high_qc(
289        &self,
290        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
291    ) -> Result<()> {
292        if self.should_return_err.load(Ordering::Relaxed) {
293            bail!("Failed to update high qc to storage");
294        }
295        Self::run_delay_settings_from_config(&self.delay_config).await;
296        let mut inner = self.inner.write().await;
297        if let Some(ref current_high_qc) = inner.high_qc {
298            if new_high_qc.view_number() > current_high_qc.view_number() {
299                inner.high_qc = Some(new_high_qc);
300            }
301        } else {
302            inner.high_qc = Some(new_high_qc);
303        }
304        Ok(())
305    }
306
307    /// Update the current high QC in storage.
308    async fn update_eqc(
309        &self,
310        high_qc: QuorumCertificate2<TYPES>,
311        next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
312    ) -> Result<()> {
313        if self.should_return_err.load(Ordering::Relaxed) {
314            bail!("Failed to update eqc in storage");
315        }
316        Self::run_delay_settings_from_config(&self.delay_config).await;
317        let mut inner = self.inner.write().await;
318        if let Some((ref current_high_qc, _)) = inner.eqc {
319            if high_qc.view_number() > current_high_qc.view_number() {
320                inner.eqc = Some((high_qc, next_epoch_high_qc));
321            }
322        } else {
323            inner.eqc = Some((high_qc, next_epoch_high_qc));
324        }
325        Ok(())
326    }
327
328    async fn update_high_qc2(
329        &self,
330        new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
331    ) -> Result<()> {
332        if self.should_return_err.load(Ordering::Relaxed) {
333            bail!("Failed to update high qc to storage");
334        }
335        Self::run_delay_settings_from_config(&self.delay_config).await;
336        let mut inner = self.inner.write().await;
337        if let Some(ref current_high_qc) = inner.high_qc2 {
338            if new_high_qc.view_number() > current_high_qc.view_number() {
339                inner.high_qc2 = Some(new_high_qc);
340            }
341        } else {
342            inner.high_qc2 = Some(new_high_qc);
343        }
344        Ok(())
345    }
346
347    async fn update_state_cert(
348        &self,
349        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
350    ) -> Result<()> {
351        if self.should_return_err.load(Ordering::Relaxed) {
352            bail!("Failed to update state_cert to storage");
353        }
354        Self::run_delay_settings_from_config(&self.delay_config).await;
355        self.inner
356            .write()
357            .await
358            .state_certs
359            .insert(state_cert.epoch, state_cert);
360        Ok(())
361    }
362
363    async fn update_next_epoch_high_qc2(
364        &self,
365        new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
366            TYPES,
367        >,
368    ) -> Result<()> {
369        if self.should_return_err.load(Ordering::Relaxed) {
370            bail!("Failed to update next epoch high qc to storage");
371        }
372        Self::run_delay_settings_from_config(&self.delay_config).await;
373        let mut inner = self.inner.write().await;
374        if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
375            if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
376                inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
377            }
378        } else {
379            inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
380        }
381        Ok(())
382    }
383
384    async fn update_decided_upgrade_certificate(
385        &self,
386        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
387    ) -> Result<()> {
388        *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
389
390        Ok(())
391    }
392
393    async fn migrate_storage(&self) -> Result<()> {
394        let mut storage_writer = self.inner.write().await;
395
396        for (view, proposal) in storage_writer.proposals.clone().iter() {
397            storage_writer
398                .proposals2
399                .insert(*view, convert_proposal(proposal.clone()));
400        }
401
402        Ok(())
403    }
404
405    async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
406        let mut inner = self.inner.write().await;
407
408        inner.drb_results.insert(epoch, drb_result);
409
410        Ok(())
411    }
412
413    async fn store_epoch_root(
414        &self,
415        epoch: TYPES::Epoch,
416        block_header: TYPES::BlockHeader,
417    ) -> Result<()> {
418        let mut inner = self.inner.write().await;
419
420        inner.epoch_roots.insert(epoch, block_header);
421
422        Ok(())
423    }
424
425    async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
426        let mut inner = self.inner.write().await;
427
428        inner.drb_inputs.insert(drb_input.epoch, drb_input);
429
430        Ok(())
431    }
432
433    async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
434        let inner = self.inner.read().await;
435
436        match inner.drb_inputs.get(&epoch) {
437            Some(drb_input) => Ok(drb_input.clone()),
438            None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
439        }
440    }
441}