hotshot_query_service/data_source/fetching/
header.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Header fetching.
14
15use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{future::BoxFuture, FutureExt};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25    block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26    vid::fetch_vid_common_with_header, AvailabilityProvider, Fetcher,
27};
28use crate::{
29    availability::{BlockId, QueryableHeader, QueryablePayload},
30    data_source::{
31        fetching::{Fetchable, HeaderQueryData, LeafQueryData, Notifiers},
32        storage::{
33            pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
34            UpdateAvailabilityStorage,
35        },
36        update::VersionedDataSource,
37    },
38    Header, Payload, QueryError, QueryResult,
39};
40
41impl<Types: NodeType> From<LeafQueryData<Types>> for HeaderQueryData<Types> {
42    fn from(leaf: LeafQueryData<Types>) -> Self {
43        let header = leaf.header().clone();
44
45        Self { header }
46    }
47}
48
49fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
50where
51    Types: NodeType,
52    Header<Types>: QueryableHeader<Types>,
53    Payload<Types>: QueryablePayload<Types>,
54{
55    HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
56}
57
58#[async_trait]
59impl<Types> Fetchable<Types> for HeaderQueryData<Types>
60where
61    Types: NodeType,
62    Header<Types>: QueryableHeader<Types>,
63    Payload<Types>: QueryablePayload<Types>,
64{
65    type Request = BlockId<Types>;
66
67    fn satisfies(&self, req: Self::Request) -> bool {
68        let header = self.header();
69        match req {
70            BlockId::Number(n) => header.block_number() as usize == n,
71            BlockId::Hash(h) => header.commit() == h,
72            BlockId::PayloadHash(h) => header.payload_commitment() == h,
73        }
74    }
75
76    async fn passive_fetch(
77        notifiers: &Notifiers<Types>,
78        req: Self::Request,
79    ) -> BoxFuture<'static, Option<Self>> {
80        notifiers
81            .leaf
82            .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
83            .await
84            .into_future()
85            .map(|leaf| leaf.map(HeaderQueryData::from))
86            .boxed()
87    }
88
89    async fn active_fetch<S, P>(
90        tx: &mut impl AvailabilityStorage<Types>,
91        fetcher: Arc<Fetcher<Types, S, P>>,
92        req: Self::Request,
93    ) -> anyhow::Result<()>
94    where
95        S: VersionedDataSource + 'static,
96        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97        for<'a> S::ReadOnly<'a>:
98            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99        P: AvailabilityProvider<Types>,
100    {
101        // Note: if leaf only mode is enabled
102        // the payload callback will not do any active fetching and just return
103        // This is because we don't have payload fetcher for leaf only mode
104        fetch_header_and_then(
105            tx,
106            req,
107            HeaderCallback::Payload {
108                fetcher: fetcher.clone(),
109            },
110        )
111        .await
112    }
113
114    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
115    where
116        S: AvailabilityStorage<Types>,
117    {
118        storage.get_header(req).await.map(|header| Self { header })
119    }
120}
121
122#[derive(Derivative)]
123#[derivative(Debug(bound = ""))]
124pub(super) enum HeaderCallback<Types, S, P>
125where
126    Types: NodeType,
127{
128    /// Callback when fetching the leaf in order to then look up the corresponding block.
129    Payload {
130        #[derivative(Debug = "ignore")]
131        fetcher: Arc<Fetcher<Types, S, P>>,
132    },
133    /// Callback when fetching the leaf in order to then look up the corresponding VID common data.
134    VidCommon {
135        #[derivative(Debug = "ignore")]
136        fetcher: Arc<Fetcher<Types, S, P>>,
137    },
138}
139
140impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
141    fn eq(&self, other: &Self) -> bool {
142        self.cmp(other).is_eq()
143    }
144}
145
146impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
147
148impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
149    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
150        Some(self.cmp(other))
151    }
152}
153
154impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
155    fn cmp(&self, other: &Self) -> Ordering {
156        // Arbitrarily, we choose to run payload callbacks before VID callbacks.
157        match (self, other) {
158            (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
159            (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
160            _ => Ordering::Equal,
161        }
162    }
163}
164
165impl<Types, S, P> HeaderCallback<Types, S, P>
166where
167    Types: NodeType,
168    Header<Types>: QueryableHeader<Types>,
169    Payload<Types>: QueryablePayload<Types>,
170    S: VersionedDataSource + 'static,
171    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
172    P: AvailabilityProvider<Types>,
173{
174    fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
175        match self {
176            Self::Payload { fetcher } => fetcher.clone(),
177            Self::VidCommon { fetcher } => fetcher.clone(),
178        }
179    }
180
181    pub(super) fn run(self, header: Header<Types>) {
182        match self {
183            Self::Payload { fetcher } => {
184                tracing::info!(
185                    "fetched leaf {}, will now fetch payload",
186                    header.block_number()
187                );
188                fetch_block_with_header(fetcher, header);
189            },
190            Self::VidCommon { fetcher } => {
191                tracing::info!(
192                    "fetched leaf {}, will now fetch VID common",
193                    header.block_number()
194                );
195                fetch_vid_common_with_header(fetcher, header);
196            },
197        }
198    }
199}
200
201pub(super) async fn fetch_header_and_then<Types, S, P>(
202    tx: &mut impl AvailabilityStorage<Types>,
203    req: BlockId<Types>,
204    callback: HeaderCallback<Types, S, P>,
205) -> anyhow::Result<()>
206where
207    Types: NodeType,
208    Header<Types>: QueryableHeader<Types>,
209    Payload<Types>: QueryablePayload<Types>,
210    S: VersionedDataSource + 'static,
211    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
212    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
213    P: AvailabilityProvider<Types>,
214{
215    // Check if at least the header is available in local storage. If it is, we benefit two ways:
216    // 1. We know for sure the corresponding block exists, so we can unconditionally trigger an
217    //    active fetch without unnecessarily bothering our peers.
218    // 2. We only need to fetch the missing data (e.g. payload or VID common), not the full block.
219    //    Not only is this marginally less data to download, there are some providers that may only
220    //    be able to provide certain data. For example, the HotShot DA committee members may be able
221    //    to provide paylaods, but not full blocks. Or, in the case where VID recovery is needed,
222    //    the VID common data may be available but the full block may not exist anywhere.
223    match tx.get_header(req).await {
224        Ok(header) => {
225            callback.run(header);
226            return Ok(());
227        },
228        Err(QueryError::Missing | QueryError::NotFound) => {
229            // We successfully queried the database, but the header wasn't there. Fall through to
230            // fetching it.
231            tracing::debug!(?req, "header not available locally; trying fetch");
232        },
233        Err(QueryError::Error { message }) => {
234            // An error occurred while querying the database. We don't know if we need to fetch the
235            // header or not. Return an error so we can try again.
236            bail!("failed to fetch header for block {req:?}: {message}");
237        },
238    }
239
240    // If the header is _not_ present, we may still be able to fetch the request, but we need to
241    // fetch the header (in fact, the entire leaf) first. This is because we have an invariant that
242    // we should not store derived objects in the database unless we already have the corresponding
243    // header and leaf.
244    match req {
245        BlockId::Number(n) => {
246            fetch_leaf_with_callbacks(tx, callback.fetcher(), n.into(), [callback.into()]).await?;
247        },
248        BlockId::Hash(h) => {
249            // Given only the hash, we cannot tell if the corresponding leaf actually exists, since
250            // we don't have a corresponding header. Therefore, we will not spawn an active fetch.
251            tracing::debug!("not fetching unknown block {h}");
252        },
253        BlockId::PayloadHash(h) => {
254            // Same as above, we don't fetch a block with a payload that is not known to exist.
255            tracing::debug!("not fetching block with unknown payload {h}");
256        },
257    }
258
259    Ok(())
260}