#stream-processing #async-stream #stream #async #data-processing #processor

bin+lib sea-streamer-stdio

🌊 SeaStreamer 标准I/O后端

4 个版本 (破坏性)

0.5.0 2024年4月24日
0.3.0 2023年7月11日
0.2.0 2023年3月25日
0.1.0 2023年3月2日

#405并发

Download history 1122/week @ 2024-04-21 640/week @ 2024-04-28 961/week @ 2024-05-05 921/week @ 2024-05-12 860/week @ 2024-05-19 698/week @ 2024-05-26 650/week @ 2024-06-02 676/week @ 2024-06-09 711/week @ 2024-06-16 574/week @ 2024-06-23 627/week @ 2024-06-30 598/week @ 2024-07-07 732/week @ 2024-07-14 646/week @ 2024-07-21 408/week @ 2024-07-28 406/week @ 2024-08-04

每月2,246 次下载
4 个crate中使用了(直接使用2个)

MIT/Apache

92KB
2K SLoC

sea-streamer-stdio: 标准I/O后端

这是SeaStreamer的stdio后端实现。它设计用来与Unix管道连接,这使得在开发流处理器或本地处理数据时具有很高的灵活性。

您可以使用管道将处理器连接起来: processor_a | processor_b.

您也可以异步地连接它们

touch stream # set up an empty file
tail -f stream | processor_b # program b can be spawned anytime
processor_a >> stream # append to the file

您也可以使用 cat 来重放文件,但它从开始到结束尽可能快地运行然后停止,这可能是或可能不是期望的行为。

您可以将任何有效的UTF-8字符串写入stdin,每行将被视为一条消息。此外,您还可以以简单格式写入一些消息元数据

[timestamp | stream_key | sequence | shard_id] payload

注意:方括号是字面意义上的 [ ]

以下都是有效的

a plain, raw message
[2022-01-01T00:00:00] { "payload": "anything" }
[2022-01-01T00:00:00.123 | my_topic] "a string payload"
[2022-01-01T00:00:00 | my-topic-2 | 123] ["array", "of", "values"]
[2022-01-01T00:00:00 | my-topic-2 | 123 | 4] { "payload": "anything" }
[my_topic] a string payload
[my_topic | 123] { "payload": "anything" }
[my_topic | 123 | 4] { "payload": "anything" }

以下都是无效的

[Jan 1, 2022] { "payload": "anything" }
[2022-01-01T00:00:00] 12345

如果没有提供流键,它将被分配名称 broadcast 并发送给所有消费者。

您可以创建只订阅部分主题的消费者。

同一 ConsumerGroup 中的消费者将被负载均衡(以轮询方式),这意味着您可以在并行处理消息时启动多个异步任务。

依赖项

~4–16MB
~239K SLoC