26 个版本 (9 个破坏性更新)
0.11.0 | 2024 年 5 月 8 日 |
---|---|
0.9.0 | 2024 年 2 月 20 日 |
0.8.0 | 2024 年 1 月 18 日 |
0.7.2 | 2023 年 12 月 13 日 |
0.1.7 | 2022 年 10 月 4 日 |
#80 在 HTTP 服务器
每月 296 次下载
93KB
2K SLoC
SHORS
Shors - 用于创建使用 tarantool-module 构建的分布式系统传输层的库。Shors 包含四个组件
HTTP
创建 HTTP 路由
使用 route::Builder 创建 HTTP 路由。创建路由后,只需将其注册到 HTTP 服务器。
示例
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;
fn make_http_endpoint() {
let endpoint = Builder::new()
.with_method("GET")
.with_path("/concat/a/:a/b/:b")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let a = request
.stash
.get("a")
.map(|s| s.as_str())
.unwrap_or_default();
let b = request
.stash
.get("b")
.map(|s| s.as_str())
.unwrap_or_default();
Ok(a.to_string() + b)
},
);
let s = server::Server::new();
s.register(Box::new(endpoint));
}
更复杂的示例(包含组、错误处理、自定义和内置中间件)
use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;
static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());
pub fn make_http_endpoints() {
let route_group = Builder::new()
.with_path("/v1")
.with_middleware(|route| {
println!("got new http request!");
route
})
.with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
.group();
#[derive(serde::Deserialize)]
struct EchoRequest {
text: String,
}
let echo = route_group
.builder()
.with_method("POST")
.with_path("/echo")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let req: EchoRequest = request.parse()?;
Ok(req.text)
},
);
let ping = route_group
.builder()
.with_method("GET")
.with_path("/ping")
.build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });
let s = server::Server::new();
s.register(Box::new(echo));
s.register(Box::new(ping));
}
添加 OpenAPI 文档
首先,将 shors = { ..., features = ["open-api"]}
添加到您的项目的 Cargo.toml 中。在路由构建器上使用 .define_open_api
方法并定义 OpenAPI 操作。使用 utoipa crate 在下划线 shors 上创建 OpenAPI 模式。为了方便用户,此 crate 作为 shors::utoipa
重新导出。
!重要:如果您使用来自 shors::utoipa
的 derive 宏,请将此行添加到 .rs 文件中
use shors::utoipa as utoipa;
以正确工作 derive 宏。
要访问生成的OpenAPI文档,请使用shors::transport::http::openapi::with_open_api
函数。请参考测试应用程序中的路由示例以熟悉用法。
有关Swagger的用法,请参阅shors::transport::http::openapi::swagger_ui_route
函数。
RPC
准备
Rpc传输需要导出存储过程 - rpc_handler。
创建存储过程。示例(其中RPC_SERVER是服务器实例)
#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
RPC_SERVER.with(|srv| srv.handle(ctx, args))
}
并从卡式角色导出它。示例
box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
rawset(_G, 'rpc_handler', function(path, ctx, mp_request)
return box.func['mylib.rpc_handler']:call({ path, ctx, mp_request })
end)
创建rpc路由
与rpc路由一起工作与http相同:使用route::Builder创建rpc路由。路由创建后,将其注册到rpc 服务器。
复杂示例
use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};
thread_local! {
pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}
#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
let routes = rpc::route::Builder::new()
.with_error_handler(|ctx, err| {
shors_error!(ctx: ctx, "rpc error {}", err);
})
.with_middleware(|route| {
println!("got new rpc request!");
route
})
.with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
.group();
let sum_route = routes.builder().with_path("/sum").build(
|_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
let numbers = req.parse::<Vec<u64>>()?;
Ok(numbers.into_iter().sum::<u64>())
},
);
RPC_SERVER.with(|srv| {
srv.register(Box::new(sum_route));
});
Ok(())
}
RPC客户端
有一个特殊的组件用于与远程rpc端点交互。目前,客户端可以通过四种模式调用rpc端点
- 通过bucket_id(vshard)
- 通过bucket_id(vshard)异步(不等待响应)
- 通过replicaset id(调用当前主节点)
- 通过卡式角色(调用当前主节点)
- 通过卡式角色和uri(调用uri对应的实例,该实例可能不是当前主节点)
准备
RPC客户端需要注册一些lua代码,无论是注册在luaopen_
函数中还是初始化函数中。请注意,当相关库正确加载时,会调用luaopen-function,例如从init.lua
文件或初始化函数中故意初始化RPC客户端,如下例所示
示例
直接从init方法初始化rpc-client
#[proc]
fn init_rpc_client() {
init_logger();
let lua = unsafe { tlua::StaticLua::from_static(tarantool::lua_state().as_lua()) };
shors::init_lua_functions(&lua)}
定义luaopen-function
#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
let lua = tlua::StaticLua::from_static(l);
shors::init_lua_functions(&lua).unwrap();
1
}
按bucket_id调用rpc端点
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.shard_endpoint("/add")
.call(&mut Context::background(), bucket_id, ¶ms)?;
异步按bucket_id调用rpc端点
let lua = tarantool::lua_state();
rpc::client::Builder::new(&lua)
.async_shard_endpoint("/ping")
.call(&mut Context::background(), bucket_id, &())?;
按replicaset uuid调用rpc端点
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.replicaset_endpoint("/add")
.prefer_replica()
.call(&mut Context::background(), rs_uuid, ¶ms)?;
按卡式角色调用rpc端点
注意:通过角色调用rpc端点需要注册导出的rpc_handler存储过程作为导出角色的方法。例如
return {
role_name = 'app.roles.stub',
...
rpc_handler = function(path, ctx, mp_request)
return box.func['libstub.rpc_handle']:call({ path, ctx, mp_request })
end,
}
调用示例
let lua = tarantool::lua_state();
let resp = rpc::client::Builder::new(&lua)
.role_endpoint("app.roles.stub", "/ping")
.call(&mut Context::background(), &())?;
按卡式角色调用rpc端点
注意:取决于rpc_handler(详细信息请参阅按卡式角色调用rpc端点项)
let lua = tarantool::lua_state();
let resp = rpc::client::Builder::new(&lua)
.role_endpoint("app.roles.stub", "/ping")
.with_uri("localhost:3031")
.call(&mut Context::background(), &())?;
内置中间件
- http服务器
- debug - 在调试日志中打印调试信息
- otel - opentelemetry追踪
- otel_conditional - opentelemetry追踪,如果未设置http-header
with-trace
则禁用 - access_logs - nginx样式的访问日志
- rpc服务器
- debug - 在调试日志中打印调试信息
- otel - opentelemetry追踪
- record_latency - 将路由延迟记录为prometheus度量
- rpc 客户端
- otel - opentelemetry追踪
- retry - 在服务器端错误上重试调用
- record_latency - 将调用延迟记录为prometheus度量
测试
单元测试
make unit-test
集成测试
(cd tests/testapplication/ && cartridge build)
make int-test
请求管道(适用于0.1.x版本)
!注意以下内容不适用于shors v 0.2.0+
Shors使用vshard/cartridge API底层进行远程请求。卡式和vshard API都是LUA API。因此,查看Shors rpc请求的管道
客户端
- Rust端
- sender创建rpc::client::Client的实例
- sender使用rpc::client::Client::call方法,该方法使用表示请求数据负载的rust结构
- rust结构序列化为LUA表(使用tarantool::tlua - Push trait)
- LUA端
- 使用从上一步中得到的LUA表调用vshard或cartridge方法
- vshard或cartridge API调用iproto
- iproto 将 LUA 表格序列化为 msgpack 并发送到服务器端
服务器端
- LUA端
- 从 iproto 接收消息,消息表示为 tarantool 元组
- 使用内部路由表调用 rust 中的函数处理器
- Rust端
- 在 rpc::Request 上调用 .decode 方法以恢复请求
因此,序列化/反序列化方案看起来是这样的: rust 结构 -> lua 表格 -> msgpack 表示 -> rust 结构
这种方案中存在问题:如果我们使用 enum 在初始 rust 结构的字段中,会发生什么?例如
#[derive(Debug, Deserialize, Serialize, tlua::Push)]
enum Value {
String(String),
Code(String),
}
#[derive(Debug, Deserialize, Serialize, tlua::Push)]
struct Foo {
bar: Value,
}
let example = Foo{ bar: Value::Code("abc".to_string()) };
tarantool::tlua 将此结构序列化为 lua 表格如下
{
bar = "abc"
}
所以如您所见,使用哪个 enum 变体的信息丢失了。将来,当我们将此 LUA 表格序列化为 msgpack 然后反序列化为 rust 结构时,我们将得到一个 serde 错误。Serde 预期在反序列化时需要一些类型信息,但没有。在下面的例子中,如果 LUA 表格看起来像
{
bar = {"Code" = "abc"}
}
反序列化将成功且不会发生错误。
我们如何解决这个问题?目前最通用的方法是实现 tlua::Push trait 为 enum。对于此示例(注意,这是一个示例实现,不要在生产代码中创建 hashmap)
impl<L> tlua::Push<L> for Value
where
L: tlua::AsLua,
{
type Err = TuplePushError<tlua::Void, tlua::Void>;
fn push_to_lua(&self, lua: L) -> Result<PushGuard<L>, (Self::Err, L)> {
let mut hm = HashMap::<String, String>::new();
match self {
Value::String(s) => hm.insert("String".to_string(), s.to_string()),
Value::Code(s) => hm.insert("Code".to_string(), s.to_string()),
};
hm.push_to_lua(lua)
}
}
impl<L> tlua::PushOne<L> for Value where L: tlua::AsLua {}
依赖项
~15–26MB
~393K SLoC