// sqlx_mysql/testing/mod.rs
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use futures_core::future::BoxFuture;
use once_cell::sync::OnceCell;
use crate::connection::Connection;
use crate::error::Error;
use crate::executor::Executor;
use crate::pool::{Pool, PoolOptions};
use crate::query::query;
use crate::query_builder::QueryBuilder;
use crate::query_scalar::query_scalar;
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
pub(crate) use sqlx_core::testing::*;
// Process-wide pool for the database named by `DATABASE_URL`. It is populated by
// `test_context()` (via `try_insert`) and read back by `cleanup_test()`.
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
// Starts out `true`; `test_context()` atomically swaps it to `false`, so the
// `do_cleanup()` sweep is triggered from that path only by the first caller.
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);
/// `TestSupport` implementation for MySQL: creates, tracks and removes the
/// `_sqlx_test_database_<id>` databases recorded in `_sqlx_test_databases`.
impl TestSupport for MySql {
    /// Sets up a database for the test described by `args`.
    ///
    /// Thin boxing wrapper around the free function `test_context` in this module.
    fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
        Box::pin(test_context(args))
    }

    /// Drops the database `db_name` and removes its row from `_sqlx_test_databases`.
    ///
    /// # Panics
    /// Panics if `MASTER_POOL` has not been initialised yet, or if `db_name` cannot be
    /// parsed by `db_id`.
    fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let master = MASTER_POOL
                .get()
                .expect("cleanup_test() invoked outside `#[sqlx::test]");
            let mut conn = master.acquire().await?;

            // Resolve the tracking-table key before touching the database itself.
            let tracked_id = db_id(db_name);

            let drop_stmt = format!("drop database if exists {db_name};");
            conn.execute(drop_stmt.as_str()).await?;

            query("delete from _sqlx_test_databases where db_id = ?")
                .bind(tracked_id)
                .execute(&mut *conn)
                .await?;

            Ok(())
        })
    }

    /// Connects directly to `DATABASE_URL` (not through `MASTER_POOL`) and runs
    /// `do_cleanup` with the current time as the cutoff.
    ///
    /// Returns `Some(n)` where `n` is the count reported by `do_cleanup`.
    ///
    /// # Panics
    /// Panics if `DATABASE_URL` is not set or the system clock is before the Unix epoch.
    fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
        Box::pin(async move {
            let database_url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
            let mut conn = MySqlConnection::connect(&database_url).await?;

            let since_epoch = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap();

            let removed = do_cleanup(&mut conn, since_epoch).await?;

            // The result of closing is deliberately ignored; the cleanup already succeeded.
            let _ = conn.close().await;

            Ok(Some(removed))
        })
    }

    /// Not implemented: calling this panics through `todo!()`.
    fn snapshot(
        _conn: &mut Self::Connection,
    ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
        todo!()
    }
}
/// Creates a new, uniquely named database for one test and returns the options
/// needed to connect to it.
///
/// In order, this:
/// 1. builds a lazy pool from `DATABASE_URL` and installs it as `MASTER_POOL`, or reuses
///    the pool that is already installed,
/// 2. makes sure the `_sqlx_test_databases` tracking table exists,
/// 3. runs `do_cleanup` if this is the first call to flip `DO_CLEANUP`,
/// 4. inserts a tracking row for `args.test_path` and derives the database name from
///    the generated `db_id`,
/// 5. issues `create database` for that name.
///
/// # Panics
/// Panics if `DATABASE_URL` is unset or unparsable, if an already-installed master pool
/// has a different host or database than the current `DATABASE_URL`, or if the system
/// clock is before the Unix epoch.
async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
    let database_url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let master_opts =
        MySqlConnectOptions::from_str(&database_url).expect("failed to parse DATABASE_URL");

    let candidate = PoolOptions::new()
        .max_connections(20)
        // NOTE(review): returning `Ok(false)` presumably makes the pool close released
        // connections instead of keeping them idle — confirm against `after_release` docs.
        .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
        .connect_lazy_with(master_opts);

    // If a pool is already installed, keep it, but insist it targets the same server.
    let master_pool = MASTER_POOL
        .try_insert(candidate)
        .unwrap_or_else(|(existing, rejected)| {
            assert_eq!(
                existing.connect_options().host,
                rejected.connect_options().host,
                "DATABASE_URL changed at runtime, host differs"
            );
            assert_eq!(
                existing.connect_options().database,
                rejected.connect_options().database,
                "DATABASE_URL changed at runtime, database differs"
            );
            existing
        });

    let mut conn = master_pool.acquire().await?;

    conn.execute(
        r#"
create table if not exists _sqlx_test_databases (
db_id bigint unsigned primary key auto_increment,
test_path text not null,
created_at timestamp not null default current_timestamp
);
"#,
    )
    .await?;

    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap();

    // `swap` returns the previous value, so only the first caller sees `true` here.
    if DO_CLEANUP.swap(false, Ordering::SeqCst) {
        do_cleanup(&mut conn, now).await?;
    }

    query("insert into _sqlx_test_databases(test_path) values (?)")
        .bind(args.test_path)
        .execute(&mut *conn)
        .await?;

    // Must run on the same connection as the insert above to read its generated key.
    let id: u64 = query_scalar("select last_insert_id()")
        .fetch_one(&mut *conn)
        .await?;

    let new_db_name = db_name(id);

    let create_stmt = format!("create database {new_db_name}");
    conn.execute(create_stmt.as_str()).await?;

    eprintln!("created database {new_db_name}");

    let pool_opts = PoolOptions::new()
        .max_connections(5)
        .idle_timeout(Some(Duration::from_secs(1)))
        .parent(master_pool.clone());

    // Same connection settings as the master pool, pointed at the new database.
    let connect_opts = master_pool
        .connect_options()
        .deref()
        .clone()
        .database(&new_db_name);

    Ok(TestContext {
        pool_opts,
        connect_opts,
        db_name: new_db_name,
    })
}
/// Drops test databases whose tracking row is older than `created_before` and removes
/// the rows of the databases that were dropped.
///
/// `created_before` is a duration since the Unix epoch; it is compared against the
/// `created_at` column through `from_unixtime`.
///
/// Returns the number of databases dropped (and tracking rows deleted).
///
/// # Errors
/// A `Error::Database` from an individual `drop database` is logged to stderr and that
/// database is skipped; any other error, and any error from the select or the final
/// delete, is returned to the caller.
async fn do_cleanup(conn: &mut MySqlConnection, created_before: Duration) -> Result<usize, Error> {
    // Move the cutoff two seconds into the past.
    // NOTE(review): presumably a safety margin so databases that are being created right
    // now are not swept — confirm.
    // `saturating_sub` keeps a cutoff of less than two seconds from underflowing.
    let created_before_as_secs = created_before.as_secs().saturating_sub(2);

    let delete_db_ids: Vec<u64> = query_scalar(
        "select db_id from _sqlx_test_databases \
         where created_at < from_unixtime(?)",
    )
    .bind(created_before_as_secs)
    .fetch_all(&mut *conn)
    .await?;

    if delete_db_ids.is_empty() {
        return Ok(0);
    }

    let mut deleted_db_ids = Vec::with_capacity(delete_db_ids.len());

    // One buffer reused for every `drop database` statement.
    let mut command = String::new();

    for db_id in delete_db_ids {
        command.clear();

        let stale_name = db_name(db_id);

        // Writing into a `String` cannot fail, so the result is discarded.
        writeln!(command, "drop database if exists {stale_name}").ok();

        match conn.execute(&*command).await {
            Ok(_deleted) => {
                deleted_db_ids.push(db_id);
            }
            // Keep the tracking row so a later sweep can retry this database.
            Err(Error::Database(dbe)) => {
                eprintln!("could not clean test database {db_id:?}: {dbe}")
            }
            Err(e) => return Err(e),
        }
    }

    // Every drop was refused: an empty `in ()` list is a SQL syntax error, so there is
    // nothing to delete and no statement to send.
    if deleted_db_ids.is_empty() {
        return Ok(0);
    }

    let mut delete_query = QueryBuilder::new("delete from _sqlx_test_databases where db_id in (");

    let mut separated = delete_query.separated(",");

    for db_id in &deleted_db_ids {
        separated.push_bind(db_id);
    }

    delete_query.push(")").build().execute(&mut *conn).await?;

    Ok(deleted_db_ids.len())
}
/// Returns the database name for `id`: the fixed `_sqlx_test_database_` prefix followed
/// by the decimal ID.
fn db_name(id: u64) -> String {
    let mut name = String::from("_sqlx_test_database_");
    name.push_str(&id.to_string());
    name
}
/// Extracts the numeric ID from a database name of the form `_sqlx_test_database_<id>`.
///
/// # Panics
/// Panics if `name` is not exactly the `_sqlx_test_database_` prefix followed by a
/// valid `u64`.
fn db_id(name: &str) -> u64 {
    // `strip_prefix` removes exactly one leading prefix and yields `None` when it is
    // absent. `trim_start_matches` would strip repeated prefixes and let a bare number
    // through, which could map an unrelated name onto a real tracking row.
    name.strip_prefix("_sqlx_test_database_")
        .and_then(|digits| digits.parse::<u64>().ok())
        .unwrap_or_else(|| panic!("failed to parse ID from database name {name:?}"))
}
/// Checks that `db_name` and `db_id` agree on the naming scheme in both directions.
#[test]
fn test_db_name_id() {
    let id = 12345;
    let name = "_sqlx_test_database_12345";

    assert_eq!(db_name(id), name);
    assert_eq!(db_id(name), id);
}