#面向服务 #WebSocket #服务 #tungstenite #模式 #非阻塞 #设计

sod-tungstenite

面向服务设计（Service Oriented Design）- Tungstenite

6 个版本

0.3.4 2024 年 6 月 26 日
0.3.3 2023 年 10 月 31 日
0.2.2 2023 年 7 月 25 日
0.2.1 2023 年 4 月 22 日

在“Rust 模式”分类中排名第 2487

每月下载量 48

MIT/Apache 协议

80KB
1.5K SLoC

sod-tungstenite

用于与 tungstenite WebSocket 交互的 sod::Service 实现。

服务实现

所有服务都是 Retryable 并且可以是阻塞的或非阻塞的。

  • WsSession 是一个包装了 tungstenite::WebSocket 的 MutService，接受 WsSessionEvent 以发送或接收消息。WsSession::into_split 可以将 WsSession 拆分为 WsReader、WsWriter 和 WsFlusher。
  • WsReader 是一个包装了 Mutex<tungstenite::WebSocket> 的 Service，接受 () 作为输入并生成 tungstenite::Message 作为输出。
  • WsWriter 是一个包装了 Mutex<tungstenite::WebSocket> 的 Service，接受 tungstenite::Message 作为输入。
  • WsFlusher 是一个包装了 Mutex<tungstenite::WebSocket> 的 Service，接受 () 作为输入。
  • WsServer 是一个监听 TCP 端口的 Service,接受一个 () 作为输入并生成一个 WsSession 作为输出。

功能

  • native-tls 启用原生 TLS（Native TLS）
  • __rustls-tls 启用 Rustls TLS

阻塞示例

use sod::{idle::backoff, MaybeProcessService, MutService, RetryService, Service, ServiceChain};
use sod_tungstenite::{UninitializedWsSession, WsServer, WsSession, WsSessionEvent};
use std::{sync::atomic::Ordering, thread::spawn};
use tungstenite::{http::StatusCode, Message};
use url::Url;

/// Server-side session logic: answers a text message with the same text
/// prefixed by `"pong: "`.
struct PongService;

impl Service for PongService {
    type Input = Message;
    type Output = Option<Message>;
    type Error = ();

    /// Returns `Ok(Some(..))` carrying the prefixed text for a `Message::Text`
    /// input and `Ok(None)` for every other message variant. This
    /// implementation never returns `Err`.
    fn process(&self, input: Message) -> Result<Self::Output, Self::Error> {
        let reply = if let Message::Text(text) = input {
            Some(Message::Text(format!("pong: {text}")))
        } else {
            None
        };
        Ok(reply)
    }
}

/// Wires the session logic for each accepted connection and runs it on a new
/// thread.
struct SessionSpawner;

impl Service for SessionSpawner {
    type Input = UninitializedWsSession;
    type Output = ();
    type Error = ();

    /// Moves `input` into a freshly spawned thread and returns `Ok(())` right
    /// away; the handshake and the session loop both happen off the calling
    /// thread.
    fn process(&self, input: UninitializedWsSession) -> Result<Self::Output, Self::Error> {
        spawn(move || {
            // The handshake runs on the worker thread; `unwrap` panics that
            // thread if it does not succeed.
            let session = input.handshake().unwrap();
            let (reader, writer, flusher) = session.into_split();

            // Pipeline order: reader -> PongService -> writer -> flusher.
            let pipeline = ServiceChain::start(reader)
                .next(PongService)
                .next(MaybeProcessService::new(writer))
                .next(MaybeProcessService::new(flusher))
                .end();

            sod::thread::spawn_loop(pipeline, |err| {
                println!("Session: {err:?}");
                Err(err) // stop thread on error
            });
        });
        Ok(())
    }
}

// start a blocking server that creates blocking sessions
let server = WsServer::bind("127.0.0.1:48490").unwrap();

// build the accept pipeline: each item the server yields goes to `SessionSpawner`
let accept_chain = ServiceChain::start(server).next(SessionSpawner).end();

// spawn a thread to start accepting new server sessions
let handle = sod::thread::spawn_loop(accept_chain, |err| {
    println!("Server: {err:?}");
    Err(err) // stop thread on error
});

// connect a client to the server (the second tuple element is ignored here)
let endpoint = Url::parse("ws://127.0.0.1:48490/socket").unwrap();
let (mut client, _) = WsSession::connect(endpoint).unwrap();

// client writes the `"hello world!"` payload
let greeting = Message::Text("hello world!".to_owned());
client
    .process(WsSessionEvent::WriteMessage(greeting))
    .unwrap();

// client reads the reply, expected to be `"pong: hello world!"`
let reply = client.process(WsSessionEvent::ReadMessage).unwrap();
println!("Received: {:?}", reply);

// join until server crashes
handle.join().unwrap();

非阻塞示例

use sod::{idle::backoff, MaybeProcessService, MutService, RetryService, Service, ServiceChain};
use sod_tungstenite::{UninitializedWsSession, WsServer, WsSession, WsSessionEvent};
use std::{sync::atomic::Ordering, thread::spawn};
use tungstenite::{http::StatusCode, Message};
use url::Url;

/// Server-side session logic: answers a text message with the same text
/// prefixed by `"pong: "`.
struct PongService;

impl Service for PongService {
    type Input = Message;
    type Output = Option<Message>;
    type Error = ();

    /// Returns `Ok(Some(..))` carrying the prefixed text for a `Message::Text`
    /// input and `Ok(None)` for every other message variant. This
    /// implementation never returns `Err`.
    fn process(&self, input: Message) -> Result<Self::Output, Self::Error> {
        let reply = if let Message::Text(text) = input {
            Some(Message::Text(format!("pong: {text}")))
        } else {
            None
        };
        Ok(reply)
    }
}

/// Wires the session logic for each accepted connection and runs it on a new
/// thread, wrapping the reader and writer in `RetryService` with `backoff`.
struct SessionSpawner;

impl Service for SessionSpawner {
    type Input = UninitializedWsSession;
    type Output = ();
    type Error = ();

    /// Moves `input` into a freshly spawned thread and returns `Ok(())` right
    /// away; the handshake and the session loop both happen off the calling
    /// thread.
    fn process(&self, input: UninitializedWsSession) -> Result<Self::Output, Self::Error> {
        spawn(move || {
            // The handshake runs on the worker thread; `unwrap` panics that
            // thread if it does not succeed.
            let session = input.handshake().unwrap();
            let (reader, writer, flusher) = session.into_split();

            // Reader and writer are wrapped in `RetryService`; the flusher is
            // used as-is, exactly as in the original wiring.
            let retrying_reader = RetryService::new(reader, backoff);
            let retrying_writer = RetryService::new(writer, backoff);

            // Pipeline order: reader -> PongService -> writer -> flusher.
            let pipeline = ServiceChain::start(retrying_reader)
                .next(PongService)
                .next(MaybeProcessService::new(retrying_writer))
                .next(MaybeProcessService::new(flusher))
                .end();

            sod::thread::spawn_loop(pipeline, |err| {
                println!("Session: {err:?}");
                Err(err) // stop thread on error
            });
        });
        Ok(())
    }
}

// start a non-blocking server that creates non-blocking sessions
let bound = WsServer::bind("127.0.0.1:48490").unwrap();
let server = bound
    .with_nonblocking_sessions(true)
    .with_nonblocking_server(true)
    .unwrap();

// build the accept pipeline: the server is wrapped in `RetryService` with
// `backoff`, and each item it yields goes to `SessionSpawner`
let accept_chain = ServiceChain::start(RetryService::new(server, backoff))
    .next(SessionSpawner)
    .end();

// spawn a thread to start accepting new server sessions
let handle = sod::thread::spawn_loop(accept_chain, |err| {
    println!("Server: {err:?}");
    Err(err) // stop thread on error
});

// connect a client to the server and check the upgrade response
let endpoint = Url::parse("ws://127.0.0.1:48490/socket").unwrap();
let (mut client, response) = WsSession::connect(endpoint).unwrap();
assert_eq!(response.status(), StatusCode::SWITCHING_PROTOCOLS);

// client writes the `"hello world!"` payload
let greeting = Message::Text("hello world!".to_owned());
client
    .process(WsSessionEvent::WriteMessage(greeting))
    .unwrap();

// client receives the `"pong: hello world!"` payload
let reply = client.process(WsSessionEvent::ReadMessage).unwrap();
let expected = Some(Message::Text("pong: hello world!".to_owned()));
assert_eq!(reply, expected);

// stop the server, then wait for its thread to finish
sod::idle::KEEP_RUNNING.store(false, Ordering::Release);
handle.join().unwrap();

依赖

~1–1.6MB
~32K SLoC