6个版本

0.0.4 2023年2月8日
0.0.3-alpha.12023年2月8日
0.0.2 2023年2月7日
0.0.1-alpha2023年2月6日

1364数据库接口

每月21次 下载

MIT 许可证

19KB
289

Postgres消息队列

Postgres的一个轻量级消息队列扩展。提供类似AWS SQS和Redis Simple Message Queue的体验,但基于Postgres。

安装

TODO docker run ...

Python示例

连接到Postgres

import json
import pprint

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postrgres@localhost:28814/pgx_pgmq")

创建和列出队列

with engine.connect() as con:
    # create a queue
    created = con.execute(text( "select * from pgmq_create('myqueue');"))
    # list queues
    list_queues = con.execute(text( "select * from pgmq_list_queues()"))
    column_names = list_queues.keys()
    rows = list_queues.fetchall()
    print("### Queues ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Queues ###'
{'created_at': datetime.datetime(2023, 2, 7, 2, 5, 39, 946356, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'queue_name': 'myqueue'}

向队列发送消息

with engine.connect() as con:
    # send a message
    msg = json.dumps({"yolo": 42})
    msg_id = con.execute(text(f"select * from pgmq_send('x', '{msg}') as msg_id;"))
    column_names = msg_id.keys()
    rows = msg_id.fetchall()
    print("### Message ID ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message ID ###'
{'msg_id': 1}

从队列读取消息

with engine.connect() as con:
    # read a message, make it unavailable to be read again for 5 seconds
    read = con.execute(text("select * from pgmq_read('x', 5);"))
    column_names = read.keys()
    rows = read.fetchall()
    print("### Read Message ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Read Message ###'
{'enqueued_at': datetime.datetime(2023, 2, 7, 2, 51, 50, 468837, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800))),
 'message': {'myqueue': 42},
 'msg_id': 1,
 'read_ct': 1,
 'vt': datetime.datetime(2023, 2, 7, 16, 9, 4, 826669, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=64800)))}

从队列删除消息

with engine.connect() as con:
    # delete a message
    deleted = con.execute(text("select pgmq_delete('x', 1);"))
    column_names = deleted.keys()
    rows = deleted.fetchall()
    print("### Message Deleted ###")
    for row in rows:
        pprint.pprint(dict(zip(column_names, row)))
'### Message Deleted ###'
{'pgmq_delete': True}

SQL示例

CREATE EXTENSION pgmq;

创建队列

SELECT pgmq_create('my_queue');

 pgmq_create
-------------

发送消息

pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar"}');
 pgmq_send
--------------
            1

读取消息

从队列中读取一条消息。使其在30秒内不可见。

pgmq=# SELECT * from pgmq_read('my_queue', 30);

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}

如果队列为空,或者所有消息当前都不可见,则不会返回任何行。

pgx_pgmq=# SELECT * from pgmq_read('my_queue', 30);
 msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------

弹出消息

读取一条消息并立即从队列中删除它。如果队列为空,则返回 None

pgmq=# SELECT * from pgmq_pop('my_queue');

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar"}

存档消息

存档消息将将其从队列中删除,并插入到存档表中。TODO

删除消息

从名为 my_queue 的队列中删除ID为 1 的消息。

pgmq=# select pgmq_delete('my_queue', 1);
 pgmq_delete
-------------
 t

开发

设置 pgx

cargo install --locked cargo-pgx
cargo pgx init

然后,克隆此存储库并切换到该目录。

git clone [email protected]:CoreDB-io/coredb.git
cd coredb/extensions/pgmq/

运行开发环境

cargo pgx run pg14

打包

运行此脚本以打包成 .deb 文件,该文件可以安装在Ubuntu上。

/bin/bash build-extension.sh

依赖关系

~28–43MB
~849K SLoC