hotshot_query_service/data_source/storage/sql/
migrate.rs1use async_trait::async_trait;
2use derive_more::From;
3use futures::stream::StreamExt;
4use refinery_core::{
5 traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction},
6 Migration,
7};
8use sqlx::{pool::PoolConnection, Acquire, Executor, Row};
9use time::{format_description::well_known::Rfc3339, OffsetDateTime};
10
11use super::{queries::DecodeError, Db};
12
13#[derive(Debug, From)]
21pub(super) struct Migrator<'a> {
22 conn: &'a mut PoolConnection<Db>,
23}
24
25#[async_trait]
26impl AsyncTransaction for Migrator<'_> {
27 type Error = sqlx::Error;
28
29 async fn execute(&mut self, queries: &[&str]) -> sqlx::Result<usize> {
30 let mut tx = self.conn.begin().await?;
31 let mut count = 0;
32 for query in queries {
33 let res = tx.execute(*query).await?;
34 count += res.rows_affected();
35 }
36 tx.commit().await?;
37 Ok(count as usize)
38 }
39}
40
41#[async_trait]
42impl AsyncQuery<Vec<Migration>> for Migrator<'_> {
43 async fn query(&mut self, query: &str) -> sqlx::Result<Vec<Migration>> {
44 let mut tx = self.conn.begin().await?;
45
46 let mut applied = Vec::new();
47 let mut rows = tx.fetch(query);
48 while let Some(row) = rows.next().await {
49 let row = row?;
50 let version = row.try_get(0)?;
51 let applied_on: String = row.try_get(2)?;
52 let applied_on = OffsetDateTime::parse(&applied_on, &Rfc3339)
53 .decode_error("malformed migration timestamp")?;
54 let checksum: String = row.get(3);
55
56 applied.push(Migration::applied(
57 version,
58 row.try_get(1)?,
59 applied_on,
60 checksum
61 .parse::<u64>()
62 .decode_error("malformed migration checksum")?,
63 ));
64 }
65
66 drop(rows);
67 tx.commit().await?;
68 Ok(applied)
69 }
70}
71
72impl AsyncMigrate for Migrator<'_> {}