2 个不稳定版本
0.2.0 | 2024 年 3 月 1 日 |
---|---|
0.1.0 | 2023 年 5 月 25 日 |
#522 in 编码
100 每月下载量
94KB
2K SLoC
Omniqueue
Omniqueue 是 Rust 队列后端的抽象层。它包括对 RabbitMQ、Redis 流和 SQS 的支持。
Omniqueue 提供了一个高级接口,允许通过 JSON 编码的字节序列发送和接收原始字节序列、任何 serde
Deserialize
和 Serialize
实现者,或者任何为你提供了编码和/或解码函数的任意类型。
它旨在灵活,能够适应现有的队列配置,但提供了一组默认设置,使发送和接收数据变得简单快捷。
如何使用 Omniqueue
具体的配置将取决于所使用的后端,但使用方法大致如下。
-
将
omniqueue
添加到你的Cargo.toml
文件中。默认情况下启用了所有后端,包括 RabbitMQ、Redis(通过它们的流类型)、SQS,以及基于tokio
的 mpsc 通道的内存队列,后者非常适合测试。如果你只需要某些后端,则只需禁用默认功能,并启用所需的任何后端。
-
构建并使用你的队列。
具体的配置类型将取决于你的后端,但通常非常简单,如下所示
let cfg = SqsConfig { queue_dsn: "https://127.0.0.1:9324/queue/queue_name".to_owned(), override_endpoint: true, }; let (producer, mut consumer) = SqsBackend::builder(cfg).build_pair().await?; producer.send_serde_json(&ExampleType::default()).await?; let delivery = c.receive().await?; assert_eq!( delivery.payload_serde_json::<ExampleType>().await?, Some(ExampleType::default()) ); delivery.ack().await?;
返回的生产者和消费者实现了
QueueProducer
和QueueConsumer
特性。这意味着你可以使函数针对任何队列后端进行泛型化。或者,如果你需要动态分派,则只需在构建器中添加一行额外的代码即可let cfg = SqsConfig { queue_dsn: "https://127.0.0.1:9324/queue/queue_name".to_owned(), override_endpoint: true, }; let (producer, mut consumer) = SqsBackend::builder(cfg) .make_dynamic() .build_pair() .await?;
编码器和解码器
该软件包的设计部分是将责任明确分开。队列的使用者应该无需关心任何特定项目在队列后端中的表示方式。相反,他们应该只允许在 Rust 类型中思考。
另一方面,定义要使用哪个后端的用户应该唯一关注将队列的内部表示转换为Rust类型的操作。
输入 CustomEncoder
和 CustomDecoder
:这些只是将常规Rust类型转换为队列后端输入或输出所需的类型的基本闭包或函数指针。
它们的定义和使用方法如下
#[derive(Debug, PartialEq)]
struct ExampleType {
field: u8,
}
let (p, mut c) = RabbitMqBackend::builder(cfg)
// RabbitMQ's internal representation is an arbitrary byte array.
.with_encoder(|et: &ExampleType| -> omniqueue::Result<Vec<u8>> {
Ok(vec![et.field])
})
.with_decoder(|v: &Vec<u8>| -> omniqueue::Result<ExampleType> {
Ok(ExampleType {
field: *v.first().unwrap_or(&0),
})
})
.build_pair()
.await?;
let payload = ExampleType { field: 2 };
p.send_custom(&payload).await?;
let delivery = c.receive().await?;
assert_eq!(d.payload_custom::<ExampleType>()?, Some(payload))
delivery.ack().await?;
假设您有正确类型的编码器和/或解码器,这些函数会自动调用。这使得将crate适配到内部数据布局与默认值不匹配的现有队列变得尽可能简单。
依赖关系
~6–23MB
~368K SLoC