15个版本
0.4.1 | 2024年6月22日 |
---|---|
0.4.0 | 2024年6月22日 |
0.3.0 | 2022年5月14日 |
0.2.6 | 2022年4月27日 |
0.1.7 | 2022年4月25日 |
#152 in 网络编程
130 每月下载量
在 phoenix_gui 中使用
460KB
5.5K SLoC
tsyncp
基于TCP的消息传递的同步原语。
主要的Rust库,如std和tokio,提供了用于线程和任务之间消息传递的出色同步原语。然而,很少有库提供类似的API,可以在网络中使用。
Tsyncp通过在TCP上提供类似的API(mpsc、广播、屏障等)来填补这一空白。如果您有一个只有少量服务运行的个人项目,并且它们需要相互传递一些数据;而不是设置一个消息代理服务,您可以使用tsyncp轻松地在它们之间传递数据。
Tsyncp还允许自定义不同的序列化/反序列化方法来编码/解码数据;目前,直接从库中支持的方案是Json、Protobuf和Bincode;然而,用户可以非常容易地实现自己的EncodeMethod和DecodeMethod。
提供的API
目前,tsyncp提供了5种类型的通道
- mpsc:多生产者/单消费者通道。
- broadcast:单生产者/多消费者通道。
- barrier:确保多个等待者等待屏障释放。
- channel:用于发送/接收数据的通用单连接通道。可以拆分为Sender和Receiver对。
- multi_channel:用于发送/接收数据的通用多连接通道。可以拆分为Sender和Receiver对。
示例
项目的目标是提供简单、直观但可扩展的原始数据通过网络传递。这就是为什么这个库广泛使用Future-chaining的原因。
入门非常简单
mpsc::Receiver
use color_eyre::Result;
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;
#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
field1: String,
field2: u64,
field3: Vec<u8>,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114").await?;
// accept a new connection coming from a sender application.
rx.accept().await?;
// after accepting connection, you can start receiving data from the receiver.
if let Some(Ok(item)) = rx.recv().await {
// below line is to show the type of received item.
let item: Dummy = item;
println!("received item: {item:?}");
}
Ok(())
}
但您可以通过以下方式轻松扩展它,通过连接future
use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;
#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
field1: String,
field2: u64,
field3: Vec<u8>,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut rx: mpsc::JsonReceiver<Dummy> = mpsc::receiver_on("localhost:11114")
.limit(10) // limit allowed connections to 10.
.set_tcp_reuseaddr(true) // set tcp config reuseaddr to `true`.
.accept() // accept connection. (default: 1)
.to_limit() // accept until limit is reached. (10)
.handle(|a| println!("{a} connected!")) // print address when a connection is accepted.
.await?;
// At this point, the receiver has 10 connections in the connection pool,
// which all have `reuseaddr` as `true`.
while let Some(Ok((item, addr))) = rx.recv().with_addr().await {
println!("received item: {item:?} from {addr}");
}
Ok(())
}
我刚刚吐出了一堆链,但您只需使用适合您的任何链。
mpsc::Sender
use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use tsyncp::mpsc;
#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
field1: String,
field2: u64,
field3: Vec<u8>,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114").await?;
let dummy = Dummy {
field1: String::from("hello world"),
field2: 1234567,
field3: vec![1, 2, 3, 4]
};
tx.send(dummy).await?;
Ok(())
}
但是您也可以通过链式调用将未来进行扩展:
use color_eyre::{Result, Report};
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tsyncp::mpsc;
#[derive(Debug, Serialize, Deserialize)]
struct Dummy {
field1: String,
field2: u64,
field3: Vec<u8>,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut tx: mpsc::JsonSender<Dummy> = mpsc::sender_to("localhost:11114")
.retry(Duration::from_millis(500), 100) // retry connecting 100 times every 500 ms.
.set_tcp_reuseaddr(true) // set tcp config reuseaddr to `true`.
.await?;
let dummy = Dummy {
field1: String::from("hello world"),
field2: 1234567,
field3: vec![1, 2, 3, 4]
};
// send some item.
tx.send(dummy).await?;
Ok(())
}
API 文档([链接](https://docs.rs/tsyncp/))有关于如何使用原语的非常详细的指南。所以请查阅它们!
未来计划
如果足够多的人发现这个库有用或有趣,我主要将致力于以下工作:
- 单元测试:由于作者时间资源有限,这个库目前缺乏广泛的单元测试。
- 基准测试:目前,基准测试是在本地和用简单的消息进行的。
- CI/CD 流:目前,我只手动推送到 git 和 crates.io。
- 加密流:Tls 和 Noise。技术上,这个库已经足够通用,可以支持任何类型的字节流。因此,实现 Tls 和 Noise 流选项应该不会那么困难!(抱有希望...!)
- 更多原语:一次性、发布/订阅和客户端/服务器原语?
- 支持其他编码/解码实现:目前,我正在考虑像 rkyv 和 speedy 这样的库;如果您有其他选择,请让我知道!
注意:Tsyncp 是基于 tokio 构建的;因此,可能与其他异步运行时(如 async-std)不兼容。
注意:如果您担心实现的质最和作者的技术能力,您无需担心!(过于担心...)这个库是在 tokio::net::TcpStream 的基础上,对于 TCP、tokio_util::codec::Framed 对于字节流封包,以及 serde、serde_json、Prost 和 [bincode] 对于序列化字节数据!您可以将其视为由镇上最好的原料制成的沙拉碗,并加上一点自制的酱汁。
话虽如此,我确实在其中加入了一些自己的酱汁。我加入的两种风味是
- 一个非常简单的 VariedLengthDelimitedCodec 实现,该实现根据字节数据的大小编码/解码不同长度的头部。
- 一个最小化的 TCP 流池 StreamPool 实现,该实现允许从池中推送和弹出流,从池中流式传输数据,并将数据广播到池中。
您可以查看这些实现的实现,它们都很简单。而且我尽量不让太多的个人观点融入其中!
但是,由于这个库仍然是一个婴儿,如果您在代码中发现了任何问题,请通过 [email protected] 联系我!
警告:Tsyncp 不是一个消息代理,也不是试图成为;它只是一个用于简单和方便用例的消息传递库。
警告:Tsyncp 仍然是 WIP!它非常实用,但仍需要一些编码/解码功能的实现、广泛的测试、文档和示例。
依赖项
~4–13MB
~145K SLoC