6个版本
0.0.4 | 2023年2月8日 |
---|---|
0.0.3-alpha.1 | 2023年2月8日 |
0.0.2 | 2023年2月7日 |
0.0.1-alpha | 2023年2月6日 |
1364 在 数据库接口 中
每月21次 下载
19KB
289 行
Postgres消息队列
Postgres的一个轻量级消息队列扩展。提供类似AWS SQS和Redis Simple Message Queue的体验,但基于Postgres。
安装
TODO docker run ...
Python示例
连接到Postgres
import json
import pprint
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postrgres@localhost:28814/pgx_pgmq")
创建和列出队列
with engine.connect() as con:
# create a queue
created = con.execute(text( "select * from pgmq_create('myqueue');"))
# list queues
list_queues = con.execute(text( "select * from pgmq_list_queues()"))
column_names = list_queues.keys()
rows = list_queues.fetchall()
print("### Queues ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
'### Queues ###'
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
'queue_name': 'myqueue'}
向队列发送消息
with engine.connect() as con:
# send a message
msg = json.dumps({"yolo": 42})
msg_id = con.execute(text(f"select * from pgmq_send('x', '{msg}') as msg_id;"))
column_names = msg_id.keys()
rows = msg_id.fetchall()
print("### Message ID ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
'### Message ID ###'
{'msg_id': 1}
从队列读取消息
with engine.connect() as con:
# read a message, make it unavailable to be read again for 5 seconds
read = con.execute(text("select * from pgmq_read('x', 5);"))
column_names = read.keys()
rows = read.fetchall()
print("### Read Message ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
'### Read Message ###'
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
'message': {'myqueue': 42},
'msg_id': 1,
'read_ct': 1,
'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}
从队列删除消息
with engine.connect() as con:
# delete a message
deleted = con.execute(text("select pgmq_delete('x', 1);"))
column_names = deleted.keys()
rows = deleted.fetchall()
print("### Message Deleted ###")
for row in rows:
pprint.pprint(dict(zip(column_names, row)))
'### Message Deleted ###'
{'pgmq_delete': True}
SQL示例
CREATE EXTENSION pgmq;
创建队列
SELECT pgmq_create('my_queue');
pgmq_create
-------------
发送消息
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
pgmq_send
--------------
1
读取消息
从队列中读取一条消息。使其在30秒内不可见。
pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
如果队列为空,或者所有消息当前都不可见,则不会返回任何行。
pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30);
msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------
弹出消息
读取一条消息并立即从队列中删除它。如果队列为空,则返回 None
。
pgmq=# SELECT * from pgmq_pop('my_queue');
msg_id | read_ct | vt | enqueued_at | message
--------+---------+-------------------------------+-------------------------------+---------------
1 | 2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}
存档消息
存档消息将将其从队列中删除,并插入到存档表中。TODO
删除消息
从名为 my_queue
的队列中删除ID为 1
的消息。
pgmq=# select pgmq_delete('my_queue', 1);
pgmq_delete
-------------
t
开发
设置 pgx
。
cargo install --locked cargo-pgx
cargo pgx init
然后,克隆此存储库并切换到该目录。
git clone [email protected]:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/
运行开发环境
cargo pgx run pg14
打包
运行此脚本以打包成 .deb
文件,该文件可以安装在Ubuntu上。
/bin/bash build-extension.sh
依赖关系
~28–43MB
~849K SLoC