hotshot_query_service/data_source/fs.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::path::Path;
16
17use atomic_store::AtomicStoreLoader;
18use hotshot_types::traits::node_implementation::NodeType;
19
20pub use super::storage::fs::Transaction;
21use super::{storage::FileSystemStorage, AvailabilityProvider, FetchingDataSource};
22use crate::{
23 availability::{query_data::QueryablePayload, QueryableHeader},
24 Header, Payload,
25};
26
27/// A data source for the APIs provided in this crate, backed by the local file system.
28///
29/// Synchronization and atomicity of persisted data structures are provided via [`atomic_store`].
30/// The methods [`commit`](super::Transaction::commit), [`revert`](super::Transaction::revert), and
31/// [`skip_version`](Self::skip_version) of this type and its associated [`Transaction`] type can be
32/// used to control synchronization in the underlying [`AtomicStore`](atomic_store::AtomicStore).
33///
34/// Note that because [`AtomicStore`](atomic_store::AtomicStore) only allows changes to be made to
35/// the underlying store, a [`Transaction`] takes full control of the whole store, and does not
36/// permit concurrent readers or other transactions while in flight. This is enforced internally via
37/// a global `RwLock`, and is a significant downside of this storage implementation, compared to the
38/// more relaxed concurrency semantics of a SQL implementation.
39///
40/// # Extension and Composition
41///
42/// [`FileSystemDataSource`] is designed to be both extensible (so you can add additional state to
43/// the API modules defined in this crate) and composable (so you can use [`FileSystemDataSource`]
44/// as one component of a larger state type for an application with additional modules).
45///
46/// ## Extension
47///
48/// Adding additional, application-specific state to [`FileSystemDataSource`] is possible by
49/// wrapping it in [`ExtensibleDataSource`](super::ExtensibleDataSource):
50///
51/// ```
52/// # use hotshot_query_service::data_source::{ExtensibleDataSource, FileSystemDataSource};
53/// # use hotshot_query_service::fetching::provider::NoFetching;
54/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
55/// # use std::path::Path;
56/// # async fn doc(storage_path: &Path) -> Result<(), anyhow::Error> {
57/// type AppState = &'static str;
58///
59/// let data_source: ExtensibleDataSource<FileSystemDataSource<AppTypes, NoFetching>, AppState> =
60/// ExtensibleDataSource::new(FileSystemDataSource::create(storage_path, NoFetching).await?, "app state");
61/// # Ok(())
62/// # }
63/// ```
64///
65/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
66/// source traits as [`FileSystemDataSource`], and also provides access to the `AppState` parameter
67/// for use in API endpoint handlers. This can be used to implement an app-specific data source
68/// trait and add a new API endpoint that uses this app-specific data, as described in the
69/// [extension guide](crate#extension).
70///
71/// ## Composition
72///
73/// Composing [`FileSystemDataSource`] with other module states is in principle simple -- just
74/// create an aggregate struct containing both [`FileSystemDataSource`] and your additional module
75/// states. A complication arises from how persistent storage is managed: if other modules have
76/// their own persistent state, should the storage of [`FileSystemDataSource`] and the other modules
77/// be completely independent, or synchronized under the control of a single
78/// [`AtomicStore`](atomic_store::AtomicStore)? [`FileSystemDataSource`] supports both patterns:
79/// when you create it with [`create`](Self::create) or [`open`](Self::open), it will open its own
80/// [`AtomicStore`](atomic_store::AtomicStore) and manage the synchronization of its own storage,
81/// independent of any other persistent data it might be composed with. But when you create it with
82/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
83/// you may ask it to register its persistent data structures with an existing
84/// [`AtomicStoreLoader`]. If you register other modules' persistent data structures with the same
85/// loader, you can create one [`AtomicStore`](atomic_store::AtomicStore) that synchronizes all the
86/// persistent data. Note, though, that when you choose to use
87/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
88/// you become responsible for ensuring that calls to
89/// [`AtomicStore::commit_version`](atomic_store::AtomicStore::commit_version) alternate with calls
90/// to [`commit`](super::Transaction::commit) or [`skip_version`](Self::skip_version).
91///
92/// In the following example, we compose HotShot query service modules with other application-
93/// specific modules, using a single top-level [`AtomicStore`](atomic_store::AtomicStore) to
94/// synchronize all persistent storage.
95///
96/// ```
97/// # use atomic_store::{AtomicStore, AtomicStoreLoader};
98/// # use futures::StreamExt;
99/// # use hotshot::types::SystemContextHandle;
100/// # use hotshot_query_service::Error;
101/// # use hotshot_query_service::data_source::{
102/// # FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource,
103/// # };
104/// # use hotshot_query_service::fetching::provider::NoFetching;
105/// # use hotshot_query_service::testing::mocks::{
106/// # MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
107/// # };
108/// # use hotshot_example_types::node_types::TestVersions;
109/// # use std::{path::Path, sync::Arc};
110/// # use tide_disco::App;
111/// # use tokio::{spawn, sync::RwLock};
112/// # use vbs::version::StaticVersionType;
113/// struct AppState {
114/// // Top-level storage coordinator
115/// store: AtomicStore,
116/// hotshot_qs: FileSystemDataSource<AppTypes, NoFetching>,
117/// // additional state for other modules
118/// }
119///
120/// async fn init_server<Ver: StaticVersionType + 'static>(
121/// storage_path: &Path,
122/// hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
123/// ) -> anyhow::Result<App<Arc<RwLock<AppState>>, Error>> {
124/// let mut loader = AtomicStoreLoader::create(storage_path, "my_app")?; // or `open`
125/// let hotshot_qs = FileSystemDataSource::create_with_store(&mut loader, NoFetching)
126/// .await?;
127/// // Initialize storage for other modules using the same loader.
128///
129/// let store = AtomicStore::open(loader)?;
130/// let state = Arc::new(RwLock::new(AppState {
131/// store,
132/// hotshot_qs,
133/// // additional state for other modules
134/// }));
135/// let mut app = App::with_state(state.clone());
136/// // Register API modules.
137///
138/// spawn(async move {
139/// let mut events = hotshot.event_stream();
140/// while let Some(event) = events.next().await {
141/// let mut state = state.write().await;
142/// if state.hotshot_qs.update(&event).await.is_err() {
143/// continue;
144/// }
145///
146/// // Update other modules' states based on `event`.
147/// let mut tx = state.hotshot_qs.write().await.unwrap();
148/// // Do updates
149/// tx.commit().await.unwrap();
150///
151/// // Commit or skip versions for other modules' storage.
152/// state.store.commit_version().unwrap();
153/// }
154/// });
155///
156/// Ok(app)
157/// }
158/// ```
159pub type FileSystemDataSource<Types, P> = FetchingDataSource<Types, FileSystemStorage<Types>, P>;
160
161impl<Types: NodeType, P> FileSystemDataSource<Types, P>
162where
163 Payload<Types>: QueryablePayload<Types>,
164 Header<Types>: QueryableHeader<Types>,
165 P: AvailabilityProvider<Types>,
166{
167 /// Create a new [FileSystemDataSource] with storage at `path`.
168 ///
169 /// If there is already data at `path`, it will be archived.
170 ///
171 /// The [FileSystemDataSource] will manage its own persistence synchronization.
172 pub async fn create(path: &Path, provider: P) -> anyhow::Result<Self> {
173 FileSystemDataSource::builder(FileSystemStorage::create(path).await?, provider)
174 .build()
175 .await
176 }
177
178 /// Open an existing [FileSystemDataSource] from storage at `path`.
179 ///
180 /// If there is no data at `path`, a new store will be created.
181 ///
182 /// The [FileSystemDataSource] will manage its own persistence synchronization.
183 pub async fn open(path: &Path, provider: P) -> anyhow::Result<Self> {
184 FileSystemDataSource::builder(FileSystemStorage::open(path).await?, provider)
185 .build()
186 .await
187 }
188
189 /// Create a new [FileSystemDataSource] using a persistent storage loader.
190 ///
191 /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
192 /// will be archived.
193 ///
194 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
195 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
196 /// and managing synchronization of the store.
197 pub async fn create_with_store(
198 loader: &mut AtomicStoreLoader,
199 provider: P,
200 ) -> anyhow::Result<Self> {
201 FileSystemDataSource::builder(
202 FileSystemStorage::create_with_store(loader).await?,
203 provider,
204 )
205 .build()
206 .await
207 }
208
209 /// Open an existing [FileSystemDataSource] using a persistent storage loader.
210 ///
211 /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
212 /// new store will be created.
213 ///
214 /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
215 /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
216 /// and managing synchronization of the store.
217 pub async fn open_with_store(
218 loader: &mut AtomicStoreLoader,
219 provider: P,
220 ) -> anyhow::Result<Self> {
221 FileSystemDataSource::builder(FileSystemStorage::open_with_store(loader).await?, provider)
222 .build()
223 .await
224 }
225
226 /// Advance the version of the persistent store without committing changes to persistent state.
227 ///
228 /// This function is useful when the [AtomicStore](atomic_store::AtomicStore) synchronizing
229 /// storage for this [FileSystemDataSource] is being managed by the caller. The caller may want
230 /// to persist some changes to other modules whose state is managed by the same
231 /// [AtomicStore](atomic_store::AtomicStore). In order to call
232 /// [AtomicStore::commit_version](atomic_store::AtomicStore::commit_version), the version of
233 /// this [FileSystemDataSource] must be advanced, either by [commit](super::Transaction::commit)
234 /// or, if there are no outstanding changes, [skip_version](Self::skip_version).
235 pub async fn skip_version(&self) -> anyhow::Result<()> {
236 self.as_ref().skip_version().await?;
237 Ok(())
238 }
239}
240
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use async_trait::async_trait;
    use hotshot::types::Event;
    use tempfile::TempDir;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// Hooks [`FileSystemDataSource`] into the shared test life cycle, giving each node its own
    /// temporary directory as backing storage.
    #[async_trait]
    impl<P: AvailabilityProvider<MockTypes> + Default> DataSourceLifeCycle
        for FileSystemDataSource<MockTypes, P>
    {
        type Storage = TempDir;

        async fn create(node_id: usize) -> Self::Storage {
            // The directory name embeds the node index so per-node directories are identifiable.
            let prefix = format!("file_system_data_source_{node_id}");
            TempDir::with_prefix(prefix).unwrap()
        }

        async fn connect(storage: &Self::Storage) -> Self {
            // Reopen whatever already lives in the node's directory.
            Self::open(storage.path(), P::default()).await.unwrap()
        }

        async fn reset(storage: &Self::Storage) -> Self {
            // Resolves to the inherent `create(path, provider)`, which archives existing data.
            Self::create(storage.path(), P::default()).await.unwrap()
        }

        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
280
#[cfg(test)]
mod test {
    // `instantiate_data_source_tests!` is defined in another module of this crate, and a glob
    // import of the crate root is the only way found to bring it into scope here (presumably
    // because the macro is `#[macro_export]`ed, which places it at the crate root -- confirm).
    use crate::*;
    use crate::{fetching::provider::NoFetching, testing::mocks::MockTypes};

    use super::FileSystemDataSource;

    // Instantiate the generic data source tests for the file-system-backed data source.
    instantiate_data_source_tests!(FileSystemDataSource<MockTypes, NoFetching>);
}