21 个版本 (稳定版)
1.8.2 | 2024 年 7 月 2 日 |
---|---|
1.4.0 | 2024 年 5 月 24 日 |
1.2.1 | 2024 年 2 月 10 日 |
0.5.3 | 2023 年 12 月 31 日 |
#381 在 网络编程
197 每月下载量
85KB
1.5K SLoC
girolle
data:image/s3,"s3://crabby-images/4913e/4913e7e82440d5983f7c53f0070c27312bc7e553" alt=""
描述
一个类似于 nameko-rpc 的 rust 库。查看待办事项部分查看限制。
不要在生产环境中使用!
Girolle 使用 Nameko 架构发送请求并获取响应。
文档
安装
cargo添加 girolle
配置
有两种方式创建配置。第一种是使用 Config::with_yaml_defaults
函数,它会从一个 YAML 文件中读取配置,查看示例。第二种是通过手动创建配置。
从 YAML 文件创建配置
配置是通过一个 YAML 文件完成的。它应该符合 Nameko 的规范。文件应该看起来像这样
AMQP_URI: 'amqp://toto:super@$172.16.1.1:5672//'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
在这个示例中
- 这里的
AMQP_URI
是连接到 RabbitMQ 服务器的连接字符串。 - 这里的
rpc_exchange
是 rpc 调用的交换机名称。 - 这里的
max_workers
是将创建以处理 rpc 调用的最大工作者数量。 - 这里的
parent_calls_tracked
是将被服务跟踪的父调用数量。
手动创建配置
let conf = Config::default_config();
conf.with_amqp_uri("amqp://toto:super@localhost:5672/")
.with_rpc_exchange("nameko-rpc")
.with_max_workers(10)
.with_parent_calls_tracked(10);
环境变量
配置支持以下语法的环境变量扩展 ${VAR_NAME}
。如下示例所示
AMQP_URI: 'amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:${RABBITMQ_PORT}/%2f'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
如何使用它
核心概念是通过使用 RpcService
或 RpcClient
模拟 Nameko 架构来移除队列创建和回复的痛苦,并使用一个抽象类型 serde_json::Value
来操作可序列化数据。
如果您不使用宏 #[girolle]
,则需要创建一个函数来从 &[Value]
中提取数据,如下所示
fn fibonacci_reccursive(s: &[Value]) -> Result<Value> {
let n: u64 = serde_json::from_value(s[0].clone())?;
let result: Value = serde_json::to_value(fibonacci(n))?;
Ok(result)
}
示例
创建一个简单的服务
use girolle::prelude::*;
use std::{thread, time};
#[girolle]
fn hello(s: String) -> String {
format!("Hello, {}!", s)
}
#[girolle]
fn sub(a: i64, b: i64) -> i64 {
a - b
}
#[girolle]
fn slip(n: u64) -> String {
thread::sleep(time::Duration::from_secs(n));
format!("Slept for {} seconds", n)
}
#[girolle]
fn fibonacci(n: u64) -> u64 {
if n <= 1 {
return n;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
fn main() {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap();
let _ = RpcService::new(conf, "video")
.register(hello)
.register(sub)
.register(slip)
.register(fibonacci)
.start();
}
对服务方法进行多次调用,包括同步和异步
use girolle::prelude::Payload;
use girolle::{serde_json, Config, RpcClient, Value};
use std::time::Instant;
use std::{thread, time};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
// Send the payload
let p = Payload::new().arg(30);
let new_result = rpc_client.send(service_name, "fibonacci", p)?;
let fib_result: u64 = serde_json::from_value(new_result.get_value())?;
// Print the result
println!("fibonacci :{:?}", fib_result);
assert_eq!(fib_result, 832040);
let sub_result = rpc_client.send(service_name, "sub", Payload::new().arg(10).arg(5))?;
assert_eq!(sub_result.get_value(), Value::Number(serde_json::Number::from(5)));
// Create a future result
let future_result = rpc_client.call_async(service_name, "hello", Payload::new().arg("Toto"))?;
// Send a message during the previous async process
let result = rpc_client.send(service_name, "hello", Payload::new().arg("Girolle"))?;
// Print the result
println!("{:?}", result.get_value());
assert_eq!(result.get_value(), Value::String("Hello, Girolle!".to_string()));
// wait for it
let tempo: time::Duration = time::Duration::from_secs(4);
thread::sleep(tempo);
println!("exit sleep");
// Print the result
let async_result = rpc_client.result(&future_result)?;
println!("{:?}", async_result.get_value());
assert_eq!(async_result.get_value(), Value::String("Hello, Toto!".to_string()));
println!("Time elapsed in for the previous call is: {:?}", async_result.get_elapsed_time()-tempo);
let start = Instant::now();
let mut consummers: Vec<_> = Vec::new();
for n in 1..1001 {
consummers.push((
n,
rpc_client.call_async(service_name, "hello", Payload::new().arg(n.to_string())),
));
}
// wait for it
thread::sleep(tempo);
for con in consummers {
let _async_result = rpc_client.result(&con.1?)?;
}
let duration = start.elapsed() - tempo;
println!("Time elapsed in expensive_function() is: {:?}", duration);
rpc_client.unregister_service("video")?;
rpc_client.close().await?;
Ok(())
}
堆栈
Girolle 使用 lapin 作为 AMQP 客户端/服务器库。
支持的功能
- 创建客户端
- 在 Rust 中创建一个代理服务以与其他服务交互
- 创建一个简单的服务
- 处理错误
- 编写测试
- 添加宏以简化服务的创建
- 添加基本宏
- 修复宏以处理
return
- 修复宏以处理递归函数
- 监听 pub/sub 队列
nameko-client
Girolle 客户端具有发送同步请求和异步请求的基本功能。我对它需要与之交互的方式并不十分满意。我希望找到一种更优雅的方式,如 nameko 中的方式。但它是可行的,并且使用起来并不痛苦。
nameko-rpc
RpcService 和宏过程是库的核心。它不支持 代理,我知道这是 Nameko 库最重要的功能之一。我将来会尝试实现它。但我觉得我需要稍微重构 Rust 的非面向对象方面,这会使其更难。
nameko-pubsub
PubSub 服务尚未实现。我不知道我是否对此感兴趣。
nameko-web
网络服务尚未实现。我不确定是否会实现它。我需要重构客户端以使其 100% 线程安全。这应该是与代理的共同主题。
限制
当前的代码已在存储库中的 nameko 和 girolle 示例中进行了测试。
nameko_test.py | simple_sender.rs | |
---|---|---|
nameko_service.py | x | x |
simple_macro | x | x |
基准测试
简单消息基准测试
nameko_test.py | simple_sender.rs | |
---|---|---|
nameko_service.py | 15.587 秒 | 11.532 秒 |
simple_macro.rs | 15.654 秒 | 8.078 秒 |
客户端基准测试
使用 hyperfine 进行客户端基准测试。
Girolle 客户端(带有 Girolle 服务)
hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
Time (mean ± σ): 9.995 s ± 0.116 s [User: 0.163 s, System: 0.197 s]
Range (min … max): 9.778 s … 10.176 s 10 runs
Nameko 客户端(带有 Girolle 服务)
hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
Time (mean ± σ): 15.654 s ± 0.257 s [User: 1.455 s, System: 0.407 s]
Range (min … max): 15.202 s … 15.939 s 10 runs
服务基准测试
Girolle 服务(带有 Girolle 客户端)
hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
Time (mean ± σ): 9.995 s ± 0.116 s [User: 0.163 s, System: 0.197 s]
Range (min … max): 9.778 s … 10.176 s 10 runs
Nameko 服务运行 python 3.9.15(带有 Girolle 客户端)
hyperfine -N --warmup 3 'target/release/examples/simple_sender'
Benchmark 1: target/release/examples/simple_sender
Time (mean ± σ): 11.532 s ± 0.091 s [User: 0.199 s, System: 0.213 s]
Range (min … max): 11.396 s … 11.670 s 10 runs
Nameko 服务运行 python 3.9.15(带有 Nameko 客户端)
hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
Time (mean ± σ): 15.587 s ± 0.325 s [User: 1.443 s, System: 0.420 s]
Range (min … max): 15.181 s … 16.034 s 10 runs
斐波那契基准测试
基准测试使用一个 静态随机整数集 来计算斐波那契数。
nameko_fib_payload.py | |
---|---|
nameko_service.py | 3 分钟 58.11 秒 |
simple_macro.rs | 6.99 秒 |
宏开销基准测试
此基准测试是为了测试宏的开销。
依赖项
~15–29MB
~456K SLoC