55 次发布
0.18.2 | 2023 年 8 月 24 日 |
---|---|
0.18.1 | 2023 年 4 月 21 日 |
0.18.0 | 2023 年 2 月 4 日 |
0.15.0 | 2022 年 11 月 13 日 |
0.3.4 | 2021 年 12 月 19 日 |
#2621 in 魔法豆
313 每月下载量
用于 11 个 crate(7 个直接使用)
225KB
2.5K SLoC
帕拉斯多路复用器
这是 Ouroboros 多路复用器逻辑的实现,该逻辑在 Shelley 网络协议规范 中定义。
架构决策
为这个特定的 Rust 实现,做出了以下架构决策
- 每个微型协议状态机应能够在自己的线程中工作
- 一个有界队列应作为缓冲区,以解耦微型协议逻辑与多路复用器工作
- 实现应支持流水线,即使我们没有当前的使用案例
- 多路复用器应与微型协议的实现细节无关。
实现细节
根据上述定义,Rust 的 mpsc 通道 似乎是组织和多路复用器进程中不同线程之间通信的正确工具。
以下图表概述了涉及的组件
用法
以下代码提供了一个非常粗略的示例,说明如何设置一个客户端连接到节点并启动两个并发线程,它们独立运行并使用 帕拉斯 多路复用器在同一个载体上进行通信。
// Setup a new bearer. In this case, we use a unix socket to connect
// to a node running on the local machine.
let bearer = UnixStream::connect("/tmp/pallas").unwrap();
// Setup a new multiplexer using the created bearer and a specification
// of the mini-protocol IDs that we'll be using for our session. In this case, we
// pass id #0 (handshake) and #2 (chainsync).
let muxer = Multiplexer::setup(tcp, &[0, 2])
// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut channel_0 = muxer.use_channel(0);
// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = channel_0;
// Do something with the channel. In this case, we just keep sending
// dumb data every 50 millis.
loop {
let payload = vec![1; 65545];
tx.send(payload).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
// Ask the multiplexer to provide us with the channel for the miniprotocol #2.
let mut channel_2 = muxer.use_channel(2);
// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = channel_2;
// Do something with the channel. In this case, we just print in stdout
// whatever get received for this mini-protocol.
loop {
let payload = rx.recv().unwrap();
println!("id:{protocol}, length:{}", payload.len());
}
});
运行示例
要查看两个节点(发送者和监听者)通信的工作示例,请检查 示例文件夹。要运行示例,打开两个不同的终端并在每个终端中运行不同的节点
# on terminal 1, start the listener
RUST_LOG=info cargo run --example listener
# on terminal 2, start the sender
RUST_LOG=info cargo run --example sender
实际应用
为了一个更复杂、现实世界的示例,请查看Oura 仓库,它提供了一个完整的客户端工具,用于从本地或远程节点实时流式传输区块数据。
依赖项
~1.6–2.4MB
~48K SLoC