8 个不稳定版本 (3 个破坏性更新)
使用旧的 Rust 2015
0.3.0 | 2015年10月11日 |
---|---|
0.2.1 | 2015年8月3日 |
0.2.0 | 2015年5月24日 |
0.1.3 | 2015年4月20日 |
0.0.1 | 2015年2月19日 |
#143 in #thread-pool
每月 95 次下载
用于 2 crates
36KB
522 行
simple_parallel
基本数据并行操作的直接函数和类型。
lib.rs
:
基本数据并行操作的直接函数和类型。
此库提供了一些用于在并行操作数据(尤其是迭代器)时的构建块。目前,它不是为了提供健壮性或榨取最后一点性能而设计的,而是探索 Rust 类型系统允许以安全性保证编写一些相当复杂的事情的方式,所有这些都不需要垃圾回收器。
核心设计是简单地允许单线程上的操作在多个线程上执行,它并不打算作为线程之间的硬边界;特别是,如果在顺序执行时(例如,使用 panic!
)会摧毁主线程,那么在调用此库中的函数时(最终)也会摧毁主线程。
关于性能和健壮性,顶级函数不做线程池操作,因此实际上每个元素都会产生一个新线程,这在很多方面都是次优的。幸运的是,并非所有都丢失,功能设计得尽可能通用,因此迭代器函数与许多迭代器一起工作,例如,用户可以将向量分成不相交的部分,并将这些部分分散到更少的线程上(例如,使用 chunks
方法)。
此外,现有的线程池有大量的同步开销,因此实际上很少是性能提升(尽管它比顶级函数更健壮,因为它限制了将要产生的线程数量)。
无论如何,不推荐一般使用。
用法
在 crates.io 上可用。将其添加到您的 Cargo.toml 文件中
[dependencies]
simple_parallel = "0.3"
最新开发版本可以在GitHub上获取:GitHub。
示例
并行初始化一个数组。
let mut data = [0; 10];
// fill the array, with one thread for each element:
simple_parallel::for_(data.iter_mut().enumerate(), |(i, elem)| {
*elem = i as i32;
});
// now adjust that data, with a threadpool:
let mut pool = simple_parallel::Pool::new(4);
pool.for_(data.iter_mut(), |elem| *elem *= 2);
以花哨的方式并行转换有序映射的每个元素,使用map
(map
确保输出顺序与输入顺序匹配,与unordered_map
不同),
extern crate crossbeam;
extern crate simple_parallel;
use std::collections::BTreeMap;
let mut map = BTreeMap::new();
map.insert('a', 1);
map.insert('x', 55);
crossbeam::scope(|scope| {
// (`IntoIterator` is used, so "direct" iteration like this is fine.)
let par_iter = simple_parallel::map(scope, &map, |(&c, &elem)| {
let mut x = elem * c as i32;
// ... something complicated and expensive ...
return x as f64
});
// the computation is executing on several threads in the
// background, so that elements are hopefully ready as soon as
// possible.
for value in par_iter {
println!("I computed {}", value);
}
});
并行求任意长切片的总和,通过求子区间的和并将所有内容加到存储在主线程堆栈上的共享互斥锁中。(目前缺少并行折叠,因此需要互斥锁。)
use std::sync::Mutex;
// limit the spew of thread spawning to something sensible
const NUM_CHUNKS: usize = 8;
fn sum(x: &[f64]) -> f64 {
// (round up)
let elements_per_chunk = (x.len() + NUM_CHUNKS - 1) / NUM_CHUNKS;
let total = Mutex::new(0.0);
simple_parallel::for_(x.chunks(elements_per_chunk), |chunk| {
// sum up this little subsection
let subsum = chunk.iter().fold(0.0, |a, b| a + *b);
*total.lock().unwrap() += subsum;
});
let answer = *total.lock().unwrap();
answer
}
或者,可以使用线程池,并为每个子区间分配一个绝对元素数,让池管理在线程间分配工作,而不是被迫计算子区间的长度以限制生成的线程数。
use std::sync::Mutex;
// limit the spew of thread spawning to something sensible
const ELEMS_PER_JOB: usize = 1_000;
fn pooled_sum(pool: &mut simple_parallel::Pool, x: &[f64]) -> f64 {
let total = Mutex::new(0.0);
pool.for_(x.chunks(ELEMS_PER_JOB), |chunk| {
// sum up this little subsection
let subsum = chunk.iter().fold(0.0, |a, b| a + *b);
*total.lock().unwrap() += subsum;
});
let answer = *total.lock().unwrap();
answer
}
一个非常简单的递归并行归并排序的草图,使用both
来处理递归。(实际实现可能真的需要一些临时缓冲区来破坏数据,但关键点是both
自然并行运行。)
/// Merges the two sorted runs `left` and `right`.
/// That is, after `merge(left, right)`,
///
/// left[0] <= left[1] <= ... <= left[last] <= right[0] <= ...
fn merge<T: Ord>(left: &mut [T], right: &mut [T]) {
// magic (but non-parallel, so boring)
}
fn parallel_merge_sort<T: Ord + Send>(x: &mut [T]) {
// base case
if x.len() <= 1 { return }
// get two disjoint halves of the `x`,
let half = x.len() / 2;
let (left, right) = x.split_at_mut(half);
// and sort them recursively, in parallel
simple_parallel::both(&mut *left, &mut *right, |v| parallel_merge_sort(v));
// now combine the two sorted halves
merge(left, right)
}
examples
文件夹包含更复杂的示例,例如并行快速傅里叶变换实现(它真的有效,并行性确实带来了好处...当调整得当的时候)。
依赖项
~15KB