21 个版本 (稳定版)

1.8.2 2024 年 7 月 2 日
1.4.0 2024 年 5 月 24 日
1.2.1 2024 年 2 月 10 日
0.5.3 2023 年 12 月 31 日

#381网络编程

Download history 250/week @ 2024-04-23 27/week @ 2024-04-30 1/week @ 2024-05-14 147/week @ 2024-05-21 93/week @ 2024-05-28 315/week @ 2024-06-04 181/week @ 2024-06-11 175/week @ 2024-06-18 138/week @ 2024-06-25 207/week @ 2024-07-02 197/week @ 2024-07-30

197 每月下载量

自定义许可

85KB
1.5K SLoC

girolle

描述

一个类似于 nameko-rpc 的 rust 库。查看待办事项部分查看限制。

不要在生产环境中使用!

Girolle 使用 Nameko 架构发送请求并获取响应。

文档

用户文档Rust 文档

安装

cargo添加 girolle

配置

有两种方式创建配置。第一种是使用 Config::with_yaml_defaults 函数,它会从一个 YAML 文件中读取配置,查看示例。第二种是通过手动创建配置。

从 YAML 文件创建配置

配置是通过一个 YAML 文件完成的。它应该符合 Nameko 的规范。文件应该看起来像这样

AMQP_URI: 'amqp://toto:super@$172.16.1.1:5672//'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

在这个示例中

  • 这里的 AMQP_URI 是连接到 RabbitMQ 服务器的连接字符串。
  • 这里的 rpc_exchange 是 rpc 调用的交换机名称。
  • 这里的 max_workers 是将创建以处理 rpc 调用的最大工作者数量。
  • 这里的 parent_calls_tracked 是将被服务跟踪的父调用数量。

手动创建配置

let conf = Config::default_config();
conf.with_amqp_uri("amqp://toto:super@localhost:5672/")
    .with_rpc_exchange("nameko-rpc")
    .with_max_workers(10)
    .with_parent_calls_tracked(10);

环境变量

配置支持以下语法的环境变量扩展 ${VAR_NAME}。如下示例所示

AMQP_URI: 'amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:${RABBITMQ_PORT}/%2f'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

如何使用它

核心概念是通过使用 RpcServiceRpcClient 模拟 Nameko 架构来移除队列创建和回复的痛苦,并使用一个抽象类型 serde_json::Value 来操作可序列化数据。

如果您不使用宏 #[girolle],则需要创建一个函数来从 &[Value] 中提取数据,如下所示

fn fibonacci_reccursive(s: &[Value]) -> Result<Value> {
    let n: u64 = serde_json::from_value(s[0].clone())?;
    let result: Value = serde_json::to_value(fibonacci(n))?;
    Ok(result)
}

示例

创建一个简单的服务

use girolle::prelude::*;
use std::{thread, time};

#[girolle]
fn hello(s: String) -> String {
    format!("Hello, {}!", s)
}

#[girolle]
fn sub(a: i64, b: i64) -> i64 {
    a - b
}

#[girolle]
fn slip(n: u64) -> String {
    thread::sleep(time::Duration::from_secs(n));
    format!("Slept for {} seconds", n)
}

#[girolle]
fn fibonacci(n: u64) -> u64 {
    if n <= 1 {
        return n;
    }
    return fibonacci(n - 1) + fibonacci(n - 2);
}

fn main() {
    let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap();
    let _ = RpcService::new(conf, "video")
        .register(hello)
        .register(sub)
        .register(slip)
        .register(fibonacci)
        .start();
}

对服务方法进行多次调用,包括同步和异步

use girolle::prelude::Payload;
use girolle::{serde_json, Config, RpcClient, Value};
use std::time::Instant;
use std::{thread, time};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
    let service_name = "video";
    // Create the rpc call struct
    let mut rpc_client = RpcClient::new(conf);
    rpc_client.register_service(service_name).await?;
    rpc_client.start().await?;
    // Send the payload
    let p = Payload::new().arg(30);
    let new_result = rpc_client.send(service_name, "fibonacci", p)?;
    let fib_result: u64 = serde_json::from_value(new_result.get_value())?;
    // Print the result
    println!("fibonacci :{:?}", fib_result);
    assert_eq!(fib_result, 832040);
    let sub_result = rpc_client.send(service_name, "sub", Payload::new().arg(10).arg(5))?;
    assert_eq!(sub_result.get_value(), Value::Number(serde_json::Number::from(5)));
    // Create a future result
    let future_result = rpc_client.call_async(service_name, "hello", Payload::new().arg("Toto"))?;
    // Send a message during the previous async process
    let result = rpc_client.send(service_name, "hello", Payload::new().arg("Girolle"))?;
    // Print the result
    println!("{:?}", result.get_value());
    assert_eq!(result.get_value(), Value::String("Hello, Girolle!".to_string()));
    // wait for it
    let tempo: time::Duration = time::Duration::from_secs(4);
    thread::sleep(tempo);
    println!("exit sleep");
    // Print the result
    let async_result = rpc_client.result(&future_result)?;
    println!("{:?}", async_result.get_value());
    assert_eq!(async_result.get_value(), Value::String("Hello, Toto!".to_string()));
    println!("Time elapsed in for the previous call is: {:?}", async_result.get_elapsed_time()-tempo);
    let start = Instant::now();
    let mut consummers: Vec<_> = Vec::new();
    for n in 1..1001 {
        consummers.push((
            n,
            rpc_client.call_async(service_name, "hello", Payload::new().arg(n.to_string())),
        ));
    }
    // wait for it
    thread::sleep(tempo);
    for con in consummers {
        let _async_result = rpc_client.result(&con.1?)?;
    }
    let duration = start.elapsed() - tempo;
    println!("Time elapsed in expensive_function() is: {:?}", duration);
    rpc_client.unregister_service("video")?;
    rpc_client.close().await?;
    Ok(())
}

堆栈

Girolle 使用 lapin 作为 AMQP 客户端/服务器库。

支持的功能

  • 创建客户端
    • 在 Rust 中创建一个代理服务以与其他服务交互
  • 创建一个简单的服务
    • 处理错误
    • 编写测试
  • 添加宏以简化服务的创建
    • 添加基本宏
    • 修复宏以处理 return
    • 修复宏以处理递归函数
  • 监听 pub/sub 队列

nameko-client

Girolle 客户端具有发送同步请求和异步请求的基本功能。我对它需要与之交互的方式并不十分满意。我希望找到一种更优雅的方式,如 nameko 中的方式。但它是可行的,并且使用起来并不痛苦。

nameko-rpc

RpcService 和宏过程是库的核心。它不支持 代理,我知道这是 Nameko 库最重要的功能之一。我将来会尝试实现它。但我觉得我需要稍微重构 Rust 的非面向对象方面,这会使其更难。

nameko-pubsub

PubSub 服务尚未实现。我不知道我是否对此感兴趣。

nameko-web

网络服务尚未实现。我不确定是否会实现它。我需要重构客户端以使其 100% 线程安全。这应该是与代理的共同主题。

限制

当前的代码已在存储库中的 nameko 和 girolle 示例中进行了测试。

nameko_test.py simple_sender.rs
nameko_service.py x x
simple_macro x x

基准测试

简单消息基准测试

nameko_test.py simple_sender.rs
nameko_service.py 15.587 秒 11.532 秒
simple_macro.rs 15.654 秒 8.078 秒

客户端基准测试

使用 hyperfine 进行客户端基准测试。

Girolle 客户端(带有 Girolle 服务)

hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
  Time (mean ± σ):      9.995 s ±  0.116 s    [User: 0.163 s, System: 0.197 s]
  Range (min … max):    9.778 s … 10.176 s    10 runs

Nameko 客户端(带有 Girolle 服务)

hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
  Time (mean ± σ):     15.654 s ±  0.257 s    [User: 1.455 s, System: 0.407 s]
  Range (min … max):   15.202 s … 15.939 s    10 runs

服务基准测试

Girolle 服务(带有 Girolle 客户端)

hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
  Time (mean ± σ):      9.995 s ±  0.116 s    [User: 0.163 s, System: 0.197 s]
  Range (min … max):    9.778 s … 10.176 s    10 runs

Nameko 服务运行 python 3.9.15(带有 Girolle 客户端)

hyperfine -N --warmup 3 'target/release/examples/simple_sender'
Benchmark 1: target/release/examples/simple_sender
  Time (mean ± σ):     11.532 s ±  0.091 s    [User: 0.199 s, System: 0.213 s]
  Range (min … max):   11.396 s … 11.670 s    10 runs

Nameko 服务运行 python 3.9.15(带有 Nameko 客户端)

hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
  Time (mean ± σ):     15.587 s ±  0.325 s    [User: 1.443 s, System: 0.420 s]
  Range (min … max):   15.181 s … 16.034 s    10 runs

斐波那契基准测试

基准测试使用一个 静态随机整数集 来计算斐波那契数。

nameko_fib_payload.py
nameko_service.py 3 分钟 58.11 秒
simple_macro.rs 6.99 秒

宏开销基准测试

此基准测试是为了测试宏的开销。

benchmark

依赖项

~15–29MB
~456K SLoC