24 个版本 (13 个重大更新)

0.14.0 2024 年 6 月 3 日
0.13.0 2023 年 12 月 11 日
0.12.0 2023 年 11 月 19 日
0.11.1 2023 年 7 月 31 日
0.3.0 2021 年 3 月 2 日

#16WebSocket

Download history 6514/week @ 2024-05-02 8422/week @ 2024-05-09 6726/week @ 2024-05-16 4705/week @ 2024-05-23 7746/week @ 2024-05-30 7511/week @ 2024-06-06 5740/week @ 2024-06-13 5876/week @ 2024-06-20 5301/week @ 2024-06-27 5250/week @ 2024-07-04 6501/week @ 2024-07-11 6160/week @ 2024-07-18 7317/week @ 2024-07-25 4645/week @ 2024-08-01 4724/week @ 2024-08-08 3121/week @ 2024-08-15

20,900 每月下载量
33 个 Crates 中使用 (22 个直接使用)

BSD-2-Clause

18KB
102

Docs.rs CI

hyper-tungstenite

此 crate 允许 hyper 服务器接受 websocket 连接,由 tungstenite 支持。

upgrade 函数允许您将 HTTP 连接升级为 websocket 连接。它返回发送给客户端的 HTTP 响应,以及一个解析为 WebSocketStream 的 future。必须将响应发送给客户端才能解析 future。在实践中,这意味着您必须在不同的任务中启动 future。

请注意,upgrade 函数本身不会检查请求是否确实是升级请求。对于简单的情况,您可以在调用 upgrade 之前使用 is_upgrade_request 函数进行检查。对于更复杂的情况,其中服务器应支持多个升级协议,您可以手动检查 ConnectionUpgrade 头部。

示例

use futures::sink::SinkExt;
use futures::stream::StreamExt;
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper::{Request, Response};
use hyper_tungstenite::{tungstenite, HyperWebsocket};
use hyper_util::rt::TokioIo;
use tungstenite::Message;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Handle a HTTP or WebSocket request.
async fn handle_request(mut request: Request<Incoming>) -> Result<Response<Full<Bytes>>, Error> {
    // Check if the request is a websocket upgrade request.
    if hyper_tungstenite::is_upgrade_request(&request) {
        let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;

        // Spawn a task to handle the websocket connection.
        tokio::spawn(async move {
            if let Err(e) = serve_websocket(websocket).await {
                eprintln!("Error in websocket connection: {e}");
            }
        });

        // Return the response so the spawned future can continue.
        Ok(response)
    } else {
        // Handle regular HTTP requests here.
        Ok(Response::new(Full::<Bytes>::from("Hello HTTP!")))
    }
}

/// Handle a websocket connection.
async fn serve_websocket(websocket: HyperWebsocket) -> Result<(), Error> {
    let mut websocket = websocket.await?;
    while let Some(message) = websocket.next().await {
        match message? {
            Message::Text(msg) => {
                println!("Received text message: {msg}");
                websocket.send(Message::text("Thank you, come again.")).await?;
            },
            Message::Binary(msg) => {
                println!("Received binary message: {msg:02X?}");
                websocket.send(Message::binary(b"Thank you, come again.".to_vec())).await?;
            },
            Message::Ping(msg) => {
                // No need to send a reply: tungstenite takes care of this for you.
                println!("Received ping message: {msg:02X?}");
            },
            Message::Pong(msg) => {
                println!("Received pong message: {msg:02X?}");
            }
            Message::Close(msg) => {
                // No need to send a reply: tungstenite takes care of this for you.
                if let Some(msg) = &msg {
                    println!("Received close message with code {} and message: {}", msg.code, msg.reason);
                } else {
                    println!("Received close message");
                }
            },
            Message::Frame(_msg) => {
                unreachable!();
            }
        }
    }

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let addr: std::net::SocketAddr = "[::1]:3000".parse()?;
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    println!("Listening on http://{addr}");

    let mut http = hyper::server::conn::http1::Builder::new();
    http.keep_alive(true);

    loop {
        let (stream, _) = listener.accept().await?;
        let connection = http
            .serve_connection(TokioIo::new(stream), hyper::service::service_fn(handle_request))
            .with_upgrades();
        tokio::spawn(async move {
            if let Err(err) = connection.await {
                println!("Error serving HTTP connection: {err:?}");
            }
        });
    }
}

依赖项

~5–14MB
~167K SLoC