#websocket-server #client-server #io #web

socket-flow

一个简单的 WebSocket 实现,用于客户端和服务器

1 个不稳定版本

1.0.0 2024 年 8 月 13 日
0.5.0 2024 年 8 月 23 日
0.4.0 2024 年 8 月 13 日
0.3.0 2024 年 8 月 13 日

64WebSocket

Download history 298/week @ 2024-08-12

298 每月下载量

Apache-2.0

60KB
906

socket-flow

为 Tokio 栈提供的简单异步 WebSocket 实现。

Apache licensed Crates.io

简介

这个库旨在提供简单的 WebSocket 实现,以便最终用户可以使用它将 WebSocket 服务器/客户端封装到他们的应用程序中,以提供一种流畅的方式将其设置到其代码中。

它是一个基于 tokio 运行的异步库,在幕后使用 tokio TcpStream,使用它作为实现 WebSocket 协议标准的起点,执行握手、读取帧、解析掩码、处理操作码和内部有效载荷。

可以作为客户端或服务器使用,返回一个 WSConnection,该连接实现了 Stream 特性,因此您可以连续消费传入的消息或发送消息。

背后的动机是提供一个简单的方法,在您的应用程序中拥有 WebSocket 连接,同时作为参考使用广泛建立的库,如 tungstenite-rstokio-tungstenite

特性

实现了大部分 WebSocket RFC 特性,如

  • 握手过程、密钥解析和生成
  • 处理操作码,如 TextBinaryPingPongContinue
  • 多个订阅
  • 可伸缩性
  • 错误处理
  • 通过了 autobahn-test-suite

待添加的特性

  • TLS/SSL 支持
  • 压缩

使用方法

在您的 Cargo.toml 中添加以下内容

[dependencies]
socket-flow = "*"

使用示例

此仓库包含不同的示例和使用其依赖项的灵活方式来设计代码,以满足最终用户的需求。

我们有从头开始配置所有内容的选项,创建 TcpListener 并管理 WebSocket 连接,同时还有一个即插即用的选项,您可以以更少的代码生成 WebSocket 服务器。

即插即用服务器

这是一个非常实用的例子,因为它允许你通过调用 start_server 函数来启动一个服务器,该函数返回一个 EventStream,用于消费服务器事件,如新的连接、消息、错误和断开连接。

use futures::StreamExt;
use log::*;
use socket_flow::event::{Event, ID};
use socket_flow::server::start_server;
use socket_flow::split::WSWriter;
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    env_logger::init();

    let port: u16 = 8080;
    match start_server(8080).await {
        Ok(mut event_receiver) => {
            let mut clients: HashMap<ID, WSWriter> = HashMap::new();
            info!("Server started on address 127.0.0.1:{}", port);
            while let Some(event) = event_receiver.next().await {
                match event {
                    Event::NewClient(id, client_conn) => {
                        info!("New client {} connected", id);
                        clients.insert(id, client_conn);
                    }
                    Event::NewMessage(client_id, message) => {
                        info!("Message from client {}: {:?}", client_id, message);
                        let ws_writer = clients.get_mut(&client_id).unwrap();
                        ws_writer.send_message(message).await.unwrap();
                    }
                    Event::Disconnect(client_id) => {
                        info!("Client {} disconnected", client_id);
                        clients.remove(&client_id);
                    }
                    Event::Error(client_id, error) => {
                        error!("Error occurred for client {}: {:?}", client_id, error);
                    }
                }
            }
        }
        Err(err) => {
            eprintln!("Could not start the server due to: {:?}", err);
        }
    }
}

要运行此示例,您可以克隆仓库并执行

cargo run --color=always --package socket-flow --example simple_server

回声服务器

以下是一个回声服务器示例,您也可以在:示例中找到

use futures::StreamExt;
use log::*;
use socket_flow::handshake::accept_async;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};

async fn handle_connection(_: SocketAddr, stream: TcpStream) {
    match accept_async(stream).await {
        Ok(mut ws_connection) => {
            while let Some(result) = ws_connection.next().await {
                match result {
                    Ok(frame) => {
                        if ws_connection.send_frame(frame).await.is_err() {
                            error!("Failed to send message");
                            break;
                        }
                    }
                    Err(e) => {
                        error!("Received error from the stream: {}", e);
                        break;
                    }
                }
            }
        }
        Err(err) => error!("Error when performing handshake: {}", err),
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();

    let addr = "127.0.0.1:9002";
    let listener = TcpListener::bind(&addr).await.expect("Can't listen");
    info!("Listening on: {}", addr);

    while let Ok((stream, _)) = listener.accept().await {
        let peer = stream
            .peer_addr()
            .expect("connected streams should have a peer address");
        info!("Peer address: {}", peer);

        tokio::spawn(handle_connection(peer, stream));
    }
}

要运行此示例,您可以克隆仓库并执行

cargo run --color=always --package socket-flow --example echo_server

此示例创建了一个 TcpListener,将其绑定到端口,接受连接,在 tokio 任务中处理每个连接,以并发处理客户端。handle_connection 函数确保执行握手过程,返回一个 WSConnection,它实现了 Stream trait,其中您可以消费此客户端的传入数据,并在套接字中执行写入操作。它包括通过 Result 的错误处理。

简单客户端

以下是一个运行客户端的示例,该客户端将执行一些操作并优雅地断开连接

use futures::StreamExt;
use rand::distr::Alphanumeric;
use rand::{thread_rng, Rng};
use log::*;
use socket_flow::handshake::connect_async;
use tokio::select;
use tokio::time::{interval, Duration};

async fn handle_connection(addr: &str) {
    match connect_async(addr).await {
        Ok(mut ws_connection) => {
            let mut ticker = interval(Duration::from_secs(5));
            // it will be used for closing the connection
            let mut counter = 0;

            loop {
                select! {
                    Some(result) = ws_connection.next() => {
                        match result {
                            Ok(message) => {
                                 info!("Received message: {}", message.as_text().unwrap());
                                counter = counter + 1;
                                // close the connection if 3 messages have already been sent and received
                                if counter >= 3 {
                                    if ws_connection.close_connection().await.is_err() {
                                         error!("Error occurred when closing connection");
                                    }
                                    break;
                                }
                            }
                            Err(err) => {
                                error!("Received error from the stream: {}", err);

                                break;
                            }
                        }
                    }
                    _ = ticker.tick() => {
                        let random_string = generate_random_string();
                        let binary_data = Vec::from(random_string);

                        if ws_connection.send(binary_data).await.is_err() {
                            eprintln!("Failed to send message");
                            break;
                        }
                    }
                }
            }
        }
        Err(err) => error!("Error when performing handshake: {}", err),
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();
    handle_connection("ws://127.0.0.1:9002").await;
}

fn generate_random_string() -> String {
    thread_rng()
        .sample_iter(&Alphanumeric)
        .take(30)
        .map(char::from)
        .collect()
}

由于您需要一个服务器来测试客户端,您可以执行我们的回声服务器示例,并在另一个选项卡中执行客户端示例

cargo run --color=always --package socket-flow --example client

在此示例中,客户端将尝试连接到 ws://127.0.0.1:9002,如果建立连接,它将每 5 秒向套接字发送随机字符串。发送三个字符串后,它将优雅地关闭连接并结束其执行。

您可以在 示例中查看更多内容

测试

Socket-flow通过了Autobahn 测试套件的WebSocket测试。它还有一些内部测试,以确保可靠性

参考文献

依赖关系

~7–15MB
~192K SLoC