hotshot_query_service/data_source/
extension.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{
14    ops::{Bound, RangeBounds},
15    sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{data::VidShare, event::LegacyEvent, traits::node_implementation::NodeType};
23use jf_merkle_tree_compat::prelude::MerkleProof;
24use tagged_base64::TaggedBase64;
25
26use super::VersionedDataSource;
27use crate::{
28    availability::{
29        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
30        FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData,
31        QueryableHeader, QueryablePayload, TransactionHash, UpdateAvailabilityData,
32        VidCommonMetadata, VidCommonQueryData,
33    },
34    data_source::storage::pruning::PrunedHeightDataSource,
35    explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
36    merklized_state::{
37        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
38        UpdateStateData,
39    },
40    metrics::PrometheusMetrics,
41    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
42    status::{HasMetrics, StatusDataSource},
43    Header, Payload, QueryResult, Transaction,
44};
45/// Wrapper to add extensibility to an existing data source.
46///
47/// [`ExtensibleDataSource`] adds app-specific data to any existing data source. It implements all
48/// the data source traits defined in this crate as long as the underlying data source does so,
49/// which means it can be used as state for instantiating the APIs defined in this crate. At the
50/// same time, it provides access to an application-defined state type, which means it can also be
51/// used to implement application-specific endpoints.
52///
53/// [`ExtensibleDataSource`] implements `AsRef<U>` and `AsMut<U>` for some user-defined type `U`, so
54/// your API extensions can always access application-specific state from [`ExtensibleDataSource`].
55/// We can use this to complete the [UTXO example](crate#extension) by extending our data source
56/// with an index to look up transactions by the UTXOs they contain:
57///
58/// ```
59/// # use async_trait::async_trait;
60/// # use hotshot_query_service::availability::{AvailabilityDataSource, TransactionIndex};
61/// # use hotshot_query_service::data_source::ExtensibleDataSource;
62/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
63/// # use std::collections::HashMap;
64/// # #[async_trait]
65/// # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
66/// #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
67/// # }
68/// type UtxoIndex = HashMap<u64, (usize, TransactionIndex<AppTypes>, usize)>;
69///
70/// #[async_trait]
71/// impl<UnderlyingDataSource> UtxoDataSource for
72///     ExtensibleDataSource<UnderlyingDataSource, UtxoIndex>
73/// where
74///     UnderlyingDataSource: AvailabilityDataSource<AppTypes> + Send + Sync,
75/// {
76///     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)> {
77///         self.as_ref().get(&utxo).cloned()
78///     }
79/// }
80/// ```
81#[derive(Clone, Copy, Debug)]
82pub struct ExtensibleDataSource<D, U> {
83    data_source: D,
84    user_data: U,
85}
86
87impl<D, U> ExtensibleDataSource<D, U> {
88    pub fn new(data_source: D, user_data: U) -> Self {
89        Self {
90            data_source,
91            user_data,
92        }
93    }
94
95    /// Access the underlying data source.
96    ///
97    /// This functionality is provided as an inherent method rather than an implementation of the
98    /// [`AsRef`] trait so that `self.as_ref()` unambiguously returns `&U`, helping with type
99    /// inference.
100    pub fn inner(&self) -> &D {
101        &self.data_source
102    }
103
104    /// Mutably access the underlying data source.
105    ///
106    /// This functionality is provided as an inherent method rather than an implementation of the
107    /// [`AsMut`] trait so that `self.as_mut()` unambiguously returns `&U`, helping with type
108    /// inference.
109    pub fn inner_mut(&mut self) -> &mut D {
110        &mut self.data_source
111    }
112}
113
114impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
115    fn as_ref(&self) -> &U {
116        &self.user_data
117    }
118}
119
120impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
121    fn as_mut(&mut self) -> &mut U {
122        &mut self.user_data
123    }
124}
125
126impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
127where
128    D: VersionedDataSource + Send,
129    U: Send + Sync,
130{
131    type Transaction<'a>
132        = D::Transaction<'a>
133    where
134        Self: 'a;
135
136    type ReadOnly<'a>
137        = D::ReadOnly<'a>
138    where
139        Self: 'a;
140
141    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
142        self.data_source.write().await
143    }
144
145    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
146        self.data_source.read().await
147    }
148}
149
150#[async_trait]
151impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
152where
153    D: PrunedHeightDataSource + Send + Sync,
154    U: Send + Sync,
155{
156    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
157        self.data_source.load_pruned_height().await
158    }
159}
160
161#[async_trait]
162impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
163where
164    D: AvailabilityDataSource<Types> + Send + Sync,
165    U: Send + Sync,
166    Types: NodeType,
167    Header<Types>: QueryableHeader<Types>,
168    Payload<Types>: QueryablePayload<Types>,
169{
170    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
171    where
172        ID: Into<LeafId<Types>> + Send + Sync,
173    {
174        self.data_source.get_leaf(id).await
175    }
176
177    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
178    where
179        ID: Into<BlockId<Types>> + Send + Sync,
180    {
181        self.data_source.get_header(id).await
182    }
183
184    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
185    where
186        ID: Into<BlockId<Types>> + Send + Sync,
187    {
188        self.data_source.get_block(id).await
189    }
190    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
191    where
192        ID: Into<BlockId<Types>> + Send + Sync,
193    {
194        self.data_source.get_payload(id).await
195    }
196    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
197    where
198        ID: Into<BlockId<Types>> + Send + Sync,
199    {
200        self.data_source.get_payload_metadata(id).await
201    }
202    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
203    where
204        ID: Into<BlockId<Types>> + Send + Sync,
205    {
206        self.data_source.get_vid_common(id).await
207    }
208    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
209    where
210        ID: Into<BlockId<Types>> + Send + Sync,
211    {
212        self.data_source.get_vid_common_metadata(id).await
213    }
214    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
215    where
216        R: RangeBounds<usize> + Send + 'static,
217    {
218        self.data_source.get_leaf_range(range).await
219    }
220    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
221    where
222        R: RangeBounds<usize> + Send + 'static,
223    {
224        self.data_source.get_block_range(range).await
225    }
226
227    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
228    where
229        R: RangeBounds<usize> + Send + 'static,
230    {
231        self.data_source.get_header_range(range).await
232    }
233    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
234    where
235        R: RangeBounds<usize> + Send + 'static,
236    {
237        self.data_source.get_payload_range(range).await
238    }
239    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
240    where
241        R: RangeBounds<usize> + Send + 'static,
242    {
243        self.data_source.get_payload_metadata_range(range).await
244    }
245    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
246    where
247        R: RangeBounds<usize> + Send + 'static,
248    {
249        self.data_source.get_vid_common_range(range).await
250    }
251    async fn get_vid_common_metadata_range<R>(
252        &self,
253        range: R,
254    ) -> FetchStream<VidCommonMetadata<Types>>
255    where
256        R: RangeBounds<usize> + Send + 'static,
257    {
258        self.data_source.get_vid_common_metadata_range(range).await
259    }
260
261    async fn get_leaf_range_rev(
262        &self,
263        start: Bound<usize>,
264        end: usize,
265    ) -> FetchStream<LeafQueryData<Types>> {
266        self.data_source.get_leaf_range_rev(start, end).await
267    }
268    async fn get_block_range_rev(
269        &self,
270        start: Bound<usize>,
271        end: usize,
272    ) -> FetchStream<BlockQueryData<Types>> {
273        self.data_source.get_block_range_rev(start, end).await
274    }
275    async fn get_payload_range_rev(
276        &self,
277        start: Bound<usize>,
278        end: usize,
279    ) -> FetchStream<PayloadQueryData<Types>> {
280        self.data_source.get_payload_range_rev(start, end).await
281    }
282    async fn get_payload_metadata_range_rev(
283        &self,
284        start: Bound<usize>,
285        end: usize,
286    ) -> FetchStream<PayloadMetadata<Types>> {
287        self.data_source
288            .get_payload_metadata_range_rev(start, end)
289            .await
290    }
291    async fn get_vid_common_range_rev(
292        &self,
293        start: Bound<usize>,
294        end: usize,
295    ) -> FetchStream<VidCommonQueryData<Types>> {
296        self.data_source.get_vid_common_range_rev(start, end).await
297    }
298    async fn get_vid_common_metadata_range_rev(
299        &self,
300        start: Bound<usize>,
301        end: usize,
302    ) -> FetchStream<VidCommonMetadata<Types>> {
303        self.data_source
304            .get_vid_common_metadata_range_rev(start, end)
305            .await
306    }
307    async fn get_block_containing_transaction(
308        &self,
309        h: TransactionHash<Types>,
310    ) -> Fetch<BlockWithTransaction<Types>> {
311        self.data_source.get_block_containing_transaction(h).await
312    }
313}
314
315impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
316where
317    D: UpdateAvailabilityData<Types> + Send + Sync,
318    U: Send + Sync,
319    Types: NodeType,
320{
321    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
322        self.data_source.append(info).await
323    }
324}
325
326#[async_trait]
327impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
328where
329    D: NodeDataSource<Types> + Send + Sync,
330    U: Send + Sync,
331    Types: NodeType,
332    Header<Types>: QueryableHeader<Types>,
333{
334    async fn block_height(&self) -> QueryResult<usize> {
335        self.data_source.block_height().await
336    }
337    async fn count_transactions_in_range(
338        &self,
339        range: impl RangeBounds<usize> + Send,
340        namespace: Option<NamespaceId<Types>>,
341    ) -> QueryResult<usize> {
342        self.data_source
343            .count_transactions_in_range(range, namespace)
344            .await
345    }
346    async fn payload_size_in_range(
347        &self,
348        range: impl RangeBounds<usize> + Send,
349        namespace: Option<NamespaceId<Types>>,
350    ) -> QueryResult<usize> {
351        self.data_source
352            .payload_size_in_range(range, namespace)
353            .await
354    }
355    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
356    where
357        ID: Into<BlockId<Types>> + Send + Sync,
358    {
359        self.data_source.vid_share(id).await
360    }
361    async fn sync_status(&self) -> QueryResult<SyncStatus> {
362        self.data_source.sync_status().await
363    }
364    async fn get_header_window(
365        &self,
366        start: impl Into<WindowStart<Types>> + Send + Sync,
367        end: u64,
368        limit: usize,
369    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
370        self.data_source.get_header_window(start, end, limit).await
371    }
372}
373
374impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
375where
376    D: HasMetrics,
377{
378    fn metrics(&self) -> &PrometheusMetrics {
379        self.data_source.metrics()
380    }
381}
382
383#[async_trait]
384impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
385where
386    D: StatusDataSource + Send + Sync,
387    U: Send + Sync,
388{
389    async fn block_height(&self) -> QueryResult<usize> {
390        self.data_source.block_height().await
391    }
392}
393
394#[async_trait]
395impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
396    for ExtensibleDataSource<D, U>
397where
398    D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
399    U: Send + Sync,
400    Types: NodeType,
401    State: MerklizedState<Types, ARITY>,
402{
403    async fn get_path(
404        &self,
405        snapshot: Snapshot<Types, State, ARITY>,
406        key: State::Key,
407    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
408        self.data_source.get_path(snapshot, key).await
409    }
410}
411
412#[async_trait]
413impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
414where
415    D: MerklizedStateHeightPersistence + Sync,
416    U: Send + Sync,
417{
418    async fn get_last_state_height(&self) -> QueryResult<usize> {
419        self.data_source.get_last_state_height().await
420    }
421}
422
423#[async_trait]
424impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
425    for ExtensibleDataSource<D, U>
426where
427    D: UpdateStateData<Types, State, ARITY> + Send + Sync,
428    U: Send + Sync,
429    State: MerklizedState<Types, ARITY>,
430    Types: NodeType,
431{
432    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
433        self.data_source.set_last_state_height(height).await
434    }
435
436    async fn insert_merkle_nodes(
437        &mut self,
438        path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
439        traversal_path: Vec<usize>,
440        block_number: u64,
441    ) -> anyhow::Result<()> {
442        self.data_source
443            .insert_merkle_nodes(path, traversal_path, block_number)
444            .await
445    }
446}
447
448#[async_trait]
449impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
450where
451    D: ExplorerDataSource<Types> + Sync,
452    U: Send + Sync,
453    Types: NodeType,
454    Payload<Types>: QueryablePayload<Types>,
455    Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
456    Transaction<Types>: ExplorerTransaction<Types>,
457{
458    async fn get_block_detail(
459        &self,
460        request: explorer::query_data::BlockIdentifier<Types>,
461    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
462    {
463        self.data_source.get_block_detail(request).await
464    }
465
466    async fn get_block_summaries(
467        &self,
468        request: explorer::query_data::GetBlockSummariesRequest<Types>,
469    ) -> Result<
470        Vec<explorer::query_data::BlockSummary<Types>>,
471        explorer::query_data::GetBlockSummariesError,
472    > {
473        self.data_source.get_block_summaries(request).await
474    }
475
476    async fn get_transaction_detail(
477        &self,
478        request: explorer::query_data::TransactionIdentifier<Types>,
479    ) -> Result<
480        explorer::query_data::TransactionDetailResponse<Types>,
481        explorer::query_data::GetTransactionDetailError,
482    > {
483        self.data_source.get_transaction_detail(request).await
484    }
485
486    async fn get_transaction_summaries(
487        &self,
488        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
489    ) -> Result<
490        Vec<explorer::query_data::TransactionSummary<Types>>,
491        explorer::query_data::GetTransactionSummariesError,
492    > {
493        self.data_source.get_transaction_summaries(request).await
494    }
495
496    async fn get_explorer_summary(
497        &self,
498    ) -> Result<
499        explorer::query_data::ExplorerSummary<Types>,
500        explorer::query_data::GetExplorerSummaryError,
501    > {
502        self.data_source.get_explorer_summary().await
503    }
504
505    async fn get_search_results(
506        &self,
507        query: TaggedBase64,
508    ) -> Result<
509        explorer::query_data::SearchResult<Types>,
510        explorer::query_data::GetSearchResultsError,
511    > {
512        self.data_source.get_search_results(query).await
513    }
514}
515
516/// Where the user data type supports it, derive `EventsSource` for the extensible data
517/// source.
518#[async_trait]
519impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
520where
521    U: EventsSource<Types> + Sync,
522    D: Send + Sync,
523    Types: NodeType,
524{
525    type EventStream = BoxStream<'static, Arc<Event<Types>>>;
526    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
527
528    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
529        Box::pin(self.user_data.get_event_stream(filter).await)
530    }
531
532    async fn get_legacy_event_stream(
533        &self,
534        filter: Option<EventFilterSet<Types>>,
535    ) -> Self::LegacyEventStream {
536        Box::pin(self.user_data.get_legacy_event_stream(filter).await)
537    }
538
539    async fn get_startup_info(&self) -> StartupInfo<Types> {
540        self.user_data.get_startup_info().await
541    }
542}
543
544#[cfg(any(test, feature = "testing"))]
545mod impl_testable_data_source {
546    use hotshot::types::Event;
547
548    use super::*;
549    use crate::{
550        data_source::UpdateDataSource,
551        testing::{
552            consensus::{DataSourceLifeCycle, TestableDataSource},
553            mocks::MockTypes,
554        },
555    };
556
557    #[async_trait]
558    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
559    where
560        D: TestableDataSource + UpdateDataSource<MockTypes>,
561        U: Clone + Default + Send + Sync + 'static,
562    {
563        type Storage = D::Storage;
564
565        async fn create(node_id: usize) -> Self::Storage {
566            D::create(node_id).await
567        }
568
569        async fn connect(storage: &Self::Storage) -> Self {
570            Self::new(D::connect(storage).await, Default::default())
571        }
572
573        async fn reset(storage: &Self::Storage) -> Self {
574            Self::new(D::reset(storage).await, Default::default())
575        }
576
577        async fn handle_event(&self, event: &Event<MockTypes>) {
578            self.update(event).await.unwrap();
579        }
580    }
581}
582
583#[cfg(test)]
584mod test {
585    use super::ExtensibleDataSource;
586    use crate::testing::consensus::MockDataSource;
587    // For some reason this is the only way to import the macro defined in another module of this
588    // crate.
589    use crate::*;
590
591    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
592}