30 个版本 (5 个稳定版)
1.1.2 | 2024年5月22日 |
---|---|
1.0.1 | 2023年6月5日 |
0.12.5 | 2023年5月12日 |
0.12.4 | 2023年3月22日 |
0.4.1 | 2019年10月6日 |
#24 in 并发
41,156 每月下载
在 26 个 Crates 中使用 (17 个直接使用)
77KB
1K SLoC
leaky-bucket
基于 漏桶 算法的令牌速率限制器。
如果桶溢出并且超过最大配置容量,则尝试获取令牌的任务将被挂起,直到从桶中排出所需数量的令牌。
由于这个 crate 使用了来自 tokio 的计时功能,因此必须在启用 time
特性 的 Tokio 运行时中使用。
这个库有一些很酷的功能,包括
不需要后台任务。这通常是令牌桶速率限制器驱动进度所必需的。相反,等待的任务之一暂时承担协调者的角色(称为 核心)。这减少了需要睡眠的任务数量,这可能是不精确睡眠实现和紧密限制器的抖动来源。以下有更多详细信息。
丢弃的任务会释放它们所保留的任何资源。因此,构建和取消异步任务不会占用它们从未使用过的等待槽,这对于基于单元格的速率限制器的情况是会发生的。
用法
核心类型是 RateLimiter
,它允许通过其 acquire
、try_acquire
和 acquire_one
方法来限制某部分的吞吐量。
以下是一个简单的示例,其中我们通过 HTTP Client
包装请求,以确保我们不超过给定的限制
use leaky_bucket::RateLimiter;
/// A blog client.
pub struct BlogClient {
limiter: RateLimiter,
client: Client,
}
struct Post {
// ..
}
impl BlogClient {
/// Get all posts from the service.
pub async fn get_posts(&self) -> Result<Vec<Post>> {
self.request("posts").await
}
/// Perform a request against the service, limiting requests to abide by a rate limit.
async fn request<T>(&self, path: &str) -> Result<T>
where
T: DeserializeOwned
{
// Before we start sending a request, we block on acquiring one token.
self.limiter.acquire(1).await;
self.client.request::<T>(path).await
}
}
实现细节
每个速率限制器有两种获取模式。一种快速路径和一种慢速路径。如果所需的令牌数量随时可用,则使用快速路径,只需递减共享池中可用的令牌数量即可。
如果所需的令牌数量不可用,任务将被迫暂停,直到下一次补充间隔。此时,获取任务中的一个将切换为作为核心工作。这被称为核心切换。
use leaky_bucket::RateLimiter;
use tokio::time::Duration;
let limiter = RateLimiter::builder()
.initial(10)
.interval(Duration::from_millis(100))
.build();
// This is instantaneous since the rate limiter starts with 10 tokens to
// spare.
limiter.acquire(10).await;
// This however needs to core switch and wait for a while until the desired
// number of tokens is available.
limiter.acquire(3).await;
核心负责在配置的间隔内休眠,以便可以添加更多令牌。之后,它确保所有等待获取包括它自己的任务都适当地取消暂停。
按需核心切换使得这个速率限制器实现可以在没有协调后台线程的情况下工作。但是,我们需要确保使用RateLimiter
的任何异步任务必须完成一个acquire
调用,或者通过被丢弃来取消。
如果这些都不成立,核心可能会泄漏并无限期锁定,阻止速率限制器在未来取得进展。这类似于如果你锁定了一个异步Mutex
但从不释放其保护者。
你可以使用以下命令运行此示例
cargo run --example block_forever
use std::future::Future;
use std::sync::Arc;
use std::task::Context;
use leaky_bucket::RateLimiter;
struct Waker;
let limiter = Arc::new(RateLimiter::builder().build());
let waker = Arc::new(Waker).into();
let mut cx = Context::from_waker(&waker);
let mut a0 = Box::pin(limiter.acquire(1));
// Poll once to ensure that the core task is assigned.
assert!(a0.as_mut().poll(&mut cx).is_pending());
assert!(a0.is_core());
// We leak the core task, preventing the rate limiter from making progress
// by assigning new core tasks.
std::mem::forget(a0);
// Awaiting acquire here would block forever.
// limiter.acquire(1).await;
公平性
默认情况下,RateLimiter
使用一个公平调度器。这确保即使在有很多任务等待获取令牌的情况下,核心任务也能取得进展。这可能会增加更多核心切换,从而增加总工作量。不公平的调度器预期在竞争情况下做更少的工作。但是,如果没有公平调度,某些任务的实际获取时间可能会比预期更长。
不公平的速率限制器也有访问获取令牌的快速路径,这可能会进一步提高吞吐量。
可以通过Builder::fair
选项调整这种行为。
use leaky_bucket::RateLimiter;
let limiter = RateLimiter::builder()
.fair(false)
.build();
unfair-scheduling
示例可以展示这种现象。
cargo run --example unfair_scheduling
# fair
Max: 1011ms, Total: 1012ms
Timings:
0: 101ms
1: 101ms
2: 101ms
3: 101ms
4: 101ms
...
# unfair
Max: 1014ms, Total: 1014ms
Timings:
0: 1014ms
1: 101ms
2: 101ms
3: 101ms
4: 101ms
...
如上所示,在不公平调度器中的第一个任务运行时间更长,因为它优先考虑释放等待获取的其他任务,而不是自己。
依赖项
~2.4–8.5MB
~61K SLoC