15个稳定版本
1.8.2 | 2024年7月2日 |
---|---|
1.8.1 | 2024年6月28日 |
1.5.1 | 2024年5月30日 |
1.3.1 | 2024年4月26日 |
1.0.1 | 2024年1月2日 |
1038 在 过程宏 中排名
每月275次 下载
在 girolle 中使用
16KB
92 行
girolle
data:image/s3,"s3://crabby-images/222bf/222bf11bbabba77ff604ba15d513755db7837c16" alt=""
描述
一个类似nameko-rpc的库,用于Rust。查看待办事项部分以了解限制。
请勿在生产环境中使用!
Girolle 使用 Nameko 架构发送请求并获取响应。
文档
安装
cargo添加 girolle
配置
有两种方法可以创建配置。第一种是使用 Config::with_yaml_defaults
函数,它将从YAML文件中读取配置,查看示例。第二种是手动创建配置。
从yaml文件创建配置
配置是通过yaml文件完成的。它应该符合Nameko的一个。文件看起来应该是这样的
AMQP_URI: 'amqp://toto:super@$172.16.1.1:5672//'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
在这个示例中
AMQP_URI
是连接到RabbitMQ服务器的连接字符串。rpc_exchange
是rpc调用的交换名称。max_workers
是将创建以处理rpc调用的最大工作进程数。parent_calls_tracked
是服务将跟踪的父调用数。
手动创建配置
let conf = Config::default_config();
conf.with_amqp_uri("amqp://toto:super@localhost:5672/")
.with_rpc_exchange("nameko-rpc")
.with_max_workers(10)
.with_parent_calls_tracked(10);
环境变量
配置支持以下语法扩展环境变量:${VAR_NAME}
。例如,在这个示例中
AMQP_URI: 'amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@${RABBITMQ_HOST}:${RABBITMQ_PORT}/%2f'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
如何使用它
核心概念是通过mocking Nameko 架构中的 RpcService
或 RpcClient
来移除队列创建和回复的痛苦,并使用一个抽象类型 serde_json::Value
来操作可序列化的数据。
如果你不使用宏 #[girolle]
,你需要创建一个函数来从 &[Value]
中提取数据,如下所示
fn fibonacci_reccursive(s: &[Value]) -> Result<Value> {
let n: u64 = serde_json::from_value(s[0].clone())?;
let result: Value = serde_json::to_value(fibonacci(n))?;
Ok(result)
}
示例
创建一个简单的服务
use girolle::prelude::*;
use std::{thread, time};
#[girolle]
fn hello(s: String) -> String {
format!("Hello, {}!", s)
}
#[girolle]
fn sub(a: i64, b: i64) -> i64 {
a - b
}
#[girolle]
fn slip(n: u64) -> String {
thread::sleep(time::Duration::from_secs(n));
format!("Slept for {} seconds", n)
}
#[girolle]
fn fibonacci(n: u64) -> u64 {
if n <= 1 {
return n;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}
fn main() {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap();
let _ = RpcService::new(conf, "video")
.register(hello)
.register(sub)
.register(slip)
.register(fibonacci)
.start();
}
对服务的多个方法调用进行同步和异步操作
use girolle::prelude::Payload;
use girolle::{serde_json, Config, RpcClient, Value};
use std::time::Instant;
use std::{thread, time};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string())?;
let service_name = "video";
// Create the rpc call struct
let mut rpc_client = RpcClient::new(conf);
rpc_client.register_service(service_name).await?;
rpc_client.start().await?;
// Send the payload
let p = Payload::new().arg(30);
let new_result = rpc_client.send(service_name, "fibonacci", p)?;
let fib_result: u64 = serde_json::from_value(new_result.get_value())?;
// Print the result
println!("fibonacci :{:?}", fib_result);
assert_eq!(fib_result, 832040);
let sub_result = rpc_client.send(service_name, "sub", Payload::new().arg(10).arg(5))?;
assert_eq!(sub_result.get_value(), Value::Number(serde_json::Number::from(5)));
// Create a future result
let future_result = rpc_client.call_async(service_name, "hello", Payload::new().arg("Toto"))?;
// Send a message during the previous async process
let result = rpc_client.send(service_name, "hello", Payload::new().arg("Girolle"))?;
// Print the result
println!("{:?}", result.get_value());
assert_eq!(result.get_value(), Value::String("Hello, Girolle!".to_string()));
// wait for it
let tempo: time::Duration = time::Duration::from_secs(4);
thread::sleep(tempo);
println!("exit sleep");
// Print the result
let async_result = rpc_client.result(&future_result)?;
println!("{:?}", async_result.get_value());
assert_eq!(async_result.get_value(), Value::String("Hello, Toto!".to_string()));
println!("Time elapsed in for the previous call is: {:?}", async_result.get_elapsed_time()-tempo);
let start = Instant::now();
let mut consummers: Vec<_> = Vec::new();
for n in 1..1001 {
consummers.push((
n,
rpc_client.call_async(service_name, "hello", Payload::new().arg(n.to_string())),
));
}
// wait for it
thread::sleep(tempo);
for con in consummers {
let _async_result = rpc_client.result(&con.1?)?;
}
let duration = start.elapsed() - tempo;
println!("Time elapsed in expensive_function() is: {:?}", duration);
rpc_client.unregister_service("video")?;
rpc_client.close().await?;
Ok(())
}
堆栈
Girolle 使用 lapin 作为 AMQP 客户端/服务器库。
支持的功能
- 创建客户端
- 在 Rust 中创建一个代理服务以与其他服务交互
- 创建一个简单的服务
- 处理错误
- 编写测试
- 添加宏以简化服务的创建
- 添加基本宏
- 修复宏以处理
return
- 修复宏以处理递归函数
- 监听 pub/sub 队列
nameko-client
Girolle 客户端具有发送同步请求和异步请求的基本功能。我对它与 Nameko 交互的方式并不满意。我希望找到一个更优雅的方式,就像在 Nameko 中一样。但它确实可以工作,使用它并不痛苦。
nameko-rpc
RpcService
和宏过程式是库的核心。它不支持 proxy,我知道这是 Nameko 库最重要的功能之一。我将在未来尝试实现它。但我想我需要对 Rust 的非面向对象方面进行一些重构,这可能会使其更难。
nameko-pubsub
PubSub 服务尚未实现。我不知道我是否对此感兴趣。
nameko-web
网络服务尚未实现。我不确定是否会实现它。我需要重构客户端以使其完全线程安全。它应该与代理共享一个共同的主题。
限制
当前的代码已经在本仓库中的 nameko 和 girolle 示例中进行了测试。
nameko_test.py | simple_sender.rs | |
---|---|---|
nameko_service.py | x | x |
simple_macro | x | x |
基准测试
简单消息基准测试
nameko_test.py | simple_sender.rs | |
---|---|---|
nameko_service.py | 15.587 秒 | 11.532 秒 |
simple_macro.rs | 15.654 秒 | 8.078 秒 |
客户端基准测试
使用 hyperfine 测试客户端基准测试。
Girolle 客户端(带 Girolle 服务)
hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
Time (mean ± σ): 9.995 s ± 0.116 s [User: 0.163 s, System: 0.197 s]
Range (min … max): 9.778 s … 10.176 s 10 runs
Nameko 客户端(带 Girolle 服务)
hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
Time (mean ± σ): 15.654 s ± 0.257 s [User: 1.455 s, System: 0.407 s]
Range (min … max): 15.202 s … 15.939 s 10 runs
服务基准测试
Girolle 服务(带 Girolle 客户端)
hyperfine -N './target/release/examples/simple_sender'
Benchmark 1: ./target/release/examples/simple_sender
Time (mean ± σ): 9.995 s ± 0.116 s [User: 0.163 s, System: 0.197 s]
Range (min … max): 9.778 s … 10.176 s 10 runs
Nameko 服务运行 python 3.9.15(带 Girolle 客户端)
hyperfine -N --warmup 3 'target/release/examples/simple_sender'
Benchmark 1: target/release/examples/simple_sender
Time (mean ± σ): 11.532 s ± 0.091 s [User: 0.199 s, System: 0.213 s]
Range (min … max): 11.396 s … 11.670 s 10 runs
Nameko 服务运行 python 3.9.15(带 Nameko 客户端)
hyperfine -N --warmup 3 'python nameko_test.py'
Benchmark 1: python nameko_test.py
Time (mean ± σ): 15.587 s ± 0.325 s [User: 1.443 s, System: 0.420 s]
Range (min … max): 15.181 s … 16.034 s 10 runs
斐波那契基准测试
基准测试使用一个 静态的随机整数集合 来计算斐波那契数。
nameko_fib_payload.py | |
---|---|
nameko_service.py | 3 分 58.11 秒 |
simple_macro.rs | 6.99 秒 |
宏开销基准测试
此基准测试是为了测试宏的开销。
依赖关系
~0.7–1.6MB
~35K SLoC