hotshot_query_service/data_source/fetching/
leaf.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! [`Fetchable`] implementation for [`LeafQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use derive_more::From;
22use futures::future::{BoxFuture, FutureExt};
23use hotshot_types::traits::node_implementation::NodeType;
24use tokio::spawn;
25use tracing::Instrument;
26
27use super::{
28    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
29    Storable, header::HeaderCallback,
30};
31use crate::{
32    Header, Payload, QueryError, QueryResult,
33    availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
34    data_source::{
35        VersionedDataSource,
36        storage::{
37            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
38            pruning::PrunedHeightStorage,
39        },
40    },
41    fetching::{self, Callback, request},
42    types::HeightIndexed,
43};
44
45pub(super) type LeafFetcher<Types, S, P> =
46    fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
47
48impl<Types> FetchRequest for LeafId<Types>
49where
50    Types: NodeType,
51{
52    fn might_exist(self, heights: Heights) -> bool {
53        if let LeafId::Number(n) = self {
54            heights.might_exist(n as u64)
55        } else {
56            true
57        }
58    }
59}
60
61#[async_trait]
62impl<Types> Fetchable<Types> for LeafQueryData<Types>
63where
64    Types: NodeType,
65    Header<Types>: QueryableHeader<Types>,
66    Payload<Types>: QueryablePayload<Types>,
67{
68    type Request = LeafId<Types>;
69
70    fn satisfies(&self, req: Self::Request) -> bool {
71        match req {
72            LeafId::Number(n) => self.height() == n as u64,
73            LeafId::Hash(h) => self.hash() == h,
74        }
75    }
76
77    async fn passive_fetch(
78        notifiers: &Notifiers<Types>,
79        req: Self::Request,
80    ) -> BoxFuture<'static, Option<Self>> {
81        notifiers
82            .leaf
83            .wait_for(move |leaf| leaf.satisfies(req))
84            .await
85            .into_future()
86            .boxed()
87    }
88
89    async fn active_fetch<S, P>(
90        tx: &mut impl AvailabilityStorage<Types>,
91        fetcher: Arc<Fetcher<Types, S, P>>,
92        req: Self::Request,
93    ) -> anyhow::Result<()>
94    where
95        S: VersionedDataSource + 'static,
96        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97        for<'a> S::ReadOnly<'a>:
98            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99        P: AvailabilityProvider<Types>,
100    {
101        fetch_leaf_with_callbacks(tx, fetcher, req, None).await
102    }
103
104    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
105    where
106        S: AvailabilityStorage<Types>,
107    {
108        storage.get_leaf(req).await
109    }
110}
111
112pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
113    tx: &mut impl AvailabilityStorage<Types>,
114    fetcher: Arc<Fetcher<Types, S, P>>,
115    req: LeafId<Types>,
116    callbacks: I,
117) -> anyhow::Result<()>
118where
119    Types: NodeType,
120    Header<Types>: QueryableHeader<Types>,
121    Payload<Types>: QueryablePayload<Types>,
122    S: VersionedDataSource + 'static,
123    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
124    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
125    P: AvailabilityProvider<Types>,
126    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
127    I::IntoIter: Send,
128{
129    match req {
130        LeafId::Number(n) => {
131            // We need the next leaf in the chain so we can figure out what hash we expect for this
132            // leaf, so we can fetch it securely from an untrusted provider.
133            let next = (n + 1) as u64;
134            let next = match tx.first_available_leaf(next).await {
135                Ok(leaf) if leaf.height() == next => leaf,
136                Ok(leaf) => {
137                    // If we don't have the immediate successor leaf, but we have some later leaf,
138                    // then we can't trigger this exact fetch, but we can fetch the (apparently)
139                    // missing parent of the leaf we do have, which will trigger a chain of fetches
140                    // that eventually reaches all the way back to the desired leaf.
141                    tracing::debug!(
142                        n,
143                        fetching = leaf.height() - 1,
144                        "do not have necessary leaf; trigger fetch of a later leaf"
145                    );
146
147                    let mut callbacks = vec![LeafCallback::Leaf {
148                        fetcher: fetcher.clone(),
149                    }];
150
151                    if !fetcher.leaf_only {
152                        callbacks.push(
153                            HeaderCallback::Payload {
154                                fetcher: fetcher.clone(),
155                            }
156                            .into(),
157                        );
158                        callbacks.push(
159                            HeaderCallback::VidCommon {
160                                fetcher: fetcher.clone(),
161                            }
162                            .into(),
163                        );
164                    }
165
166                    fetcher.leaf_fetcher.clone().spawn_fetch(
167                        request::LeafRequest::new(
168                            leaf.height() - 1,
169                            leaf.leaf().parent_commitment(),
170                            leaf.leaf().justify_qc().commit(),
171                        ),
172                        fetcher.provider.clone(),
173                        // After getting the leaf, grab the other data as well; that will be missing
174                        // whenever the leaf was.
175                        callbacks,
176                    );
177                    return Ok(());
178                },
179                Err(QueryError::Missing | QueryError::NotFound) => {
180                    // We successfully queried the database, but the next leaf wasn't there. We
181                    // know for sure that based on the current state of the DB, we cannot fetch this
182                    // leaf.
183                    tracing::debug!(n, "not fetching leaf with unknown successor");
184                    return Ok(());
185                },
186                Err(QueryError::Error { message }) => {
187                    // An error occurred while querying the database. We don't know if we need to
188                    // fetch the leaf or not. Return an error so we can try again.
189                    bail!("failed to fetch successor for leaf {n}: {message}");
190                },
191            };
192
193            let fetcher = fetcher.clone();
194            fetcher.leaf_fetcher.clone().spawn_fetch(
195                request::LeafRequest::new(
196                    n as u64,
197                    next.leaf().parent_commitment(),
198                    next.leaf().justify_qc().commit(),
199                ),
200                fetcher.provider.clone(),
201                once(LeafCallback::Leaf { fetcher }).chain(callbacks),
202            );
203        },
204        LeafId::Hash(h) => {
205            // We don't actively fetch leaves when requested by hash, because we have no way of
206            // knowing whether a leaf with such a hash actually exists, and we don't want to bother
207            // peers with requests for non-existent leaves.
208            tracing::debug!("not fetching unknown leaf {h}");
209        },
210    }
211
212    Ok(())
213}
214
215/// Trigger a fetch of the parent of the given `leaf`, if it is missing.
216///
217/// Leaves have a unique constraint among fetchable objects: we cannot fetch a given leaf at height
218/// `h` unless we have its child at height `h + 1`. This is because the child, through its
219/// `parent_commitment`, tells us what the hash of the parent should be, which lets us authenticate
220/// it when fetching from an untrusted provider. Thus, requests for leaf `h` might block if `h + 1`
221/// is not available. To ensure all these requests are eventually unblocked, and all leaves are
222/// eventually fetched, we call this function whenever we receive leaf `h + 1` to check if we need
223/// to then fetch leaf `h`.
224pub(super) fn trigger_fetch_for_parent<Types, S, P>(
225    fetcher: &Arc<Fetcher<Types, S, P>>,
226    leaf: &LeafQueryData<Types>,
227) where
228    Types: NodeType,
229    Header<Types>: QueryableHeader<Types>,
230    Payload<Types>: QueryablePayload<Types>,
231    S: VersionedDataSource + 'static,
232    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
233    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
234    P: AvailabilityProvider<Types>,
235{
236    let height = leaf.height();
237    let parent = leaf.leaf().parent_commitment();
238    let parent_qc = leaf.leaf().justify_qc().commit();
239
240    // Check that there is a parent to fetch.
241    if height == 0 {
242        return;
243    }
244
245    // Spawn an async task; we're triggering a fire-and-forget fetch of a leaf that might now be
246    // available; we don't need to block the caller on this.
247    let fetcher = fetcher.clone();
248    let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
249    spawn(
250        async move {
251            // Check if we already have the parent.
252            match fetcher.storage.read().await {
253                Ok(mut tx) => {
254                    // Don't bother fetching a pruned leaf.
255                    if let Ok(pruned_height) = tx.load_pruned_height().await
256                        && pruned_height.is_some_and(|ph| height <= ph)
257                    {
258                        tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
259                        return;
260                    }
261
262                    if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
263                        return;
264                    }
265                },
266                Err(err) => {
267                    // If we can't open a transaction, we can't be sure that we already have the
268                    // parent, so we fall through to fetching it just to be safe.
269                    tracing::warn!(
270                        height,
271                        %parent,
272                        "error opening transaction to check for parent leaf: {err:#}",
273                    );
274                },
275            }
276
277            tracing::info!(height, %parent, "received new leaf; fetching missing parent");
278            fetcher.leaf_fetcher.clone().spawn_fetch(
279                request::LeafRequest::new(height - 1, parent, parent_qc),
280                fetcher.provider.clone(),
281                // After getting the leaf, grab the other data as well; that will be missing
282                // whenever the leaf was.
283                [
284                    LeafCallback::Leaf {
285                        fetcher: fetcher.clone(),
286                    },
287                    HeaderCallback::Payload {
288                        fetcher: fetcher.clone(),
289                    }
290                    .into(),
291                    HeaderCallback::VidCommon {
292                        fetcher: fetcher.clone(),
293                    }
294                    .into(),
295                ],
296            );
297        }
298        .instrument(span),
299    );
300}
301
302#[async_trait]
303impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
304where
305    Types: NodeType,
306    Header<Types>: QueryableHeader<Types>,
307    Payload<Types>: QueryablePayload<Types>,
308{
309    type RangedRequest = LeafId<Types>;
310
311    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
312    where
313        S: AvailabilityStorage<Types>,
314        R: RangeBounds<usize> + Send + 'static,
315    {
316        storage.get_leaf_range(range).await
317    }
318}
319
320impl<Types> Storable<Types> for LeafQueryData<Types>
321where
322    Types: NodeType,
323{
324    fn name() -> &'static str {
325        "leaf"
326    }
327
328    async fn notify(&self, notifiers: &Notifiers<Types>) {
329        notifiers.leaf.notify(self).await;
330    }
331
332    async fn store(
333        self,
334        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
335        _leaf_only: bool,
336    ) -> anyhow::Result<()> {
337        storage.insert_leaf(self).await
338    }
339}
340
341#[derive(Derivative, From)]
342#[derivative(Debug(bound = ""))]
343pub(super) enum LeafCallback<Types: NodeType, S, P> {
344    /// Callback when fetching the leaf for its own sake.
345    #[from(ignore)]
346    Leaf {
347        #[derivative(Debug = "ignore")]
348        fetcher: Arc<Fetcher<Types, S, P>>,
349    },
350    /// Callback when fetching the leaf in order to then look up something else.
351    Continuation {
352        callback: HeaderCallback<Types, S, P>,
353    },
354}
355
356impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
357    fn eq(&self, other: &Self) -> bool {
358        self.cmp(other).is_eq()
359    }
360}
361
362impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
363
364impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
365    fn cmp(&self, other: &Self) -> Ordering {
366        match (self, other) {
367            // Store leaves in the database before storing derived objects.
368            (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
369            (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
370
371            (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
372                cb1.cmp(cb2)
373            },
374            _ => Ordering::Equal,
375        }
376    }
377}
378
379impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
380    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
381        Some(self.cmp(other))
382    }
383}
384
385impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
386where
387    Header<Types>: QueryableHeader<Types>,
388    Payload<Types>: QueryablePayload<Types>,
389    S: VersionedDataSource + 'static,
390    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
391    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
392    P: AvailabilityProvider<Types>,
393{
394    async fn run(self, leaf: LeafQueryData<Types>) {
395        match self {
396            Self::Leaf { fetcher } => {
397                tracing::info!("fetched leaf {}", leaf.height());
398                // Trigger a fetch of the parent leaf, if we don't already have it.
399                trigger_fetch_for_parent(&fetcher, &leaf);
400                fetcher.store_and_notify(&leaf).await;
401            },
402            Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
403        }
404    }
405}