#pub-sub #zero-copy #inter-process #request-response #shared-memory #low-latency

iceoryx-rs

为 Eclipse iceoryx 提供安全的 Rust 绑定 - 真正的零拷贝进程间通信

4 个版本

0.1.0 2022 年 6 月 30 日
0.0.13 2022 年 6 月 29 日
0.0.12 2022 年 6 月 29 日
0.0.11 2022 年 6 月 28 日

#651 in 数据结构


iceray 中使用

Apache-2.0

81KB
2K SLoC

iceoryx-rs

Crates.io License Build & Test Codecov

Eclipse iceoryx 提供安全的 Rust 绑定。

  1. 关于
  2. 示例
  3. API 示例
  4. 限制

关于

Eclipse iceoryx 是一个真正的零拷贝、进程间通信框架,旨在通过其高数据吞吐量和低延迟需求来提升自动驾驶。凭借其带宽和速度,iceoryx 还非常适合其他需要低延迟和传输大数据结构的领域。如果您想了解更多关于 Eclipse iceoryx 的信息,可以查看 iceoryx.io 上的 入门 部分,或者查看主项目的 README.md

Rust 绑定仍在开发中,目前仅支持 pub-sub 消息模式。未来的版本将填补这一空白,目标是使 Rust 绑定成为 iceoryx 生态系统中的第一类公民。

这个项目最初的目标是创建一个看起来很酷的 Rust TUI 交互式程序,最终导致了 iceray。去看看吧。

示例

在运行示例之前,您必须先使用以下命令构建它们:

cargo build --all --examples

为了运行 iceoryx 应用程序,必须运行 RouDi 守护进程。

如何启动 RouDi

RouDi 是一个中心守护进程,负责资源管理和连接已注册的服务。在注册阶段之后,它不再参与通信。

您可以在 这里 找到有关 RouDi 的更多信息,如果您更倾向于视觉,只需在页面上稍微向上滚动一点,就可以看到一个令人惊叹的动画。

如果您在系统上安装了 iceoryx,您可以使用该安装中的 iox-roudi 二进制文件。如果没有,您可以从您的 crate 根目录运行以下命令来运行 RouDi

find target -type f -wholename "*/iceoryx-install/bin/iox-roudi" -exec {} \;

运行简单的发布者和订阅者示例

《publisher_simple》和《subscriber_simple》示例展示了典型的进程间通信用例。

要演示进程间通信,一个很好的入门示例就是《publisher_simple》和《subscriber_simple》。

您可以在不同的终端中运行发布者和订阅者,执行以下命令:

cargo run --example publisher_simple
cargo run --example subscriber_simple

您应该会看到发布者发送的消息,

Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4

以及它们是如何被订阅者接收的。

Receiving: 0
Receiving: 1
Receiving: 2
Receiving: 3
Receiving: 4

您还可能看到这样的输出:

[Warning]: IPC channel still there, doing an unlink of [app_name]

不用担心,这是由于使用Ctrl+C中断了之前的运行。为了防止这种输出,必须注册信号处理程序并优雅地关闭应用程序。《RouDi》守护进程将自动清理异常终止应用程序的共享资源。应用程序在重新启动时也会清理自己的残留数据,因此出现了这些输出。

如果《RouDi》没有运行,您将得到以下输出:

[Warning]: RouDi not found - waiting ...

经过一段等待时间后,应用程序将关闭。

API 示例

这是一份简短的API指南,介绍如何编写一个简单的应用程序。

我们从cargo new开始。

cargo new --bin hypnotoad

Cargo.toml清单文件中,我们创建了两个二进制文件并添加了iceoryx-rs依赖项。

[[bin]]
name = "publisher"
path = "src/publisher.rs"

[[bin]]
name = "subscriber"
path = "src/subscriber.rs"

[dependencies]
iceoryx-rs = "0.1"

现在让我们定义要传输的数据,并将它们存储在src/topic.rs中。

use iceoryx_rs::marker::ShmSend;

#[repr(C)]
#[derive(Default)]
pub struct Counter {
    pub counter: u32,
}

unsafe impl ShmSend for Counter {}

ShmSend标记特质用于可以通过共享内存传输的类型,类似于用于可以在线程边界间传输的类型的Send标记特质。

实现ShmSend的类型必须满足以下约束

  • 不使用堆
  • 数据结构完全包含在共享内存中 - 没有指向进程本地内存的指针,没有指向进程本地结构的引用,没有动态分配器
  • 数据结构必须是可重定位的,因此不能内部使用指针/引用
  • 该类型不能实现Drop;当内存释放时,由于内存可能位于没有写入权限的订阅者的shm段中,因此不会调用drop。一般来说,可以实现Copy特质的类型都满足这些要求。

数据类型还有#[repr(C)]属性,以便能够与C和C++应用程序通信,并实现Default特质。如果未实现Default特质,必须使用unsafe API来借用样本。

接下来是src/publisher.rs文件。

use iceoryx_rs::PublisherBuilder;
use iceoryx_rs::Runtime;

use std::error::Error;
use std::thread;
use std::time::Duration;

mod topic;
use topic::Counter;

fn main() -> Result<(), Box<dyn Error>> {
    Runtime::init("publisher");

    let publisher = PublisherBuilder::<Counter>::new("all", "glory", "hypnotoad").create()?;

    let mut counter = 0u32;
    loop {
        let mut sample = publisher.loan()?;
        sample.counter = counter;
        publisher.publish(sample);

        println!("Send praise hypnotoad: {}", counter);
        counter += 1;

        thread::sleep(Duration::from_millis(1000));
    }
}

首先要做的是初始化iceoryx的Runtime。这将在中央RouDi守护进程中进行注册,并接受应用程序名称作为参数。

然后,通过指定服务、事件和实例ID,使用PublisherBuilder创建Publisher。这些可以是任意字符串,用于匹配发布者和订阅者。

发布者代码在一个循环中完成,每秒借用并发布一个样本。

最后,我们创建src/subscriber.rs文件。

use iceoryx_rs::Runtime;
use iceoryx_rs::SubscriberBuilder;

use std::error::Error;
use std::thread;
use std::time::Duration;

mod topic;
use topic::Counter;

fn main() -> Result<(), Box<dyn Error>> {
    Runtime::init("subscriber");

    let (subscriber, sample_receive_token) =
        SubscriberBuilder::<Counter>::new("all", "glory", "hypnotoad")
            .queue_capacity(5)
            .create()?;

    let sample_receiver = subscriber.get_sample_receiver(sample_receive_token);

    loop {
        if let Some(sample) = sample_receiver.take() {
            println!("Receiving praise hypnotoad: {}", sample.counter);
        } else {
            thread::sleep(Duration::from_millis(100));
        }
    }
}

与发布者应用类似,首先需要初始化 Runtime

使用 SubscriberBuilder 创建一个 Subscriber 和一个 SampleReceiveToken,指定与发布者相同的三个服务、事件和实例 ID 字符串。

在进入循环之前,获取一个 SampleReceiver。在循环中,我们 take 样本,直到接收者队列为空,并打印我们接收到的数据。如果没有数据,线程将暂停一秒钟,然后我们尝试获取新的样本。

完成了。让我们运行我们的代码。

  1. 使用以下命令启动 RouDi
  2. 使用以下命令启动 publisher
  3. 使用以下命令启动 subscriber

请查看仓库中的示例。它包含额外的示例,展示如何借出未初始化的样本,以及如何使用 SampleReceiverwait_for_samples 方法来接收新样本的通知。

限制

目前,仅支持 Eclipse iceoryx v2.0 的一部分,并且缺少一些功能。

支持

  • pub-sub 消息模式
  • 访问内存使用和可用的发布者和订阅者等内省主题

缺少

  • 用户定义的 pub-sub 数据头
  • 请求-响应消息模式
  • ListenerWaitSet
  • 查找可用的服务,即 ServiceDiscovery

依赖项

~0.9–2MB
~37K SLoC