1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23 header::{fetch_header_and_then, HeaderCallback},
24 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
25 Storable,
26};
27use crate::{
28 availability::{
29 BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
30 QueryablePayload,
31 },
32 data_source::{
33 storage::{
34 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
35 UpdateAvailabilityStorage,
36 },
37 VersionedDataSource,
38 },
39 fetching::{
40 self,
41 request::{self, PayloadRequest},
42 Callback,
43 },
44 types::HeightIndexed,
45 Header, Payload, QueryResult,
46};
47pub(super) type PayloadFetcher<Types, S, P> =
48 fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
49
50impl<Types> FetchRequest for BlockId<Types>
51where
52 Types: NodeType,
53{
54 fn might_exist(self, heights: Heights) -> bool {
55 if let BlockId::Number(n) = self {
56 heights.might_exist(n as u64)
57 } else {
58 true
59 }
60 }
61}
62
63#[async_trait]
64impl<Types> Fetchable<Types> for BlockQueryData<Types>
65where
66 Types: NodeType,
67 Header<Types>: QueryableHeader<Types>,
68 Payload<Types>: QueryablePayload<Types>,
69{
70 type Request = BlockId<Types>;
71
72 fn satisfies(&self, req: Self::Request) -> bool {
73 match req {
74 BlockId::Number(n) => self.height() == n as u64,
75 BlockId::Hash(h) => self.hash() == h,
76 BlockId::PayloadHash(h) => self.payload_hash() == h,
77 }
78 }
79
80 async fn passive_fetch(
81 notifiers: &Notifiers<Types>,
82 req: Self::Request,
83 ) -> BoxFuture<'static, Option<Self>> {
84 notifiers
85 .block
86 .wait_for(move |block| block.satisfies(req))
87 .await
88 .into_future()
89 .boxed()
90 }
91
92 async fn active_fetch<S, P>(
93 tx: &mut impl AvailabilityStorage<Types>,
94 fetcher: Arc<Fetcher<Types, S, P>>,
95 req: Self::Request,
96 ) -> anyhow::Result<()>
97 where
98 S: VersionedDataSource + 'static,
99 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
100 for<'a> S::ReadOnly<'a>:
101 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
102 P: AvailabilityProvider<Types>,
103 {
104 fetch_header_and_then(
105 tx,
106 req,
107 HeaderCallback::Payload {
108 fetcher: fetcher.clone(),
109 },
110 )
111 .await
112 }
113
114 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
115 where
116 S: AvailabilityStorage<Types>,
117 {
118 storage.get_block(req).await
119 }
120}
121
122#[async_trait]
123impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
124where
125 Types: NodeType,
126 Header<Types>: QueryableHeader<Types>,
127 Payload<Types>: QueryablePayload<Types>,
128{
129 type RangedRequest = BlockId<Types>;
130
131 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
132 where
133 S: AvailabilityStorage<Types>,
134 R: RangeBounds<usize> + Send + 'static,
135 {
136 storage.get_block_range(range).await
137 }
138}
139
140impl<Types> Storable<Types> for BlockQueryData<Types>
141where
142 Types: NodeType,
143{
144 fn name() -> &'static str {
145 "block"
146 }
147
148 async fn notify(&self, notifiers: &Notifiers<Types>) {
149 notifiers.block.notify(self).await;
150 }
151
152 async fn store(
153 self,
154 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
155 leaf_only: bool,
156 ) -> anyhow::Result<()> {
157 if leaf_only {
158 return Ok(());
159 }
160
161 storage.insert_block(self).await
162 }
163}
164
165pub(super) fn fetch_block_with_header<Types, S, P>(
166 fetcher: Arc<Fetcher<Types, S, P>>,
167 header: Header<Types>,
168) where
169 Types: NodeType,
170 Header<Types>: QueryableHeader<Types>,
171 Payload<Types>: QueryablePayload<Types>,
172 S: VersionedDataSource + 'static,
173 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
174 P: AvailabilityProvider<Types>,
175{
176 let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
177 return;
179 };
180
181 tracing::info!(
183 "spawned active fetch for payload {:?} (height {})",
184 header.payload_commitment(),
185 header.block_number()
186 );
187 payload_fetcher.spawn_fetch(
188 PayloadRequest(header.payload_commitment()),
189 fetcher.provider.clone(),
190 once(PayloadCallback {
191 header,
192 fetcher: fetcher.clone(),
193 }),
194 );
195}
196
197#[async_trait]
198impl<Types> Fetchable<Types> for PayloadQueryData<Types>
199where
200 Types: NodeType,
201 Header<Types>: QueryableHeader<Types>,
202 Payload<Types>: QueryablePayload<Types>,
203{
204 type Request = BlockId<Types>;
205
206 fn satisfies(&self, req: Self::Request) -> bool {
207 match req {
208 BlockId::Number(n) => self.height() == n as u64,
209 BlockId::Hash(h) => self.block_hash() == h,
210 BlockId::PayloadHash(h) => self.hash() == h,
211 }
212 }
213
214 async fn passive_fetch(
215 notifiers: &Notifiers<Types>,
216 req: Self::Request,
217 ) -> BoxFuture<'static, Option<Self>> {
218 notifiers
219 .block
220 .wait_for(move |block| block.satisfies(req))
221 .await
222 .into_future()
223 .map(|block| block.map(PayloadQueryData::from))
224 .boxed()
225 }
226
227 async fn active_fetch<S, P>(
228 tx: &mut impl AvailabilityStorage<Types>,
229 fetcher: Arc<Fetcher<Types, S, P>>,
230 req: Self::Request,
231 ) -> anyhow::Result<()>
232 where
233 S: VersionedDataSource + 'static,
234 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
235 for<'a> S::ReadOnly<'a>:
236 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
237 P: AvailabilityProvider<Types>,
238 {
239 BlockQueryData::active_fetch(tx, fetcher, req).await
243 }
244
245 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
246 where
247 S: AvailabilityStorage<Types>,
248 {
249 storage.get_payload(req).await
250 }
251}
252
253#[async_trait]
254impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
255where
256 Types: NodeType,
257 Header<Types>: QueryableHeader<Types>,
258 Payload<Types>: QueryablePayload<Types>,
259{
260 type RangedRequest = BlockId<Types>;
261
262 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
263 where
264 S: AvailabilityStorage<Types>,
265 R: RangeBounds<usize> + Send + 'static,
266 {
267 storage.get_payload_range(range).await
268 }
269}
270
271#[derive(Derivative)]
272#[derivative(Debug(bound = ""))]
273pub(super) struct PayloadCallback<Types: NodeType, S, P> {
274 header: Header<Types>,
275 #[derivative(Debug = "ignore")]
276 fetcher: Arc<Fetcher<Types, S, P>>,
277}
278
279impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
280 fn eq(&self, other: &Self) -> bool {
281 self.cmp(other).is_eq()
282 }
283}
284
285impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
286
287impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
288 fn cmp(&self, other: &Self) -> Ordering {
289 self.header.block_number().cmp(&other.header.block_number())
290 }
291}
292
293impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
294 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
295 Some(self.cmp(other))
296 }
297}
298
299impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
300where
301 Header<Types>: QueryableHeader<Types>,
302 Payload<Types>: QueryablePayload<Types>,
303 S: 'static + VersionedDataSource,
304 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
305 P: AvailabilityProvider<Types>,
306{
307 async fn run(self, payload: Payload<Types>) {
308 tracing::info!("fetched payload {:?}", self.header.payload_commitment());
309 let block = BlockQueryData::new(self.header, payload);
310 self.fetcher.store_and_notify(block).await;
311 }
312}
313
314#[async_trait]
315impl<Types> Fetchable<Types> for PayloadMetadata<Types>
316where
317 Types: NodeType,
318 Header<Types>: QueryableHeader<Types>,
319 Payload<Types>: QueryablePayload<Types>,
320{
321 type Request = BlockId<Types>;
322
323 fn satisfies(&self, req: Self::Request) -> bool {
324 match req {
325 BlockId::Number(n) => self.height == n as u64,
326 BlockId::Hash(h) => self.block_hash == h,
327 BlockId::PayloadHash(h) => self.hash == h,
328 }
329 }
330
331 async fn passive_fetch(
332 notifiers: &Notifiers<Types>,
333 req: Self::Request,
334 ) -> BoxFuture<'static, Option<Self>> {
335 notifiers
336 .block
337 .wait_for(move |block| block.satisfies(req))
338 .await
339 .into_future()
340 .map(|opt| opt.map(Self::from))
341 .boxed()
342 }
343
344 async fn active_fetch<S, P>(
345 tx: &mut impl AvailabilityStorage<Types>,
346 fetcher: Arc<Fetcher<Types, S, P>>,
347 req: Self::Request,
348 ) -> anyhow::Result<()>
349 where
350 S: VersionedDataSource + 'static,
351 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
352 for<'a> S::ReadOnly<'a>:
353 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
354 P: AvailabilityProvider<Types>,
355 {
356 BlockQueryData::active_fetch(tx, fetcher, req).await
359 }
360
361 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
362 where
363 S: AvailabilityStorage<Types>,
364 {
365 storage.get_payload_metadata(req).await
366 }
367}
368
369#[async_trait]
370impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
371where
372 Types: NodeType,
373 Header<Types>: QueryableHeader<Types>,
374 Payload<Types>: QueryablePayload<Types>,
375{
376 type RangedRequest = BlockId<Types>;
377
378 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
379 where
380 S: AvailabilityStorage<Types>,
381 R: RangeBounds<usize> + Send + 'static,
382 {
383 storage.get_payload_metadata_range(range).await
384 }
385}