hotshot_query_service/data_source/
sql.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "sql-data-source")]
14
15pub use anyhow::Error;
16use hotshot_types::traits::node_implementation::NodeType;
17pub use refinery::Migration;
18pub use sql::Transaction;
19
20use super::{
21    AvailabilityProvider, FetchingDataSource, fetching,
22    storage::sql::{self, SqlStorage, StorageConnectionType},
23};
24pub use crate::include_migrations;
25use crate::{
26    Header, Payload,
27    availability::{QueryableHeader, QueryablePayload},
28};
29
/// Builder for a [`SqlDataSource`], used to customize fetching options before construction.
pub type Builder<Types, Provider> = fetching::Builder<Types, SqlStorage, Provider>;

/// Configuration for connecting to the SQL database backing a [`SqlDataSource`].
pub type Config = sql::Config;
33
34impl Config {
35    /// Connect to the database with this config.
36    pub async fn connect<Types, P: AvailabilityProvider<Types>>(
37        self,
38        provider: P,
39    ) -> Result<SqlDataSource<Types, P>, Error>
40    where
41        Types: NodeType,
42        Header<Types>: QueryableHeader<Types>,
43        Payload<Types>: QueryablePayload<Types>,
44    {
45        self.builder(provider).await?.build().await
46    }
47
48    /// Connect to the database, setting options on the underlying [`FetchingDataSource`] using the
49    /// [`fetching::Builder`] interface.
50    pub async fn builder<Types, P: AvailabilityProvider<Types>>(
51        self,
52        provider: P,
53    ) -> Result<Builder<Types, P>, Error>
54    where
55        Types: NodeType,
56        Header<Types>: QueryableHeader<Types>,
57        Payload<Types>: QueryablePayload<Types>,
58    {
59        SqlDataSource::connect(self, provider).await
60    }
61}
62
63/// A data source for the APIs provided in this crate, backed by a remote PostgreSQL database.
64///
65/// # Administration
66///
67/// This data source will automatically connect to and perform queries on a remote SQL database.
68/// However, _administration_ of the database, such as initialization, resetting, and backups, is
69/// left out of the scope of this implementation, and is expected to be performed manually using
70/// off-the-shelf DBMS administration tools. The one exception is migrations, which are handled
71/// transparently by the [`SqlDataSource`].
72///
73/// ## Schema
74///
75/// All the objects created and used by [`SqlDataSource`] are grouped under a schema for easy
76/// management. By default, the schema is named `hotshot`, and is created the first time a
77/// [`SqlDataSource`] is constructed. The name of the schema can be configured by setting
78/// [`Config::schema`].
79///
80/// ## Initialization
81///
82/// When creating a PostgreSQL [`SqlDataSource`], the caller can use [`Config`] to specify the host, user, and
83/// database for the connection. If the `embedded-db` feature is enabled, the caller can instead specify the
84/// file path for an SQLite database.
85/// As such, [`SqlDataSource`] is not very opinionated about how the
86/// database instance is set up. The administrator must simply ensure that there is a database
87/// dedicated to the [`SqlDataSource`] and a user with appropriate permissions (all on `SCHEMA` and
88/// all on `DATABASE`) over that database.
89///
90/// Here is an example of how a sufficient database could be initialized. When using the standard
91/// `postgres` Docker image, these statements could be placed in
92/// `/docker-entrypoint-initdb.d/init.sql` to automatically initialize the database upon startup.
93///
94/// ```sql
95/// CREATE DATABASE hotshot_query_service;
96/// \connect hotshot_query_service;
97/// CREATE USER hotshot_user WITH PASSWORD 'password';
98/// GRANT ALL ON SCHEMA public TO hotshot_user;
99/// GRANT ALL ON DATABASE hotshot_query_service TO hotshot_user WITH GRANT OPTION;
100/// ```
101///
102/// For SQLite, simply provide the file path, and the file will be created if it does not already exist.
103///
104/// One could then connect to this database with the following [`Config`] for postgres:
105///
106/// ```
107/// # use hotshot_query_service::data_source::sql::Config;
108/// #[cfg(not(feature= "embedded-db"))]
109/// Config::default()
110///     .host("postgres.database.hostname")
111///     .database("hotshot_query_service")
112///     .user("hotshot_user")
113///     .password("password")
114/// # ;
115/// ```
116/// Or, if the `embedded-db` feature is enabled, configure it as follows for SQLite:
117///
118/// ```
119/// # use hotshot_query_service::data_source::sql::Config;
120/// #[cfg(feature= "embedded-db")]
121/// Config::default()
122///     .db_path("temp.db".into())
123/// # ;
124/// ```
125/// ## Resetting
126///
127/// In general, resetting the database when necessary is left up to the administrator. However, for
128/// convenience, we do provide a [`reset_schema`](Config::reset_schema) option which can be used to
129/// wipe out existing state and create a fresh instance of the query service. This is particularly
130/// useful for development and staging environments. This function will permanently delete all
131/// tables associated with the schema used by this query service, but will not reset other schemas
/// or databases.
133///
134/// ## Migrations
135///
136/// For the [`SqlDataSource`] to work, the database must be initialized with the appropriate schema,
137/// and the schema must be kept up to date when deploying a new version of this software which
138/// depends on a different schema. Both of these tasks are accomplished via _migrations_.
139///
140/// Each release of this software is bundled with a sequence of migration files: one migration for
141/// each release that changed the schema, including the latest one. Replaying these SQL files
142/// against a database with an older version of the schema, including a completely empty database,
143/// will bring it up to date with the schema required by this version of the software. Upon creating
144/// an instance of [`SqlDataSource`] and connecting to a database, the data source will
145/// automatically fetch the current version from the database and, if it is old, replay the
146/// necessary migration files.
147///
148/// ## Custom Migrations
149///
150/// In keeping with the philosophy of this crate, [`SqlDataSource`] is designed to be
151/// [extensible and composable](#extension-and-composition). When extending the provided APIs with
152/// new, application-specific queries, it will often be desirable to alter the schema of the
153/// database in some way, such as adding additional columns to some of the tables or creating new
154/// indices. When composing the provided APIs with additional API modules, it may also be desirable
155/// to alter the schema, although the changes are more likely to be completely independent of the
156/// schema used by this data source, such as adding entirely new tables.
157///
158/// In either case, the default schema can be modified by inserting additional migrations between
159/// the migrations distributed with this crate. The new migrations will then automatically be
160/// replayed as necessary when initializing a [`SqlDataSource`]. New custom migrations can be
161/// added with each software update, to keep the custom data up to date as the default schema
162/// changes.
163///
164/// Custom migrations can be inserted using [`Config::migrations`]. Each custom migration will be
165/// inserted into the overall sequence of migrations in order of version number. The migrations
166/// provided by this crate only use version numbers which are multiples of 100, so the non-multiples
167/// can be used to insert custom migrations between the default migrations. You can also replace a
168/// default migration completely by providing a custom migration with the same version number. This
169/// may be useful when an earlier custom migration has altered the schema in such a way that a later
170/// migration no longer works as-is. However, this technique is error prone and should be used only
171/// when necessary.
172///
173/// When using custom migrations, it is the user's responsibility to ensure that the resulting
174/// schema is compatible with the schema expected by [`SqlDataSource`]. Adding things (tables,
175/// columns, indices) should usually be safe. Removing, altering, or renaming things should be done
176/// with extreme caution.
177///
178/// It is standard to store custom migrations as SQL files in a sub-directory of the crate. For ease
179/// of release and deployment, such directories can be embedded into a Rust binary and parsed into
180/// a list of [`Migration`] objects using the [`include_migrations`] macro.
181///
182/// It is also possible to take complete control over migrating the schema using
183/// [`Config::no_migrations`] to prevent the [`SqlDataSource`] from running its own migrations. The
184/// database administrator then becomes responsible for manually migrating the database, ensuring the
185/// schema is up to date, and ensuring that the schema is at all times compatible with the schema
186/// expected by the current version of this software. Nevertheless, this may be the best option when
187/// your application-specific schema has diverged significantly from the default schema.
188///
189/// # Synchronization
190///
191/// [`SqlDataSource`] implements [`VersionedDataSource`](super::VersionedDataSource), which means
192/// changes are applied to the underlying database via transactions. [`Transaction`] maps exactly to
/// a transaction in the underlying RDBMS, and inherits the underlying concurrency semantics.
194///
195/// # Extension and Composition
196///
197/// [`SqlDataSource`] is designed to be both extensible (so you can add additional state to the API
198/// modules defined in this crate) and composable (so you can use [`SqlDataSource`] as one component
199/// of a larger state type for an application with additional modules).
200///
201/// ## Extension
202///
203/// It is possible to add additional, application-specific state to [`SqlDataSource`]. If the new
204/// state should live in memory, simply wrap the [`SqlDataSource`] in an
205/// [`ExtensibleDataSource`](super::ExtensibleDataSource):
206///
207/// ```
208/// # use hotshot_query_service::data_source::{
209/// #   sql::{Config, Error}, ExtensibleDataSource, SqlDataSource,
210/// # };
211/// # use hotshot_query_service::fetching::provider::NoFetching;
212/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
213/// # async fn doc(config: Config) -> Result<(), Error> {
214/// type AppState = &'static str;
215///
216/// let data_source: ExtensibleDataSource<SqlDataSource<AppTypes, NoFetching>, AppState> =
217///     ExtensibleDataSource::new(config.connect(NoFetching).await?, "app state");
218/// # Ok(())
219/// # }
220/// ```
221///
222/// The [`ExtensibleDataSource`](super::ExtensibleDataSource) wrapper implements all the same data
223/// source traits as [`SqlDataSource`], and also provides access to the `AppState` parameter for use
224/// in API endpoint handlers. This can be used to implement an app-specific data source trait and
225/// add a new API endpoint that uses this app-specific data, as described in the
226/// [extension guide](crate#extension).
227///
228/// If the new application-specific state should live in the SQL database itself, the implementation
229/// is more involved, but still possible. Follow the steps for [custom
230/// migrations](#custom-migrations) to modify the database schema to account for the new data you
231/// want to store. You can then access this data through the [`SqlDataSource`] using
232/// [`read`](super::VersionedDataSource::read) to run a custom read-only SQL query or
233/// [`write`](super::VersionedDataSource::write) to execute a custom atomic mutation of the
234/// database.
235///
236/// ## Composition
237///
238/// Composing [`SqlDataSource`] with other module states is fairly simple -- just
239/// create an aggregate struct containing both [`SqlDataSource`] and your additional module
240/// states, as described in the [composition guide](crate#composition). If the additional modules
241/// have data that should live in the same database as the [`SqlDataSource`] data, you can follow
242/// the steps in [custom migrations](#custom-migrations) to accommodate this.
243///
244/// ```
245/// # use futures::StreamExt;
246/// # use hotshot::types::SystemContextHandle;
247/// # use hotshot_query_service::Error;
248/// # use hotshot_query_service::data_source::{
249/// #   sql::Config, Transaction, SqlDataSource, UpdateDataSource, VersionedDataSource,
250/// # };
251/// # use hotshot_query_service::fetching::provider::NoFetching;
252/// # use hotshot_query_service::testing::mocks::{
253/// #   MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions
254/// # };
255/// # use hotshot_example_types::node_types::TestVersions;
256/// # use std::sync::Arc;
257/// # use tide_disco::App;
258/// # use tokio::spawn;
259/// # use vbs::version::StaticVersionType;
260/// struct AppState {
261///     hotshot_qs: SqlDataSource<AppTypes, NoFetching>,
262///     // additional state for other modules
263/// }
264///
265/// async fn init_server<Ver: StaticVersionType + 'static>(
266///     config: Config,
267///     hotshot: SystemContextHandle<AppTypes, AppNodeImpl, AppVersions>,
268/// ) -> anyhow::Result<App<Arc<AppState>, Error>> {
269///     let mut hotshot_qs = config.connect(NoFetching).await?;
270///     // Initialize storage for other modules, using `hotshot_qs` to access the database.
271///     let tx = hotshot_qs.write().await?;
272///     // ...
273///     tx.commit().await?;
274///
275///     let state = Arc::new(AppState {
276///         hotshot_qs,
277///         // additional state for other modules
278///     });
279///     let mut app = App::with_state(state.clone());
280///     // Register API modules.
281///
282///     spawn(async move {
283///         let mut events = hotshot.event_stream();
284///         while let Some(event) = events.next().await {
285///             if state.hotshot_qs.update(&event).await.is_err() {
286///                 continue;
287///             }
288///
289///             let mut tx = state.hotshot_qs.write().await.unwrap();
290///             // Update other modules' states based on `event`, using `tx` to access the database.
291///             tx.commit().await.unwrap();
292///         }
293///     });
294///
295///     Ok(app)
296/// }
297/// ```
298pub type SqlDataSource<Types, P> = FetchingDataSource<Types, SqlStorage, P>;
299
300impl<Types, P: AvailabilityProvider<Types>> SqlDataSource<Types, P>
301where
302    Types: NodeType,
303    Header<Types>: QueryableHeader<Types>,
304    Payload<Types>: QueryablePayload<Types>,
305{
306    /// Connect to a remote database.
307    ///
308    /// This function returns a [`fetching::Builder`] which can be used to set options on the
309    /// underlying [`FetchingDataSource`], before constructing the [`SqlDataSource`] with
310    /// [`build`](fetching::Builder::build). For a convenient constructor that uses the default
311    /// fetching options, see [`Config::connect`].
312    pub async fn connect(config: Config, provider: P) -> Result<Builder<Types, P>, Error> {
313        Ok(Self::builder(
314            SqlStorage::connect(config, StorageConnectionType::Query).await?,
315            provider,
316        ))
317    }
318}
319
// These tests run the `postgres` Docker image, which doesn't work on Windows.
#[cfg(all(any(test, feature = "testing"), not(target_os = "windows")))]
pub mod testing {
    use async_trait::async_trait;
    use hotshot::types::Event;
    pub use sql::testing::TmpDb;

    use super::*;
    use crate::{
        data_source::UpdateDataSource,
        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
    };

    #[async_trait]
    impl<P: AvailabilityProvider<MockTypes> + Default> DataSourceLifeCycle
        for SqlDataSource<MockTypes, P>
    {
        type Storage = TmpDb;

        /// Spin up a fresh temporary database for this node.
        async fn create(_node_id: usize) -> Self::Storage {
            TmpDb::init().await
        }

        /// Connect to the temporary database with default options.
        async fn connect(tmp_db: &Self::Storage) -> Self {
            let config = tmp_db.config();
            config.connect(Default::default()).await.unwrap()
        }

        /// Wipe the existing schema and reconnect, yielding a fresh data source.
        async fn reset(tmp_db: &Self::Storage) -> Self {
            let config = tmp_db.config().reset_schema();
            config.connect(Default::default()).await.unwrap()
        }

        /// Build a data source in leaf-only mode.
        async fn leaf_only_ds(tmp_db: &Self::Storage) -> Self {
            tmp_db
                .config()
                .builder(Default::default())
                .await
                .unwrap()
                .leaf_only()
                .build()
                .await
                .expect("failed to build leaf only sql ds")
        }

        async fn handle_event(&self, event: &Event<MockTypes>) {
            self.update(event).await.unwrap();
        }
    }
}
372
// These tests run the `postgres` Docker image, which doesn't work on Windows.
#[cfg(all(test, not(target_os = "windows")))]
mod generic_test {
    use super::SqlDataSource;
    // For some reason this is the only way to import the macro defined in another module of this
    // crate.
    use crate::*;
    use crate::{fetching::provider::NoFetching, testing::mocks::MockTypes};

    // Instantiate the shared, data-source-generic test suite for the SQL implementation.
    instantiate_data_source_tests!(SqlDataSource<MockTypes, NoFetching>);
}
384
385#[cfg(all(test, not(target_os = "windows")))]
386mod test {
387    use futures::StreamExt;
388    use hotshot_example_types::{
389        node_types::TEST_VERSIONS,
390        state_types::{TestInstanceState, TestValidatedState},
391    };
392    use hotshot_types::{
393        data::{VidCommon, VidShare},
394        vid::advz::advz_scheme,
395    };
396    use jf_advz::VidScheme;
397
398    use super::*;
399    use crate::{
400        availability::{
401            AvailabilityDataSource, BlockInfo, BlockQueryData, LeafQueryData,
402            UpdateAvailabilityData, VidCommonQueryData,
403        },
404        data_source::{
405            Transaction, VersionedDataSource,
406            storage::{NodeStorage, UpdateAvailabilityStorage},
407        },
408        fetching::provider::NoFetching,
409        testing::{consensus::DataSourceLifeCycle, mocks::MockTypes},
410    };
411
    /// Shorthand for the concrete data source type under test.
    type D = SqlDataSource<MockTypes, NoFetching>;

    // This function should be generic, but the file system data source does not currently support
    // storing VID common and later the corresponding share.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_vid_monotonicity() {
        let storage = D::create(0).await;
        let ds = <D as DataSourceLifeCycle>::connect(&storage).await;

        // Generate some test VID data.
        let disperse = advz_scheme(2).disperse([]).unwrap();

        // Insert test data with VID common but no share.
        let leaf = LeafQueryData::<MockTypes>::genesis(
            &TestValidatedState::default(),
            &TestInstanceState::default(),
            TEST_VERSIONS.test,
        )
        .await;
        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
        ds.append(BlockInfo::new(leaf, None, Some(common.clone()), None))
            .await
            .unwrap();

        // The common data is retrievable, but the share must be absent (it was never stored).
        assert_eq!(ds.get_vid_common(0).await.await, common);
        NodeStorage::<MockTypes>::vid_share(&mut ds.read().await.unwrap(), 0)
            .await
            .unwrap_err();

        // Re-insert the common data with the share.
        let share0 = VidShare::V0(disperse.shares[0].clone());
        let mut tx = ds.write().await.unwrap();
        tx.insert_vid(common.clone(), Some(share0.clone()))
            .await
            .unwrap();
        tx.commit().await.unwrap();
        // Monotonicity: re-insertion must not clobber the common data, and must add the share.
        assert_eq!(ds.get_vid_common(0).await.await, common);
        assert_eq!(
            NodeStorage::<MockTypes>::vid_share(&mut ds.read().await.unwrap(), 0)
                .await
                .unwrap(),
            share0
        );
    }
456
457    /// Test subscription streams for identical payloads.
458    ///
459    /// The specific case that may be problematic is this:
460    /// * We get a `Decide` which is missing the payload and/or VID common
461    /// * _But_ we already have a block with an identical payload/VID common
462    /// * We call `store_and_notify` without the payload/VID common, meaning existing subscription
463    ///   streams will not get notified
464    /// * We call `Fetcher::get` got fetch the missing data, but because we already have data in the
465    ///   database that satisfies, the request, we do not spawn a fetch and notify
466    ///
467    /// This specifically affects subscription streams which are following the chain head. In all
468    /// other cases, we would try explicitly loading the next object in the stream from the database
469    /// before passively waiting for it, and this would succeed. Only in the case of a stream
470    /// following the head, do we rely on a passive notification without a corresponding explicit
471    /// fetch, even though the explicit fetch would have actually succeeded.
472    #[tokio::test]
473    #[test_log::test]
474    async fn test_subscribe_identical_payload() {
475        let storage = D::create(0).await;
476        let ds = <D as DataSourceLifeCycle>::connect(&storage).await;
477
478        let mut blocks = ds.subscribe_blocks(0).await;
479        let mut vid = ds.subscribe_vid_common(0).await;
480
481        // Send a decide event with all relevant data.
482        let mut leaf =
483            LeafQueryData::genesis(&Default::default(), &Default::default(), TEST_VERSIONS.test)
484                .await;
485        let block = BlockQueryData::genesis(
486            &Default::default(),
487            &Default::default(),
488            TEST_VERSIONS.test.base,
489        )
490        .await;
491        let common = VidCommonQueryData::genesis(
492            &Default::default(),
493            &Default::default(),
494            TEST_VERSIONS.test.base,
495        )
496        .await;
497        tracing::info!("first decide");
498        ds.append(BlockInfo::new(
499            leaf.clone(),
500            Some(block.clone()),
501            Some(common.clone()),
502            None,
503        ))
504        .await
505        .unwrap();
506
507        tracing::info!("waiting for first block and VID");
508        assert_eq!(blocks.next().await.unwrap(), block);
509        assert_eq!(vid.next().await.unwrap(), common);
510
511        // Send a decide event with only the next leaf.
512        leaf.leaf.block_header_mut().block_number += 1;
513        tracing::info!("second decide");
514        ds.append(leaf.clone().into()).await.unwrap();
515
516        tracing::info!("waiting for second block and VID");
517        assert_eq!(
518            blocks.next().await.unwrap(),
519            BlockQueryData::new(leaf.header().clone(), block.payload)
520        );
521        assert_eq!(
522            vid.next().await.unwrap(),
523            VidCommonQueryData::new(leaf.header().clone(), common.common)
524        );
525    }
526}