9个版本 (破坏性更新)

0.8.0 2024年3月28日
0.7.0 2023年11月17日
0.6.1 2023年5月5日
0.6.0 2023年3月15日
0.3.0 2022年11月25日

#35 in WebSocket

Download history 23/week @ 2024-04-22 109/week @ 2024-04-29 16/week @ 2024-05-27 7/week @ 2024-06-03 22/week @ 2024-06-10 24/week @ 2024-06-17 55/week @ 2024-06-24 1/week @ 2024-07-01 35/week @ 2024-07-08 204/week @ 2024-07-15 146/week @ 2024-07-22

389 每月下载量
用于 2 crates

MIT 许可证

89KB
1.5K SLoC

Yrs WebSocket连接

这个库是在Yjs/Yrs无冲突复杂数据类型(CRDT)消息交换协议之上的扩展。它提供了使用Rust tokio的warp web服务器连接到Yjs WebSocket提供者的实用工具。

演示

可以在examples子目录下看到一个工作演示。它将此库API与Code Mirror 6集成,增强了协作丰富文本文档编辑功能。

示例

为了在不同WebSocket连接之间传播来自客户端的更新,可以使用广播组


#[tokio::main]
async fn main() {
    // We're using a single static document shared among all the peers.
    let awareness = Arc::new(RwLock::new(Awareness::new(Doc::new())));

    // open a broadcast group that listens to awareness and document updates
    // and has a pending message buffer of up to 32 updates
    let bcast = Arc::new(BroadcastGroup::new(awareness, 32).await);

    let ws = warp::path("my-room")
        .and(warp::ws())
        .and(warp::any().map(move || bcast.clone()))
        .and_then(ws_handler);

    warp::serve(ws).run(([0, 0, 0, 0], 8000)).await;
}

async fn ws_handler(ws: Ws, bcast: Arc<BroadcastGroup>) -> Result<impl Reply, Rejection> {
    Ok(ws.on_upgrade(move |socket| peer(socket, bcast)))
}

async fn peer(ws: WebSocket, bcast: Arc<BroadcastGroup>) {
    let (sink, stream) = ws.split();
    let sink = Arc::new(Mutex::new(WarpSink::from(sink)));
    let stream = WarpStream::from(stream);
    let sub = bcast.subscribe(sink, stream);
    match sub.completed().await {
        Ok(_) => println!("broadcasting for channel finished successfully"),
        Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e),
    }
}

自定义协议扩展

y-sync协议允许扩展其自己的协议,而yrs-warp也支持这一点。这可以通过实现自己的协议来完成,例如。

use y_sync::sync::Protocol;

struct EchoProtocol;
impl Protocol for EchoProtocol {
    fn missing_handle(
        &self,
        awareness: &mut Awareness,
        tag: u8,
        data: Vec<u8>,
    ) -> Result<Option<Message>, Error> {
        // all messages prefixed with tags unknown to y-sync protocol
        // will be echo-ed back to the sender
        Ok(Some(Message::Custom(tag, data)))
    }
}

async fn peer(ws: WebSocket, awareness: AwarenessRef) {
    //.. later in code subscribe with custom protocol parameter
    let sub = bcast.subscribe_with(sink, stream, EchoProtocol);
    // .. rest of the code
}

y-webrtc和信令服务

除了作为y-websocket服务器的作用外,yrs-warp还提供了一种信令服务器实现,该实现由y-webrtc客户端用于交换连接WebRTC对等方和订阅/取消订阅特定房间所需的信息。

use warp::{Filter, Rejection, Reply};
use warp::ws::{Ws, WebSocket};
use yrs_warp::signaling::{SignalingService, signaling_conn};

#[tokio::main]
async fn main() {
  let signaling = SignalingService::new();
  let ws = warp::path("signaling")
      .and(warp::ws())
      .and(warp::any().map(move || signaling.clone()))
      .and_then(ws_handler);
  warp::serve(routes).run(([0, 0, 0, 0], 8000)).await;
}
async fn ws_handler(ws: Ws, svc: SignalingService) -> Result<impl Reply, Rejection> {
  Ok(ws.on_upgrade(move |socket| peer(socket, svc)))
}
async fn peer(ws: WebSocket, svc: SignalingService) {
  match signaling_conn(ws, svc).await {
    Ok(_) => println!("signaling connection stopped"),
    Err(e) => eprintln!("signaling connection failed: {}", e),
  }
}

赞助商

NLNET

依赖项

~10–20MB
~274K SLoC