#二进制 #实时 #服务器 #事件 #安全 #低延迟

hardlight

一个安全、实时、低延迟的二进制WebSocket RPC子协议。可能是构建过的最快的Web兼容RPC框架。

16个版本 (稳定)

2.0.0 2023年8月19日
2.0.0-beta02023年8月9日
1.4.2 2023年8月4日
1.2.4 2023年7月28日
0.1.0 2023年3月18日

#52 in WebSocket

Download history 7/week @ 2024-03-08 5/week @ 2024-03-15 42/week @ 2024-03-29 4/week @ 2024-04-05

每月175次下载

MIT许可协议

67KB
1K SLoC

HardLight Crates.io Docs.rs 聊天

一个安全、实时、低延迟的二进制WebSocket RPC子协议。

use std::time::Instant;

use hardlight::*;
use tokio::time::{sleep, Duration};
use tracing::{info, info_span, Instrument};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // --- server side code
    tokio::spawn(
        async move {
            let config = ServerConfig::new_self_signed("localhost:8080");
            let mut server = Server::new(config, factory!(Handler));

            // multiple emitters can be created (just keep calling this)
            // this lets you send events to topics (which will be sent to all
            // clients subscribed to that topic)
            let event_emitter = server.get_event_emitter();
            // only one topic notifier can be created
            // this will receive notifications about topics being created and
            // removed
            let mut topic_notifier = server.get_topic_notifier().unwrap();

            tokio::spawn(
                async move {
                    while let Some(notif) = topic_notifier.recv().await {
                        match notif {
                            TopicNotification::Created(topic) => {
                                info!("Topic created: {:?}", topic);
                                event_emitter
                                    .emit(&topic, CounterEvent::ServerSentTest)
                                    .await;
                            }
                            TopicNotification::Removed(topic) => {
                                info!("Topic removed: {:?}", topic);
                            }
                        }
                    }
                }
                .instrument(info_span!("topic_notifier")),
            );

            server.run().await.unwrap()
        }
        .instrument(info_span!("server")),
    );

    // --- client side code
    tokio::spawn(
        async move {
            // the macros generate a client struct for us
            let mut client = CounterClient::new_self_signed(
                "localhost:8080",
                Compression::default(),
            );

            client.connect().await.unwrap();

            // get a client event receiver
            // this will only receive the events from topics the server handler
            // has subscribed this client to
            let mut subscription = client.subscribe().await.unwrap();

            // spawn a task to receive events in the background and print them
            tokio::spawn(
                async move {
                    while let Ok(event) = subscription.recv().await {
                        let (topic, event): (Topic, CounterEvent) =
                            (event.topic, event.payload.into());
                        info!(
                            "Received event on topic {:?}: {:?}",
                            topic, event
                        );
                    }
                }
                .instrument(info_span!("subscription")),
            );
            // create a new counter and subscribes the client to it

            client.new_counter().await.unwrap();

            // find out what topic we are subscribed to
            let topic = {
                let start = Instant::now();
                let mut topic = client.state().await.unwrap().topic.clone();
                while topic.as_bytes().is_empty() {
                    sleep(Duration::from_nanos(100)).await;
                    topic = client.state().await.unwrap().topic.clone();
                }
                info!(
                    "Received state update for topic {:?} after RPC response",
                    start.elapsed()
                );
                topic
            };
            info!("Subscribed to topic {:?}", topic);

            // this will emit an "incremented" event to the counter's topic
            client.increment(1).await.unwrap();
            // this will emit a "decremented" event to the counter's topic
            client.decrement(1).await.unwrap();

            info!("Counter: {}", client.get().await.unwrap());
            info!("{:?}", client.state().await.unwrap());
        }
        .instrument(info_span!("client")),
    )
    .await
    .unwrap();
}

#[codable]
#[derive(Debug, Clone)]
enum CounterEvent {
    /// Emitted when the counter is incremented.
    Incremented { to: u32, from: u32 },
    /// Emitted when the counter is decremented.
    Decremented { to: u32, from: u32 },
    /// Emitted when the server sends a test event.
    ServerSentTest,
}

/// These RPC methods are executed on the server and can be called by clients.
#[rpc]
trait Counter {
    /// Creates a new counter.
    async fn new_counter(&self) -> HandlerResult<()>;
    /// Increments the counter by the given amount.
    async fn increment(&self, amount: u32) -> HandlerResult<()>;
    /// Decrements the counter by the given amount.
    async fn decrement(&self, amount: u32) -> HandlerResult<()>;
    /// Gets the current value of the counter.
    async fn get(&self) -> HandlerResult<u32>;
}

#[connection_state]
struct State {
    counter: u32,
    topic: Topic,
}

#[rpc_handler]
impl Counter for Handler {
    async fn new_counter(&self) -> HandlerResult<()> {
        let mut state = self.state.write().await;
        self.subscriptions.remove(&state.topic);
        state.topic = (0..1)
            .map(|_| rand::random::<u8>())
            .collect::<Vec<_>>()
            .into();
        state.counter = 0;
        self.subscriptions.add(&state.topic);
        Ok(())
    }

    async fn increment(&self, amount: u32) -> HandlerResult<()> {
        let mut state = self.state.write().await;
        let new = state.counter + amount;
        let event = CounterEvent::Incremented {
            to: new,
            from: state.counter,
        };
        state.counter = new;
        self.events.emit(&state.topic, event).await;
        Ok(())
    }

    async fn decrement(&self, amount: u32) -> HandlerResult<()> {
        let mut state = self.state.write().await;
        let new = state.counter - amount;
        let event = CounterEvent::Decremented {
            to: new,
            from: state.counter,
        };
        state.counter = new;
        self.events.emit(&state.topic, event).await;
        Ok(())
    }

    async fn get(&self) -> HandlerResult<u32> {
        Ok(self.state.read().await.counter)
    }
}

什么是HardLight?

HardLight是一个二进制WebSocket RPC子协议。它设计得比gRPC更快(更低的延迟,更大的容量),同时更容易使用且默认更安全。它是基于rkyv(一个零拷贝反序列化库)和tokio-tungstenite(用于服务器/客户端实现)构建的。

HardLight有两种数据模型

  • RPC:客户端连接到服务器,并可以调用服务器上的函数
  • 事件:服务器可以将指定类型的事件推送到客户端

示例:多个客户端使用事件订阅“chat-topic-abc”主题。连接状态持有用户信息,客户端和服务器之间自动同步。客户端使用RPC函数send_message发送消息,服务器然后将消息持久化并发送到订阅的客户端。您可以在创建服务器时使用主题通知器来绑定自己的事件基础设施。我们在Valera使用NATS。

HardLight的名字来源于虚构的先行者技术,该技术“允许将光转化为固态,能够承受重量并执行各种任务”。

虽然没有官方的“规范”,但我们的方法类似于比特币核心,其中协议由实现定义。这个实现应该被认为是该协议的“参考实现”,端口应该匹配此实现的行為。

特性跟踪(按优先级排序)

  • RPC
  • 连接状态
  • 事件
  • WASM支持
  • 更好的文档

特性

  • 并发RPC:单个连接上可以同时进行多达256个RPC调用
  • 事件:非常强大的服务器端事件
  • 无需IDL:无需编写.proto文件,只需使用Rust类型并在其上添加#[codable]

安装

cargo add hardlight

为什么是WebSockets?

WebSockets实际上在TCP流上几乎没有抽象。参见RFC6455

从概念上讲,WebSocket实际上只是在TCP之上添加了一个层,执行以下操作:

  • 为浏览器添加基于Web源的安全模型
  • 添加地址和协议命名机制,以支持一个端口上的多个服务和一个IP地址上的多个主机名
  • 在TCP之上添加了分层机制,以返回TCP构建的IP数据包机制,但没有长度限制
  • 包含一个额外的带内关闭握手,设计用于代理和其他中间件的存在下工作

实际上,我们获得了TLS、广泛采用和防火墙支持(它在与HTTPS一起在TCP 443上运行)的好处,同时几乎没有缺点。这意味着HardLight可以在浏览器中使用,这是我们对该框架的要求。事实上,我们正式支持使用“wasm”功能从浏览器中使用HardLight。

在Valera,我们使用HardLight将Atlas SDK连接到Atlas服务器。

注意:我们正在研究添加QUIC。这不会是一个破坏性更改——我们将支持同一端口号上的WebSockets和QUIC。WS在TCP 443上,QUIC在UDP 443上。

依赖项

~19–32MB
~570K SLoC