#sink #stream #async-stream #async

无std scoped-stream-sink

轻松创建流和汇

7个稳定版本

1.2.2 2024年7月15日
1.2.1 2024年3月7日
1.1.1 2023年10月27日
1.1.0 2023年9月8日
1.0.1 2023年8月29日

#250 in 异步

Download history 124/week @ 2024-07-14

124 每月下载量

Apache-2.0

150KB
3.5K SLoC

scoped-stream-sink

构建流和汇的便利库。

📌 2.0计划

自从AFIT(以及RPITIT)稳定下来后,我计划升级这个库的接口以使用它们。这应该会消除对Box::pin的要求,但代价是复杂的类型边界(也许更难使用)。到目前为止,我还没有成功完全理解类型边界。

以下是(可能的)2.0版本的大致计划

  • 消除Box::pin的要求(也许添加类型别名以支持动态版本)。
  • 增强StreamSink的功能(目前它有点实验性)。

如何使用

只需将此crate导入到您的项目中即可。文档已包含在内,并且使用起来非常简单。

示例

use std::time::Duration;

use anyhow::Error;
use futures_sink::Sink;
use futures_core::Stream;
use futures_util::{SinkExt, StreamExt};

use scoped_stream_sink::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Create new scoped stream
    let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
        // We have to Box::pin it because otherwise the trait bounds is too complex
        // Interior sink cannot outlast the lifetime of it's outer stream

        // This will not work
        // tokio::spawn(async move { sink.send(10000).await.unwrap() }).await.unwrap();

        // Assume this is a complex task
        let (mut a, mut b) = (1usize, 1);
        for _ in 0..10 {
            sink.send(a).await.unwrap();
            (a, b) = (b, a + b);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }));

    let mut v = Vec::new();
    while let Some(i) = stream.next().await {
        v.push(i);
    }
    println!("{v:?}");

    // Create new sink
    let mut sink = <ScopedSink<usize, Error>>::new(|mut stream| Box::pin(async move {
        // Unlike ScopedStream, this closure will be called over and over again,
        // until all values are consumed

        // Assume this is a complex task
        tokio::time::sleep(Duration::from_millis(100)).await;
        if let Some(v) = stream.next().await {
            println!("Value: {v}");
        }
        Ok(())
    }));

    for i in 0..10 {
        sink.send(i).await?;
    }
    sink.close().await?;

    Ok(())
}

为什么?

因为实现合适的StreamSink很困难。最初我用来制作一个网络数据包处理器(类似于原始服务)。由于代码有潜力,我决定将其提取到一个专门的crate中。此外,这是一种有点黑客/酷的方式来制作生成器。

use core::pin::pin;
use core::ptr::NonNull;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use futures_sink::Sink;
use futures_core::Stream;
use futures_util::{SinkExt, StreamExt};

use scoped_stream_sink::*;

/// Create a null waker. It does nothing when waken.
fn nil_waker() -> Waker {
    fn raw() -> RawWaker {
        RawWaker::new(NonNull::dangling().as_ptr(), &VTABLE)
    }

    unsafe fn clone(_: *const ()) -> RawWaker {
        raw()
    }
    unsafe fn wake(_: *const ()) {}
    unsafe fn wake_by_ref(_: *const ()) {}
    unsafe fn drop(_: *const ()) {}

    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

    unsafe { Waker::from_raw(raw()) }
}

fn main() {
    // Create a generator
    //
    // Yes, this is a very hacky way of doing it in stable Rust
    let mut stream = ScopedStream::new(|mut sink| Box::pin(async move {
        for i in 0usize..10 {
            sink.send(i).await.unwrap();
        }
    }));
    let mut stream = pin!(stream);

    // Setup waker and context
    let waker = nil_waker();
    let mut cx = Context::from_waker(&waker);

    // The loop
    loop {
        let v = match stream.as_mut().poll_next(&mut cx) {
            Poll::Pending => continue, // Should not happen, but continue anyways
            Poll::Ready(None) => break, // Stop iteration
            Poll::Ready(Some(v)) => v, // Process value
        };

        println!("{v}");
    }
}

如何?

在基本层面上,有一个内嵌的固定值,在汇/流内部和外部都可以可变访问。由于这有点违反了别名可变借用,它只能用于单个线程中。然而,由于借用是顺序进行的,"传递"在内部和外部上下文之间,所以这应该没问题。为了强制执行这一点,本地变体简单地不允许Send,而那些非本地的则使用简单的锁,如果锁定失败会引发恐慌。

在实际中,必须确保流/汇可以从内部和外部唤醒,同时不使用唤醒器本身。除非它已经完成/关闭,否则流/汇不应该等待任何东西。

许可证

此库受Apache 2.0许可证的许可。

依赖项

~250KB