1use std::{
8 collections::{BTreeMap, HashMap},
9 sync::{
10 Arc,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use anyhow::{Result, anyhow, bail};
16use async_lock::RwLock;
17use async_trait::async_trait;
18use hotshot_types::{
19 data::{
20 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
21 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
22 },
23 drb::{DrbInput, DrbResult},
24 event::HotShotAction,
25 message::{Proposal, convert_proposal},
26 simple_certificate::{
27 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
28 UpgradeCertificate,
29 },
30 traits::{node_implementation::NodeType, storage::Storage},
31 vote::HasViewNumber,
32};
33
34use crate::testable_delay::{DelayConfig, SupportedTraitTypesForAsyncDelay, TestableDelay};
35
36type VidShares<TYPES> = BTreeMap<
37 ViewNumber,
38 HashMap<<TYPES as NodeType>::SignatureKey, Proposal<TYPES, VidDisperseShare<TYPES>>>,
39>;
40#[derive(Clone, Debug)]
41pub struct TestStorageState<TYPES: NodeType> {
42 vids: VidShares<TYPES>,
43 das: HashMap<ViewNumber, Proposal<TYPES, DaProposal<TYPES>>>,
44 da2s: HashMap<ViewNumber, Proposal<TYPES, DaProposal2<TYPES>>>,
45 pub proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposal<TYPES>>>,
46 pub proposals2: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposal2<TYPES>>>,
47 pub proposals_wrapper: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
48 high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
49 high_qc2: Option<hotshot_types::simple_certificate::QuorumCertificate2<TYPES>>,
50 eqc: Option<(
51 hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
52 hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>,
53 )>,
54 next_epoch_high_qc2:
55 Option<hotshot_types::simple_certificate::NextEpochQuorumCertificate2<TYPES>>,
56 action: ViewNumber,
57 epoch: Option<EpochNumber>,
58 state_certs: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<TYPES>>,
59 drb_results: BTreeMap<EpochNumber, DrbResult>,
60 drb_inputs: BTreeMap<u64, DrbInput>,
61 epoch_roots: BTreeMap<EpochNumber, TYPES::BlockHeader>,
62 restart_view: ViewNumber,
63}
64
65impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
66 fn default() -> Self {
67 Self {
68 vids: BTreeMap::new(),
69 das: HashMap::new(),
70 da2s: HashMap::new(),
71 proposals: BTreeMap::new(),
72 proposals2: BTreeMap::new(),
73 proposals_wrapper: BTreeMap::new(),
74 high_qc: None,
75 high_qc2: None,
76 eqc: None,
77 next_epoch_high_qc2: None,
78 action: ViewNumber::genesis(),
79 epoch: None,
80 state_certs: BTreeMap::new(),
81 drb_results: BTreeMap::new(),
82 drb_inputs: BTreeMap::new(),
83 epoch_roots: BTreeMap::new(),
84 restart_view: ViewNumber::genesis(),
85 }
86 }
87}
88
89#[derive(Clone, Debug)]
90pub struct TestStorage<TYPES: NodeType> {
91 pub inner: Arc<RwLock<TestStorageState<TYPES>>>,
92 pub should_return_err: Arc<AtomicBool>,
94 pub delay_config: DelayConfig,
95 pub decided_upgrade_certificate: Arc<RwLock<Option<UpgradeCertificate<TYPES>>>>,
96}
97
98impl<TYPES: NodeType> Default for TestStorage<TYPES> {
99 fn default() -> Self {
100 Self {
101 inner: Arc::new(RwLock::new(TestStorageState::default())),
102 should_return_err: Arc::new(AtomicBool::new(false)),
103 delay_config: DelayConfig::default(),
104 decided_upgrade_certificate: Arc::new(RwLock::new(None)),
105 }
106 }
107}
108
109#[async_trait]
110impl<TYPES: NodeType> TestableDelay for TestStorage<TYPES> {
111 async fn run_delay_settings_from_config(delay_config: &DelayConfig) {
112 if let Some(settings) = delay_config.get_setting(&SupportedTraitTypesForAsyncDelay::Storage)
113 {
114 Self::handle_async_delay(settings).await;
115 }
116 }
117}
118
119impl<TYPES: NodeType> TestStorage<TYPES> {
120 pub async fn proposals_cloned(
121 &self,
122 ) -> BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
123 self.inner.read().await.proposals_wrapper.clone()
124 }
125
126 pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate2<TYPES>> {
127 self.inner.read().await.high_qc2.clone()
128 }
129
130 pub async fn next_epoch_high_qc_cloned(&self) -> Option<NextEpochQuorumCertificate2<TYPES>> {
131 self.inner.read().await.next_epoch_high_qc2.clone()
132 }
133
134 pub async fn decided_upgrade_certificate(&self) -> Option<UpgradeCertificate<TYPES>> {
135 self.decided_upgrade_certificate.read().await.clone()
136 }
137
138 pub async fn last_actioned_view(&self) -> ViewNumber {
139 self.inner.read().await.action
140 }
141
142 pub async fn restart_view(&self) -> ViewNumber {
143 self.inner.read().await.restart_view
144 }
145
146 pub async fn last_actioned_epoch(&self) -> Option<EpochNumber> {
147 self.inner.read().await.epoch
148 }
149 pub async fn vids_cloned(&self) -> VidShares<TYPES> {
150 self.inner.read().await.vids.clone()
151 }
152
153 pub async fn state_cert_cloned(&self) -> Option<LightClientStateUpdateCertificateV2<TYPES>> {
154 self.inner
155 .read()
156 .await
157 .state_certs
158 .iter()
159 .next_back()
160 .map(|(_, cert)| cert.clone())
161 }
162}
163
164#[async_trait]
165impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {
166 async fn append_vid(&self, proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>) -> Result<()> {
167 if self.should_return_err.load(Ordering::Relaxed) {
168 bail!("Failed to append VID proposal to storage");
169 }
170 Self::run_delay_settings_from_config(&self.delay_config).await;
171 let mut inner = self.inner.write().await;
172 inner
173 .vids
174 .entry(proposal.data.view_number())
175 .or_default()
176 .insert(proposal.data.recipient_key().clone(), proposal.clone());
177 Ok(())
178 }
179
180 async fn append_da(
181 &self,
182 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
183 _vid_commit: VidCommitment,
184 ) -> Result<()> {
185 if self.should_return_err.load(Ordering::Relaxed) {
186 bail!("Failed to append DA proposal to storage");
187 }
188 Self::run_delay_settings_from_config(&self.delay_config).await;
189 let mut inner = self.inner.write().await;
190 inner
191 .das
192 .insert(proposal.data.view_number, proposal.clone());
193 Ok(())
194 }
195
196 async fn append_da2(
197 &self,
198 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
199 _vid_commit: VidCommitment,
200 ) -> Result<()> {
201 if self.should_return_err.load(Ordering::Relaxed) {
202 bail!("Failed to append DA proposal (2) to storage");
203 }
204 Self::run_delay_settings_from_config(&self.delay_config).await;
205 let mut inner = self.inner.write().await;
206 inner
207 .da2s
208 .insert(proposal.data.view_number, proposal.clone());
209 Ok(())
210 }
211
212 async fn append_proposal(
213 &self,
214 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
215 ) -> Result<()> {
216 if self.should_return_err.load(Ordering::Relaxed) {
217 bail!("Failed to append Quorum proposal (1) to storage");
218 }
219 Self::run_delay_settings_from_config(&self.delay_config).await;
220 let mut inner = self.inner.write().await;
221 inner
222 .proposals
223 .insert(proposal.data.view_number, proposal.clone());
224 Ok(())
225 }
226
227 async fn append_proposal2(
228 &self,
229 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
230 ) -> Result<()> {
231 if self.should_return_err.load(Ordering::Relaxed) {
232 bail!("Failed to append Quorum proposal (2) to storage");
233 }
234 Self::run_delay_settings_from_config(&self.delay_config).await;
235 let mut inner = self.inner.write().await;
236 inner
237 .proposals2
238 .insert(proposal.data.view_number, proposal.clone());
239 Ok(())
240 }
241
242 async fn append_proposal_wrapper(
243 &self,
244 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
245 ) -> Result<()> {
246 if self.should_return_err.load(Ordering::Relaxed) {
247 bail!("Failed to append Quorum proposal (wrapped) to storage");
248 }
249 Self::run_delay_settings_from_config(&self.delay_config).await;
250 let mut inner = self.inner.write().await;
251 inner
252 .proposals_wrapper
253 .insert(proposal.data.view_number(), proposal.clone());
254 Ok(())
255 }
256
257 async fn record_action(
258 &self,
259 view: ViewNumber,
260 epoch: Option<EpochNumber>,
261 action: hotshot_types::event::HotShotAction,
262 ) -> Result<()> {
263 if self.should_return_err.load(Ordering::Relaxed) {
264 bail!("Failed to append Action to storage");
265 }
266 let mut inner = self.inner.write().await;
267 if matches!(
268 action,
269 HotShotAction::Vote | HotShotAction::Propose | HotShotAction::TimeoutVote
270 ) {
271 if view > inner.action {
272 inner.action = view;
273 }
274 if epoch > inner.epoch {
275 inner.epoch = epoch;
276 }
277 }
278 if matches!(action, HotShotAction::Vote) {
279 inner.restart_view = view + 1;
280 }
281 Self::run_delay_settings_from_config(&self.delay_config).await;
282 Ok(())
283 }
284
285 async fn update_high_qc(
286 &self,
287 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate<TYPES>,
288 ) -> Result<()> {
289 if self.should_return_err.load(Ordering::Relaxed) {
290 bail!("Failed to update high qc to storage");
291 }
292 Self::run_delay_settings_from_config(&self.delay_config).await;
293 let mut inner = self.inner.write().await;
294 if let Some(ref current_high_qc) = inner.high_qc {
295 if new_high_qc.view_number() > current_high_qc.view_number() {
296 inner.high_qc = Some(new_high_qc);
297 }
298 } else {
299 inner.high_qc = Some(new_high_qc);
300 }
301 Ok(())
302 }
303
304 async fn update_eqc(
306 &self,
307 high_qc: QuorumCertificate2<TYPES>,
308 next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
309 ) -> Result<()> {
310 if self.should_return_err.load(Ordering::Relaxed) {
311 bail!("Failed to update eqc in storage");
312 }
313 Self::run_delay_settings_from_config(&self.delay_config).await;
314 let mut inner = self.inner.write().await;
315 if let Some((ref current_high_qc, _)) = inner.eqc {
316 if high_qc.view_number() > current_high_qc.view_number() {
317 inner.eqc = Some((high_qc, next_epoch_high_qc));
318 }
319 } else {
320 inner.eqc = Some((high_qc, next_epoch_high_qc));
321 }
322 Ok(())
323 }
324
325 async fn update_high_qc2(
326 &self,
327 new_high_qc: hotshot_types::simple_certificate::QuorumCertificate2<TYPES>,
328 ) -> Result<()> {
329 if self.should_return_err.load(Ordering::Relaxed) {
330 bail!("Failed to update high qc to storage");
331 }
332 Self::run_delay_settings_from_config(&self.delay_config).await;
333 let mut inner = self.inner.write().await;
334 if let Some(ref current_high_qc) = inner.high_qc2 {
335 if new_high_qc.view_number() > current_high_qc.view_number() {
336 inner.high_qc2 = Some(new_high_qc);
337 }
338 } else {
339 inner.high_qc2 = Some(new_high_qc);
340 }
341 Ok(())
342 }
343
344 async fn update_state_cert(
345 &self,
346 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
347 ) -> Result<()> {
348 if self.should_return_err.load(Ordering::Relaxed) {
349 bail!("Failed to update state_cert to storage");
350 }
351 Self::run_delay_settings_from_config(&self.delay_config).await;
352 self.inner
353 .write()
354 .await
355 .state_certs
356 .insert(state_cert.epoch, state_cert);
357 Ok(())
358 }
359
360 async fn update_next_epoch_high_qc2(
361 &self,
362 new_next_epoch_high_qc: hotshot_types::simple_certificate::NextEpochQuorumCertificate2<
363 TYPES,
364 >,
365 ) -> Result<()> {
366 if self.should_return_err.load(Ordering::Relaxed) {
367 bail!("Failed to update next epoch high qc to storage");
368 }
369 Self::run_delay_settings_from_config(&self.delay_config).await;
370 let mut inner = self.inner.write().await;
371 if let Some(ref current_next_epoch_high_qc) = inner.next_epoch_high_qc2 {
372 if new_next_epoch_high_qc.view_number() > current_next_epoch_high_qc.view_number() {
373 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
374 }
375 } else {
376 inner.next_epoch_high_qc2 = Some(new_next_epoch_high_qc);
377 }
378 Ok(())
379 }
380
381 async fn update_decided_upgrade_certificate(
382 &self,
383 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
384 ) -> Result<()> {
385 *self.decided_upgrade_certificate.write().await = decided_upgrade_certificate;
386
387 Ok(())
388 }
389
390 async fn migrate_storage(&self) -> Result<()> {
391 let mut storage_writer = self.inner.write().await;
392
393 for (view, proposal) in storage_writer.proposals.clone().iter() {
394 storage_writer
395 .proposals2
396 .insert(*view, convert_proposal(proposal.clone()));
397 }
398
399 Ok(())
400 }
401
402 async fn store_drb_result(&self, epoch: EpochNumber, drb_result: DrbResult) -> Result<()> {
403 let mut inner = self.inner.write().await;
404
405 inner.drb_results.insert(epoch, drb_result);
406
407 Ok(())
408 }
409
410 async fn store_epoch_root(
411 &self,
412 epoch: EpochNumber,
413 block_header: TYPES::BlockHeader,
414 ) -> Result<()> {
415 let mut inner = self.inner.write().await;
416
417 inner.epoch_roots.insert(epoch, block_header);
418
419 Ok(())
420 }
421
422 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()> {
423 let mut inner = self.inner.write().await;
424
425 inner.drb_inputs.insert(drb_input.epoch, drb_input);
426
427 Ok(())
428 }
429
430 async fn load_drb_input(&self, epoch: u64) -> Result<DrbInput> {
431 let inner = self.inner.read().await;
432
433 match inner.drb_inputs.get(&epoch) {
434 Some(drb_input) => Ok(drb_input.clone()),
435 None => Err(anyhow!("Missing DrbInput for epoch {}", epoch)),
436 }
437 }
438}