3个版本
使用旧Rust 2015
0.1.2 | 2018年12月2日 |
---|---|
0.1.1 | 2018年12月2日 |
0.1.0 | 2018年11月28日 |
#1196 in 并发
110KB
2K SLoC
Tange-Collection
Tange-Collection是一个中等层级的用于高速数据处理的数据流库。
它是做什么的?
Tange-Collection提供了数据流操作符,用于快速执行数据处理任务。它使用基于任务的并行化来构建复杂的计算图,可扩展到数亿个独立的阶段。
它是为了解决与Dask和Spark相同类型的处理任务而创建的,更侧重于批量处理而不是分析。
API
示例 - 单词计数
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::utils::read_text;
use std::env::args;
fn main() {
let path = args().nth(1).unwrap();
let col = read_text(&path, 4_000_000)
.expect("File missing");
let graph = col
.map(|line| line.split_whitespace().fold(0usize, |a,_x| a + 1))
.fold_by(|_count| 1,
|| 0usize,
|acc, c| { *acc += c },
|acc1, acc2| { *acc1 += acc2 },
1);
if let Some(counts) = graph.run(&GreedyScheduler::new()) {
println!("Counts: {:?}", counts);
}
}
示例 - IDF计数
extern crate tange;
extern crate tange_collection;
use tange::scheduler::GreedyScheduler;
use tange_collection::utils::read_text;
use std::env::args;
use std::collections::HashSet;
fn main() {
env_logger::init();
let path = args().nth(1).unwrap();
let col = read_text(&path, 64_000_000)
.expect("File missing");
let total_lines = col.count();
let word_freq = col
.emit_to_disk("/tmp".into(), |line, emitter| {
let unique: HashSet<_> = line.split_whitespace().map(|p| p.to_lowercase()).collect();
for word in unique {
emitter(word);
}
})
.frequencies(16);
// Cross product
let idfs = total_lines.join_on(
&word_freq.to_memory(),
|_c| 1,
|_wc| 1,
|total, (word, count)| {
(word.clone(), (1f64 + (*total as f64 / *count as f64)).ln())
},
1
)
.map(|(_k, x)| x.clone())
.sort_by(|(word, _count)| word.clone());
if let Some(word_idf) = idfs.run(&GreedyScheduler::new()) {
for (w, idf) in word_idf {
println!("{}: {}", w, idf);
}
}
}
依赖关系
~2–2.9MB
~56K SLoC