#rpc-framework #quic #9p #protocols #reliable #secure #low-latency

jetstream

JetStream是基于9P协议和QUIC的Rust RPC框架。

18个版本 (7个稳定版)

3.0.0 2024年3月30日
2.0.2 2024年3月30日
1.1.1 2024年3月29日
0.6.0 2024年3月21日
0.1.5 2024年3月14日

#146文件系统

Download history 1/week @ 2024-06-01 13/week @ 2024-07-06 63/week @ 2024-07-27

每月76次下载

BSD-3-Clause

765KB
5.5K SLoC

JetStream

crates.io docs.rs Build Status Build Status crates.io downloads

JetStream是建立在 s2n-quicp9 之上的RPC框架。它被设计为一个高性能、低延迟、安全、可靠的RPC框架。

特性

  • 双向流
  • 0-RTT
  • mTLS
  • 二进制编码

动机

在互联网上构建远程文件系统是JetStream背后的主要动机。

准备好了吗?

JetStream尚未准备好投入生产使用。它仍处于开发的早期阶段。

替代方案

文档

示例 测试

let _guard = slog_scope::set_global_logger(setup_logging());
let _guard = slog_envlogger::new(drain());
let (_tx, _rx) = tokio::io::duplex(1024);
pub static CA_CERT_PEM: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
pub static SERVER_CERT_PEM: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
pub static SERVER_KEY_PEM: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
pub static CLIENT_CERT_PEM: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
pub static CLIENT_KEY_PEM: &str =
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");

let tls = tls::default::Server::builder()
    .with_trusted_certificate(Path::new(CA_CERT_PEM))
    .unwrap()
    .with_certificate(
        Path::new(SERVER_CERT_PEM),
        Path::new(SERVER_KEY_PEM),
    )
    .unwrap()
    .with_client_authentication()
    .unwrap()
    .build()
    .unwrap();

let barrier = Arc::new(Barrier::new(3)).clone();
let c = barrier.clone();
let srv_handle = tokio::spawn(async move {
    let server = Server::builder()
        .with_tls(tls)
        .unwrap()
        .with_io("127.0.0.1:4433")
        .unwrap()
        .start()
        .unwrap();
    let qsrv: QuicServer<Tframe, Rframe, EchoService> =
        QuicServer::new(ninepecho::EchoService);
    debug!("Server started, waiting for barrier");
    c.wait().await;
    let _ = qsrv.serve(server).await;
});

let cert = path::PathBuf::from(CLIENT_CERT_PEM).into_boxed_path();
let key = path::PathBuf::from(CLIENT_KEY_PEM).into_boxed_path();
let ca_cert = path::PathBuf::from(CA_CERT_PEM).into_boxed_path();
let temp_dir = tmpdir::TmpDir::new("q9p").await.unwrap();

let mut listen = temp_dir.to_path_buf();
listen.push("q9p.sock");
let listen = listen.into_boxed_path();
let l = listen.clone();
let c = barrier.clone();
let prxy_handle = tokio::spawn(async move {
    debug!("Proxy started, waiting for barrier");
    c.wait().await;

    let prxy = Proxy::new(
        DialQuic::new(
            "127.0.0.1".to_string(),
            4433,
            cert,
            key,
            ca_cert,
            "localhost".to_string(),
        ),
        l.clone(),
    );
    let _ = prxy.run().await;
});
let c = barrier.clone();
let l = listen.clone();
let client_handle = tokio::spawn(async move {
    c.clone().wait().await;
    // sleep for 5 milliseconds to give the server time to start
    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    debug!("Connecting to {:?}", listen);
    let (mut read, mut write) = tokio::net::UnixStream::connect(l)
        .await
        .unwrap()
        .into_split();

    // loop 100 times
    for _ in 0..100 {
        let test = Tframe {
            tag: 0,
            msg: Ok(Tmessage::Version(Tversion {
                msize: 8192,
                version: "9P2000.L".to_string(),
            })),
        };
        debug!("Sending tframe: {:?}", test);
        // ping
        test.encode_async(&mut write).await.unwrap();
        write.flush().await.unwrap();
        debug!("Reading rframe");
        read.readable().await.unwrap();
        // pong
        let resp = Rframe::decode_async(&mut read).await.unwrap();
        assert_eq!(resp.tag, 0);
    }
});

let timeout = std::time::Duration::from_secs(10);

let timeout = tokio::time::sleep(timeout);

tokio::select! {
    _ = timeout => {
        panic!("Timeout");
    }
    _ = srv_handle => {
        panic!("Quic server failed");
    }
    _ = prxy_handle => {
        panic!("Proxy failed");
    }
    _ = client_handle => {
        return;
    }
}

许可证

BSD-3-Clause

依赖

~28–41MB
~740K SLoC