16个版本 (稳定)
2.0.0 | 2023年8月19日 |
---|---|
2.0.0-beta0 | 2023年8月9日 |
1.4.2 | 2023年8月4日 |
1.2.4 | 2023年7月28日 |
0.1.0 | 2023年3月18日 |
#52 in WebSocket
每月175次下载
67KB
1K SLoC
HardLight
一个安全、实时、低延迟的二进制WebSocket RPC子协议。
use std::time::Instant;
use hardlight::*;
use tokio::time::{sleep, Duration};
use tracing::{info, info_span, Instrument};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
// --- server side code
tokio::spawn(
async move {
let config = ServerConfig::new_self_signed("localhost:8080");
let mut server = Server::new(config, factory!(Handler));
// multiple emitters can be created (just keep calling this)
// this lets you send events to topics (which will be sent to all
// clients subscribed to that topic)
let event_emitter = server.get_event_emitter();
// only one topic notifier can be created
// this will receive notifications about topics being created and
// removed
let mut topic_notifier = server.get_topic_notifier().unwrap();
tokio::spawn(
async move {
while let Some(notif) = topic_notifier.recv().await {
match notif {
TopicNotification::Created(topic) => {
info!("Topic created: {:?}", topic);
event_emitter
.emit(&topic, CounterEvent::ServerSentTest)
.await;
}
TopicNotification::Removed(topic) => {
info!("Topic removed: {:?}", topic);
}
}
}
}
.instrument(info_span!("topic_notifier")),
);
server.run().await.unwrap()
}
.instrument(info_span!("server")),
);
// --- client side code
tokio::spawn(
async move {
// the macros generate a client struct for us
let mut client = CounterClient::new_self_signed(
"localhost:8080",
Compression::default(),
);
client.connect().await.unwrap();
// get a client event receiver
// this will only receive the events from topics the server handler
// has subscribed this client to
let mut subscription = client.subscribe().await.unwrap();
// spawn a task to receive events in the background and print them
tokio::spawn(
async move {
while let Ok(event) = subscription.recv().await {
let (topic, event): (Topic, CounterEvent) =
(event.topic, event.payload.into());
info!(
"Received event on topic {:?}: {:?}",
topic, event
);
}
}
.instrument(info_span!("subscription")),
);
// create a new counter and subscribes the client to it
client.new_counter().await.unwrap();
// find out what topic we are subscribed to
let topic = {
let start = Instant::now();
let mut topic = client.state().await.unwrap().topic.clone();
while topic.as_bytes().is_empty() {
sleep(Duration::from_nanos(100)).await;
topic = client.state().await.unwrap().topic.clone();
}
info!(
"Received state update for topic {:?} after RPC response",
start.elapsed()
);
topic
};
info!("Subscribed to topic {:?}", topic);
// this will emit an "incremented" event to the counter's topic
client.increment(1).await.unwrap();
// this will emit a "decremented" event to the counter's topic
client.decrement(1).await.unwrap();
info!("Counter: {}", client.get().await.unwrap());
info!("{:?}", client.state().await.unwrap());
}
.instrument(info_span!("client")),
)
.await
.unwrap();
}
#[codable]
#[derive(Debug, Clone)]
enum CounterEvent {
/// Emitted when the counter is incremented.
Incremented { to: u32, from: u32 },
/// Emitted when the counter is decremented.
Decremented { to: u32, from: u32 },
/// Emitted when the server sends a test event.
ServerSentTest,
}
/// These RPC methods are executed on the server and can be called by clients.
#[rpc]
trait Counter {
/// Creates a new counter.
async fn new_counter(&self) -> HandlerResult<()>;
/// Increments the counter by the given amount.
async fn increment(&self, amount: u32) -> HandlerResult<()>;
/// Decrements the counter by the given amount.
async fn decrement(&self, amount: u32) -> HandlerResult<()>;
/// Gets the current value of the counter.
async fn get(&self) -> HandlerResult<u32>;
}
#[connection_state]
struct State {
counter: u32,
topic: Topic,
}
#[rpc_handler]
impl Counter for Handler {
async fn new_counter(&self) -> HandlerResult<()> {
let mut state = self.state.write().await;
self.subscriptions.remove(&state.topic);
state.topic = (0..1)
.map(|_| rand::random::<u8>())
.collect::<Vec<_>>()
.into();
state.counter = 0;
self.subscriptions.add(&state.topic);
Ok(())
}
async fn increment(&self, amount: u32) -> HandlerResult<()> {
let mut state = self.state.write().await;
let new = state.counter + amount;
let event = CounterEvent::Incremented {
to: new,
from: state.counter,
};
state.counter = new;
self.events.emit(&state.topic, event).await;
Ok(())
}
async fn decrement(&self, amount: u32) -> HandlerResult<()> {
let mut state = self.state.write().await;
let new = state.counter - amount;
let event = CounterEvent::Decremented {
to: new,
from: state.counter,
};
state.counter = new;
self.events.emit(&state.topic, event).await;
Ok(())
}
async fn get(&self) -> HandlerResult<u32> {
Ok(self.state.read().await.counter)
}
}
什么是HardLight?
HardLight是一个二进制WebSocket RPC子协议。它设计得比gRPC更快(更低的延迟,更大的容量),同时更容易使用且默认更安全。它是基于rkyv(一个零拷贝反序列化库)和tokio-tungstenite(用于服务器/客户端实现)构建的。
HardLight有两种数据模型
- RPC:客户端连接到服务器,并可以调用服务器上的函数
- 事件:服务器可以将指定类型的事件推送到客户端
示例:多个客户端使用事件订阅“chat-topic-abc”主题。连接状态持有用户信息,客户端和服务器之间自动同步。客户端使用RPC函数send_message
发送消息,服务器然后将消息持久化并发送到订阅的客户端。您可以在创建服务器时使用主题通知器来绑定自己的事件基础设施。我们在Valera使用NATS。
HardLight的名字来源于虚构的先行者技术,该技术“允许将光转化为固态,能够承受重量并执行各种任务”。
虽然没有官方的“规范”,但我们的方法类似于比特币核心,其中协议由实现定义。这个实现应该被认为是该协议的“参考实现”,端口应该匹配此实现的行為。
特性跟踪(按优先级排序)
- RPC
- 连接状态
- 宏
- 事件
- WASM支持
- 更好的文档
特性
- 并发RPC:单个连接上可以同时进行多达256个RPC调用
- 事件:非常强大的服务器端事件
- 无需IDL:无需编写
.proto
文件,只需使用Rust类型并在其上添加#[codable]
安装
cargo add hardlight
为什么是WebSockets?
WebSockets实际上在TCP流上几乎没有抽象。参见RFC6455
从概念上讲,WebSocket实际上只是在TCP之上添加了一个层,执行以下操作:
- 为浏览器添加基于Web源的安全模型
- 添加地址和协议命名机制,以支持一个端口上的多个服务和一个IP地址上的多个主机名
- 在TCP之上添加了分层机制,以返回TCP构建的IP数据包机制,但没有长度限制
- 包含一个额外的带内关闭握手,设计用于代理和其他中间件的存在下工作
实际上,我们获得了TLS、广泛采用和防火墙支持(它在与HTTPS一起在TCP 443上运行)的好处,同时几乎没有缺点。这意味着HardLight可以在浏览器中使用,这是我们对该框架的要求。事实上,我们正式支持使用“wasm”功能从浏览器中使用HardLight。
在Valera,我们使用HardLight将Atlas SDK连接到Atlas服务器。
注意:我们正在研究添加QUIC。这不会是一个破坏性更改——我们将支持同一端口号上的WebSockets和QUIC。WS在TCP 443上,QUIC在UDP 443上。
依赖项
~19–32MB
~570K SLoC