hotshot_query_service/data_source/
extension.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::ops::{Bound, RangeBounds};
14
15use async_trait::async_trait;
16use hotshot_types::{data::VidShare, traits::node_implementation::NodeType};
17use jf_merkle_tree::prelude::MerkleProof;
18use tagged_base64::TaggedBase64;
19
20use super::VersionedDataSource;
21use crate::{
22    availability::{
23        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, FetchStream, LeafId,
24        LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData, QueryableHeader,
25        QueryablePayload, StateCertQueryData, TransactionHash, TransactionQueryData,
26        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
27    },
28    data_source::storage::pruning::PrunedHeightDataSource,
29    explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
30    merklized_state::{
31        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
32        UpdateStateData,
33    },
34    metrics::PrometheusMetrics,
35    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
36    status::{HasMetrics, StatusDataSource},
37    Header, Payload, QueryResult, Transaction,
38};
/// Wrapper to add extensibility to an existing data source.
///
/// [`ExtensibleDataSource`] adds app-specific data to any existing data source. It implements all
/// the data source traits defined in this crate as long as the underlying data source does so,
/// which means it can be used as state for instantiating the APIs defined in this crate. At the
/// same time, it provides access to an application-defined state type, which means it can also be
/// used to implement application-specific endpoints.
///
/// [`ExtensibleDataSource`] implements `AsRef<U>` and `AsMut<U>` for some user-defined type `U`, so
/// your API extensions can always access application-specific state from [`ExtensibleDataSource`].
/// We can use this to complete the [UTXO example](crate#extension) by extending our data source
/// with an index to look up transactions by the UTXOs they contain:
///
/// ```
/// # use async_trait::async_trait;
/// # use hotshot_query_service::availability::{AvailabilityDataSource, TransactionIndex};
/// # use hotshot_query_service::data_source::ExtensibleDataSource;
/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
/// # use std::collections::HashMap;
/// # #[async_trait]
/// # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
/// #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
/// # }
/// type UtxoIndex = HashMap<u64, (usize, TransactionIndex<AppTypes>, usize)>;
///
/// #[async_trait]
/// impl<UnderlyingDataSource> UtxoDataSource for
///     ExtensibleDataSource<UnderlyingDataSource, UtxoIndex>
/// where
///     UnderlyingDataSource: AvailabilityDataSource<AppTypes> + Send + Sync,
/// {
///     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)> {
///         self.as_ref().get(&utxo).cloned()
///     }
/// }
/// ```
#[derive(Clone, Copy, Debug)]
pub struct ExtensibleDataSource<D, U> {
    /// The wrapped data source; the data source trait implementations in this file forward their
    /// calls to it.
    data_source: D,
    /// Application-defined state, reachable through the `AsRef<U>` and `AsMut<U>` implementations.
    user_data: U,
}
80
81impl<D, U> ExtensibleDataSource<D, U> {
82    pub fn new(data_source: D, user_data: U) -> Self {
83        Self {
84            data_source,
85            user_data,
86        }
87    }
88
89    /// Access the underlying data source.
90    ///
91    /// This functionality is provided as an inherent method rather than an implementation of the
92    /// [`AsRef`] trait so that `self.as_ref()` unambiguously returns `&U`, helping with type
93    /// inference.
94    pub fn inner(&self) -> &D {
95        &self.data_source
96    }
97
98    /// Mutably access the underlying data source.
99    ///
100    /// This functionality is provided as an inherent method rather than an implementation of the
101    /// [`AsMut`] trait so that `self.as_mut()` unambiguously returns `&U`, helping with type
102    /// inference.
103    pub fn inner_mut(&mut self) -> &mut D {
104        &mut self.data_source
105    }
106}
107
108impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
109    fn as_ref(&self) -> &U {
110        &self.user_data
111    }
112}
113
114impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
115    fn as_mut(&mut self) -> &mut U {
116        &mut self.user_data
117    }
118}
119
120impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
121where
122    D: VersionedDataSource + Send,
123    U: Send + Sync,
124{
125    type Transaction<'a>
126        = D::Transaction<'a>
127    where
128        Self: 'a;
129
130    type ReadOnly<'a>
131        = D::ReadOnly<'a>
132    where
133        Self: 'a;
134
135    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
136        self.data_source.write().await
137    }
138
139    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
140        self.data_source.read().await
141    }
142}
143
144#[async_trait]
145impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
146where
147    D: PrunedHeightDataSource + Send + Sync,
148    U: Send + Sync,
149{
150    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
151        self.data_source.load_pruned_height().await
152    }
153}
154
155#[async_trait]
156impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
157where
158    D: AvailabilityDataSource<Types> + Send + Sync,
159    U: Send + Sync,
160    Types: NodeType,
161    Header<Types>: QueryableHeader<Types>,
162    Payload<Types>: QueryablePayload<Types>,
163{
164    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
165    where
166        ID: Into<LeafId<Types>> + Send + Sync,
167    {
168        self.data_source.get_leaf(id).await
169    }
170
171    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
172    where
173        ID: Into<BlockId<Types>> + Send + Sync,
174    {
175        self.data_source.get_header(id).await
176    }
177
178    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
179    where
180        ID: Into<BlockId<Types>> + Send + Sync,
181    {
182        self.data_source.get_block(id).await
183    }
184    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
185    where
186        ID: Into<BlockId<Types>> + Send + Sync,
187    {
188        self.data_source.get_payload(id).await
189    }
190    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
191    where
192        ID: Into<BlockId<Types>> + Send + Sync,
193    {
194        self.data_source.get_payload_metadata(id).await
195    }
196    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
197    where
198        ID: Into<BlockId<Types>> + Send + Sync,
199    {
200        self.data_source.get_vid_common(id).await
201    }
202    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
203    where
204        ID: Into<BlockId<Types>> + Send + Sync,
205    {
206        self.data_source.get_vid_common_metadata(id).await
207    }
208    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
209    where
210        R: RangeBounds<usize> + Send + 'static,
211    {
212        self.data_source.get_leaf_range(range).await
213    }
214    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
215    where
216        R: RangeBounds<usize> + Send + 'static,
217    {
218        self.data_source.get_block_range(range).await
219    }
220
221    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
222    where
223        R: RangeBounds<usize> + Send + 'static,
224    {
225        self.data_source.get_header_range(range).await
226    }
227    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
228    where
229        R: RangeBounds<usize> + Send + 'static,
230    {
231        self.data_source.get_payload_range(range).await
232    }
233    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
234    where
235        R: RangeBounds<usize> + Send + 'static,
236    {
237        self.data_source.get_payload_metadata_range(range).await
238    }
239    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
240    where
241        R: RangeBounds<usize> + Send + 'static,
242    {
243        self.data_source.get_vid_common_range(range).await
244    }
245    async fn get_vid_common_metadata_range<R>(
246        &self,
247        range: R,
248    ) -> FetchStream<VidCommonMetadata<Types>>
249    where
250        R: RangeBounds<usize> + Send + 'static,
251    {
252        self.data_source.get_vid_common_metadata_range(range).await
253    }
254
255    async fn get_leaf_range_rev(
256        &self,
257        start: Bound<usize>,
258        end: usize,
259    ) -> FetchStream<LeafQueryData<Types>> {
260        self.data_source.get_leaf_range_rev(start, end).await
261    }
262    async fn get_block_range_rev(
263        &self,
264        start: Bound<usize>,
265        end: usize,
266    ) -> FetchStream<BlockQueryData<Types>> {
267        self.data_source.get_block_range_rev(start, end).await
268    }
269    async fn get_payload_range_rev(
270        &self,
271        start: Bound<usize>,
272        end: usize,
273    ) -> FetchStream<PayloadQueryData<Types>> {
274        self.data_source.get_payload_range_rev(start, end).await
275    }
276    async fn get_payload_metadata_range_rev(
277        &self,
278        start: Bound<usize>,
279        end: usize,
280    ) -> FetchStream<PayloadMetadata<Types>> {
281        self.data_source
282            .get_payload_metadata_range_rev(start, end)
283            .await
284    }
285    async fn get_vid_common_range_rev(
286        &self,
287        start: Bound<usize>,
288        end: usize,
289    ) -> FetchStream<VidCommonQueryData<Types>> {
290        self.data_source.get_vid_common_range_rev(start, end).await
291    }
292    async fn get_vid_common_metadata_range_rev(
293        &self,
294        start: Bound<usize>,
295        end: usize,
296    ) -> FetchStream<VidCommonMetadata<Types>> {
297        self.data_source
298            .get_vid_common_metadata_range_rev(start, end)
299            .await
300    }
301    async fn get_transaction(
302        &self,
303        hash: TransactionHash<Types>,
304    ) -> Fetch<TransactionQueryData<Types>> {
305        self.data_source.get_transaction(hash).await
306    }
307    async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryData<Types>> {
308        self.data_source.get_state_cert(epoch).await
309    }
310}
311
312impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
313where
314    D: UpdateAvailabilityData<Types> + Send + Sync,
315    U: Send + Sync,
316    Types: NodeType,
317{
318    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
319        self.data_source.append(info).await
320    }
321}
322
323#[async_trait]
324impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
325where
326    D: NodeDataSource<Types> + Send + Sync,
327    U: Send + Sync,
328    Types: NodeType,
329    Header<Types>: QueryableHeader<Types>,
330{
331    async fn block_height(&self) -> QueryResult<usize> {
332        self.data_source.block_height().await
333    }
334    async fn count_transactions_in_range(
335        &self,
336        range: impl RangeBounds<usize> + Send,
337        namespace: Option<NamespaceId<Types>>,
338    ) -> QueryResult<usize> {
339        self.data_source
340            .count_transactions_in_range(range, namespace)
341            .await
342    }
343    async fn payload_size_in_range(
344        &self,
345        range: impl RangeBounds<usize> + Send,
346        namespace: Option<NamespaceId<Types>>,
347    ) -> QueryResult<usize> {
348        self.data_source
349            .payload_size_in_range(range, namespace)
350            .await
351    }
352    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
353    where
354        ID: Into<BlockId<Types>> + Send + Sync,
355    {
356        self.data_source.vid_share(id).await
357    }
358    async fn sync_status(&self) -> QueryResult<SyncStatus> {
359        self.data_source.sync_status().await
360    }
361    async fn get_header_window(
362        &self,
363        start: impl Into<WindowStart<Types>> + Send + Sync,
364        end: u64,
365        limit: usize,
366    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
367        self.data_source.get_header_window(start, end, limit).await
368    }
369}
370
371impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
372where
373    D: HasMetrics,
374{
375    fn metrics(&self) -> &PrometheusMetrics {
376        self.data_source.metrics()
377    }
378}
379
380#[async_trait]
381impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
382where
383    D: StatusDataSource + Send + Sync,
384    U: Send + Sync,
385{
386    async fn block_height(&self) -> QueryResult<usize> {
387        self.data_source.block_height().await
388    }
389}
390
391#[async_trait]
392impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
393    for ExtensibleDataSource<D, U>
394where
395    D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
396    U: Send + Sync,
397    Types: NodeType,
398    State: MerklizedState<Types, ARITY>,
399{
400    async fn get_path(
401        &self,
402        snapshot: Snapshot<Types, State, ARITY>,
403        key: State::Key,
404    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
405        self.data_source.get_path(snapshot, key).await
406    }
407}
408
409#[async_trait]
410impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
411where
412    D: MerklizedStateHeightPersistence + Sync,
413    U: Send + Sync,
414{
415    async fn get_last_state_height(&self) -> QueryResult<usize> {
416        self.data_source.get_last_state_height().await
417    }
418}
419
420#[async_trait]
421impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
422    for ExtensibleDataSource<D, U>
423where
424    D: UpdateStateData<Types, State, ARITY> + Send + Sync,
425    U: Send + Sync,
426    State: MerklizedState<Types, ARITY>,
427    Types: NodeType,
428{
429    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
430        self.data_source.set_last_state_height(height).await
431    }
432
433    async fn insert_merkle_nodes(
434        &mut self,
435        path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
436        traversal_path: Vec<usize>,
437        block_number: u64,
438    ) -> anyhow::Result<()> {
439        self.data_source
440            .insert_merkle_nodes(path, traversal_path, block_number)
441            .await
442    }
443}
444
445#[async_trait]
446impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
447where
448    D: ExplorerDataSource<Types> + Sync,
449    U: Send + Sync,
450    Types: NodeType,
451    Payload<Types>: QueryablePayload<Types>,
452    Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
453    Transaction<Types>: ExplorerTransaction<Types>,
454{
455    async fn get_block_detail(
456        &self,
457        request: explorer::query_data::BlockIdentifier<Types>,
458    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
459    {
460        self.data_source.get_block_detail(request).await
461    }
462
463    async fn get_block_summaries(
464        &self,
465        request: explorer::query_data::GetBlockSummariesRequest<Types>,
466    ) -> Result<
467        Vec<explorer::query_data::BlockSummary<Types>>,
468        explorer::query_data::GetBlockSummariesError,
469    > {
470        self.data_source.get_block_summaries(request).await
471    }
472
473    async fn get_transaction_detail(
474        &self,
475        request: explorer::query_data::TransactionIdentifier<Types>,
476    ) -> Result<
477        explorer::query_data::TransactionDetailResponse<Types>,
478        explorer::query_data::GetTransactionDetailError,
479    > {
480        self.data_source.get_transaction_detail(request).await
481    }
482
483    async fn get_transaction_summaries(
484        &self,
485        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
486    ) -> Result<
487        Vec<explorer::query_data::TransactionSummary<Types>>,
488        explorer::query_data::GetTransactionSummariesError,
489    > {
490        self.data_source.get_transaction_summaries(request).await
491    }
492
493    async fn get_explorer_summary(
494        &self,
495    ) -> Result<
496        explorer::query_data::ExplorerSummary<Types>,
497        explorer::query_data::GetExplorerSummaryError,
498    > {
499        self.data_source.get_explorer_summary().await
500    }
501
502    async fn get_search_results(
503        &self,
504        query: TaggedBase64,
505    ) -> Result<
506        explorer::query_data::SearchResult<Types>,
507        explorer::query_data::GetSearchResultsError,
508    > {
509        self.data_source.get_search_results(query).await
510    }
511}
512
// Test-only glue: lets an `ExtensibleDataSource` stand in wherever a `DataSourceLifeCycle` is
// required, reusing the storage of the wrapped data source.
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use hotshot::types::Event;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, TestableDataSource},
            mocks::MockTypes,
        },
    };

    #[async_trait]
    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
    where
        D: TestableDataSource + UpdateDataSource<MockTypes>,
        U: Clone + Default + Send + Sync + 'static,
    {
        type Storage = D::Storage;

        /// Create the storage exactly as the wrapped data source type would.
        async fn create(node_id: usize) -> Self::Storage {
            D::create(node_id).await
        }

        /// Connect the wrapped data source to `storage` and attach default user data.
        async fn connect(storage: &Self::Storage) -> Self {
            let data_source = D::connect(storage).await;
            Self::new(data_source, U::default())
        }

        /// Reset the wrapped data source on `storage` and attach default user data.
        async fn reset(storage: &Self::Storage) -> Self {
            let data_source = D::reset(storage).await;
            Self::new(data_source, U::default())
        }

        /// Apply `event` via `update`, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
551
// Unit tests for `ExtensibleDataSource`, wrapping a `MockDataSource` with `()` as user data.
#[cfg(test)]
mod test {
    use super::ExtensibleDataSource;
    use crate::testing::consensus::MockDataSource;
    // For some reason this is the only way to import the macro defined in another module of this
    // crate.
    // NOTE(review): presumably the macro is exported at the crate root (e.g. via
    // `#[macro_export]`), which would explain why a crate-level glob import is needed — confirm.
    use crate::*;

    // Presumably expands to the crate's shared data source test suite for the given type; the
    // macro definition is not visible from this file.
    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
}