#redis-server #redis-queue #packages #clients #transfer #different #order

app redismultiplexer

这是一个在Redis服务器之间(推送/弹出)队列之间传输数据的程序

8个稳定版本

1.0.7 2022年8月24日
1.0.6 2022年8月17日
1.0.4 2022年7月6日
1.0.0 2022年7月5日

#511 in 数据库接口

Apache-2.0GPL-3.0 许可协议

2.5MB
1.5K SLoC

包含(debian包,780KB)debian/redismultiplexer_1.0.7_amd64.deb,(debian包,780KB)debian/redismultiplexer_1.0.6_amd64.deb,(debian包,780KB)redismultiplexer_1.0.6_amd64.deb

RedisMultiplexer

这是一个在Redis服务器之间(推送/弹出)队列之间传输数据的程序

描述

RedisMultiplexer 将负责您的Redis队列。目的是将包从源队列移动到目标队列,这些队列可能位于同一服务器和端口,也可能不位于同一服务器和端口。

您可以配置尽可能多的“客户端”,系统以两种模式(复制模式和分发模式)工作

  • 复制模式将从源服务器复制传入的包到所有客户端或目的地
  • 分发模式将一次将一个包发送到每个目标,使用轮询目标选择,RedisMultiplexer 将通过检查目标队列的大小来避免过载,您可以使用配置中的 *limit 选项控制所有这些,您还可以使用 filter* 选项过滤要发送到何处的数据,最后,您可以使用 ordering* 选项重新排列传入队列

配置

每个节点或Redis的连接将共享相同的 通用配置 和可选的 过滤器

配置有两个定义区域,第一个是源服务器,它将在YAML配置的根处拥有所有字段,而目标或目的服务器将位于 clients 条目下

YAML配置的根 属于源服务器,包含

  • 通用配置:以下解释
  • 可选的 过滤器:以下解释
  • 可选的 排序:以下解释
  • pid:执行RedisMultiplexer的pid文件
  • status:状态文件将包含程序运行时的统计信息的JSON字符串
  • children:为处理而启动的线程或工作者的总数(通常2个就足够了)
  • mode:有两种工作模式:复制模式和分发模式,前面已解释
  • clients:将包含所需数量的目标服务器

在 YAML 配置的 clients 条目内部

  • 通用配置:以下解释
  • 可选的 Limits:以下将进行说明
  • 可选的 过滤器:以下解释

通用配置

每个配置都将包含

  • name:当 RedisMultiplexer 需要在屏幕上显示某些消息时将使用
  • hostname:Redis 服务器的主机名
  • port:Redis 服务器的端口号
  • password:Redis 服务器的密码
  • channel:要处理的队列名称

过滤器是可选的

  • filter:这是一个将匹配包头的正则表达式
  • filter_until:包头将与这个子字符串达到的长度一样长(将使用 filter_until 和 filter_limit 之间的最小值)
  • filter_limit:包头将与这些总字节数达到的长度一样长(将使用 filter_until 和 filter_limit 之间的最小值)
  • filter_replace:如果定义了此过滤器选项,正则表达式将被替换为这个字符串(可能包含 Regex 的 $X 组)

例如

  • 对于传入的字符串 "Hello world"
  • 我们希望过滤器在 11 个字节或 "r" 之后停止分析
  • 我们希望过滤 "ell"
  • 我们希望被过滤的字符串被替换为 ELL,因此结果将是 "HELLo world"
  • 我们将使用以下配置
filter: "ell"
filter_until: "r"
filter_limit: 11
filter_replace: "ELL"

限制是可选的

  • timelimit:每 n 秒检查一次队列的大小
  • checklimit:每 n 个包检查一次队列的大小
  • hardlimit:在队列释放并达到 softlimit 之前,软件将停止向目的地发送包
  • softlimit:检测到 hardlimit 并队列释放后,软件将继续向目的地发送包,直到低于 softlimit
  • deleteblock:当达到 hardlimit 时,软件将从队列中删除 n 个最旧的包,直到队列的大小低于 hardlimit

排序是可选的

当多个服务器将它们的信息倒入同一个队列时,由于实时处理的随机性,包可能会出现顺序混乱。让我们假设你需要通过机器学习系统处理这些数据,由于它是无序的,ML 可能会学习其未来行动。如果队列在处理之前被排序,就可以很容易地避免这个问题。

此功能负责在处理之前对传入的队列进行排序,它保持几秒钟的定时缓冲区,并使用前缀和后缀查找可以解析用于与其他包比较的字符串。RedisMultiplexer 将从包中提取前 ordering_limit 字节,并查找匹配名为 "ts" 的组的 ordering 正则表达式,作为排序包列表的时戳。提取的 ts 字符串将被解析为 u128

  • ordering:用于解析时间子串的正则表达式是什么(你的 Regex 表达式中将需要匹配与组 'ts' 相关的正则表达式。示例
ordering: '.*"ts": *(?P<ts>\d+),.*#'
  • ordering_buffer_time:新包必须停留在缓冲区中的秒数,以便进行排序
  • ordering_limit:在提取子串期间处理的字节数

例如

  • 对于包: '{"a": "abc", "ts": 12345678, "b": 88}'
  • 其中 "ts" 是持有时间字符串的键,例如
ordering: '.*"ts": *(?P<ts>\d+),.*#'
ordering_buffer_time: 30
ordering_limit: 200

这一切是如何工作的

示例 1:在服务器之间转发包

RedisMultiplexer可以作为服务器之间的转发器。这在服务器间传输数据包时特别有用,可以避免源服务器过载。在连接的源端设置RedisMultiplexer是一个非常好的主意,这样如果存在网络延迟或中断,系统将在源端丢弃数据包,从而避免源队列过载引起的问题。

示例 2:在多个服务器之间平衡队列

RedisMultiplexer可以用作负载均衡器。使用“分发器”模式,您可以将数据包放入源队列,RedisMultiplexer将所有数据包分发到所有目标服务器或客户端。当某些服务器过载时,RedisMultiplexer会暂时避免,直到其队列释放。

示例 3:将队列复制到多个服务器

RedisMultiplexer可以用作复制系统。使用“复制器”模式,您可以将数据包放入源队列,RedisMultiplexer将它的副本发送到所有目标服务器或客户端。当某些服务器过载时,RedisMultiplexer会暂时避免,直到其队列释放。

示例 4:网络中断的保留系统

RedisMultiplexer的一个有趣用途是作为网络延迟和网络中断的保留系统。当它用于连接的源端时,这非常好,因为它会在网络问题或远程连接的另一侧队列过载时保持本地队列为空。

示例 5:多个RedisMultiplexer服务

在同一个服务器上设置多个RedisMultiplexer是一个常见的用法。假设您想要在远程服务器上教授多个机器学习系统,并且每个系统将使用特定的数据包进行教学。

最可能的方法是在复制器模式下设置RedisMultiplexer,以便以相同的方式分配给所有客户端,并在多个队列(如不同的机器学习系统数量)中过滤数据。每个客户端的过滤系统将帮助您决定将哪些数据发送到哪个队列。您还可以使用排序系统来保持传入数据的顺序。

现在让我们想象其中一个队列将用于用不同的设置教授相同的机器学习,因此您将使用复制器模式的RedisMultiplexer向两个机器学习系统发送数据。

但是,让我们更进一步,您需要一个测试数据和一组学习数据,以便学习系统不会从测试数据中学习,并且我们可以使用测试数据来进行预测,以测试机器学习系统的学习效果。您可以按百分比分割数据,比如10%测试/90%学习。您可以使用分发器模式的RedisMultiplexer向10个不同的客户端分发,其中1个客户端是测试队列,而其他9个客户端是相同的训练队列。您还可以使用另一个RedisMultiplexer作为转发器来重新排序数据。

您刚刚创建的每个队列都在同一个服务器上,因此您将使用每个队列的RedisMultiplexer将数据从该服务器发送到另一个远程服务器,这样您就可以使用RedisMultiplexer作为保留系统,以防出现网络问题。

完整示例配置

name        : "Source"
hostname    : "127.0.0.1"
port        : 6379
password    : "abcdefghijklmnopqrstuvwxyz"
channel     : "SourceQueue"
children    : 2
mode        : "replicant"               # choose between: "replicant" and "spreader"
pid         : "config.pid"              # optional
status      : "config.stat"             # optional
filter      : "ell"                     # optional
filter_until: "r"                       # optional
filter_limit: 11                        # optional
filter_replace: "ELL"                   # optional
ordering: '.*"ts": *(?P<ts>\d+),.*#'    # optional
ordering_buffer_time: 30                # optional
ordering_limit: 200                     # optional

clients:
  - name        : "Target 1"
    hostname    : "127.0.0.1"
    port        : 6379
    password    : "abcdefghijklmnopqrstuvwxyz"
    channel     : "TargetQueue1"
    timelimit   : 5                     # optional
    checklimit  : 100                   # optional
    softlimit   : 400                   # optional
    hardlimit   : 410                   # optional
    deleteblock : 100                   # optional
    filter      : "^(1|3|5|7|9)#"       # optional
    filter_until: "#"                   # optional
    filter_limit: 100                   # optional
    filter_replace: ""                  # optional
  - name        : "DB2"
    hostname    : "127.0.0.1"
    port        : 6379
    password    : "abcdefghijklmnopqrstuvwxyz"
    channel     : "TargetQueue2"
    timelimit   : 5                     # optional
    checklimit  : 100                   # optional
    softlimit   : 400                   # optional
    hardlimit   : 410                   # optional
    filter      : "^(0|2|4|6|8)#"       # optional
    filter_until: "#"                   # optional
    filter_limit: 100                   # optional
    filter_replace: ""                  # optional

依赖关系

~10-21MB
~349K SLoC