#stream #stream-processing #elasticsearch #window #flink

已删除 rlink-elasticsearch-connector

高性能流处理框架

0.2.5 2021年3月29日
0.2.3 2021年3月22日
0.2.2-alpha.6 2021年2月22日
0.2.0 2021年1月26日
0.1.1 2020年12月26日

#12 in #flink

每月下载量34次

MIT/Apache

605KB
16K SLoC

rlink-rs

Crates.io Released API docs MIT licensed License

高性能流处理框架。从头开始使用Rust实现Apache Flink的新、更快的版本。纯内存,零拷贝。生产环境中的单个集群稳定处理每秒数亿个窗口计算。

该框架已在Linux/MacOS/Windows上测试,需要稳定版Rust。

监控

img.png

图演变

img.png

img.png

示例

rlink = "0.6"
SELECT
  HOP_START(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND),
  HOP_END(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND),
  name,
  SUM(value),
  MAX(value),
  MIN(value),
  COUNT(*),
FROM stream_table
GROUP BY HOP(timestamp, INTERVAL '20' SECOND, INTERVAL '60' SECOND), name
#[derive(Clone, Debug)]
pub struct SimpleStreamApp {}

impl StreamApp for SimpleStreamApp {
    fn prepare_properties(&self, properties: &mut Properties) {
        properties.set_application_name("rlink-simple");
    }

    fn build_stream(&self, _properties: &Properties, env: &mut StreamExecutionEnvironment) {
        env.register_source(vec_source(gen_records(), &model::FIELD_METADATA), 1)
            .assign_timestamps_and_watermarks(
                DefaultWatermarkStrategy::new()
                    .for_bounded_out_of_orderness(Duration::from_secs(1))
                    .wrap_time_periodic(Duration::from_secs(10), Duration::from_secs(20))
                    .for_schema_timestamp_assigner("timestamp"),
            )
            .key_by(SchemaKeySelector::new(vec!["name"]))
            .window(SlidingEventTimeWindows::new(
                Duration::from_secs(60),
                Duration::from_secs(20),
                None,
            ))
            .reduce(
                SchemaReduceFunction::new(vec![sum("value"), max("value"), min("value"), count()]),
                2,
            )
            .add_sink(print_sink());
    }
}

构建

构建源

# debug
cargo build --color=always --all --all-targets
# release
cargo build --release --color=always --all --all-targets

独立部署

配置

standalone.yaml


---
# all job manager's addresses, one or more
application_manager_address:
  - "http://0.0.0.0:8770"
  - "http://0.0.0.0:8770"

metadata_storage:
  type: Memory

# bind ip
task_manager_bind_ip: 0.0.0.0
task_manager_work_dir: /data/rlink/application

任务管理器

任务管理器列表

10.1.2.1
10.1.2.2
10.1.2.3
10.1.2.4

启动

协调器

./start_job_manager.sh

工作节点

./start_task_manager.sh

提交应用

在独立模式下

## submit an application

# create job
curl http://x.x.x.x:8770/job/application \
  -X POST \
  -F "file=@/path/to/execute_file" \
  -v

# run job
curl http://x.x.x.x:8770/job/application/application-1591174445599 \
  -X POST \
  -H "Content-Type:application/json" \
  -d '{"batch_args":[{"cluster_mode":"Standalone", "manager_type":"Coordinator","num_task_managers":"15"}]}' \
  -v

# kill job
curl http://x.x.x.x:8770/job/application/application-1591174445599/shutdown \
  -X POST \
  -H "Content-Type:application/json"

在Yarn上

将更新管理器jar上传到hdfs

上传 rlink-yarn-manager-{version}-jar-with-dependencies.jar 到 hdfs

例如:上传到 hdfs://nn/path/to/rlink-yarn-manager-{version}-jar-with-dependencies.jar

将仪表板更新到hdfs

上传 rlink-dashboard.zip 到 hdfs

例如:上传到 hdfs://nn/path/to/rlink-dashboard.zip

将应用更新到hdfs

将您的应用程序可执行文件上传到hdfs。

例如:上传 rlink-showcasehdfs://nn/path/to/rlink-showcase

提交Yarn作业

使用 rlink-yarn-client-{version}.jar 提交Yarn作业

hadoop jar rlink-yarn-client-{version}.jar rlink.yarn.client.Client \
  --applicationName rlink-showcase \
  --worker_process_path hdfs://nn/path/to/rlink-showcase \
  --java_manager_path hdfs://nn/path/to/rlink-yarn-manager-{version}-jar-with-dependencies.jar \
  --yarn_manager_main_class rlink.yarn.manager.ResourceManagerCli \
  --dashboard_path hdfs://nn/path/to/rlink-dashboard.zip \
  --master_memory_mb 256 \
  --master_v_cores 1 \
  --memory_mb 256 \
  --v_cores 1 \
  --queue root.default \
  --cluster_mode YARN \
  --manager_type Coordinator \
  --num_task_managers 80 \
  --application_process_arg xxx

在Kubernetes上

准备

  • Kubernetes
  • KubeConfig,可通过 ~/.kube/config 配置。您可以通过运行 kubectl auth can-i <list|create|edit|delete> pods 验证权限

查看如何 设置Kubernetes集群

# start 
./target/release/rlink-kubernetes \
  name=my_first_rlink_application \
  image_path=name:tag \
  job_v_cores=1 \
  job_memory_mb=100 \
  task_v_cores=1 \
  task_memory_mb=100 \
  num_task_managers=1  \

# stop
kubectl delete deployment/my_first_rlink_application

构建镜像示例-simple

sudo docker build -t xxx:xx -f ./docker/Dockerfile_example_simple .

依赖关系

~30–46MB
~820K SLoC