25次发布
0.0.25 | 2024年5月15日 |
---|---|
0.0.24 | 2024年5月6日 |
0.0.23 | 2024年4月30日 |
0.0.13 | 2024年3月28日 |
#55 in WebSocket
2,168 每月下载
205KB
2K SLoC
概述
BoomNet是一个高性能框架,旨在开发低延迟网络应用程序,尤其关注使用各种协议的TCP流式客户端。
安装
只需在您的 Cargo.toml
中声明对 boomnet
的依赖关系,并选择所需的功能。
[dependencies]
boomnet = { version = "0.0.25", features = ["full"]}
设计原则
框架分为多个层次,每个后续层次在其前一个层次的基础上构建,增强功能并提高抽象级别。
流
第一层将 stream
定义为TCP连接的抽象,遵循以下特性。
- 必须实现
Read
和Write
特性以进行I/O操作。 - 以非阻塞方式运行。
- 通过
rustls
集成TLS。 - 支持记录和回放网络字节流。
- 允许绑定到特定网络接口。
- 便于实现WebSocket、HTTP和FIX等面向TCP的客户端协议。
流设计为完全泛型,避免动态分发,并且可以灵活组合。
let stream: RecordedStream<TlsStream<TcpStream>> = TcpStream::bind_and_connect(addr, self.net_iface, None)?
.into_tls_stream(self.url)
.into_recorded_stream("plain");
然后可以在流上应用不同的协议。
let ws: Websocket<RecordedStream<TlsStream<TcpStream>>> = stream.into_websocket(self.url);
选择器
Selector
提供了对操作系统特定机制(如 epoll
)的抽象,用于高效监控套接字就绪事件。尽管主要用于内部使用,但选择器对于 IOService
功能至关重要,目前提供 mio
和 direct
(无操作)实现。
let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));
服务
最后一层管理端点生命周期,并通过 IOService
提供辅助服务(如异步DNS解析和自动断开连接)。
Endpoint
作为应用程序逻辑的低级构造。 IOService
负责端点内的连接生命周期。
协议
目标是支持各种协议,包括WebSocket、HTTP和FIX,目前提供WebSocket客户端功能。
WebSocket
WebSocket客户端协议符合RFC 6455规范,提供以下功能。
- 与任何流兼容。
- 对同一批次读取的帧使用TCP批量感知时间戳。
- 不会因部分帧而阻塞。
- 设计为零拷贝读写。
- 可选的出站帧掩码。
- 独立使用或在
Selector
和IOService
结合使用。
示例用法
仓库包含一个示例列表。
以下示例演示了如何使用IOService
与多个WebSocket连接来从币安加密货币交易所接收消息。首先,我们需要定义和实现我们的Endpoint
。框架提供了我们可以使用的TlsWebsocketEndpoint
特质。
#[derive(Default)]
struct TradeEndpoint {
id: u32,
url: &'static str,
instrument: &'static str,
}
impl TradeEndpoint {
pub fn new(id: u32, url: &'static str, instrument: &'static str) -> TradeEndpoint {
Self { id, url, instrument, }
}
}
impl TlsWebsocketEndpoint for TradeEndpoint {
type Stream = MioStream;
fn url(&self) -> &str {
self.url
}
// called by the IO service whenever a connection has to be established for this endpoint
fn create_websocket(&mut self, addr: SocketAddr) -> io::Result<TlsWebsocket<Self::Stream>> {
// create secure websocket
let mut ws = TcpStream::bind_and_connect(addr, None, None)?
.into_mio_stream()
.into_tls_websocket(self.url);
// send subscription message
ws.send_text(
true,
Some(format!(r#"{{"method":"SUBSCRIBE","params":["{}@trade"],"id":1}}"#, self.instrument).as_bytes()),
)?;
Ok(ws)
}
#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>) -> io::Result<()> {
// keep calling receive_next until no more frames in the current batch
while let Some(WebsocketFrame::Text(ts, fin, data)) = ws.receive_next()? {
// handle the message
println!("[{}] {ts}: ({fin}) {}", self.id, String::from_utf8_lossy(data));
}
Ok(())
}
}
定义端点后,将其注册到IOService
并在事件循环中进行轮询。服务处理Endpoint
连接管理和断开重连。
fn main() -> anyhow::Result<()> {
let mut io_service = MioSelector::new()?.into_io_service(IdleStrategy::Sleep(Duration::from_millis(1)));
let endpoint_btc = TradeEndpoint::new(0, "wss://stream1.binance.com:443/ws", None, "btcusdt");
let endpoint_eth = TradeEndpoint::new(1, "wss://stream2.binance.com:443/ws", None, "ethusdt");
let endpoint_xrp = TradeEndpoint::new(2, "wss://stream3.binance.com:443/ws", None, "xrpusdt");
io_service.register(endpoint_btc);
io_service.register(endpoint_eth);
io_service.register(endpoint_xrp);
loop {
// will never block
io_service.poll()?;
}
}
通常需要将共享状态暴露给Endpoint
。这可以通过用户定义的Context
实现。
struct FeedContext {
static_data: StaticData,
}
// use the marker trait
impl Context for FeedContext {}
在实现我们的TradeEndpoint
时,我们可以使用TlsWebsocketEndpointWithContext
特质。
impl TlsWebsocketEndpointWithContext<FeedContext> for TradeEndpoint {
type Stream = MioStream;
fn url(&self) -> &str {
self.url
}
fn create_websocket(&mut self, addr: SocketAddr, ctx: &mut FeedContext) -> io::Result<TlsWebsocket<Self::Stream>> {
// we now have access to context
}
#[inline]
fn poll(&mut self, ws: &mut TlsWebsocket<Self::Stream>, ctx: &mut FeedContext) -> io::Result<()> {
// we now have access to context
}
}
我们还需要创建一个IOService
,使其了解Context
。
let mut context = FeedContext::new(static_data);
let mut io_service = MioSelector::new()?.into_io_service_with_context(IdleStrategy::Sleep(Duration::from_millis(1)), &mut context);
现在必须将Context
传递给服务的poll
方法。
loop {
io_service.poll(&mut context)?;
}
功能
BoomNet功能集是模块化的,可以根据项目需求定制功能。启用所有可用功能的full
功能,而单个组件可以根据需要启用。
mio
添加了对mio
存储库的依赖关系,并启用MioSelector
和MioStream
。
tls
添加了对rustls
存储库的依赖关系,并启用TlsStream
和更灵活的TlsReadyStream
。
ws
支持Websocket
协议。
依赖项
~4–15MB
~217K SLoC