hotshot_query_service/data_source/fetching/
block.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
//! [`Fetchable`] implementations for [`BlockQueryData`], [`PayloadQueryData`], and
//! [`PayloadMetadata`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
24    Storable,
25    header::{HeaderCallback, fetch_header_and_then},
26};
27use crate::{
28    Header, Payload, QueryResult,
29    availability::{
30        BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31        QueryablePayload,
32    },
33    data_source::{
34        VersionedDataSource,
35        storage::{
36            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
37            pruning::PrunedHeightStorage,
38        },
39    },
40    fetching::{
41        self, Callback,
42        request::{self, PayloadRequest},
43    },
44    types::HeightIndexed,
45};
46pub(super) type PayloadFetcher<Types, S, P> =
47    fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
48
49impl<Types> FetchRequest for BlockId<Types>
50where
51    Types: NodeType,
52{
53    fn might_exist(self, heights: Heights) -> bool {
54        if let BlockId::Number(n) = self {
55            heights.might_exist(n as u64)
56        } else {
57            true
58        }
59    }
60}
61
62#[async_trait]
63impl<Types> Fetchable<Types> for BlockQueryData<Types>
64where
65    Types: NodeType,
66    Header<Types>: QueryableHeader<Types>,
67    Payload<Types>: QueryablePayload<Types>,
68{
69    type Request = BlockId<Types>;
70
71    fn satisfies(&self, req: Self::Request) -> bool {
72        match req {
73            BlockId::Number(n) => self.height() == n as u64,
74            BlockId::Hash(h) => self.hash() == h,
75            BlockId::PayloadHash(h) => self.payload_hash() == h,
76        }
77    }
78
79    async fn passive_fetch(
80        notifiers: &Notifiers<Types>,
81        req: Self::Request,
82    ) -> BoxFuture<'static, Option<Self>> {
83        notifiers
84            .block
85            .wait_for(move |block| block.satisfies(req))
86            .await
87            .into_future()
88            .boxed()
89    }
90
91    async fn active_fetch<S, P>(
92        tx: &mut impl AvailabilityStorage<Types>,
93        fetcher: Arc<Fetcher<Types, S, P>>,
94        req: Self::Request,
95    ) -> anyhow::Result<()>
96    where
97        S: VersionedDataSource + 'static,
98        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
99        for<'a> S::ReadOnly<'a>:
100            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
101        P: AvailabilityProvider<Types>,
102    {
103        fetch_header_and_then(
104            tx,
105            req,
106            HeaderCallback::Payload {
107                fetcher: fetcher.clone(),
108            },
109        )
110        .await
111    }
112
113    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
114    where
115        S: AvailabilityStorage<Types>,
116    {
117        storage.get_block(req).await
118    }
119}
120
121#[async_trait]
122impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
123where
124    Types: NodeType,
125    Header<Types>: QueryableHeader<Types>,
126    Payload<Types>: QueryablePayload<Types>,
127{
128    type RangedRequest = BlockId<Types>;
129
130    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
131    where
132        S: AvailabilityStorage<Types>,
133        R: RangeBounds<usize> + Send + 'static,
134    {
135        storage.get_block_range(range).await
136    }
137}
138
139impl<Types> Storable<Types> for BlockQueryData<Types>
140where
141    Types: NodeType,
142{
143    fn name() -> &'static str {
144        "block"
145    }
146
147    async fn notify(&self, notifiers: &Notifiers<Types>) {
148        notifiers.block.notify(self).await;
149    }
150
151    async fn store(
152        self,
153        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
154        leaf_only: bool,
155    ) -> anyhow::Result<()> {
156        if leaf_only {
157            return Ok(());
158        }
159
160        storage.insert_block(self).await
161    }
162}
163
164pub(super) fn fetch_block_with_header<Types, S, P>(
165    fetcher: Arc<Fetcher<Types, S, P>>,
166    header: Header<Types>,
167) where
168    Types: NodeType,
169    Header<Types>: QueryableHeader<Types>,
170    Payload<Types>: QueryablePayload<Types>,
171    S: VersionedDataSource + 'static,
172    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
173    P: AvailabilityProvider<Types>,
174{
175    let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
176        // If we're in light-weight mode, we don't need to fetch the VID common data.
177        return;
178    };
179
180    // Now that we have the header, we only need to retrieve the payload.
181    tracing::info!(
182        "spawned active fetch for payload {:?} (height {})",
183        header.payload_commitment(),
184        header.block_number()
185    );
186    payload_fetcher.spawn_fetch(
187        PayloadRequest(header.payload_commitment()),
188        fetcher.provider.clone(),
189        once(PayloadCallback {
190            header,
191            fetcher: fetcher.clone(),
192        }),
193    );
194}
195
196#[async_trait]
197impl<Types> Fetchable<Types> for PayloadQueryData<Types>
198where
199    Types: NodeType,
200    Header<Types>: QueryableHeader<Types>,
201    Payload<Types>: QueryablePayload<Types>,
202{
203    type Request = BlockId<Types>;
204
205    fn satisfies(&self, req: Self::Request) -> bool {
206        match req {
207            BlockId::Number(n) => self.height() == n as u64,
208            BlockId::Hash(h) => self.block_hash() == h,
209            BlockId::PayloadHash(h) => self.hash() == h,
210        }
211    }
212
213    async fn passive_fetch(
214        notifiers: &Notifiers<Types>,
215        req: Self::Request,
216    ) -> BoxFuture<'static, Option<Self>> {
217        notifiers
218            .block
219            .wait_for(move |block| block.satisfies(req))
220            .await
221            .into_future()
222            .map(|block| block.map(PayloadQueryData::from))
223            .boxed()
224    }
225
226    async fn active_fetch<S, P>(
227        tx: &mut impl AvailabilityStorage<Types>,
228        fetcher: Arc<Fetcher<Types, S, P>>,
229        req: Self::Request,
230    ) -> anyhow::Result<()>
231    where
232        S: VersionedDataSource + 'static,
233        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
234        for<'a> S::ReadOnly<'a>:
235            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
236        P: AvailabilityProvider<Types>,
237    {
238        // We don't have storage for the payload alone, only the whole block. So if we need to fetch
239        // the payload, we just fetch the whole block (which may end up fetching only the payload,
240        // if that's all that's needed to complete the block).
241        BlockQueryData::active_fetch(tx, fetcher, req).await
242    }
243
244    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
245    where
246        S: AvailabilityStorage<Types>,
247    {
248        storage.get_payload(req).await
249    }
250}
251
252#[async_trait]
253impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
254where
255    Types: NodeType,
256    Header<Types>: QueryableHeader<Types>,
257    Payload<Types>: QueryablePayload<Types>,
258{
259    type RangedRequest = BlockId<Types>;
260
261    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
262    where
263        S: AvailabilityStorage<Types>,
264        R: RangeBounds<usize> + Send + 'static,
265    {
266        storage.get_payload_range(range).await
267    }
268}
269
270#[derive(Derivative)]
271#[derivative(Debug(bound = ""))]
272pub(super) struct PayloadCallback<Types: NodeType, S, P> {
273    header: Header<Types>,
274    #[derivative(Debug = "ignore")]
275    fetcher: Arc<Fetcher<Types, S, P>>,
276}
277
278impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
279    fn eq(&self, other: &Self) -> bool {
280        self.cmp(other).is_eq()
281    }
282}
283
284impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
285
286impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
287    fn cmp(&self, other: &Self) -> Ordering {
288        self.header.block_number().cmp(&other.header.block_number())
289    }
290}
291
292impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
293    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
294        Some(self.cmp(other))
295    }
296}
297
298impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
299where
300    Header<Types>: QueryableHeader<Types>,
301    Payload<Types>: QueryablePayload<Types>,
302    S: 'static + VersionedDataSource,
303    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
304    P: AvailabilityProvider<Types>,
305{
306    async fn run(self, payload: Payload<Types>) {
307        tracing::info!("fetched payload {:?}", self.header.payload_commitment());
308        let block = BlockQueryData::new(self.header, payload);
309        self.fetcher.store_and_notify(&block).await;
310    }
311}
312
313#[async_trait]
314impl<Types> Fetchable<Types> for PayloadMetadata<Types>
315where
316    Types: NodeType,
317    Header<Types>: QueryableHeader<Types>,
318    Payload<Types>: QueryablePayload<Types>,
319{
320    type Request = BlockId<Types>;
321
322    fn satisfies(&self, req: Self::Request) -> bool {
323        match req {
324            BlockId::Number(n) => self.height == n as u64,
325            BlockId::Hash(h) => self.block_hash == h,
326            BlockId::PayloadHash(h) => self.hash == h,
327        }
328    }
329
330    async fn passive_fetch(
331        notifiers: &Notifiers<Types>,
332        req: Self::Request,
333    ) -> BoxFuture<'static, Option<Self>> {
334        notifiers
335            .block
336            .wait_for(move |block| block.satisfies(req))
337            .await
338            .into_future()
339            .map(|opt| opt.map(Self::from))
340            .boxed()
341    }
342
343    async fn active_fetch<S, P>(
344        tx: &mut impl AvailabilityStorage<Types>,
345        fetcher: Arc<Fetcher<Types, S, P>>,
346        req: Self::Request,
347    ) -> anyhow::Result<()>
348    where
349        S: VersionedDataSource + 'static,
350        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
351        for<'a> S::ReadOnly<'a>:
352            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
353        P: AvailabilityProvider<Types>,
354    {
355        // Trigger the full block to be fetched. This will be enough to satisfy this request for the
356        // payload summary.
357        BlockQueryData::active_fetch(tx, fetcher, req).await
358    }
359
360    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
361    where
362        S: AvailabilityStorage<Types>,
363    {
364        storage.get_payload_metadata(req).await
365    }
366}
367
368#[async_trait]
369impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
370where
371    Types: NodeType,
372    Header<Types>: QueryableHeader<Types>,
373    Payload<Types>: QueryablePayload<Types>,
374{
375    type RangedRequest = BlockId<Types>;
376
377    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
378    where
379        S: AvailabilityStorage<Types>,
380        R: RangeBounds<usize> + Send + 'static,
381    {
382        storage.get_payload_metadata_range(range).await
383    }
384}