#非阻塞 #无锁 #垃圾回收 #原子操作

wasserglas

一个具有自动重新连接功能的固定大小线程安全对象池

1 个不稳定版本

0.1.0 2021年5月11日

#1045 in 并发

MIT/Apache

24KB
365

Wasserglas

License Cargo Documentation

Wasserglas 是一个具有自动重新连接功能的固定大小线程安全对象池。

对象池的目标是重用昂贵或频繁分配的对象。它是在处理大量对象时编写的,其中每个作业的结果都存储在一个大型结构中,该结构是 a) 初始化成本高昂,b) rayon 工作者的作业窃取性质可能创建过多的此类结构,超出 RAM。

这最初是 CJP10 的 object-pool 的分支,但由于固定大小改变了同步语义,我决定创建一个新的 crate。

用法

[dependencies]
object-pool = "0.1"
use wasserglas::pool;

示例

创建池

池以一定的容量创建。池中的对象数量应该与使用的线程数量大致相同,这样线程就不会等待对象可用。如果池中的对象少于尝试从中获取对象的线程数量,则线程将阻塞,直到有对象可用。

示例池具有 16 个 Vec<u8>

let capacity: usize = 16;
let pool: Pool<Vec<u32>> = Pool::new(capacity);

for _ in 0..capacity {
    assert!(pool.push(Vec::new()).is_ok());
}

// Pushing another object exceeding the capacity returns the object in error position.
assert_eq!(pool.push(Vec::new()), Err(Vec::new()));

// Pulling an object from the pool and dropping it reattaches it to the pool
assert_eq!(pool.n_available(), 16);
assert_eq!(pool.len(), 16);

let vec = pool.pull();

assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 16);

std::me::drop(vec);

assert_eq!(pool.n_available(), 16);
assert_eq!(pool.len(), 16);

// Detaching an object removes it from the pool permanently
let vec = pool.pull();

assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 16);

let vec = vec.detach();

assert_eq!(pool.n_available(), 15);
assert_eq!(pool.len(), 15);

使用池

由于池的预期用途是用于多线程环境,通常应该将其包装在 Arc 中

use std::sync::Arc;

use ray::prelude::*;

let num_threads = rayon::current_num_threads();
let pool: Arc<Pool<Vec<u32>>> = Arc::new(Pool::new(num_threads));
for _ in 0..num_threads {
    assert!(pool.push(Vec::new()).is_ok());
}

data.par_iter().for_each_init(|| pool.clone(), |pool, item| {
    let mut buffer = pool.pull();
    perform_calculation(&item, &mut buffer);
    // Upon finishing, the buffer is collected and put into the pool.
});

如果池对象的构建应该分散到所有线程,而不是按顺序预加载

data.par_iter().for_each_init(|| pool.clone(), |pool, item| {
    let mut buffer = pool.pull_or_else(|| Vec::new());
    perform_calculation(&item, &mut buffer);

    // The buffer needs to be explicitly cleared if that's required.
    buffer.clear();
});

依赖项

~480–790KB
~13K SLoC