36个版本 (17个稳定版)
2.1.1 | 2020年10月28日 |
---|---|
1.3.11 | 2020年9月19日 |
0.4.5 | 2020年8月5日 |
0.4.1 | 2020年7月31日 |
#455 in HTTP客户端
63 每月下载次数
120KB
2K SLoC
晶体管
A Rust Crux客户端crate/lib。目前,这个crate打算支持两种与Crux交互的方式
- 通过带有
crux-standalone
版本的docker-hub的Docker。当前Docker镜像juxt/crux-standalone:20.09-1.11-0
。 - 通过
HTTP
使用HTTP API
。 - 异步支持。
- Clojure.api.(待评估。)
- FFI.(待评估。)
在第一版发布后可能会添加其他解决方案。
- Crux入门指南
- Crux常见问题解答
- 有关用法示例,请参阅示例目录或
ATM Crux
以获取更完整和交互式的示例。
双时态Crux
Crux通过使用一对事务时间和有效时间戳,针对高效和全局一致的时间点查询进行了优化。
典型的双时态记录系统通常依赖于在关系中显式跟踪有效从和有效到时间戳或范围类型。Crux提供的是一个非常简单的双时态文档模型,它在整个数据库中都是通用的,因此它不需要您预先考虑哪些历史信息值得存储在特殊的“双时态表”中。
可以通过在特定有效时间将一个或多个文档插入到Crux中,通过put事务,默认为事务时间(即现在),并且每个文档在通过put更新为新版本或通过delete删除之前都保持有效。
为什么?
时间 | 目的 |
---|---|
事务时间 | 用于审计目的,例如事件溯源等技术要求。 |
valid-time | 用于查询跨时间的数据,历史分析。 |
transaction-time
表示数据进入数据库的时间点。这为我们提供了一个审计轨迹,我们可以看到数据库在特定时间点的状态。您不能使用过去的时间点创建新的交易。
valid-time
是一个任意时间,可以来自上游系统,或默认设置为 transaction-time。有效时间通常是用户用于查询的目的。
参考 crux bitemporality 和 bitemporality 的价值
用法
要将此软件包添加到您的项目,您应该在 Cargo.toml
文件中的 dependencies
字段中添加以下行之一
[dependencies] transistor = "2.1.1"
创建 Crux 客户端
所有使用 Transistor 的操作都从模块 client
开始,使用 Crux::new("localhost", "3000")
开始。结构体 Crux
负责定义请求 HeadersMap
和请求 URL
。URL
的定义是必需的,它通过静态函数 new
完成,该函数接收一个 host
和一个 port
并返回一个 Crux
实例。要更改 HeadersMap
信息以便添加 AUTHORIZATION
,您可以使用接收授权令牌并修改 Crux
实例的函数 with_authorization
。
HeaderMap
已经包含标题Content-Type: application/edn
。
最后,要创建 Crux 客户端,应调用函数 <type>_client
,例如 http_client
。此函数返回一个包含所有可能的实现以查询 Crux Docker 和 Standalone HTTP 服务器查询的结构体。
use transistor::client::Crux;
// HttpClient with AUTHORIZATION
let auth_client = Crux::new("127.0.0.1","3000").with_authorization("my-auth-token").http_client();
// HttpClient without AUTHORIZATION
let client = Crux::new("127.0.0.1","3000").http_client();
Http 客户端
一旦调用 http_client
,您将拥有一个 HttpClient
结构体实例,该实例具有查询 Crux 在 Docker 和 Standalone HTTP 服务器上的一系列函数。
let body = client.state().unwrap();
// StateResponse {
// index___index_version: 5,
// doc_log___consumer_state: None,
// tx_log___consumer_state: None,
// kv___kv_store: "crux.kv.rocksdb.RocksKv",
// kv___estimate_num_keys: 56,
// kv___size: 2271042
// }
use transistor::http::{Actions};
use transistor::client::Crux;
use transistor::types::{CruxId};
let person1 = Person {
crux__db___id: CruxId::new("jorge-3"),
..
};
let person2 = Person {
crux__db___id: CruxId::new("manuel-1"),
..
};
let actions = Actions::new()
.append_put(person1)
.append_put(person2);
let body = client.tx_log(actions).unwrap();
// {:crux.tx/tx-id 7, :crux.tx/tx-time #inst \"2020-07-16T21:50:39.309-00:00\"}
use transistor::client::Crux;
let body = client.tx_logs().unwrap();
// TxLogsResponse {
// tx_events: [
// TxLogResponse {
// tx___tx_id: 0,
// tx___tx_time: 2020-07-09T23:38:06.465-00:00,
// tx__event___tx_events: Some(
// [
// [
// ":crux.tx/put",
// "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
// "125d29eb3bed1bf51d64194601ad4ff93defe0e2",
// ],
// ],
// ),
// },
// TxLogResponse {
// tx___tx_id: 1,
// tx___tx_time: 2020-07-09T23:39:33.815-00:00,
// tx__event___tx_events: Some(
// [
// [
// ":crux.tx/put",
// "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
// "1b42e0d5137e3833423f7bb958622bee29f91eee",
// ],
// ],
// ),
// },
// ...
// ]
// }
entity
通过POST
方式请求端点/entity
。必须将序列化的CruxId
、序列化的Edn::Key
或包含keyword
的字符串作为参数传递。返回给定 ID 和可选的有效时间/事务时间坐标的实体。
let person = Person {
crux__db___id: CruxId::new("hello-entity"),
...
};
let client = Crux::new("localhost", "3000").http_client();
// entity expects a CruxId
let edn_body = client.entity(person.crux__db___id).unwrap();
// Map(
// Map(
// {
// ":crux.db/id": Key(
// ":hello-entity",
// ),
// ":first-name": Str(
// "Hello",
// ),
// ":last-name": Str(
// "World",
// ),
// },
// ),
// )
-
entity_timed
与entity
类似,因为它请求相同的端点,区别在于它可以发送transaction-time
和valid-time
作为查询参数。这是通过额外的参数transaction_time: Option<DateTime<FixedOffset>>
和valid_time: Option<DateTime<FixedOffset>>
来实现的。 -
entity_tx
通过POST
方式请求端点/entity-tx
。必须将序列化的CruxId
、序列化的Edn::Key
或包含keyword
的字符串作为参数传递。返回最近设置键的事务。
use transistor::http::{Action};
use transistor::client::Crux;
use transistor::types::{CruxId};
let person = Person {
crux__db___id: CruxId::new("hello-entity"),
...
};
let client = Crux::new("localhost", "3000").http_client();
let tx_body = client.entity_tx(edn_rs::to_string(person.crux__db___id)).unwrap();
// EntityTxResponse {
// db___id: "d72ccae848ce3a371bd313865cedc3d20b1478ca",
// db___content_hash: "1828ebf4466f98ea3f5252a58734208cd0414376",
// db___valid_time: 2020-07-20T20:38:27.515-00:00,
// tx___tx_id: 31,
// tx___tx_time: 2020-07-20T20:38:27.515-00:00,
// }
-
entity_tx_timed
与entity_tx
类似,因为它请求相同的端点,区别在于它可以发送transaction-time
和valid-time
作为查询参数。这是通过额外的参数transaction_time: Option<DateTime<FixedOffset>>
和valid_time: Option<DateTime<FixedOffset>>
来实现的。 -
entity_history
通过GET
方式请求端点/entity-history
。参数包括crux.db/id
作为String
,由枚举http::Order
(Asc 或
Desc
)定义的排序参数,以及一个表示with-docs?
标志的布尔值。响应是一个包含EntityHistoryElement
的 Vector。如果with-docs?
为true
,则字段db__doc
,:crux.db/doc
将返回一个包含插入结构的Option<Edn>
。
use transistor::client::Crux;
use transistor::http::Order;
use transistor::types::CruxId;
let person = Person {
crux__db___id: CruxId::new("hello-history"),
...
let client = Crux::new("localhost", "3000").http_client();
let tx_body = client.entity_tx(person.crux__db___id).unwrap();
let entity_history = client.entity_history(tx_body.db___id.clone(), Order::Asc, true);
// EntityHistoryResponse { history: [
// EntityHistoryElement {
// db___valid_time: 2020-08-05T03:00:06.476-00:00,
// tx___tx_id: 37, tx___tx_time: 2020-08-05T03:00:06.476-00:00,
// db___content_hash: "2da097a2dffbb9828cd4377f1461a59e8454674b",
// db__doc: Some(Map(Map(
// {":crux.db/id": Key(":hello-history"),
// ":first-name": Str("Hello"),
// ":last-name": Str("World")}
// )))
// }
// ]}
let entity_history_without_docs = client.entity_history(tx_body.db___id, Order::Asc, false);
// EntityHistoryResponse {
// history: [
// EntityHistoryElement {
// db___valid_time: 2020-08-05T03:00:06.476-00:00,
// tx___tx_id: 37,
// tx___tx_time: 2020-08-05T03:00:06.476-00:00,
// db___content_hash: "2da097a2dffbb9828cd4377f1461a59e8454674b",
// db__doc: None
// }
// }
// ]}
-
entity_history_timed
与entity_histoty
类似,因为它请求相同的端点,区别在于它可以发送作为查询参数的start-transaction-time
、end-transaction-time
、start-valid-time
和end-valid-time
。这是通过添加一个包含一个TimeHistory::TransactionTime
和/或一个TimeHistory::ValidTime
的Vec<TimeHistory>
来实现的,它们都接收两个Option<DateTime<Utc>>
。第一个DateTime
是start-<type>-time
,第二个是end-<type>-time
。 -
query
通过POST
请求端点/query
。参数是类型为Query
的query
。检索一个包含由函数Query::find
定义的值的集合。可用函数包括find
、find_by_aggregates
、where_clause
、args
、order_by
、limit
、offset
,示例complex_query
和limit_offset_query
展示了如何使用它们。
简单查找
use transistor::client::Crux;
use transistor::types::{query::Query};
let client = Crux::new("localhost", "3000").http_client();
let query_is_sql = Query::find(vec!["?p1", "?n"])
.where_clause(vec!["?p1 :name ?n", "?p1 :is-sql true"])
.build();
// Query:
// {:query
// {:find [?p1 ?n]
// :where [[?p1 :name ?n]
// [?p1 :is-sql true]]}}
let is_sql = client.query(query_is_sql.unwrap()).unwrap();
// {[":mysql", "MySQL"], [":postgres", "Postgres"]} BTreeSet
通过聚合查找
- 可在
Aggregate
找到可用聚合
use transistor::client::Crux;
use transistor::types::{query::Query};
let client = Crux::new("localhost", "3000").http_client();
let q = Query::find_by_aggregates(vec![
Aggregate::Min("?e".to_string()), Aggregate::Max("?e".to_string()), Aggregate::Count("?e".to_string()),
Aggregate::MinN(5, "?e".to_string()), Aggregate::CountDistinct("?e".to_string())
])?
.where_clause(vec!["?e :type :burger"])?
.build()?;
// Query:
// {:query
// {:find [(min ?e) (max ?e) (count ?e) (min 5 ?e) (count-distinct ?e)]
// :where [[?e :type :burger]]
// }}
let _ = client.query(q)?;
Transisitor 的结构和枚举类型
Actions
是一个构建器结构体,用于帮助您为 tx_log
创建一个 Vec<Action>
。可用函数有
new
静态方法用于实例化结构体Actions
。append_put<T: Serialize>(action: T)
将一个无valid-time
的Put
追加到Actions
。append_put_timed<T: Serialize>(action: T, date: DateTime<FixedOffset>)
将一个带valid-time
的Put
追加到Actions
。append_delete(id: CruxId)
将一个没有有效时间
的Delete
添加到Actions
中。最后删除特定文档在最后一个有效时间
。append_delete_timed(id: CruxId, date: DateTime<FixedOffset>)
将一个具有有效时间
的Delete
添加到Actions
中。在指定的有效时间
删除特定文档。append_evict(id: CruxId)
将一个Evict
添加到Actions
。完全驱逐文档,包括所有历史版本(只接收要驱逐的 ID)。append_match_doc<T: Serialize>(id: CruxId, action: T)
将一个没有有效时间
的Match
添加到Actions
。匹配实体的当前状态,如果状态与提供的文档不匹配,则事务将不会继续。append_match_doc_timed<T: Serialize>(id: CruxId, action: T, date: DateTime<FixedOffset>)
将一个具有有效时间
的Match
添加到Actions
。build
从Actions
生成Vec<Action>
。
use transistor::client::Crux;
use transistor::types::Actions;
fn main() -> Result<(), CruxError> {
let crux = Database {
// ...
};
let psql = Database {
// ...
};
let mysql = Database {
// ...
};
let cassandra = Database {
// ...
};
let sqlserver = Database {
// ...
};
let client = Crux::new("localhost", "3000").http_client();
let timed = "2014-11-28T21:00:09-09:00"
.parse::<DateTime<FixedOffset>>()
.unwrap();
let actions: Vec<Action> = Actions::new()
.append_put(crux)
.append_put(psql)
.append_put(mysql)
.append_put_timed(cassandra, timed)
.append_put(sqlserver)
.build();
let _ = client.tx_log(actions)?;
}
Query
是一个结构体,负责创建字段并将它们序列化到正确的 query
格式。它为每个字段提供了一个函数,并有一个 build
函数来帮助检查其格式是否正确。
find
是一个静态构建函数,用于定义:find
子句内的元素。where_clause
是一个构建函数,用于定义:where []
数组内的元素向量。order_by
是一个构建函数,用于定义:order-by
子句内的元素。args
是一个构建函数,用于定义:args
子句内的元素。limit
是一个构建函数,用于定义:limit
子句内的元素。offset
是一个构建函数,用于定义:offset
子句内的元素。with_full_results
是一个构建函数,用于将标志full-results?
定义为 true。这允许您的query
响应返回整个文档,而不是仅返回搜索到的键。查询{:query {:find [?user ?a] :where [[?user :first-name ?a]] :full-results? true}}
的结果将是一个类似于BTreeSet<Vec<String>>
的结构,如([{:crux.db/id :fafilda, :first-name "Jorge", :last-name "Klaus"} "Jorge"])
的结构,因此文档需要进一步的 EDN 解析才能成为文档的结构。
错误定义在 CruxError
枚举中。
EdnError
是edn_rs::EdnError
的包装。RequestError
由reqwest
包产生。无法执行 HTTP 请求。QueryFormatError
在提供的查询结构不匹配模式时产生。QueryError
负责封装 Crux 响应中的 Stacktrace 错误。
use transistor::client::Crux;
use transistor::types::{query::Query};
let _client = Crux::new("localhost", "3000").http_client();
// field `n` doesn't exist
let _query_error_response = Query::find(vec!["?p1", "?n"])
.where_clause(vec!["?p1 :name ?g", "?p1 :is-sql true"])
.build();
let error = client.query(query_error_response?)?;
println!("Stacktrace \n{:?}", error);
// Stacktrace
// QueryError("{:via
// [{:type java.lang.IllegalArgumentException,
// :message \"Find refers to unknown variable: n\",
// :at [crux.query$q invokeStatic \"query.clj\" 1152]}],
// :trace
// [[crux.query$q invokeStatic \"query.clj\" 1152]
// [crux.query$q invoke \"query.clj\" 1099]
// [crux.query$q$fn__10850 invoke \"query.clj\" 1107]
// [clojure.core$binding_conveyor_fn$fn__5754 invoke \"core.clj\" 2030]
// [clojure.lang.AFn call \"AFn.java\" 18]
// [java.util.concurrent.FutureTask run \"FutureTask.java\" 264]
// [java.util.concurrent.ThreadPoolExecutor
// runWorker
// \"ThreadPoolExecutor.java\"
// 1128]
// [java.util.concurrent.ThreadPoolExecutor$Worker
// run
// \"ThreadPoolExecutor.java\"
// 628]
// [java.lang.Thread run \"Thread.java\" 834]],
// :cause \"Find refers to unknown variable: n\"}
// ")
测试 Crux 客户端
为了测试目的,有一个名为 feature
的 mock
,它启用了 http_mock
函数,这是 http_client
函数的替代品。要使用它,请使用标志 --features "mock"
运行您的命令,例如 cargo test --test lib --no-fail-fast --features "mock"
。此模拟功能使用 crate mockito = "0.26"
作为 Cargo 依赖项。启用此功能的一个示例用法
use transistor::client::Crux;
use transistor::http::Action;
use edn_derive::Serialize;
use transistor::types::{CruxId};
use mockito::mock;
#[test]
#[cfg(feature = "mock")]
fn mock_client() {
let _m = mock("POST", "/tx-log")
.with_status(200)
.match_body("[[:crux.tx/put { :crux.db/id :jorge-3, :first-name \"Michael\", :last-name \"Jorge\", }], [:crux.tx/put { :crux.db/id :manuel-1, :first-name \"Diego\", :last-name \"Manuel\", }]]")
.with_header("content-type", "text/plain")
.with_body("{:crux.tx/tx-id 8, :crux.tx/tx-time #inst \"2020-07-16T21:53:14.628-00:00\"}")
.create();
let person1 = Person {
// ...
};
let person2 = Person {
/// ...
};
let actions = vec![Action::put(person1), Action::put(person2)];
let body = Crux::new("localhost", "3000")
.http_mock()
.tx_log(actions)
.unwrap();
assert_eq!(
format!("{:?}", body),
String::from("TxLogResponse { tx___tx_id: 8, tx___tx_time: 2020-07-16T21:53:14.628-00:00, tx__event___tx_events: None }")
);
}
#[derive(Debug, Clone, Serialize)]
#[allow(non_snake_case)]
pub struct Person {
crux__db___id: CruxId,
// ...
}
此外,由于实现了以下代码:impl PartialEq<Vec<ActionMock>> for Actions,可以使用特征
mock
通过枚举 ActionMock
对结构体 Actions
进行测试。一个示例可以是:
use transistor::types::http::{Actions, ActionMock};
fn test_actions_eq_actions_mock() {
let actions = test_actions();
let mock = test_action_mock();
assert_eq!(actions, mock);
}
fn test_action_mock() -> Vec<ActionMock> {
let person1 = Person {
crux__db___id: CruxId::new("jorge-3"),
first_name: "Michael".to_string(),
last_name: "Jorge".to_string(),
};
let person2 = Person {
crux__db___id: CruxId::new("manuel-1"),
first_name: "Diego".to_string(),
last_name: "Manuel".to_string(),
};
vec![
ActionMock::Put(edn_rs::to_string(person1.clone()), None),
ActionMock::Put(edn_rs::to_string(person2), None),
ActionMock::Delete(edn_rs::to_string(person1.crux__db___id), None),
]
}
fn test_actions() -> Actions {
let person1 = Person {
crux__db___id: CruxId::new("jorge-3"),
first_name: "Michael".to_string(),
last_name: "Jorge".to_string(),
};
let person2 = Person {
crux__db___id: CruxId::new("manuel-1"),
first_name: "Diego".to_string(),
last_name: "Manuel".to_string(),
};
Actions::new().append_put(person1.clone()).append_put(person2).append_delete(person1.crux__db___id)
}
异步支持
异步特性仍在BETA测试中,因为它严重依赖于 unwraps
。
可以使用异步/等待 http 客户端,为此需要在 transistor 中启用 async
特性,如下所示:transistor = { version = "2.1.1", features = ["async"] }
。启用此特性后,HttpClient
将使用 reqwest::Client
而不是 reqwest::blocking::Client
。默认的异步运行时是 tokio
,因此最好在 Cargo.toml
中包含具有 macros
以及 futures
特性的 tokio
。
futures = {version = "0.3.5" }
tokio = {version = "0.2.22", features = ["macros"] }
以下是一个异步查询示例
use tokio::prelude::*;
use transistor::client::Crux;
use edn_derive::Serialize;
use transistor::types::http::Action;
use transistor::types::{
error::CruxError,
{query::Query, CruxId},
};
#[tokio::main]
async fn main() {
let crux = Database {
crux__db___id: CruxId::new("crux"),
name: "Crux Datalog".to_string(),
is_sql: false,
};
let psql = Database {
crux__db___id: CruxId::new("postgres"),
name: "Postgres".to_string(),
is_sql: true,
};
let mysql = Database {
crux__db___id: CruxId::new("mysql"),
name: "MySQL".to_string(),
is_sql: true,
};
let client = Crux::new("localhost", "3000").http_client();
let action1 = Action::put(crux, None);
let action2 = Action::put(psql, None);
let action3 = Action::put(mysql, None);
let _ = client.tx_log(vec![action1, action2, action3]).await;
let query_is_sql = Query::find(vec!["?p1", "?n"])
.unwrap()
.where_clause(vec!["?p1 :name ?n", "?p1 :is-sql true"])
.unwrap()
.build();
let is_sql = client.query(query_is_sql.unwrap()).await;
let query_is_no_sql = Query::find(vec!["?p1", "?n", "?s"])
.unwrap()
.where_clause(vec!["?p1 :name ?n", "?p1 :is-sql ?s", "?p1 :is-sql false"])
.unwrap()
.with_full_results()
.build();
let is_no_sql = client.query(query_is_no_sql.unwrap()).await;
}
#[derive(Debug, Clone, Serialize)]
#[allow(non_snake_case)]
pub struct Database {
crux__db___id: CruxId,
name: String,
is_sql: bool
}
注意 use tokio::prelude::*;
和 #[tokio::main] \n async fn main()
。
启用特性 time_as_str
可以接收响应(TxLogResponse
、EntityTxResponse
、EntityHistoryElement
)时间日期作为字符串,为此您需要启用特性 time_as_str
transistor = { version = "2.1.1", features = ["time_as_str"] }
可能的功能
mock = ["mockito"] -> http_mock()
time_as_str = [] -> DataTime types become Strings
async = ["tokio", "futures"] -> async/await
依赖关系
此 crate 的重要依赖项是 edn-rs crate,因为许多返回类型都是 Edn 格式,还包含 edn-derive。同步 http 客户端是启用了 blocking
特性的 reqwest
,用于可以 DateTime<Utc>
的日期时间值,用于插入,以及 DateTime<FixedOffset>
的日期时间值,用于读取,以及 mockito
用于 mock
功能。
许可
该项目根据 LGPP-3.0 (GNU Lesser General Public License v3.0) 许可。
依赖关系
~5–11MB
~229K SLoC