hotshot_query_service/data_source/
extension.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{
14    ops::{Bound, RangeBounds},
15    sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{data::VidShare, event::LegacyEvent, traits::node_implementation::NodeType};
23use jf_merkle_tree::prelude::MerkleProof;
24use tagged_base64::TaggedBase64;
25
26use super::VersionedDataSource;
27use crate::{
28    availability::{
29        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
30        FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData,
31        QueryableHeader, QueryablePayload, StateCertQueryDataV2, TransactionHash,
32        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
33    },
34    data_source::storage::pruning::PrunedHeightDataSource,
35    explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
36    merklized_state::{
37        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
38        UpdateStateData,
39    },
40    metrics::PrometheusMetrics,
41    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
42    status::{HasMetrics, StatusDataSource},
43    Header, Payload, QueryResult, Transaction,
44};
45/// Wrapper to add extensibility to an existing data source.
46///
47/// [`ExtensibleDataSource`] adds app-specific data to any existing data source. It implements all
48/// the data source traits defined in this crate as long as the underlying data source does so,
49/// which means it can be used as state for instantiating the APIs defined in this crate. At the
50/// same time, it provides access to an application-defined state type, which means it can also be
51/// used to implement application-specific endpoints.
52///
53/// [`ExtensibleDataSource`] implements `AsRef<U>` and `AsMut<U>` for some user-defined type `U`, so
54/// your API extensions can always access application-specific state from [`ExtensibleDataSource`].
55/// We can use this to complete the [UTXO example](crate#extension) by extending our data source
56/// with an index to look up transactions by the UTXOs they contain:
57///
58/// ```
59/// # use async_trait::async_trait;
60/// # use hotshot_query_service::availability::{AvailabilityDataSource, TransactionIndex};
61/// # use hotshot_query_service::data_source::ExtensibleDataSource;
62/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
63/// # use std::collections::HashMap;
64/// # #[async_trait]
65/// # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
66/// #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
67/// # }
68/// type UtxoIndex = HashMap<u64, (usize, TransactionIndex<AppTypes>, usize)>;
69///
70/// #[async_trait]
71/// impl<UnderlyingDataSource> UtxoDataSource for
72///     ExtensibleDataSource<UnderlyingDataSource, UtxoIndex>
73/// where
74///     UnderlyingDataSource: AvailabilityDataSource<AppTypes> + Send + Sync,
75/// {
76///     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)> {
77///         self.as_ref().get(&utxo).cloned()
78///     }
79/// }
80/// ```
81#[derive(Clone, Copy, Debug)]
82pub struct ExtensibleDataSource<D, U> {
83    data_source: D,
84    user_data: U,
85}
86
87impl<D, U> ExtensibleDataSource<D, U> {
88    pub fn new(data_source: D, user_data: U) -> Self {
89        Self {
90            data_source,
91            user_data,
92        }
93    }
94
95    /// Access the underlying data source.
96    ///
97    /// This functionality is provided as an inherent method rather than an implementation of the
98    /// [`AsRef`] trait so that `self.as_ref()` unambiguously returns `&U`, helping with type
99    /// inference.
100    pub fn inner(&self) -> &D {
101        &self.data_source
102    }
103
104    /// Mutably access the underlying data source.
105    ///
106    /// This functionality is provided as an inherent method rather than an implementation of the
107    /// [`AsMut`] trait so that `self.as_mut()` unambiguously returns `&U`, helping with type
108    /// inference.
109    pub fn inner_mut(&mut self) -> &mut D {
110        &mut self.data_source
111    }
112}
113
114impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
115    fn as_ref(&self) -> &U {
116        &self.user_data
117    }
118}
119
120impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
121    fn as_mut(&mut self) -> &mut U {
122        &mut self.user_data
123    }
124}
125
126impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
127where
128    D: VersionedDataSource + Send,
129    U: Send + Sync,
130{
131    type Transaction<'a>
132        = D::Transaction<'a>
133    where
134        Self: 'a;
135
136    type ReadOnly<'a>
137        = D::ReadOnly<'a>
138    where
139        Self: 'a;
140
141    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
142        self.data_source.write().await
143    }
144
145    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
146        self.data_source.read().await
147    }
148}
149
150#[async_trait]
151impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
152where
153    D: PrunedHeightDataSource + Send + Sync,
154    U: Send + Sync,
155{
156    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
157        self.data_source.load_pruned_height().await
158    }
159}
160
161#[async_trait]
162impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
163where
164    D: AvailabilityDataSource<Types> + Send + Sync,
165    U: Send + Sync,
166    Types: NodeType,
167    Header<Types>: QueryableHeader<Types>,
168    Payload<Types>: QueryablePayload<Types>,
169{
170    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
171    where
172        ID: Into<LeafId<Types>> + Send + Sync,
173    {
174        self.data_source.get_leaf(id).await
175    }
176
177    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
178    where
179        ID: Into<BlockId<Types>> + Send + Sync,
180    {
181        self.data_source.get_header(id).await
182    }
183
184    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
185    where
186        ID: Into<BlockId<Types>> + Send + Sync,
187    {
188        self.data_source.get_block(id).await
189    }
190    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
191    where
192        ID: Into<BlockId<Types>> + Send + Sync,
193    {
194        self.data_source.get_payload(id).await
195    }
196    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
197    where
198        ID: Into<BlockId<Types>> + Send + Sync,
199    {
200        self.data_source.get_payload_metadata(id).await
201    }
202    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
203    where
204        ID: Into<BlockId<Types>> + Send + Sync,
205    {
206        self.data_source.get_vid_common(id).await
207    }
208    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
209    where
210        ID: Into<BlockId<Types>> + Send + Sync,
211    {
212        self.data_source.get_vid_common_metadata(id).await
213    }
214    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
215    where
216        R: RangeBounds<usize> + Send + 'static,
217    {
218        self.data_source.get_leaf_range(range).await
219    }
220    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
221    where
222        R: RangeBounds<usize> + Send + 'static,
223    {
224        self.data_source.get_block_range(range).await
225    }
226
227    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
228    where
229        R: RangeBounds<usize> + Send + 'static,
230    {
231        self.data_source.get_header_range(range).await
232    }
233    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
234    where
235        R: RangeBounds<usize> + Send + 'static,
236    {
237        self.data_source.get_payload_range(range).await
238    }
239    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
240    where
241        R: RangeBounds<usize> + Send + 'static,
242    {
243        self.data_source.get_payload_metadata_range(range).await
244    }
245    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
246    where
247        R: RangeBounds<usize> + Send + 'static,
248    {
249        self.data_source.get_vid_common_range(range).await
250    }
251    async fn get_vid_common_metadata_range<R>(
252        &self,
253        range: R,
254    ) -> FetchStream<VidCommonMetadata<Types>>
255    where
256        R: RangeBounds<usize> + Send + 'static,
257    {
258        self.data_source.get_vid_common_metadata_range(range).await
259    }
260
261    async fn get_leaf_range_rev(
262        &self,
263        start: Bound<usize>,
264        end: usize,
265    ) -> FetchStream<LeafQueryData<Types>> {
266        self.data_source.get_leaf_range_rev(start, end).await
267    }
268    async fn get_block_range_rev(
269        &self,
270        start: Bound<usize>,
271        end: usize,
272    ) -> FetchStream<BlockQueryData<Types>> {
273        self.data_source.get_block_range_rev(start, end).await
274    }
275    async fn get_payload_range_rev(
276        &self,
277        start: Bound<usize>,
278        end: usize,
279    ) -> FetchStream<PayloadQueryData<Types>> {
280        self.data_source.get_payload_range_rev(start, end).await
281    }
282    async fn get_payload_metadata_range_rev(
283        &self,
284        start: Bound<usize>,
285        end: usize,
286    ) -> FetchStream<PayloadMetadata<Types>> {
287        self.data_source
288            .get_payload_metadata_range_rev(start, end)
289            .await
290    }
291    async fn get_vid_common_range_rev(
292        &self,
293        start: Bound<usize>,
294        end: usize,
295    ) -> FetchStream<VidCommonQueryData<Types>> {
296        self.data_source.get_vid_common_range_rev(start, end).await
297    }
298    async fn get_vid_common_metadata_range_rev(
299        &self,
300        start: Bound<usize>,
301        end: usize,
302    ) -> FetchStream<VidCommonMetadata<Types>> {
303        self.data_source
304            .get_vid_common_metadata_range_rev(start, end)
305            .await
306    }
307    async fn get_block_containing_transaction(
308        &self,
309        h: TransactionHash<Types>,
310    ) -> Fetch<BlockWithTransaction<Types>> {
311        self.data_source.get_block_containing_transaction(h).await
312    }
313    async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryDataV2<Types>> {
314        self.data_source.get_state_cert(epoch).await
315    }
316}
317
318impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
319where
320    D: UpdateAvailabilityData<Types> + Send + Sync,
321    U: Send + Sync,
322    Types: NodeType,
323{
324    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
325        self.data_source.append(info).await
326    }
327}
328
329#[async_trait]
330impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
331where
332    D: NodeDataSource<Types> + Send + Sync,
333    U: Send + Sync,
334    Types: NodeType,
335    Header<Types>: QueryableHeader<Types>,
336{
337    async fn block_height(&self) -> QueryResult<usize> {
338        self.data_source.block_height().await
339    }
340    async fn count_transactions_in_range(
341        &self,
342        range: impl RangeBounds<usize> + Send,
343        namespace: Option<NamespaceId<Types>>,
344    ) -> QueryResult<usize> {
345        self.data_source
346            .count_transactions_in_range(range, namespace)
347            .await
348    }
349    async fn payload_size_in_range(
350        &self,
351        range: impl RangeBounds<usize> + Send,
352        namespace: Option<NamespaceId<Types>>,
353    ) -> QueryResult<usize> {
354        self.data_source
355            .payload_size_in_range(range, namespace)
356            .await
357    }
358    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
359    where
360        ID: Into<BlockId<Types>> + Send + Sync,
361    {
362        self.data_source.vid_share(id).await
363    }
364    async fn sync_status(&self) -> QueryResult<SyncStatus> {
365        self.data_source.sync_status().await
366    }
367    async fn get_header_window(
368        &self,
369        start: impl Into<WindowStart<Types>> + Send + Sync,
370        end: u64,
371        limit: usize,
372    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
373        self.data_source.get_header_window(start, end, limit).await
374    }
375}
376
377impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
378where
379    D: HasMetrics,
380{
381    fn metrics(&self) -> &PrometheusMetrics {
382        self.data_source.metrics()
383    }
384}
385
386#[async_trait]
387impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
388where
389    D: StatusDataSource + Send + Sync,
390    U: Send + Sync,
391{
392    async fn block_height(&self) -> QueryResult<usize> {
393        self.data_source.block_height().await
394    }
395}
396
397#[async_trait]
398impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
399    for ExtensibleDataSource<D, U>
400where
401    D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
402    U: Send + Sync,
403    Types: NodeType,
404    State: MerklizedState<Types, ARITY>,
405{
406    async fn get_path(
407        &self,
408        snapshot: Snapshot<Types, State, ARITY>,
409        key: State::Key,
410    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
411        self.data_source.get_path(snapshot, key).await
412    }
413}
414
415#[async_trait]
416impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
417where
418    D: MerklizedStateHeightPersistence + Sync,
419    U: Send + Sync,
420{
421    async fn get_last_state_height(&self) -> QueryResult<usize> {
422        self.data_source.get_last_state_height().await
423    }
424}
425
426#[async_trait]
427impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
428    for ExtensibleDataSource<D, U>
429where
430    D: UpdateStateData<Types, State, ARITY> + Send + Sync,
431    U: Send + Sync,
432    State: MerklizedState<Types, ARITY>,
433    Types: NodeType,
434{
435    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
436        self.data_source.set_last_state_height(height).await
437    }
438
439    async fn insert_merkle_nodes(
440        &mut self,
441        path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
442        traversal_path: Vec<usize>,
443        block_number: u64,
444    ) -> anyhow::Result<()> {
445        self.data_source
446            .insert_merkle_nodes(path, traversal_path, block_number)
447            .await
448    }
449}
450
451#[async_trait]
452impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
453where
454    D: ExplorerDataSource<Types> + Sync,
455    U: Send + Sync,
456    Types: NodeType,
457    Payload<Types>: QueryablePayload<Types>,
458    Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
459    Transaction<Types>: ExplorerTransaction<Types>,
460{
461    async fn get_block_detail(
462        &self,
463        request: explorer::query_data::BlockIdentifier<Types>,
464    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
465    {
466        self.data_source.get_block_detail(request).await
467    }
468
469    async fn get_block_summaries(
470        &self,
471        request: explorer::query_data::GetBlockSummariesRequest<Types>,
472    ) -> Result<
473        Vec<explorer::query_data::BlockSummary<Types>>,
474        explorer::query_data::GetBlockSummariesError,
475    > {
476        self.data_source.get_block_summaries(request).await
477    }
478
479    async fn get_transaction_detail(
480        &self,
481        request: explorer::query_data::TransactionIdentifier<Types>,
482    ) -> Result<
483        explorer::query_data::TransactionDetailResponse<Types>,
484        explorer::query_data::GetTransactionDetailError,
485    > {
486        self.data_source.get_transaction_detail(request).await
487    }
488
489    async fn get_transaction_summaries(
490        &self,
491        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
492    ) -> Result<
493        Vec<explorer::query_data::TransactionSummary<Types>>,
494        explorer::query_data::GetTransactionSummariesError,
495    > {
496        self.data_source.get_transaction_summaries(request).await
497    }
498
499    async fn get_explorer_summary(
500        &self,
501    ) -> Result<
502        explorer::query_data::ExplorerSummary<Types>,
503        explorer::query_data::GetExplorerSummaryError,
504    > {
505        self.data_source.get_explorer_summary().await
506    }
507
508    async fn get_search_results(
509        &self,
510        query: TaggedBase64,
511    ) -> Result<
512        explorer::query_data::SearchResult<Types>,
513        explorer::query_data::GetSearchResultsError,
514    > {
515        self.data_source.get_search_results(query).await
516    }
517}
518
519/// Where the user data type supports it, derive `EventsSource` for the extensible data
520/// source.
521#[async_trait]
522impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
523where
524    U: EventsSource<Types> + Sync,
525    D: Send + Sync,
526    Types: NodeType,
527{
528    type EventStream = BoxStream<'static, Arc<Event<Types>>>;
529    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
530
531    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
532        Box::pin(self.user_data.get_event_stream(filter).await)
533    }
534
535    async fn get_legacy_event_stream(
536        &self,
537        filter: Option<EventFilterSet<Types>>,
538    ) -> Self::LegacyEventStream {
539        Box::pin(self.user_data.get_legacy_event_stream(filter).await)
540    }
541
542    async fn get_startup_info(&self) -> StartupInfo<Types> {
543        self.user_data.get_startup_info().await
544    }
545}
546
547#[cfg(any(test, feature = "testing"))]
548mod impl_testable_data_source {
549    use hotshot::types::Event;
550
551    use super::*;
552    use crate::{
553        data_source::UpdateDataSource,
554        testing::{
555            consensus::{DataSourceLifeCycle, TestableDataSource},
556            mocks::MockTypes,
557        },
558    };
559
560    #[async_trait]
561    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
562    where
563        D: TestableDataSource + UpdateDataSource<MockTypes>,
564        U: Clone + Default + Send + Sync + 'static,
565    {
566        type Storage = D::Storage;
567
568        async fn create(node_id: usize) -> Self::Storage {
569            D::create(node_id).await
570        }
571
572        async fn connect(storage: &Self::Storage) -> Self {
573            Self::new(D::connect(storage).await, Default::default())
574        }
575
576        async fn reset(storage: &Self::Storage) -> Self {
577            Self::new(D::reset(storage).await, Default::default())
578        }
579
580        async fn handle_event(&self, event: &Event<MockTypes>) {
581            self.update(event).await.unwrap();
582        }
583    }
584}
585
586#[cfg(test)]
587mod test {
588    use super::ExtensibleDataSource;
589    use crate::testing::consensus::MockDataSource;
590    // For some reason this is the only way to import the macro defined in another module of this
591    // crate.
592    use crate::*;
593
594    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
595}