#thread-pool #worker-thread #thread #async #future #send

rusty_pool

基于 crossbeam 的多生产者多消费者通道的自动增长/缩减 ThreadPool 实现,支持等待任务结果并提供异步支持

13 个不稳定版本

0.7.0 2022年5月5日
0.6.0 2021年5月9日
0.5.1 2021年2月27日
0.4.3 2020年8月11日
0.4.2 2020年6月30日

#63并发

Download history 19505/week @ 2024-03-14 16102/week @ 2024-03-21 16868/week @ 2024-03-28 22327/week @ 2024-04-04 24380/week @ 2024-04-11 20881/week @ 2024-04-18 21559/week @ 2024-04-25 20319/week @ 2024-05-02 22634/week @ 2024-05-09 25810/week @ 2024-05-16 24249/week @ 2024-05-23 30151/week @ 2024-05-30 24535/week @ 2024-06-06 31075/week @ 2024-06-13 20660/week @ 2024-06-20 7188/week @ 2024-06-27

89,050 每月下载量
19 Crates 中使用(13 个直接使用)

Apache-2.0

100KB
1.5K SLoC

rusty_pool

基于 crossbeam 的多生产者多消费者通道的自动增长/缩减 ThreadPool 实现,支持等待任务结果并提供异步支持。

ThreadPool 有两个不同的池大小;一个核心池大小,其中填充着与通道一样长的线程,以及一个最大池大小,它描述了可能同时存在的最大工作线程数量。这些额外的非核心线程在创建 ThreadPool 时有一个特定的 keep_alive 时间,该时间定义了这些线程在没有收到任何工作之前可以空闲多久,然后放弃并终止其工作循环。

ThreadPool 不会在提交任务之前创建任何线程。然后,它将为每个任务创建一个新线程,直到核心池大小填满。之后,只有在当前池小于最大池大小且没有空闲线程的情况下,才会创建新线程。

类似于 evaluate()complete() 的函数返回一个 JoinHandle,可用于等待提交的任务或未来的结果。可以将 JoinHandles 发送到线程池以创建一个任务,该任务阻塞工作线程直到收到其他任务的结果,然后对该结果进行操作。如果任务崩溃,JoinHandle 将收到一个取消错误。这是通过使用 futures oneshot 通道与工作线程进行通信来实现的。

ThreadPool 可用作未来执行器,如果启用“异步”功能,默认情况下是启用的。该“异步”功能包括 spawn()try_spawn() 函数,这些函数创建一个任务,逐个轮询未来,并在可以取得进展时重新提交未来到池中。如果没有“异步”功能,可以使用 complete 函数简单地执行到完成,该函数会阻塞工作线程直到未来被轮询完成。

如果不需要,可以通过将以下内容添加到您的 Cargo 依赖项中禁用“异步”功能

[dependencies.rusty_pool]
default-features = false
version = "*"

在创建新工作线程时,此 ThreadPool 会尝试使用比较和交换机制增加工作线程计数,如果由于另一个线程已经将总工作线程计数增加到指定的限制(尝试创建核心线程时的 core_size,否则为 max_size)而导致增加失败,则池会尝试创建非核心工作线程(如果之前尝试创建核心线程且没有空闲工作线程)或将任务发送到通道。恐慌工作线程始终被克隆并替换。

锁仅用于 join 函数,以锁定 Condvar,除此之外,此 ThreadPool 实现完全依赖于 crossbeam 和原子操作。此 ThreadPool 通过比较总工作线程数和空闲工作线程数来决定当前是否空闲(并且应该快速返回 join 尝试),这两个值存储在一个 AtomicUsize 中(都是 usize 的一半大小),确保如果两者都被更新,则可以在单个原子操作中更新。

可以使用 shutdown 函数销毁线程池及其 crossbeam 通道,但这不会停止已运行的任务,但会在下一次尝试从通道获取工作时会终止线程。通道仅在所有 ThreadPool 克隆都已关闭/丢弃后才会被销毁。

安装

要将 rusty_pool 添加到您的项目中,只需添加以下 Cargo 依赖项

[dependencies]
rusty_pool = "0.7.0"

或者要排除“异步”功能

[dependencies.rusty_pool]
version = "0.7.0"
default-features = false

用法

创建一个新的 ThreadPool

use rusty_pool::Builder;
use rusty_pool::ThreadPool;
// Create default `ThreadPool` configuration with the number of CPUs as core pool size
let pool = ThreadPool::default();
// Create a `ThreadPool` with default naming:
use std::time::Duration;
let pool2 = ThreadPool::new(5, 50, Duration::from_secs(60));
// Create a `ThreadPool` with a custom name:
let pool3 = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));
// using the Builder struct:
let pool4 = Builder::new().core_size(5).max_size(50).build();

将闭包提交给 ThreadPool 执行

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;
let pool = ThreadPool::default();
pool.execute(|| {
    thread::sleep(Duration::from_secs(5));
    print!("hello");
});

提交任务并等待结果

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;
let pool = ThreadPool::default();
let handle = pool.evaluate(|| {
    thread::sleep(Duration::from_secs(5));
    return 4;
});
let result = handle.await_complete();
assert_eq!(result, 4);

使用 ThreadPool 创建未来

async fn some_async_fn(x: i32, y: i32) -> i32 {
    x + y
}

async fn other_async_fn(x: i32, y: i32) -> i32 {
    x - y
}

use rusty_pool::ThreadPool;
let pool = ThreadPool::default();

// simply complete future by blocking a worker until the future has been completed
let handle = pool.complete(async {
    let a = some_async_fn(4, 6).await; // 10
    let b = some_async_fn(a, 3).await; // 13
    let c = other_async_fn(b, a).await; // 3
    some_async_fn(c, 5).await // 8
});
assert_eq!(handle.await_complete(), 8);

use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

// spawn future and create waker that automatically re-submits itself to the threadpool if ready to make progress, this requires the "async" feature which is enabled by default
let count = Arc::new(AtomicI32::new(0));
let clone = count.clone();
pool.spawn(async move {
    let a = some_async_fn(3, 6).await; // 9
    let b = other_async_fn(a, 4).await; // 5
    let c = some_async_fn(b, 7).await; // 12
    clone.fetch_add(c, Ordering::SeqCst);
});
pool.join();
assert_eq!(count.load(Ordering::SeqCst), 12);

加入并关闭 ThreadPool

use std::thread;
use std::time::Duration;
use rusty_pool::ThreadPool;
use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

let pool = ThreadPool::default();
for _ in 0..10 {
    pool.execute(|| { thread::sleep(Duration::from_secs(10)) })
}
// wait for all threads to become idle, i.e. all tasks to be completed including tasks added by other threads after join() is called by this thread or for the timeout to be reached
pool.join_timeout(Duration::from_secs(5));

let count = Arc::new(AtomicI32::new(0));
for _ in 0..15 {
    let clone = count.clone();
    pool.execute(move || {
        thread::sleep(Duration::from_secs(5));
        clone.fetch_add(1, Ordering::SeqCst);
    });
}

// shut down and drop the only instance of this `ThreadPool` (no clones) causing the channel to be broken leading all workers to exit after completing their current work
// and wait for all workers to become idle, i.e. finish their work.
pool.shutdown_join();
assert_eq!(count.load(Ordering::SeqCst), 15);

性能

从提交任务到池的线程的角度来看性能,rusty_pool 应该比任何使用 std::sync::mpsc 的池(如 rust-threadpool)在大多数情况下提供更好的性能,这归功于 crossbeam 团队的大量工作。在某些极端竞争的情况下,rusty_pool 可能会落后于 rust-threadpool,尽管已经发现这种情况的场景几乎不切实际,它们需要循环提交空任务,并且这取决于平台。在测试的场景中,macOS 表现得特别出色,可能是因为 macOS 在优化原子操作上投入了大量努力,Swift 的引用计数依赖于它。显然,这应该在 Apple Silicon 上得到放大,但 rusty_pool 尚未在该平台上进行测试。以下测试是在具有 AMD Ryzen 9 3950X 的 PC(Linux 和 Windows)以及具有 Intel i9-9880H 的 MacBook Pro 15" 2019(macOS)上执行的。

测试 1:无竞争

所有任务都由同一线程提交,且任务持续时间长于测试,这意味着所有原子操作(读取和递增工作计数器)都由主线程执行,因为新创建的工作者不会在完成初始任务和递增空闲计数器之前更改计数器。

fn main() {
    let now = std::time::Instant::now();

    let pool = rusty_pool::Builder::new().core_size(10).max_size(10).build();
    //let pool = threadpool::ThreadPool::new(10);

    for _ in 0..10000000 {
        pool.execute(|| {
            thread::sleep(std::time::Duration::from_secs(1));
        });
    }

    let millis = now.elapsed().as_millis();
    println!("millis: {}", millis);
}

结果(以毫秒为单位,平均值)

rusty_pool 0.5.1

Windows MacOS Linux
221.6 293.07 183.73

rusty_pool 0.5.0

Windows MacOS Linux
224.6 315.6 187.0

rust-threadpool 1.8.1

Windows MacOS Linux
476.4 743.4 354.3

rusty_pool 0.4.3

Windows MacOS Linux
237.5 318.1 181.3

测试 2:多个生产者

除了主线程外,还有 10 个其他线程向池提交任务。与之前的测试不同,任务的持续时间不再长于测试,因此不仅生产者之间在工作者计数器上存在竞争,而且工作者线程在更新空闲计数器时也存在竞争。这是一个相当真实但有些极端的例子。

fn main() {
    let now = std::time::Instant::now();

    let pool = rusty_pool::Builder::new().core_size(10).max_size(10).build();
    //let pool = threadpool::ThreadPool::new(10);

    for _ in 0..10 {
        let pool = pool.clone();

        std::thread::spawn(move || {
            for _ in 0..10000000 {
                pool.execute(|| {
                    std::thread::sleep(std::time::Duration::from_secs(1));
                });
            }
        });
    }

    for _ in 0..10000000 {
        pool.execute(|| {
            std::thread::sleep(std::time::Duration::from_secs(1));
        });
    }

    let millis = now.elapsed().as_millis();
    println!("millis: {}", millis);
}

结果(以毫秒为单位,平均值)

rusty_pool 0.5.1

Windows* MacOS Linux
7692.4 3656.2 7514.53

rusty_pool 0.5.0

Windows MacOS Linux Windows*
6251.0 4417.7 7903.1 7774.67

rust-threadpool 1.8.1

Windows MacOS Linux
10030.5 5810.5 9743.3

rusty_pool 0.4.3

Windows MacOS Linux Windows*
6342.2 4444.6 7962.0 8564.93

* 在测试 0.5.1 时,Windows 的性能似乎明显较差,因此重新计算了 rusty_pool 的早期版本的结果,并且发现这些结果也比最初记录的要差,这可能是由于外部影响(例如,后台任务消耗了大量 CPU 时间,尽管测试在实时优先级下重试,但结果相似)。由于 rust-threadpool 1.8.1 的结果似乎与上次记录的相似,因此未完全重新计算这些结果。

测试 3:最坏情况

此测试用例突出了 rusty_pool 的上述最坏情况,即池被空任务轰炸。由于工作者在完成任务后递增空闲计数器,并且任务实际上立即执行,因此空闲计数器的递增与循环中读取计数器的下一个 execute() 调用同时发生。工作者数量越多,竞争越激烈,性能越差。

fn main() {
    let now = std::time::Instant::now();

    let pool = rusty_pool::Builder::new().core_size(10).max_size(10).build();
    //let pool = threadpool::ThreadPool::new(10);

    for _ in 0..10000000 {
        pool.execute(|| {});
    }

    let millis = now.elapsed().as_millis();
    println!("millis: {}", millis);
}

rusty_pool 0.5.1

Windows MacOS Linux
1967.93 698.8 2150.0

rusty_pool 0.5.0

Windows MacOS Linux
1991.6 679.93 2175.1

rust-threadpool 1.8.1

Windows MacOS Linux
980.33 1224.6 677.0

rusty_pool 0.4.3

Windows MacOS Linux
2016.8 683.13 2175.1

奇怪的是,macOS 在这个情况下极大地青睐 rusty_pool,而 Windows 和 Linux 则青睐 rust-threadpool。然而,这种情况在现实世界场景中几乎不会发生。在所有其他测试场景中,当提交任务时,rusty_pool 的表现更好,其中 MacOS 在有大量竞争的情况下似乎占优势,但在其他情况下则落后,这可能是由于特定测试设备硬件较弱。Linux 在竞争很小或没有竞争的情况下似乎表现最好,但在竞争激烈时表现最差。

依赖关系

~1MB
~23K SLoC