#semaphore #distributed-systems #throttle #http #client #peer #server

throttle-client

Throttle 客户端。Throttle 是一个 http 信号量服务,为分布式系统提供信号量。

15 个版本

0.5.3 2024 年 4 月 21 日
0.4.5 2023 年 12 月 8 日
0.4.3 2023 年 3 月 25 日
0.4.0 2022 年 6 月 8 日
0.3.12 2020 年 6 月 3 日

并发 中排名第 388

MIT 许可证

22KB
172

Throttle

分布式系统的信号量。

动机

Throttle 通过 http 接口提供信号量服务。正如其名称所示,其主要用例是为系统访问资源提供节流,即让该系统的元素先请求许可(即获取租约)。如果系统由运行在不同机器上的多个进程或同一网络中的虚拟机组成,则 Throttle 可能适合该用途。

Throttle 致力于易于操作,边缘情况表现良好,且无需持久化后端。

特性

  • 服务器在 Windows、Linux 和 OS-X 上构建和运行。
  • 客户端
    • Python:具有阻塞 API 的高级客户端。
    • Rust:具有异步 API 的低级客户端。
  • 通过强制锁定层次结构防止死锁。
  • 公平性(等待时间较长的对等方具有优先级)
  • 具有大计数的锁定,不会被许多具有小计数的其他锁定饿死。
  • 对网络中断具有弹性。
    • 锁定将过期,以防止由于网络错误或客户端崩溃而导致的信号量计数泄漏。
    • 可以通过发送到服务器的心跳无限期地延长锁定。
  • 可观察性
    • Prometheus 指标
    • 将日志记录到 stderr
  • 无需持久化后端。
    • 服务器在内存中保留状态。
    • 在服务器重新启动的情况下,客户端将状态恢复到服务器。

安装

服务器

使用 Docker 运行

Throttle 服务器也作为小型容器镜像发布到 Docker Hub。

docker pull pacman82/throttle

假设您在当前工作目录中有一个 throttle.toml 配置文件,您可以使用以下命令运行服务器:

docker run --rm -v ${PWD}:/cfg -p 8000:8000 pacman82/throttle -c cfg/throttle.toml

下载二进制发布版

对于 Windows、OS-X 和 Linux,有可预构建的二进制文件与 GitHub 发布版一起提供。您可以在 https://github.com/pacman82/throttle/releases/latest 找到最新发布版。

使用 Cargo 安装

服务器二进制文件发布到 crates.io,因此可以通过 cargo 安装。

cargo install throttle-server

Python 客户端

Python 客户端已发布到 PyPi,并可以使用pip进行安装。

pip install throttle-client

使用方法

操作 Throttle 服务器

启动和关闭

假设 throttle 可执行文件已在您的路径环境变量中,您可以通过执行它来启动 Throttle 服务器。您可以使用 --help 标志显示可用的命令行选项。不带任何参数启动它将使用默认配置启动服务器。

throttle

这将在当前进程中启动服务器。使用浏览器导航到 localhost:8000 以查看欢迎信息。您可以通过按下 Ctrl + C 来优雅地关闭 Throttle。

将日志记录到 stderr

THROTTLE_LOG 环境变量设置为在标准错误上查看更多输出。有效值是 ERRORWARNINFODEBUGTRACE

在 bash 中

THROTTLE_LOG=WARN

或 PowerShell

$env:THROTTLE_LOG="INFO"

现在启动服务器将提供更多信息。

[2020-04-12T18:56:23Z INFO  throttle] Hello From Throttle
[2020-04-12T18:56:23Z WARN  throttle] No semaphores configured.

Toml 配置文件

为了实际上传信号量,我们需要配置它们的名称和完整计数。默认情况下,如果存在,Throttle 将在当前工作目录中查找名为 throttle.toml 的配置文件。

# Sample throttle.cfg Explaining the options

[semaphores]
# Specify name and full count of semaphores. Below line creates a semaphore named A with a full
# count of 42. Setting the count to 1 would create a Mutex.
A = 42

## Optional logging config, to log to stderr. Can be overwritten using the `THROTTLE_LOG`
## environment variable.
# [logging.stderr]
# Set this to either ERROR, WARN, INFO, DEBUG or TRACE.
# level = "INFO"

指标

Throttle 通过 /metrics 端点支持 Prometheus 指标。根据您的配置和状态,它们可能看起来像这样

# HELP throttle_acquired Sum of all acquired locks.
# TYPE throttle_acquired gauge
throttle_acquired{semaphore="A"} 0
# HELP throttle_longest_pending_sec Time the longest pending peer is waiting until now, to acquire a lock to a semaphore.
# TYPE throttle_longest_pending_sec gauge
throttle_longest_pending_sec{semaphore="A"} 0
# HELP throttle_max Maximum allowed lock count for this semaphore.
# TYPE throttle_max gauge
throttle_max{semaphore="A"} 42
# HELP throttle_num_404 Number of Get requests to unknown resource.
# TYPE throttle_num_404 counter
throttle_num_404 0
# HELP throttle_pending Sum of all pending locks
# TYPE throttle_pending gauge
throttle_pending{semaphore="A"} 0

Python 客户端

Throttle 随附 Python 客户端。以下是如何简要使用它的说明。

from throttle_client import Peer, lock

# Configure endpoint to throttle server
url = "https://127.0.0.1:8000"

# Acquire a lock (with count 1) to semaphore A
with lock(url, "A"):
    # Do stuff while holding lock to "A"
    # ...

# For acquiring lock count != 1 the count can be explicitly specified.
with lock(url, "A", count=4):
    # Do stuff while holding lock to "A"
    # ...

# A is released at the end of with block

使用锁层次结构防止死锁

假设有两个信号量 AB

[semaphores]
A = 1
B = 1

您想以嵌套的方式获取对它们的锁定

from throttle_client import Peer, lock

# Configure endpoint to throttle server
url = "https://127.0.0.1:8000"

# Acquire a lock to semaphore A
with lock(url, "A"):
    # Do stuff while holding lock to "A"
    # ...
    with lock(url, "B") # <-- This throws an exception: "Lock Hierarchy Violation".
      # ...

Throttle 服务器可以帮助您防止死锁。如果 AB 不总是以相同的顺序锁定,那么您的系统可能在某个时刻发生死锁。此类错误很难调试,这就是为什么 Throttle 在任何可能发生死锁的情况下都会尽早失败。要启用上述用例,请给 A 分配比 B 更高的锁级别。

[semaphores]
A = { max=1, level=1 }
# Level 0 is default. So the short notation is still good for B.
B = 1

Http 路由

  • GET /:打印问候消息
  • GET /health:始终回答 200 OK
  • GET /metrics: Prometheus 指标
  • GET /version:返回服务器版本。

管理对等方和锁的路由

  • POST new_peer:创建新的对等方。此请求的正文必须包含一个带有维度的可读时间持续性的引号。例如:"expires_in": "5m""expires_in": "30s""expires_in": "12h"。这是对等方在延长其过期时间之前将过期的持续时间。每个获取的锁始终与一个对等方相关联。如果对等方过期,则释放所有锁。该请求返回一个随机整数作为对等方 ID。

  • DELETE /peers/{id}:删除对等节点,在此过程中释放其所有锁。每次调用 new_peer 都应与对这条路由的调用相匹配,这样其他节点就不必等待该节点到期才能获取到同一信号量的锁。

  • PUT /peers/{id}/{semaphore}:为现有节点获取信号量的锁。请求体必须包含所需的锁计数。如果锁可以被获取,则 Throttle 将以 200 Ok 响应;如果其他节点需要释放其锁才能获取锁,则返回 202 Accepted。指定比锁消息完整计数更高的锁计数或违反锁层次结构将导致 409 冲突 错误。请求未知信号量或未知节点的锁将导致 400 错误请求。此请求是幂等的,因此可以重复获取锁,以防超时,而不会耗尽信号量。如果客户端等待锁,可以使用可选的 block_for 查询参数避免忙等待。例如,/peer/{id}/{semaphore}?block_for=10s。获取计数为 0 的锁的语义可能很尴尬,因此目前禁止。

  • DELETE /peers/{id}/{semaphore}:释放节点的一个特定锁。

  • POST /restore:客户端可以使用它来响应包含 Unknown Semaphore400 错误请求。此错误表示服务器不记得客户端的状态(例如,客户端可能因长时间连接丢失而过期)。在这种情况下,客户端可以选择将以前的状态和获取的锁恢复到服务器。请求体包含如下 JSON 格式的数据

    {
      "expires_in": "5m",
      "peer_id": 42,
      "acquired": {
        "A": 3,
        "B": 1
      }
    }
    

    这将恢复 ID 为 42 且有效期为 5 分钟的客户端。它有一个针对 A 的锁,计数为 3,还有一个针对 B 的锁,计数为 1。

  • Get /remainder?semaphore={semaphore}:回答最大信号量计数减去此信号量所有已获取锁的总和。响应是纯文本整数。

  • Get /peers/{id}/is_acquired:如果节点有一个挂起的锁,则回答 false。如果节点的所有锁都已获取,则答案为 true

依赖关系

~4–17MB
~213K SLoC