hotshot_query_service/data_source/sql.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "sql-data-source")]
14
15pub use anyhow::Error;
16use hotshot_types::traits::node_implementation::NodeType;
17pub use refinery::Migration;
18pub use sql::Transaction;
19
20use super::{
21 AvailabilityProvider, FetchingDataSource, fetching,
22 storage::sql::{self, SqlStorage, StorageConnectionType},
23};
24pub use crate::include_migrations;
25use crate::{
26 Header, Payload,
27 availability::{QueryableHeader, QueryablePayload},
28};
29
/// Builder for a [`SqlDataSource`]: a [`fetching::Builder`] whose storage parameter is fixed to
/// [`SqlStorage`].
30pub type Builder<Types, Provider> = fetching::Builder<Types, SqlStorage, Provider>;
31
/// Configuration used to connect to the database; an alias for the storage-level `sql::Config`.
32pub type Config = sql::Config;
33
34impl Config {
35 /// Connect to the database with this config.
36 pub async fn connect<Types, P: AvailabilityProvider<Types>>(
37 self,
38 provider: P,
39 ) -> Result<SqlDataSource<Types, P>, Error>
40 where
41 Types: NodeType,
42 Header<Types>: QueryableHeader<Types>,
43 Payload<Types>: QueryablePayload<Types>,
44 {
45 self.builder(provider).await?.build().await
46 }
47
48 /// Connect to the database, setting options on the underlying [`FetchingDataSource`] using the
49 /// [`fetching::Builder`] interface.
50 pub async fn builder<Types, P: AvailabilityProvider<Types>>(
51 self,
52 provider: P,
53 ) -> Result<Builder<Types, P>, Error>
54 where
55 Types: NodeType,
56 Header<Types>: QueryableHeader<Types>,
57 Payload<Types>: QueryablePayload<Types>,
58 {
59 SqlDataSource::connect(self, provider).await
60 }
61}
62
63/// A data source for the APIs provided in this crate, backed by a remote PostgreSQL database.
64///
65/// # Administration
66///
67/// This data source will automatically connect to and perform queries on a remote SQL database.
68/// However, _administration_ of the database, such as initialization, resetting, and backups, is
69/// left out of the scope of this implementation, and is expected to be performed manually using
70/// off-the-shelf DBMS administration tools. The one exception is migrations, which are handled
71/// transparently by the [`SqlDataSource`].
72///
73/// ## Schema
74///
75/// All the objects created and used by [`SqlDataSource`] are grouped under a schema for easy
76/// management. By default, the schema is named `hotshot`, and is created the first time a
77/// [`SqlDataSource`] is constructed. The name of the schema can be configured by setting
78/// [`Config::schema`].
79///
80/// ## Initialization
81///
82/// When creating a PostgreSQL [`SqlDataSource`], the caller can use [`Config`] to specify the host, user, and
83/// database for the connection. If the `embedded-db` feature is enabled, the caller can instead specify the
84/// file path for an SQLite database.
85/// As such, [`SqlDataSource`] is not very opinionated about how the
86/// database instance is set up. The administrator must simply ensure that there is a database
87/// dedicated to the [`SqlDataSource`] and a user with appropriate permissions (all on `SCHEMA` and
88/// all on `DATABASE`) over that database.
89///
90/// Here is an example of how a sufficient database could be initialized. When using the standard
91/// `postgres` Docker image, these statements could be placed in
92/// `/docker-entrypoint-initdb.d/init.sql` to automatically initialize the database upon startup.
93///
94/// ```sql
95/// CREATE DATABASE hotshot_query_service;
96/// \connect hotshot_query_service;
97/// CREATE USER hotshot_user WITH PASSWORD 'password';
98/// GRANT ALL ON SCHEMA public TO hotshot_user;
99/// GRANT ALL ON DATABASE hotshot_query_service TO hotshot_user WITH GRANT OPTION;
100/// ```
101///
102/// For SQLite, simply provide the file path, and the file will be created if it does not already exist.
103///
104/// One could then connect to this database with the following [`Config`] for postgres:
105///
106/// ```
107/// # use hotshot_query_service::data_source::sql::Config;
108/// #[cfg(not(feature= "embedded-db"))]
109/// Config::default()
110/// .host("postgres.database.hostname")
111/// .database("hotshot_query_service")
112/// .user("hotshot_user")
113/// .password("password")
114/// # ;
115/// ```
116/// Or, if the `embedded-db` feature is enabled, configure it as follows for SQLite:
117///
118/// ```
119/// # use hotshot_query_service::data_source::sql::Config;
120/// #[cfg(feature= "embedded-db")]
121/// Config::default()
122/// .db_path("temp.db".into())
123/// # ;
124/// ```
125/// ## Resetting
126///
127/// In general, resetting the database when necessary is left up to the administrator. However, for
128/// convenience, we do provide a [`reset_schema`](Config::reset_schema) option which can be used to
129/// wipe out existing state and create a fresh instance of the query service. This is particularly
130/// useful for development and staging environments. This function will permanently delete all
131/// tables associated with the schema used by this query service, but will not reset other schemas
132/// or databases.
133///
134/// ## Migrations
135///
136/// For the [`SqlDataSource`] to work, the database must be initialized with the appropriate schema,
137/// and the schema must be kept up to date when deploying a new version of this software which
138/// depends on a different schema. Both of these tasks are accomplished via _migrations_.
139///
140/// Each release of this software is bundled with a sequence of migration files: one migration for
141/// each release that changed the schema, including the latest one. Replaying these SQL files
142/// against a database with an older version of the schema, including a completely empty database,
143/// will bring it up to date with the schema required by this version of the software. Upon creating
144/// an instance of [`SqlDataSource`] and connecting to a database, the data source will
145/// automatically fetch the current version from the database and, if it is old, replay the
146/// necessary migration files.
147///
148/// ## Custom Migrations
149///
150/// In keeping with the philosophy of this crate, [`SqlDataSource`] is designed to be
151/// [extensible and composable](#extension-and-composition). When extending the provided APIs with
152/// new, application-specific queries, it will often be desirable to alter the schema of the
153/// database in some way, such as adding additional columns to some of the tables or creating new
154/// indices. When composing the provided APIs with additional API modules, it may also be desirable
155/// to alter the schema, although the changes are more likely to be completely independent of the
156/// schema used by this data source, such as adding entirely new tables.
157///
158/// In either case, the default schema can be modified by inserting additional migrations between
159/// the migrations distributed with this crate. The new migrations will then automatically be
160/// replayed as necessary when initializing a [`SqlDataSource`]. New custom migrations can be
161/// added with each software update, to keep the custom data up to date as the default schema
162/// changes.
163///
164/// Custom migrations can be inserted using [`Config::migrations`]. Each custom migration will be
165/// inserted into the overall sequence of migrations in order of version number. The migrations
166/// provided by this crate only use version numbers which are multiples of 100, so the non-multiples
167/// can be used to insert custom migrations between the default migrations. You can also replace a
168/// default migration completely by providing a custom migration with the same version number. This
169/// may be useful when an earlier custom migration has altered the schema in such a way that a later
170/// migration no longer works as-is. However, this technique is error prone and should be used only
171/// when necessary.
172///
173/// When using custom migrations, it is the user's responsibility to ensure that the resulting
174/// schema is compatible with the schema expected by [`SqlDataSource`]. Adding things (tables,
175/// columns, indices) should usually be safe. Removing, altering, or renaming things should be done
176/// with extreme caution.
177///
178/// It is standard to store custom migrations as SQL files in a sub-directory of the crate. For ease
179/// of release and deployment, such directories can be embedded into a Rust binary and parsed into
180/// a list of [`Migration`] objects using the [`include_migrations`] macro.
181///
182/// It is also possible to take complete control over migrating the schema using
183/// [`Config::no_migrations`] to prevent the [`SqlDataSource`] from running its own migrations. The
184/// database administrator then becomes responsible for manually migrating the database, ensuring the
185/// schema is up to date, and ensuring that the schema is at all times compatible with the schema
186/// expected by the current version of this software. Nevertheless, this may be the best option when
187/// your application-specific schema has diverged significantly from the default schema.
188///
189/// # Synchronization
190///
191/// [`SqlDataSource`] implements [`VersionedDataSource`](super::VersionedDataSource), which means
192/// changes are applied to the underlying database via transactions. [`Transaction`] maps exactly to
193/// a transaction in the underlying RDBMS, and inherits the underlying concurrency semantics.
194///
195/// # Extension and Composition
196///
197/// [`SqlDataSource`] is designed to be both extensible (so you can add additional state to the API
198/// modules defined in this crate) and composable (so you can use [`SqlDataSource`] as one component
199/// of a larger state type for an application with additional modules).
200///
201/// ## Extension
202///
203/// It is possible to add additional, application-specific state to [`SqlDataSource`]. If the new
204/// state should live in memory, simply wrap the [`SqlDataSource`] in an
205/// [`ExtensibleDataSource`](super::ExtensibleDataSource):
206///
207/// ```
208/// # use hotshot_query_service::data_source::{
209/// # sql::{Config, Error}, ExtensibleDataSource, SqlDataSource,
210/// # };
211/// # use hotshot_query_service::fetching::provider::NoFetching;
212/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
213/// # async fn doc(config: Config) -> Result<(), Error> {
214/// type AppState = &'static str;
215///
216/// let data_source: ExtensibleDataSource<SqlDataSource<AppTypes, NoFetching>, AppState> =
217/// ExtensibleDataSource::new(config.connect(NoFetching).await?, "app state");
218/// # Ok(())
219/// # }
220/// ```
221///
222/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
223/// source traits as [`SqlDataSource`], and also provides access to the `AppState` parameter for use
224/// in API endpoint handlers. This can be used to implement an app-specific data source trait and
225/// add a new API endpoint that uses this app-specific data, as described in the
226/// [extension guide](crate#extension).
227///
228/// If the new application-specific state should live in the SQL database itself, the implementation
229/// is more involved, but still possible. Follow the steps for [custom
230/// migrations](#custom-migrations) to modify the database schema to account for the new data you
231/// want to store. You can then access this data through the [`SqlDataSource`] using
232/// [`read`](super::VersionedDataSource::read) to run a custom read-only SQL query or
233/// [`write`](super::VersionedDataSource::write) to execute a custom atomic mutation of the
234/// database.
235///
236/// ## Composition
237///
238/// Composing [`SqlDataSource`] with other module states is fairly simple -- just
239/// create an aggregate struct containing both [`SqlDataSource`] and your additional module
240/// states, as described in the [composition guide](crate#composition). If the additional modules
241/// have data that should live in the same database as the [`SqlDataSource`] data, you can follow
242/// the steps in [custom migrations](#custom-migrations) to accommodate this.
243///
244/// ```
245/// # use futures::StreamExt;
246/// # use hotshot::types::SystemContextHandle;
247/// # use hotshot_query_service::Error;
248/// # use hotshot_query_service::data_source::{
249/// # sql::Config, Transaction, SqlDataSource, UpdateDataSource, VersionedDataSource,
250/// # };
251/// # use hotshot_query_service::fetching::provider::NoFetching;
252/// # use hotshot_query_service::testing::mocks::{
253/// # MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
254/// # };
255/// # use hotshot_example_types::node_types::TestVersions;
256/// # use std::sync::Arc;
257/// # use tide_disco::App;
258/// # use tokio::spawn;
259/// # use vbs::version::StaticVersionType;
260/// struct AppState {
261/// hotshot_qs: SqlDataSource<AppTypes, NoFetching>,
262/// // additional state for other modules
263/// }
264///
265/// async fn init_server<Ver: StaticVersionType + 'static>(
266/// config: Config,
267/// hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
268/// ) -> anyhow::Result<App<Arc<AppState>, Error>> {
269/// let mut hotshot_qs = config.connect(NoFetching).await?;
270/// // Initialize storage for other modules, using `hotshot_qs` to access the database.
271/// let tx = hotshot_qs.write().await?;
272/// // ...
273/// tx.commit().await?;
274///
275/// let state = Arc::new(AppState {
276/// hotshot_qs,
277/// // additional state for other modules
278/// });
279/// let mut app = App::with_state(state.clone());
280/// // Register API modules.
281///
282/// spawn(async move {
283/// let mut events = hotshot.event_stream();
284/// while let Some(event) = events.next().await {
285/// if state.hotshot_qs.update(&event).await.is_err() {
286/// continue;
287/// }
288///
289/// let mut tx = state.hotshot_qs.write().await.unwrap();
290/// // Update other modules' states based on `event`, using `tx` to access the database.
291/// tx.commit().await.unwrap();
292/// }
293/// });
294///
295/// Ok(app)
296/// }
297/// ```
298pub type SqlDataSource<Types, P> = FetchingDataSource<Types, SqlStorage, P>;
299
300impl<Types, P: AvailabilityProvider<Types>> SqlDataSource<Types, P>
301where
302 Types: NodeType,
303 Header<Types>: QueryableHeader<Types>,
304 Payload<Types>: QueryablePayload<Types>,
305{
306 /// Connect to a remote database.
307 ///
308 /// This function returns a [`fetching::Builder`] which can be used to set options on the
309 /// underlying [`FetchingDataSource`], before constructing the [`SqlDataSource`] with
310 /// [`build`](fetching::Builder::build). For a convenient constructor that uses the default
311 /// fetching options, see [`Config::connect`].
312 pub async fn connect(config: Config, provider: P) -> Result<Builder<Types, P>, Error> {
313 Ok(Self::builder(
314 SqlStorage::connect(config, StorageConnectionType::Query).await?,
315 provider,
316 ))
317 }
318}
319
// Excluded on Windows: these tests run the `postgres` Docker image, which doesn't work there.
#[cfg(all(any(test, feature = "testing"), not(target_os = "windows")))]
pub mod testing {
    use async_trait::async_trait;
    use hotshot::types::Event;
    pub use sql::testing::TmpDb;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// Test life cycle for a [`SqlDataSource`] over [`MockTypes`], using a [`TmpDb`] as the
    /// backing storage and a default-constructed provider.
    #[async_trait]
    impl<P: AvailabilityProvider<MockTypes> + Default> DataSourceLifeCycle
        for SqlDataSource<MockTypes, P>
    {
        type Storage = TmpDb;

        /// Initialize a fresh temporary database; the node ID is not used.
        async fn create(_node_id: usize) -> Self::Storage {
            TmpDb::init().await
        }

        /// Connect to the temporary database with its own config. Panics if connecting fails.
        async fn connect(tmp_db: &Self::Storage) -> Self {
            let config = tmp_db.config();
            config.connect(P::default()).await.unwrap()
        }

        /// Connect with the `reset_schema` option set on the config. Panics if connecting fails.
        async fn reset(tmp_db: &Self::Storage) -> Self {
            let config = tmp_db.config().reset_schema();
            config.connect(P::default()).await.unwrap()
        }

        /// Connect through the builder interface with the `leaf_only` option applied.
        async fn leaf_only_ds(tmp_db: &Self::Storage) -> Self {
            tmp_db
                .config()
                .builder(P::default())
                .await
                .unwrap()
                .leaf_only()
                .build()
                .await
                .expect("failed to build leaf only sql ds")
        }

        /// Feed a consensus event to the data source, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let outcome = self.update(event).await;
            outcome.unwrap();
        }
    }
}
372
// Excluded on Windows: these tests run the `postgres` Docker image, which doesn't work there.
#[cfg(all(test, not(target_os = "windows")))]
mod generic_test {
    use super::SqlDataSource;
    // The crate-root glob is here to bring `instantiate_data_source_tests!` into scope; it is the
    // only way found so far to import that macro, which is defined in another module of this
    // crate.
    use crate::{fetching::provider::NoFetching, testing::mocks::MockTypes, *};

    instantiate_data_source_tests!(SqlDataSource<MockTypes, NoFetching>);
}
384
#[cfg(all(test, not(target_os = "windows")))]
mod test {
    use futures::StreamExt;
    use hotshot_example_types::{
        node_types::TEST_VERSIONS,
        state_types::{TestInstanceState, TestValidatedState},
    };
    use hotshot_types::{
        data::{VidCommon, VidShare},
        vid::advz::advz_scheme,
    };
    use jf_advz::VidScheme;

    use super::*;
    use crate::{
        availability::{
            AvailabilityDataSource, BlockInfo, BlockQueryData, LeafQueryData,
            UpdateAvailabilityData, VidCommonQueryData,
        },
        data_source::{
            Transaction, VersionedDataSource,
            storage::{NodeStorage, UpdateAvailabilityStorage},
        },
        fetching::provider::NoFetching,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    /// The data source under test: SQL storage over the mock types with the `NoFetching` provider.
    type TestDataSource = SqlDataSource<MockTypes, NoFetching>;

    /// Storing VID common data first and the matching share later must leave both retrievable.
    ///
    /// This test is SQL-specific rather than generic because the file system data source does not
    /// currently support storing VID common and only later the corresponding share.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_vid_monotonicity() {
        let db = TestDataSource::create(0).await;
        let data_source = <TestDataSource as DataSourceLifeCycle>::connect(&db).await;

        // Disperse an empty payload to obtain VID common data and shares to store.
        let disperse = advz_scheme(2).disperse([]).unwrap();

        // Step 1: store the genesis leaf with its VID common, but with no share.
        let genesis_leaf = LeafQueryData::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let vid_common = VidCommonQueryData::new(
            genesis_leaf.header().clone(),
            VidCommon::V0(disperse.common),
        );
        let block_info = BlockInfo::new(genesis_leaf, None, Some(vid_common.clone()), None);
        data_source.append(block_info).await.unwrap();

        // The common data can be read back, while looking up the share must fail.
        let fetched_common = data_source.get_vid_common(0).await.await;
        assert_eq!(fetched_common, vid_common);
        NodeStorage::<MockTypes>::vid_share(&mut data_source.read().await.unwrap(), 0)
            .await
            .unwrap_err();

        // Step 2: insert the same common data again, this time accompanied by a share.
        let first_share = VidShare::V0(disperse.shares[0].clone());
        let mut write_tx = data_source.write().await.unwrap();
        write_tx
            .insert_vid(vid_common.clone(), Some(first_share.clone()))
            .await
            .unwrap();
        write_tx.commit().await.unwrap();

        // Now both the common data and the share are available.
        let refetched_common = data_source.get_vid_common(0).await.await;
        assert_eq!(refetched_common, vid_common);
        let stored_share =
            NodeStorage::<MockTypes>::vid_share(&mut data_source.read().await.unwrap(), 0)
                .await
                .unwrap();
        assert_eq!(stored_share, first_share);
    }

    /// Subscription streams must still yield objects whose payload or VID common duplicates
    /// data that is already stored.
    ///
    /// The scenario that may be problematic:
    /// * We get a `Decide` which is missing the payload and/or VID common.
    /// * _But_ we already have a block with an identical payload/VID common.
    /// * We call `store_and_notify` without the payload/VID common, so existing subscription
    ///   streams are not notified.
    /// * We call `Fetcher::get` to fetch the missing data, but because the database already
    ///   holds data that satisfies the request, no fetch is spawned and nothing is notified.
    ///
    /// Only subscription streams following the chain head are affected. In every other case the
    /// stream explicitly tries to load its next object from the database before passively
    /// waiting for it, and that load succeeds. A stream at the head relies solely on a passive
    /// notification, without the explicit fetch that would in fact have succeeded.
    #[tokio::test]
    #[test_log::test]
    async fn test_subscribe_identical_payload() {
        let db = TestDataSource::create(0).await;
        let data_source = <TestDataSource as DataSourceLifeCycle>::connect(&db).await;

        let mut block_stream = data_source.subscribe_blocks(0).await;
        let mut vid_stream = data_source.subscribe_vid_common(0).await;

        // First decide: the leaf arrives together with its block and VID common.
        let mut leaf =
            LeafQueryData::genesis(&Default::default(), &Default::default(), TEST_VERSIONS.test)
                .await;
        let block = BlockQueryData::genesis(
            &Default::default(),
            &Default::default(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let common = VidCommonQueryData::genesis(
            &Default::default(),
            &Default::default(),
            TEST_VERSIONS.test.base,
        )
        .await;
        tracing::info!("first decide");
        let full_info = BlockInfo::new(
            leaf.clone(),
            Some(block.clone()),
            Some(common.clone()),
            None,
        );
        data_source.append(full_info).await.unwrap();

        tracing::info!("waiting for first block and VID");
        let first_block = block_stream.next().await.unwrap();
        assert_eq!(first_block, block);
        let first_common = vid_stream.next().await.unwrap();
        assert_eq!(first_common, common);

        // Second decide: only the next leaf arrives, reusing the same payload and VID common.
        leaf.leaf.block_header_mut().block_number += 1;
        tracing::info!("second decide");
        data_source.append(leaf.clone().into()).await.unwrap();

        // Both streams must still produce the second object, built from the already-stored data.
        tracing::info!("waiting for second block and VID");
        assert_eq!(
            block_stream.next().await.unwrap(),
            BlockQueryData::new(leaf.header().clone(), block.payload)
        );
        assert_eq!(
            vid_stream.next().await.unwrap(),
            VidCommonQueryData::new(leaf.header().clone(), common.common)
        );
    }
}
526}