hotshot_query_service/data_source/fetching/
leaf.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`LeafQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use derive_more::From;
22use futures::future::{BoxFuture, FutureExt};
23use hotshot_types::traits::node_implementation::NodeType;
24use tokio::spawn;
25use tracing::Instrument;
26
27use super::{
28    header::HeaderCallback, AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights,
29    Notifiers, RangedFetchable, Storable,
30};
31use crate::{
32    availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
33    data_source::{
34        storage::{
35            pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
36            UpdateAvailabilityStorage,
37        },
38        VersionedDataSource,
39    },
40    fetching::{self, request, Callback},
41    types::HeightIndexed,
42    Header, Payload, QueryError, QueryResult,
43};
44
45pub(super) type LeafFetcher<Types, S, P> =
46    fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
47
48impl<Types> FetchRequest for LeafId<Types>
49where
50    Types: NodeType,
51{
52    fn might_exist(self, heights: Heights) -> bool {
53        if let LeafId::Number(n) = self {
54            heights.might_exist(n as u64)
55        } else {
56            true
57        }
58    }
59}
60
61#[async_trait]
62impl<Types> Fetchable<Types> for LeafQueryData<Types>
63where
64    Types: NodeType,
65    Header<Types>: QueryableHeader<Types>,
66    Payload<Types>: QueryablePayload<Types>,
67{
68    type Request = LeafId<Types>;
69
70    fn satisfies(&self, req: Self::Request) -> bool {
71        match req {
72            LeafId::Number(n) => self.height() == n as u64,
73            LeafId::Hash(h) => self.hash() == h,
74        }
75    }
76
77    async fn passive_fetch(
78        notifiers: &Notifiers<Types>,
79        req: Self::Request,
80    ) -> BoxFuture<'static, Option<Self>> {
81        notifiers
82            .leaf
83            .wait_for(move |leaf| leaf.satisfies(req))
84            .await
85            .into_future()
86            .boxed()
87    }
88
89    async fn active_fetch<S, P>(
90        tx: &mut impl AvailabilityStorage<Types>,
91        fetcher: Arc<Fetcher<Types, S, P>>,
92        req: Self::Request,
93    ) -> anyhow::Result<()>
94    where
95        S: VersionedDataSource + 'static,
96        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97        for<'a> S::ReadOnly<'a>:
98            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99        P: AvailabilityProvider<Types>,
100    {
101        fetch_leaf_with_callbacks(tx, fetcher, req, None).await
102    }
103
104    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
105    where
106        S: AvailabilityStorage<Types>,
107    {
108        storage.get_leaf(req).await
109    }
110}
111
112pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
113    tx: &mut impl AvailabilityStorage<Types>,
114    fetcher: Arc<Fetcher<Types, S, P>>,
115    req: LeafId<Types>,
116    callbacks: I,
117) -> anyhow::Result<()>
118where
119    Types: NodeType,
120    Header<Types>: QueryableHeader<Types>,
121    Payload<Types>: QueryablePayload<Types>,
122    S: VersionedDataSource + 'static,
123    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
124    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
125    P: AvailabilityProvider<Types>,
126    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
127    I::IntoIter: Send,
128{
129    match req {
130        LeafId::Number(n) => {
131            // We need the next leaf in the chain so we can figure out what hash we expect for this
132            // leaf, so we can fetch it securely from an untrusted provider.
133            let next = (n + 1) as u64;
134            let next = match tx.first_available_leaf(next).await {
135                Ok(leaf) if leaf.height() == next => leaf,
136                Ok(leaf) => {
137                    // If we don't have the immediate successor leaf, but we have some later leaf,
138                    // then we can't trigger this exact fetch, but we can fetch the (apparently)
139                    // missing parent of the leaf we do have, which will trigger a chain of fetches
140                    // that eventually reaches all the way back to the desired leaf.
141                    tracing::debug!(
142                        n,
143                        fetching = leaf.height() - 1,
144                        "do not have necessary leaf; trigger fetch of a later leaf"
145                    );
146
147                    let mut callbacks = vec![LeafCallback::Leaf {
148                        fetcher: fetcher.clone(),
149                    }];
150
151                    if !fetcher.leaf_only {
152                        callbacks.push(
153                            HeaderCallback::Payload {
154                                fetcher: fetcher.clone(),
155                            }
156                            .into(),
157                        );
158                        callbacks.push(
159                            HeaderCallback::VidCommon {
160                                fetcher: fetcher.clone(),
161                            }
162                            .into(),
163                        );
164                    }
165
166                    fetcher.leaf_fetcher.clone().spawn_fetch(
167                        request::LeafRequest::new(
168                            leaf.height() - 1,
169                            leaf.leaf().parent_commitment(),
170                            leaf.leaf().justify_qc().commit(),
171                        ),
172                        fetcher.provider.clone(),
173                        // After getting the leaf, grab the other data as well; that will be missing
174                        // whenever the leaf was.
175                        callbacks,
176                    );
177                    return Ok(());
178                },
179                Err(QueryError::Missing | QueryError::NotFound) => {
180                    // We successfully queried the database, but the next leaf wasn't there. We
181                    // know for sure that based on the current state of the DB, we cannot fetch this
182                    // leaf.
183                    tracing::debug!(n, "not fetching leaf with unknown successor");
184                    return Ok(());
185                },
186                Err(QueryError::Error { message }) => {
187                    // An error occurred while querying the database. We don't know if we need to
188                    // fetch the leaf or not. Return an error so we can try again.
189                    bail!("failed to fetch successor for leaf {n}: {message}");
190                },
191            };
192
193            let fetcher = fetcher.clone();
194            fetcher.leaf_fetcher.clone().spawn_fetch(
195                request::LeafRequest::new(
196                    n as u64,
197                    next.leaf().parent_commitment(),
198                    next.leaf().justify_qc().commit(),
199                ),
200                fetcher.provider.clone(),
201                once(LeafCallback::Leaf { fetcher }).chain(callbacks),
202            );
203        },
204        LeafId::Hash(h) => {
205            // We don't actively fetch leaves when requested by hash, because we have no way of
206            // knowing whether a leaf with such a hash actually exists, and we don't want to bother
207            // peers with requests for non-existent leaves.
208            tracing::debug!("not fetching unknown leaf {h}");
209        },
210    }
211
212    Ok(())
213}
214
215/// Trigger a fetch of the parent of the given `leaf`, if it is missing.
216///
217/// Leaves have a unique constraint among fetchable objects: we cannot fetch a given leaf at height
218/// `h` unless we have its child at height `h + 1`. This is because the child, through its
219/// `parent_commitment`, tells us what the hash of the parent should be, which lets us authenticate
220/// it when fetching from an untrusted provider. Thus, requests for leaf `h` might block if `h + 1`
221/// is not available. To ensure all these requests are eventually unblocked, and all leaves are
222/// eventually fetched, we call this function whenever we receive leaf `h + 1` to check if we need
223/// to then fetch leaf `h`.
224pub(super) fn trigger_fetch_for_parent<Types, S, P>(
225    fetcher: &Arc<Fetcher<Types, S, P>>,
226    leaf: &LeafQueryData<Types>,
227) where
228    Types: NodeType,
229    Header<Types>: QueryableHeader<Types>,
230    Payload<Types>: QueryablePayload<Types>,
231    S: VersionedDataSource + 'static,
232    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
233    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
234    P: AvailabilityProvider<Types>,
235{
236    let height = leaf.height();
237    let parent = leaf.leaf().parent_commitment();
238    let parent_qc = leaf.leaf().justify_qc().commit();
239
240    // Check that there is a parent to fetch.
241    if height == 0 {
242        return;
243    }
244
245    // Spawn an async task; we're triggering a fire-and-forget fetch of a leaf that might now be
246    // available; we don't need to block the caller on this.
247    let fetcher = fetcher.clone();
248    let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
249    spawn(
250        async move {
251            // Check if we already have the parent.
252            match fetcher.storage.read().await {
253                Ok(mut tx) => {
254                    // Don't bother fetching a pruned leaf.
255                    if let Ok(pruned_height) = tx.load_pruned_height().await {
256                        if pruned_height.is_some_and(|ph| height <= ph) {
257                            tracing::info!(
258                                height,
259                                ?pruned_height,
260                                "not fetching pruned parent leaf"
261                            );
262                            return;
263                        }
264                    }
265
266                    if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
267                        return;
268                    }
269                },
270                Err(err) => {
271                    // If we can't open a transaction, we can't be sure that we already have the
272                    // parent, so we fall through to fetching it just to be safe.
273                    tracing::warn!(
274                        height,
275                        %parent,
276                        "error opening transaction to check for parent leaf: {err:#}",
277                    );
278                },
279            }
280
281            tracing::info!(height, %parent, "received new leaf; fetching missing parent");
282            fetcher.leaf_fetcher.clone().spawn_fetch(
283                request::LeafRequest::new(height - 1, parent, parent_qc),
284                fetcher.provider.clone(),
285                // After getting the leaf, grab the other data as well; that will be missing
286                // whenever the leaf was.
287                [
288                    LeafCallback::Leaf {
289                        fetcher: fetcher.clone(),
290                    },
291                    HeaderCallback::Payload {
292                        fetcher: fetcher.clone(),
293                    }
294                    .into(),
295                    HeaderCallback::VidCommon {
296                        fetcher: fetcher.clone(),
297                    }
298                    .into(),
299                ],
300            );
301        }
302        .instrument(span),
303    );
304}
305
306#[async_trait]
307impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
308where
309    Types: NodeType,
310    Header<Types>: QueryableHeader<Types>,
311    Payload<Types>: QueryablePayload<Types>,
312{
313    type RangedRequest = LeafId<Types>;
314
315    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
316    where
317        S: AvailabilityStorage<Types>,
318        R: RangeBounds<usize> + Send + 'static,
319    {
320        storage.get_leaf_range(range).await
321    }
322}
323
324impl<Types> Storable<Types> for LeafQueryData<Types>
325where
326    Types: NodeType,
327{
328    fn name() -> &'static str {
329        "leaf"
330    }
331
332    async fn notify(&self, notifiers: &Notifiers<Types>) {
333        notifiers.leaf.notify(self).await;
334    }
335
336    async fn store(
337        self,
338        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
339        _leaf_only: bool,
340    ) -> anyhow::Result<()> {
341        storage.insert_leaf(self).await
342    }
343}
344
345#[derive(Derivative, From)]
346#[derivative(Debug(bound = ""))]
347pub(super) enum LeafCallback<Types: NodeType, S, P> {
348    /// Callback when fetching the leaf for its own sake.
349    #[from(ignore)]
350    Leaf {
351        #[derivative(Debug = "ignore")]
352        fetcher: Arc<Fetcher<Types, S, P>>,
353    },
354    /// Callback when fetching the leaf in order to then look up something else.
355    Continuation {
356        callback: HeaderCallback<Types, S, P>,
357    },
358}
359
360impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
361    fn eq(&self, other: &Self) -> bool {
362        self.cmp(other).is_eq()
363    }
364}
365
366impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
367
368impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
369    fn cmp(&self, other: &Self) -> Ordering {
370        match (self, other) {
371            // Store leaves in the database before storing derived objects.
372            (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
373            (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
374
375            (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
376                cb1.cmp(cb2)
377            },
378            _ => Ordering::Equal,
379        }
380    }
381}
382
383impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
384    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
385        Some(self.cmp(other))
386    }
387}
388
389impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
390where
391    Header<Types>: QueryableHeader<Types>,
392    Payload<Types>: QueryablePayload<Types>,
393    S: VersionedDataSource + 'static,
394    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
395    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
396    P: AvailabilityProvider<Types>,
397{
398    async fn run(self, leaf: LeafQueryData<Types>) {
399        match self {
400            Self::Leaf { fetcher } => {
401                tracing::info!("fetched leaf {}", leaf.height());
402                // Trigger a fetch of the parent leaf, if we don't already have it.
403                trigger_fetch_for_parent(&fetcher, &leaf);
404                fetcher.store_and_notify(leaf).await;
405            },
406            Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
407        }
408    }
409}