2 个不稳定版本

0.2.0 2024 年 3 月 14 日
0.1.0 2023 年 12 月 15 日

#7 in #noir

每月 46 次下载

LGPL-3.0-or-later

740KB
15K SLoC

Noir

预印本

Rust 中的操作符网络

API 文档

Noir 是一个基于数据流范式的分布式数据处理平台,它提供了一个类似于 Apache Flink 的直观编程接口,但具有更好的性能特性。

Noir 将每个作业转换为操作符的数据流图,并将它们分组到块中。块包含一系列操作符,它们按顺序处理数据而不重新分区。它们是系统使用的部署单元,可以在多个系统上分布式和执行。

Noir 程序的常见布局从创建 StreamContext 开始,然后初始化一个或多个 Source 创建一个 Stream。操作符图使用 Stream 对象的方法组成,这些方法遵循 Rust 的 Iterator 特性,允许通过方法链以直观的方式定义处理工作流程。

示例

词频统计

use noir_compute::prelude::*;

fn main() {
    // Convenience method to parse deployment config from CLI arguments
    let (config, args) = RuntimeConfig::from_args();
    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    let result = env
        // Open and read file line by line in parallel
        .stream_file(&args[0])
        // Split into words
        .flat_map(|line| tokenize(&line))
        // Partition
        .group_by(|word| word.clone())
        // Count occurrences
        .fold(0, |count, _word| *count += 1)
        // Collect result
        .collect_vec();
        
    env.execute_blocking(); // Start execution (blocking)
    if let Some(result) = result.get() {
        // Print word counts
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    // Simple tokenisation strategy
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Execute on 6 local hosts `cargo run -- -l 6 input.txt`

词频统计(更快速)

use noir_compute::prelude::*;

fn main() {
    // Convenience method to parse deployment config from CLI arguments
    let (config, args) = RuntimeConfig::from_args();
    let env = StreamContext::new(config);

    let result = env
        .stream_file(&args[0])
        // Adaptive batching(default) has predictable latency
        // Fixed size batching often leads to shorter execution times
        // If data is immediately available and latency is not critical
        .batch_mode(BatchMode::fixed(1024))
        .flat_map(move |line| tokenize(&line))
        .map(|word| (word, 1))
        // Associative operators split the operation in a local and a
        // global step for faster execution
        .group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
        .unkey()
        .collect_vec();

    env.execute_blocking(); // Start execution (blocking)
    if let Some(result) = result.get() {
        // Print word counts
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Execute on multiple hosts `cargo run -- -r config.yaml input.txt`

远程部署

# config.yaml
hosts:
  - address: host1.lan
    base_port: 9500
    num_cores: 16
  - address: host2.lan
    base_port: 9500
    num_cores: 8
    ssh:
      username: noir-compute
      key_file: /home/user/.ssh/id_rsa

有关更多示例,请参阅 示例目录

依赖关系

~7–21MB
~252K SLoC