#rpc #rpc-framework #tokio #base #magic-ver

tokio-rpc

基于 tokio 的 Rust RPC 框架

2 个版本

使用旧版 Rust 2015

0.1.1 2017年4月3日
0.1.0 2017年4月1日

#51 in #rpc-framework


被 rocksd 使用

MIT 许可证

17KB
145

Tokio-RPC

基于 tokio 的 Rust RPC 框架。

Crates version Build Status Coverage Status Crates downloads Docs Status

帧协议

+-- MAGIC_VER: 1 --+--- request_id: 7 ---+-- payload_len: 4 --+-- payload --+
|  0b00101010, 42  |  0b00000000000001   | 0xffffffff, 4G - 1 |   message   |
+------------------+---------------------+--------------------+-------------+

最小帧大小:1 + 7 + 4 = 12 字节

最大帧大小:1 + 7 + 4 + 0xffffffff = 4294967307 字节

使用 protobuf 的示例 RPC

https://github.com/iorust/tokio-rpc/blob/master/examples/protobuf_rpc.rs

运行

cargo run --example protobuf_rpc
extern crate tokio_rpc;
extern crate futures;
extern crate tokio_core;
extern crate tokio_service;
extern crate protobuf;

use futures::{future, Future};
use tokio_core::reactor::Core;
use tokio_service::{Service, NewService};
use std::{io, thread};
use std::time::Duration;
use protobuf::core::{Message, parse_from_bytes};

mod rpcpb;

/// Stateless demo RPC service; it carries no data and is used both as the
/// service itself and as its own factory (see the trait impls below it).
struct Demo;

/// Ping handler: decodes a protobuf `rpcpb::Request` from the raw payload and
/// answers with an `rpcpb::Response` that echoes the ping message back.
impl Service for Demo {
    type Request = Vec<u8>;
    type Response = Vec<u8>;
    type Error = io::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    /// Handles one request payload.
    ///
    /// Resolves to the serialized response bytes. If the payload cannot be
    /// parsed, or the response cannot be serialized, the protobuf error is
    /// wrapped in an `io::Error` of kind `Other`.
    fn call(&self, req: Self::Request) -> Self::Future {
        // Bail out early with a failed future when the payload is not a valid request.
        let request = match parse_from_bytes::<rpcpb::Request>(req.as_slice()) {
            Ok(parsed) => parsed,
            Err(err) => {
                return future::err(io::Error::new(io::ErrorKind::Other, err)).boxed();
            }
        };

        let ping = request.get_cmd_ping_req();
        println!("Request {:?}", ping);

        // Echo the incoming message back inside a ping response.
        let mut pong = rpcpb::CmdPingResponse::new();
        pong.set_message(ping.get_message().to_string());

        let mut response = rpcpb::Response::new();
        response.set_field_type(rpcpb::MessageType::CmdPing);
        response.set_cmd_ping_res(pong);

        // Serialization outcome maps directly onto an already-completed future.
        let encoded = response
            .write_to_bytes()
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
        future::result(encoded).boxed()
    }
}

impl NewService for Demo {
    type Request = Vec<u8>;
    type Response = Vec<u8>;
    type Error = io::Error;
    type Instance = Demo;

    fn new_service(&self) -> io::Result<Self::Instance> {
        Ok(Demo {})
    }
}

/// Runs the demo end to end: starts the server on a background thread, then
/// connects a client, sends one ping request and prints the result.
pub fn main() {
    let mut core = Core::new().unwrap();
    let addr = "127.0.0.1:12345".parse().unwrap();

    // The server runs on its own thread; this thread drives the client reactor.
    let service = Demo {};
    thread::spawn(move || {
        tokio_rpc::serve(addr, service);
    });
    // Crude synchronization: give the server thread a moment to start before
    // the client tries to connect. NOTE(review): a fixed sleep is a heuristic.
    thread::sleep(Duration::from_millis(100));

    let handle = core.handle();
    let session = tokio_rpc::Client::connect(&addr, &handle).and_then(|client| {
        // Build the ping request and serialize it into the RPC payload.
        let mut ping = rpcpb::CmdPingRequest::new();
        ping.set_message("Hello world!".to_string());

        let mut request = rpcpb::Request::new();
        request.set_field_type(rpcpb::MessageType::CmdPing);
        request.set_cmd_ping_req(ping);
        let payload = request.write_to_bytes().unwrap();

        // Both outcomes are only printed; the chain always resolves to `Ok(())`.
        let reply = client.call(payload);
        let reported = reply.and_then(|bytes| {
            let decoded = parse_from_bytes::<rpcpb::Response>(bytes.as_slice());
            println!("CLIENT Res: {:?}", decoded);
            Ok(())
        });
        reported.or_else(|err| {
            println!("CLIENT Err: {:?}", err);
            Ok(())
        })
    });

    core.run(session).unwrap();
}

依赖项

~6.5MB
~103K SLoC