36个版本 (17个稳定版)

2.1.1 2020年10月28日
1.3.11 2020年9月19日
0.4.5 2020年8月5日
0.4.1 2020年7月31日

#455 in HTTP客户端

Download history 49/week @ 2024-03-28 39/week @ 2024-04-04 7/week @ 2024-06-27 56/week @ 2024-07-04

63 每月下载次数

LGPL-3.0

120KB
2K SLoC

晶体管

A Rust Crux客户端crate/lib。目前,这个crate打算支持两种与Crux交互的方式

  • 通过带有crux-standalone版本的docker-hub的Docker。当前Docker镜像juxt/crux-standalone:20.09-1.11-0
  • 通过HTTP使用HTTP API
  • 异步支持。
  • Clojure.api.(待评估。)
  • FFI.(待评估。)

在第一版发布后可能会添加其他解决方案。

双时态Crux

Crux通过使用一对事务时间和有效时间戳,针对高效和全局一致的时间点查询进行了优化。

典型的双时态记录系统通常依赖于在关系中显式跟踪有效从和有效到时间戳或范围类型。Crux提供的是一个非常简单的双时态文档模型,它在整个数据库中都是通用的,因此它不需要您预先考虑哪些历史信息值得存储在特殊的“双时态表”中。

可以通过在特定有效时间将一个或多个文档插入到Crux中,通过put事务,默认为事务时间(即现在),并且每个文档在通过put更新为新版本或通过delete删除之前都保持有效。

为什么?

时间 目的
事务时间 用于审计目的,例如事件溯源等技术要求。
valid-time 用于查询跨时间的数据,历史分析。

transaction-time 表示数据进入数据库的时间点。这为我们提供了一个审计轨迹,我们可以看到数据库在特定时间点的状态。您不能使用过去的时间点创建新的交易。

valid-time 是一个任意时间,可以来自上游系统,或默认设置为 transaction-time。有效时间通常是用户用于查询的目的。

参考 crux bitemporalitybitemporality 的价值

用法

要将此软件包添加到您的项目,您应该在 Cargo.toml 文件中的 dependencies 字段中添加以下行之一

[dependencies]
transistor = "2.1.1"

创建 Crux 客户端

所有使用 Transistor 的操作都从模块 client 开始,使用 Crux::new("localhost", "3000") 开始。结构体 Crux 负责定义请求 HeadersMap 和请求 URLURL 的定义是必需的,它通过静态函数 new 完成,该函数接收一个 host 和一个 port 并返回一个 Crux 实例。要更改 HeadersMap 信息以便添加 AUTHORIZATION,您可以使用接收授权令牌并修改 Crux 实例的函数 with_authorization

  • HeaderMap 已经包含标题 Content-Type: application/edn

最后,要创建 Crux 客户端,应调用函数 <type>_client,例如 http_client。此函数返回一个包含所有可能的实现以查询 Crux Docker 和 Standalone HTTP 服务器查询的结构体。

use transistor::client::Crux;

// HttpClient with AUTHORIZATION
let auth_client = Crux::new("127.0.0.1","3000").with_authorization("my-auth-token").http_client();

// HttpClient without AUTHORIZATION
let client = Crux::new("127.0.0.1","3000").http_client();

Http 客户端

一旦调用 http_client,您将拥有一个 HttpClient 结构体实例,该实例具有查询 Crux 在 Docker 和 Standalone HTTP 服务器上的一系列函数。

  • state 通过 GET 请求查询端点 /。无参数。返回有关数据库状态的详细信息。
let body = client.state().unwrap();

// StateResponse { 
//     index___index_version: 5, 
//     doc_log___consumer_state: None, 
//     tx_log___consumer_state: None, 
//     kv___kv_store: "crux.kv.rocksdb.RocksKv", 
//     kv___estimate_num_keys: 56, 
//     kv___size: 2271042 
// }
  • tx_log 通过 POST 请求端点 /tx-log。《动作》作为参数预期。这是写入端点,用于发布交易。
use transistor::http::{Actions};
use transistor::client::Crux;
use transistor::types::{CruxId};

let person1 = Person {
    crux__db___id: CruxId::new("jorge-3"), 
    ..
};

let person2 = Person {
    crux__db___id: CruxId::new("manuel-1"), 
    ..
};

let actions = Actions::new()
    .append_put(person1)
    .append_put(person2);

let body = client.tx_log(actions).unwrap();
// {:crux.tx/tx-id 7, :crux.tx/tx-time #inst \"2020-07-16T21:50:39.309-00:00\"}
  • tx_logs 通过 GET 请求端点 /tx-log。无参数。返回所有交易的列表。
use transistor::client::Crux;

let body = client.tx_logs().unwrap();

// TxLogsResponse {
//     tx_events: [
//         TxLogResponse {
//             tx___tx_id: 0,
//             tx___tx_time: 2020-07-09T23:38:06.465-00:00,
//             tx__event___tx_events: Some(
//                 [
//                     [
//                         ":crux.tx/put",
//                         "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
//                         "125d29eb3bed1bf51d64194601ad4ff93defe0e2",
//                     ],
//                 ],
//             ),
//         },
//         TxLogResponse {
//             tx___tx_id: 1,
//             tx___tx_time: 2020-07-09T23:39:33.815-00:00,
//             tx__event___tx_events: Some(
//                 [
//                     [
//                         ":crux.tx/put",
//                         "a15f8b81a160b4eebe5c84e9e3b65c87b9b2f18e",
//                         "1b42e0d5137e3833423f7bb958622bee29f91eee",
//                     ],
//                 ],
//             ),
//         },
//         ...
//     ]
// } 
  • entity 通过 POST 方式请求端点 /entity。必须将序列化的 CruxId、序列化的 Edn::Key 或包含 keyword 的字符串作为参数传递。返回给定 ID 和可选的有效时间/事务时间坐标的实体。
let person = Person {
    crux__db___id: CruxId::new("hello-entity"), 
    ...
};

let client = Crux::new("localhost", "3000").http_client();

// entity expects a CruxId
let edn_body = client.entity(person.crux__db___id).unwrap();
// Map(
//     Map(
//         {
//             ":crux.db/id": Key(
//                 ":hello-entity",
//             ),
//             ":first-name": Str(
//                 "Hello",
//             ),
//             ":last-name": Str(
//                 "World",
//             ),
//         },
//     ),
// )
  • entity_timedentity 类似,因为它请求相同的端点,区别在于它可以发送 transaction-timevalid-time 作为查询参数。这是通过额外的参数 transaction_time: Option<DateTime<FixedOffset>>valid_time: Option<DateTime<FixedOffset>> 来实现的。

  • entity_tx 通过 POST 方式请求端点 /entity-tx。必须将序列化的 CruxId、序列化的 Edn::Key 或包含 keyword 的字符串作为参数传递。返回最近设置键的事务。

use transistor::http::{Action};
use transistor::client::Crux;
use transistor::types::{CruxId};

let person = Person {
    crux__db___id: CruxId::new("hello-entity"), 
    ...
};

let client = Crux::new("localhost", "3000").http_client();

let tx_body = client.entity_tx(edn_rs::to_string(person.crux__db___id)).unwrap();
// EntityTxResponse {
//     db___id: "d72ccae848ce3a371bd313865cedc3d20b1478ca",
//     db___content_hash: "1828ebf4466f98ea3f5252a58734208cd0414376",
//     db___valid_time: 2020-07-20T20:38:27.515-00:00,
//     tx___tx_id: 31,
//     tx___tx_time: 2020-07-20T20:38:27.515-00:00,
// }
  • entity_tx_timedentity_tx 类似,因为它请求相同的端点,区别在于它可以发送 transaction-timevalid-time 作为查询参数。这是通过额外的参数 transaction_time: Option<DateTime<FixedOffset>>valid_time: Option<DateTime<FixedOffset>> 来实现的。

  • entity_history 通过 GET 方式请求端点 /entity-history。参数包括 crux.db/id 作为 String,由枚举 http::OrderAscDesc)定义的排序参数,以及一个表示 with-docs? 标志的布尔值。响应是一个包含 EntityHistoryElement 的 Vector。如果 with-docs?true,则字段 db__doc:crux.db/doc 将返回一个包含插入结构的 Option<Edn>

use transistor::client::Crux;
use transistor::http::Order;
use transistor::types::CruxId;

let person = Person {
    crux__db___id: CruxId::new("hello-history"),
    ...

let client = Crux::new("localhost", "3000").http_client();

let tx_body = client.entity_tx(person.crux__db___id).unwrap();

let entity_history = client.entity_history(tx_body.db___id.clone(), Order::Asc, true);
// EntityHistoryResponse { history: [
//     EntityHistoryElement { 
//         db___valid_time: 2020-08-05T03:00:06.476-00:00, 
//         tx___tx_id: 37, tx___tx_time: 2020-08-05T03:00:06.476-00:00, 
//         db___content_hash: "2da097a2dffbb9828cd4377f1461a59e8454674b", 
//         db__doc: Some(Map(Map(
//                 {":crux.db/id": Key(":hello-history"), 
//                 ":first-name": Str("Hello"), 
//                 ":last-name": Str("World")}
//                ))) 
//     }
// ]}

let entity_history_without_docs = client.entity_history(tx_body.db___id, Order::Asc, false);
// EntityHistoryResponse { 
//     history: [
//         EntityHistoryElement {
//              db___valid_time: 2020-08-05T03:00:06.476-00:00, 
//              tx___tx_id: 37, 
//              tx___tx_time: 2020-08-05T03:00:06.476-00:00, 
//              db___content_hash: "2da097a2dffbb9828cd4377f1461a59e8454674b", 
//              db__doc: None 
//             }
//         }
//     ]}
  • entity_history_timedentity_histoty 类似,因为它请求相同的端点,区别在于它可以发送作为查询参数的 start-transaction-timeend-transaction-timestart-valid-timeend-valid-time。这是通过添加一个包含一个 TimeHistory::TransactionTime 和/或一个 TimeHistory::ValidTimeVec<TimeHistory> 来实现的,它们都接收两个 Option<DateTime<Utc>>。第一个 DateTimestart-<type>-time,第二个是 end-<type>-time

  • query 通过 POST 请求端点 /query。参数是类型为 Queryquery。检索一个包含由函数 Query::find 定义的值的集合。可用函数包括 findfind_by_aggregateswhere_clauseargsorder_bylimitoffset,示例 complex_querylimit_offset_query 展示了如何使用它们。

简单查找

use transistor::client::Crux;
use transistor::types::{query::Query};

let client = Crux::new("localhost", "3000").http_client();

let query_is_sql = Query::find(vec!["?p1", "?n"])
    .where_clause(vec!["?p1 :name ?n", "?p1 :is-sql true"])
    .build();
// Query:
// {:query
//     {:find [?p1 ?n]
//      :where [[?p1 :name ?n]
//              [?p1 :is-sql true]]}}

let is_sql = client.query(query_is_sql.unwrap()).unwrap();
// {[":mysql", "MySQL"], [":postgres", "Postgres"]} BTreeSet

通过聚合查找

use transistor::client::Crux;
use transistor::types::{query::Query};

let client = Crux::new("localhost", "3000").http_client();

let q = Query::find_by_aggregates(vec![
    Aggregate::Min("?e".to_string()), Aggregate::Max("?e".to_string()), Aggregate::Count("?e".to_string()),
    Aggregate::MinN(5, "?e".to_string()), Aggregate::CountDistinct("?e".to_string())
])?
.where_clause(vec!["?e :type :burger"])?
.build()?;
// Query:
// {:query
//   {:find [(min ?e) (max ?e) (count ?e) (min 5 ?e) (count-distinct ?e)]
//    :where [[?e :type :burger]]
// }}

let _ = client.query(q)?;

Transisitor 的结构和枚举类型

Actions 是一个构建器结构体,用于帮助您为 tx_log 创建一个 Vec<Action>。可用函数有

  • new 静态方法用于实例化结构体 Actions
  • append_put<T: Serialize>(action: T) 将一个无 valid-timePut 追加到 Actions
  • append_put_timed<T: Serialize>(action: T, date: DateTime<FixedOffset>) 将一个带 valid-timePut 追加到 Actions
  • append_delete(id: CruxId) 将一个没有 有效时间Delete 添加到 Actions 中。最后删除特定文档在最后一个 有效时间
  • append_delete_timed(id: CruxId, date: DateTime<FixedOffset>) 将一个具有 有效时间Delete 添加到 Actions 中。在指定的 有效时间 删除特定文档。
  • append_evict(id: CruxId) 将一个 Evict 添加到 Actions。完全驱逐文档,包括所有历史版本(只接收要驱逐的 ID)。
  • append_match_doc<T: Serialize>(id: CruxId, action: T) 将一个没有 有效时间Match 添加到 Actions。匹配实体的当前状态,如果状态与提供的文档不匹配,则事务将不会继续。
  • append_match_doc_timed<T: Serialize>(id: CruxId, action: T, date: DateTime<FixedOffset>) 将一个具有 有效时间Match 添加到 Actions
  • buildActions 生成 Vec<Action>
use transistor::client::Crux;
use transistor::types::Actions;

fn main() -> Result<(), CruxError> {
    let crux = Database {
        // ...
    };

    let psql = Database {
        // ...
    };

    let mysql = Database {
        // ...
    };

    let cassandra = Database {
        // ...
    };

    let sqlserver = Database {
        // ...
    };

    let client = Crux::new("localhost", "3000").http_client();
    let timed = "2014-11-28T21:00:09-09:00"
        .parse::<DateTime<FixedOffset>>()
        .unwrap();

    let actions: Vec<Action> = Actions::new()
        .append_put(crux)
        .append_put(psql)
        .append_put(mysql)
        .append_put_timed(cassandra, timed)
        .append_put(sqlserver)
        .build();

    let _ = client.tx_log(actions)?;
}

Query 是一个结构体,负责创建字段并将它们序列化到正确的 query 格式。它为每个字段提供了一个函数,并有一个 build 函数来帮助检查其格式是否正确。

  • find 是一个静态构建函数,用于定义 :find 子句内的元素。
  • where_clause 是一个构建函数,用于定义 :where [] 数组内的元素向量。
  • order_by 是一个构建函数,用于定义 :order-by 子句内的元素。
  • args 是一个构建函数,用于定义 :args 子句内的元素。
  • limit 是一个构建函数,用于定义 :limit 子句内的元素。
  • offset 是一个构建函数,用于定义 :offset 子句内的元素。
  • with_full_results 是一个构建函数,用于将标志 full-results? 定义为 true。这允许您的 query 响应返回整个文档,而不是仅返回搜索到的键。查询 {:query {:find [?user ?a] :where [[?user :first-name ?a]] :full-results? true}} 的结果将是一个类似于 BTreeSet<Vec<String>> 的结构,如 ([{:crux.db/id :fafilda, :first-name "Jorge", :last-name "Klaus"} "Jorge"]) 的结构,因此文档需要进一步的 EDN 解析才能成为文档的结构。

错误定义在 CruxError 枚举中。

  • EdnErroredn_rs::EdnError 的包装。
  • RequestErrorreqwest 包产生。无法执行 HTTP 请求。
  • QueryFormatError 在提供的查询结构不匹配模式时产生。
  • QueryError 负责封装 Crux 响应中的 Stacktrace 错误。
use transistor::client::Crux;
use transistor::types::{query::Query};

let _client = Crux::new("localhost", "3000").http_client();

// field `n` doesn't exist
let _query_error_response = Query::find(vec!["?p1", "?n"])
    .where_clause(vec!["?p1 :name ?g", "?p1 :is-sql true"])
    .build();

let error = client.query(query_error_response?)?;
println!("Stacktrace \n{:?}", error);

// Stacktrace
// QueryError("{:via
//      [{:type java.lang.IllegalArgumentException,
//        :message \"Find refers to unknown variable: n\",
//    :at [crux.query$q invokeStatic \"query.clj\" 1152]}],
//  :trace
//  [[crux.query$q invokeStatic \"query.clj\" 1152]
//   [crux.query$q invoke \"query.clj\" 1099]
//   [crux.query$q$fn__10850 invoke \"query.clj\" 1107]
//   [clojure.core$binding_conveyor_fn$fn__5754 invoke \"core.clj\" 2030]
//   [clojure.lang.AFn call \"AFn.java\" 18]
//   [java.util.concurrent.FutureTask run \"FutureTask.java\" 264]
//   [java.util.concurrent.ThreadPoolExecutor
//    runWorker
//    \"ThreadPoolExecutor.java\"
//    1128]
//   [java.util.concurrent.ThreadPoolExecutor$Worker
//    run
//    \"ThreadPoolExecutor.java\"
//    628]
//   [java.lang.Thread run \"Thread.java\" 834]],
//  :cause \"Find refers to unknown variable: n\"}
// ")

测试 Crux 客户端

为了测试目的,有一个名为 featuremock,它启用了 http_mock 函数,这是 http_client 函数的替代品。要使用它,请使用标志 --features "mock" 运行您的命令,例如 cargo test --test lib --no-fail-fast --features "mock"。此模拟功能使用 crate mockito = "0.26" 作为 Cargo 依赖项。启用此功能的一个示例用法

use transistor::client::Crux;
use transistor::http::Action;
use edn_derive::Serialize;
use transistor::types::{CruxId};
use mockito::mock;

#[test]
#[cfg(feature = "mock")]
fn mock_client() {
    let _m = mock("POST", "/tx-log")
        .with_status(200)
        .match_body("[[:crux.tx/put { :crux.db/id :jorge-3, :first-name \"Michael\", :last-name \"Jorge\", }], [:crux.tx/put { :crux.db/id :manuel-1, :first-name \"Diego\", :last-name \"Manuel\", }]]")
        .with_header("content-type", "text/plain")
        .with_body("{:crux.tx/tx-id 8, :crux.tx/tx-time #inst \"2020-07-16T21:53:14.628-00:00\"}")
        .create();

    let person1 = Person {
        // ...
    };

    let person2 = Person {
        /// ...
    };

    let actions = vec![Action::put(person1), Action::put(person2)];
    
    let body = Crux::new("localhost", "3000")
        .http_mock()
        .tx_log(actions)
        .unwrap();

    assert_eq!(
        format!("{:?}", body),
        String::from("TxLogResponse { tx___tx_id: 8, tx___tx_time: 2020-07-16T21:53:14.628-00:00, tx__event___tx_events: None }")
    );
}

#[derive(Debug, Clone, Serialize)]
#[allow(non_snake_case)]
pub struct Person {
    crux__db___id: CruxId,
    // ...
}

此外,由于实现了以下代码:impl PartialEq<Vec<ActionMock>> for Actions,可以使用特征 mock 通过枚举 ActionMock 对结构体 Actions 进行测试。一个示例可以是:

use transistor::types::http::{Actions, ActionMock};

fn test_actions_eq_actions_mock() {
    let actions = test_actions();
    let mock = test_action_mock();

    assert_eq!(actions, mock);
}

fn test_action_mock() -> Vec<ActionMock> {
    let person1 = Person {
        crux__db___id: CruxId::new("jorge-3"),
        first_name: "Michael".to_string(),
        last_name: "Jorge".to_string(),
    };

    let person2 = Person {
        crux__db___id: CruxId::new("manuel-1"),
        first_name: "Diego".to_string(),
        last_name: "Manuel".to_string(),
    };

    vec![
        ActionMock::Put(edn_rs::to_string(person1.clone()), None),
        ActionMock::Put(edn_rs::to_string(person2), None),
        ActionMock::Delete(edn_rs::to_string(person1.crux__db___id), None),
    ]
}

fn test_actions() -> Actions {
    let person1 = Person {
        crux__db___id: CruxId::new("jorge-3"),
        first_name: "Michael".to_string(),
        last_name: "Jorge".to_string(),
    };

    let person2 = Person {
        crux__db___id: CruxId::new("manuel-1"),
        first_name: "Diego".to_string(),
        last_name: "Manuel".to_string(),
    };
    Actions::new().append_put(person1.clone()).append_put(person2).append_delete(person1.crux__db___id)
}

异步支持

异步特性仍在BETA测试中,因为它严重依赖于 unwraps

可以使用异步/等待 http 客户端,为此需要在 transistor 中启用 async 特性,如下所示:transistor = { version = "2.1.1", features = ["async"] }。启用此特性后,HttpClient 将使用 reqwest::Client 而不是 reqwest::blocking::Client。默认的异步运行时是 tokio,因此最好在 Cargo.toml 中包含具有 macros 以及 futures 特性的 tokio

futures = {version = "0.3.5" }
tokio = {version = "0.2.22", features = ["macros"] }

以下是一个异步查询示例

use tokio::prelude::*;
use transistor::client::Crux;
use edn_derive::Serialize;
use transistor::types::http::Action;
use transistor::types::{
    error::CruxError,
    {query::Query, CruxId},
};

#[tokio::main]
async fn main() {
    let crux = Database {
        crux__db___id: CruxId::new("crux"),
        name: "Crux Datalog".to_string(),
        is_sql: false,
    };

    let psql = Database {
        crux__db___id: CruxId::new("postgres"),
        name: "Postgres".to_string(),
        is_sql: true,
    };

    let mysql = Database {
        crux__db___id: CruxId::new("mysql"),
        name: "MySQL".to_string(),
        is_sql: true,
    };

    let client = Crux::new("localhost", "3000").http_client();
    let action1 = Action::put(crux, None);
    let action2 = Action::put(psql, None);
    let action3 = Action::put(mysql, None);

    let _ = client.tx_log(vec![action1, action2, action3]).await;

    let query_is_sql = Query::find(vec!["?p1", "?n"])
        .unwrap()
        .where_clause(vec!["?p1 :name ?n", "?p1 :is-sql true"])
        .unwrap()
        .build();

    let is_sql = client.query(query_is_sql.unwrap()).await;

    let query_is_no_sql = Query::find(vec!["?p1", "?n", "?s"])
        .unwrap()
        .where_clause(vec!["?p1 :name ?n", "?p1 :is-sql ?s", "?p1 :is-sql false"])
        .unwrap()
        .with_full_results()
        .build();

    let is_no_sql = client.query(query_is_no_sql.unwrap()).await;
}

#[derive(Debug, Clone, Serialize)]
#[allow(non_snake_case)]
pub struct Database {
    crux__db___id: CruxId,
    name: String,
    is_sql: bool
}

注意 use tokio::prelude::*;#[tokio::main] \n async fn main()

启用特性 time_as_str

可以接收响应(TxLogResponseEntityTxResponseEntityHistoryElement)时间日期作为字符串,为此您需要启用特性 time_as_str

transistor = { version = "2.1.1", features = ["time_as_str"] }

可能的功能

mock = ["mockito"] -> http_mock()
time_as_str = [] -> DataTime types become Strings
async = ["tokio", "futures"] -> async/await

依赖关系

此 crate 的重要依赖项是 edn-rs crate,因为许多返回类型都是 Edn 格式,还包含 edn-derive。同步 http 客户端是启用了 blocking 特性的 reqwest,用于可以 DateTime<Utc> 的日期时间值,用于插入,以及 DateTime<FixedOffset> 的日期时间值,用于读取,以及 mockito 用于 mock 功能。

许可

该项目根据 LGPP-3.0 (GNU Lesser General Public License v3.0) 许可。

依赖关系

~5–11MB
~229K SLoC