hotshot_query_service/data_source/fetching/
header.rs1use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{future::BoxFuture, FutureExt};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25 block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26 vid::fetch_vid_common_with_header, AvailabilityProvider, Fetcher,
27};
28use crate::{
29 availability::{BlockId, QueryableHeader, QueryablePayload},
30 data_source::{
31 fetching::{Fetchable, HeaderQueryData, LeafQueryData, Notifiers},
32 storage::{
33 pruning::PrunedHeightStorage, AvailabilityStorage, NodeStorage,
34 UpdateAvailabilityStorage,
35 },
36 update::VersionedDataSource,
37 },
38 Header, Payload, QueryError, QueryResult,
39};
40
41impl<Types: NodeType> From<LeafQueryData<Types>> for HeaderQueryData<Types> {
42 fn from(leaf: LeafQueryData<Types>) -> Self {
43 let header = leaf.header().clone();
44
45 Self { header }
46 }
47}
48
49fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
50where
51 Types: NodeType,
52 Header<Types>: QueryableHeader<Types>,
53 Payload<Types>: QueryablePayload<Types>,
54{
55 HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
56}
57
58#[async_trait]
59impl<Types> Fetchable<Types> for HeaderQueryData<Types>
60where
61 Types: NodeType,
62 Header<Types>: QueryableHeader<Types>,
63 Payload<Types>: QueryablePayload<Types>,
64{
65 type Request = BlockId<Types>;
66
67 fn satisfies(&self, req: Self::Request) -> bool {
68 let header = self.header();
69 match req {
70 BlockId::Number(n) => header.block_number() as usize == n,
71 BlockId::Hash(h) => header.commit() == h,
72 BlockId::PayloadHash(h) => header.payload_commitment() == h,
73 }
74 }
75
76 async fn passive_fetch(
77 notifiers: &Notifiers<Types>,
78 req: Self::Request,
79 ) -> BoxFuture<'static, Option<Self>> {
80 notifiers
81 .leaf
82 .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
83 .await
84 .into_future()
85 .map(|leaf| leaf.map(HeaderQueryData::from))
86 .boxed()
87 }
88
89 async fn active_fetch<S, P>(
90 tx: &mut impl AvailabilityStorage<Types>,
91 fetcher: Arc<Fetcher<Types, S, P>>,
92 req: Self::Request,
93 ) -> anyhow::Result<()>
94 where
95 S: VersionedDataSource + 'static,
96 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
97 for<'a> S::ReadOnly<'a>:
98 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
99 P: AvailabilityProvider<Types>,
100 {
101 fetch_header_and_then(
105 tx,
106 req,
107 HeaderCallback::Payload {
108 fetcher: fetcher.clone(),
109 },
110 )
111 .await
112 }
113
114 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
115 where
116 S: AvailabilityStorage<Types>,
117 {
118 storage.get_header(req).await.map(|header| Self { header })
119 }
120}
121
122#[derive(Derivative)]
123#[derivative(Debug(bound = ""))]
124pub(super) enum HeaderCallback<Types, S, P>
125where
126 Types: NodeType,
127{
128 Payload {
130 #[derivative(Debug = "ignore")]
131 fetcher: Arc<Fetcher<Types, S, P>>,
132 },
133 VidCommon {
135 #[derivative(Debug = "ignore")]
136 fetcher: Arc<Fetcher<Types, S, P>>,
137 },
138}
139
140impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
141 fn eq(&self, other: &Self) -> bool {
142 self.cmp(other).is_eq()
143 }
144}
145
146impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
147
148impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
149 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
150 Some(self.cmp(other))
151 }
152}
153
154impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
155 fn cmp(&self, other: &Self) -> Ordering {
156 match (self, other) {
158 (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
159 (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
160 _ => Ordering::Equal,
161 }
162 }
163}
164
165impl<Types, S, P> HeaderCallback<Types, S, P>
166where
167 Types: NodeType,
168 Header<Types>: QueryableHeader<Types>,
169 Payload<Types>: QueryablePayload<Types>,
170 S: VersionedDataSource + 'static,
171 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
172 P: AvailabilityProvider<Types>,
173{
174 fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
175 match self {
176 Self::Payload { fetcher } => fetcher.clone(),
177 Self::VidCommon { fetcher } => fetcher.clone(),
178 }
179 }
180
181 pub(super) fn run(self, header: Header<Types>) {
182 match self {
183 Self::Payload { fetcher } => {
184 tracing::info!(
185 "fetched leaf {}, will now fetch payload",
186 header.block_number()
187 );
188 fetch_block_with_header(fetcher, header);
189 },
190 Self::VidCommon { fetcher } => {
191 tracing::info!(
192 "fetched leaf {}, will now fetch VID common",
193 header.block_number()
194 );
195 fetch_vid_common_with_header(fetcher, header);
196 },
197 }
198 }
199}
200
201pub(super) async fn fetch_header_and_then<Types, S, P>(
202 tx: &mut impl AvailabilityStorage<Types>,
203 req: BlockId<Types>,
204 callback: HeaderCallback<Types, S, P>,
205) -> anyhow::Result<()>
206where
207 Types: NodeType,
208 Header<Types>: QueryableHeader<Types>,
209 Payload<Types>: QueryablePayload<Types>,
210 S: VersionedDataSource + 'static,
211 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
212 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
213 P: AvailabilityProvider<Types>,
214{
215 match tx.get_header(req).await {
224 Ok(header) => {
225 callback.run(header);
226 return Ok(());
227 },
228 Err(QueryError::Missing | QueryError::NotFound) => {
229 tracing::debug!(?req, "header not available locally; trying fetch");
232 },
233 Err(QueryError::Error { message }) => {
234 bail!("failed to fetch header for block {req:?}: {message}");
237 },
238 }
239
240 match req {
245 BlockId::Number(n) => {
246 fetch_leaf_with_callbacks(tx, callback.fetcher(), n.into(), [callback.into()]).await?;
247 },
248 BlockId::Hash(h) => {
249 tracing::debug!("not fetching unknown block {h}");
252 },
253 BlockId::PayloadHash(h) => {
254 tracing::debug!("not fetching block with unknown payload {h}");
256 },
257 }
258
259 Ok(())
260}