1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt};
21use hotshot_types::{
22 data::VidShare,
23 traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25
26use super::{
27 header::{fetch_header_and_then, HeaderCallback},
28 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
29 Storable,
30};
31use crate::{
32 availability::{
33 BlockId, QueryableHeader, QueryablePayload, VidCommonMetadata, VidCommonQueryData,
34 },
35 data_source::{
36 storage::{
37 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
38 UpdateAvailabilityStorage,
39 },
40 VersionedDataSource,
41 },
42 fetching::{self, request, Callback},
43 types::HeightIndexed,
44 Header, Payload, QueryResult, VidCommon,
45};
46
47pub(super) type VidCommonFetcher<Types, S, P> =
48 fetching::Fetcher<request::VidCommonRequest, VidCommonCallback<Types, S, P>>;
49
50#[derive(Clone, Copy, Debug, From)]
51pub(super) struct VidCommonRequest<Types: NodeType>(BlockId<Types>);
52
53impl<Types: NodeType> From<usize> for VidCommonRequest<Types> {
54 fn from(n: usize) -> Self {
55 Self(n.into())
56 }
57}
58
59impl<Types> FetchRequest for VidCommonRequest<Types>
60where
61 Types: NodeType,
62{
63 fn might_exist(self, heights: Heights) -> bool {
64 self.0.might_exist(heights)
65 }
66}
67
68#[async_trait]
69impl<Types> Fetchable<Types> for VidCommonQueryData<Types>
70where
71 Types: NodeType,
72 Header<Types>: QueryableHeader<Types>,
73 Payload<Types>: QueryablePayload<Types>,
74{
75 type Request = VidCommonRequest<Types>;
76
77 fn satisfies(&self, req: Self::Request) -> bool {
78 match req.0 {
79 BlockId::Number(n) => self.height() == n as u64,
80 BlockId::Hash(h) => self.block_hash() == h,
81 BlockId::PayloadHash(h) => self.payload_hash() == h,
82 }
83 }
84
85 async fn passive_fetch(
86 notifiers: &Notifiers<Types>,
87 req: Self::Request,
88 ) -> BoxFuture<'static, Option<Self>> {
89 notifiers
90 .vid_common
91 .wait_for(move |vid| vid.satisfies(req))
92 .await
93 .into_future()
94 .boxed()
95 }
96
97 async fn active_fetch<S, P>(
98 tx: &mut impl AvailabilityStorage<Types>,
99 fetcher: Arc<Fetcher<Types, S, P>>,
100 req: Self::Request,
101 ) -> anyhow::Result<()>
102 where
103 S: VersionedDataSource + 'static,
104 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
105 for<'a> S::ReadOnly<'a>:
106 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
107 P: AvailabilityProvider<Types>,
108 {
109 fetch_header_and_then(
110 tx,
111 req.0,
112 HeaderCallback::VidCommon {
113 fetcher: fetcher.clone(),
114 },
115 )
116 .await
117 }
118
119 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
120 where
121 S: AvailabilityStorage<Types>,
122 {
123 storage.get_vid_common(req.0).await
124 }
125}
126
127#[async_trait]
128impl<Types> RangedFetchable<Types> for VidCommonQueryData<Types>
129where
130 Types: NodeType,
131 Header<Types>: QueryableHeader<Types>,
132 Payload<Types>: QueryablePayload<Types>,
133{
134 type RangedRequest = VidCommonRequest<Types>;
135
136 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
137 where
138 S: AvailabilityStorage<Types>,
139 R: RangeBounds<usize> + Send + 'static,
140 {
141 storage.get_vid_common_range(range).await
142 }
143}
144
145impl<Types> Storable<Types> for VidCommonQueryData<Types>
146where
147 Types: NodeType,
148{
149 fn name() -> &'static str {
150 "VID common"
151 }
152
153 async fn notify(&self, notifiers: &Notifiers<Types>) {
154 notifiers.vid_common.notify(self).await;
155 }
156
157 async fn store(
158 self,
159 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
160 _leaf_only: bool,
161 ) -> anyhow::Result<()> {
162 storage.insert_vid(self, None).await
163 }
164}
165
166impl<Types> Storable<Types> for (VidCommonQueryData<Types>, Option<VidShare>)
167where
168 Types: NodeType,
169{
170 fn name() -> &'static str {
171 "VID data"
172 }
173
174 async fn notify(&self, notifiers: &Notifiers<Types>) {
175 notifiers.vid_common.notify(&self.0).await;
176 }
177
178 async fn store(
179 self,
180 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
181 _leaf_only: bool,
182 ) -> anyhow::Result<()> {
183 storage.insert_vid(self.0, self.1).await
184 }
185}
186
187pub(super) fn fetch_vid_common_with_header<Types, S, P>(
188 fetcher: Arc<Fetcher<Types, S, P>>,
189 header: Header<Types>,
190) where
191 Types: NodeType,
192 Header<Types>: QueryableHeader<Types>,
193 Payload<Types>: QueryablePayload<Types>,
194 S: VersionedDataSource + 'static,
195 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
196 P: AvailabilityProvider<Types>,
197{
198 let Some(vid_fetcher) = fetcher.vid_common_fetcher.as_ref() else {
199 tracing::info!("not fetching vid because of leaf only mode");
200 return;
201 };
202
203 tracing::info!(
205 "spawned active fetch for VID common {:?} (height {})",
206 header.payload_commitment(),
207 header.block_number()
208 );
209 vid_fetcher.spawn_fetch(
210 request::VidCommonRequest(header.payload_commitment()),
211 fetcher.provider.clone(),
212 once(VidCommonCallback {
213 header,
214 fetcher: fetcher.clone(),
215 }),
216 );
217}
218
219#[derive(Derivative)]
220#[derivative(Debug(bound = ""))]
221pub(super) struct VidCommonCallback<Types: NodeType, S, P> {
222 header: Header<Types>,
223 #[derivative(Debug = "ignore")]
224 fetcher: Arc<Fetcher<Types, S, P>>,
225}
226
227impl<Types: NodeType, S, P> PartialEq for VidCommonCallback<Types, S, P> {
228 fn eq(&self, other: &Self) -> bool {
229 self.cmp(other).is_eq()
230 }
231}
232
233impl<Types: NodeType, S, P> Eq for VidCommonCallback<Types, S, P> {}
234
235impl<Types: NodeType, S, P> Ord for VidCommonCallback<Types, S, P> {
236 fn cmp(&self, other: &Self) -> Ordering {
237 self.header.block_number().cmp(&other.header.block_number())
238 }
239}
240
241impl<Types: NodeType, S, P> PartialOrd for VidCommonCallback<Types, S, P> {
242 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
243 Some(self.cmp(other))
244 }
245}
246
247impl<Types: NodeType, S, P> Callback<VidCommon> for VidCommonCallback<Types, S, P>
248where
249 Header<Types>: QueryableHeader<Types>,
250 Payload<Types>: QueryablePayload<Types>,
251 S: VersionedDataSource + 'static,
252 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
253 P: AvailabilityProvider<Types>,
254{
255 async fn run(self, common: VidCommon) {
256 let common = VidCommonQueryData::new(self.header, common);
257 self.fetcher.store_and_notify(common).await;
258 }
259}
260
261#[async_trait]
262impl<Types> Fetchable<Types> for VidCommonMetadata<Types>
263where
264 Types: NodeType,
265 Header<Types>: QueryableHeader<Types>,
266 Payload<Types>: QueryablePayload<Types>,
267{
268 type Request = VidCommonRequest<Types>;
269
270 fn satisfies(&self, req: Self::Request) -> bool {
271 match req.0 {
272 BlockId::Number(n) => self.height == n as u64,
273 BlockId::Hash(h) => self.block_hash == h,
274 BlockId::PayloadHash(h) => self.payload_hash == h,
275 }
276 }
277
278 async fn passive_fetch(
279 notifiers: &Notifiers<Types>,
280 req: Self::Request,
281 ) -> BoxFuture<'static, Option<Self>> {
282 notifiers
283 .vid_common
284 .wait_for(move |vid| vid.satisfies(req))
285 .await
286 .into_future()
287 .map(|opt| opt.map(Self::from))
288 .boxed()
289 }
290
291 async fn active_fetch<S, P>(
292 tx: &mut impl AvailabilityStorage<Types>,
293 fetcher: Arc<Fetcher<Types, S, P>>,
294 req: Self::Request,
295 ) -> anyhow::Result<()>
296 where
297 S: VersionedDataSource + 'static,
298 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
299 for<'a> S::ReadOnly<'a>:
300 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
301 P: AvailabilityProvider<Types>,
302 {
303 if fetcher.leaf_only {
305 return Ok(());
306 }
307 VidCommonQueryData::active_fetch(tx, fetcher, req).await
310 }
311
312 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
313 where
314 S: AvailabilityStorage<Types>,
315 {
316 storage.get_vid_common_metadata(req.0).await
317 }
318}
319
320#[async_trait]
321impl<Types> RangedFetchable<Types> for VidCommonMetadata<Types>
322where
323 Types: NodeType,
324 Header<Types>: QueryableHeader<Types>,
325 Payload<Types>: QueryablePayload<Types>,
326{
327 type RangedRequest = VidCommonRequest<Types>;
328
329 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
330 where
331 S: AvailabilityStorage<Types>,
332 R: RangeBounds<usize> + Send + 'static,
333 {
334 storage.get_vid_common_metadata_range(range).await
335 }
336}