1 个不稳定版本
0.2.0 | 2024年3月18日 |
---|
#230 in 并发
740KB
15K SLoC
Renoir
REactive Network of Operators In Rust
Renoir (简称: Noir) [/ʁənwaʁ/, /nwaʁ/] 是一个基于数据流范式的分布式数据处理平台,它提供了一个类似于Apache Flink的直观编程接口,但具有更好的性能特征。
Renoir将每个作业转换为由算子构成的数据流图,并将这些算子分组到块中。每个块包含一串算子,它们按顺序处理数据,中间不进行重新分区。块是系统的部署单元,可以分布到多台主机上执行。
Renoir程序的一般结构是:首先创建一个 `StreamContext`,然后初始化一个或多个 `Source` 以创建 `Stream`。算子图通过 `Stream` 对象的方法组合而成,这些方法与 Rust `Iterator` trait 中的同类方法相似,允许通过方法链定义整个处理流程。
示例
单词计数
use renoir::prelude::*;
fn main() {
    // Build the runtime configuration from the command line; the remaining
    // positional arguments (here: the input path) end up in `args`.
    let (config, args) = RuntimeConfig::from_args();
    // Launch the workers described by the config (presumably a no-op for a
    // purely local run — confirm against the renoir docs).
    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    // Describe the dataflow graph; it only starts running at `execute_blocking`.
    let word_counts = env
        // Read the input file line by line, in parallel
        .stream_file(&args[0])
        // Break every line into lowercase words
        .flat_map(|line| tokenize(&line))
        // Partition the stream so equal words land in the same group
        .group_by(|w| w.clone())
        // Count how many times each word appears
        .fold(0, |acc, _w| *acc += 1)
        // Gather the (word, count) pairs
        .collect_vec();

    // Run the job and wait for it to complete.
    env.execute_blocking();

    // `get()` may yield nothing (presumably on processes that do not gather
    // the final output); print only when the result is available here.
    if let Some(counts) = word_counts.get() {
        for (word, count) in counts {
            println!("{word}: {count}");
        }
    }
}
/// Splits `s` on whitespace and returns every word lower-cased, in order.
///
/// Deliberately simple: punctuation stays attached to the word it touches.
fn tokenize(s: &str) -> Vec<String> {
    let mut words = Vec::new();
    for raw in s.split_whitespace() {
        words.push(raw.to_lowercase());
    }
    words
}
// Run locally with 6 cores: `cargo run -- -l 6 input.txt`
单词计数:使用满足结合律的算子(更快)
use renoir::prelude::*;
fn main() {
    // Convenience method to parse deployment config from CLI arguments
    let (config, args) = RuntimeConfig::from_args();
    // This example is meant to run on multiple hosts (`-r config.toml`), so the
    // remote workers are spawned exactly as in the basic word count example.
    config.spawn_remote_workers();
    let env = StreamContext::new(config);
    let result = env
        .stream_file(&args[0])
        // Adaptive batching (the default) has predictable latency.
        // Fixed-size batching often leads to shorter execution times
        // when data is immediately available and latency is not critical.
        .batch_mode(BatchMode::fixed(1024))
        // The closure captures nothing, so no `move` is needed.
        .flat_map(|line| tokenize(&line))
        .map(|word| (word, 1))
        // Associative operators split the operation into a local and a
        // global step for faster execution.
        // Key by the word only: items are `(word, count)` tuples, and cloning
        // the whole tuple would key by `(word, 1)` instead.
        .group_by_reduce(|(w, _)| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
        // `unkey` re-attaches the key, so every item is `(word, (word, count))`:
        // the reduced value is still the full `(word, count)` tuple.
        // NOTE(review): assumes `unkey` yields `(key, value)` pairs — confirm
        // against the renoir `KeyedStream` docs.
        .unkey()
        .collect_vec();
    env.execute_blocking(); // Start execution (blocking)
    if let Some(result) = result.get() {
        // Print word counts; the inner word duplicates the key, so skip it.
        for (word, (_, count)) in result {
            println!("{word}: {count}");
        }
    }
}
/// Breaks a line into lower-cased, whitespace-separated words.
fn tokenize(s: &str) -> Vec<String> {
    s.split_whitespace()
        .map(|token| token.to_lowercase())
        .collect::<Vec<_>>()
}
// Execute on multiple hosts `cargo run -- -r config.toml input.txt`
远程部署
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
有关更多示例,请参阅示例目录
依赖项
~6–17MB
~212K SLoC