#实时 #supabase #cdc #websocket #事件

realtime-rs

Supabase 实时客户端库。异步和同步接口。

1 个不稳定版本

0.1.0 2024年2月9日

#853数据库接口

MIT/Apache

71KB
1.5K SLoC

Supabase 实时-rs

为 Supabase 实时提供异步和同步 WebSocket 客户端包装。

安装

cargo添加 realtime-rs

使用方法

同步 API

examples/broadcast_sync.rs

use std::{collections::HashMap, thread::sleep, time::Duration};

use realtime_rs::{
    message::payload::BroadcastConfig, realtime_channel::RealtimeChannelBuilder,
    realtime_client::RealtimeClientBuilder,
};

fn main() {
    let endpoint = "http://127.0.0.1:54321";
    let access_token = std::env::var("LOCAL_ANON_KEY").unwrap();

    let client = RealtimeClientBuilder::new(endpoint, access_token)
        .heartbeat_interval(Duration::from_secs(29))
        .connect()
        .to_sync();

    let channel = RealtimeChannelBuilder::new("TestTopic")
        .broadcast(BroadcastConfig {
            broadcast_self: true,
            ack: false,
        })
        .on_broadcast("test_event", |map| println!("Event get! {:?}", map))
        .build_sync(&client)
        .unwrap();

    channel.subscribe();

    let mut payload = realtime_rs::message::payload::BroadcastPayload {
        event: "test_event".into(),
        payload: HashMap::new(),
        ..Default::default()
    };

    let mut count = 0;

    loop {
        count += 1;
        println!("SENDING {}", count);
        payload.payload.insert("count".into(), count.into());
        let _ = channel.broadcast(payload.clone());

        sleep(Duration::from_millis(1000));
    }
}

异步 API

该包内部使用 tokio

examples/broadcast_async.rs

use std::{collections::HashMap, time::Duration};

use realtime_rs::{
    message::payload::BroadcastConfig, realtime_channel::RealtimeChannelBuilder,
    realtime_client::RealtimeClientBuilder,
};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let endpoint = "http://127.0.0.1:54321";
    let access_token = std::env::var("LOCAL_ANON_KEY").unwrap();

    let client = RealtimeClientBuilder::new(endpoint, access_token)
        .heartbeat_interval(Duration::from_secs(29))
        .connect();

    let channel = RealtimeChannelBuilder::new("TestTopic")
        .broadcast(BroadcastConfig {
            broadcast_self: true,
            ack: false,
        })
        .on_broadcast("test_event", |map| println!("Event get! {:?}", map))
        .build(&client)
        .await
        .unwrap();

    channel.subscribe_blocking().await.unwrap();

    let mut payload = realtime_rs::message::payload::BroadcastPayload {
        event: "test_event".into(),
        payload: HashMap::new(),
        ..Default::default()
    };

    let mut count = 0;

    tokio::spawn(async move {
        loop {
            count += 1;
            println!("SENDING {}", count);
            payload.payload.insert("count".into(), count.into());
            let _ = channel.broadcast(payload.clone());
            sleep(Duration::from_millis(1000)).await;
        }
    })
    .await
    .unwrap();
}

更多内容请参阅 /examples

待办事项

  • 连接超时
  • 可进行 doctest 的示例
  • 自定义中间件消息修改函数
  • REST 通道发送
  • 移除未使用的 derive

    意味着手动实现许多 SerializeDeserialize 特性.. 繁重的工作

  • 节流

示例

  • 示例:对系统消息进行操作 && 测试当我们忽略它们时会发生什么?需要处理无法订阅的错误。

中间件

  • 中间件顺序
  • 中间件示例 (?) 尝试使用当前 API 看看是否需要中间件
  • 通过 MessageEvent 过滤中间件

贡献

欢迎建议和 PR!

请随意打开任何有关错误、建议或想法的问题。

要提交 PR,请克隆、分支然后向仓库提交请求。

许可证

MIT / Apache 2,您知道这是 Rust 项目。

依赖关系

~8–20MB
~306K SLoC