#websocket #async-io #stream #async #future #websocket-client

ws_stream_tungstenite

提供基于 Tungstenite WebSockets 的 AsyncRead/AsyncWrite

21 个版本 (12 个重大更改)

0.13.0 2024 年 2 月 16 日
0.11.0 2023 年 10 月 7 日
0.10.0 2023 年 5 月 4 日
0.9.0 2022 年 10 月 29 日
0.1.0-alpha.52019 年 11 月 14 日

#111 in 异步

Download history 3471/week @ 2024-04-15 4203/week @ 2024-04-22 4286/week @ 2024-04-29 3101/week @ 2024-05-06 3225/week @ 2024-05-13 4034/week @ 2024-05-20 3925/week @ 2024-05-27 3443/week @ 2024-06-03 4247/week @ 2024-06-10 4030/week @ 2024-06-17 4507/week @ 2024-06-24 4318/week @ 2024-07-01 6106/week @ 2024-07-08 5358/week @ 2024-07-15 5495/week @ 2024-07-22 6223/week @ 2024-07-29

23,346 每月下载量
用于 17 个 crate(直接使用 8 个)

无许可证

64KB
948

ws_stream_tungstenite

standard-readme compliant Build Status Docs crates.io

提供可以与编解码器一起进行帧化的 WebSockets 的 AsyncRead/Write/AsyncBufRead

此 crate 提供了 AsyncRead/AsyncWrite/AsyncBufReadasync-tungstenite WebSockets 上。它主要使您能够使用 rust wasm 代码,并通过字节流的帧进行通信。此 crate 为非 WASM 目标(例如服务器端)提供了功能。客户端有一个可用的 WASM 版本 在此

目前有 2 个版本的 AsyncRead/Write 特性。一个是 futures-rs 版本,另一个是 tokio 版本。如果您想使用特质的 tokio 版本实现,需要启用 tokio_io 功能。

您可能会想,为什么不直接序列化您的结构体并在 WebSocket 消息中发送它。首先,在发布 ws_stream_wasm 之前,wasm 上没有方便的 WebSocket Rust crate,甚至没有 AsyncRead/AsyncWrite。其次,这允许您通过仅使用 AsyncRead/AsyncWrite 而不是将其适配到特定的协议(如 WebSocket)来保持代码的通用性,这在库 crate 中特别有用。此外,您不需要处理 WebSocket 协议和库的怪癖。这几乎就像任何其他的异步字节流一样工作(例外:关闭连接)。由于这种间接性,会有一些额外的开销,但应该很小。

ws_stream_tungsteniteasync-tungstenite 上运行,因此您将不得不使用 async-tungstenite 的 API 来设置您的连接,并将 WebSocketStream 传递给 WsStream

目录

安装

使用 cargo addcargo add ws_stream_tungstenite

使用 cargo yaml

dependencies:

  ws_stream_tungstenite: ^0.13

使用原始 Cargo.toml

[dependencies]

   ws_stream_tungstenite = "0.13"

升级

升级时请检查 变更日志

依赖

此包有少量依赖。Cargo 将自动为您处理依赖。

安全性

此包使用 #![ forbid( unsafe_code ) ],但我们的依赖没有。

请确保您的编解码器具有最大消息大小。

特性

tokio_io 特性允许实现来自 tokioAsyncReadAsyncWrite 特性。

用法

请参阅存储库的示例目录

集成测试也非常有用。

示例

这是最基本的想法(对于客户端代码)

use
{
   ws_stream_tungstenite :: { *                  } ,
   futures               :: { StreamExt          } ,
   tracing               :: { *                  } ,
   async_tungstenite     :: { accept_async       } ,
   asynchronous_codec    :: { LinesCodec, Framed } ,
   async_std             :: { net::TcpListener   } ,
 };

#[ async_std::main ]
//
async fn main() -> Result<(), std::io::Error>
{
   let     socket      = TcpListener::bind( "127.0.0.1:3012" ).await?;
   let mut connections = socket.incoming();

   let tcp = connections.next().await.expect( "1 connection" ).expect( "tcp connect" );
   let s   = accept_async( tcp ).await.expect( "ws handshake" );
   let ws  = WsStream::new( s );

   // ws here is observable with pharos to detect non fatal errors and ping/close events, which cannot
   // be represented in the AsyncRead/Write API. See the events example in the repository.

   let (_sink, mut stream) = Framed::new( ws, LinesCodec {} ).split();


   while let Some( msg ) = stream.next().await
   {
      let msg = match msg
      {
         Err(e) =>
         {
            error!( "Error on server stream: {:?}", e );

            // Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
            // transport.
            //
            continue;
         }

         Ok(m) => m,
      };


      info!( "server received: {}", msg.trim() );

      // ... do something useful
   }

   // safe to drop the TCP connection

   Ok(())
}

如何关闭连接

WebSocket RFC 规范了关闭握手,总结如下

  • 当端点想要关闭连接时,它会发送一个关闭帧,然后不再发送任何数据。由于另一个端点可能仍在发送数据,最好继续处理传入的数据,直到
  • 远程发送对关闭帧的确认。
  • 当一个端点发送并接收了关闭帧后,连接被认为是已关闭,服务器应关闭底层的TCP连接。客户端可以选择在服务器不及时关闭时关闭它。

使用 ws_stream_tungstenite 正确关闭连接相当简单。如果远程端点启动关闭,只需轮询流即可确保连接保持到握手完成。当流返回 None 时,您就可以释放它了。

如果您想启动关闭,请调用 sink 上的 close。从那时起,情况与上述相同。只需轮询流直到它返回 None,然后您就可以开始了。

Tungstenite 仅在客户端返回 None 时才会返回,这意味着它将确保您遵守 WebSocket 协议。

如果您启动了关闭握手,您可能想要与超时竞争,并在远程端点未能及时完成关闭握手时断开连接。有关如何做到这一点的示例,请参阅存储库的示例目录中的 close.rs

错误处理

ws_stream_tungstenite 是关于 AsyncRead/AsyncWrite 的,所以我们只接受二进制消息。如果我们收到一个 WebSocket 文本消息,那么这被视为一个协议错误。

有关详细信息,请参阅 WsStream 的 API 文档。特别是对于 AsyncRead/AsyncWrite 的实现,其中详细说明了您可能遇到的所有错误。

由于 AsyncRead/AsyncWrite 只允许返回 std::io::Error,并且在流中某些错误可能不是致命的,但编解码器通常会认为任何错误都是致命的,因此错误将通过 pharos 以外带方式返回。您应该观察 WsStream 并至少记录任何报告的错误。

限制

  • 没有提供发送 Ping 消息的 API。解决这个问题意味着要创建一个类似于 ws_stream_wasmWsMeta 类型。
  • 接收到的文本消息被视为错误。我们还可以考虑将这些消息以外带方式返回给客户端代码,而不是将它们包含在 AsyncRead/AsyncWrite 的数据中。这与 ws_stream_wasm 不一致,后者在它们上调用 to_bytes 并将字节包含在字节流中。

API

API 文档可以在 docs.rs 上找到。

参考

了解 WebSocket 和浏览器如何处理它们的参考文档是

贡献

请查看贡献指南

测试

cargotest --all-features

行为准则

公民行为准则第 4 点“不可接受的行为”中描述的任何行为在这里都不受欢迎,并可能导致您被禁止。如果包括维护者和项目管理员在内的任何人未能尊重这些/您的限制,您有权将他们指出。

许可证

未经许可

依赖

~2.3–4.5MB
~81K SLoC