18个版本

0.5.6 2024年6月7日
0.5.5 2024年5月31日
0.4.5 2024年5月18日
0.3.4 2024年5月14日

#2 in #renamed


2 crates中使用

MIT 许可证

39KB
755

已重命名为 Krossbar RPC


lib.rs:

用于Krossbar平台通信的RPC库。

该库

  • 接收tokio::net::UnixStream并返回RPC句柄;
  • 允许使用RPC连接进行调用、订阅端点以及发送tokio::net::UnixStream
  • 支持在重新连接后替换流,重新订阅活动订阅,并保持所有客户端句柄有效;
  • 支持通过[监控]进行消息交换监控;

使用rpc::Rpc::poll方法轮询流。这包括等待调用或订阅响应。

示例

RPC调用

use futures::{select, FutureExt};
use tokio::net::UnixStream;

use krossbar_common_rpc::rpc::Rpc;

async fn call() {
let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
let mut rpc = Rpc::new(stream, "hub");

let call = rpc.call::<u32, u32>("echo", &42).await.unwrap();

select! {
response = call.fuse() => {
println!("Call response: {response:?}")
},
_ = rpc.poll().fuse() => {}
}
}

RPC订阅

use futures::{select, FutureExt, StreamExt};
use tokio::net::UnixStream;

use krossbar_common_rpc::rpc::Rpc;

async fn subscribe() {
let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
let mut rpc = Rpc::new(stream, "hub");

let subscription = rpc.subscribe::<u32>("signal").await.unwrap();

select! {
response = subscription.take(2).collect::<Vec<krossbar_common_rpc::Result<u32>>>() => {
println!("Subscription response: {response:?}")
},
_ = rpc.poll().fuse() => {}
}
}

单向消息

use futures::{select, FutureExt};
use tokio::net::UnixStream;

use krossbar_common_rpc::rpc::Rpc;

async fn message() {
let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
let mut rpc = Rpc::new(stream, "hub");

let call = rpc.send_message("echo", &42).await.unwrap();

let incoming_message = rpc.poll().await;
}

轮询传入消息

use futures::{select, FutureExt};
use tokio::net::UnixStream;

use krossbar_common_rpc::{rpc::Rpc, request::Body};

async fn poll() {
let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
let mut rpc = Rpc::new(stream, "hub");

loop {
let request = rpc.poll().await;

if request.is_none() {
println!("Client disconnected");
return;
}

let mut request = request.unwrap();

println!("Incoming method call: {}", request.endpoint());
match request.take_body().unwrap() {
Body::Message(bson) => {
println!("Incoming message: {bson:?}");
},
Body::Call(bson) => {
println!("Incoming call: {bson:?}");
request.respond(Ok(bson)).await;
},
Body::Subscription => {
println!("Incoming subscription");
request.respond(Ok(41)).await;
request.respond(Ok(42)).await;
request.respond(Ok(43)).await;
},
Body::Fd { client_name, .. } => {
println!("Incoming connection request from {client_name}");
request.respond(Ok(())).await;
}
}
}
}

更多示例请见tests/

依赖项

~10–20MB
~274K SLoC