3个版本

使用旧Rust 2015

0.1.2 2018年12月2日
0.1.1 2018年12月2日
0.1.0 2018年11月28日

#1196 in 并发

Apache-2.0/MIT

110KB
2K SLoC

Tange-Collection

Tange-Collection是一个中等层级的用于高速数据处理的数据流库。

它是做什么的?

Tange-Collection提供了数据流操作符,用于快速执行数据处理任务。它使用基于任务的并行化来构建复杂的计算图,可扩展到数亿个独立的阶段。

它是为了解决与Dask和Spark相同类型的处理任务而创建的,更侧重于批量处理而不是分析。

API

示例 - 单词计数

extern crate tange;
extern crate tange_collection;

use tange::scheduler::GreedyScheduler;
use tange_collection::utils::read_text;

use std::env::args;

fn main() {
    let path = args().nth(1).unwrap();
    let col = read_text(&path, 4_000_000)
        .expect("File missing");

    let graph = col 
        .map(|line| line.split_whitespace().fold(0usize, |a,_x| a + 1)) 
        .fold_by(|_count| 1,
                 || 0usize,
                 |acc, c| { *acc += c },
                 |acc1, acc2| { *acc1 += acc2 },
                 1);
    
    if let Some(counts) = graph.run(&GreedyScheduler::new()) {
        println!("Counts: {:?}", counts);
    }   
}

示例 - IDF计数

extern crate tange;
extern crate tange_collection;

use tange::scheduler::GreedyScheduler;
use tange_collection::utils::read_text;

use std::env::args;
use std::collections::HashSet;

fn main() {
    env_logger::init();
     
    let path = args().nth(1).unwrap();
    let col = read_text(&path, 64_000_000)
        .expect("File missing");

    let total_lines = col.count();
    let word_freq = col
        .emit_to_disk("/tmp".into(), |line, emitter| {
            let unique: HashSet<_> = line.split_whitespace().map(|p| p.to_lowercase()).collect();
            for word in unique {
                emitter(word);
            }
        })
        .frequencies(16);

    // Cross product
    let idfs = total_lines.join_on(
            &word_freq.to_memory(),
            |_c| 1,
            |_wc| 1,
            |total, (word, count)| {
                (word.clone(), (1f64 + (*total as f64 / *count as f64)).ln())
            },  
            1   
        )
        .map(|(_k, x)| x.clone())
        .sort_by(|(word, _count)| word.clone());

    if let Some(word_idf) = idfs.run(&GreedyScheduler::new()) {
        for (w, idf) in word_idf {
            println!("{}: {}", w, idf);
        }
    }
}

依赖关系

~2–2.9MB
~56K SLoC