#mysql #wasi #database-client #database-driver #connection-pool #async

mysql_async_wasi

基于Tokio的异步MySql客户端库

8个版本

0.33.0 2023年12月21日
0.31.6 2023年11月11日
0.31.5 2023年9月7日
0.31.4 2023年8月17日
0.30.0 2022年10月1日

#1274数据库接口

Download history 172/week @ 2024-04-16 141/week @ 2024-04-23 19/week @ 2024-04-30 3/week @ 2024-05-14 8/week @ 2024-05-21 2/week @ 2024-06-11 17/week @ 2024-06-18 7/week @ 2024-06-25

每月336 次下载
用于 mega_etl

MIT/Apache

430KB
9K SLoC

mysql_async for WebAssembly

基于Tokio的异步MySql客户端库,适用于Rust编程语言。这是从原始的mysql_async分叉而来,支持WebAssembly编译目标。这允许异步MySql应用程序作为轻量级且安全的替代方案在Linux容器中运行。

有关更多详细信息和使用示例,请参阅原始的mysql_async源代码和此示例

安装

[dependencies]
mysql_async_wasi = "<desired version>"

连接URL参数

该驱动程序支持一组URL参数(请参阅Opts上的文档)。

示例

use mysql_async::prelude::*;

#[derive(Debug, PartialEq, Eq, Clone)]
struct Payment {
    customer_id: i32,
    amount: i32,
    account_name: Option<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let payments = vec![
        Payment { customer_id: 1, amount: 2, account_name: None },
        Payment { customer_id: 3, amount: 4, account_name: Some("foo".into()) },
        Payment { customer_id: 5, amount: 6, account_name: None },
        Payment { customer_id: 7, amount: 8, account_name: None },
        Payment { customer_id: 9, amount: 10, account_name: Some("bar".into()) },
    ];

    let database_url = /* ... */
    # get_opts();

    let pool = mysql_async::Pool::new(database_url);
    let mut conn = pool.get_conn().await?;

    // Create a temporary table
    r"CREATE TEMPORARY TABLE payment (
        customer_id int not null,
        amount int not null,
        account_name text
    )".ignore(&mut conn).await?;

    // Save payments
    r"INSERT INTO payment (customer_id, amount, account_name)
      VALUES (:customer_id, :amount, :account_name)"
        .with(payments.iter().map(|payment| params! {
            "customer_id" => payment.customer_id,
            "amount" => payment.amount,
            "account_name" => payment.account_name.as_ref(),
        }))
        .batch(&mut conn)
        .await?;

    // Load payments from the database. Type inference will work here.
    let loaded_payments = "SELECT customer_id, amount, account_name FROM payment"
        .with(())
        .map(&mut conn, |(customer_id, amount, account_name)| Payment { customer_id, amount, account_name })
        .await?;

    // Dropped connection will go to the pool
    drop(conn);

    // The Pool must be disconnected explicitly because
    // it's an asynchronous operation.
    pool.disconnect().await?;

    assert_eq!(loaded_payments, payments);

    // the async fn returns Result, so
    Ok(())
}

连接池

Pool结构是异步连接池。

请注意

  • Pool是智能指针 – 每个克隆都将指向相同的池实例。
  • PoolSend + Sync + 'static – 可以随意传递它。
  • 使用 Pool::disconnect 来优雅地关闭池。
  • Pool::new 是懒加载的,不会断言服务器可用性。

事务

Conn::start_transaction 是一个包装器,它以 START TRANSACTION 开始,并以 COMMITROLLBACK 结束。

如果事务没有被显式提交或回滚,则已丢弃的事务将被隐式回滚。请注意,此行为可能由连接池(在连接丢弃时)或下一个查询触发,即可能被延迟。

API 不会允许您运行嵌套事务,因为某些语句会导致隐式提交(START TRANSACTION 就是这样之一),因此选择这种行为以减少错误。

此枚举表示 MySQL 单元的原始值。库通过以下描述的 FromValue 特性提供了 Value 与不同 Rust 类型的转换。

FromValue 特性

此特性由 mysql_common 创建重新导出。有关支持的转换列表,请参阅其 crate 文档

特性提供两种风格的转换

  • from_value(Value) -> T - 方便,但会引发恐慌的转换。

    请注意,对于 Value 的任何变体,都存在一个完全覆盖其域的类型,即对于 Value 的任何变体,都存在 T: FromValue,使得 from_value 从不引发恐慌。这意味着,如果您的数据库模式已知,则可以仅使用 from_value 编写应用程序,而不必担心运行时恐慌。

    此外,请注意,即使类型看似足够,某些转换也可能失败,例如在无效日期的情况下(参见 sql 模式)。

  • from_value_opt(Value) -> Option<T> - 非恐慌,但不太方便的转换。

    此函数在源数据库模式未知的情况下进行转换测试时很有用。

MySQL 查询协议

文本协议

MySQL 文本协议在 Queryable::query* 方法中实现,并且在 prelude::Query 特性中实现,如果查询是 prelude::AsQuery。当您的查询没有参数时,这很有用。

注意:服务器将所有文本协议结果集的值编码为字符串,因此 from_value 转换可能会导致额外的解析成本。

二进制协议和预编译语句。

MySQL二进制协议在exec*方法集中实现,这些方法定义在prelude::Queryable特性和prelude::Query特性中,如果查询是QueryWithParams。预编译语句是传递Rust值到MySQL服务器的唯一方式。MySQL使用?符号作为参数占位符。

注意:只能在期望单个MySQL值的地方使用参数,即不能执行类似SELECT ... WHERE id IN ?的操作,其中参数是一个向量。你需要构建一个看起来像SELECT ... WHERE id IN (?, ?, ...)的查询,并将每个向量元素作为参数传递。

命名参数

MySQL本身不支持命名参数,因此客户端实现了它。应使用:name作为命名参数的占位符语法。命名参数使用以下命名约定

  • 参数名称必须以_a..z开头
  • 参数名称可以继续使用_a..z0..9

注意:这些规则意味着,例如,语句SELECT :fooBar将被转换为SELECT ?Bar,因此请务必小心。

命名参数可以在语句中重复使用,例如SELECT :foo, :foo将需要一个单个的命名参数foo,该参数将在语句执行期间在相应的位置重复。

应使用params!宏来构建执行参数。

注意:在单个语句中不能混合位置参数和命名参数。

语句

在MySQL中,每个预编译语句都属于特定的连接,不能在另一个连接上执行。尝试这样做将导致错误。驱动程序不会以任何方式将语句绑定到其连接,但可以通过查看包含在Statement结构中的连接ID来进行检查。

LOCAL INFILE处理器

警告:您应该注意LOAD DATA LOCAL的安全注意事项

存在两种LOCAL INFILE处理器 - 全局局部

如果服务器请求LOCAL INFILE,驱动程序将尝试为其找到一个处理器

  1. 如果连接上安装了任何局部处理器,它将尝试使用该处理器;
  2. 如果提供了任何,它将尝试使用通过OptsBuilder::local_infile_handler指定的全局处理器;
  3. 如果没有找到处理器,将发出 LocalInfileError::NoHandler 错误。

处理器(局部全局)的目的是返回 InfileData

全局 LOCAL INFILE 处理器

请参阅 prelude::GlobalHandler

简单来说,全局处理器是一个异步函数,它接受一个文件名(作为 &[u8])并返回 Result<InfileData>

您可以使用 OptsBuilder::local_infile_handler 来设置它。如果没有为连接安装 局部 处理器,服务器将使用它。此处理器可能会被多次调用。

示例

  1. WhiteListFsHandler 是一个 全局 处理器。
  2. 每个 T: Fn(&[u8]) -> BoxFuture<'static, Result<InfileData, LocalInfileError>> 都是一个 全局 处理器。

局部 LOCAL INFILE 处理器。

简单来说,局部处理器是一个返回 Result<InfileData> 的未来。

这是一个一次性处理器 - 使用后就会被消耗。您可以使用 Conn::set_infile_handler 来设置它。此处理器的优先级高于 全局 处理器。

需要注意的是

  1. impl Drop for Conn 将清除 局部 处理器,即处理器将在连接返回到 Pool 时被删除。
  2. Conn::reset 将清除 局部 处理器。

示例

#
let pool = mysql_async::Pool::new(database_url);

let mut conn = pool.get_conn().await?;
"CREATE TEMPORARY TABLE tmp (id INT, val TEXT)".ignore(&mut conn).await?;

// We are going to call `LOAD DATA LOCAL` so let's setup a one-time handler.
conn.set_infile_handler(async move {
    // We need to return a stream of `io::Result<Bytes>`
    Ok(stream::iter([Bytes::from("1,a\r\n"), Bytes::from("2,b\r\n3,c")]).map(Ok).boxed())
});

let result = r#"LOAD DATA LOCAL INFILE 'whatever'
    INTO TABLE `tmp`
    FIELDS TERMINATED BY ',' ENCLOSED BY '\"'
    LINES TERMINATED BY '\r\n'"#.ignore(&mut conn).await;

match result {
    Ok(()) => (),
    Err(Error::Server(ref err)) if err.code == 1148 => {
        // The used command is not allowed with this MySQL version
        return Ok(());
    },
    Err(Error::Server(ref err)) if err.code == 3948 => {
        // Loading local data is disabled;
        // this must be enabled on both the client and the server
        return Ok(());
    }
    e @ Err(_) => e.unwrap(),
}

// Now let's verify the result
let result: Vec<(u32, String)> = conn.query("SELECT * FROM tmp ORDER BY id ASC").await?;
assert_eq!(
    result,
    vec![(1, "a".into()), (2, "b".into()), (3, "c".into())]
);

drop(conn);
pool.disconnect().await?;

测试

测试使用以下环境变量

  • DATABASE_URL - 默认为 mysql://root:[email protected]:3307/mysql
  • COMPRESS - 设置为 1true 以启用测试的压缩
  • SSL - 设置为 1true 以启用测试的 TLS

您可以使用 doker 运行一个测试服务器。请注意,有关最大允许数据包、本地文件和二进制日志的参数对于正确运行测试是必需的(请参阅 azure-pipelines.yml

docker run -d --name container \
    -v `pwd`:/root \
    -p 3307:3306 \
    -e MYSQL_ROOT_PASSWORD=password \
    mysql:8.0 \
    --max-allowed-packet=36700160 \
    --local-infile \
    --log-bin=mysql-bin \
    --log-slave-updates \
    --gtid_mode=ON \
    --enforce_gtid_consistency=ON \
    --server-id=1

变更日志

在此 可用

许可

许可协议如下之一

任选其一。

贡献

除非您明确声明,否则根据 Apache-2.0 许可证定义的,您有意提交以包含在作品中的任何贡献,将按上述方式双许可,不附加任何额外条款或条件。

依赖

~16–34MB
~560K SLoC