2个版本
0.1.1 | 2024年6月16日 |
---|---|
0.1.0 | 2024年5月9日 |
#1113 in 网络编程
22KB
223 行
戈佐
NATS消息调度器。
戈佐连接到NATS服务器(启用JetStream),监听主题为gozo
、回复主题、负载和相关信息头部的消息。当接收到一个时,它将安排发送一个带有回复主题作为消息主题、相同的负载和用法部分中描述的相关头部的消息。由于所有通信都通过NATS进行,因此戈佐可以用任何具有客户端库的编程语言使用。
选项
选项可以通过两种方式设置:命令行或环境变量。以下列出命令行选项。所有环境变量均以大写形式书写,并以NATS_
前缀,例如:--address
选项变为NATS_ADDRESS
。启动时,gozo从/etc/gozo.env
和./gozo.env
文件中读取环境变量(如果存在)。如果同时设置了命令行选项和环境变量,则使用命令行选项。
- -a, --address: 设置NATS服务器地址,默认:nats://127.0.0.1:4222
- -s, --secure: 强制安全(TLS)连接
- -t, --token: 设置令牌认证
- -u, --user: 设置用户名
- -p, --password: 设置密码
- -c, --cert: 设置客户端TLS证书
- -k, --key: 设置客户端TLS密钥
- -n, --nkey: 设置NKey认证
- -j, --jwt: 设置凭据文件路径
用法
为了可读性,以下所有示例均以Python编写。
使用绝对时间安排消息
Gozo-When
头包含一个字符串,表示消息将被发送回的Unix纪元时间戳。
Gozo-Id
头是可选的,但如果存在,它必须是唯一的。具有Gozo-Id
头的安排可以被覆盖或取消。此外,具有Gozo-Id
头的安排被写入NATS键/值存储,因此当gozo重新启动时它们不会被丢弃。
import asyncio
import nats
import time
import uuid
async def message_cb(msg):
print(msg.subject) # 'reply.subject'
print(msg.data) # b'Hello Gozo!'
print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}
async def main():
nc = await nats.connect('nats://127.0.0.1:4222')
sub = await nc.subscribe('reply.subject', cb=message_cb)
headers = {
'Gozo-When': str(int(time.time()) + 10),
'Gozo-Id': str(uuid.uuid4()),
}
await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
await asyncio.sleep(11)
if __name__ == '__main__':
asyncio.run(main())
使用相对时间安排消息
这次 Gozo-When
头部包含一个以加号“+”开头的字符串,后面跟消息发送回之前的秒数。
import asyncio
import nats
import uuid
async def message_cb(msg):
print(msg.subject) # 'reply.subject'
print(msg.data) # b'Hello Gozo!'
print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}
async def main():
nc = await nats.connect('nats://127.0.0.1:4222')
sub = await nc.subscribe('reply.subject', cb=message_cb)
headers = {
'Gozo-When': '+10',
'Gozo-Id': str(uuid.uuid4()),
}
await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
await asyncio.sleep(11)
if __name__ == '__main__':
asyncio.run(main())
取消计划
要取消计划,发送一个头部为 Gozo-Del-Id
的消息,并将相应的ID设置为它的值。
import asyncio
import nats
async def main():
nc = await nats.connect('nats://127.0.0.1:4222')
headers = {
'Gozo-Del-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3',
}
await nc.publish('gozo', b'', None, headers)
if __name__ == '__main__':
asyncio.run(main())
依赖项
~22–34MB
~650K SLoC