1use std::{
14 ops::{Bound, RangeBounds},
15 sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{data::VidShare, event::LegacyEvent, traits::node_implementation::NodeType};
23use jf_merkle_tree::prelude::MerkleProof;
24use tagged_base64::TaggedBase64;
25
26use super::VersionedDataSource;
27use crate::{
28 availability::{
29 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
30 FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData,
31 QueryableHeader, QueryablePayload, StateCertQueryDataV2, TransactionHash,
32 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
33 },
34 data_source::storage::pruning::PrunedHeightDataSource,
35 explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
36 merklized_state::{
37 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
38 UpdateStateData,
39 },
40 metrics::PrometheusMetrics,
41 node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
42 status::{HasMetrics, StatusDataSource},
43 Header, Payload, QueryResult, Transaction,
44};
/// A data source `D` bundled with an additional, caller-supplied value `U`.
///
/// The wrapper simply stores both values side by side. Most trait implementations in this file
/// forward their calls to the wrapped `D`; the `U` value is reachable through [`AsRef`] and
/// [`AsMut`].
#[derive(Clone, Copy, Debug)]
pub struct ExtensibleDataSource<D, U> {
    /// The wrapped data source.
    data_source: D,
    /// Extra state carried alongside the data source.
    user_data: U,
}

impl<D, U> ExtensibleDataSource<D, U> {
    /// Builds a wrapper that owns `data_source` together with `user_data`.
    pub fn new(data_source: D, user_data: U) -> Self {
        ExtensibleDataSource {
            data_source,
            user_data,
        }
    }

    /// Borrows the wrapped data source.
    pub fn inner(&self) -> &D {
        let Self { data_source, .. } = self;
        data_source
    }

    /// Mutably borrows the wrapped data source.
    pub fn inner_mut(&mut self) -> &mut D {
        let Self { data_source, .. } = self;
        data_source
    }
}
113
114impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
115 fn as_ref(&self) -> &U {
116 &self.user_data
117 }
118}
119
120impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
121 fn as_mut(&mut self) -> &mut U {
122 &mut self.user_data
123 }
124}
125
126impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
127where
128 D: VersionedDataSource + Send,
129 U: Send + Sync,
130{
131 type Transaction<'a>
132 = D::Transaction<'a>
133 where
134 Self: 'a;
135
136 type ReadOnly<'a>
137 = D::ReadOnly<'a>
138 where
139 Self: 'a;
140
141 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
142 self.data_source.write().await
143 }
144
145 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
146 self.data_source.read().await
147 }
148}
149
150#[async_trait]
151impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
152where
153 D: PrunedHeightDataSource + Send + Sync,
154 U: Send + Sync,
155{
156 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
157 self.data_source.load_pruned_height().await
158 }
159}
160
161#[async_trait]
162impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
163where
164 D: AvailabilityDataSource<Types> + Send + Sync,
165 U: Send + Sync,
166 Types: NodeType,
167 Header<Types>: QueryableHeader<Types>,
168 Payload<Types>: QueryablePayload<Types>,
169{
170 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
171 where
172 ID: Into<LeafId<Types>> + Send + Sync,
173 {
174 self.data_source.get_leaf(id).await
175 }
176
177 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
178 where
179 ID: Into<BlockId<Types>> + Send + Sync,
180 {
181 self.data_source.get_header(id).await
182 }
183
184 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
185 where
186 ID: Into<BlockId<Types>> + Send + Sync,
187 {
188 self.data_source.get_block(id).await
189 }
190 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
191 where
192 ID: Into<BlockId<Types>> + Send + Sync,
193 {
194 self.data_source.get_payload(id).await
195 }
196 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
197 where
198 ID: Into<BlockId<Types>> + Send + Sync,
199 {
200 self.data_source.get_payload_metadata(id).await
201 }
202 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
203 where
204 ID: Into<BlockId<Types>> + Send + Sync,
205 {
206 self.data_source.get_vid_common(id).await
207 }
208 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
209 where
210 ID: Into<BlockId<Types>> + Send + Sync,
211 {
212 self.data_source.get_vid_common_metadata(id).await
213 }
214 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
215 where
216 R: RangeBounds<usize> + Send + 'static,
217 {
218 self.data_source.get_leaf_range(range).await
219 }
220 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
221 where
222 R: RangeBounds<usize> + Send + 'static,
223 {
224 self.data_source.get_block_range(range).await
225 }
226
227 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
228 where
229 R: RangeBounds<usize> + Send + 'static,
230 {
231 self.data_source.get_header_range(range).await
232 }
233 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
234 where
235 R: RangeBounds<usize> + Send + 'static,
236 {
237 self.data_source.get_payload_range(range).await
238 }
239 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
240 where
241 R: RangeBounds<usize> + Send + 'static,
242 {
243 self.data_source.get_payload_metadata_range(range).await
244 }
245 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
246 where
247 R: RangeBounds<usize> + Send + 'static,
248 {
249 self.data_source.get_vid_common_range(range).await
250 }
251 async fn get_vid_common_metadata_range<R>(
252 &self,
253 range: R,
254 ) -> FetchStream<VidCommonMetadata<Types>>
255 where
256 R: RangeBounds<usize> + Send + 'static,
257 {
258 self.data_source.get_vid_common_metadata_range(range).await
259 }
260
261 async fn get_leaf_range_rev(
262 &self,
263 start: Bound<usize>,
264 end: usize,
265 ) -> FetchStream<LeafQueryData<Types>> {
266 self.data_source.get_leaf_range_rev(start, end).await
267 }
268 async fn get_block_range_rev(
269 &self,
270 start: Bound<usize>,
271 end: usize,
272 ) -> FetchStream<BlockQueryData<Types>> {
273 self.data_source.get_block_range_rev(start, end).await
274 }
275 async fn get_payload_range_rev(
276 &self,
277 start: Bound<usize>,
278 end: usize,
279 ) -> FetchStream<PayloadQueryData<Types>> {
280 self.data_source.get_payload_range_rev(start, end).await
281 }
282 async fn get_payload_metadata_range_rev(
283 &self,
284 start: Bound<usize>,
285 end: usize,
286 ) -> FetchStream<PayloadMetadata<Types>> {
287 self.data_source
288 .get_payload_metadata_range_rev(start, end)
289 .await
290 }
291 async fn get_vid_common_range_rev(
292 &self,
293 start: Bound<usize>,
294 end: usize,
295 ) -> FetchStream<VidCommonQueryData<Types>> {
296 self.data_source.get_vid_common_range_rev(start, end).await
297 }
298 async fn get_vid_common_metadata_range_rev(
299 &self,
300 start: Bound<usize>,
301 end: usize,
302 ) -> FetchStream<VidCommonMetadata<Types>> {
303 self.data_source
304 .get_vid_common_metadata_range_rev(start, end)
305 .await
306 }
307 async fn get_block_containing_transaction(
308 &self,
309 h: TransactionHash<Types>,
310 ) -> Fetch<BlockWithTransaction<Types>> {
311 self.data_source.get_block_containing_transaction(h).await
312 }
313 async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryDataV2<Types>> {
314 self.data_source.get_state_cert(epoch).await
315 }
316}
317
318impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
319where
320 D: UpdateAvailabilityData<Types> + Send + Sync,
321 U: Send + Sync,
322 Types: NodeType,
323{
324 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
325 self.data_source.append(info).await
326 }
327}
328
329#[async_trait]
330impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
331where
332 D: NodeDataSource<Types> + Send + Sync,
333 U: Send + Sync,
334 Types: NodeType,
335 Header<Types>: QueryableHeader<Types>,
336{
337 async fn block_height(&self) -> QueryResult<usize> {
338 self.data_source.block_height().await
339 }
340 async fn count_transactions_in_range(
341 &self,
342 range: impl RangeBounds<usize> + Send,
343 namespace: Option<NamespaceId<Types>>,
344 ) -> QueryResult<usize> {
345 self.data_source
346 .count_transactions_in_range(range, namespace)
347 .await
348 }
349 async fn payload_size_in_range(
350 &self,
351 range: impl RangeBounds<usize> + Send,
352 namespace: Option<NamespaceId<Types>>,
353 ) -> QueryResult<usize> {
354 self.data_source
355 .payload_size_in_range(range, namespace)
356 .await
357 }
358 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
359 where
360 ID: Into<BlockId<Types>> + Send + Sync,
361 {
362 self.data_source.vid_share(id).await
363 }
364 async fn sync_status(&self) -> QueryResult<SyncStatus> {
365 self.data_source.sync_status().await
366 }
367 async fn get_header_window(
368 &self,
369 start: impl Into<WindowStart<Types>> + Send + Sync,
370 end: u64,
371 limit: usize,
372 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
373 self.data_source.get_header_window(start, end, limit).await
374 }
375}
376
377impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
378where
379 D: HasMetrics,
380{
381 fn metrics(&self) -> &PrometheusMetrics {
382 self.data_source.metrics()
383 }
384}
385
386#[async_trait]
387impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
388where
389 D: StatusDataSource + Send + Sync,
390 U: Send + Sync,
391{
392 async fn block_height(&self) -> QueryResult<usize> {
393 self.data_source.block_height().await
394 }
395}
396
397#[async_trait]
398impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
399 for ExtensibleDataSource<D, U>
400where
401 D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
402 U: Send + Sync,
403 Types: NodeType,
404 State: MerklizedState<Types, ARITY>,
405{
406 async fn get_path(
407 &self,
408 snapshot: Snapshot<Types, State, ARITY>,
409 key: State::Key,
410 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
411 self.data_source.get_path(snapshot, key).await
412 }
413}
414
415#[async_trait]
416impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
417where
418 D: MerklizedStateHeightPersistence + Sync,
419 U: Send + Sync,
420{
421 async fn get_last_state_height(&self) -> QueryResult<usize> {
422 self.data_source.get_last_state_height().await
423 }
424}
425
426#[async_trait]
427impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
428 for ExtensibleDataSource<D, U>
429where
430 D: UpdateStateData<Types, State, ARITY> + Send + Sync,
431 U: Send + Sync,
432 State: MerklizedState<Types, ARITY>,
433 Types: NodeType,
434{
435 async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
436 self.data_source.set_last_state_height(height).await
437 }
438
439 async fn insert_merkle_nodes(
440 &mut self,
441 path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
442 traversal_path: Vec<usize>,
443 block_number: u64,
444 ) -> anyhow::Result<()> {
445 self.data_source
446 .insert_merkle_nodes(path, traversal_path, block_number)
447 .await
448 }
449}
450
451#[async_trait]
452impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
453where
454 D: ExplorerDataSource<Types> + Sync,
455 U: Send + Sync,
456 Types: NodeType,
457 Payload<Types>: QueryablePayload<Types>,
458 Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
459 Transaction<Types>: ExplorerTransaction<Types>,
460{
461 async fn get_block_detail(
462 &self,
463 request: explorer::query_data::BlockIdentifier<Types>,
464 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
465 {
466 self.data_source.get_block_detail(request).await
467 }
468
469 async fn get_block_summaries(
470 &self,
471 request: explorer::query_data::GetBlockSummariesRequest<Types>,
472 ) -> Result<
473 Vec<explorer::query_data::BlockSummary<Types>>,
474 explorer::query_data::GetBlockSummariesError,
475 > {
476 self.data_source.get_block_summaries(request).await
477 }
478
479 async fn get_transaction_detail(
480 &self,
481 request: explorer::query_data::TransactionIdentifier<Types>,
482 ) -> Result<
483 explorer::query_data::TransactionDetailResponse<Types>,
484 explorer::query_data::GetTransactionDetailError,
485 > {
486 self.data_source.get_transaction_detail(request).await
487 }
488
489 async fn get_transaction_summaries(
490 &self,
491 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
492 ) -> Result<
493 Vec<explorer::query_data::TransactionSummary<Types>>,
494 explorer::query_data::GetTransactionSummariesError,
495 > {
496 self.data_source.get_transaction_summaries(request).await
497 }
498
499 async fn get_explorer_summary(
500 &self,
501 ) -> Result<
502 explorer::query_data::ExplorerSummary<Types>,
503 explorer::query_data::GetExplorerSummaryError,
504 > {
505 self.data_source.get_explorer_summary().await
506 }
507
508 async fn get_search_results(
509 &self,
510 query: TaggedBase64,
511 ) -> Result<
512 explorer::query_data::SearchResult<Types>,
513 explorer::query_data::GetSearchResultsError,
514 > {
515 self.data_source.get_search_results(query).await
516 }
517}
518
519#[async_trait]
522impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
523where
524 U: EventsSource<Types> + Sync,
525 D: Send + Sync,
526 Types: NodeType,
527{
528 type EventStream = BoxStream<'static, Arc<Event<Types>>>;
529 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
530
531 async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
532 Box::pin(self.user_data.get_event_stream(filter).await)
533 }
534
535 async fn get_legacy_event_stream(
536 &self,
537 filter: Option<EventFilterSet<Types>>,
538 ) -> Self::LegacyEventStream {
539 Box::pin(self.user_data.get_legacy_event_stream(filter).await)
540 }
541
542 async fn get_startup_info(&self) -> StartupInfo<Types> {
543 self.user_data.get_startup_info().await
544 }
545}
546
/// `DataSourceLifeCycle` for the wrapper, compiled only for tests or with the `testing` feature.
///
/// Storage handling is left to the wrapped data source; the user data is always created with
/// `U::default()`.
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use hotshot::types::Event;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, TestableDataSource},
            mocks::MockTypes,
        },
    };

    #[async_trait]
    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
    where
        D: TestableDataSource + UpdateDataSource<MockTypes>,
        U: Clone + Default + Send + Sync + 'static,
    {
        /// The wrapper reuses the storage type of the wrapped data source.
        type Storage = D::Storage;

        /// Creates storage by calling `D::create`.
        async fn create(node_id: usize) -> Self::Storage {
            D::create(node_id).await
        }

        /// Connects `D` to `storage` and pairs it with default user data.
        async fn connect(storage: &Self::Storage) -> Self {
            let data_source = D::connect(storage).await;
            Self::new(data_source, U::default())
        }

        /// Resets `D` on `storage` and pairs it with default user data.
        async fn reset(storage: &Self::Storage) -> Self {
            let data_source = D::reset(storage).await;
            Self::new(data_source, U::default())
        }

        /// Applies `event` through `update`, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
585
/// Invokes `instantiate_data_source_tests!` for the wrapper around `MockDataSource` with unit
/// user data (presumably the crate's shared data source test suite; see the macro definition).
#[cfg(test)]
mod test {
    use super::ExtensibleDataSource;
    use crate::{testing::consensus::MockDataSource, *};

    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
}