4个版本 (重大更新)
0.5.0 | 2024年4月24日 |
---|---|
0.3.0 | 2023年7月11日 |
0.2.0 | 2023年3月25日 |
0.1.0 | 2023年3月2日 |
#736 in 并发
120KB
428 行
SeaStreamer 示例
这个crate作为SeaStreamer的演示,同时也可以作为您开发流处理器的起点,展示了各种流处理技术。
这个crate适用于tokio
和async-std
,可以将流发送到kafka
和stdio
。
-
consumer
:一个基本的消费者 -
producer
:一个基本的生产者 -
processor
:一个基本的流处理器 -
resumable
:一个可恢复的流处理器,从上次停止的地方继续 -
buffered
:一个具有内部缓冲和批处理的先进流处理器 -
blocking
:用于处理阻塞/计算密集型任务的先进流处理器 -
price-feed
:一个WebSocket到Redis / Kafka流生产者 -
sea-orm-sink
:一个Redis / Kafka到SQLite数据接收器
运行基本处理器示例
与Kafka一起
# Produce some input
cargo run --bin producer -- --stream kafka://127.0.0.1:9092/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input kafka://127.0.0.1:9092/hello1 --output kafka://127.0.0.1:9092/hello2 &
# Replay the output
cargo run --bin consumer -- --stream kafka://127.0.0.1:9092/hello2
# Remember to stop the processes
kill %1 %2
与Redis一起
# Produce some input
cargo run --bin producer -- --stream redis://127.0.0.1:6379/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input redis://127.0.0.1:6379/hello1 --output redis://127.0.0.1:6379/hello2 &
# Replay the output
cargo run --bin consumer -- --stream redis://127.0.0.1:6379/hello2
# Remember to stop the processes
kill %1 %2
与文件一起
# Create the file
file=/tmp/sea-streamer-$(date +%s)
touch $file && echo "File created at $file"
# Produce some input
cargo run --bin producer -- --stream file://$file/hello &
# Replay the input
cargo run --bin consumer -- --stream file://$file/hello
# Start the processor, producing some output
cargo run --bin processor -- --input file://$file/hello --output stdio:///hello
与Stdio一起
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
运行可恢复处理器示例
可恢复处理器可以随时被杀死,并从上次停止的地方继续。这通常被称为“至少一次”处理,意味着不应该跳过任何消息,但同一个消息可能会被处理两次。
STREAMER_URI="kafka://127.0.0.1:9092"
STREAMER_URI="redis://127.0.0.1:6379"
# Produce lots of input
cargo run --bin producer -- --stream $STREAMER_URI/hello1
# Run the processor, but kill it before it can process the entire stream
cargo run --bin resumable -- --input $STREAMER_URI/hello1 --output stdio:///hello2 | head -n 10
cargo run --bin resumable -- --input $STREAMER_URI/hello1 --output stdio:///hello2 | head -n 10
cargo run --bin resumable -- --input $STREAMER_URI/hello1 --output stdio:///hello2 | head -n 10
输出
[2023-02-28T10:13:59 | hello2 | 0] "tick 0" processed
[2023-02-28T10:13:59 | hello2 | 1] "tick 1" processed
[2023-02-28T10:13:59 | hello2 | 2] "tick 2" processed
...
[2023-02-28T10:13:59 | hello2 | 9] "tick 9" processed
thread 'sea-streamer-stdio-stdout' panicked at 'failed printing to stdout: Broken pipe (os error 32)', library/std/src/io/stdio.rs:1009:9
[2023-02-28T10:14:08 | hello2 | 0] "tick 10" processed
...
[2023-02-28T10:14:08 | hello2 | 9] "tick 19" processed
thread 'sea-streamer-stdio-stdout' panicked at 'failed printing to stdout: Broken pipe (os error 32)', library/std/src/io/stdio.rs:1009:9
...
运行缓冲处理器示例
时钟速度比处理器快10倍,因此我们预计每个批次包含大约10条消息。
这种模式在输入流频率高,但处理器阻抗高时很有用。
alias clock='cargo run --package sea-streamer-stdio --features=executables --bin clock'
clock -- --stream clock --interval 100ms | \
cargo run --bin buffered -- --input stdio:///clock --output stdio:///output
输出
[2023-02-27T10:43:58 | output | 0] [batch 0] { "tick": 0 } processed
[2023-02-27T10:43:59 | output | 1] [batch 1] { "tick": 1 } processed
[2023-02-27T10:43:59 | output | 2] [batch 1] { "tick": 2 } processed
[2023-02-27T10:43:59 | output | 3] [batch 1] { "tick": 3 } processed
[2023-02-27T10:43:59 | output | 4] [batch 1] { "tick": 4 } processed
[2023-02-27T10:43:59 | output | 5] [batch 1] { "tick": 5 } processed
[2023-02-27T10:43:59 | output | 6] [batch 1] { "tick": 6 } processed
[2023-02-27T10:43:59 | output | 7] [batch 1] { "tick": 7 } processed
[2023-02-27T10:43:59 | output | 8] [batch 1] { "tick": 8 } processed
...
更深入的技术讨论
例如,要将记录插入数据库,批量插入效率更高。但您不能天真地将批量大小固定为10或100,因为它可能已经缓冲了9条消息,正在等待第10条,您无法处理消息的突然激增。
那么,如何最小化整体任务执行时间呢?您将两个繁忙的循环解耦,并使用队列作为流体耦合装置——这是我能想到的最佳机械类比:现在两个循环都可以在其最佳频率下旋转,最大化处理器的整体吞吐量。
运行阻塞处理器示例
时钟比处理器快3倍,但我们有4个线程,因此我们预计它能够在实时中赶上。任务随机分配给线程,也就是所谓的“扇出”模式。
当您必须执行阻塞I/O或CPU密集型计算时,这种模式很有用。
alias clock='cargo run --package sea-streamer-stdio --features=executables --bin clock'
clock -- --stream clock --interval 333ms | \
cargo run --bin blocking -- --input stdio:///clock --output stdio:///output
输出
[2023-03-07T06:00:52 | output | 0] [thread 0] { "tick": 0 } processed
[2023-03-07T06:00:53 | output | 1] [thread 1] { "tick": 1 } processed
[2023-03-07T06:00:53 | output | 2] [thread 2] { "tick": 2 } processed
[2023-03-07T06:00:53 | output | 3] [thread 3] { "tick": 3 } processed
[2023-03-07T06:00:54 | output | 4] [thread 0] { "tick": 4 } processed
[2023-03-07T06:00:54 | output | 5] [thread 1] { "tick": 5 } processed
[2023-03-07T06:00:54 | output | 6] [thread 2] { "tick": 6 } processed
依赖项
~24–38MB
~572K SLoC