#stream #reconnect #retry #io #async-io

stream-reconnect

自动从潜在断开/中断中恢复的 Stream 包装特性和结构体

7 个版本

0.4.0-beta.42022 年 11 月 11 日
0.4.0-beta.12021 年 10 月 30 日
0.3.4 2021 年 10 月 14 日
0.3.3 2021 年 9 月 25 日
0.3.1 2021 年 7 月 14 日

#421 in 异步

Download history 250/week @ 2024-03-13 301/week @ 2024-03-20 289/week @ 2024-03-27 215/week @ 2024-04-03 110/week @ 2024-04-10 74/week @ 2024-04-17 112/week @ 2024-04-24 74/week @ 2024-05-01 90/week @ 2024-05-08 123/week @ 2024-05-15 128/week @ 2024-05-22 123/week @ 2024-05-29 187/week @ 2024-06-05 272/week @ 2024-06-12 124/week @ 2024-06-19 87/week @ 2024-06-26

712 个月下载
4 crates 中使用

MIT 许可证

34KB
535

stream-reconnect

crates.io Documentation

此 crate 提供了一个 Stream/Sink 包装结构体,可自动从潜在的断开/中断中恢复。

这是 stubborn-io 的分支,它是为相同的用途而构建的,但用于 AsyncRead/AsyncWrite

要在项目中使用,请将以下内容添加到您的 Cargo.toml 中

stream-reconnect = "0.3"

最低支持的 Rust 版本:1.43.1

运行时支持

此 crate 支持两个 tokioasync-std 运行时。

tokio 支持默认启用。当在 async-std 运行时使用时,请将 Cargo.toml 中的相应依赖项更改为

stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }

功能门

not-send - 允许建立函数不是线程安全的。

示例

在这个示例中,我们将看到 tungstenite 的 WebSocketStream 的替代品,其区别在于它将自动尝试在连接失败的情况下重新连接。

use stream_reconnect::{UnderlyingStream, ReconnectStream};
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
use futures::{SinkExt, Stream, Sink};

struct MyWs;

impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
    type Stream = WebSocketStream<MaybeTlsStream<TcpStream>>;

    // Establishes connection.
    // Additionally, this will be used when reconnect tries are attempted.
    fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self::Stream, WsError>> + Send>> {
        Box::pin(async move {
            // In this case, we are trying to connect to the WebSocket endpoint
            let ws_connection = connect_async(addr).await.unwrap().0;
            Ok(ws_connection)
        })
    }

    // The following errors are considered disconnect errors.
    fn is_write_disconnect_error(err: &WsError) -> bool {
        matches!(
                err,
                WsError::ConnectionClosed
                    | WsError::AlreadyClosed
                    | WsError::Io(_)
                    | WsError::Tls(_)
                    | WsError::Protocol(_)
            )
    }

    // If an `Err` is read, then there might be an disconnection.
    fn is_read_disconnect_error(item: &Result<Message, WsError>) -> bool {
        if let Err(e) = item {
            Self::is_write_disconnect_error(e)
        } else {
            false
        }
    }

    // Return "Exhausted" if all retry attempts are failed.
    fn exhaust_err() -> WsError {
        WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
    }
}

type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;

let mut ws_stream = ReconnectWs::connect(String::from("ws://127.0.0.1:8000")).await.unwrap();
ws_stream.send("hello world!".into()).await.unwrap();

许可证

MIT

依赖项

~3–14MB
~157K SLoC