27个稳定版本
新 2.1.16 | 2024年8月16日 |
---|---|
2.1.6 | 2024年7月28日 |
2.0.18 | 2024年3月15日 |
2.0.7 |
|
0.3.1 |
|
#15 in 并发
508,737 每月下载量
在 327 个Crate中使用 (直接使用 27)
1MB
17K SLoC
可扩展并发容器
一组用于并发和异步编程的高性能容器和实用工具。
特性
- 阻塞和同步方法的异步对应方法。
Loom
和Serde
支持:features = ["loom", "serde"]
。- 近线性可扩展性。
- 没有自旋锁和忙等待循环。
- 使用SIMD查找并行扫描多个条目 [^note]。
[^note]: 仅在启用相应目标特性时使用高级SIMD指令,例如,-C target_feature=+avx2
。
并发和异步容器
HashMap
是一个并发和异步的哈希表。HashSet
是一个并发和异步的哈希集。HashIndex
是一个读取优化的并发和异步哈希表。HashCache
是一个由HashMap
支持的32路关联缓存。TreeIndex
是一个读取优化的并发和异步B+树。
并发编程实用工具
LinkedList
是一个实现无锁并发单链表的类型特质。Queue
是一个并发无锁先进先出容器。Stack
是一个并发无锁后进先出容器。Bag
是一个并发无锁无序不可见容器。
HashMap
HashMap
是一个并发哈希表,针对高并发的写密集型工作负载进行了优化。 HashMap
采用无锁栈结构,由桶数组组成。桶数组由 sdd
管理,因此可以无锁访问并实现非阻塞的容器大小调整。每个桶是一个固定大小的条目数组,由特殊的读写锁保护,提供阻塞和非阻塞的访问方法。
锁定行为
条目访问:细粒度锁定
对条目的读写访问由包含条目的桶中的读写锁进行序列化。没有容器级别的锁,因此容器越大,桶级别的锁竞争的可能性就越低。
调整大小:无锁
HashMap
的调整大小是完全无锁和非阻塞的;调整大小不会阻塞容器中其他读写访问或调整大小的尝试。 调整大小类似于将新的桶数组推入无锁栈。在将来的容器访问中,旧桶数组中的每个条目都将逐步重新定位到新桶数组中,并且当旧桶数组变为空时,最终将其丢弃。
示例
如果键是唯一的,则可以插入条目。插入的条目可以同步或异步地更新、读取和删除。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(1, 1).is_err());
assert_eq!(hashmap.upsert(1, 1).unwrap(), 0);
assert_eq!(hashmap.update(&1, |_, v| { *v = 3; *v }).unwrap(), 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 3);
assert_eq!(hashmap.remove(&1).unwrap(), (1, 3));
hashmap.entry(7).or_insert(17);
assert_eq!(hashmap.read(&7, |_, v| *v).unwrap(), 17);
let future_insert = hashmap.insert_async(2, 1);
let future_remove = hashmap.remove_async(&1);
如果工作流程复杂,HashMap
的 Entry
API 很有用。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
hashmap.entry(3).or_insert(7);
assert_eq!(hashmap.read(&3, |_, v| *v), Some(7));
hashmap.entry(4).and_modify(|v| { *v += 1 }).or_insert(5);
assert_eq!(hashmap.read(&4, |_, v| *v), Some(5));
let future_entry = hashmap.entry_async(3);
HashMap
没有提供 Iterator
,因为无法将 Iterator::Item
的生命周期限制在 Iterator 中。可以通过依赖内部可变性来规避这种限制,例如让返回的引用持有锁,但如果不正确使用,则很容易导致死锁,并且频繁获取锁可能会影响性能。因此,没有实现 Iterator
,而是提供了多种方法来同步或异步地遍历条目:any
、any_async
、prune
、prune_async
、retain
、retain_async
、scan
、scan_async
、OccupiedEntry::next
和 OccupiedEntry::next_async
。
use scc::HashMap;
let hashmap: HashMap<u64, u32> = HashMap::default();
assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(2, 1).is_ok());
// Entries can be modified or removed via `retain`.
let mut acc = 0;
hashmap.retain(|k, v_mut| { acc += *k; *v_mut = 2; true });
assert_eq!(acc, 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 2);
assert_eq!(hashmap.read(&2, |_, v| *v).unwrap(), 2);
// `any` returns `true` as soon as an entry satisfying the predicate is found.
assert!(hashmap.insert(3, 2).is_ok());
assert!(hashmap.any(|k, _| *k == 3));
// Multiple entries can be removed through `retain`.
hashmap.retain(|k, v| *k == 1 && *v == 2);
// `hash_map::OccupiedEntry` also can return the next closest occupied entry.
let first_entry = hashmap.first_entry();
assert!(first_entry.is_some());
let second_entry = first_entry.and_then(|e| e.next());
assert!(second_entry.is_none());
// Asynchronous iteration over entries using `scan_async`.
let future_scan = hashmap.scan_async(|k, v| println!("{k} {v}"));
HashSet
HashSet
是 HashMap
的一个特殊版本,其值类型为 ()
。
示例
大多数 HashSet
方法与 HashMap
相同,只是它们不接收值参数,并且一些用于值修改的 HashMap
方法在 HashSet
中没有实现。
use scc::HashSet;
let hashset: HashSet<u64> = HashSet::default();
assert!(hashset.read(&1, |_| true).is_none());
assert!(hashset.insert(1).is_ok());
assert!(hashset.read(&1, |_| true).unwrap());
let future_insert = hashset.insert_async(2);
let future_remove = hashset.remove_async(&1);
HashIndex
HashIndex
是 HashMap
的读取优化版本。在 HashIndex
中,不仅桶数组的内存由 sdd
管理着,而且条目桶的内存也受到 sdd
的保护,从而实现了对单个条目无锁的读取访问。
条目生命周期
HashIndex
不会立即丢弃删除的条目,而是在以下任一条件满足时才会进行丢弃。
Epoch
达到桶中上次删除条目后的下一代,并且桶被写入访问。HashIndex
被清空或调整大小。- 充满删除条目的桶占用了50%的容量。
这些条件并不能保证删除的条目在特定时间内被丢弃,因此如果工作负载以写入为主且条目大小较大,那么 HashIndex
可能不是最佳选择。
示例
peek
和 peek_with
方法是完全无锁的。
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 0).is_ok());
// `peek` and `peek_with` are lock-free.
assert_eq!(hashindex.peek_with(&1, |_, v| *v).unwrap(), 0);
let future_insert = hashindex.insert_async(2, 1);
let future_remove = hashindex.remove_if_async(&1, |_| true);
HashIndex
的 Entry
API 可以用于就地更新条目。
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 1).is_ok());
if let Some(mut o) = hashindex.get(&1) {
// Create a new version of the entry.
o.update(2);
};
if let Some(mut o) = hashindex.get(&1) {
// Update the entry in-place.
unsafe { *o.get_mut() = 3; }
};
为 HashIndex
实现了 Iterator
,因为任何派生的引用只要关联的 ebr::Guard
存活,就可以存活。
use scc::ebr::Guard;
use scc::HashIndex;
let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 0).is_ok());
// Existing values can be replaced with a new one.
hashindex.get(&1).unwrap().update(1);
let guard = Guard::new();
// An `Guard` has to be supplied to `iter`.
let mut iter = hashindex.iter(&guard);
// The derived reference can live as long as `guard`.
let entry_ref = iter.next().unwrap();
assert_eq!(iter.next(), None);
drop(hashindex);
// The entry can be read after `hashindex` is dropped.
assert_eq!(entry_ref, (&1, &1));
HashCache
HashCache
是一个基于 HashMap
实现的32路关联并发缓存。与 HashCache
不同,它不跟踪整个缓存中最少使用过的条目,而是每个桶维护一个双链表,用于存储已占用条目,以便在访问条目时更新,以跟踪桶内最少使用的条目。
示例
当一个桶满时,新条目被插入,最少使用的条目将被淘汰。
use scc::HashCache;
let hashcache: HashCache<u64, u32> = HashCache::with_capacity(100, 2000);
/// The capacity cannot exceed the maximum capacity.
assert_eq!(hashcache.capacity_range(), 128..=2048);
/// If the bucket corresponding to `1` or `2` is full, the LRU entry will be evicted.
assert!(hashcache.put(1, 0).is_ok());
assert!(hashcache.put(2, 0).is_ok());
/// `1` becomes the most recently accessed entry in the bucket.
assert!(hashcache.get(&1).is_some());
/// An entry can be normally removed.
assert_eq!(hashcache.remove(&2).unwrap(), (2, 0));
TreeIndex
TreeIndex
是一种B+树变体,优化了读取操作。由于 sdd
保护了各个条目使用的内存,因此可以实现对它们的无锁读取访问。
锁定行为
读取访问总是无锁且非阻塞的。只要不需要进行结构变更,对条目的写入访问也是无锁和非阻塞的。然而,当节点由于写入操作而被分割或合并时,受影响的范围内对键的其它写入操作将被阻塞。
条目生命周期
TreeIndex
不会立即丢弃删除的条目,而是在叶节点被清空或分割时才会进行丢弃,这使得 TreeIndex
在工作负载以写入为主时不是一个最佳选择。
示例
如果键是唯一的,可以插入条目,然后读取,最后删除。只有在内部节点分割或合并时才会获取或等待锁。
use scc::TreeIndex;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
assert!(treeindex.insert(1, 2).is_ok());
// `peek` and `peek_with` are lock-free.
assert_eq!(treeindex.peek_with(&1, |_, v| *v).unwrap(), 2);
assert!(treeindex.remove(&1));
let future_insert = treeindex.insert_async(2, 3);
let future_remove = treeindex.remove_if_async(&1, |v| *v == 2);
可以无锁地进行条目扫描。
use scc::TreeIndex;
use sdd::Guard;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
assert!(treeindex.insert(1, 10).is_ok());
assert!(treeindex.insert(2, 11).is_ok());
assert!(treeindex.insert(3, 13).is_ok());
let guard = Guard::new();
// `visitor` iterates over entries without acquiring a lock.
let mut visitor = treeindex.iter(&guard);
assert_eq!(visitor.next().unwrap(), (&1, &10));
assert_eq!(visitor.next().unwrap(), (&2, &11));
assert_eq!(visitor.next().unwrap(), (&3, &13));
assert!(visitor.next().is_none());
可以扫描特定范围的键。
use scc::ebr::Guard;
use scc::TreeIndex;
let treeindex: TreeIndex<u64, u32> = TreeIndex::new();
for i in 0..10 {
assert!(treeindex.insert(i, 10).is_ok());
}
let guard = Guard::new();
assert_eq!(treeindex.range(1..1, &guard).count(), 0);
assert_eq!(treeindex.range(4..8, &guard).count(), 4);
assert_eq!(treeindex.range(4..=8, &guard).count(), 5);
Bag
Bag
是一个无锁并发非顺序容器。 Bag
完全透明,不允许访问包含的实例,直到它们被弹出。 Bag
特别高效,如果包含的实例数量可以保持在 ARRAY_LEN (默认: usize::BITS / 2)
示例
use scc::Bag;
let bag: Bag<usize> = Bag::default();
bag.push(1);
assert!(!bag.is_empty());
assert_eq!(bag.pop(), Some(1));
assert!(bag.is_empty());
队列
示例
use scc::Queue;
let queue: Queue<usize> = Queue::default();
queue.push(1);
assert!(queue.push_if(2, |e| e.map_or(false, |x| **x == 1)).is_ok());
assert!(queue.push_if(3, |e| e.map_or(false, |x| **x == 1)).is_err());
assert_eq!(queue.pop().map(|e| **e), Some(1));
assert_eq!(queue.pop().map(|e| **e), Some(2));
assert!(queue.pop().is_none());
栈
示例
use scc::Stack;
let stack: Stack<usize> = Stack::default();
stack.push(1);
stack.push(2);
assert_eq!(stack.pop().map(|e| **e), Some(2));
assert_eq!(stack.pop().map(|e| **e), Some(1));
assert!(stack.pop().is_none());
链表
LinkedList
是一个类型特性,它实现了由 sdd
支持的无锁并发单链表操作。它还提供了一个方法来标记链表中的一个条目,以表示用户定义的状态。
示例
use scc::ebr::{AtomicShared, Guard, Shared};
use scc::LinkedList;
use std::sync::atomic::Ordering::Relaxed;
#[derive(Default)]
struct L(AtomicShared<L>, usize);
impl LinkedList for L {
fn link_ref(&self) -> &AtomicShared<L> {
&self.0
}
}
let guard = Guard::new();
let head: L = L::default();
let tail: Shared<L> = Shared::new(L(AtomicShared::null(), 1));
// A new entry is pushed.
assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok());
assert!(!head.is_marked(Relaxed));
// Users can mark a flag on an entry.
head.mark(Relaxed);
assert!(head.is_marked(Relaxed));
// `next_ptr` traverses the linked list.
let next_ptr = head.next_ptr(Relaxed, &guard);
assert_eq!(next_ptr.as_ref().unwrap().1, 1);
// Once `tail` is deleted, it becomes invisible.
tail.delete_self(Relaxed);
assert!(head.next_ptr(Relaxed, &guard).is_null());
性能
HashMap
尾部延迟
在 Apple M2 Max 上,1048576 次插入操作的延迟分布的预期尾部延迟从 180 微秒到 200 微秒不等。
HashMap
和 HashIndex
吞吐量
- Apple M2 Max(12 核)上的结果.
- Intel Xeon(32 核,avx2)上的结果.
- 谨慎解释结果,因为基准测试通常不能代表真实世界的工作负载。
变更日志
依赖项
~0.1–25MB
~337K SLoC