hotshot_query_service/data_source/
extension.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{
14    ops::{Bound, RangeBounds},
15    sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{data::VidShare, event::LegacyEvent, traits::node_implementation::NodeType};
23use jf_merkle_tree_compat::prelude::MerkleProof;
24use tagged_base64::TaggedBase64;
25
26use super::VersionedDataSource;
27use crate::{
28    availability::{
29        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
30        FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData,
31        QueryableHeader, QueryablePayload, TransactionHash, UpdateAvailabilityData,
32        VidCommonMetadata, VidCommonQueryData,
33    },
34    data_source::storage::pruning::PrunedHeightDataSource,
35    explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
36    merklized_state::{
37        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
38        UpdateStateData,
39    },
40    metrics::PrometheusMetrics,
41    node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
42    status::{HasMetrics, StatusDataSource},
43    Header, Payload, QueryResult, Transaction,
44};
45/// Wrapper to add extensibility to an existing data source.
46///
47/// [`ExtensibleDataSource`] adds app-specific data to any existing data source. It implements all
48/// the data source traits defined in this crate as long as the underlying data source does so,
49/// which means it can be used as state for instantiating the APIs defined in this crate. At the
50/// same time, it provides access to an application-defined state type, which means it can also be
51/// used to implement application-specific endpoints.
52///
53/// [`ExtensibleDataSource`] implements `AsRef<U>` and `AsMut<U>` for some user-defined type `U`, so
54/// your API extensions can always access application-specific state from [`ExtensibleDataSource`].
55/// We can use this to complete the [UTXO example](crate#extension) by extending our data source
56/// with an index to look up transactions by the UTXOs they contain:
57///
58/// ```
59/// # use async_trait::async_trait;
60/// # use hotshot_query_service::availability::{AvailabilityDataSource, TransactionIndex};
61/// # use hotshot_query_service::data_source::ExtensibleDataSource;
62/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
63/// # use std::collections::HashMap;
64/// # #[async_trait]
65/// # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
66/// #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
67/// # }
68/// type UtxoIndex = HashMap<u64, (usize, TransactionIndex<AppTypes>, usize)>;
69///
70/// #[async_trait]
71/// impl<UnderlyingDataSource> UtxoDataSource for
72///     ExtensibleDataSource<UnderlyingDataSource, UtxoIndex>
73/// where
74///     UnderlyingDataSource: AvailabilityDataSource<AppTypes> + Send + Sync,
75/// {
76///     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)> {
77///         self.as_ref().get(&utxo).cloned()
78///     }
79/// }
80/// ```
81#[derive(Clone, Copy, Debug)]
82pub struct ExtensibleDataSource<D, U> {
83    data_source: D,
84    user_data: U,
85}
86
87impl<D, U> ExtensibleDataSource<D, U> {
88    pub fn new(data_source: D, user_data: U) -> Self {
89        Self {
90            data_source,
91            user_data,
92        }
93    }
94
95    /// Access the underlying data source.
96    ///
97    /// This functionality is provided as an inherent method rather than an implementation of the
98    /// [`AsRef`] trait so that `self.as_ref()` unambiguously returns `&U`, helping with type
99    /// inference.
100    pub fn inner(&self) -> &D {
101        &self.data_source
102    }
103
104    /// Mutably access the underlying data source.
105    ///
106    /// This functionality is provided as an inherent method rather than an implementation of the
107    /// [`AsMut`] trait so that `self.as_mut()` unambiguously returns `&U`, helping with type
108    /// inference.
109    pub fn inner_mut(&mut self) -> &mut D {
110        &mut self.data_source
111    }
112}
113
114impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
115    fn as_ref(&self) -> &U {
116        &self.user_data
117    }
118}
119
120impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
121    fn as_mut(&mut self) -> &mut U {
122        &mut self.user_data
123    }
124}
125
126impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
127where
128    D: VersionedDataSource + Send,
129    U: Send + Sync,
130{
131    type Transaction<'a>
132        = D::Transaction<'a>
133    where
134        Self: 'a;
135
136    type ReadOnly<'a>
137        = D::ReadOnly<'a>
138    where
139        Self: 'a;
140
141    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
142        self.data_source.write().await
143    }
144
145    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
146        self.data_source.read().await
147    }
148}
149
150#[async_trait]
151impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
152where
153    D: PrunedHeightDataSource + Send + Sync,
154    U: Send + Sync,
155{
156    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
157        self.data_source.load_pruned_height().await
158    }
159}
160
161#[async_trait]
162impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
163where
164    D: AvailabilityDataSource<Types> + Send + Sync,
165    U: Send + Sync,
166    Types: NodeType,
167    Header<Types>: QueryableHeader<Types>,
168    Payload<Types>: QueryablePayload<Types>,
169{
170    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
171    where
172        ID: Into<LeafId<Types>> + Send + Sync,
173    {
174        self.data_source.get_leaf(id).await
175    }
176
177    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
178    where
179        ID: Into<BlockId<Types>> + Send + Sync,
180    {
181        self.data_source.get_header(id).await
182    }
183
184    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
185    where
186        ID: Into<BlockId<Types>> + Send + Sync,
187    {
188        self.data_source.get_block(id).await
189    }
190    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
191    where
192        ID: Into<BlockId<Types>> + Send + Sync,
193    {
194        self.data_source.get_payload(id).await
195    }
196    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
197    where
198        ID: Into<BlockId<Types>> + Send + Sync,
199    {
200        self.data_source.get_payload_metadata(id).await
201    }
202    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
203    where
204        ID: Into<BlockId<Types>> + Send + Sync,
205    {
206        self.data_source.get_vid_common(id).await
207    }
208    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
209    where
210        ID: Into<BlockId<Types>> + Send + Sync,
211    {
212        self.data_source.get_vid_common_metadata(id).await
213    }
214    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
215    where
216        R: RangeBounds<usize> + Send + 'static,
217    {
218        self.data_source.get_leaf_range(range).await
219    }
220    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
221    where
222        R: RangeBounds<usize> + Send + 'static,
223    {
224        self.data_source.get_block_range(range).await
225    }
226
227    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
228    where
229        R: RangeBounds<usize> + Send + 'static,
230    {
231        self.data_source.get_header_range(range).await
232    }
233    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
234    where
235        R: RangeBounds<usize> + Send + 'static,
236    {
237        self.data_source.get_payload_range(range).await
238    }
239    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
240    where
241        R: RangeBounds<usize> + Send + 'static,
242    {
243        self.data_source.get_payload_metadata_range(range).await
244    }
245    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
246    where
247        R: RangeBounds<usize> + Send + 'static,
248    {
249        self.data_source.get_vid_common_range(range).await
250    }
251    async fn get_vid_common_metadata_range<R>(
252        &self,
253        range: R,
254    ) -> FetchStream<VidCommonMetadata<Types>>
255    where
256        R: RangeBounds<usize> + Send + 'static,
257    {
258        self.data_source.get_vid_common_metadata_range(range).await
259    }
260
261    async fn get_leaf_range_rev(
262        &self,
263        start: Bound<usize>,
264        end: usize,
265    ) -> FetchStream<LeafQueryData<Types>> {
266        self.data_source.get_leaf_range_rev(start, end).await
267    }
268    async fn get_block_range_rev(
269        &self,
270        start: Bound<usize>,
271        end: usize,
272    ) -> FetchStream<BlockQueryData<Types>> {
273        self.data_source.get_block_range_rev(start, end).await
274    }
275    async fn get_payload_range_rev(
276        &self,
277        start: Bound<usize>,
278        end: usize,
279    ) -> FetchStream<PayloadQueryData<Types>> {
280        self.data_source.get_payload_range_rev(start, end).await
281    }
282    async fn get_payload_metadata_range_rev(
283        &self,
284        start: Bound<usize>,
285        end: usize,
286    ) -> FetchStream<PayloadMetadata<Types>> {
287        self.data_source
288            .get_payload_metadata_range_rev(start, end)
289            .await
290    }
291    async fn get_vid_common_range_rev(
292        &self,
293        start: Bound<usize>,
294        end: usize,
295    ) -> FetchStream<VidCommonQueryData<Types>> {
296        self.data_source.get_vid_common_range_rev(start, end).await
297    }
298    async fn get_vid_common_metadata_range_rev(
299        &self,
300        start: Bound<usize>,
301        end: usize,
302    ) -> FetchStream<VidCommonMetadata<Types>> {
303        self.data_source
304            .get_vid_common_metadata_range_rev(start, end)
305            .await
306    }
307    async fn get_block_containing_transaction(
308        &self,
309        h: TransactionHash<Types>,
310    ) -> Fetch<BlockWithTransaction<Types>> {
311        self.data_source.get_block_containing_transaction(h).await
312    }
313}
314
315impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
316where
317    D: UpdateAvailabilityData<Types> + Send + Sync,
318    U: Send + Sync,
319    Types: NodeType,
320{
321    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
322        self.data_source.append(info).await
323    }
324}
325
326#[async_trait]
327impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
328where
329    D: NodeDataSource<Types> + Send + Sync,
330    U: Send + Sync,
331    Types: NodeType,
332    Header<Types>: QueryableHeader<Types>,
333{
334    async fn block_height(&self) -> QueryResult<usize> {
335        self.data_source.block_height().await
336    }
337    async fn count_transactions_in_range(
338        &self,
339        range: impl RangeBounds<usize> + Send,
340        namespace: Option<NamespaceId<Types>>,
341    ) -> QueryResult<usize> {
342        self.data_source
343            .count_transactions_in_range(range, namespace)
344            .await
345    }
346    async fn payload_size_in_range(
347        &self,
348        range: impl RangeBounds<usize> + Send,
349        namespace: Option<NamespaceId<Types>>,
350    ) -> QueryResult<usize> {
351        self.data_source
352            .payload_size_in_range(range, namespace)
353            .await
354    }
355    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
356    where
357        ID: Into<BlockId<Types>> + Send + Sync,
358    {
359        self.data_source.vid_share(id).await
360    }
361    async fn sync_status(&self) -> QueryResult<SyncStatus> {
362        self.data_source.sync_status().await
363    }
364    async fn get_header_window(
365        &self,
366        start: impl Into<WindowStart<Types>> + Send + Sync,
367        end: u64,
368        limit: usize,
369    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
370        self.data_source.get_header_window(start, end, limit).await
371    }
372}
373
374impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
375where
376    D: HasMetrics,
377{
378    fn metrics(&self) -> &PrometheusMetrics {
379        self.data_source.metrics()
380    }
381}
382
383#[async_trait]
384impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
385where
386    D: StatusDataSource + Send + Sync,
387    U: Send + Sync,
388{
389    async fn block_height(&self) -> QueryResult<usize> {
390        self.data_source.block_height().await
391    }
392}
393
394#[async_trait]
395impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
396    for ExtensibleDataSource<D, U>
397where
398    D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
399    U: Send + Sync,
400    Types: NodeType,
401    State: MerklizedState<Types, ARITY>,
402{
403    async fn get_path(
404        &self,
405        snapshot: Snapshot<Types, State, ARITY>,
406        key: State::Key,
407    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
408        self.data_source.get_path(snapshot, key).await
409    }
410}
411
412#[async_trait]
413impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
414where
415    D: MerklizedStateHeightPersistence + Sync,
416    U: Send + Sync,
417{
418    async fn get_last_state_height(&self) -> QueryResult<usize> {
419        self.data_source.get_last_state_height().await
420    }
421}
422
423#[async_trait]
424impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
425    for ExtensibleDataSource<D, U>
426where
427    D: UpdateStateData<Types, State, ARITY> + Send + Sync,
428    U: Send + Sync,
429    State: MerklizedState<Types, ARITY>,
430    Types: NodeType,
431{
432    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
433        self.data_source.set_last_state_height(height).await
434    }
435
436    async fn insert_merkle_nodes(
437        &mut self,
438        path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
439        traversal_path: Vec<usize>,
440        block_number: u64,
441    ) -> anyhow::Result<()> {
442        self.data_source
443            .insert_merkle_nodes(path, traversal_path, block_number)
444            .await
445    }
446
447    async fn insert_merkle_nodes_batch(
448        &mut self,
449        proofs: Vec<(
450            MerkleProof<State::Entry, State::Key, State::T, ARITY>,
451            Vec<usize>,
452        )>,
453        block_number: u64,
454    ) -> anyhow::Result<()> {
455        self.data_source
456            .insert_merkle_nodes_batch(proofs, block_number)
457            .await
458    }
459}
460
461#[async_trait]
462impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
463where
464    D: ExplorerDataSource<Types> + Sync,
465    U: Send + Sync,
466    Types: NodeType,
467    Payload<Types>: QueryablePayload<Types>,
468    Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
469    Transaction<Types>: ExplorerTransaction<Types>,
470{
471    async fn get_block_detail(
472        &self,
473        request: explorer::query_data::BlockIdentifier<Types>,
474    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
475    {
476        self.data_source.get_block_detail(request).await
477    }
478
479    async fn get_block_summaries(
480        &self,
481        request: explorer::query_data::GetBlockSummariesRequest<Types>,
482    ) -> Result<
483        Vec<explorer::query_data::BlockSummary<Types>>,
484        explorer::query_data::GetBlockSummariesError,
485    > {
486        self.data_source.get_block_summaries(request).await
487    }
488
489    async fn get_transaction_detail(
490        &self,
491        request: explorer::query_data::TransactionIdentifier<Types>,
492    ) -> Result<
493        explorer::query_data::TransactionDetailResponse<Types>,
494        explorer::query_data::GetTransactionDetailError,
495    > {
496        self.data_source.get_transaction_detail(request).await
497    }
498
499    async fn get_transaction_summaries(
500        &self,
501        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
502    ) -> Result<
503        Vec<explorer::query_data::TransactionSummary<Types>>,
504        explorer::query_data::GetTransactionSummariesError,
505    > {
506        self.data_source.get_transaction_summaries(request).await
507    }
508
509    async fn get_explorer_summary(
510        &self,
511    ) -> Result<
512        explorer::query_data::ExplorerSummary<Types>,
513        explorer::query_data::GetExplorerSummaryError,
514    > {
515        self.data_source.get_explorer_summary().await
516    }
517
518    async fn get_search_results(
519        &self,
520        query: TaggedBase64,
521    ) -> Result<
522        explorer::query_data::SearchResult<Types>,
523        explorer::query_data::GetSearchResultsError,
524    > {
525        self.data_source.get_search_results(query).await
526    }
527}
528
529/// Where the user data type supports it, derive `EventsSource` for the extensible data
530/// source.
531#[async_trait]
532impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
533where
534    U: EventsSource<Types> + Sync,
535    D: Send + Sync,
536    Types: NodeType,
537{
538    type EventStream = BoxStream<'static, Arc<Event<Types>>>;
539    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
540
541    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
542        Box::pin(self.user_data.get_event_stream(filter).await)
543    }
544
545    async fn get_legacy_event_stream(
546        &self,
547        filter: Option<EventFilterSet<Types>>,
548    ) -> Self::LegacyEventStream {
549        Box::pin(self.user_data.get_legacy_event_stream(filter).await)
550    }
551
552    async fn get_startup_info(&self) -> StartupInfo<Types> {
553        self.user_data.get_startup_info().await
554    }
555}
556
557#[cfg(any(test, feature = "testing"))]
558mod impl_testable_data_source {
559    use hotshot::types::Event;
560
561    use super::*;
562    use crate::{
563        data_source::UpdateDataSource,
564        testing::{
565            consensus::{DataSourceLifeCycle, TestableDataSource},
566            mocks::MockTypes,
567        },
568    };
569
570    #[async_trait]
571    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
572    where
573        D: TestableDataSource + UpdateDataSource<MockTypes>,
574        U: Clone + Default + Send + Sync + 'static,
575    {
576        type Storage = D::Storage;
577
578        async fn create(node_id: usize) -> Self::Storage {
579            D::create(node_id).await
580        }
581
582        async fn connect(storage: &Self::Storage) -> Self {
583            Self::new(D::connect(storage).await, Default::default())
584        }
585
586        async fn reset(storage: &Self::Storage) -> Self {
587            Self::new(D::reset(storage).await, Default::default())
588        }
589
590        async fn handle_event(&self, event: &Event<MockTypes>) {
591            self.update(event).await.unwrap();
592        }
593    }
594}
595
596#[cfg(test)]
597mod test {
598    use super::ExtensibleDataSource;
599    use crate::testing::consensus::MockDataSource;
600    // For some reason this is the only way to import the macro defined in another module of this
601    // crate.
602    use crate::*;
603
604    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
605}