1use std::{
55 cmp::{max, min},
56 fmt::{Debug, Display},
57 iter::repeat_with,
58 marker::PhantomData,
59 ops::{Bound, Range, RangeBounds},
60 sync::Arc,
61 time::Duration,
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use derivative::Derivative;
69use futures::{
70 channel::oneshot,
71 future::{self, BoxFuture, Either, Future, FutureExt, join_all},
72 stream::{self, BoxStream, StreamExt},
73};
74use hotshot_types::{
75 data::VidShare,
76 simple_certificate::CertificatePair,
77 traits::{
78 metrics::{Gauge, Metrics},
79 node_implementation::NodeType,
80 },
81};
82use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
83use tagged_base64::TaggedBase64;
84use tokio::{spawn, time::sleep};
85use tracing::Instrument;
86
87use super::{
88 Transaction, VersionedDataSource,
89 notifier::Notifier,
90 storage::{
91 Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
92 MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
93 UpdateAvailabilityStorage,
94 pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
95 },
96};
97use crate::{
98 Header, Payload, QueryError, QueryResult,
99 availability::{
100 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
101 FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
102 PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
103 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
104 },
105 explorer::{self, ExplorerDataSource},
106 fetching::{self, Provider, request},
107 merklized_state::{
108 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
109 },
110 metrics::PrometheusMetrics,
111 node::{
112 NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
113 WindowStart,
114 },
115 status::{HasMetrics, StatusDataSource},
116 task::BackgroundTask,
117 types::HeightIndexed,
118};
119
120mod block;
121mod header;
122mod leaf;
123mod transaction;
124mod vid;
125
126use self::{
127 block::PayloadFetcher,
128 leaf::LeafFetcher,
129 transaction::TransactionRequest,
130 vid::{VidCommonFetcher, VidCommonRequest},
131};
132
/// Builder for configuring and constructing a [`FetchingDataSource`].
///
/// All tuning knobs (retry policy, chunk sizes, delays, background tasks) are
/// collected here and consumed by [`Builder::build`].
pub struct Builder<Types, S, P> {
    // Persistent storage backing the data source.
    storage: S,
    // External provider used to fetch objects missing from local storage.
    provider: P,
    // Backoff policy builder for retrying failed fetch/storage operations.
    backoff: ExponentialBackoffBuilder,
    // Size of the semaphore bounding concurrent retry operations
    // (see `Fetcher::new`, which constructs `Semaphore::new(rate_limit)`).
    rate_limit: usize,
    // Number of objects loaded per chunk when streaming a range.
    range_chunk_size: usize,
    // Time between proactive scans for missing objects.
    proactive_interval: Duration,
    // Chunk size used by the proactive scanner; defaults to `range_chunk_size`
    // when `None` (see `FetchingDataSource::new`).
    proactive_range_chunk_size: Option<usize>,
    // Number of heights examined per storage query when computing sync status.
    sync_status_chunk_size: usize,
    // Delay inserted after each pending (actively fetched) item in a range
    // stream, to throttle fetch load.
    active_fetch_delay: Duration,
    // Delay inserted between consecutive chunk loads in a range stream.
    chunk_fetch_delay: Duration,
    // Whether to run the proactive scanner background task.
    proactive_fetching: bool,
    // Whether to run the aggregator background task.
    aggregator: bool,
    // Chunk size used by the aggregator; defaults to `range_chunk_size`
    // when `None` (see `FetchingDataSource::new`).
    aggregator_chunk_size: Option<usize>,
    // When set, only leaves are fetched (payload/VID fetchers are disabled).
    leaf_only: bool,
    _types: PhantomData<Types>,
}
151
152impl<Types, S, P> Builder<Types, S, P> {
153 pub fn new(storage: S, provider: P) -> Self {
155 let mut default_backoff = ExponentialBackoffBuilder::default();
156 default_backoff
157 .with_initial_interval(Duration::from_secs(1))
158 .with_multiplier(2.)
159 .with_max_interval(Duration::from_secs(32))
160 .with_max_elapsed_time(Some(Duration::from_secs(64)));
161
162 Self {
163 storage,
164 provider,
165 backoff: default_backoff,
166 rate_limit: 32,
167 range_chunk_size: 25,
168 proactive_interval: Duration::from_hours(8),
169 proactive_range_chunk_size: None,
170 sync_status_chunk_size: 100_000,
171 active_fetch_delay: Duration::from_millis(50),
172 chunk_fetch_delay: Duration::from_millis(100),
173 proactive_fetching: true,
174 aggregator: true,
175 aggregator_chunk_size: None,
176 leaf_only: false,
177 _types: Default::default(),
178 }
179 }
180
181 pub fn leaf_only(mut self) -> Self {
182 self.leaf_only = true;
183 self
184 }
185
186 pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
188 self.backoff.with_initial_interval(interval);
189 self
190 }
191
192 pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
194 self.backoff.with_max_interval(interval);
195 self
196 }
197
198 pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
200 self.backoff.with_multiplier(multiplier);
201 self
202 }
203
204 pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
206 self.backoff.with_randomization_factor(factor);
207 self
208 }
209
210 pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
212 self.backoff.with_max_elapsed_time(Some(timeout));
213 self
214 }
215
216 pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
218 self.rate_limit = with_rate_limit;
219 self
220 }
221
222 pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
229 self.range_chunk_size = range_chunk_size;
230 self
231 }
232
233 pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
237 self.proactive_interval = interval;
238 self
239 }
240
241 pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
250 self.proactive_range_chunk_size = Some(range_chunk_size);
251 self
252 }
253
254 pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
257 self.sync_status_chunk_size = chunk_size;
258 self
259 }
260
261 pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
267 self.active_fetch_delay = active_fetch_delay;
268 self
269 }
270
271 pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
283 self.chunk_fetch_delay = chunk_fetch_delay;
284 self
285 }
286
287 pub fn disable_proactive_fetching(mut self) -> Self {
296 self.proactive_fetching = false;
297 self
298 }
299
300 pub fn disable_aggregator(mut self) -> Self {
305 self.aggregator = false;
306 self
307 }
308
309 pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
318 self.aggregator_chunk_size = Some(chunk_size);
319 self
320 }
321
322 pub fn is_leaf_only(&self) -> bool {
323 self.leaf_only
324 }
325}
326
impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Consume the builder and construct the configured [`FetchingDataSource`].
    ///
    /// This also spawns the background tasks (pruner and, unless disabled or
    /// in leaf-only mode, the proactive scanner and aggregator) — see
    /// `FetchingDataSource::new`.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        FetchingDataSource::new(self).await
    }
}
345
/// A data source which serves objects from local storage, fetching anything
/// missing from an external provider.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    // Shared core: storage handle, notifiers, and per-object-type fetchers.
    fetcher: Arc<Fetcher<Types, S, P>>,
    // Proactive scanner task; `None` when proactive fetching is disabled or
    // the data source runs in leaf-only mode.
    scanner: Option<BackgroundTask>,
    // Aggregator task; `None` when disabled or in leaf-only mode.
    aggregator: Option<BackgroundTask>,
    // Handle to the background pruner (inert when pruning is not configured).
    pruner: Pruner<Types, S>,
}
379
/// Handle to the background task which periodically prunes old data from
/// storage.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    // `None` when storage has no pruning configuration (no task is spawned).
    handle: Option<BackgroundTask>,
    _types: PhantomData<(Types, S)>,
}
389
390impl<Types, S> Pruner<Types, S>
391where
392 Types: NodeType,
393 Header<Types>: QueryableHeader<Types>,
394 Payload<Types>: QueryablePayload<Types>,
395 S: PruneStorage + Send + Sync + 'static,
396{
397 async fn new(storage: Arc<S>) -> Self {
398 let cfg = storage.get_pruning_config();
399 let Some(cfg) = cfg else {
400 return Self {
401 handle: None,
402 _types: Default::default(),
403 };
404 };
405
406 let future = async move {
407 for i in 1.. {
408 tracing::warn!("starting pruner run {i} ");
409 Self::prune(storage.clone()).await;
410 sleep(cfg.interval()).await;
411 }
412 };
413
414 let task = BackgroundTask::spawn("pruner", future);
415
416 Self {
417 handle: Some(task),
418 _types: Default::default(),
419 }
420 }
421
422 async fn prune(storage: Arc<S>) {
423 let mut pruner = S::Pruner::default();
425 loop {
426 match storage.prune(&mut pruner).await {
427 Ok(Some(height)) => {
428 tracing::warn!("Pruned to height {height}");
429 },
430 Ok(None) => {
431 tracing::warn!("pruner run complete.");
432 break;
433 },
434 Err(e) => {
435 tracing::error!("pruner run failed: {e:?}");
436 break;
437 },
438 }
439 }
440 }
441}
442
443impl<Types, S, P> FetchingDataSource<Types, S, P>
444where
445 Types: NodeType,
446 Payload<Types>: QueryablePayload<Types>,
447 Header<Types>: QueryableHeader<Types>,
448 S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
449 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
450 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
451 + NodeStorage<Types>
452 + PrunedHeightStorage
453 + AggregatesStorage<Types>,
454 P: AvailabilityProvider<Types>,
455{
456 pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
458 Builder::new(storage, provider)
459 }
460
461 async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
462 let leaf_only = builder.is_leaf_only();
463 let aggregator = builder.aggregator;
464 let aggregator_chunk_size = builder
465 .aggregator_chunk_size
466 .unwrap_or(builder.range_chunk_size);
467 let proactive_fetching = builder.proactive_fetching;
468 let proactive_interval = builder.proactive_interval;
469 let proactive_range_chunk_size = builder
470 .proactive_range_chunk_size
471 .unwrap_or(builder.range_chunk_size);
472 let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
473 let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
474
475 let fetcher = Arc::new(Fetcher::new(builder).await?);
476 let scanner = if proactive_fetching && !leaf_only {
477 Some(BackgroundTask::spawn(
478 "proactive scanner",
479 fetcher.clone().proactive_scan(
480 proactive_interval,
481 proactive_range_chunk_size,
482 scanner_metrics,
483 ),
484 ))
485 } else {
486 None
487 };
488
489 let aggregator = if aggregator && !leaf_only {
490 Some(BackgroundTask::spawn(
491 "aggregator",
492 fetcher
493 .clone()
494 .aggregate(aggregator_chunk_size, aggregator_metrics),
495 ))
496 } else {
497 None
498 };
499
500 let storage = fetcher.storage.clone();
501
502 let pruner = Pruner::new(storage).await;
503 let ds = Self {
504 fetcher,
505 scanner,
506 pruner,
507 aggregator,
508 };
509
510 Ok(ds)
511 }
512
513 pub fn inner(&self) -> Arc<S> {
515 self.fetcher.storage.clone()
516 }
517}
518
519impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
520where
521 Types: NodeType,
522{
523 fn as_ref(&self) -> &S {
524 &self.fetcher.storage
525 }
526}
527
528impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
529where
530 Types: NodeType,
531 S: HasMetrics,
532{
533 fn metrics(&self) -> &PrometheusMetrics {
534 self.as_ref().metrics()
535 }
536}
537
538#[async_trait]
539impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
540where
541 Types: NodeType,
542 Header<Types>: QueryableHeader<Types>,
543 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
544 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
545 P: Send + Sync,
546{
547 async fn block_height(&self) -> QueryResult<usize> {
548 let mut tx = self.read().await.map_err(|err| QueryError::Error {
549 message: err.to_string(),
550 })?;
551 tx.block_height().await
552 }
553}
554
555#[async_trait]
556impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
557where
558 Types: NodeType,
559 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
560 for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
561 P: Send + Sync,
562{
563 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
564 let mut tx = self.read().await?;
565 tx.load_pruned_height().await
566 }
567}
568
// Thin delegation layer: every method forwards to the shared `Fetcher`, which
// serves from local storage when possible and fetches from the provider when
// not.
#[async_trait]
impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
    where
        ID: Into<LeafId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // Headers are fetched as `HeaderQueryData` and then unwrapped.
        self.fetcher
            .get::<HeaderQueryData<_>>(id.into())
            .await
            .map(|h| h.header)
    }

    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // VID requests use a dedicated request type wrapping the block ID.
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        // A header range is derived by streaming leaves for the range and
        // projecting out each leaf's block header.
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    // Reverse-order range variants: stream from `end` down toward `start`.

    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        // Transaction lookups use a dedicated request type keyed by hash.
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }
}
743
impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Ingest a newly produced block: store the leaf (with its certificate
    /// chain), the block, and the VID data, filling in any parts missing from
    /// `info` by fetching, then notify subscribers.
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;

        // Store the leaf together with its certificate chain first; the other
        // objects for this height are keyed off the same height.
        self.fetcher
            .store(&(info.leaf.clone(), info.qc_chain))
            .await;

        // Kick off a fetch for the parent leaf if it is missing (see the
        // `leaf` submodule), keeping the leaf chain contiguous.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        // Use the block provided with `info` if present; otherwise try to get
        // it. If the get is not immediately ready, drive the fetch to
        // completion in a background task instead of blocking `append`.
        let block = match info.block {
            Some(block) => Some(block),
            None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
                Fetch::Ready(block) => Some(block),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing block", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing block");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(block) = &block {
            self.fetcher.store(block).await;
        }
        // Same pattern for VID common data.
        let vid = match info.vid_common {
            Some(vid) => Some(vid),
            None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
                Fetch::Ready(vid) => Some(vid),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing VID common", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing VID common");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(vid) = &vid {
            // VID common is stored together with the (optional) VID share.
            self.fetcher.store(&(vid.clone(), info.vid_share)).await;
        }

        // Notify subscribers only after the objects have been stored, so
        // pending fetches woken by these notifications presumably find the
        // data in storage — ordering is deliberate here.
        info.leaf.notify(&self.fetcher.notifiers).await;
        if let Some(block) = &block {
            block.notify(&self.fetcher.notifiers).await;
        }
        if let Some(vid) = &vid {
            vid.notify(&self.fetcher.notifiers).await;
        }

        Ok(())
    }
}
835
// Transaction handling delegates straight through to the underlying storage
// (via the shared fetcher).
impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction on the underlying storage.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    /// Open a read-only transaction on the underlying storage.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}
859
/// Shared core of the data source: storage access, notification channels, and
/// the per-object-type fetchers.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    // Local persistent storage.
    storage: Arc<S>,
    // Notification channels used to resolve passive fetches when objects
    // arrive.
    notifiers: Notifiers<Types>,
    // External provider used for active fetches.
    provider: Arc<P>,
    // Fetcher for leaves (always present).
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    // Fetchers for payloads and VID common data; `None` in leaf-only mode
    // (see `Fetcher::new`).
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    // Number of objects loaded per chunk when streaming a range.
    range_chunk_size: usize,
    // Number of heights examined per storage query when computing sync status.
    sync_status_chunk_size: usize,
    // Delay after each pending item in a range stream.
    active_fetch_delay: Duration,
    // Delay between consecutive chunk loads in a range stream.
    chunk_fetch_delay: Duration,
    // Backoff policy used by the retry loops in `get` and `get_chunk`.
    backoff: ExponentialBackoff,
    // Bounds the number of concurrently running retry operations.
    retry_semaphore: Arc<Semaphore>,
    // When set, only leaves are fetched.
    leaf_only: bool,
}
885
// The fetcher's transactions are the storage's transactions.
impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction on the underlying storage.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    /// Open a read-only transaction on the underlying storage.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}
909
910impl<Types, S, P> Fetcher<Types, S, P>
911where
912 Types: NodeType,
913 Header<Types>: QueryableHeader<Types>,
914 S: VersionedDataSource + Sync,
915 for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
916{
917 pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
918 let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
919 let backoff = builder.backoff.build();
920
921 let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
922 (None, None)
923 } else {
924 (
925 Some(Arc::new(fetching::Fetcher::new(
926 retry_semaphore.clone(),
927 backoff.clone(),
928 ))),
929 Some(Arc::new(fetching::Fetcher::new(
930 retry_semaphore.clone(),
931 backoff.clone(),
932 ))),
933 )
934 };
935 let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
936
937 let leaf_only = builder.leaf_only;
938
939 Ok(Self {
940 storage: Arc::new(builder.storage),
941 notifiers: Default::default(),
942 provider: Arc::new(builder.provider),
943 leaf_fetcher: Arc::new(leaf_fetcher),
944 payload_fetcher,
945 vid_common_fetcher: vid_fetcher,
946 range_chunk_size: builder.range_chunk_size,
947 sync_status_chunk_size: builder.sync_status_chunk_size,
948 active_fetch_delay: builder.active_fetch_delay,
949 chunk_fetch_delay: builder.chunk_fetch_delay,
950 backoff,
951 retry_semaphore,
952 leaf_only,
953 })
954 }
955}
956
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Get a single object: return it immediately if it is in local storage,
    /// otherwise return a pending [`Fetch`] which resolves when the object
    /// arrives (via an active fetch or a notification).
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications *before* consulting storage, so an
        // object arriving concurrently cannot slip between the check and the
        // subscription.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // `try_get` itself failed (e.g. a storage error). Retry in a
        // background task with exponential backoff until it either yields the
        // object or successfully triggers a fetch. The result (if any) is
        // delivered over a oneshot channel.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    // Hold a semaphore permit while retrying, bounding the
                    // number of concurrent retry operations.
                    let res = {
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // The object showed up in storage; send it to the
                            // waiting `Fetch` (the receiver may have been
                            // dropped, hence `.ok()`).
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // A fetch was triggered; the passive fetch will
                            // resolve when it completes.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Keep the last delay if the backoff is exhausted.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Resolve on whichever completes first: the passive (notification)
        // fetch or the retry task's channel.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }

    /// Try to load an object from local storage, triggering a fetch if it is
    /// missing.
    ///
    /// Returns `Ok(Some(_))` when found locally, `Ok(None)` when missing (a
    /// fetch has been triggered), and `Err(_)` when storage could not be
    /// consulted at all.
    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
    where
        T: Fetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        match T::load(&mut tx, req).await {
            Ok(t) => Ok(Some(t)),
            Err(QueryError::Missing | QueryError::NotFound) => {
                // Missing is an expected case: kick off a fetch.
                tracing::debug!(?req, "object missing from local storage, will try to fetch");
                self.fetch::<T>(&mut tx, req).await?;
                Ok(None)
            },
            Err(err) => {
                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
            },
        }
    }

    /// Stream a range of objects using the default chunk size.
    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size(chunk_size, range)
    }

    /// Stream a range of objects, loading them from storage (or fetching
    /// them) `chunk_size` at a time.
    fn get_range_with_chunk_size<R, T>(
        self: Arc<Self>,
        chunk_size: usize,
        range: R,
    ) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks(range, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Space out chunk loads to avoid overwhelming storage
                        // and the provider.
                        sleep(chunk_fetch_delay).await;
                        stream::iter(chunk)
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                // Additionally throttle items which required an active fetch;
                // items already in storage are yielded without delay.
                match f {
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }

    /// Stream objects in reverse, from `end` down toward `start`, using the
    /// default chunk size.
    fn get_range_rev<T>(
        self: Arc<Self>,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size_rev(chunk_size, start, end)
    }

    /// Reverse-order variant of [`Self::get_range_with_chunk_size`]: chunks
    /// are produced from `end` down toward `start`, and each chunk is yielded
    /// in reverse.
    fn get_range_with_chunk_size_rev<T>(
        self: Arc<Self>,
        chunk_size: usize,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_fetch_delay = self.chunk_fetch_delay;
        let active_fetch_delay = self.active_fetch_delay;

        stream::iter(range_chunks_rev(start, end, chunk_size))
            .then(move |chunk| {
                let self_clone = self.clone();
                async move {
                    {
                        let chunk = self_clone.get_chunk(chunk).await;

                        // Space out chunk loads, as in the forward variant.
                        sleep(chunk_fetch_delay).await;
                        stream::iter(chunk.into_iter().rev())
                    }
                }
            })
            .flatten()
            .then(move |f| async move {
                match f {
                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
                    Fetch::Ready(_) => (),
                };
                f
            })
            .boxed()
    }

    /// Load one chunk of a range, returning a [`Fetch`] per position:
    /// `Ready` for objects found in storage, pending for the rest.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // As in `get`: subscribe for every position before checking storage,
        // so concurrently arriving objects are not missed.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Pair each loaded object (or gap) with its passive fetch.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // `try_get_chunk` failed: retry in the background with backoff, one
        // oneshot channel per position in the chunk.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        // Bound concurrent retries with the shared semaphore.
                        let res = {
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // A fetch was triggered for this slot;
                                        // its passive fetch will resolve it.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Each position resolves on whichever completes first: its passive
        // fetch or the retry task's channel.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }

    /// Load a chunk from storage, returning one entry per height in `chunk`:
    /// `Some` for objects present locally, `None` for gaps (for which a fetch
    /// is triggered).
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Drop (but log) individual items which failed to deserialize; they
        // are treated like gaps below.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        // The loaded items may be sparse. Walk them in order, and for every
        // height between consecutive items (or before the first / after the
        // last), trigger a fetch and record `None`.
        let mut results = Vec::with_capacity(chunk.len());
        for t in ts {
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Fill the tail of the chunk, beyond the last item present.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }

    /// Trigger an active fetch for `req`, but only if the object could
    /// plausibly exist given the current block/pruned heights.
    async fn fetch<T>(
        self: &Arc<Self>,
        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
        req: T::Request,
    ) -> anyhow::Result<()>
    where
        T: Fetchable<Types>,
    {
        tracing::debug!("fetching resource {req:?}");

        // Without the current heights we cannot tell whether the request is
        // satisfiable, so loading them is a hard requirement.
        let heights = Heights::load(tx)
            .await
            .context("failed to load heights; cannot definitively say object might exist")?;
        if req.might_exist(heights) {
            T::active_fetch(tx, self.clone(), req).await?;
        } else {
            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
        }
        Ok(())
    }

    /// Background task: periodically scan storage for missing blocks and VID
    /// data and fetch them, updating scanner metrics as it goes.
    async fn proactive_scan(
        self: Arc<Self>,
        interval: Duration,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        for i in 0.. {
            let span = tracing::warn_span!("proactive scan", i);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            async {
                let sync_status = {
                    match self.sync_status().await {
                        Ok(st) => st,
                        Err(err) => {
                            // NOTE(review): this early return skips the
                            // `metrics.running.set(0)` below, so the gauge
                            // stays at 1 through the following sleep —
                            // confirm whether that is intended.
                            tracing::warn!(
                                "unable to load sync status, scan will be skipped: {err:#}"
                            );
                            return;
                        },
                    }
                };
                tracing::info!(?sync_status, "starting scan");
                metrics.missing_blocks.set(sync_status.blocks.missing);
                metrics.missing_vid.set(sync_status.vid_common.missing);

                for range in sync_status.blocks.ranges {
                    // Record progress first; only Missing ranges are fetched.
                    metrics.scanned_blocks.set(range.start);
                    if range.status != SyncStatus::Missing {
                        continue;
                    }

                    tracing::info!(?range, "fetching missing block range");
                    // Drain the range (fetched in reverse) so every fetch in
                    // it is driven to completion before moving on.
                    self.clone()
                        .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
                            chunk_size,
                            Bound::Included(range.start),
                            range.end - 1,
                        )
                        .then(|fetch| async move {
                            fetch.await;
                        })
                        .collect::<()>()
                        .await;
                    // Decrement the missing-blocks gauge by the length of the
                    // range just handled (negative delta).
                    metrics
                        .missing_blocks
                        .update((range.start as i64) - (range.end as i64));
                }

                for range in sync_status.vid_common.ranges {
                    metrics.scanned_vid.set(range.start);
                    if range.status != SyncStatus::Missing {
                        continue;
                    }

                    tracing::info!(?range, "fetching missing VID range");
                    self.clone()
                        .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
                            chunk_size,
                            Bound::Included(range.start),
                            range.end - 1,
                        )
                        .then(|fetch| async move {
                            fetch.await;
                        })
                        .collect::<()>()
                        .await;
                    metrics
                        .missing_vid
                        .update((range.start as i64) - (range.end as i64));
                }

                tracing::info!("completed proactive scan, will scan again in {interval:?}");

                metrics.running.set(0);
            }
            .instrument(span)
            .await;

            sleep(interval).await;
        }
    }
}
1502
1503impl<Types, S, P> Fetcher<Types, S, P>
1504where
1505 Types: NodeType,
1506 Header<Types>: QueryableHeader<Types>,
1507 S: VersionedDataSource + 'static,
1508 for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1509 P: Send + Sync,
1510{
1511 async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
1512 let heights = {
1513 let mut tx = self
1514 .read()
1515 .await
1516 .context("opening transaction to load heights")?;
1517 Heights::load(&mut tx).await.context("loading heights")?
1518 };
1519
1520 let mut res = SyncStatusQueryData {
1521 pruned_height: heights.pruned_height.map(|h| h as usize),
1522 ..Default::default()
1523 };
1524 let start = if let Some(height) = res.pruned_height {
1525 let range = SyncStatusRange {
1527 status: SyncStatus::Pruned,
1528 start: 0,
1529 end: height + 1,
1530 };
1531 res.blocks.ranges.push(range);
1532 res.leaves.ranges.push(range);
1533 res.vid_common.ranges.push(range);
1534 res.vid_shares.ranges.push(range);
1535
1536 height + 1
1537 } else {
1538 0
1539 };
1540
1541 for chunk in range_chunks(
1544 start..(heights.height as usize),
1545 self.sync_status_chunk_size,
1546 ) {
1547 tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
1548 let mut tx = self
1549 .read()
1550 .await
1551 .context("opening transaction to sync status range")?;
1552 let range_status = tx
1553 .sync_status_for_range(chunk.start, chunk.end)
1554 .await
1555 .context(format!("checking sync status in sub-range {chunk:?}"))?;
1556 tracing::debug!(
1557 chunk.start,
1558 chunk.end,
1559 ?range_status,
1560 "found sync status for range"
1561 );
1562
1563 res.blocks.extend(range_status.blocks);
1564 res.leaves.extend(range_status.leaves);
1565 res.vid_common.extend(range_status.vid_common);
1566 res.vid_shares.extend(range_status.vid_shares);
1567 }
1568
1569 Ok(res)
1570 }
1571}
1572
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Background task maintaining running aggregate statistics over the chain.
    ///
    /// Streams payload metadata from the last aggregated height onward, folding each chunk into
    /// the running aggregate via `update_aggregates` and committing after every chunk. The
    /// `metrics.height` gauge tracks the highest aggregated block. Runs forever; all storage
    /// errors are retried rather than propagated.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            // Load the last saved aggregate so we can resume where the previous run (or previous
            // iteration) left off, retrying every 5s on storage errors.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume from the block after the saved aggregate, or from genesis with an empty
            // aggregate if nothing has been aggregated yet.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // Stream blocks from `start` onward; `ready_chunks` batches however many items are
            // immediately available (up to `chunk_size`) so we commit fewer, larger updates.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                // `ready_chunks` should never yield an empty chunk; guard anyway so
                // `last()`/`height()` below are safe.
                let Some(last) = chunk.last() else {
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Apply the chunk in a write transaction, retrying every second until the commit
                // succeeds. `prev_aggregate` is only advanced after a successful commit, so a
                // failed attempt is retried from the same starting aggregate.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            // The block stream is unbounded, so ending is unexpected; restart the outer loop,
            // which resumes from the last committed aggregate.
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1665
1666impl<Types, S, P> Fetcher<Types, S, P>
1667where
1668 Types: NodeType,
1669 S: VersionedDataSource,
1670 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1671{
1672 async fn store_and_notify<T>(&self, obj: &T)
1674 where
1675 T: Storable<Types>,
1676 {
1677 self.store(obj).await;
1678
1679 obj.notify(&self.notifiers).await;
1704 }
1705
1706 async fn store<T>(&self, obj: &T)
1707 where
1708 T: Storable<Types>,
1709 {
1710 let try_store = || async {
1711 let mut tx = self.storage.write().await?;
1712 obj.clone().store(&mut tx, self.leaf_only).await?;
1713 tx.commit().await
1714 };
1715
1716 let mut backoff = self.backoff.clone();
1718 backoff.reset();
1719 loop {
1720 let Err(err) = try_store().await else {
1721 break;
1722 };
1723 tracing::warn!(
1727 "failed to store fetched {} {}: {err:#}",
1728 T::name(),
1729 obj.height()
1730 );
1731
1732 let Some(delay) = backoff.next_backoff() else {
1733 break;
1734 };
1735 tracing::info!(?delay, "retrying failed operation");
1736 sleep(delay).await;
1737 }
1738 }
1739}
1740
/// Channels used to wake passive fetches when a new object becomes available.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Notifies subscribers waiting on newly available blocks.
    block: Notifier<BlockQueryData<Types>>,
    /// Notifies subscribers waiting on newly available leaves.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Notifies subscribers waiting on newly available VID common data.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1750
1751impl<Types> Default for Notifiers<Types>
1752where
1753 Types: NodeType,
1754{
1755 fn default() -> Self {
1756 Self {
1757 block: Notifier::new(),
1758 leaf: Notifier::new(),
1759 vid_common: Notifier::new(),
1760 }
1761 }
1762}
1763
1764#[derive(Clone, Copy, Debug)]
1765struct Heights {
1766 height: u64,
1767 pruned_height: Option<u64>,
1768}
1769
1770impl Heights {
1771 async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1772 where
1773 Types: NodeType,
1774 Header<Types>: QueryableHeader<Types>,
1775 T: NodeStorage<Types> + PrunedHeightStorage + Send,
1776 {
1777 let height = tx.block_height().await.context("loading block height")? as u64;
1778 let pruned_height = tx
1779 .load_pruned_height()
1780 .await
1781 .context("loading pruned height")?;
1782 Ok(Self {
1783 height,
1784 pruned_height,
1785 })
1786 }
1787
1788 fn might_exist(self, h: u64) -> bool {
1789 h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1790 }
1791}
1792
1793#[async_trait]
1794impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1795 for FetchingDataSource<Types, S, P>
1796where
1797 Types: NodeType,
1798 S: VersionedDataSource + 'static,
1799 for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1800 P: Send + Sync,
1801 State: MerklizedState<Types, ARITY> + 'static,
1802 <State as MerkleTreeScheme>::Commitment: Send,
1803{
1804 async fn get_path(
1805 &self,
1806 snapshot: Snapshot<Types, State, ARITY>,
1807 key: State::Key,
1808 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1809 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1810 message: err.to_string(),
1811 })?;
1812 tx.get_path(snapshot, key).await
1813 }
1814}
1815
1816#[async_trait]
1817impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1818where
1819 Types: NodeType,
1820 Header<Types>: QueryableHeader<Types>,
1821 Payload<Types>: QueryablePayload<Types>,
1822 S: VersionedDataSource + 'static,
1823 for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1824 P: Send + Sync,
1825{
1826 async fn get_last_state_height(&self) -> QueryResult<usize> {
1827 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1828 message: err.to_string(),
1829 })?;
1830 tx.get_last_state_height().await
1831 }
1832}
1833
1834#[async_trait]
1835impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1836where
1837 Types: NodeType,
1838 Header<Types>: QueryableHeader<Types>,
1839 S: VersionedDataSource + 'static,
1840 for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1841 P: Send + Sync,
1842{
1843 async fn block_height(&self) -> QueryResult<usize> {
1844 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1845 message: err.to_string(),
1846 })?;
1847 tx.block_height().await
1848 }
1849
1850 async fn count_transactions_in_range(
1851 &self,
1852 range: impl RangeBounds<usize> + Send,
1853 namespace: Option<NamespaceId<Types>>,
1854 ) -> QueryResult<usize> {
1855 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1856 message: err.to_string(),
1857 })?;
1858 tx.count_transactions_in_range(range, namespace).await
1859 }
1860
1861 async fn payload_size_in_range(
1862 &self,
1863 range: impl RangeBounds<usize> + Send,
1864 namespace: Option<NamespaceId<Types>>,
1865 ) -> QueryResult<usize> {
1866 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1867 message: err.to_string(),
1868 })?;
1869 tx.payload_size_in_range(range, namespace).await
1870 }
1871
1872 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1873 where
1874 ID: Into<BlockId<Types>> + Send + Sync,
1875 {
1876 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1877 message: err.to_string(),
1878 })?;
1879 tx.vid_share(id).await
1880 }
1881
1882 async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
1883 self.fetcher
1884 .sync_status()
1885 .await
1886 .map_err(|err| QueryError::Error {
1887 message: format!("{err:#}"),
1888 })
1889 }
1890
1891 async fn get_header_window(
1892 &self,
1893 start: impl Into<WindowStart<Types>> + Send + Sync,
1894 end: u64,
1895 limit: usize,
1896 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
1897 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1898 message: err.to_string(),
1899 })?;
1900 tx.get_header_window(start, end, limit).await
1901 }
1902}
1903
// Explorer queries: every method opens a short-lived read transaction and delegates to the
// underlying `ExplorerStorage`. Transaction-open failures are converted to a `QueryError` and
// then into each method's explorer-specific error type via the `?` operator.
#[async_trait]
impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
    P: Send + Sync,
{
    /// Fetch summaries for a range of blocks, as requested by the explorer UI.
    async fn get_block_summaries(
        &self,
        request: explorer::query_data::GetBlockSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::BlockSummary<Types>>,
        explorer::query_data::GetBlockSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_summaries(request).await
    }

    /// Fetch full detail for a single block.
    async fn get_block_detail(
        &self,
        request: explorer::query_data::BlockIdentifier<Types>,
    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_detail(request).await
    }

    /// Fetch summaries for a range of transactions.
    async fn get_transaction_summaries(
        &self,
        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::TransactionSummary<Types>>,
        explorer::query_data::GetTransactionSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_summaries(request).await
    }

    /// Fetch full detail for a single transaction.
    async fn get_transaction_detail(
        &self,
        request: explorer::query_data::TransactionIdentifier<Types>,
    ) -> Result<
        explorer::query_data::TransactionDetailResponse<Types>,
        explorer::query_data::GetTransactionDetailError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_detail(request).await
    }

    /// Fetch the explorer landing-page summary.
    async fn get_explorer_summary(
        &self,
    ) -> Result<
        explorer::query_data::ExplorerSummary<Types>,
        explorer::query_data::GetExplorerSummaryError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_explorer_summary().await
    }

    /// Search storage for objects matching a tagged-base64 query string.
    async fn get_search_results(
        &self,
        query: TaggedBase64,
    ) -> Result<
        explorer::query_data::SearchResult<Types>,
        explorer::query_data::GetSearchResultsError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_search_results(query).await
    }
}
1990
/// Objects which can be used to fetch missing availability data (leaves, payloads, and VID
/// common data) from an external source.
///
/// This is a convenience alias for the combination of [`Provider`] bounds the fetcher needs; it
/// is blanket-implemented below for every type satisfying those bounds.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::VidCommonRequest>
    + Sync
    + 'static
{
}
// Blanket implementation: any type with the required `Provider` impls is an
// `AvailabilityProvider`.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::VidCommonRequest>
        + Sync
        + 'static
{
}
2008
/// A request for a specific resource, as understood by the fetching subsystem.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Whether the requested object could exist, given the current chain heights.
    ///
    /// Defaults to `true`; implementations can override this when the request carries a height
    /// that can be checked against the block height and pruned height.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
2024
/// A resource which can be loaded from local storage or fetched on demand.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type used to ask for an object of this kind.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Actively fetch the requested object.
    ///
    /// Invoked by the fetch path only after `might_exist` indicates the object could exist at
    /// the current chain heights.
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait passively for the requested object to arrive via the notifiers.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load the requested object from local storage, without fetching.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2084
/// A pending fetch which resolves when (and if) the object arrives from another source;
/// resolves to `None` if the corresponding notifier is dropped before the object arrives.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2086
/// A [`Fetchable`] object which can also be loaded in contiguous height ranges.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Request type for a single object in a range; constructible from a block height.
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage, one (possibly failed) result per
    /// height.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2102
/// An object which can be persisted to storage and whose arrival can be broadcast to waiting
/// subscribers.
trait Storable<Types: NodeType>: HeightIndexed + Clone {
    /// Human-readable name of this object kind, used in log messages.
    fn name() -> &'static str;

    /// Notify subscribers that this object is now available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Persist the object to `storage`; `leaf_only` indicates whether the data source is
    /// configured to store leaves only.
    fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2118
2119impl<Types: NodeType> HeightIndexed
2120 for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2121{
2122 fn height(&self) -> u64 {
2123 self.0.height()
2124 }
2125}
2126
2127impl<Types: NodeType> Storable<Types>
2128 for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2129{
2130 fn name() -> &'static str {
2131 "leaf with QC chain"
2132 }
2133
2134 async fn notify(&self, notifiers: &Notifiers<Types>) {
2135 self.0.notify(notifiers).await;
2136 }
2137
2138 async fn store(
2139 self,
2140 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
2141 _leaf_only: bool,
2142 ) -> anyhow::Result<()> {
2143 storage.insert_leaf_with_qc_chain(self.0, self.1).await
2144 }
2145}
2146
/// Split `range` into consecutive ascending sub-ranges of at most `chunk_size` items.
///
/// An unbounded end is treated as `usize::MAX` (exclusive). A `chunk_size` of 0 yields no
/// chunks.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Normalize the bounds to a half-open [start, end) interval.
    let mut start = match range.start_bound() {
        Bound::Included(i) => *i,
        Bound::Excluded(i) => *i + 1,
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => *i + 1,
        Bound::Excluded(i) => *i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        // `saturating_add` keeps this from overflowing (panicking in debug builds) when the
        // cursor of an unbounded range approaches `usize::MAX`.
        let chunk_end = min(start.saturating_add(chunk_size), end);
        if chunk_end == start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
2174
/// Like [`range_chunks`], but yields chunks in descending order, starting from the (inclusive)
/// `end` and working back toward `start`.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the lower bound to an inclusive index.
    let lower = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // Exclusive cursor marking the end of the next chunk to yield.
    let mut cursor = end + 1;

    std::iter::from_fn(move || {
        let chunk_start = max(lower, cursor.saturating_sub(chunk_size));
        if cursor <= chunk_start {
            None
        } else {
            let chunk = chunk_start..cursor;
            cursor = chunk_start;
            Some(chunk)
        }
    })
}
2210
/// Extension trait for logging and discarding errors from local-storage loads.
trait ResultExt<T, E> {
    /// Convert to an `Option`, logging the error (at INFO level) if there is one.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2216
2217impl<T, E> ResultExt<T, E> for Result<T, E> {
2218 fn ok_or_trace(self) -> Option<T>
2219 where
2220 E: Display,
2221 {
2222 match self {
2223 Ok(t) => Some(t),
2224 Err(err) => {
2225 tracing::info!(
2226 "error loading resource from local storage, will try to fetch: {err:#}"
2227 );
2228 None
2229 },
2230 }
2231 }
2232}
2233
/// Prometheus gauges reporting progress of the proactive scanner.
#[derive(Debug)]
struct ScannerMetrics {
    /// 1 while a scan is in progress, 0 otherwise.
    running: Box<dyn Gauge>,
    /// Index of the current (or most recent) scan.
    current_scan: Box<dyn Gauge>,
    /// Start height of the block range currently being examined by the scanner.
    scanned_blocks: Box<dyn Gauge>,
    /// Start height of the VID range currently being examined by the scanner.
    scanned_vid: Box<dyn Gauge>,
    /// Number of missing blocks remaining to fetch in the current scan.
    missing_blocks: Box<dyn Gauge>,
    /// Number of missing VID entries remaining to fetch in the current scan.
    missing_vid: Box<dyn Gauge>,
}
2249
2250impl ScannerMetrics {
2251 fn new(metrics: &PrometheusMetrics) -> Self {
2252 let group = metrics.subgroup("scanner".into());
2253 Self {
2254 running: group.create_gauge("running".into(), None),
2255 current_scan: group.create_gauge("current".into(), None),
2256 scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2257 scanned_vid: group.create_gauge("scanned_vid".into(), None),
2258 missing_blocks: group.create_gauge("missing_blocks".into(), None),
2259 missing_vid: group.create_gauge("missing_vid".into(), None),
2260 }
2261 }
2262}
2263
/// Prometheus gauges reporting progress of the aggregator task.
#[derive(Debug)]
struct AggregatorMetrics {
    /// The block height up to which aggregate statistics have been computed.
    height: Box<dyn Gauge>,
}
2269
2270impl AggregatorMetrics {
2271 fn new(metrics: &PrometheusMetrics) -> Self {
2272 let group = metrics.subgroup("aggregator".into());
2273 Self {
2274 height: group.create_gauge("height".into(), None),
2275 }
2276 }
2277}
2278
2279fn passive<T>(
2283 req: impl Debug + Send + 'static,
2284 fut: impl Future<Output = Option<T>> + Send + 'static,
2285) -> Fetch<T>
2286where
2287 T: Send + 'static,
2288{
2289 Fetch::Pending(
2290 fut.then(move |opt| async move {
2291 match opt {
2292 Some(t) => t,
2293 None => {
2294 panic!("notifier dropped without satisfying request {req:?}");
2309 },
2310 }
2311 })
2312 .boxed(),
2313 )
2314}
2315
2316async fn select_some<T>(
2318 a: impl Future<Output = Option<T>> + Unpin,
2319 b: impl Future<Output = Option<T>> + Unpin,
2320) -> Option<T> {
2321 match future::select(a, b).await {
2322 Either::Left((Some(a), _)) => Some(a),
2324 Either::Right((Some(b), _)) => Some(b),
2325
2326 Either::Left((None, b)) => b.await,
2328 Either::Right((None, a)) => a.await,
2329 }
2330}
2331
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;

    use super::*;
    use crate::{
        data_source::{
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType},
        },
        fetching::provider::NoFetching,
        testing::{consensus::MockSqlDataSource, mocks::MockTypes},
    };

    /// `range_chunks` yields ascending, contiguous sub-ranges of at most `chunk_size` items.
    #[test]
    fn test_range_chunks() {
        // Inclusive range whose length is not a multiple of the chunk size: short last chunk.
        assert_eq!(
            range_chunks(0..=4, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        // Inclusive range whose length is an exact multiple of the chunk size.
        assert_eq!(
            range_chunks(0..=5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Same two cases with exclusive ranges.
        assert_eq!(
            range_chunks(0..5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        assert_eq!(
            range_chunks(0..6, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Unbounded range: chunks keep coming for as long as the caller consumes them.
        assert_eq!(
            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6, 6..8, 8..10]
        );
    }

    /// `range_chunks_rev` yields descending chunks from the inclusive `end` back to `start`.
    #[test]
    fn test_range_chunks_rev() {
        // Odd-length range: the chunk touching the lower bound is short.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        // Even-length range: all chunks full-size.
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 0..2]
        );

        // Excluded lower bound shifts the effective start up by one.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 1..2]
        );

        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3]
        );
    }

    /// Populate storage with leaves covering `present_ranges` (pairs of `[start, end)` heights)
    /// and check that `sync_status` reports the expected pruned/present/missing ranges when the
    /// storage scan uses the given `chunk_size`.
    async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
        // The end of the last present range determines the overall block height.
        let block_height = present_ranges.last().unwrap().1;
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let ds = MockSqlDataSource::builder(db, NoFetching)
            .with_sync_status_chunk_size(chunk_size)
            .build()
            .await
            .unwrap();

        // Build a chain of mock leaves: genesis, then clones with increasing block numbers.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..block_height {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        {
            let mut tx = ds.write().await.unwrap();

            // Insert only the leaves inside the present ranges.
            for &(start, end) in present_ranges {
                for leaf in leaves[start..end].iter() {
                    tracing::info!(height = leaf.height(), "insert leaf");
                    tx.insert_leaf(leaf.clone()).await.unwrap();
                }
            }

            // Treat everything below the first present range as pruned.
            if present_ranges[0].0 > 0 {
                tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
                    .await
                    .unwrap();
            }

            tx.commit().await.unwrap();
        }

        let sync_status = ds.sync_status().await.unwrap().leaves;

        // Missing = total height minus present leaves minus the pruned prefix.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        assert_eq!(
            sync_status.missing,
            block_height - present - present_ranges[0].0
        );

        // Walk the reported ranges: before each present range there may be a gap, which must be
        // reported as pruned (if it starts at height 0) or missing; then the present range
        // itself must follow.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = 0;
        for &(start, end) in present_ranges {
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: if prev == 0 {
                            SyncStatus::Pruned
                        } else {
                            SyncStatus::Missing
                        },
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // Any trailing gap up to the block height must be reported as missing.
        if prev != block_height {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end: block_height,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks() {
        test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
        test_sync_status(5, &[(1, 4)]).await;
    }

    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
        test_sync_status(5, &[(0, 1), (4, 5)]).await;
    }
}