9次发布
0.1.8 | 2024年6月4日 |
---|---|
0.1.7 | 2024年5月12日 |
0.1.5 | 2024年2月3日 |
0.1.4 | 2023年12月22日 |
#234 在 数据库接口
5MB
3K SLoC
mongor
基于Rust的MongoDB ODM,构建在官方mongodb
crate之上。提供一个直观的接口与底层驱动交互,同时提供了一个简化的测试框架,以便编写测试套件更加直观。
目录
需求
-
MongoDB部署(版本3.6+)
-
Rust(版本1.6+)
安装
cargo添加mongor
文档
此README提供了概述和实用示例,但并未涵盖所有方法。 完整的crate文档可以在docs.rs上找到
使用
概述
虽然该crate公开了core
模块,但为该crate设计的主要接口是Model<D>
结构体。该结构体控制一个通用的D
,它代表模型要控制的文档的结构。以下示例中将展示使用模型的一般模式。
基本示例
use mongor::{
core::find::FindManyCursor,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
// Applying an index
model
.apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
.await?;
// Inserting a document
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;
// Updating a document
let update_result: UpdateResult = model
.update_one_by_oid(
&document_oid,
doc! {
"$inc": {
"sightings": 1,
},
},
None,
None,
)
.await?;
// Finding a document
let shark: Option<Shark> = model
.find_one_by_field_value("sightings", 1, None, None)
.await?;
println!("{:?}", shark);
// Log example:
// ```
// Some(Shark {
// oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
// name: "Whale Shark",
// species: "Rhincodon typus",
// sightings: 1,
// })
// ```
// Deleting a document
let delete_result: DeleteResult = model
.delete_one(
doc! {
"_id": &document_oid,
},
None,
None,
)
.await?;
// Inserting many
let sharks = vec![
Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
},
Shark {
oid: None,
name: "Great White".to_string(),
species: "Carcharodon carcharias".to_string(),
sightings: 0,
},
];
let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;
// Cursor-ing via a `findMany` query
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), None)
.await?;
while let Some(shark) = cursor.next().await? {
// shark: Shark
}
// Updating many
let update_result: UpdateResult = model
.update_many(
doc! { "sightings": 0 },
doc! { "$inc": { "sightings": 5 }},
None,
None,
)
.await?;
// Deleting many
let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;
db.drop(None).await?;
Ok(())
}
会话
官方参考
https://www.mongodb.com/docs/manual/reference/method/Session/
https://www.mongodb.com/docs/manual/replication/
概述
请注意,要使用MongoDB的会话和事务,目前您需要一个副本集部署。设置此部署非常简单,您只需在副本设置中运行多个mongod
(MongoDB服务器)实例即可。
您可以通过shell中的方法启动此类集合,该方法称为rs.initiate()
。下面是一个设置具有三个节点的副本集的示例。
首先,连接到主端口上的MongoDB实例:比如说27017
mongo --port 27017
然后,调用rs.initiate()
以启动具有指定成员的副本集
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" },
{ _id: 1, host: "localhost:27018" },
{ _id: 2, host: "localhost:27019" }
]
});
然后,您需要为每个集合设置独立的数据目录
mkdir ~/data/mongo_rp_1 &&
mkdir ~/data/mongo_rp_2 &&
mkdir ~/data/mongo_rp_3
接下来,您可以启动三个独立的实例,在配置的主机下部署副本集
sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
---
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
---
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0
现在,您已经部署了一个名为 rs0
的副本集,包含三个节点。
基本示例
use mongor::{
core::{
find::FindManyCursor,
session::{commit_transaction, start_session, start_transaction},
},
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass all three of our nodes hosts to the client options
let client_options =
ClientOptions::parse("mongodb://127.0.0.1:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
let mut session = start_session(&client, None).await?;
start_transaction(&mut session, None).await?;
// Inserting a document with a session
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model
.insert_one(new_shark, None, Some(&mut session))
.await?;
commit_transaction(&mut session).await?;
// Cursor-ing via a `findMany` query with a session
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
.await?;
while let Some(shark) = cursor.next_with_session(&mut session).await? {
// shark: Shark
}
db.drop(None).await?;
Ok(())
}
测试
概述
通过 test_db
模块提供了一个简单的测试框架。其通用用例是每个测试创建一个新的 TestDB
,这将创建一个与新生成的 ObjectId
相关的 Database
,以便于处理并发测试套件,这在像 tokio
这样的框架中很常见。
然后,提供了一个名为 TestDB::run_test
的方法来接管这个数据库并运行一个隔离的测试闭包,其中数据库在测试成功或失败时将被清理(删除)。
test_db::asset_model::AssertModel
提供了预定义断言,它包装了一个 Model<D>
。同时还有一个模块 test_db::assert_document::*
,它提供了有关文档的一般断言。还提供了一个来自 test_db::test_error
的宏 test_error!
,它在测试闭包中像 panic!
一样工作。闭包接受一个 Future<Output = Result<(), Box<dyn std::error::Error>>>
,允许使用 ?
操作符来结束测试。
基本示例
use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
#[cfg(test)]
mod tests {
use mongor::{
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::ClientOptions,
Client, Collection,
},
test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
test_error,
};
use crate::Shark;
#[tokio::test]
pub async fn example_test() {
let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/")
.await
.unwrap();
let client = Client::with_options(client_options).unwrap();
// Opens a new isolated `Database` named with a `ObjectId`
let test_db = TestDB::new(&client);
let shark_collection: Collection<Shark> = test_db.collection("shark");
let shark_model = Model::from(shark_collection);
// Run isolated test, cleaning up the Database on failure or success
test_db
.run_test(
|| async {
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let oid = shark_model.insert_one(new_shark, None, None).await?;
// Using pre-defined assertions from `test_db::assert_model`,
// which call `test_error!` under the hood
let assert_shark_model = AssertModel::from(&shark_model);
assert_shark_model
.assert_exists(doc! { "name": "Whale Shark" }, None)
.await?
.assert_count(doc! {}, 1, None)
.await?;
// Using the `test_error!` macro from `test_db::test_error`,
// which acts like `panic!` inside the closure
let some_other_oid = ObjectId::new();
if oid == some_other_oid {
test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
}
Ok(())
},
None,
)
.await
.unwrap();
}
}
GridFS
官方参考
https://www.mongodb.com/docs/manual/core/gridfs/
概述
使用 grid_fs
库特性,您可以公开访问 GridFs
结构体,它是一个对底层 GridFsBucket
的强大接口。此特性添加了一个依赖项 futures_util
。
许多方法涉及 修订版
编号的概念。由于文件文档上的 name
字段不是唯一的,我们有能力有一个文件的多个版本,它们可以定义为
0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision
流式传输(上传和下载)的方法通过在 mongodb
包中使用 futures_util::io::*
实现,因此如果您使用 tokio
,则必须添加一个依赖项 tokio_util
,它提供 tokio_util::compat::*
。此模块将公开 compat()
和 compat_write()
,以将 tokio 异步读或写转换为 Compat<_>
,它可以直接与 futures_util::io::AsyncRead
和 futures_util::io::AsyncWrite
一起使用。
请参阅基本示例或:https://docs.rs/tokio-util/latest/tokio_util/compat/index.html
获取更多信息。
基本示例
use futures_util::io::Cursor;
use mongor::{
grid_fs::GridFs,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client,
},
};
use tokio::fs::*;
use tokio_util::compat::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://127.0.0.1:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let bucket_options = GridFsBucketOptions::builder()
.bucket_name(Some("shark_images".to_string()))
.build();
let grid_fs = GridFs::new(&db, Some(bucket_options));
// Upload a file
let file = File::open("./test_data/shark-0.png").await?;
let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Upload a revision of a file
let file = File::open("./test_data/shark-1.jpg").await?;
let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
let mut cursor = grid_fs.find_many(doc! {}, None).await?;
while let Some(file_document) = cursor.next().await? {
// file_document: FilesCollectionDocument
}
// Download the most recent revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, None).await?;
let file_data: Vec<u8> = writer.into_inner();
// Download a specific revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, Some(0)).await?;
let file_data: Vec<u8> = writer.into_inner();
// Rename a file (All revisions as None was provided to revision)
grid_fs
.rename_by_filename("shark", "new_shark", None)
.await?;
// Delete a file (All revisions as None was provided to revision)
grid_fs.delete_by_filename("new_shark", None).await?;
db.drop(None).await?;
Ok(())
}
错误报告
请通过创建一个 issue
来报告错误,或者如果您知道一个充分的解决方案,请随时提交一个 PR,但请遵循下面的 Contributing
指南。
为了报告一个错误,它必须直接与此包相关,并且您必须尽可能提供更多信息,例如
-
代码示例
-
错误消息
-
重现步骤
-
系统信息(如果适用)
功能请求
如果您觉得有些东西缺失,或者当前包的某种变体需要除 mongodb
和 serde
之外的其他依赖项(例如,GridFs
需要 future_util
,因此它是一个独立的功能);请创建一个 issue
来提出请求,并讨论为什么您认为它应该作为此包的一部分,而不是第三方包,因为我计划实现一个 mongor-extras
包来添加具有更明确范围的附加功能,并希望保持此包尽可能无偏见和灵活。
贡献
我欢迎任何人为此包做出贡献。但我有一些一般要求
-
更改应保持包无偏见,即尽可能使用会话和来自
mongodb::options::*
的选项,而不是自定义选项。 -
任何附加或修改的方法都需要进行 100% 测试覆盖率单元测试,应放置在
tests
模块中。 -
任何添加额外依赖项的更改应作为单独的功能创建。
-
所有当前单元测试都必须通过,即运行
cargo test
并全部通过。 -
如果尚未添加,请将您的姓名或昵称添加到
CONTRIBUTORS.md
中,以及文件标题注释中的Authors
部分。 -
如果添加新依赖项,请更新此 README 中的
License::Third Party
以符合其许可。
如果你的更改符合这些指南,请随时打开一个PR。
项目状态
我计划在可预见的未来维护这个crate,crate API可能会发生变化,尽管我预计会是简化和新功能,而不是删除或对当前功能集进行重大修改。
许可证
MIT
更多信息请参见LICENSE.md
第三方
这个crate建立在以下crate之上
-
serde
crate,受Apache License Version 2.0或MIT许可,在此查看Apache License,或在此查看MIT License。 -
futures_util
crate(用于gird_fs
功能),受Apache License Version 2.0或MIT许可,在此查看Apache License,或在此查看MIT License。 -
tokio_util
crate(用于内部测试),受MIT许可,在此查看。
依赖
~24–36MB
~662K SLoC