4 个版本 (2 个破坏性更新)
0.3.0 | 2022 年 12 月 29 日 |
---|---|
0.2.1 | 2022 年 6 月 7 日 |
0.2.0 | 2021 年 9 月 28 日 |
0.1.0 | 2020 年 8 月 2 日 |
#470 在 并发 中
55KB
579 行
ductile
一个允许同时使用本地内存通道和远程 TCP/Unix 通道(具有相同的接口)的通道实现。
组件
此 crate 提供了一个与 std::sync::mpsc
通道类似的接口。它提供了一个多生产者、单消费者通道,可以在底层使用本地内存通道(由 crossbeam_channel
提供),也可以通过 TCP/Unix 套接字使用网络通道。远程连接也可以使用 ChaCha20 加密。
与 std::sync::mpsc
类似,可以有多个 ChannelSender
,但只能有一个 ChannelReceiver
。通道的两端是对消息类型的泛型,但类型必须匹配(对于本地通道,这仅在编译时进行检查)。如果类型不匹配,将返回错误,并且可能会发生 panic,因为通道已中断。
通道还提供了一种 原始 模式,其中数据未序列化并以原始形式发送,这极大地提高了非结构化数据的性能。需要注意的是,您可以在同一个通道中混合两种模式,但您必须始终以正确的模式接收(您不能使用正常的 recv
方法接收原始数据)。当从多个线程使用克隆的发送器时,应格外小心。
对于远程通道,消息使用 bincode
进行序列化。
用法
以下是您可以如何使用此 crate 的简单示例
简单本地通道
let (tx, rx) = new_local_channel();
tx.send(42u64).unwrap();
tx.send_raw(&vec![1, 2, 3, 4]).unwrap();
let answer = rx.recv().unwrap();
assert_eq!(answer, 42u64);
let data = rx.recv_raw().unwrap();
assert_eq!(data, vec![1, 2, 3, 4]);
具有自定义数据类型的本地通道
请注意,您的类型必须是 Serialize
和 Deserialize
。
#[derive(Serialize, Deserialize)]
struct Thing {
pub x: u32,
pub y: String,
}
let (tx, rx) = new_local_channel();
tx.send(Thing {
x: 42,
y: "foobar".into(),
})
.unwrap();
let thing: Thing = rx.recv().unwrap();
assert_eq!(thing.x, 42);
assert_eq!(thing.y, "foobar");
远程通道
let port = 18452; // let's hope we can bind this port!
let mut server = ChannelServer::bind(("127.0.0.1", port)).unwrap();
// this examples need a second thread since the handshake cannot be done using a single thread
// only
let client_thread = std::thread::spawn(move || {
let (sender, receiver) = connect_channel(("127.0.0.1", port)).unwrap();
sender.send(vec![1, 2, 3, 4]).unwrap();
let data: Vec<i32> = receiver.recv().unwrap();
assert_eq!(data, vec![5, 6, 7, 8]);
sender.send(vec![9, 10, 11, 12]).unwrap();
sender.send_raw(&vec![1, 2, 3, 4, 5, 6, 7, 8, 9]).unwrap();
});
let (sender, receiver, _addr) = server.next().unwrap();
let data: Vec<i32> = receiver.recv().unwrap();
assert_eq!(data, vec![1, 2, 3, 4]);
sender.send(vec![5, 6, 7, 8]).unwrap();
let data = receiver.recv().unwrap();
assert_eq!(data, vec![9, 10, 11, 12]);
let data = receiver.recv_raw().unwrap();
assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
加密的远程通道
let port = 18453;
let enc_key = [69u8; 32];
let mut server = ChannelServer::bind_with_enc(("127.0.0.1", port), enc_key).unwrap();
let client_thread = std::thread::spawn(move || {
let (sender, receiver) = connect_channel_with_enc(("127.0.0.1", port), &enc_key).unwrap();
sender.send(vec![1u8, 2, 3, 4]).unwrap();
let data: Vec<u8> = receiver.recv().unwrap();
assert_eq!(data, vec![5u8, 6, 7, 8]);
sender.send(vec![69u8; 12345]).unwrap();
sender.send_raw(&vec![1, 2, 3, 4, 5, 6, 7, 8, 9]).unwrap();
});
let (sender, receiver, _addr) = server.next().unwrap();
let data: Vec<u8> = receiver.recv().unwrap();
assert_eq!(data, vec![1u8, 2, 3, 4]);
sender.send(vec![5u8, 6, 7, 8]).unwrap();
let data = receiver.recv().unwrap();
assert_eq!(data, vec![69u8; 12345]);
let file = receiver.recv_raw().unwrap();
assert_eq!(file, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
协议
消息
所有 正常(非原始)消息都封装在 ChannelMessage::Message
中,并进行正常序列化和发送。
原始模式下的消息发送方式取决于通道是本地还是远程。如果是本地通道,则没有序列化惩罚,数据直接发送到通道。如果是远程通道,为了避免序列化,首先发送一个包含数据长度的短消息,然后是实际的有效负载(最终可能被加密)。
握手
本地通道不需要握手,因此本节仅涉及远程通道。
有2种握手:一种用于加密通道,另一种用于非加密通道。握手的目的是共享加密信息(如nonce)并检查加密密钥是否正确。
对于加密通道,进行两轮握手
- 双方生成12字节具有加密安全性的随机数据(通道nonce)并加密后发送给对方。
- 接收到通道nonce后,每方加密一个已知常数(一个4字节的魔数)并将其发送回对方。
- 接收到这4个字节后,对其进行解密,如果它们与初始魔数匹配,则密钥可能有效,握手完成。
对于非加密通道,使用相同的握手过程,但使用静态密钥和nonce,只有魔数被加密。所有后续消息都将加密发送。
许可证:MIT
依赖关系
~1.7–2.6MB
~54K SLoC