2 个版本
使用旧 Rust 2015
0.1.1 | 2017年4月3日 |
---|---|
0.1.0 | 2017年4月1日 |
#51 in #rpc-framework
在 rocksd 中使用
17KB
145 行
Tokio-RPC
基于 tokio 的 Rust RPC 框架。
帧协议
+-- MAGIC_VER: 1 --+--- request_id: 7 ---+-- payload_len: 4 --+-- payload --+
| 0b00101010, 42 | 0b00000000000001 | 0xffffffff, 4G - 1 | message |
+------------------+---------------------+--------------------+-------------+
最小帧大小:1 + 7 + 4 = 12 字节
最大帧大小:1 + 7 + 4 + 0xffffffff = 4294967307 字节
使用 protobuf 的示例 RPC
https://github.com/iorust/tokio-rpc/blob/master/examples/protobuf_rpc.rs
运行
cargo run --example protobuf_rpc
extern crate tokio_rpc;
extern crate futures;
extern crate tokio_core;
extern crate tokio_service;
extern crate protobuf;
use futures::{future, Future};
use tokio_core::reactor::Core;
use tokio_service::{Service, NewService};
use std::{io, thread};
use std::time::Duration;
use protobuf::core::{Message, parse_from_bytes};
mod rpcpb;
struct Demo;
impl Service for Demo {
type Request = Vec<u8>;
type Response = Vec<u8>;
type Error = io::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
let req = parse_from_bytes::<rpcpb::Request>(req.as_slice());
if let Err(err) = req {
return future::err(io::Error::new(io::ErrorKind::Other, err)).boxed();
}
let req = req.unwrap();
let req = req.get_cmd_ping_req();
println!("Request {:?}", req);
let mut msg = rpcpb::CmdPingResponse::new();
msg.set_message(req.get_message().to_string());
let mut res = rpcpb::Response::new();
res.set_field_type(rpcpb::MessageType::CmdPing);
res.set_cmd_ping_res(msg);
match res.write_to_bytes() {
Ok(val) => future::ok(val).boxed(),
Err(err) => future::err(io::Error::new(io::ErrorKind::Other, err)).boxed(),
}
}
}
impl NewService for Demo {
type Request = Vec<u8>;
type Response = Vec<u8>;
type Error = io::Error;
type Instance = Demo;
fn new_service(&self) -> io::Result<Self::Instance> {
Ok(Demo {})
}
}
pub fn main() {
let mut core = Core::new().unwrap();
let addr = "127.0.0.1:12345".parse().unwrap();
let demo = Demo {};
thread::spawn(move || { tokio_rpc::serve(addr, demo); });
// A bit annoying, but we need to wait for the server to connect
thread::sleep(Duration::from_millis(100));
let handle = core.handle();
core.run(tokio_rpc::Client::connect(&addr, &handle).and_then(|client| {
let mut msg = rpcpb::CmdPingRequest::new();
msg.set_message("Hello world!".to_string());
let mut req = rpcpb::Request::new();
req.set_field_type(rpcpb::MessageType::CmdPing);
req.set_cmd_ping_req(msg);
let buf = req.write_to_bytes().unwrap();
client.call(buf)
.and_then(|res| {
let res = parse_from_bytes::<rpcpb::Response>(res.as_slice());
println!("CLIENT Res: {:?}", res);
Ok(())
})
.or_else(|err| {
println!("CLIENT Err: {:?}", err);
Ok(())
})
}))
.unwrap();
}
依赖项
~6.5MB
~103K SLoC