2个版本
新 0.1.3 | 2024年8月19日 |
---|---|
0.1.0 | 2024年6月24日 |
#770 在 网络编程
94 每月下载
用于 2 包
31KB
556 行
prople/jsonrpc/core
在 Prople
中,默认的 CCP (客户端通信协议)
,即客户端设备或用户应用程序的通信协议将是 JSONRPC
。
选择 JSONRPC
而不是 REST API
的原因是 JSONRPC
本身比 REST
更简单。我们只需要维护一个端点和其处理程序。正如它在官方网站上所说
一个轻量级的远程过程调用协议。它被设计成简单的!
rpc调用示例
{"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1}
响应示例
{"jsonrpc": "2.0", "result": 19, "id": 1}
Prople JSONRPC
这个库提供与 JSONRPC
自身的基抽象。基抽象将提供
- 请求抽象
- 响应抽象
- 错误抽象
这些抽象遵循 JSON-RPC 2.0 规范。
除了这些抽象之外,prople/jsonrpc/core
还提供了 rpc处理器。这个抽象被设计来处理每个rpc方法和其处理程序。
Handler
抽象
#[async_trait]
pub trait Handler {
async fn call(&self, params: Value) -> Result<Option<Box<dyn ErasedSerialized>>>;
}
pub type Method = String;
RpcProcessor
抽象
pub struct RpcProcessorObject {
pub handlers: HashMap<RpcMethod, Box<dyn RpcHandler + Send + Sync>>,
}
这些抽象被设计为 JSONRPC
的核心,我们不需要关心任何HTTP框架实现。
通过使用核心rpc抽象,所有抽象和逻辑处理程序都将从任何特定框架中分离出来,这意味着我们可以轻松地更改HTTP框架。
这些抽象实现的示例
fn build_rpc_processor(args: AnaArgs) -> RpcState {
let mut cf_names: Vec<String> = Vec::new();
cf_names.push(types::STORAGE_COLUMN_DID.to_string());
cf_names.push(types::STORAGE_COLUMN_DID_DOC.to_string());
cf_names.push(types::STORAGE_COLUMN_DID_KEY_SECURES.to_string());
let mut rocksdb_opts = RocksDBOptions::default();
rocksdb_opts.max_write_buffer_number = 6;
let storage_path = args.db_path;
let storage = RocksDB::new(&storage_path, cf_names, rocksdb_opts)
.expect("Failed to initiate RocksDB instance");
let storage_did = StorageDID::new(storage);
let handler_did = IdentityRPCHandler::new(Box::new(storage_did));
let mut processor = RpcProcessorObject::build();
processor.register_handler(
String::from("prople.agent.controller.setup"),
Box::new(handler_did),
);
RpcState {
processor: Arc::new(processor),
}
}
在 Tokio Axum
中的示例
pub async fn run_rpc(args: AnaArgs, canceller: Receiver<i32>) {
let rpc_state = build_rpc_processor(args.clone());
let app = Router::new()
.route("/rpc", post(rpc_handler))
.layer((
TraceLayer::new_for_http()
.make_span_with(|request: &Request| {
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);
info_span!(
"http_request",
method = ?request.method(),
matched_path,
some_other_field = tracing::field::Empty,
)
})
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
TimeoutLayer::new(Duration::from_secs(5)),
))
.with_state(Arc::new(rpc_state));
let rpc_addr = format!("0.0.0.0:{}", args.rpc_port);
tracing::info!("listening on: {}", rpc_addr);
let listener = tokio::net::TcpListener::bind(rpc_addr).await.unwrap();
let (close_tx, close_rx) = watch::channel(());
let mut canceller = canceller.clone();
loop {
let (socket, remote_addr) = select! {
result = listener.accept() => {
result.unwrap()
}
_ = canceller.changed() => {
tracing::warn!("canceller catched! stopping tcp listener to receive request...");
break;
}
};
let tower_svc = app.clone();
let close_rx = close_rx.clone();
let mut canceller = canceller.clone();
task::spawn(async move {
let socket = TokioIo::new(socket);
let hyper_svc = hyper::service::service_fn(move |request: Request<Incoming>| {
tower_svc.clone().call(request)
});
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, hyper_svc)
.with_upgrades();
let mut conn = std::pin::pin!(conn);
loop {
select! {
result = conn.as_mut() => {
if let Err(err) = result {
tracing::debug!("failed to serve connection: {}", err)
}
break;
}
_ = canceller.changed() => {
tracing::warn!("canceller catched! starting hyper connection to gracefully shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
tracing::debug!("connection {remote_addr} closed");
drop(close_rx);
});
}
drop(close_rx);
drop(listener);
tracing::debug!("waiting for {} tasks to finish", close_tx.receiver_count());
close_tx.closed().await;
}
RPC处理器实际上与框架的实现是分离的。HTTP框架(Axum
)仅负责打开TCP端口,获取请求,将请求负载路由到RPC处理器,获取响应,并将其转发回请求调用者。
默认处理器
存在一个默认的RPC方法及其处理器。
let mut handlers: HashMap<String, Box<dyn RpcHandler + Send + Sync>> = HashMap::new();
handlers.insert("prople.agent.ping".to_string(), Box::new(AgentPingHandler));
这个默认方法和处理器用于发送ping-pong请求-响应。处理器本身看起来像这样
#[async_trait]
impl RpcHandler for AgentPingHandler {
async fn call(&self, _: Value) -> Result<Option<Box<dyn ErasedSerialized>>> {
let output = AgentPingResponse {
message: String::from("pong!"),
};
Ok(Some(Box::new(output)))
}
}
依赖项
~8-19MB
~285K SLoC