#throttle #sync #threading #throttling

slottle

专为基于线程的并发设计的节流池库

7个不稳定版本 (3个破坏性更改)

0.4.2 2021年3月7日
0.4.1 2021年3月7日
0.4.0 2021年2月11日
0.3.0 2021年1月3日
0.1.1 2020年11月28日

#763 in 并发

MIT许可证

41KB
610 代码行

slottle

一个简单的Rust crate,提供基于线程的节流池。可以通过用户提供的资源ID动态创建多个节流器。

例如,一个网络爬虫工具可能会将域名作为资源ID来以通用方式控制每个主机的访问速度。用户可以同时创建多个池,每个池针对不同情况有不同的配置。

功能

  • 不仅提供单个节流器,还提供节流池。(如果不需要,可以忽略)
  • 并发和延迟间隔均可配置。
  • 允许用户定义算法动态生成延迟间隔。
  • 对故障敏感,并支持内置的重试。
  • 易于使用。
  • 详尽的文档。

请查看在线文档以获取更多详细信息。

许可证

MIT


lib.rs:

专为基于线程的并发设计的节流池库。

概念

该crate包含两种主要类型:ThrottlePoolThrottle

每个 Throttle 都有自己的并发和延迟状态。另一方面,ThrottlePool 可以在第一个使用对应的 id 时自动创建 Throttle。用户可以将 id 视为某种类型的资源标识,如主机名、IP地址等。

以下是一个 ThrottlePool 的运行图,其中 concurrent == 2

ThrottlePool
 |
 +-- Throttle (resource-1)
 |      |
 |      +-- Thread quota 1     ... run ...
 |      +-- Thread quota 2     ... run ...
 |
 +-- Throttle (resource-2)
 |      |
 |      +-- Thread quota 3     ... run ...
 |      +-- Thread quota 4     ... run ...
 ...
 +-- Throttle (resource-N)
        |
        +-- Thread quota 2N-1  ... run ...
        +-- Thread quota 2N    ... run ...

如果 concurrent == 1,线程配额使用可能如下所示

f: assigned jobs, s: sleep function

thread 1:   |f()----|s()----|f()--|s()------|f()----------------|f()-----|..........|f()--
            |   interval    |   interval    |   interval    |...|   interval    |...|
                                            ^^^^^^^^^^^^^^^^^^^^^
            job run longer than interval --^                             ^^^^^^^^
            so skip sleep() step                                        /
                                                                       /
                If new job not inject into the -----------------------
                "should wait interval", sleep() will not be triggered

time pass ----->

如果 concurrent == 2,线程将按如下方式工作

f: assigned jobs, s: sleep function

thread 1:   |f()----|s()----|f()--|s()------|f()------------------------------|....|f()--
            |   interval    |   interval    |           2x interval         |......|

thread 2:   ........|f()-|s()-------|f()-------|s()-|f()|s()|f()--|s|f()-|s-|f()---------
            ........|   interval    |   interval    |  1/2  |  1/2  |  1/2  |
                                                    ^^^^^^^^^^^^^^^^^^^^^^^^^
                max concurrent forced to 2  -------^
                but expected value of maximux access speed is "concurrent per interval".

time pass ----->

Throttle 不会创建线程,而只是阻塞当前线程。用户应该自己创建线程,并将节流器同步到所有这些线程,以完全控制访问速度。

如果不需要与连接池相关的功能,用户可以直接使用 Throttle

示例

use rayon::prelude::*;
use slottle::ThrottlePool;
use std::time::{Duration, Instant};

// Make sure we have enough of threads can be blocked.
// Here we use rayon as example but you can choice any thread implementation.
rayon::ThreadPoolBuilder::new()
    .num_threads(8)
    .build_global()
    .unwrap();

// Create ThrottlePool.
//
// In here `id` is `bool` type for demonstration.
// If you're writing a web spider, type of `id` might should be `url::Host`.
let throttles: ThrottlePool<bool> = ThrottlePool::builder()
    .interval(Duration::from_millis(20)) // set interval to 20ms
    .concurrent(2) // set concurrent to 2
    .build()
    .unwrap();

// HINT: according previous config, expected access speed is
// 2 per 20ms = 1 per 10ms (in each throttle)

let started_time = Instant::now();

let mut all_added_one: Vec<i32> = vec![1, 2, 3, 4, 5, 6]
    .into_par_iter()
    .map(|x| {
        throttles
            .get(x >= 5)    // 5,6 in throttle `true` & 1,2,3,4 in throttle `false`
            .run(|| {       // here is the operation we want to throttling
                let time_passed_ms = started_time.elapsed().as_secs_f64() * 1000.0;
                println!(
                    "[throttle: {:>5}] allowed job {} to start at: {:.2}ms",
                    x >= 5, x, time_passed_ms,
                );

                // // you can add some long-running task to see how throttle work
                // std::thread::sleep(Duration::from_millis(40));

                x + 1
            })
    })
    .collect();

assert_eq!(all_added_one, vec![2, 3, 4, 5, 6, 7]);

输出

[throttle: false] allowed job 1 to start at: 0.09ms
[throttle:  true] allowed job 6 to start at: 0.10ms
[throttle: false] allowed job 4 to start at: 10.40ms
[throttle:  true] allowed job 5 to start at: 10.42ms
[throttle: false] allowed job 3 to start at: 20.12ms
[throttle: false] allowed job 2 to start at: 30.12ms

存储库命名

存储库名称 slottle 是“槽式节流”的缩写。这是 ThrottlePool 的原始名称。

依赖关系