#channel #async-channel #mpsc #tcp #communication-channel #broadcast-channel #async

tsyncp

基于TCP的消息传递的异步通道API(mpsc、广播、屏障等)

15个版本

0.4.1 2024年6月22日
0.4.0 2024年6月22日
0.3.0 2022年5月14日
0.2.6 2022年4月27日
0.1.7 2022年4月25日

#152 in 网络编程

Download history 278/week @ 2024-06-21 13/week @ 2024-06-28 28/week @ 2024-07-05 120/week @ 2024-07-26 10/week @ 2024-08-02

130 每月下载量
phoenix_gui 中使用

MIT 许可证

460KB
5.5K SLoC

tsyncp

Crates.io MIT licensed

基于TCP的消息传递的同步原语。

主要的Rust库,如stdtokio,提供了用于线程和任务之间消息传递的出色同步原语。然而,很少有库提供类似的API,可以在网络中使用。

Tsyncp通过在TCP上提供类似的API(mpsc、广播、屏障等)来填补这一空白。如果您有一个只有少量服务运行的个人项目,并且它们需要相互传递一些数据;而不是设置一个消息代理服务,您可以使用tsyncp轻松地在它们之间传递数据。

Tsyncp还允许自定义不同的序列化/反序列化方法来编码/解码数据;目前,直接从库中支持的方案是JsonProtobufBincode;然而,用户可以非常容易地实现自己的EncodeMethodDecodeMethod

提供的API

目前,tsyncp提供了5种类型的通道

  • mpsc:多生产者/单消费者通道。
  • broadcast:单生产者/多消费者通道。
  • barrier:确保多个等待者等待屏障释放。
  • channel:用于发送/接收数据的通用单连接通道。可以拆分为SenderReceiver对。
  • multi_channel:用于发送/接收数据的通用多连接通道。可以拆分为SenderReceiver对。

示例

项目的目标是提供简单、直观但可扩展的原始数据通过网络传递。这就是为什么这个库广泛使用Future-chaining的原因。

入门非常简单

mpsc::Receiver

use color_eyre::Result;
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114").await?;

    // accept a new connection coming from a sender application.
    rx.accept().await?;

    // after accepting connection, you can start receiving data from the receiver.
    if let Some(Ok(item)) = rx.recv().await {
        // below line is to show the type of received item.
        let item: Dummy = item;

        println!("received item: {item:?}");
    }

    Ok(())
}

但您可以通过以下方式轻松扩展它,通过连接future

use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114")
        .limit(10)                              // limit allowed connections to 10.
        .set_tcp_reuseaddr(true)                // set tcp config reuseaddr to `true`.
        .accept()                               // accept connection. (default: 1)
        .to_limit()                             // accept until limit is reached. (10)
        .handle(|a| println!("{a} connected!")) // print address when a connection is accepted.
        .await?;

    // At this point, the receiver has 10 connections in the connection pool,
    // which all have `reuseaddr` as `true`.
    while let Some(Ok((item, addr))) = rx.recv().with_addr().await {
        println!("received item: {item:?} from {addr}");
    }

    Ok(())
 }

我刚刚吐出了一堆链,但您只需使用适合您的任何链。

mpsc::Sender

use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;

#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114").await?;

    let dummy = Dummy {
        field1: String::from("hello world"),
        field2: 1234567,
        field3: vec![1, 2, 3, 4]
    };

    tx.send(dummy).await?;

    Ok(())
}

但是您也可以通过链式调用将未来进行扩展:

use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tsyncp::mpsc;


#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
    field1: String,
    field2: u64,
    field3: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114")
        .retry(Duration::from_millis(500), 100)     // retry connecting 100 times every 500 ms.
        .set_tcp_reuseaddr(true)                    // set tcp config reuseaddr to `true`.
        .await?;

    let dummy = Dummy {
        field1: String::from("hello world"),
        field2: 1234567,
        field3: vec![1, 2, 3, 4]
    };

    // send some item.
    tx.send(dummy).await?;

    Ok(())
}

API 文档([链接](https://docs.rs/tsyncp/))有关于如何使用原语的非常详细的指南。所以请查阅它们!

未来计划

如果足够多的人发现这个库有用或有趣,我主要将致力于以下工作:

  • 单元测试:由于作者时间资源有限,这个库目前缺乏广泛的单元测试。
  • 基准测试:目前,基准测试是在本地和用简单的消息进行的。
  • CI/CD 流:目前,我只手动推送到 git 和 crates.io。
  • 加密流TlsNoise。技术上,这个库已经足够通用,可以支持任何类型的字节流。因此,实现 Tls 和 Noise 流选项应该不会那么困难!(抱有希望...!)
  • 更多原语:一次性、发布/订阅和客户端/服务器原语?
  • 支持其他编码/解码实现:目前,我正在考虑像 rkyvspeedy 这样的库;如果您有其他选择,请让我知道!

注意:Tsyncp 是基于 tokio 构建的;因此,可能与其他异步运行时(如 async-std)不兼容。

注意:如果您担心实现的质最和作者的技术能力,您无需担心!(过于担心...)这个库是在 tokio::net::TcpStream 的基础上,对于 TCP、tokio_util::codec::Framed 对于字节流封包,以及 serdeserde_jsonProst 和 [bincode] 对于序列化字节数据!您可以将其视为由镇上最好的原料制成的沙拉碗,并加上一点自制的酱汁。

话虽如此,我确实在其中加入了一些自己的酱汁。我加入的两种风味是

  • 一个非常简单的 VariedLengthDelimitedCodec 实现,该实现根据字节数据的大小编码/解码不同长度的头部。
  • 一个最小化的 TCP 流池 StreamPool 实现,该实现允许从池中推送和弹出流,从池中流式传输数据,并将数据广播到池中。

您可以查看这些实现的实现,它们都很简单。而且我尽量不让太多的个人观点融入其中!

但是,由于这个库仍然是一个婴儿,如果您在代码中发现了任何问题,请通过 [email protected] 联系我!

警告:Tsyncp 不是一个消息代理,也不是试图成为;它只是一个用于简单和方便用例的消息传递库。

警告:Tsyncp 仍然是 WIP!它非常实用,但仍需要一些编码/解码功能的实现、广泛的测试、文档和示例。

依赖项

~4–13MB
~145K SLoC