13 个版本
0.4.0 | 2023 年 6 月 4 日 |
---|---|
0.3.1 | 2021 年 6 月 11 日 |
0.3.0 | 2020 年 11 月 3 日 |
0.2.1 | 2020 年 4 月 15 日 |
0.1.6 | 2019 年 11 月 12 日 |
#482 在 异步 中
5,070 每月下载量
在 12 个 crate 中使用 (直接使用 10 个)
38KB
563 行
futures_ringbuf
实现 AsyncRead/AsyncWrite 的环形缓冲区。
它可以用于测试异步网络 crate,无需建立 TCP 连接。该 crate 提供了一个类型 Endpoint
,允许使用环形缓冲区在两个方向上创建假网络流的两端。它简化了测试更复杂的情况,如背压。
它还可以用作异步任务之间的内存缓冲区。我还没有进行基准测试。
目前存在两个版本的 AsyncRead/Write 特性。一个是 futures-rs 版本,另一个是 tokio 版本。此 crate 实现了 futures 版本。您可以通过使用 tokio_util::compat
获取 tokio 版本。
传输中的数据存储在 ringbuf crate 的内部环形缓冲区中。
当启用 sketchy
功能时,将提供类型 Sketchy
,该类型随机化内存缓冲区的行为,否则这些缓冲区始终处于就绪状态,这并不适用于测试将来将运行于实际网络连接的代码。这将随机返回挂起状态并仅填充部分缓冲区。
目录
安装
使用 cargo add: cargo add futures_ringbuf
使用 cargo yaml
dependencies:
futures_ringbuf: ^0.4
使用原始 Cargo.toml
[dependencies]
futures_ringbuf = "^0.4"
升级
升级时请查看 变更日志。
依赖项
此crate依赖项很少。Cargo会自动为您处理其依赖项。
特性
sketchy
特性会启用Sketchy
类型,这允许随机更改异步流的操作行为,以便测试在实际网络上发生的场景,如超时、仅处理部分缓冲区、挂起等。
安全
此crate使用#![ forbid(unsafe_code) ]
,但其依赖项使用了相当多的不安全代码。在第一眼看来,ringbuf
中的不安全使用似乎是合理的,但我还没有仔细检查其每个细节,并且它没有文档说明。futures库中存在大量不安全代码,我还没有对其进行审查。
使用
此crate提供了一个实现AsyncRead
/AsyncWrite
的RingBuffer<T>
结构体,当T
为u8时。现在您可以调用split
,它由AsyncRead
提供,并将它们视为网络连接的两端。
当缓冲区为空时,读取器将返回Poll::Pending
,当缓冲区满时,写入器将返回。当有新数据/空间可用时,它们会相互唤醒。
如果您想与std::io::Read
/std::io::Write
交互,请直接查看ringbuf
crate,因为它的Producer
和Consumer
类型实现了这些特性,所以我没有在这里包含它们。
我还没有包含Stream<T>
和Sink<T>
,因为在u8
上这没有太多意义,但如果需求存在,它肯定可以添加。
对T
的要求是T: Sized + Copy
。
如果您想在使用futures_ringbuf之前对缓冲区进行初始化,可以使用ringbuf的Producer
和Consumer
类型。 futures_ringbuf::RingBuffer
实现了From< (Producer<T>, Consumer<T>) >
。
WASM
此crate在WASM上运行。请参阅集成测试中的代码。
基本示例
//! Frame a RingBuf with futures_codec. This example shows how the sending task will
//! block when the buffer is full. When a reader consumes the buffer, the sender is woken up.
//!
//! Run with `cargo run --example basic`.
//
use
{
futures_ringbuf :: { * } ,
futures :: { SinkExt, StreamExt, executor::block_on, join } ,
asynchronous_codec :: { Framed, LinesCodec } ,
};
#[ async_std::main ]
//
async fn main()
{
let mock = RingBuffer::new( 13 );
let (mut writer, mut reader) = Framed::new( mock, LinesCodec{} ).split();
let send_task = async move
{
writer.send( "Hello World\n".to_string() ).await.expect( "send" );
println!( "sent first line" );
writer.send( "Second line\n".to_string() ).await.expect( "send" );
println!( "sent second line" );
writer.close().await.expect( "close sender" );
println!( "sink closed" );
};
let receive_task = async move
{
// If we would return here, the second line will never get sent
// because the buffer is full.
//
// return;
while let Some(msg) = reader.next().await.transpose().expect( "receive message" )
{
println!( "Received: {:#?}", msg );
}
};
// Poll them in concurrently
//
join!( send_task, receive_task );
}
端点
当使用一个环形缓冲区时,我们获得一个连接的两端。如果我们想要一个更真实的全双工连接,我们需要两个环形缓冲区,一个端点从环形缓冲区读取,另一个端点写入。当有新数据或空间可用时,任务需要正确唤醒……为此,提供了一个Endpoint
类型,它会为您处理这个设置。
端点示例
use
{
futures_ringbuf :: { * } ,
futures :: { AsyncWriteExt, AsyncReadExt, executor::block_on } ,
};
#[ async_std::main ]
//
async fn main()
{
// Buffer of 10 bytes in each direction. The buffer size always refers to the writing side, so here
// the first 10 means the server can write 10 bytes before it's buffer is full.
// When it's full it will return pending on writing and when it's empty it returns
// pending on reading.
//
let (mut server, mut client) = Endpoint::pair( 10, 10 );
let data = vec![ 1,2,3 ];
let mut read = [0u8;3];
server.write( &data ).await.expect( "write" );
let n = client.read( &mut read ).await.expect( "read" );
assert_eq!( n , 3 );
assert_eq!( read, vec![ 1,2,3 ][..] );
}
API
API文档可以在docs.rs上找到。
贡献
请查看贡献指南。
测试
cargotest
在WASM上,安装wasm-pack之后
wasm-pack test--firefox--无头
或
wasm-pack test--chrome--无头
行为准则
任何在公民行为准则的第4点“不可接受的行为”中描述的行为都不受欢迎,可能会让您被禁言。如果包括维护者和项目管理员在内的任何人都未能尊重这些/您的限制,您有权指出。
许可
依赖项
~1–1.6MB
~30K SLoC