#lock-free-queue #spsc #mpmc #mpsc #spmc

lf-queue

一个无锁的多生产者多消费者无界队列

1个不稳定版本

0.1.0 2021年11月13日

#1114 in 并发

MIT许可证

32KB
357

lf-queue

Crates.io Documentation Build Status MIT licensed

一个无锁的多生产者多消费者无界队列。

示例

[dependencies]
lf-queue = "0.1"

单生产者-单消费者

use lf_queue::Queue;

fn main() {
    const COUNT: usize = 1_000;
    let queue: Queue<usize> = Queue::new();

    for i in 0..COUNT {
        queue.push(i);
    }

    for i in 0..COUNT {
        assert_eq!(i, queue.pop().unwrap());
    }

    assert!(queue.pop().is_none());
}

多生产者-单消费者

use lf_queue::Queue;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();

    let ths: Vec<_> = (0..CONCURRENCY)
        .map(|_| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..COUNT {
                    q.push(i);
                }
            })
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    for _ in 0..COUNT * CONCURRENCY {
        assert!(queue.pop().is_some());
    }

    assert!(queue.pop().is_none());
}

单生产者-多消费者

use lf_queue::Queue;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();

    for i in 0..COUNT * CONCURRENCY {
        queue.push(i);
    }

    let ths: Vec<_> = (0..CONCURRENCY)
        .map(|_| {
            let q = queue.clone();
            thread::spawn(move || {
                for _ in 0..COUNT {
                    loop {
                        if q.pop().is_some() {
                            break;
                        }
                    }
                }
            })
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    assert!(queue.pop().is_none());
}

多生产者-多消费者

use lf_queue::Queue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();
    let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());

    let ths: Vec<_> = (0..CONCURRENCY)
        .map(|_| {
            let q = queue.clone();
            let its = items.clone();
            thread::spawn(move || {
                for _ in 0..COUNT {
                    let n = loop {
                        if let Some(x) = q.pop() {
                            break x;
                        } else {
                            thread::yield_now();
                        }
                    };
                    its[n].fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .map(|_| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..COUNT {
                    q.push(i);
                }
            })
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    thread::sleep(std::time::Duration::from_millis(10));

    for c in &*items {
        assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
    }

    assert!(queue.pop().is_none());
}

致谢

此Rust中无锁队列的实现受到concurrent-queue crate的启发,旨在用于教育目的。代码文档帮助您了解在Rust中实现并发无锁队列所使用的算法,但可能还不是初学者友好。随着时间的推移,将逐步添加更多详细信息和学习资料。

许可证

本项目采用MIT许可证

贡献

除非您明确说明,否则您有意提交以供包含在本作品中的任何贡献,均应按上述方式许可,不得附加任何额外条款或条件。

请注意,截至目前,我的重点是改进此crate的文档,而不是添加任何额外功能。请在开始任何重大PR之前提出问题并开始讨论。

欢迎贡献。

依赖项

~0–26MB
~328K SLoC