1use std::{
77 cmp::{max, min},
78 fmt::{Debug, Display},
79 iter::repeat_with,
80 marker::PhantomData,
81 ops::{Bound, Range, RangeBounds},
82 sync::Arc,
83 time::Duration,
84};
85
86use anyhow::{bail, Context};
87use async_lock::Semaphore;
88use async_trait::async_trait;
89use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
90use derivative::Derivative;
91use futures::{
92 channel::oneshot,
93 future::{self, join_all, BoxFuture, Either, Future, FutureExt},
94 stream::{self, BoxStream, StreamExt},
95};
96use hotshot_types::{
97 data::VidShare,
98 traits::{
99 metrics::{Gauge, Metrics},
100 node_implementation::NodeType,
101 },
102};
103use jf_merkle_tree_compat::{prelude::MerkleProof, MerkleTreeScheme};
104use tagged_base64::TaggedBase64;
105use tokio::{spawn, time::sleep};
106use tracing::Instrument;
107
108use super::{
109 notifier::Notifier,
110 storage::{
111 pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
112 sql::MigrateTypes,
113 Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
114 MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
115 UpdateAvailabilityStorage,
116 },
117 Transaction, VersionedDataSource,
118};
119use crate::{
120 availability::{
121 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
122 FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
123 PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
124 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
125 },
126 explorer::{self, ExplorerDataSource},
127 fetching::{self, request, Provider},
128 merklized_state::{
129 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
130 },
131 metrics::PrometheusMetrics,
132 node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
133 status::{HasMetrics, StatusDataSource},
134 task::BackgroundTask,
135 types::HeightIndexed,
136 Header, Payload, QueryError, QueryResult,
137};
138
139mod block;
140mod header;
141mod leaf;
142mod transaction;
143mod vid;
144
145use self::{
146 block::PayloadFetcher,
147 leaf::LeafFetcher,
148 transaction::TransactionRequest,
149 vid::{VidCommonFetcher, VidCommonRequest},
150};
151
/// Configuration collected before constructing a [`FetchingDataSource`].
///
/// Every field has a default assigned by [`Builder::new`]; the `with_*` / `disable_*` methods
/// override individual settings.
pub struct Builder<Types, S, P> {
    /// Local storage backend; wrapped in an `Arc` when the fetcher is constructed.
    storage: S,
    /// Remote provider; wrapped in an `Arc` when the fetcher is constructed.
    provider: P,
    /// Backoff policy builder; built once and cloned into each retry task and sub-fetcher.
    backoff: ExponentialBackoffBuilder,
    /// Number of permits on the semaphore shared by retry tasks and the sub-fetchers.
    rate_limit: usize,
    /// Number of objects loaded from storage per chunk when streaming a range.
    range_chunk_size: usize,
    /// Sleep between consecutive proactive scans; also the scan's initial error backoff.
    minor_scan_interval: Duration,
    /// A scan is "major" when `scan_index % major_scan_interval` equals
    /// `major_scan_offset % major_scan_interval`.
    major_scan_interval: usize,
    /// Offset, in scans, of the first major scan (see `major_scan_interval`).
    major_scan_offset: usize,
    /// Chunk size used by the proactive scanner; falls back to `range_chunk_size` when `None`.
    proactive_range_chunk_size: Option<usize>,
    /// Delay inserted after each still-pending fetch yielded by a range stream.
    active_fetch_delay: Duration,
    /// Delay inserted after each chunk is loaded by a range stream.
    chunk_fetch_delay: Duration,
    /// Whether the proactive scanner task is spawned.
    proactive_fetching: bool,
    /// Whether the aggregator task is spawned.
    aggregator: bool,
    /// Chunk size used by the aggregator; falls back to `range_chunk_size` when `None`.
    aggregator_chunk_size: Option<usize>,
    /// Batch size passed to `migrate_types` on the storage at startup.
    types_migration_batch_size: u64,
    /// When set, no payload/VID fetchers are created and neither the scanner nor the
    /// aggregator is spawned.
    leaf_only: bool,
    /// Ties the builder to the node type without storing a value of it.
    _types: PhantomData<Types>,
}
172
173impl<Types, S, P> Builder<Types, S, P> {
174 pub fn new(storage: S, provider: P) -> Self {
176 let mut default_backoff = ExponentialBackoffBuilder::default();
177 default_backoff
178 .with_initial_interval(Duration::from_secs(1))
179 .with_multiplier(2.)
180 .with_max_interval(Duration::from_secs(32))
181 .with_max_elapsed_time(Some(Duration::from_secs(64)));
182
183 Self {
184 storage,
185 provider,
186 backoff: default_backoff,
187 rate_limit: 32,
188 range_chunk_size: 25,
189 minor_scan_interval: Duration::from_secs(60),
193 major_scan_interval: 60,
197 major_scan_offset: 0,
200 proactive_range_chunk_size: None,
201 active_fetch_delay: Duration::from_millis(50),
202 chunk_fetch_delay: Duration::from_millis(100),
203 proactive_fetching: true,
204 aggregator: true,
205 aggregator_chunk_size: None,
206 types_migration_batch_size: 10000,
207 leaf_only: false,
208 _types: Default::default(),
209 }
210 }
211
212 pub fn leaf_only(mut self) -> Self {
213 self.leaf_only = true;
214 self
215 }
216
217 pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
219 self.backoff.with_initial_interval(interval);
220 self
221 }
222
223 pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
225 self.backoff.with_max_interval(interval);
226 self
227 }
228
229 pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
231 self.backoff.with_multiplier(multiplier);
232 self
233 }
234
235 pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
237 self.backoff.with_randomization_factor(factor);
238 self
239 }
240
241 pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
243 self.backoff.with_max_elapsed_time(Some(timeout));
244 self
245 }
246
247 pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
249 self.rate_limit = with_rate_limit;
250 self
251 }
252
253 pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
260 self.range_chunk_size = range_chunk_size;
261 self
262 }
263
264 pub fn with_minor_scan_interval(mut self, interval: Duration) -> Self {
268 self.minor_scan_interval = interval;
269 self
270 }
271
272 pub fn with_major_scan_interval(mut self, interval: usize) -> Self {
277 self.major_scan_interval = interval;
278 self
279 }
280
281 pub fn with_major_scan_offset(mut self, offset: usize) -> Self {
291 self.major_scan_offset = offset;
292 self
293 }
294
295 pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
304 self.proactive_range_chunk_size = Some(range_chunk_size);
305 self
306 }
307
308 pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
314 self.active_fetch_delay = active_fetch_delay;
315 self
316 }
317
318 pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
330 self.chunk_fetch_delay = chunk_fetch_delay;
331 self
332 }
333
334 pub fn disable_proactive_fetching(mut self) -> Self {
343 self.proactive_fetching = false;
344 self
345 }
346
347 pub fn disable_aggregator(mut self) -> Self {
352 self.aggregator = false;
353 self
354 }
355
356 pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
365 self.aggregator_chunk_size = Some(chunk_size);
366 self
367 }
368
369 pub fn with_types_migration_batch_size(mut self, batch: u64) -> Self {
373 self.types_migration_batch_size = batch;
374 self
375 }
376
377 pub fn is_leaf_only(&self) -> bool {
378 self.leaf_only
379 }
380}
381
382impl<Types, S, P> Builder<Types, S, P>
383where
384 Types: NodeType,
385 Payload<Types>: QueryablePayload<Types>,
386 Header<Types>: QueryableHeader<Types>,
387 S: PruneStorage + VersionedDataSource + HasMetrics + MigrateTypes<Types> + 'static,
388 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
389 + PrunedHeightStorage
390 + NodeStorage<Types>
391 + AggregatesStorage<Types>,
392 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
393 P: AvailabilityProvider<Types>,
394{
395 pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
397 FetchingDataSource::new(self).await
398 }
399}
400
/// A data source that serves objects from local storage `S` and delegates to a [`Fetcher`]
/// (backed by provider `P`) for objects that are missing locally.
///
/// Cloning is cheap with respect to the fetcher: it is held behind an `Arc`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    /// Shared fetcher holding the storage, provider, and retry configuration.
    fetcher: Arc<Fetcher<Types, S, P>>,
    /// Handle to the proactive scanner task; `None` when scanning is disabled or in
    /// leaf-only mode.
    scanner: Option<BackgroundTask>,
    /// Handle to the aggregator task; `None` when aggregation is disabled or in leaf-only mode.
    aggregator: Option<BackgroundTask>,
    /// Handle wrapper for the pruner task.
    pruner: Pruner<Types, S>,
}
434
/// Owner of the background pruning task for storage `S`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    /// Handle to the pruning task; `None` when the storage reports no pruning configuration.
    handle: Option<BackgroundTask>,
    /// Ties the pruner to its node and storage types without storing values of them.
    _types: PhantomData<(Types, S)>,
}
444
445impl<Types, S> Pruner<Types, S>
446where
447 Types: NodeType,
448 Header<Types>: QueryableHeader<Types>,
449 Payload<Types>: QueryablePayload<Types>,
450 S: PruneStorage + Send + Sync + 'static,
451{
452 async fn new(storage: Arc<S>) -> Self {
453 let cfg = storage.get_pruning_config();
454 let Some(cfg) = cfg else {
455 return Self {
456 handle: None,
457 _types: Default::default(),
458 };
459 };
460
461 let future = async move {
462 for i in 1.. {
463 tracing::warn!("starting pruner run {i} ");
464 Self::prune(storage.clone()).await;
465 sleep(cfg.interval()).await;
466 }
467 };
468
469 let task = BackgroundTask::spawn("pruner", future);
470
471 Self {
472 handle: Some(task),
473 _types: Default::default(),
474 }
475 }
476
477 async fn prune(storage: Arc<S>) {
478 let mut pruner = S::Pruner::default();
480 loop {
481 match storage.prune(&mut pruner).await {
482 Ok(Some(height)) => {
483 tracing::warn!("Pruned to height {height}");
484 },
485 Ok(None) => {
486 tracing::warn!("pruner run complete.");
487 break;
488 },
489 Err(e) => {
490 tracing::error!("pruner run failed: {e:?}");
491 break;
492 },
493 }
494 }
495 }
496}
497
498impl<Types, S, P> FetchingDataSource<Types, S, P>
499where
500 Types: NodeType,
501 Payload<Types>: QueryablePayload<Types>,
502 Header<Types>: QueryableHeader<Types>,
503 S: VersionedDataSource + PruneStorage + HasMetrics + MigrateTypes<Types> + 'static,
504 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
505 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
506 + NodeStorage<Types>
507 + PrunedHeightStorage
508 + AggregatesStorage<Types>,
509 P: AvailabilityProvider<Types>,
510{
511 pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
513 Builder::new(storage, provider)
514 }
515
516 async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
517 let leaf_only = builder.is_leaf_only();
518 let aggregator = builder.aggregator;
519 let aggregator_chunk_size = builder
520 .aggregator_chunk_size
521 .unwrap_or(builder.range_chunk_size);
522 let proactive_fetching = builder.proactive_fetching;
523 let minor_interval = builder.minor_scan_interval;
524 let major_interval = builder.major_scan_interval;
525 let major_offset = builder.major_scan_offset;
526 let proactive_range_chunk_size = builder
527 .proactive_range_chunk_size
528 .unwrap_or(builder.range_chunk_size);
529 let migration_batch_size = builder.types_migration_batch_size;
530 let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
531 let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
532
533 let fetcher = Arc::new(Fetcher::new(builder).await?);
534
535 fetcher.storage.migrate_types(migration_batch_size).await?;
540
541 let scanner = if proactive_fetching && !leaf_only {
542 Some(BackgroundTask::spawn(
543 "proactive scanner",
544 fetcher.clone().proactive_scan(
545 minor_interval,
546 major_interval,
547 major_offset,
548 proactive_range_chunk_size,
549 scanner_metrics,
550 ),
551 ))
552 } else {
553 None
554 };
555
556 let aggregator = if aggregator && !leaf_only {
557 Some(BackgroundTask::spawn(
558 "aggregator",
559 fetcher
560 .clone()
561 .aggregate(aggregator_chunk_size, aggregator_metrics),
562 ))
563 } else {
564 None
565 };
566
567 let storage = fetcher.storage.clone();
568
569 let pruner = Pruner::new(storage).await;
570 let ds = Self {
571 fetcher,
572 scanner,
573 pruner,
574 aggregator,
575 };
576
577 Ok(ds)
578 }
579
580 pub fn inner(&self) -> Arc<S> {
582 self.fetcher.storage.clone()
583 }
584}
585
586impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
587where
588 Types: NodeType,
589{
590 fn as_ref(&self) -> &S {
591 &self.fetcher.storage
592 }
593}
594
595impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
596where
597 Types: NodeType,
598 S: HasMetrics,
599{
600 fn metrics(&self) -> &PrometheusMetrics {
601 self.as_ref().metrics()
602 }
603}
604
605#[async_trait]
606impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
607where
608 Types: NodeType,
609 Header<Types>: QueryableHeader<Types>,
610 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
611 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
612 P: Send + Sync,
613{
614 async fn block_height(&self) -> QueryResult<usize> {
615 let mut tx = self.read().await.map_err(|err| QueryError::Error {
616 message: err.to_string(),
617 })?;
618 tx.block_height().await
619 }
620}
621
622#[async_trait]
623impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
624where
625 Types: NodeType,
626 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
627 for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
628 P: Send + Sync,
629{
630 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
631 let mut tx = self.read().await?;
632 tx.load_pruned_height().await
633 }
634}
635
636#[async_trait]
637impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
638where
639 Types: NodeType,
640 Header<Types>: QueryableHeader<Types>,
641 Payload<Types>: QueryablePayload<Types>,
642 S: VersionedDataSource + 'static,
643 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
644 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
645 P: AvailabilityProvider<Types>,
646{
647 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
648 where
649 ID: Into<LeafId<Types>> + Send + Sync,
650 {
651 self.fetcher.get(id.into()).await
652 }
653
654 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
655 where
656 ID: Into<BlockId<Types>> + Send + Sync,
657 {
658 self.fetcher
659 .get::<HeaderQueryData<_>>(id.into())
660 .await
661 .map(|h| h.header)
662 }
663
664 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
665 where
666 ID: Into<BlockId<Types>> + Send + Sync,
667 {
668 self.fetcher.get(id.into()).await
669 }
670
671 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
672 where
673 ID: Into<BlockId<Types>> + Send + Sync,
674 {
675 self.fetcher.get(id.into()).await
676 }
677
678 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
679 where
680 ID: Into<BlockId<Types>> + Send + Sync,
681 {
682 self.fetcher.get(id.into()).await
683 }
684
685 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
686 where
687 ID: Into<BlockId<Types>> + Send + Sync,
688 {
689 self.fetcher.get(VidCommonRequest::from(id.into())).await
690 }
691
692 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
693 where
694 ID: Into<BlockId<Types>> + Send + Sync,
695 {
696 self.fetcher.get(VidCommonRequest::from(id.into())).await
697 }
698
699 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
700 where
701 R: RangeBounds<usize> + Send + 'static,
702 {
703 self.fetcher.clone().get_range(range)
704 }
705
706 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
707 where
708 R: RangeBounds<usize> + Send + 'static,
709 {
710 self.fetcher.clone().get_range(range)
711 }
712
713 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
714 where
715 R: RangeBounds<usize> + Send + 'static,
716 {
717 let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);
718
719 leaves
720 .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
721 .boxed()
722 }
723
724 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
725 where
726 R: RangeBounds<usize> + Send + 'static,
727 {
728 self.fetcher.clone().get_range(range)
729 }
730
731 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
732 where
733 R: RangeBounds<usize> + Send + 'static,
734 {
735 self.fetcher.clone().get_range(range)
736 }
737
738 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
739 where
740 R: RangeBounds<usize> + Send + 'static,
741 {
742 self.fetcher.clone().get_range(range)
743 }
744
745 async fn get_vid_common_metadata_range<R>(
746 &self,
747 range: R,
748 ) -> FetchStream<VidCommonMetadata<Types>>
749 where
750 R: RangeBounds<usize> + Send + 'static,
751 {
752 self.fetcher.clone().get_range(range)
753 }
754
755 async fn get_leaf_range_rev(
756 &self,
757 start: Bound<usize>,
758 end: usize,
759 ) -> FetchStream<LeafQueryData<Types>> {
760 self.fetcher.clone().get_range_rev(start, end)
761 }
762
763 async fn get_block_range_rev(
764 &self,
765 start: Bound<usize>,
766 end: usize,
767 ) -> FetchStream<BlockQueryData<Types>> {
768 self.fetcher.clone().get_range_rev(start, end)
769 }
770
771 async fn get_payload_range_rev(
772 &self,
773 start: Bound<usize>,
774 end: usize,
775 ) -> FetchStream<PayloadQueryData<Types>> {
776 self.fetcher.clone().get_range_rev(start, end)
777 }
778
779 async fn get_payload_metadata_range_rev(
780 &self,
781 start: Bound<usize>,
782 end: usize,
783 ) -> FetchStream<PayloadMetadata<Types>> {
784 self.fetcher.clone().get_range_rev(start, end)
785 }
786
787 async fn get_vid_common_range_rev(
788 &self,
789 start: Bound<usize>,
790 end: usize,
791 ) -> FetchStream<VidCommonQueryData<Types>> {
792 self.fetcher.clone().get_range_rev(start, end)
793 }
794
795 async fn get_vid_common_metadata_range_rev(
796 &self,
797 start: Bound<usize>,
798 end: usize,
799 ) -> FetchStream<VidCommonMetadata<Types>> {
800 self.fetcher.clone().get_range_rev(start, end)
801 }
802
803 async fn get_block_containing_transaction(
804 &self,
805 h: TransactionHash<Types>,
806 ) -> Fetch<BlockWithTransaction<Types>> {
807 self.fetcher.clone().get(TransactionRequest::from(h)).await
808 }
809}
810
811impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
812where
813 Types: NodeType,
814 Header<Types>: QueryableHeader<Types>,
815 Payload<Types>: QueryablePayload<Types>,
816 S: VersionedDataSource + 'static,
817 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
818 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
819 P: AvailabilityProvider<Types>,
820{
821 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
822 let height = info.height() as usize;
823 let fetch_block = info.block.is_none();
824 let fetch_vid = info.vid_common.is_none();
825
826 leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);
828
829 self.fetcher.store_and_notify(info).await;
830
831 if fetch_block || fetch_vid {
832 let fetcher = self.fetcher.clone();
835 let span = tracing::info_span!("fetch missing data", height);
836 spawn(
837 async move {
838 tracing::info!(fetch_block, fetch_vid, "fetching missing data");
839 if fetch_block {
840 fetcher.get::<PayloadMetadata<Types>>(height).await;
841 }
842 if fetch_vid {
843 fetcher.get::<VidCommonMetadata<Types>>(height).await;
844 }
845 }
846 .instrument(span),
847 );
848 }
849 Ok(())
850 }
851}
852
853impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
854where
855 Types: NodeType,
856 S: VersionedDataSource + Send + Sync,
857 P: Send + Sync,
858{
859 type Transaction<'a>
860 = S::Transaction<'a>
861 where
862 Self: 'a;
863 type ReadOnly<'a>
864 = S::ReadOnly<'a>
865 where
866 Self: 'a;
867
868 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
869 self.fetcher.write().await
870 }
871
872 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
873 self.fetcher.read().await
874 }
875}
876
/// Shared state used to load objects from storage and fetch the ones that are missing.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    /// Local storage backend.
    storage: Arc<S>,
    /// Notifiers used to subscribe passively to objects as they arrive.
    notifiers: Notifiers<Types>,
    /// Remote provider.
    provider: Arc<P>,
    /// Sub-fetcher for leaves; always present.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    /// Sub-fetcher for payloads; `None` in leaf-only mode.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    /// Sub-fetcher for VID common data; `None` in leaf-only mode.
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    /// Default number of objects loaded per chunk when streaming a range.
    range_chunk_size: usize,
    /// Delay inserted after each still-pending fetch yielded by a range stream.
    active_fetch_delay: Duration,
    /// Delay inserted after each chunk is loaded by a range stream.
    chunk_fetch_delay: Duration,
    /// Backoff policy; cloned into each retry task.
    backoff: ExponentialBackoff,
    /// Semaphore acquired around each retry attempt; also shared with the sub-fetchers.
    retry_semaphore: Arc<Semaphore>,
    /// Whether the fetcher was built in leaf-only mode.
    leaf_only: bool,
}
901
902impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
903where
904 Types: NodeType,
905 S: VersionedDataSource + Send + Sync,
906 P: Send + Sync,
907{
908 type Transaction<'a>
909 = S::Transaction<'a>
910 where
911 Self: 'a;
912 type ReadOnly<'a>
913 = S::ReadOnly<'a>
914 where
915 Self: 'a;
916
917 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
918 self.storage.write().await
919 }
920
921 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
922 self.storage.read().await
923 }
924}
925
926impl<Types, S, P> Fetcher<Types, S, P>
927where
928 Types: NodeType,
929 Header<Types>: QueryableHeader<Types>,
930 S: VersionedDataSource + Sync,
931 for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
932{
933 pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
934 let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
935 let backoff = builder.backoff.build();
936
937 let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
938 (None, None)
939 } else {
940 (
941 Some(Arc::new(fetching::Fetcher::new(
942 retry_semaphore.clone(),
943 backoff.clone(),
944 ))),
945 Some(Arc::new(fetching::Fetcher::new(
946 retry_semaphore.clone(),
947 backoff.clone(),
948 ))),
949 )
950 };
951 let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
952
953 let leaf_only = builder.leaf_only;
954
955 Ok(Self {
956 storage: Arc::new(builder.storage),
957 notifiers: Default::default(),
958 provider: Arc::new(builder.provider),
959 leaf_fetcher: Arc::new(leaf_fetcher),
960 payload_fetcher,
961 vid_common_fetcher: vid_fetcher,
962 range_chunk_size: builder.range_chunk_size,
963 active_fetch_delay: builder.active_fetch_delay,
964 chunk_fetch_delay: builder.chunk_fetch_delay,
965 backoff,
966 retry_semaphore,
967 leaf_only,
968 })
969 }
970}
971
972impl<Types, S, P> Fetcher<Types, S, P>
973where
974 Types: NodeType,
975 Header<Types>: QueryableHeader<Types>,
976 Payload<Types>: QueryablePayload<Types>,
977 S: VersionedDataSource + 'static,
978 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
979 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
980 P: AvailabilityProvider<Types>,
981{
    /// Get the object described by `req`, fetching it if it is not in local storage.
    ///
    /// Returns `Fetch::Ready` when the object is loaded from storage on the first attempt.
    /// Otherwise returns a pending fetch built from a passive subscription on the notifiers;
    /// if the first storage access fails with an error, a detached task keeps retrying and
    /// its result is combined with the passive subscription.
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe before looking in storage.
        // NOTE(review): presumably so an object that arrives between the storage check and
        // the subscription is not missed — confirm against the notifier semantics.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            // Not in storage; `try_get` has already triggered an active fetch where possible.
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // Channel over which the retry task hands back an object it finds in storage.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Hold a permit only for the duration of the attempt, not the sleep.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            tracing::info!(?req, "object was ready after retries");
                            // The receiver may already be gone; that is not an error.
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // Dropping `send` here resolves `recv` with an error, which is
                            // mapped to `None` below.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Once the backoff is exhausted, keep retrying at the last delay.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Combine the passive subscription with the retry task's result.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
1064
1065 async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1076 where
1077 T: Fetchable<Types>,
1078 {
1079 let mut tx = self.read().await.context("opening read transaction")?;
1080 match T::load(&mut tx, req).await {
1081 Ok(t) => Ok(Some(t)),
1082 Err(QueryError::Missing | QueryError::NotFound) => {
1083 tracing::debug!(?req, "object missing from local storage, will try to fetch");
1086 self.fetch::<T>(&mut tx, req).await?;
1087 Ok(None)
1088 },
1089 Err(err) => {
1090 bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1093 },
1094 }
1095 }
1096
1097 fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1109 where
1110 R: RangeBounds<usize> + Send + 'static,
1111 T: RangedFetchable<Types>,
1112 {
1113 let chunk_size = self.range_chunk_size;
1114 self.get_range_with_chunk_size(chunk_size, range)
1115 }
1116
1117 fn get_range_with_chunk_size<R, T>(
1119 self: Arc<Self>,
1120 chunk_size: usize,
1121 range: R,
1122 ) -> BoxStream<'static, Fetch<T>>
1123 where
1124 R: RangeBounds<usize> + Send + 'static,
1125 T: RangedFetchable<Types>,
1126 {
1127 let chunk_fetch_delay = self.chunk_fetch_delay;
1128 let active_fetch_delay = self.active_fetch_delay;
1129
1130 stream::iter(range_chunks(range, chunk_size))
1131 .then(move |chunk| {
1132 let self_clone = self.clone();
1133 async move {
1134 {
1135 let chunk = self_clone.get_chunk(chunk).await;
1136
1137 sleep(chunk_fetch_delay).await;
1142 stream::iter(chunk)
1143 }
1144 }
1145 })
1146 .flatten()
1147 .then(move |f| async move {
1148 match f {
1149 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1153 Fetch::Ready(_) => (),
1154 };
1155 f
1156 })
1157 .boxed()
1158 }
1159
1160 fn get_range_rev<T>(
1167 self: Arc<Self>,
1168 start: Bound<usize>,
1169 end: usize,
1170 ) -> BoxStream<'static, Fetch<T>>
1171 where
1172 T: RangedFetchable<Types>,
1173 {
1174 let chunk_size = self.range_chunk_size;
1175 self.get_range_with_chunk_size_rev(chunk_size, start, end)
1176 }
1177
1178 fn get_range_with_chunk_size_rev<T>(
1180 self: Arc<Self>,
1181 chunk_size: usize,
1182 start: Bound<usize>,
1183 end: usize,
1184 ) -> BoxStream<'static, Fetch<T>>
1185 where
1186 T: RangedFetchable<Types>,
1187 {
1188 let chunk_fetch_delay = self.chunk_fetch_delay;
1189 let active_fetch_delay = self.active_fetch_delay;
1190
1191 stream::iter(range_chunks_rev(start, end, chunk_size))
1192 .then(move |chunk| {
1193 let self_clone = self.clone();
1194 async move {
1195 {
1196 let chunk = self_clone.get_chunk(chunk).await;
1197
1198 sleep(chunk_fetch_delay).await;
1203 stream::iter(chunk.into_iter().rev())
1204 }
1205 }
1206 })
1207 .flatten()
1208 .then(move |f| async move {
1209 match f {
1210 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1214 Fetch::Ready(_) => (),
1215 };
1216 f
1217 })
1218 .boxed()
1219 }
1220
    /// Get every object in `chunk`, one `Fetch` per height, in ascending order.
    ///
    /// Objects found in storage are returned as `Fetch::Ready`; the rest are pending fetches
    /// built from passive subscriptions. If the storage access itself fails, a detached task
    /// keeps retrying the whole chunk and feeds each object it finds to the matching fetch.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to every height in the chunk before looking in storage.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // `objs[i]` corresponds to height `chunk.start + i`.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // One channel per height, so the retry task can deliver each object separately.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Hold a permit only for the duration of the attempt.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        // The receiver may already be gone; not an error.
                                        sender.send(obj).ok();
                                    } else {
                                        // The sender is dropped unused, so the matching
                                        // receiver resolves to `None` below.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                // Once the backoff is exhausted, keep the last delay.
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Combine each passive subscription with the retry task's result for that height.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
1335
    /// Make one attempt to load every object in `chunk` from local storage.
    ///
    /// On success the result holds one entry per height that was processed: `Some` for
    /// objects found in storage, `None` for heights where an active fetch was triggered
    /// instead.
    ///
    /// # Errors
    ///
    /// Fails if the read transaction cannot be opened, the range load fails, or triggering a
    /// fetch fails.
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Per-item errors are dropped here; those heights are treated as missing below.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        let mut results = Vec::with_capacity(chunk.len());
        // NOTE(review): the gap-filling below assumes `load_range` yields items in ascending
        // height order — confirm against the storage implementation.
        for t in ts {
            // Fill the gap between the last height handled and this item's height.
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Anything after the last loaded item is missing as well.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }
1393
1394 async fn fetch<T>(
1400 self: &Arc<Self>,
1401 tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1402 req: T::Request,
1403 ) -> anyhow::Result<()>
1404 where
1405 T: Fetchable<Types>,
1406 {
1407 tracing::debug!("fetching resource {req:?}");
1408
1409 let heights = Heights::load(tx)
1411 .await
1412 .context("failed to load heights; cannot definitively say object might exist")?;
1413 if req.might_exist(heights) {
1414 T::active_fetch(tx, self.clone(), req).await?;
1415 } else {
1416 tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1417 }
1418 Ok(())
1419 }
1420
1421 async fn proactive_scan(
1427 self: Arc<Self>,
1428 minor_interval: Duration,
1429 major_interval: usize,
1430 major_offset: usize,
1431 chunk_size: usize,
1432 metrics: ScannerMetrics,
1433 ) {
1434 let mut prev_height = 0;
1435
1436 for i in 0.. {
1437 let major = i % major_interval == major_offset % major_interval;
1438 let span = tracing::warn_span!("proactive scan", i, major, prev_height);
1439 metrics.running.set(1);
1440 metrics.current_scan.set(i);
1441 metrics.current_is_major.set(major as usize);
1442 async {
1443 let mut backoff = minor_interval;
1444 let max_backoff = Duration::from_secs(60);
1445 metrics.backoff.set(backoff.as_secs() as usize);
1446
1447 let heights = loop {
1450 let mut tx = match self.read().await {
1451 Ok(tx) => tx,
1452 Err(err) => {
1453 tracing::error!(
1454 ?backoff,
1455 "unable to start transaction for scan: {err:#}"
1456 );
1457 metrics.retries.update(1);
1458 sleep(backoff).await;
1459 backoff = min(2 * backoff, max_backoff);
1460 metrics.backoff.set(backoff.as_secs() as usize);
1461 continue;
1462 },
1463 };
1464 let heights = match Heights::load(&mut tx).await {
1465 Ok(heights) => heights,
1466 Err(err) => {
1467 tracing::error!(?backoff, "unable to load heights: {err:#}");
1468 metrics.retries.update(1);
1469 sleep(backoff).await;
1470 backoff = min(2 * backoff, max_backoff);
1471 metrics.backoff.set(backoff.as_secs() as usize);
1472 continue;
1473 },
1474 };
1475 metrics.retries.set(0);
1476 break heights;
1477 };
1478
1479 let minimum_block_height = heights.pruned_height.unwrap_or(0) as usize;
1482 let block_height = heights.height as usize;
1484
1485 let start = if major {
1488 tracing::warn!(
1492 start = minimum_block_height,
1493 block_height,
1494 "starting major scan"
1495 );
1496
1497 metrics.major_missing_blocks.set(0);
1500 metrics.major_missing_vid.set(0);
1501
1502 minimum_block_height
1503 } else {
1504 tracing::info!(start = prev_height, block_height, "starting minor scan");
1505 prev_height
1506 };
1507 prev_height = block_height;
1508 metrics.current_start.set(start);
1509 metrics.current_end.set(block_height);
1510 metrics.scanned_blocks.set(0);
1511 metrics.scanned_vid.set(0);
1512
1513 let mut blocks = self
1525 .clone()
1526 .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
1527 chunk_size,
1528 Bound::Included(start),
1529 block_height.saturating_sub(1),
1530 );
1531 let mut missing_blocks = 0;
1532 while let Some(fetch) = blocks.next().await {
1533 if fetch.is_pending() {
1534 missing_blocks += 1;
1535 fetch.await;
1541 }
1542 metrics.scanned_blocks.update(1);
1543 }
1544 metrics.add_missing_blocks(major, missing_blocks);
1545
1546 let mut vid = self
1549 .clone()
1550 .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
1551 chunk_size,
1552 Bound::Included(start),
1553 block_height.saturating_sub(1),
1554 );
1555 let mut missing_vid = 0;
1556 while let Some(fetch) = vid.next().await {
1557 if fetch.is_pending() {
1558 missing_vid += 1;
1559 fetch.await;
1562 }
1563 metrics.scanned_vid.update(1);
1564 }
1565 metrics.add_missing_vid(major, missing_vid);
1566
1567 tracing::info!("completed proactive scan, will scan again in {minor_interval:?}");
1568
1569 metrics.running.set(0);
1571 if major {
1572 metrics.minor_missing_blocks.set(0);
1576 metrics.minor_missing_vid.set(0);
1577 }
1578
1579 sleep(minor_interval).await;
1580 }
1581 .instrument(span)
1582 .await;
1583 }
1584 }
1585}
1586
1587impl<Types, S, P> Fetcher<Types, S, P>
1588where
1589 Types: NodeType,
1590 Header<Types>: QueryableHeader<Types>,
1591 Payload<Types>: QueryablePayload<Types>,
1592 S: VersionedDataSource + 'static,
1593 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
1594 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
1595 + NodeStorage<Types>
1596 + PrunedHeightStorage
1597 + AggregatesStorage<Types>,
1598 P: AvailabilityProvider<Types>,
1599{
1600 #[tracing::instrument(skip_all)]
1601 async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
1602 loop {
1603 let prev_aggregate = loop {
1604 let mut tx = match self.read().await {
1605 Ok(tx) => tx,
1606 Err(err) => {
1607 tracing::error!("unable to open read tx: {err:#}");
1608 sleep(Duration::from_secs(5)).await;
1609 continue;
1610 },
1611 };
1612 match tx.load_prev_aggregate().await {
1613 Ok(agg) => break agg,
1614 Err(err) => {
1615 tracing::error!("unable to load previous aggregate: {err:#}");
1616 sleep(Duration::from_secs(5)).await;
1617 continue;
1618 },
1619 }
1620 };
1621
1622 let (start, mut prev_aggregate) = match prev_aggregate {
1623 Some(aggregate) => (aggregate.height as usize + 1, aggregate),
1624 None => (0, Aggregate::default()),
1625 };
1626
1627 tracing::info!(start, "starting aggregator");
1628 metrics.height.set(start);
1629
1630 let mut blocks = self
1631 .clone()
1632 .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
1633 .then(Fetch::resolve)
1634 .ready_chunks(chunk_size)
1635 .boxed();
1636 while let Some(chunk) = blocks.next().await {
1637 let Some(last) = chunk.last() else {
1638 tracing::warn!("ready_chunks returned an empty chunk");
1640 continue;
1641 };
1642 let height = last.height();
1643 let num_blocks = chunk.len();
1644 tracing::debug!(
1645 num_blocks,
1646 height,
1647 "updating aggregate statistics for chunk"
1648 );
1649 loop {
1650 let res = async {
1651 let mut tx = self.write().await.context("opening transaction")?;
1652 let aggregate =
1653 tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
1654 tx.commit().await.context("committing transaction")?;
1655 prev_aggregate = aggregate;
1656 anyhow::Result::<_>::Ok(())
1657 }
1658 .await;
1659 match res {
1660 Ok(()) => {
1661 break;
1662 },
1663 Err(err) => {
1664 tracing::warn!(
1665 num_blocks,
1666 height,
1667 "failed to update aggregates for chunk: {err:#}"
1668 );
1669 sleep(Duration::from_secs(1)).await;
1670 },
1671 }
1672 }
1673 metrics.height.set(height as usize);
1674 }
1675 tracing::warn!("aggregator block stream ended unexpectedly; will restart");
1676 }
1677 }
1678}
1679
1680impl<Types, S, P> Fetcher<Types, S, P>
1681where
1682 Types: NodeType,
1683 S: VersionedDataSource,
1684 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1685{
1686 async fn store_and_notify<T>(&self, obj: T)
1688 where
1689 T: Storable<Types>,
1690 {
1691 let try_store = || async {
1692 let mut tx = self.storage.write().await?;
1693 obj.clone().store(&mut tx, self.leaf_only).await?;
1694 tx.commit().await
1695 };
1696
1697 let mut backoff = self.backoff.clone();
1699 backoff.reset();
1700 loop {
1701 let Err(err) = try_store().await else {
1702 break;
1703 };
1704 tracing::warn!(
1708 "failed to store fetched {} {}: {err:#}",
1709 T::name(),
1710 obj.height()
1711 );
1712
1713 let Some(delay) = backoff.next_backoff() else {
1714 break;
1715 };
1716 tracing::info!(?delay, "retrying failed operation");
1717 sleep(delay).await;
1718 }
1719
1720 obj.notify(&self.notifiers).await;
1745 }
1746}
1747
1748#[derive(Debug)]
1749struct Notifiers<Types>
1750where
1751 Types: NodeType,
1752{
1753 block: Notifier<BlockQueryData<Types>>,
1754 leaf: Notifier<LeafQueryData<Types>>,
1755 vid_common: Notifier<VidCommonQueryData<Types>>,
1756}
1757
1758impl<Types> Default for Notifiers<Types>
1759where
1760 Types: NodeType,
1761{
1762 fn default() -> Self {
1763 Self {
1764 block: Notifier::new(),
1765 leaf: Notifier::new(),
1766 vid_common: Notifier::new(),
1767 }
1768 }
1769}
1770
1771#[derive(Clone, Copy, Debug)]
1772struct Heights {
1773 height: u64,
1774 pruned_height: Option<u64>,
1775}
1776
1777impl Heights {
1778 async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1779 where
1780 Types: NodeType,
1781 Header<Types>: QueryableHeader<Types>,
1782 T: NodeStorage<Types> + PrunedHeightStorage + Send,
1783 {
1784 let height = tx.block_height().await.context("loading block height")? as u64;
1785 let pruned_height = tx
1786 .load_pruned_height()
1787 .await
1788 .context("loading pruned height")?;
1789 Ok(Self {
1790 height,
1791 pruned_height,
1792 })
1793 }
1794
1795 fn might_exist(self, h: u64) -> bool {
1796 h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1797 }
1798}
1799
1800#[async_trait]
1801impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1802 for FetchingDataSource<Types, S, P>
1803where
1804 Types: NodeType,
1805 S: VersionedDataSource + 'static,
1806 for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1807 P: Send + Sync,
1808 State: MerklizedState<Types, ARITY> + 'static,
1809 <State as MerkleTreeScheme>::Commitment: Send,
1810{
1811 async fn get_path(
1812 &self,
1813 snapshot: Snapshot<Types, State, ARITY>,
1814 key: State::Key,
1815 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1816 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1817 message: err.to_string(),
1818 })?;
1819 tx.get_path(snapshot, key).await
1820 }
1821}
1822
1823#[async_trait]
1824impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1825where
1826 Types: NodeType,
1827 Header<Types>: QueryableHeader<Types>,
1828 Payload<Types>: QueryablePayload<Types>,
1829 S: VersionedDataSource + 'static,
1830 for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1831 P: Send + Sync,
1832{
1833 async fn get_last_state_height(&self) -> QueryResult<usize> {
1834 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1835 message: err.to_string(),
1836 })?;
1837 tx.get_last_state_height().await
1838 }
1839}
1840
1841#[async_trait]
1842impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1843where
1844 Types: NodeType,
1845 Header<Types>: QueryableHeader<Types>,
1846 S: VersionedDataSource + 'static,
1847 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
1848 P: Send + Sync,
1849{
1850 async fn block_height(&self) -> QueryResult<usize> {
1851 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1852 message: err.to_string(),
1853 })?;
1854 tx.block_height().await
1855 }
1856
1857 async fn count_transactions_in_range(
1858 &self,
1859 range: impl RangeBounds<usize> + Send,
1860 namespace: Option<NamespaceId<Types>>,
1861 ) -> QueryResult<usize> {
1862 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1863 message: err.to_string(),
1864 })?;
1865 tx.count_transactions_in_range(range, namespace).await
1866 }
1867
1868 async fn payload_size_in_range(
1869 &self,
1870 range: impl RangeBounds<usize> + Send,
1871 namespace: Option<NamespaceId<Types>>,
1872 ) -> QueryResult<usize> {
1873 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1874 message: err.to_string(),
1875 })?;
1876 tx.payload_size_in_range(range, namespace).await
1877 }
1878
1879 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1880 where
1881 ID: Into<BlockId<Types>> + Send + Sync,
1882 {
1883 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1884 message: err.to_string(),
1885 })?;
1886 tx.vid_share(id).await
1887 }
1888
1889 async fn sync_status(&self) -> QueryResult<SyncStatus> {
1890 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1891 message: err.to_string(),
1892 })?;
1893 tx.sync_status().await
1894 }
1895
1896 async fn get_header_window(
1897 &self,
1898 start: impl Into<WindowStart<Types>> + Send + Sync,
1899 end: u64,
1900 limit: usize,
1901 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
1902 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1903 message: err.to_string(),
1904 })?;
1905 tx.get_header_window(start, end, limit).await
1906 }
1907}
1908
1909#[async_trait]
1910impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
1911where
1912 Types: NodeType,
1913 Payload<Types>: QueryablePayload<Types>,
1914 Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
1915 crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
1916 S: VersionedDataSource + 'static,
1917 for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
1918 P: Send + Sync,
1919{
1920 async fn get_block_summaries(
1921 &self,
1922 request: explorer::query_data::GetBlockSummariesRequest<Types>,
1923 ) -> Result<
1924 Vec<explorer::query_data::BlockSummary<Types>>,
1925 explorer::query_data::GetBlockSummariesError,
1926 > {
1927 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1928 message: err.to_string(),
1929 })?;
1930 tx.get_block_summaries(request).await
1931 }
1932
1933 async fn get_block_detail(
1934 &self,
1935 request: explorer::query_data::BlockIdentifier<Types>,
1936 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
1937 {
1938 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1939 message: err.to_string(),
1940 })?;
1941 tx.get_block_detail(request).await
1942 }
1943
1944 async fn get_transaction_summaries(
1945 &self,
1946 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
1947 ) -> Result<
1948 Vec<explorer::query_data::TransactionSummary<Types>>,
1949 explorer::query_data::GetTransactionSummariesError,
1950 > {
1951 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1952 message: err.to_string(),
1953 })?;
1954 tx.get_transaction_summaries(request).await
1955 }
1956
1957 async fn get_transaction_detail(
1958 &self,
1959 request: explorer::query_data::TransactionIdentifier<Types>,
1960 ) -> Result<
1961 explorer::query_data::TransactionDetailResponse<Types>,
1962 explorer::query_data::GetTransactionDetailError,
1963 > {
1964 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1965 message: err.to_string(),
1966 })?;
1967 tx.get_transaction_detail(request).await
1968 }
1969
1970 async fn get_explorer_summary(
1971 &self,
1972 ) -> Result<
1973 explorer::query_data::ExplorerSummary<Types>,
1974 explorer::query_data::GetExplorerSummaryError,
1975 > {
1976 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1977 message: err.to_string(),
1978 })?;
1979 tx.get_explorer_summary().await
1980 }
1981
1982 async fn get_search_results(
1983 &self,
1984 query: TaggedBase64,
1985 ) -> Result<
1986 explorer::query_data::SearchResult<Types>,
1987 explorer::query_data::GetSearchResultsError,
1988 > {
1989 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1990 message: err.to_string(),
1991 })?;
1992 tx.get_search_results(query).await
1993 }
1994}
1995
1996pub trait AvailabilityProvider<Types: NodeType>:
1998 Provider<Types, request::LeafRequest<Types>>
1999 + Provider<Types, request::PayloadRequest>
2000 + Provider<Types, request::VidCommonRequest>
2001 + Sync
2002 + 'static
2003{
2004}
2005impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
2006 P: Provider<Types, request::LeafRequest<Types>>
2007 + Provider<Types, request::PayloadRequest>
2008 + Provider<Types, request::VidCommonRequest>
2009 + Sync
2010 + 'static
2011{
2012}
2013
2014trait FetchRequest: Copy + Debug + Send + Sync + 'static {
2015 fn might_exist(self, _heights: Heights) -> bool {
2026 true
2027 }
2028}
2029
2030#[async_trait]
2036trait Fetchable<Types>: Clone + Send + Sync + 'static
2037where
2038 Types: NodeType,
2039 Header<Types>: QueryableHeader<Types>,
2040 Payload<Types>: QueryablePayload<Types>,
2041{
2042 type Request: FetchRequest;
2044
2045 fn satisfies(&self, req: Self::Request) -> bool;
2047
2048 async fn active_fetch<S, P>(
2067 tx: &mut impl AvailabilityStorage<Types>,
2068 fetcher: Arc<Fetcher<Types, S, P>>,
2069 req: Self::Request,
2070 ) -> anyhow::Result<()>
2071 where
2072 S: VersionedDataSource + 'static,
2073 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
2074 for<'a> S::ReadOnly<'a>:
2075 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
2076 P: AvailabilityProvider<Types>;
2077
2078 async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;
2080
2081 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
2086 where
2087 S: AvailabilityStorage<Types>;
2088}
2089
2090type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2091
2092#[async_trait]
2093trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
2094where
2095 Types: NodeType,
2096 Header<Types>: QueryableHeader<Types>,
2097 Payload<Types>: QueryablePayload<Types>,
2098{
2099 type RangedRequest: FetchRequest + From<usize> + Send;
2100
2101 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
2103 where
2104 S: AvailabilityStorage<Types>,
2105 R: RangeBounds<usize> + Send + 'static;
2106}
2107
2108trait Storable<Types: NodeType>: HeightIndexed + Clone {
2110 fn name() -> &'static str;
2112
2113 fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;
2115
2116 fn store(
2118 self,
2119 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
2120 leaf_only: bool,
2121 ) -> impl Send + Future<Output = anyhow::Result<()>>;
2122}
2123
2124impl<Types: NodeType> Storable<Types> for BlockInfo<Types> {
2125 fn name() -> &'static str {
2126 "block info"
2127 }
2128
2129 async fn notify(&self, notifiers: &Notifiers<Types>) {
2130 self.leaf.notify(notifiers).await;
2131
2132 if let Some(block) = &self.block {
2133 block.notify(notifiers).await;
2134 }
2135 if let Some(vid) = &self.vid_common {
2136 vid.notify(notifiers).await;
2137 }
2138 }
2139
2140 async fn store(
2141 self,
2142 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
2143 leaf_only: bool,
2144 ) -> anyhow::Result<()> {
2145 storage
2146 .insert_leaf_with_qc_chain(self.leaf, self.qc_chain)
2147 .await?;
2148
2149 if let Some(common) = self.vid_common {
2150 (common, self.vid_share).store(storage, leaf_only).await?;
2151 }
2152
2153 if let Some(block) = self.block {
2154 block.store(storage, leaf_only).await?;
2155 }
2156
2157 Ok(())
2158 }
2159}
2160
/// Split `range` into consecutive, ascending half-open chunks of at most `chunk_size` items.
///
/// An unbounded start is treated as 0 and an unbounded end as `usize::MAX`, so an open-ended
/// range produces chunks until the index space is exhausted. Every chunk except possibly the
/// last has exactly `chunk_size` items.
///
/// The iterator is empty when `chunk_size` is 0 or when the range is empty or inverted (for
/// example `5..3`). Bounds that would step past `usize::MAX` saturate instead of overflowing;
/// as a result an inclusive end of `usize::MAX` is treated as exclusive, since a half-open
/// `Range<usize>` cannot include that index.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Normalize both bounds to a half-open interval `[start, end)`.
    let mut start = match range.start_bound() {
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&i) => i.saturating_add(1),
        Bound::Excluded(&i) => i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        let chunk_end = min(start.saturating_add(chunk_size), end);
        // `<=` rather than `==`: an inverted range must end the iteration immediately instead
        // of yielding a single backwards chunk.
        if chunk_end <= start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
2188
/// Split the heights from `start` up to and including `end` into half-open chunks of at most
/// `chunk_size` items, yielded from the highest chunk down to the lowest.
///
/// An unbounded `start` is treated as 0. Each chunk is itself an ascending `Range`; only the
/// order of the chunks is reversed. The chunk closest to `start` may be shorter than the others.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Inclusive lower limit of the whole scan.
    let lower = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // Exclusive upper limit of the part that has not been emitted yet.
    let mut upper = end + 1;

    std::iter::from_fn(move || {
        let chunk_lower = max(lower, upper.saturating_sub(chunk_size));
        (chunk_lower < upper).then(|| {
            let chunk = chunk_lower..upper;
            upper = chunk_lower;
            chunk
        })
    })
}
2224
2225trait ResultExt<T, E> {
2226 fn ok_or_trace(self) -> Option<T>
2227 where
2228 E: Display;
2229}
2230
2231impl<T, E> ResultExt<T, E> for Result<T, E> {
2232 fn ok_or_trace(self) -> Option<T>
2233 where
2234 E: Display,
2235 {
2236 match self {
2237 Ok(t) => Some(t),
2238 Err(err) => {
2239 tracing::info!(
2240 "error loading resource from local storage, will try to fetch: {err:#}"
2241 );
2242 None
2243 },
2244 }
2245 }
2246}
2247
2248#[derive(Debug)]
2249struct ScannerMetrics {
2250 running: Box<dyn Gauge>,
2252 current_scan: Box<dyn Gauge>,
2254 current_is_major: Box<dyn Gauge>,
2256 backoff: Box<dyn Gauge>,
2258 retries: Box<dyn Gauge>,
2260 current_start: Box<dyn Gauge>,
2262 current_end: Box<dyn Gauge>,
2264 scanned_blocks: Box<dyn Gauge>,
2266 scanned_vid: Box<dyn Gauge>,
2268 major_missing_blocks: Box<dyn Gauge>,
2270 major_missing_vid: Box<dyn Gauge>,
2272 minor_missing_blocks: Box<dyn Gauge>,
2275 minor_missing_vid: Box<dyn Gauge>,
2278}
2279
2280impl ScannerMetrics {
2281 fn new(metrics: &PrometheusMetrics) -> Self {
2282 let group = metrics.subgroup("scanner".into());
2283 Self {
2284 running: group.create_gauge("running".into(), None),
2285 current_scan: group.create_gauge("current".into(), None),
2286 current_is_major: group.create_gauge("is_major".into(), None),
2287 backoff: group.create_gauge("backoff".into(), Some("s".into())),
2288 retries: group.create_gauge("retries".into(), None),
2289 current_start: group.create_gauge("start".into(), None),
2290 current_end: group.create_gauge("end".into(), None),
2291 scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2292 scanned_vid: group.create_gauge("scanned_vid".into(), None),
2293 major_missing_blocks: group.create_gauge("major_missing_blocks".into(), None),
2294 major_missing_vid: group.create_gauge("major_missing_vid".into(), None),
2295 minor_missing_blocks: group.create_gauge("minor_missing_blocks".into(), None),
2296 minor_missing_vid: group.create_gauge("minor_missing_vid".into(), None),
2297 }
2298 }
2299
2300 fn add_missing_blocks(&self, major: bool, missing: usize) {
2301 if major {
2302 self.major_missing_blocks.set(missing);
2303 } else {
2304 self.minor_missing_blocks.update(missing as i64);
2305 }
2306 }
2307
2308 fn add_missing_vid(&self, major: bool, missing: usize) {
2309 if major {
2310 self.major_missing_vid.set(missing);
2311 } else {
2312 self.minor_missing_vid.update(missing as i64);
2313 }
2314 }
2315}
2316
2317#[derive(Debug)]
2318struct AggregatorMetrics {
2319 height: Box<dyn Gauge>,
2321}
2322
2323impl AggregatorMetrics {
2324 fn new(metrics: &PrometheusMetrics) -> Self {
2325 let group = metrics.subgroup("aggregator".into());
2326 Self {
2327 height: group.create_gauge("height".into(), None),
2328 }
2329 }
2330}
2331
2332fn passive<T>(
2336 req: impl Debug + Send + 'static,
2337 fut: impl Future<Output = Option<T>> + Send + 'static,
2338) -> Fetch<T>
2339where
2340 T: Send + 'static,
2341{
2342 Fetch::Pending(
2343 fut.then(move |opt| async move {
2344 match opt {
2345 Some(t) => t,
2346 None => {
2347 panic!("notifier dropped without satisfying request {req:?}");
2362 },
2363 }
2364 })
2365 .boxed(),
2366 )
2367}
2368
2369async fn select_some<T>(
2371 a: impl Future<Output = Option<T>> + Unpin,
2372 b: impl Future<Output = Option<T>> + Unpin,
2373) -> Option<T> {
2374 match future::select(a, b).await {
2375 Either::Left((Some(a), _)) => Some(a),
2377 Either::Right((Some(b), _)) => Some(b),
2378
2379 Either::Left((None, b)) => b.await,
2381 Either::Right((None, a)) => a.await,
2382 }
2383}
2384
#[cfg(test)]
mod test {
    use super::*;

    /// Collect at most `limit` forward chunks of `range`.
    fn forward<R>(range: R, chunk_size: usize, limit: usize) -> Vec<Range<usize>>
    where
        R: RangeBounds<usize>,
    {
        range_chunks(range, chunk_size).take(limit).collect()
    }

    #[test]
    fn test_range_chunks() {
        let all = usize::MAX;

        // Inclusive end, odd number of items: the last chunk is short.
        assert_eq!(forward(0..=4, 2, all), [0..2, 2..4, 4..5]);

        // Inclusive end, even number of items: every chunk is full.
        assert_eq!(forward(0..=5, 2, all), [0..2, 2..4, 4..6]);

        // Exclusive end, odd number of items.
        assert_eq!(forward(0..5, 2, all), [0..2, 2..4, 4..5]);

        // Exclusive end, even number of items.
        assert_eq!(forward(0..6, 2, all), [0..2, 2..4, 4..6]);

        // Open-ended range: only look at the first five chunks.
        assert_eq!(forward(0.., 2, 5), [0..2, 2..4, 4..6, 6..8, 8..10]);
    }

    #[test]
    fn test_range_chunks_rev() {
        // (start bound, inclusive end, expected chunks for a chunk size of 2)
        let cases = [
            (Bound::Included(0), 4, vec![3..5, 1..3, 0..1]),
            (Bound::Included(0), 5, vec![4..6, 2..4, 0..2]),
            (Bound::Excluded(0), 5, vec![4..6, 2..4, 1..2]),
            (Bound::Excluded(0), 4, vec![3..5, 1..3]),
        ];

        for (start, end, expected) in cases {
            let chunks: Vec<_> = range_chunks_rev(start, end, 2).collect();
            assert_eq!(chunks, expected, "start={start:?}, end={end}");
        }
    }
}