7个稳定版本
1.2.2 | 2024年7月15日 |
---|---|
1.2.1 | 2024年3月7日 |
1.1.1 | 2023年10月27日 |
1.1.0 | 2023年9月8日 |
1.0.1 | 2023年8月29日 |
#250 in 异步
124 每月下载量
150KB
3.5K SLoC
scoped-stream-sink
构建流和汇的便利库。
📌 2.0计划
自从AFIT(以及RPITIT)稳定下来后,我计划升级这个库的接口以使用它们。这应该会消除对Box::pin
的要求,但代价是复杂的类型边界(也许更难使用)。到目前为止,我还没有成功完全理解类型边界。
以下是(可能的)2.0版本的大致计划
- 消除
Box::pin
的要求(也许添加类型别名以支持动态版本)。 - 增强
StreamSink
的功能(目前它有点实验性)。
如何使用
只需将此crate导入到您的项目中即可。文档已包含在内,并且使用起来非常简单。
示例
use std::time::Duration;
use anyhow::Error;
use futures_sink::Sink;
use futures_core::Stream;
use futures_util::{SinkExt, StreamExt};
use scoped_stream_sink::*;
#[tokio::main]
async fn main() -> Result<(), Error> {
// Create new scoped stream
let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
// We have to Box::pin it because otherwise the trait bounds is too complex
// Interior sink cannot outlast the lifetime of it's outer stream
// This will not work
// tokio::spawn(async move { sink.send(10000).await.unwrap() }).await.unwrap();
// Assume this is a complex task
let (mut a, mut b) = (1usize, 1);
for _ in 0..10 {
sink.send(a).await.unwrap();
(a, b) = (b, a + b);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}));
let mut v = Vec::new();
while let Some(i) = stream.next().await {
v.push(i);
}
println!("{v:?}");
// Create new sink
let mut sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
// Unlike ScopedStream, this closure will be called over and over again,
// until all values are consumed
// Assume this is a complex task
tokio::time::sleep(Duration::from_millis(100)).await;
if let Some(v) = stream.next().await {
println!("Value: {v}");
}
Ok(())
}));
for i in 0..10 {
sink.send(i).await?;
}
sink.close().await?;
Ok(())
}
为什么?
因为实现合适的Stream
和Sink
很困难。最初我用来制作一个网络数据包处理器(类似于原始服务)。由于代码有潜力,我决定将其提取到一个专门的crate中。此外,这是一种有点黑客/酷的方式来制作生成器。
use core::pin::pin;
use core::ptr::NonNull;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use futures_sink::Sink;
use futures_core::Stream;
use futures_util::{SinkExt, StreamExt};
use scoped_stream_sink::*;
/// Create a null waker. It does nothing when waken.
fn nil_waker() -> Waker {
fn raw() -> RawWaker {
RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
}
unsafe fn clone(_: *const ()) -> RawWaker {
raw()
}
unsafe fn wake(_: *const ()) {}
unsafe fn wake_by_ref(_: *const ()) {}
unsafe fn drop(_: *const ()) {}
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
unsafe { Waker::from_raw(raw()) }
}
fn main() {
// Create a generator
//
// Yes, this is a very hacky way of doing it in stable Rust
let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
for i in 0usize..10 {
sink.send(i).await.unwrap();
}
}));
let mut stream = pin!(stream);
// Setup waker and context
let waker = nil_waker();
let mut cx = Context::from_waker(&waker);
// The loop
loop {
let v = match stream.as_mut().poll_next(&mut cx) {
Poll::Pending => continue, // Should not happen, but continue anyways
Poll::Ready(None) => break, // Stop iteration
Poll::Ready(Some(v)) => v, // Process value
};
println!("{v}");
}
}
如何?
在基本层面上,有一个内嵌的固定值,在汇/流内部和外部都可以可变访问。由于这有点违反了别名可变借用,它只能用于单个线程中。然而,由于借用是顺序进行的,"传递"在内部和外部上下文之间,所以这应该没问题。为了强制执行这一点,本地变体简单地不允许Send
,而那些非本地的则使用简单的锁,如果锁定失败会引发恐慌。
在实际中,必须确保流/汇可以从内部和外部唤醒,同时不使用唤醒器本身。除非它已经完成/关闭,否则流/汇不应该等待任何东西。
许可证
此库受Apache 2.0许可证的许可。
依赖项
~250KB