hotshot_query_service/data_source/fetching/
vid.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`VidCommonQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt};
21use hotshot_types::{
22    data::VidShare,
23    traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25
26use super::{
27    header::{fetch_header_and_then, HeaderCallback},
28    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
29    Storable,
30};
31use crate::{
32    availability::{
33        BlockId, QueryableHeader, QueryablePayload, VidCommonMetadata, VidCommonQueryData,
34    },
35    data_source::{
36        storage::{
37            pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
38            UpdateAvailabilityStorage,
39        },
40        VersionedDataSource,
41    },
42    fetching::{self, request, Callback},
43    types::HeightIndexed,
44    Header, Payload, QueryResult, VidCommon,
45};
46
47pub(super) type VidCommonFetcher<Types, S, P> =
48    fetching::Fetcher<request::VidCommonRequest, VidCommonCallback<Types, S, P>>;
49
50#[derive(Clone, Copy, Debug, From)]
51pub(super) struct VidCommonRequest<Types: NodeType>(BlockId<Types>);
52
53impl<Types: NodeType> From<usize> for VidCommonRequest<Types> {
54    fn from(n: usize) -> Self {
55        Self(n.into())
56    }
57}
58
59impl<Types> FetchRequest for VidCommonRequest<Types>
60where
61    Types: NodeType,
62{
63    fn might_exist(self, heights: Heights) -> bool {
64        self.0.might_exist(heights)
65    }
66}
67
68#[async_trait]
69impl<Types> Fetchable<Types> for VidCommonQueryData<Types>
70where
71    Types: NodeType,
72    Header<Types>: QueryableHeader<Types>,
73    Payload<Types>: QueryablePayload<Types>,
74{
75    type Request = VidCommonRequest<Types>;
76
77    fn satisfies(&self, req: Self::Request) -> bool {
78        match req.0 {
79            BlockId::Number(n) => self.height() == n as u64,
80            BlockId::Hash(h) => self.block_hash() == h,
81            BlockId::PayloadHash(h) => self.payload_hash() == h,
82        }
83    }
84
85    async fn passive_fetch(
86        notifiers: &Notifiers<Types>,
87        req: Self::Request,
88    ) -> BoxFuture<'static, Option<Self>> {
89        notifiers
90            .vid_common
91            .wait_for(move |vid| vid.satisfies(req))
92            .await
93            .into_future()
94            .boxed()
95    }
96
97    async fn active_fetch<S, P>(
98        tx: &mut impl AvailabilityStorage<Types>,
99        fetcher: Arc<Fetcher<Types, S, P>>,
100        req: Self::Request,
101    ) -> anyhow::Result<()>
102    where
103        S: VersionedDataSource + 'static,
104        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
105        for<'a> S::ReadOnly<'a>:
106            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
107        P: AvailabilityProvider<Types>,
108    {
109        fetch_header_and_then(
110            tx,
111            req.0,
112            HeaderCallback::VidCommon {
113                fetcher: fetcher.clone(),
114            },
115        )
116        .await
117    }
118
119    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
120    where
121        S: AvailabilityStorage<Types>,
122    {
123        storage.get_vid_common(req.0).await
124    }
125}
126
127#[async_trait]
128impl<Types> RangedFetchable<Types> for VidCommonQueryData<Types>
129where
130    Types: NodeType,
131    Header<Types>: QueryableHeader<Types>,
132    Payload<Types>: QueryablePayload<Types>,
133{
134    type RangedRequest = VidCommonRequest<Types>;
135
136    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
137    where
138        S: AvailabilityStorage<Types>,
139        R: RangeBounds<usize> + Send + 'static,
140    {
141        storage.get_vid_common_range(range).await
142    }
143}
144
145impl<Types> Storable<Types> for VidCommonQueryData<Types>
146where
147    Types: NodeType,
148{
149    fn name() -> &'static str {
150        "VID common"
151    }
152
153    async fn notify(&self, notifiers: &Notifiers<Types>) {
154        notifiers.vid_common.notify(self).await;
155    }
156
157    async fn store(
158        self,
159        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
160        _leaf_only: bool,
161    ) -> anyhow::Result<()> {
162        storage.insert_vid(self, None).await
163    }
164}
165
166impl<Types> Storable<Types> for (VidCommonQueryData<Types>, Option<VidShare>)
167where
168    Types: NodeType,
169{
170    fn name() -> &'static str {
171        "VID data"
172    }
173
174    async fn notify(&self, notifiers: &Notifiers<Types>) {
175        notifiers.vid_common.notify(&self.0).await;
176    }
177
178    async fn store(
179        self,
180        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
181        _leaf_only: bool,
182    ) -> anyhow::Result<()> {
183        storage.insert_vid(self.0, self.1).await
184    }
185}
186
187pub(super) fn fetch_vid_common_with_header<Types, S, P>(
188    fetcher: Arc<Fetcher<Types, S, P>>,
189    header: Header<Types>,
190) where
191    Types: NodeType,
192    Header<Types>: QueryableHeader<Types>,
193    Payload<Types>: QueryablePayload<Types>,
194    S: VersionedDataSource + 'static,
195    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
196    P: AvailabilityProvider<Types>,
197{
198    let Some(vid_fetcher) = fetcher.vid_common_fetcher.as_ref() else {
199        tracing::info!("not fetching vid because of leaf only mode");
200        return;
201    };
202
203    // Now that we have the header, we only need to retrieve the VID common data.
204    tracing::info!(
205        "spawned active fetch for VID common {:?} (height {})",
206        header.payload_commitment(),
207        header.block_number()
208    );
209    vid_fetcher.spawn_fetch(
210        request::VidCommonRequest(header.payload_commitment()),
211        fetcher.provider.clone(),
212        once(VidCommonCallback {
213            header,
214            fetcher: fetcher.clone(),
215        }),
216    );
217}
218
219#[derive(Derivative)]
220#[derivative(Debug(bound = ""))]
221pub(super) struct VidCommonCallback<Types: NodeType, S, P> {
222    header: Header<Types>,
223    #[derivative(Debug = "ignore")]
224    fetcher: Arc<Fetcher<Types, S, P>>,
225}
226
227impl<Types: NodeType, S, P> PartialEq for VidCommonCallback<Types, S, P> {
228    fn eq(&self, other: &Self) -> bool {
229        self.cmp(other).is_eq()
230    }
231}
232
233impl<Types: NodeType, S, P> Eq for VidCommonCallback<Types, S, P> {}
234
235impl<Types: NodeType, S, P> Ord for VidCommonCallback<Types, S, P> {
236    fn cmp(&self, other: &Self) -> Ordering {
237        self.header.block_number().cmp(&other.header.block_number())
238    }
239}
240
241impl<Types: NodeType, S, P> PartialOrd for VidCommonCallback<Types, S, P> {
242    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
243        Some(self.cmp(other))
244    }
245}
246
247impl<Types: NodeType, S, P> Callback<VidCommon> for VidCommonCallback<Types, S, P>
248where
249    Header<Types>: QueryableHeader<Types>,
250    Payload<Types>: QueryablePayload<Types>,
251    S: VersionedDataSource + 'static,
252    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
253    P: AvailabilityProvider<Types>,
254{
255    async fn run(self, common: VidCommon) {
256        let common = VidCommonQueryData::new(self.header, common);
257        self.fetcher.store_and_notify(common).await;
258    }
259}
260
261#[async_trait]
262impl<Types> Fetchable<Types> for VidCommonMetadata<Types>
263where
264    Types: NodeType,
265    Header<Types>: QueryableHeader<Types>,
266    Payload<Types>: QueryablePayload<Types>,
267{
268    type Request = VidCommonRequest<Types>;
269
270    fn satisfies(&self, req: Self::Request) -> bool {
271        match req.0 {
272            BlockId::Number(n) => self.height == n as u64,
273            BlockId::Hash(h) => self.block_hash == h,
274            BlockId::PayloadHash(h) => self.payload_hash == h,
275        }
276    }
277
278    async fn passive_fetch(
279        notifiers: &Notifiers<Types>,
280        req: Self::Request,
281    ) -> BoxFuture<'static, Option<Self>> {
282        notifiers
283            .vid_common
284            .wait_for(move |vid| vid.satisfies(req))
285            .await
286            .into_future()
287            .map(|opt| opt.map(Self::from))
288            .boxed()
289    }
290
291    async fn active_fetch<S, P>(
292        tx: &mut impl AvailabilityStorage<Types>,
293        fetcher: Arc<Fetcher<Types, S, P>>,
294        req: Self::Request,
295    ) -> anyhow::Result<()>
296    where
297        S: VersionedDataSource + 'static,
298        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
299        for<'a> S::ReadOnly<'a>:
300            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
301        P: AvailabilityProvider<Types>,
302    {
303        // Do not fetch if we are in leaf only mode
304        if fetcher.leaf_only {
305            return Ok(());
306        }
307        // Trigger the full VID object to be fetched. This will be enough to satisfy this request
308        // for the summary.
309        VidCommonQueryData::active_fetch(tx, fetcher, req).await
310    }
311
312    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
313    where
314        S: AvailabilityStorage<Types>,
315    {
316        storage.get_vid_common_metadata(req.0).await
317    }
318}
319
320#[async_trait]
321impl<Types> RangedFetchable<Types> for VidCommonMetadata<Types>
322where
323    Types: NodeType,
324    Header<Types>: QueryableHeader<Types>,
325    Payload<Types>: QueryablePayload<Types>,
326{
327    type RangedRequest = VidCommonRequest<Types>;
328
329    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
330    where
331        S: AvailabilityStorage<Types>,
332        R: RangeBounds<usize> + Send + 'static,
333    {
334        storage.get_vid_common_metadata_range(range).await
335    }
336}