#frame #byte #codec #decoder #future #byte-string #io

已删除 futures-codec2

从 tokio-util 分支出的帧编解码器实用工具

0.1.0 2020年12月31日

#byte-string 中排名 #68

MIT 许可证

93KB
1K SLoC

futures-codec2

tokio-util 分支出的帧编解码器实用工具。

许可证

此项目采用 MIT 许可证

贡献

除非你明确说明,否则你提交给 Tokio 的任何贡献,都应作为 MIT 许可,不附加任何额外条款或条件。


lib.rs:

从 AsyncRead/AsyncWrite 到 Stream/Sink 的适配器

原始 I/O 对象与字节序列一起工作,但高级代码通常希望将这些批处理成有意义的块,称为 "帧"。

此模块包含适配器,可以将字节流、AsyncReadAsyncWrite 转换为实现了 SinkStream 的帧流。帧流也称为传输。

解码器 trait

解码器 DecoderFramedReadFramed 一起使用,将 AsyncRead 转换为 Stream。解码器 trait 的任务是指定如何将字节序列转换为帧序列,并确定帧之间的边界。FramedRead 的任务是反复在从 IO 资源读取更多数据之间切换,并询问解码器是否已经收到足够的数据来解码另一帧数据。

Decoder trait 的主要方法是 decode 方法。此方法以已读取的数据作为参数,当它被调用时,它将处于以下情况之一

  1. 缓冲区包含少于一个完整的帧。
  2. 缓冲区包含正好一个完整的帧。
  3. 缓冲区包含多于一个完整的帧。

在第一种情况下,解码器应返回 Ok(None)

在第二种情况下,解码器应该清除提供的缓冲区,并返回Ok(Some(the_decoded_frame))

在第三种情况下,解码器应该使用类似split_toadvance的方法修改缓冲区,使帧从缓冲区中移除,但该帧之后的数据仍然保留在缓冲区中。在这种情况下,解码器也应返回Ok(Some(the_decoded_frame))

最后,如果数据以某种方式无效,解码器可能会返回错误。解码器不应该仅仅因为没有收到完整的帧就返回错误。

保证从一个decode调用到另一个调用,提供的缓冲区将包含与之前完全相同的数据,除非通过IO资源到达了更多数据,那么这些数据将被附加到缓冲区。这意味着从FramedRead读取帧本质上等同于以下循环

use futures::io::AsyncBufReadExt;

let mut buf = bytes::BytesMut::new();
loop {
    // The read_buf call will append to buf rather than overwrite existing data.
    let len = {
        let rbuf = io_resource.fill_buf().await?;
        buf.extend_from_slice(rbuf);
        rbuf.len()
    };
    io_resource.consume_unpin(len);

    if len == 0 {
        while let Some(frame) = decoder.decode_eof(&mut buf)? {
            yield frame;
        }
        break;
    }

    while let Some(frame) = decoder.decode(&mut buf)? {
        yield frame;
    }
}

上面的例子在Stream产生项时使用yield

示例解码器

作为一个例子,考虑一种可以用来发送字符串的协议,其中每个帧都是一个包含帧长度的四个字节整数,后面跟着相应数量的字符串数据字节。如果字符串数据不是有效的utf-8或太长,解码器会失败。

这样的解码器可以写成这样

use futures_codec2::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker.
            return Ok(None);
        }

        // Read length marker.
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        // Check that the length is not too large to avoid a denial of
        // service attack where the server runs out of memory.
        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }

        if src.len() < 4 + length {
            // The full string has not yet arrived.
            //
            // We reserve more space in the buffer. This is not strictly
            // necessary, but is a good idea performance-wise.
            src.reserve(4 + length - src.len());

            // We inform the Framed that we need more bytes to form the next
            // frame.
            return Ok(None);
        }

        // Use advance to modify src such that it no longer contains
        // this frame.
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert the data to a string, or fail if it is not valid utf-8.
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => {
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    utf8_error.utf8_error(),
                ))
            },
        }
    }
}

编码器特质

EncoderFramedWriteFramed一起使用,将AsyncWrite转换成Sink。编码器特质的工作是指定如何将帧转换成字节序列。FramedWrite的工作是将生成的字节序列写入IO资源。

Encoder特质的encode方法是主方法。该方法接收要写入的项和一个写入项的缓冲区。缓冲区可能已经包含数据,在这种情况下,编码器应将新帧附加到缓冲区,而不是覆盖现有数据。

保证从一个encode调用到另一个调用,提供的缓冲区将包含与之前完全相同的数据,除了可能有一些数据已经被从缓冲区的前端移除。写入FramedWrite本质上等同于以下循环

use futures::future::FutureExt;
use futures::io::AsyncWriteExt;
use bytes::Buf; // for advance

const MAX: usize = 8192;

let mut buf = bytes::BytesMut::new();
loop {
    futures::select! {
        num_written = io_resource.write(&buf).fuse() => {
            if !buf.is_empty() {
                buf.advance(num_written?);
            }
        },
        frame = next_frame().fuse() => {
            if buf.len() < MAX {
                encoder.encode(frame, &mut buf)?;
            }
        },
        _ = no_more_frames().fuse() => {
            io_resource.write_all(&buf).await?;
            io_resource.close().await?;
            return Ok(());
        },
    }
}

这里next_frame方法对应于您写入FramedWrite的任何帧。no_more_frames方法对应于使用SinkExt::close关闭FramedWrite

示例编码器

例如,考虑一个可以用来发送字符串的协议,其中每个帧是一个包含帧长度(四个字节整数)的字节数据,然后是相应长度的字符串数据。如果字符串太长,编码器将失败。

这样的编码器可以写成这样

use futures_codec2::Encoder;
use bytes::BytesMut;

struct MyStringEncoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Encoder<String> for MyStringEncoder {
    type Error = std::io::Error;

    fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Don't send a string if it is longer than the other end will
        // accept.
        if item.len() > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", item.len())
            ));
        }

        // Convert the length into a byte array.
        // The cast to u32 cannot overflow due to the length check above.
        let len_slice = u32::to_le_bytes(item.len() as u32);

        // Reserve space in the buffer.
        dst.reserve(4 + item.len());

        // Write the length and string to the buffer.
        dst.extend_from_slice(&len_slice);
        dst.extend_from_slice(item.as_bytes());
        Ok(())
    }
}

依赖项

~0.7–1.3MB
~26K SLoC