9 个版本

0.2.4 2024 年 8 月 10 日
0.2.3 2024 年 6 月 1 日
0.2.2 2024 年 5 月 12 日
0.2.1 2023 年 9 月 25 日
0.1.0 2020 年 2 月 9 日

并发 中排名 833

Download history 342522/week @ 2024-05-04 399673/week @ 2024-05-11 377043/week @ 2024-05-18 371924/week @ 2024-05-25 421792/week @ 2024-06-01 416474/week @ 2024-06-08 397978/week @ 2024-06-15 400175/week @ 2024-06-22 389322/week @ 2024-06-29 423874/week @ 2024-07-06 421029/week @ 2024-07-13 453075/week @ 2024-07-20 446923/week @ 2024-07-27 446281/week @ 2024-08-03 487693/week @ 2024-08-10 424297/week @ 2024-08-17

每月下载 1,885,048
4,065 个 Crates 中使用 (6 直接使用)

MIT/Apache

40KB
436

piper

Rust 异步程序的单生产者单消费者管道。

许可证

许可协议为以下之一

由您选择。

贡献

除非您明确声明,否则任何有意提交以包含在您的工作中的贡献,如 Apache-2.0 许可证中定义,均应双重许可如上所述,无需附加条款或条件。


lib.rs:

有界单生产者单消费者管道。

此包提供了一种可以异步读取和写入的环形缓冲区。它通过 pipe 函数创建,该函数返回一对 ReaderWriter 处理程序。它们分别实现了 AsyncReadAsyncWrite 特性。

处理程序是单生产者/单消费者;为了明确起见,它们不能被克隆,并且需要 &mut 访问来读取或写入。如果需要多生产者/多消费者处理程序,请考虑将它们包装在 Arc<Mutex<...>> 或类似结构中。

当发送者被丢弃时,管道中剩余的字节仍然可以读取。之后,读取尝试将导致 Ok(0),即它们将始终“成功”读取 0 个字节。

当接收器被丢弃时,管道被关闭,无法再向其中写入字节。进一步的写入将导致返回 Ok(0),即它们总是会“成功”写入 0 字节。

版本 0.2.0 注释

之前,这个crate包含其他同步原语,如有限通道、锁和事件监听器。这些已经被分割到它们自己的crate中

示例

异步任务

在可能的线程上,在异步任务之间进行通信。

use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::{future, prelude::*};
use std::time::Duration;


// Create a pair of handles.
let (mut reader, mut writer) = piper::pipe(1024);

// Create the executor.
let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();

// Spawn a detached task for random data to the pipe.
let writer = ex.spawn(async move {
    for _ in 0..1_000 {
        // Generate 8 random numnbers.
        let random = fastrand::u64(..).to_le_bytes();

        // Write them to the pipe.
        writer.write_all(&random).await.unwrap();

        // Wait a bit.
        async_io::Timer::after(Duration::from_millis(5)).await;
    }

    // Drop the writer to close the pipe.
    drop(writer);
});

// Detach the task so that it runs in the background.
writer.detach();

// Spawn a task for reading from the pipe.
let reader = ex.spawn(async move {
    let mut buf = vec![];

    // Read all bytes from the pipe.
    reader.read_to_end(&mut buf).await.unwrap();

    println!("Random data: {:#?}", buf);
});

Parallel::new()
    // Run four executor threads.
    .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
    // Run the main future on the current thread.
    .finish(|| future::block_on(async {
        // Wait for the reader to finish.
        reader.await;

        // Signal the executor threads to shut down.
        drop(signal);
    }));

阻塞I/O

文件I/O是阻塞的;因此,在async代码中,必须在另一个线程上运行它。此示例生成另一个线程来读取文件并将其写入管道。

use futures_lite::{future, prelude::*};
use std::fs::File;
use std::io::prelude::*;
use std::thread;

// Create a pair of handles.
let (mut r, mut w) = piper::pipe(1024);

// Spawn a thread for reading a file.
thread::spawn(move || {
    let mut file = File::open("Cargo.toml").unwrap();

    // Read the file into a buffer.
    let mut buf = [0u8; 16384];
    future::block_on(async move {
        loop {
            // Read a chunk of bytes from the file.
            // Blocking is okay here, since this is a separate thread.
            let n = file.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }

            // Write the chunk to the pipe.
            w.write_all(&buf[..n]).await.unwrap();
        }

        // Close the pipe.
        drop(w);
    });
});

// Read bytes from the pipe.
let mut buf = vec![];
r.read_to_end(&mut buf).await.unwrap();

println!("Read {} bytes", buf.len());

然而,较低级别的poll_fillpoll_drain方法分别接受impl Readimpl Write参数。这允许您完全跳过缓冲区,并直接从文件到管道进行读写。当可能时,应首选此方法,因为它避免了额外的复制。

// In the `future::block_on` call above...
loop {
    let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
    if n == 0 {
        break;
    }
}

在此用例中,首选blocking crate,因为它使用更高效的线程管理和管道策略。

依赖项

~43–295KB