1 个不稳定版本
0.1.0 | 2021年5月11日 |
---|
#1045 in 并发
24KB
365 行
Wasserglas
Wasserglas 是一个具有自动重新连接功能的固定大小线程安全对象池。
对象池的目标是重用昂贵或频繁分配的对象。它是在处理大量对象时编写的,其中每个作业的结果都存储在一个大型结构中,该结构是 a) 初始化成本高昂,b) rayon 工作者的作业窃取性质可能创建过多的此类结构,超出 RAM。
这最初是 CJP10 的 object-pool 的分支,但由于固定大小改变了同步语义,我决定创建一个新的 crate。
用法
[dependencies]
object-pool = "0.1"
use wasserglas::pool;
示例
创建池
池以一定的容量创建。池中的对象数量应该与使用的线程数量大致相同,这样线程就不会等待对象可用。如果池中的对象少于尝试从中获取对象的线程数量,则线程将阻塞,直到有对象可用。
示例池具有 16 个 Vec<u8>
let capacity: usize = 16;
let pool: Pool<Vec<u32>> = Pool::new(capacity);
for _ in 0..capacity {
assert!(pool.push(Vec::new()).is_ok());
}
// Pushing another object exceeding the capacity returns the object in error position.
assert_eq!(pool.push(Vec::new()), Err(Vec::new()));
// Pulling an object from the pool and dropping it reattaches it to the pool
assert_eq!(pool.n_available(), 16);
assert_eq!(pool.len(), 16);
let vec = pool.pull();
assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 16);
std::me::drop(vec);
assert_eq!(pool.n_available(), 16);
assert_eq!(pool.len(), 16);
// Detaching an object removes it from the pool permanently
let vec = pool.pull();
assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 16);
let vec = vec.detach();
assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 15);
使用池
由于池的预期用途是用于多线程环境,通常应该将其包装在 Arc 中
use std::sync::Arc;
use ray::prelude::*;
let num_threads = rayon::current_num_threads();
let pool: Arc<Pool<Vec<u32>>> = Arc::new(Pool::new(num_threads));
for _ in 0..num_threads {
assert!(pool.push(Vec::new()).is_ok());
}
data.par_iter().for_each_init(|| pool.clone(), |pool, item| {
let mut buffer = pool.pull();
perform_calculation(&item, &mut buffer);
// Upon finishing, the buffer is collected and put into the pool.
});
如果池对象的构建应该分散到所有线程,而不是按顺序预加载
data.par_iter().for_each_init(|| pool.clone(), |pool, item| {
let mut buffer = pool.pull_or_else(|| Vec::new());
perform_calculation(&item, &mut buffer);
// The buffer needs to be explicitly cleared if that's required.
buffer.clear();
});
依赖项
~480–790KB
~13K SLoC