7个不稳定版本 (3个破坏性更改)
| 0.4.2 | 2021年3月7日 | 
|---|---|
| 0.4.1 | 2021年3月7日 | 
| 0.4.0 | 2021年2月11日 | 
| 0.3.0 | 2021年1月3日 | 
| 0.1.1 | 2020年11月28日 | 
#763 in 并发
41KB
610 代码行
slottle
一个简单的Rust crate,提供基于线程的节流池。可以通过用户提供的资源ID动态创建多个节流器。
例如,一个网络爬虫工具可能会将域名作为资源ID来以通用方式控制每个主机的访问速度。用户可以同时创建多个池,每个池针对不同情况有不同的配置。
功能
- 不仅提供单个节流器,还提供节流池。(如果不需要,可以忽略)
- 并发和延迟间隔均可配置。
- 允许用户定义算法动态生成延迟间隔。
- 对故障敏感,并支持内置的重试。
- 易于使用。
- 详尽的文档。
请查看在线文档以获取更多详细信息。
许可证
MIT
lib.rs:
专为基于线程的并发设计的节流池库。
概念
该crate包含两种主要类型:ThrottlePool 和 Throttle。
每个 Throttle 都有自己的并发和延迟状态。另一方面,ThrottlePool 可以在第一个使用对应的 id 时自动创建 Throttle。用户可以将 id 视为某种类型的资源标识,如主机名、IP地址等。
以下是一个 ThrottlePool 的运行图,其中 concurrent == 2。
ThrottlePool
 |
 +-- Throttle (resource-1)
 |      |
 |      +-- Thread quota 1     ... run ...
 |      +-- Thread quota 2     ... run ...
 |
 +-- Throttle (resource-2)
 |      |
 |      +-- Thread quota 3     ... run ...
 |      +-- Thread quota 4     ... run ...
 ...
 +-- Throttle (resource-N)
        |
        +-- Thread quota 2N-1  ... run ...
        +-- Thread quota 2N    ... run ...
如果 concurrent == 1,线程配额使用可能如下所示
f: assigned jobs, s: sleep function
thread 1:   |f()----|s()----|f()--|s()------|f()----------------|f()-----|..........|f()--
            |   interval    |   interval    |   interval    |...|   interval    |...|
                                            ^^^^^^^^^^^^^^^^^^^^^
            job run longer than interval --^                             ^^^^^^^^
            so skip sleep() step                                        /
                                                                       /
                If new job not inject into the -----------------------
                "should wait interval", sleep() will not be triggered
time pass ----->
如果 concurrent == 2,线程将按如下方式工作
f: assigned jobs, s: sleep function
thread 1:   |f()----|s()----|f()--|s()------|f()------------------------------|....|f()--
            |   interval    |   interval    |           2x interval         |......|
thread 2:   ........|f()-|s()-------|f()-------|s()-|f()|s()|f()--|s|f()-|s-|f()---------
            ........|   interval    |   interval    |  1/2  |  1/2  |  1/2  |
                                                    ^^^^^^^^^^^^^^^^^^^^^^^^^
                max concurrent forced to 2  -------^
                but expected value of maximux access speed is "concurrent per interval".
time pass ----->
Throttle 不会创建线程,而只是阻塞当前线程。用户应该自己创建线程,并将节流器同步到所有这些线程,以完全控制访问速度。
如果不需要与连接池相关的功能,用户可以直接使用 Throttle。
示例
use rayon::prelude::*;
use slottle::ThrottlePool;
use std::time::{Duration, Instant};
// Make sure we have enough of threads can be blocked.
// Here we use rayon as example but you can choice any thread implementation.
rayon::ThreadPoolBuilder::new()
    .num_threads(8)
    .build_global()
    .unwrap();
// Create ThrottlePool.
//
// In here `id` is `bool` type for demonstration.
// If you're writing a web spider, type of `id` might should be `url::Host`.
let throttles: ThrottlePool<bool> = ThrottlePool::builder()
    .interval(Duration::from_millis(20)) // set interval to 20ms
    .concurrent(2) // set concurrent to 2
    .build()
    .unwrap();
// HINT: according previous config, expected access speed is
// 2 per 20ms = 1 per 10ms (in each throttle)
let started_time = Instant::now();
let mut all_added_one: Vec<i32> = vec![1, 2, 3, 4, 5, 6]
    .into_par_iter()
    .map(|x| {
        throttles
            .get(x >= 5)    // 5,6 in throttle `true` & 1,2,3,4 in throttle `false`
            .run(|| {       // here is the operation we want to throttling
                let time_passed_ms = started_time.elapsed().as_secs_f64() * 1000.0;
                println!(
                    "[throttle: {:>5}] allowed job {} to start at: {:.2}ms",
                    x >= 5, x, time_passed_ms,
                );
                // // you can add some long-running task to see how throttle work
                // std::thread::sleep(Duration::from_millis(40));
                x + 1
            })
    })
    .collect();
assert_eq!(all_added_one, vec![2, 3, 4, 5, 6, 7]);
输出
[throttle: false] allowed job 1 to start at: 0.09ms
[throttle:  true] allowed job 6 to start at: 0.10ms
[throttle: false] allowed job 4 to start at: 10.40ms
[throttle:  true] allowed job 5 to start at: 10.42ms
[throttle: false] allowed job 3 to start at: 20.12ms
[throttle: false] allowed job 2 to start at: 30.12ms
存储库命名
存储库名称 slottle 是“槽式节流”的缩写。这是 ThrottlePool 的原始名称。