#actix-web #sqlx #tx #web #transaction #actix

actix-web-sqlx-tx

简单的 Actix SQL TX 支持

7 个版本

新版本 0.1.6 2024年8月11日
0.1.5 2024年7月28日

712Web编程

Download history 517/week @ 2024-07-26 32/week @ 2024-08-02 117/week @ 2024-08-09

每月666次下载

Apache-2.0

17KB
253 代码行

Actix SQLX TX

支持在 Actix 网络框架中运行 SQLX 事务。

行为

在 Actix 网络框架中运行 SQLX 事务不是一件简单的事情。此库提供了一种在 Actix 网络框架中运行 SQLX 事务的方法。在 with_tx 函数内部编写代码,并从中返回一个 ScopedBoxFuture。该 ScopedBoxFuture 将在事务上下文中执行。如果 ScopedBoxFuture 返回一个 Ok 值,则事务将被提交。如果 Future 返回一个 Err 值,则事务将被回滚。

使用 pgsql 驱动程序的示例


use actix_web::{HttpServer, post, web};
use actix_web::web::Data;
use actix_web_sqlx_tx::http::{ok, Response};
use actix_web_sqlx_tx::tx::with_tx;
use chrono::NaiveDateTime;
use scoped_futures::ScopedFutureExt;
use serde::Deserialize;
use sqlx::{PgPool, Postgres, query_as, Transaction};
use sqlx::postgres::PgPoolOptions;

pub struct User {
    pub id: i32,
    pub email: String,
    pub password: String,
    pub created_at: NaiveDateTime,
    pub organization_id: i32,
}

pub struct Organization {
    pub id: i32,
    pub name: String,
    pub created_at: NaiveDateTime,
}

pub async fn create_organization<'a>(
    name: impl Into<String>,
    transaction: &mut Transaction<'a, Postgres>,
) -> Result<Organization, sqlx::Error> {
    let name = name.into();
    let created_at = chrono::Local::now().naive_utc();
    let organization = query_as!(
        Organization,
        "INSERT INTO organizations (name, created_at) VALUES ($1, $2) RETURNING *",
        name,
        created_at
    )
        .fetch_one(&mut **transaction)
        .await?;
    Ok(organization)
}

pub async fn create_new_user<'a>(
    email: impl Into<String>,
    password: impl Into<String>,
    organization_id: i32,
    transaction: &mut Transaction<'a, Postgres>,
) -> Result<User, sqlx::Error> {
    let email = email.into();
    let password = password.into();
    let created_at = chrono::Local::now().naive_utc();
    let user = query_as!(
        User,
        "INSERT INTO users (organization_id, email, password, created_at) VALUES ($1, $2, $3, $4) RETURNING *",
        organization_id,
        email,
        password,
        created_at,
    )
        .fetch_one(&mut **transaction)
        .await?;
    Ok(user)
}

#[derive(serde::Serialize)]
struct CreateUserResponse {
    message: String,
}

#[derive(Deserialize)]
struct CreateUserRequest {
    org_name: String,
    email: String,
    password: String,
}

#[post("/users")]
async fn create_user(
    create_user_request: web::Json<CreateUserRequest>,
    pool: Data<PgPool>,
) -> Response {
    with_tx(&pool, |tx| {
        async move {
            // create new org
            let organization =
                create_organization(create_user_request.org_name.clone(), tx).await?;

            //create new user in org
            let user = create_new_user(
                create_user_request.email.clone(),
                create_user_request.password.clone(),
                organization.id,
                tx,
            )
                .await?;

            //if create new user fails for some reason, we can rollback the transaction

            ok(CreateUserResponse {
                message: format!("User {} created", user.email),
            })
        }
            .scope_boxed()
    })
        .await
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let pool = Data::new(
        PgPoolOptions::new()
            .max_connections(10)
            .connect("postgres://username:password@pgsql:5432/dbname")
            .await
            .expect("Failed to create pool"),
    );

    HttpServer::new(move || {
        actix_web::App::new()
            .app_data(pool.clone())
            .service(create_user)
    })
        .bind(("0.0.0.0", 9091))
        .unwrap()
        .run()
        .await
}


// testing your fn

#[cfg(test)]
mod tests {
    use actix_sqlx_tx::tx::tests::with_tx;
    use scoped_futures::ScopedFutureExt;

    use super::*;

    #[actix_rt::test]
    async fn test_create_new_user() {
        let pool = ...;
        // this actix_sqlx_tx::tx::tests::with_tx will be rolled back at end
        with_tx(&pool, |mut tx| {
            async move {
                let email = "someemail".to_string();
                let password = "somepassword".to_string();
                let user = create_new_user(email.clone(), password.clone(), &mut tx)
                    .await
                    .unwrap();
                assert_eq!(user.email, email.clone());
            }
                .scope_boxed()
        })
            .await;
    }
}


依赖项

~25–38MB
~654K SLoC