40 个版本 (8 个稳定)
4.1.0 | 2024 年 7 月 18 日 |
---|---|
3.1.4 | 2024 年 3 月 18 日 |
3.1.0 | 2023 年 3 月 24 日 |
3.0.9-alpha | 2022 年 12 月 22 日 |
0.1.1 | 2022 年 11 月 9 日 |
#84 in 并发
每月 131 次下载
170KB
3.5K SLoC
messaging_thread_pool
概述
messaging_thread_pool
提供了一组特性和结构,允许构建一个简单的泛型线程池。
当需要分配的类型具有复杂状态且不可发送/同步时,非常有用。
如果状态可发送和同步,则可能最好使用更传统的线程池,如 rayon。
该类型的实例将分布在线程池的线程中,并在其整个生命周期内与其分配的线程绑定。
因此,实例不需要发送或同步(尽管与它们通信的消息需要)。
然后库基础设施允许根据键将消息路由到特定的实例。
响应消息所需的所有工作都在分配给实例的线程池线程上执行。
然后通过基础设施将响应消息路由回调用者。
它提供了简单的调用方案,易于推理生命周期和可预测的池行为。
该类型需要定义一个消息类型的枚举并提供一些简单特质的实现,以便它可以在线程池中托管。
例如,例如,一个简单类型,它包含一组随机数,如下所示
// define what a pool item looks like
pub struct Randoms {
// pool items require an id so they can be identified within
// the thread pool
pub id: u64,
pub numbers: Vec<u64>,
}
可以托管在线程池中,并通过定义的消息集进行通信,通过为 PoolItem
特质提供实现。
这相当于提供池项的构造函数、一组消息和一个消息处理器
// defining the "api" with which to communicate with the pool item
api_specification!(pool_item: Randoms, api_name: RandomsApi, add_request: RandomsAddRequest,
calls: [
{ call_name: Mean, request: MeanRequest, response: MeanResponse },
{ call_name: Sum, request: SumRequest, response: SumResponse },
]);
// a request needs to contain the id of the targeted pool item
pub struct MeanRequest(pub u64);
// a response contains the results of the operation
pub struct MeanResponse {
pub id: u64,
pub mean: u128,
}
impl PoolItem for Randoms {
type Init = RandomsAddRequest;
type Api = RandomsApi;
type ThreadStartInfo = ();
// this function (from the PoolItem trait) defines what to do
// on receipt of a request and how to respond to it
fn process_message(&mut self, request: &Self::Api)
-> ThreadRequestResponse<Self> {
match request {
// calculate the mean of the contained randoms and
// return the result
RandomsApi::Mean(request) => MeanResponse {
id: request.id(),
mean: self.mean(),
}
.into(),
// calculate the sum of the contained randoms and return
RandomsApi::Sum(request) => SumResponse {
id: request.id(),
sum: self.sum(),
}
.into(),
}
}
}
// a request is defined defined to construct a pool item
pub struct RandomsAddRequest(pub u64);
// ... and the implementation of this function (in the
// PoolItem trait) defines how to use that message to
// construct a new pool item
fn new_pool_item(request: &Self::Init)
-> Result<Self, NewPoolItemError> {
Ok(Randoms::new(request.0))
}
有了这个基础设施,池项就可以使用库提供的结构在固定大小的线程池中托管池项的实例。
use std::iter;
use messaging_thread_pool::{samples::*,
thread_request_response::*,
ThreadPool};
// creates a thread pool with 4 threads and a mechanism
// by which to communicate with the threads in the pool.
// The lifetime of the structs created (the Randoms)
// will be tied to the life of this struct
let thread_pool = ThreadPool::<Randoms>::new(10);
// create a 1000 Randoms across the thread pool by
// sending a thousand add requests.
// The creation of these objects (with the keys 0..1000)
// will be distributed across the 10 threads in the pool.
// Their owning thread will create and store them.
// They will not be dropped until they are either
// requested to be dropped or until the thread pool
// itself is dropped.
thread_pool
.send_and_receive((0..1000u64)
.expect("thread pool to be available")
.map(|i| RandomsAddRequest(i)))
.for_each(|response: AddResponse|
assert!(response.success()));
// now create 1000 messages asking each of them for the sum of
// the Randoms objects contained random numbers
// The message will be routed to the thread to where
// the targeted object resides
// This call will block until all of the work is done and
// the responses returned
let sums: Vec<SumResponse> = thread_pool
.send_and_receive((0..1000u64)
.expect("thread pool to be available")
.map(|i| SumRequest(i)))
.collect();
assert_eq!(1000, sums.len());
// now get the mean of the randoms for object with id 0, this
// will execute on thread 0.
// this call will block until complete
let mean_response_0: MeanResponse = thread_pool
.send_and_receive_once(MeanRequest(0))
.expect("thread pool to be available");
println!("{}", mean_response_0.mean());
// remove object with id 1
// it will be dropped from the thread where it was residing
// freeing up any memory it was using
assert!(thread_pool
.send_and_receive_once(RemovePoolItemRequest(1))
.expect("thread pool to be available")
.success());
// add a new object with id 1000
assert!(thread_pool
.send_and_receive_once(RandomsAddRequest(1000))
.expect("thread pool to be available")
.success());
// all objects are dropped when the thread pool is
// dropped, the worker threads are shutdown and
// joined back the the main thread
drop(thread_pool);
库最初的动机是为了处理具有复杂依赖关系的长生存期对象层次结构,每个对象都需要自己的线程池以避免任何复杂的线程依赖。所有操作都是 CPU 密集型的。
需要注意的是,除非执行的操作非常耗时(>50ms),否则消息基础设施的成本开始变得显著,并开始侵蚀多线程的好处。
依赖关系
~6–8.5MB
~148K SLoC