#unix #pipe #unix-command #branch #connection #join #graph

bin+lib pipeawesome2

循环、分支和连接到 UNIX 管道……以一种合理的方式……

1 个不稳定版本

0.1.3 2022年10月14日

#463Unix API

GPL-2.0 许可证

270KB
5K SLoC

Pipeawesome 2

Rust

目录

正如我妈妈说的……指责地说……“你到底做了什么?!?!?!”

我给 UNIX 管道添加了循环、分支和连接。

你为什么这样做?

我觉得 UNIX 管道被低估了,应该更经常被考虑。

那你为什么喜欢 UNIX 管道?

UNIX 管道很棒,当你用它们编写软件时,它们具有

  • 高性能。
  • 回压。
  • 在单个 UNIX 进程级别上非常容易推理(它只是 STDIN 和 STDOUT/STDERR)。
  • 与“正确”的解决方案相比,非常轻量级且无基础设施。
  • 需要时易于与“正确”的解决方案集成。

那么这个项目添加了什么?

给定一个如下的 UNIX 管道 cat myfile | awk 'PAT { do something }' | grep '^good' | awk '$1='good' { print /dev/stdout }' | sed 's/good/perfect' | sort 这样既强大又美妙。

这可以可视化如下

然而,你可能想要

这可以通过像 mkfifo 这样的工具来完成,具体取决于你的技能水平,但肯定不会像前面显示的简单 UNIX 命令那样容易被人理解。

如果你对能够轻松组合更复杂的管道感兴趣,请继续阅读!

一个示例项目

我决定制作一个井字棋游戏来展示这个项目的功能。

这个游戏将具有

  • 一种可以将之转换为具有填充方格的人类可识别的井字棋网格的数据格式。
  • 两个计算机玩家
  • 他们会轮流选择一个空格,并用他们的“X”或“O”来填充。
  • 一个裁判来决定游戏何时胜利或平局。

数据格式

我首先决定了一个如下所示的数据格式

状态::方格_1 :方格_2 :方格_3 :方格_4 :方格_5 :方格_6 :方格_7 :方格_8 :方格_9

在这个状态中,可以是代表玩家轮次的XO,或者用其他东西表示游戏平局/胜利。

绘制网格

接下来,我编写了一些代码来显示网格。我大部分使用GNU AWK来编写,因为这是我偶尔学习的东西,而且(非常)简单的STDIN | STDOUT编码似乎非常适合这种语言。

我编写了以下代码

function get_player_text(player_O_or_X) {
    extra = ""
    if (player_O_or_X == "D") {
        return "DRAW!";
    }
    if (player_O_or_X ~ "^W") {
        extra = "WON!"
        player_O_or_X = substr(player_O_or_X, 2)
    }
    return "Player " player_O_or_X " " extra
}
function draw_turn_line(pos_1, pos_2, pos_3) {
    return sprintf(" %-1s | %-1s | %-1s ", pos_1, pos_2, pos_3)
}
function draw_line_line() {
    return "---+---+---"
}
BEGIN { GAME_NUMBER = 0 }
{
    print get_player_text($1) # " (" $0 ")"
    print ""
    print draw_turn_line($2, $3, $4)
    print draw_line_line()
    print draw_turn_line($5, $6, $7)
    print draw_line_line()
    print draw_turn_line($8, $9, $10)
    print ""
    fflush()
}

你可以用以下命令执行此代码:echo O:::X::O::::X' | gawk -F ':' -f ./examples/tic-tac-toe/draw.awk,然后它会绘制以下网格

Player O 

   |   | X 
---+---+---
   | O |   
---+---+---
   |   | X 

然后我编写了一个Pipeawesome配置文件,它包装了这个代码

connection:
  initial: "faucet:input | launch:draw | drain:output"
drain:
  output:
    destination: '-'
faucet:
  input:
    source: '-'
launch:
  draw:
    cmd: "awk"
    arg:
      - '-F'
      - ':'
      - '-f'
      - 'examples/tic-tac-toe/draw.awk'

这可以表示为

注意:我通过运行以下命令来让Pipeawesome绘制此图表:./target/debug/pa2 graph --config examples/tic-tac-toe/draw.pa.yaml --diagram-only

注意:使用以下命令将生成图例:./target/debug/pa2 graph --config examples/tic-tac-toe/draw.pa.yaml --legend-only,这对于所有图表都是常见的,并在附录中展示。

在Pipeawesome中,有管道连接不同类型的组件。这里的组件类型是faucetlaunchdrain,这些组件的名称分别是inputdrawoutput。这些名称只是名称,但根据组件类型,它们可能需要在配置文件的其他地方进行引用。

你可以使用以下命令执行此Pipeawesome配置文件:echo 'O:::X::O::::X' | ./target/debug/pa2 process --config examples/tic-tac-toe/draw.pa.yaml

这当然是一个微不足道且没有意义的示例,因为你可以直接运行awk,但这让我能够以最小的复杂性向您展示Pipeawesome文件格式。

让我们将其分解为其组成部分

连接/连接集

connection:
  initial: "faucet:input | launch:draw | drain:output"

连接集解释了如何将组件连接在一起。可以有多个连接集,但这里只有一个。

有关更多信息,请参阅管道变体、输出类型和输入优先级。

水龙头

faucet:
  input:
    source: '-'

水龙头是向Pipeawesome从外部世界获取数据的主要方式,这里的配置是为名为input的一个配置。

有关更多信息,请参阅组件:水龙头

注意:启动也可以生成初始数据,在这种情况下不需要水龙头。

启动

launch:
  draw:
    cmd: "awk"
    arg:
      - '-F'
      - ':'
      - '-f'
      - 'examples/tic-tac-toe/draw.awk'

这控制着程序的执行方式。

有关更多信息,请参阅组件:启动

排水

drain:
  output:
    destination: '-'

这是数据如何从Pipeawesome中退出。输出可以发送到STDOUT、STDERR或文件。

有关更多信息,请参阅组件:排水

注意:如果写入队列解决方案(如RabbitMQ或AWS SQS),可以使用启动代替。

进行游戏

让我们让玩家进行回合。

以下配置包含新代码,但配置中添加的概念你之前已经见过

connection:
  initial_word: "f:input | l:player | l:referee | l:draw | d:output"
drain:
  output: { destination: '-' }
faucet:
  input: { source: '-' }
launch:
  player:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=O', '-f', 'examples/tic-tac-toe/player.awk' ]
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  draw:
    cmd: "gawk"
    arg: [ '-F', ':', '-f', 'examples/tic-tac-toe/draw.awk' ]

这可以表示为

可以使用以下命令执行:echo 'O':::X::O::::X' | ./target/debug/pa2 process --config examples/tic-tac-toe/have_a_go.pa.yaml

此输出的结果与之前相同,但网格中多了一个额外的O

Player O 

   |   | X 
---+---+---
 O | O |   
---+---+---
   |   | X 

注意:输出中的O比输入中多一个,这是由player.awk添加的。

随机选择一个玩家开始游戏

生成随机玩家的代码

我想出了以下单行Bash片段来选择随机第一位玩家:echo $((RANDOM % 2))::::::::: | sed "s/1/X/" | sed "s/0/O/"

完整的配置现在看起来是这样的

connection:
  player_o_branch: "l:random_player | l:player_o_filter | l:player_o | l:referee"
  player_x_branch: "l:random_player | l:player_x_filter | l:player_x | l:referee"
  last_draw: "l:referee | l:draw | d:output"
drain:
  output: { destination: '-' }
launch:
  random_player:
    cmd: "bash"
    arg: [ '-c', 'echo $((RANDOM % 2))::::::::: | sed "s/1/X/" | sed "s/0/O/"' ]
  player_o_filter: { cmd: "grep", arg: [ "--line-buffered", "^O" ] }
  player_o:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=O', '-f', 'examples/tic-tac-toe/player.awk' ]
  player_x_filter: { cmd: "grep", arg: [ "--line-buffered", "^X" ] }
  player_x:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=X', '-f', 'examples/tic-tac-toe/player.awk' ]
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  draw:
    cmd: "gawk"
    arg: [ '-F', ':', '-f', 'examples/tic-tac-toe/draw.awk' ]

由Pipeawesome绘制的图表现在变得更加有趣

变化如下

水龙头配置已被完全删除(不需要),在这种情况下,初始消息来自l:random_player

尽管l:random_player出现在两个连接中,但它只运行一次,其输出被发送到两个

最大的变化是现在在connection:中有多个键/连接集/线路。你可能注意到l:random_player在连接集player_o_branchplayer_x_branch中出现了两次。这并不意味着它运行了两次,因为connection:部分只是描述了事物之间是如何连接的。

类似的事情也发生在l:referee上,尽管它实际上在connection:中有三个引用。在这个配置中,它既可以从l:player_o读取,也可以从l:player_x读取,并且它的输出发送到l:draw

注意:重要的是要知道,l:player_o_filterl:player_x_filter都接收由l:random player生成的线路。只是其中一个总是将其过滤掉。

运行此代码将得到一个网格,其中OX可以出现在网格的任何位置

Player O 

   |   |   
---+---+---
 O |   |   
---+---+---
   |   |   

完整的游戏

为了创建完整的游戏,还需要发生两件事

  1. 多回合 - 要完成一场游戏,我们必须有多个回合发生。
  2. 交替玩家 - 进行下一回合的玩家必须与前一个回合不同。

多回合

这实际上非常简单,我们只需要采取我们之前的配置,将launch:referee的输出添加到l:player_o_filterl:player_x_filter中,创建一个循环。现在的配置看起来像这样

通过引入一个新组件,一个位于l:random_playerl:player_o_filter / l:player_x_filter之间的Junction,这个情况将更容易解释。

Junction是一个优先级的多对多连接器。任何进入任何一个输入的东西都将发送到所有输出。

有关更多信息,请参阅组件:Junction

添加了Junction和相应的更改后,完整的配置如下

connection:
  random_selection: "l:random_player | j:turn"
  player_o_branch: "j:turn | l:player_o_filter | l:player_o | l:referee"
  player_x_branch: "j:turn | l:player_x_filter | l:player_x | l:referee"
  last_draw: "l:referee | j:loop | l:draw | d:output"
  looper: "j:loop | j:turn"
drain:
  output: { destination: '-' }
launch:
  random_player:
    cmd: "bash"
    arg:
      - '-c'
      - 'echo $((RANDOM % 2))::::::::: | sed "s/1/X/" | sed "s/0/O/"'
  player_o_filter: { cmd: "grep", arg: [ "--line-buffered", "^O" ] }
  player_o:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=O', '-f', 'examples/tic-tac-toe/player.awk' ]
  player_x_filter: { cmd: "grep", arg: [ "--line-buffered", "^X" ] }
  player_x:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=X', '-f', 'examples/tic-tac-toe/player.awk' ]
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  draw:
    cmd: "gawk"
    arg: [ '-F', ':', '-f', 'examples/tic-tac-toe/draw.awk' ]

这可以表示为

注意:此图与额外的junction:loop和连接到(连接集looper)的线完全相同。

然而,这种配置的游戏并不激动人心,因为只有一个玩家有机会!

Player X 

   |   |   
---+---+---
 X |   |   
---+---+---
   |   |   

Player X 

   |   | X 
---+---+---
 X |   |   
---+---+---
   |   |   

Player X 

   |   | X 
---+---+---
 X |   | X 
---+---+---
   |   |   

Player X 

   |   | X 
---+---+---
 X |   | X 
---+---+---
   | X |   

Player X WON!

   |   | X 
---+---+---
 X |   | X 
---+---+---
   | X | X 

交替玩家

为了使进行回合的玩家交替,我们只需要在“X”和“O”之间交换junction:loopjunction:turn之间的第一个字符。以下配置中的组件称为turn_swapper

connection:
  random_selection: "l:random_player | j:turn"
  player_o_branch: "j:turn | l:player_o_filter | l:player_o | l:referee"
  player_x_branch: "j:turn | l:player_x_filter | l:player_x | l:referee"
  last_draw: "l:referee | j:loop | l:draw | d:output"
  looper: "j:loop | l:turn_swapper | j:turn"
drain:
  output: { destination: '-' }
launch:
  random_player:
    cmd: "bash"
    arg:
      - '-c'
      - 'echo $((RANDOM % 2))::::::::: | sed "s/1/X/" | sed "s/0/O/"'
  player_o_filter: { cmd: "grep", arg: [ "--line-buffered", "^O" ] }
  player_o:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=O', '-f', 'examples/tic-tac-toe/player.awk' ]
  player_x_filter: { cmd: "grep", arg: [ "--line-buffered", "^X" ] }
  player_x:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=X', '-f', 'examples/tic-tac-toe/player.awk' ]
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  draw:
    cmd: "gawk"
    arg: [ '-F', ':', '-f', 'examples/tic-tac-toe/draw.awk' ]
  turn_swapper:
    cmd: "sed"
    arg:
      - "--unbuffered"
      - |
        s/^O/9/
        s/^X/O/
        s/^9/X/

这可以表示为

最终结果是(某种程度上)看起来很逼真的井字棋游戏,玩家轮流进行,有人获胜(或者游戏以平局结束)

Player X 

   |   |   
---+---+---
 X |   |   
---+---+---
   |   |   

Player O 

   |   |   
---+---+---
 X | O |   
---+---+---
   |   |   

Player X 

   |   | X 
---+---+---
 X | O |   
---+---+---
   |   |   

Player O 

   | O | X 
---+---+---
 X | O |   
---+---+---
   |   |   

Player X 

   | O | X 
---+---+---
 X | O |   
---+---+---
 X |   |   

Player O WON!

   | O | X 
---+---+---
 X | O |   
---+---+---
 X | O |   

组件类型

组件类型可以是

  • 水龙头:当它来自外部时,是输入的来源。
  • 启动:一个可以处理数据的运行程序。
  • 排水口:此处写入的数据存在于Pipeawesome。
  • 汇合点:一个多对多的连接器,可以管理传入数据优先级。
  • 缓冲区/调节器:存储无限数量的消息/调节消息数量

注意:本节中有图表,图例在附录中的组件图例中展示

组件:水龙头

faucet:
  tap:
    input: "-",

水龙头是向Pipeawesome中获取数据的主要方式。水龙头有一个名为 source 的属性,可以是“-”用于STDIN,或者是一个要读取的文件名。

组件:启动

launch:
  draw:
    cmd: "awk"
    env:
      AWKLIBPATH: "./lib"
    path: "/home/forbesmyester/project/awesome"
    arg:
      - '-F'
      - ':'
      - '-f'
      - 'examples/tic-tac-toe/draw.awk'

这控制着程序是如何执行的。

以下是可以配置的

  • cmd:要运行的命令
  • path:运行的位置
  • env:运行时使用的环境变量
  • arg:传递给命令的参数

组件:排水口

drain:
  output:
    destination: '-'

这是从Pipeawesome中获取数据的正常方式。输出可以发送到"-"(标准输出),"_"(标准错误)或文件,这可以通过使用其他值来指定。

组件:汇合点

一个 汇合点 是一个多对多的连接器。任何进入其输入端的数据都会被发送到所有输出端。

注意:消息被认为是Windows或UNIX行结束符分隔的。这很容易配置。

没有为 汇合点 进行配置,但它是有理由尊重输入优先级的唯一组件。

组件:缓冲区 & 调节器

在井字棋示例中,我没有使用 缓冲区调节器

回顾一下在多次回合之后的一些井字棋配置,你会发现消息通常在非常接近流程末尾的地方从非常接近流程开始的地方发送。

如果有有限数量的消息进入系统,例如一次井字棋游戏,那么这不是问题。然而,如果越来越多的消息被添加到系统中,并且所有旧消息仍在被处理,那么可能会出现问题。这种情况是循环特有的,通常通过反压来处理。

组件之间的连接器具有有限的容量(它们是异步Rust通道的版本),这是我们反压的工作方式,但它确实在包含循环的配置中留下了一种死锁的可能。

以下我描述了两个可以用来控制这种情况的组件

  • 缓冲区 是一个具有无限消息容量的连接器(它是一个无界Rust通道)。
  • 调节器 可以通过观察配置的一组缓冲区中的消息数量来打开和关闭通过它的消息流。

下面的配置是上述井字棋配置的一个版本,但修改为运行10万场游戏

connection:
  random_selection: "l:random_player | regulator:regulate_flow | j:turn"
  player_o_branch: "j:turn | l:player_o_filter | l:player_o | l:referee"
  player_x_branch: "j:turn | l:player_x_filter | l:player_x | l:referee"
  last_draw: "l:referee | j:loop | l:only_finishes | l:draw | d:output"
  looper: "j:loop | l:turn_swapper | buffer:reprocess | j:turn"
drain:
  output: { destination: '-' }
regulator:
  reg:
    buffered: [10, 100]
    monitored_buffers: [ "reprocess" ]
launch:
  random_player:
    cmd: "bash"
    arg:
      - '-c'
      - 'for i in {1..100000}; do echo $((RANDOM % 2))::::::::: | sed "s/1/X/" | sed "s/0/O/"; done '
  player_o_filter: { cmd: "grep", arg: [ "--line-buffered", "^O" ] }
  player_o:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=O', '-f', 'examples/tic-tac-toe/player.awk' ]
  player_x_filter: { cmd: "grep", arg: [ "--line-buffered", "^X" ] }
  player_x:
    cmd: "gawk"
    arg: [ '-F', ':', '-v', 'PLAYER=X', '-f', 'examples/tic-tac-toe/player.awk' ]
  referee:
    cmd: "gawk"
    arg: ['-F', ':', '-v', 'DESIRED_GAME_COUNT=100000', '-f', './examples/tic-tac-toe/referee.awk', 'NF=10', 'OFS=:']
  draw:
    cmd: "gawk"
    arg: [ '-F', ':', '-f', 'examples/tic-tac-toe/draw.awk' ]
  only_finishes:
    cmd: "grep"
    arg:
      - "--line-buffered"
      - "^[DW]"
  turn_swapper:
    cmd: "sed"
    arg:
      - "--unbuffered"
      - |
        s/^O/9/
        s/^X/O/
        s/^9/X/

这可以表示为

在这种情况下,代码 l:random_player 不是只创建1条消息,而是创建了100,000条,但每一条都会在游戏结束时循环回来,所以我们会有大量的消息。在这个配置下,当 buffer:reprocess 中的消息超过100条时,regulator:regulate_flow 将停止接受消息,但当地板中的消息量低于10条时,它将恢复。使用这两个组件可以解决上述问题。

管道变体,输出类型和输入优先级。

管道变体

考虑在程序之间传递数据时,当接收程序死亡或关闭输入时,我们会遇到发送程序输出数据,但没有地方可以发送。在这种情况下,我认为我们有三种选择

  1. 终止(T):终止Pipeawesome。
  2. 完成(F):关闭管道 - 让发送程序自己处理问题(这可能会引起级联效应)。
  3. 消耗(C):Pipeawesome将通过消耗数据本身来保持管道开启(但丢弃数据)。

您可以使用以下方式指定管道变体

  • l:sender |T| l:reciever - 终止。
  • l:sender |F| l:reciever - 完成。
  • l:sender |C| l:reciever - 消耗。

正常的、单个管道符号(l:sender | l:reciever)仅仅是快速书写 "|T|" 的方法。

输出类型

运行中的程序可能不会将其所有输出都输出到STDOUT,它还可以将数据发送到STDERR。

Pipeawesome 允许您捕获程序输出到 STDOUT 和 STDERR 的情况,以及程序完成时的退出码。这是通过在连接集中的组件之后使用 [O][E][X] 来实现的,例如

connection:
  ls_stdout: "l:ls[O] | l:awk_stdout | j:out"
  ls_stderr: "l:ls[E] | l:awk_stderr | j:out"
  ls_exit: "l:ls[X] | l:awk_exit | j:out"
  out: "j:out | d:out"
drain:
  out: { destination: '-' }
launch:
  awk_stdout:
    cmd: awk
    arg: ['{ print "STDOUT: " $0 }']
  awk_stderr:
    cmd: awk
    arg: ['{ print "STDERR: " $0 }']
  awk_exit:
    cmd: awk
    arg: ['{ print "EXIT: " $0 }']
  ls:
    cmd: ls
    arg:
    - "."
    - "i_should_not_exist"

注意:实际上,UNIX 程序也可以读取和写入 /dev/fdN,其中 N 可以是 0(STDIN)、1(STDOUT)、2(STDERR)或其他 N 的值。目前不支持直接支持这些 N 的其他值。

输入优先级

除了枢纽之外的所有组件只有一个输入,但枢纽可以有多个输入。为了控制从哪个输入读取,我们可以添加优先级,这些优先级如下指定

connection:
  high_priority: "launch:one_thing | [5]junction:many_to_many"
  low_priority: "launch:something_else | [1]junction:many_to_many"

在这个例子中,5 和 1 是优先级,当未指定优先级时,它们将是 0。优先级也可以是负数。

附录

Pipeawesome 图表图例

您可以通过运行命令 ./target/debug/pa2 graph --config [YOUR_CONFIG_HERE] --legend-only 来绘制图例。输出将是 Graphviz DOT。

组件图例

这是在 组件类型 部分中显示的图表的图例。

通过命令行编辑配置

./target/debug/pa2 config --config examples/ls/pa.yaml launch --id l command ls | ./target/debug/pa2 config --config=- --format=yaml connection --id z join 'l:ls|d:out'

依赖项

约12–25MB
约331K SLoC