1use std::{
8 collections::{BTreeMap, HashMap},
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13};
14
15use anyhow::{anyhow, bail, Result};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19 data::{
20 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
21 DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
22 VidCommitment,
23 },
24 drb::{DrbInput, DrbResult},
25 event::HotShotAction,
26 message::{convert_proposal, Proposal},
27 simple_certificate::{
28 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
29 UpgradeCertificate,
30 },
31 traits::{
32 node_implementation::{ConsensusTime, NodeType},
33 storage::Storage,
34 },
35 vote::HasViewNumber,
36};
37
38use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
39
40type VidShares<TYPES> = BTreeMap<
41 <TYPES as NodeType>::View,
42 HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, ADVZDisperseShare<TYPES>>>,
43>;
44type VidShares2<TYPES> = BTreeMap<
45 <TYPES as NodeType>::View,
46 HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare2<TYPES>>>,
47>;
48
49#[derive(Clone, Debug)]
50pub struct TestStorageState<TYPES: NodeType> {
51 vids: VidShares<TYPES>,
52 vid2: VidShares2<TYPES>,
53 das: HashMap<TYPES::View, Proposal<TYPES, DaProposal<TYPES>>>,
54 da2s: HashMap<TYPES::View, Proposal<TYPES, DaProposal2<TYPES>>>,
55 pub proposals: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal<TYPES>>>,
56 pub proposals2: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposal2<TYPES>>>,
57 pub proposals_wrapper: BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
58 high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
59 high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
60 eqc: Option<(
61 hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
62 hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
63 )>,
64 next_epoch_high_qc2:
65 Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
66 action: TYPES::View,
67 epoch: Option<TYPES::Epoch>,
68 state_certs: BTreeMap<TYPES::Epoch, LightClientStateUpdateCertificateV2<TYPES>>,
69 drb_results: BTreeMap<TYPES::Epoch, DrbResult>,
70 drb_inputs: BTreeMap<u64, DrbInput>,
71 epoch_roots: BTreeMap<TYPES::Epoch, TYPES::BlockHeader>,
72 restart_view: TYPES::View,
73}
74
75impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
76 fn default() -> Self {
77 Self {
78 vids: BTreeMap::new(),
79 vid2: BTreeMap::new(),
80 das: HashMap::new(),
81 da2s: HashMap::new(),
82 proposals: BTreeMap::new(),
83 proposals2: BTreeMap::new(),
84 proposals_wrapper: BTreeMap::new(),
85 high_qc: None,
86 high_qc2: None,
87 eqc: None,
88 next_epoch_high_qc2: None,
89 action: TYPES::View::genesis(),
90 epoch: None,
91 state_certs: BTreeMap::new(),
92 drb_results: BTreeMap::new(),
93 drb_inputs: BTreeMap::new(),
94 epoch_roots: BTreeMap::new(),
95 restart_view: TYPES::View::genesis(),
96 }
97 }
98}
99
100#[derive(Clone, Debug)]
101pub struct TestStorage<TYPES: NodeType> {
102 pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
103 pub should_return_err: Arc<AtomicBool>,
105 pub delay_config: DelayConfig,
106 pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
107}
108
109impl<TYPES: NodeType> Default for TestStorage<TYPES> {
110 fn default() -> Self {
111 Self {
112 inner: Arc::new(RwLock::new(TestStorageState::default())),
113 should_return_err: Arc::new(AtomicBool::new(false)),
114 delay_config: DelayConfig::default(),
115 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
116 }
117 }
118}
119
120#[async_trait]
121impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
122 async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
123 if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
124 {
125 Self::handle_async_delay(settings).await;
126 }
127 }
128}
129
130impl<TYPES: NodeType> TestStorage<TYPES> {
131 pub async fn proposals_cloned(
132 &self,
133 ) -> BTreeMap<TYPES::View, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
134 self.inner.read().await.proposals_wrapper.clone()
135 }
136
137 pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
138 self.inner.read().await.high_qc2.clone()
139 }
140
141 pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
142 self.inner.read().await.next_epoch_high_qc2.clone()
143 }
144
145 pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
146 self.decided_upgrade_certificate.read().await.clone()
147 }
148
149 pub async fn last_actioned_view(&self) -> TYPES::View {
150 self.inner.read().await.action
151 }
152
153 pub async fn restart_view(&self) -> TYPES::View {
154 self.inner.read().await.restart_view
155 }
156
157 pub async fn last_actioned_epoch(&self) -> Option<TYPES::Epoch> {
158 self.inner.read().await.epoch
159 }
160 pub async fn vids_cloned(&self) -> VidShares2<TYPES> {
161 self.inner.read().await.vid2.clone()
162 }
163
164 pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
165 self.inner
166 .read()
167 .await
168 .state_certs
169 .iter()
170 .next_back()
171 .map(|(_, cert)| cert.clone())
172 }
173}
174
175#[async_trait]
176impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
177 async fn append_vid(&self, proposal: &Proposal<TYPES, ADVZDisperseShare<TYPES>>) -> Result<()> {
178 if self.should_return_err.load(Ordering::Relaxed) {
179 bail!("Failed to append VID proposal to storage");
180 }
181 Self::run_delay_settings_from_config(&self.delay_config).await;
182 let mut inner = self.inner.write().await;
183 inner
184 .vids
185 .entry(proposal.data.view_number)
186 .or_default()
187 .insert(proposal.data.recipient_key.clone(), proposal.clone());
188 Ok(())
189 }
190
191 async fn append_vid2(
192 &self,
193 proposal: &Proposal<TYPES, VidDisperseShare2<TYPES>>,
194 ) -> Result<()> {
195 if self.should_return_err.load(Ordering::Relaxed) {
196 bail!("Failed to append VID proposal to storage");
197 }
198 Self::run_delay_settings_from_config(&self.delay_config).await;
199 let mut inner = self.inner.write().await;
200 inner
201 .vid2
202 .entry(proposal.data.view_number)
203 .or_default()
204 .insert(proposal.data.recipient_key.clone(), proposal.clone());
205 Ok(())
206 }
207
208 async fn append_da(
209 &self,
210 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
211 _vid_commit: VidCommitment,
212 ) -> Result<()> {
213 if self.should_return_err.load(Ordering::Relaxed) {
214 bail!("Failed to append DA proposal to storage");
215 }
216 Self::run_delay_settings_from_config(&self.delay_config).await;
217 let mut inner = self.inner.write().await;
218 inner
219 .das
220 .insert(proposal.data.view_number, proposal.clone());
221 Ok(())
222 }
223
224 async fn append_da2(
225 &self,
226 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
227 _vid_commit: VidCommitment,
228 ) -> Result<()> {
229 if self.should_return_err.load(Ordering::Relaxed) {
230 bail!("Failed to append DA proposal (2) to storage");
231 }
232 Self::run_delay_settings_from_config(&self.delay_config).await;
233 let mut inner = self.inner.write().await;
234 inner
235 .da2s
236 .insert(proposal.data.view_number, proposal.clone());
237 Ok(())
238 }
239
240 async fn append_proposal(
241 &self,
242 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
243 ) -> Result<()> {
244 if self.should_return_err.load(Ordering::Relaxed) {
245 bail!("Failed to append Quorum proposal (1) to storage");
246 }
247 Self::run_delay_settings_from_config(&self.delay_config).await;
248 let mut inner = self.inner.write().await;
249 inner
250 .proposals
251 .insert(proposal.data.view_number, proposal.clone());
252 Ok(())
253 }
254
255 async fn append_proposal2(
256 &self,
257 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
258 ) -> Result<()> {
259 if self.should_return_err.load(Ordering::Relaxed) {
260 bail!("Failed to append Quorum proposal (2) to storage");
261 }
262 Self::run_delay_settings_from_config(&self.delay_config).await;
263 let mut inner = self.inner.write().await;
264 inner
265 .proposals2
266 .insert(proposal.data.view_number, proposal.clone());
267 Ok(())
268 }
269
270 async fn append_proposal_wrapper(
271 &self,
272 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
273 ) -> Result<()> {
274 if self.should_return_err.load(Ordering::Relaxed) {
275 bail!("Failed to append Quorum proposal (wrapped) to storage");
276 }
277 Self::run_delay_settings_from_config(&self.delay_config).await;
278 let mut inner = self.inner.write().await;
279 inner
280 .proposals_wrapper
281 .insert(proposal.data.view_number(), proposal.clone());
282 Ok(())
283 }
284
285 async fn record_action(
286 &self,
287 view: <TYPES as NodeType>::View,
288 epoch: Option<TYPES::Epoch>,
289 action: hotshot_types::event::HotShotAction,
290 ) -> Result<()> {
291 if self.should_return_err.load(Ordering::Relaxed) {
292 bail!("Failed to append Action to storage");
293 }
294 let mut inner = self.inner.write().await;
295 if matches!(
296 action,
297 HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
298 ) {
299 if view > inner.action {
300 inner.action = view;
301 }
302 if epoch > inner.epoch {
303 inner.epoch = epoch;
304 }
305 }
306 if matches!(action, HotShotAction::Vote) {
307 inner.restart_view = view + 1;
308 }
309 Self::run_delay_settings_from_config(&self.delay_config).await;
310 Ok(())
311 }
312
313 async fn update_high_qc(
314 &self,
315 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
316 ) -> Result<()> {
317 if self.should_return_err.load(Ordering::Relaxed) {
318 bail!("Failed to update high qc to storage");
319 }
320 Self::run_delay_settings_from_config(&self.delay_config).await;
321 let mut inner = self.inner.write().await;
322 if let Some(ref current_high_qc) = inner.high_qc {
323 if new_high_qc.view_number() > current_high_qc.view_number() {
324 inner.high_qc = Some(new_high_qc);
325 }
326 } else {
327 inner.high_qc = Some(new_high_qc);
328 }
329 Ok(())
330 }
331
332 async fn update_eqc(
334 &self,
335 high_qc: QuorumCertificate2<TYPES>,
336 next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
337 ) -> Result<()> {
338 if self.should_return_err.load(Ordering::Relaxed) {
339 bail!("Failed to update eqc in storage");
340 }
341 Self::run_delay_settings_from_config(&self.delay_config).await;
342 let mut inner = self.inner.write().await;
343 if let Some((ref current_high_qc, _)) = inner.eqc {
344 if high_qc.view_number() > current_high_qc.view_number() {
345 inner.eqc = Some((high_qc, next_epoch_high_qc));
346 }
347 } else {
348 inner.eqc = Some((high_qc, next_epoch_high_qc));
349 }
350 Ok(())
351 }
352
353 async fn update_high_qc2(
354 &self,
355 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
356 ) -> Result<()> {
357 if self.should_return_err.load(Ordering::Relaxed) {
358 bail!("Failed to update high qc to storage");
359 }
360 Self::run_delay_settings_from_config(&self.delay_config).await;
361 let mut inner = self.inner.write().await;
362 if let Some(ref current_high_qc) = inner.high_qc2 {
363 if new_high_qc.view_number() > current_high_qc.view_number() {
364 inner.high_qc2 = Some(new_high_qc);
365 }
366 } else {
367 inner.high_qc2 = Some(new_high_qc);
368 }
369 Ok(())
370 }
371
372 async fn update_state_cert(
373 &self,
374 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
375 ) -> Result<()> {
376 if self.should_return_err.load(Ordering::Relaxed) {
377 bail!("Failed to update state_cert to storage");
378 }
379 Self::run_delay_settings_from_config(&self.delay_config).await;
380 self.inner
381 .write()
382 .await
383 .state_certs
384 .insert(state_cert.epoch, state_cert);
385 Ok(())
386 }
387
388 async fn update_next_epoch_high_qc2(
389 &self,
390 new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
391 TYPES,
392 >,
393 ) -> Result<()> {
394 if self.should_return_err.load(Ordering::Relaxed) {
395 bail!("Failed to update next epoch high qc to storage");
396 }
397 Self::run_delay_settings_from_config(&self.delay_config).await;
398 let mut inner = self.inner.write().await;
399 if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
400 if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
401 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
402 }
403 } else {
404 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
405 }
406 Ok(())
407 }
408
409 async fn update_decided_upgrade_certificate(
410 &self,
411 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
412 ) -> Result<()> {
413 *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
414
415 Ok(())
416 }
417
418 async fn migrate_storage(&self) -> Result<()> {
419 let mut storage_writer = self.inner.write().await;
420
421 for (view, proposal) in storage_writer.proposals.clone().iter() {
422 storage_writer
423 .proposals2
424 .insert(*view, convert_proposal(proposal.clone()));
425 }
426
427 Ok(())
428 }
429
430 async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()> {
431 let mut inner = self.inner.write().await;
432
433 inner.drb_results.insert(epoch, drb_result);
434
435 Ok(())
436 }
437
438 async fn store_epoch_root(
439 &self,
440 epoch: TYPES::Epoch,
441 block_header: TYPES::BlockHeader,
442 ) -> Result<()> {
443 let mut inner = self.inner.write().await;
444
445 inner.epoch_roots.insert(epoch, block_header);
446
447 Ok(())
448 }
449
450 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
451 let mut inner = self.inner.write().await;
452
453 inner.drb_inputs.insert(drb_input.epoch, drb_input);
454
455 Ok(())
456 }
457
458 async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
459 let inner = self.inner.read().await;
460
461 match inner.drb_inputs.get(&epoch) {
462 Some(drb_input) => Ok(drb_input.clone()),
463 None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
464 }
465 }
466}