1 个不稳定版本
0.1.0 | 2019 年 10 月 7 日 |
---|
#285 在 WebSocket
9KB
237 行
ws_stream_tungstenite
提供基于 WebSocket 的 AsyncRead/Write/AsyncBufRead,可以与编解码器一起使用。
此包提供 AsyncRead
/AsyncWrite
/AsyncBufRead
在 async-tungstenite WebSocket 上。它主要允许使用 rust wasm 代码并通过字节流进行通信。此包为非 WASM 目标(例如服务器端)提供功能。客户端端有可用的 WASM 版本 在此处。
目前有两个版本的 AsyncRead/Write 特性。有 futures-rs 版本和 tokio 版本。如果您想实现 traits 的 tokio 版本,则需要启用 tokio_io
功能。
您可能会想,为什么不直接序列化您的结构体并将其发送到 WebSocket 消息中。首先,在发布 ws_stream_wasm 之前,wasm 上没有方便的 WebSocket Rust 包,甚至没有 AsyncRead
/AsyncWrite
。其次,这允许您通过仅接受 AsyncRead
/AsyncWrite
来保持代码的通用性,而不是将其适配到特定的协议(如 WebSocket),这在库包中特别有用。此外,您无需处理 WebSocket 协议和库的怪癖。这几乎与任何其他的异步字节流一样工作(例外:如何关闭连接)。由于这种间接性,会有一些额外的开销,但应该很小。
ws_stream_tungstenite 在 async-tungstenite 上工作,因此您将不得不使用 async-tungstenite 的 API 来设置连接,并将 WebSocketStream
传递给 WsStream
。
目录
安装
使用 cargo add: cargo add ws_stream_tungstenite
使用 cargo yaml
dependencies:
ws_stream_tungstenite: ^0.13
使用原始 Cargo.toml
[dependencies]
ws_stream_tungstenite = "0.13"
升级
升级时请查看 变更日志
依赖关系
此软件包依赖项较少。Cargo 会自动为您处理依赖项。
安全
此软件包使用 #![ forbid( unsafe_code ) ]
,但我们的依赖项没有。
请确保您的编解码器具有最大消息大小限制。
功能
tokio_io
功能启用实现来自 tokio 的 AsyncRead
和 AsyncWrite
特性。
使用
请查看 存储库的示例目录。
集成测试 也有用。
示例
这是最基本的想法(针对客户端代码)
use
{
ws_stream_tungstenite :: { * } ,
futures :: { StreamExt } ,
tracing :: { * } ,
async_tungstenite :: { accept_async } ,
asynchronous_codec :: { LinesCodec, Framed } ,
async_std :: { net::TcpListener } ,
};
#[ async_std::main ]
//
async fn main() -> Result<(), std::io::Error>
{
let socket = TcpListener::bind( "127.0.0.1:3012" ).await?;
let mut connections = socket.incoming();
let tcp = connections.next().await.expect( "1 connection" ).expect( "tcp connect" );
let s = accept_async( tcp ).await.expect( "ws handshake" );
let ws = WsStream::new( s );
// ws here is observable with pharos to detect non fatal errors and ping/close events, which cannot
// be represented in the AsyncRead/Write API. See the events example in the repository.
let (_sink, mut stream) = Framed::new( ws, LinesCodec {} ).split();
while let Some( msg ) = stream.next().await
{
let msg = match msg
{
Err(e) =>
{
error!( "Error on server stream: {:?}", e );
// Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
// transport.
//
continue;
}
Ok(m) => m,
};
info!( "server received: {}", msg.trim() );
// ... do something useful
}
// safe to drop the TCP connection
Ok(())
}
如何关闭连接
WebSocket RFC 规定了关闭握手,总结如下
- 当端点想要关闭连接时,它会发送一个关闭帧,然后不再发送更多数据。由于另一个端点可能仍在发送数据,最好继续处理传入的数据,直到
- 远程端点确认关闭帧。
- 端点已经发送并接收了关闭帧后,连接被认为是已关闭,服务器应关闭底层的 TCP 连接。如果服务器未能及时关闭,客户端可以选择关闭它。
使用 ws_stream_tungstenite 正确关闭连接相当简单。如果远程端点启动关闭,只需轮询流即可确保连接保持到握手完成。当流返回 None
时,您就可以将其丢弃了。
如果您想启动关闭,请调用 sink 上的 close。从那时起,情况与上面相同。只需轮询流,直到它返回 None
,您就可以离开了。
Tungstenite 仅在客户端返回 None
时才返回,这是当服务器关闭底层连接时,因此它将确保您遵守 WebSocket 协议。
如果您启动关闭握手,您可能想与超时进行竞速,如果远程端点没有及时完成关闭握手,则丢弃连接。有关如何做到这一点,请参阅 存储库的示例目录中的 close.rs 示例。
错误处理
ws_stream_tungstenite 是关于 AsyncRead
/AsyncWrite
的,所以我们只接受二进制消息。如果我们收到 WebSocket 文本消息,则视为协议错误。
有关详细信息,请参阅 WsStream
的 API 文档。特别是针对 AsyncRead
/AsyncWrite
的实现,它详细说明了您可能遇到的错误。
由于 AsyncRead
/AsyncWrite
只允许返回 std::io::Error
,在流中某些错误可能不是致命的,但编解码器通常会认为任何错误都是致命的,因此错误将通过 Pharos 以外的方式返回。您应该关注 WsStream
,并且至少记录任何报告的错误。
限制
- 没有提供发送 Ping 消息的 API。解决这个问题意味着要创建一个类似 ws_stream_wasm 的
WsMeta
类型。 - 收到的文本消息被视为错误。另一个可考虑的选项是将这些消息以外部方式返回给客户端代码,而不是将它们包含在
AsyncRead
/AsyncWrite
的数据中。这也与 ws_stream_wasm 不一致,后者会调用to_bytes
并将字节包含在字节流中。
API
API 文档可在 docs.rs 上找到。
参考
了解 WebSocket 及浏览器如何处理它们的参考文档包括:
- RFC 6455 - WebSocket 协议
- ws 的安全性:WebSockets not Bound by SOP and CORS? Does this mean…
- 另一个:Cross-Site WebSocket Hijacking (CSWSH)
贡献
请查看 贡献指南。
测试
cargotest --all-features
行为准则
任何在 公民行为准则的第 4 点“不可接受的行为” 中描述的行为都是不受欢迎的,并且可能会让您被禁止。如果包括维护者和项目管理员在内的任何人未能尊重这些/您的限制,您有权指出。
许可
依赖关系
~4MB
~59K SLoC