hotshot_query_service/data_source/fetching/
block.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
//! [`Fetchable`] implementations for [`BlockQueryData`], [`PayloadQueryData`], and
//! [`PayloadMetadata`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23    header::{fetch_header_and_then, HeaderCallback},
24    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
25    Storable,
26};
27use crate::{
28    availability::{
29        BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
30        QueryablePayload,
31    },
32    data_source::{
33        storage::{
34            pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
35            UpdateAvailabilityStorage,
36        },
37        VersionedDataSource,
38    },
39    fetching::{
40        self,
41        request::{self, PayloadRequest},
42        Callback,
43    },
44    types::HeightIndexed,
45    Header, Payload, QueryResult,
46};
47pub(super) type PayloadFetcher<Types, S, P> =
48    fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
49
50impl<Types> FetchRequest for BlockId<Types>
51where
52    Types: NodeType,
53{
54    fn might_exist(self, heights: Heights) -> bool {
55        if let BlockId::Number(n) = self {
56            heights.might_exist(n as u64)
57        } else {
58            true
59        }
60    }
61}
62
63#[async_trait]
64impl<Types> Fetchable<Types> for BlockQueryData<Types>
65where
66    Types: NodeType,
67    Header<Types>: QueryableHeader<Types>,
68    Payload<Types>: QueryablePayload<Types>,
69{
70    type Request = BlockId<Types>;
71
72    fn satisfies(&self, req: Self::Request) -> bool {
73        match req {
74            BlockId::Number(n) => self.height() == n as u64,
75            BlockId::Hash(h) => self.hash() == h,
76            BlockId::PayloadHash(h) => self.payload_hash() == h,
77        }
78    }
79
80    async fn passive_fetch(
81        notifiers: &Notifiers<Types>,
82        req: Self::Request,
83    ) -> BoxFuture<'static, Option<Self>> {
84        notifiers
85            .block
86            .wait_for(move |block| block.satisfies(req))
87            .await
88            .into_future()
89            .boxed()
90    }
91
92    async fn active_fetch<S, P>(
93        tx: &mut impl AvailabilityStorage<Types>,
94        fetcher: Arc<Fetcher<Types, S, P>>,
95        req: Self::Request,
96    ) -> anyhow::Result<()>
97    where
98        S: VersionedDataSource + 'static,
99        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
100        for<'a> S::ReadOnly<'a>:
101            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
102        P: AvailabilityProvider<Types>,
103    {
104        fetch_header_and_then(
105            tx,
106            req,
107            HeaderCallback::Payload {
108                fetcher: fetcher.clone(),
109            },
110        )
111        .await
112    }
113
114    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
115    where
116        S: AvailabilityStorage<Types>,
117    {
118        storage.get_block(req).await
119    }
120}
121
122#[async_trait]
123impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
124where
125    Types: NodeType,
126    Header<Types>: QueryableHeader<Types>,
127    Payload<Types>: QueryablePayload<Types>,
128{
129    type RangedRequest = BlockId<Types>;
130
131    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
132    where
133        S: AvailabilityStorage<Types>,
134        R: RangeBounds<usize> + Send + 'static,
135    {
136        storage.get_block_range(range).await
137    }
138}
139
140impl<Types> Storable<Types> for BlockQueryData<Types>
141where
142    Types: NodeType,
143{
144    fn name() -> &'static str {
145        "block"
146    }
147
148    async fn notify(&self, notifiers: &Notifiers<Types>) {
149        notifiers.block.notify(self).await;
150    }
151
152    async fn store(
153        self,
154        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
155        leaf_only: bool,
156    ) -> anyhow::Result<()> {
157        if leaf_only {
158            return Ok(());
159        }
160
161        storage.insert_block(self).await
162    }
163}
164
165pub(super) fn fetch_block_with_header<Types, S, P>(
166    fetcher: Arc<Fetcher<Types, S, P>>,
167    header: Header<Types>,
168) where
169    Types: NodeType,
170    Header<Types>: QueryableHeader<Types>,
171    Payload<Types>: QueryablePayload<Types>,
172    S: VersionedDataSource + 'static,
173    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
174    P: AvailabilityProvider<Types>,
175{
176    let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
177        // If we're in light-weight mode, we don't need to fetch the VID common data.
178        return;
179    };
180
181    // Now that we have the header, we only need to retrieve the payload.
182    tracing::info!(
183        "spawned active fetch for payload {:?} (height {})",
184        header.payload_commitment(),
185        header.block_number()
186    );
187    payload_fetcher.spawn_fetch(
188        PayloadRequest(header.payload_commitment()),
189        fetcher.provider.clone(),
190        once(PayloadCallback {
191            header,
192            fetcher: fetcher.clone(),
193        }),
194    );
195}
196
197#[async_trait]
198impl<Types> Fetchable<Types> for PayloadQueryData<Types>
199where
200    Types: NodeType,
201    Header<Types>: QueryableHeader<Types>,
202    Payload<Types>: QueryablePayload<Types>,
203{
204    type Request = BlockId<Types>;
205
206    fn satisfies(&self, req: Self::Request) -> bool {
207        match req {
208            BlockId::Number(n) => self.height() == n as u64,
209            BlockId::Hash(h) => self.block_hash() == h,
210            BlockId::PayloadHash(h) => self.hash() == h,
211        }
212    }
213
214    async fn passive_fetch(
215        notifiers: &Notifiers<Types>,
216        req: Self::Request,
217    ) -> BoxFuture<'static, Option<Self>> {
218        notifiers
219            .block
220            .wait_for(move |block| block.satisfies(req))
221            .await
222            .into_future()
223            .map(|block| block.map(PayloadQueryData::from))
224            .boxed()
225    }
226
227    async fn active_fetch<S, P>(
228        tx: &mut impl AvailabilityStorage<Types>,
229        fetcher: Arc<Fetcher<Types, S, P>>,
230        req: Self::Request,
231    ) -> anyhow::Result<()>
232    where
233        S: VersionedDataSource + 'static,
234        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
235        for<'a> S::ReadOnly<'a>:
236            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
237        P: AvailabilityProvider<Types>,
238    {
239        // We don't have storage for the payload alone, only the whole block. So if we need to fetch
240        // the payload, we just fetch the whole block (which may end up fetching only the payload,
241        // if that's all that's needed to complete the block).
242        BlockQueryData::active_fetch(tx, fetcher, req).await
243    }
244
245    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
246    where
247        S: AvailabilityStorage<Types>,
248    {
249        storage.get_payload(req).await
250    }
251}
252
253#[async_trait]
254impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
255where
256    Types: NodeType,
257    Header<Types>: QueryableHeader<Types>,
258    Payload<Types>: QueryablePayload<Types>,
259{
260    type RangedRequest = BlockId<Types>;
261
262    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
263    where
264        S: AvailabilityStorage<Types>,
265        R: RangeBounds<usize> + Send + 'static,
266    {
267        storage.get_payload_range(range).await
268    }
269}
270
271#[derive(Derivative)]
272#[derivative(Debug(bound = ""))]
273pub(super) struct PayloadCallback<Types: NodeType, S, P> {
274    header: Header<Types>,
275    #[derivative(Debug = "ignore")]
276    fetcher: Arc<Fetcher<Types, S, P>>,
277}
278
279impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
280    fn eq(&self, other: &Self) -> bool {
281        self.cmp(other).is_eq()
282    }
283}
284
285impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
286
287impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
288    fn cmp(&self, other: &Self) -> Ordering {
289        self.header.block_number().cmp(&other.header.block_number())
290    }
291}
292
293impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
294    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
295        Some(self.cmp(other))
296    }
297}
298
299impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
300where
301    Header<Types>: QueryableHeader<Types>,
302    Payload<Types>: QueryablePayload<Types>,
303    S: 'static + VersionedDataSource,
304    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
305    P: AvailabilityProvider<Types>,
306{
307    async fn run(self, payload: Payload<Types>) {
308        tracing::info!("fetched payload {:?}", self.header.payload_commitment());
309        let block = BlockQueryData::new(self.header, payload);
310        self.fetcher.store_and_notify(block).await;
311    }
312}
313
314#[async_trait]
315impl<Types> Fetchable<Types> for PayloadMetadata<Types>
316where
317    Types: NodeType,
318    Header<Types>: QueryableHeader<Types>,
319    Payload<Types>: QueryablePayload<Types>,
320{
321    type Request = BlockId<Types>;
322
323    fn satisfies(&self, req: Self::Request) -> bool {
324        match req {
325            BlockId::Number(n) => self.height == n as u64,
326            BlockId::Hash(h) => self.block_hash == h,
327            BlockId::PayloadHash(h) => self.hash == h,
328        }
329    }
330
331    async fn passive_fetch(
332        notifiers: &Notifiers<Types>,
333        req: Self::Request,
334    ) -> BoxFuture<'static, Option<Self>> {
335        notifiers
336            .block
337            .wait_for(move |block| block.satisfies(req))
338            .await
339            .into_future()
340            .map(|opt| opt.map(Self::from))
341            .boxed()
342    }
343
344    async fn active_fetch<S, P>(
345        tx: &mut impl AvailabilityStorage<Types>,
346        fetcher: Arc<Fetcher<Types, S, P>>,
347        req: Self::Request,
348    ) -> anyhow::Result<()>
349    where
350        S: VersionedDataSource + 'static,
351        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
352        for<'a> S::ReadOnly<'a>:
353            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
354        P: AvailabilityProvider<Types>,
355    {
356        // Trigger the full block to be fetched. This will be enough to satisfy this request for the
357        // payload summary.
358        BlockQueryData::active_fetch(tx, fetcher, req).await
359    }
360
361    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
362    where
363        S: AvailabilityStorage<Types>,
364    {
365        storage.get_payload_metadata(req).await
366    }
367}
368
369#[async_trait]
370impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
371where
372    Types: NodeType,
373    Header<Types>: QueryableHeader<Types>,
374    Payload<Types>: QueryablePayload<Types>,
375{
376    type RangedRequest = BlockId<Types>;
377
378    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
379    where
380        S: AvailabilityStorage<Types>,
381        R: RangeBounds<usize> + Send + 'static,
382    {
383        storage.get_payload_metadata_range(range).await
384    }
385}