1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use derive_more::From;
22use futures::future::{BoxFuture, FutureExt};
23use hotshot_types::traits::node_implementation::NodeType;
24use tokio::spawn;
25use tracing::Instrument;
26
27use super::{
28 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
29 Storable, header::HeaderCallback,
30};
31use crate::{
32 Header, Payload, QueryError, QueryResult,
33 availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
34 data_source::{
35 VersionedDataSource,
36 storage::{
37 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
38 pruning::PrunedHeightStorage,
39 },
40 },
41 fetching::{self, Callback, request},
42 types::HeightIndexed,
43};
44
45pub(super) type LeafFetcher<Types, S, P> =
46 fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
47
48impl<Types> FetchRequest for LeafId<Types>
49where
50 Types: NodeType,
51{
52 fn might_exist(self, heights: Heights) -> bool {
53 if let LeafId::Number(n) = self {
54 heights.might_exist(n as u64)
55 } else {
56 true
57 }
58 }
59}
60
61#[async_trait]
62impl<Types> Fetchable<Types> for LeafQueryData<Types>
63where
64 Types: NodeType,
65 Header<Types>: QueryableHeader<Types>,
66 Payload<Types>: QueryablePayload<Types>,
67{
68 type Request = LeafId<Types>;
69
70 fn satisfies(&self, req: Self::Request) -> bool {
71 match req {
72 LeafId::Number(n) => self.height() == n as u64,
73 LeafId::Hash(h) => self.hash() == h,
74 }
75 }
76
77 async fn passive_fetch(
78 notifiers: &Notifiers<Types>,
79 req: Self::Request,
80 ) -> BoxFuture<'static, Option<Self>> {
81 notifiers
82 .leaf
83 .wait_for(move |leaf| leaf.satisfies(req))
84 .await
85 .into_future()
86 .boxed()
87 }
88
89 async fn active_fetch<S, P>(
90 tx: &mut impl AvailabilityStorage<Types>,
91 fetcher: Arc<Fetcher<Types, S, P>>,
92 req: Self::Request,
93 ) -> anyhow::Result<()>
94 where
95 S: VersionedDataSource + 'static,
96 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97 for<'a> S::ReadOnly<'a>:
98 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99 P: AvailabilityProvider<Types>,
100 {
101 fetch_leaf_with_callbacks(tx, fetcher, req, None).await
102 }
103
104 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
105 where
106 S: AvailabilityStorage<Types>,
107 {
108 storage.get_leaf(req).await
109 }
110}
111
112pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
113 tx: &mut impl AvailabilityStorage<Types>,
114 fetcher: Arc<Fetcher<Types, S, P>>,
115 req: LeafId<Types>,
116 callbacks: I,
117) -> anyhow::Result<()>
118where
119 Types: NodeType,
120 Header<Types>: QueryableHeader<Types>,
121 Payload<Types>: QueryablePayload<Types>,
122 S: VersionedDataSource + 'static,
123 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
124 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
125 P: AvailabilityProvider<Types>,
126 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
127 I::IntoIter: Send,
128{
129 match req {
130 LeafId::Number(n) => {
131 let next = (n + 1) as u64;
134 let next = match tx.first_available_leaf(next).await {
135 Ok(leaf) if leaf.height() == next => leaf,
136 Ok(leaf) => {
137 tracing::debug!(
142 n,
143 fetching = leaf.height() - 1,
144 "do not have necessary leaf; trigger fetch of a later leaf"
145 );
146
147 let mut callbacks = vec![LeafCallback::Leaf {
148 fetcher: fetcher.clone(),
149 }];
150
151 if !fetcher.leaf_only {
152 callbacks.push(
153 HeaderCallback::Payload {
154 fetcher: fetcher.clone(),
155 }
156 .into(),
157 );
158 callbacks.push(
159 HeaderCallback::VidCommon {
160 fetcher: fetcher.clone(),
161 }
162 .into(),
163 );
164 }
165
166 fetcher.leaf_fetcher.clone().spawn_fetch(
167 request::LeafRequest::new(
168 leaf.height() - 1,
169 leaf.leaf().parent_commitment(),
170 leaf.leaf().justify_qc().commit(),
171 ),
172 fetcher.provider.clone(),
173 callbacks,
176 );
177 return Ok(());
178 },
179 Err(QueryError::Missing | QueryError::NotFound) => {
180 tracing::debug!(n, "not fetching leaf with unknown successor");
184 return Ok(());
185 },
186 Err(QueryError::Error { message }) => {
187 bail!("failed to fetch successor for leaf {n}: {message}");
190 },
191 };
192
193 let fetcher = fetcher.clone();
194 fetcher.leaf_fetcher.clone().spawn_fetch(
195 request::LeafRequest::new(
196 n as u64,
197 next.leaf().parent_commitment(),
198 next.leaf().justify_qc().commit(),
199 ),
200 fetcher.provider.clone(),
201 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
202 );
203 },
204 LeafId::Hash(h) => {
205 tracing::debug!("not fetching unknown leaf {h}");
209 },
210 }
211
212 Ok(())
213}
214
215pub(super) fn trigger_fetch_for_parent<Types, S, P>(
225 fetcher: &Arc<Fetcher<Types, S, P>>,
226 leaf: &LeafQueryData<Types>,
227) where
228 Types: NodeType,
229 Header<Types>: QueryableHeader<Types>,
230 Payload<Types>: QueryablePayload<Types>,
231 S: VersionedDataSource + 'static,
232 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
233 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
234 P: AvailabilityProvider<Types>,
235{
236 let height = leaf.height();
237 let parent = leaf.leaf().parent_commitment();
238 let parent_qc = leaf.leaf().justify_qc().commit();
239
240 if height == 0 {
242 return;
243 }
244
245 let fetcher = fetcher.clone();
248 let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
249 spawn(
250 async move {
251 match fetcher.storage.read().await {
253 Ok(mut tx) => {
254 if let Ok(pruned_height) = tx.load_pruned_height().await
256 && pruned_height.is_some_and(|ph| height <= ph)
257 {
258 tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
259 return;
260 }
261
262 if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
263 return;
264 }
265 },
266 Err(err) => {
267 tracing::warn!(
270 height,
271 %parent,
272 "error opening transaction to check for parent leaf: {err:#}",
273 );
274 },
275 }
276
277 tracing::info!(height, %parent, "received new leaf; fetching missing parent");
278 fetcher.leaf_fetcher.clone().spawn_fetch(
279 request::LeafRequest::new(height - 1, parent, parent_qc),
280 fetcher.provider.clone(),
281 [
284 LeafCallback::Leaf {
285 fetcher: fetcher.clone(),
286 },
287 HeaderCallback::Payload {
288 fetcher: fetcher.clone(),
289 }
290 .into(),
291 HeaderCallback::VidCommon {
292 fetcher: fetcher.clone(),
293 }
294 .into(),
295 ],
296 );
297 }
298 .instrument(span),
299 );
300}
301
302#[async_trait]
303impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
304where
305 Types: NodeType,
306 Header<Types>: QueryableHeader<Types>,
307 Payload<Types>: QueryablePayload<Types>,
308{
309 type RangedRequest = LeafId<Types>;
310
311 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
312 where
313 S: AvailabilityStorage<Types>,
314 R: RangeBounds<usize> + Send + 'static,
315 {
316 storage.get_leaf_range(range).await
317 }
318}
319
320impl<Types> Storable<Types> for LeafQueryData<Types>
321where
322 Types: NodeType,
323{
324 fn name() -> &'static str {
325 "leaf"
326 }
327
328 async fn notify(&self, notifiers: &Notifiers<Types>) {
329 notifiers.leaf.notify(self).await;
330 }
331
332 async fn store(
333 self,
334 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
335 _leaf_only: bool,
336 ) -> anyhow::Result<()> {
337 storage.insert_leaf(self).await
338 }
339}
340
341#[derive(Derivative, From)]
342#[derivative(Debug(bound = ""))]
343pub(super) enum LeafCallback<Types: NodeType, S, P> {
344 #[from(ignore)]
346 Leaf {
347 #[derivative(Debug = "ignore")]
348 fetcher: Arc<Fetcher<Types, S, P>>,
349 },
350 Continuation {
352 callback: HeaderCallback<Types, S, P>,
353 },
354}
355
356impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
357 fn eq(&self, other: &Self) -> bool {
358 self.cmp(other).is_eq()
359 }
360}
361
362impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
363
364impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
365 fn cmp(&self, other: &Self) -> Ordering {
366 match (self, other) {
367 (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
369 (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
370
371 (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
372 cb1.cmp(cb2)
373 },
374 _ => Ordering::Equal,
375 }
376 }
377}
378
379impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
380 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
381 Some(self.cmp(other))
382 }
383}
384
385impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
386where
387 Header<Types>: QueryableHeader<Types>,
388 Payload<Types>: QueryablePayload<Types>,
389 S: VersionedDataSource + 'static,
390 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
391 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
392 P: AvailabilityProvider<Types>,
393{
394 async fn run(self, leaf: LeafQueryData<Types>) {
395 match self {
396 Self::Leaf { fetcher } => {
397 tracing::info!("fetched leaf {}", leaf.height());
398 trigger_fetch_for_parent(&fetcher, &leaf);
400 fetcher.store_and_notify(&leaf).await;
401 },
402 Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
403 }
404 }
405}