10 个版本

0.1.11 2023 年 4 月 13 日
0.1.10 2021 年 10 月 30 日

并发 中排名第 122

Download history 20/week @ 2024-04-15 13/week @ 2024-04-22 8/week @ 2024-04-29 9/week @ 2024-05-06 11/week @ 2024-05-13 26/week @ 2024-05-20 8/week @ 2024-05-27 9/week @ 2024-06-03 10/week @ 2024-06-10 18/week @ 2024-06-17 53/week @ 2024-06-24 115/week @ 2024-07-01 238/week @ 2024-07-08 78/week @ 2024-07-15 60/week @ 2024-07-22 49/week @ 2024-07-29

每月下载量 464
用于 3 个 crates (2 个直接使用)

自定义许可证

43KB
581 行代码(不包括注释)

par_iter_sync: 并行迭代器带顺序输出

rust test

类似于 rayon 的 Crate 不提供同步机制。这个 Crate 提供了并行和同步的简单混合。

考虑这种情况,多个线程共享一个缓存,该缓存只能在先前的任务写入后才能读取(例如,任务 4 的读取依赖于任务 1-4 的写入)。

使用 IntoParallelIteratorSync 特征

// in concurrency: task1 write | task2 write | task3 write | task4 write
//                      \_____________\_____________\_____________\
//             task4 read depends on task 1-4 write  \___________
//                                                               \
// in concurrency:              | task2 read  | task3 read  | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {

    // writes cache (write the integer in cache), in parallel
    cache.lock().unwrap().insert(task_number);
    // return the task number to the next iterator
    Ok(task_number)

}).into_par_iter_sync(move |task_number| { // <- synced to sequential order

    // reads
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())

// append a for each to actually run the whole chain
}).for_each(|_| ());

使用注意事项

这个 Crate 被设计为为每个线程克隆闭包捕获的所有资源。为了防止意外的 RAM 使用,您可以使用 Arc 包装大型数据结构(尤其是 Clone 对象的向量)。

顺序一致性

输出顺序保证与上游迭代器相同,但执行顺序不是顺序的。

开销基准测试

平台:Macbook Air(2015 晚期)8GB RAM,Intel Core i5,1.6GHz(2 核心)。

结果

每次运行一百万(1,000,000)个空迭代。

test iter_async::test_par_iter_async::bench_into_par_iter_async
    ... bench: 110,277,577 ns/iter (+/- 28,510,054)

test test_par_iter::bench_into_par_iter_sync
    ... bench: 121,063,787 ns/iter (+/- 103,787,056)

结果

  • 异步迭代器开销 110 ns (+/- 28 ns)
  • 同步迭代器开销 121 ns (+/- 103 ns)

基准测试程序

iter_async

    #[bench]
    fn bench_into_par_iter_async(b: &mut Bencher) {
        b.iter(|| {
            (0..1_000_000).into_par_iter_async(|a| Ok(a)).for_each(|_|{})
        });
    }

iter_sync

    #[bench]
    fn bench_into_par_iter_sync(b: &mut Bencher) {
        b.iter(|| {
            (0..1_000_000).into_par_iter_sync(|a| Ok(a)).for_each(|_|{})
        });
    }

示例

通过链式调用混合同步和并行

use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)                     // <~ async execution
}).into_par_iter_sync(|i| { // <- sync order
    Ok(i)                     // <~async execution
}).into_par_iter_sync(|i| { // <- sync order
    Ok(i)                     // <~async execution
});                           // <- sync order

使用 std::iter::IntoIterator 接口

use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])

闭包捕获变量

捕获的变量会自动克隆到每个线程。

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use `Arc` to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // `resource_captured` is moved into the closure
    // and cloned to worker threads.
    let read_from_resource = resource_captured.get(i).unwrap();
    Ok(*read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])

异常时的快速失败

一旦内部函数返回 Err,迭代器就会停止。

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;
use log::warn;

/// this function returns `Err` when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        // you may log this error
        warn!("Some Error Occurs");
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
    Ok(a)
}).into_par_iter_sync(move |a| {
    // error at 1000
    error_at_1000(a)
}).into_par_iter_sync(move |a| {
    Ok(a)
}).collect();

let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected)

您可以选择跳过错误

如果您不想在 Err 时停止,这是一个解决方案。

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
    // error at 3, but skip
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])

实现说明

输出缓冲

  • 每个工作线程使用一个同步的单生产者多生产者(mpsc)通道来缓冲输出。因此,当一个线程等待轮询时,它不会阻塞。每个线程的通道大小硬编码为100。
  • 线程数等于逻辑核心数。

同步机制

  • 当每个线程获取一个任务时,它会将其线程ID(thread_number)和任务ID(task_number)注册到一个mpsc通道中。
  • 当调用next()时,消费者从任务注册表(task_order)中获取下一个线程ID和任务ID。
  • 如果next()检测到某些线程由于异常而没有产生结果,它会调用kill(),这将停止线程获取新任务,刷新剩余任务,并加入工作线程。

错误处理和丢弃

  • 当发生任何异常时,停止生产者获取新任务。
  • 在丢弃结构之前,停止所有生产者获取任务,刷新所有剩余任务,并加入所有线程。

依赖

~205KB