#json-rpc #abstraction #handler #id #response #prople #why

prople-jsonrpc-core

一个库提供与JSON-RPC交互的核心抽象

2个版本

0.1.3 2024年8月19日
0.1.0 2024年6月24日

#770网络编程

Download history 136/week @ 2024-06-24

94 每月下载
用于 2 包

GPL-3.0-only

31KB
556

prople/jsonrpc/core

Prople 中,默认的 CCP (客户端通信协议),即客户端设备或用户应用程序的通信协议将是 JSONRPC

选择 JSONRPC 而不是 REST API 的原因是 JSONRPC 本身比 REST 更简单。我们只需要维护一个端点和其处理程序。正如它在官方网站上所说

一个轻量级的远程过程调用协议。它被设计成简单的!

来源: https://www.jsonrpc.org/

rpc调用示例

{"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1}

响应示例

{"jsonrpc": "2.0", "result": 19, "id": 1}

Prople JSONRPC

这个库提供与 JSONRPC 自身的基抽象。基抽象将提供

  • 请求抽象
  • 响应抽象
  • 错误抽象

这些抽象遵循 JSON-RPC 2.0 规范

除了这些抽象之外,prople/jsonrpc/core 还提供了 rpc处理器。这个抽象被设计来处理每个rpc方法和其处理程序。

Handler 抽象

#[async_trait]
pub trait Handler {
    async fn call(&self, params: Value) -> Result<Option<Box<dyn ErasedSerialized>>>;
}

pub type Method = String;

RpcProcessor 抽象

pub struct RpcProcessorObject {
    pub handlers: HashMap<RpcMethod, Box<dyn RpcHandler + Send + Sync>>,
}

这些抽象被设计为 JSONRPC 的核心,我们不需要关心任何HTTP框架实现。

通过使用核心rpc抽象,所有抽象和逻辑处理程序都将从任何特定框架中分离出来,这意味着我们可以轻松地更改HTTP框架。

这些抽象实现的示例

fn build_rpc_processor(args: AnaArgs) -> RpcState {
    let mut cf_names: Vec<String> = Vec::new();
    cf_names.push(types::STORAGE_COLUMN_DID.to_string());
    cf_names.push(types::STORAGE_COLUMN_DID_DOC.to_string());
    cf_names.push(types::STORAGE_COLUMN_DID_KEY_SECURES.to_string());

    let mut rocksdb_opts = RocksDBOptions::default();
    rocksdb_opts.max_write_buffer_number = 6;

    let storage_path = args.db_path;
    let storage = RocksDB::new(&storage_path, cf_names, rocksdb_opts)
        .expect("Failed to initiate RocksDB instance");
    let storage_did = StorageDID::new(storage);
    let handler_did = IdentityRPCHandler::new(Box::new(storage_did));

    let mut processor = RpcProcessorObject::build();
    processor.register_handler(
        String::from("prople.agent.controller.setup"),
        Box::new(handler_did),
    );

    RpcState {
        processor: Arc::new(processor),
    }
}

Tokio Axum 中的示例

pub async fn run_rpc(args: AnaArgs, canceller: Receiver<i32>) {
    let rpc_state = build_rpc_processor(args.clone());
    let app = Router::new()
        .route("/rpc", post(rpc_handler))
        .layer((
            TraceLayer::new_for_http()
                .make_span_with(|request: &Request| {
                    let matched_path = request
                        .extensions()
                        .get::<MatchedPath>()
                        .map(MatchedPath::as_str);

                    info_span!(
                        "http_request",
                        method = ?request.method(),
                        matched_path,
                        some_other_field = tracing::field::Empty,
                    )
                })
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
            TimeoutLayer::new(Duration::from_secs(5)),
        ))
        .with_state(Arc::new(rpc_state));

    let rpc_addr = format!("0.0.0.0:{}", args.rpc_port);
    tracing::info!("listening on: {}", rpc_addr);

    let listener = tokio::net::TcpListener::bind(rpc_addr).await.unwrap();
    let (close_tx, close_rx) = watch::channel(());

    let mut canceller = canceller.clone();
    loop {
        let (socket, remote_addr) = select! {
            result = listener.accept() => {
                result.unwrap()
            }

            _ = canceller.changed() => {
                tracing::warn!("canceller catched! stopping tcp listener to receive request...");
                break;
            }
        };

        let tower_svc = app.clone();
        let close_rx = close_rx.clone();

        let mut canceller = canceller.clone();
        task::spawn(async move {
            let socket = TokioIo::new(socket);
            let hyper_svc = hyper::service::service_fn(move |request: Request<Incoming>| {
                tower_svc.clone().call(request)
            });

            let conn = hyper::server::conn::http1::Builder::new()
                .serve_connection(socket, hyper_svc)
                .with_upgrades();

            let mut conn = std::pin::pin!(conn);
            loop {
                select! {
                    result = conn.as_mut() => {
                        if let Err(err) = result {
                            tracing::debug!("failed to serve connection: {}", err)
                        }

                        break;
                    }

                    _ = canceller.changed() => {
                        tracing::warn!("canceller catched! starting hyper connection to gracefully shutdown");
                        conn.as_mut().graceful_shutdown();
                    }
                }
            }

            tracing::debug!("connection {remote_addr} closed");
            drop(close_rx);
        });
    }

    drop(close_rx);
    drop(listener);
    tracing::debug!("waiting for {} tasks to finish", close_tx.receiver_count());
    close_tx.closed().await;
}

RPC处理器实际上与框架的实现是分离的。HTTP框架(Axum)仅负责打开TCP端口,获取请求,将请求负载路由到RPC处理器,获取响应,并将其转发回请求调用者。

默认处理器

存在一个默认的RPC方法及其处理器。

    let mut handlers: HashMap<String, Box<dyn RpcHandler + Send + Sync>> = HashMap::new();
    handlers.insert("prople.agent.ping".to_string(), Box::new(AgentPingHandler));

这个默认方法和处理器用于发送ping-pong请求-响应。处理器本身看起来像这样

#[async_trait]
impl RpcHandler for AgentPingHandler {
    async fn call(&self, _: Value) -> Result<Option<Box<dyn ErasedSerialized>>> {
        let output = AgentPingResponse {
            message: String::from("pong!"),
        };
        Ok(Some(Box::new(output)))
    }
}

依赖项

~8-19MB
~285K SLoC