sequencer/api/
light_client.rs

1use std::{fmt::Display, sync::Arc, time::Duration};
2
3use anyhow::Result;
4use espresso_types::{BlockMerkleTree, NsProof, SeqTypes};
5use futures::{
6    future::{try_join, FutureExt},
7    stream::StreamExt,
8    TryStreamExt,
9};
10use hotshot_query_service::{
11    availability::{self, AvailabilityDataSource, LeafId},
12    data_source::{storage::NodeStorage, VersionedDataSource},
13    merklized_state::{MerklizedStateDataSource, Snapshot},
14    node::BlockId,
15    types::HeightIndexed,
16    Error,
17};
18use hotshot_types::utils::{epoch_from_block_number, root_block_in_epoch};
19use itertools::izip;
20use jf_merkle_tree_compat::MerkleTreeScheme;
21use light_client::consensus::{
22    header::HeaderProof, leaf::LeafProof, namespace::NamespaceProof, payload::PayloadProof,
23};
24use tide_disco::{method::ReadState, Api, RequestParams, StatusCode};
25use vbs::version::StaticVersionType;
26
27use crate::api::data_source::{NodeStateDataSource, StakeTableDataSource};
28
29async fn get_leaf_proof<State>(
30    state: &State,
31    requested: usize,
32    finalized: Option<usize>,
33    fetch_timeout: Duration,
34) -> Result<LeafProof, Error>
35where
36    State: AvailabilityDataSource<SeqTypes> + VersionedDataSource,
37    for<'a> State::ReadOnly<'a>: NodeStorage<SeqTypes>,
38{
39    let (endpoint, qc_chain) = match finalized {
40        Some(finalized) => {
41            // If we have a known-finalized block, we will not need a final 2-chain of QCs to prove
42            // the last leaf in the result finalized, since we will either terminate with a 3-chain
43            // of leaves or at the `finalized` leaf. Thus, we can use None for the final QC chain.
44            if finalized <= requested {
45                return Err(Error::Custom {
46                    message: format!(
47                        "finalized leaf height ({finalized}) must be greater than requested \
48                         ({requested})"
49                    ),
50                    status: StatusCode::BAD_REQUEST,
51                });
52            }
53            (finalized, None)
54        },
55        None => {
56            async {
57                // Grab the endpoint and the final QC chain in the same transaction, to ensure that
58                // the QC chain actually corresponds to the endpoint block (and is not subject to
59                // concurrent updates).
60                let mut tx = state.read().await?;
61                let height = NodeStorage::block_height(&mut tx).await?;
62                let qc_chain = tx.latest_qc_chain().await?;
63                Ok((height, qc_chain))
64            }
65            .await
66            .map_err(|err: anyhow::Error| Error::Custom {
67                message: err.to_string(),
68                status: StatusCode::INTERNAL_SERVER_ERROR,
69            })?
70        },
71    };
72    let mut leaves = state.get_leaf_range(requested..endpoint).await;
73    let mut proof = LeafProof::default();
74
75    while let Some(leaf) = leaves.next().await {
76        let leaf = leaf
77            .with_timeout(fetch_timeout)
78            .await
79            .ok_or_else(|| not_found("missing leaves"))?;
80
81        if proof.push(leaf) {
82            return Ok(proof);
83        }
84    }
85
86    // We reached the end of the range of interest without encountering a 3-chain. Thus, if the last
87    // leaf in the chain is not already assumed finalized by the client, we must prove it finalized
88    // by appending two more QCs.
89    if finalized.is_none() {
90        let Some([committing_qc, deciding_qc]) = qc_chain else {
91            return Err(not_found("missing QC 2-chain to prove finality"));
92        };
93        proof.add_qc_chain(Arc::new(committing_qc), Arc::new(deciding_qc));
94    }
95
96    Ok(proof)
97}
98
99async fn get_header_proof<State>(
100    state: &State,
101    root: u64,
102    requested: BlockId<SeqTypes>,
103    fetch_timeout: Duration,
104) -> Result<HeaderProof, Error>
105where
106    State: AvailabilityDataSource<SeqTypes>
107        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
108        + VersionedDataSource,
109{
110    let header = state
111        .get_header(requested)
112        .await
113        .with_timeout(fetch_timeout)
114        .await
115        .ok_or_else(|| not_found(format!("unknown header {requested}")))?;
116    if header.height() >= root {
117        return Err(Error::Custom {
118            message: format!(
119                "height ({}) must be less than root ({root})",
120                header.height()
121            ),
122            status: StatusCode::BAD_REQUEST,
123        });
124    }
125    let path = MerklizedStateDataSource::<SeqTypes, BlockMerkleTree, _>::get_path(
126        state,
127        Snapshot::Index(root),
128        header.height(),
129    )
130    .await
131    .map_err(|source| Error::MerklizedState {
132        source: source.into(),
133    })?;
134
135    Ok(HeaderProof::new(header, path))
136}
137
138async fn get_namespace_proof_range<State>(
139    state: &State,
140    start: usize,
141    end: usize,
142    namespace: u64,
143    fetch_timeout: Duration,
144    large_object_range_limit: usize,
145) -> Result<Vec<NamespaceProof>, Error>
146where
147    State: AvailabilityDataSource<SeqTypes>,
148{
149    if end <= start {
150        return Err(Error::Custom {
151            message: format!("requested empty interval [{start}, {end})"),
152            status: StatusCode::BAD_REQUEST,
153        });
154    }
155    if end - start > large_object_range_limit {
156        return Err(Error::Custom {
157            message: format!(
158                "requested range [{start}, {end}) exceeds maximum size {large_object_range_limit}"
159            ),
160            status: StatusCode::BAD_REQUEST,
161        });
162    }
163
164    let fetch_headers = async move {
165        state
166            .get_header_range(start..end)
167            .await
168            .enumerate()
169            .then(|(i, fetch)| async move {
170                fetch
171                    .with_timeout(fetch_timeout)
172                    .await
173                    .ok_or_else(|| Error::Custom {
174                        message: format!("missing header {}", start + i),
175                        status: StatusCode::NOT_FOUND,
176                    })
177            })
178            .try_collect::<Vec<_>>()
179            .await
180    };
181    let fetch_payloads = async move {
182        state
183            .get_payload_range(start..end)
184            .await
185            .enumerate()
186            .then(|(i, fetch)| async move {
187                fetch
188                    .with_timeout(fetch_timeout)
189                    .await
190                    .ok_or_else(|| Error::Custom {
191                        message: format!("missing payload {}", start + i),
192                        status: StatusCode::NOT_FOUND,
193                    })
194            })
195            .try_collect::<Vec<_>>()
196            .await
197    };
198    let fetch_vid_commons = async move {
199        state
200            .get_vid_common_range(start..end)
201            .await
202            .enumerate()
203            .then(|(i, fetch)| async move {
204                fetch
205                    .with_timeout(fetch_timeout)
206                    .await
207                    .ok_or_else(|| Error::Custom {
208                        message: format!("missing VID common {}", start + i),
209                        status: StatusCode::NOT_FOUND,
210                    })
211            })
212            .try_collect::<Vec<_>>()
213            .await
214    };
215    let (headers, (payloads, vid_commons)) =
216        try_join(fetch_headers, try_join(fetch_payloads, fetch_vid_commons)).await?;
217
218    izip!(headers, payloads, vid_commons)
219        .map(|(header, payload, vid_common)| {
220            let Some(ns_index) = header.ns_table().find_ns_id(&namespace.into()) else {
221                return Ok(NamespaceProof::not_present());
222            };
223            let ns_proof = NsProof::new(payload.data(), &ns_index, vid_common.common())
224                .ok_or_else(|| Error::Custom {
225                    message: "failed to construct namespace proof".into(),
226                    status: StatusCode::INTERNAL_SERVER_ERROR,
227                })?;
228            Ok(NamespaceProof::new(ns_proof, vid_common.common().clone()))
229        })
230        .collect()
231}
232
/// Configuration for the light client API endpoints registered by `define_api`.
#[derive(Debug)]
pub(super) struct Options {
    /// Timeout for failing requests due to missing data.
    ///
    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
    /// external provider. This parameter controls how long the request handler will wait for
    /// missing data to be fetched before giving up and failing the request.
    pub fetch_timeout: Duration,

    /// The maximum number of large objects which can be loaded in a single range query.
    ///
    /// Large objects include anything that _might_ contain a full payload or an object proportional
    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
    /// check the size of objects while loading to determine which limit to apply. If an object
    /// belongs to a class which might contain a large payload, the large object limit always
    /// applies.
    ///
    /// In this file the limit is enforced on namespace proof range queries.
    pub large_object_range_limit: usize,
}
251
252impl Default for Options {
253    fn default() -> Self {
254        Self {
255            fetch_timeout: Duration::from_millis(500),
256            large_object_range_limit: availability::Options::default().large_object_range_limit,
257        }
258    }
259}
260
261pub(super) fn define_api<S, ApiVer: StaticVersionType + 'static>(
262    opt: Options,
263    api_ver: semver::Version,
264) -> Result<Api<S, Error, ApiVer>>
265where
266    S: ReadState + Send + Sync + 'static,
267    S::State: AvailabilityDataSource<SeqTypes>
268        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
269        + NodeStateDataSource
270        + StakeTableDataSource<SeqTypes>
271        + VersionedDataSource,
272    for<'a> <S::State as VersionedDataSource>::ReadOnly<'a>: NodeStorage<SeqTypes>,
273{
274    let toml = toml::from_str::<toml::Value>(include_str!("../../api/light-client.toml"))?;
275    let mut api = Api::<S, Error, ApiVer>::new(toml)?;
276    api.with_version(api_ver);
277
278    let Options {
279        fetch_timeout,
280        large_object_range_limit,
281    } = opt;
282
283    api.get("leaf", move |req, state| {
284        async move {
285            let requested = leaf_height_from_req(&req, state, fetch_timeout).await?;
286            let finalized = req
287                .opt_integer_param("finalized")
288                .map_err(bad_param("finalized"))?;
289            get_leaf_proof(state, requested, finalized, fetch_timeout).await
290        }
291        .boxed()
292    })?
293    .get("header", move |req, state| {
294        async move {
295            let root = req.integer_param("root").map_err(bad_param("root"))?;
296            let requested = block_id_from_req(&req)?;
297            get_header_proof(state, root, requested, fetch_timeout).await
298        }
299        .boxed()
300    })?
301    .get("stake_table", move |req, state| {
302        async move {
303            let epoch: u64 = req.integer_param("epoch").map_err(bad_param("epoch"))?;
304
305            let node_state = state.node_state().await;
306            let epoch_height = node_state.epoch_height.ok_or_else(|| Error::Custom {
307                message: "epoch state not set".into(),
308                status: StatusCode::INTERNAL_SERVER_ERROR,
309            })?;
310            let first_epoch = epoch_from_block_number(node_state.epoch_start_block, epoch_height);
311
312            if epoch < first_epoch + 2 {
313                return Err(Error::Custom {
314                    message: format!("epoch must be at least {}", first_epoch + 2),
315                    status: StatusCode::BAD_REQUEST,
316                });
317            }
318
319            // Find the range of L1 block containing events for this epoch. This is determined by
320            // the `l1_finalized` field of the epoch root (from two epochs prior) and the previous
321            // epoch's epoch root.
322            let epoch_root_height = root_block_in_epoch(epoch - 2, epoch_height) as usize;
323            let epoch_root = state
324                .get_header(epoch_root_height)
325                .await
326                .with_timeout(fetch_timeout)
327                .await
328                .ok_or_else(|| {
329                    not_found(format!("missing epoch root header {epoch_root_height}"))
330                })?;
331            let to_l1_block = epoch_root
332                .l1_finalized()
333                .ok_or_else(|| Error::Custom {
334                    message: "epoch root header is missing L1 finalized block".into(),
335                    status: StatusCode::INTERNAL_SERVER_ERROR,
336                })?
337                .number();
338
339            let from_l1_block = if epoch >= first_epoch + 3 {
340                let prev_epoch_root_height = root_block_in_epoch(epoch - 3, epoch_height) as usize;
341                let prev_epoch_root = state
342                    .get_header(prev_epoch_root_height)
343                    .await
344                    .with_timeout(fetch_timeout)
345                    .await
346                    .ok_or_else(|| {
347                        not_found(format!(
348                            "missing previous epoch root header {prev_epoch_root_height}"
349                        ))
350                    })?;
351                prev_epoch_root
352                    .l1_finalized()
353                    .ok_or_else(|| Error::Custom {
354                        message: "previous epoch root header is missing L1 finalized block".into(),
355                        status: StatusCode::INTERNAL_SERVER_ERROR,
356                    })?
357                    .number()
358                    + 1
359            } else {
360                0
361            };
362
363            state
364                .stake_table_events(from_l1_block, to_l1_block)
365                .await
366                .map_err(|err| Error::Custom {
367                    message: format!("failed to load stake table events: {err:#}"),
368                    status: StatusCode::INTERNAL_SERVER_ERROR,
369                })
370        }
371        .boxed()
372    })?
373    .get("payload", move |req, state| {
374        async move {
375            let height: usize = req.integer_param("height").map_err(bad_param("height"))?;
376            let fetch_payload = async move {
377                state
378                    .get_payload(height)
379                    .await
380                    .with_timeout(fetch_timeout)
381                    .await
382                    .ok_or_else(|| Error::Custom {
383                        message: format!("missing payload {height}"),
384                        status: StatusCode::NOT_FOUND,
385                    })
386            };
387            let fetch_vid_common = async move {
388                state
389                    .get_vid_common(height)
390                    .await
391                    .with_timeout(fetch_timeout)
392                    .await
393                    .ok_or_else(|| Error::Custom {
394                        message: format!("missing VID common {height}"),
395                        status: StatusCode::NOT_FOUND,
396                    })
397            };
398            let (payload, vid_common) = try_join(fetch_payload, fetch_vid_common).await?;
399            Ok(PayloadProof::new(
400                payload.data().clone(),
401                vid_common.common().clone(),
402            ))
403        }
404        .boxed()
405    })?
406    .get("namespace", move |req, state| {
407        async move {
408            let height = req.integer_param("height").map_err(bad_param("height"))?;
409            let namespace = req
410                .integer_param("namespace")
411                .map_err(bad_param("namespace"))?;
412            let mut proofs = get_namespace_proof_range(
413                state,
414                height,
415                height + 1,
416                namespace,
417                fetch_timeout,
418                large_object_range_limit,
419            )
420            .await?;
421            if proofs.len() != 1 {
422                tracing::error!(
423                    height,
424                    namespace,
425                    ?proofs,
426                    "get_namespace_proof_range should have returned exactly one proof"
427                );
428                return Err(Error::Custom {
429                    message: "internal consistency error".into(),
430                    status: StatusCode::INTERNAL_SERVER_ERROR,
431                });
432            }
433            Ok(proofs.remove(0))
434        }
435        .boxed()
436    })?
437    .get("namespace_range", move |req, state| {
438        async move {
439            let start = req.integer_param("start").map_err(bad_param("start"))?;
440            let end = req.integer_param("end").map_err(bad_param("end"))?;
441            let namespace = req
442                .integer_param("namespace")
443                .map_err(bad_param("namespace"))?;
444            get_namespace_proof_range(
445                state,
446                start,
447                end,
448                namespace,
449                fetch_timeout,
450                large_object_range_limit,
451            )
452            .await
453        }
454        .boxed()
455    })?;
456
457    Ok(api)
458}
459
460async fn leaf_height_from_req<S>(
461    req: &RequestParams,
462    state: &S,
463    fetch_timeout: Duration,
464) -> Result<usize, Error>
465where
466    S: AvailabilityDataSource<SeqTypes>,
467{
468    if let Some(height) = req
469        .opt_integer_param("height")
470        .map_err(bad_param("height"))?
471    {
472        return Ok(height);
473    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
474        let leaf = state
475            .get_leaf(LeafId::Hash(hash))
476            .await
477            .with_timeout(fetch_timeout)
478            .await
479            .ok_or_else(|| not_found(format!("unknown leaf hash {hash}")))?;
480        return Ok(leaf.height() as usize);
481    } else if let Some(hash) = req
482        .opt_blob_param("block-hash")
483        .map_err(bad_param("block-hash"))?
484    {
485        let header = state
486            .get_header(BlockId::Hash(hash))
487            .await
488            .with_timeout(fetch_timeout)
489            .await
490            .ok_or_else(|| not_found(format!("unknown block hash {hash}")))?;
491        return Ok(header.height() as usize);
492    } else if let Some(hash) = req
493        .opt_blob_param("payload-hash")
494        .map_err(bad_param("payload-hash"))?
495    {
496        let header = state
497            .get_header(BlockId::PayloadHash(hash))
498            .await
499            .with_timeout(fetch_timeout)
500            .await
501            .ok_or_else(|| not_found(format!("unknown payload hash {hash}")))?;
502        return Ok(header.height() as usize);
503    }
504
505    Err(Error::Custom {
506        message: "missing parameter: requested leaf must be identified by height, hash, block \
507                  hash, or payload hash"
508            .into(),
509        status: StatusCode::BAD_REQUEST,
510    })
511}
512
513fn block_id_from_req(req: &RequestParams) -> Result<BlockId<SeqTypes>, Error> {
514    if let Some(height) = req
515        .opt_integer_param("height")
516        .map_err(bad_param("height"))?
517    {
518        Ok(BlockId::Number(height))
519    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
520        Ok(BlockId::Hash(hash))
521    } else if let Some(hash) = req
522        .opt_blob_param("payload-hash")
523        .map_err(bad_param("payload-hash"))?
524    {
525        Ok(BlockId::PayloadHash(hash))
526    } else {
527        Err(Error::Custom {
528            message: "missing parameter: requested header must be identified by height, hash, or \
529                      payload hash"
530                .into(),
531            status: StatusCode::BAD_REQUEST,
532        })
533    }
534}
535
536fn bad_param<E>(name: &'static str) -> impl FnOnce(E) -> Error
537where
538    E: Display,
539{
540    move |err| Error::Custom {
541        message: format!("{name}: {err:#}"),
542        status: StatusCode::BAD_REQUEST,
543    }
544}
545
546fn not_found(msg: impl Into<String>) -> Error {
547    Error::Custom {
548        message: msg.into(),
549        status: StatusCode::NOT_FOUND,
550    }
551}
552
553#[cfg(test)]
554mod test {
555    use espresso_types::{DrbAndHeaderUpgradeVersion, EpochVersion, BLOCK_MERKLE_TREE_HEIGHT};
556    use futures::future::join_all;
557    use hotshot_query_service::{
558        availability::{BlockQueryData, TransactionIndex, VidCommonQueryData},
559        data_source::{storage::UpdateAvailabilityStorage, Transaction},
560        merklized_state::UpdateStateData,
561    };
562    use hotshot_types::simple_certificate::CertificatePair;
563    use jf_merkle_tree_compat::{AppendableMerkleTreeScheme, ToTraversalPath};
564    use light_client::{
565        consensus::leaf::{FinalityProof, LeafProofHint},
566        testing::{
567            leaf_chain, leaf_chain_with_upgrade, AlwaysTrueQuorum, EnableEpochs, LegacyVersion,
568            TestClient, VersionCheckQuorum,
569        },
570    };
571    use tide_disco::Error;
572
573    use super::*;
574    use crate::api::{
575        data_source::{testing::TestableSequencerDataSource, SequencerDataSource},
576        sql::DataSource,
577    };
578
    /// With three consecutive leaves stored and no client-supplied finalized leaf, a proof for
    /// leaf 1 can be built and verifies to the first leaf of the chain.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_two_chain() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Insert some leaves, forming a chain.
        let leaves = leaf_chain::<EpochVersion>(1..=3).await;
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.insert_leaf(leaves[1].clone()).await.unwrap();
            tx.insert_leaf(leaves[2].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Ask for the first leaf; it is proved finalized by the chain formed along with the second.
        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
        assert_eq!(
            proof
                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
                .await
                .unwrap(),
            leaves[0]
        );
    }
610
    /// With only leaf 1 stored, a proof for it can still be produced when the caller states that
    /// leaf 2 is already finalized; the proof verifies under that assumption.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_finalized() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Insert a single leaf. We will not be able to provide proofs ending in a leaf chain, but
        // we can return a leaf if the leaf after it is already known to be finalized.
        let leaves = leaf_chain::<EpochVersion>(1..=2).await;
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        let proof = get_leaf_proof(&ds, 1, Some(2), Duration::MAX)
            .await
            .unwrap();
        assert_eq!(
            proof
                .verify(LeafProofHint::assumption(leaves[1].leaf()))
                .await
                .unwrap(),
            leaves[0]
        );
    }
642
    /// Supplying a finalized height that is not greater than the requested height is rejected
    /// with `BAD_REQUEST`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_bad_finalized() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Insert a single leaf. If we request this leaf but provide a finalized leaf which is
        // earlier, we should fail.
        let leaves = leaf_chain::<EpochVersion>(1..2).await;
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        let err = get_leaf_proof(&ds, 1, Some(0), Duration::MAX)
            .await
            .unwrap_err();
        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
    }
668
    /// With a gap in the stored leaves (leaf 2 is never inserted), no proof for leaf 1 can be
    /// built, with or without a client-supplied finalized leaf; both attempts fail with
    /// `NOT_FOUND`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_no_chain() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Insert multiple leaves that don't chain. We will not be able to prove these are
        // finalized.
        let leaves = leaf_chain::<EpochVersion>(1..=4).await;
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.insert_leaf(leaves[2].clone()).await.unwrap();
            tx.insert_leaf(leaves[3].clone()).await.unwrap();
            tx.commit().await.unwrap();
        }

        // A finite timeout is used here so the fetch of the missing leaf gives up.
        let err = get_leaf_proof(&ds, 1, None, Duration::from_secs(1))
            .await
            .unwrap_err();
        assert_eq!(err.status(), StatusCode::NOT_FOUND);

        // Even if we start from a finalized leaf that extends one of the leaves we do have (4,
        // extends 3) we fail to generate a proof because we can't generate a chain from the
        // requested leaf (1) to the finalized leaf (4), since leaf 2 is missing.
        let err = get_leaf_proof(&ds, 1, Some(4), Duration::from_secs(1))
            .await
            .unwrap_err();
        assert_eq!(err.status(), StatusCode::NOT_FOUND);
    }
704
    /// With a single stored leaf plus a stored QC 2-chain, a proof for that leaf can be built
    /// without a client-supplied finalized leaf and verifies to the leaf.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_final_qcs() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Insert a single leaf, plus an extra QC chain proving it finalized.
        let leaves = leaf_chain::<EpochVersion>(1..=3).await;
        // The QCs are derived from the two following leaves, which are never stored themselves.
        let qcs = [
            CertificatePair::for_parent(leaves[1].leaf()),
            CertificatePair::for_parent(leaves[2].leaf()),
        ];
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf_with_qc_chain(leaves[0].clone(), Some(qcs.clone()))
                .await
                .unwrap();
            tx.commit().await.unwrap();
        }

        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
        assert_eq!(
            proof
                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
                .await
                .unwrap(),
            leaves[0]
        );
    }
739
    /// A leaf proof spanning a version upgrade in the middle of the chain verifies, and the
    /// resulting finality proof is the `HotStuff2` variant.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_upgrade_to_epochs() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Upgrade to epochs (and enabling HotStuff2) in the middle of a leaf chain, so that the
        // last leaf in the chain only requires 2 QCs to verify, even though at the start of the
        // chain we would have required 3.
        let leaves = leaf_chain_with_upgrade::<EnableEpochs>(1..=4, 2).await;
        // Sanity-check the fixture: the first leaf is pre-upgrade, the second post-upgrade.
        assert_eq!(leaves[0].header().version(), LegacyVersion::version());
        assert_eq!(
            leaves[1].header().version(),
            DrbAndHeaderUpgradeVersion::version()
        );
        let qcs = [
            CertificatePair::for_parent(leaves[2].leaf()),
            CertificatePair::for_parent(leaves[3].leaf()),
        ];
        {
            let mut tx = ds.write().await.unwrap();
            tx.insert_leaf(leaves[0].clone()).await.unwrap();
            tx.insert_leaf_with_qc_chain(leaves[1].clone(), Some(qcs.clone()))
                .await
                .unwrap();
            tx.commit().await.unwrap();
        }

        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
        assert_eq!(
            proof
                .verify(LeafProofHint::Quorum(&VersionCheckQuorum::new(
                    leaves.iter().map(|leaf| leaf.leaf().clone())
                )))
                .await
                .unwrap(),
            leaves[0]
        );
        assert!(matches!(proof.proof(), FinalityProof::HotStuff2 { .. }))
    }
785
    /// Exercises `get_header_proof`: every (root, height) pair with `height < root` yields a
    /// proof that verifies against the matching Merkle tree commitment, an unknown header gives
    /// `NOT_FOUND`, and `height >= root` gives `BAD_REQUEST`.
    #[tokio::test]
    #[test_log::test]
    async fn test_header_proof() {
        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
        let ds = DataSource::create(
            DataSource::persistence_options(&storage),
            Default::default(),
            false,
        )
        .await
        .unwrap();

        // Construct a chain of leaves, plus the corresponding block Merkle tree at each leaf.
        let leaves = leaf_chain::<EpochVersion>(0..=2).await;
        let mts = leaves
            .iter()
            .scan(
                BlockMerkleTree::new(BLOCK_MERKLE_TREE_HEIGHT),
                |mt, leaf| {
                    // `mts[i]` is the tree as it stands before leaf `i`'s block hash is pushed,
                    // which is what leaf `i`'s header commits to.
                    assert_eq!(mt.commitment(), leaf.header().block_merkle_tree_root());
                    let item = mt.clone();
                    mt.push(leaf.block_hash()).unwrap();
                    Some(item)
                },
            )
            .collect::<Vec<_>>();

        // Save all those objects in the DB.
        {
            let mut tx = ds.write().await.unwrap();
            for (leaf, mt) in leaves.iter().zip(&mts) {
                tx.insert_leaf(leaf.clone()).await.unwrap();

                // The tree at height 0 is empty, so there is no path to store for it.
                if leaf.height() > 0 {
                    let merkle_path = mt.lookup(leaf.height() - 1).expect_ok().unwrap().1;
                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::insert_merkle_nodes(
                        &mut tx,
                        merkle_path,
                        ToTraversalPath::<{ BlockMerkleTree::ARITY }>::to_traversal_path(
                            &(leaf.height() - 1),
                            BLOCK_MERKLE_TREE_HEIGHT,
                        ),
                        leaf.height(),
                    )
                    .await
                    .unwrap();
                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::set_last_state_height(
                        &mut tx,
                        leaf.height() as usize,
                    )
                    .await
                    .unwrap();
                }
            }
            tx.commit().await.unwrap();
        }

        // Test happy path.
        for (root, mt) in mts.iter().enumerate().skip(1) {
            for (height, leaf) in leaves.iter().enumerate().take(root) {
                tracing::info!(root, height, "test happy path");
                let proof =
                    get_header_proof(&ds, root as u64, BlockId::Number(height), Duration::MAX)
                        .await
                        .unwrap();
                assert_eq!(proof.verify_ref(mt.commitment()).unwrap(), leaf.header());
            }
        }

        // Test unknown leaf.
        let err = get_header_proof(&ds, 5, BlockId::Number(4), Duration::from_secs(1))
            .await
            .unwrap_err();
        assert_eq!(err.status(), StatusCode::NOT_FOUND);

        // Test height >= root.
        let err = get_header_proof(&ds, 1, BlockId::Number(1), Duration::MAX)
            .await
            .unwrap_err();
        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
    }
867
868    #[tokio::test]
869    #[test_log::test]
       /// Exercises `get_namespace_proof_range` against a data source seeded with three blocks
       /// (leaf, payload and VID common data for indices 0..=2).
       ///
       /// Covers the full stored range, a one-block sub-range, a range reaching past the stored
       /// data (expects `NOT_FOUND`), a range whose start exceeds its end (expects `BAD_REQUEST`),
       /// and a range far larger than the value passed as the last argument (expects
       /// `BAD_REQUEST`).
870    async fn test_namespace_proof() {
           // Create storage and a data source on top of it for this test.
871        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
872        let ds = DataSource::create(
873            DataSource::persistence_options(&storage),
874            Default::default(),
875            false,
876        )
877        .await
878        .unwrap();
879
880        // Construct a chain of blocks.
           // Leaf, payload and VID common data are requested from the test client for 0..=2.
881        let client = TestClient::default();
882        let leaves = join_all((0..=2).map(|i| client.leaf(i))).await;
883        let payloads = join_all((0..=2).map(|i| client.payload(i))).await;
884        let vid_commons = join_all((0..=2).map(|i| client.vid_common(i))).await;
885
886        // Save all those objects in the DB.
887        {
888            let mut tx = ds.write().await.unwrap();
889            for (leaf, payload, vid_common) in izip!(&leaves, &payloads, &vid_commons) {
890                tx.insert_leaf(leaf.clone()).await.unwrap();
                   // Block and VID records are both built from the leaf's header.
891                tx.insert_block(BlockQueryData::<SeqTypes>::new(
892                    leaf.header().clone(),
893                    payload.clone(),
894                ))
895                .await
896                .unwrap();
897                tx.insert_vid(
898                    VidCommonQueryData::<SeqTypes>::new(leaf.header().clone(), vid_common.clone()),
899                    None,
900                )
901                .await
902                .unwrap();
903            }
904            tx.commit().await.unwrap();
905        }
906
907        // Test happy path: all blocks.
           // The namespace is taken from the first transaction of the first payload.
908        let ns = payloads[0]
909            .transaction(&TransactionIndex {
910                ns_index: 0.into(),
911                position: 0,
912            })
913            .unwrap()
914            .namespace();
           // Bounds (0, 3) are expected to yield three proofs, i.e. the end bound is treated as
           // exclusive.
915        let proofs = get_namespace_proof_range(&ds, 0, 3, ns.into(), Duration::MAX, 100)
916            .await
917            .unwrap();
918        assert_eq!(proofs.len(), 3);
           // Only checks that each proof verifies against its leaf's header; the value returned by
           // `verify` is discarded here.
919        for (leaf, proof) in leaves.iter().zip(proofs) {
920            proof.verify(leaf.header(), ns).unwrap();
921        }
922
923        // Test happy path: subset.
           // `tx` and `ns` (shadowing the earlier `ns`) come from the first transaction of the
           // second payload; `ns` is reused by the error cases below.
924        let tx = payloads[1]
925            .transaction(&TransactionIndex {
926                ns_index: 0.into(),
927                position: 0,
928            })
929            .unwrap();
930        let ns = tx.namespace();
931        let proofs = get_namespace_proof_range(&ds, 1, 2, ns.into(), Duration::MAX, 100)
932            .await
933            .unwrap();
934        assert_eq!(proofs.len(), 1);
           // The single proof must verify against block 1's header and yield exactly `tx`.
935        assert_eq!(proofs[0].verify(leaves[1].header(), ns).unwrap(), [tx]);
936
937        // Test missing data in range.
           // Bounds (0, 4) reach past the three stored blocks. A one-second timeout is used rather
           // than `Duration::MAX` -- presumably so the fetch for the missing block fails quickly.
938        let err = get_namespace_proof_range(&ds, 0, 4, ns.into(), Duration::from_secs(1), 100)
939            .await
940            .unwrap_err();
941        assert_eq!(err.status(), StatusCode::NOT_FOUND);
942
943        // Test invalid range.
           // The start bound (1) is greater than the end bound (0).
944        let err = get_namespace_proof_range(&ds, 1, 0, ns.into(), Duration::from_secs(1), 100)
945            .await
946            .unwrap_err();
947        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
948        assert!(
949            err.to_string().contains("requested empty interval"),
950            "{err:#}"
951        );
952
953        // Test large range.
           // Bounds (0, 10_000) with 100 as the last argument -- presumably the maximum permitted
           // range size; confirm against `get_namespace_proof_range`.
954        let err = get_namespace_proof_range(&ds, 0, 10_000, ns.into(), Duration::from_secs(1), 100)
955            .await
956            .unwrap_err();
957        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
958        assert!(err.to_string().contains("exceeds maximum size"), "{err:#}");
959    }
960}