#api #ipc #时间序列数据库 #进程 #共享 #kdb #数据库接口

kdbplus

为 API 提供双 q/kdb+ 接口,以构建共享库和 IPC 客户端

10 个版本

0.3.9 2024年6月29日
0.3.8 2022年12月21日
0.3.7 2022年2月5日
0.2.3 2021年10月17日
0.2.0 2021年7月30日

#72异步

Download history 25/week @ 2024-05-04 85/week @ 2024-05-11 138/week @ 2024-05-18 63/week @ 2024-05-25 158/week @ 2024-06-01 76/week @ 2024-06-08 117/week @ 2024-06-15 47/week @ 2024-06-22 357/week @ 2024-06-29 102/week @ 2024-07-06 67/week @ 2024-07-13 58/week @ 2024-07-20 110/week @ 2024-07-27 74/week @ 2024-08-03 202/week @ 2024-08-10 143/week @ 2024-08-17

543 每月下载量

Apache-2.0

470KB
6K SLoC

Rust 双接口用于 q/kdb+

由于 Rust 是一种性能和类型安全度都很高的流行编程语言,人们希望用它与仍然非常流行的时序数据库 kdb+ 一起使用。这种愿望是可以理解的,因为我们知道 kdb+ 很快,它的接口或共享库也应该很快。这个接口就是为了满足这种自然的需求而创建的,而且用户使用起来不会有任何痛苦。q/kdb+ C API 中著名的神秘函数名并不是 Rust 开发者的兴趣所在。

"给我们一个 Rust 接口!!"

这就是你的选择。

该接口提供两个功能

  • IPC 接口(q/kdb+ 进程的 Rust 客户端)
  • API(为 q/kdb+ 构建共享库)

下面是每个功能的详细描述。

q/kdb+ 的 Rust IPC 接口

由于 Rust 是为了解决 C/C++ 的类型不安全而设计的,如果可能的话,用 Rust 替换 C/C++ 是可行的。这个接口旨在用作 q/kdb+ 进程的 Rust 客户端,它发送查询并接收响应。对 kdb+ 的查询支持两种方式

消息的压缩/解压缩也遵循 kdb+ 实现

至于连接方法,由于 q/kdb+ 的协议,通常客户端接口不提供监听器。但是,有时 Rust 进程正在连接到上游,而 q/kdb+ 随后启动或更频繁地重启。然后提供监听器方法是一个自然的方向,这里已经实现了。以下方式支持连接到 kdb+

  • TCP
  • TLS
  • Unix 域套接字

此外,为了提高互操作性,提供了一些铸造、获取和设置方法。

环境变量

此crate使用q-native或crate特定的环境变量。

  • KDBPLUS_ACCOUNT_FILE:一个凭证文件路径,用于接收者加载以管理来自q客户端的访问。此文件每行包含一个用户名和SHA-1散列密码,由':'分隔,没有任何空格。例如,包含两个凭证的文件"mattew:oracle""reluctant:slowday"看起来像这样

    mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
    reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
    

    可以使用q中的函数.Q.sha1生成散列密码

    q).Q.sha1 "slowday"
    0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
    
  • KDBPLUS_TLS_KEY_FILEKDBPLUS_TLS_KEY_FILE_SECRET:TLS接收者使用的pkcs12文件及其密码。

  • QUDSPATH(可选):定义抽象命名空间的q-native环境变量。此环境变量也被UDS接收者使用。如果定义了此环境变量,抽象命名空间将是@${QUDSPATH}/kx.[服务器进程端口];否则,它将是@/tmp/kx.[服务器进程端口]

注意

  • 消息将以OS本地端序发送。
  • 当使用此crate作为TLS客户端时,您需要在q侧设置两个环境变量KX_SSL_CERT_FILEKX_SSL_KEY_FILE,以便q/kdb+作为TLS服务器工作。有关详细信息,请参阅KX网站

类型映射

所有类型都表示为K结构体,它与api模块中的K结构体非常相似,但其结构针对IPC使用和交互的便利性进行了优化。下表显示了每个q类型用于构建K对象的输入类型。请注意,输入类型可能与内部类型不同。例如,时间戳的输入类型为chrono::DateTime<Utc>,但其内部类型是表示自2000.01.01D00:00:00以来经过的纳秒数的i64

q Rust
bool bool
GUID [u8; 16]
byte u8
short i16
int i32
long i64
real f32
float f64
char char
symbol String
timestamp chrono::DateTime<Utc>
month chrono::NaiveDate
date chrono::NaiveDate
datetime chrono::DateTime<Utc>
timespan chrono::Duration
minute chrono::Duration
second chrono::Duration
time chrono::Duration
list Vec<Item>Item是上述相应的类型)
复合列表 Vec<K>
表格 Vec<K>
字典 Vec<K>
()

示例

客户端

use kdbplus::ipc::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {

    // Connect to qprocess running on localhost:5000 via UDS
    let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    result = socket.send_sync_message(&"collatz[`a]").await?;
    println!("collatz[`a]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Modify query to (`collatz; 20)
    message.pop().unwrap();
    message.push(&K::new_long(20)).unwrap();
    result=socket.send_sync_message(&message).await?;
    println!("collatz[20]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}

监听器

use kdbplus::ipc::*;

#[tokio::main]
async fn main() -> Result<()> {

  // Start listenening over TCP at the port 7000 with authentication enabled.
  let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;

  // Send a query with the socket.
  let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
  println!("Greeting: {}", greeting);

  socket_tcp.shutdown().await?;

  Ok(())
}

然后,一个客户端可以通过acceptor的主机、端口和在KDBPLUS_ACCOUNT_FILE中配置的凭据来连接到这个acceptor

q)h:hopen `::7000:reluctant:slowday

安装

Cargo.toml中将kdbplus用作库名称,并使用"ipc"功能。

[dependencies]
kdbplus={version="^0.3", features=["ipc"]}

q/kdb+ C API的Rust包装器

编程语言q(kdb+是用q编写的数据库)只提供C API,但有时外部库提供Rust接口而不是C/C++接口。从其性能来看,Rust仍然可以构建kdb+的共享库。此库旨在满足这种自然需求(如果你愿意,可以称之为愿望)。由于除了创建这样的包装器之外别无他法来编写kdb+的共享库,因此提供此包装器可能是合理的,并且在这里已经实现了。

为了避免编写过大的unsafe块,从而导致优化不良,大多数本地C API函数都提供了一个包装函数,该函数具有一些符合人体工程学的安全性,并以直观的实现作为特性和方法。唯一的例外是使用省略号(...)作为其参数的knkk。这些函数在native命名空间中提供,与其它C API函数一起。

注意:此库旨在用于构建共享库;因此,删除了一些无关函数。例如,连接到kdb+的连接函数khpu不包括在内。

安装

Cargo.toml中将kdbplus用作库名称,并使用"api"功能。

[dependencies]
kdbplus={version="^0.3", features=["api"]}

示例

使用C API包装器的示例包括在api_examples文件夹中。这些示例与kdbplus::api模块中的示例相对应,并且这些函数也用于库的简单测试。测试是在tests/下的test.q中通过加载从示例构建的共享库中定义的函数来进行的。

以下是一些示例

C API样式

use kdbplus::qtype;
use kdbplus::api::*;
use kdbplus::api::native::*;

#[no_mangle]
pub extern "C" fn create_symbol_list(_: K) -> K {
    unsafe{
        let mut list=ktn(qtype::SYMBOL_LIST as i32, 0);
        js(&mut list, ss(str_to_S!("Abraham")));
        js(&mut list, ss(str_to_S!("Isaac")));
        js(&mut list, ss(str_to_S!("Jacob")));
        js(&mut list, sn(str_to_S!("Josephine"), 6));
        list
    }
}
 
#[no_mangle]
pub extern "C" fn catchy(func: K, args: K) -> K {
    unsafe{
        let result=ee(dot(func, args));
        if (*result).qtype == qtype::ERROR{
            println!("error: {}", S_to_str((*result).value.symbol));
            // Decrement reference count of the error object
            r0(result);
            KNULL
        } else {
            result
        }
    }
}

#[no_mangle]
pub extern "C" fn dictionary_list_to_table() -> K {
    unsafe{
        let dicts = knk(3);
        let dicts_slice = dicts.as_mut_slice::<K>();
        for i in 0..3 {
            let keys = ktn(qtype::SYMBOL_LIST as i32, 2);
            let keys_slice = keys.as_mut_slice::<S>();
            keys_slice[0] = ss(str_to_S!("a"));
            keys_slice[1] = ss(str_to_S!("b"));
            let values = ktn(qtype::INT_LIST as i32, 2);
            values.as_mut_slice::<I>()[0..2].copy_from_slice(&[i*10, i*100]);
            dicts_slice[i as usize] = xD(keys, values);
        }
        // Format list of dictionary as a table.
        // ([] a: 0 10 20i; b: 0 100 200i)
        k(0, str_to_S!("{[dicts] -1 _ dicts, (::)}"), dicts, KNULL)
    } 
}

q可以使用这些函数像这样

q)summon:`libc_api_examples 2: (`create_symbol_list; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)`Abraham`Isaac`Jacob`Joseph ~ summon[]
q)catchy: `libc_api_examples 2: (`catchy; 2);
q)catchy[$; ("J"; "42")]
42
q)catchy[+; (1; `a)]
error: type
q)behold: `libc_api_examples 2: (`dictionary_list_to_table; 1);
q)behold[]
a  b  
------
0  0  
10 100
20 200

Rust样式

以下示例中没有unsafe代码。您可以看到包装函数在代码中的舒适度。

use kdbplus::qtype;
use kdbplus::api::*;
use kdbplus::api::native::*;

#[no_mangle]
pub extern "C" fn create_symbol_list2(_: K) -> K {
    let mut list = new_list(qtype::SYMBOL_LIST, 0);
    list.push_symbol("Abraham").unwrap();
    list.push_symbol("Isaac").unwrap();
    list.push_symbol("Jacob").unwrap();
    list.push_symbol_n("Josephine", 6).unwrap();
    list
}

#[no_mangle]
fn no_panick(func: K, args: K) -> K {
    let result = error_to_string(apply(func, args));
    if let Ok(error) = result.get_error_string() {
        println!("FYI: {}", error);
        // Decrement reference count of the error object which is no longer used.
        decrement_reference_count(result);
        KNULL
    }
    else{
        println!("success!");
        result
    }
}

#[no_mangle]
pub extern "C" fn create_table2(_: K) -> K {
    // Build keys
    let keys = new_list(qtype::SYMBOL_LIST, 2);
    let keys_slice = keys.as_mut_slice::<S>();
    keys_slice[0] = enumerate(str_to_S!("time"));
    keys_slice[1] = enumerate_n(str_to_S!("temperature_and_humidity"), 11);

    // Build values
    let values = new_list(qtype::COMPOUND_LIST, 2);
    let time = new_list(qtype::TIMESTAMP_LIST, 3);
    // 2003.10.10D02:24:19.167018272 2006.05.24D06:16:49.419710368 2008.08.12D23:12:24.018691392
    time.as_mut_slice::<J>().copy_from_slice(&[119067859167018272_i64, 201766609419710368, 271897944018691392]);
    let temperature = new_list(qtype::FLOAT_LIST, 3);
    temperature.as_mut_slice::<F>().copy_from_slice(&[22.1_f64, 24.7, 30.5]);
    values.as_mut_slice::<K>().copy_from_slice(&[time, temperature]);
    
    flip(new_dictionary(keys, values))
}

以下是q代码

q)summon:`libc_api_examples 2: (`create_symbol_list2; 1)
q)summon[]
`Abraham`Isaac`Jacob`Joseph
q)chill: `libc_api_examples 2: (`no_panick; 2);
q)chill[$; ("J"; "42")]
success!
42
q)chill[+; (1; `a)]
FYI: type
q)climate_change: libc_api_examples 2: (`create_table2; 1);
q)climate_change[]
time                          temperature
-----------------------------------------
2003.10.10D02:24:19.167018272 22.1       
2006.05.24D06:16:49.419710368 24.7       
2008.08.12D23:12:24.018691392 30.5  

测试

测试以两种方式进行

  1. 使用Cargo
  2. 运行q测试脚本
1. 使用Cargo

在开始测试之前,您需要在端口5000上启动一个q进程

kdbplus]$ q -p 5000
q)

然后运行cargo test

kdbplus]$ cargo test

注意:目前文档中的api示例有20个测试失败。这是因为在api的本质上,示例没有main函数,但仍然使用#[macro_use]

2. 运行q测试脚本

使用tests/test.q执行测试,通过加载在api_examples中构建的示例函数。

kdbplus]$ cargo build
kdbplus]$ cp target/debug/libapi_examples.so tests/
kdbplus]$ cd tests
tests]$ q test.q
Initialized something, probably it is your mindset.
bool: true
bool: false
byte: 0xc4
GUID: 8c6b-8b-64-68-156084
short: 10
int: 42
int: 122
int: 7336
int: 723
int: 14240
int: 2056636
long: -109210
long: 43200123456789
long: -325389000000021
long: 0
real: 193810.31
float: -37017.09330000
float: 742.41927468
char: "k"
symbol: `locust
string: "gnat"
string: "grasshopper"
error: type
What do you see, son of man?: a basket of summer fruit
What do you see, son of man?: boiling pot, facing away from the north
symbol: `rust
success!
FYI: type
this is KNULL
Planet { name: "earth", population: 7500000000, water: true }
Planet { name: "earth", population: 7500000000, water: true }
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
おいしい!
"Collect the clutter of apples!"
test result: ok. 147 passed; 0 failed
q)What are the three largest elements?: `belief`love`hope

使用此库的项目

文档

此crate的文档本身位于crates.io页面

有关C API本身的详细信息,请查阅KX网站的文档。

许可证

kdbplus遵循Apache2.0许可证。

请参阅库中的许可证

依赖项

~0.3–12MB
~151K SLoC