use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;
use futures_core::future::BoxFuture;
pub(crate) use sqlx_core::migrate::MigrateError;
pub(crate) use sqlx_core::migrate::{AppliedMigration, Migration};
pub(crate) use sqlx_core::migrate::{Migrate, MigrateDatabase};
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::query::query;
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::{PgConnectOptions, PgConnection, Postgres};
/// Parses `url` and splits it into maintenance connection options plus the
/// name of the database the URL actually refers to.
///
/// The target database name is the one given in the URL; when the URL names
/// no database, the username is used instead.
///
/// The returned options are re-pointed at a maintenance database so that the
/// target itself can be created, inspected or dropped: `postgres` normally,
/// or `template1` when the target *is* `postgres`. This is the same fallback
/// the PostgreSQL `createdb` utility documents for its maintenance database.
///
/// # Errors
///
/// Returns any error produced while parsing `url` into [`PgConnectOptions`].
fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
    let mut options = PgConnectOptions::from_str(url)?;

    // The database we were asked to operate on.
    let target = match options.database.as_deref() {
        Some(name) => name.to_owned(),
        None => options.username.clone(),
    };

    // We cannot be connected to the database we are about to create or drop,
    // so pick a different one to connect to.
    let maintenance = match target.as_str() {
        "postgres" => "template1",
        _ => "postgres",
    };
    options.database = Some(maintenance.into());

    Ok((options, target))
}
impl MigrateDatabase for Postgres {
fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let (options, database) = parse_for_maintenance(url)?;
let mut conn = options.connect().await?;
let _ = conn
.execute(&*format!(
"CREATE DATABASE \"{}\"",
database.replace('"', "\"\"")
))
.await?;
Ok(())
})
}
fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
Box::pin(async move {
let (options, database) = parse_for_maintenance(url)?;
let mut conn = options.connect().await?;
let exists: bool =
query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
.bind(database)
.fetch_one(&mut conn)
.await?;
Ok(exists)
})
}
fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let (options, database) = parse_for_maintenance(url)?;
let mut conn = options.connect().await?;
let _ = conn
.execute(&*format!(
"DROP DATABASE IF EXISTS \"{}\"",
database.replace('"', "\"\"")
))
.await?;
Ok(())
})
}
fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let (options, database) = parse_for_maintenance(url)?;
let mut conn = options.connect().await?;
let row: (String,) = query_as("SELECT current_setting('server_version_num')")
.fetch_one(&mut conn)
.await?;
let version = row.0.parse::<i32>().unwrap();
let pid_type = if version >= 90200 { "pid" } else { "procpid" };
conn.execute(&*format!(
"SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \
WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()"
))
.await?;
Self::drop_database(url).await
})
}
}
impl Migrate for PgConnection {
fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
Box::pin(async move {
self.execute(
r#"
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
success BOOLEAN NOT NULL,
checksum BYTEA NOT NULL,
execution_time BIGINT NOT NULL
);
"#,
)
.await?;
Ok(())
})
}
fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
Box::pin(async move {
let row: Option<(i64,)> = query_as(
"SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
)
.fetch_optional(self)
.await?;
Ok(row.map(|r| r.0))
})
}
fn list_applied_migrations(
&mut self,
) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async move {
let rows: Vec<(i64, Vec<u8>)> =
query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
.fetch_all(self)
.await?;
let migrations = rows
.into_iter()
.map(|(version, checksum)| AppliedMigration {
version,
checksum: checksum.into(),
})
.collect();
Ok(migrations)
})
}
fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
Box::pin(async move {
let database_name = current_database(self).await?;
let lock_id = generate_lock_id(&database_name);
let _ = query("SELECT pg_advisory_lock($1)")
.bind(lock_id)
.execute(self)
.await?;
Ok(())
})
}
fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
Box::pin(async move {
let database_name = current_database(self).await?;
let lock_id = generate_lock_id(&database_name);
let _ = query("SELECT pg_advisory_unlock($1)")
.bind(lock_id)
.execute(self)
.await?;
Ok(())
})
}
fn apply<'e: 'm, 'm>(
&'e mut self,
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
let start = Instant::now();
if migration.no_tx {
execute_migration(self, migration).await?;
} else {
let mut tx = self.begin().await?;
execute_migration(&mut tx, migration).await?;
tx.commit().await?;
}
let elapsed = start.elapsed();
#[allow(clippy::cast_possible_truncation)]
let _ = query(
r#"
UPDATE _sqlx_migrations
SET execution_time = $1
WHERE version = $2
"#,
)
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;
Ok(elapsed)
})
}
fn revert<'e: 'm, 'm>(
&'e mut self,
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
let mut tx = self.begin().await?;
let start = Instant::now();
let _ = tx.execute(&*migration.sql).await?;
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
.bind(migration.version)
.execute(&mut *tx)
.await?;
tx.commit().await?;
let elapsed = start.elapsed();
Ok(elapsed)
})
}
}
/// Runs a migration script on `conn` and records it in `_sqlx_migrations`.
///
/// The bookkeeping row is inserted with `success = TRUE` and an
/// `execution_time` of `-1`; the real duration is not known at this point.
///
/// # Errors
///
/// A failing script is reported as `MigrateError::ExecuteMigration` carrying
/// the migration version. A failing bookkeeping insert is propagated through
/// the usual error conversion.
async fn execute_migration(
    conn: &mut PgConnection,
    migration: &Migration,
) -> Result<(), MigrateError> {
    let version = migration.version;

    // Run the user's script first; bail out with the version attached if the
    // server rejects it.
    if let Err(source) = conn.execute(&*migration.sql).await {
        return Err(MigrateError::ExecuteMigration(source, version));
    }

    // language=SQL
    let record = query(
        r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, TRUE, $3, -1 )
"#,
    )
    .bind(version)
    .bind(&*migration.description)
    .bind(&*migration.checksum);

    record.execute(conn).await?;

    Ok(())
}
/// Asks the server for the name of the database `conn` is connected to.
///
/// # Errors
///
/// Fails if the `SELECT current_database()` query fails.
async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
    // language=SQL
    let name: String = query_scalar("SELECT current_database()")
        .fetch_one(conn)
        .await?;

    Ok(name)
}
/// Derives the advisory-lock key for a database from its name.
///
/// The key is the CRC-32 (ISO-HDLC) checksum of the name's bytes multiplied
/// by a fixed constant. The largest possible product is below `i64::MAX`, so
/// the multiplication cannot overflow.
fn generate_lock_id(database_name: &str) -> i64 {
    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
    // Fixed multiplier applied to the checksum of the database name.
    const MULTIPLIER: i64 = 0x3d32ad9e;

    let checksum = CRC_IEEE.checksum(database_name.as_bytes());
    MULTIPLIER * i64::from(checksum)
}