71 个版本 (28 个破坏性更新)
新版本 0.29.2 | 2024年8月24日 |
---|---|
0.28.2 | 2024年6月13日 |
0.26.1 | 2024年1月29日 |
0.25.0 | 2023年10月24日 |
0.9.0 | 2023年3月29日 |
#68 在 数据库接口 中
每月下载量 866
用于 pgx_pgmq
82KB
1K SLoC
Postgres 消息队列 (PGMQ)
PGMQ 是一个轻量级的分布式消息队列。它类似于 AWS SQS 和 RSMQ,但它是 Postgres 的本地实现。
消息队列允许您解耦和连接微服务。以可扩展的方式在组件之间发送、存储和接收消息,而不会丢失消息或需要其他服务可用。
PGMQ 由 Tembo 创建。我们的目标是使整个 Postgres 生态系统对每个人开放。我们正在构建一个简化到极致的 Postgres 平台,旨在以开发者为中心且易于扩展。PGMQ 是该项目的组成部分。
该项目包含两个 API,一个是纯 Rust 客户端库,另一个是围绕 Postgres 扩展封装的 Rust SDK。
Postgres 扩展的 Rust 客户端
。这为您提供了类似于 ORM 的体验,并使得管理连接池、事务以及序列化和反序列化变得更加容易。
use pgmq::PGMQueueExt;
纯 Rust 客户端
。这提供了最小功能,但可以用于任何现有的 Postgres 实例。
use pgmq::PGMQueue;
快速入门
- 首先,您需要安装 Postgres。在这个例子中,我们使用容器。
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
git clone https://github.com/tembo-io/pgmq.git
cd pgmq-rs
cargo run --example basic
快速查看最小示例
use pgmq::{errors::PgmqError, Message, PGMQueue};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), PgmqError> {
// Initialize a connection to Postgres
println!("Connecting to Postgres");
let queue: PGMQueue = PGMQueue::new("postgres://postgres:[email protected]:5432".to_owned())
.await
.expect("Failed to connect to postgres");
// Create a queue
println!("Creating a queue 'my_queue'");
let my_queue = "my_example_queue".to_owned();
queue.create(&my_queue)
.await
.expect("Failed to create queue");
// Structure a message
#[derive(Serialize, Debug, Deserialize)]
struct MyMessage {
foo: String,
}
let message = MyMessage {
foo: "bar".to_owned(),
};
// Send the message
let message_id: i64 = queue
.send(&my_queue, &message)
.await
.expect("Failed to enqueue message");
// Use a visibility timeout of 30 seconds
// Once read, the message will be unable to be read
// until the visibility timeout expires
let visibility_timeout_seconds: i32 = 30;
// Read a message
let received_message: Message<MyMessage> = queue
.read::<MyMessage>(&my_queue, Some(&visibility_timeout_seconds))
.await
.unwrap()
.expect("No messages in the queue");
println!("Received a message: {:?}", received_message);
assert_eq!(received_message.msg_id, message_id);
// archive the messages
let _ = queue.archive(&my_queue, &received_message.msg_id)
.await
.expect("Failed to archive message");
println!("archived the messages from the queue");
Ok(())
}
事务
您可以在事务中执行PGMQ的所有操作,以及其他数据库操作。请参阅事务示例或使用以下命令运行示例:
cargo run --example transactions
发送消息
您可以使用queue.send()
一次发送一条消息,或使用queue.send_batch()
发送多条消息。这些方法可以传递任何实现了serde::Serialize
的类型。这意味着您可以以JSON或结构体的形式准备您的消息。
读取消息
读取消息将使其在可见性超时(vt)期间不可见(不可消费)。当队列空或所有消息都不可见时,不返回任何消息。
消息可以解析为serde_json::Value或结构体。 queue.read()
返回一个Result<Option<Message<T>>, PgmqError>
,其中T
是队列中消息的类型。当解析消息时出现问题时(PgmqError::JsonParsingError
)或PGMQ无法连接到postgres时(PgmqError::DatabaseError
),将返回错误。请注意,当解析为结构体时(例如,您期望MyMessage{foo: "bar"}
),并且消息的数据与结构体定义不对应(例如,{"hello": "world"}
),将返回错误,并且像上面的示例中那样解包此结果将导致panic,因此您更希望正确处理这种情况。
使用queue.read()
读取单个消息或使用queue.read_batch()
读取尽可能多的消息。
存档或删除消息
处理完消息后,从队列中删除它。您可以选择完全.delete()
,或者.archive()
消息。存档的消息将从队列中删除并插入到队列的存档表中。删除的消息只是被删除。
使用SQL读取队列存档中的消息
SELECT *
FROM pgmq.a_{your_queue_name};
许可证:PostgreSQL
依赖项
~37–50MB
~875K SLoC