#r-socket #protocols #transport #tokio #traits #server #client-server

rsocket_rust

rsocket-rust 是 Rust 语言中 RSocket 协议的实现

20 个版本

0.7.5 2023 年 12 月 26 日
0.7.4 2023 年 6 月 1 日
0.7.3 2023 年 5 月 31 日
0.7.2 2021 年 9 月 1 日
0.2.0 2019 年 11 月 29 日

#1 in #r-socket

Download history 6/week @ 2024-04-22 5/week @ 2024-05-13 4/week @ 2024-05-20 20/week @ 2024-05-27 8/week @ 2024-06-03 19/week @ 2024-06-10 7/week @ 2024-06-17 19/week @ 2024-06-24 18/week @ 2024-07-01 19/week @ 2024-07-08 65/week @ 2024-07-15 37/week @ 2024-07-22 53/week @ 2024-07-29 54/week @ 2024-08-05

每月 209 次下载
用于 5 crates

Apache-2.0

145KB
4K SLoC

RSocket 核心库

示例

以下是一些示例代码,展示了 RSocket 在 Rust 中的使用方法。

依赖

在您的 Cargo.toml 中添加依赖项。

[dependencies]
tokio = "0.3.6"
rsocket_rust = "0.7.0"

# add transport dependencies:
# rsocket_rust_transport_tcp = "0.7.0"
# rsocket_rust_transport_websocket = "0.7.0"

服务器

use rsocket_rust::prelude::*;
use rsocket_rust::utils::EchoRSocket;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::TcpServerTransport;

#[tokio::main]
async fn main() -> Result<()> {
    RSocketFactory::receive()
        .transport(TcpServerTransport::from("127.0.0.1:7878"))
        .acceptor(Box::new(|setup, _socket| {
            println!("accept setup: {:?}", setup);
            Ok(Box::new(EchoRSocket))
            // Or you can reject setup
            // Err(From::from("SETUP_NOT_ALLOW"))
        }))
        .on_start(Box::new(|| println!("+++++++ echo server started! +++++++")))
        .serve()
        .await
}

客户端

use rsocket_rust::prelude::*;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::TcpClientTransport;

#[tokio::main]
async fn main() -> Result<()> {
    let cli = RSocketFactory::connect()
        .transport(TcpClientTransport::from("127.0.0.1:7878"))
        .setup(Payload::from("READY!"))
        .mime_type("text/plain", "text/plain")
        .on_close(Box::new(|| println!("connection closed")))
        .start()
        .await?;
    let req = Payload::builder()
        .set_data_utf8("Hello World!")
        .set_metadata_utf8("Rust")
        .build();
    let res = cli.request_response(req).await?;
    println!("got: {:?}", res);

    // If you want to block until socket disconnected.
    cli.wait_for_close().await;

    Ok(())
}

实现 RSocket 特性

访问 Redis 的示例(crates

注意:在 Cargo.toml 中添加依赖项 => redis = { version = "0.19.0", features = [ "aio" ] }

use std::str::FromStr;

use redis::Client as RedisClient;
use rsocket_rust::async_trait;
use rsocket_rust::prelude::*;
use rsocket_rust::Result;

#[derive(Clone)]
pub struct RedisDao {
    inner: RedisClient,
}

// Create RedisDao from str.
// Example: RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!");
impl FromStr for RedisDao {
    type Err = redis::RedisError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let client = redis::Client::open(s)?;
        Ok(RedisDao { inner: client })
    }
}

#[async_trait]
impl RSocket for RedisDao {
    async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
        let client = self.inner.clone();
        let mut conn = client.get_async_connection().await?;
        let value: redis::RedisResult<Option<String>> = redis::cmd("GET")
            .arg(&[req.data_utf8()])
            .query_async(&mut conn)
            .await;
        match value {
            Ok(Some(value)) => Ok(Some(Payload::builder().set_data_utf8(&value).build())),
            Ok(None) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn metadata_push(&self, _req: Payload) -> Result<()> {
        todo!()
    }

    async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
        todo!()
    }

    fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
        todo!()
    }

    fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
        todo!()
    }
}

依赖

~4–13MB
~135K SLoC