#channel #receiver #send #thread #port #sync #bounded-channel

parking_lot_mpsc

Rust std::sync::mpsc模块的端口,使用parking_lot并发类型而不是标准库中的类型

6个版本

使用旧的Rust 2015

0.1.5 2017年8月20日
0.1.5-alpha.12017年8月1日
0.1.4 2017年7月31日

#791 in 并发

28 每月下载量

Apache-2.0/MIT

215KB
4K SLoC

parking_lot_mpsc

Build Status Build status Crates.io

这是Rust std::sync::mpsc模块的端口,使用parking_lot并发类型,而不是标准库中的类型。

它仍在开发中,可能并不稳定。特别是,现在有多个impl !Sync/!Send行被注释掉了。


lib.rs:

多生产者,单消费者FIFO队列通信原语。

本模块通过通道提供基于消息的通信,具体定义在三种类型之间

一个 SenderSyncSender 用于向 Receiver 发送数据。两个发送者都是可复制的(多生产者),这样多个线程可以同时向一个接收者发送数据(单消费者)。

这些通道有两种形式

  1. 一个异步、无限缓存的通道。函数 channel 将返回一个 (Sender, Receiver) 元组,其中所有发送都将 异步(它们永远不会阻塞)。从概念上讲,通道有一个无限缓冲区。

  2. 一个同步、有限缓存的通道。函数 sync_channel 将返回一个 (SyncSender, Receiver) 元组,其中待处理消息的存储是一个预分配的固定大小的缓冲区。所有发送都将 同步,通过阻塞直到有缓冲区空间可用。请注意,允许边界为0,这将导致通道成为一个“会合”通道,其中每个发送者原子性地将消息传递给接收者。

断开连接

通道上的发送和接收操作将返回一个 Result,表示操作是否成功。不成功的操作通常表明通道的另一部分在相应的线程中“挂起”,因为被丢弃。

一旦一个通道的一半已经被释放,大多数操作将无法继续进行,因此将返回Err。许多应用程序将继续unwrap此模块返回的结果,如果其中一个意外死亡,则会在线程间引发失败的传播。

示例

简单使用

use std::thread;
use parking_lot_mpsc::channel;

// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
    tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);

共享使用

use std::thread;
use parking_lot_mpsc::channel;

// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = channel();
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move|| {
        tx.send(i).unwrap();
    });
}

for _ in 0..10 {
    let j = rx.recv().unwrap();
    assert!(0 <= j && j < 10);
}

传播恐慌

use parking_lot_mpsc::channel;

// The call to recv() will return an error because the channel has already
// hung up (or been deallocated)
let (tx, rx) = channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());

同步通道

use std::thread;
use parking_lot_mpsc::sync_channel;

let (tx, rx) = sync_channel::<i32>(0);
thread::spawn(move|| {
    // This will wait for the parent thread to start receiving
    tx.send(53).unwrap();
});
rx.recv().unwrap();

依赖

~1MB
~17K SLoC