1use std::{
8 collections::{BTreeMap, HashMap},
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use anyhow::{anyhow, bail, Result};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19 data::{
20 DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
21 VidCommitment, VidDisperseShare,
22 },
23 drb::{DrbInput, DrbResult},
24 event::HotShotAction,
25 message::{convert_proposal, Proposal},
26 simple_certificate::{
27 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28 UpgradeCertificate,
29 },
30 traits::{
31 node_implementation::{ConsensusTime, NodeType},
32 storage::Storage,
33 },
34 vote::HasViewNumber,
35};
36
37use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
38
39type VidShares<TYPES> = BTreeMap<
40 <TYPES as NodeType>::View,
41 HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
42>;
43#[derive(Clone, Debug)]
44pub struct TestStorageState<TYPES: NodeType> {
45 vids: VidShares<TYPES>,
46 das: HashMap<TYPES::View, Proposal<TYPES, DaProposal<TYPES>>>,
47 da2s: HashMap<TYPES::View, Proposal<TYPES, DaProposal2<TYPES>>>,
48 pub proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal<TYPES>>>,
49 pub proposals2: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
50 pub proposals_wrapper: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
51 high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
52 high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
53 eqc: Option<(
54 hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
55 hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
56 )>,
57 next_epoch_high_qc2:
58 Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
59 action: TYPES::View,
60 epoch: Option<TYPES::Epoch>,
61 state_certs: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
62 drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
63 drb_inputs: BTreeMap<u64, DrbInput>,
64 epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
65 restart_view: TYPES::View,
66}
67
68impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
69 fn default() -> Self {
70 Self {
71 vids: BTreeMap::new(),
72 das: HashMap::new(),
73 da2s: HashMap::new(),
74 proposals: BTreeMap::new(),
75 proposals2: BTreeMap::new(),
76 proposals_wrapper: BTreeMap::new(),
77 high_qc: None,
78 high_qc2: None,
79 eqc: None,
80 next_epoch_high_qc2: None,
81 action: TYPES::View::genesis(),
82 epoch: None,
83 state_certs: BTreeMap::new(),
84 drb_results: BTreeMap::new(),
85 drb_inputs: BTreeMap::new(),
86 epoch_roots: BTreeMap::new(),
87 restart_view: TYPES::View::genesis(),
88 }
89 }
90}
91
92#[derive(Clone, Debug)]
93pub struct TestStorage<TYPES: NodeType> {
94 pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
95 pub should_return_err: Arc<AtomicBool>,
97 pub delay_config: DelayConfig,
98 pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
99}
100
101impl<TYPES: NodeType> Default for TestStorage<TYPES> {
102 fn default() -> Self {
103 Self {
104 inner: Arc::new(RwLock::new(TestStorageState::default())),
105 should_return_err: Arc::new(AtomicBool::new(false)),
106 delay_config: DelayConfig::default(),
107 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
108 }
109 }
110}
111
112#[async_trait]
113impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
114 async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
115 if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
116 {
117 Self::handle_async_delay(settings).await;
118 }
119 }
120}
121
122impl<TYPES: NodeType> TestStorage<TYPES> {
123 pub async fn proposals_cloned(
124 &self,
125 ) -> BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
126 self.inner.read().await.proposals_wrapper.clone()
127 }
128
129 pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
130 self.inner.read().await.high_qc2.clone()
131 }
132
133 pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
134 self.inner.read().await.next_epoch_high_qc2.clone()
135 }
136
137 pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
138 self.decided_upgrade_certificate.read().await.clone()
139 }
140
141 pub async fn last_actioned_view(&self) -> TYPES::View {
142 self.inner.read().await.action
143 }
144
145 pub async fn restart_view(&self) -> TYPES::View {
146 self.inner.read().await.restart_view
147 }
148
149 pub async fn last_actioned_epoch(&self) -> Option<TYPES::Epoch> {
150 self.inner.read().await.epoch
151 }
152 pub async fn vids_cloned(&self) -> VidShares<TYPES> {
153 self.inner.read().await.vids.clone()
154 }
155
156 pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
157 self.inner
158 .read()
159 .await
160 .state_certs
161 .iter()
162 .next_back()
163 .map(|(_, cert)| cert.clone())
164 }
165}
166
167#[async_trait]
168impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
169 async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()> {
170 if self.should_return_err.load(Ordering::Relaxed) {
171 bail!("Failed to append VID proposal to storage");
172 }
173 Self::run_delay_settings_from_config(&self.delay_config).await;
174 let mut inner = self.inner.write().await;
175 inner
176 .vids
177 .entry(proposal.data.view_number())
178 .or_default()
179 .insert(proposal.data.recipient_key().clone(), proposal.clone());
180 Ok(())
181 }
182
183 async fn append_da(
184 &self,
185 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
186 _vid_commit: VidCommitment,
187 ) -> Result<()> {
188 if self.should_return_err.load(Ordering::Relaxed) {
189 bail!("Failed to append DA proposal to storage");
190 }
191 Self::run_delay_settings_from_config(&self.delay_config).await;
192 let mut inner = self.inner.write().await;
193 inner
194 .das
195 .insert(proposal.data.view_number, proposal.clone());
196 Ok(())
197 }
198
199 async fn append_da2(
200 &self,
201 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
202 _vid_commit: VidCommitment,
203 ) -> Result<()> {
204 if self.should_return_err.load(Ordering::Relaxed) {
205 bail!("Failed to append DA proposal (2) to storage");
206 }
207 Self::run_delay_settings_from_config(&self.delay_config).await;
208 let mut inner = self.inner.write().await;
209 inner
210 .da2s
211 .insert(proposal.data.view_number, proposal.clone());
212 Ok(())
213 }
214
215 async fn append_proposal(
216 &self,
217 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
218 ) -> Result<()> {
219 if self.should_return_err.load(Ordering::Relaxed) {
220 bail!("Failed to append Quorum proposal (1) to storage");
221 }
222 Self::run_delay_settings_from_config(&self.delay_config).await;
223 let mut inner = self.inner.write().await;
224 inner
225 .proposals
226 .insert(proposal.data.view_number, proposal.clone());
227 Ok(())
228 }
229
230 async fn append_proposal2(
231 &self,
232 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
233 ) -> Result<()> {
234 if self.should_return_err.load(Ordering::Relaxed) {
235 bail!("Failed to append Quorum proposal (2) to storage");
236 }
237 Self::run_delay_settings_from_config(&self.delay_config).await;
238 let mut inner = self.inner.write().await;
239 inner
240 .proposals2
241 .insert(proposal.data.view_number, proposal.clone());
242 Ok(())
243 }
244
245 async fn append_proposal_wrapper(
246 &self,
247 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
248 ) -> Result<()> {
249 if self.should_return_err.load(Ordering::Relaxed) {
250 bail!("Failed to append Quorum proposal (wrapped) to storage");
251 }
252 Self::run_delay_settings_from_config(&self.delay_config).await;
253 let mut inner = self.inner.write().await;
254 inner
255 .proposals_wrapper
256 .insert(proposal.data.view_number(), proposal.clone());
257 Ok(())
258 }
259
260 async fn record_action(
261 &self,
262 view: <TYPES as NodeType>::View,
263 epoch: Option<TYPES::Epoch>,
264 action: hotshot_types::event::HotShotAction,
265 ) -> Result<()> {
266 if self.should_return_err.load(Ordering::Relaxed) {
267 bail!("Failed to append Action to storage");
268 }
269 let mut inner = self.inner.write().await;
270 if matches!(
271 action,
272 HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
273 ) {
274 if view > inner.action {
275 inner.action = view;
276 }
277 if epoch > inner.epoch {
278 inner.epoch = epoch;
279 }
280 }
281 if matches!(action, HotShotAction::Vote) {
282 inner.restart_view = view + 1;
283 }
284 Self::run_delay_settings_from_config(&self.delay_config).await;
285 Ok(())
286 }
287
288 async fn update_high_qc(
289 &self,
290 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
291 ) -> Result<()> {
292 if self.should_return_err.load(Ordering::Relaxed) {
293 bail!("Failed to update high qc to storage");
294 }
295 Self::run_delay_settings_from_config(&self.delay_config).await;
296 let mut inner = self.inner.write().await;
297 if let Some(ref current_high_qc) = inner.high_qc {
298 if new_high_qc.view_number() > current_high_qc.view_number() {
299 inner.high_qc = Some(new_high_qc);
300 }
301 } else {
302 inner.high_qc = Some(new_high_qc);
303 }
304 Ok(())
305 }
306
307 async fn update_eqc(
309 &self,
310 high_qc: QuorumCertificate2<TYPES>,
311 next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
312 ) -> Result<()> {
313 if self.should_return_err.load(Ordering::Relaxed) {
314 bail!("Failed to update eqc in storage");
315 }
316 Self::run_delay_settings_from_config(&self.delay_config).await;
317 let mut inner = self.inner.write().await;
318 if let Some((ref current_high_qc, _)) = inner.eqc {
319 if high_qc.view_number() > current_high_qc.view_number() {
320 inner.eqc = Some((high_qc, next_epoch_high_qc));
321 }
322 } else {
323 inner.eqc = Some((high_qc, next_epoch_high_qc));
324 }
325 Ok(())
326 }
327
328 async fn update_high_qc2(
329 &self,
330 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
331 ) -> Result<()> {
332 if self.should_return_err.load(Ordering::Relaxed) {
333 bail!("Failed to update high qc to storage");
334 }
335 Self::run_delay_settings_from_config(&self.delay_config).await;
336 let mut inner = self.inner.write().await;
337 if let Some(ref current_high_qc) = inner.high_qc2 {
338 if new_high_qc.view_number() > current_high_qc.view_number() {
339 inner.high_qc2 = Some(new_high_qc);
340 }
341 } else {
342 inner.high_qc2 = Some(new_high_qc);
343 }
344 Ok(())
345 }
346
347 async fn update_state_cert(
348 &self,
349 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
350 ) -> Result<()> {
351 if self.should_return_err.load(Ordering::Relaxed) {
352 bail!("Failed to update state_cert to storage");
353 }
354 Self::run_delay_settings_from_config(&self.delay_config).await;
355 self.inner
356 .write()
357 .await
358 .state_certs
359 .insert(state_cert.epoch, state_cert);
360 Ok(())
361 }
362
363 async fn update_next_epoch_high_qc2(
364 &self,
365 new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
366 TYPES,
367 >,
368 ) -> Result<()> {
369 if self.should_return_err.load(Ordering::Relaxed) {
370 bail!("Failed to update next epoch high qc to storage");
371 }
372 Self::run_delay_settings_from_config(&self.delay_config).await;
373 let mut inner = self.inner.write().await;
374 if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
375 if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
376 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
377 }
378 } else {
379 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
380 }
381 Ok(())
382 }
383
384 async fn update_decided_upgrade_certificate(
385 &self,
386 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
387 ) -> Result<()> {
388 *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
389
390 Ok(())
391 }
392
393 async fn migrate_storage(&self) -> Result<()> {
394 let mut storage_writer = self.inner.write().await;
395
396 for (view, proposal) in storage_writer.proposals.clone().iter() {
397 storage_writer
398 .proposals2
399 .insert(*view, convert_proposal(proposal.clone()));
400 }
401
402 Ok(())
403 }
404
405 async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
406 let mut inner = self.inner.write().await;
407
408 inner.drb_results.insert(epoch, drb_result);
409
410 Ok(())
411 }
412
413 async fn store_epoch_root(
414 &self,
415 epoch: TYPES::Epoch,
416 block_header: TYPES::BlockHeader,
417 ) -> Result<()> {
418 let mut inner = self.inner.write().await;
419
420 inner.epoch_roots.insert(epoch, block_header);
421
422 Ok(())
423 }
424
425 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
426 let mut inner = self.inner.write().await;
427
428 inner.drb_inputs.insert(drb_input.epoch, drb_input);
429
430 Ok(())
431 }
432
433 async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
434 let inner = self.inner.read().await;
435
436 match inner.drb_inputs.get(&epoch) {
437 Some(drb_input) => Ok(drb_input.clone()),
438 None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
439 }
440 }
441}