struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    storage: Arc<S>,
    notifiers: Notifiers<Types>,
    provider: Arc<P>,
    leaf_fetcher: Arc<Fetcher<LeafRequest<Types>, LeafCallback<Types, S, P>>>,
    payload_fetcher: Option<Arc<Fetcher<PayloadRequest, PayloadCallback<Types, S, P>>>>,
    vid_common_fetcher: Option<Arc<Fetcher<VidCommonRequest, VidCommonCallback<Types, S, P>>>>,
    range_chunk_size: usize,
    active_fetch_delay: Duration,
    chunk_fetch_delay: Duration,
    backoff: ExponentialBackoff,
    retry_semaphore: Arc<Semaphore>,
    leaf_only: bool,
}
Asynchronous retrieval and storage of Fetchable resources.
Fields
storage: Arc<S>
notifiers: Notifiers<Types>
provider: Arc<P>
leaf_fetcher: Arc<Fetcher<LeafRequest<Types>, LeafCallback<Types, S, P>>>
payload_fetcher: Option<Arc<Fetcher<PayloadRequest, PayloadCallback<Types, S, P>>>>
vid_common_fetcher: Option<Arc<Fetcher<VidCommonRequest, VidCommonCallback<Types, S, P>>>>
range_chunk_size: usize
active_fetch_delay: Duration
chunk_fetch_delay: Duration
backoff: ExponentialBackoff
retry_semaphore: Arc<Semaphore>
leaf_only: bool
Implementations
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + Sync,
    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
where
    T: Fetchable<Types>,
async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> Result<Option<T>>
where
    T: Fetchable<Types>,
Try to get an object from local storage or initialize a fetch if it is missing.
This function has three possible outcomes, indicated by the return value:
- Ok(Some(obj)): the requested object was available locally and successfully retrieved from the database; no fetch was spawned.
- Ok(None): the requested object was not available locally, but a fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it was determined that the requested object is not fetchable).
- Err(_): it could not be determined whether the object was available locally or whether it could be fetched; no fetch was spawned even though the object may be fetchable.
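As an illustration of how a caller might branch on these three outcomes, here is a minimal sketch. It does not call the real Fetcher API: `Outcome` and `classify` are hypothetical names, and a plain `Result<Option<T>, E>` stands in for the value returned by try_get.

```rust
/// Hypothetical summary of what a caller learned from a `try_get`-style result.
#[derive(Debug, PartialEq)]
enum Outcome<T> {
    /// The object was found in local storage; no fetch was spawned.
    Local(T),
    /// The object is missing locally; a fetch was spawned if the object is fetchable.
    Pending,
    /// Availability could not be determined; no fetch was spawned.
    Unknown(String),
}

/// Map the three documented return shapes onto `Outcome` (illustrative only).
fn classify<T, E: std::fmt::Display>(res: Result<Option<T>, E>) -> Outcome<T> {
    match res {
        Ok(Some(obj)) => Outcome::Local(obj),
        Ok(None) => Outcome::Pending,
        Err(err) => Outcome::Unknown(err.to_string()),
    }
}
```

Note that only the `Err(_)` case leaves the caller without information: in that case a later retry may be worthwhile, since the object may still be fetchable.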
fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
Get a range of objects from local storage or a provider.
Convert a finite stream of fallible local storage lookups into a (possibly infinite) stream of infallible fetches. Objects in range are loaded from local storage. Any gaps or missing objects are filled by fetching from a provider. Items in the resulting stream are futures that will never fail to produce a resource, although they may block indefinitely if the resource needs to be fetched.
Objects are loaded and fetched in chunks. Chunking limits the total number of storage and network requests while keeping the amount of simultaneous resource consumption bounded.
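The chunking idea can be sketched independently of the fetcher. The helper below is a hypothetical illustration (the name `split_into_chunks` and its exact boundary behavior are assumptions, not taken from this crate): it splits a finite range of heights into consecutive sub-ranges of at most `chunk_size` items, so that each sub-range could be served by one storage query.

```rust
use std::ops::Range;

/// Split `range` into consecutive sub-ranges of at most `chunk_size` items.
/// Hypothetical helper; the real implementation may choose boundaries differently.
fn split_into_chunks(range: Range<usize>, chunk_size: usize) -> Vec<Range<usize>> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    let mut chunks = Vec::new();
    let mut start = range.start;
    while start < range.end {
        // Clamp the chunk end to the end of the requested range.
        let end = range.end.min(start.saturating_add(chunk_size));
        chunks.push(start..end);
        start = end;
    }
    chunks
}
```

With a chunk size of 4, the range `0..10` would be covered by three lookups (`0..4`, `4..8`, `8..10`) rather than ten, and no more than four objects are in flight per chunk.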
fn get_range_with_chunk_size<R, T>(
    self: Arc<Self>,
    chunk_size: usize,
    range: R,
) -> BoxStream<'static, Fetch<T>>
Same as Self::get_range, but uses the given chunk size instead of the default.
fn get_range_rev<T>(
    self: Arc<Self>,
    start: Bound<usize>,
    end: usize,
) -> BoxStream<'static, Fetch<T>>
where
    T: RangedFetchable<Types>,
Same as Self::get_range, but yields objects in reverse order by height.
Note that unlike Self::get_range, which accepts any range and yields an infinite stream if the range has no upper bound, this function requires a defined upper bound; otherwise we don’t know where the reversed stream should start. The end bound given here is inclusive; i.e. the first item yielded by the stream will have height end.
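To make the bound semantics concrete, the following sketch lists the heights a reversed range would visit. `rev_heights` is a hypothetical helper, not part of this crate, and it assumes that an unbounded start means height 0 and that an excluded start skips exactly that height; only the inclusive end is stated by the documentation above.

```rust
use std::ops::Bound;

/// Heights visited in reverse order: begins at `end` (inclusive) and counts
/// down to the lower bound. Hypothetical helper for illustration only.
fn rev_heights(start: Bound<usize>, end: usize) -> Vec<usize> {
    let lowest = match start {
        Bound::Included(n) => n,
        // An excluded start skips `n` itself; an overflowing bound yields nothing.
        Bound::Excluded(n) => match n.checked_add(1) {
            Some(m) => m,
            None => return Vec::new(),
        },
        // Assumption: no lower bound means "down to height 0".
        Bound::Unbounded => 0,
    };
    // An empty inclusive range (lowest > end) produces no heights.
    (lowest..=end).rev().collect()
}
```

Under these assumptions, `rev_heights(Bound::Included(2), 5)` visits heights 5, 4, 3, 2, so the first item has height `end`, matching the inclusive end bound described above.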
fn get_range_with_chunk_size_rev<T>(
    self: Arc<Self>,
    chunk_size: usize,
    start: Bound<usize>,
    end: usize,
) -> BoxStream<'static, Fetch<T>>
where
    T: RangedFetchable<Types>,
Same as Self::get_range_rev, but uses the given chunk size instead of the default.
async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
where
    T: RangedFetchable<Types>,
Get a range of objects from local storage or a provider.
This method is similar to get_range, except that:
- It fetches all desired objects together, as a single chunk.
- It loads the objects or triggers fetches immediately, rather than providing a lazy stream which only fetches objects when polled.
async fn try_get_chunk<T>(
    self: &Arc<Self>,
    chunk: &Range<usize>,
) -> Result<Vec<Option<T>>>
where
    T: RangedFetchable<Types>,
Try to get a range of objects from local storage, initializing fetches if any are missing.
If this function succeeds, then for each object in the requested range, either:
- the object was available locally, and corresponds to a Some(_) object in the result, or
- the object was not available locally (and corresponds to None in the result), but a fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it was determined that the requested object is not fetchable).
This function will fail if it could not be determined which objects in the requested range are available locally, or if, for any missing object, it could not be determined whether that object is fetchable. In this case, there may be no fetch spawned for certain objects in the requested range, even if those objects are actually fetchable.
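A caller holding a successful result may want to know which heights are still outstanding. The sketch below is a hypothetical helper, not part of this crate. It assumes the result vector is positionally aligned with the requested range, i.e. entry `i` corresponds to height `chunk.start + i`; that alignment is an assumption, not something the documentation above states.

```rust
use std::ops::Range;

/// Heights in `chunk` whose slot in `results` is `None`, i.e. objects that were
/// not available locally. Assumes `results[i]` belongs to height `chunk.start + i`.
fn missing_heights<T>(chunk: &Range<usize>, results: &[Option<T>]) -> Vec<usize> {
    chunk
        .clone()
        .zip(results)
        .filter(|(_, obj)| obj.is_none())
        .map(|(height, _)| height)
        .collect()
}
```

For a chunk `10..14` whose second and fourth entries are `None`, this reports heights 11 and 13 as the ones for which a fetch may be in progress.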
async fn fetch<T>(
    self: &Arc<Self>,
    tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
    req: T::Request,
) -> Result<()>
where
    T: Fetchable<Types>,
Spawn an active fetch for the requested object, if possible.
On success, either an active fetch for req has been spawned, or it has been determined that req is not fetchable. Fails if it cannot be determined (e.g. due to errors in the local database) whether req is fetchable or not.
async fn proactive_scan(
    self: Arc<Self>,
    minor_interval: Duration,
    major_interval: usize,
    major_offset: usize,
    chunk_size: usize,
    metrics: ScannerMetrics,
)
Proactively search for and retrieve missing objects.
This function proactively identifies and retrieves blocks and leaves which are missing from storage. It runs until cancelled, so it is meant to be spawned as a background task rather than called synchronously.
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
async fn aggregate(
    self: Arc<Self>,
    chunk_size: usize,
    metrics: AggregatorMetrics,
)
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
async fn store_and_notify<T>(&self, obj: T)
where
    T: Storable<Types>,
Store an object and notify anyone waiting on this object that it is available.
Trait Implementations
impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
type Transaction<'a> = <S as VersionedDataSource>::Transaction<'a> where Self: 'a
type ReadOnly<'a> = <S as VersionedDataSource>::ReadOnly<'a> where Self: 'a
async fn write(&self) -> Result<Self::Transaction<'_>>
Auto Trait Implementations
impl<Types, S, P> !Freeze for Fetcher<Types, S, P>
impl<Types, S, P> !RefUnwindSafe for Fetcher<Types, S, P>
impl<Types, S, P> Send for Fetcher<Types, S, P>
impl<Types, S, P> Sync for Fetcher<Types, S, P>
impl<Types, S, P> Unpin for Fetcher<Types, S, P>
impl<Types, S, P> !UnwindSafe for Fetcher<Types, S, P>
Blanket Implementations
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(
    _: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where
    T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where
    T: 'a,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<F, W, T, D> Deserialize<With<T, W>, D> for F
fn deserialize(
    &self,
    deserializer: &mut D,
) -> Result<With<T, W>, <D as Fallible>::Error>
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSync for T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    &'a Self: for<'a> IntoIterator,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> PolicyExt for T
where
    T: ?Sized,
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.