1use std::{
14 ops::{Bound, RangeBounds},
15 sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{data::VidShare, event::LegacyEvent, traits::node_implementation::NodeType};
23use jf_merkle_tree_compat::prelude::MerkleProof;
24use tagged_base64::TaggedBase64;
25
26use super::VersionedDataSource;
27use crate::{
28 availability::{
29 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
30 FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData,
31 QueryableHeader, QueryablePayload, TransactionHash, UpdateAvailabilityData,
32 VidCommonMetadata, VidCommonQueryData,
33 },
34 data_source::storage::pruning::PrunedHeightDataSource,
35 explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
36 merklized_state::{
37 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
38 UpdateStateData,
39 },
40 metrics::PrometheusMetrics,
41 node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
42 status::{HasMetrics, StatusDataSource},
43 Header, Payload, QueryResult, Transaction,
44};
/// A data source paired with an extra, caller-chosen value.
///
/// `D` is the wrapped data source and `U` is the additional user data carried alongside it. The
/// trait implementations in this file forward data-source calls to the `D` value, while
/// [`AsRef<U>`] and [`AsMut<U>`] expose the `U` value.
#[derive(Debug, Clone, Copy)]
pub struct ExtensibleDataSource<D, U> {
    /// The wrapped data source that the forwarded trait methods are served from.
    data_source: D,
    /// The extra value stored next to the data source.
    user_data: U,
}
86
87impl<D, U> ExtensibleDataSource<D, U> {
88 pub fn new(data_source: D, user_data: U) -> Self {
89 Self {
90 data_source,
91 user_data,
92 }
93 }
94
95 pub fn inner(&self) -> &D {
101 &self.data_source
102 }
103
104 pub fn inner_mut(&mut self) -> &mut D {
110 &mut self.data_source
111 }
112}
113
114impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
115 fn as_ref(&self) -> &U {
116 &self.user_data
117 }
118}
119
120impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
121 fn as_mut(&mut self) -> &mut U {
122 &mut self.user_data
123 }
124}
125
126impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
127where
128 D: VersionedDataSource + Send,
129 U: Send + Sync,
130{
131 type Transaction<'a>
132 = D::Transaction<'a>
133 where
134 Self: 'a;
135
136 type ReadOnly<'a>
137 = D::ReadOnly<'a>
138 where
139 Self: 'a;
140
141 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
142 self.data_source.write().await
143 }
144
145 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
146 self.data_source.read().await
147 }
148}
149
150#[async_trait]
151impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
152where
153 D: PrunedHeightDataSource + Send + Sync,
154 U: Send + Sync,
155{
156 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
157 self.data_source.load_pruned_height().await
158 }
159}
160
161#[async_trait]
162impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
163where
164 D: AvailabilityDataSource<Types> + Send + Sync,
165 U: Send + Sync,
166 Types: NodeType,
167 Header<Types>: QueryableHeader<Types>,
168 Payload<Types>: QueryablePayload<Types>,
169{
170 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
171 where
172 ID: Into<LeafId<Types>> + Send + Sync,
173 {
174 self.data_source.get_leaf(id).await
175 }
176
177 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
178 where
179 ID: Into<BlockId<Types>> + Send + Sync,
180 {
181 self.data_source.get_header(id).await
182 }
183
184 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
185 where
186 ID: Into<BlockId<Types>> + Send + Sync,
187 {
188 self.data_source.get_block(id).await
189 }
190 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
191 where
192 ID: Into<BlockId<Types>> + Send + Sync,
193 {
194 self.data_source.get_payload(id).await
195 }
196 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
197 where
198 ID: Into<BlockId<Types>> + Send + Sync,
199 {
200 self.data_source.get_payload_metadata(id).await
201 }
202 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
203 where
204 ID: Into<BlockId<Types>> + Send + Sync,
205 {
206 self.data_source.get_vid_common(id).await
207 }
208 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
209 where
210 ID: Into<BlockId<Types>> + Send + Sync,
211 {
212 self.data_source.get_vid_common_metadata(id).await
213 }
214 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
215 where
216 R: RangeBounds<usize> + Send + 'static,
217 {
218 self.data_source.get_leaf_range(range).await
219 }
220 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
221 where
222 R: RangeBounds<usize> + Send + 'static,
223 {
224 self.data_source.get_block_range(range).await
225 }
226
227 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
228 where
229 R: RangeBounds<usize> + Send + 'static,
230 {
231 self.data_source.get_header_range(range).await
232 }
233 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
234 where
235 R: RangeBounds<usize> + Send + 'static,
236 {
237 self.data_source.get_payload_range(range).await
238 }
239 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
240 where
241 R: RangeBounds<usize> + Send + 'static,
242 {
243 self.data_source.get_payload_metadata_range(range).await
244 }
245 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
246 where
247 R: RangeBounds<usize> + Send + 'static,
248 {
249 self.data_source.get_vid_common_range(range).await
250 }
251 async fn get_vid_common_metadata_range<R>(
252 &self,
253 range: R,
254 ) -> FetchStream<VidCommonMetadata<Types>>
255 where
256 R: RangeBounds<usize> + Send + 'static,
257 {
258 self.data_source.get_vid_common_metadata_range(range).await
259 }
260
261 async fn get_leaf_range_rev(
262 &self,
263 start: Bound<usize>,
264 end: usize,
265 ) -> FetchStream<LeafQueryData<Types>> {
266 self.data_source.get_leaf_range_rev(start, end).await
267 }
268 async fn get_block_range_rev(
269 &self,
270 start: Bound<usize>,
271 end: usize,
272 ) -> FetchStream<BlockQueryData<Types>> {
273 self.data_source.get_block_range_rev(start, end).await
274 }
275 async fn get_payload_range_rev(
276 &self,
277 start: Bound<usize>,
278 end: usize,
279 ) -> FetchStream<PayloadQueryData<Types>> {
280 self.data_source.get_payload_range_rev(start, end).await
281 }
282 async fn get_payload_metadata_range_rev(
283 &self,
284 start: Bound<usize>,
285 end: usize,
286 ) -> FetchStream<PayloadMetadata<Types>> {
287 self.data_source
288 .get_payload_metadata_range_rev(start, end)
289 .await
290 }
291 async fn get_vid_common_range_rev(
292 &self,
293 start: Bound<usize>,
294 end: usize,
295 ) -> FetchStream<VidCommonQueryData<Types>> {
296 self.data_source.get_vid_common_range_rev(start, end).await
297 }
298 async fn get_vid_common_metadata_range_rev(
299 &self,
300 start: Bound<usize>,
301 end: usize,
302 ) -> FetchStream<VidCommonMetadata<Types>> {
303 self.data_source
304 .get_vid_common_metadata_range_rev(start, end)
305 .await
306 }
307 async fn get_block_containing_transaction(
308 &self,
309 h: TransactionHash<Types>,
310 ) -> Fetch<BlockWithTransaction<Types>> {
311 self.data_source.get_block_containing_transaction(h).await
312 }
313}
314
315impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
316where
317 D: UpdateAvailabilityData<Types> + Send + Sync,
318 U: Send + Sync,
319 Types: NodeType,
320{
321 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
322 self.data_source.append(info).await
323 }
324}
325
326#[async_trait]
327impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
328where
329 D: NodeDataSource<Types> + Send + Sync,
330 U: Send + Sync,
331 Types: NodeType,
332 Header<Types>: QueryableHeader<Types>,
333{
334 async fn block_height(&self) -> QueryResult<usize> {
335 self.data_source.block_height().await
336 }
337 async fn count_transactions_in_range(
338 &self,
339 range: impl RangeBounds<usize> + Send,
340 namespace: Option<NamespaceId<Types>>,
341 ) -> QueryResult<usize> {
342 self.data_source
343 .count_transactions_in_range(range, namespace)
344 .await
345 }
346 async fn payload_size_in_range(
347 &self,
348 range: impl RangeBounds<usize> + Send,
349 namespace: Option<NamespaceId<Types>>,
350 ) -> QueryResult<usize> {
351 self.data_source
352 .payload_size_in_range(range, namespace)
353 .await
354 }
355 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
356 where
357 ID: Into<BlockId<Types>> + Send + Sync,
358 {
359 self.data_source.vid_share(id).await
360 }
361 async fn sync_status(&self) -> QueryResult<SyncStatus> {
362 self.data_source.sync_status().await
363 }
364 async fn get_header_window(
365 &self,
366 start: impl Into<WindowStart<Types>> + Send + Sync,
367 end: u64,
368 limit: usize,
369 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
370 self.data_source.get_header_window(start, end, limit).await
371 }
372}
373
374impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
375where
376 D: HasMetrics,
377{
378 fn metrics(&self) -> &PrometheusMetrics {
379 self.data_source.metrics()
380 }
381}
382
383#[async_trait]
384impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
385where
386 D: StatusDataSource + Send + Sync,
387 U: Send + Sync,
388{
389 async fn block_height(&self) -> QueryResult<usize> {
390 self.data_source.block_height().await
391 }
392}
393
394#[async_trait]
395impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
396 for ExtensibleDataSource<D, U>
397where
398 D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
399 U: Send + Sync,
400 Types: NodeType,
401 State: MerklizedState<Types, ARITY>,
402{
403 async fn get_path(
404 &self,
405 snapshot: Snapshot<Types, State, ARITY>,
406 key: State::Key,
407 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
408 self.data_source.get_path(snapshot, key).await
409 }
410}
411
412#[async_trait]
413impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
414where
415 D: MerklizedStateHeightPersistence + Sync,
416 U: Send + Sync,
417{
418 async fn get_last_state_height(&self) -> QueryResult<usize> {
419 self.data_source.get_last_state_height().await
420 }
421}
422
423#[async_trait]
424impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
425 for ExtensibleDataSource<D, U>
426where
427 D: UpdateStateData<Types, State, ARITY> + Send + Sync,
428 U: Send + Sync,
429 State: MerklizedState<Types, ARITY>,
430 Types: NodeType,
431{
432 async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
433 self.data_source.set_last_state_height(height).await
434 }
435
436 async fn insert_merkle_nodes(
437 &mut self,
438 path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
439 traversal_path: Vec<usize>,
440 block_number: u64,
441 ) -> anyhow::Result<()> {
442 self.data_source
443 .insert_merkle_nodes(path, traversal_path, block_number)
444 .await
445 }
446}
447
448#[async_trait]
449impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
450where
451 D: ExplorerDataSource<Types> + Sync,
452 U: Send + Sync,
453 Types: NodeType,
454 Payload<Types>: QueryablePayload<Types>,
455 Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
456 Transaction<Types>: ExplorerTransaction<Types>,
457{
458 async fn get_block_detail(
459 &self,
460 request: explorer::query_data::BlockIdentifier<Types>,
461 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
462 {
463 self.data_source.get_block_detail(request).await
464 }
465
466 async fn get_block_summaries(
467 &self,
468 request: explorer::query_data::GetBlockSummariesRequest<Types>,
469 ) -> Result<
470 Vec<explorer::query_data::BlockSummary<Types>>,
471 explorer::query_data::GetBlockSummariesError,
472 > {
473 self.data_source.get_block_summaries(request).await
474 }
475
476 async fn get_transaction_detail(
477 &self,
478 request: explorer::query_data::TransactionIdentifier<Types>,
479 ) -> Result<
480 explorer::query_data::TransactionDetailResponse<Types>,
481 explorer::query_data::GetTransactionDetailError,
482 > {
483 self.data_source.get_transaction_detail(request).await
484 }
485
486 async fn get_transaction_summaries(
487 &self,
488 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
489 ) -> Result<
490 Vec<explorer::query_data::TransactionSummary<Types>>,
491 explorer::query_data::GetTransactionSummariesError,
492 > {
493 self.data_source.get_transaction_summaries(request).await
494 }
495
496 async fn get_explorer_summary(
497 &self,
498 ) -> Result<
499 explorer::query_data::ExplorerSummary<Types>,
500 explorer::query_data::GetExplorerSummaryError,
501 > {
502 self.data_source.get_explorer_summary().await
503 }
504
505 async fn get_search_results(
506 &self,
507 query: TaggedBase64,
508 ) -> Result<
509 explorer::query_data::SearchResult<Types>,
510 explorer::query_data::GetSearchResultsError,
511 > {
512 self.data_source.get_search_results(query).await
513 }
514}
515
516#[async_trait]
519impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
520where
521 U: EventsSource<Types> + Sync,
522 D: Send + Sync,
523 Types: NodeType,
524{
525 type EventStream = BoxStream<'static, Arc<Event<Types>>>;
526 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
527
528 async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
529 Box::pin(self.user_data.get_event_stream(filter).await)
530 }
531
532 async fn get_legacy_event_stream(
533 &self,
534 filter: Option<EventFilterSet<Types>>,
535 ) -> Self::LegacyEventStream {
536 Box::pin(self.user_data.get_legacy_event_stream(filter).await)
537 }
538
539 async fn get_startup_info(&self) -> StartupInfo<Types> {
540 self.user_data.get_startup_info().await
541 }
542}
543
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use hotshot::types::Event;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, TestableDataSource},
            mocks::MockTypes,
        },
    };

    // Test life-cycle support: storage is created by the wrapped data source type `D`, and the
    // wrapper itself is built from a freshly connected (or reset) `D` plus a default `U`.
    #[async_trait]
    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
    where
        D: TestableDataSource + UpdateDataSource<MockTypes>,
        U: Clone + Default + Send + Sync + 'static,
    {
        type Storage = D::Storage;

        async fn create(node_id: usize) -> Self::Storage {
            D::create(node_id).await
        }

        async fn connect(storage: &Self::Storage) -> Self {
            let data_source = D::connect(storage).await;
            Self::new(data_source, U::default())
        }

        async fn reset(storage: &Self::Storage) -> Self {
            let data_source = D::reset(storage).await;
            Self::new(data_source, U::default())
        }

        async fn handle_event(&self, event: &Event<MockTypes>) {
            // A failed update panics here rather than being reported to the caller.
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
582
#[cfg(test)]
mod test {
    use super::ExtensibleDataSource;
    // NOTE(review): the crate-root glob is presumably what brings the
    // `instantiate_data_source_tests!` macro into scope; confirm before narrowing it.
    use crate::{testing::consensus::MockDataSource, *};

    // Instantiate the data source test macro for the wrapper, using `()` as the user data.
    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
}