6 个版本
0.3.0-alpha.4 | 2019年10月11日 |
---|---|
0.3.0-alpha.3 | 2019年6月20日 |
0.3.0-alpha.2 | 2019年6月14日 |
0.0.0 | 2019年6月14日 |
#1847 in 异步
每月 30 次下载
在 openssl-async 中使用
12KB
111 行
在 std::io 环境中使用异步读写流的适配器
许可证
许可协议为以下之一
- Apache License, Version 2.0, (LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证 (LICENSE-MIT 或 http://opensource.org/licenses/MIT)
由您选择。
贡献
除非您明确说明,否则任何有意提交以包含在作品中的贡献,根据 Apache-2.0 许可证定义,应作为上述双重许可,不附加任何额外条款或条件。
lib.rs
:
在 std::io 环境中使用异步读写流的适配器
有时,您会遇到仅接受 std::io Read + Write
类型的接口,但还需要为异步/await 应用程序进行适配。[AsStdIo] 适配器允许 [AsyncRead] + [AsyncWrite] 流作为其 std::io 对应物使用。假设任何消费包装流的实体都会向上冒泡 io::ErrorKind::WouldBlock 错误并允许操作恢复,这提供了一种方法,既可以使用异步流与仅 std::io 接口,也可以围绕它编写异步包装器。
示例
#
#
use async_stdio::*;
struct ChunkReader<R> {
// ...
# reader: R,
# chunk_size: usize,
# buffer: VecDeque<u8>,
}
impl<R: Read> ChunkReader<R> {
fn new(reader: R, chunk_size: usize) -> Self {
// ...
# ChunkReader {
# reader,
# chunk_size,
# buffer: Default::default(),
# }
}
/// Reads a chunk from the stream
///
/// If the stream ends before a full chunk is read, may return a smaller
/// chunk. Returns an empty chunk if there is no more to be read.
fn read_chunk(&mut self) -> io::Result<Vec<u8>> {
// ...
# let mut tmp = vec![0u8; self.chunk_size];
# let mut bytes = self.chunk_size;
# loop {
# if self.buffer.len() >= self.chunk_size || bytes == 0 {
# let end = self.buffer.len().min(self.chunk_size);
# tmp.truncate(0);
# return Ok(self.buffer.drain(..end).fold(tmp, |mut out, b| {
# out.push(b);
# out
# }));
# }
# bytes = self.reader.read(&mut tmp)?;
# self.buffer.extend(&tmp[..bytes]);
# }
}
}
/// Wrapper around the std-only `ChunkReader` to turn it
/// into an async `Stream`
struct AsyncChunked<S> {
inner: ChunkReader<AsStdIo<S>>,
waker_ctrl: WakerCtrl,
}
impl<S: AsyncRead + Unpin> AsyncChunked<S> {
fn new(stream: S, chunk_size: usize) -> AsyncChunked<S> {
let (stream, waker_ctrl) = AsStdIo::new(stream, None);
let inner = ChunkReader::new(stream, chunk_size);
AsyncChunked { inner, waker_ctrl }
}
}
impl<S: AsyncRead + Unpin> Stream for AsyncChunked<S> {
type Item = io::Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Make sure the waker is set before the calls to `std::io::Read::read`
this.waker_ctrl.register(cx.waker());
// `into_poll` (from `ResultExt`) converts `WouldBlock` into `Pending`
let chunk_res = ready!(this.inner.read_chunk().into_poll());
Poll::Ready(
chunk_res
.map(|chunk| if chunk.is_empty() { None } else { Some(chunk) })
.transpose(),
)
}
}
// Pretend this doesn't already implement `io::Read`
let stream = io::Cursor::new(vec![0, 1, 2, 3, 4, 5]);
let mut async_chunked = AsyncChunked::new(stream, 2);
let chunks: Vec<Vec<u8>> = block_on(async_chunked.map(|chunk| chunk.unwrap()).collect());
let expected: Vec<Vec<u8>> = vec![vec![0, 1], vec![2, 3], vec![4, 5]];
assert_eq!(chunks, expected,);
依赖关系
~1MB
~18K SLoC