6 个版本 (3 个重大变更)
0.5.0 | 2024 年 4 月 24 日 |
---|---|
0.3.2 | 2023 年 9 月 5 日 |
0.3.0 | 2023 年 7 月 11 日 |
0.2.1 | 2023 年 5 月 7 日 |
0.1.0 | 2023 年 3 月 2 日 |
#58 在 并发 中
每月 1,872 次下载
在 3 crates 中使用
270KB
4K SLoC
SeaStreamer 是一个帮助你在 Rust 中构建实时流处理器的工具包。
特性
- 异步
SeaStreamer 提供了异步 API,并支持 tokio
和 async-std
。与其他异步 Rust 库协同使用,你可以构建高度并发的流处理器。
- 泛型
我们提供 Redis & Kafka / Redpanda 的集成,通过泛型 trait 接口,因此你的程序可以不依赖于后端。
- 可测试性
SeaStreamer 还提供了一套通过 Unix 管道处理流的工具,因此可以在不设置集群的情况下进行测试,这对于本地工作非常方便。
- 面向微服务
让我们在 Rust 中构建实时(多线程、无 GC)、自包含(即易于部署)、资源使用低、运行时间长的流处理器!
快速入门
将以下内容添加到你的 Cargo.toml
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
这是一个基本的 流消费者
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { stream } = Args::parse();
let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
options.set_auto_stream_reset(SeaStreamReset::Earliest);
let consumer: SeaConsumer = streamer
.create_consumer(stream.stream_keys(), options)
.await?;
loop {
let mess: SeaMessage = consumer.next().await?;
println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
}
}
这是一个基本的 流生产者
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { stream } = Args::parse();
let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
let producer: SeaProducer = streamer
.create_producer(stream.stream_key()?, Default::default())
.await?;
for tick in 0..100 {
let message = format!(r#""tick {tick}""#);
eprintln!("{message}");
producer.send(message)?;
tokio::time::sleep(Duration::from_secs(1)).await;
}
producer.end().await?; // flush
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let Args { input, output } = Args::parse();
let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
let consumer: SeaConsumer = streamer
.create_consumer(input.stream_keys(), options)
.await?;
let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
let producer: SeaProducer = streamer
.create_producer(output.stream_key()?, Default::default())
.await?;
loop {
let message: SeaMessage = consumer.next().await?;
let message = process(message).await?;
eprintln!("{message}");
producer.send(message)?; // send is non-blocking
}
}
现在,让我们将它们付诸实践。
使用 Redis / Kafka
STREAMER_URI="redis://127.0.0.1:6379" # or
STREAMER_URI="kafka://127.0.0.1:9092"
# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2
使用 Stdio
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
架构
sea-streamer
的架构由多个子 Crates 构建
所有 Crates 具有相同的较大版本。因此 0.1
的 sea-streamer
依赖于 0.1
的 sea-streamer-socket
。
sea-streamer-types
: Traits & Types
这个crate定义了SeaStreamer API的所有特性和类型,但不提供任何实现。
sea-streamer-socket
:后端无关的Socket API
类似于SeaORM允许您为不同的数据库构建应用程序,SeaStreamer允许您为不同的流服务器构建流处理器。
虽然sea-streamer-types
crate提供了一种基于特质的抽象,但这个crate提供了一个具体的类型API,这样您的程序就可以在运行时从/向任何SeaStreamer后端进行流式传输。
这允许您做一些很酷的事情,比如在本地生成数据然后将其流式传输到Redis / Kafka。或者相反,从服务器接收数据在本地进行处理。所有这些都不需要重新编译流处理器。
如果您只使用一个后端,可以直接依赖sea-streamer-redis
/ sea-streamer-kafka
。
提供了一些小的cli程序供演示。让我们先设置它们
# The `clock` program generate messages in the form of `{ "tick": N }`
alias clock='cargo run --package sea-streamer-stdio --features=executables --bin clock'
# The `relay` program redirect messages from `input` to `output`
alias relay='cargo run --package sea-streamer-socket --features=executables,backend-kafka,backend-redis --bin relay'
这是如何从Stdio ➡️ Redis / Kafka进行流式传输的。我们使用clock
生成消息,然后将它通过relay
管道传输,然后流式传输到Redis / Kafka
# Stdio -> Redis
clock -- --stream clock --interval 1s | \
relay -- --input stdio:///clock --output redis://127.0.0.1:6379/clock
# Stdio -> Kafka
clock -- --stream clock --interval 1s | \
relay -- --input stdio:///clock --output kafka://127.0.0.1:9092/clock
这是如何在Redis ↔️ Kafka之间进行流式传输的
# Redis -> Kafka
relay -- --input redis://127.0.0.1:6379/clock --output kafka://127.0.0.1:9092/clock
# Kafka -> Redis
relay -- --input kafka://127.0.0.1:9092/clock --output redis://127.0.0.1:6379/clock
这是如何从Kafka / Redis 重放 流的
relay -- --input redis://127.0.0.1:6379/clock --output stdio:///clock --offset start
relay -- --input kafka://127.0.0.1:9092/clock --output stdio:///clock --offset start
sea-streamer-kafka
:Kafka / Redpanda 后端
这是SeaStreamer的Kafka / Redpanda后端实现。这个crate提供了一个完整的类型系统,使得使用Kafka更加容易和安全。
首先,所有API(其中许多是同步的)都正确地包装为异步。方法也被标记为&mut
以消除可能的竞争条件。
KafkaConsumerOptions
具有类型参数。
KafkaConsumer
允许您seek
到某个时间点,rewind
到特定的偏移量,并commit
读取的消息。
KafkaProducer
允许您await
发送Receipt
或如果您不感兴趣则丢弃它。您还可以刷新生产者。
KafkaStreamer
允许您在disconnect
时刷新所有生产者。
请参阅测试以了解流语义的示例。
这个crate依赖于rdkafka
,它反过来依赖于librdkafka-sys,它本身是librdkafka的包装器。
配置参考:https://kafka.apache.org/documentation/#configuration
sea-streamer-redis
:Redis 后端
这是SeaStreamer的Redis后端实现。这个crate在Redis之上提供了一个高级异步API,使得使用Redis Streams变得无忧无虑
- 实现了熟悉的SeaStreamer抽象接口
- 一个全面的类型系统,通过API引导/限制您
- 高级API,因此您不再需要调用
XADD
、XREAD
或XACK
- 无互斥锁实现:通过消息传递实现并发
- 管道化的
XADD
和分页的XREAD
,吞吐量达到每秒100k消息
虽然我们希望提供一个类似Kafka的客户体验,但Redis和Kafka之间有一些根本性的区别
- 在Redis中,序列号不是连续的
- 在Kafka中,序列号是连续的
- 在Redis中,消息以先到先得的方式分发到消费者组中的成员,这导致了下一个问题
- 在Kafka中,消费者与分片之间在消费者组中是1对1的
- 在Redis中,必须对每条消息执行
ACK
- 在Kafka中,只需要一个Ack(读取到)来处理一系列读取操作
已实现的功能
- 实时模式,带有自动流重置
- 可恢复模式,带有自动ACK和/或自动提交
- 负载均衡模式,带有故障转移行为
- 按时间点进行Seek/rewind
- 基本流分片:将流拆分为多个子流
最佳做法是查看测试,以了解不同的流行为。
SeaStreamer如何提供更好的并发性?
考虑以下简单的流处理器
loop {
let input = XREAD.await;
let output = process(input).await;
XADD(output).await;
}
当它在读取或写入时,它没有在处理。因此,它在空闲时浪费时间,以更高的延迟读取消息,这反过来又限制了吞吐量。此外,读取的理想批次大小可能不是写入的理想批次大小。
在SeaStreamer中,读取和写入循环与您的进程循环分开,因此它们都可以并行发生(Rust中的异步是多线程的,因此是真正的并行)!
如果您从消费者组中读取,您还必须考虑何时ACK以及在一次请求中要批量多少个ACK。SeaStreamer可以在常规间隔后台提交,或者您可以异步提交而不阻塞您的进程循环。
在未来,我们希望支持Redis集群,因为没有集群的分区不是非常有用。目前它还在进行中。这是一个相当困难的任务,因为当与集群一起工作时,客户端必须承担责任。在Redis中,分区和节点是M-N映射 - 分区可以在任何时间在节点之间移动。这使得测试变得非常困难。如果您想帮忙,请告诉我们!
还有一个小型实用程序可以将Redis Streams消息导出到SeaStreamer文件。
此crate基于redis
构建。
sea-streamer-stdio
:标准I/O后端
这是SeaStreamer的stdio
后端实现。它旨在与Unix管道连接,在开发流处理器或本地处理数据时提供极大的灵活性。
您可以使用管道连接处理器:processor_a | processor_b
。
您也可以异步连接它们
touch stream # set up an empty file
tail -f stream | processor_b # program b can be spawned anytime
processor_a >> stream # append to the file
您还可以使用cat
重放文件,但它的运行速度尽可能快,然后停止,这可能是或可能不是期望的行为。
您可以将任何有效的UTF-8字符串写入stdin,每行将被视为一条消息。此外,您还可以以简单格式写入一些消息元数据
[timestamp | stream_key | sequence | shard_id] payload
注意:方括号是字面意义上的[
]
。
以下都是有效的
a plain, raw message
[2022-01-01T00:00:00] { "payload": "anything" }
[2022-01-01T00:00:00.123 | my_topic] "a string payload"
[2022-01-01T00:00:00 | my-topic-2 | 123] ["array", "of", "values"]
[2022-01-01T00:00:00 | my-topic-2 | 123 | 4] { "payload": "anything" }
[my_topic] a string payload
[my_topic | 123] { "payload": "anything" }
[my_topic | 123 | 4] { "payload": "anything" }
以下都是无效的
[Jan 1, 2022] { "payload": "anything" }
[2022-01-01T00:00:00] 12345
如果没有提供流键,它将分配名称broadcast
并将其发送到所有消费者。
您可以创建仅订阅部分主题的消费者。
同一ConsumerGroup
中的消费者将进行负载均衡(以轮询方式),这意味着您可以为并行处理消息生成多个异步任务。
sea-streamer-file
:文件后端
这非常类似于 sea-streamer-stdio
,但不同之处在于 SeaStreamerStdio 是实时工作的,而 sea-streamer-file
则是实时和回放。这意味着 SeaStreamerFile 有能力遍历 .ss
(sea-stream)文件,并寻找到特定的时戳/偏移量。
此外,Stdio 只能处理 UTF-8 文本数据,而 File 能够处理二进制数据。在 Stdio 中,每个进程只有一个 Streamer。在 File 中,同一个进程中可以有多个独立的 Streamer。毕竟,Streamer 只是一个文件。
SeaStreamerFile 的基本思想类似于带有每行一条消息的 tail -f
,其中消息框架携带二进制有效负载。有趣的是,在 SeaStreamer 中,我们并不使用分隔符来分隔消息。这消除了编码/解码消息有效负载的开销。但它增加了文件格式的复杂性。
SeaStreamerFile 格式是为了高效的快速前进和搜索而设计的。这是通过在文件中放置固定间隔的灯塔数组来实现的。灯塔包含流的摘要,因此它就像一个原地索引。它还允许读者与消息边界对齐。想了解更多关于文件格式的信息,请阅读src/format.rs
。
除此之外,还有高级 SeaStreamer 多生产者、多消费者流语义,类似于其他 SeaStreamer 后端的行为。特别是,负载均衡行为与 Stdio 相同,即轮询。
解码器
我们提供了一个小工具来解码 .ss
文件
cargo install sea-streamer-file --features=executables --bin ss-decode
# local version
alias ss-decode='cargo run --package sea-streamer-file --features=executables --bin ss-decode'
ss-decode -- --file <file> --format <format>
小贴士:将其通过 less
管道输出进行分页
ss-decode --file mystream.ss | less
示例 log
格式
# header
[2023-06-05T13:55:53.001 | hello | 1 | 0] message-1
# beacon
示例 ndjson
格式
/* header */
{"header":{"stream_key":"hello","shard_id":0,"sequence":1,"timestamp":"2023-06-05T13:55:53.001"},"payload":"message-1"}
/* beacon */
此外,还有一个位于 sea-streamer-file-reader
下的 TypeScript 实现。
待办事项
- 可恢复:目前未实现。一个可能的实现是将它们提交到本地的 SQLite 数据库中。
- 分片:目前它只流到 SHARD ZERO。
- 验证:一个用于验证和修复 SeaStreamer 二进制文件的工具程序。
sea-streamer-runtime
:异步运行时抽象
这个 crate 提供了一组小的函数,用于在 async-std
和 tokio
之间对齐类型签名,以便您可以构建针对这两个运行时通用的应用程序。
许可证
许可方式为以下之一
- Apache License,版本 2.0(《LICENSE-APACHE》或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证(《LICENSE-MIT》或 http://opensource.org/licenses/MIT)
任选其一。
除非您明确指出,否则您提交给作品的所有有意贡献,根据 Apache-2.0 许可证定义,应按上述方式双重许可,不附加任何额外条款或条件。
赞助商
SeaQL.org 是一个由热情的开发者运营的独立开源组织。如果您喜欢使用我们的库,请为我们点赞并分享我们的存储库。如果您感到慷慨,通过 GitHub Sponsor 做小额捐赠将非常感激,这将对维持该组织大有帮助。
我们邀请您参与、贡献,并共同帮助构建 Rust 的未来。
依赖项
~3–17MB
~245K SLoC