#message-queue #queue #postgresql #messaging #amazon-sqs #decoupling

pgmq

Postgres 上为 Rust 应用提供的分布式消息队列

71 个版本 (28 个破坏性更新)

新版本 0.29.2 2024年8月24日
0.28.2 2024年6月13日
0.26.1 2024年1月29日
0.25.0 2023年10月24日
0.9.0 2023年3月29日

#68数据库接口

Download history 243/week @ 2024-05-04 125/week @ 2024-05-11 379/week @ 2024-05-18 78/week @ 2024-05-25 536/week @ 2024-06-01 323/week @ 2024-06-08 128/week @ 2024-06-15 116/week @ 2024-06-22 128/week @ 2024-06-29 95/week @ 2024-07-06 105/week @ 2024-07-13 150/week @ 2024-07-20 136/week @ 2024-07-27 395/week @ 2024-08-03 139/week @ 2024-08-10 168/week @ 2024-08-17

每月下载量 866
用于 pgx_pgmq

PostgreSQL

82KB
1K SLoC

Postgres 消息队列 (PGMQ)

Latest Version

PGMQ 是一个轻量级的分布式消息队列。它类似于 AWS SQSRSMQ,但它是 Postgres 的本地实现。

消息队列允许您解耦和连接微服务。以可扩展的方式在组件之间发送、存储和接收消息,而不会丢失消息或需要其他服务可用。

PGMQ 由 Tembo 创建。我们的目标是使整个 Postgres 生态系统对每个人开放。我们正在构建一个简化到极致的 Postgres 平台,旨在以开发者为中心且易于扩展。PGMQ 是该项目的组成部分。

该项目包含两个 API,一个是纯 Rust 客户端库,另一个是围绕 Postgres 扩展封装的 Rust SDK。

Postgres 扩展的 Rust 客户端。这为您提供了类似于 ORM 的体验,并使得管理连接池、事务以及序列化和反序列化变得更加容易。

use pgmq::PGMQueueExt;

纯 Rust 客户端。这提供了最小功能,但可以用于任何现有的 Postgres 实例。

use pgmq::PGMQueue;

快速入门

  • 首先,您需要安装 Postgres。在这个例子中,我们使用容器。
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
git clone https://github.com/tembo-io/pgmq.git

cd pgmq-rs

cargo run --example basic

快速查看最小示例

use pgmq::{errors::PgmqError, Message, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), PgmqError> {

    // Initialize a connection to Postgres
    println!("Connecting to Postgres");
    let queue: PGMQueue = PGMQueue::new("postgres://postgres:[email protected]:5432".to_owned())
        .await
        .expect("Failed to connect to postgres");

    // Create a queue
    println!("Creating a queue 'my_queue'");
    let my_queue = "my_example_queue".to_owned();
    queue.create(&my_queue)
        .await
        .expect("Failed to create queue");

    // Structure a message
    #[derive(Serialize, Debug, Deserialize)]
    struct MyMessage {
        foo: String,
    }
    let message = MyMessage {
        foo: "bar".to_owned(),
    };
    // Send the message
    let message_id: i64 = queue
        .send(&my_queue, &message)
        .await
        .expect("Failed to enqueue message");

    // Use a visibility timeout of 30 seconds
    // Once read, the message will be unable to be read
    // until the visibility timeout expires
    let visibility_timeout_seconds: i32 = 30;

    // Read a message
    let received_message: Message<MyMessage> = queue
        .read::<MyMessage>(&my_queue, Some(&visibility_timeout_seconds))
        .await
        .unwrap()
        .expect("No messages in the queue");
    println!("Received a message: {:?}", received_message);

    assert_eq!(received_message.msg_id, message_id);

    // archive the messages
    let _ = queue.archive(&my_queue, &received_message.msg_id)
        .await
        .expect("Failed to archive message");
    println!("archived the messages from the queue");
    Ok(())

}

事务

您可以在事务中执行PGMQ的所有操作,以及其他数据库操作。请参阅事务示例或使用以下命令运行示例:

cargo run --example transactions

发送消息

您可以使用queue.send()一次发送一条消息,或使用queue.send_batch()发送多条消息。这些方法可以传递任何实现了serde::Serialize的类型。这意味着您可以以JSON或结构体的形式准备您的消息。

读取消息

读取消息将使其在可见性超时(vt)期间不可见(不可消费)。当队列空或所有消息都不可见时,不返回任何消息。

消息可以解析为serde_json::Value或结构体。 queue.read()返回一个Result<Option<Message<T>>, PgmqError>,其中T是队列中消息的类型。当解析消息时出现问题时(PgmqError::JsonParsingError)或PGMQ无法连接到postgres时(PgmqError::DatabaseError),将返回错误。请注意,当解析为结构体时(例如,您期望MyMessage{foo: "bar"}),并且消息的数据与结构体定义不对应(例如,{"hello": "world"}),将返回错误,并且像上面的示例中那样解包此结果将导致panic,因此您更希望正确处理这种情况。

使用queue.read()读取单个消息或使用queue.read_batch()读取尽可能多的消息。

存档或删除消息

处理完消息后,从队列中删除它。您可以选择完全.delete(),或者.archive()消息。存档的消息将从队列中删除并插入到队列的存档表中。删除的消息只是被删除。

使用SQL读取队列存档中的消息

SELECT *
FROM pgmq.a_{your_queue_name};

许可证:PostgreSQL

依赖项

~37–50MB
~875K SLoC