#多路复用器 #通道 #帕拉斯 #线程 #奥uroboros #载体 #特定

no-std pallas-multiplexer

使用 mpsc 通道的多线程 Ouroboros 多路复用器实现

55 次发布

0.18.2 2023 年 8 月 24 日
0.18.1 2023 年 4 月 21 日
0.18.0 2023 年 2 月 4 日
0.15.0 2022 年 11 月 13 日
0.3.4 2021 年 12 月 19 日

#2621 in 魔法豆

Download history 56/week @ 2024-03-25 129/week @ 2024-04-01 23/week @ 2024-04-08 33/week @ 2024-04-15 47/week @ 2024-04-22 97/week @ 2024-04-29 66/week @ 2024-05-06 75/week @ 2024-05-13 82/week @ 2024-05-20 34/week @ 2024-05-27 23/week @ 2024-06-03 23/week @ 2024-06-10 41/week @ 2024-06-17 23/week @ 2024-06-24 205/week @ 2024-07-01 37/week @ 2024-07-08

313 每月下载量
用于 11 个 crate(7 个直接使用)

Apache-2.0

225KB
2.5K SLoC

帕拉斯多路复用器

这是 Ouroboros 多路复用器逻辑的实现,该逻辑在 Shelley 网络协议规范 中定义。

架构决策

为这个特定的 Rust 实现,做出了以下架构决策

  • 每个微型协议状态机应能够在自己的线程中工作
  • 一个有界队列应作为缓冲区,以解耦微型协议逻辑与多路复用器工作
  • 实现应支持流水线,即使我们没有当前的使用案例
  • 多路复用器应与微型协议的实现细节无关。

实现细节

根据上述定义,Rust 的 mpsc 通道 似乎是组织和多路复用器进程中不同线程之间通信的正确工具。

以下图表概述了涉及的组件

Multiplexer Diagram

用法

以下代码提供了一个非常粗略的示例,说明如何设置一个客户端连接到节点并启动两个并发线程,它们独立运行并使用 帕拉斯 多路复用器在同一个载体上进行通信。

// Setup a new bearer. In this case, we use a unix socket to connect
// to a node running on the local machine.
let bearer = UnixStream::connect("/tmp/pallas").unwrap();

// Setup a new multiplexer using the created bearer and a specification
// of the mini-protocol IDs that we'll be using for our session. In this case, we
// pass id #0 (handshake) and #2 (chainsync).
let muxer = Multiplexer::setup(tcp, &[0, 2])

// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut channel_0 = muxer.use_channel(0);

// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
    // Deconstruct the channel to get a handle for sending data into the muxer
    // ingress and a handle to receive data from the demuxer egress.
    let Channel(mux_tx, demux_rx) = channel_0;

    // Do something with the channel. In this case, we just keep sending
    // dumb data every 50 millis.
    loop {
        let payload = vec![1; 65545];
        tx.send(payload).unwrap();
        thread::sleep(Duration::from_millis(50));
    }
});

// Ask the multiplexer to provide us with the channel for the miniprotocol #2.
let mut channel_2 = muxer.use_channel(2);

// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {
    // Deconstruct the channel to get a handle for sending data into the muxer
    // ingress and a handle to receive data from the demuxer egress.
    let Channel(mux_tx, demux_rx) = channel_2;
    
    // Do something with the channel. In this case, we just print in stdout
    // whatever get received for this mini-protocol.
    loop {
        let payload = rx.recv().unwrap();
        println!("id:{protocol}, length:{}", payload.len());
    }
});

运行示例

要查看两个节点(发送者和监听者)通信的工作示例,请检查 示例文件夹。要运行示例,打开两个不同的终端并在每个终端中运行不同的节点

# on terminal 1, start the listener
RUST_LOG=info cargo run --example listener
# on terminal 2, start the sender
RUST_LOG=info cargo run --example sender

实际应用

为了一个更复杂、现实世界的示例,请查看Oura 仓库,它提供了一个完整的客户端工具,用于从本地或远程节点实时流式传输区块数据。

依赖项

~1.6–2.4MB
~48K SLoC