32 个版本
0.5.3 | 2024年4月21日 |
---|---|
0.4.5 | 2023年12月8日 |
0.4.3 | 2023年3月25日 |
0.4.0 | 2022年6月8日 |
0.1.8 | 2020年3月29日 |
#22 in 并发
1,753 每月下载量
84KB
1.5K SLoC
Throttle
分布式系统的信号量。
动机
Throttle 通过 http 接口提供信号量作为服务。正如其名所示,主要用例是限制系统对资源的访问,通过让该系统的元素先请求许可(即获取租约)。如果该系统由运行在不同机器上或同一网络中的虚拟机上运行的多个进程组成,则 Throttle 可能适合。
Throttle 旨在易于操作,在边缘情况中表现良好,并且无需持久后端即可工作。
特性
- 服务器可在 Windows、Linux 和 OS-X 上构建和运行。
- 客户端
- Python:具有阻塞 API 的高级客户端。
- Rust:具有异步 API 的低级客户端。
- 通过强制执行锁层次结构来防止死锁。
- 公平性(等待时间较长的对等体具有优先级)
- 计数较大的锁不会被计数较少的许多其他锁饿死。
- 对网络中断具有弹性。
- 锁会过期,以防止由于网络错误或客户端崩溃而导致信号量计数泄漏。
- 可以通过向服务器发送心跳无限期地延长锁。
- 可观察性
- Prometheus 指标
- 记录到 stderr
- 无需持久后端。
- 服务器在内存中保持状态。
- 客户端在服务器重新启动的情况下将状态恢复到服务器。
安装
服务器
使用 Docker 运行
Throttle 服务器也作为小型容器镜像发布到 Docker Hub。
docker pull pacman82/throttle
假设您在当前工作目录中有一个 throttle.toml
配置文件,您可以使用以下命令运行服务器:
docker run --rm -v ${PWD}:/cfg -p 8000:8000 pacman82/throttle -c cfg/throttle.toml
下载二进制版本
对于 Windows、OS-X 和 Linux,GitHub 发布中提供了预构建的二进制文件。您可以在 https://github.com/pacman82/throttle/releases/latest 找到最新版本。
使用 Cargo 安装
服务器二进制文件发布到 crates.io,因此可以通过 cargo 安装。
cargo install throttle-server
Python 客户端
Python 客户端发布到 PyPi,并可以使用 pip 安装。
pip install throttle-client
用法
操作 Throttle 服务器
启动和关闭
假设节流可执行文件位于您的路径环境变量中,您可以通过执行它来启动节流服务器。您可以使用 --help
标志显示可用的命令行选项。不带任何参数启动它将使用默认配置启动服务器。
throttle
这将启动当前进程中的服务器。使用浏览器导航到 localhost:8000
以查看欢迎消息。您可以通过按 Ctrl + C
来优雅地关闭 Throttle。
记录到 stderr
将 THROTTLE_LOG
环境变量设置为在标准错误上查看更多输出。有效值有 ERROR
、WARN
、INFO
、DEBUG
和 TRACE
。
在 bash 中
THROTTLE_LOG=WARN
或 PowerShell
$env:THROTTLE_LOG="INFO"
现在启动服务器将提供更多信息。
[2020-04-12T18:56:23Z INFO throttle] Hello From Throttle
[2020-04-12T18:56:23Z WARN throttle] No semaphores configured.
Toml 配置文件
为了实际提供信号量,我们需要配置它们的名字和完整计数。默认情况下,如果存在,Throttle 会在工作目录中查找 throttle.toml
文件作为配置。
# Sample throttle.cfg Explaining the options
[semaphores]
# Specify name and full count of semaphores. Below line creates a semaphore named A with a full
# count of 42. Setting the count to 1 would create a Mutex.
A = 42
## Optional logging config, to log to stderr. Can be overwritten using the `THROTTLE_LOG`
## environment variable.
# [logging.stderr]
# Set this to either ERROR, WARN, INFO, DEBUG or TRACE.
# level = "INFO"
度量
Throttle 支持通过 /metrics
端点提供 Prometheus 度量。根据您的配置和状态,它们可能看起来像这样
# HELP throttle_acquired Sum of all acquired locks.
# TYPE throttle_acquired gauge
throttle_acquired{semaphore="A"} 0
# HELP throttle_longest_pending_sec Time the longest pending peer is waiting until now, to acquire a lock to a semaphore.
# TYPE throttle_longest_pending_sec gauge
throttle_longest_pending_sec{semaphore="A"} 0
# HELP throttle_max Maximum allowed lock count for this semaphore.
# TYPE throttle_max gauge
throttle_max{semaphore="A"} 42
# HELP throttle_num_404 Number of Get requests to unknown resource.
# TYPE throttle_num_404 counter
throttle_num_404 0
# HELP throttle_pending Sum of all pending locks
# TYPE throttle_pending gauge
throttle_pending{semaphore="A"} 0
Python 客户端
Throttle 随附 Python 客户端。以下是它的简要用法。
from throttle_client import Peer, lock
# Configure endpoint to throttle server
url = "https://127.0.0.1:8000"
# Acquire a lock (with count 1) to semaphore A
with lock(url, "A"):
# Do stuff while holding lock to "A"
# ...
# For acquiring lock count != 1 the count can be explicitly specified.
with lock(url, "A", count=4):
# Do stuff while holding lock to "A"
# ...
# A is released at the end of with block
使用锁层次结构防止死锁
假设有两个信号量 A
和 B
。
[semaphores]
A = 1
B = 1
您希望以嵌套的方式获取它们的锁
from throttle_client import Peer, lock
# Configure endpoint to throttle server
url = "https://127.0.0.1:8000"
# Acquire a lock to semaphore A
with lock(url, "A"):
# Do stuff while holding lock to "A"
# ...
with lock(url, "B") # <-- This throws an exception: "Lock Hierarchy Violation".
# ...
节流服务器可以帮助您防止死锁。如果 A
和 B
不总是以相同的顺序锁定,那么在某个时刻,您的系统可能会发生死锁。这类错误很难调试,这就是为什么节流在发生任何死锁机会时都会提前失败。为了启用上述用例,给 A
分配比 B
更高的锁级别。
[semaphores]
A = { max=1, level=1 }
# Level 0 is default. So the short notation is still good for B.
B = 1
Http 路由
- GET
/
:打印问候消息 - GET
/health
:始终回答200 OK
- GET
/metrics:
:Prometheus 度量 - GET
/version
:返回服务器版本。
管理对等方和锁的路由
-
POST
new_peer
:创建新的对等方。此请求的主体必须包含带引号的带维度的人类可读时间持续时间。例如:"expires_in": "5m"
、"expires_in": "30s"
或"expires_in": "12h"
。这是对等方将过期的持续时间,除非通过延长其过期时间来保持其对等方。每个获取的锁总是与一个对等方关联。如果对等方过期,所有锁都将释放。请求返回一个随机整数作为对等方 ID。 -
DELETE
/peers/{id}
:删除对等方,在此过程中释放所有其锁。每个对new_peer
的调用都应该与对这条路由的调用相匹配,这样其他对等方就不必等待此对等方过期才能获取相同信号量的锁。 -
PUT
/peers/{id}/{semaphore}
:获取现有节点上信号量的锁。请求体必须包含所需的锁计数。如果锁可以获取,则节流将返回200 Ok
,如果锁无法获取直到其他节点释放它们的锁,则返回202 Accepted
。指定大于锁消息完整计数的锁计数或违反锁层次结构将导致409 Conflict
错误。请求未知信号量或未知节点的锁将导致400 Bad Request
。此请求是幂等的,因此,在超时的情况下可以重复获取锁,而不会耗尽信号量。如果客户端等待锁,可以使用可选的block_for
查询参数避免忙等待。例如:/peer/{id}/{semaphore}?block_for=10s
。获取计数为0
的锁的语义可能很尴尬,因此目前禁止。 -
DELETE
/peers/{id}/{semaphore}
:释放节点的一个特定锁。 -
POST
/restore
:客户端可以用来响应包含Unknown Semaphore
的400 Bad Request
。此错误表示服务器不记得客户端的状态(例如,客户端可能由于长时间连接丢失而过期)。在这种情况下,客户端可以选择将其以前的状态和获取的锁恢复到服务器。请求体包含如下JSON:{ "expires_in": "5m", "peer_id": 42, "acquired": { "A": 3, "B": 1 } }
这将恢复ID为
42
,生存期为5分钟的客户端。它对A有3个锁,对B有1个锁。 -
Get
/remainder?semaphore={semaphore}
:回答最大信号量计数减去此信号量所有已获取锁的总和。响应是纯文本整数。 -
Get
/peers/{id}/is_acquired
:如果节点有一个挂起的锁,则回答false
。如果节点所有锁都已获取,则答案为true
。
依赖关系
~9–16MB
~211K SLoC