23个版本 (7个破坏性更新)
0.8.0 | 2024年6月20日 |
---|---|
0.7.1 | 2024年3月12日 |
0.6.0 | 2023年12月15日 |
0.5.0 | 2023年10月30日 |
0.4.2 | 2023年5月11日 |
#6 in WebSocket
每月23,987次下载
在 36 个Crate中使用了(直接使用11个)
75KB
1.5K SLoC
fastwebsockets 是一个快速的WebSocket协议实现。
通过Autobahn|TestSuite1测试,并使用LLVM的libfuzzer进行了模糊测试。
您可以用它作为原始WebSocket帧解析器并自行处理规范合规性,或者将其用作完整的WebSocket客户端/服务器。
use fastwebsockets::{Frame, OpCode, WebSocket};
async fn handle_client(
mut socket: TcpStream,
) -> Result<(), WebSocketError> {
handshake(&mut socket).await?;
let mut ws = WebSocket::after_handshake(socket);
ws.set_writev(true);
ws.set_auto_close(true);
ws.set_auto_pong(true);
loop {
let frame = ws.read_frame().await?;
match frame {
OpCode::Close => break,
OpCode::Text | OpCode::Binary => {
let frame = Frame::new(true, frame.opcode, None, frame.payload);
ws.write_frame(frame).await?;
}
}
}
Ok(())
}
碎片化
默认情况下,fastwebsockets将给应用程序提供设置FIN的原始帧。其他如tungstenite这样的crate将提供所有帧连接在一起的单个消息。
对于连接的帧,请使用FragmentCollector
let mut ws = WebSocket::after_handshake(socket);
let mut ws = FragmentCollector::new(ws);
let incoming = ws.read_frame().await?;
// Always returns full messages
assert!(incoming.fin);
尚未支持permessage-deflate。
HTTP升级
启用upgrade
功能进行服务器端升级和客户端握手。
此功能由hyper提供。
use fastwebsockets::upgrade::upgrade;
use hyper::{Request, body::{Incoming, Bytes}, Response};
use http_body_util::Empty;
use anyhow::Result;
async fn server_upgrade(
mut req: Request<Incoming>,
) -> Result<Response<Empty<Bytes>>> {
let (response, fut) = upgrade::upgrade(&mut req)?;
tokio::spawn(async move {
if let Err(e) = handle_client(fut).await {
eprintln!("Error in websocket connection: {}", e);
}
});
Ok(response)
}
使用handshake
模块进行客户端握手。
use fastwebsockets::handshake;
use fastwebsockets::WebSocket;
use hyper::{Request, body::Bytes, upgrade::Upgraded, header::{UPGRADE, CONNECTION}};
use http_body_util::Empty;
use tokio::net::TcpStream;
use std::future::Future;
use anyhow::Result;
async fn connect() -> Result<WebSocket<Upgraded>> {
let stream = TcpStream::connect("localhost:9001").await?;
let req = Request::builder()
.method("GET")
.uri("https://127.0.0.1:9001/")
.header("Host", "localhost:9001")
.header(UPGRADE, "websocket")
.header(CONNECTION, "upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(Empty::<Bytes>::new())?;
let (ws, _) = handshake::client(&SpawnExecutor, req, stream).await?;
Ok(ws)
}
// Tie hyper's executor to tokio runtime
struct SpawnExecutor;
impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
tokio::task::spawn(fut);
}
}
与Axum一起使用
在Cargo.toml中启用Axum集成,使用features = ["upgrade", "with_axum"]
。
use axum::{response::IntoResponse, routing::get, Router};
use fastwebsockets::upgrade;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocketError;
#[tokio::main]
async fn main() {
let app = Router::new().route("/", get(ws_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
let mut ws = fastwebsockets::FragmentCollector::new(fut.await?);
loop {
let frame = ws.read_frame().await?;
match frame.opcode {
OpCode::Close => break,
OpCode::Text | OpCode::Binary => {
ws.write_frame(frame).await?;
}
_ => {}
}
}
Ok(())
}
async fn ws_handler(ws: upgrade::IncomingUpgrade) -> impl IntoResponse {
let (response, fut) = ws.upgrade().unwrap();
tokio::task::spawn(async move {
if let Err(e) = handle_client(fut).await {
eprintln!("Error in websocket connection: {}", e);
}
});
response
}
依赖关系
~3–12MB
~123K SLoC