31 个版本 (5 个重大更改)

新功能 0.7.5 2024年8月20日
0.7.2 2024年7月16日
0.7.0 2024年6月12日
0.4.10 2024年3月23日
0.2.6 2023年11月30日

#121数据库接口

Download history 122/week @ 2024-05-03 44/week @ 2024-05-10 17/week @ 2024-05-17 101/week @ 2024-05-24 77/week @ 2024-05-31 221/week @ 2024-06-07 52/week @ 2024-06-14 16/week @ 2024-06-21 18/week @ 2024-06-28 162/week @ 2024-07-05 171/week @ 2024-07-12 48/week @ 2024-07-19 186/week @ 2024-07-26 34/week @ 2024-08-02 11/week @ 2024-08-09 203/week @ 2024-08-16

每月下载量:446
用于 charybdis_macros

MIT 许可协议

93KB
1.5K SLoC

Rust 编写的 ScyllaDB 和 Apache Cassandra ORM

⚠️ 此项目目前处于早期开发阶段。欢迎反馈和贡献!

Crates.io License Docs.rs Discord

scylla_logo cassandra_logo

Charybdis 是基于 ScyllaDB Rust 驱动程序 的 ORM 层,专注于易用性和性能

使用注意事项

  • 为整个模型提供表达式的 CRUD 和复杂查询操作 API
  • 通过使用自动生成的 partial_<model>! 宏,提供轻松处理模型字段子集的方法
  • 通过使用自动生成的 find_<model>! 宏,提供轻松运行复杂查询的方法
  • 自动迁移工具分析项目文件,并根据模型定义和数据库之间的差异运行迁移

性能考虑

  • 使用预处理语句(分片/令牌感知)-> 绑定值
  • 期望以 CachingSession 作为会话参数进行操作
  • 查询是宏生成的字符串常量(运行时无连接)
  • 通过使用 find_<model>! 宏,我们可以运行在编译时生成的复杂查询,作为 &'static str
  • 尽管它具有表达式的 API,但它是在 scylla_rust_driver 之上的一个薄层,并且它不会引入任何重大开销

目录

Charybdis 模型

定义表

use charybdis::macros::charybdis_model;
use charybdis::types::{Text, Timestamp, Uuid};

#[charybdis_model(
    table_name = users,
    partition_keys = [id],
    clustering_keys = [],
    global_secondary_indexes = [username],
    local_secondary_indexes = [],
    static_columns = []
)]
pub struct User {
    pub id: Uuid,
    pub username: Text,
    pub email: Text,
    pub created_at: Timestamp,
    pub updated_at: Timestamp,
    pub address: Address,
}

定义UDT

 use charybdis::macros::charybdis_udt_model;
 use charybdis::types::Text;
 
 #[charybdis_udt_model(type_name = address)]
 pub struct Address {
     pub street: Text,
     pub city: Text,
     pub state: Option<Text>,
     pub zip: Text,
     pub country: Text,
 }

🚨 UDT字段必须与数据库中的顺序相同

请注意,为了使迁移能够正确检测每次迁移中的更改,type_name必须与结构体名称匹配。所以如果我们有struct ReorderData,我们必须使用#[charybdis_udt_model(type_name = reorderdata)] - 不使用下划线。

定义物化视图

use charybdis::macros::charybdis_view_model;
use charybdis::types::{Text, Timestamp, Uuid};

#[charybdis_view_model(
    table_name=users_by_username,
    base_table=users,
    partition_keys=[username],
    clustering_keys=[id]
)]
pub struct UsersByUsername {
    pub username: Text,
    pub id: Uuid,
    pub email: Text,
    pub created_at: Timestamp,
    pub updated_at: Timestamp,
}

生成的自动迁移查询将是

CREATE MATERIALIZED VIEW IF NOT EXISTS users_by_email
AS SELECT created_at, updated_at, username, email, id
FROM users
WHERE email IS NOT NULL AND id IS NOT NULL
PRIMARY KEY (email, id)

自动迁移

  • charybdis-migrate 允许自动迁移到数据库,无需手动编写迁移。它遍历项目文件,并根据模型定义和数据库之间的差异生成迁移。它支持以下操作

    • 创建新表
    • 创建新列
    • 删除列
    • 更改字段类型(使用 --drop-and-replace 标志)
    • 创建二级索引
    • 删除二级索引
    • 创建UDT
    • 创建物化视图
    • 表选项
        #[charybdis_model(
            table_name = commits,
            partition_keys = [object_id],
            clustering_keys = [created_at, id],
            global_secondary_indexes = [],
            local_secondary_indexes = [],
            table_options = r#"
                CLUSTERING ORDER BY (created_at DESC) 
                AND gc_grace_seconds = 86400
            "#
        )]
        #[derive(Serialize, Deserialize, Default)]
        pub struct Commit {...}
      
      • ⚠️ 如果表已存在,表选项将生成没有 CLUSTERING ORDERCOMPACT STORAGE 选项的 alter table 查询。

    未添加模型删除。如果您删除了模型,您需要手动删除表。

  • 运行迁移

    cargo install charybdis-migrate
    
    migrate --hosts <host> --keyspace <your_keyspace> --drop-and-replace (optional)
    

    ⚠️ 如果您正在处理 现有 数据集,在运行迁移之前,您需要确保您的 **模型** 定义结构在表名、列名、列类型、分区键、聚类键和二级索引方面与数据库匹配,以避免意外更改结构。如果结构匹配,它将不会运行任何迁移。如上所述,如果表格没有模型定义,它将 不会 删除它。将来,我们将添加 modelize 命令,该命令将从现有数据源生成 src/models 文件。

  • 以编程方式运行迁移

    在测试或开发环境中,我们可以以编程方式触发迁移

    use charybdis_migrate::MigrationBuilder;
    
    let migration = MigrationBuilder::new()
        .keyspace("test")
        .drop_and_replace(true)
        .build(&session)
        .await;
    
    migration.run().await;
    
  • 全局二级索引

    如果我们有模型

    #[charybdis_model(
        table_name = users,
        partition_keys = [id],
        clustering_keys = [],
        global_secondary_indexes = [username]
    )]
    

    生成的查询将是:CREATE INDEX ON users (username);

  • 本地二级索引

    针对分区键的范围索引

    #[charybdis_model(
        table_name = menus,
        partition_keys = [location],
        clustering_keys = [name, price, dish_type],
        global_secondary_indexes = [],
        local_secondary_indexes = [dish_type]
    )]
    

    生成的查询将是:CREATE INDEX ON menus((location), dish_type);

基本操作

对于每个操作,您需要将相应的特质引入作用域。它们在 charybdis::operations 模块中定义。

插入

  • use charybdis::{CachingSession, Insert};
    
    #[tokio::main]
    async fn main() {
      let session: &CachingSession; // init sylla session
      
      // init user
      let user: User = User {
        id,
        email: "[email protected]".to_string(),
        username: "charybdis".to_string(),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        address: Some(
            Address {
                street: "street".to_string(),
                state: "state".to_string(),
                zip: "zip".to_string(),
                country: "country".to_string(),
                city: "city".to_string(),
            }
        ),
      };
    
      // create
      user.insert().execute(&session).await;
    }
    

查找

  • 按主键查找

      let user = User {id, ..Default::default()};
      let user = user.find_by_primary_key().execute(&session).await?;
    
  • 按分区键查找

      let users =  User {id, ..Default::default()}.find_by_partition_key().execute(&session).await;
    
  • 按关联主键查找

    let users = User::find_by_primary_key_value(val: User::PrimaryKey).execute(&session).await;
    
  • 可用的查找函数

    use scylla::CachingSession;
    use charybdis::errors::CharybdisError;
    use charybdis::macros::charybdis_model;
    use charybdis::stream::CharybdisModelStream;
    use charybdis::types::{Date, Text, Uuid};
    
    #[charybdis_model(
        table_name = posts,
        partition_keys = [date],
        clustering_keys = [category_id, title],
        global_secondary_indexes = [category_id],
        local_secondary_indexes = [title]
    )]
    pub struct Post {
        pub date: Date,
        pub category_id: Uuid,
        pub title: Text,
    }
    
    impl Post {
        async fn find_various(db_session: &CachingSession) -> Result<(), CharybdisError> {
           let date = Date::default();
           let category_id = Uuid::new_v4();
           let title = Text::default();
        
           let posts: CharybdisModelStream<Post> = Post::find_by_date(date).execute(db_session).await?;
           let posts: CharybdisModelStream<Post> = Post::find_by_date_and_category_id(date, category_id).execute(db_session).await?;
           let posts: Post = Post::find_by_date_and_category_id_and_title(date, category_id, title.clone()).execute(db_session).await?;
        
           let post: Post = Post::find_first_by_date(date).execute(db_session).await?;
           let post: Post = Post::find_first_by_date_and_category_id(date, category_id).execute(db_session).await?;
        
           let post: Option<Post> = Post::maybe_find_first_by_date(date).execute(db_session).await?;
           let post: Option<Post> = Post::maybe_find_first_by_date_and_category_id(date, category_id).execute(db_session).await?;
           let post: Option<Post> = Post::maybe_find_first_by_date_and_category_id_and_title(date, category_id, title.clone()).execute(db_session).await?;
        
           // find by local secondary index
           let posts: CharybdisModelStream<Post> = Post::find_by_date_and_title(date, title.clone()).execute(db_session).await?;
           let post: Post = Post::find_first_by_date_and_title(date, title.clone()).execute(db_session).await?;
           let post: Option<Post> = Post::maybe_find_first_by_date_and_title(date, title.clone()).execute(db_session).await?;
    
          // find by global secondary index
          let posts: CharybdisModelStream<Post> = Post::find_by_category_id(category_id).execute(db_session).await?;
          let post: Post = Post::find_first_by_category_id(category_id).execute(db_session).await?;
          let post: Option<Post> = Post::maybe_find_first_by_category_id(category_id).execute(db_session).await?;
        
          Ok(())
        }
    }
    
  • 自定义过滤

    让我们以我们的 Post 模型为例

    #[charybdis_model(
        table_name = posts, 
        partition_keys = [category_id], 
        clustering_keys = [date, title],
        global_secondary_indexes = []
    )]
    pub struct Post {...}
    

    我们自动生成了遵循以下约定的宏 find_<struct_name>!find_post! 宏。它可以用来创建自定义查询。

    以下代码将返回 Post 模型的流,并且查询将在编译时构建为 &'static str

    // automatically generated macro rule
    let posts = find_post!("category_id in ? AND date > ?", (categor_vec, date))
        .execute(session)
        .await?;
    

    我们还可以使用 find_first_post! 宏来获取单个结果

    let post = find_first_post!("category_id in ? AND date > ? LIMIT 1", (date, categor_vec))
        .execute(session)
        .await?;
    

    如果我们只需要 Query 而不是结果,我们可以使用 find_post_query!

    let query = find_post_query!("date = ? AND category_id in ?", (date, categor_vec));
    

更新

  • let user = User::from_json(json);
    
    user.username = "scylla".to_string();
    user.email = "[email protected]";
    
    user.update().execute(&session).await;
    
  • 收集

    • 让我们以我们的 User 模型为例
      #[charybdis_model(
          table_name = users,
          partition_keys = [id],
          clustering_keys = [],
      )]
      pub struct User {
          id: Uuid,
          tags: Set<Text>,
          post_ids: List<Uuid>,
      }
      
    • 为每个集合字段生成了 push_to_<field_name>pull_from_<field_name> 方法。
      let user: User;
      
      user.push_tags(vec![tag]).execute(&session).await;
      user.pull_tags(vec![tag]).execute(&session).await;
      
      user.push_post_ids(vec![tag]).execute(&session).await;
      user.pull_post_ids(vec![tag]).execute(&session).await;
      
  • 计数器

    • 让我们定义 post_counter 模型
      #[charybdis_model(
          table_name = post_counters,
          partition_keys = [id],
          clustering_keys = [],
      )]
      pub struct PostCounter {
          id: Uuid,
          likes: Counter,
          comments: Counter,
      }
      
    • 我们可以使用 increment_<field_name>decrement_<field_name> 方法来更新计数器字段。
      let post_counter: PostCounter;
      post_counter.increment_likes(1).execute(&session).await;
      post_counter.decrement_likes(1).execute(&session).await;
      
      post_counter.increment_comments(1).execute(&session).await;
      post_counter.decrement_comments(1).execute(&session).await;
      

删除

  • let user = User::from_json(json);
    
    user.delete().execute(&session).await;
    
  • 宏生成的删除助手

    让我们以我们的 Post 模型为例

    #[charybdis_model(
        table_name = posts,
        partition_keys = [date],
        clustering_keys = [categogry_id, title],
        global_secondary_indexes = [])
    ]
    pub struct Post {
        date: Date,
        category_id: Uuid,
        title: Text,
        id: Uuid,
        ...
    }
    

    我们为从主键生成的最多 3 个字段有宏生成的函数。

    Post::delete_by_date(date: Date).execute(&session).await?;
    Post::delete_by_date_and_category_id(date: Date, category_id: Uuid).execute(&session).await?;
    Post::delete_by_date_and_category_id_and_title(date: Date, category_id: Uuid, title: Text).execute(&session).await?;
    
  • 自定义删除查询

    我们可以使用 delete_post! 宏来创建自定义删除查询。

    delete_post!("date = ? AND category_id in ?", (date, category_vec)).execute(&session).await?
    

配置

每个操作都返回 CharybdisQuery,在执行前可以通过方法链进行配置。

let user: User = User::find_by_id(id)
    .consistency(Consistency::One)
    .timeout(Some(Duration::from_secs(5)))
    .execute(&app.session)
    .await?;
    
let result: QueryResult = user.update().consistency(Consistency::One).execute(&session).await?;

支持的配置选项

  • 一致性
  • 串行一致性
  • 时间戳
  • 超时
  • 页面大小
  • 时间戳

批处理

使用 CharybdisModelBatch 操作来执行单个批处理中的多个操作。

  • 批量操作

    let users: Vec<User>;
    let batch = User::batch();
    
    // inserts
    batch.append_inserts(users);
    
    // or updates
    batch.append_updates(users);
    
    // or deletes
    batch.append_deletes(users);
    
    batch.execute(&session).await?;
    
  • 分块批量操作

    分块批处理操作用于分块处理大量数据。

      let users: Vec<User>;
      let chunk_size = 100;
    
      User::batch().chunked_inserts(&session, users, chunk_size).await?;
      User::batch().chunked_updates(&session, users, chunk_size).await?;
      User::batch().chunked_deletes(&session, users, chunk_size).await?;
    
  • 批量配置

    批处理操作可以在执行前通过方法链进行配置。

    let batch = User::batch()
        .consistency(Consistency::One)
        .retry_policy(Some(Arc::new(DefaultRetryPolicy::new())))
        .chunked_inserts(&session, users, 100)
        .await?;
    

    我们也可以使用方法链将操作附加到批处理

    let batch = User::batch()
        .consistency(Consistency::One)
        .retry_policy(Some(Arc::new(DefaultRetryPolicy::new())))
        .append_update(&user_1)
        .append_update(&user_2)
        .execute(data.db_session())
        .await?;
    
  • 语句批处理

    我们可以使用批处理语句来批量执行集合操作。

    let batch = User::batch();
    let users: Vec<User>;
    
    for user in users {
        batch.append_statement(User::PUSH_TAGS_QUERY, (vec![tag], user.id));
    }
    
    batch.execute(&session).await;
    

部分模型

  • 使用自动生成的 partial_<model>! 宏来在模型字段的子集上运行操作。此宏生成一个结构与原始模型相同的新结构,但只包含提供的字段。宏由 #[charybdis_model] 自动生成。它遵循以下约定 partial_<struct_name>!

    // auto-generated macro - available in crate::models::user
    partial_user!(UpdateUsernameUser, id, username);
    

    现在我们有了新的结构体 UpdateUsernameUser,它与 User 模型等效,但只包含 idusername 字段。

    let mut update_user_username = UpdateUsernameUser {
        id,
        username: "updated_username".to_string(),
    };
    
    update_user_username.update().execute(&session).await?;
    
  • 部分模型注意事项

    • partial_<model> 要求在本地模型上使用 #[derive(Default)]
    • partial_<model> 需要在定义中包含完整的键
    • #charybdis_model 宏下方定义的所有派生项都将自动添加到部分模型中。
    • partial_<model> 结构体实现了与原生模型相同的字段属性,因此如果我们原生模型字段上存在 #[serde(rename = "rootId")] 注解,那么它也会出现在部分模型字段上。
    • partial_<model> 应该定义在原生模型相同的文件中,以便它可以重用原生模型所需的导入。
  • 作为本地

    如果我们需要在原生模型上运行操作,可以使用 as_native 方法。

    let native_user: User = update_user_username.as_native().find_by_primary_key().execute(&session).await?;
    // action that requires native model
    authorize_user(&native_user);
    

    as_native 方法通过返回一个包含部分模型字段的新实例来工作。对于其他字段,它使用默认值。

  • 建议的命名规范是 Purpose + Original Struct Name。例如: UpdateAdresssUserUpdateDescriptionPost

回调函数

回调是方便在模型在特定操作前后运行额外逻辑的方法。例如:

  • 我们可以使用 before_insert 在插入前设置默认值和/或验证模型。
  • 我们可以使用 after_update 来更新其他数据源,例如 Elasticsearch。

实现

  1. 假设我们定义了一个自定义扩展,该扩展将在每次更新帖子时更新 Elasticsearch 文档。
    pub struct AppExtensions {
        pub elastic_client: ElasticClient,
    }
    
  2. 现在我们可以实现一个利用此扩展的回调。
    #[charybdis_model(...)]
    pub struct Post {}
    
    impl ExtCallbacks for Post {
        type Extention = AppExtensions;
        type Error = AppError; // From<CharybdisError>
        
       // use before_insert to set default values
        async fn before_insert(
            &mut self,
            _session: &CachingSession,
            extension: &AppExtensions,
        ) -> Result<(), CustomError> {
            self.id = Uuid::new_v4();
            self.created_at = Utc::now();
            
            Ok(())
        }
        
        // use before_update to set updated_at
        async fn before_update(
            &mut self,
            _session: &CachingSession,
            extension: &AppExtensions,
        ) -> Result<(), CustomError> {
            self.updated_at = Utc::now();
            
            Ok(())
        }
    
        // use after_update to update elastic document
        async fn after_update(
            &mut self,
            _session: &CachingSession,
            extension: &AppExtensions,
        ) -> Result<(), CustomError> {
            extension.elastic_client.update(...).await?;
    
            Ok(())
        }
        
        // use after_delete to delete elastic document
        async fn after_delete(
            &mut self,
            _session: &CachingSession,
            extension: &AppExtensions,
        ) -> Result<(), CustomError> {
            extension.elastic_client.delete(...).await?;
    
            Ok(())
        }
    }
    
  • 可能的回调

    • before_insert
    • before_update
    • before_delete
    • after_insert
    • after_update
    • after_delete
  • 触发回调函数

    为了触发回调,我们使用 <operation>_cb 方法: insert_cbupdate_cbdelete_cb 根据特性行为。这使得我们可以在 insert 和带有回调的插入(insert_cb)之间有明确的区分。就像在主操作中一样,我们可以在执行之前配置回调操作查询。
     use charybdis::operations::{DeleteWithCallbacks, InsertWithCallbacks, UpdateWithCallbacks};
    
     post.insert_cb(app_extensions).execute(&session).await;
     post.update_cb(app_extensions).execute(&session).await;
     post.delete_cb(app_extensions).consistency(Consistency::All).execute(&session).await;
    

集合

对于每个集合字段,我们得到以下内容:

  • PUSH_<field_name>_QUERY 静态字符串
  • PUSH_<field_name>_IF_EXISTS_QUERY 静态字符串
  • PULL_<field_name>_QUERY 静态字符串
  • PULL_<field_name>_IF_EXISTS_QUERY 静态字符串
  • push_<field_name> 方法
  • push_<field_name>_if_exists 方法
  • pull_<field_name> 方法
  • pull_<field_name>_if_exists 方法
  1. 模型

    #[charybdis_model(
        table_name = users,
        partition_keys = [id],
        clustering_keys = []
    )]
    pub struct User {
        id: Uuid,
        tags: Set<Text>,
        post_ids: List<Uuid>,
        books_by_genre: Map<Text, Frozen<List<Text>>>,
    }
    
  2. 生成的收集查询

    生成的查询将期望值作为第一个绑定值,而主键字段作为后续的绑定值。

    impl User {
        const PUSH_TAGS_QUERY: &'static str = "UPDATE users SET tags = tags + ? WHERE id = ?";
        const PUSH_TAGS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET tags = tags + ? WHERE id = ? IF EXISTS";
    
        const PULL_TAGS_QUERY: &'static str = "UPDATE users SET tags = tags - ? WHERE id = ?";
        const PULL_TAGS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET tags = tags - ? WHERE id = ? IF EXISTS";
         
        const PUSH_POST_IDS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids + ? WHERE id = ?";
        const PUSH_POST_IDS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids + ? WHERE id = ? IF EXISTS";
    
        const PULL_POST_IDS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids - ? WHERE id = ?";
        const PULL_POST_IDS_IF_EXISTS_QUERY: &'static str = "UPDATE users SET post_ids = post_ids - ? WHERE id = ? IF EXISTS";
    
        const PUSH_BOOKS_BY_GENRE_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre + ? WHERE id = ?";
        const PUSH_BOOKS_BY_GENRE_IF_EXISTS_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre + ? WHERE id = ? IF EXISTS";
        
        const PULL_BOOKS_BY_GENRE_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre - ? WHERE id = ?";
        const PULL_BOOKS_BY_GENRE_IF_EXISTS_QUERY: &'static str = "UPDATE users SET books_by_genre = books_by_genre - ? WHERE id = ? IF EXISTS";
    }
    

    现在我们可以在批量操作中使用此常量。

    let batch = User::batch();
    let users: Vec<User>;
    
    for user in users {
        batch.append_statement(User::PUSH_TAGS_QUERY, (vec![tag], user.id));
    }
    
    batch.execute(&session).await;
    
  3. 生成的收集方法

    为每个集合字段生成了 push_to_<field_name>pull_from_<field_name> 方法。

    let user: User::new();
    
    user.push_tags(tags: HashSet<T>).execute(&session).await;
    user.push_tags_if_exists(tags: HashSet<T>).execute(&session).await;
    
    user.pull_tags(tags: HashSet<T>).execute(&session).await;
    user.pull_tags_if_exists(tags: HashSet<T>).execute(&session).await;
    
    
    user.push_post_ids(ids: Vec<T>).execute(&session).await;
    user.push_post_ids_if_exists(ids: Vec<T>).execute(&session).await;
    
    user.pull_post_ids(ids: Vec<T>).execute(&session).await;
    user.pull_post_ids_if_exists(ids: Vec<T>).execute(&session).await;
    
    user.push_books_by_genre(map: HashMap<K, V>).execute(&session).await;
    user.push_books_by_genre_if_exists(map: HashMap<K, V>).execute(&session).await;
    
    user.pull_books_by_genre(map: HashMap<K, V>).execute(&session).await;
    user.pull_books_by_genre_if_exists(map: HashMap<K, V>).execute(&session).await;
    

忽略的字段

我们可以通过使用 #[charybdis(ignore)] 属性来忽略字段

#[charybdis_model(...)]
pub struct User {
    id: Uuid,
    #[charybdis(ignore)]
    organization: Option<Organization>,
}

因此,字段 organization 在所有操作中都将被忽略,并在从其他数据源反序列化时使用默认值。它可以用来存储不持久化在数据库中的数据。

依赖项

~12–22MB
~310K SLoC