7 个版本
0.4.0-beta.4 | 2022 年 11 月 11 日 |
---|---|
0.4.0-beta.1 | 2021 年 10 月 30 日 |
0.3.4 | 2021 年 10 月 14 日 |
0.3.3 | 2021 年 9 月 25 日 |
0.3.1 | 2021 年 7 月 14 日 |
#421 in 异步
712 个月下载
在 4 crates 中使用
34KB
535 行
stream-reconnect
此 crate 提供了一个 Stream
/Sink
包装结构体,可自动从潜在的断开/中断中恢复。
这是 stubborn-io 的分支,它是为相同的用途而构建的,但用于 AsyncRead
/AsyncWrite
。
要在项目中使用,请将以下内容添加到您的 Cargo.toml 中
stream-reconnect = "0.3"
最低支持的 Rust 版本:1.43.1
运行时支持
此 crate 支持两个 tokio
和 async-std
运行时。
tokio
支持默认启用。当在 async-std
运行时使用时,请将 Cargo.toml
中的相应依赖项更改为
stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }
功能门
not-send
- 允许建立函数不是线程安全的。
示例
在这个示例中,我们将看到 tungstenite 的 WebSocketStream 的替代品,其区别在于它将自动尝试在连接失败的情况下重新连接。
use stream_reconnect::{UnderlyingStream, ReconnectStream};
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
use futures::{SinkExt, Stream, Sink};
struct MyWs;
impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
type Stream = WebSocketStream<MaybeTlsStream<TcpStream>>;
// Establishes connection.
// Additionally, this will be used when reconnect tries are attempted.
fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self::Stream, WsError>> + Send>> {
Box::pin(async move {
// In this case, we are trying to connect to the WebSocket endpoint
let ws_connection = connect_async(addr).await.unwrap().0;
Ok(ws_connection)
})
}
// The following errors are considered disconnect errors.
fn is_write_disconnect_error(err: &WsError) -> bool {
matches!(
err,
WsError::ConnectionClosed
| WsError::AlreadyClosed
| WsError::Io(_)
| WsError::Tls(_)
| WsError::Protocol(_)
)
}
// If an `Err` is read, then there might be an disconnection.
fn is_read_disconnect_error(item: &Result<Message, WsError>) -> bool {
if let Err(e) = item {
Self::is_write_disconnect_error(e)
} else {
false
}
}
// Return "Exhausted" if all retry attempts are failed.
fn exhaust_err() -> WsError {
WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
}
}
type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;
let mut ws_stream = ReconnectWs::connect(String::from("ws://127.0.0.1:8000")).await.unwrap();
ws_stream.send("hello world!".into()).await.unwrap();
许可证
MIT
依赖项
~3–14MB
~157K SLoC