6 个版本
0.3.4 | 2023年4月21日 |
---|---|
0.3.3 | 2023年4月20日 |
0.3.2 | 2021年9月30日 |
0.3.1 | 2021年5月26日 |
0.2.0 | 2020年9月3日 |
#496 in 数据库接口
每月 24 次下载
3.5MB
579 行
包含 (JAR 文件,56KB) gradle-wrapper.jar
目的
Redis 列表非常适合作为分布式工作者的任务队列。工作者可以安全地原子性地取走一个任务,即使在多个工作者监视同一个队列时也是如此。
Resc 是 Redis 的一个可靠且可配置的任务生成器。
规则和 Redis 队列定义在配置文件中,该文件可以是 JSON 或 Hjson 格式。
它会监视一个或多个队列中的事件,这些事件可以是任务完成通知或简单的“root”事件,并应用规则生成任务。
它以安全且可监控的方式实现这一点,并注意避免重复任务等问题。
Resc 使用 rust 编写,以确保安全和性能。
其一般工作方式
队列设置
这里我们将有一个非常简单的设置,只有 4 个 Redis 队列和一个类型的任务(“tasks-A”)以及两个准备执行这些任务的工人。
我们不会展示任务去重、记录或发布用于监督的方式。
某些事件生成器将事件推送到“global/done”输入队列
除了“global”队列,还可能有多个输入队列,但一个队列通常就足够了,即使有数百个事件源。
Resc 接收事件
brpoplpush
是一个原子操作,事件保证不会丢失:它要么在 global/done 中,要么在 global/taken 中。
Resc 应用其规则生成零个或多个任务
Resc 从 global/taken 中移除事件
它保留在这里,以便在规则计算期间发生崩溃时可以重新播放规则(某些规则可能涉及调用服务器)。
工作者取走任务
同样,brpoplpush
是一个原子操作,工作者可以在努力过程中死亡,但知道任务不会丢失。
当工作完成时,工作者通过事件通知它
... 然后从其取走的队列中删除任务
... 然后继续进行
Resc 现在将应用其规则到 done-1
事件,这可能导致新的任务,或者可能没有更多的事情要做。
当然,在数千个事件、任务甚至工作者的情况下,一切都可以发生。
工作者
Resc作为调度器,假设工作者以这种方式处理任务:
-
在队列中挑选一个任务,并原子性地将其移动到“已取”列表中:
BRPOPLPUSH myqueue/todo myqueue/taken 0
-
执行任务
-
清理“已取”列表:
LREM myqueue/taken the-task
-
通过将任务推送到“完成”队列来通知调度器任务已完成:
LPUSH global/done the-task
此方案确保多个工作者可以安全地在同一队列上工作。
您通常希望队列中不包含重复项。您可能仍然希望在处理任务时将任务放入队列中(例如,当有新信息到达时,您可能需要重新计算)。
如果您想要去重任务队列,您可以在配置中声明一个任务集,工作者在从队列中取出任务并在执行它之前,也将它从集合中删除。
在示例目录中提供了Java、Go、Rust和node.js的工作者实现。它们都展示了如何使用(或未使用)去重队列。
入门示例
有关执行此示例的完整说明和业务逻辑解释,请参阅examples/simple-example.md。
基于简单正则表达式的任务生成
这是一个简单的Hjson配置文件
{
redis: {
url: "redis://127.0.0.1/"
}
watchers: [
{
input_queue: global/events
taken_queue: global/taken
rules: [
{
name: TRT computation on data acquisition
on: "^acq/(?P<process_id>\\w+)/(?P<product_id>\\w+)$"
make: {
task: "trt/${process_id}/${product_id}"
queue: "trt/${process_id}/todo-queue"
set: "trt/${process_id}/todo-set"
}
}
]
}
]
}
可以使用此配置启动Resc
resc myconf.hjson
Resc启动一个监视器,一个线程,用于监视指定的input_queue
。
当出现新的事件(global/events
列表中的字符串)时,它会原子性地移动到global/taken
列表,并执行监视器的规则。
假设即将到来的任务是"acq/123/456"
,那么根据"on""
中的正则表达式,我们的示例的第一个(也是唯一的)规则将匹配。
动态生成和赋值的几个变量
process_id = 123
product_id = 456
这些变量用于外推规则中待办部分的任务和队列。
然后创建任务"trt/123/456"
。
如果"trt/123/todo-set"
集合中不包含该任务,则将其添加到该集合中(可能用于监控的时间),然后添加到"trt/123/todo-queue"
队列。
执行完此任务的所有规则后,将其从"global/taken"
队列中清除,监视器继续监视"global/events"
队列以查找其他任务。
日志记录
您通常不需要大量的日志,因此默认日志只包含警告,但在您设置系统期间,您可能想查看进入队列的事件以及生成的任务。
您可以通过设置日志级别为INFO
来查看更多内容
RUST_LOG="info" resc myconf.hjson
或者如果您想查看激活了哪些规则
RUST_LOG="debug" resc myconf.hjson
获取一些数据以计算新任务
有时可能需要查询Web服务以计算对事件做出响应要生成的任务。
假设有一个REST服务,它返回当某个其他服务发生变化时可能受到影响的所有元素(例如,客户订单的变化可能涉及重新计算该订单的一些产品有效性)。
如果产品5ab7342600000040上发生某些事件,你想查询
http://my-web-service/products/5ab7342600000040/direct-children
它以JSON格式返回应重新计算的产品列表
[
{"processId":634876914,"productId":"5ab7e7dc00000040"},
{"processId":634876914,"productId":"5ab7ebe800000040"}
]
并且对于这些产品中的每一个,你都想生成一个新任务。
然后相关的规则可能如下所示
{
name: TRT propagation to children
on: "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$"
fetch: [{
url: "http://my-web-service/products/${product_id}/direct-children"
returns: child
}]
todo: {
task: "trt/${child.processId}/${child.productId}"
queue: "trt/${child.processId}/todo-queue"
set: "trt/${child.processId}/todo-set"
}
}
fetch
元素描述了HTTP查询以及从web服务的响应中读取并用于生成任务、队列和集合的变量命名空间。
在我们的例子中,我们会得到两个新任务,分别是"trt/634876914/5ab7e7dc00000040"
(添加到队列"trt/634876914/todo-queue"
)和"trt/634876914/5ab7ebe800000040"
(添加到队列"trt/634876914/todo-queue"
)。
切换队列,默认配置值
当你有多个规则,其中一个涉及查询远程服务,如我们的例子所示,你不想所有规则都受到这个远程服务可能缓慢的影响。
这就是你可能想有一个另一个监视器,另一个线程,来处理这些特定任务生成的原因。
为了做到这一点,你希望有一个规则只将任务传递给另一个监视器监视的队列。
让我们称这个新队列为global/to-propagate
(你可以根据自己的需求命名队列)。
新的配置变为
{
"redis": {
"url": "redis://127.0.0.1/"
},
"watchers": [
{
"input_queue": "global/events",
"taken_queue": "global/taken",
"rules": [
{
"name": "TRT computation on data acquisition",
"on": "^acq/(?P<process_id>\\d+)/(?P<product_id>\\d+)$",
"make": {
"task": "trt/${process_id}/${product_id}",
"queue": "trt/${process_id}/todo-queue",
"set": "trt/${process_id}/todo-set"
}
},
{
"name": "TRT propagation to children : switch queue",
"on": "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$",
"make": {
"queue": "global/to-propagate"
}
}
]
},
{
"input_queue": "global/to-propagate",
"rules": [
{
"name": "TRT propagation to children : make child tasks",
"on": "^trt/(?P<process_id>\\d+)/(?P<product_id>\\w{16})$",
"fetch": [{
"url": "http://my-web-service/products/${product_id}/direct-children",
"returns": "child"
}],
"make": {
"task": "trt/${child.processId}/${child.productId}",
"queue": "trt/${child.processId}/todo-queue",
"set": "trt/${child.processId}/todo-set"
}
}
]
}
]
}
这样就没有远程服务可以减慢全局队列管理。
你可能已经注意到配置比预期的要轻。这是因为一些设置是可选的。
当省略时,taken_queue
只是input_queue
加上/taken
。所以这里第二个监视器将使用临时队列global/to-propagate/taken
。
当省略make/task
时,生成的任务与输入任务相同的字符串。更确切地说,make/task
的默认值是"${input_task}"
,其中${input_task}
是一个你可以用在你的任务/队列/集合生成中的变量。
许可证
MIT
依赖项
~22–33MB
~570K SLoC