31个版本
0.3.0 | 2021年5月7日 |
---|---|
0.2.3 | 2021年3月22日 |
0.2.2-alpha.6 | 2021年2月22日 |
0.2.1-alpha.3 | 2021年1月28日 |
0.1.1 | 2020年12月26日 |
#10 in #flink
每月115次下载
在 9 个crate 中使用
7KB
105 行
rlink-rs
高性能流处理框架。从头开始,以Rust实现Apache Flink的新、更快的方法。纯内存,零拷贝。生产环境中的单个集群稳定处理每秒数亿次的窗口计算。
框架已在Linux/MacOS/Windows上测试,需要稳定的Rust。
监控
图形
图形演变
rlink 计划可视化工具
示例
rlink = "0.6"
SELECT
HOP_START(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND),
HOP_END(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND),
name,
SUM(value),
MAX(value),
MIN(value),
COUNT(*),
FROM stream_table
GROUP BY HOP(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND), name
#[derive(Clone, Debug)]
pub struct SimpleStreamApp {}
impl StreamApp for SimpleStreamApp {
fn prepare_properties(&self, properties: &mut Properties) {
properties.set_application_name("rlink-simple");
}
fn build_stream(&self, _properties: &Properties, env: &mut StreamExecutionEnvironment) {
env.register_source(vec_source(gen_records(), &model::FIELD_METADATA), 1)
.assign_timestamps_and_watermarks(
DefaultWatermarkStrategy::new()
.for_bounded_out_of_orderness(Duration::from_secs(1))
.wrap_time_periodic(Duration::from_secs(10), Duration::from_secs(20))
.for_schema_timestamp_assigner("timestamp"),
)
.key_by(SchemaKeySelector::new(vec!["name"]))
.window(SlidingEventTimeWindows::new(
Duration::from_secs(60),
Duration::from_secs(20),
None,
))
.reduce(
SchemaReduceFunction::new(vec![sum("value"), max("value"), min("value"), count()]),
2,
)
.add_sink(print_sink());
}
}
构建
构建源
# debug
cargo build --color=always --all --all-targets
# release
cargo build --release --color=always --all --all-targets
独立部署
配置
standalone.yaml
---
# all job manager's addresses, one or more
application_manager_address:
- "http://0.0.0.0:8770"
- "http://0.0.0.0:8770"
metadata_storage:
type: Memory
# bind ip
task_manager_bind_ip: 0.0.0.0
task_manager_work_dir: /data/rlink/application
task_managers
TaskManager 列表
10.1.2.1
10.1.2.2
10.1.2.3
10.1.2.4
启动
协调器
./start_job_manager.sh
工作节点
./start_task_manager.sh
提交应用程序
在独立模式下
## submit an application
# create job
curl http://x.x.x.x:8770/job/application \
-X POST \
-F "file=@/path/to/execute_file" \
-v
# run job
curl http://x.x.x.x:8770/job/application/application-1591174445599 \
-X POST \
-H "Content-Type:application/json" \
-d '{"batch_args":[{"cluster_mode":"Standalone", "manager_type":"Coordinator","num_task_managers":"15"}]}' \
-v
# kill job
curl http://x.x.x.x:8770/job/application/application-1591174445599/shutdown \
-X POST \
-H "Content-Type:application/json"
在Yarn上
将更新管理器jar文件上传到hdfs
将 rlink-yarn-manager-{version}-jar-with-dependencies.jar
上传到hdfs
例如:上传到 hdfs://nn/path/to/rlink-yarn-manager-{version}-jar-with-dependencies.jar
将仪表板更新到hdfs
将 rlink-dashboard.zip
上传到hdfs
例如:上传到 hdfs://nn/path/to/rlink-dashboard.zip
将应用程序更新到hdfs
将您的应用程序可执行文件上传到hdfs。
例如:将 rlink-showcase
上传到 hdfs://nn/path/to/rlink-showcase
提交Yarn作业
使用 rlink-yarn-client-{version}.jar
提交 yarn 作业
hadoop jar rlink-yarn-client-{version}.jar rlink.yarn.client.Client \
--applicationName rlink-showcase \
--worker_process_path hdfs://nn/path/to/rlink-showcase \
--java_manager_path hdfs://nn/path/to/rlink-yarn-manager-{version}-jar-with-dependencies.jar \
--yarn_manager_main_class rlink.yarn.manager.ResourceManagerCli \
--dashboard_path hdfs://nn/path/to/rlink-dashboard.zip \
--master_memory_mb 256 \
--master_v_cores 1 \
--memory_mb 256 \
--v_cores 1 \
--queue root.default \
--cluster_mode YARN \
--manager_type Coordinator \
--num_task_managers 80 \
--application_process_arg xxx
在 Kubernetes 上
准备
- Kubernetes
- KubeConfig,可通过 ~/.kube/config 进行配置。您可以通过运行 kubectl auth can-i <list|create|edit|delete> pods 验证权限
查看如何 设置 Kubernetes 集群。
在 Kubernetes 上启动 rlink 应用程序
# start
./target/release/rlink-kubernetes \
name=my_first_rlink_application \
image_path=name:tag \
job_v_cores=1 \
job_memory_mb=100 \
task_v_cores=1 \
task_memory_mb=100 \
num_task_managers=1 \
# stop
kubectl delete deployment/my_first_rlink_application
构建 image example-simple
sudo docker build -t xxx:xx -f ./docker/Dockerfile_example_simple .
依赖
~1.5MB
~34K SLoC