6 个版本
0.3.4 | 2024 年 6 月 26 日 |
---|---|
0.3.3 | 2023 年 10 月 31 日 |
0.2.2 | 2023 年 7 月 25 日 |
0.2.1 | 2023 年 4 月 22 日 |
在 Rust 模式 中排名 2487
每月下载量 48
80KB
1.5K SLoC
sod-tungstenite
`sod::Service` 的实现,用于与 `tungstenite` WebSocket 进行交互。
服务实现
所有服务都是 Retryable
并且可以是阻塞的或非阻塞的。
- `WsSession` 是一个包装了 `tungstenite::WebSocket` 的 `MutService`,接受 `WsSessionEvent` 以发送或接收消息。`WsSession::into_split` 可以将 `WsSession` 分割为 `WsReader`、`WsWriter` 和 `WsFlusher`。
- `WsReader` 是一个包装了 `Mutex<tungstenite::WebSocket>` 的 `Service`,接受 `()` 作为输入,并生成 `tungstenite::Message` 作为输出。
- `WsWriter` 是一个包装了 `Mutex<tungstenite::WebSocket>` 的 `Service`,接受 `tungstenite::Message` 作为输入。
- `WsFlusher` 是一个包装了 `Mutex<tungstenite::WebSocket>` 的 `Service`,接受 `()` 作为输入。
- `WsServer` 是一个监听 TCP 端口的 `Service`,接受 `()` 作为输入,并生成 `UninitializedWsSession` 作为输出;如下方示例所示,对其调用 `handshake()` 完成握手后即可得到 `WsSession`。
功能
- `native-tls`:启用 Native TLS
- `__rustls-tls`:启用 Rustls TLS
阻塞示例
use sod::{idle::backoff, MaybeProcessService, MutService, RetryService, Service, ServiceChain};
use sod_tungstenite::{UninitializedWsSession, WsServer, WsSession, WsSessionEvent};
use std::{sync::atomic::Ordering, thread::spawn};
use tungstenite::{http::StatusCode, Message};
use url::Url;
/// Server-side session logic: answers every text message with the same
/// payload prefixed by `"pong: "`.
struct PongService;

impl Service for PongService {
    type Input = Message;
    type Output = Option<Message>;
    type Error = ();

    /// Returns `Ok(Some(..))` carrying the prefixed text for `Message::Text`
    /// input and `Ok(None)` for every other message kind. Never fails.
    fn process(&self, input: Message) -> Result<Self::Output, Self::Error> {
        let reply = if let Message::Text(text) = input {
            Some(Message::Text(format!("pong: {text}")))
        } else {
            // Non-text messages produce no reply.
            None
        };
        Ok(reply)
    }
}
/// Wires up the per-session service chain and runs it off the accept thread.
struct SessionSpawner;

impl Service for SessionSpawner {
    type Input = UninitializedWsSession;
    type Output = ();
    type Error = ();

    /// Hands the not-yet-initialized session to a freshly spawned thread and
    /// returns `Ok(())` immediately, without waiting for the handshake.
    ///
    /// The spawned thread panics if the handshake fails (`unwrap`).
    fn process(&self, input: UninitializedWsSession) -> Result<Self::Output, Self::Error> {
        spawn(move || {
            // Finish the handshake, then split the session into its
            // reader / writer / flusher parts.
            let session = input.handshake().unwrap();
            let (reader, writer, flusher) = session.into_split();

            // read -> pong -> write -> flush
            // NOTE(review): `MaybeProcessService` presumably skips the wrapped
            // service when `PongService` yields `None` — confirm in `sod` docs.
            let chain = ServiceChain::start(reader)
                .next(PongService)
                .next(MaybeProcessService::new(writer))
                .next(MaybeProcessService::new(flusher))
                .end();

            sod::thread::spawn_loop(chain, |err| {
                println!("Session: {err:?}");
                Err(err) // stop thread on error
            });
        });
        Ok(())
    }
}
// Bind a blocking server; the sessions it creates are blocking as well.
let server = WsServer::bind("127.0.0.1:48490").unwrap();

// Accept loop on its own thread: every session the server produces is handed
// to `SessionSpawner`.
let accept_chain = ServiceChain::start(server).next(SessionSpawner).end();
let server_thread = sod::thread::spawn_loop(accept_chain, |err| {
    println!("Server: {err:?}");
    Err(err) // stop thread on error
});

// Connect a client to the server; the handshake response is not needed here.
let endpoint = Url::parse("ws://127.0.0.1:48490/socket").unwrap();
let (mut client, _) = WsSession::connect(endpoint).unwrap();

// Client writes the `"hello world!"` text payload.
let greeting = Message::Text("hello world!".to_owned());
client
    .process(WsSessionEvent::WriteMessage(greeting))
    .unwrap();

// Client reads the reply, which is expected to be `"pong: hello world!"`.
let reply = client.process(WsSessionEvent::ReadMessage).unwrap();
println!("Received: {:?}", reply);

// Block until the server thread exits, i.e. until it stops on an error.
server_thread.join().unwrap();
非阻塞示例
use sod::{idle::backoff, MaybeProcessService, MutService, RetryService, Service, ServiceChain};
use sod_tungstenite::{UninitializedWsSession, WsServer, WsSession, WsSessionEvent};
use std::{sync::atomic::Ordering, thread::spawn};
use tungstenite::{http::StatusCode, Message};
use url::Url;
/// Session logic run by the server: prepends `"pong: "` to each text payload.
struct PongService;

impl Service for PongService {
    type Input = Message;
    type Output = Option<Message>;
    type Error = ();

    /// Maps `Message::Text(t)` to `Ok(Some(Message::Text("pong: " + t)))`;
    /// any other message kind maps to `Ok(None)`. This service never errors.
    fn process(&self, input: Message) -> Result<Self::Output, Self::Error> {
        // Only text messages are answered; everything else is dropped.
        let text = match input {
            Message::Text(text) => text,
            _ => return Ok(None),
        };
        Ok(Some(Message::Text(format!("pong: {text}"))))
    }
}
/// Builds the service chain for one accepted session and runs it on a
/// separate thread.
struct SessionSpawner;

impl Service for SessionSpawner {
    type Input = UninitializedWsSession;
    type Output = ();
    type Error = ();

    /// Moves the not-yet-initialized session into a new thread and returns
    /// `Ok(())` right away, so the caller never waits on the handshake.
    ///
    /// The spawned thread panics if the handshake fails (`unwrap`).
    fn process(&self, input: UninitializedWsSession) -> Result<Self::Output, Self::Error> {
        spawn(move || {
            let session = input.handshake().unwrap();
            let (reader, writer, flusher) = session.into_split();

            // Reader and writer are wrapped in `RetryService` with the
            // `backoff` idle function; the flusher is used as-is.
            // NOTE(review): presumably the retry wrappers absorb would-block
            // results from the non-blocking socket — confirm in `sod` docs.
            let retrying_reader = RetryService::new(reader, backoff);
            let retrying_writer = RetryService::new(writer, backoff);

            let chain = ServiceChain::start(retrying_reader)
                .next(PongService)
                .next(MaybeProcessService::new(retrying_writer))
                .next(MaybeProcessService::new(flusher))
                .end();

            sod::thread::spawn_loop(chain, |err| {
                println!("Session: {err:?}");
                Err(err) // stop thread on error
            });
        });
        Ok(())
    }
}
// Bind the server, then make both the created sessions and the server itself
// non-blocking.
let bound = WsServer::bind("127.0.0.1:48490").unwrap();
let server = bound
    .with_nonblocking_sessions(true)
    .with_nonblocking_server(true)
    .unwrap();

// Accept loop on its own thread: the server is wrapped in `RetryService` with
// the `backoff` idle function, and each produced session goes to
// `SessionSpawner`.
let accept_chain = ServiceChain::start(RetryService::new(server, backoff))
    .next(SessionSpawner)
    .end();
let server_thread = sod::thread::spawn_loop(accept_chain, |err| {
    println!("Server: {err:?}");
    Err(err) // stop thread on error
});

// Connect a client and check that the server agreed to the WebSocket upgrade.
let endpoint = Url::parse("ws://127.0.0.1:48490/socket").unwrap();
let (mut client, response) = WsSession::connect(endpoint).unwrap();
assert_eq!(response.status(), StatusCode::SWITCHING_PROTOCOLS);

// Client writes the `"hello world!"` text payload.
let greeting = Message::Text("hello world!".to_owned());
client
    .process(WsSessionEvent::WriteMessage(greeting))
    .unwrap();

// Client must receive the `"pong: hello world!"` payload back.
let reply = client.process(WsSessionEvent::ReadMessage).unwrap();
assert_eq!(
    reply,
    Some(Message::Text("pong: hello world!".to_owned()))
);

// Clear the global keep-running flag to stop the server, then wait for the
// server thread to finish.
sod::idle::KEEP_RUNNING.store(false, Ordering::Release);
server_thread.join().unwrap();
依赖
~1–1.6MB
~32K SLoC