1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
24 Storable,
25 header::{HeaderCallback, fetch_header_and_then},
26};
27use crate::{
28 Header, Payload, QueryResult,
29 availability::{
30 BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31 QueryablePayload,
32 },
33 data_source::{
34 VersionedDataSource,
35 storage::{
36 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
37 pruning::PrunedHeightStorage,
38 },
39 },
40 fetching::{
41 self, Callback,
42 request::{self, PayloadRequest},
43 },
44 types::HeightIndexed,
45};
46pub(super) type PayloadFetcher<Types, S, P> =
47 fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
48
49impl<Types> FetchRequest for BlockId<Types>
50where
51 Types: NodeType,
52{
53 fn might_exist(self, heights: Heights) -> bool {
54 if let BlockId::Number(n) = self {
55 heights.might_exist(n as u64)
56 } else {
57 true
58 }
59 }
60}
61
62#[async_trait]
63impl<Types> Fetchable<Types> for BlockQueryData<Types>
64where
65 Types: NodeType,
66 Header<Types>: QueryableHeader<Types>,
67 Payload<Types>: QueryablePayload<Types>,
68{
69 type Request = BlockId<Types>;
70
71 fn satisfies(&self, req: Self::Request) -> bool {
72 match req {
73 BlockId::Number(n) => self.height() == n as u64,
74 BlockId::Hash(h) => self.hash() == h,
75 BlockId::PayloadHash(h) => self.payload_hash() == h,
76 }
77 }
78
79 async fn passive_fetch(
80 notifiers: &Notifiers<Types>,
81 req: Self::Request,
82 ) -> BoxFuture<'static, Option<Self>> {
83 notifiers
84 .block
85 .wait_for(move |block| block.satisfies(req))
86 .await
87 .into_future()
88 .boxed()
89 }
90
91 async fn active_fetch<S, P>(
92 tx: &mut impl AvailabilityStorage<Types>,
93 fetcher: Arc<Fetcher<Types, S, P>>,
94 req: Self::Request,
95 ) -> anyhow::Result<()>
96 where
97 S: VersionedDataSource + 'static,
98 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
99 for<'a> S::ReadOnly<'a>:
100 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
101 P: AvailabilityProvider<Types>,
102 {
103 fetch_header_and_then(
104 tx,
105 req,
106 HeaderCallback::Payload {
107 fetcher: fetcher.clone(),
108 },
109 )
110 .await
111 }
112
113 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
114 where
115 S: AvailabilityStorage<Types>,
116 {
117 storage.get_block(req).await
118 }
119}
120
121#[async_trait]
122impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
123where
124 Types: NodeType,
125 Header<Types>: QueryableHeader<Types>,
126 Payload<Types>: QueryablePayload<Types>,
127{
128 type RangedRequest = BlockId<Types>;
129
130 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
131 where
132 S: AvailabilityStorage<Types>,
133 R: RangeBounds<usize> + Send + 'static,
134 {
135 storage.get_block_range(range).await
136 }
137}
138
139impl<Types> Storable<Types> for BlockQueryData<Types>
140where
141 Types: NodeType,
142{
143 fn name() -> &'static str {
144 "block"
145 }
146
147 async fn notify(&self, notifiers: &Notifiers<Types>) {
148 notifiers.block.notify(self).await;
149 }
150
151 async fn store(
152 self,
153 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
154 leaf_only: bool,
155 ) -> anyhow::Result<()> {
156 if leaf_only {
157 return Ok(());
158 }
159
160 storage.insert_block(self).await
161 }
162}
163
164pub(super) fn fetch_block_with_header<Types, S, P>(
165 fetcher: Arc<Fetcher<Types, S, P>>,
166 header: Header<Types>,
167) where
168 Types: NodeType,
169 Header<Types>: QueryableHeader<Types>,
170 Payload<Types>: QueryablePayload<Types>,
171 S: VersionedDataSource + 'static,
172 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
173 P: AvailabilityProvider<Types>,
174{
175 let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
176 return;
178 };
179
180 tracing::info!(
182 "spawned active fetch for payload {:?} (height {})",
183 header.payload_commitment(),
184 header.block_number()
185 );
186 payload_fetcher.spawn_fetch(
187 PayloadRequest(header.payload_commitment()),
188 fetcher.provider.clone(),
189 once(PayloadCallback {
190 header,
191 fetcher: fetcher.clone(),
192 }),
193 );
194}
195
196#[async_trait]
197impl<Types> Fetchable<Types> for PayloadQueryData<Types>
198where
199 Types: NodeType,
200 Header<Types>: QueryableHeader<Types>,
201 Payload<Types>: QueryablePayload<Types>,
202{
203 type Request = BlockId<Types>;
204
205 fn satisfies(&self, req: Self::Request) -> bool {
206 match req {
207 BlockId::Number(n) => self.height() == n as u64,
208 BlockId::Hash(h) => self.block_hash() == h,
209 BlockId::PayloadHash(h) => self.hash() == h,
210 }
211 }
212
213 async fn passive_fetch(
214 notifiers: &Notifiers<Types>,
215 req: Self::Request,
216 ) -> BoxFuture<'static, Option<Self>> {
217 notifiers
218 .block
219 .wait_for(move |block| block.satisfies(req))
220 .await
221 .into_future()
222 .map(|block| block.map(PayloadQueryData::from))
223 .boxed()
224 }
225
226 async fn active_fetch<S, P>(
227 tx: &mut impl AvailabilityStorage<Types>,
228 fetcher: Arc<Fetcher<Types, S, P>>,
229 req: Self::Request,
230 ) -> anyhow::Result<()>
231 where
232 S: VersionedDataSource + 'static,
233 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
234 for<'a> S::ReadOnly<'a>:
235 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
236 P: AvailabilityProvider<Types>,
237 {
238 BlockQueryData::active_fetch(tx, fetcher, req).await
242 }
243
244 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
245 where
246 S: AvailabilityStorage<Types>,
247 {
248 storage.get_payload(req).await
249 }
250}
251
252#[async_trait]
253impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
254where
255 Types: NodeType,
256 Header<Types>: QueryableHeader<Types>,
257 Payload<Types>: QueryablePayload<Types>,
258{
259 type RangedRequest = BlockId<Types>;
260
261 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
262 where
263 S: AvailabilityStorage<Types>,
264 R: RangeBounds<usize> + Send + 'static,
265 {
266 storage.get_payload_range(range).await
267 }
268}
269
270#[derive(Derivative)]
271#[derivative(Debug(bound = ""))]
272pub(super) struct PayloadCallback<Types: NodeType, S, P> {
273 header: Header<Types>,
274 #[derivative(Debug = "ignore")]
275 fetcher: Arc<Fetcher<Types, S, P>>,
276}
277
278impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
279 fn eq(&self, other: &Self) -> bool {
280 self.cmp(other).is_eq()
281 }
282}
283
284impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
285
286impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
287 fn cmp(&self, other: &Self) -> Ordering {
288 self.header.block_number().cmp(&other.header.block_number())
289 }
290}
291
292impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
293 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
294 Some(self.cmp(other))
295 }
296}
297
298impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
299where
300 Header<Types>: QueryableHeader<Types>,
301 Payload<Types>: QueryablePayload<Types>,
302 S: 'static + VersionedDataSource,
303 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
304 P: AvailabilityProvider<Types>,
305{
306 async fn run(self, payload: Payload<Types>) {
307 tracing::info!("fetched payload {:?}", self.header.payload_commitment());
308 let block = BlockQueryData::new(self.header, payload);
309 self.fetcher.store_and_notify(&block).await;
310 }
311}
312
313#[async_trait]
314impl<Types> Fetchable<Types> for PayloadMetadata<Types>
315where
316 Types: NodeType,
317 Header<Types>: QueryableHeader<Types>,
318 Payload<Types>: QueryablePayload<Types>,
319{
320 type Request = BlockId<Types>;
321
322 fn satisfies(&self, req: Self::Request) -> bool {
323 match req {
324 BlockId::Number(n) => self.height == n as u64,
325 BlockId::Hash(h) => self.block_hash == h,
326 BlockId::PayloadHash(h) => self.hash == h,
327 }
328 }
329
330 async fn passive_fetch(
331 notifiers: &Notifiers<Types>,
332 req: Self::Request,
333 ) -> BoxFuture<'static, Option<Self>> {
334 notifiers
335 .block
336 .wait_for(move |block| block.satisfies(req))
337 .await
338 .into_future()
339 .map(|opt| opt.map(Self::from))
340 .boxed()
341 }
342
343 async fn active_fetch<S, P>(
344 tx: &mut impl AvailabilityStorage<Types>,
345 fetcher: Arc<Fetcher<Types, S, P>>,
346 req: Self::Request,
347 ) -> anyhow::Result<()>
348 where
349 S: VersionedDataSource + 'static,
350 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
351 for<'a> S::ReadOnly<'a>:
352 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
353 P: AvailabilityProvider<Types>,
354 {
355 BlockQueryData::active_fetch(tx, fetcher, req).await
358 }
359
360 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
361 where
362 S: AvailabilityStorage<Types>,
363 {
364 storage.get_payload_metadata(req).await
365 }
366}
367
368#[async_trait]
369impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
370where
371 Types: NodeType,
372 Header<Types>: QueryableHeader<Types>,
373 Payload<Types>: QueryablePayload<Types>,
374{
375 type RangedRequest = BlockId<Types>;
376
377 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
378 where
379 S: AvailabilityStorage<Types>,
380 R: RangeBounds<usize> + Send + 'static,
381 {
382 storage.get_payload_metadata_range(range).await
383 }
384}