#async-stream #io #context #adapter #read-write #io-read #error

async-stdio

在 std::io 环境中使用异步读写流的适配器

6 个版本

0.3.0-alpha.42019年10月11日
0.3.0-alpha.32019年6月20日
0.3.0-alpha.22019年6月14日
0.0.0 2019年6月14日

#1847 in 异步

每月 30 次下载
openssl-async 中使用

MIT/Apache

12KB
111

在 std::io 环境中使用异步读写流的适配器

version documentation license

许可证

许可协议为以下之一

由您选择。

贡献

除非您明确说明,否则任何有意提交以包含在作品中的贡献,根据 Apache-2.0 许可证定义,应作为上述双重许可,不附加任何额外条款或条件。


lib.rs:

std::io 环境中使用异步读写流的适配器

有时,您会遇到仅接受 std::io Read + Write 类型的接口,但还需要为异步/await 应用程序进行适配。[AsStdIo] 适配器允许 [AsyncRead] + [AsyncWrite] 流作为其 std::io 对应物使用。假设任何消费包装流的实体都会向上冒泡 io::ErrorKind::WouldBlock 错误并允许操作恢复,这提供了一种方法,既可以使用异步流与仅 std::io 接口,也可以围绕它编写异步包装器。

示例

#
#
use async_stdio::*;

struct ChunkReader<R> {
    // ...
    # reader: R,
    # chunk_size: usize,
    # buffer: VecDeque<u8>,
}

impl<R: Read> ChunkReader<R> {
    fn new(reader: R, chunk_size: usize) -> Self {
        // ...
        # ChunkReader {
        #     reader,
        #     chunk_size,
        #     buffer: Default::default(),
        # }
    }

    /// Reads a chunk from the stream
    ///
    /// If the stream ends before a full chunk is read, may return a smaller
    /// chunk. Returns an empty chunk if there is no more to be read.
    fn read_chunk(&mut self) -> io::Result<Vec<u8>> {
        // ...
        # let mut tmp = vec![0u8; self.chunk_size];
        # let mut bytes = self.chunk_size;
        # loop {
        #     if self.buffer.len() >= self.chunk_size || bytes == 0 {
        #         let end = self.buffer.len().min(self.chunk_size);
        #         tmp.truncate(0);
        #         return Ok(self.buffer.drain(..end).fold(tmp, |mut out, b| {
        #             out.push(b);
        #             out
        #         }));
        #     }
        #     bytes = self.reader.read(&mut tmp)?;
        #     self.buffer.extend(&tmp[..bytes]);
        # }
    }
}

/// Wrapper around the std-only `ChunkReader` to turn it
/// into an async `Stream`
struct AsyncChunked<S> {
    inner: ChunkReader<AsStdIo<S>>,
    waker_ctrl: WakerCtrl,
}

impl<S: AsyncRead + Unpin> AsyncChunked<S> {
    fn new(stream: S, chunk_size: usize) -> AsyncChunked<S> {
        let (stream, waker_ctrl) = AsStdIo::new(stream, None);
        let inner = ChunkReader::new(stream, chunk_size);
        AsyncChunked { inner, waker_ctrl }
    }
}

impl<S: AsyncRead + Unpin> Stream for AsyncChunked<S> {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Make sure the waker is set before the calls to `std::io::Read::read`
        this.waker_ctrl.register(cx.waker());
        // `into_poll` (from `ResultExt`) converts `WouldBlock` into `Pending`
        let chunk_res = ready!(this.inner.read_chunk().into_poll());

        Poll::Ready(
            chunk_res
                .map(|chunk| if chunk.is_empty() { None } else { Some(chunk) })
                .transpose(),
        )
    }
}

// Pretend this doesn't already implement `io::Read`
let stream = io::Cursor::new(vec![0, 1, 2, 3, 4, 5]);
let mut async_chunked = AsyncChunked::new(stream, 2);

let chunks: Vec<Vec<u8>> = block_on(async_chunked.map(|chunk| chunk.unwrap()).collect());

let expected: Vec<Vec<u8>> = vec![vec![0, 1], vec![2, 3], vec![4, 5]];

assert_eq!(chunks, expected,);

依赖关系

~1MB
~18K SLoC