10 个版本
0.3.9 | 2024年6月29日 |
---|---|
0.3.8 | 2022年12月21日 |
0.3.7 | 2022年2月5日 |
0.2.3 | 2021年10月17日 |
0.2.0 | 2021年7月30日 |
#72 在 异步
543 每月下载量
470KB
6K SLoC
Rust 双接口用于 q/kdb+
由于 Rust 是一种性能和类型安全度都很高的流行编程语言,人们希望用它与仍然非常流行的时序数据库 kdb+ 一起使用。这种愿望是可以理解的,因为我们知道 kdb+ 很快,它的接口或共享库也应该很快。这个接口就是为了满足这种自然的需求而创建的,而且用户使用起来不会有任何痛苦。q/kdb+ C API 中著名的神秘函数名并不是 Rust 开发者的兴趣所在。
"给我们一个 Rust 接口!!"
这就是你的选择。
该接口提供两个功能
- IPC 接口(q/kdb+ 进程的 Rust 客户端)
- API(为 q/kdb+ 构建共享库)
下面是每个功能的详细描述。
q/kdb+ 的 Rust IPC 接口
由于 Rust 是为了解决 C/C++ 的类型不安全而设计的,如果可能的话,用 Rust 替换 C/C++ 是可行的。这个接口旨在用作 q/kdb+ 进程的 Rust 客户端,它发送查询并接收响应。对 kdb+ 的查询支持两种方式
- 文本查询
- 函数查询,表示为 kdb+ 的复合列表(查看 IPC 的详细信息)。
消息的压缩/解压缩也遵循 kdb+ 实现。
至于连接方法,由于 q/kdb+ 的协议,通常客户端接口不提供监听器。但是,有时 Rust 进程正在连接到上游,而 q/kdb+ 随后启动或更频繁地重启。然后提供监听器方法是一个自然的方向,这里已经实现了。以下方式支持连接到 kdb+
- TCP
- TLS
- Unix 域套接字
此外,为了提高互操作性,提供了一些铸造、获取和设置方法。
环境变量
此crate使用q-native或crate特定的环境变量。
-
KDBPLUS_ACCOUNT_FILE
:一个凭证文件路径,用于接收者加载以管理来自q客户端的访问。此文件每行包含一个用户名和SHA-1散列密码,由':'
分隔,没有任何空格。例如,包含两个凭证的文件"mattew:oracle"
和"reluctant:slowday"
看起来像这样mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412 reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
可以使用q中的函数
.Q.sha1
生成散列密码q).Q.sha1 "slowday" 0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
-
KDBPLUS_TLS_KEY_FILE
和KDBPLUS_TLS_KEY_FILE_SECRET
:TLS接收者使用的pkcs12文件及其密码。 -
QUDSPATH
(可选):定义抽象命名空间的q-native环境变量。此环境变量也被UDS接收者使用。如果定义了此环境变量,抽象命名空间将是@${QUDSPATH}/kx.[服务器进程端口]
;否则,它将是@/tmp/kx.[服务器进程端口]
。
注意
- 消息将以OS本地端序发送。
- 当使用此crate作为TLS客户端时,您需要在q侧设置两个环境变量
KX_SSL_CERT_FILE
和KX_SSL_KEY_FILE
,以便q/kdb+作为TLS服务器工作。有关详细信息,请参阅KX网站。
类型映射
所有类型都表示为K
结构体,它与api
模块中的K
结构体非常相似,但其结构针对IPC使用和交互的便利性进行了优化。下表显示了每个q类型用于构建K
对象的输入类型。请注意,输入类型可能与内部类型不同。例如,时间戳的输入类型为chrono::DateTime<Utc>
,但其内部类型是表示自2000.01.01D00:00:00
以来经过的纳秒数的i64
。
q | Rust |
---|---|
bool |
bool |
GUID |
[u8; 16] |
byte |
u8 |
short |
i16 |
int |
i32 |
long |
i64 |
real |
f32 |
float |
f64 |
char |
char |
symbol |
String |
timestamp |
chrono::DateTime<Utc> |
month |
chrono::NaiveDate |
date |
chrono::NaiveDate |
datetime |
chrono::DateTime<Utc> |
timespan |
chrono::Duration |
minute |
chrono::Duration |
second |
chrono::Duration |
time |
chrono::Duration |
list |
Vec<Item> (Item 是上述相应的类型) |
复合列表 |
Vec<K> |
表格 |
Vec<K> |
字典 |
Vec<K> |
空 |
() |
示例
客户端
use kdbplus::ipc::*;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
// Connect to qprocess running on localhost:5000 via UDS
let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
println!("Connection type: {}", socket.get_connection_type());
// Set remote function with asynchronous message
socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;
// Send a query synchronously
let mut result = socket.send_sync_message(&"collatz[12]").await?;
println!("collatz[12]: {}", result);
result = socket.send_sync_message(&"collatz[`a]").await?;
println!("collatz[`a]: {}", result);
// Send a functional query.
let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
result = socket.send_sync_message(&message).await?;
println!("collatz[100]: {}", result);
// Modify query to (`collatz; 20)
message.pop().unwrap();
message.push(&K::new_long(20)).unwrap();
result=socket.send_sync_message(&message).await?;
println!("collatz[20]: {}", result);
// Send a functional asynchronous query.
message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
socket.send_async_message(&message).await?;
socket.shutdown().await?;
Ok(())
}
监听器
use kdbplus::ipc::*;
#[tokio::main]
async fn main() -> Result<()> {
// Start listenening over TCP at the port 7000 with authentication enabled.
let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;
// Send a query with the socket.
let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
println!("Greeting: {}", greeting);
socket_tcp.shutdown().await?;
Ok(())
}
然后,一个客户端可以通过acceptor的主机、端口和在KDBPLUS_ACCOUNT_FILE
中配置的凭据来连接到这个acceptor
q)h:hopen `::7000:reluctant:slowday
安装
在Cargo.toml
中将kdbplus
用作库名称,并使用"ipc"
功能。
[dependencies]
kdbplus={version="^0.3", features=["ipc"]}
q/kdb+ C API的Rust包装器
编程语言q(kdb+是用q编写的数据库)只提供C API,但有时外部库提供Rust接口而不是C/C++接口。从其性能来看,Rust仍然可以构建kdb+的共享库。此库旨在满足这种自然需求(如果你愿意,可以称之为愿望)。由于除了创建这样的包装器之外别无他法来编写kdb+的共享库,因此提供此包装器可能是合理的,并且在这里已经实现了。
为了避免编写过大的unsafe
块,从而导致优化不良,大多数本地C API函数都提供了一个包装函数,该函数具有一些符合人体工程学的安全性,并以直观的实现作为特性和方法。唯一的例外是使用省略号(...
)作为其参数的knk
和k
。这些函数在native
命名空间中提供,与其它C API函数一起。
注意:此库旨在用于构建共享库;因此,删除了一些无关函数。例如,连接到kdb+的连接函数khpu
不包括在内。
安装
在Cargo.toml
中将kdbplus
用作库名称,并使用"api"
功能。
[dependencies]
kdbplus={version="^0.3", features=["api"]}
示例
使用C API包装器的示例包括在api_examples
文件夹中。这些示例与kdbplus::api
模块中的示例相对应,并且这些函数也用于库的简单测试。测试是在tests/
下的test.q
中通过加载从示例构建的共享库中定义的函数来进行的。
以下是一些示例
C API样式
use kdbplus::qtype;
use kdbplus::api::*;
use kdbplus::api::native::*;
#[no_mangle]
pub extern "C" fn create_symbol_list(_: K) -> K {
unsafe{
let mut list=ktn(qtype::SYMBOL_LIST as i32, 0);
js(&mut list, ss(str_to_S!("Abraham")));
js(&mut list, ss(str_to_S!("Isaac")));
js(&mut list, ss(str_to_S!("Jacob")));
js(&mut list, sn(str_to_S!("Josephine"), 6));
list
}
}
#[no_mangle]
pub extern "C" fn catchy(func: K, args: K) -> K {
unsafe{
let result=ee(dot(func, args));
if (*result).qtype == qtype::ERROR{
println!("error: {}", S_to_str((*result).value.symbol));
// Decrement reference count of the error object
r0(result);
KNULL
} else {
result
}
}
}
#[no_mangle]
pub extern "C" fn dictionary_list_to_table() -> K {
unsafe{
let dicts = knk(3);
let dicts_slice = dicts.as_mut_slice::<K>();
for i in 0..3 {
let keys = ktn(qtype::SYMBOL_LIST as i32, 2);
let keys_slice = keys.as_mut_slice::<S>();
keys_slice[0] = ss(str_to_S!("a"));
keys_slice[1] = ss(str_to_S!("b"));
let values = ktn(qtype::INT_LIST as i32, 2);
values.as_mut_slice::<I>()[0..2].copy_from_slice(&[i*10, i*100]);
dicts_slice[i as usize] = xD(keys, values);
}
// Format list of dictionary as a table.
// ([] a: 0 10 20i; b: 0 100 200i)
k(0, str_to_S!("{[dicts] -1 _ dicts, (::)}"), dicts, KNULL)
}
}
q可以使用这些函数像这样
q)summon:`libc_api_examples 2: (`create_symbol_list; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)`Abraham`Isaac`Jacob`Joseph ~ summon[]
q)catchy: `libc_api_examples 2: (`catchy; 2);
q)catchy[$; ("J"; "42")]
42
q)catchy[+; (1; `a)]
error: type
q)behold: `libc_api_examples 2: (`dictionary_list_to_table; 1);
q)behold[]
a b
------
0 0
10 100
20 200
Rust样式
以下示例中没有unsafe
代码。您可以看到包装函数在代码中的舒适度。
use kdbplus::qtype;
use kdbplus::api::*;
use kdbplus::api::native::*;
#[no_mangle]
pub extern "C" fn create_symbol_list2(_: K) -> K {
let mut list = new_list(qtype::SYMBOL_LIST, 0);
list.push_symbol("Abraham").unwrap();
list.push_symbol("Isaac").unwrap();
list.push_symbol("Jacob").unwrap();
list.push_symbol_n("Josephine", 6).unwrap();
list
}
#[no_mangle]
fn no_panick(func: K, args: K) -> K {
let result = error_to_string(apply(func, args));
if let Ok(error) = result.get_error_string() {
println!("FYI: {}", error);
// Decrement reference count of the error object which is no longer used.
decrement_reference_count(result);
KNULL
}
else{
println!("success!");
result
}
}
#[no_mangle]
pub extern "C" fn create_table2(_: K) -> K {
// Build keys
let keys = new_list(qtype::SYMBOL_LIST, 2);
let keys_slice = keys.as_mut_slice::<S>();
keys_slice[0] = enumerate(str_to_S!("time"));
keys_slice[1] = enumerate_n(str_to_S!("temperature_and_humidity"), 11);
// Build values
let values = new_list(qtype::COMPOUND_LIST, 2);
let time = new_list(qtype::TIMESTAMP_LIST, 3);
// 2003.10.10D02:24:19.167018272 2006.05.24D06:16:49.419710368 2008.08.12D23:12:24.018691392
time.as_mut_slice::<J>().copy_from_slice(&[119067859167018272_i64, 201766609419710368, 271897944018691392]);
let temperature = new_list(qtype::FLOAT_LIST, 3);
temperature.as_mut_slice::<F>().copy_from_slice(&[22.1_f64, 24.7, 30.5]);
values.as_mut_slice::<K>().copy_from_slice(&[time, temperature]);
flip(new_dictionary(keys, values))
}
以下是q代码
q)summon:`libc_api_examples 2: (`create_symbol_list2; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)chill: `libc_api_examples 2: (`no_panick; 2);
q)chill[$; ("J"; "42")]
success!
42
q)chill[+; (1; `a)]
FYI: type
q)climate_change: libc_api_examples 2: (`create_table2; 1);
q)climate_change[]
time temperature
-----------------------------------------
2003.10.10D02:24:19.167018272 22.1
2006.05.24D06:16:49.419710368 24.7
2008.08.12D23:12:24.018691392 30.5
测试
测试以两种方式进行
- 使用Cargo
- 运行q测试脚本
1. 使用Cargo
在开始测试之前,您需要在端口5000上启动一个q进程
kdbplus]$ q -p 5000
q)
然后运行cargo test
kdbplus]$ cargo test
注意:目前文档中的api
示例有20个测试失败。这是因为在api
的本质上,示例没有main
函数,但仍然使用#[macro_use]
。
2. 运行q测试脚本
使用tests/test.q
执行测试,通过加载在api_examples
中构建的示例函数。
kdbplus]$ cargo build
kdbplus]$ cp target/debug/libapi_examples.so tests/
kdbplus]$ cd tests
tests]$ q test.q
Initialized something, probably it is your mindset.
bool: true
bool: false
byte: 0xc4
GUID: 8c6b-8b-64-68-156084
short: 10
int: 42
int: 122
int: 7336
int: 723
int: 14240
int: 2056636
long: -109210
long: 43200123456789
long: -325389000000021
long: 0
real: 193810.31
float: -37017.09330000
float: 742.41927468
char: "k"
symbol: `locust
string: "gnat"
string: "grasshopper"
error: type
What do you see, son of man?: a basket of summer fruit
What do you see, son of man?: boiling pot, facing away from the north
symbol: `rust
success!
FYI: type
this is KNULL
Planet { name: "earth", population: 7500000000, water: true }
Planet { name: "earth", population: 7500000000, water: true }
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
"Collect the clutter of apples!"
test result: ok. 147 passed; 0 failed
q)What are the three largest elements?: `belief`love`hope
使用此库的项目
- qrpc(gRPC客户端)
- q_comtrade(COMTRADE文件解析器)
文档
此crate的文档本身位于crates.io页面。
有关C API本身的详细信息,请查阅KX网站的文档。
许可证
库kdbplus
遵循Apache2.0许可证。
请参阅库中的许可证。
依赖项
~0.3–12MB
~151K SLoC