19 个版本 (6 个重大更改)
新 0.7.3 | 2024 年 8 月 7 日 |
---|---|
0.7.0 | 2024 年 7 月 17 日 |
0.5.2 | 2024 年 3 月 5 日 |
0.5.1 | 2023 年 11 月 10 日 |
#121 在 并发
每月 460 次下载
200KB
4K SLoC
nbio
描述
该 crate 的目标是使单向和双向非阻塞 I/O 的推理更加容易。
这是通过使用超出直接处理原始字节的模式来完成的,这些模式扩展了 std::io::Read
和 std::io::Write
特性,以及 std::io::ErrorKind::WouldBlock
错误。由于此 crate 的主要重点是非阻塞 I/O,因此该 crate 提供的所有 Session
实现 默认为非阻塞。
会话
核心 Session
特性封装了控制单个连接或逻辑会话的实例。为了与仅处理原始字节的 std::io::Read
和 std::io::Write
特性区分开来,此 crate 使用 Publish
和 Receive
术语,这些术语使用关联类型来处理任何有效负载类型。
一个 Session
实现,通常是 Publish
、Receive
或两者都有。虽然 [tcp
] 模块提供了一个提供无框架非阻塞二进制 IO 操作的 Session
实现,但其他 Session
实现能够通过相同的非阻塞模式提供显著更多的功能。
此包通常使用术语 Duplex
来区分既是 Publish
也是 Receive
的 Session
。
关联类型
会话在特定的 Receive::ReceivePayload
和 Publish::PublishPayload
类型上操作。这些类型能够利用一个生命周期 'a
,它与底层 Session
的生命周期相关联,为实现提供引用内部缓冲区或队列而不复制的能力。
错误
此包的哲学是,一个 [Err
] 应始终表示传输或协议级别的错误。一个 [Err
] 不应作为一个在 正常 分支逻辑中应处理的条件由函数返回。因此,而不是迫使你在处理非阻塞代码的每个地方都处理 std::io::ErrorKind::WouldBlock
,此包将通过 ReceiveOutcome::Idle
、ReceiveOutcome::Buffered
和 PublishOutcome::Incomplete
作为 Result::Ok
来指示部分接收/发布操作。
特性
此包中的 Session
实现是通过某些特性启用的。默认情况下,不需要特殊构建环境的特性被启用以进行快速原型设计。在生产代码库中,你可能会想要挑选和选择你需要的特性。
特性列表
aeron
crossbeam
http
mock
mpsc
tcp
websocket
默认未启用的特性
aeron
:需要cmake
和clang
。
示例
流式 TCP
以下示例展示了如何使用流式 TCP 来发布和接收传统的字节流。
use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
use nbio::tcp::TcpSession;
// establish connection
let mut client = TcpSession::connect("192.168.123.456:54321").unwrap();
// publish some bytes until completion
let mut pending_publish = "hello world!".as_bytes();
while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
pending_publish = pending;
client.drive().unwrap();
}
// print received bytes
loop {
if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
println!("received: {payload:?}");
}
}
设置TCP
以下示例展示了如何使用frame
在TCP上对消息进行封装,以便发布和接收带有前缀u64长度字段的负载。请注意,它与上面的代码几乎相同,但它保证了读取的切片始终与其对应的写入切片相同。
use nbio::{Publish, PublishOutcome, Receive, ReceiveOutcome, Session};
use nbio::tcp::TcpSession;
use nbio::frame::{FrameDuplex, U64FrameDeserializer, U64FrameSerializer};
// establish connection wrapped in a framing session
let client = TcpSession::connect("192.168.123.456:54321").unwrap();
let mut client = FrameDuplex::new(client, U64FrameDeserializer::new(), U64FrameSerializer::new(), 4096);
// publish some bytes until completion
let mut pending_publish = "hello world!".as_bytes();
while let PublishOutcome::Incomplete(pending) = client.publish(pending_publish).unwrap() {
pending_publish = pending;
client.drive().unwrap();
}
// print received bytes
loop {
if let ReceiveOutcome::Payload(payload) = client.receive().unwrap() {
println!("received: {payload:?}");
}
}
HTTP客户端
以下示例展示了如何使用http
模块驱动HTTP 1.x请求/响应,同时使用相同的非阻塞模型。请注意,驱动缓冲写入并接收封装响应的原语与任何其他封装会话相同。实际上,由client.request(..)
返回的frame::FrameDuplex
只是利用了http::Http1RequestSerializer
和http::Http1ResponseDeserializer
的frame::FrameDuplex
。
use http::Request;
use nbio::{Receive, Session, ReceiveOutcome};
use nbio::http::HttpClient;
use tcp_stream::OwnedTLSConfig;
// create the client and make the request
let mut client = HttpClient::new();
let mut conn = client
.request(Request::get("http://icanhazip.com").body(()).unwrap())
.unwrap();
// drive and read the conn until a full response is received
loop {
conn.drive().unwrap();
if let ReceiveOutcome::Payload(r) = conn.receive().unwrap() {
println!("Response Body: {}", String::from_utf8_lossy(r.body()));
break;
}
}
WebSocket
以下示例发送一条消息,然后从WebSocket连接接收所有后续消息。就像HTTP示例一样,这只是一个封装frame::FrameDuplex
,但它使用了websocket::WebSocketFrameSerializer
和websocket::WebSocketFrameDeserializer
。所有的TLS和WebSocket握手都在SessionStatus::Establishing
Session::status
工作流程中进行处理。
use nbio::{Publish, PublishOutcome, Receive, Session, SessionStatus, ReceiveOutcome};
use nbio::websocket::{Message, WebSocketSession};
// connect and drive the handshake
let mut session = WebSocketSession::connect("wss://echo.websocket.org/", None).unwrap();
while session.status() == SessionStatus::Establishing {
session.drive().unwrap();
}
// publish a message
let mut pending_publish = Message::Text("hello world!".into());
while let PublishOutcome::Incomplete(pending) = session.publish(pending_publish).unwrap() {
pending_publish = pending;
session.drive().unwrap();
}
// drive and receive messages
loop {
session.drive().unwrap();
if let ReceiveOutcome::Payload(r) = session.receive().unwrap() {
println!("Received: {:?}", r);
break;
}
}
许可证:MIT OR Apache-2.0
依赖关系
~0.5–4.5MB
~78K SLoC