9 个版本
0.2.4 | 2024 年 8 月 10 日 |
---|---|
0.2.3 | 2024 年 6 月 1 日 |
0.2.2 | 2024 年 5 月 12 日 |
0.2.1 | 2023 年 9 月 25 日 |
0.1.0 | 2020 年 2 月 9 日 |
在 并发 中排名 833
每月下载 1,885,048 次
在 4,065 个 Crates 中使用 (6 直接使用)
40KB
436 行
piper
Rust 异步程序的单生产者单消费者管道。
许可证
许可协议为以下之一
- Apache License,版本 2.0 (LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证 (LICENSE-MIT 或 https://opensource.org/licenses/MIT)
由您选择。
贡献
除非您明确声明,否则任何有意提交以包含在您的工作中的贡献,如 Apache-2.0 许可证中定义,均应双重许可如上所述,无需附加条款或条件。
lib.rs
:
有界单生产者单消费者管道。
此包提供了一种可以异步读取和写入的环形缓冲区。它通过 pipe
函数创建,该函数返回一对 Reader
和 Writer
处理程序。它们分别实现了 AsyncRead
和 AsyncWrite
特性。
处理程序是单生产者/单消费者;为了明确起见,它们不能被克隆,并且需要 &mut
访问来读取或写入。如果需要多生产者/多消费者处理程序,请考虑将它们包装在 Arc<Mutex<...>>
或类似结构中。
当发送者被丢弃时,管道中剩余的字节仍然可以读取。之后,读取尝试将导致 Ok(0)
,即它们将始终“成功”读取 0 个字节。
当接收器被丢弃时,管道被关闭,无法再向其中写入字节。进一步的写入将导致返回 Ok(0)
,即它们总是会“成功”写入 0 字节。
版本 0.2.0 注释
之前,这个crate包含其他同步原语,如有限通道、锁和事件监听器。这些已经被分割到它们自己的crate中
示例
异步任务
在可能的线程上,在异步任务之间进行通信。
use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::{future, prelude::*};
use std::time::Duration;
// Create a pair of handles.
let (mut reader, mut writer) = piper::pipe(1024);
// Create the executor.
let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();
// Spawn a detached task for random data to the pipe.
let writer = ex.spawn(async move {
for _ in 0..1_000 {
// Generate 8 random numnbers.
let random = fastrand::u64(..).to_le_bytes();
// Write them to the pipe.
writer.write_all(&random).await.unwrap();
// Wait a bit.
async_io::Timer::after(Duration::from_millis(5)).await;
}
// Drop the writer to close the pipe.
drop(writer);
});
// Detach the task so that it runs in the background.
writer.detach();
// Spawn a task for reading from the pipe.
let reader = ex.spawn(async move {
let mut buf = vec![];
// Read all bytes from the pipe.
reader.read_to_end(&mut buf).await.unwrap();
println!("Random data: {:#?}", buf);
});
Parallel::new()
// Run four executor threads.
.each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| future::block_on(async {
// Wait for the reader to finish.
reader.await;
// Signal the executor threads to shut down.
drop(signal);
}));
阻塞I/O
文件I/O是阻塞的;因此,在async
代码中,必须在另一个线程上运行它。此示例生成另一个线程来读取文件并将其写入管道。
use futures_lite::{future, prelude::*};
use std::fs::File;
use std::io::prelude::*;
use std::thread;
// Create a pair of handles.
let (mut r, mut w) = piper::pipe(1024);
// Spawn a thread for reading a file.
thread::spawn(move || {
let mut file = File::open("Cargo.toml").unwrap();
// Read the file into a buffer.
let mut buf = [0u8; 16384];
future::block_on(async move {
loop {
// Read a chunk of bytes from the file.
// Blocking is okay here, since this is a separate thread.
let n = file.read(&mut buf).unwrap();
if n == 0 {
break;
}
// Write the chunk to the pipe.
w.write_all(&buf[..n]).await.unwrap();
}
// Close the pipe.
drop(w);
});
});
// Read bytes from the pipe.
let mut buf = vec![];
r.read_to_end(&mut buf).await.unwrap();
println!("Read {} bytes", buf.len());
然而,较低级别的poll_fill
和poll_drain
方法分别接受impl Read
和impl Write
参数。这允许您完全跳过缓冲区,并直接从文件到管道进行读写。当可能时,应首选此方法,因为它避免了额外的复制。
// In the `future::block_on` call above...
loop {
let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
if n == 0 {
break;
}
}
在此用例中,首选blocking
crate,因为它使用更高效的线程管理和管道策略。
依赖项
~43–295KB