hotshot_query_service/data_source/fetching/
block.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`BlockQueryData`] and [`PayloadQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt};
21use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
22
23use super::{
24    header::{fetch_header_and_then, HeaderCallback},
25    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
26    Storable,
27};
28use crate::{
29    availability::{
30        BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31        QueryablePayload,
32    },
33    data_source::{
34        storage::{
35            pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
36            UpdateAvailabilityStorage,
37        },
38        VersionedDataSource,
39    },
40    fetching::{
41        self,
42        request::{self, PayloadRequest},
43        Callback,
44    },
45    types::HeightIndexed,
46    Header, Payload, QueryResult,
47};
48pub(super) type PayloadFetcher<Types, S, P> =
49    fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
50
51impl<Types> FetchRequest for BlockId<Types>
52where
53    Types: NodeType,
54{
55    fn might_exist(self, heights: Heights) -> bool {
56        if let BlockId::Number(n) = self {
57            heights.might_exist(n as u64)
58        } else {
59            true
60        }
61    }
62}
63
64#[async_trait]
65impl<Types> Fetchable<Types> for BlockQueryData<Types>
66where
67    Types: NodeType,
68    Header<Types>: QueryableHeader<Types>,
69    Payload<Types>: QueryablePayload<Types>,
70{
71    type Request = BlockId<Types>;
72
73    fn satisfies(&self, req: Self::Request) -> bool {
74        match req {
75            BlockId::Number(n) => self.height() == n as u64,
76            BlockId::Hash(h) => self.hash() == h,
77            BlockId::PayloadHash(h) => self.payload_hash() == h,
78        }
79    }
80
81    async fn passive_fetch(
82        notifiers: &Notifiers<Types>,
83        req: Self::Request,
84    ) -> BoxFuture<'static, Option<Self>> {
85        notifiers
86            .block
87            .wait_for(move |block| block.satisfies(req))
88            .await
89            .into_future()
90            .boxed()
91    }
92
93    async fn active_fetch<S, P>(
94        tx: &mut impl AvailabilityStorage<Types>,
95        fetcher: Arc<Fetcher<Types, S, P>>,
96        req: Self::Request,
97    ) -> anyhow::Result<()>
98    where
99        S: VersionedDataSource + 'static,
100        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
101        for<'a> S::ReadOnly<'a>:
102            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
103        P: AvailabilityProvider<Types>,
104    {
105        fetch_header_and_then(
106            tx,
107            req,
108            HeaderCallback::Payload {
109                fetcher: fetcher.clone(),
110            },
111        )
112        .await
113    }
114
115    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
116    where
117        S: AvailabilityStorage<Types>,
118    {
119        storage.get_block(req).await
120    }
121}
122
123#[async_trait]
124impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
125where
126    Types: NodeType,
127    Header<Types>: QueryableHeader<Types>,
128    Payload<Types>: QueryablePayload<Types>,
129{
130    type RangedRequest = BlockId<Types>;
131
132    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
133    where
134        S: AvailabilityStorage<Types>,
135        R: RangeBounds<usize> + Send + 'static,
136    {
137        storage.get_block_range(range).await
138    }
139}
140
141impl<Types> Storable<Types> for BlockQueryData<Types>
142where
143    Types: NodeType,
144{
145    fn name() -> &'static str {
146        "block"
147    }
148
149    async fn notify(&self, notifiers: &Notifiers<Types>) {
150        notifiers.block.notify(self).await;
151    }
152
153    async fn store(
154        self,
155        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
156        leaf_only: bool,
157    ) -> anyhow::Result<()> {
158        if leaf_only {
159            return Ok(());
160        }
161
162        storage.insert_block(self).await
163    }
164}
165
166pub(super) fn fetch_block_with_header<Types, S, P>(
167    fetcher: Arc<Fetcher<Types, S, P>>,
168    header: Header<Types>,
169) where
170    Types: NodeType,
171    Header<Types>: QueryableHeader<Types>,
172    Payload<Types>: QueryablePayload<Types>,
173    S: VersionedDataSource + 'static,
174    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
175    P: AvailabilityProvider<Types>,
176{
177    let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
178        // If we're in light-weight mode, we don't need to fetch the VID common data.
179        return;
180    };
181
182    // Now that we have the header, we only need to retrieve the payload.
183    tracing::info!(
184        "spawned active fetch for payload {:?} (height {})",
185        header.payload_commitment(),
186        header.block_number()
187    );
188    payload_fetcher.spawn_fetch(
189        PayloadRequest(header.payload_commitment()),
190        fetcher.provider.clone(),
191        once(PayloadCallback {
192            header,
193            fetcher: fetcher.clone(),
194        }),
195    );
196}
197
198#[async_trait]
199impl<Types> Fetchable<Types> for PayloadQueryData<Types>
200where
201    Types: NodeType,
202    Header<Types>: QueryableHeader<Types>,
203    Payload<Types>: QueryablePayload<Types>,
204{
205    type Request = BlockId<Types>;
206
207    fn satisfies(&self, req: Self::Request) -> bool {
208        match req {
209            BlockId::Number(n) => self.height() == n as u64,
210            BlockId::Hash(h) => self.block_hash() == h,
211            BlockId::PayloadHash(h) => self.hash() == h,
212        }
213    }
214
215    async fn passive_fetch(
216        notifiers: &Notifiers<Types>,
217        req: Self::Request,
218    ) -> BoxFuture<'static, Option<Self>> {
219        notifiers
220            .block
221            .wait_for(move |block| block.satisfies(req))
222            .await
223            .into_future()
224            .map(|block| block.map(PayloadQueryData::from))
225            .boxed()
226    }
227
228    async fn active_fetch<S, P>(
229        tx: &mut impl AvailabilityStorage<Types>,
230        fetcher: Arc<Fetcher<Types, S, P>>,
231        req: Self::Request,
232    ) -> anyhow::Result<()>
233    where
234        S: VersionedDataSource + 'static,
235        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
236        for<'a> S::ReadOnly<'a>:
237            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
238        P: AvailabilityProvider<Types>,
239    {
240        // We don't have storage for the payload alone, only the whole block. So if we need to fetch
241        // the payload, we just fetch the whole block (which may end up fetching only the payload,
242        // if that's all that's needed to complete the block).
243        BlockQueryData::active_fetch(tx, fetcher, req).await
244    }
245
246    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
247    where
248        S: AvailabilityStorage<Types>,
249    {
250        storage.get_payload(req).await
251    }
252}
253
254#[async_trait]
255impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
256where
257    Types: NodeType,
258    Header<Types>: QueryableHeader<Types>,
259    Payload<Types>: QueryablePayload<Types>,
260{
261    type RangedRequest = BlockId<Types>;
262
263    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
264    where
265        S: AvailabilityStorage<Types>,
266        R: RangeBounds<usize> + Send + 'static,
267    {
268        storage.get_payload_range(range).await
269    }
270}
271
272#[derive(Derivative)]
273#[derivative(Debug(bound = ""))]
274pub(super) struct PayloadCallback<Types: NodeType, S, P> {
275    header: Header<Types>,
276    #[derivative(Debug = "ignore")]
277    fetcher: Arc<Fetcher<Types, S, P>>,
278}
279
280impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
281    fn eq(&self, other: &Self) -> bool {
282        self.cmp(other).is_eq()
283    }
284}
285
286impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
287
288impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
289    fn cmp(&self, other: &Self) -> Ordering {
290        self.header.block_number().cmp(&other.header.block_number())
291    }
292}
293
294impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
295    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
296        Some(self.cmp(other))
297    }
298}
299
300impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
301where
302    Header<Types>: QueryableHeader<Types>,
303    Payload<Types>: QueryablePayload<Types>,
304    S: 'static + VersionedDataSource,
305    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
306    P: AvailabilityProvider<Types>,
307{
308    async fn run(self, payload: Payload<Types>) {
309        tracing::info!("fetched payload {:?}", self.header.payload_commitment());
310        let block = BlockQueryData::new(self.header, payload);
311        self.fetcher.store_and_notify(block).await;
312    }
313}
314
315#[async_trait]
316impl<Types> Fetchable<Types> for PayloadMetadata<Types>
317where
318    Types: NodeType,
319    Header<Types>: QueryableHeader<Types>,
320    Payload<Types>: QueryablePayload<Types>,
321{
322    type Request = BlockId<Types>;
323
324    fn satisfies(&self, req: Self::Request) -> bool {
325        match req {
326            BlockId::Number(n) => self.height == n as u64,
327            BlockId::Hash(h) => self.block_hash == h,
328            BlockId::PayloadHash(h) => self.hash == h,
329        }
330    }
331
332    async fn passive_fetch(
333        notifiers: &Notifiers<Types>,
334        req: Self::Request,
335    ) -> BoxFuture<'static, Option<Self>> {
336        notifiers
337            .block
338            .wait_for(move |block| block.satisfies(req))
339            .await
340            .into_future()
341            .map(|opt| opt.map(Self::from))
342            .boxed()
343    }
344
345    async fn active_fetch<S, P>(
346        tx: &mut impl AvailabilityStorage<Types>,
347        fetcher: Arc<Fetcher<Types, S, P>>,
348        req: Self::Request,
349    ) -> anyhow::Result<()>
350    where
351        S: VersionedDataSource + 'static,
352        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
353        for<'a> S::ReadOnly<'a>:
354            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
355        P: AvailabilityProvider<Types>,
356    {
357        // Trigger the full block to be fetched. This will be enough to satisfy this request for the
358        // payload summary.
359        BlockQueryData::active_fetch(tx, fetcher, req).await
360    }
361
362    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
363    where
364        S: AvailabilityStorage<Types>,
365    {
366        storage.get_payload_metadata(req).await
367    }
368}
369
370#[async_trait]
371impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
372where
373    Types: NodeType,
374    Header<Types>: QueryableHeader<Types>,
375    Payload<Types>: QueryablePayload<Types>,
376{
377    type RangedRequest = BlockId<Types>;
378
379    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
380    where
381        S: AvailabilityStorage<Types>,
382        R: RangeBounds<usize> + Send + 'static,
383    {
384        storage.get_payload_metadata_range(range).await
385    }
386}