1个不稳定版本
0.1.0 | 2021年11月13日 |
---|
#1114 in 并发
32KB
357 行
lf-queue
一个无锁的多生产者多消费者无界队列。
示例
[dependencies]
lf-queue = "0.1"
单生产者-单消费者
use lf_queue::Queue;
fn main() {
const COUNT: usize = 1_000;
let queue: Queue<usize> = Queue::new();
for i in 0..COUNT {
queue.push(i);
}
for i in 0..COUNT {
assert_eq!(i, queue.pop().unwrap());
}
assert!(queue.pop().is_none());
}
多生产者-单消费者
use lf_queue::Queue;
use std::thread;
fn main() {
const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;
let queue: Queue<usize> = Queue::new();
let ths: Vec<_> = (0..CONCURRENCY)
.map(|_| {
let q = queue.clone();
thread::spawn(move || {
for i in 0..COUNT {
q.push(i);
}
})
})
.collect();
for th in ths {
th.join().unwrap();
}
for _ in 0..COUNT * CONCURRENCY {
assert!(queue.pop().is_some());
}
assert!(queue.pop().is_none());
}
单生产者-多消费者
use lf_queue::Queue;
use std::thread;
fn main() {
const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;
let queue: Queue<usize> = Queue::new();
for i in 0..COUNT * CONCURRENCY {
queue.push(i);
}
let ths: Vec<_> = (0..CONCURRENCY)
.map(|_| {
let q = queue.clone();
thread::spawn(move || {
for _ in 0..COUNT {
loop {
if q.pop().is_some() {
break;
}
}
}
})
})
.collect();
for th in ths {
th.join().unwrap();
}
assert!(queue.pop().is_none());
}
多生产者-多消费者
use lf_queue::Queue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;
let queue: Queue<usize> = Queue::new();
let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
let ths: Vec<_> = (0..CONCURRENCY)
.map(|_| {
let q = queue.clone();
let its = items.clone();
thread::spawn(move || {
for _ in 0..COUNT {
let n = loop {
if let Some(x) = q.pop() {
break x;
} else {
thread::yield_now();
}
};
its[n].fetch_add(1, Ordering::SeqCst);
}
})
})
.map(|_| {
let q = queue.clone();
thread::spawn(move || {
for i in 0..COUNT {
q.push(i);
}
})
})
.collect();
for th in ths {
th.join().unwrap();
}
thread::sleep(std::time::Duration::from_millis(10));
for c in &*items {
assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
}
assert!(queue.pop().is_none());
}
致谢
此Rust中无锁队列的实现受到concurrent-queue
crate的启发,旨在用于教育目的。代码文档帮助您了解在Rust中实现并发无锁队列所使用的算法,但可能还不是初学者友好。随着时间的推移,将逐步添加更多详细信息和学习资料。
许可证
本项目采用MIT许可证。
贡献
除非您明确说明,否则您有意提交以供包含在本作品中的任何贡献,均应按上述方式许可,不得附加任何额外条款或条件。
请注意,截至目前,我的重点是改进此crate的文档,而不是添加任何额外功能。请在开始任何重大PR之前提出问题并开始讨论。
欢迎贡献。
依赖项
~0–26MB
~328K SLoC