#hash-map #async #cache #tree

scc

高性能的并发和异步编程容器和实用工具

27个稳定版本

2.1.16 2024年8月16日
2.1.6 2024年7月28日
2.0.18 2024年3月15日
2.0.7 2023年12月22日
0.3.1 2020年11月27日

#15 in 并发

Download history 58941/week @ 2024-05-03 66716/week @ 2024-05-10 63559/week @ 2024-05-17 66618/week @ 2024-05-24 76597/week @ 2024-05-31 85350/week @ 2024-06-07 87849/week @ 2024-06-14 90350/week @ 2024-06-21 89245/week @ 2024-06-28 110808/week @ 2024-07-05 102593/week @ 2024-07-12 111139/week @ 2024-07-19 123521/week @ 2024-07-26 120552/week @ 2024-08-02 129877/week @ 2024-08-09 110340/week @ 2024-08-16

508,737 每月下载量
327 个Crate中使用 (直接使用 27)

Apache-2.0

1MB
17K SLoC

可扩展并发容器

Cargo Crates.io GitHub Workflow Status

一组用于并发和异步编程的高性能容器和实用工具。

特性

  • 阻塞和同步方法的异步对应方法。
  • LoomSerde 支持: features = ["loom", "serde"]
  • 近线性可扩展性。
  • 没有自旋锁和忙等待循环。
  • 使用SIMD查找并行扫描多个条目 [^note]。

[^note]: 仅在启用相应目标特性时使用高级SIMD指令,例如,-C target_feature=+avx2

并发和异步容器

  • HashMap 是一个并发和异步的哈希表。
  • HashSet 是一个并发和异步的哈希集。
  • HashIndex 是一个读取优化的并发和异步哈希表。
  • HashCache 是一个由 HashMap 支持的32路关联缓存。
  • TreeIndex 是一个读取优化的并发和异步B+树。

并发编程实用工具

  • LinkedList 是一个实现无锁并发单链表的类型特质。
  • Queue 是一个并发无锁先进先出容器。
  • Stack 是一个并发无锁后进先出容器。
  • Bag 是一个并发无锁无序不可见容器。

HashMap

HashMap 是一个并发哈希表,针对高并发的写密集型工作负载进行了优化。 HashMap 采用无锁栈结构,由桶数组组成。桶数组由 sdd 管理,因此可以无锁访问并实现非阻塞的容器大小调整。每个桶是一个固定大小的条目数组,由特殊的读写锁保护,提供阻塞和非阻塞的访问方法。

锁定行为

条目访问:细粒度锁定

对条目的读写访问由包含条目的桶中的读写锁进行序列化。没有容器级别的锁,因此容器越大,桶级别的锁竞争的可能性就越低。

调整大小:无锁

HashMap 的调整大小是完全无锁和非阻塞的;调整大小不会阻塞容器中其他读写访问或调整大小的尝试。 调整大小类似于将新的桶数组推入无锁栈。在将来的容器访问中,旧桶数组中的每个条目都将逐步重新定位到新桶数组中,并且当旧桶数组变为空时,最终将其丢弃。

示例

如果键是唯一的,则可以插入条目。插入的条目可以同步或异步地更新、读取和删除。

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(1, 1).is_err());
assert_eq!(hashmap.upsert(1, 1).unwrap(), 0);
assert_eq!(hashmap.update(&1, |_, v| { *v = 3; *v }).unwrap(), 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 3);
assert_eq!(hashmap.remove(&1).unwrap(), (1, 3));

hashmap.entry(7).or_insert(17);
assert_eq!(hashmap.read(&7, |_, v| *v).unwrap(), 17);

let future_insert = hashmap.insert_async(2, 1);
let future_remove = hashmap.remove_async(&1);

如果工作流程复杂,HashMapEntry API 很有用。

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

hashmap.entry(3).or_insert(7);
assert_eq!(hashmap.read(&3, |_, v| *v), Some(7));

hashmap.entry(4).and_modify(|v| { *v += 1 }).or_insert(5);
assert_eq!(hashmap.read(&4, |_, v| *v), Some(5));

let future_entry = hashmap.entry_async(3);

HashMap 没有提供 Iterator,因为无法将 Iterator::Item 的生命周期限制在 Iterator 中。可以通过依赖内部可变性来规避这种限制,例如让返回的引用持有锁,但如果不正确使用,则很容易导致死锁,并且频繁获取锁可能会影响性能。因此,没有实现 Iterator,而是提供了多种方法来同步或异步地遍历条目:anyany_asyncpruneprune_asyncretainretain_asyncscanscan_asyncOccupiedEntry::nextOccupiedEntry::next_async

use scc::HashMap;

let hashmap: HashMap<u64, u32> = HashMap::default();

assert!(hashmap.insert(1, 0).is_ok());
assert!(hashmap.insert(2, 1).is_ok());

// Entries can be modified or removed via `retain`.
let mut acc = 0;
hashmap.retain(|k, v_mut| { acc += *k; *v_mut = 2; true });
assert_eq!(acc, 3);
assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 2);
assert_eq!(hashmap.read(&2, |_, v| *v).unwrap(), 2);

// `any` returns `true` as soon as an entry satisfying the predicate is found.
assert!(hashmap.insert(3, 2).is_ok());
assert!(hashmap.any(|k, _| *k == 3));

// Multiple entries can be removed through `retain`.
hashmap.retain(|k, v| *k == 1 && *v == 2);

// `hash_map::OccupiedEntry` also can return the next closest occupied entry.
let first_entry = hashmap.first_entry();
assert!(first_entry.is_some());
let second_entry = first_entry.and_then(|e| e.next());
assert!(second_entry.is_none());

// Asynchronous iteration over entries using `scan_async`.
let future_scan = hashmap.scan_async(|k, v| println!("{k} {v}"));

HashSet

HashSetHashMap 的一个特殊版本,其值类型为 ()

示例

大多数 HashSet 方法与 HashMap 相同,只是它们不接收值参数,并且一些用于值修改的 HashMap 方法在 HashSet 中没有实现。

use scc::HashSet;

let hashset: HashSet<u64> = HashSet::default();

assert!(hashset.read(&1, |_| true).is_none());
assert!(hashset.insert(1).is_ok());
assert!(hashset.read(&1, |_| true).unwrap());

let future_insert = hashset.insert_async(2);
let future_remove = hashset.remove_async(&1);

HashIndex

HashIndexHashMap 的读取优化版本。在 HashIndex 中,不仅桶数组的内存由 sdd 管理着,而且条目桶的内存也受到 sdd 的保护,从而实现了对单个条目无锁的读取访问。

条目生命周期

HashIndex 不会立即丢弃删除的条目,而是在以下任一条件满足时才会进行丢弃。

  1. Epoch 达到桶中上次删除条目后的下一代,并且桶被写入访问。
  2. HashIndex 被清空或调整大小。
  3. 充满删除条目的桶占用了50%的容量。

这些条件并不能保证删除的条目在特定时间内被丢弃,因此如果工作负载以写入为主且条目大小较大,那么 HashIndex 可能不是最佳选择。

示例

peekpeek_with 方法是完全无锁的。

use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();

assert!(hashindex.insert(1, 0).is_ok());

// `peek` and `peek_with` are lock-free.
assert_eq!(hashindex.peek_with(&1, |_, v| *v).unwrap(), 0);

let future_insert = hashindex.insert_async(2, 1);
let future_remove = hashindex.remove_if_async(&1, |_| true);

HashIndexEntry API 可以用于就地更新条目。

use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();
assert!(hashindex.insert(1, 1).is_ok());

if let Some(mut o) = hashindex.get(&1) {
    // Create a new version of the entry.
    o.update(2);
};

if let Some(mut o) = hashindex.get(&1) {
    // Update the entry in-place.
    unsafe { *o.get_mut() = 3; }
};

HashIndex 实现了 Iterator,因为任何派生的引用只要关联的 ebr::Guard 存活,就可以存活。

use scc::ebr::Guard;
use scc::HashIndex;

let hashindex: HashIndex<u64, u32> = HashIndex::default();

assert!(hashindex.insert(1, 0).is_ok());

// Existing values can be replaced with a new one.
hashindex.get(&1).unwrap().update(1);

let guard = Guard::new();

// An `Guard` has to be supplied to `iter`.
let mut iter = hashindex.iter(&guard);

// The derived reference can live as long as `guard`.
let entry_ref = iter.next().unwrap();
assert_eq!(iter.next(), None);

drop(hashindex);

// The entry can be read after `hashindex` is dropped.
assert_eq!(entry_ref, (&1, &1));

HashCache

HashCache 是一个基于 HashMap 实现的32路关联并发缓存。与 HashCache 不同,它不跟踪整个缓存中最少使用过的条目,而是每个桶维护一个双链表,用于存储已占用条目,以便在访问条目时更新,以跟踪桶内最少使用的条目。

示例

当一个桶满时,新条目被插入,最少使用的条目将被淘汰。

use scc::HashCache;

let hashcache: HashCache<u64, u32> = HashCache::with_capacity(100, 2000);

/// The capacity cannot exceed the maximum capacity.
assert_eq!(hashcache.capacity_range(), 128..=2048);

/// If the bucket corresponding to `1` or `2` is full, the LRU entry will be evicted.
assert!(hashcache.put(1, 0).is_ok());
assert!(hashcache.put(2, 0).is_ok());

/// `1` becomes the most recently accessed entry in the bucket.
assert!(hashcache.get(&1).is_some());

/// An entry can be normally removed.
assert_eq!(hashcache.remove(&2).unwrap(), (2, 0));

TreeIndex

TreeIndex 是一种B+树变体,优化了读取操作。由于 sdd 保护了各个条目使用的内存,因此可以实现对它们的无锁读取访问。

锁定行为

读取访问总是无锁且非阻塞的。只要不需要进行结构变更,对条目的写入访问也是无锁和非阻塞的。然而,当节点由于写入操作而被分割或合并时,受影响的范围内对键的其它写入操作将被阻塞。

条目生命周期

TreeIndex 不会立即丢弃删除的条目,而是在叶节点被清空或分割时才会进行丢弃,这使得 TreeIndex 在工作负载以写入为主时不是一个最佳选择。

示例

如果键是唯一的,可以插入条目,然后读取,最后删除。只有在内部节点分割或合并时才会获取或等待锁。

use scc::TreeIndex;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

assert!(treeindex.insert(1, 2).is_ok());

// `peek` and `peek_with` are lock-free.
assert_eq!(treeindex.peek_with(&1, |_, v| *v).unwrap(), 2);
assert!(treeindex.remove(&1));

let future_insert = treeindex.insert_async(2, 3);
let future_remove = treeindex.remove_if_async(&1, |v| *v == 2);

可以无锁地进行条目扫描。

use scc::TreeIndex;
use sdd::Guard;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

assert!(treeindex.insert(1, 10).is_ok());
assert!(treeindex.insert(2, 11).is_ok());
assert!(treeindex.insert(3, 13).is_ok());

let guard = Guard::new();

// `visitor` iterates over entries without acquiring a lock.
let mut visitor = treeindex.iter(&guard);
assert_eq!(visitor.next().unwrap(), (&1, &10));
assert_eq!(visitor.next().unwrap(), (&2, &11));
assert_eq!(visitor.next().unwrap(), (&3, &13));
assert!(visitor.next().is_none());

可以扫描特定范围的键。

use scc::ebr::Guard;
use scc::TreeIndex;

let treeindex: TreeIndex<u64, u32> = TreeIndex::new();

for i in 0..10 {
    assert!(treeindex.insert(i, 10).is_ok());
}

let guard = Guard::new();

assert_eq!(treeindex.range(1..1, &guard).count(), 0);
assert_eq!(treeindex.range(4..8, &guard).count(), 4);
assert_eq!(treeindex.range(4..=8, &guard).count(), 5);

Bag

Bag 是一个无锁并发非顺序容器。 Bag 完全透明,不允许访问包含的实例,直到它们被弹出。 Bag 特别高效,如果包含的实例数量可以保持在 ARRAY_LEN (默认: usize::BITS / 2)

示例

use scc::Bag;

let bag: Bag<usize> = Bag::default();

bag.push(1);
assert!(!bag.is_empty());
assert_eq!(bag.pop(), Some(1));
assert!(bag.is_empty());

队列

队列 是一个由 sdd 支持的并发无锁先进先出容器。

示例

use scc::Queue;

let queue: Queue<usize> = Queue::default();

queue.push(1);
assert!(queue.push_if(2, |e| e.map_or(false, |x| **x == 1)).is_ok());
assert!(queue.push_if(3, |e| e.map_or(false, |x| **x == 1)).is_err());
assert_eq!(queue.pop().map(|e| **e), Some(1));
assert_eq!(queue.pop().map(|e| **e), Some(2));
assert!(queue.pop().is_none());

Stack 是一个由 sdd 支持的并发无锁后进先出容器。

示例

use scc::Stack;

let stack: Stack<usize> = Stack::default();

stack.push(1);
stack.push(2);
assert_eq!(stack.pop().map(|e| **e), Some(2));
assert_eq!(stack.pop().map(|e| **e), Some(1));
assert!(stack.pop().is_none());

链表

LinkedList 是一个类型特性,它实现了由 sdd 支持的无锁并发单链表操作。它还提供了一个方法来标记链表中的一个条目,以表示用户定义的状态。

示例

use scc::ebr::{AtomicShared, Guard, Shared};
use scc::LinkedList;
use std::sync::atomic::Ordering::Relaxed;

#[derive(Default)]
struct L(AtomicShared<L>, usize);
impl LinkedList for L {
    fn link_ref(&self) -> &AtomicShared<L> {
        &self.0
    }
}

let guard = Guard::new();

let head: L = L::default();
let tail: Shared<L> = Shared::new(L(AtomicShared::null(), 1));

// A new entry is pushed.
assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok());
assert!(!head.is_marked(Relaxed));

// Users can mark a flag on an entry.
head.mark(Relaxed);
assert!(head.is_marked(Relaxed));

// `next_ptr` traverses the linked list.
let next_ptr = head.next_ptr(Relaxed, &guard);
assert_eq!(next_ptr.as_ref().unwrap().1, 1);

// Once `tail` is deleted, it becomes invisible.
tail.delete_self(Relaxed);
assert!(head.next_ptr(Relaxed, &guard).is_null());

性能

HashMap 尾部延迟

在 Apple M2 Max 上,1048576 次插入操作的延迟分布的预期尾部延迟从 180 微秒到 200 微秒不等。

HashMapHashIndex 吞吐量

变更日志

依赖项

~0.1–25MB
~337K SLoC