6 个版本

0.3.4 2023年4月21日
0.3.3 2023年4月20日
0.3.2 2021年9月30日
0.3.1 2021年5月26日
0.2.0 2020年9月3日

#496 in 数据库接口

每月 24 次下载

MIT 许可证

3.5MB
579

包含 (JAR 文件,56KB) gradle-wrapper.jar

MIT Latest Version

目的

Redis 列表非常适合作为分布式工作者的任务队列。工作者可以安全地原子性地取走一个任务,即使在多个工作者监视同一个队列时也是如此。

Resc 是 Redis 的一个可靠且可配置的任务生成器。

规则和 Redis 队列定义在配置文件中,该文件可以是 JSONHjson 格式。

它会监视一个或多个队列中的事件,这些事件可以是任务完成通知或简单的“root”事件,并应用规则生成任务。

它以安全且可监控的方式实现这一点,并注意避免重复任务等问题。

Resc 使用 rust 编写,以确保安全和性能。

其一般工作方式

队列设置

这里我们将有一个非常简单的设置,只有 4 个 Redis 队列和一个类型的任务(“tasks-A”)以及两个准备执行这些任务的工人。

我们不会展示任务去重、记录或发布用于监督的方式。

某些事件生成器将事件推送到“global/done”输入队列

lpush event

除了“global”队列,还可能有多个输入队列,但一个队列通常就足够了,即使有数百个事件源。

Resc 接收事件

brpoplpush event

brpoplpush 是一个原子操作,事件保证不会丢失:它要么在 global/done 中,要么在 global/taken 中。

Resc 应用其规则生成零个或多个任务

lpush task

Resc 从 global/taken 中移除事件

lrem event

它保留在这里,以便在规则计算期间发生崩溃时可以重新播放规则(某些规则可能涉及调用服务器)。

工作者取走任务

brpoplpush task

同样,brpoplpush 是一个原子操作,工作者可以在努力过程中死亡,但知道任务不会丢失。

当工作完成时,工作者通过事件通知它

lpush done

... 然后从其取走的队列中删除任务

lrem task

... 然后继续进行

Resc 现在将应用其规则到 done-1 事件,这可能导致新的任务,或者可能没有更多的事情要做。

当然,在数千个事件、任务甚至工作者的情况下,一切都可以发生。

工作者

Resc作为调度器,假设工作者以这种方式处理任务:

  1. 在队列中挑选一个任务,并原子性地将其移动到“已取”列表中: BRPOPLPUSH myqueue/todo myqueue/taken 0

  2. 执行任务

  3. 清理“已取”列表: LREM myqueue/taken the-task

  4. 通过将任务推送到“完成”队列来通知调度器任务已完成: LPUSH global/done the-task

此方案确保多个工作者可以安全地在同一队列上工作。

您通常希望队列中不包含重复项。您可能仍然希望在处理任务时将任务放入队列中(例如,当有新信息到达时,您可能需要重新计算)。

如果您想要去重任务队列,您可以在配置中声明一个任务集,工作者在从队列中取出任务并在执行它之前,也将它从集合中删除。

在示例目录中提供了Java、Go、Rust和node.js的工作者实现。它们都展示了如何使用(或未使用)去重队列。

入门示例

有关执行此示例的完整说明和业务逻辑解释,请参阅examples/simple-example.md

基于简单正则表达式的任务生成

这是一个简单的Hjson配置文件

{
	redis: {
		url: "redis://127.0.0.1/"
	}
	watchers: [
		{
			input_queue: global/events
			taken_queue: global/taken
			rules: [
				{
					name: TRT computation on data acquisition
					on: "^acq/(?P<process_id>\\w+)/(?P<product_id>\\w+)$"
					make: {
						task: "trt/${process_id}/${product_id}"
						queue: "trt/${process_id}/todo-queue"
						set: "trt/${process_id}/todo-set"
					}
				}
			]
		}
	]
}

可以使用此配置启动Resc

resc myconf.hjson

Resc启动一个监视器,一个线程,用于监视指定的input_queue

当出现新的事件(global/events列表中的字符串)时,它会原子性地移动到global/taken列表,并执行监视器的规则。

假设即将到来的任务是"acq/123/456",那么根据"on""中的正则表达式,我们的示例的第一个(也是唯一的)规则将匹配。

动态生成和赋值的几个变量

process_id = 123
product_id = 456

这些变量用于外推规则中待办部分的任务和队列。

然后创建任务"trt/123/456"

如果"trt/123/todo-set"集合中不包含该任务,则将其添加到该集合中(可能用于监控的时间),然后添加到"trt/123/todo-queue"队列。

执行完此任务的所有规则后,将其从"global/taken"队列中清除,监视器继续监视"global/events"队列以查找其他任务。

日志记录

您通常不需要大量的日志,因此默认日志只包含警告,但在您设置系统期间,您可能想查看进入队列的事件以及生成的任务。

您可以通过设置日志级别为INFO来查看更多内容

RUST_LOG="info" resc myconf.hjson

或者如果您想查看激活了哪些规则

RUST_LOG="debug" resc myconf.hjson

获取一些数据以计算新任务

有时可能需要查询Web服务以计算对事件做出响应要生成的任务。

假设有一个REST服务,它返回当某个其他服务发生变化时可能受到影响的所有元素(例如,客户订单的变化可能涉及重新计算该订单的一些产品有效性)。

如果产品5ab7342600000040上发生某些事件,你想查询

 http://my-web-service/products/5ab7342600000040/direct-children

它以JSON格式返回应重新计算的产品列表

[
	{"processId":634876914,"productId":"5ab7e7dc00000040"},
	{"processId":634876914,"productId":"5ab7ebe800000040"}
]

并且对于这些产品中的每一个,你都想生成一个新任务。

然后相关的规则可能如下所示

{
	name: TRT propagation to children
	on: "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$"
	fetch: [{
		url: "http://my-web-service/products/${product_id}/direct-children"
		returns: child
	}]
	todo: {
		task: "trt/${child.processId}/${child.productId}"
		queue: "trt/${child.processId}/todo-queue"
		set: "trt/${child.processId}/todo-set"
	}
}

fetch元素描述了HTTP查询以及从web服务的响应中读取并用于生成任务、队列和集合的变量命名空间。

在我们的例子中,我们会得到两个新任务,分别是"trt/634876914/5ab7e7dc00000040"(添加到队列"trt/634876914/todo-queue")和"trt/634876914/5ab7ebe800000040"(添加到队列"trt/634876914/todo-queue")。

切换队列,默认配置值

当你有多个规则,其中一个涉及查询远程服务,如我们的例子所示,你不想所有规则都受到这个远程服务可能缓慢的影响。

这就是你可能想有一个另一个监视器,另一个线程,来处理这些特定任务生成的原因。

为了做到这一点,你希望有一个规则只将任务传递给另一个监视器监视的队列。

让我们称这个新队列为global/to-propagate(你可以根据自己的需求命名队列)。

新的配置变为

{
	"redis": {
		"url": "redis://127.0.0.1/"
	},
	"watchers": [
		{
			"input_queue": "global/events",
			"taken_queue": "global/taken",
			"rules": [
				{
					"name": "TRT computation on data acquisition",
					"on": "^acq/(?P<process_id>\\d+)/(?P<product_id>\\d+)$",
					"make": {
						"task": "trt/${process_id}/${product_id}",
						"queue": "trt/${process_id}/todo-queue",
						"set": "trt/${process_id}/todo-set"
					}
				},
				{
					"name": "TRT propagation to children : switch queue",
					"on": "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$",
					"make": {
						"queue": "global/to-propagate"
					}
				}
			]
		},
		{
			"input_queue": "global/to-propagate",
			"rules": [
				{
					"name": "TRT propagation to children : make child tasks",
					"on": "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$",
					"fetch": [{
						"url": "http://my-web-service/products/${product_id}/direct-children",
						"returns": "child"
					}],
					"make": {
						"task": "trt/${child.processId}/${child.productId}",
						"queue": "trt/${child.processId}/todo-queue",
						"set": "trt/${child.processId}/todo-set"
					}
				}
			]
		}
	]
}

这样就没有远程服务可以减慢全局队列管理。

你可能已经注意到配置比预期的要轻。这是因为一些设置是可选的。

当省略时,taken_queue只是input_queue加上/taken。所以这里第二个监视器将使用临时队列global/to-propagate/taken

当省略make/task时,生成的任务与输入任务相同的字符串。更确切地说,make/task的默认值是"${input_task}",其中${input_task}是一个你可以用在你的任务/队列/集合生成中的变量。

许可证

MIT

依赖项

~22–33MB
~570K SLoC