26 个版本 (9 个破坏性更新)

0.11.0 2024 年 5 月 8 日
0.9.0 2024 年 2 月 20 日
0.8.0 2024 年 1 月 18 日
0.7.2 2023 年 12 月 13 日
0.1.7 2022 年 10 月 4 日

#80HTTP 服务器

Download history 268/week @ 2024-05-03 172/week @ 2024-05-10 136/week @ 2024-05-17 262/week @ 2024-05-24 234/week @ 2024-05-31 239/week @ 2024-06-07 155/week @ 2024-06-14 109/week @ 2024-06-21 123/week @ 2024-06-28 36/week @ 2024-07-05 26/week @ 2024-07-12 32/week @ 2024-07-19 71/week @ 2024-07-26 61/week @ 2024-08-02 74/week @ 2024-08-09 77/week @ 2024-08-16

每月 296 次下载

BSD-2-Clause

93KB
2K SLoC

SHORS

Latest Version Docs badge

Shors - 用于创建使用 tarantool-module 构建的分布式系统传输层的库。Shors 包含四个组件

HTTP

创建 HTTP 路由

使用 route::Builder 创建 HTTP 路由。创建路由后,只需将其注册到 HTTP 服务器

示例

use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;

fn make_http_endpoint() {
  let endpoint = Builder::new()
          .with_method("GET")
          .with_path("/concat/a/:a/b/:b")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let a = request
                      .stash
                      .get("a")
                      .map(|s| s.as_str())
                      .unwrap_or_default();
              let b = request
                      .stash
                      .get("b")
                      .map(|s| s.as_str())
                      .unwrap_or_default();

              Ok(a.to_string() + b)
            },
          );

  let s = server::Server::new();
  s.register(Box::new(endpoint));
}

更复杂的示例(包含组、错误处理、自定义和内置中间件)

use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;

static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());

pub fn make_http_endpoints() {
  let route_group = Builder::new()
          .with_path("/v1")
          .with_middleware(|route| {
            println!("got new http request!");
            route
          })
          .with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
          .group();

  #[derive(serde::Deserialize)]
  struct EchoRequest {
    text: String,
  }

  let echo = route_group
          .builder()
          .with_method("POST")
          .with_path("/echo")
          .build(
            |_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
              let req: EchoRequest = request.parse()?;
              Ok(req.text)
            },
          );

  let ping = route_group
          .builder()
          .with_method("GET")
          .with_path("/ping")
          .build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });

  let s = server::Server::new();
  s.register(Box::new(echo));
  s.register(Box::new(ping));
}

添加 OpenAPI 文档

首先,将 shors = { ..., features = ["open-api"]} 添加到您的项目的 Cargo.toml 中。在路由构建器上使用 .define_open_api 方法并定义 OpenAPI 操作。使用 utoipa crate 在下划线 shors 上创建 OpenAPI 模式。为了方便用户,此 crate 作为 shors::utoipa 重新导出。

!重要:如果您使用来自 shors::utoipa 的 derive 宏,请将此行添加到 .rs 文件中

use shors::utoipa as utoipa;

以正确工作 derive 宏。

要访问生成的OpenAPI文档,请使用shors::transport::http::openapi::with_open_api函数。请参考测试应用程序中的路由示例以熟悉用法。

有关Swagger的用法,请参阅shors::transport::http::openapi::swagger_ui_route函数。

RPC

准备

Rpc传输需要导出存储过程 - rpc_handler。

创建存储过程。示例(其中RPC_SERVER是服务器实例)

#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
    RPC_SERVER.with(|srv| srv.handle(ctx, args))
}

并从卡式角色导出它。示例

    box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
    rawset(_G, 'rpc_handler', function(path, ctx, mp_request)
      return box.func['mylib.rpc_handler']:call({ path, ctx, mp_request })
    end)

创建rpc路由

与rpc路由一起工作与http相同:使用route::Builder创建rpc路由。路由创建后,将其注册到rpc 服务器

复杂示例

use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};

thread_local! {
    pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}

#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
  let routes = rpc::route::Builder::new()
          .with_error_handler(|ctx, err| {
            shors_error!(ctx: ctx, "rpc error {}", err);
          })
          .with_middleware(|route| {
            println!("got new rpc request!");
            route
          })
          .with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
          .group();
  
  let sum_route = routes.builder().with_path("/sum").build(
    |_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
      let numbers = req.parse::<Vec<u64>>()?;
      Ok(numbers.into_iter().sum::<u64>())
    },
  );

  RPC_SERVER.with(|srv| {
    srv.register(Box::new(sum_route));
  });

  Ok(())
}

RPC客户端

有一个特殊的组件用于与远程rpc端点交互。目前,客户端可以通过四种模式调用rpc端点

  • 通过bucket_id(vshard)
  • 通过bucket_id(vshard)异步(不等待响应)
  • 通过replicaset id(调用当前主节点)
  • 通过卡式角色(调用当前主节点)
  • 通过卡式角色和uri(调用uri对应的实例,该实例可能不是当前主节点)

准备

RPC客户端需要注册一些lua代码,无论是注册在luaopen_函数中还是初始化函数中。请注意,当相关库正确加载时,会调用luaopen-function,例如从init.lua文件或初始化函数中故意初始化RPC客户端,如下例所示

示例

直接从init方法初始化rpc-client

#[proc]
fn init_rpc_client() {
  init_logger();
  let lua = unsafe { tlua::StaticLua::from_static(tarantool::lua_state().as_lua()) };
  shors::init_lua_functions(&lua)}

定义luaopen-function

#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
    let lua = tlua::StaticLua::from_static(l);
    shors::init_lua_functions(&lua).unwrap();
    1
}

按bucket_id调用rpc端点

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .shard_endpoint("/add")
        .call(&mut Context::background(), bucket_id, &params)?;

异步按bucket_id调用rpc端点

    let lua = tarantool::lua_state();

    rpc::client::Builder::new(&lua)
        .async_shard_endpoint("/ping")
        .call(&mut Context::background(), bucket_id, &())?;

按replicaset uuid调用rpc端点

    let lua = tarantool::lua_state();

    let params = vec![2, 3, 4];
    let resp = rpc::client::Builder::new(&lua)
        .replicaset_endpoint("/add")
        .prefer_replica()
        .call(&mut Context::background(), rs_uuid, &params)?;

按卡式角色调用rpc端点

注意:通过角色调用rpc端点需要注册导出的rpc_handler存储过程作为导出角色的方法。例如

return {
    role_name = 'app.roles.stub',
    ...
    rpc_handler = function(path, ctx, mp_request)
      return box.func['libstub.rpc_handle']:call({ path, ctx, mp_request })
    end,
}

调用示例

    let lua = tarantool::lua_state();

    let resp = rpc::client::Builder::new(&lua)
      .role_endpoint("app.roles.stub", "/ping")
      .call(&mut Context::background(), &())?;

按卡式角色调用rpc端点

注意:取决于rpc_handler(详细信息请参阅按卡式角色调用rpc端点项)

    let lua = tarantool::lua_state();

    let resp = rpc::client::Builder::new(&lua)
      .role_endpoint("app.roles.stub", "/ping")
      .with_uri("localhost:3031")
      .call(&mut Context::background(), &())?;

内置中间件

  • http服务器
    • debug - 在调试日志中打印调试信息
    • otel - opentelemetry追踪
    • otel_conditional - opentelemetry追踪,如果未设置http-header with-trace则禁用
    • access_logs - nginx样式的访问日志
  • rpc服务器
    • debug - 在调试日志中打印调试信息
    • otel - opentelemetry追踪
    • record_latency - 将路由延迟记录为prometheus度量
  • rpc 客户端
    • otel - opentelemetry追踪
    • retry - 在服务器端错误上重试调用
    • record_latency - 将调用延迟记录为prometheus度量

测试

单元测试

  make unit-test

集成测试

  (cd tests/testapplication/ && cartridge build)
  make int-test

请求管道(适用于0.1.x版本)

!注意以下内容不适用于shors v 0.2.0+

Shors使用vshard/cartridge API底层进行远程请求。卡式和vshard API都是LUA API。因此,查看Shors rpc请求的管道

客户端

  • Rust端
    1. sender创建rpc::client::Client的实例
    2. sender使用rpc::client::Client::call方法,该方法使用表示请求数据负载的rust结构
    3. rust结构序列化为LUA表(使用tarantool::tlua - Push trait)
  • LUA端
    1. 使用从上一步中得到的LUA表调用vshard或cartridge方法
    2. vshard或cartridge API调用iproto
    3. iproto 将 LUA 表格序列化为 msgpack 并发送到服务器端

服务器端

  • LUA端
    1. 从 iproto 接收消息,消息表示为 tarantool 元组
    2. 使用内部路由表调用 rust 中的函数处理器
  • Rust端
    1. 在 rpc::Request 上调用 .decode 方法以恢复请求

因此,序列化/反序列化方案看起来是这样的: rust 结构 -> lua 表格 -> msgpack 表示 -> rust 结构

这种方案中存在问题:如果我们使用 enum 在初始 rust 结构的字段中,会发生什么?例如

  #[derive(Debug, Deserialize, Serialize, tlua::Push)]
enum Value {
  String(String),
  Code(String),
}
#[derive(Debug, Deserialize, Serialize, tlua::Push)]
struct Foo {
  bar: Value,
}
let example = Foo{ bar: Value::Code("abc".to_string()) };

tarantool::tlua 将此结构序列化为 lua 表格如下

{
  bar = "abc"
}

所以如您所见,使用哪个 enum 变体的信息丢失了。将来,当我们将此 LUA 表格序列化为 msgpack 然后反序列化为 rust 结构时,我们将得到一个 serde 错误。Serde 预期在反序列化时需要一些类型信息,但没有。在下面的例子中,如果 LUA 表格看起来像

{
  bar = {"Code" = "abc"}
}

反序列化将成功且不会发生错误。

我们如何解决这个问题?目前最通用的方法是实现 tlua::Push trait 为 enum。对于此示例(注意,这是一个示例实现,不要在生产代码中创建 hashmap)

impl<L> tlua::Push<L> for Value
  where
          L: tlua::AsLua,
{
  type Err = TuplePushError<tlua::Void, tlua::Void>;

  fn push_to_lua(&self, lua: L) -> Result<PushGuard<L>, (Self::Err, L)> {
    let mut hm = HashMap::<String, String>::new();
    match self {
      Value::String(s) => hm.insert("String".to_string(), s.to_string()),
      Value::Code(s) => hm.insert("Code".to_string(), s.to_string()),
    };
    hm.push_to_lua(lua)
  }
}

impl<L> tlua::PushOne<L> for Value where L: tlua::AsLua {}

依赖项

~15–26MB
~393K SLoC