1 个不稳定版本
1.0.0 |
|
---|---|
0.5.0 | 2024 年 8 月 23 日 |
0.4.0 |
|
0.3.0 |
|
64 在 WebSocket 中
298 每月下载量
60KB
906 行
socket-flow
为 Tokio 栈提供的简单异步 WebSocket 实现。
简介
这个库旨在提供简单的 WebSocket 实现,以便最终用户可以使用它将 WebSocket 服务器/客户端封装到他们的应用程序中,以提供一种流畅的方式将其设置到其代码中。
它是一个基于 tokio 运行的异步库,在幕后使用 tokio TcpStream,使用它作为实现 WebSocket 协议标准的起点,执行握手、读取帧、解析掩码、处理操作码和内部有效载荷。
可以作为客户端或服务器使用,返回一个 WSConnection
,该连接实现了 Stream
特性,因此您可以连续消费传入的消息或发送消息。
背后的动机是提供一个简单的方法,在您的应用程序中拥有 WebSocket 连接,同时作为参考使用广泛建立的库,如 tungstenite-rs
和 tokio-tungstenite
特性
实现了大部分 WebSocket RFC 特性,如
- 握手过程、密钥解析和生成
- 处理操作码,如
Text
、Binary
、Ping
、Pong
和Continue
- 多个订阅
- 可伸缩性
- 错误处理
- 通过了 autobahn-test-suite
待添加的特性
- TLS/SSL 支持
- 压缩
使用方法
在您的 Cargo.toml
中添加以下内容
[dependencies]
socket-flow = "*"
使用示例
此仓库包含不同的示例和使用其依赖项的灵活方式来设计代码,以满足最终用户的需求。
我们有从头开始配置所有内容的选项,创建 TcpListener 并管理 WebSocket 连接,同时还有一个即插即用的选项,您可以以更少的代码生成 WebSocket 服务器。
即插即用服务器
这是一个非常实用的例子,因为它允许你通过调用 start_server
函数来启动一个服务器,该函数返回一个 EventStream
,用于消费服务器事件,如新的连接、消息、错误和断开连接。
use futures::StreamExt;
use log::*;
use socket_flow::event::{Event, ID};
use socket_flow::server::start_server;
use socket_flow::split::WSWriter;
use std::collections::HashMap;
#[tokio::main]
async fn main() {
env_logger::init();
let port: u16 = 8080;
match start_server(8080).await {
Ok(mut event_receiver) => {
let mut clients: HashMap<ID, WSWriter> = HashMap::new();
info!("Server started on address 127.0.0.1:{}", port);
while let Some(event) = event_receiver.next().await {
match event {
Event::NewClient(id, client_conn) => {
info!("New client {} connected", id);
clients.insert(id, client_conn);
}
Event::NewMessage(client_id, message) => {
info!("Message from client {}: {:?}", client_id, message);
let ws_writer = clients.get_mut(&client_id).unwrap();
ws_writer.send_message(message).await.unwrap();
}
Event::Disconnect(client_id) => {
info!("Client {} disconnected", client_id);
clients.remove(&client_id);
}
Event::Error(client_id, error) => {
error!("Error occurred for client {}: {:?}", client_id, error);
}
}
}
}
Err(err) => {
eprintln!("Could not start the server due to: {:?}", err);
}
}
}
要运行此示例,您可以克隆仓库并执行
cargo run --color=always --package socket-flow --example simple_server
回声服务器
以下是一个回声服务器示例,您也可以在:示例中找到
use futures::StreamExt;
use log::*;
use socket_flow::handshake::accept_async;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
async fn handle_connection(_: SocketAddr, stream: TcpStream) {
match accept_async(stream).await {
Ok(mut ws_connection) => {
while let Some(result) = ws_connection.next().await {
match result {
Ok(frame) => {
if ws_connection.send_frame(frame).await.is_err() {
error!("Failed to send message");
break;
}
}
Err(e) => {
error!("Received error from the stream: {}", e);
break;
}
}
}
}
Err(err) => error!("Error when performing handshake: {}", err),
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);
tokio::spawn(handle_connection(peer, stream));
}
}
要运行此示例,您可以克隆仓库并执行
cargo run --color=always --package socket-flow --example echo_server
此示例创建了一个 TcpListener,将其绑定到端口,接受连接,在 tokio 任务中处理每个连接,以并发处理客户端。handle_connection 函数确保执行握手过程,返回一个 WSConnection
,它实现了 Stream
trait,其中您可以消费此客户端的传入数据,并在套接字中执行写入操作。它包括通过 Result
的错误处理。
简单客户端
以下是一个运行客户端的示例,该客户端将执行一些操作并优雅地断开连接
use futures::StreamExt;
use rand::distr::Alphanumeric;
use rand::{thread_rng, Rng};
use log::*;
use socket_flow::handshake::connect_async;
use tokio::select;
use tokio::time::{interval, Duration};
async fn handle_connection(addr: &str) {
match connect_async(addr).await {
Ok(mut ws_connection) => {
let mut ticker = interval(Duration::from_secs(5));
// it will be used for closing the connection
let mut counter = 0;
loop {
select! {
Some(result) = ws_connection.next() => {
match result {
Ok(message) => {
info!("Received message: {}", message.as_text().unwrap());
counter = counter + 1;
// close the connection if 3 messages have already been sent and received
if counter >= 3 {
if ws_connection.close_connection().await.is_err() {
error!("Error occurred when closing connection");
}
break;
}
}
Err(err) => {
error!("Received error from the stream: {}", err);
break;
}
}
}
_ = ticker.tick() => {
let random_string = generate_random_string();
let binary_data = Vec::from(random_string);
if ws_connection.send(binary_data).await.is_err() {
eprintln!("Failed to send message");
break;
}
}
}
}
}
Err(err) => error!("Error when performing handshake: {}", err),
}
}
#[tokio::main]
async fn main() {
env_logger::init();
handle_connection("ws://127.0.0.1:9002").await;
}
fn generate_random_string() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect()
}
由于您需要一个服务器来测试客户端,您可以执行我们的回声服务器示例,并在另一个选项卡中执行客户端示例
cargo run --color=always --package socket-flow --example client
在此示例中,客户端将尝试连接到 ws://127.0.0.1:9002
,如果建立连接,它将每 5 秒向套接字发送随机字符串。发送三个字符串后,它将优雅地关闭连接并结束其执行。
您可以在 示例中查看更多内容
测试
Socket-flow通过了Autobahn 测试套件的WebSocket测试。它还有一些内部测试,以确保可靠性
参考文献
依赖关系
~7–15MB
~192K SLoC