2 个不稳定版本

0.2.0 2024 年 3 月 1 日
0.1.0 2023 年 5 月 25 日

#522 in 编码

Download history 9/week @ 2024-04-07 71/week @ 2024-04-14 23/week @ 2024-04-21 98/week @ 2024-04-28 4/week @ 2024-05-05 19/week @ 2024-05-12 50/week @ 2024-05-19 16/week @ 2024-05-26 58/week @ 2024-06-02 22/week @ 2024-06-09 35/week @ 2024-06-16 26/week @ 2024-06-23 7/week @ 2024-06-30 20/week @ 2024-07-07 13/week @ 2024-07-14 60/week @ 2024-07-21

100 每月下载量

MIT 许可证

94KB
2K SLoC

Omniqueue

Omniqueue 是 Rust 队列后端的抽象层。它包括对 RabbitMQ、Redis 流和 SQS 的支持。

Omniqueue 提供了一个高级接口,允许通过 JSON 编码的字节序列发送和接收原始字节序列、任何 serde DeserializeSerialize 实现者,或者任何为你提供了编码和/或解码函数的任意类型。

它旨在灵活,能够适应现有的队列配置,但提供了一组默认设置,使发送和接收数据变得简单快捷。

如何使用 Omniqueue

具体的配置将取决于所使用的后端,但使用方法大致如下。

  1. omniqueue 添加到你的 Cargo.toml 文件中。默认情况下启用了所有后端,包括 RabbitMQ、Redis(通过它们的流类型)、SQS,以及基于 tokio 的 mpsc 通道的内存队列,后者非常适合测试。

    如果你只需要某些后端,则只需禁用默认功能,并启用所需的任何后端。

  2. 构建并使用你的队列。

    具体的配置类型将取决于你的后端,但通常非常简单,如下所示

    let cfg = SqsConfig {
        queue_dsn: "https://127.0.0.1:9324/queue/queue_name".to_owned(),
        override_endpoint: true,
    };
    let (producer, mut consumer) = SqsBackend::builder(cfg).build_pair().await?;
    
    producer.send_serde_json(&ExampleType::default()).await?;
    
    let delivery = c.receive().await?;
    assert_eq!(
        delivery.payload_serde_json::<ExampleType>().await?,
        Some(ExampleType::default())
    );
    
    delivery.ack().await?;
    

    返回的生产者和消费者实现了 QueueProducerQueueConsumer 特性。这意味着你可以使函数针对任何队列后端进行泛型化。或者,如果你需要动态分派,则只需在构建器中添加一行额外的代码即可

    let cfg = SqsConfig {
        queue_dsn: "https://127.0.0.1:9324/queue/queue_name".to_owned(),
        override_endpoint: true,
    };
    let (producer, mut consumer) = SqsBackend::builder(cfg)
        .make_dynamic()
        .build_pair()
        .await?;
    

编码器和解码器

该软件包的设计部分是将责任明确分开。队列的使用者应该无需关心任何特定项目在队列后端中的表示方式。相反,他们应该只允许在 Rust 类型中思考。

另一方面,定义要使用哪个后端的用户应该唯一关注将队列的内部表示转换为Rust类型的操作。

输入 CustomEncoderCustomDecoder:这些只是将常规Rust类型转换为队列后端输入或输出所需的类型的基本闭包或函数指针。

它们的定义和使用方法如下

#[derive(Debug, PartialEq)]
struct ExampleType {
	field: u8,
}


let (p, mut c) = RabbitMqBackend::builder(cfg)
	// RabbitMQ's internal representation is an arbitrary byte array.
	.with_encoder(|et: &ExampleType| -> omniqueue::Result<Vec<u8>> {
		Ok(vec![et.field])
	})
	.with_decoder(|v: &Vec<u8>| -> omniqueue::Result<ExampleType> {
		Ok(ExampleType {
			field: *v.first().unwrap_or(&0),
		})
	})
	.build_pair()
	.await?;

let payload = ExampleType { field: 2 };

p.send_custom(&payload).await?;

let delivery = c.receive().await?;
assert_eq!(d.payload_custom::<ExampleType>()?, Some(payload))

delivery.ack().await?;

假设您有正确类型的编码器和/或解码器,这些函数会自动调用。这使得将crate适配到内部数据布局与默认值不匹配的现有队列变得尽可能简单。

依赖关系

~6–23MB
~368K SLoC