19 个版本 (6 个重大更改)

0.7.3 2024 年 8 月 7 日
0.7.0 2024 年 7 月 17 日
0.5.2 2024 年 3 月 5 日
0.5.1 2023 年 11 月 10 日

#121并发

Download history 155/week @ 2024-05-13 20/week @ 2024-05-20 113/week @ 2024-05-27 5/week @ 2024-06-03 4/week @ 2024-06-10 13/week @ 2024-07-01 113/week @ 2024-07-15 128/week @ 2024-07-29 219/week @ 2024-08-05

每月 460 次下载

MIT/Apache

200KB
4K SLoC

nbio

描述

该 crate 的目标是使单向和双向非阻塞 I/O 的推理更加容易。

这是通过使用超出直接处理原始字节的模式来完成的,这些模式扩展了 std::io::Readstd::io::Write 特性,以及 std::io::ErrorKind::WouldBlock 错误。由于此 crate 的主要重点是非阻塞 I/O,因此该 crate 提供的所有 Session 实现 默认为非阻塞。

会话

核心 Session 特性封装了控制单个连接或逻辑会话的实例。为了与仅处理原始字节的 std::io::Readstd::io::Write 特性区分开来,此 crate 使用 PublishReceive 术语,这些术语使用关联类型来处理任何有效负载类型。

一个 Session 实现,通常是 PublishReceive 或两者都有。虽然 [tcp] 模块提供了一个提供无框架非阻塞二进制 IO 操作的 Session 实现,但其他 Session 实现能够通过相同的非阻塞模式提供显著更多的功能。

此包通常使用术语 Duplex 来区分既是 Publish 也是 ReceiveSession

关联类型

会话在特定的 Receive::ReceivePayloadPublish::PublishPayload 类型上操作。这些类型能够利用一个生命周期 'a,它与底层 Session 的生命周期相关联,为实现提供引用内部缓冲区或队列而不复制的能力。

错误

此包的哲学是,一个 [Err] 应始终表示传输或协议级别的错误。一个 [Err] 不应作为一个在 正常 分支逻辑中应处理的条件由函数返回。因此,而不是迫使你在处理非阻塞代码的每个地方都处理 std::io::ErrorKind::WouldBlock,此包将通过 ReceiveOutcome::IdleReceiveOutcome::BufferedPublishOutcome::Incomplete 作为 Result::Ok 来指示部分接收/发布操作。

特性

此包中的 Session 实现是通过某些特性启用的。默认情况下,不需要特殊构建环境的特性被启用以进行快速原型设计。在生产代码库中,你可能会想要挑选和选择你需要的特性。

特性列表

  • aeron
  • crossbeam
  • http
  • mock
  • mpsc
  • tcp
  • websocket

默认未启用的特性

  • aeron:需要 cmakeclang

示例

流式 TCP

以下示例展示了如何使用流式 TCP 来发布和接收传统的字节流。

use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
use nbio::tcp::TcpSession;

// establish connection
let mut client = TcpSession::connect("192.168.123.456:54321").unwrap();

// publish some bytes until completion
let mut pending_publish = "hello world!".as_bytes();
while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
    pending_publish = pending;
    client.drive().unwrap();
}

// print received bytes
loop {
    if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
        println!("received: {payload:?}");
    }
}

设置TCP

以下示例展示了如何使用frame在TCP上对消息进行封装,以便发布和接收带有前缀u64长度字段的负载。请注意,它与上面的代码几乎相同,但它保证了读取的切片始终与其对应的写入切片相同。

use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
use nbio::tcp::TcpSession;
use nbio::frame::{FrameDuplex, U64FrameDeserializer, U64FrameSerializer};

// establish connection wrapped in a framing session
let client = TcpSession::connect("192.168.123.456:54321").unwrap();
let mut client = FrameDuplex::new(client, U64FrameDeserializer::new(), U64FrameSerializer::new(), 4096);

// publish some bytes until completion
let mut pending_publish = "hello world!".as_bytes();
while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
    pending_publish = pending;
    client.drive().unwrap();
}

// print received bytes
loop {
    if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
        println!("received: {payload:?}");
    }
}

HTTP客户端

以下示例展示了如何使用http模块驱动HTTP 1.x请求/响应,同时使用相同的非阻塞模型。请注意,驱动缓冲写入并接收封装响应的原语与任何其他封装会话相同。实际上,由client.request(..)返回的frame::FrameDuplex只是利用了http::Http1RequestSerializerhttp::Http1ResponseDeserializerframe::FrameDuplex

use http::Request;
use nbio::{Receive, Session, ReceiveOutcome};
use nbio::http::HttpClient;
use tcp_stream::OwnedTLSConfig;

// create the client and make the request
let mut client = HttpClient::new();
let mut conn = client
    .request(Request::get("http://icanhazip.com").body(()).unwrap())
    .unwrap();

// drive and read the conn until a full response is received
loop {
    conn.drive().unwrap();
    if let ReceiveOutcome::Payload(r) = conn.receive().unwrap() {
        println!("Response Body: {}", String::from_utf8_lossy(r.body()));
        break;
    }
}

WebSocket

以下示例发送一条消息,然后从WebSocket连接接收所有后续消息。就像HTTP示例一样,这只是一个封装frame::FrameDuplex,但它使用了websocket::WebSocketFrameSerializerwebsocket::WebSocketFrameDeserializer。所有的TLS和WebSocket握手都在SessionStatus::Establishing Session::status工作流程中进行处理。

use nbio::{Publish, PublishOutcome, Receive, Session, SessionStatus, ReceiveOutcome};
use nbio::websocket::{Message, WebSocketSession};

// connect and drive the handshake
let mut session = WebSocketSession::connect("wss://echo.websocket.org/", None).unwrap();
while session.status() == SessionStatus::Establishing {
     session.drive().unwrap();
}

// publish a message
let mut pending_publish = Message::Text("hello world!".into());
while let PublishOutcome::Incomplete(pending) = session.publish(pending_publish).unwrap() {
    pending_publish = pending;
    session.drive().unwrap();
}

// drive and receive messages
loop {
    session.drive().unwrap();
    if let ReceiveOutcome::Payload(r) = session.receive().unwrap() {
        println!("Received: {:?}", r);
        break;
    }
}

许可证:MIT OR Apache-2.0

依赖关系

~0.5–4.5MB
~78K SLoC