hotshot_query_service/data_source/storage/sql/
migrate.rs

1use async_trait::async_trait;
2use derive_more::From;
3use futures::stream::StreamExt;
4use refinery_core::{
5    traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction},
6    Migration,
7};
8use sqlx::{pool::PoolConnection, Acquire, Executor, Row};
9use time::{format_description::well_known::Rfc3339, OffsetDateTime};
10
11use super::{queries::DecodeError, Db};
12
13/// Run migrations using a sqlx connection.
14///
15/// While SQLx has its own built-in migration functionality, we use Refinery, and alas we must
16/// support existing deployed databases which are already using Refinery to handle migrations.
17/// Rather than implement a tricky "migration of the migrations table", or supporting separate
18/// migrations interfaces for databases deployed before and after the switch to SQLx, we continue
19/// using Refinery. This wrapper implements the Refinery traits for SQLx types.
20#[derive(Debug, From)]
21pub(super) struct Migrator<'a> {
22    conn: &'a mut PoolConnection<Db>,
23}
24
25#[async_trait]
26impl AsyncTransaction for Migrator<'_> {
27    type Error = sqlx::Error;
28
29    async fn execute(&mut self, queries: &[&str]) -> sqlx::Result<usize> {
30        let mut tx = self.conn.begin().await?;
31        let mut count = 0;
32        for query in queries {
33            let res = tx.execute(*query).await?;
34            count += res.rows_affected();
35        }
36        tx.commit().await?;
37        Ok(count as usize)
38    }
39}
40
41#[async_trait]
42impl AsyncQuery<Vec<Migration>> for Migrator<'_> {
43    async fn query(&mut self, query: &str) -> sqlx::Result<Vec<Migration>> {
44        let mut tx = self.conn.begin().await?;
45
46        let mut applied = Vec::new();
47        let mut rows = tx.fetch(query);
48        while let Some(row) = rows.next().await {
49            let row = row?;
50            let version = row.try_get(0)?;
51            let applied_on: String = row.try_get(2)?;
52            let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339)
53                .decode_error("malformed migration timestamp")?;
54            let checksum: String = row.get(3);
55
56            applied.push(Migration::applied(
57                version,
58                row.try_get(1)?,
59                applied_on,
60                checksum
61                    .parse::<u64>()
62                    .decode_error("malformed migration checksum")?,
63            ));
64        }
65
66        drop(rows);
67        tx.commit().await?;
68        Ok(applied)
69    }
70}
71
72impl AsyncMigrate for Migrator<'_> {}