0 0
Read Time:9 Minute, 15 Second

前言:

   當我們在使用任何的一個編程語言進行資料庫應用的開發的時候, 「建立資料庫連接池」 與 「實現測試隔離」 是相當關鍵且相當重要的。

   在本文中將探討 Rust 是如何的產生 PostgreSQL 資料庫連接池與實現測試隔離的; 不複雜, 卻有些 「關鍵點」 需要特別的了解與注意的。

 

本文: PostgreSQL 連接池與測試隔離

Rust modules

   我們使用以下的 modules 來組織代碼:

src/
  configuration.rs
  lib.rs
  main.rs
  routes/
    mod.rs
    health_check.rs
    subscriptions.rs
  startup.rs

根據 Rust modules 宣告的規範:

  • 我們將所定義的 modules; configuration.rs, routes, startup.rs; 在 lib.rs 宣告為 pub mod。

    lib.rs 的代碼如下:

pub mod configuration;
pub mod routes;
pub mod startup;
  • module routes 是一 「目錄」, 所以, 我們需在 module routes 中以 「mod.rs」 宣告 module routes 內的 modules; health_check.rs, subscriptions.rs; 為 pub mod。
  • 我們在 「mod.rs」 中使用 pub use 將 module health_check.rs 內的 pub async fn health_check() -> HttpResponse , module subscriptions.rs 內的 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; re-export 在 module routes。
  • 我們將 module health_check.rs 內的 pub async fn health_check() -> HttpResponse , module subscriptions.rs 內的 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; re-export 在 module routes; 將使得我們在 module routes 內便可以存取到 pub async fn health_check() -> HttpResponse 與 pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpRespons; 存取的方式如下: use crate::routes::{health_check, subscribe}。
  • Rust 的 「目錄」 結構的 modules, 使得我們能更結構化的組織我們的代碼。而 「re-export」; 簡化了我們在 「目錄」 結構的 modules 內的存取; 使得我們不致於因為較複雜的 「目錄」 結構的 modules, 而就會出現冗長的存取路徑。

         mod.rs 的代碼如下:

mod health_check;
mod subscriptions;

pub use health_check::{health_check};
pub use subscriptions::{subscribe};

關於 Rust modules 的教學視頻, 可參考:

Rust module system explainedhttps://www.youtube.com/watch?v=4KsAsGhFo4U 

 

讀取配置檔

    我們將使用 crate config 來管理我們在 application port, database connection 上的配置:

  • 我們需設定個 Rust struct; 此 struct 將是個實現 serde』s Deserialize trait 的 Rust type。

     此 Rust struct; Settings; 的代碼如下:

#[derive(serde::Deserialize)]
pub struct Settings {
    pub application_port: u16,
    pub database: DatabaseSettings,
}

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

這個關於 「配置」 的 Rust struct; Settings; 共分為兩部份:

  1. application port; 提供給 actix-web 可以聽取來自 client 端的請求。
  2. Database connection 的參數。

我們已經將關於 「配置」 的 Rust struct; Settings; 準備好了, 接下來, 我們將在 Cargo.toml 中加入 crate config 的依賴。

Cargo.toml 如下:

[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
config = "0.11"
uuid = { version = "0.8.1", features = ["v4"]}
chrono = "0.4.15"

[dependencies.sqlx]
version = "0.5.7"
default-features = false
features = [
    "runtime-actix-rustls",
    "macros",
    "postgres",
    "uuid",
    "chrono",
    "migrate"
]

[dev-dependencies]
reqwest = "0.11"

我們現在可以開始開發讀取配置檔的函式 (function) 了:

   讀取配置檔的函式; pub fn get_configuration() -> Result<Settings, config::ConfigError>; 主要是使用 crate config 去讀取配置文件; config/configuration.yaml。然後, 將從配置文件中, 所讀到的值轉換到 struct Settings

pub fn get_configuration() -> Result<Settings, config::ConfigError> 的代碼如下:

pub fn get_configuration() -> Result<Settings, config::ConfigError> {
    // Initialize our configuration reader
    let mut settings = config::Config::default();

    // Add configuration values from a file named 'configuration'
    settings.merge(config::File::with_name("config/configuration"))?;

    // Try to convert the configuration values it read into our Settings type
    settings.try_into()
}

我們將讀取配置檔; pub fn get_configuration() -> Result<Settings, config::ConfigError>; 當成是 main.rs 的第一步。

main.rs 的代碼如下:

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Panic if we can't read configuration
    let configuration = get_configuration().expect("Failed to read configuration.");

PgConnection vs. PgPool

   sqlx 有著非同步的介面, 但是, 由 sqlx::PgConnection::connect 所建立的與 PostgreSQL 的連接, 卻不允許著在相同的 PostgreSQL 的連接上, 能同時有著一個以上的 SQL queries。

sqlx::PgPool::connect; 能建立一與 PostgreSQL 連接的連接池 (Pool)。當我們是針對 &PgPool 執行某個 SQL query 時, sqlx 便會從連接池 (Pool) 當中, 「borrow」 一個 PgConnection , 並且使用這個 PgConnection 來執行 SQL query。

假如, 連接池 (Pool) 當中沒有可供使用的 PgConnection, sqlx 便會在連接池 (Pool) 中產生新的 PgConnection 或是等待其他正在使用的 PgConnection 被釋放出來。

所以, PgPool 增加了可同時被執行的 SQL queries 的數量, 更重要的是, PgPool 使得某個執行較慢的 SQL query, 不會影響到其他 SQL queries 的執行。

 

PostgreSQL 的連接池

連接字串

  • 為了要使 sqlx::PgConnection::connect, sqlx::PgPool::connect 能建立與 PostgreSQL 的連接, 我們需要先構建一 「連接字串」。
  • 因為, struct DatabaseSettings 提供了完整的與 PostgreSQL 連接時所需的參數, 所以, 我們就在 struct DatabaseSettings 中加入可構建 「連接字串」 的函式 (function); pub fn connection_string(&self) -> String

pub fn connection_string(&self) -> String 的代碼如下:

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

impl DatabaseSettings {
    pub fn connection_string(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}/{}",
            self.username, self.password, self.host, self.port, self.database_name
        )
    }
}

Clone-able

  • actix-web 會針對每一個的 HttpServer::new 生成一 actix-web Worker; 也就是說, 每一個的 actix-web Worker 都是執行個自所擁有的 HttpServer::new。
  • 為了使在每一個的 actix-web Worker 中所執行的 HttpServer::new 都能擁有相同且正確的 PostgreSQL 的連接, 我們必需要使得 sqlx::PgConnection::connect, sqlx::PgPool::connect 是 「clone-able」 的。
  • web::Data; actix-web 的 extractor; 能將 sqlx::PgConnection::connect、sqlx::PgPool::connect 封裝在 Atomic Reference Counter 的指針; Arc; 當中。
  • Atomic Reference Counter 的指針; Arc<sqlx::PgConnection::connect>, Arc< sqlx::PgPool::connect>; 是 「clone-able」 的。

src/main.rs; 使用 PgPool 建立一與 PostgreSQL 連接的連接池 (Pool)

src/main.rs 代碼如下:

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Panic if we can't read configuration
    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection_pool = PgPool::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");

    let address = format!("127.0.0.1:{}", configuration.application_port);
    let listener = TcpListener::bind(address)?;

    run(listener, connection_pool)?.await
}

src/startup.rs; 使用 web::Data 將 PgPool 是可 clone-able的。

src/startup.rs 的代碼如下:

pub fn run(listener: TcpListener, db_pool: PgPool) -> Result<Server, std::io::Error> {
    // Wrap the pool using web::Data, which boils down to an Arc smart pointer
    let db_pool = web::Data::new(db_pool);
    let server = HttpServer::new(move || {
        App::new()
            .route("/health_check", web::get().to(health_check))
            // A new entry in our routing table for POST /subscriptions requests
            .route("/subscriptions", web::post().to(subscribe))
            // Register the connection as part of the application state
            .app_data(db_pool.clone())
    })
    .listen(listener)?
    .run();

    // No .await here!
    Ok(server)
}

INSERT Query

   我們使用 sqlx::query! 將訂閱者的信息; email, 姓名; 新增至 PostgreSQL。

src/routes/subscriptions.rs 的代碼如下:

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

pub async fn subscribe(form: web::Form<FormData>, pool: web::Data<PgPool>) -> HttpResponse {
    match sqlx::query!(
        r#"
        INSERT INTO subscriptions (id, email, name, subscribed_at)
        VALUES ($1, $2, $3, $4)
        "#,
        Uuid::new_v4(),
        form.email,
        form.name,
        Utc::now()
    )
    .execute(pool.get_ref())
    .await
    {
        Ok(_) => HttpResponse::Ok().finish(),
        Err(e) => {
            println!("Failed to execute query: {}", e);
            HttpResponse::InternalServerError().finish()
        }
    }
}

測試隔離

   我們已完成了將訂閱者的信息, 新增至 PostgreSQL 的開發。接下來, 我們當然就是要來進行自動化集成測試用例的開發。

因為, 此自動化集成測試用例涉及到將資料寫至 PostgreSQL, 所以, 我們必須要能做到; 測試隔離: 每次的自動化集成測試用例所執行的結果, 都 「不會」 對之前或之後所執行的自動化集成測試用例的執行結果產生所謂的 「side effect」。

我們實踐 “測試隔離” 的作法是:

  • 針對每次自動化集成測試用例的執行, 我們都將產生一 「全新」 的資料庫。

我們主要是根據以下的兩個步驟, 而使得每次自動化集成測試用例的執行, 都能產生一 「全新」 的資料庫:

  • 在 src/configuration.rs , struct DatabaseSettings 中加入 pub fn connection_string_without_db(&self) -> String; 生成沒有指定資料庫名稱的 「連接字串」

src/configuration.rs 的代碼如下:

#[derive(serde::Deserialize)]
pub struct Settings {
    pub application_port: u16,
    pub database: DatabaseSettings,
}

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

impl DatabaseSettings {
    
    pub fn connection_string_without_db(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}",
            self.username, self.password, self.host, self.port
        )
    }
}
  •  tests/health_check.rs 使用由 pub fn connection_string_without_db(&self) -> String 所生成沒有指定資料庫名稱的 「連接字串」, 建立我們所需的資料庫。我們是以一隨機生成 Uuid 的方式, 產生資料庫的名稱; 也就是說, 針對每一次的自動化集成測試用例的執行, 我們都將以一隨機生成的數字, 當成是此次自動化集成測試用例的資料庫名稱, 因而, 使得每次的自動化集成測試用例的執行都不會使用 (影響) 到別的自動化集成測試用例的資料庫; 達到我們所期望的 「測試隔離」

tests/health_check.rs 的代碼如下:

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
}

pub async fn configure_database(config: &DatabaseSettings) -> PgPool {
    // Create database
    let mut connection = PgConnection::connect(&config.connection_string_without_db())
        .await
        .expect("Failed to connect to Postgres");

    connection
        .execute(format!(r#"CREATE DATABASE "{}";"#, config.database_name).as_str())
        .await
        .expect("Failed to create database");

    // Migrate database
    let connection_pool = PgPool::connect(&config.connection_string())
        .await
        .expect("Failed to connect to Postgres.");

    sqlx::migrate!("./migrations")
        .run(&connection_pool)
        .await
        .expect("Failed to migrate the database");

    connection_pool
}

// Launch our application in the background
// tokio::spawn runs concurrently with down stream futures and tasks; our test logic.
async fn spawn_app() -> TestApp {
    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");

    // We retrieve the port assigned to us by the OS
    let port = listener.local_addr().unwrap().port();
    let address = format!("http://127.0.0.1:{}", port);

    let mut configuration = get_configuration().expect("Failed to read configuration");
    configuration.database.database_name = Uuid::new_v4().to_string();

    let connection_pool = configure_database(&configuration.database).await;

    let server = run(listener, connection_pool.clone()).expect("Failed to bind address");

    // Launch the server as a background task
    // tokio::spam returns a handle to the spawned future,
    // but we have no use for it here, hence the non-binding let
    let _ = tokio::spawn(server);

    // We return the application address & PgPool to the caller
    TestApp {
        address,
        db_pool: connection_pool,
    }
}

#[tokio::test]
async fn health_check_works() {
    // Arrange HttpServer::run
    let app = spawn_app().await;

    // We need to bring in 'reqwest'
    // to perform HTTP requests against our application
    let client = reqwest::Client::new();

    // Act
    let response = client
        .get(&format!("{}/health_check", &app.address))
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert!(response.status().is_success());
    assert_eq!(Some(0), response.content_length());
}

#[tokio::test]
async fn subscribe_returns_a_200_for_valid_from_data() {
    // Arrange
    let app = spawn_app().await;
    let client = reqwest::Client::new();

    // Act
    let body = "name=ken%20fang&email=kenfang%40gmail.com";
    let response = client
        .post(&format!("{}/subscriptions", &app.address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to send request");

    // Assert
    assert_eq!(200, response.status().as_u16());

    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "kenfang@gmail.com");
    assert_eq!(saved.name, "ken fang");
}

#[tokio::test]
async fn subscribe_returns_a_400_when_data_is_missing() {
    // Arrange
    let app = spawn_app().await;
    let client = reqwest::Client::new();
    let test_cases = vec![
        ("name=ken%20fang", "missing the email"),
        ("email=kenfang%40gmail.com", "missing the name"),
        ("", "missing both name and email"),
    ];

    // Act & Assert
    for (invalid_body, error_message) in test_cases {
        // Act
        let response = client
            .post(&format!("{}/subscriptions", &app.address))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(invalid_body)
            .send()
            .await
            .expect("Failed to send request.");

        // Assert
        assert_eq!(
            400,
            response.status().as_u16(),
            // Customized error message on test failure
            "The API did not fail with 400 Bad Request when the payload was {}.",
            error_message
        );
    }
}
圖一: 隨機的生成資料庫的名稱

如圖一所示, 我們每一次執行 cargo test, 都會隨機的生成一資料庫的名稱。

 

結論:

    Rust 產生 PostgreSQL 資料庫連接池與實現測試隔離需了解與注意的 「關鍵點」:

  • sqlx::PgConnection::connect, sqlx::PgPool::connect 必需是 「clone-able」 的。
  • 測試隔離: 生成沒有指定資料庫名稱的 「連接字串」; 以一隨機生成 Uuid 的方式, 產生資料庫的名稱。

一點都不複雜, 同意嗎 ?

完整的代碼, 請參考: https://github.com/KenFang/StoringData

About Post Author

方俊賢; Ken Fang

專利號: 201910652769.4; 一種深度學習的演算法, 預測微服務持續發布、持續部署後對產品整體質量的影響, 獲得國家知識財產局專利; 符合專利法實施細則第 44 條的規定。
Happy
Happy
0 %
Sad
Sad
0 %
Excited
Excited
0 %
Sleepy
Sleepy
0 %
Angry
Angry
0 %
Surprise
Surprise
0 %

Average Rating

5 Star
0%
4 Star
0%
3 Star
0%
2 Star
0%
1 Star
0%

發表回復

您的電子郵箱地址不會被公開。

此站點使用Akismet來減少垃圾評論。了解我們如何處理您的評論數據