sequencer/api/endpoints/
availability.rs

1use std::time::Duration;
2
3use espresso_types::{NamespaceId, NsProof, PubKey, StateCertQueryDataV1, StateCertQueryDataV2};
4use futures::{
5    future::{try_join_all, FutureExt, TryFutureExt},
6    join,
7    stream::{Stream, StreamExt, TryStreamExt},
8    try_join,
9};
10use hotshot_query_service::{
11    availability::{
12        self, AvailabilityDataSource, BlockQueryData, Error, FetchBlockSnafu, VidCommonQueryData,
13    },
14    node::{BlockId, NodeDataSource},
15    types::HeightIndexed,
16    ApiState,
17};
18use hotshot_types::{
19    data::VidShare,
20    simple_certificate::LightClientStateUpdateCertificateV2,
21    traits::{network::ConnectedNetwork, node_implementation::Versions},
22    vid::avidm::AvidMShare,
23};
24use snafu::OptionExt;
25use tide_disco::{method::ReadState, Api, RequestParams, StatusCode};
26use tracing::warn;
27use vbs::version::StaticVersionType;
28
29use crate::{
30    api::{
31        data_source::{
32            RequestResponseDataSource, SequencerDataSource, StateCertDataSource,
33            StateCertFetchingDataSource,
34        },
35        StorageState,
36    },
37    SeqTypes, SequencerApiVersion, SequencerPersistence,
38};
39
40pub(in crate::api) type AvailState<N, P, D, ApiVer> = ApiState<StorageState<N, P, D, ApiVer>>;
41
42type AvailabilityApi<N, P, D, V, ApiVer> = Api<AvailState<N, P, D, V>, Error, ApiVer>;
43
44/// Get a namespace proof for the given block, if possible.
45///
46/// Always returns the newest supported proof type, which supports the greatest number of possible
47/// cases (e.g. proofs can still be generated even if the block was maliciously encoded). For
48/// backwards compatibility, the resulting proof can be downgraded. However, this may fail in case a
49/// proof was only able to be generated by making use of one of the newer proof types.
50///
51/// Returns no proof (`Ok(None)`) if the requested namespace is not present at all in the given
52/// block.
53async fn get_namespace_proof<S>(
54    block: &BlockQueryData<SeqTypes>,
55    common: &VidCommonQueryData<SeqTypes>,
56    ns_id: NamespaceId,
57    state: &S,
58) -> Result<Option<NsProof>, Error>
59where
60    S: ReadState,
61    S::State: NodeDataSource<SeqTypes> + RequestResponseDataSource<SeqTypes> + Sync,
62{
63    let ns_table = block.payload().ns_table();
64    let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
65        return Ok(None);
66    };
67
68    // Optimistically, try to generate a
69    // proof for a correctly encoded block.
70    if let Some(proof) = NsProof::new(block.payload(), &ns_index, common.common()) {
71        return Ok(Some(proof));
72    }
73
74    // If we fail to generate the correct encoding proof, try to generate a v1.1 proof, which
75    // supports proof of incorrect encoding.
76    tracing::warn!(
77        height = block.height(),
78        ?ns_id,
79        "Failed to generate namespace proof, trying to generate incorrect encoding proof"
80    );
81    let vid_shares_req = state
82        .read(move |state| {
83            state
84                .request_vid_shares(block.height(), common.clone(), Duration::from_secs(40))
85                .boxed()
86        })
87        .await;
88    let mut vid_shares = vid_shares_req.await.map_err(|err| {
89        warn!("Failed to request VID shares from network: {err:#}");
90        hotshot_query_service::availability::Error::Custom {
91            message: "Failed to request VID shares from network".to_string(),
92            status: StatusCode::NOT_FOUND,
93        }
94    })?;
95    let vid_share = state
96        .read(|state| state.vid_share(block.height() as usize).boxed())
97        .await;
98    if let Ok(vid_share) = vid_share {
99        vid_shares.push(vid_share);
100    };
101
102    // Collect the shares as V1 shares
103    let vid_shares: Vec<AvidMShare> = vid_shares
104        .into_iter()
105        .filter_map(|share| {
106            if let VidShare::V1(share) = share {
107                Some(share)
108            } else {
109                None
110            }
111        })
112        .collect();
113
114    if let Some(proof) = NsProof::v1_1_new_with_incorrect_encoding(
115        &vid_shares,
116        ns_table,
117        &ns_index,
118        &common.payload_hash(),
119        common.common(),
120    ) {
121        return Ok(Some(proof));
122    }
123
124    Err(Error::Custom {
125        message: "Failed to generate proof of incorrect encoding".to_string(),
126        status: StatusCode::INTERNAL_SERVER_ERROR,
127    })
128}
129
130fn extract_ns_proof_v1(
131    proof: Option<NsProof>,
132    ns_id: NamespaceId,
133) -> Result<espresso_types::NamespaceProofQueryData, Error> {
134    let transactions = proof
135        .as_ref()
136        .map(|proof| proof.export_all_txs(&ns_id))
137        .unwrap_or_default();
138    Ok(espresso_types::NamespaceProofQueryData {
139        transactions,
140        proof,
141    })
142}
143
144fn extract_ns_proof_v0(
145    proof: Option<NsProof>,
146    ns_id: NamespaceId,
147) -> Result<espresso_types::ADVZNamespaceProofQueryData, Error> {
148    let proof = match proof {
149        Some(NsProof::V0(proof)) => Some(proof),
150        Some(_) => {
151            return Err(Error::Custom {
152                message: "Unsupported VID version, use new API version instead.".to_string(),
153                status: StatusCode::NOT_FOUND,
154            })
155        },
156        None => None,
157    };
158    let transactions = proof
159        .as_ref()
160        .map(|proof| proof.export_all_txs(&ns_id))
161        .unwrap_or_default();
162    Ok(espresso_types::ADVZNamespaceProofQueryData {
163        transactions,
164        proof,
165    })
166}
167
168async fn get_block_for_ns_proof<S>(
169    req: &RequestParams,
170    state: &S,
171    timeout: Duration,
172) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>), Error>
173where
174    S: ReadState,
175    S::State: AvailabilityDataSource<SeqTypes> + Sync,
176{
177    let id = if let Some(height) = req.opt_integer_param("height")? {
178        BlockId::Number(height)
179    } else if let Some(hash) = req.opt_blob_param("hash")? {
180        BlockId::Hash(hash)
181    } else {
182        BlockId::PayloadHash(req.blob_param("payload-hash")?)
183    };
184    let (fetch_block, fetch_vid) = state
185        .read(|state| async move { join!(state.get_block(id), state.get_vid_common(id)) }.boxed())
186        .await;
187    try_join!(
188        async move {
189            fetch_block
190                .with_timeout(timeout)
191                .await
192                .context(FetchBlockSnafu {
193                    resource: id.to_string(),
194                })
195        },
196        async move {
197            fetch_vid
198                .with_timeout(timeout)
199                .await
200                .context(FetchBlockSnafu {
201                    resource: id.to_string(),
202                })
203        }
204    )
205}
206
207async fn get_block_range_for_ns_proof<S>(
208    req: &RequestParams,
209    state: &S,
210    limit: usize,
211    timeout: Duration,
212) -> Result<Vec<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)>, Error>
213where
214    S: ReadState,
215    S::State: AvailabilityDataSource<SeqTypes> + Sync,
216{
217    let from: usize = req.integer_param("from")?;
218    let until: usize = req.integer_param("until")?;
219    if until.saturating_sub(from) > limit {
220        return Err(Error::RangeLimit { from, until, limit });
221    }
222
223    let (blocks, vids) = state
224        .read(|state| {
225            async move {
226                join!(
227                    state.get_block_range(from..until),
228                    state.get_vid_common_range(from..until)
229                )
230            }
231            .boxed()
232        })
233        .await;
234    blocks
235        .zip(vids)
236        .enumerate()
237        .then(|(i, (block, vid))| async move {
238            let (Some(block), Some(vid)) =
239                join!(block.with_timeout(timeout), vid.with_timeout(timeout),)
240            else {
241                return Err(Error::FetchBlock {
242                    resource: (from + i).to_string(),
243                });
244            };
245            Ok((block, vid))
246        })
247        .try_collect()
248        .await
249}
250
251fn get_block_stream_for_ns_proof<'a, S>(
252    req: RequestParams,
253    state: &'a S,
254) -> impl 'a
255       + Stream<
256    Item = Result<
257        (
258            NamespaceId,
259            BlockQueryData<SeqTypes>,
260            VidCommonQueryData<SeqTypes>,
261        ),
262        Error,
263    >,
264>
265where
266    S: ReadState,
267    S::State: AvailabilityDataSource<SeqTypes> + Sync,
268{
269    async move {
270        let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
271        let height = req.integer_param("height")?;
272        Ok(state
273            .read(|state| {
274                async move {
275                    state
276                        .subscribe_blocks(height)
277                        .await
278                        .zip(state.subscribe_vid_common(height).await)
279                        .map(move |(block, vid)| (ns_id, block, vid))
280                        .map(Ok)
281                }
282                .boxed()
283            })
284            .await)
285    }
286    .try_flatten_stream()
287}
288
289async fn get_state_cert<S>(
290    state: &S,
291    epoch: u64,
292    timeout: Duration,
293) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, availability::Error>
294where
295    S: ReadState,
296    S::State: StateCertDataSource + StateCertFetchingDataSource<SeqTypes> + Sync,
297{
298    // Try to get from local storage first
299    let state_cert = state
300        .read(|state| state.get_state_cert_by_epoch(epoch).boxed())
301        .await
302        .map_err(|e| availability::Error::Custom {
303            message: format!("Failed to get state cert: {e}"),
304            status: StatusCode::INTERNAL_SERVER_ERROR,
305        })?;
306
307    match state_cert {
308        Some(cert) => Ok(cert),
309        None => {
310            // Not found locally, try to fetch from peers
311            let cert = state
312                .read(|state| state.request_state_cert(epoch, timeout).boxed())
313                .await?;
314
315            // Store the fetched certificate
316            state
317                .read(|state| state.insert_state_cert(epoch, cert.clone()).boxed())
318                .await
319                .map_err(|e| availability::Error::Custom {
320                    message: format!("Failed to store state cert: {e}"),
321                    status: StatusCode::INTERNAL_SERVER_ERROR,
322                })?;
323
324            Ok(cert)
325        },
326    }
327}
328
// TODO (abdul): replace snafu with `thiserror` in the hotshot query service.
// Snafu has been replaced by `thiserror` everywhere else;
// only the query service still uses snafu.
332pub(in crate::api) fn availability<N, P, D, V: Versions>(
333    api_ver: semver::Version,
334) -> anyhow::Result<AvailabilityApi<N, P, D, V, SequencerApiVersion>>
335where
336    N: ConnectedNetwork<PubKey>,
337    D: SequencerDataSource + Send + Sync + 'static,
338    P: SequencerPersistence,
339{
340    let mut options = availability::Options::default();
341    let extension = toml::from_str(include_str!("../../../api/availability.toml"))?;
342    options.extensions.push(extension);
343    let timeout = options.fetch_timeout;
344    let limit = options.large_object_range_limit;
345
346    let mut api = availability::define_api::<AvailState<N, P, D, _>, SeqTypes, _>(
347        &options,
348        SequencerApiVersion::instance(),
349        api_ver.clone(),
350    )?;
351
352    if api_ver.major == 1 {
353        api.at("getnamespaceproof", move |req, state| {
354            async move {
355                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
356                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
357                let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
358                extract_ns_proof_v1(proof, ns_id)
359            }
360            .boxed()
361        })?
362        .at("getnamespaceproof_range", move |req, state| {
363            async move {
364                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
365                let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
366                try_join_all(blocks.iter().map(|(block, vid)| async move {
367                    let proof = get_namespace_proof(block, vid, ns_id, state).await?;
368                    extract_ns_proof_v1(proof, ns_id)
369                }))
370                .await
371            }
372            .boxed()
373        })?
374        .stream("stream_namespace_proofs", move |req, state| {
375            get_block_stream_for_ns_proof(req, state)
376                .and_then(move |(ns_id, block, vid)| async move {
377                    let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
378                    extract_ns_proof_v1(proof, ns_id)
379                })
380                .boxed()
381        })?;
382    } else if api_ver.major == 0 {
383        api.at("getnamespaceproof", move |req, state| {
384            async move {
385                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
386                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
387                let proof = get_namespace_proof(&block, &common, ns_id, state).await?;
388                extract_ns_proof_v0(proof, ns_id)
389            }
390            .boxed()
391        })?
392        .at("getnamespaceproof_range", move |req, state| {
393            async move {
394                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
395                let blocks = get_block_range_for_ns_proof(&req, state, limit, timeout).await?;
396                try_join_all(blocks.iter().map(|(block, vid)| async move {
397                    let proof = get_namespace_proof(block, vid, ns_id, state).await?;
398                    extract_ns_proof_v0(proof, ns_id)
399                }))
400                .await
401            }
402            .boxed()
403        })?
404        .stream("stream_namespace_proofs", move |req, state| {
405            get_block_stream_for_ns_proof(req, state)
406                .and_then(move |(ns_id, block, vid)| async move {
407                    let proof = get_namespace_proof(&block, &vid, ns_id, state).await?;
408                    extract_ns_proof_v0(proof, ns_id)
409                })
410                .boxed()
411        })?;
412    }
413
414    if api_ver.major >= 1 {
415        api.at("incorrect_encoding_proof", move |req, state| {
416            async move {
417                let ns_id = NamespaceId::from(req.integer_param::<_, u32>("namespace")?);
418                let (block, common) = get_block_for_ns_proof(&req, state, timeout).await?;
419                match get_namespace_proof(&block, &common, ns_id, state).await? {
420                    Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
421                    Some(_) => Err(Error::Custom {
422                        message: "block was correctly encoded".into(),
423                        status: StatusCode::NOT_FOUND,
424                    }),
425                    None => Err(Error::Custom {
426                        message: "namespace not present in block".into(),
427                        status: StatusCode::NOT_FOUND,
428                    }),
429                }
430            }
431            .boxed()
432        })?;
433    }
434
435    api.at("get_state_cert", move |req, state| {
436        async move {
437            let epoch: u64 = req.integer_param("epoch")?;
438            let cert = get_state_cert(state, epoch, timeout).await?;
439            Ok(StateCertQueryDataV1::from(StateCertQueryDataV2(cert)))
440        }
441        .boxed()
442    })?;
443
444    api.at("get_state_cert_v2", move |req, state| {
445        async move {
446            let epoch: u64 = req.integer_param("epoch")?;
447            let cert = get_state_cert(state, epoch, timeout).await?;
448            Ok(StateCertQueryDataV2(cert))
449        }
450        .boxed()
451    })?;
452
453    Ok(api)
454}