pub type FileSystemDataSource<Types, P> = FetchingDataSource<Types, FileSystemStorage<Types>, P>;
A data source for the APIs provided in this crate, backed by the local file system.
Synchronization and atomicity of persisted data structures are provided via atomic_store. The methods commit, revert, and skip_version of this type and its associated Transaction type can be used to control synchronization in the underlying AtomicStore.
Note that because AtomicStore only allows changes to be made to the underlying store, a Transaction takes full control of the whole store and does not permit concurrent readers or other transactions while it is in flight. This is enforced internally via a global RwLock, and is a significant downside of this storage implementation compared to the more relaxed concurrency semantics of a SQL implementation.
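The exclusivity described above can be illustrated with a small, self-contained model. This is only a sketch: `ToyStore` and `can_read_now` are hypothetical names, and `std::sync::RwLock` stands in for whatever lock type the crate actually uses internally.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the whole persistent store.
#[derive(Default)]
struct ToyStore {
    committed: Vec<u64>,
}

/// Returns `true` only if a read lock could be taken immediately,
/// i.e. no writer currently holds the store.
fn can_read_now(store: &Arc<RwLock<ToyStore>>) -> bool {
    store.try_read().is_ok()
}

fn main() {
    let store = Arc::new(RwLock::new(ToyStore::default()));

    {
        // A "transaction" in this model is simply the write guard:
        // holding it locks out every reader and every other writer.
        let mut tx = store.write().unwrap();
        tx.committed.push(1);
        assert!(!can_read_now(&store));
    } // The guard is dropped here, ending the "transaction".

    // Readers are admitted again and observe the change.
    assert!(can_read_now(&store));
    assert_eq!(store.read().unwrap().committed, vec![1]);
}
```

The model shows why a long-running transaction stalls all queries: nothing else can touch the store until the write guard is released.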
§Extension and Composition
FileSystemDataSource is designed to be both extensible (so you can add additional state to the API modules defined in this crate) and composable (so you can use FileSystemDataSource as one component of a larger state type for an application with additional modules).
§Extension
Adding additional, application-specific state to FileSystemDataSource is possible by wrapping it in ExtensibleDataSource:
type AppState = &'static str;

let data_source: ExtensibleDataSource<FileSystemDataSource<AppTypes, NoFetching>, AppState> =
    ExtensibleDataSource::new(FileSystemDataSource::create(storage_path, NoFetching).await?, "app state");
The ExtensibleDataSource wrapper implements all the same data source traits as FileSystemDataSource, and also provides access to the AppState parameter for use in API endpoint handlers. This can be used to implement an app-specific data source trait and add a new API endpoint that uses this app-specific data, as described in the extension guide.
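The wrapper pattern can be sketched in isolation. Everything in this block is hypothetical (`BlockHeight`, `ToySource`, `Extended`, and the `app_state` accessor are illustrative names, not items from this crate); it only shows the general idea of forwarding a trait to an inner data source while carrying extra state alongside it.

```rust
/// Hypothetical trait standing in for one of the crate's data source traits.
trait BlockHeight {
    fn block_height(&self) -> u64;
}

/// Hypothetical inner data source.
struct ToySource {
    height: u64,
}

impl BlockHeight for ToySource {
    fn block_height(&self) -> u64 {
        self.height
    }
}

/// Toy wrapper pairing an inner data source with app-specific state.
struct Extended<D, S> {
    inner: D,
    app_state: S,
}

impl<D, S> Extended<D, S> {
    fn new(inner: D, app_state: S) -> Self {
        Self { inner, app_state }
    }

    /// Accessor that endpoint handlers could use to reach the app state.
    fn app_state(&self) -> &S {
        &self.app_state
    }
}

/// Forward the data source trait to the wrapped source, so the wrapper
/// can be used wherever the inner source could.
impl<D: BlockHeight, S> BlockHeight for Extended<D, S> {
    fn block_height(&self) -> u64 {
        self.inner.block_height()
    }
}

fn main() {
    let ds = Extended::new(ToySource { height: 7 }, "app state");
    println!("height = {}, state = {}", ds.block_height(), ds.app_state());
}
```

Because the wrapper forwards the trait rather than re-implementing it, existing API modules keep working unchanged while new handlers read the extra state.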
§Composition
Composing FileSystemDataSource with other module states is in principle simple – just create an aggregate struct containing both FileSystemDataSource and your additional module states. A complication arises from how persistent storage is managed: if other modules have their own persistent state, should the storage of FileSystemDataSource and the other modules be completely independent, or synchronized under the control of a single AtomicStore? FileSystemDataSource supports both patterns: when you create it with create or open, it will open its own AtomicStore and manage the synchronization of its own storage, independent of any other persistent data it might be composed with. But when you create it with create_with_store or open_with_store, you may ask it to register its persistent data structures with an existing AtomicStoreLoader. If you register other modules’ persistent data structures with the same loader, you can create one AtomicStore that synchronizes all the persistent data. Note, though, that when you choose to use create_with_store or open_with_store, you become responsible for ensuring that calls to AtomicStore::commit_version alternate with calls to commit or skip_version.
In the following example, we compose HotShot query service modules with other application-specific modules, using a single top-level AtomicStore to synchronize all persistent storage.
struct AppState {
    // Top-level storage coordinator
    store: AtomicStore,
    hotshot_qs: FileSystemDataSource<AppTypes, NoFetching>,
    // additional state for other modules
}

async fn init_server<Ver: StaticVersionType + 'static>(
    storage_path: &Path,
    hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
) -> anyhow::Result<App<Arc<RwLock<AppState>>, Error>> {
    let mut loader = AtomicStoreLoader::create(storage_path, "my_app")?; // or `open`
    let hotshot_qs = FileSystemDataSource::create_with_store(&mut loader, NoFetching)
        .await?;
    // Initialize storage for other modules using the same loader.

    let store = AtomicStore::open(loader)?;
    let state = Arc::new(RwLock::new(AppState {
        store,
        hotshot_qs,
        // additional state for other modules
    }));
    let mut app = App::with_state(state.clone());
    // Register API modules.

    spawn(async move {
        let mut events = hotshot.event_stream();
        while let Some(event) = events.next().await {
            let mut state = state.write().await;
            if state.hotshot_qs.update(&event).await.is_err() {
                continue;
            }

            // Update other modules' states based on `event`.
            let mut tx = state.hotshot_qs.write().await.unwrap();
            // Do updates
            tx.commit().await.unwrap();

            // Commit or skip versions for other modules' storage.
            state.store.commit_version().unwrap();
        }
    });

    Ok(app)
}
Aliased Type§
pub struct FileSystemDataSource<Types, P> {
    fetcher: Arc<Fetcher<Types, FileSystemStorage<Types>, P>>,
    scanner: Option<BackgroundTask>,
    aggregator: Option<BackgroundTask>,
    pruner: Pruner<Types, FileSystemStorage<Types>>,
}
Fields§
§fetcher: Arc<Fetcher<Types, FileSystemStorage<Types>, P>>
§scanner: Option<BackgroundTask>
§aggregator: Option<BackgroundTask>
§pruner: Pruner<Types, FileSystemStorage<Types>>
Implementations§
impl<Types: NodeType, P> FileSystemDataSource<Types, P>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    P: AvailabilityProvider<Types>,
pub async fn create(path: &Path, provider: P) -> Result<Self>
Create a new FileSystemDataSource with storage at path.
If there is already data at path, it will be archived.
The FileSystemDataSource will manage its own persistence synchronization.
pub async fn open(path: &Path, provider: P) -> Result<Self>
Open an existing FileSystemDataSource from storage at path.
If there is no data at path, a new store will be created.
The FileSystemDataSource will manage its own persistence synchronization.
pub async fn create_with_store(
    loader: &mut AtomicStoreLoader,
    provider: P,
) -> Result<Self>
Create a new FileSystemDataSource using a persistent storage loader.
If there is existing data corresponding to the FileSystemDataSource data structures, it will be archived.
The FileSystemDataSource will register its persistent data structures with loader. The caller is responsible for creating an AtomicStore from loader and managing synchronization of the store.
pub async fn open_with_store(
    loader: &mut AtomicStoreLoader,
    provider: P,
) -> Result<Self>
Open an existing FileSystemDataSource using a persistent storage loader.
If there is no existing data corresponding to the FileSystemDataSource data structures, a new store will be created.
The FileSystemDataSource will register its persistent data structures with loader. The caller is responsible for creating an AtomicStore from loader and managing synchronization of the store.
pub async fn skip_version(&self) -> Result<()>
Advance the version of the persistent store without committing changes to persistent state.
This function is useful when the AtomicStore synchronizing storage for this FileSystemDataSource is being managed by the caller. The caller may want to persist some changes to other modules whose state is managed by the same AtomicStore. In order to call AtomicStore::commit_version, the version of this FileSystemDataSource must be advanced, either by commit or, if there are no outstanding changes, skip_version.
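The version-advance rule can be modelled with a toy coordinator. This is a sketch under stated assumptions, not the behaviour of the real atomic_store crate: `ToyLog` and `ToyStore` are hypothetical types, and returning an error on a version mismatch or on skipping with pending changes is an assumption made here for illustration.

```rust
/// Hypothetical stand-in for one module's persistent data.
#[derive(Default)]
struct ToyLog {
    version: u64,
    pending: Vec<String>,
    persisted: Vec<String>,
}

impl ToyLog {
    /// Persist pending changes and advance this log's version.
    fn commit(&mut self) {
        self.persisted.append(&mut self.pending);
        self.version += 1;
    }

    /// Advance the version without persisting anything new.
    /// In this toy model it is rejected if changes are outstanding.
    fn skip_version(&mut self) -> Result<(), String> {
        if !self.pending.is_empty() {
            return Err("outstanding changes: use commit instead".into());
        }
        self.version += 1;
        Ok(())
    }
}

/// Hypothetical stand-in for the shared store coordinating several logs.
#[derive(Default)]
struct ToyStore {
    version: u64,
}

impl ToyStore {
    /// Succeeds only if every log is exactly one version ahead of the store.
    fn commit_version(&mut self, logs: &[&ToyLog]) -> Result<(), String> {
        if logs.iter().any(|log| log.version != self.version + 1) {
            return Err("a log was not advanced exactly once".into());
        }
        self.version += 1;
        Ok(())
    }
}

fn main() {
    let mut store = ToyStore::default();
    let mut query_log = ToyLog::default();
    let mut other_log = ToyLog::default();

    // Only the other module has something to persist this round.
    other_log.pending.push("app event".to_string());
    other_log.commit();

    // Without advancing `query_log`, the global commit is rejected.
    assert!(store.commit_version(&[&query_log, &other_log]).is_err());

    // Skipping a version brings `query_log` in step, so the commit succeeds.
    query_log.skip_version().unwrap();
    assert!(store.commit_version(&[&query_log, &other_log]).is_ok());
    assert_eq!(store.version, 1);
}
```

In this model, skip_version plays the role of an empty commit: it keeps a module with nothing to write in lockstep with the modules that do have changes.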