1use std::{
77 cmp::{max, min},
78 fmt::{Debug, Display},
79 iter::repeat_with,
80 marker::PhantomData,
81 ops::{Bound, Range, RangeBounds},
82 sync::Arc,
83 time::Duration,
84};
85
86use anyhow::{bail, Context};
87use async_lock::Semaphore;
88use async_trait::async_trait;
89use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
90use derivative::Derivative;
91use futures::{
92 channel::oneshot,
93 future::{self, join_all, BoxFuture, Either, Future, FutureExt},
94 stream::{self, BoxStream, StreamExt},
95};
96use hotshot_types::{
97 data::VidShare,
98 traits::{
99 metrics::{Gauge, Metrics},
100 node_implementation::NodeType,
101 },
102};
103use jf_merkle_tree::{prelude::MerkleProof, MerkleTreeScheme};
104use tagged_base64::TaggedBase64;
105use tokio::{spawn, time::sleep};
106use tracing::Instrument;
107
108use super::{
109 notifier::Notifier,
110 storage::{
111 pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
112 sql::MigrateTypes,
113 Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
114 MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
115 UpdateAvailabilityStorage,
116 },
117 Transaction, VersionedDataSource,
118};
119use crate::{
120 availability::{
121 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
122 FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
123 PayloadQueryData, QueryableHeader, QueryablePayload, StateCertQueryDataV2, TransactionHash,
124 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
125 },
126 explorer::{self, ExplorerDataSource},
127 fetching::{
128 self,
129 request::{self, StateCertRequest},
130 Provider,
131 },
132 merklized_state::{
133 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
134 },
135 metrics::PrometheusMetrics,
136 node::{NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart},
137 status::{HasMetrics, StatusDataSource},
138 task::BackgroundTask,
139 types::HeightIndexed,
140 Header, Payload, QueryError, QueryResult,
141};
142
143mod block;
144mod header;
145mod leaf;
146mod state_cert;
147mod transaction;
148mod vid;
149
150use self::{
151 block::PayloadFetcher,
152 leaf::LeafFetcher,
153 transaction::TransactionRequest,
154 vid::{VidCommonFetcher, VidCommonRequest},
155};
156
/// Configuration collected before constructing a [`FetchingDataSource`].
///
/// Created with [`Builder::new`], adjusted through the chained setter methods, and consumed by
/// [`Builder::build`].
pub struct Builder<Types, S, P> {
    /// Local storage; wrapped in an `Arc` and shared by the fetcher once built.
    storage: S,
    /// Provider handed to the fetcher; wrapped in an `Arc` once built.
    provider: P,
    /// Settings for the exponential backoff used by retry tasks.
    backoff: ExponentialBackoffBuilder,
    /// Number of permits in the semaphore shared by the fetchers and their retry tasks.
    rate_limit: usize,
    /// Chunk size for range queries; also the fallback for `proactive_range_chunk_size` and
    /// `aggregator_chunk_size` when those are unset.
    range_chunk_size: usize,
    /// Sleep between consecutive proactive scans.
    minor_scan_interval: Duration,
    /// Scan `i` is a major scan when `i % major_scan_interval` equals
    /// `major_scan_offset % major_scan_interval`.
    major_scan_interval: usize,
    /// Offset selecting which scans within each major interval are major scans.
    major_scan_offset: usize,
    /// Chunk size used by the proactive scanner; `None` falls back to `range_chunk_size`.
    proactive_range_chunk_size: Option<usize>,
    /// Sleep applied after each still-pending item yielded by a range stream.
    active_fetch_delay: Duration,
    /// Sleep applied after each chunk is loaded by a range stream.
    chunk_fetch_delay: Duration,
    /// Whether the proactive scanner task is spawned (also requires `leaf_only == false`).
    proactive_fetching: bool,
    /// Whether the aggregator task is spawned (also requires `leaf_only == false`).
    aggregator: bool,
    /// Chunk size passed to the aggregator; `None` falls back to `range_chunk_size`.
    aggregator_chunk_size: Option<usize>,
    /// Batch size passed to the storage's `migrate_types` during construction.
    types_migration_batch_size: u64,
    /// When set, no payload or VID common fetchers are created and neither the scanner nor the
    /// aggregator is spawned.
    leaf_only: bool,
    /// Ties the builder to `Types` without storing a value of that type.
    _types: PhantomData<Types>,
}
177
178impl<Types, S, P> Builder<Types, S, P> {
179 pub fn new(storage: S, provider: P) -> Self {
181 let mut default_backoff = ExponentialBackoffBuilder::default();
182 default_backoff
183 .with_initial_interval(Duration::from_secs(1))
184 .with_multiplier(2.)
185 .with_max_interval(Duration::from_secs(32))
186 .with_max_elapsed_time(Some(Duration::from_secs(64)));
187
188 Self {
189 storage,
190 provider,
191 backoff: default_backoff,
192 rate_limit: 32,
193 range_chunk_size: 25,
194 minor_scan_interval: Duration::from_secs(60),
198 major_scan_interval: 60,
202 major_scan_offset: 0,
205 proactive_range_chunk_size: None,
206 active_fetch_delay: Duration::from_millis(50),
207 chunk_fetch_delay: Duration::from_millis(100),
208 proactive_fetching: true,
209 aggregator: true,
210 aggregator_chunk_size: None,
211 types_migration_batch_size: 10000,
212 leaf_only: false,
213 _types: Default::default(),
214 }
215 }
216
217 pub fn leaf_only(mut self) -> Self {
218 self.leaf_only = true;
219 self
220 }
221
222 pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
224 self.backoff.with_initial_interval(interval);
225 self
226 }
227
228 pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
230 self.backoff.with_max_interval(interval);
231 self
232 }
233
234 pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
236 self.backoff.with_multiplier(multiplier);
237 self
238 }
239
240 pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
242 self.backoff.with_randomization_factor(factor);
243 self
244 }
245
246 pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
248 self.backoff.with_max_elapsed_time(Some(timeout));
249 self
250 }
251
252 pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
254 self.rate_limit = with_rate_limit;
255 self
256 }
257
258 pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
265 self.range_chunk_size = range_chunk_size;
266 self
267 }
268
269 pub fn with_minor_scan_interval(mut self, interval: Duration) -> Self {
273 self.minor_scan_interval = interval;
274 self
275 }
276
277 pub fn with_major_scan_interval(mut self, interval: usize) -> Self {
282 self.major_scan_interval = interval;
283 self
284 }
285
286 pub fn with_major_scan_offset(mut self, offset: usize) -> Self {
296 self.major_scan_offset = offset;
297 self
298 }
299
300 pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
309 self.proactive_range_chunk_size = Some(range_chunk_size);
310 self
311 }
312
313 pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
319 self.active_fetch_delay = active_fetch_delay;
320 self
321 }
322
323 pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
335 self.chunk_fetch_delay = chunk_fetch_delay;
336 self
337 }
338
339 pub fn disable_proactive_fetching(mut self) -> Self {
348 self.proactive_fetching = false;
349 self
350 }
351
352 pub fn disable_aggregator(mut self) -> Self {
357 self.aggregator = false;
358 self
359 }
360
361 pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
370 self.aggregator_chunk_size = Some(chunk_size);
371 self
372 }
373
374 pub fn with_types_migration_batch_size(mut self, batch: u64) -> Self {
378 self.types_migration_batch_size = batch;
379 self
380 }
381
382 pub fn is_leaf_only(&self) -> bool {
383 self.leaf_only
384 }
385}
386
impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + MigrateTypes<Types> + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Consume the builder and construct the [`FetchingDataSource`] it describes.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `FetchingDataSource::new`.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        FetchingDataSource::new(self).await
    }
}
405
/// A data source backed by local storage `S` which can fetch missing objects from provider `P`.
///
/// Cloning is cheap with respect to the fetcher: it is held behind an `Arc`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    /// Shared fetcher owning the storage, provider and notifiers.
    fetcher: Arc<Fetcher<Types, S, P>>,
    /// Proactive scanner task; `None` when proactive fetching is disabled or in leaf-only mode.
    scanner: Option<BackgroundTask>,
    /// Aggregator task; `None` when the aggregator is disabled or in leaf-only mode.
    aggregator: Option<BackgroundTask>,
    /// Pruner wrapper; holds a task only when the storage provides a pruning config.
    pruner: Pruner<Types, S>,
}
439
/// Handle for the optional background task that periodically prunes the storage.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug,   "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    /// The pruning task; `None` when the storage has no pruning config.
    handle: Option<BackgroundTask>,
    /// Ties the pruner to `Types` and `S` without storing values of those types.
    _types: PhantomData<(Types, S)>,
}
449
450impl<Types, S> Pruner<Types, S>
451where
452 Types: NodeType,
453 Header<Types>: QueryableHeader<Types>,
454 Payload<Types>: QueryablePayload<Types>,
455 S: PruneStorage + Send + Sync + 'static,
456{
457 async fn new(storage: Arc<S>) -> Self {
458 let cfg = storage.get_pruning_config();
459 let Some(cfg) = cfg else {
460 return Self {
461 handle: None,
462 _types: Default::default(),
463 };
464 };
465
466 let future = async move {
467 for i in 1.. {
468 tracing::warn!("starting pruner run {i} ");
469 Self::prune(storage.clone()).await;
470 sleep(cfg.interval()).await;
471 }
472 };
473
474 let task = BackgroundTask::spawn("pruner", future);
475
476 Self {
477 handle: Some(task),
478 _types: Default::default(),
479 }
480 }
481
482 async fn prune(storage: Arc<S>) {
483 let mut pruner = S::Pruner::default();
485 loop {
486 match storage.prune(&mut pruner).await {
487 Ok(Some(height)) => {
488 tracing::warn!("Pruned to height {height}");
489 },
490 Ok(None) => {
491 tracing::warn!("pruner run complete.");
492 break;
493 },
494 Err(e) => {
495 tracing::error!("pruner run failed: {e:?}");
496 break;
497 },
498 }
499 }
500 }
501}
502
503impl<Types, S, P> FetchingDataSource<Types, S, P>
504where
505 Types: NodeType,
506 Payload<Types>: QueryablePayload<Types>,
507 Header<Types>: QueryableHeader<Types>,
508 S: VersionedDataSource + PruneStorage + HasMetrics + MigrateTypes<Types> + 'static,
509 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
510 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
511 + NodeStorage<Types>
512 + PrunedHeightStorage
513 + AggregatesStorage<Types>,
514 P: AvailabilityProvider<Types>,
515{
516 pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
518 Builder::new(storage, provider)
519 }
520
521 async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
522 let leaf_only = builder.is_leaf_only();
523 let aggregator = builder.aggregator;
524 let aggregator_chunk_size = builder
525 .aggregator_chunk_size
526 .unwrap_or(builder.range_chunk_size);
527 let proactive_fetching = builder.proactive_fetching;
528 let minor_interval = builder.minor_scan_interval;
529 let major_interval = builder.major_scan_interval;
530 let major_offset = builder.major_scan_offset;
531 let proactive_range_chunk_size = builder
532 .proactive_range_chunk_size
533 .unwrap_or(builder.range_chunk_size);
534 let migration_batch_size = builder.types_migration_batch_size;
535 let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
536 let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
537
538 let fetcher = Arc::new(Fetcher::new(builder).await?);
539
540 fetcher.storage.migrate_types(migration_batch_size).await?;
545
546 let scanner = if proactive_fetching && !leaf_only {
547 Some(BackgroundTask::spawn(
548 "proactive scanner",
549 fetcher.clone().proactive_scan(
550 minor_interval,
551 major_interval,
552 major_offset,
553 proactive_range_chunk_size,
554 scanner_metrics,
555 ),
556 ))
557 } else {
558 None
559 };
560
561 let aggregator = if aggregator && !leaf_only {
562 Some(BackgroundTask::spawn(
563 "aggregator",
564 fetcher
565 .clone()
566 .aggregate(aggregator_chunk_size, aggregator_metrics),
567 ))
568 } else {
569 None
570 };
571
572 let storage = fetcher.storage.clone();
573
574 let pruner = Pruner::new(storage).await;
575 let ds = Self {
576 fetcher,
577 scanner,
578 pruner,
579 aggregator,
580 };
581
582 Ok(ds)
583 }
584
585 pub fn inner(&self) -> Arc<S> {
587 self.fetcher.storage.clone()
588 }
589}
590
/// Borrow the underlying storage held by the fetcher.
impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    fn as_ref(&self) -> &S {
        // `storage` is an `Arc<S>`; the reference auto-derefs to `&S`.
        &self.fetcher.storage
    }
}
599
/// Metrics are those of the underlying storage.
impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: HasMetrics,
{
    /// Return the metrics registry of the underlying storage.
    fn metrics(&self) -> &PrometheusMetrics {
        self.as_ref().metrics()
    }
}
609
610#[async_trait]
611impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
612where
613 Types: NodeType,
614 Header<Types>: QueryableHeader<Types>,
615 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
616 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
617 P: Send + Sync,
618{
619 async fn block_height(&self) -> QueryResult<usize> {
620 let mut tx = self.read().await.map_err(|err| QueryError::Error {
621 message: err.to_string(),
622 })?;
623 tx.block_height().await
624 }
625}
626
#[async_trait]
impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
    P: Send + Sync,
{
    /// Load the pruned height from storage in a fresh read-only transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be opened or the storage query fails.
    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
        let mut tx = self.read().await?;
        tx.load_pruned_height().await
    }
}
640
/// Availability queries, all delegated to the shared fetcher.
///
/// Single-object queries go through `Fetcher::get`; forward range queries through
/// `Fetcher::get_range`; reverse range queries through `Fetcher::get_range_rev`.
#[async_trait]
impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Get the leaf identified by `id`.
    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
    where
        ID: Into<LeafId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    /// Get the header of the block identified by `id`.
    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // Fetch the wrapper type, then project out the header itself.
        self.fetcher
            .get::<HeaderQueryData<_>>(id.into())
            .await
            .map(|h| h.header)
    }

    /// Get the block identified by `id`.
    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    /// Get the payload of the block identified by `id`.
    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    /// Get the payload metadata of the block identified by `id`.
    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    /// Get the VID common data of the block identified by `id`.
    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    /// Get the VID common metadata of the block identified by `id`.
    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    /// Stream the leaves in `range`.
    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream the blocks in `range`.
    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream the headers in `range`.
    ///
    /// Implemented by streaming the leaves in `range` and cloning each leaf's block header.
    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    /// Stream the payloads in `range`.
    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream the payload metadata in `range`.
    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream the VID common data in `range`.
    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream the VID common metadata in `range`.
    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    /// Stream leaves between `start` and `end` in reverse order.
    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Stream blocks between `start` and `end` in reverse order.
    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Stream payloads between `start` and `end` in reverse order.
    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Stream payload metadata between `start` and `end` in reverse order.
    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Stream VID common data between `start` and `end` in reverse order.
    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Stream VID common metadata between `start` and `end` in reverse order.
    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    /// Get the block containing the transaction with hash `h`.
    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }

    /// Get the state certificate for `epoch`.
    async fn get_state_cert(&self, epoch: u64) -> Fetch<StateCertQueryDataV2<Types>> {
        self.fetcher.get(StateCertRequest::from(epoch)).await
    }
}
819
820impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
821where
822 Types: NodeType,
823 Header<Types>: QueryableHeader<Types>,
824 Payload<Types>: QueryablePayload<Types>,
825 S: VersionedDataSource + 'static,
826 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
827 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
828 P: AvailabilityProvider<Types>,
829{
830 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
831 let height = info.height() as usize;
832 let fetch_block = info.block.is_none();
833 let fetch_vid = info.vid_common.is_none();
834
835 leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);
837
838 self.fetcher.store_and_notify(info).await;
839
840 if fetch_block || fetch_vid {
841 let fetcher = self.fetcher.clone();
844 let span = tracing::info_span!("fetch missing data", height);
845 spawn(
846 async move {
847 tracing::info!(fetch_block, fetch_vid, "fetching missing data");
848 if fetch_block {
849 fetcher.get::<PayloadMetadata<Types>>(height).await;
850 }
851 if fetch_vid {
852 fetcher.get::<VidCommonMetadata<Types>>(height).await;
853 }
854 }
855 .instrument(span),
856 );
857 }
858 Ok(())
859 }
860}
861
/// Transactions are those of the underlying storage, opened through the fetcher.
impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction by delegating to the fetcher.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    /// Open a read-only transaction by delegating to the fetcher.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}
885
/// Shared state used to load objects from storage and fetch missing ones from the provider.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    /// Local storage, shared with the data source and the pruner.
    storage: Arc<S>,
    /// Notifiers used to create passive fetches for requested objects.
    notifiers: Notifiers<Types>,
    /// Provider of data which is missing from local storage.
    provider: Arc<P>,
    /// Fetcher for leaves; always present.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    /// Fetcher for payloads; `None` in leaf-only mode.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    /// Fetcher for VID common data; `None` in leaf-only mode.
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    /// Default chunk size for range queries.
    range_chunk_size: usize,
    /// Sleep applied after each still-pending item yielded by a range stream.
    active_fetch_delay: Duration,
    /// Sleep applied after each chunk is loaded by a range stream.
    chunk_fetch_delay: Duration,
    /// Backoff template; each retry task starts from its own clone.
    backoff: ExponentialBackoff,
    /// Semaphore acquired around each retry attempt; also handed to the per-type fetchers.
    retry_semaphore: Arc<Semaphore>,
    /// Whether the fetcher was built in leaf-only mode.
    leaf_only: bool,
}
910
/// Transactions are those of the underlying storage.
impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction on the underlying storage.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    /// Open a read-only transaction on the underlying storage.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}
934
935impl<Types, S, P> Fetcher<Types, S, P>
936where
937 Types: NodeType,
938 Header<Types>: QueryableHeader<Types>,
939 S: VersionedDataSource + Sync,
940 for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
941{
942 pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
943 let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
944 let backoff = builder.backoff.build();
945
946 let (payload_fetcher, vid_fetcher) = if builder.is_leaf_only() {
947 (None, None)
948 } else {
949 (
950 Some(Arc::new(fetching::Fetcher::new(
951 retry_semaphore.clone(),
952 backoff.clone(),
953 ))),
954 Some(Arc::new(fetching::Fetcher::new(
955 retry_semaphore.clone(),
956 backoff.clone(),
957 ))),
958 )
959 };
960 let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
961
962 let leaf_only = builder.leaf_only;
963
964 Ok(Self {
965 storage: Arc::new(builder.storage),
966 notifiers: Default::default(),
967 provider: Arc::new(builder.provider),
968 leaf_fetcher: Arc::new(leaf_fetcher),
969 payload_fetcher,
970 vid_common_fetcher: vid_fetcher,
971 range_chunk_size: builder.range_chunk_size,
972 active_fetch_delay: builder.active_fetch_delay,
973 chunk_fetch_delay: builder.chunk_fetch_delay,
974 backoff,
975 retry_semaphore,
976 leaf_only,
977 })
978 }
979}
980
981impl<Types, S, P> Fetcher<Types, S, P>
982where
983 Types: NodeType,
984 Header<Types>: QueryableHeader<Types>,
985 Payload<Types>: QueryablePayload<Types>,
986 S: VersionedDataSource + 'static,
987 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
988 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
989 P: AvailabilityProvider<Types>,
990{
    /// Get the object described by `req`, fetching it if it is not available locally.
    ///
    /// Returns `Fetch::Ready` when the object is loaded from storage on the first attempt.
    /// Otherwise the result is built from a passive fetch on the notifiers, and, if the first
    /// attempt failed with an error, additionally from a spawned task which retries the load
    /// with backoff.
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Create the passive fetch before touching storage.
        // NOTE(review): presumably so a notification arriving between the storage check and
        // the subscription cannot be missed -- confirm against `Fetchable::passive_fetch`.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            // Not in storage; `try_get` has already asked for it to be fetched if it might
            // exist, so only the passive fetch is needed.
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // Channel through which the retry task delivers the object if it finds it in storage.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Hold a permit only for the duration of the attempt itself.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            tracing::info!(?req, "object was ready after retries");
                            // The receiver may already be gone; that is not an error.
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // Leaving the loop drops `send` without a value.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Once the backoff is exhausted, keep retrying at the last delay.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Combine the passive fetch with the retry task's channel; a dropped sender is mapped
        // to `None` by `Result::ok`.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
1073
1074 async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1085 where
1086 T: Fetchable<Types>,
1087 {
1088 let mut tx = self.read().await.context("opening read transaction")?;
1089 match T::load(&mut tx, req).await {
1090 Ok(t) => Ok(Some(t)),
1091 Err(QueryError::Missing | QueryError::NotFound) => {
1092 tracing::debug!(?req, "object missing from local storage, will try to fetch");
1095 self.fetch::<T>(&mut tx, req).await?;
1096 Ok(None)
1097 },
1098 Err(err) => {
1099 bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1102 },
1103 }
1104 }
1105
    /// Stream the objects in `range`, using the fetcher's default range chunk size.
    ///
    /// Delegates to [`Self::get_range_with_chunk_size`].
    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
    where
        R: RangeBounds<usize> + Send + 'static,
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size(chunk_size, range)
    }
1125
1126 fn get_range_with_chunk_size<R, T>(
1128 self: Arc<Self>,
1129 chunk_size: usize,
1130 range: R,
1131 ) -> BoxStream<'static, Fetch<T>>
1132 where
1133 R: RangeBounds<usize> + Send + 'static,
1134 T: RangedFetchable<Types>,
1135 {
1136 let chunk_fetch_delay = self.chunk_fetch_delay;
1137 let active_fetch_delay = self.active_fetch_delay;
1138
1139 stream::iter(range_chunks(range, chunk_size))
1140 .then(move |chunk| {
1141 let self_clone = self.clone();
1142 async move {
1143 {
1144 let chunk = self_clone.get_chunk(chunk).await;
1145
1146 sleep(chunk_fetch_delay).await;
1151 stream::iter(chunk)
1152 }
1153 }
1154 })
1155 .flatten()
1156 .then(move |f| async move {
1157 match f {
1158 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1162 Fetch::Ready(_) => (),
1163 };
1164 f
1165 })
1166 .boxed()
1167 }
1168
    /// Stream the objects between `start` and `end` in reverse order, using the fetcher's
    /// default range chunk size.
    ///
    /// Delegates to [`Self::get_range_with_chunk_size_rev`].
    fn get_range_rev<T>(
        self: Arc<Self>,
        start: Bound<usize>,
        end: usize,
    ) -> BoxStream<'static, Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        let chunk_size = self.range_chunk_size;
        self.get_range_with_chunk_size_rev(chunk_size, start, end)
    }
1186
1187 fn get_range_with_chunk_size_rev<T>(
1189 self: Arc<Self>,
1190 chunk_size: usize,
1191 start: Bound<usize>,
1192 end: usize,
1193 ) -> BoxStream<'static, Fetch<T>>
1194 where
1195 T: RangedFetchable<Types>,
1196 {
1197 let chunk_fetch_delay = self.chunk_fetch_delay;
1198 let active_fetch_delay = self.active_fetch_delay;
1199
1200 stream::iter(range_chunks_rev(start, end, chunk_size))
1201 .then(move |chunk| {
1202 let self_clone = self.clone();
1203 async move {
1204 {
1205 let chunk = self_clone.get_chunk(chunk).await;
1206
1207 sleep(chunk_fetch_delay).await;
1212 stream::iter(chunk.into_iter().rev())
1213 }
1214 }
1215 })
1216 .flatten()
1217 .then(move |f| async move {
1218 match f {
1219 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1223 Fetch::Ready(_) => (),
1224 };
1225 f
1226 })
1227 .boxed()
1228 }
1229
    /// Get the objects at every height in `chunk`, fetching those not available locally.
    ///
    /// The returned vector has one entry per height in `chunk`, in ascending order. If the
    /// first load from storage fails with an error, every entry is built from a passive fetch
    /// combined with a channel fed by a spawned task which retries the load with backoff.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Create a passive fetch for every height before touching storage.
        // NOTE(review): presumably so no notification can be missed between the storage check
        // and the subscription -- confirm against `Fetchable::passive_fetch`.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // `objs` has one slot per height: loaded objects are ready, the rest fall back
                // to their passive fetch.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // One channel per height, through which the retry task delivers loaded objects.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Hold a permit only for the duration of the attempt itself.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        // The receiver may already be gone; not an error.
                                        sender.send(obj).ok();
                                    } else {
                                        // `sender` is dropped here without a value.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                // Once the backoff is exhausted, keep retrying at the last
                                // delay.
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Combine each passive fetch with its channel; a dropped sender is mapped to `None` by
        // `Result::ok`.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
1344
1345 async fn try_get_chunk<T>(
1358 self: &Arc<Self>,
1359 chunk: &Range<usize>,
1360 ) -> anyhow::Result<Vec<Option<T>>>
1361 where
1362 T: RangedFetchable<Types>,
1363 {
1364 let mut tx = self.read().await.context("opening read transaction")?;
1365 let ts = T::load_range(&mut tx, chunk.clone())
1366 .await
1367 .context(format!("when fetching items in range {chunk:?}"))?;
1368
1369 let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);
1376
1377 let mut results = Vec::with_capacity(chunk.len());
1379 for t in ts {
1380 while chunk.start + results.len() < t.height() as usize {
1382 tracing::debug!(
1383 "item {} in chunk not available, will be fetched",
1384 results.len()
1385 );
1386 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1387 .await?;
1388 results.push(None);
1389 }
1390
1391 results.push(Some(t));
1392 }
1393 while results.len() < chunk.len() {
1395 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1396 .await?;
1397 results.push(None);
1398 }
1399
1400 Ok(results)
1401 }
1402
1403 async fn fetch<T>(
1409 self: &Arc<Self>,
1410 tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1411 req: T::Request,
1412 ) -> anyhow::Result<()>
1413 where
1414 T: Fetchable<Types>,
1415 {
1416 tracing::debug!("fetching resource {req:?}");
1417
1418 let heights = Heights::load(tx)
1420 .await
1421 .context("failed to load heights; cannot definitively say object might exist")?;
1422 if req.might_exist(heights) {
1423 T::active_fetch(tx, self.clone(), req).await?;
1424 } else {
1425 tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1426 }
1427 Ok(())
1428 }
1429
    /// Background task which repeatedly scans for missing blocks and VID common data.
    ///
    /// The task loops forever. Scan number `i` is a _major_ scan when
    /// `i % major_interval == major_offset % major_interval` and a _minor_ scan otherwise:
    /// * a major scan starts at the pruned height (or 0 if no pruned height is recorded);
    /// * a minor scan starts at the block height observed by the previous scan (0 for the very
    ///   first scan).
    ///
    /// Both kinds scan up to the current block height, walking first the payload metadata and then
    /// the VID common metadata in chunks of `chunk_size`, and waiting on every object that is not
    /// immediately available. After each scan the task sleeps for `minor_interval`.
    ///
    /// Progress and results are published through `metrics`.
    ///
    /// # Panics
    ///
    /// Panics if `major_interval` is 0, because it is used as the divisor of a remainder.
    async fn proactive_scan(
        self: Arc<Self>,
        minor_interval: Duration,
        major_interval: usize,
        major_offset: usize,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        // Block height seen by the previous scan; minor scans resume from here.
        let mut prev_height = 0;

        for i in 0.. {
            let major = i % major_interval == major_offset % major_interval;
            let span = tracing::warn_span!("proactive scan", i, major, prev_height);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            metrics.current_is_major.set(major as usize);
            async {
                // Delay between attempts to load the heights. It starts at the minor interval and
                // doubles after every failure, up to `max_backoff`.
                let mut backoff = minor_interval;
                let max_backoff = Duration::from_secs(60);
                metrics.backoff.set(backoff.as_secs() as usize);

                // The scan range depends on the block height and pruned height, so keep retrying
                // until both have been loaded successfully.
                let heights = loop {
                    let mut tx = match self.read().await {
                        Ok(tx) => tx,
                        Err(err) => {
                            tracing::error!(
                                ?backoff,
                                "unable to start transaction for scan: {err:#}"
                            );
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    let heights = match Heights::load(&mut tx).await {
                        Ok(heights) => heights,
                        Err(err) => {
                            tracing::error!(?backoff, "unable to load heights: {err:#}");
                            metrics.retries.update(1);
                            sleep(backoff).await;
                            backoff = min(2 * backoff, max_backoff);
                            metrics.backoff.set(backoff.as_secs() as usize);
                            continue;
                        },
                    };
                    // Success: clear the retry counter before leaving the loop.
                    metrics.retries.set(0);
                    break heights;
                };

                // Lower bound for a major scan: the pruned height, or 0 when none is recorded.
                // NOTE(review): `Heights::might_exist` only accepts heights strictly greater than
                // the pruned height, yet this bound is used inclusively below -- confirm that
                // scanning the pruned height itself is intended.
                let minimum_block_height = heights.pruned_height.unwrap_or(0) as usize;
                // Upper bound (exclusive) for every scan.
                let block_height = heights.height as usize;

                let start = if major {
                    tracing::warn!(
                        start = minimum_block_height,
                        block_height,
                        "starting major scan"
                    );

                    // A major scan recomputes these counts from scratch.
                    metrics.major_missing_blocks.set(0);
                    metrics.major_missing_vid.set(0);

                    minimum_block_height
                } else {
                    tracing::info!(start = prev_height, block_height, "starting minor scan");
                    prev_height
                };
                // The next minor scan picks up where this scan's range ends.
                prev_height = block_height;
                metrics.current_start.set(start);
                metrics.current_end.set(block_height);
                metrics.scanned_blocks.set(0);
                metrics.scanned_vid.set(0);

                // Walk the payload metadata for heights `start..=block_height - 1`
                // (presumably in descending order, per the `_rev` suffix -- confirm).
                // `saturating_sub` avoids underflow when the block height is 0.
                let mut blocks = self
                    .clone()
                    .get_range_with_chunk_size_rev::<PayloadMetadata<Types>>(
                        chunk_size,
                        Bound::Included(start),
                        block_height.saturating_sub(1),
                    );
                let mut missing_blocks = 0;
                while let Some(fetch) = blocks.next().await {
                    if fetch.is_pending() {
                        // The object was not immediately available: count it as missing and wait
                        // for it before moving on to the next one.
                        missing_blocks += 1;
                        fetch.await;
                    }
                    metrics.scanned_blocks.update(1);
                }
                metrics.add_missing_blocks(major, missing_blocks);

                // Same walk over the same height range, this time for VID common metadata.
                let mut vid = self
                    .clone()
                    .get_range_with_chunk_size_rev::<VidCommonMetadata<Types>>(
                        chunk_size,
                        Bound::Included(start),
                        block_height.saturating_sub(1),
                    );
                let mut missing_vid = 0;
                while let Some(fetch) = vid.next().await {
                    if fetch.is_pending() {
                        missing_vid += 1;
                        fetch.await;
                    }
                    metrics.scanned_vid.update(1);
                }
                metrics.add_missing_vid(major, missing_vid);

                tracing::info!("completed proactive scan, will scan again in {minor_interval:?}");

                metrics.running.set(0);
                if major {
                    // After a major scan, restart the counts accumulated by minor scans.
                    metrics.minor_missing_blocks.set(0);
                    metrics.minor_missing_vid.set(0);
                }

                sleep(minor_interval).await;
            }
            .instrument(span)
            .await;
        }
    }
1594}
1595
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Background task which keeps the stored aggregate statistics up to date.
    ///
    /// The task never returns. It loads the most recent aggregate from storage, then streams
    /// payload metadata starting at the height after that aggregate (or at 0 if there is none),
    /// folding each batch of up to `chunk_size` blocks into the aggregate and committing the
    /// result. Failures are logged and retried after a fixed delay. `metrics.height` tracks the
    /// height the aggregator has reached.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        // Each iteration (re)starts the aggregator from whatever is currently in storage.
        loop {
            // Retry every 5 seconds until the previous aggregate has been loaded.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume one past the last aggregated height, or from genesis with an empty aggregate.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // Open-ended stream of payload metadata from `start`, resolved and then grouped into
            // batches of at most `chunk_size` items that are ready at the same time.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    // Nothing to aggregate in an empty chunk; log it and move on.
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Retry this chunk every second until it has been written and committed.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        // Only advance the running aggregate once the commit has succeeded, so a
                        // retry starts again from the same previous aggregate.
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            // The stream is open-ended, so reaching this point is unexpected; start over.
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1688
1689impl<Types, S, P> Fetcher<Types, S, P>
1690where
1691 Types: NodeType,
1692 S: VersionedDataSource,
1693 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1694{
1695 async fn store_and_notify<T>(&self, obj: T)
1697 where
1698 T: Storable<Types>,
1699 {
1700 let try_store = || async {
1701 let mut tx = self.storage.write().await?;
1702 obj.clone().store(&mut tx, self.leaf_only).await?;
1703 tx.commit().await
1704 };
1705
1706 let mut backoff = self.backoff.clone();
1708 backoff.reset();
1709 loop {
1710 let Err(err) = try_store().await else {
1711 break;
1712 };
1713 tracing::warn!(
1717 "failed to store fetched {} {}: {err:#}",
1718 T::name(),
1719 obj.height()
1720 );
1721
1722 let Some(delay) = backoff.next_backoff() else {
1723 break;
1724 };
1725 tracing::info!(?delay, "retrying failed operation");
1726 sleep(delay).await;
1727 }
1728
1729 obj.notify(&self.notifiers).await;
1754 }
1755}
1756
/// The set of notifiers maintained by the fetcher, one per kind of object.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Notifier for [`BlockQueryData`] objects.
    block: Notifier<BlockQueryData<Types>>,
    /// Notifier for [`LeafQueryData`] objects.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Notifier for [`VidCommonQueryData`] objects.
    vid_common: Notifier<VidCommonQueryData<Types>>,
    /// Notifier for [`StateCertQueryDataV2`] objects.
    state_cert: Notifier<StateCertQueryDataV2<Types>>,
}
1767
1768impl<Types> Default for Notifiers<Types>
1769where
1770 Types: NodeType,
1771{
1772 fn default() -> Self {
1773 Self {
1774 block: Notifier::new(),
1775 leaf: Notifier::new(),
1776 vid_common: Notifier::new(),
1777 state_cert: Notifier::new(),
1778 }
1779 }
1780}
1781
/// A snapshot of the block height and pruned height, as loaded by [`Heights::load`].
#[derive(Clone, Copy, Debug)]
struct Heights {
    /// The block height; only heights strictly below this might exist.
    height: u64,
    /// The pruned height, if any; only heights strictly above this might exist.
    pruned_height: Option<u64>,
}
1787
1788impl Heights {
1789 async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1790 where
1791 Types: NodeType,
1792 Header<Types>: QueryableHeader<Types>,
1793 T: NodeStorage<Types> + PrunedHeightStorage + Send,
1794 {
1795 let height = tx.block_height().await.context("loading block height")? as u64;
1796 let pruned_height = tx
1797 .load_pruned_height()
1798 .await
1799 .context("loading pruned height")?;
1800 Ok(Self {
1801 height,
1802 pruned_height,
1803 })
1804 }
1805
1806 fn might_exist(self, h: u64) -> bool {
1807 h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1808 }
1809}
1810
1811#[async_trait]
1812impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1813 for FetchingDataSource<Types, S, P>
1814where
1815 Types: NodeType,
1816 S: VersionedDataSource + 'static,
1817 for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1818 P: Send + Sync,
1819 State: MerklizedState<Types, ARITY> + 'static,
1820 <State as MerkleTreeScheme>::Commitment: Send,
1821{
1822 async fn get_path(
1823 &self,
1824 snapshot: Snapshot<Types, State, ARITY>,
1825 key: State::Key,
1826 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1827 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1828 message: err.to_string(),
1829 })?;
1830 tx.get_path(snapshot, key).await
1831 }
1832}
1833
1834#[async_trait]
1835impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1836where
1837 Types: NodeType,
1838 Header<Types>: QueryableHeader<Types>,
1839 Payload<Types>: QueryablePayload<Types>,
1840 S: VersionedDataSource + 'static,
1841 for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1842 P: Send + Sync,
1843{
1844 async fn get_last_state_height(&self) -> QueryResult<usize> {
1845 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1846 message: err.to_string(),
1847 })?;
1848 tx.get_last_state_height().await
1849 }
1850}
1851
1852#[async_trait]
1853impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1854where
1855 Types: NodeType,
1856 Header<Types>: QueryableHeader<Types>,
1857 S: VersionedDataSource + 'static,
1858 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
1859 P: Send + Sync,
1860{
1861 async fn block_height(&self) -> QueryResult<usize> {
1862 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1863 message: err.to_string(),
1864 })?;
1865 tx.block_height().await
1866 }
1867
1868 async fn count_transactions_in_range(
1869 &self,
1870 range: impl RangeBounds<usize> + Send,
1871 namespace: Option<NamespaceId<Types>>,
1872 ) -> QueryResult<usize> {
1873 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1874 message: err.to_string(),
1875 })?;
1876 tx.count_transactions_in_range(range, namespace).await
1877 }
1878
1879 async fn payload_size_in_range(
1880 &self,
1881 range: impl RangeBounds<usize> + Send,
1882 namespace: Option<NamespaceId<Types>>,
1883 ) -> QueryResult<usize> {
1884 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1885 message: err.to_string(),
1886 })?;
1887 tx.payload_size_in_range(range, namespace).await
1888 }
1889
1890 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1891 where
1892 ID: Into<BlockId<Types>> + Send + Sync,
1893 {
1894 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1895 message: err.to_string(),
1896 })?;
1897 tx.vid_share(id).await
1898 }
1899
1900 async fn sync_status(&self) -> QueryResult<SyncStatus> {
1901 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1902 message: err.to_string(),
1903 })?;
1904 tx.sync_status().await
1905 }
1906
1907 async fn get_header_window(
1908 &self,
1909 start: impl Into<WindowStart<Types>> + Send + Sync,
1910 end: u64,
1911 limit: usize,
1912 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
1913 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1914 message: err.to_string(),
1915 })?;
1916 tx.get_header_window(start, end, limit).await
1917 }
1918}
1919
1920#[async_trait]
1921impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
1922where
1923 Types: NodeType,
1924 Payload<Types>: QueryablePayload<Types>,
1925 Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
1926 crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
1927 S: VersionedDataSource + 'static,
1928 for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
1929 P: Send + Sync,
1930{
1931 async fn get_block_summaries(
1932 &self,
1933 request: explorer::query_data::GetBlockSummariesRequest<Types>,
1934 ) -> Result<
1935 Vec<explorer::query_data::BlockSummary<Types>>,
1936 explorer::query_data::GetBlockSummariesError,
1937 > {
1938 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1939 message: err.to_string(),
1940 })?;
1941 tx.get_block_summaries(request).await
1942 }
1943
1944 async fn get_block_detail(
1945 &self,
1946 request: explorer::query_data::BlockIdentifier<Types>,
1947 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
1948 {
1949 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1950 message: err.to_string(),
1951 })?;
1952 tx.get_block_detail(request).await
1953 }
1954
1955 async fn get_transaction_summaries(
1956 &self,
1957 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
1958 ) -> Result<
1959 Vec<explorer::query_data::TransactionSummary<Types>>,
1960 explorer::query_data::GetTransactionSummariesError,
1961 > {
1962 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1963 message: err.to_string(),
1964 })?;
1965 tx.get_transaction_summaries(request).await
1966 }
1967
1968 async fn get_transaction_detail(
1969 &self,
1970 request: explorer::query_data::TransactionIdentifier<Types>,
1971 ) -> Result<
1972 explorer::query_data::TransactionDetailResponse<Types>,
1973 explorer::query_data::GetTransactionDetailError,
1974 > {
1975 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1976 message: err.to_string(),
1977 })?;
1978 tx.get_transaction_detail(request).await
1979 }
1980
1981 async fn get_explorer_summary(
1982 &self,
1983 ) -> Result<
1984 explorer::query_data::ExplorerSummary<Types>,
1985 explorer::query_data::GetExplorerSummaryError,
1986 > {
1987 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1988 message: err.to_string(),
1989 })?;
1990 tx.get_explorer_summary().await
1991 }
1992
1993 async fn get_search_results(
1994 &self,
1995 query: TaggedBase64,
1996 ) -> Result<
1997 explorer::query_data::SearchResult<Types>,
1998 explorer::query_data::GetSearchResultsError,
1999 > {
2000 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2001 message: err.to_string(),
2002 })?;
2003 tx.get_search_results(query).await
2004 }
2005}
2006
/// Shorthand for a provider of every kind of request bundled here: leaf requests, payload
/// requests and VID common requests.
///
/// The trait has no items of its own; it only combines the [`Provider`] bounds with `Sync` and
/// `'static`.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::VidCommonRequest>
    + Sync
    + 'static
{
}
/// Blanket implementation: any type meeting all of the bounds is an [`AvailabilityProvider`].
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::VidCommonRequest>
        + Sync
        + 'static
{
}
2024
/// A request for a fetchable object.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Whether the requested object might exist, given the current `_heights`.
    ///
    /// The default implementation ignores the heights and always answers `true`. The fetcher
    /// only starts an active fetch for requests where this returns `true`.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
2040
/// An object which can be requested, loaded from storage, and fetched.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The type of request that identifies an object of this kind.
    type Request: FetchRequest;

    /// Whether this object satisfies the request `req`.
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Actively fetch the object described by `req` on behalf of `fetcher`.
    ///
    /// `tx` gives access to availability storage for the duration of the call. The fetcher only
    /// calls this for requests whose object might exist at the current heights.
    /// NOTE(review): presumably this spawns or enqueues the fetch rather than waiting for the
    /// object itself, since it returns `()` -- confirm against the implementations.
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Create a future which passively waits, via `notifiers`, for an object satisfying `req`.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load the object described by `req` from `storage`.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2100
/// A boxed `'static` future produced by a passive fetch; it resolves to `Some(object)` or to
/// `None` if no object was delivered.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2102
/// A [`Fetchable`], height-indexed object which can also be loaded for a whole range of heights.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type for this object; it must be constructible from a `usize`
    /// (presumably the height of the requested object -- confirm against the implementations).
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load the objects in `range` from `storage`.
    ///
    /// The outer result covers the query as a whole; each element of the vector is the result
    /// for one individual object.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2118
/// An object which can be written to availability storage and announced through [`Notifiers`].
trait Storable<Types: NodeType>: HeightIndexed + Clone {
    /// A human-readable name for this kind of object, used in log messages.
    fn name() -> &'static str;

    /// Announce this object through the relevant `notifiers`.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Write this object to `storage`, consuming it.
    ///
    /// `leaf_only` is passed through from the fetcher's configuration.
    /// NOTE(review): presumably it restricts what is stored to leaf data -- confirm against the
    /// implementations.
    fn store(
        self,
        storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2134
2135impl<Types: NodeType> Storable<Types> for BlockInfo<Types> {
2136 fn name() -> &'static str {
2137 "block info"
2138 }
2139
2140 async fn notify(&self, notifiers: &Notifiers<Types>) {
2141 self.leaf.notify(notifiers).await;
2142
2143 if let Some(block) = &self.block {
2144 block.notify(notifiers).await;
2145 }
2146 if let Some(vid) = &self.vid_common {
2147 vid.notify(notifiers).await;
2148 }
2149 }
2150
2151 async fn store(
2152 self,
2153 storage: &mut (impl UpdateAvailabilityStorage<Types> + Send),
2154 leaf_only: bool,
2155 ) -> anyhow::Result<()> {
2156 self.leaf.store(storage, leaf_only).await?;
2157
2158 if let Some(common) = self.vid_common {
2159 (common, self.vid_share).store(storage, leaf_only).await?;
2160 }
2161
2162 if let Some(block) = self.block {
2163 block.store(storage, leaf_only).await?;
2164 }
2165
2166 if let Some(state_cert) = self.state_cert {
2167 state_cert.store(storage, leaf_only).await?;
2168 }
2169
2170 Ok(())
2171 }
2172}
2173
/// Break `range` into consecutive half-open chunks of at most `chunk_size` elements, in
/// ascending order.
///
/// Every chunk except possibly the last has exactly `chunk_size` elements. A range with no upper
/// bound produces an endless sequence of chunks up to `usize::MAX`.
///
/// The iterator is empty when `chunk_size` is 0 or when the range is empty or inverted (for
/// example `5..3`). Bounds are converted with saturating arithmetic, so ranges touching
/// `usize::MAX` never overflow; since a half-open `Range<usize>` cannot include `usize::MAX`,
/// that single value is never part of any chunk.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Normalize to an inclusive start and an exclusive end.
    let mut start = match range.start_bound() {
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&i) => i.saturating_add(1),
        Bound::Excluded(&i) => i,
        Bound::Unbounded => usize::MAX,
    };
    std::iter::from_fn(move || {
        let chunk_end = min(start.saturating_add(chunk_size), end);
        // `<=` rather than `==`: an inverted range has `end < start` from the very first call and
        // must not produce a backwards chunk.
        if chunk_end <= start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}
2201
/// Break the heights from `start` up to and including `end` into half-open chunks of at most
/// `chunk_size` elements, in descending order.
///
/// The first chunk yielded ends at `end` (inclusive). Every chunk except possibly the last has
/// exactly `chunk_size` elements; the last chunk is cut off at `start`.
///
/// The iterator is empty when `chunk_size` is 0 or when `start` lies past `end`. Bounds are
/// converted with saturating arithmetic, so `end == usize::MAX` or an excluded start of
/// `usize::MAX` does not overflow; since a half-open `Range<usize>` cannot include `usize::MAX`,
/// that single value is never part of any chunk.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the start bound to be inclusive.
    let start = match start {
        Bound::Included(i) => i,
        Bound::Excluded(i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Normalize the end bound to be exclusive.
    let mut end = end.saturating_add(1);

    std::iter::from_fn(move || {
        let chunk_start = max(start, end.saturating_sub(chunk_size));
        if end <= chunk_start {
            return None;
        }

        let chunk = chunk_start..end;
        end = chunk_start;
        Some(chunk)
    })
}
2237
/// Extension trait for converting a result into an option while logging the error.
trait ResultExt<T, E> {
    /// Convert to an [`Option`], logging the error (if any) instead of returning it.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2243
2244impl<T, E> ResultExt<T, E> for Result<T, E> {
2245 fn ok_or_trace(self) -> Option<T>
2246 where
2247 E: Display,
2248 {
2249 match self {
2250 Ok(t) => Some(t),
2251 Err(err) => {
2252 tracing::info!(
2253 "error loading resource from local storage, will try to fetch: {err:#}"
2254 );
2255 None
2256 },
2257 }
2258 }
2259}
2260
/// Gauges published by the proactive scanner.
#[derive(Debug)]
struct ScannerMetrics {
    /// Set to 1 when a scan starts and to 0 when it completes.
    running: Box<dyn Gauge>,
    /// The sequence number of the current scan.
    current_scan: Box<dyn Gauge>,
    /// 1 if the current scan is a major scan, 0 otherwise.
    current_is_major: Box<dyn Gauge>,
    /// The current retry delay while loading heights, in whole seconds.
    backoff: Box<dyn Gauge>,
    /// Incremented for every failed attempt to load heights; reset to 0 on success.
    retries: Box<dyn Gauge>,
    /// The height at which the current scan starts.
    current_start: Box<dyn Gauge>,
    /// The block height at which the current scan ends.
    current_end: Box<dyn Gauge>,
    /// The number of blocks processed so far by the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// The number of VID common objects processed so far by the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// The number of missing blocks found by the most recent major scan.
    major_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID common objects found by the most recent major scan.
    major_missing_vid: Box<dyn Gauge>,
    /// The number of missing blocks accumulated by minor scans since the last major scan.
    minor_missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID common objects accumulated by minor scans since the last major
    /// scan.
    minor_missing_vid: Box<dyn Gauge>,
}
2292
2293impl ScannerMetrics {
2294 fn new(metrics: &PrometheusMetrics) -> Self {
2295 let group = metrics.subgroup("scanner".into());
2296 Self {
2297 running: group.create_gauge("running".into(), None),
2298 current_scan: group.create_gauge("current".into(), None),
2299 current_is_major: group.create_gauge("is_major".into(), None),
2300 backoff: group.create_gauge("backoff".into(), Some("s".into())),
2301 retries: group.create_gauge("retries".into(), None),
2302 current_start: group.create_gauge("start".into(), None),
2303 current_end: group.create_gauge("end".into(), None),
2304 scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2305 scanned_vid: group.create_gauge("scanned_vid".into(), None),
2306 major_missing_blocks: group.create_gauge("major_missing_blocks".into(), None),
2307 major_missing_vid: group.create_gauge("major_missing_vid".into(), None),
2308 minor_missing_blocks: group.create_gauge("minor_missing_blocks".into(), None),
2309 minor_missing_vid: group.create_gauge("minor_missing_vid".into(), None),
2310 }
2311 }
2312
2313 fn add_missing_blocks(&self, major: bool, missing: usize) {
2314 if major {
2315 self.major_missing_blocks.set(missing);
2316 } else {
2317 self.minor_missing_blocks.update(missing as i64);
2318 }
2319 }
2320
2321 fn add_missing_vid(&self, major: bool, missing: usize) {
2322 if major {
2323 self.major_missing_vid.set(missing);
2324 } else {
2325 self.minor_missing_vid.update(missing as i64);
2326 }
2327 }
2328}
2329
/// Gauges published by the aggregator task.
#[derive(Debug)]
struct AggregatorMetrics {
    /// The height the aggregator has reached: its starting height when it (re)starts, then the
    /// height of the last block in each chunk it has finished processing.
    height: Box<dyn Gauge>,
}
2335
2336impl AggregatorMetrics {
2337 fn new(metrics: &PrometheusMetrics) -> Self {
2338 let group = metrics.subgroup("aggregator".into());
2339 Self {
2340 height: group.create_gauge("height".into(), None),
2341 }
2342 }
2343}
2344
2345fn passive<T>(
2349 req: impl Debug + Send + 'static,
2350 fut: impl Future<Output = Option<T>> + Send + 'static,
2351) -> Fetch<T>
2352where
2353 T: Send + 'static,
2354{
2355 Fetch::Pending(
2356 fut.then(move |opt| async move {
2357 match opt {
2358 Some(t) => t,
2359 None => {
2360 panic!("notifier dropped without satisfying request {req:?}");
2375 },
2376 }
2377 })
2378 .boxed(),
2379 )
2380}
2381
2382async fn select_some<T>(
2384 a: impl Future<Output = Option<T>> + Unpin,
2385 b: impl Future<Output = Option<T>> + Unpin,
2386) -> Option<T> {
2387 match future::select(a, b).await {
2388 Either::Left((Some(a), _)) => Some(a),
2390 Either::Right((Some(b), _)) => Some(b),
2391
2392 Either::Left((None, b)) => b.await,
2394 Either::Right((None, a)) => a.await,
2395 }
2396}
2397
#[cfg(test)]
mod test {
    use super::*;

    /// Collect the forward chunks of a bounded `range`.
    fn forward<R: RangeBounds<usize>>(range: R, chunk_size: usize) -> Vec<Range<usize>> {
        range_chunks(range, chunk_size).collect()
    }

    /// Collect the reverse chunks from `start` up to and including `end`.
    fn backward(start: Bound<usize>, end: usize, chunk_size: usize) -> Vec<Range<usize>> {
        range_chunks_rev(start, end, chunk_size).collect()
    }

    #[test]
    fn test_range_chunks() {
        // Inclusive end, with a partial last chunk.
        assert_eq!(forward(0..=4, 2), [0..2, 2..4, 4..5]);

        // Inclusive end, with a full last chunk.
        assert_eq!(forward(0..=5, 2), [0..2, 2..4, 4..6]);

        // Exclusive end, with a partial last chunk.
        assert_eq!(forward(0..5, 2), [0..2, 2..4, 4..5]);

        // Exclusive end, with a full last chunk.
        assert_eq!(forward(0..6, 2), [0..2, 2..4, 4..6]);

        // No upper bound: the sequence is endless, so only look at a prefix.
        let prefix: Vec<_> = range_chunks(0.., 2).take(5).collect();
        assert_eq!(prefix, [0..2, 2..4, 4..6, 6..8, 8..10]);
    }

    #[test]
    fn test_range_chunks_rev() {
        // Inclusive start, with a partial last chunk.
        assert_eq!(backward(Bound::Included(0), 4, 2), [3..5, 1..3, 0..1]);

        // Inclusive start, with a full last chunk.
        assert_eq!(backward(Bound::Included(0), 5, 2), [4..6, 2..4, 0..2]);

        // Exclusive start, with a partial last chunk.
        assert_eq!(backward(Bound::Excluded(0), 5, 2), [4..6, 2..4, 1..2]);

        // Exclusive start, with a full last chunk.
        assert_eq!(backward(Bound::Excluded(0), 4, 2), [3..5, 1..3]);
    }
}