1use std::sync::Arc;
13
14use anyhow::{anyhow, ensure, Result};
15use async_trait::async_trait;
16use futures::future::BoxFuture;
17
18use super::node_implementation::NodeType;
19use crate::{
20 data::{
21 vid_disperse::{ADVZDisperseShare, VidDisperseShare2},
22 DaProposal, DaProposal2, QuorumProposal, QuorumProposal2, QuorumProposalWrapper,
23 VidCommitment, VidDisperseShare,
24 },
25 drb::{DrbInput, DrbResult},
26 event::HotShotAction,
27 message::{convert_proposal, Proposal},
28 simple_certificate::{
29 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate,
30 QuorumCertificate2, UpgradeCertificate,
31 },
32};
33
34#[async_trait]
36pub trait Storage<TYPES: NodeType>: Send + Sync + Clone + 'static {
37 async fn append_vid(&self, proposal: &Proposal<TYPES, ADVZDisperseShare<TYPES>>) -> Result<()>;
39 async fn append_vid2(&self, proposal: &Proposal<TYPES, VidDisperseShare2<TYPES>>)
42 -> Result<()>;
43
44 async fn append_vid_general(
45 &self,
46 proposal: &Proposal<TYPES, VidDisperseShare<TYPES>>,
47 ) -> Result<()> {
48 let signature = proposal.signature.clone();
49 match &proposal.data {
50 VidDisperseShare::V0(share) => {
51 self.append_vid(&Proposal {
52 data: share.clone(),
53 signature,
54 _pd: std::marker::PhantomData,
55 })
56 .await
57 },
58 VidDisperseShare::V1(share) => {
59 self.append_vid2(&Proposal {
60 data: share.clone(),
61 signature,
62 _pd: std::marker::PhantomData,
63 })
64 .await
65 },
66 }
67 }
68 async fn append_da(
70 &self,
71 proposal: &Proposal<TYPES, DaProposal<TYPES>>,
72 vid_commit: VidCommitment,
73 ) -> Result<()>;
74 async fn append_da2(
76 &self,
77 proposal: &Proposal<TYPES, DaProposal2<TYPES>>,
78 vid_commit: VidCommitment,
79 ) -> Result<()> {
80 self.append_da(&convert_proposal(proposal.clone()), vid_commit)
81 .await
82 }
83 async fn append_proposal(
85 &self,
86 proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
87 ) -> Result<()>;
88 async fn append_proposal2(
90 &self,
91 proposal: &Proposal<TYPES, QuorumProposal2<TYPES>>,
92 ) -> Result<()>;
93 async fn append_proposal_wrapper(
95 &self,
96 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
97 ) -> Result<()> {
98 self.append_proposal2(&convert_proposal(proposal.clone()))
99 .await
100 }
101 async fn record_action(
103 &self,
104 view: TYPES::View,
105 epoch: Option<TYPES::Epoch>,
106 action: HotShotAction,
107 ) -> Result<()>;
108 async fn update_high_qc(&self, high_qc: QuorumCertificate<TYPES>) -> Result<()>;
110 async fn update_high_qc2(&self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
112 self.update_high_qc(high_qc.to_qc()).await
113 }
114 async fn update_state_cert(
116 &self,
117 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
118 ) -> Result<()>;
119
120 async fn update_high_qc2_and_state_cert(
121 &self,
122 high_qc: QuorumCertificate2<TYPES>,
123 state_cert: LightClientStateUpdateCertificateV2<TYPES>,
124 ) -> Result<()> {
125 self.update_high_qc2(high_qc).await?;
126 self.update_state_cert(state_cert).await
127 }
128 async fn update_next_epoch_high_qc2(
130 &self,
131 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
132 ) -> Result<()>;
133
134 async fn update_eqc(
136 &self,
137 _high_qc: QuorumCertificate2<TYPES>,
138 _next_epoch_high_qc: NextEpochQuorumCertificate2<TYPES>,
139 ) -> Result<()>;
140
141 async fn update_decided_upgrade_certificate(
143 &self,
144 decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
145 ) -> Result<()>;
146 async fn migrate_consensus(&self) -> Result<()> {
148 Ok(())
149 }
150 async fn store_drb_result(&self, epoch: TYPES::Epoch, drb_result: DrbResult) -> Result<()>;
152 async fn store_epoch_root(
154 &self,
155 epoch: TYPES::Epoch,
156 block_header: TYPES::BlockHeader,
157 ) -> Result<()>;
158 async fn load_drb_result(&self, epoch: TYPES::Epoch) -> Result<DrbResult> {
159 match self.load_drb_input(*epoch).await {
160 Ok(drb_input) => {
161 ensure!(drb_input.iteration == drb_input.difficulty_level);
162
163 Ok(drb_input.value)
164 },
165 Err(e) => Err(e),
166 }
167 }
168 async fn store_drb_input(&self, drb_input: DrbInput) -> Result<()>;
169 async fn load_drb_input(&self, _epoch: u64) -> Result<DrbInput>;
170}
171
172pub async fn load_drb_input_impl<TYPES: NodeType>(
173 storage: impl Storage<TYPES>,
174 epoch: u64,
175) -> Result<DrbInput> {
176 storage.load_drb_input(epoch).await
177}
178
179pub type LoadDrbProgressFn =
180 std::sync::Arc<dyn Fn(u64) -> BoxFuture<'static, Result<DrbInput>> + Send + Sync>;
181
182pub fn load_drb_progress_fn<TYPES: NodeType>(
183 storage: impl Storage<TYPES> + 'static,
184) -> LoadDrbProgressFn {
185 Arc::new(move |epoch| {
186 let storage = storage.clone();
187 Box::pin(load_drb_input_impl(storage, epoch))
188 })
189}
190
191pub fn null_load_drb_progress_fn() -> LoadDrbProgressFn {
192 Arc::new(move |_drb_input| {
193 Box::pin(async { Err(anyhow!("Using null implementation of load_drb_input")) })
194 })
195}
196
197pub async fn store_drb_input_impl<TYPES: NodeType>(
198 storage: impl Storage<TYPES>,
199 drb_input: DrbInput,
200) -> Result<()> {
201 storage.store_drb_input(drb_input).await
202}
203
204pub type StoreDrbProgressFn =
205 std::sync::Arc<dyn Fn(DrbInput) -> BoxFuture<'static, Result<()>> + Send + Sync>;
206
207pub fn store_drb_progress_fn<TYPES: NodeType>(
208 storage: impl Storage<TYPES> + 'static,
209) -> StoreDrbProgressFn {
210 Arc::new(move |drb_input| {
211 let storage = storage.clone();
212 Box::pin(store_drb_input_impl(storage, drb_input))
213 })
214}
215
216pub fn null_store_drb_progress_fn() -> StoreDrbProgressFn {
217 Arc::new(move |_drb_input| Box::pin(async { Ok(()) }))
218}
219
220pub type StoreDrbResultFn<TYPES> = Arc<
221 Box<
222 dyn Fn(<TYPES as NodeType>::Epoch, DrbResult) -> BoxFuture<'static, Result<()>>
223 + Send
224 + Sync
225 + 'static,
226 >,
227>;
228
229async fn store_drb_result_impl<TYPES: NodeType>(
230 storage: impl Storage<TYPES>,
231 epoch: TYPES::Epoch,
232 drb_result: DrbResult,
233) -> Result<()> {
234 storage.store_drb_result(epoch, drb_result).await
235}
236
237pub fn store_drb_result_fn<TYPES: NodeType>(
239 storage: impl Storage<TYPES> + 'static,
240) -> StoreDrbResultFn<TYPES> {
241 Arc::new(Box::new(move |epoch, drb_result| {
242 let st = storage.clone();
243 Box::pin(store_drb_result_impl(st, epoch, drb_result))
244 }))
245}