1use std::time::Duration;
2
3use espresso_types::{NamespaceId, NsProof, PubKey, StateCertQueryDataV1, StateCertQueryDataV2};
4use futures::{
5 future::{try_join_all, FutureExt, TryFutureExt},
6 join,
7 stream::{Stream, StreamExt, TryStreamExt},
8 try_join,
9};
10use hotshot_query_service::{
11 availability::{
12 self, AvailabilityDataSource, BlockQueryData, Error, FetchBlockSnafu, VidCommonQueryData,
13 },
14 node::{BlockId, NodeDataSource},
15 types::HeightIndexed,
16 ApiState,
17};
18use hotshot_types::{
19 data::VidShare,
20 simple_certificate::LightClientStateUpdateCertificateV2,
21 traits::{network::ConnectedNetwork, node_implementation::Versions},
22 vid::avidm::AvidMShare,
23};
24use snafu::OptionExt;
25use tide_disco::{method::ReadState, Api, RequestParams, StatusCode};
26use tracing::warn;
27use vbs::version::StaticVersionType;
28
29use crate::{
30 api::{
31 data_source::{
32 RequestResponseDataSource, SequencerDataSource, StateCertDataSource,
33 StateCertFetchingDataSource,
34 },
35 StorageState,
36 },
37 SeqTypes, SequencerApiVersion, SequencerPersistence,
38};
39
40pub(in crate::api) type AvailState<N, P, D, ApiVer> = ApiState<StorageState<N, P, D, ApiVer>>;
41
42type AvailabilityApi<N, P, D, V, ApiVer> = Api<AvailState<N, P, D, V>, Error, ApiVer>;
43
44async fn get_namespace_proof<S>(
54 block: &BlockQueryData<SeqTypes>,
55 common: &VidCommonQueryData<SeqTypes>,
56 ns_id: NamespaceId,
57 state: &S,
58) -> Result<Option<NsProof>, Error>
59where
60 S: ReadState,
61 S::State: NodeDataSource<SeqTypes> + RequestResponseDataSource<SeqTypes> + Sync,
62{
63 let ns_table = block.payload().ns_table();
64 let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
65 return Ok(None);
66 };
67
68 if let Some(proof) = NsProof::new(block.payload(), &ns_index, common.common()) {
71 return Ok(Some(proof));
72 }
73
74 tracing::warn!(
77 height = block.height(),
78 ?ns_id,
79 "Failed to generate namespace proof, trying to generate incorrect encoding proof"
80 );
81 let vid_shares_req = state
82 .read(move |state| {
83 state
84 .request_vid_shares(block.height(), common.clone(), Duration::from_secs(40))
85 .boxed()
86 })
87 .await;
88 let mut vid_shares = vid_shares_req.await.map_err(|err| {
89 warn!("Failed to request VID shares from network: {err:#}");
90 hotshot_query_service::availability::Error::Custom {
91 message: "Failed to request VID shares from network".to_string(),
92 status: StatusCode::NOT_FOUND,
93 }
94 })?;
95 let vid_share = state
96 .read(|state| state.vid_share(block.height() as usize).boxed())
97 .await;
98 if let Ok(vid_share) = vid_share {
99 vid_shares.push(vid_share);
100 };
101
102 let vid_shares: Vec<AvidMShare> = vid_shares
104 .into_iter()
105 .filter_map(|share| {
106 if let VidShare::V1(share) = share {
107 Some(share)
108 } else {
109 None
110 }
111 })
112 .collect();
113
114 if let Some(proof) = NsProof::v1_1_new_with_incorrect_encoding(
115 &vid_shares,
116 ns_table,
117 &ns_index,
118 &common.payload_hash(),
119 common.common(),
120 ) {
121 return Ok(Some(proof));
122 }
123
124 Err(Error::Custom {
125 message: "Failed to generate proof of incorrect encoding".to_string(),
126 status: StatusCode::INTERNAL_SERVER_ERROR,
127 })
128}
129
130fn extract_ns_proof_v1(
131 proof: Option<NsProof>,
132 ns_id: NamespaceId,
133) -> Result<espresso_types::NamespaceProofQueryData, Error> {
134 let transactions = proof
135 .as_ref()
136 .map(|proof| proof.export_all_txs(&ns_id))
137 .unwrap_or_default();
138 Ok(espresso_types::NamespaceProofQueryData {
139 transactions,
140 proof,
141 })
142}
143
144fn extract_ns_proof_v0(
145 proof: Option<NsProof>,
146 ns_id: NamespaceId,
147) -> Result<espresso_types::ADVZNamespaceProofQueryData, Error> {
148 let proof = match proof {
149 Some(NsProof::V0(proof)) => Some(proof),
150 Some(_) => {
151 return Err(Error::Custom {
152 message: "Unsupported VID version, use new API version instead.".to_string(),
153 status: StatusCode::NOT_FOUND,
154 })
155 },
156 None => None,
157 };
158 let transactions = proof
159 .as_ref()
160 .map(|proof| proof.export_all_txs(&ns_id))
161 .unwrap_or_default();
162 Ok(espresso_types::ADVZNamespaceProofQueryData {
163 transactions,
164 proof,
165 })
166}
167
168async fn get_block_for_ns_proof<S>(
169 req: &RequestParams,
170 state: &S,
171 timeout: Duration,
172) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>), Error>
173where
174 S: ReadState,
175 S::State: AvailabilityDataSource<SeqTypes> + Sync,
176{
177 let id = if let Some(height) = req.opt_integer_param("height")? {
178 BlockId::Number(height)
179 } else if let Some(hash) = req.opt_blob_param("hash")? {
180 BlockId::Hash(hash)
181 } else {
182 BlockId::PayloadHash(req.blob_param("payload-hash")?)
183 };
184 let (fetch_block, fetch_vid) = state
185 .read(|state| async move { join!(state.get_block(id), state.get_vid_common(id)) }.boxed())
186 .await;
187 try_join!(
188 async move {
189 fetch_block
190 .with_timeout(timeout)
191 .await
192 .context(FetchBlockSnafu {
193 resource: id.to_string(),
194 })
195 },
196 async move {
197 fetch_vid
198 .with_timeout(timeout)
199 .await
200 .context(FetchBlockSnafu {
201 resource: id.to_string(),
202 })
203 }
204 )
205}
206
207async fn get_block_range_for_ns_proof<S>(
208 req: &RequestParams,
209 state: &S,
210 limit: usize,
211 timeout: Duration,
212) -> Result<Vec<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)>, Error>
213where
214 S: ReadState,
215 S::State: AvailabilityDataSource<SeqTypes> + Sync,
216{
217 let from: usize = req.integer_param("from")?;
218 let until: usize = req.integer_param("until")?;
219 if until.saturating_sub(from) > limit {
220 return Err(Error::RangeLimit { from, until, limit });
221 }
222
223 let (blocks, vids) = state
224 .read(|state| {
225 async move {
226 join!(
227 state.get_block_range(from..until),
228 state.get_vid_common_range(from..until)
229 )
230 }
231 .boxed()
232 })
233 .await;
234 blocks
235 .zip(vids)
236 .enumerate()
237 .then(|(i, (block, vid))| async move {
238 let (Some(block), Some(vid)) =
239 join!(block.with_timeout(timeout), vid.with_timeout(timeout),)
240 else {
241 return Err(Error::FetchBlock {
242 resource: (from + i).to_string(),
243 });
244 };
245 Ok((block, vid))
246 })
247 .try_collect()
248 .await
249}
250
251fn get_block_stream_for_ns_proof<'a, S>(
252 req: RequestParams,
253 state: &'a S,
254) -> impl 'a
255 + Stream<
256 Item = Result<
257 (
258 NamespaceId,
259 BlockQueryData<SeqTypes>,
260 VidCommonQueryData<SeqTypes>,
261 ),
262 Error,
263 >,
264>
265where
266 S: ReadState,
267 S::State: AvailabilityDataSource<SeqTypes> + Sync,
268{
269 async move {
270 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
271 let height = req.integer_param("height")?;
272 Ok(state
273 .read(|state| {
274 async move {
275 state
276 .subscribe_blocks(height)
277 .await
278 .zip(state.subscribe_vid_common(height).await)
279 .map(move |(block, vid)| (ns_id, block, vid))
280 .map(Ok)
281 }
282 .boxed()
283 })
284 .await)
285 }
286 .try_flatten_stream()
287}
288
289async fn get_state_cert<S>(
290 state: &S,
291 epoch: u64,
292 timeout: Duration,
293) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, availability::Error>
294where
295 S: ReadState,
296 S::State: StateCertDataSource + StateCertFetchingDataSource<SeqTypes> + Sync,
297{
298 let state_cert = state
300 .read(|state| state.get_state_cert_by_epoch(epoch).boxed())
301 .await
302 .map_err(|e| availability::Error::Custom {
303 message: format!("Failed to get state cert: {e}"),
304 status: StatusCode::INTERNAL_SERVER_ERROR,
305 })?;
306
307 match state_cert {
308 Some(cert) => Ok(cert),
309 None => {
310 let cert = state
312 .read(|state| state.request_state_cert(epoch, timeout).boxed())
313 .await?;
314
315 state
317 .read(|state| state.insert_state_cert(epoch, cert.clone()).boxed())
318 .await
319 .map_err(|e| availability::Error::Custom {
320 message: format!("Failed to store state cert: {e}"),
321 status: StatusCode::INTERNAL_SERVER_ERROR,
322 })?;
323
324 Ok(cert)
325 },
326 }
327}
328
329pub(in crate::api) fn availability<N, P, D, V: Versions>(
333 api_ver: semver::Version,
334) -> anyhow::Result<AvailabilityApi<N, P, D, V, SequencerApiVersion>>
335where
336 N: ConnectedNetwork<PubKey>,
337 D: SequencerDataSource + Send + Sync + 'static,
338 P: SequencerPersistence,
339{
340 let mut options = availability::Options::default();
341 let extension = toml::from_str(include_str!("../../../api/availability.toml"))?;
342 options.extensions.push(extension);
343 let timeout = options.fetch_timeout;
344 let limit = options.large_object_range_limit;
345
346 let mut api = availability::define_api::<AvailState<N, P, D, _>, SeqTypes, _>(
347 &options,
348 SequencerApiVersion::instance(),
349 api_ver.clone(),
350 )?;
351
352 if api_ver.major == 1 {
353 api.at("getnamespaceproof", move |req, state| {
354 async move {
355 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
356 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
357 let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
358 extract_ns_proof_v1(proof, ns_id)
359 }
360 .boxed()
361 })?
362 .at("getnamespaceproof_range", move |req, state| {
363 async move {
364 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
365 let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
366 try_join_all(blocks.iter().map(|(block, vid)| async move {
367 let proof = get_namespace_proof(block, vid, ns_id, state).await?;
368 extract_ns_proof_v1(proof, ns_id)
369 }))
370 .await
371 }
372 .boxed()
373 })?
374 .stream("stream_namespace_proofs", move |req, state| {
375 get_block_stream_for_ns_proof(req, state)
376 .and_then(move |(ns_id, block, vid)| async move {
377 let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
378 extract_ns_proof_v1(proof, ns_id)
379 })
380 .boxed()
381 })?;
382 } else if api_ver.major == 0 {
383 api.at("getnamespaceproof", move |req, state| {
384 async move {
385 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
386 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
387 let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
388 extract_ns_proof_v0(proof, ns_id)
389 }
390 .boxed()
391 })?
392 .at("getnamespaceproof_range", move |req, state| {
393 async move {
394 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
395 let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
396 try_join_all(blocks.iter().map(|(block, vid)| async move {
397 let proof = get_namespace_proof(block, vid, ns_id, state).await?;
398 extract_ns_proof_v0(proof, ns_id)
399 }))
400 .await
401 }
402 .boxed()
403 })?
404 .stream("stream_namespace_proofs", move |req, state| {
405 get_block_stream_for_ns_proof(req, state)
406 .and_then(move |(ns_id, block, vid)| async move {
407 let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
408 extract_ns_proof_v0(proof, ns_id)
409 })
410 .boxed()
411 })?;
412 }
413
414 if api_ver.major >= 1 {
415 api.at("incorrect_encoding_proof", move |req, state| {
416 async move {
417 let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
418 let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
419 match get_namespace_proof(&block, &common, ns_id, state).await? {
420 Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
421 Some(_) => Err(Error::Custom {
422 message: "block was correctly encoded".into(),
423 status: StatusCode::NOT_FOUND,
424 }),
425 None => Err(Error::Custom {
426 message: "namespace not present in block".into(),
427 status: StatusCode::NOT_FOUND,
428 }),
429 }
430 }
431 .boxed()
432 })?;
433 }
434
435 api.at("get_state_cert", move |req, state| {
436 async move {
437 let epoch: u64 = req.integer_param("epoch")?;
438 let cert = get_state_cert(state, epoch, timeout).await?;
439 Ok(StateCertQueryDataV1::from(StateCertQueryDataV2(cert)))
440 }
441 .boxed()
442 })?;
443
444 api.at("get_state_cert_v2", move |req, state| {
445 async move {
446 let epoch: u64 = req.integer_param("epoch")?;
447 let cert = get_state_cert(state, epoch, timeout).await?;
448 Ok(StateCertQueryDataV2(cert))
449 }
450 .boxed()
451 })?;
452
453 Ok(api)
454}