31 个版本 (5 个重大更改)
新功能 0.7.5 | 2024年8月20日 |
---|---|
0.7.2 | 2024年7月16日 |
0.7.0 | 2024年6月12日 |
0.4.10 | 2024年3月23日 |
0.2.6 |
|
#121 在 数据库接口
每月下载量:446
用于 charybdis_macros
93KB
1.5K SLoC
Rust 编写的 ScyllaDB 和 Apache Cassandra ORM
⚠️ 此项目目前处于早期开发阶段。欢迎反馈和贡献!
Charybdis 是基于 ScyllaDB Rust 驱动程序 的 ORM 层,专注于易用性和性能
使用注意事项
- 为整个模型提供表达式的 CRUD 和复杂查询操作 API
- 通过使用自动生成的
partial_<model>!
宏,提供轻松处理模型字段子集的方法 - 通过使用自动生成的
find_<model>!
宏,提供轻松运行复杂查询的方法 - 自动迁移工具分析项目文件,并根据模型定义和数据库之间的差异运行迁移
性能考虑
- 使用预处理语句(分片/令牌感知)-> 绑定值
- 期望以
CachingSession
作为会话参数进行操作 - 查询是宏生成的字符串常量(运行时无连接)
- 通过使用
find_<model>!
宏,我们可以运行在编译时生成的复杂查询,作为&'static str
- 尽管它具有表达式的 API,但它是在 scylla_rust_driver 之上的一个薄层,并且它不会引入任何重大开销
目录
Charybdis 模型
定义表
use charybdis::macros::charybdis_model;
use charybdis::types::{Text, Timestamp, Uuid};
#[charybdis_model(
table_name = users,
partition_keys = [id],
clustering_keys = [],
global_secondary_indexes = [username],
local_secondary_indexes = [],
static_columns = []
)]
pub struct User {
pub id: Uuid,
pub username: Text,
pub email: Text,
pub created_at: Timestamp,
pub updated_at: Timestamp,
pub address: Address,
}
定义UDT
use charybdis::macros::charybdis_udt_model;
use charybdis::types::Text;
#[charybdis_udt_model(type_name = address)]
pub struct Address {
pub street: Text,
pub city: Text,
pub state: Option<Text>,
pub zip: Text,
pub country: Text,
}
请注意,为了使迁移能够正确检测每次迁移中的更改,type_name
必须与结构体名称匹配。所以如果我们有struct ReorderData
,我们必须使用#[charybdis_udt_model(type_name = reorderdata)]
- 不使用下划线。
定义物化视图
use charybdis::macros::charybdis_view_model;
use charybdis::types::{Text, Timestamp, Uuid};
#[charybdis_view_model(
table_name=users_by_username,
base_table=users,
partition_keys=[username],
clustering_keys=[id]
)]
pub struct UsersByUsername {
pub username: Text,
pub id: Uuid,
pub email: Text,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}
生成的自动迁移查询将是
CREATE MATERIALIZED VIEW IF NOT EXISTS users_by_email
AS SELECT created_at, updated_at, username, email, id
FROM users
WHERE email IS NOT NULL AND id IS NOT NULL
PRIMARY KEY (email, id)
自动迁移
-
charybdis-migrate
允许自动迁移到数据库,无需手动编写迁移。它遍历项目文件,并根据模型定义和数据库之间的差异生成迁移。它支持以下操作- 创建新表
- 创建新列
- 删除列
- 更改字段类型(使用
--drop-and-replace
标志) - 创建二级索引
- 删除二级索引
- 创建UDT
- 创建物化视图
- 表选项
#[charybdis_model( table_name = commits, partition_keys = [object_id], clustering_keys = [created_at, id], global_secondary_indexes = [], local_secondary_indexes = [], table_options = r#" CLUSTERING ORDER BY (created_at DESC) AND gc_grace_seconds = 86400 "# )] #[derive(Serialize, Deserialize, Default)] pub struct Commit {...}
- ⚠️ 如果表已存在,表选项将生成没有
CLUSTERING ORDER
和COMPACT STORAGE
选项的 alter table 查询。
- ⚠️ 如果表已存在,表选项将生成没有
未添加模型删除。如果您删除了模型,您需要手动删除表。
-
运行迁移
cargo install charybdis-migrate migrate --hosts <host> --keyspace <your_keyspace> --drop-and-replace (optional)
⚠️ 如果您正在处理 现有 数据集,在运行迁移之前,您需要确保您的 **模型** 定义结构在表名、列名、列类型、分区键、聚类键和二级索引方面与数据库匹配,以避免意外更改结构。如果结构匹配,它将不会运行任何迁移。如上所述,如果表格没有模型定义,它将 不会 删除它。将来,我们将添加
modelize
命令,该命令将从现有数据源生成src/models
文件。 -
以编程方式运行迁移
在测试或开发环境中,我们可以以编程方式触发迁移
use charybdis_migrate::MigrationBuilder; let migration = MigrationBuilder::new() .keyspace("test") .drop_and_replace(true) .build(&session) .await; migration.run().await;
-
全局二级索引
如果我们有模型
#[charybdis_model( table_name = users, partition_keys = [id], clustering_keys = [], global_secondary_indexes = [username] )]
生成的查询将是:
CREATE INDEX ON users (username);
-
本地二级索引
针对分区键的范围索引
#[charybdis_model( table_name = menus, partition_keys = [location], clustering_keys = [name, price, dish_type], global_secondary_indexes = [], local_secondary_indexes = [dish_type] )]
生成的查询将是:
CREATE INDEX ON menus((location), dish_type);
基本操作
对于每个操作,您需要将相应的特质引入作用域。它们在 charybdis::operations
模块中定义。
插入
-
use charybdis::{CachingSession, Insert}; #[tokio::main] async fn main() { let session: &CachingSession; // init sylla session // init user let user: User = User { id, email: "[email protected]".to_string(), username: "charybdis".to_string(), created_at: Utc::now(), updated_at: Utc::now(), address: Some( Address { street: "street".to_string(), state: "state".to_string(), zip: "zip".to_string(), country: "country".to_string(), city: "city".to_string(), } ), }; // create user.insert().execute(&session).await; }
查找
-
按主键查找
let user = User {id, ..Default::default()}; let user = user.find_by_primary_key().execute(&session).await?;
-
按分区键查找
let users = User {id, ..Default::default()}.find_by_partition_key().execute(&session).await;
-
按关联主键查找
let users = User::find_by_primary_key_value(val: User::PrimaryKey).execute(&session).await;
-
可用的查找函数
use scylla::CachingSession; use charybdis::errors::CharybdisError; use charybdis::macros::charybdis_model; use charybdis::stream::CharybdisModelStream; use charybdis::types::{Date, Text, Uuid}; #[charybdis_model( table_name = posts, partition_keys = [date], clustering_keys = [category_id, title], global_secondary_indexes = [category_id], local_secondary_indexes = [title] )] pub struct Post { pub date: Date, pub category_id: Uuid, pub title: Text, } impl Post { async fn find_various(db_session: &CachingSession) -> Result<(), CharybdisError> { let date = Date::default(); let category_id = Uuid::new_v4(); let title = Text::default(); let posts: CharybdisModelStream<Post> = Post::find_by_date(date).execute(db_session).await?; let posts: CharybdisModelStream<Post> = Post::find_by_date_and_category_id(date, category_id).execute(db_session).await?; let posts: Post = Post::find_by_date_and_category_id_and_title(date, category_id, title.clone()).execute(db_session).await?; let post: Post = Post::find_first_by_date(date).execute(db_session).await?; let post: Post = Post::find_first_by_date_and_category_id(date, category_id).execute(db_session).await?; let post: Option<Post> = Post::maybe_find_first_by_date(date).execute(db_session).await?; let post: Option<Post> = Post::maybe_find_first_by_date_and_category_id(date, category_id).execute(db_session).await?; let post: Option<Post> = Post::maybe_find_first_by_date_and_category_id_and_title(date, category_id, title.clone()).execute(db_session).await?; // find by local secondary index let posts: CharybdisModelStream<Post> = Post::find_by_date_and_title(date, title.clone()).execute(db_session).await?; let post: Post = Post::find_first_by_date_and_title(date, title.clone()).execute(db_session).await?; let post: Option<Post> = Post::maybe_find_first_by_date_and_title(date, title.clone()).execute(db_session).await?; // find by global secondary index let posts: CharybdisModelStream<Post> = Post::find_by_category_id(category_id).execute(db_session).await?; let post: Post = Post::find_first_by_category_id(category_id).execute(db_session).await?; let post: Option<Post> = Post::maybe_find_first_by_category_id(category_id).execute(db_session).await?; Ok(()) } }
-
自定义过滤
让我们以我们的
Post
模型为例#[charybdis_model( table_name = posts, partition_keys = [category_id], clustering_keys = [date, title], global_secondary_indexes = [] )] pub struct Post {...}
我们自动生成了遵循以下约定的宏
find_<struct_name>!
的find_post!
宏。它可以用来创建自定义查询。以下代码将返回
Post
模型的流,并且查询将在编译时构建为&'static str
。// automatically generated macro rule let posts = find_post!("category_id in ? AND date > ?", (categor_vec, date)) .execute(session) .await?;
我们还可以使用
find_first_post!
宏来获取单个结果let post = find_first_post!("category_id in ? AND date > ? LIMIT 1", (date, categor_vec)) .execute(session) .await?;
如果我们只需要
Query
而不是结果,我们可以使用find_post_query!
宏let query = find_post_query!("date = ? AND category_id in ?", (date, categor_vec));
更新
-
let user = User::from_json(json); user.username = "scylla".to_string(); user.email = "[email protected]"; user.update().execute(&session).await;
-
收集
- 让我们以我们的
User
模型为例#[charybdis_model( table_name = users, partition_keys = [id], clustering_keys = [], )] pub struct User { id: Uuid, tags: Set<Text>, post_ids: List<Uuid>, }
- 为每个集合字段生成了
push_to_<field_name>
和pull_from_<field_name>
方法。let user: User; user.push_tags(vec![tag]).execute(&session).await; user.pull_tags(vec![tag]).execute(&session).await; user.push_post_ids(vec![tag]).execute(&session).await; user.pull_post_ids(vec![tag]).execute(&session).await;
- 让我们以我们的
-
计数器
- 让我们定义 post_counter 模型
#[charybdis_model( table_name = post_counters, partition_keys = [id], clustering_keys = [], )] pub struct PostCounter { id: Uuid, likes: Counter, comments: Counter, }
- 我们可以使用
increment_<field_name>
和decrement_<field_name>
方法来更新计数器字段。let post_counter: PostCounter; post_counter.increment_likes(1).execute(&session).await; post_counter.decrement_likes(1).execute(&session).await; post_counter.increment_comments(1).execute(&session).await; post_counter.decrement_comments(1).execute(&session).await;
- 让我们定义 post_counter 模型
删除
-
let user = User::from_json(json); user.delete().execute(&session).await;
-
宏生成的删除助手
让我们以我们的
Post
模型为例#[charybdis_model( table_name = posts, partition_keys = [date], clustering_keys = [categogry_id, title], global_secondary_indexes = []) ] pub struct Post { date: Date, category_id: Uuid, title: Text, id: Uuid, ... }
我们为从主键生成的最多 3 个字段有宏生成的函数。
Post::delete_by_date(date: Date).execute(&session).await?; Post::delete_by_date_and_category_id(date: Date, category_id: Uuid).execute(&session).await?; Post::delete_by_date_and_category_id_and_title(date: Date, category_id: Uuid, title: Text).execute(&session).await?;
-
自定义删除查询
我们可以使用
delete_post!
宏来创建自定义删除查询。delete_post!("date = ? AND category_id in ?", (date, category_vec)).execute(&session).await?
配置
每个操作都返回 CharybdisQuery
,在执行前可以通过方法链进行配置。
let user: User = User::find_by_id(id)
.consistency(Consistency::One)
.timeout(Some(Duration::from_secs(5)))
.execute(&app.session)
.await?;
let result: QueryResult = user.update().consistency(Consistency::One).execute(&session).await?;
支持的配置选项
一致性
串行一致性
时间戳
超时
页面大小
时间戳
批处理
使用 CharybdisModelBatch
操作来执行单个批处理中的多个操作。
-
批量操作
let users: Vec<User>; let batch = User::batch(); // inserts batch.append_inserts(users); // or updates batch.append_updates(users); // or deletes batch.append_deletes(users); batch.execute(&session).await?;
-
分块批量操作
分块批处理操作用于分块处理大量数据。
let users: Vec<User>; let chunk_size = 100; User::batch().chunked_inserts(&session, users, chunk_size).await?; User::batch().chunked_updates(&session, users, chunk_size).await?; User::batch().chunked_deletes(&session, users, chunk_size).await?;
-
批量配置
批处理操作可以在执行前通过方法链进行配置。
let batch = User::batch() .consistency(Consistency::One) .retry_policy(Some(Arc::new(DefaultRetryPolicy::new()))) .chunked_inserts(&session, users, 100) .await?;
我们也可以使用方法链将操作附加到批处理
let batch = User::batch() .consistency(Consistency::One) .retry_policy(Some(Arc::new(DefaultRetryPolicy::new()))) .append_update(&user_1) .append_update(&user_2) .execute(data.db_session()) .await?;
-
语句批处理
我们可以使用批处理语句来批量执行集合操作。
let batch = User::batch(); let users: Vec<User>; for user in users { batch.append_statement(User::PUSH_TAGS_QUERY, (vec![tag], user.id)); } batch.execute(&session).await;
部分模型
-
使用自动生成的
partial_<model>!
宏来在模型字段的子集上运行操作。此宏生成一个结构与原始模型相同的新结构,但只包含提供的字段。宏由#[charybdis_model]
自动生成。它遵循以下约定partial_<struct_name>!
。// auto-generated macro - available in crate::models::user partial_user!(UpdateUsernameUser, id, username);
现在我们有了新的结构体
UpdateUsernameUser
,它与User
模型等效,但只包含id
和username
字段。let mut update_user_username = UpdateUsernameUser { id, username: "updated_username".to_string(), }; update_user_username.update().execute(&session).await?;
-
部分模型注意事项
partial_<model>
要求在本地模型上使用#[derive(Default)]
partial_<model>
需要在定义中包含完整的键- 在
#charybdis_model
宏下方定义的所有派生项都将自动添加到部分模型中。 partial_<model>
结构体实现了与原生模型相同的字段属性,因此如果我们原生模型字段上存在#[serde(rename = "rootId")]
注解,那么它也会出现在部分模型字段上。partial_<model>
应该定义在原生模型相同的文件中,以便它可以重用原生模型所需的导入。
-
作为本地
如果我们需要在原生模型上运行操作,可以使用
as_native
方法。let native_user: User = update_user_username.as_native().find_by_primary_key().execute(&session).await?; // action that requires native model authorize_user(&native_user);
as_native
方法通过返回一个包含部分模型字段的新实例来工作。对于其他字段,它使用默认值。 -
建议的命名规范是
Purpose
+Original Struct Name
。例如:UpdateAdresssUser
、UpdateDescriptionPost
。
回调函数
回调是方便在模型在特定操作前后运行额外逻辑的方法。例如:
- 我们可以使用
before_insert
在插入前设置默认值和/或验证模型。 - 我们可以使用
after_update
来更新其他数据源,例如 Elasticsearch。
实现
- 假设我们定义了一个自定义扩展,该扩展将在每次更新帖子时更新 Elasticsearch 文档。
pub struct AppExtensions { pub elastic_client: ElasticClient, }
- 现在我们可以实现一个利用此扩展的回调。
#[charybdis_model(...)] pub struct Post {} impl ExtCallbacks for Post { type Extention = AppExtensions; type Error = AppError; // From<CharybdisError> // use before_insert to set default values async fn before_insert( &mut self, _session: &CachingSession, extension: &AppExtensions, ) -> Result<(), CustomError> { self.id = Uuid::new_v4(); self.created_at = Utc::now(); Ok(()) } // use before_update to set updated_at async fn before_update( &mut self, _session: &CachingSession, extension: &AppExtensions, ) -> Result<(), CustomError> { self.updated_at = Utc::now(); Ok(()) } // use after_update to update elastic document async fn after_update( &mut self, _session: &CachingSession, extension: &AppExtensions, ) -> Result<(), CustomError> { extension.elastic_client.update(...).await?; Ok(()) } // use after_delete to delete elastic document async fn after_delete( &mut self, _session: &CachingSession, extension: &AppExtensions, ) -> Result<(), CustomError> { extension.elastic_client.delete(...).await?; Ok(()) } }
-
可能的回调
before_insert
before_update
before_delete
after_insert
after_update
after_delete
-
触发回调函数
为了触发回调,我们使用<operation>_cb
方法:insert_cb
、update_cb
、delete_cb
根据特性行为。这使得我们可以在insert
和带有回调的插入(insert_cb
)之间有明确的区分。就像在主操作中一样,我们可以在执行之前配置回调操作查询。use charybdis::operations::{DeleteWithCallbacks, InsertWithCallbacks, UpdateWithCallbacks}; post.insert_cb(app_extensions).execute(&session).await; post.update_cb(app_extensions).execute(&session).await; post.delete_cb(app_extensions).consistency(Consistency::All).execute(&session).await;
集合
对于每个集合字段,我们得到以下内容:
PUSH_<field_name>_QUERY
静态字符串PUSH_<field_name>_IF_EXISTS_QUERY
静态字符串PULL_<field_name>_QUERY
静态字符串PULL_<field_name>_IF_EXISTS_QUERY
静态字符串push_<field_name>
方法push_<field_name>_if_exists
方法pull_<field_name>
方法pull_<field_name>_if_exists
方法
-
模型
#[charybdis_model( table_name = users, partition_keys = [id], clustering_keys = [] )] pub struct User { id: Uuid, tags: Set<Text>, post_ids: List<Uuid>, books_by_genre: Map<Text, Frozen<List<Text>>>, }
-
生成的收集查询
生成的查询将期望值作为第一个绑定值,而主键字段作为后续的绑定值。
impl User { const PUSH_TAGS_QUERY: &'static str = "UPDATE users SET tags = tags + ? WHERE id = ?"; const PUSH_TAGS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET tags = tags + ? WHERE id = ? IF EXISTS"; const PULL_TAGS_QUERY: &'static str = "UPDATE users SET tags = tags - ? WHERE id = ?"; const PULL_TAGS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET tags = tags - ? WHERE id = ? IF EXISTS"; const PUSH_POST_IDS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids + ? WHERE id = ?"; const PUSH_POST_IDS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids + ? WHERE id = ? IF EXISTS"; const PULL_POST_IDS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids - ? WHERE id = ?"; const PULL_POST_IDS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids - ? WHERE id = ? IF EXISTS"; const PUSH_BOOKS_BY_GENRE_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre + ? WHERE id = ?"; const PUSH_BOOKS_BY_GENRE_IF_EXISTS_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre + ? WHERE id = ? IF EXISTS"; const PULL_BOOKS_BY_GENRE_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre - ? WHERE id = ?"; const PULL_BOOKS_BY_GENRE_IF_EXISTS_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre - ? WHERE id = ? IF EXISTS"; }
现在我们可以在批量操作中使用此常量。
let batch = User::batch(); let users: Vec<User>; for user in users { batch.append_statement(User::PUSH_TAGS_QUERY, (vec![tag], user.id)); } batch.execute(&session).await;
-
生成的收集方法
为每个集合字段生成了
push_to_<field_name>
和pull_from_<field_name>
方法。let user: User::new(); user.push_tags(tags: HashSet<T>).execute(&session).await; user.push_tags_if_exists(tags: HashSet<T>).execute(&session).await; user.pull_tags(tags: HashSet<T>).execute(&session).await; user.pull_tags_if_exists(tags: HashSet<T>).execute(&session).await; user.push_post_ids(ids: Vec<T>).execute(&session).await; user.push_post_ids_if_exists(ids: Vec<T>).execute(&session).await; user.pull_post_ids(ids: Vec<T>).execute(&session).await; user.pull_post_ids_if_exists(ids: Vec<T>).execute(&session).await; user.push_books_by_genre(map: HashMap<K, V>).execute(&session).await; user.push_books_by_genre_if_exists(map: HashMap<K, V>).execute(&session).await; user.pull_books_by_genre(map: HashMap<K, V>).execute(&session).await; user.pull_books_by_genre_if_exists(map: HashMap<K, V>).execute(&session).await;
忽略的字段
我们可以通过使用 #[charybdis(ignore)]
属性来忽略字段
#[charybdis_model(...)]
pub struct User {
id: Uuid,
#[charybdis(ignore)]
organization: Option<Organization>,
}
因此,字段 organization
在所有操作中都将被忽略,并在从其他数据源反序列化时使用默认值。它可以用来存储不持久化在数据库中的数据。
依赖项
~12–22MB
~310K SLoC