7个不稳定版本 (3个破坏性更改)
0.4.2 | 2021年3月7日 |
---|---|
0.4.1 | 2021年3月7日 |
0.4.0 | 2021年2月11日 |
0.3.0 | 2021年1月3日 |
0.1.1 | 2020年11月28日 |
#763 in 并发
41KB
610 代码行
slottle
一个简单的Rust crate,提供基于线程的节流池。可以通过用户提供的资源ID动态创建多个节流器。
例如,一个网络爬虫工具可能会将域名作为资源ID来以通用方式控制每个主机的访问速度。用户可以同时创建多个池,每个池针对不同情况有不同的配置。
功能
- 不仅提供单个节流器,还提供节流池。(如果不需要,可以忽略)
- 并发和延迟间隔均可配置。
- 允许用户定义算法动态生成延迟间隔。
- 对故障敏感,并支持内置的重试。
- 易于使用。
- 详尽的文档。
请查看在线文档以获取更多详细信息。
许可证
MIT
lib.rs
:
专为基于线程的并发设计的节流池库。
概念
该crate包含两种主要类型:ThrottlePool
和 Throttle
。
每个 Throttle
都有自己的并发和延迟状态。另一方面,ThrottlePool
可以在第一个使用对应的 id
时自动创建 Throttle
。用户可以将 id
视为某种类型的资源标识,如主机名、IP地址等。
以下是一个 ThrottlePool
的运行图,其中 concurrent
== 2
。
ThrottlePool
|
+-- Throttle (resource-1)
| |
| +-- Thread quota 1 ... run ...
| +-- Thread quota 2 ... run ...
|
+-- Throttle (resource-2)
| |
| +-- Thread quota 3 ... run ...
| +-- Thread quota 4 ... run ...
...
+-- Throttle (resource-N)
|
+-- Thread quota 2N-1 ... run ...
+-- Thread quota 2N ... run ...
如果 concurrent == 1
,线程配额使用可能如下所示
f: assigned jobs, s: sleep function
thread 1: |f()----|s()----|f()--|s()------|f()----------------|f()-----|..........|f()--
| interval | interval | interval |...| interval |...|
^^^^^^^^^^^^^^^^^^^^^
job run longer than interval --^ ^^^^^^^^
so skip sleep() step /
/
If new job not inject into the -----------------------
"should wait interval", sleep() will not be triggered
time pass ----->
如果 concurrent == 2
,线程将按如下方式工作
f: assigned jobs, s: sleep function
thread 1: |f()----|s()----|f()--|s()------|f()------------------------------|....|f()--
| interval | interval | 2x interval |......|
thread 2: ........|f()-|s()-------|f()-------|s()-|f()|s()|f()--|s|f()-|s-|f()---------
........| interval | interval | 1/2 | 1/2 | 1/2 |
^^^^^^^^^^^^^^^^^^^^^^^^^
max concurrent forced to 2 -------^
but expected value of maximux access speed is "concurrent per interval".
time pass ----->
Throttle
不会创建线程,而只是阻塞当前线程。用户应该自己创建线程,并将节流器同步到所有这些线程,以完全控制访问速度。
如果不需要与连接池相关的功能,用户可以直接使用 Throttle
。
示例
use rayon::prelude::*;
use slottle::ThrottlePool;
use std::time::{Duration, Instant};
// Make sure we have enough of threads can be blocked.
// Here we use rayon as example but you can choice any thread implementation.
rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build_global()
.unwrap();
// Create ThrottlePool.
//
// In here `id` is `bool` type for demonstration.
// If you're writing a web spider, type of `id` might should be `url::Host`.
let throttles: ThrottlePool<bool> = ThrottlePool::builder()
.interval(Duration::from_millis(20)) // set interval to 20ms
.concurrent(2) // set concurrent to 2
.build()
.unwrap();
// HINT: according previous config, expected access speed is
// 2 per 20ms = 1 per 10ms (in each throttle)
let started_time = Instant::now();
let mut all_added_one: Vec<i32> = vec![1, 2, 3, 4, 5, 6]
.into_par_iter()
.map(|x| {
throttles
.get(x >= 5) // 5,6 in throttle `true` & 1,2,3,4 in throttle `false`
.run(|| { // here is the operation we want to throttling
let time_passed_ms = started_time.elapsed().as_secs_f64() * 1000.0;
println!(
"[throttle: {:>5}] allowed job {} to start at: {:.2}ms",
x >= 5, x, time_passed_ms,
);
// // you can add some long-running task to see how throttle work
// std::thread::sleep(Duration::from_millis(40));
x + 1
})
})
.collect();
assert_eq!(all_added_one, vec![2, 3, 4, 5, 6, 7]);
输出
[throttle: false] allowed job 1 to start at: 0.09ms
[throttle: true] allowed job 6 to start at: 0.10ms
[throttle: false] allowed job 4 to start at: 10.40ms
[throttle: true] allowed job 5 to start at: 10.42ms
[throttle: false] allowed job 3 to start at: 20.12ms
[throttle: false] allowed job 2 to start at: 30.12ms
存储库命名
存储库名称 slottle
是“槽式节流”的缩写。这是 ThrottlePool
的原始名称。