#stream #split #predicate #extension #traits #future #left

split-stream-by

将 Stream 按条件拆分为两个的 Stream 扩展包

2 个不稳定版本

0.1.0 2021年9月11日
0.0.0 2021年9月9日

异步 分类中排名 #891

Download history 309/week @ 2024-03-24 166/week @ 2024-03-31 85/week @ 2024-04-07 235/week @ 2024-04-14 324/week @ 2024-04-21 123/week @ 2024-04-28 150/week @ 2024-05-05 314/week @ 2024-05-12 332/week @ 2024-05-19 86/week @ 2024-05-26 160/week @ 2024-06-02 145/week @ 2024-06-09 243/week @ 2024-06-16 248/week @ 2024-06-23 325/week @ 2024-06-30 270/week @ 2024-07-07

每月下载量 1,090
2 个包中使用(通过 vtubestudio

MIT 许可证

42KB
816

此仓库是一个 Rust crate,提供了一个 futures::Stream 扩展 trait,允许使用一个检查每个 Stream::Item 的谓词函数将 Stream 拆分为两个流。

use split_stream_by::SplitStreamByExt;

let incoming_stream = futures::stream::iter([0,1,2,3,4,5]);
let (mut even_stream, mut odd_stream) = incoming_stream.split_by(|&n| n % 2 == 0);

tokio::spawn(async move {
	while let Some(even_number) = even_stream.next().await {
		println!("Even {}",even_number);
	}
});

while let Some(odd_number) = odd_stream.next().await {
	println!("Odd {}",odd_number);
}

更高级的用法使用 split_by_map,允许在拆分的同时提取值

use split_stream_by::{Either,SplitStreamByExt};

struct Request {
	//...
}

struct Response {
	//...
}

enum Message {
	Request(Request),
	Response(Response)
}

let incoming_stream = futures::stream::iter([
	Message::Request(Request {}),
	Message::Response(Response {}),
	Message::Response(Response {}),
]);
let (mut request_stream, mut response_stream) = incoming_stream.split_by_map(|item| match item {
	Message::Request(req) => Either::Left(req),
	Message::Response(res) => Either::Right(res),
});

tokio::spawn(async move {
	while let Some(request) = request_stream.next().await {
		// ...
	}
});

while let Some(response) = response_stream.next().await {
	// ...
}

在其当前实现中,当请求下一个项目的流不是匹配谓词的流时,它只能缓冲一个元素。将来,这可能通过使用 const 泛型参数来配置

依赖项

~1–1.7MB
~34K SLoC