1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use derive_more::From;
22use futures::future::{BoxFuture, FutureExt};
23use hotshot_types::traits::node_implementation::NodeType;
24use tokio::spawn;
25use tracing::Instrument;
26
27use super::{
28 header::HeaderCallback, AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights,
29 Notifiers, RangedFetchable, Storable,
30};
31use crate::{
32 availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
33 data_source::{
34 storage::{
35 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
36 UpdateAvailabilityStorage,
37 },
38 VersionedDataSource,
39 },
40 fetching::{self, request, Callback},
41 types::HeightIndexed,
42 Header, Payload, QueryError, QueryResult,
43};
44
45pub(super) type LeafFetcher<Types, S, P> =
46 fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
47
48impl<Types> FetchRequest for LeafId<Types>
49where
50 Types: NodeType,
51{
52 fn might_exist(self, heights: Heights) -> bool {
53 if let LeafId::Number(n) = self {
54 heights.might_exist(n as u64)
55 } else {
56 true
57 }
58 }
59}
60
61#[async_trait]
62impl<Types> Fetchable<Types> for LeafQueryData<Types>
63where
64 Types: NodeType,
65 Header<Types>: QueryableHeader<Types>,
66 Payload<Types>: QueryablePayload<Types>,
67{
68 type Request = LeafId<Types>;
69
70 fn satisfies(&self, req: Self::Request) -> bool {
71 match req {
72 LeafId::Number(n) => self.height() == n as u64,
73 LeafId::Hash(h) => self.hash() == h,
74 }
75 }
76
77 async fn passive_fetch(
78 notifiers: &Notifiers<Types>,
79 req: Self::Request,
80 ) -> BoxFuture<'static, Option<Self>> {
81 notifiers
82 .leaf
83 .wait_for(move |leaf| leaf.satisfies(req))
84 .await
85 .into_future()
86 .boxed()
87 }
88
89 async fn active_fetch<S, P>(
90 tx: &mut impl AvailabilityStorage<Types>,
91 fetcher: Arc<Fetcher<Types, S, P>>,
92 req: Self::Request,
93 ) -> anyhow::Result<()>
94 where
95 S: VersionedDataSource + 'static,
96 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97 for<'a> S::ReadOnly<'a>:
98 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99 P: AvailabilityProvider<Types>,
100 {
101 fetch_leaf_with_callbacks(tx, fetcher, req, None).await
102 }
103
104 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
105 where
106 S: AvailabilityStorage<Types>,
107 {
108 storage.get_leaf(req).await
109 }
110}
111
112pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
113 tx: &mut impl AvailabilityStorage<Types>,
114 fetcher: Arc<Fetcher<Types, S, P>>,
115 req: LeafId<Types>,
116 callbacks: I,
117) -> anyhow::Result<()>
118where
119 Types: NodeType,
120 Header<Types>: QueryableHeader<Types>,
121 Payload<Types>: QueryablePayload<Types>,
122 S: VersionedDataSource + 'static,
123 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
124 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
125 P: AvailabilityProvider<Types>,
126 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
127 I::IntoIter: Send,
128{
129 match req {
130 LeafId::Number(n) => {
131 let next = (n + 1) as u64;
134 let next = match tx.first_available_leaf(next).await {
135 Ok(leaf) if leaf.height() == next => leaf,
136 Ok(leaf) => {
137 tracing::debug!(
142 n,
143 fetching = leaf.height() - 1,
144 "do not have necessary leaf; trigger fetch of a later leaf"
145 );
146
147 let mut callbacks = vec![LeafCallback::Leaf {
148 fetcher: fetcher.clone(),
149 }];
150
151 if !fetcher.leaf_only {
152 callbacks.push(
153 HeaderCallback::Payload {
154 fetcher: fetcher.clone(),
155 }
156 .into(),
157 );
158 callbacks.push(
159 HeaderCallback::VidCommon {
160 fetcher: fetcher.clone(),
161 }
162 .into(),
163 );
164 }
165
166 fetcher.leaf_fetcher.clone().spawn_fetch(
167 request::LeafRequest::new(
168 leaf.height() - 1,
169 leaf.leaf().parent_commitment(),
170 leaf.leaf().justify_qc().commit(),
171 ),
172 fetcher.provider.clone(),
173 callbacks,
176 );
177 return Ok(());
178 },
179 Err(QueryError::Missing | QueryError::NotFound) => {
180 tracing::debug!(n, "not fetching leaf with unknown successor");
184 return Ok(());
185 },
186 Err(QueryError::Error { message }) => {
187 bail!("failed to fetch successor for leaf {n}: {message}");
190 },
191 };
192
193 let fetcher = fetcher.clone();
194 fetcher.leaf_fetcher.clone().spawn_fetch(
195 request::LeafRequest::new(
196 n as u64,
197 next.leaf().parent_commitment(),
198 next.leaf().justify_qc().commit(),
199 ),
200 fetcher.provider.clone(),
201 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
202 );
203 },
204 LeafId::Hash(h) => {
205 tracing::debug!("not fetching unknown leaf {h}");
209 },
210 }
211
212 Ok(())
213}
214
215pub(super) fn trigger_fetch_for_parent<Types, S, P>(
225 fetcher: &Arc<Fetcher<Types, S, P>>,
226 leaf: &LeafQueryData<Types>,
227) where
228 Types: NodeType,
229 Header<Types>: QueryableHeader<Types>,
230 Payload<Types>: QueryablePayload<Types>,
231 S: VersionedDataSource + 'static,
232 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
233 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
234 P: AvailabilityProvider<Types>,
235{
236 let height = leaf.height();
237 let parent = leaf.leaf().parent_commitment();
238 let parent_qc = leaf.leaf().justify_qc().commit();
239
240 if height == 0 {
242 return;
243 }
244
245 let fetcher = fetcher.clone();
248 let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
249 spawn(
250 async move {
251 match fetcher.storage.read().await {
253 Ok(mut tx) => {
254 if let Ok(pruned_height) = tx.load_pruned_height().await {
256 if pruned_height.is_some_and(|ph| height <= ph) {
257 tracing::info!(
258 height,
259 ?pruned_height,
260 "not fetching pruned parent leaf"
261 );
262 return;
263 }
264 }
265
266 if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
267 return;
268 }
269 },
270 Err(err) => {
271 tracing::warn!(
274 height,
275 %parent,
276 "error opening transaction to check for parent leaf: {err:#}",
277 );
278 },
279 }
280
281 tracing::info!(height, %parent, "received new leaf; fetching missing parent");
282 fetcher.leaf_fetcher.clone().spawn_fetch(
283 request::LeafRequest::new(height - 1, parent, parent_qc),
284 fetcher.provider.clone(),
285 [
288 LeafCallback::Leaf {
289 fetcher: fetcher.clone(),
290 },
291 HeaderCallback::Payload {
292 fetcher: fetcher.clone(),
293 }
294 .into(),
295 HeaderCallback::VidCommon {
296 fetcher: fetcher.clone(),
297 }
298 .into(),
299 ],
300 );
301 }
302 .instrument(span),
303 );
304}
305
306#[async_trait]
307impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
308where
309 Types: NodeType,
310 Header<Types>: QueryableHeader<Types>,
311 Payload<Types>: QueryablePayload<Types>,
312{
313 type RangedRequest = LeafId<Types>;
314
315 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
316 where
317 S: AvailabilityStorage<Types>,
318 R: RangeBounds<usize> + Send + 'static,
319 {
320 storage.get_leaf_range(range).await
321 }
322}
323
324impl<Types> Storable<Types> for LeafQueryData<Types>
325where
326 Types: NodeType,
327{
328 fn name() -> &'static str {
329 "leaf"
330 }
331
332 async fn notify(&self, notifiers: &Notifiers<Types>) {
333 notifiers.leaf.notify(self).await;
334 }
335
336 async fn store(
337 self,
338 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
339 _leaf_only: bool,
340 ) -> anyhow::Result<()> {
341 storage.insert_leaf(self).await
342 }
343}
344
345#[derive(Derivative, From)]
346#[derivative(Debug(bound = ""))]
347pub(super) enum LeafCallback<Types: NodeType, S, P> {
348 #[from(ignore)]
350 Leaf {
351 #[derivative(Debug = "ignore")]
352 fetcher: Arc<Fetcher<Types, S, P>>,
353 },
354 Continuation {
356 callback: HeaderCallback<Types, S, P>,
357 },
358}
359
360impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
361 fn eq(&self, other: &Self) -> bool {
362 self.cmp(other).is_eq()
363 }
364}
365
366impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
367
368impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
369 fn cmp(&self, other: &Self) -> Ordering {
370 match (self, other) {
371 (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
373 (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
374
375 (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
376 cb1.cmp(cb2)
377 },
378 _ => Ordering::Equal,
379 }
380 }
381}
382
383impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
384 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
385 Some(self.cmp(other))
386 }
387}
388
389impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
390where
391 Header<Types>: QueryableHeader<Types>,
392 Payload<Types>: QueryablePayload<Types>,
393 S: VersionedDataSource + 'static,
394 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
395 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
396 P: AvailabilityProvider<Types>,
397{
398 async fn run(self, leaf: LeafQueryData<Types>) {
399 match self {
400 Self::Leaf { fetcher } => {
401 tracing::info!("fetched leaf {}", leaf.height());
402 trigger_fetch_for_parent(&fetcher, &leaf);
404 fetcher.store_and_notify(leaf).await;
405 },
406 Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
407 }
408 }
409}