1use std::sync::Arc;
13
14use anyhow::{anyhow, ensure, Result};
15use async_trait::async_trait;
16use futures::future::BoxFuture;
17
18use super::node_implementation::NodeType;
19use crate::{
20 data::{
21 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
22 DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
23 VidCommitment, VidDisperseShare,
24 },
25 drb::{DrbInput, DrbResult},
26 event::HotShotAction,
27 message::{convert_proposal, Proposal},
28 simple_certificate::{
29 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
30 QuorumCertificate2, UpgradeCertificate,
31 },
32};
33
34#[async_trait]
36pub trait Storage<TYPES: NodeType>: Send + Sync + Clone + 'static {
37 async fn append_vid(&self, proposal: &Proposal<TYPES, ADVZDisperseShare<TYPES>>) -> Result<()>;
39 async fn append_vid2(&self, proposal: &Proposal<TYPES, VidDisperseShare2<TYPES>>)
41 -> Result<()>;
42
43 async fn append_vid_general(
44 &self,
45 proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>,
46 ) -> Result<()> {
47 let signature = proposal.signature.clone();
48 match &proposal.data {
49 VidDisperseShare::V0(share) => {
50 self.append_vid(&Proposal {
51 data: share.clone(),
52 signature,
53 _pd: std::marker::PhantomData,
54 })
55 .await
56 },
57 VidDisperseShare::V1(share) => {
58 self.append_vid2(&Proposal {
59 data: share.clone(),
60 signature,
61 _pd: std::marker::PhantomData,
62 })
63 .await
64 },
65 }
66 }
67 async fn append_da(
69 &self,
70 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
71 vid_commit: VidCommitment,
72 ) -> Result<()>;
73 async fn append_da2(
75 &self,
76 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
77 vid_commit: VidCommitment,
78 ) -> Result<()> {
79 self.append_da(&convert_proposal(proposal.clone()), vid_commit)
80 .await
81 }
82 async fn append_proposal(
84 &self,
85 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
86 ) -> Result<()>;
87 async fn append_proposal2(
89 &self,
90 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
91 ) -> Result<()>;
92 async fn append_proposal_wrapper(
94 &self,
95 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
96 ) -> Result<()> {
97 self.append_proposal2(&convert_proposal(proposal.clone()))
98 .await
99 }
100 async fn record_action(
102 &self,
103 view: TYPES::View,
104 epoch: Option<TYPES::Epoch>,
105 action: HotShotAction,
106 ) -> Result<()>;
107 async fn update_high_qc(&self, high_qc: QuorumCertificate<TYPES>) -> Result<()>;
109 async fn update_high_qc2(&self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
111 self.update_high_qc(high_qc.to_qc()).await
112 }
113 async fn update_state_cert(
115 &self,
116 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
117 ) -> Result<()>;
118
119 async fn update_high_qc2_and_state_cert(
120 &self,
121 high_qc: QuorumCertificate2<TYPES>,
122 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
123 ) -> Result<()> {
124 self.update_high_qc2(high_qc).await?;
125 self.update_state_cert(state_cert).await
126 }
127 async fn update_next_epoch_high_qc2(
129 &self,
130 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
131 ) -> Result<()>;
132
133 async fn update_eqc(
135 &self,
136 _high_qc: QuorumCertificate2<TYPES>,
137 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
138 ) -> Result<()>;
139
140 async fn update_decided_upgrade_certificate(
142 &self,
143 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
144 ) -> Result<()>;
145 async fn migrate_storage(&self) -> Result<()> {
147 Ok(())
148 }
149 async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()>;
151 async fn store_epoch_root(
153 &self,
154 epoch: TYPES::Epoch,
155 block_header: TYPES::BlockHeader,
156 ) -> Result<()>;
157 async fn load_drb_result(&self, epoch: TYPES::Epoch) -> Result<DrbResult> {
158 match self.load_drb_input(*epoch).await {
159 Ok(drb_input) => {
160 ensure!(drb_input.iteration == drb_input.difficulty_level);
161
162 Ok(drb_input.value)
163 },
164 Err(e) => Err(e),
165 }
166 }
167 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()>;
168 async fn load_drb_input(&self, _epoch: u64) -> Result<DrbInput>;
169}
170
171pub async fn load_drb_input_impl<TYPES: NodeType>(
172 storage: impl Storage<TYPES>,
173 epoch: u64,
174) -> Result<DrbInput> {
175 storage.load_drb_input(epoch).await
176}
177
178pub type LoadDrbProgressFn =
179 std::sync::Arc<dyn Fn(u64) -> BoxFuture<'static, Result<DrbInput>> + Send + Sync>;
180
181pub fn load_drb_progress_fn<TYPES: NodeType>(
182 storage: impl Storage<TYPES> + 'static,
183) -> LoadDrbProgressFn {
184 Arc::new(move |epoch| {
185 let storage = storage.clone();
186 Box::pin(load_drb_input_impl(storage, epoch))
187 })
188}
189
190pub fn null_load_drb_progress_fn() -> LoadDrbProgressFn {
191 Arc::new(move |_drb_input| {
192 Box::pin(async { Err(anyhow!("Using null implementation of load_drb_input")) })
193 })
194}
195
196pub async fn store_drb_input_impl<TYPES: NodeType>(
197 storage: impl Storage<TYPES>,
198 drb_input: DrbInput,
199) -> Result<()> {
200 storage.store_drb_input(drb_input).await
201}
202
203pub type StoreDrbProgressFn =
204 std::sync::Arc<dyn Fn(DrbInput) -> BoxFuture<'static, Result<()>> + Send + Sync>;
205
206pub fn store_drb_progress_fn<TYPES: NodeType>(
207 storage: impl Storage<TYPES> + 'static,
208) -> StoreDrbProgressFn {
209 Arc::new(move |drb_input| {
210 let storage = storage.clone();
211 Box::pin(store_drb_input_impl(storage, drb_input))
212 })
213}
214
215pub fn null_store_drb_progress_fn() -> StoreDrbProgressFn {
216 Arc::new(move |_drb_input| Box::pin(async { Ok(()) }))
217}
218
219pub type StoreDrbResultFn<TYPES> = Arc<
220 Box<
221 dyn Fn(<TYPES as NodeType>::Epoch, DrbResult) -> BoxFuture<'static, Result<()>>
222 + Send
223 + Sync
224 + 'static,
225 >,
226>;
227
228async fn store_drb_result_impl<TYPES: NodeType>(
229 storage: impl Storage<TYPES>,
230 epoch: TYPES::Epoch,
231 drb_result: DrbResult,
232) -> Result<()> {
233 storage.store_drb_result(epoch, drb_result).await
234}
235
236pub fn store_drb_result_fn<TYPES: NodeType>(
238 storage: impl Storage<TYPES> + 'static,
239) -> StoreDrbResultFn<TYPES> {
240 Arc::new(Box::new(move |epoch, drb_result| {
241 let st = storage.clone();
242 Box::pin(store_drb_result_impl(st, epoch, drb_result))
243 }))
244}