2个版本

0.1.1 2024年6月16日
0.1.0 2024年5月9日

#1113 in 网络编程

MPL-2.0 许可证

22KB
223

戈佐

NATS消息调度器。

戈佐连接到NATS服务器(启用JetStream),监听主题为gozo、回复主题、负载和相关信息头部的消息。当接收到一个时,它将安排发送一个带有回复主题作为消息主题、相同的负载和用法部分中描述的相关头部的消息。由于所有通信都通过NATS进行,因此戈佐可以用任何具有客户端库的编程语言使用。

选项

选项可以通过两种方式设置:命令行或环境变量。以下列出命令行选项。所有环境变量均以大写形式书写,并以NATS_前缀,例如:--address选项变为NATS_ADDRESS。启动时,gozo从/etc/gozo.env./gozo.env文件中读取环境变量(如果存在)。如果同时设置了命令行选项和环境变量,则使用命令行选项。

  • -a, --address: 设置NATS服务器地址,默认:nats://127.0.0.1:4222
  • -s, --secure: 强制安全(TLS)连接
  • -t, --token: 设置令牌认证
  • -u, --user: 设置用户名
  • -p, --password: 设置密码
  • -c, --cert: 设置客户端TLS证书
  • -k, --key: 设置客户端TLS密钥
  • -n, --nkey: 设置NKey认证
  • -j, --jwt: 设置凭据文件路径

用法

为了可读性,以下所有示例均以Python编写。

使用绝对时间安排消息

Gozo-When头包含一个字符串,表示消息将被发送回的Unix纪元时间戳。

Gozo-Id头是可选的,但如果存在,它必须是唯一的。具有Gozo-Id头的安排可以被覆盖或取消。此外,具有Gozo-Id头的安排被写入NATS键/值存储,因此当gozo重新启动时它们不会被丢弃。

import asyncio
import nats
import time
import uuid

async def message_cb(msg):
	print(msg.subject) # 'reply.subject'
	print(msg.data) # b'Hello Gozo!'
	print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}

async def main():
	nc = await nats.connect('nats://127.0.0.1:4222')
	sub = await nc.subscribe('reply.subject', cb=message_cb)
	headers = {
		'Gozo-When': str(int(time.time()) + 10),
		'Gozo-Id': str(uuid.uuid4()),
	}
	await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
	await asyncio.sleep(11)

if __name__ == '__main__':
	asyncio.run(main())

使用相对时间安排消息

这次 Gozo-When 头部包含一个以加号“+”开头的字符串,后面跟消息发送回之前的秒数。

import asyncio
import nats
import uuid

async def message_cb(msg):
	print(msg.subject) # 'reply.subject'
	print(msg.data) # b'Hello Gozo!'
	print(msg.headers) # {'Gozo-Reply': 'Yes', 'Gozo-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3'}

async def main():
	nc = await nats.connect('nats://127.0.0.1:4222')
	sub = await nc.subscribe('reply.subject', cb=message_cb)
	headers = {
		'Gozo-When': '+10',
		'Gozo-Id': str(uuid.uuid4()),
	}
	await nc.publish('gozo', b'Hello Gozo!', 'reply.subject', headers)
	await asyncio.sleep(11)

if __name__ == '__main__':
	asyncio.run(main())

取消计划

要取消计划,发送一个头部为 Gozo-Del-Id 的消息,并将相应的ID设置为它的值。

import asyncio
import nats

async def main():
	nc = await nats.connect('nats://127.0.0.1:4222')
	headers = {
		'Gozo-Del-Id': 'c8d816dd-578b-47ff-84c1-031f3ee7ade3',
	}
	await nc.publish('gozo', b'', None, headers)

if __name__ == '__main__':
	asyncio.run(main())

依赖项

~22–34MB
~650K SLoC