25次发布

0.0.25 2024年5月15日
0.0.24 2024年5月6日
0.0.23 2024年4月30日
0.0.13 2024年3月28日

#55 in WebSocket

Download history 26/week @ 2024-04-19 123/week @ 2024-04-26 139/week @ 2024-05-03 114/week @ 2024-05-10 31/week @ 2024-05-17 4/week @ 2024-05-24 3/week @ 2024-05-31 3/week @ 2024-06-07 4/week @ 2024-06-28 34/week @ 2024-07-05

2,168 每月下载

MIT 许可证

205KB
2K SLoC

Build Status Latest Version Docs Badge License Badge

概述

BoomNet是一个高性能框架,旨在开发低延迟网络应用程序,尤其关注使用各种协议的TCP流式客户端。

安装

只需在您的 Cargo.toml 中声明对 boomnet 的依赖关系,并选择所需的功能。

[dependencies]
boomnet = { version = "0.0.25", features = ["full"]}

设计原则

框架分为多个层次,每个后续层次在其前一个层次的基础上构建,增强功能并提高抽象级别。

第一层将 stream 定义为TCP连接的抽象,遵循以下特性。

  • 必须实现 ReadWrite 特性以进行I/O操作。
  • 以非阻塞方式运行。
  • 通过 rustls 集成TLS。
  • 支持记录和回放网络字节流。
  • 允许绑定到特定网络接口。
  • 便于实现WebSocket、HTTP和FIX等面向TCP的客户端协议。

流设计为完全泛型,避免动态分发,并且可以灵活组合。

let stream: RecordedStream<TlsStream<TcpStream>> = TcpStream::bind_and_connect(addr, self.net_iface, None)?
    .into_tls_stream(self.url)
    .into_recorded_stream("plain");

然后可以在流上应用不同的协议。

let ws: Websocket<RecordedStream<TlsStream<TcpStream>>> = stream.into_websocket(self.url);

选择器

Selector 提供了对操作系统特定机制(如 epoll)的抽象,用于高效监控套接字就绪事件。尽管主要用于内部使用,但选择器对于 IOService 功能至关重要,目前提供 miodirect(无操作)实现。

let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));

服务

最后一层管理端点生命周期,并通过 IOService 提供辅助服务(如异步DNS解析和自动断开连接)。

Endpoint 作为应用程序逻辑的低级构造。 IOService 负责端点内的连接生命周期。

协议

目标是支持各种协议,包括WebSocket、HTTP和FIX,目前提供WebSocket客户端功能。

WebSocket

WebSocket客户端协议符合RFC 6455规范,提供以下功能。

  • 与任何流兼容。
  • 对同一批次读取的帧使用TCP批量感知时间戳。
  • 不会因部分帧而阻塞。
  • 设计为零拷贝读写。
  • 可选的出站帧掩码。
  • 独立使用或在SelectorIOService结合使用。

示例用法

仓库包含一个示例列表

以下示例演示了如何使用IOService与多个WebSocket连接来从币安加密货币交易所接收消息。首先,我们需要定义和实现我们的Endpoint。框架提供了我们可以使用的TlsWebsocketEndpoint特质。


#[derive(Default)]
struct TradeEndpoint {
    id: u32,
    url: &'static str,
    instrument: &'static str,
}

impl TradeEndpoint {
    pub fn new(id: u32, url: &'static str, instrument: &'static str) -> TradeEndpoint {
        Self { id, url, instrument, }
    }
}

impl TlsWebsocketEndpoint for TradeEndpoint {
    type Stream = MioStream;

    fn url(&self) -> &str {
        self.url
    }

    // called by the IO service whenever a connection has to be established for this endpoint
    fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<TlsWebsocket<Self::Stream>> {
        
        // create secure websocket
        let mut ws = TcpStream::bind_and_connect(addr, None, None)?
            .into_mio_stream()
            .into_tls_websocket(self.url);

        // send subscription message
        ws.send_text(
            true,
            Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#, self.instrument).as_bytes()),
        )?;

        Ok(ws)
    }

    #[inline]
    fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
        // keep calling receive_next until no more frames in the current batch
        while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
            // handle the message
            println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
        }
        Ok(())
    }
}

定义端点后,将其注册到IOService并在事件循环中进行轮询。服务处理Endpoint连接管理和断开重连。


fn main() -> anyhow::Result<()> {
    let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));

    let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt");
    let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt");
    let endpoint_xrp = TradeEndpoint::new(2, "wss://stream3.binance.com:443/ws", None, "xrpusdt");

    io_service.register(endpoint_btc);
    io_service.register(endpoint_eth);
    io_service.register(endpoint_xrp);

    loop {
        // will never block
        io_service.poll()?;
    }
}

通常需要将共享状态暴露给Endpoint。这可以通过用户定义的Context实现。

struct FeedContext {
    static_data: StaticData,
}

// use the marker trait
impl Context for FeedContext {}

在实现我们的TradeEndpoint时,我们可以使用TlsWebsocketEndpointWithContext特质。

impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {
    type Stream = MioStream;

    fn url(&self) -> &str {
        self.url
    }

    fn create_websocket(&mut self, addr: SocketAddr, ctx: &mut FeedContext) -> io::Result<TlsWebsocket<Self::Stream>> {
        // we now have access to context
    }

    #[inline]
    fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, ctx: &mut FeedContext) -> io::Result<()> {
        // we now have access to context
    }
}

我们还需要创建一个IOService,使其了解Context

let mut context = FeedContext::new(static_data);
let mut io_service = MioSelector::new()?.into_io_service_with_context(IdleStrategy::Sleep(Duration::from_millis(1)), &mut context);

现在必须将Context传递给服务的poll方法。

loop {
    io_service.poll(&mut context)?;
}

功能

BoomNet功能集是模块化的,可以根据项目需求定制功能。启用所有可用功能的full功能,而单个组件可以根据需要启用。

mio

添加了对mio存储库的依赖关系,并启用MioSelectorMioStream

tls

添加了对rustls存储库的依赖关系,并启用TlsStream和更灵活的TlsReadyStream

ws

支持Websocket协议。

依赖项

~4–15MB
~217K SLoC