hotshot_query_service/data_source/
fs.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::path::Path;
16
17use atomic_store::AtomicStoreLoader;
18use hotshot_types::traits::node_implementation::NodeType;
19
20pub use super::storage::fs::Transaction;
21use super::{storage::FileSystemStorage, AvailabilityProvider, FetchingDataSource};
22use crate::{
23    availability::{query_data::QueryablePayload, QueryableHeader},
24    Header, Payload,
25};
26
/// A data source for the APIs provided in this crate, backed by the local file system.
///
/// This is a type alias for [`FetchingDataSource`] with `FileSystemStorage<Types>` as its storage
/// layer. `Types` is the application's [`NodeType`] implementation, and `P` is the provider passed
/// through to the underlying [`FetchingDataSource`]; the constructors defined on this alias
/// require `P: AvailabilityProvider<Types>`.
///
/// Synchronization and atomicity of persisted data structures are provided via [`atomic_store`].
/// The methods [`commit`](super::Transaction::commit), [`revert`](super::Transaction::revert), and
/// [`skip_version`](Self::skip_version) of this type and its associated [`Transaction`] type can be
/// used to control synchronization in the underlying [`AtomicStore`](atomic_store::AtomicStore).
///
/// Note that because [`AtomicStore`](atomic_store::AtomicStore) only allows changes to be made to
/// the underlying store, a [`Transaction`] takes full control of the whole store, and does not
/// permit concurrent readers or other transactions while in flight. This is enforced internally via
/// a global `RwLock`, and is a significant downside of this storage implementation, compared to the
/// more relaxed concurrency semantics of a SQL implementation.
///
/// # Extension and Composition
///
/// [`FileSystemDataSource`] is designed to be both extensible (so you can add additional state to
/// the API modules defined in this crate) and composable (so you can use [`FileSystemDataSource`]
/// as one component of a larger state type for an application with additional modules).
///
/// ## Extension
///
/// Adding additional, application-specific state to [`FileSystemDataSource`] is possible by
/// wrapping it in [`ExtensibleDataSource`](super::ExtensibleDataSource):
///
/// ```
/// # use hotshot_query_service::data_source::{ExtensibleDataSource, FileSystemDataSource};
/// # use hotshot_query_service::fetching::provider::NoFetching;
/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
/// # use std::path::Path;
/// # async fn doc(storage_path: &Path) -> Result<(), anyhow::Error> {
/// type AppState = &'static str;
///
/// let data_source: ExtensibleDataSource<FileSystemDataSource<AppTypes, NoFetching>, AppState> =
///     ExtensibleDataSource::new(FileSystemDataSource::create(storage_path, NoFetching).await?, "app state");
/// # Ok(())
/// # }
/// ```
///
/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
/// source traits as [`FileSystemDataSource`], and also provides access to the `AppState` parameter
/// for use in API endpoint handlers. This can be used to implement an app-specific data source
/// trait and add a new API endpoint that uses this app-specific data, as described in the
/// [extension guide](crate#extension).
///
/// ## Composition
///
/// Composing [`FileSystemDataSource`] with other module states is in principle simple -- just
/// create an aggregate struct containing both [`FileSystemDataSource`] and your additional module
/// states. A complication arises from how persistent storage is managed: if other modules have
/// their own persistent state, should the storage of [`FileSystemDataSource`] and the other modules
/// be completely independent, or synchronized under the control of a single
/// [`AtomicStore`](atomic_store::AtomicStore)? [`FileSystemDataSource`] supports both patterns:
/// when you create it with [`create`](Self::create) or [`open`](Self::open), it will open its own
/// [`AtomicStore`](atomic_store::AtomicStore) and manage the synchronization of its own storage,
/// independent of any other persistent data it might be composed with. But when you create it with
/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
/// you may ask it to register its persistent data structures with an existing
/// [`AtomicStoreLoader`]. If you register other modules' persistent data structures with the same
/// loader, you can create one [`AtomicStore`](atomic_store::AtomicStore) that synchronizes all the
/// persistent data. Note, though, that when you choose to use
/// [`create_with_store`](Self::create_with_store) or [`open_with_store`](Self::open_with_store),
/// you become responsible for ensuring that calls to
/// [`AtomicStore::commit_version`](atomic_store::AtomicStore::commit_version) alternate with calls
/// to [`commit`](super::Transaction::commit) or [`skip_version`](Self::skip_version).
///
/// In the following example, we compose HotShot query service modules with other application-
/// specific modules, using a single top-level [`AtomicStore`](atomic_store::AtomicStore) to
/// synchronize all persistent storage.
///
/// ```
/// # use atomic_store::{AtomicStore, AtomicStoreLoader};
/// # use futures::StreamExt;
/// # use hotshot::types::SystemContextHandle;
/// # use hotshot_query_service::Error;
/// # use hotshot_query_service::data_source::{
/// #   FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource,
/// # };
/// # use hotshot_query_service::fetching::provider::NoFetching;
/// # use hotshot_query_service::testing::mocks::{
/// #   MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
/// # };
/// # use hotshot_example_types::node_types::TestVersions;
/// # use std::{path::Path, sync::Arc};
/// # use tide_disco::App;
/// # use tokio::{spawn, sync::RwLock};
/// # use vbs::version::StaticVersionType;
/// struct AppState {
///     // Top-level storage coordinator
///     store: AtomicStore,
///     hotshot_qs: FileSystemDataSource<AppTypes, NoFetching>,
///     // additional state for other modules
/// }
///
/// async fn init_server<Ver: StaticVersionType + 'static>(
///     storage_path: &Path,
///     hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
/// ) -> anyhow::Result<App<Arc<RwLock<AppState>>, Error>> {
///     let mut loader = AtomicStoreLoader::create(storage_path, "my_app")?; // or `open`
///     let hotshot_qs = FileSystemDataSource::create_with_store(&mut loader, NoFetching)
///         .await?;
///     // Initialize storage for other modules using the same loader.
///
///     let store = AtomicStore::open(loader)?;
///     let state = Arc::new(RwLock::new(AppState {
///         store,
///         hotshot_qs,
///         // additional state for other modules
///     }));
///     let mut app = App::with_state(state.clone());
///     // Register API modules.
///
///     spawn(async move {
///         let mut events = hotshot.event_stream();
///         while let Some(event) = events.next().await {
///             let mut state = state.write().await;
///             if state.hotshot_qs.update(&event).await.is_err() {
///                 continue;
///             }
///
///             // Update other modules' states based on `event`.
///             let mut tx = state.hotshot_qs.write().await.unwrap();
///             // Do updates
///             tx.commit().await.unwrap();
///
///             // Commit or skip versions for other modules' storage.
///             state.store.commit_version().unwrap();
///         }
///     });
///
///     Ok(app)
/// }
/// ```
pub type FileSystemDataSource<Types, P> = FetchingDataSource<Types, FileSystemStorage<Types>, P>;
160
161impl<Types: NodeType, P> FileSystemDataSource<Types, P>
162where
163    Payload<Types>: QueryablePayload<Types>,
164    Header<Types>: QueryableHeader<Types>,
165    P: AvailabilityProvider<Types>,
166{
167    /// Create a new [FileSystemDataSource] with storage at `path`.
168    ///
169    /// If there is already data at `path`, it will be archived.
170    ///
171    /// The [FileSystemDataSource] will manage its own persistence synchronization.
172    pub async fn create(path: &Path, provider: P) -> anyhow::Result<Self> {
173        FileSystemDataSource::builder(FileSystemStorage::create(path).await?, provider)
174            .build()
175            .await
176    }
177
178    /// Open an existing [FileSystemDataSource] from storage at `path`.
179    ///
180    /// If there is no data at `path`, a new store will be created.
181    ///
182    /// The [FileSystemDataSource] will manage its own persistence synchronization.
183    pub async fn open(path: &Path, provider: P) -> anyhow::Result<Self> {
184        FileSystemDataSource::builder(FileSystemStorage::open(path).await?, provider)
185            .build()
186            .await
187    }
188
189    /// Create a new [FileSystemDataSource] using a persistent storage loader.
190    ///
191    /// If there is existing data corresponding to the [FileSystemDataSource] data structures, it
192    /// will be archived.
193    ///
194    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
195    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
196    /// and managing synchronization of the store.
197    pub async fn create_with_store(
198        loader: &mut AtomicStoreLoader,
199        provider: P,
200    ) -> anyhow::Result<Self> {
201        FileSystemDataSource::builder(
202            FileSystemStorage::create_with_store(loader).await?,
203            provider,
204        )
205        .build()
206        .await
207    }
208
209    /// Open an existing [FileSystemDataSource] using a persistent storage loader.
210    ///
211    /// If there is no existing data corresponding to the [FileSystemDataSource] data structures, a
212    /// new store will be created.
213    ///
214    /// The [FileSystemDataSource] will register its persistent data structures with `loader`. The
215    /// caller is responsible for creating an [AtomicStore](atomic_store::AtomicStore) from `loader`
216    /// and managing synchronization of the store.
217    pub async fn open_with_store(
218        loader: &mut AtomicStoreLoader,
219        provider: P,
220    ) -> anyhow::Result<Self> {
221        FileSystemDataSource::builder(FileSystemStorage::open_with_store(loader).await?, provider)
222            .build()
223            .await
224    }
225
226    /// Advance the version of the persistent store without committing changes to persistent state.
227    ///
228    /// This function is useful when the [AtomicStore](atomic_store::AtomicStore) synchronizing
229    /// storage for this [FileSystemDataSource] is being managed by the caller. The caller may want
230    /// to persist some changes to other modules whose state is managed by the same
231    /// [AtomicStore](atomic_store::AtomicStore). In order to call
232    /// [AtomicStore::commit_version](atomic_store::AtomicStore::commit_version), the version of
233    /// this [FileSystemDataSource] must be advanced, either by [commit](super::Transaction::commit)
234    /// or, if there are no outstanding changes, [skip_version](Self::skip_version).
235    pub async fn skip_version(&self) -> anyhow::Result<()> {
236        self.as_ref().skip_version().await?;
237        Ok(())
238    }
239}
240
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use async_trait::async_trait;
    use hotshot::types::Event;
    use tempfile::TempDir;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// Test life cycle for a file-system-backed data source: every node gets its own temporary
    /// directory, which the data source is then opened from or re-created in.
    #[async_trait]
    impl<P> DataSourceLifeCycle for FileSystemDataSource<MockTypes, P>
    where
        P: AvailabilityProvider<MockTypes> + Default,
    {
        type Storage = TempDir;

        /// Allocate a temporary directory whose name is prefixed with the node ID.
        async fn create(node_id: usize) -> Self::Storage {
            let prefix = format!("file_system_data_source_{node_id}");
            TempDir::with_prefix(prefix).unwrap()
        }

        /// Open a data source over `storage`, using a default-constructed provider.
        async fn connect(storage: &Self::Storage) -> Self {
            let opened = Self::open(storage.path(), P::default()).await;
            opened.unwrap()
        }

        /// Create a new data source over `storage`, using a default-constructed provider.
        async fn reset(storage: &Self::Storage) -> Self {
            let created = Self::create(storage.path(), P::default()).await;
            created.unwrap()
        }

        /// Feed `event` to the data source, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
280
#[cfg(test)]
mod test {
    // The test-instantiation macro used below is defined in another module of this crate; a glob
    // import of the crate root appears to be the only way to bring it into scope here.
    use crate::*;
    use crate::{fetching::provider::NoFetching, testing::mocks::MockTypes};

    use super::FileSystemDataSource;

    // Run the shared data source test suite against the file system implementation.
    instantiate_data_source_tests!(FileSystemDataSource<MockTypes, NoFetching>);
}