1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt};
21use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
22
23use super::{
24 header::{fetch_header_and_then, HeaderCallback},
25 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
26 Storable,
27};
28use crate::{
29 availability::{
30 BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31 QueryablePayload,
32 },
33 data_source::{
34 storage::{
35 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
36 UpdateAvailabilityStorage,
37 },
38 VersionedDataSource,
39 },
40 fetching::{
41 self,
42 request::{self, PayloadRequest},
43 Callback,
44 },
45 types::HeightIndexed,
46 Header, Payload, QueryResult,
47};
48pub(super) type PayloadFetcher<Types, S, P> =
49 fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
50
51impl<Types> FetchRequest for BlockId<Types>
52where
53 Types: NodeType,
54{
55 fn might_exist(self, heights: Heights) -> bool {
56 if let BlockId::Number(n) = self {
57 heights.might_exist(n as u64)
58 } else {
59 true
60 }
61 }
62}
63
64#[async_trait]
65impl<Types> Fetchable<Types> for BlockQueryData<Types>
66where
67 Types: NodeType,
68 Header<Types>: QueryableHeader<Types>,
69 Payload<Types>: QueryablePayload<Types>,
70{
71 type Request = BlockId<Types>;
72
73 fn satisfies(&self, req: Self::Request) -> bool {
74 match req {
75 BlockId::Number(n) => self.height() == n as u64,
76 BlockId::Hash(h) => self.hash() == h,
77 BlockId::PayloadHash(h) => self.payload_hash() == h,
78 }
79 }
80
81 async fn passive_fetch(
82 notifiers: &Notifiers<Types>,
83 req: Self::Request,
84 ) -> BoxFuture<'static, Option<Self>> {
85 notifiers
86 .block
87 .wait_for(move |block| block.satisfies(req))
88 .await
89 .into_future()
90 .boxed()
91 }
92
93 async fn active_fetch<S, P>(
94 tx: &mut impl AvailabilityStorage<Types>,
95 fetcher: Arc<Fetcher<Types, S, P>>,
96 req: Self::Request,
97 ) -> anyhow::Result<()>
98 where
99 S: VersionedDataSource + 'static,
100 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
101 for<'a> S::ReadOnly<'a>:
102 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
103 P: AvailabilityProvider<Types>,
104 {
105 fetch_header_and_then(
106 tx,
107 req,
108 HeaderCallback::Payload {
109 fetcher: fetcher.clone(),
110 },
111 )
112 .await
113 }
114
115 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
116 where
117 S: AvailabilityStorage<Types>,
118 {
119 storage.get_block(req).await
120 }
121}
122
123#[async_trait]
124impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
125where
126 Types: NodeType,
127 Header<Types>: QueryableHeader<Types>,
128 Payload<Types>: QueryablePayload<Types>,
129{
130 type RangedRequest = BlockId<Types>;
131
132 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
133 where
134 S: AvailabilityStorage<Types>,
135 R: RangeBounds<usize> + Send + 'static,
136 {
137 storage.get_block_range(range).await
138 }
139}
140
141impl<Types> Storable<Types> for BlockQueryData<Types>
142where
143 Types: NodeType,
144{
145 fn name() -> &'static str {
146 "block"
147 }
148
149 async fn notify(&self, notifiers: &Notifiers<Types>) {
150 notifiers.block.notify(self).await;
151 }
152
153 async fn store(
154 self,
155 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
156 leaf_only: bool,
157 ) -> anyhow::Result<()> {
158 if leaf_only {
159 return Ok(());
160 }
161
162 storage.insert_block(self).await
163 }
164}
165
166pub(super) fn fetch_block_with_header<Types, S, P>(
167 fetcher: Arc<Fetcher<Types, S, P>>,
168 header: Header<Types>,
169) where
170 Types: NodeType,
171 Header<Types>: QueryableHeader<Types>,
172 Payload<Types>: QueryablePayload<Types>,
173 S: VersionedDataSource + 'static,
174 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
175 P: AvailabilityProvider<Types>,
176{
177 let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
178 return;
180 };
181
182 tracing::info!(
184 "spawned active fetch for payload {:?} (height {})",
185 header.payload_commitment(),
186 header.block_number()
187 );
188 payload_fetcher.spawn_fetch(
189 PayloadRequest(header.payload_commitment()),
190 fetcher.provider.clone(),
191 once(PayloadCallback {
192 header,
193 fetcher: fetcher.clone(),
194 }),
195 );
196}
197
198#[async_trait]
199impl<Types> Fetchable<Types> for PayloadQueryData<Types>
200where
201 Types: NodeType,
202 Header<Types>: QueryableHeader<Types>,
203 Payload<Types>: QueryablePayload<Types>,
204{
205 type Request = BlockId<Types>;
206
207 fn satisfies(&self, req: Self::Request) -> bool {
208 match req {
209 BlockId::Number(n) => self.height() == n as u64,
210 BlockId::Hash(h) => self.block_hash() == h,
211 BlockId::PayloadHash(h) => self.hash() == h,
212 }
213 }
214
215 async fn passive_fetch(
216 notifiers: &Notifiers<Types>,
217 req: Self::Request,
218 ) -> BoxFuture<'static, Option<Self>> {
219 notifiers
220 .block
221 .wait_for(move |block| block.satisfies(req))
222 .await
223 .into_future()
224 .map(|block| block.map(PayloadQueryData::from))
225 .boxed()
226 }
227
228 async fn active_fetch<S, P>(
229 tx: &mut impl AvailabilityStorage<Types>,
230 fetcher: Arc<Fetcher<Types, S, P>>,
231 req: Self::Request,
232 ) -> anyhow::Result<()>
233 where
234 S: VersionedDataSource + 'static,
235 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
236 for<'a> S::ReadOnly<'a>:
237 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
238 P: AvailabilityProvider<Types>,
239 {
240 BlockQueryData::active_fetch(tx, fetcher, req).await
244 }
245
246 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
247 where
248 S: AvailabilityStorage<Types>,
249 {
250 storage.get_payload(req).await
251 }
252}
253
254#[async_trait]
255impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
256where
257 Types: NodeType,
258 Header<Types>: QueryableHeader<Types>,
259 Payload<Types>: QueryablePayload<Types>,
260{
261 type RangedRequest = BlockId<Types>;
262
263 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
264 where
265 S: AvailabilityStorage<Types>,
266 R: RangeBounds<usize> + Send + 'static,
267 {
268 storage.get_payload_range(range).await
269 }
270}
271
272#[derive(Derivative)]
273#[derivative(Debug(bound = ""))]
274pub(super) struct PayloadCallback<Types: NodeType, S, P> {
275 header: Header<Types>,
276 #[derivative(Debug = "ignore")]
277 fetcher: Arc<Fetcher<Types, S, P>>,
278}
279
280impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
281 fn eq(&self, other: &Self) -> bool {
282 self.cmp(other).is_eq()
283 }
284}
285
286impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
287
288impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
289 fn cmp(&self, other: &Self) -> Ordering {
290 self.header.block_number().cmp(&other.header.block_number())
291 }
292}
293
294impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
295 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
296 Some(self.cmp(other))
297 }
298}
299
300impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
301where
302 Header<Types>: QueryableHeader<Types>,
303 Payload<Types>: QueryablePayload<Types>,
304 S: 'static + VersionedDataSource,
305 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
306 P: AvailabilityProvider<Types>,
307{
308 async fn run(self, payload: Payload<Types>) {
309 tracing::info!("fetched payload {:?}", self.header.payload_commitment());
310 let block = BlockQueryData::new(self.header, payload);
311 self.fetcher.store_and_notify(block).await;
312 }
313}
314
315#[async_trait]
316impl<Types> Fetchable<Types> for PayloadMetadata<Types>
317where
318 Types: NodeType,
319 Header<Types>: QueryableHeader<Types>,
320 Payload<Types>: QueryablePayload<Types>,
321{
322 type Request = BlockId<Types>;
323
324 fn satisfies(&self, req: Self::Request) -> bool {
325 match req {
326 BlockId::Number(n) => self.height == n as u64,
327 BlockId::Hash(h) => self.block_hash == h,
328 BlockId::PayloadHash(h) => self.hash == h,
329 }
330 }
331
332 async fn passive_fetch(
333 notifiers: &Notifiers<Types>,
334 req: Self::Request,
335 ) -> BoxFuture<'static, Option<Self>> {
336 notifiers
337 .block
338 .wait_for(move |block| block.satisfies(req))
339 .await
340 .into_future()
341 .map(|opt| opt.map(Self::from))
342 .boxed()
343 }
344
345 async fn active_fetch<S, P>(
346 tx: &mut impl AvailabilityStorage<Types>,
347 fetcher: Arc<Fetcher<Types, S, P>>,
348 req: Self::Request,
349 ) -> anyhow::Result<()>
350 where
351 S: VersionedDataSource + 'static,
352 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
353 for<'a> S::ReadOnly<'a>:
354 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
355 P: AvailabilityProvider<Types>,
356 {
357 BlockQueryData::active_fetch(tx, fetcher, req).await
360 }
361
362 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
363 where
364 S: AvailabilityStorage<Types>,
365 {
366 storage.get_payload_metadata(req).await
367 }
368}
369
370#[async_trait]
371impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
372where
373 Types: NodeType,
374 Header<Types>: QueryableHeader<Types>,
375 Payload<Types>: QueryablePayload<Types>,
376{
377 type RangedRequest = BlockId<Types>;
378
379 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
380 where
381 S: AvailabilityStorage<Types>,
382 R: RangeBounds<usize> + Send + 'static,
383 {
384 storage.get_payload_metadata_range(range).await
385 }
386}