9次发布

0.1.8 2024年6月4日
0.1.7 2024年5月12日
0.1.5 2024年2月3日
0.1.4 2023年12月22日

#234数据库接口

MIT 许可证

5MB
3K SLoC

mongor

基于Rust的MongoDB ODM,构建在官方mongodb crate之上。提供一个直观的接口与底层驱动交互,同时提供了一个简化的测试框架,以便编写测试套件更加直观。

目录

需求

安装

cargo添加mongor

文档

此README提供了概述和实用示例,但并未涵盖所有方法。 完整的crate文档可以在docs.rs上找到

使用

概述

虽然该crate公开了core模块,但为该crate设计的主要接口是Model<D>结构体。该结构体控制一个通用的D,它代表模型要控制的文档的结构。以下示例中将展示使用模型的一般模式。

基本示例

use mongor::{
    core::find::FindManyCursor,
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    // Applying an index
    model
        .apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
        .await?;

    // Inserting a document
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;

    // Updating a document
    let update_result: UpdateResult = model
        .update_one_by_oid(
            &document_oid,
            doc! {
                "$inc": {
                    "sightings": 1,
                },
            },
            None,
            None,
        )
        .await?;

    // Finding a document
    let shark: Option<Shark> = model
        .find_one_by_field_value("sightings", 1, None, None)
        .await?;

    println!("{:?}", shark);
    // Log example:
    // ```
    // Some(Shark {
    //     oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
    //     name: "Whale Shark",
    //     species: "Rhincodon typus",
    //     sightings: 1,
    // })
    // ```

    // Deleting a document
    let delete_result: DeleteResult = model
        .delete_one(
            doc! {
                "_id": &document_oid,
            },
            None,
            None,
        )
        .await?;

    // Inserting many
    let sharks = vec![
        Shark {
            oid: None,
            name: "Whale Shark".to_string(),
            species: "Rhincodon typus".to_string(),
            sightings: 0,
        },
        Shark {
            oid: None,
            name: "Great White".to_string(),
            species: "Carcharodon carcharias".to_string(),
            sightings: 0,
        },
    ];

    let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;

    // Cursor-ing via a `findMany` query
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), None)
        .await?;

    while let Some(shark) = cursor.next().await? {
        // shark: Shark
    }

    // Updating many
    let update_result: UpdateResult = model
        .update_many(
            doc! { "sightings": 0 },
            doc! { "$inc": { "sightings": 5 }},
            None,
            None,
        )
        .await?;

    // Deleting many
    let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;

    db.drop(None).await?;

    Ok(())
}

会话

官方参考

https://mongodb.ac.cn/docs/manual/reference/method/Session/

https://mongodb.ac.cn/docs/manual/replication/

概述

请注意,要使用MongoDB的会话和事务,目前您需要一个副本集部署。设置此部署非常简单,您只需在副本设置中运行多个mongod(MongoDB服务器)实例即可。

您可以通过shell中的方法启动此类集合,该方法称为rs.initiate()。下面是一个设置具有三个节点的副本集的示例。

首先,连接到主端口上的MongoDB实例:比如说27017

mongo --port 27017

然后,调用rs.initiate()以启动具有指定成员的副本集

rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" },
    { _id: 2, host: "localhost:27019" }
  ]
});

然后,您需要为每个集合设置独立的数据目录

mkdir ~/data/mongo_rp_1 &&
mkdir ~/data/mongo_rp_2 &&
mkdir ~/data/mongo_rp_3

接下来,您可以启动三个独立的实例,在配置的主机下部署副本集

sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
---
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
---
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0

现在,您已经部署了一个名为 rs0 的副本集,包含三个节点。

基本示例

use mongor::{
    core::{
        find::FindManyCursor,
        session::{commit_transaction, start_session, start_transaction},
    },
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Note: We pass all three of our nodes hosts to the client options
    let client_options =
        ClientOptions::parse("mongodb://127.0.0.1:27017,localhost:27018,localhost:27019/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    let mut session = start_session(&client, None).await?;

    start_transaction(&mut session, None).await?;

    // Inserting a document with a session
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model
        .insert_one(new_shark, None, Some(&mut session))
        .await?;

    commit_transaction(&mut session).await?;

    // Cursor-ing via a `findMany` query with a session
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
        .await?;

    while let Some(shark) = cursor.next_with_session(&mut session).await? {
        // shark: Shark
    }

    db.drop(None).await?;

    Ok(())
}

测试

概述

通过 test_db 模块提供了一个简单的测试框架。其通用用例是每个测试创建一个新的 TestDB,这将创建一个与新生成的 ObjectId 相关的 Database,以便于处理并发测试套件,这在像 tokio 这样的框架中很常见。

然后,提供了一个名为 TestDB::run_test 的方法来接管这个数据库并运行一个隔离的测试闭包,其中数据库在测试成功或失败时将被清理(删除)。

test_db::asset_model::AssertModel 提供了预定义断言,它包装了一个 Model<D>。同时还有一个模块 test_db::assert_document::*,它提供了有关文档的一般断言。还提供了一个来自 test_db::test_error 的宏 test_error!,它在测试闭包中像 panic! 一样工作。闭包接受一个 Future<Output = Result<(), Box<dyn std::error::Error>>>,允许使用 ? 操作符来结束测试。

基本示例

use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

#[cfg(test)]
mod tests {
    use mongor::{
        model::Model,
        mongodb::{
            bson::{doc, oid::ObjectId},
            options::ClientOptions,
            Client, Collection,
        },
        test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
        test_error,
    };

    use crate::Shark;

    #[tokio::test]
    pub async fn example_test() {
        let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/")
            .await
            .unwrap();
        let client = Client::with_options(client_options).unwrap();

        // Opens a new isolated `Database` named with a `ObjectId`
        let test_db = TestDB::new(&client);

        let shark_collection: Collection<Shark> = test_db.collection("shark");
        let shark_model = Model::from(shark_collection);

        // Run isolated test, cleaning up the Database on failure or success
        test_db
            .run_test(
                || async {
                    let new_shark = Shark {
                        oid: None,
                        name: "Whale Shark".to_string(),
                        species: "Rhincodon typus".to_string(),
                        sightings: 0,
                    };

                    let oid = shark_model.insert_one(new_shark, None, None).await?;

                    // Using pre-defined assertions from `test_db::assert_model`,
                    // which call `test_error!` under the hood

                    let assert_shark_model = AssertModel::from(&shark_model);

                    assert_shark_model
                        .assert_exists(doc! { "name": "Whale Shark" }, None)
                        .await?
                        .assert_count(doc! {}, 1, None)
                        .await?;

                    // Using the `test_error!` macro from `test_db::test_error`,
                    // which acts like `panic!` inside the closure
                    let some_other_oid = ObjectId::new();

                    if oid == some_other_oid {
                        test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
                    }

                    Ok(())
                },
                None,
            )
            .await
            .unwrap();
    }
}

GridFS

官方参考

https://mongodb.ac.cn/docs/manual/core/gridfs/

概述

使用 grid_fs 库特性,您可以公开访问 GridFs 结构体,它是一个对底层 GridFsBucket 的强大接口。此特性添加了一个依赖项 futures_util

许多方法涉及 修订版 编号的概念。由于文件文档上的 name 字段不是唯一的,我们有能力有一个文件的多个版本,它们可以定义为

0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision

流式传输(上传和下载)的方法通过在 mongodb 包中使用 futures_util::io::* 实现,因此如果您使用 tokio,则必须添加一个依赖项 tokio_util,它提供 tokio_util::compat::*。此模块将公开 compat()compat_write(),以将 tokio 异步读或写转换为 Compat<_>,它可以直接与 futures_util::io::AsyncReadfutures_util::io::AsyncWrite 一起使用。

请参阅基本示例或:https://docs.rs/tokio-util/latest/tokio_util/compat/index.html 获取更多信息。

基本示例

use futures_util::io::Cursor;
use mongor::{
    grid_fs::GridFs,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        Client,
    },
};
use tokio::fs::*;
use tokio_util::compat::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("db_name");

    let bucket_options = GridFsBucketOptions::builder()
        .bucket_name(Some("shark_images".to_string()))
        .build();

    let grid_fs = GridFs::new(&db, Some(bucket_options));

    // Upload a file
    let file = File::open("./test_data/shark-0.png").await?;
    let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Upload a revision of a file
    let file = File::open("./test_data/shark-1.jpg").await?;
    let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
    let mut cursor = grid_fs.find_many(doc! {}, None).await?;

    while let Some(file_document) = cursor.next().await? {
        // file_document: FilesCollectionDocument
    }

    // Download the most recent revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, None).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Download a specific revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, Some(0)).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Rename a file (All revisions as None was provided to revision)
    grid_fs
        .rename_by_filename("shark", "new_shark", None)
        .await?;

    // Delete a file (All revisions as None was provided to revision)
    grid_fs.delete_by_filename("new_shark", None).await?;

    db.drop(None).await?;

    Ok(())
}

错误报告

请通过创建一个 issue 来报告错误,或者如果您知道一个充分的解决方案,请随时提交一个 PR,但请遵循下面的 Contributing 指南。

为了报告一个错误,它必须直接与此包相关,并且您必须尽可能提供更多信息,例如

  • 代码示例

  • 错误消息

  • 重现步骤

  • 系统信息(如果适用)

功能请求

如果您觉得有些东西缺失,或者当前包的某种变体需要除 mongodbserde 之外的其他依赖项(例如,GridFs 需要 future_util,因此它是一个独立的功能);请创建一个 issue 来提出请求,并讨论为什么您认为它应该作为此包的一部分,而不是第三方包,因为我计划实现一个 mongor-extras 包来添加具有更明确范围的附加功能,并希望保持此包尽可能无偏见和灵活。

贡献

我欢迎任何人为此包做出贡献。但我有一些一般要求

  • 更改应保持包无偏见,即尽可能使用会话和来自 mongodb::options::* 的选项,而不是自定义选项。

  • 任何附加或修改的方法都需要进行 100% 测试覆盖率单元测试,应放置在 tests 模块中。

  • 任何添加额外依赖项的更改应作为单独的功能创建。

  • 所有当前单元测试都必须通过,即运行 cargo test 并全部通过。

  • 如果尚未添加,请将您的姓名或昵称添加到 CONTRIBUTORS.md 中,以及文件标题注释中的 Authors 部分。

  • 如果添加新依赖项,请更新此 README 中的 License::Third Party 以符合其许可。

如果你的更改符合这些指南,请随时打开一个PR。

项目状态

我计划在可预见的未来维护这个crate,crate API可能会发生变化,尽管我预计会是简化和新功能,而不是删除或对当前功能集进行重大修改。

许可证

MIT

更多信息请参见LICENSE.md

第三方

这个crate建立在以下crate之上

依赖

~24–36MB
~662K SLoC