#rpc #macro #value #abstract #proc-macro #nameko #girolle

girolle_macro

一个用于生成Nameko函数的Nameko宏过程宏。

15个稳定版本

1.8.2 2024年7月2日
1.8.1 2024年6月28日
1.5.1 2024年5月30日
1.3.1 2024年4月26日
1.0.1 2024年1月2日

1038过程宏 中排名

Download history 187/week @ 2024-04-20 85/week @ 2024-04-27 3/week @ 2024-05-04 121/week @ 2024-05-18 213/week @ 2024-05-25 403/week @ 2024-06-01 84/week @ 2024-06-08 326/week @ 2024-06-15 138/week @ 2024-06-22 196/week @ 2024-06-29 59/week @ 2024-07-06 207/week @ 2024-07-27

每月275次 下载
girolle 中使用

自定义许可证

16KB
92

girolle

描述

一个类似nameko-rpc的库,用于Rust。查看待办事项部分以了解限制。

请勿在生产环境中使用!

Girolle 使用 Nameko 架构发送请求并获取响应。

文档

用户文档Rust文档

安装

cargo添加 girolle

配置

有两种方法可以创建配置。第一种是使用 Config::with_yaml_defaults 函数,它将从YAML文件中读取配置,查看示例。第二种是手动创建配置。

从yaml文件创建配置

配置是通过yaml文件完成的。它应该符合Nameko的一个。文件看起来应该是这样的

AMQP_URI: 'amqp://toto:super@$172.16.1.1:5672//'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

在这个示例中

  • AMQP_URI 是连接到RabbitMQ服务器的连接字符串。
  • rpc_exchange 是rpc调用的交换名称。
  • max_workers 是将创建以处理rpc调用的最大工作进程数。
  • parent_calls_tracked 是服务将跟踪的父调用数。

手动创建配置

let conf = Config::default_config();
conf.with_amqp_uri("amqp://toto:super@localhost:5672/")
    .with_rpc_exchange("nameko-rpc")
    .with_max_workers(10)
    .with_parent_calls_tracked(10);

环境变量

配置支持以下语法扩展环境变量:${VAR_NAME}。例如,在这个示例中

AMQP_URI: 'amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:${RABBITMQ_PORT}/%2f'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

如何使用它

核心概念是通过mocking Nameko 架构中的 RpcServiceRpcClient 来移除队列创建和回复的痛苦,并使用一个抽象类型 serde_json::Value 来操作可序列化的数据。

如果你不使用宏 #[girolle],你需要创建一个函数来从 &[Value] 中提取数据,如下所示

fn fibonacci_reccursive(s: &[Value]) -> Result<Value> {
    let n: u64 = serde_json::from_value(s[0].clone())?;
    let result: Value = serde_json::to_value(fibonacci(n))?;
    Ok(result)
}

示例

创建一个简单的服务

use girolle::prelude::*;
use std::{thread, time};

#[girolle]
fn hello(s: String) -> String {
    format!("Hello, {}!", s)
}

#[girolle]
fn sub(a: i64, b: i64) -> i64 {
    a - b
}

#[girolle]
fn slip(n: u64) -> String {
    thread::sleep(time::Duration::from_secs(n));
    format!("Slept for {} seconds", n)
}

#[girolle]
fn fibonacci(n: u64) -> u64 {
    if n <= 1 {
        return n;
    }
    return fibonacci(n - 1) + fibonacci(n - 2);
}

fn main() {
    let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap();
    let _ = RpcService::new(conf, "video")
        .register(hello)
        .register(sub)
        .register(slip)
        .register(fibonacci)
        .start();
}

对服务的多个方法调用进行同步和异步操作

use girolle::prelude::Payload;
use girolle::{serde_json, Config, RpcClient, Value};
use std::time::Instant;
use std::{thread, time};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
    let service_name = "video";
    // Create the rpc call struct
    let mut rpc_client = RpcClient::new(conf);
    rpc_client.register_service(service_name).await?;
    rpc_client.start().await?;
    // Send the payload
    let p = Payload::new().arg(30);
    let new_result = rpc_client.send(service_name, "fibonacci", p)?;
    let fib_result: u64 = serde_json::from_value(new_result.get_value())?;
    // Print the result
    println!("fibonacci :{:?}", fib_result);
    assert_eq!(fib_result, 832040);
    let sub_result = rpc_client.send(service_name, "sub", Payload::new().arg(10).arg(5))?;
    assert_eq!(sub_result.get_value(), Value::Number(serde_json::Number::from(5)));
    // Create a future result
    let future_result = rpc_client.call_async(service_name, "hello", Payload::new().arg("Toto"))?;
    // Send a message during the previous async process
    let result = rpc_client.send(service_name, "hello", Payload::new().arg("Girolle"))?;
    // Print the result
    println!("{:?}", result.get_value());
    assert_eq!(result.get_value(), Value::String("Hello, Girolle!".to_string()));
    // wait for it
    let tempo: time::Duration = time::Duration::from_secs(4);
    thread::sleep(tempo);
    println!("exit sleep");
    // Print the result
    let async_result = rpc_client.result(&future_result)?;
    println!("{:?}", async_result.get_value());
    assert_eq!(async_result.get_value(), Value::String("Hello, Toto!".to_string()));
    println!("Time elapsed in for the previous call is: {:?}", async_result.get_elapsed_time()-tempo);
    let start = Instant::now();
    let mut consummers: Vec<_> = Vec::new();
    for n in 1..1001 {
        consummers.push((
            n,
            rpc_client.call_async(service_name, "hello", Payload::new().arg(n.to_string())),
        ));
    }
    // wait for it
    thread::sleep(tempo);
    for con in consummers {
        let _async_result = rpc_client.result(&con.1?)?;
    }
    let duration = start.elapsed() - tempo;
    println!("Time elapsed in expensive_function() is: {:?}", duration);
    rpc_client.unregister_service("video")?;
    rpc_client.close().await?;
    Ok(())
}

堆栈

Girolle 使用 lapin 作为 AMQP 客户端/服务器库。

支持的功能

  • 创建客户端
    • 在 Rust 中创建一个代理服务以与其他服务交互
  • 创建一个简单的服务
    • 处理错误
    • 编写测试
  • 添加宏以简化服务的创建
    • 添加基本宏
    • 修复宏以处理 return
    • 修复宏以处理递归函数
  • 监听 pub/sub 队列

nameko-client

Girolle 客户端具有发送同步请求和异步请求的基本功能。我对它与 Nameko 交互的方式并不满意。我希望找到一个更优雅的方式,就像在 Nameko 中一样。但它确实可以工作,使用它并不痛苦。

nameko-rpc

RpcService 和宏过程式是库的核心。它不支持 proxy,我知道这是 Nameko 库最重要的功能之一。我将在未来尝试实现它。但我想我需要对 Rust 的非面向对象方面进行一些重构,这可能会使其更难。

nameko-pubsub

PubSub 服务尚未实现。我不知道我是否对此感兴趣。

nameko-web

网络服务尚未实现。我不确定是否会实现它。我需要重构客户端以使其完全线程安全。它应该与代理共享一个共同的主题。

限制

当前的代码已经在本仓库中的 nameko 和 girolle 示例中进行了测试。

nameko_test.py simple_sender.rs
nameko_service.py x x
simple_macro x x

基准测试

简单消息基准测试

nameko_test.py simple_sender.rs
nameko_service.py 15.587 秒 11.532 秒
simple_macro.rs 15.654 秒 8.078 秒

客户端基准测试

使用 hyperfine 测试客户端基准测试。

Girolle 客户端(带 Girolle 服务)

hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
  Time (mean ± σ):      9.995 s ±  0.116 s    [User: 0.163 s, System: 0.197 s]
  Range (min … max):    9.778 s … 10.176 s    10 runs

Nameko 客户端(带 Girolle 服务)

hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
  Time (mean ± σ):     15.654 s ±  0.257 s    [User: 1.455 s, System: 0.407 s]
  Range (min … max):   15.202 s … 15.939 s    10 runs

服务基准测试

Girolle 服务(带 Girolle 客户端)

hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
  Time (mean ± σ):      9.995 s ±  0.116 s    [User: 0.163 s, System: 0.197 s]
  Range (min … max):    9.778 s … 10.176 s    10 runs

Nameko 服务运行 python 3.9.15(带 Girolle 客户端)

hyperfine -N --warmup 3 'target/release/examples/simple_sender'
Benchmark 1: target/release/examples/simple_sender
  Time (mean ± σ):     11.532 s ±  0.091 s    [User: 0.199 s, System: 0.213 s]
  Range (min … max):   11.396 s … 11.670 s    10 runs

Nameko 服务运行 python 3.9.15(带 Nameko 客户端)

hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
  Time (mean ± σ):     15.587 s ±  0.325 s    [User: 1.443 s, System: 0.420 s]
  Range (min … max):   15.181 s … 16.034 s    10 runs

斐波那契基准测试

基准测试使用一个 静态的随机整数集合 来计算斐波那契数。

nameko_fib_payload.py
nameko_service.py 3 分 58.11 秒
simple_macro.rs 6.99 秒

宏开销基准测试

此基准测试是为了测试宏的开销。

benchmark

依赖关系

~0.7–1.6MB
~35K SLoC