#cassandra #macro-derive #cql #struct #derive #generation #table-name

cassandra_macro

从 Rust 结构体创建 Cassandra 表和 CRUD CQL 预处理语句。(自定义 derive: cassandra_macro_derive)

4 个版本

0.1.3 2022 年 5 月 10 日
0.1.2 2020 年 5 月 11 日
0.1.1 2020 年 5 月 10 日
0.1.0 2020 年 5 月 3 日

在“Rust 模式”类别中排名 #1564

MIT 许可证

27KB
104

从 Rust 结构体创建 Cassandra 表和 CRUD CQL 语句

自定义 derive 以简化从 Rust 结构体创建 Cassandra 表

这个包旨在减少创建和执行基本 Cassandra 语句时的样板代码。Rust 结构体本身已经包含执行基本操作所需的元数据。在有这个包之前,您需要编写 Cassandra 表的定义、创建 Rust 结构体,并手动编写所有操作的 CQL 语句。比重复维护元数据更糟糕的是,当您添加新表或更改表名后,如果语句引用的表已不存在,这些语句就会执行失败。

这个包并不完美,欢迎您贡献力量。

安装

[dependencies]                  
cdrs = { version = "2" }        
cassandra_macro = "0.1.3"       
cassandra_macro_derive = "0.1.3"

derive API

/// Table metadata and CQL statement generation for a Rust struct mapped to a
/// Cassandra table.
///
/// The associated functions describe the table as a whole; the `&self`
/// methods build a query object for one concrete entity.
pub trait CassandraTable {
    /// Name of the keyspace the table belongs to.
    fn key_space() -> &'static str;

    /// Name of the table.
    fn table_name() -> &'static str;

    /// CQL statement that creates the table.
    fn create_table_cql() -> &'static str;

    /// CQL statement that drops the table.
    fn drop_table_cql() -> &'static str;

    /// Prepared `SELECT` statement filtered by the primary keys.
    ///
    /// `projection` chooses the selected columns (for example
    /// `Projection::All` or `Projection::Columns(..)`).
    fn select_by_primary_keys(projection: Projection) -> String;

    /// Prepared `SELECT` statement filtered by the primary keys and the
    /// cluster keys.
    ///
    /// `projection` chooses the selected columns.
    fn select_by_primary_and_cluster_keys(projection: Projection) -> String;

    /// Prepared `UPDATE` statement filtered by the primary keys.
    ///
    /// `columns` names the columns to update.
    fn update_by_primary_keys(columns: Vec<String>) -> String;

    /// Prepared `UPDATE` statement filtered by the primary keys and the
    /// cluster keys.
    ///
    /// `columns` names the columns to update.
    fn update_by_primary_and_cluster_keys(columns: Vec<String>) -> String;

    /// Prepared `DELETE` statement filtered by the primary keys.
    fn delete_by_primary_keys() -> String;

    /// Prepared `DELETE` statement filtered by the primary keys and the
    /// cluster keys.
    fn delete_by_primary_and_cluster_keys() -> String;

    /// Creates a `StoreQuery` containing the prepared statement
    /// that stores this entity.
    fn store_query(&self) -> StoreQuery;

    /// Creates an `UpdateQuery` containing the prepared statement
    /// that updates this entity.
    ///
    /// The statement can only update columns that are not
    /// part of the primary keys.
    ///
    /// NOTE(review): presumably returns `TableWithNoUpdatableColumnsError`
    /// when every column belongs to the key — confirm against the derive.
    fn update_query(&self) -> Result<UpdateQuery, TableWithNoUpdatableColumnsError>;

    /// Creates a `DeleteQuery` containing the prepared statement
    /// that deletes this entity.
    fn delete_query(&self) -> DeleteQuery;
}

完整示例

#[macro_use]
extern crate cdrs;

use std::sync::Arc;

use cassandra_macro::{CassandraTable, DeleteQuery, Projection, UpdateQuery};
use cassandra_macro::StoreQuery;
use cassandra_macro_derive::CassandraTable;
use cdrs::authenticators::StaticPasswordAuthenticator;
use cdrs::cluster::{ClusterTcpConfig, NodeTcpConfigBuilder, TcpConnectionPool};
use cdrs::cluster::session::{new_lz4, Session};
use cdrs::Error as CassandraDriverError;
use cdrs::frame::TryFromRow;
use cdrs::load_balancing::RoundRobinSync;
use cdrs::query::{QueryExecutor, QueryValues};
use cdrs::types::ByName;
use cdrs::types::rows::Row;
use cdrs::types::value::Value;
use chrono::Utc;
use uuid::Uuid;

/// Example entity mapped to a table in the `test` keyspace.
///
/// The `options` string carries two table options separated by `|`.
#[table(keyspace = "test", options = "comment='Only for RUST users' | COMPACTION = {'class':'SizeTieredCompactionStrategy'}")]
#[derive(Debug, CassandraTable)]
pub struct User {
    /// Primary key column (`TEXT`).
    #[column(type = "TEXT", primary_key)]
    username: String,

    /// Regular `UUID` column.
    #[column(type = "UUID")]
    user_internal_id: Uuid,

    /// Regular `TEXT` column.
    #[column(type = "TEXT")]
    first_name: String,

    /// Cluster key column (`TIMESTAMP`), ascending order, position 1.
    #[column(type = "TIMESTAMP", cluster_key(order = "ASC", position = 1))]
    created: i64,

    /// Regular `TIMESTAMP` column.
    #[column(type = "TIMESTAMP")]
    updated: i64,
}

impl User {
    fn set_first_name(&mut self, first_name: String) {
        self.first_name = first_name;
    }
}

impl Default for User {
    /// Builds the sample user `"Rust"` with a random internal id and both
    /// timestamps set to the current time in epoch milliseconds.
    fn default() -> Self {
        // Read the clock once so a freshly built user has identical
        // `created` and `updated` values.
        let now_millis = Utc::now().timestamp_millis();

        Self {
            username: "Rust".to_string(),
            user_internal_id: Uuid::new_v4(),
            first_name: "rust".to_string(),
            created: now_millis,
            updated: now_millis,
        }
    }
}

impl TryFromRow for User {
    /// Decodes a `User` from a result row, reading each column by name.
    ///
    /// Returns the first decoding error reported by `r_by_name`.
    fn try_from_row(row: Row) -> Result<Self, cdrs::Error> {
        // Fields are evaluated in the order written, so columns are read in
        // the same order as the struct declaration.
        Ok(Self {
            username: row.r_by_name::<String>("username")?,
            user_internal_id: row.r_by_name::<Uuid>("user_internal_id")?,
            first_name: row.r_by_name::<String>("first_name")?,
            created: row.r_by_name::<i64>("created")?,
            updated: row.r_by_name::<i64>("updated")?,
        })
    }
}


/// Connection settings used by `CassandraDriver::new_from_config`.
pub struct CassandraConfig {
    /// Addresses of the cluster nodes; the example `main` uses `"aella:9042"`.
    nodes: Vec<String>,
    /// User name for password authentication (shared by all nodes).
    user: String,
    /// Password for password authentication (shared by all nodes).
    password: String,
}

/// Thin wrapper around a cdrs session that executes the statements produced
/// by `CassandraTable`.
pub struct CassandraDriver {
    /// Reference-counted cdrs session: round-robin load balancing over TCP
    /// connection pools authenticated with a static password.
    connection: Arc<Session<RoundRobinSync<TcpConnectionPool<StaticPasswordAuthenticator>>>>
}

impl CassandraDriver {
    pub fn execute_simple_statement<Q: ToString>(&self, query: Q) -> Result<bool, CassandraDriverError> {
        match self.connection.query(query) {
            Ok(_) => Ok(true),
            Err(e) => {
                Err(e)
            }
        }
    }

    pub fn execute_store_query(&self, query: &StoreQuery) -> Result<bool, CassandraDriverError> {
        self.execute_query(query.query(), query.values())
    }

    pub fn execute_update_query(&self, query: &UpdateQuery) -> Result<bool, CassandraDriverError> {
        self.execute_query(query.query(), query.values())
    }

    pub fn execute_delete_query(&self, query: &DeleteQuery) -> Result<bool, CassandraDriverError> {
        self.execute_query(query.query(), query.values())
    }

    pub fn execute_query(&self, query: &String, values: &QueryValues) -> Result<bool, CassandraDriverError> {
        let result = self.connection
            .query_with_values(query, values.to_owned());

        result.map(|_| true)
    }

    pub fn find<T: TryFromRow + CassandraTable>(&self, keys: Vec<String>) -> Result<Option<T>, CassandraDriverError> {
        let stmt = T::select_by_primary_keys(Projection::All);

        let values = keys.iter().map(|k| Value::from(k.to_string())).collect::<Vec<Value>>();

        let result_frame = self.connection.query_with_values(stmt, QueryValues::SimpleValues(values))?;

        Ok(result_frame.get_body()?.into_rows()
            .map(|r| { r.first().map(|r| T::try_from_row(r.to_owned()).unwrap()) }).flatten())
    }

    pub fn new_from_config(cassandra_configs: &CassandraConfig) -> Self {
        let mut nodes = Vec::with_capacity(cassandra_configs.nodes.len());

        for node in cassandra_configs.nodes.iter() {
            let authenticator: StaticPasswordAuthenticator = StaticPasswordAuthenticator::new(cassandra_configs.user.clone(),
                                                                                              cassandra_configs.password.clone());

            let node_tcp = NodeTcpConfigBuilder::new(node.as_str(), authenticator).build();

            nodes.push(node_tcp);
        }

        let cluster_config = ClusterTcpConfig(nodes);

        let cassandra_session = new_lz4(&cluster_config, RoundRobinSync::new())
            .expect("Cassandra session must be created");

        CassandraDriver {
            connection: Arc::new(cassandra_session)
        }
    }
}

/// Runs the whole lifecycle against a live cluster: create the table, print
/// the generated statements, store / read / update / delete one `User`, and
/// finally drop the table.
fn main() {
    let driver_conf = CassandraConfig {
        nodes: vec!["aella:9042".to_string()],
        user: String::from("test"),
        password: String::from("test"),
    };

    let connection = CassandraDriver::new_from_config(&driver_conf);

    println!("Keyspace:.{}.", User::key_space());
    println!("Table name:.{}.", User::table_name());
    println!("Creating table:{}", User::create_table_cql());
    connection.execute_simple_statement(User::create_table_cql()).expect("Must create table");

    println!("You can test those by yourself");
    println!("{}", User::select_by_primary_keys(Projection::Columns(vec!["created".to_string()])));
    println!("{}", User::select_by_primary_and_cluster_keys(Projection::All));
    println!("{}", User::update_by_primary_keys(vec!["updated".to_string()]));
    println!("{}", User::update_by_primary_and_cluster_keys(vec!["updated".to_string()]));
    println!("{}", User::delete_by_primary_keys());
    println!("{}", User::delete_by_primary_and_cluster_keys());

    let mut rust_user = User::default();

    println!("Storing rust: {}", rust_user.store_query().query());
    connection.execute_store_query(&rust_user.store_query()).expect("User must be stored");

    let rust_user_from_db: Option<User> = connection.find::<User>(vec!["Rust".to_string()]).unwrap();
    assert!(rust_user_from_db.unwrap().username.eq(&rust_user.username), "Must be the same");

    println!("Update rust:{}", rust_user.update_query().unwrap().query());
    rust_user.set_first_name(String::from("IamRoot"));

    connection.execute_update_query(&rust_user.update_query().unwrap()).unwrap();

    let rust_user_from_db_1 = connection.find::<User>(vec!["Rust".to_string()]).unwrap();

    assert!(rust_user_from_db_1.unwrap().username.eq(&rust_user.username), "Must be the same");

    println!("Delete:{}", rust_user.delete_query().query());
    connection.execute_delete_query(&rust_user.delete_query()).expect("Must be deleted");

    println!("Dropping table: {}", User::drop_table_cql());
    connection.execute_simple_statement(User::drop_table_cql()).expect("Table must be removed");

    // Plain `//` comments on purpose: a `///` doc comment with no item after
    // it is rejected by the compiler.
    //
    // Expected output of the `println!` calls above:
    // Keyspace:.test.
    // Table name:.user.
    // Creating table:CREATE TABLE IF NOT EXISTS test.user  (user_internal_id UUID,first_name TEXT,username TEXT,created TIMESTAMP,updated TIMESTAMP, PRIMARY KEY ((username), created) ) WITH CLUSTERING ORDER BY (created ASC) AND comment='Only for RUST users'  AND  COMPACTION = {'class':'SizeTieredCompactionStrategy'}
    // You can test those by yourself
    // SELECT created FROM test.user WHERE  username=?
    // SELECT * FROM test.user WHERE  username=?  AND  created=?
    // UPDATE test.user SET  updated=? WHERE  username=?
    // UPDATE test.user SET  updated=? WHERE  username=?  AND  created=?
    // DELETE FROM test.user WHERE  username=?
    // DELETE FROM test.user WHERE  username=?  AND  created=?
    // Storing rust: INSERT INTO test.user (user_internal_id,first_name,username,created,updated) VALUES (?,?,?,?,?)
    // Update rust:UPDATE test.user SET user_internal_id=?,first_name=?,updated=? WHERE username=? AND created=?
    // Delete:DELETE FROM test.user WHERE username=? AND created=?
    // Dropping table: DROP TABLE IF EXISTS test.user
}

使用主键的示例

use cassandra_macro::Cassandra;
use cassandra_macro_derive::Cassandra;

/// Minimal table in the `fog` keyspace: a single `TEXT` column that is the
/// primary key.
// NOTE(review): this snippet derives `Cassandra`, while the full example
// derives `CassandraTable` — confirm which name the current release exports.
#[table(keyspace = "fog")]
#[derive(Debug, Cassandra)]
pub struct TestRust {
    /// Primary key column (`TEXT`).
    #[column(type = "TEXT", primary_key)]
    key_one: String,
}

使用复合键的示例

use cassandra_macro::Cassandra;
use cassandra_macro_derive::Cassandra;

/// Table in the `fog` keyspace whose key is made of two columns, ordered by
/// the `position` given in each `compound_key(..)` attribute.
// NOTE(review): this snippet derives `Cassandra`, while the full example
// derives `CassandraTable` — confirm which name the current release exports.
#[table(keyspace = "fog")]
#[derive(Debug, Cassandra)]
pub struct TestRust {
    /// Second component of the compound key (`position = 2`).
    // NOTE(review): the column type is `UUID` but the field is a `String`;
    // the full example maps `UUID` to `Uuid` — confirm this is intended.
    #[column(type = "UUID", compound_key(position = 2))]
    key_one: String,

    /// First component of the compound key (`position = 1`).
    #[column(type = "TEXT", compound_key(position = 1))]
    key_two: String,

}

使用复合键、分区键和选项的示例

use cassandra_macro::Cassandra;
use cassandra_macro_derive::Cassandra;

/// Table in the `fog` keyspace combining a compound key, a static column,
/// a cluster key and table options.
///
/// Table options are separated with the `|` character.
// NOTE(review): this snippet derives `Cassandra`, while the full example
// derives `CassandraTable` — confirm which name the current release exports.
#[table(keyspace = "fog", options = "comment='From RUST' | COMPACTION = { 'class' : 'SizeTieredCompactionStrategy' }")]
#[derive(Debug, Cassandra)]
pub struct TestRust {
    /// Second component of the compound key (`position = 2`).
    // NOTE(review): the column type is `UUID` but the field is a `String` —
    // confirm this is intended.
    #[column(type = "UUID", compound_key(position = 2))]
    key_one: String,

    /// First component of the compound key (`position = 1`).
    #[column(type = "TEXT", compound_key(position = 1))]
    key_two: String,

    /// `TEXT` column flagged `static`.
    #[column(type = "TEXT", static)]
    rust_version: String,

    /// Cluster key column (`TIMESTAMP`), descending order, position 1.
    #[column(type = "TIMESTAMP", cluster_key(order = "DESC", position = 1))]
    created: i64,

    updated: i64, // Field with no annotation is ignored
}

依赖

~2.2–7.5MB
~51K SLoC