#redis-queue #rsmq #system #async-await #port #methods #original

rsmq_async

将 RSMQ 异步化到 Rust。RSMQ 是一个简单的 Redis 队列系统,适用于任何 Redis v2.4+。它包含了原始版本在 https://github.com/smrchy/rsmq 中的所有方法。

38 个稳定版本 (11 个主要版本)

12.0.0 2024年7月28日
11.2.0 2024年7月12日
11.1.0 2024年5月30日
10.0.0 2024年5月8日
1.0.6 2020年2月1日

#53异步

Download history 6443/week @ 2024-04-24 6373/week @ 2024-05-01 6213/week @ 2024-05-08 6257/week @ 2024-05-15 6380/week @ 2024-05-22 6631/week @ 2024-05-29 6348/week @ 2024-06-05 6198/week @ 2024-06-12 6249/week @ 2024-06-19 6256/week @ 2024-06-26 6401/week @ 2024-07-03 6159/week @ 2024-07-10 6104/week @ 2024-07-17 6627/week @ 2024-07-24 6140/week @ 2024-07-31 5908/week @ 2024-08-07

每月下载量 25,894
2 个 crate 中使用(通过 deepwell

MIT 许可证

66KB
1.5K SLoC

RSMQ 在异步 Rust 中

将 RSMQ 转换为异步 Rust。RSMQ 是一个简单的 Redis 队列系统,适用于任何 Redis v2.6+。它包含了原始版本在 https://github.com/smrchy/rsmq 中的所有方法。

此 crate 在实现中使用了异步。如果您想在同步代码中使用它,可以使用 tokio/async_std 的 "block_on" 方法。使用异步是为了简化代码并实现与 JS 代码的 1:1 转换。

Crates.io Crates.io dependency status Docs

示例


use rsmq_async::{Rsmq, RsmqError, RsmqConnection};

let mut rsmq = Rsmq::new(Default::default()).await?;

let message = rsmq.receive_message::<String>("myqueue", None).await?;

if let Some(message) = message {
    rsmq.delete_message("myqueue", &message.id).await?;
}


主要对象文档在: RsmqPooledRsmq,它们都实现了 RsmqConnection trait,在那里您可以查看所有 RSMQ 方法。请确保您始终导入 RsmqConnection trait。

安装

检查 https://crates.io/crates/rsmq_async

示例

use rsmq_async::{Rsmq, RsmqConnection};

async fn it_works() {
    let mut rsmq = Rsmq::new(Default::default())
        .await
        .expect("connection failed");

    rsmq.create_queue("myqueue", None, None, None)
        .await
        .expect("failed to create queue");

    rsmq.send_message("myqueue", "testmessage", None)
        .await
        .expect("failed to send message");

    let message = rsmq
        .receive_message::<String>("myqueue", None)
        .await
        .expect("cannot receive message");

    if let Some(message) = message {
        rsmq.delete_message("myqueue", &message.id).await;
    }
}

实时

初始化 RSMQ 时,您可以为新消息启用实时 PUBLISH。每当通过 sendMessage 将新消息发送到 RSQM 时,都会发出一个 Redis PUBLISH 命令到 {rsmq.ns}:rt:{qname}。因此,您可以使用 redis-rs 库直接订阅它。

如何使用实时选项

除了将新消息发送到RSMQ时的PUBLISH操作外,不会发生其他任何操作。您的应用程序可以使用Redis SUBSCRIBE命令来通知新消息,然后执行receiveMessage。但是,请确保不要使用多个工作进程通过SUBSCRIBE监听新消息,以防止同时执行多个receiveMessage调用。

同步选项

如果您启用了sync功能,您可以使用带有同步方法版本的RsmqSync对象进行导入。

时间精度

默认情况下,此库与JS版本保持兼容。如果您需要亚秒精度或发送许多非常接近的消息并需要以比一秒更高的精度跟踪它们,您可以在Cargo.toml中启用功能break-js-comp,如下所示:

rsmq_async = { version = "11", features = [ "break-js-comp" ] }

保证

如果您想实现“至少一次投递”保证,您需要使用“receive_message”接收消息,然后,一旦消息被成功处理,使用“delete_message”删除它。

连接池

如果您想使用连接池,只需使用PooledRsmq而不是Rsmq。它实现了与正常Rsmq相同的RsmqConnection特型。

如果您想接受任何一种实现,只需接受特型RsmqConnection

响应类型

有3个函数接受泛型类型

  • pop_messagereceive_message:接收消息的类型为RsmqMessage<E>,其中E: TryFrom<RedisBytes, Error = Vec<u8>>。因此,如果您有自定义类型,您可以实现针对YourCustomTypeTryFrom<RedisBytes>特型,并像这样使用它:rsmq.receive_message::<YourCustomType>("myqueue", None)。提供了对于StringVec<u8>的实现。
  • send_message,其中要发送的消息需要实现Into<RedisBytes> + Send。因此,您需要为您自己的类型实现特型。您可以检查RedisBytes类型的实现,看看我们是如何做到的。提供了对于String&strVec<u8>的实现。

所有这些原因都因为Rust中的字符串非常方便用于JSON消息,因此总是返回Vec可能不是最舒适的方法。但与此同时,我们只需为它添加一些现成的实现,您就可以使用您的类型,或者如果您正在发送图像等,只需使用如下方法:rsmq.receive_message::<Vec<u8>>("myqueue", None)之后将其转换为您的类型。(或者只需为您自己的类型实现TryFrom,转换将会自动进行。)

实现自定义类型的示例

impl TryFrom<RedisBytes> for String {

    // We sacrifice the ability of recovering the original error for the ability of having the
    // original data. If you know how to conserver both, let me know!
    
    type Error = Vec<u8>; // Always set Error as Vec<u8>;

    fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
        String::from_utf8(bytes.0).map_err(|e| e.into_bytes())
    }
}

依赖项

~6–18MB
~267K SLoC