#channel #remote #client-server #rpc #stream #multiplexer #tcp-connection

remoc

🦑 远程复用对象、通道、可观察集合和 RPC,使远程交互无缝。提供多个远程通道和基于 TCP、TLS 或任何其他传输的 RPC。

40 个版本

0.14.0 2024 年 8 月 2 日
0.14.0-pre12024 年 7 月 30 日
0.13.1 2024 年 7 月 14 日
0.11.7 2024 年 3 月 22 日
0.9.7 2021 年 11 月 26 日

#31 in 异步

Download history 65/week @ 2024-05-03 315/week @ 2024-05-10 31/week @ 2024-05-17 410/week @ 2024-05-24 113/week @ 2024-05-31 245/week @ 2024-06-07 415/week @ 2024-06-14 611/week @ 2024-06-21 278/week @ 2024-06-28 356/week @ 2024-07-05 375/week @ 2024-07-12 293/week @ 2024-07-19 361/week @ 2024-07-26 757/week @ 2024-08-02 128/week @ 2024-08-09 164/week @ 2024-08-16

1,461 每月下载量
用于 remoc-obs

Apache-2.0

690KB
13K SLoC

Remoc 🦑 — 远程复用对象和通道

Remoc 使 Rust 程序之间的远程交互变得无缝和顺畅。通过单个底层传输,如 TCP 或 TLS,它提供

Remoc 使用 100% 安全的 Rust 编写,基于 Tokio,与 Serde 支持的任何类型和数据格式一起工作,不依赖于任何特定的传输类型。

crates.io page docs.rs page Apache 2 license codecov

简介

Rust 程序中的一种常见模式是使用通道在线程和异步任务之间进行通信。设置通道只需一行代码,并且在很大程度上避免了共享状态及其相关复杂性。Remoc 通过提供在远程连接上无缝工作的通道,扩展了这种编程模型以用于分布式系统。

为此,它使用 Serde 将数据序列化和反序列化,这些数据在底层传输(可能是 TCP 网络连接、WebSocket、UNIX 管道或甚至是串行链路)中传输。

打开新通道非常简单,只需通过现有通道发送新通道的发送方或接收方一半即可,就像在本地线程和任务之间做的那样。所有通道都通过相同的远程连接进行复用,以数据块的形式传输数据,以避免在传输大消息时一个通道阻塞另一个通道。

在远程通道的基础上,Remoc 允许调用远程函数和闭包。此外,可以将特质设置为远程可调用,并自动生成客户端和服务器实现,类似于经典的远程过程调用(RPC)模型。

向前和向后兼容性

分布式系统通常需要运行不同软件版本的端点相互交互。通过利用类似于 JSON 的自描述数据格式对数据进行编码,以确保具有高度的向前和向后兼容性。

始终可以添加新的字段到枚举和结构体中,并使用 #[serde(default)] 属性来为该字段提供默认值,如果它是由一个不知道它的旧客户端发送的。同样,在接收时,未知的新字段会被静默忽略,这样您就可以在不打扰旧端点的情况下扩展数据格式。

请查看所使用数据格式的文档以获取详细信息。

仓库特性

Remoc 的大部分功能都由仓库特性控制。以下特性是可用的

  • serde 启用 codec 模块,并为所有配置和错误类型实现序列化和反序列化。
  • rch 启用由 rch 模块提供的远程通道。
  • rfn 启用由 rfn 模块提供的远程函数调用。
  • robj 启用由 robj 模块提供的远程对象工具。
  • robs 启用由 robs 模块提供的远程可观察集合。
  • rtc 启用由 rtc 模块提供的远程特质调用。

元特性 full 启用上述所有特性,但不包括编解码器。

以下特性启用了传输数据的数据格式

  • codec-bincode 提供 Bincode 格式。
  • codec-ciborium 提供 CBOR 格式。
  • codec-json 提供 JSON 格式。
  • codec-message-pack 提供 MessagePack 格式。
  • codec-postcard 提供 Postcard 格式。

特性 default-codec-* 选择相应的编解码器作为默认值。最多只能选择一个,并且这应该只由应用程序使用,而不是库。

特性 full-codecs 启用所有编解码器。

默认情况下,所有特性都是启用的,并且使用 JSON 编解码器作为默认值。

支持的 Rust 版本

Remoc 是基于最新的稳定版本构建的。最低支持的 Rust 版本(MSRV)是 1.72。

示例

这是一个简短的示例;要查看完整的远程特质调用(RTC)示例,请参阅 示例目录

以下示例中,服务器监听TCP端口9870,客户端连接到它。然后,双方通过TCP连接使用Connect::io()建立Remoc连接。连接调度器在新任务中创建,并使用已建立的基通道调用client()server()函数。然后,客户端创建一个新的远程MPSC通道,并在计数请求中将它发送给服务器。服务器接收计数请求并在提供的通道上计数。客户端通过新通道接收每个计数值。

use std::net::Ipv4Addr;
use tokio::net::{TcpStream, TcpListener};
use remoc::prelude::*;

#[tokio::main]
async fn main() {
    // For demonstration we run both client and server in
    // the same process. In real life connect_client() and
    // connect_server() would run on different machines.
    futures::join!(connect_client(), connect_server());
}

// This would be run on the client.
// It establishes a Remoc connection over TCP to the server.
async fn connect_client() {
    // Wait for server to be ready.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Establish TCP connection.
    let socket =
        TcpStream::connect((Ipv4Addr::LOCALHOST, 9870)).await.unwrap();
    let (socket_rx, socket_tx) = socket.into_split();

    // Establish Remoc connection over TCP.
    // The connection is always bidirectional, but we can just drop
    // the unneeded receiver.
    let (conn, tx, _rx): (_, _, rch::base::Receiver<()>) =
        remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx)
        .await.unwrap();
    tokio::spawn(conn);

    // Run client.
    client(tx).await;
}

// This would be run on the server.
// It accepts a Remoc connection over TCP from the client.
async fn connect_server() {
    // Listen for incoming TCP connection.
    let listener =
        TcpListener::bind((Ipv4Addr::LOCALHOST, 9870)).await.unwrap();
    let (socket, _) = listener.accept().await.unwrap();
    let (socket_rx, socket_tx) = socket.into_split();

    // Establish Remoc connection over TCP.
    // The connection is always bidirectional, but we can just drop
    // the unneeded sender.
    let (conn, _tx, rx): (_, rch::base::Sender<()>, _) =
        remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx)
        .await.unwrap();
    tokio::spawn(conn);

    // Run server.
    server(rx).await;
}

// User-defined data structures needs to implement Serialize
// and Deserialize.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct CountReq {
    up_to: u32,
    // Most Remoc types like channels can be included in serializable
    // data structures for transmission to remote endpoints.
    seq_tx: rch::mpsc::Sender<u32>,
}

// This would be run on the client.
// It sends a count request to the server and receives each number
// as it is counted over a newly established MPSC channel.
async fn client(mut tx: rch::base::Sender<CountReq>) {
    // By sending seq_tx over an existing remote channel, a new remote
    // channel is automatically created and connected to the server.
    // This all happens inside the existing TCP connection.
    let (seq_tx, mut seq_rx) = rch::mpsc::channel(1);
    tx.send(CountReq { up_to: 4, seq_tx }).await.unwrap();

    // Receive counted numbers over new channel.
    assert_eq!(seq_rx.recv().await.unwrap(), Some(0));
    assert_eq!(seq_rx.recv().await.unwrap(), Some(1));
    assert_eq!(seq_rx.recv().await.unwrap(), Some(2));
    assert_eq!(seq_rx.recv().await.unwrap(), Some(3));
    assert_eq!(seq_rx.recv().await.unwrap(), None);
}

// This would be run on the server.
// It receives a count request from the client and sends each number
// as it is counted over the MPSC channel sender provided by the client.
async fn server(mut rx: rch::base::Receiver<CountReq>) {
    // Receive count request and channel sender to use for counting.
    while let Some(CountReq {up_to, seq_tx}) = rx.recv().await.unwrap()
    {
        for i in 0..up_to {
            // Send each counted number over provided channel.
            seq_tx.send(i).await.unwrap();
        }
    }
}

赞助商

Remoc的部分开发由ENQT GmbH赞助。

ENQT Logo

许可证

Remoc根据Apache 2.0许可证授权。

贡献

除非您明确声明,否则您有意提交以包括在Remoc中的任何贡献均应按Apache 2.0授权,不附加任何额外条款或条件。

依赖项

~4–6.5MB
~114K SLoC