#api #ipc #kdb #shared #embedding #process #build

kxkdb

API 构建共享库和 IPC 客户端的 q/kdb+ 双向接口

3 个不稳定版本

0.1.0 2023年6月5日
0.0.1 2023年2月1日
0.0.0 2023年1月24日

#2 in #kdb

Apache-2.0

470KB
6K SLoC

kxkdb

为 Rust 编程语言提供的 kdb+ 接口。

此接口包含两个功能

  • IPC:通过 IPC 连接 Rust 和 kdb+ 进程
  • API:在 kdb+ 进程中嵌入 Rust 代码

文档

此接口的文档可以在 https://docs.rs/kxkdb/ 找到。

Kdbplus

kxkdb 接口是从由 kdbplus 开发的出色的接口 fork 的,该接口由 diamondrod 开发。

IPC

IPC 功能使 Rust 和 kdb+ 之间可以通过 qipc 通信。

连接性通过 TCP 或 Unix 域套接字实现,支持消息的 压缩TLS 加密

提供连接和监听方法,可以开发

  • kdb+ 服务器进程的 Rust IPC 客户端
  • kdb+ 客户端进程的 Rust IPC 服务器

安装

kxkdb 添加为依赖项,并使用功能 ipc。您可能还需要添加异步运行时,例如 Tokio

e.g.

[dependencies]
kxkdb = { version = "0.0", features = ["ipc"] }
tokio = { version = "1.24", features = ["full"] }

示例

客户端

use kxkdb::ipc::*;
use kxkdb::qattribute;

#[tokio::main]
async fn main() -> Result<()> {
    let mut socket;  // socket connection to kdb+ process
    let mut result;  // result of sync query to kdb+ process
    let mut message; // compound list containing message

    // connect via UDS to local kdb+ process listening on port 4321
    socket = QStream::connect(ConnectionMethod::UDS, "", 4321_u16, "").await?;

    // confirm connection type
    println!("Connection type: {}", socket.get_connection_type());

    // synchronously query kdb+ process using string
    result = socket.send_sync_message(&"sum 1+til 100").await?;
    println!("result1: {}", result);

    // asynchronously define function in kdb+ process
    socket.send_async_message(&"add_one:{x+1}").await?;
   
    // synchronously call function (correctly)
    result = socket.send_sync_message(&"add_one 41").await?;
    println!("result2: {}", result);
   
    // synchronously call function (incorrectly)
    result = socket.send_sync_message(&"add_one`41").await?;
    println!("result3: {}", result);

    // synchronously query kdb+ process using compound list
    message = K::new_compound_list(vec![K::new_symbol(String::from("add_one")), K::new_long(100)]);
    result = socket.send_sync_message(&message).await?;
    println!("result4: {}", result);
    
    // asynchronously call show function in kdb+ process
    message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("hello from rust"))]);
    socket.send_async_message(&message).await?;

    // close socket
    socket.shutdown().await?;

    Ok(())
}

服务器

设置一个包含用户名和(SHA-1 加密)密码的凭证文件。

e.g.

$ cat userpass.txt
fred:e962cde7053eed120f928cd18e58ebd31be77543
homer:df43ad44d44e898f8f4e6ed91e6952bfce573e12

注意:在 q 中可以使用 .Q.sha1 生成散列密码。

将此文件的路径存储在环境变量 KDBPLUS_ACCOUNT_FILE 中。

e.g.

$ export KDBPLUS_ACCOUNT_FILE=`pwd`/userpass.txt

以下代码将建立一个监听端口 4321 的 Rust 服务器进程。

use kxkdb::ipc::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut socket;   // socket connection to kdb+ process

    // listen for incoming TCP connections on port 4321
    socket = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 4321).await?;

    // when a connection is established, synchronously send a message to the client
    let response = socket.send_sync_message(&"0N!string `Hello").await?;
    println!("result: {}", response);

    // close socket
    socket.shutdown().await?;

    Ok(())
}

然后 kdb+ 客户端可以使用正确的凭证连接。

e.g.

q)hopen`:127.0.0.1:4321:fred:flintstone;
"Hello"

类型映射

以下表格显示了用于构建不同 q 类型的输入类型(实现为 K 对象)。

q Rust
布尔值 bool
guid [u8; 16]
byte u8
short i16
int i32
long i64
实数 f32
float f64
char char
symbol String
时间戳 chrono::DateTime<Utc>
月份 chrono::NaiveDate
日期 chrono::NaiveDate
datetime chrono::DateTime<Utc>
timespan chrono::Duration
分钟 chrono::Duration
chrono::Duration
时间 chrono::Duration
列表 Vec<T> (T 上面的对应类型)
复合列表 Vec<K>
Vec<K>
字典 Vec<K>
泛型空 ()

注意:输入类型可能与内部类型不同。例如,时间戳的输入类型为 chrono::DateTime<Utc>,但内部类型是 i64,表示自 2000.01.01D00:00:00 以来经过的纳秒数。

环境变量

KDBPLUS_ACCOUNT_FILE

凭证文件路径,由Rust服务器用于管理来自kdb+客户端的访问。

每行包含一个用户名和SHA-1散列密码,由 ':' 分隔。

KDBPLUS_TLS_KEY_FILE

用于TLS连接的pkcs12文件的路径。

KDBPLUS_TLS_KEY_FILE_SECRET

上述pkcs12文件的密码。

QUDSPATH

定义用于 Unix域套接字 的(实际或抽象)路径,用于 $QUDSPATH/kx.[PORT]

注意。如果未定义,则默认为 /tmp/kx.[PORT]

API

API功能允许在Rust中开发共享对象库,这些库可以 动态加载 到kdb+。

为了避免大的 unsafe 块,大多数本地C API函数都提供了一个包装函数,并以直观的实现作为trait方法。例外的是变长函数 knkk,这些函数与其他C API函数一起在 native 命名空间中提供。

安装

kxkdb 添加为依赖项,带有 api 功能。

[dependencies]
kxkdb={version="0.0", features=["api"]}

示例

C API样式

use kxkdb::qtype;
use kxkdb::api::*;
use kxkdb::api::native::*;

#[no_mangle]
pub extern "C" fn create_symbol_list(_: K) -> K {
    unsafe{
        let mut list=ktn(qtype::SYMBOL_LIST as i32, 0);
        js(&mut list, ss(str_to_S!("Abraham")));
        js(&mut list, ss(str_to_S!("Isaac")));
        js(&mut list, ss(str_to_S!("Jacob")));
        js(&mut list, sn(str_to_S!("Josephine"), 6));
        list
    }
}
 
#[no_mangle]
pub extern "C" fn catchy(func: K, args: K) -> K {
    unsafe{
        let result=ee(dot(func, args));
        if (*result).qtype == qtype::ERROR{
            println!("error: {}", S_to_str((*result).value.symbol));
            // Decrement reference count of the error object
            r0(result);
            KNULL
        } else {
            result
        }
    }
}

#[no_mangle]
pub extern "C" fn dictionary_list_to_table() -> K {
    unsafe{
        let dicts = knk(3);
        let dicts_slice = dicts.as_mut_slice::<K>();
        for i in 0..3 {
            let keys = ktn(qtype::SYMBOL_LIST as i32, 2);
            let keys_slice = keys.as_mut_slice::<S>();
            keys_slice[0] = ss(str_to_S!("a"));
            keys_slice[1] = ss(str_to_S!("b"));
            let values = ktn(qtype::INT_LIST as i32, 2);
            values.as_mut_slice::<I>()[0..2].copy_from_slice(&[i*10, i*100]);
            dicts_slice[i as usize] = xD(keys, values);
        }
        // Format list of dictionary as a table.
        // ([] a: 0 10 20i; b: 0 100 200i)
        k(0, str_to_S!("{[dicts] -1 _ dicts, (::)}"), dicts, KNULL)
    } 
}

然后kdb+进程可以动态加载并调用这些函数,如下所示

q)summon:`libc_api_examples 2: (`create_symbol_list; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)`Abraham`Isaac`Jacob`Joseph ~ summon[]
q)catchy: `libc_api_examples 2: (`catchy; 2);
q)catchy[$; ("J"; "42")]
42
q)catchy[+; (1; `a)]
error: type
q)behold: `libc_api_examples 2: (`dictionary_list_to_table; 1);
q)behold[]
a  b  
------
0  0  
10 100
20 200

Rust样式

以下示例中未编写 unsafe 代码。

use kxkdb::qtype;
use kxkdb::api::*;
use kxkdb::api::native::*;

#[no_mangle]
pub extern "C" fn create_symbol_list2(_: K) -> K {
    let mut list = new_list(qtype::SYMBOL_LIST, 0);
    list.push_symbol("Abraham").unwrap();
    list.push_symbol("Isaac").unwrap();
    list.push_symbol("Jacob").unwrap();
    list.push_symbol_n("Josephine", 6).unwrap();
    list
}

#[no_mangle]
fn no_panick(func: K, args: K) -> K {
    let result = error_to_string(apply(func, args));
    if let Ok(error) = result.get_error_string() {
        println!("FYI: {}", error);
        // Decrement reference count of the error object which is no longer used.
        decrement_reference_count(result);
        KNULL
    }
    else{
        println!("success!");
        result
    }
}

#[no_mangle]
pub extern "C" fn create_table2(_: K) -> K {
    // Build keys
    let keys = new_list(qtype::SYMBOL_LIST, 2);
    let keys_slice = keys.as_mut_slice::<S>();
    keys_slice[0] = enumerate(str_to_S!("time"));
    keys_slice[1] = enumerate_n(str_to_S!("temperature_and_humidity"), 11);

    // Build values
    let values = new_list(qtype::COMPOUND_LIST, 2);
    let time = new_list(qtype::TIMESTAMP_LIST, 3);
    // 2003.10.10D02:24:19.167018272 2006.05.24D06:16:49.419710368 2008.08.12D23:12:24.018691392
    time.as_mut_slice::<J>().copy_from_slice(&[119067859167018272_i64, 201766609419710368, 271897944018691392]);
    let temperature = new_list(qtype::FLOAT_LIST, 3);
    temperature.as_mut_slice::<F>().copy_from_slice(&[22.1_f64, 24.7, 30.5]);
    values.as_mut_slice::<K>().copy_from_slice(&[time, temperature]);
    
    flip(new_dictionary(keys, values))
}

这里是q代码

q)summon:`libc_api_examples 2: (`create_symbol_list2; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)chill: `libc_api_examples 2: (`no_panick; 2);
q)chill[$; ("J"; "42")]
success!
42
q)chill[+; (1; `a)]
FYI: type
q)climate_change: libc_api_examples 2: (`create_table2; 1);
q)climate_change[]
time                          temperature
-----------------------------------------
2003.10.10D02:24:19.167018272 22.1       
2006.05.24D06:16:49.419710368 24.7       
2008.08.12D23:12:24.018691392 30.5  

测试

测试以两种方式进行

  1. 使用Cargo
  2. 运行一个q测试脚本

1. 使用Cargo

在开始测试之前,启动一个监听在端口5000的kdb+进程。

$ q -p 5000
q)

然后运行测试

kxkdb]$ cargo test

注意: 目前文档中的 api 示例有20个测试失败。这是因为在 api 的本质上没有 main 函数,但仍然使用了 #[macro_use]

2. 运行q测试脚本

使用 tests/test.q 进行测试,通过加载 api_examples 中构建的示例函数。

kxkdb]$ cargo build --release
kxkdb]$ cp target/release/libapi_examples.so tests/
kxkdb]$ cd tests
tests]$ q test.q
Initialized something, probably it is your mindset.
bool: true
bool: false
byte: 0xc4
GUID: 8c6b-8b-64-68-156084
short: 10
int: 42
int: 122
int: 7336
int: 723
int: 14240
int: 2056636
long: -109210
long: 43200123456789
long: -325389000000021
long: 0
real: 193810.31
float: -37017.09330000
float: 742.41927468
char: "k"
symbol: `locust
string: "gnat"
string: "grasshopper"
error: type
What do you see, son of man?: a basket of summer fruit
What do you see, son of man?: boiling pot, facing away from the north
symbol: `rust
success!
FYI: type
this is KNULL
Planet { name: "earth", population: 7500000000, water: true }
Planet { name: "earth", population: 7500000000, water: true }
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
"Collect the clutter of apples!"
test result: ok. 147 passed; 0 failed
q)What are the three largest elements?: `belief`love`hope

依赖项

~0.3–15MB
~160K SLoC