9 个版本
0.1.10 | 2023年3月11日 |
---|---|
0.1.9 | 2022年11月6日 |
0.1.8 | 2022年6月18日 |
766 在 并发 中排名
每月 31 次下载
19KB
309 行
scherben-map.rs
为 Rust 提供的并发碎片化 HashMap。
scherben-map 将 HashMap 分割成 N 个碎片,每个碎片都由其自身的读/写锁保护。
示例
使用原子引用计数的 HashMap
use scherben_map;
use std;
fn main() {
let map = scherben_map::HashMap::<&str, &str, /*shard count*/ 8>::new_arc();
map.insert("Hallo", "Welt");
{
let (map_a, map_b) = (map.clone(), map.clone());
let handle_a = std::thread::spawn(move || {
println!("{}", map_a.get_owned("Hallo").unwrap());
});
let handle_b = std::thread::spawn(move || {
println!("{}", map_b.get_owned("Hallo").unwrap());
});
_ = handle_a.join();
_ = handle_b.join();
}
// get all key/value pairs
let pairs = map.pairs();
// get all keys
let keys = map.keys();
}
使用线程池和通道从碎片中提取键/值对。
use crossbeam::{
channel::{bounded, unbounded, Receiver, Sender},
select,
sync::WaitGroup,
};
use scoped_threadpool::Pool;
use std::hash::Hash;
extern crate scoped_threadpool;
#[rustfmt::skip]
fn collect<K: Send + Sync + Clone + 'static, V: Send + Sync + Clone + 'static, const N: usize>(map: &scherben_map::HashMap<K, V, N>, pool: &mut scoped_threadpool::Pool) -> Vec<(K, V)>
where
K: Hash + Eq + scherben_map::IKey<K>,
{
let (result_tx, result_rx): (Sender<Vec<(K, V)>>, Receiver<Vec<(K, V)>>) = unbounded();
{
let result_tx = result_tx.clone();
pool.scoped(move |s| {
let (buffer_tx, buffer_rx): (Sender<Vec<(K, V)>>, Receiver<Vec<(K, V)>>) = bounded(64);
let wg = WaitGroup::new();
{
let buffer_rx = buffer_rx.clone();
s.execute(move || {
let mut result: Vec<(K, V)> = Vec::new();
loop {
select! {
recv(buffer_rx) -> msg => {
match msg {
Ok(value) => result.extend(value),
Err(_) => break,
}
}
}
}
_ = result_tx.send(result);
});
}
{
for shard_slot in map.into_iter() {
let wg = wg.clone();
let shard_handle = shard_slot.clone();
let buffer_tx = buffer_tx.clone();
s.execute(move || {
let mut buffer: Vec<(K, V)> = Vec::new();
let _shard = shard_handle.read();
(*_shard).fill_pairs_into(&mut buffer);
_ = buffer_tx.send(buffer);
drop(wg);
});
}
}
wg.wait();
drop(buffer_rx);
});
}
match result_rx.recv() {
Ok(result) => result,
Err(_) => Vec::new(),
}
}
#[rustfmt::skip]
pub fn keys<K: Send + Sync + Clone + 'static, V: Send + Sync + Clone + 'static, const N: usize>(map: &scherben_map::HashMap<K, V, N>, pool: &mut scoped_threadpool::Pool) -> Vec<K>
where
K: Hash + Eq + scherben_map::IKey<K>,
{
collect(map, pool).iter().map(|x| x.0.clone()).collect()
}
pub fn pairs<K: Send + Sync + Clone + 'static, V: Send + Sync + Clone + 'static, const N: usize>(
map: &scherben_map::HashMap<K, V, N>,
pool: &mut scoped_threadpool::Pool,
) -> Vec<(K, V)>
where
K: Hash + Eq + scherben_map::IKey<K> + Clone,
V: Clone,
{
collect(&map, pool)
}
fn main() {
let map: scherben_map::HashMap<String, i64, 8> = Default::default();
let mut pool = Pool::new(num_cpus::get().try_into().unwrap());
for i in 0..1000 {
let item = format!("Hello, Many Worlds {}!", i);
map.insert(item, i);
}
let map_keys = pairs(&map, &mut pool);
if map_keys.len() != 1000 {
panic!("inconsistent state");
}
}
状态
此库处于开发中。
此库受 concurrent-map 和 sharded 的启发。
依赖
~1.2–6.5MB
~26K SLoC