#async-io #mocking #async-stream #future #async #stream

dev futures_ringbuf

用于测试的 Mock 类型,实现 AsyncRead/AsyncWrite

13 个版本

0.4.0 2023 年 6 月 4 日
0.3.1 2021 年 6 月 11 日
0.3.0 2020 年 11 月 3 日
0.2.1 2020 年 4 月 15 日
0.1.6 2019 年 11 月 12 日

#482异步

Download history 1740/week @ 2024-03-13 2327/week @ 2024-03-20 1867/week @ 2024-03-27 1418/week @ 2024-04-03 1202/week @ 2024-04-10 1556/week @ 2024-04-17 1418/week @ 2024-04-24 1577/week @ 2024-05-01 1988/week @ 2024-05-08 1966/week @ 2024-05-15 1189/week @ 2024-05-22 1990/week @ 2024-05-29 1874/week @ 2024-06-05 1515/week @ 2024-06-12 850/week @ 2024-06-19 436/week @ 2024-06-26

5,070 每月下载量
12 crate 中使用 (直接使用 10 个)

Unlicense

38KB
563

futures_ringbuf

standard-readme compliant Build Status Docs crates.io

实现 AsyncRead/AsyncWrite 的环形缓冲区。

它可以用于测试异步网络 crate,无需建立 TCP 连接。该 crate 提供了一个类型 Endpoint,允许使用环形缓冲区在两个方向上创建假网络流的两端。它简化了测试更复杂的情况,如背压。

它还可以用作异步任务之间的内存缓冲区。我还没有进行基准测试。

目前存在两个版本的 AsyncRead/Write 特性。一个是 futures-rs 版本,另一个是 tokio 版本。此 crate 实现了 futures 版本。您可以通过使用 tokio_util::compat 获取 tokio 版本。

传输中的数据存储在 ringbuf crate 的内部环形缓冲区中。

当启用 sketchy 功能时,将提供类型 Sketchy,该类型随机化内存缓冲区的行为,否则这些缓冲区始终处于就绪状态,这并不适用于测试将来将运行于实际网络连接的代码。这将随机返回挂起状态并仅填充部分缓冲区。

目录

安装

使用 cargo add: cargo add futures_ringbuf

使用 cargo yaml

dependencies:

   futures_ringbuf: ^0.4

使用原始 Cargo.toml

[dependencies]

    futures_ringbuf = "^0.4"

升级

升级时请查看 变更日志

依赖项

此crate依赖项很少。Cargo会自动为您处理其依赖项。

特性

sketchy特性会启用Sketchy类型,这允许随机更改异步流的操作行为,以便测试在实际网络上发生的场景,如超时、仅处理部分缓冲区、挂起等。

安全

此crate使用#![ forbid(unsafe_code) ],但其依赖项使用了相当多的不安全代码。在第一眼看来,ringbuf中的不安全使用似乎是合理的,但我还没有仔细检查其每个细节,并且它没有文档说明。futures库中存在大量不安全代码,我还没有对其进行审查。

使用

此crate提供了一个实现AsyncRead/AsyncWriteRingBuffer<T>结构体,当T为u8时。现在您可以调用split,它由AsyncRead提供,并将它们视为网络连接的两端。

当缓冲区为空时,读取器将返回Poll::Pending,当缓冲区满时,写入器将返回。当有新数据/空间可用时,它们会相互唤醒。

如果您想与std::io::Read/std::io::Write交互,请直接查看ringbuf crate,因为它的ProducerConsumer类型实现了这些特性,所以我没有在这里包含它们。

我还没有包含Stream<T>Sink<T>,因为在u8上这没有太多意义,但如果需求存在,它肯定可以添加。

T的要求是T: Sized + Copy

如果您想在使用futures_ringbuf之前对缓冲区进行初始化,可以使用ringbuf的ProducerConsumer类型。 futures_ringbuf::RingBuffer实现了From< (Producer<T>, Consumer<T>) >

WASM

此crate在WASM上运行。请参阅集成测试中的代码。

基本示例

//! Frame a RingBuf with futures_codec. This example shows how the sending task will
//! block when the buffer is full. When a reader consumes the buffer, the sender is woken up.
//!
//! Run with `cargo run --example basic`.
//
use
{
   futures_ringbuf    :: { *                                            } ,
   futures            :: { SinkExt, StreamExt, executor::block_on, join } ,
   asynchronous_codec :: { Framed, LinesCodec                           } ,
};


#[ async_std::main ]
//
async fn main()
{
   let mock = RingBuffer::new( 13 );
   let (mut writer, mut reader) = Framed::new( mock, LinesCodec{} ).split();


   let send_task = async move
   {
      writer.send( "Hello World\n".to_string() ).await.expect( "send" );
      println!( "sent first line" );

      writer.send( "Second line\n".to_string() ).await.expect( "send" );
      println!( "sent second line" );

      writer.close().await.expect( "close sender" );
      println!( "sink closed" );
   };


   let receive_task = async move
   {
      // If we would return here, the second line will never get sent
      // because the buffer is full.
      //
      // return;

      while let Some(msg) = reader.next().await.transpose().expect( "receive message" )
      {
         println!( "Received: {:#?}", msg );
      }
   };


   // Poll them in concurrently
   //
   join!( send_task, receive_task );
}

端点

当使用一个环形缓冲区时,我们获得一个连接的两端。如果我们想要一个更真实的全双工连接,我们需要两个环形缓冲区,一个端点从环形缓冲区读取,另一个端点写入。当有新数据或空间可用时,任务需要正确唤醒……为此,提供了一个Endpoint类型,它会为您处理这个设置。

端点示例

use
{
   futures_ringbuf :: { *                                               } ,
   futures         :: { AsyncWriteExt, AsyncReadExt, executor::block_on } ,
};


#[ async_std::main ]
//
async fn main()
{
   // Buffer of 10 bytes in each direction. The buffer size always refers to the writing side, so here
   // the first 10 means the server can write 10 bytes before it's buffer is full.
   // When it's full it will return pending on writing and when it's empty it returns
   // pending on reading.
   //
   let (mut server, mut client) = Endpoint::pair( 10, 10 );

   let     data = vec![ 1,2,3 ];
   let mut read = [0u8;3];

   server.write( &data ).await.expect( "write" );

   let n = client.read( &mut read ).await.expect( "read" );

   assert_eq!( n   , 3                 );
   assert_eq!( read, vec![ 1,2,3 ][..] );
}

API

API文档可以在docs.rs上找到。

贡献

请查看贡献指南

测试

cargotest

在WASM上,安装wasm-pack之后

wasm-pack test--firefox--无头

wasm-pack test--chrome--无头

行为准则

任何在公民行为准则的第4点“不可接受的行为”中描述的行为都不受欢迎,可能会让您被禁言。如果包括维护者和项目管理员在内的任何人都未能尊重这些/您的限制,您有权指出。

许可

许可

依赖项

~1–1.6MB
~30K SLoC