1use std::ops::{Bound, RangeBounds};
14
15use async_trait::async_trait;
16use hotshot_types::{data::VidShare, traits::node_implementation::NodeType};
17use jf_merkle_tree::prelude::MerkleProof;
18use tagged_base64::TaggedBase64;
19
20use super::VersionedDataSource;
21use crate::{
22 availability::{
23 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, Fetch, FetchStream, LeafId,
24 LeafQueryData, NamespaceId, PayloadMetadata, PayloadQueryData, QueryableHeader,
25 QueryablePayload, StateCertQueryData, TransactionHash, TransactionQueryData,
26 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
27 },
28 data_source::storage::pruning::PrunedHeightDataSource,
29 explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
30 merklized_state::{
31 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
32 UpdateStateData,
33 },
34 metrics::PrometheusMetrics,
35 node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
36 status::{HasMetrics, StatusDataSource},
37 Header, Payload, QueryResult, Transaction,
38};
39#[derive(Clone, Copy, Debug)]
76pub struct ExtensibleDataSource<D, U> {
77 data_source: D,
78 user_data: U,
79}
80
81impl<D, U> ExtensibleDataSource<D, U> {
82 pub fn new(data_source: D, user_data: U) -> Self {
83 Self {
84 data_source,
85 user_data,
86 }
87 }
88
89 pub fn inner(&self) -> &D {
95 &self.data_source
96 }
97
98 pub fn inner_mut(&mut self) -> &mut D {
104 &mut self.data_source
105 }
106}
107
108impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
109 fn as_ref(&self) -> &U {
110 &self.user_data
111 }
112}
113
114impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
115 fn as_mut(&mut self) -> &mut U {
116 &mut self.user_data
117 }
118}
119
120impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
121where
122 D: VersionedDataSource + Send,
123 U: Send + Sync,
124{
125 type Transaction<'a>
126 = D::Transaction<'a>
127 where
128 Self: 'a;
129
130 type ReadOnly<'a>
131 = D::ReadOnly<'a>
132 where
133 Self: 'a;
134
135 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
136 self.data_source.write().await
137 }
138
139 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
140 self.data_source.read().await
141 }
142}
143
144#[async_trait]
145impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
146where
147 D: PrunedHeightDataSource + Send + Sync,
148 U: Send + Sync,
149{
150 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
151 self.data_source.load_pruned_height().await
152 }
153}
154
155#[async_trait]
156impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
157where
158 D: AvailabilityDataSource<Types> + Send + Sync,
159 U: Send + Sync,
160 Types: NodeType,
161 Header<Types>: QueryableHeader<Types>,
162 Payload<Types>: QueryablePayload<Types>,
163{
164 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
165 where
166 ID: Into<LeafId<Types>> + Send + Sync,
167 {
168 self.data_source.get_leaf(id).await
169 }
170
171 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
172 where
173 ID: Into<BlockId<Types>> + Send + Sync,
174 {
175 self.data_source.get_header(id).await
176 }
177
178 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
179 where
180 ID: Into<BlockId<Types>> + Send + Sync,
181 {
182 self.data_source.get_block(id).await
183 }
184 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
185 where
186 ID: Into<BlockId<Types>> + Send + Sync,
187 {
188 self.data_source.get_payload(id).await
189 }
190 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
191 where
192 ID: Into<BlockId<Types>> + Send + Sync,
193 {
194 self.data_source.get_payload_metadata(id).await
195 }
196 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
197 where
198 ID: Into<BlockId<Types>> + Send + Sync,
199 {
200 self.data_source.get_vid_common(id).await
201 }
202 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
203 where
204 ID: Into<BlockId<Types>> + Send + Sync,
205 {
206 self.data_source.get_vid_common_metadata(id).await
207 }
208 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
209 where
210 R: RangeBounds<usize> + Send + 'static,
211 {
212 self.data_source.get_leaf_range(range).await
213 }
214 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
215 where
216 R: RangeBounds<usize> + Send + 'static,
217 {
218 self.data_source.get_block_range(range).await
219 }
220
221 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
222 where
223 R: RangeBounds<usize> + Send + 'static,
224 {
225 self.data_source.get_header_range(range).await
226 }
227 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
228 where
229 R: RangeBounds<usize> + Send + 'static,
230 {
231 self.data_source.get_payload_range(range).await
232 }
233 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
234 where
235 R: RangeBounds<usize> + Send + 'static,
236 {
237 self.data_source.get_payload_metadata_range(range).await
238 }
239 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
240 where
241 R: RangeBounds<usize> + Send + 'static,
242 {
243 self.data_source.get_vid_common_range(range).await
244 }
245 async fn get_vid_common_metadata_range<R>(
246 &self,
247 range: R,
248 ) -> FetchStream<VidCommonMetadata<Types>>
249 where
250 R: RangeBounds<usize> + Send + 'static,
251 {
252 self.data_source.get_vid_common_metadata_range(range).await
253 }
254
255 async fn get_leaf_range_rev(
256 &self,
257 start: Bound<usize>,
258 end: usize,
259 ) -> FetchStream<LeafQueryData<Types>> {
260 self.data_source.get_leaf_range_rev(start, end).await
261 }
262 async fn get_block_range_rev(
263 &self,
264 start: Bound<usize>,
265 end: usize,
266 ) -> FetchStream<BlockQueryData<Types>> {
267 self.data_source.get_block_range_rev(start, end).await
268 }
269 async fn get_payload_range_rev(
270 &self,
271 start: Bound<usize>,
272 end: usize,
273 ) -> FetchStream<PayloadQueryData<Types>> {
274 self.data_source.get_payload_range_rev(start, end).await
275 }
276 async fn get_payload_metadata_range_rev(
277 &self,
278 start: Bound<usize>,
279 end: usize,
280 ) -> FetchStream<PayloadMetadata<Types>> {
281 self.data_source
282 .get_payload_metadata_range_rev(start, end)
283 .await
284 }
285 async fn get_vid_common_range_rev(
286 &self,
287 start: Bound<usize>,
288 end: usize,
289 ) -> FetchStream<VidCommonQueryData<Types>> {
290 self.data_source.get_vid_common_range_rev(start, end).await
291 }
292 async fn get_vid_common_metadata_range_rev(
293 &self,
294 start: Bound<usize>,
295 end: usize,
296 ) -> FetchStream<VidCommonMetadata<Types>> {
297 self.data_source
298 .get_vid_common_metadata_range_rev(start, end)
299 .await
300 }
301 async fn get_transaction(
302 &self,
303 hash: TransactionHash<Types>,
304 ) -> Fetch<TransactionQueryData<Types>> {
305 self.data_source.get_transaction(hash).await
306 }
307 async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryData<Types>> {
308 self.data_source.get_state_cert(epoch).await
309 }
310}
311
312impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
313where
314 D: UpdateAvailabilityData<Types> + Send + Sync,
315 U: Send + Sync,
316 Types: NodeType,
317{
318 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
319 self.data_source.append(info).await
320 }
321}
322
323#[async_trait]
324impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
325where
326 D: NodeDataSource<Types> + Send + Sync,
327 U: Send + Sync,
328 Types: NodeType,
329 Header<Types>: QueryableHeader<Types>,
330{
331 async fn block_height(&self) -> QueryResult<usize> {
332 self.data_source.block_height().await
333 }
334 async fn count_transactions_in_range(
335 &self,
336 range: impl RangeBounds<usize> + Send,
337 namespace: Option<NamespaceId<Types>>,
338 ) -> QueryResult<usize> {
339 self.data_source
340 .count_transactions_in_range(range, namespace)
341 .await
342 }
343 async fn payload_size_in_range(
344 &self,
345 range: impl RangeBounds<usize> + Send,
346 namespace: Option<NamespaceId<Types>>,
347 ) -> QueryResult<usize> {
348 self.data_source
349 .payload_size_in_range(range, namespace)
350 .await
351 }
352 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
353 where
354 ID: Into<BlockId<Types>> + Send + Sync,
355 {
356 self.data_source.vid_share(id).await
357 }
358 async fn sync_status(&self) -> QueryResult<SyncStatus> {
359 self.data_source.sync_status().await
360 }
361 async fn get_header_window(
362 &self,
363 start: impl Into<WindowStart<Types>> + Send + Sync,
364 end: u64,
365 limit: usize,
366 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
367 self.data_source.get_header_window(start, end, limit).await
368 }
369}
370
371impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
372where
373 D: HasMetrics,
374{
375 fn metrics(&self) -> &PrometheusMetrics {
376 self.data_source.metrics()
377 }
378}
379
380#[async_trait]
381impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
382where
383 D: StatusDataSource + Send + Sync,
384 U: Send + Sync,
385{
386 async fn block_height(&self) -> QueryResult<usize> {
387 self.data_source.block_height().await
388 }
389}
390
391#[async_trait]
392impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
393 for ExtensibleDataSource<D, U>
394where
395 D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
396 U: Send + Sync,
397 Types: NodeType,
398 State: MerklizedState<Types, ARITY>,
399{
400 async fn get_path(
401 &self,
402 snapshot: Snapshot<Types, State, ARITY>,
403 key: State::Key,
404 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
405 self.data_source.get_path(snapshot, key).await
406 }
407}
408
409#[async_trait]
410impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
411where
412 D: MerklizedStateHeightPersistence + Sync,
413 U: Send + Sync,
414{
415 async fn get_last_state_height(&self) -> QueryResult<usize> {
416 self.data_source.get_last_state_height().await
417 }
418}
419
420#[async_trait]
421impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
422 for ExtensibleDataSource<D, U>
423where
424 D: UpdateStateData<Types, State, ARITY> + Send + Sync,
425 U: Send + Sync,
426 State: MerklizedState<Types, ARITY>,
427 Types: NodeType,
428{
429 async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
430 self.data_source.set_last_state_height(height).await
431 }
432
433 async fn insert_merkle_nodes(
434 &mut self,
435 path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
436 traversal_path: Vec<usize>,
437 block_number: u64,
438 ) -> anyhow::Result<()> {
439 self.data_source
440 .insert_merkle_nodes(path, traversal_path, block_number)
441 .await
442 }
443}
444
445#[async_trait]
446impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
447where
448 D: ExplorerDataSource<Types> + Sync,
449 U: Send + Sync,
450 Types: NodeType,
451 Payload<Types>: QueryablePayload<Types>,
452 Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
453 Transaction<Types>: ExplorerTransaction<Types>,
454{
455 async fn get_block_detail(
456 &self,
457 request: explorer::query_data::BlockIdentifier<Types>,
458 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
459 {
460 self.data_source.get_block_detail(request).await
461 }
462
463 async fn get_block_summaries(
464 &self,
465 request: explorer::query_data::GetBlockSummariesRequest<Types>,
466 ) -> Result<
467 Vec<explorer::query_data::BlockSummary<Types>>,
468 explorer::query_data::GetBlockSummariesError,
469 > {
470 self.data_source.get_block_summaries(request).await
471 }
472
473 async fn get_transaction_detail(
474 &self,
475 request: explorer::query_data::TransactionIdentifier<Types>,
476 ) -> Result<
477 explorer::query_data::TransactionDetailResponse<Types>,
478 explorer::query_data::GetTransactionDetailError,
479 > {
480 self.data_source.get_transaction_detail(request).await
481 }
482
483 async fn get_transaction_summaries(
484 &self,
485 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
486 ) -> Result<
487 Vec<explorer::query_data::TransactionSummary<Types>>,
488 explorer::query_data::GetTransactionSummariesError,
489 > {
490 self.data_source.get_transaction_summaries(request).await
491 }
492
493 async fn get_explorer_summary(
494 &self,
495 ) -> Result<
496 explorer::query_data::ExplorerSummary<Types>,
497 explorer::query_data::GetExplorerSummaryError,
498 > {
499 self.data_source.get_explorer_summary().await
500 }
501
502 async fn get_search_results(
503 &self,
504 query: TaggedBase64,
505 ) -> Result<
506 explorer::query_data::SearchResult<Types>,
507 explorer::query_data::GetSearchResultsError,
508 > {
509 self.data_source.get_search_results(query).await
510 }
511}
512
513#[cfg(any(test, feature = "testing"))]
514mod impl_testable_data_source {
515 use hotshot::types::Event;
516
517 use super::*;
518 use crate::{
519 data_source::UpdateDataSource,
520 testing::{
521 consensus::{DataSourceLifeCycle, TestableDataSource},
522 mocks::MockTypes,
523 },
524 };
525
526 #[async_trait]
527 impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
528 where
529 D: TestableDataSource + UpdateDataSource<MockTypes>,
530 U: Clone + Default + Send + Sync + 'static,
531 {
532 type Storage = D::Storage;
533
534 async fn create(node_id: usize) -> Self::Storage {
535 D::create(node_id).await
536 }
537
538 async fn connect(storage: &Self::Storage) -> Self {
539 Self::new(D::connect(storage).await, Default::default())
540 }
541
542 async fn reset(storage: &Self::Storage) -> Self {
543 Self::new(D::reset(storage).await, Default::default())
544 }
545
546 async fn handle_event(&self, event: &Event<MockTypes>) {
547 self.update(event).await.unwrap();
548 }
549 }
550}
551
552#[cfg(test)]
553mod test {
554 use super::ExtensibleDataSource;
555 use crate::testing::consensus::MockDataSource;
556 use crate::*;
559
560 instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
561}