0.1.0 |
|
---|
在 #byte-string 中排名 #68
93KB
1K SLoC
futures-codec2
从 tokio-util 分支出的帧编解码器实用工具。
许可证
此项目采用 MIT 许可证。
贡献
除非你明确说明,否则你提交给 Tokio 的任何贡献,都应作为 MIT 许可,不附加任何额外条款或条件。
lib.rs
:
从 AsyncRead/AsyncWrite 到 Stream/Sink 的适配器
原始 I/O 对象与字节序列一起工作,但高级代码通常希望将这些批处理成有意义的块,称为 "帧"。
此模块包含适配器,可以将字节流、AsyncRead
和 AsyncWrite
转换为实现了 Sink
和 Stream
的帧流。帧流也称为传输。
解码器 trait
解码器 Decoder
与 FramedRead
或 Framed
一起使用,将 AsyncRead
转换为 Stream
。解码器 trait 的任务是指定如何将字节序列转换为帧序列,并确定帧之间的边界。FramedRead 的任务是反复在从 IO 资源读取更多数据之间切换,并询问解码器是否已经收到足够的数据来解码另一帧数据。
Decoder
trait 的主要方法是 decode
方法。此方法以已读取的数据作为参数,当它被调用时,它将处于以下情况之一
- 缓冲区包含少于一个完整的帧。
- 缓冲区包含正好一个完整的帧。
- 缓冲区包含多于一个完整的帧。
在第一种情况下,解码器应返回 Ok(None)
。
在第二种情况下,解码器应该清除提供的缓冲区,并返回Ok(Some(the_decoded_frame))
。
在第三种情况下,解码器应该使用类似split_to
或advance
的方法修改缓冲区,使帧从缓冲区中移除,但该帧之后的数据仍然保留在缓冲区中。在这种情况下,解码器也应返回Ok(Some(the_decoded_frame))
。
最后,如果数据以某种方式无效,解码器可能会返回错误。解码器不应该仅仅因为没有收到完整的帧就返回错误。
保证从一个decode
调用到另一个调用,提供的缓冲区将包含与之前完全相同的数据,除非通过IO资源到达了更多数据,那么这些数据将被附加到缓冲区。这意味着从FramedRead
读取帧本质上等同于以下循环
use futures::io::AsyncBufReadExt;
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
let len = {
let rbuf = io_resource.fill_buf().await?;
buf.extend_from_slice(rbuf);
rbuf.len()
};
io_resource.consume_unpin(len);
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
上面的例子在Stream
产生项时使用yield
。
示例解码器
作为一个例子,考虑一种可以用来发送字符串的协议,其中每个帧都是一个包含帧长度的四个字节整数,后面跟着相应数量的字符串数据字节。如果字符串数据不是有效的utf-8或太长,解码器会失败。
这样的解码器可以写成这样
use futures_codec2::Decoder;
use bytes::{BytesMut, Buf};
struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Decoder for MyStringDecoder {
type Item = String;
type Error = std::io::Error;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
// Not enough data to read length marker.
return Ok(None);
}
// Read length marker.
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
// Check that the length is not too large to avoid a denial of
// service attack where the server runs out of memory.
if length > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length)
));
}
if src.len() < 4 + length {
// The full string has not yet arrived.
//
// We reserve more space in the buffer. This is not strictly
// necessary, but is a good idea performance-wise.
src.reserve(4 + length - src.len());
// We inform the Framed that we need more bytes to form the next
// frame.
return Ok(None);
}
// Use advance to modify src such that it no longer contains
// this frame.
let data = src[4..4 + length].to_vec();
src.advance(4 + length);
// Convert the data to a string, or fail if it is not valid utf-8.
match String::from_utf8(data) {
Ok(string) => Ok(Some(string)),
Err(utf8_error) => {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
utf8_error.utf8_error(),
))
},
}
}
}
编码器特质
Encoder
与FramedWrite
或Framed
一起使用,将AsyncWrite
转换成Sink
。编码器特质的工作是指定如何将帧转换成字节序列。FramedWrite
的工作是将生成的字节序列写入IO资源。
Encoder
特质的encode
方法是主方法。该方法接收要写入的项和一个写入项的缓冲区。缓冲区可能已经包含数据,在这种情况下,编码器应将新帧附加到缓冲区,而不是覆盖现有数据。
保证从一个encode
调用到另一个调用,提供的缓冲区将包含与之前完全相同的数据,除了可能有一些数据已经被从缓冲区的前端移除。写入FramedWrite
本质上等同于以下循环
use futures::future::FutureExt;
use futures::io::AsyncWriteExt;
use bytes::Buf; // for advance
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new();
loop {
futures::select! {
num_written = io_resource.write(&buf).fuse() => {
if !buf.is_empty() {
buf.advance(num_written?);
}
},
frame = next_frame().fuse() => {
if buf.len() < MAX {
encoder.encode(frame, &mut buf)?;
}
},
_ = no_more_frames().fuse() => {
io_resource.write_all(&buf).await?;
io_resource.close().await?;
return Ok(());
},
}
}
这里next_frame
方法对应于您写入FramedWrite
的任何帧。no_more_frames
方法对应于使用SinkExt::close
关闭FramedWrite
。
示例编码器
例如,考虑一个可以用来发送字符串的协议,其中每个帧是一个包含帧长度(四个字节整数)的字节数据,然后是相应长度的字符串数据。如果字符串太长,编码器将失败。
这样的编码器可以写成这样
use futures_codec2::Encoder;
use bytes::BytesMut;
struct MyStringEncoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Encoder<String> for MyStringEncoder {
type Error = std::io::Error;
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
// Don't send a string if it is longer than the other end will
// accept.
if item.len() > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", item.len())
));
}
// Convert the length into a byte array.
// The cast to u32 cannot overflow due to the length check above.
let len_slice = u32::to_le_bytes(item.len() as u32);
// Reserve space in the buffer.
dst.reserve(4 + item.len());
// Write the length and string to the buffer.
dst.extend_from_slice(&len_slice);
dst.extend_from_slice(item.as_bytes());
Ok(())
}
}
依赖项
~0.7–1.3MB
~26K SLoC