10 个版本
0.1.11 | 2023 年 4 月 13 日 |
---|---|
0.1.10 | 2021 年 10 月 30 日 |
在 并发 中排名第 122
每月下载量 464
用于 3 个 crates (2 个直接使用)
43KB
581 行代码(不包括注释)
par_iter_sync: 并行迭代器带顺序输出
类似于 rayon
的 Crate 不提供同步机制。这个 Crate 提供了并行和同步的简单混合。
考虑这种情况,多个线程共享一个缓存,该缓存只能在先前的任务写入后才能读取(例如,任务 4 的读取依赖于任务 1-4 的写入)。
使用 IntoParallelIteratorSync
特征
// in concurrency: task1 write | task2 write | task3 write | task4 write
// \_____________\_____________\_____________\
// task4 read depends on task 1-4 write \___________
// \
// in concurrency: | task2 read | task3 read | task4 read
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;
// there are 100 tasks
let tasks = 0..100;
// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();
// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {
// writes cache (write the integer in cache), in parallel
cache.lock().unwrap().insert(task_number);
// return the task number to the next iterator
Ok(task_number)
}).into_par_iter_sync(move |task_number| { // <- synced to sequential order
// reads
assert!(cache_clone.lock().unwrap().contains(&task_number));
Ok(())
// append a for each to actually run the whole chain
}).for_each(|_| ());
使用注意事项
这个 Crate 被设计为为每个线程克隆闭包捕获的所有资源。为了防止意外的 RAM 使用,您可以使用 Arc
包装大型数据结构(尤其是 Clone
对象的向量)。
顺序一致性
输出顺序保证与上游迭代器相同,但执行顺序不是顺序的。
开销基准测试
平台:Macbook Air(2015 晚期)8GB RAM,Intel Core i5,1.6GHz(2 核心)。
结果
每次运行一百万(1,000,000)个空迭代。
test iter_async::test_par_iter_async::bench_into_par_iter_async
... bench: 110,277,577 ns/iter (+/- 28,510,054)
test test_par_iter::bench_into_par_iter_sync
... bench: 121,063,787 ns/iter (+/- 103,787,056)
结果
- 异步迭代器开销
110 ns (+/- 28 ns)
。 - 同步迭代器开销
121 ns (+/- 103 ns)
。
基准测试程序
iter_async
#[bench]
fn bench_into_par_iter_async(b: &mut Bencher) {
b.iter(|| {
(0..1_000_000).into_par_iter_async(|a| Ok(a)).for_each(|_|{})
});
}
iter_sync
#[bench]
fn bench_into_par_iter_sync(b: &mut Bencher) {
b.iter(|| {
(0..1_000_000).into_par_iter_sync(|a| Ok(a)).for_each(|_|{})
});
}
示例
通过链式调用混合同步和并行
use par_iter_sync::IntoParallelIteratorSync;
(0..100).into_par_iter_sync(|i| {
Ok(i) // <~ async execution
}).into_par_iter_sync(|i| { // <- sync order
Ok(i) // <~async execution
}).into_par_iter_sync(|i| { // <- sync order
Ok(i) // <~async execution
}); // <- sync order
使用 std::iter::IntoIterator
接口
use par_iter_sync::IntoParallelIteratorSync;
let mut count = 0;
// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
assert_eq!(i, count);
count += 1;
}
// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();
// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();
assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])
闭包捕获变量
捕获的变量会自动克隆到每个线程。
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;
// use `Arc` to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();
let result_iter = (0..len).into_par_iter_sync(move |i| {
// `resource_captured` is moved into the closure
// and cloned to worker threads.
let read_from_resource = resource_captured.get(i).unwrap();
Ok(*read_from_resource)
});
// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])
异常时的快速失败
一旦内部函数返回 Err
,迭代器就会停止。
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;
use log::warn;
/// this function returns `Err` when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
if n == 1000 {
// you may log this error
warn!("Some Error Occurs");
Err(())
} else {
Ok(n)
}
}
let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
Ok(a)
}).into_par_iter_sync(move |a| {
// error at 1000
error_at_1000(a)
}).into_par_iter_sync(move |a| {
Ok(a)
}).collect();
let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected)
您可以选择跳过错误
如果您不想在 Err
时停止,这是一个解决方案。
use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;
let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
// error at 3, but skip
if n == 3 {
Ok(Err(()))
} else {
Ok(Ok(n))
}
}).collect();
assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])
实现说明
输出缓冲
- 每个工作线程使用一个同步的单生产者多生产者(mpsc)通道来缓冲输出。因此,当一个线程等待轮询时,它不会阻塞。每个线程的通道大小硬编码为100。
- 线程数等于逻辑核心数。
同步机制
- 当每个线程获取一个任务时,它会将其线程ID(
thread_number
)和任务ID(task_number
)注册到一个mpsc通道中。 - 当调用
next()
时,消费者从任务注册表(task_order
)中获取下一个线程ID和任务ID。 - 如果
next()
检测到某些线程由于异常而没有产生结果,它会调用kill()
,这将停止线程获取新任务,刷新剩余任务,并加入工作线程。
错误处理和丢弃
- 当发生任何异常时,停止生产者获取新任务。
- 在丢弃结构之前,停止所有生产者获取任务,刷新所有剩余任务,并加入所有线程。
依赖
~205KB