#api #valve #plumber #restful #connection #web-api #axum

bin+lib valve-rs

使用 Tokio、Axum 和 Deadpool 自动扩展 R {plumber} API 并发

4 个版本

0.1.3 2023 年 8 月 6 日
0.1.2 2023 年 8 月 6 日
0.1.1 2023 年 8 月 6 日
0.1.0 2023 年 7 月 9 日

#757Web 编程

BSD-3-Clause

28KB
318

Valve

自动重定向你的管道。

valve 创建由 Rust 的 Plumber API 驱动的多线程 API,使用 Rust 的 tokioaxum 网络框架。使用 deadpool 自动创建、池化和终止 Plumber 连接。应用程序连接自动由 hyper 池化。

Valve 是一个 Rust CLI,附带一个 R 包。从 R 会话中运行 Valve 将会阻塞会话。如果你感到舒适,建议安装 CLI,以便可以从终端运行 Valve,从而在 R 会话中调用 plumber API。

动机

Plumber 是一个 R 包,它从 R 函数创建 RESTful API。它有限制,因为每个 API 都是一个单独的 R 进程,因此是一个单独的线程。多个查询按照它们到达的顺序执行。扩展 plumber API 并不容易。valve 的目的是能够简化 plumber API 的扩展,从而简化 R 本身。我们可以通过利用 Rust 的 “无畏并发” 来使 R 更加出色。

安装

CLI 指令

要仅安装可执行文件,请运行

cargo install valve-rs --no-default-features

R 包指令

对于不熟悉 Rust 或 CLI 工具的人来说,有一个 R 包可以简化 Valve 的使用。它作为 Windows、Linux 和 MacOSX 的二进制文件提供,多亏了 R-universe

install.packages("valve", repos = c("https://josiahparry.r-universe.dev", "https://cloud.r-project.org"))

当构建 R 包时,它还包括在 inst/valve 中的二进制可执行文件。所以如果你需要可执行文件,system.file("valve", package = "valve") 将直接指向它!这将始终是 R 包使用的可执行文件的版本。

您可以通过运行以下命令来验证二进制文件是否适用于您的机器。如果您使用的是Windows机器,请包括以下命令以执行可执行文件:system.file("valve.exe", package = "valve")

# get executable path and included api paths
valve_executable <- system.file("valve", package = "valve")
plumber_api_path <- system.file("plumber.R", package = "valve")

# check that they exist
file.exists(c(valve_executable, plumber_api_path))

# run Valve from the R-package's executable
processx::run(
  valve_executable,
  args = c("-f", plumber_api_path),
  echo = TRUE
)

创建Valve应用程序

要使用R包并发运行plumber API,请使用valve_run()。最重要的参数是filepath,它确定哪个Plumber API将被执行,同时指定hostport以确定应用程序运行的位置。还可以使用n_maxworkerscheck_unusedmax_age参数来指定应用程序如何进行扩展。默认情况下,应用程序将在主机127.0.0.1上运行,端口为3000

library(valve)
# get included plumber API path
plumber_api_path <- system.file("plumber.R", package = "valve")

valve_run(plumber_api_path, n_max = 5)
#> Docs hosted at <http://127.0.0.1:3000/__docs__/>

使用cli

valve -f plumber.R -n 5 

理解参数

您提供的参数决定了Valve如何根据请求的进入来扩展和缩小应用程序。

  • host (--host)
    • 默认为127.0.0.1。定义Axum应用程序和plumber API托管的主机。
  • port (--port)
    • 默认为3000。定义主Axum应用程序监听哪个端口。
  • file (--file)
    • 默认为plumber.R。定义plumber API的R脚本的路径。
  • workers (--workers)
    • 默认3。确定在Tokio Runtime中设置了多少个工作线程。这些工作线程处理传入的请求并返回响应。
  • n_max (--n-max)
    • 默认3。指的是可以启动的最大后台Plumber API的数量,而workers指定有多少个主工作线程可用于处理传入的请求。通常,workers的数量应该与plumber API的数量相等,因为plumber是单线程的。这是默认值。如果workers小于n_max,则永远不会启动最大数量的API。
  • check_unused (--check-unused)
    • 默认10。检查未使用连接的时间间隔,单位为秒。
  • max_age (--max-age)
    • 默认300(五分钟)。指定连接在未经使用的情况下可以持续多长时间而不会被终止。如果一个连接达到这个年龄,它将在下一个池检查(由check_unused确定的时间间隔)时被终止。
  • n_min:
    • 默认 1。始终保持活跃的最小连接数。如果打开这么多连接,则不会修剪连接。

示例:使用多个工作进程调用阀门

阀门的工作方式是接受主端口(默认为3000)上的请求,然后将这些请求循环分配给在随机端口上生成的管道API。请求被 axum 捕获并代理到管道API进程。

您可以使用以下代码块在R中将Valve附带示例管道API在后台运行:

# create temp file
tmp <- tempfile(fileext = ".R")

# create script lines
valve_script <- '
plumber_api_path <- system.file("plumber.R", package = "valve")
valve::valve_run(plumber_api_path, workers = 5)
'
# write to temp
writeLines(valve_script, tmp)

# run in the background
rstudioapi::jobRunScript(tmp)

或直接从终端启动:

valve -f $(Rscript -e 'cat(system.file("plumber.R", package = "valve"))')

一旦Valve应用程序在后台运行,我们就可以开始示例。首先,我将定义一个函数来调用 /sleep 端点。该函数将接受两个参数:端口号和睡眠持续时间。端口号将用于在Valve应用程序和单个管道API之间切换。

sleep <- function(port, secs) {
  httr2::request(
        paste0("127.0.0.1:", port, "/sleep?zzz=", secs)
    ) |> 
    httr2::req_perform() |> 
    httr2::resp_body_string()
}

使用此函数,我们将创建5个总R会话,每个会话都会请求睡眠2秒。

library(furrr)
plan(multisession, workers = 21)

首先,我们将ping主阀门应用程序,该应用程序将分配请求。第一次运行可能较慢,因为没有管道API在池中。

start <- Sys.time()
multi_sleep <- future_map(1:5, ~ sleep(3000, 2))
multi_total <- Sys.time() - start

接下来,我们仅选择一个可用的管道API并对其进行查询。

start <- Sys.time()
single_sleep <- furrr::future_map(1:5, ~ sleep(53869, 2))
single_total <- Sys.time() - start

注意性能差异。

print(paste0("Multiple Plumber APIs: ", round(multi_total, 2)))
#> [1] "Multiple Plumber APIs: 2.63"
print(paste0("One Plumber API: ", round(single_total, 2)))
#> [1] "One Plumber API: 10.08"

在前者中,每个工作进程都在大约相同的时间内发出请求。在后者中,必须等待每个后续步骤完成才能进行下一个步骤。因此,我们已经有效地分配了工作负载。

阀门是如何工作的

从高层次上讲,此图捕捉了该架构。

该系统实际上有三个关键组件

  • 托科 Runtime,
  • 阿克斯姆 Router,
  • 和连接 Pool.

请求处理

tokio Runtime 允许Valve异步运行。它处理I/O、任务等。它也是 Axum 的后端。在Valve中,我们 定义了一个异步运行时,具有预定义的 workers 数量。这些工作进程处理传入的请求。

当接收到请求时,它随后被发送到阿克斯姆 Router。路由器接收传入的请求并将它们发送到适当的端点。

定义的路线是 //*key/ 是永久重定向到管道API文档。而 /*key 捕获其他所有请求。这些请求具有特殊处理程序,简而言之,充当阿克斯姆和管道API之间的反向代理。处理程序捕获请求并从 Pool 获取管道连接。 管道结构 包含API所在的宿主和端口号。然后解析请求并将其重定向到管道API。 捕获响应并将其作为响应返回给阿克斯姆路由器

连接池

阀门实现了一个自定义的 管理池,用于管道API。池由包含主机、端口和 子进程Plumber 结构体组成。

当Deadpool为池创建新的连接时,它因此创建一个新的管道API。这是通过使用 Command::new() 创建一个分离的子进程来完成的。生成一个随机端口,检查,然后分配给管道API。然后通过调用 Command 通过 R -e "plumber::plumb('{filepath}')$run(host = '{host}', port = {port})" 启动进程。这意味着 R 必须在路径上,如果有多个R安装,将使用路径上的任何一个。

为了防止管道API频繁启动,它们被保持活跃的时间由 max_age 定义。一个连接可以在这段时间内未被使用。如果超过了这个年龄而未被使用,Deadpool将修剪连接并终止进程。这个检查在 单独的线程 上进行,每隔 check_unused 秒发生一次。

使用drill进行的基准测试

简单的基准测试可以使用drill在 inst/bench-sleep-plumber.ymlbench-sleep-valve.yml 中找到。

基准测试调用 /sleep 端点,并使用5个并发线程以500ms的间隔睡眠100次。这本身就可以说明我们可以用valve多快地加快单个管道API的响应时间。

管道的基准测试

Time taken for tests      50.7 seconds
Total requests            100
Successful requests       100
Failed requests           0
Requests per second       1.97 [#/sec]
Median time per request   2540ms
Average time per request  2482ms
Sample standard deviation 272ms
99.0'th percentile        2556ms
99.5'th percentile        2556ms
99.9'th percentile        2556ms

阀门的基准测试

Time taken for tests      10.2 seconds
Total requests            100
Successful requests       100
Failed requests           0
Requests per second       9.78 [#/sec]
Median time per request   510ms
Average time per request  510ms
Sample standard deviation 2ms
99.0'th percentile        516ms
99.5'th percentile        518ms
99.9'th percentile        518ms

说到这里...

valve最适合轻到中等负载。每个后台管道API将持有自己的R对象的副本。所以如果你在为一个GB大小的机器学习模型提供服务,该模型将必须复制到每个线程中,这可能会迅速增加你的RAM。所以要明智!如果你在R会话中有大量对象,尝试减少杂乱并使其变得瘦。

依赖关系

~7–20MB
~232K SLoC