0 0
Read Time:9 Minute, 15 Second

前言:

   当我们在使用任何的一个编程语言进行数据库应用的开发的时候, “建立数据库连接池” 与 “实现测试隔离” 是相当关键且相当重要的。

   在本文中将探讨 Rust 是如何的产生 PostgreSQL 数据库连接池与实现测试隔离的; 不复杂, 却有些 “关键点” 需要特别的了解与注意的。

 

本文: PostgreSQL 连接池与测试隔离

Rust modules

   我们使用以下的 modules 来组织代码:

src/
  configuration.rs
  lib.rs
  main.rs
  routes/
    mod.rs
    health_check.rs
    subscriptions.rs
  startup.rs

根据 Rust modules 宣告的规范:

  • 我们将所定义的 modules; configuration.rs, routes, startup.rs; 在 lib.rs 宣告为 pub mod。

    lib.rs 的代码如下:

pub mod configuration;
pub mod routes;
pub mod startup;
  • module routes 是一 “目录”, 所以, 我们需在 module routes 中以 “mod.rs” 宣告 module routes 内的 modules; health_check.rs, subscriptions.rs; 为 pub mod。
  • 我们在 “mod.rs” 中使用 pub use 将 module health_check.rs 内的 pub async fn health_check() -> HttpResponse , module subscriptions.rs 内的 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; re-export 在 module routes。
  • 我们将 module health_check.rs 内的 pub async fn health_check() -> HttpResponse , module subscriptions.rs 内的 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; re-export 在 module routes; 将使得我们在 module routes 内便可以存取到 pub async fn health_check() -> HttpResponse 与 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; 存取的方式如下: use crate::routes::{health_check, subscribe}。
  • Rust 的 “目录” 结构的 modules, 使得我们能更结构化的组织我们的代码。而 “re-export”; 简化了我们在 “目录” 结构的 modules 内的存取; 使得我们不致于因为较复杂的 “目录” 结构的 modules, 而就会出现冗长的存取路径。

         mod.rs 的代码如下:

mod health_check;
mod subscriptions;

pub use health_check::{health_check};
pub use subscriptions::{subscribe};

关于 Rust modules 的教学视频, 可参考:

Rust module system explainedhttps://www.youtube.com/watch?v=4KsAsGhFo4U 

 

读取配置档

    我们将使用 crate config 来管理我们在 application port, database connection 上的配置:

  • 我们需设定个 Rust struct; 此 struct 将是个实现 serde’s Deserialize trait 的 Rust type。

     此 Rust struct; Settings; 的代码如下:

#[derive(serde::Deserialize)]
pub struct Settings {
    pub application_port: u16,
    pub database: DatabaseSettings,
}

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

这个关于 “配置” 的 Rust struct; Settings; 共分为两部份:

  1. application port; 提供给 actix-web 可以听取来自 client 端的请求。
  2. Database connection 的参数。

我们已经将关于 “配置” 的 Rust struct; Settings; 准备好了, 接下来, 我们将在 Cargo.toml 中加入 crate config 的依赖。

Cargo.toml 如下:

[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
config = "0.11"
uuid = { version = "0.8.1", features = ["v4"]}
chrono = "0.4.15"

[dependencies.sqlx]
version = "0.5.7"
default-features = false
features = [
    "runtime-actix-rustls",
    "macros",
    "postgres",
    "uuid",
    "chrono",
    "migrate"
]

[dev-dependencies]
reqwest = "0.11"

我们现在可以开始开发读取配置档的函式 (function) 了:

   读取配置档的函式; pub fn get_configuration() -> Result<Settings, config::ConfigError>; 主要是使用 crate config 去读取配置文件; config/configuration.yaml。然后, 将从配置文件中, 所读到的值转换到 struct Settings

pub fn get_configuration() -> Result<Settings, config::ConfigError> 的代码如下:

pub fn get_configuration() -> Result<Settings, config::ConfigError> {
    // Initialize our configuration reader
    let mut settings = config::Config::default();

    // Add configuration values from a file named 'configuration'
    settings.merge(config::File::with_name("config/configuration"))?;

    // Try to convert the configuration values it read into our Settings type
    settings.try_into()
}

我们将读取配置档; pub fn get_configuration() -> Result<Settings, config::ConfigError>; 当成是 main.rs 的第一步。

main.rs 的代码如下:

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Panic if we can't read configuration
    let configuration = get_configuration().expect("Failed to read configuration.");

PgConnection vs. PgPool

   sqlx 有著异步的接口, 但是, 由 sqlx::PgConnection::connect 所建立的与 PostgreSQL 的连接, 却不允许著在相同的 PostgreSQL 的连接上, 能同时有著一个以上的 SQL queries。

sqlx::PgPool::connect; 能建立一与 PostgreSQL 连接的连接池 (Pool)。当我们是针对 &PgPool 执行某个 SQL query 时, sqlx 便会从连接池 (Pool) 当中, “borrow” 一个 PgConnection , 并且使用这个 PgConnection 来执行 SQL query。

假如, 连接池 (Pool) 当中没有可供使用的 PgConnection, sqlx 便会在连接池 (Pool) 中产生新的 PgConnection 或是等待其他正在使用的 PgConnection 被释放出来。

所以, PgPool 增加了可同时被执行的 SQL queries 的数量, 更重要的是, PgPool 使得某个执行较慢的 SQL query, 不会影响到其他 SQL queries 的执行。

 

PostgreSQL 的连接池

连接字串

  • 为了要使 sqlx::PgConnection::connect, sqlx::PgPool::connect 能建立与 PostgreSQL 的连接, 我们需要先构建一 “连接字串”。
  • 因为, struct DatabaseSettings 提供了完整的与 PostgreSQL 连接时所需的参数, 所以, 我们就在 struct DatabaseSettings 中加入可构建 “连接字串” 的函式 (function); pub fn connection_string(&self) -> String

pub fn connection_string(&self) -> String 的代码如下:

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

impl DatabaseSettings {
    pub fn connection_string(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}/{}",
            self.username, self.password, self.host, self.port, self.database_name
        )
    }
}

Clone-able

  • actix-web 会针对每一个的 HttpServer::new 生成一 actix-web Worker; 也就是说, 每一个的 actix-web Worker 都是执行个自所拥有的 HttpServer::new。
  • 为了使在每一个的 actix-web Worker 中所执行的 HttpServer::new 都能拥有相同且正确的 PostgreSQL 的连接, 我们必需要使得 sqlx::PgConnection::connect, sqlx::PgPool::connect 是 “clone-able” 的。
  • web::Data; actix-web 的 extractor; 能将 sqlx::PgConnection::connect、sqlx::PgPool::connect 封装在 Atomic Reference Counter 的指针; Arc; 当中。
  • Atomic Reference Counter 的指针; Arc<sqlx::PgConnection::connect>, Arc< sqlx::PgPool::connect>; 是 “clone-able” 的。

src/main.rs; 使用 PgPool 建立一与 PostgreSQL 连接的连接池 (Pool)

src/main.rs 代码如下:

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Panic if we can't read configuration
    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection_pool = PgPool::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");

    let address = format!("127.0.0.1:{}", configuration.application_port);
    let listener = TcpListener::bind(address)?;

    run(listener, connection_pool)?.await
}

src/startup.rs; 使用 web::Data 将 PgPool 是可 clone-able的。

src/startup.rs 的代码如下:

pub fn run(listener: TcpListener, db_pool: PgPool) -> Result<Server, std::io::Error> {
    // Wrap the pool using web::Data, which boils down to an Arc smart pointer
    let db_pool = web::Data::new(db_pool);
    let server = HttpServer::new(move || {
        App::new()
            .route("/health_check", web::get().to(health_check))
            // A new entry in our routing table for POST /subscriptions requests
            .route("/subscriptions", web::post().to(subscribe))
            // Register the connection as part of the application state
            .app_data(db_pool.clone())
    })
    .listen(listener)?
    .run();

    // No .await here!
    Ok(server)
}

INSERT Query

   我们使用 sqlx::query! 将订阅者的信息; email, 姓名; 新增至 PostgreSQL。

src/routes/subscriptions.rs 的代码如下:

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpResponse {
    match sqlx::query!(
        r#"
        INSERT INTO subscriptions (id, email, name, subscribed_at)
        VALUES ($1, $2, $3, $4)
        "#,
        Uuid::new_v4(),
        form.email,
        form.name,
        Utc::now()
    )
    .execute(pool.get_ref())
    .await
    {
        Ok(_) => HttpResponse::Ok().finish(),
        Err(e) => {
            println!("Failed to execute query: {}", e);
            HttpResponse::InternalServerError().finish()
        }
    }
}

测试隔离

   我们已完成了将订阅者的信息, 新增至 PostgreSQL 的开发。接下来, 我们当然就是要来进行自动化集成测试用例的开发。

因为, 此自动化集成测试用例涉及到将资料写至 PostgreSQL, 所以, 我们必须要能做到; 测试隔离: 每次的自动化集成测试用例所执行的结果, 都 “不会” 对之前或之后所执行的自动化集成测试用例的执行结果产生所谓的 “side effect”。

我们实践 “测试隔离” 的作法是:

  • 针对每次自动化集成测试用例的执行, 我们都将产生一 “全新” 的数据库。

我们主要是根据以下的两个步骤, 而使得每次自动化集成测试用例的执行, 都能产生一 “全新” 的数据库:

  • 在 src/configuration.rs , struct DatabaseSettings 中加入 pub fn connection_string_without_db(&self) -> String; 生成没有指定数据库名称的 “连接字串”

src/configuration.rs 的代码如下:

#[derive(serde::Deserialize)]
pub struct Settings {
    pub application_port: u16,
    pub database: DatabaseSettings,
}

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

impl DatabaseSettings {
    
    pub fn connection_string_without_db(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}",
            self.username, self.password, self.host, self.port
        )
    }
}
  •  tests/health_check.rs 使用由 pub fn connection_string_without_db(&self) -> String 所生成没有指定数据库名称的 “连接字串”, 建立我们所需的数据库。我们是以一随机生成 Uuid 的方式, 产生数据库的名称; 也就是说, 针对每一次的自动化集成测试用例的执行, 我们都将以一随机生成的数字, 当成是此次自动化集成测试用例的数据库名称, 因而, 使得每次的自动化集成测试用例的执行都不会使用 (影响) 到别的自动化集成测试用例的数据库; 达到我们所期望的 “测试隔离”

tests/health_check.rs 的代码如下:

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
}

pub async fn configure_database(config: &DatabaseSettings) -> PgPool {
    // Create database
    let mut connection = PgConnection::connect(&config.connection_string_without_db())
        .await
        .expect("Failed to connect to Postgres");

    connection
        .execute(format!(r#"CREATE DATABASE "{}";"#, config.database_name).as_str())
        .await
        .expect("Failed to create database");

    // Migrate database
    let connection_pool = PgPool::connect(&config.connection_string())
        .await
        .expect("Failed to connect to Postgres.");

    sqlx::migrate!("./migrations")
        .run(&connection_pool)
        .await
        .expect("Failed to migrate the database");

    connection_pool
}

// Launch our application in the background
// tokio::spawn runs concurrently with down stream futures and tasks; our test logic.
async fn spawn_app() -> TestApp {
    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");

    // We retrieve the port assigned to us by the OS
    let port = listener.local_addr().unwrap().port();
    let address = format!("http://127.0.0.1:{}", port);

    let mut configuration = get_configuration().expect("Failed to read configuration");
    configuration.database.database_name = Uuid::new_v4().to_string();

    let connection_pool = configure_database(&configuration.database).await;

    let server = run(listener, connection_pool.clone()).expect("Failed to bind address");

    // Launch the server as a background task
    // tokio::spam returns a handle to the spawned future,
    // but we have no use for it here, hence the non-binding let
    let _ = tokio::spawn(server);

    // We return the application address & PgPool to the caller
    TestApp {
        address,
        db_pool: connection_pool,
    }
}

#[tokio::test]
async fn health_check_works() {
    // Arrange HttpServer::run
    let app = spawn_app().await;

    // We need to bring in 'reqwest'
    // to perform HTTP requests against our application
    let client = reqwest::Client::new();

    // Act
    let response = client
        .get(&format!("{}/health_check", &app.address))
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert!(response.status().is_success());
    assert_eq!(Some(0), response.content_length());
}

#[tokio::test]
async fn subscribe_returns_a_200_for_valid_from_data() {
    // Arrange
    let app = spawn_app().await;
    let client = reqwest::Client::new();

    // Act
    let body = "name=ken%20fang&email=kenfang%40gmail.com";
    let response = client
        .post(&format!("{}/subscriptions", &app.address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to send request");

    // Assert
    assert_eq!(200, response.status().as_u16());

    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "kenfang@gmail.com");
    assert_eq!(saved.name, "ken fang");
}

#[tokio::test]
async fn subscribe_returns_a_400_when_data_is_missing() {
    // Arrange
    let app = spawn_app().await;
    let client = reqwest::Client::new();
    let test_cases = vec![
        ("name=ken%20fang", "missing the email"),
        ("email=kenfang%40gmail.com", "missing the name"),
        ("", "missing both name and email"),
    ];

    // Act & Assert
    for (invalid_body, error_message) in test_cases {
        // Act
        let response = client
            .post(&format!("{}/subscriptions", &app.address))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(invalid_body)
            .send()
            .await
            .expect("Failed to send request.");

        // Assert
        assert_eq!(
            400,
            response.status().as_u16(),
            // Customized error message on test failure
            "The API did not fail with 400 Bad Request when the payload was {}.",
            error_message
        );
    }
}
图一: 随机的生成数据库的名称

如图一所示, 我们每一次执行 cargo test, 都会随机的生成一数据库的名称。

 

结论:

    Rust 产生 PostgreSQL 数据库连接池与实现测试隔离需了解与注意的 “关键点”:

  • sqlx::PgConnection::connect, sqlx::PgPool::connect 必需是 “clone-able” 的。
  • 测试隔离: 生成没有指定数据库名称的 “连接字串”; 以一随机生成 Uuid 的方式, 产生数据库的名称。

一点都不复杂, 同意吗 ?

完整的代码, 请参考: https://github.com/KenFang/StoringData

About Post Author

方俊贤; Ken Fang

专利号: 201910652769.4; 一种深度学习的算法, 预测微服务持续发布、持续部署后对产品整体质量的影响, 获得国家知识财产局专利; 符合专利法实施细则第 44 条的规定。
Happy
Happy
0 %
Sad
Sad
0 %
Excited
Excited
0 %
Sleepy
Sleepy
0 %
Angry
Angry
0 %
Surprise
Surprise
0 %

Average Rating

5 Star
0%
4 Star
0%
3 Star
0%
2 Star
0%
1 Star
0%

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据