#星座 #分布式 #部署 #分布式计算 #集群 #异步编程

无 std 程序+库 constellation-rs

星座是一个用于 Rust (nightly) 的框架,有助于编写、调试和部署分布式程序

13 个版本

0.2.0-alpha.22020 年 7 月 15 日
0.2.0-alpha.12020 年 2 月 8 日
0.1.10 2020 年 1 月 20 日
0.1.8 2019 年 11 月 15 日
0.1.4 2019 年 8 月 31 日

异步 中排名第 636

每月下载量 46
amadeus 中使用

Apache-2.0

255KB
7K SLoC

Constellation

一个使 Rust 成为分布式计算前沿的项目。

Crates.io Apache-2.0 licensed Build Status

文档

星座是一个用于 Rust (nightly) 的框架,有助于编写、调试和部署分布式程序。它大量借鉴了 Erlang/OTPMPICSP,并在 Rust 生态系统中有用,包括 serde + bincode 用于网络序列化,以及 miofutures-rs 用于 TCP 的异步通道。

大多数用户将通过高级库利用星座,例如

  • Amadeus:在 Rust 中实现和谐分布式数据分析。受 Rayon 启发,它提供了一个分布式进程池和内置的数据科学工具来利用它。
  • 更多内容即将推出!

要直接利用星座,请继续阅读。

星座框架

  • 星座是一个框架,您可以在程序开始时通过调用 init() 来初始化。
  • 您可以使用 spawn(closure) 创建新进程,这些进程运行 closure
  • spawn(closure) 返回新进程的 Pid。
  • 您可以通过使用 Sender::new(remote_pid)Receiver::new(remote_pid) 创建通道来在进程之间进行通信。
  • 通道可以使用异步方式与 sender.send(value).awaitreceiver.recv().await 一起使用。
  • futures-rs 为使用通道提供了有用的函数和适配器,包括 select()join()
  • 您还可以使用 .block() 方便方法在通道上进行阻塞:sender.send().block()receiver.recv().block()
  • 有关Rust中异步编程的更多信息,请参阅 Async Book

以下是一个递归地生成进程以分配查找斐波那契数任务的简单示例

点击显示 Cargo.toml。
[dependencies]

# The core APIs, including init(), spawn(), Sender, Receiver and select().
# Always required when using Constellation.
constellation-rs = "0.1"

# Support for FnOnce!(), FnMut!() and Fn!() macros to create Serde serializable
# closures. Required to pass a closure to spawn().
serde_closure = "0.1"

use constellation::*;
use serde_closure::FnOnce;

fn fibonacci(x: usize) -> usize {
    if x <= 1 {
        return x;
    }
    let left_pid = spawn(
        Resources::default(),
        FnOnce!(move |parent_pid| {
            println!("Left process with {}", x);
            Sender::<usize>::new(parent_pid)
                .send(fibonacci(x - 1))
                .block()
        }),
    )
    .block()
    .unwrap();

    let right_pid = spawn(
        Resources::default(),
        FnOnce!(move |parent_pid| {
            println!("Right process with {}", x);
            Sender::<usize>::new(parent_pid)
                .send(fibonacci(x - 2))
                .block()
        }),
    )
    .block()
    .unwrap();

    Receiver::<usize>::new(left_pid).recv().block().unwrap()
        + Receiver::<usize>::new(right_pid).recv().block().unwrap()
}

fn main() {
    init(Resources::default());

    println!("11th Fibonacci number is {}!", fibonacci(10));
}
点击显示输出。

** TODO! 这是错误的截图! ** 使用 constellation 的截图

请查看此示例的更真实版本,包括异步和错误处理,在此

分布式运行

Constellation 有两个组件

  • 一组函数库,使您能够 spawn() 进程,以及在这些进程之间 send()recv()
  • 对于您想要在多个服务器上运行的情况,还有一个分布式执行框架,以及添加到 cargo 中的 deploy 命令,用于将程序部署到该框架。

两者都输出到命令行,如上所示 - 唯一的区别是后者已经通过网络转发。

Constellation 仍然处于起步阶段 - 开发和测试正在进行,以支持 Windows(目前仅支持 Linux 和 macOS)并达到更高的成熟度。

目前的重点是在测试、文档、API 精炼(特别是错误信息和异步原语)以及移植到 Windows 上。

功能

Constellation 负责以下工作

  • 使用 spawn() 在具有可用资源的服务器上分配具有定义内存和CPU资源需求的过程
  • TODO:尽力执行这些内存和资源需求,以避免有缺陷的/贪婪的过程饿死其他进程
  • TCP 之间的进程通道,具有自动设置和删除
  • 异步(反)序列化通过通道发送/接收的值(利用 serdebincode 和可选的 libfringe 以避免分配)
  • 通道实现 std::future::Futurefutures::stream::Streamfutures::sink::Sink,使得可以使用来自 futures-rs 的有用函数和适配器,包括 select()join(),以及与 tokioruntime 的兼容性。
  • 由一个运行高效边缘触发 epoll 循环的后台线程提供支持
  • 确保在进程退出之前发送和确认数据,以避免连接重置和数据丢失(利用 atexitTIOCOUTQ
  • 地址:所有通道都在集群范围的 Pid 之间,而不是 (ip,port)
  • 高性能:设计用于在底层操作系统之上提供最小的开销

这是为了什么

Constellation 使编写分布式程序更加容易。像 MPI 一样,它抽象出套接字,让您专注于业务逻辑,而不是寻址、连接、多路复用、异步、事件和删除。与 MPI 不同,它有一个现代、简洁的接口,使用 serde 处理(反)序列化,提供像 select() 这样的强大异步构建块,并与 Rust 异步生态系统集成。

它是如何工作的

有两种执行模式:使用 cargo run 正常运行和使用 cargo deploy 将其部署到集群。我们将讨论第一种,然后介绍第二种的不同之处。

监控进程

每个进程都有一个监控进程,用于捕获进程的输出,并调用waitpid来捕获退出状态(无论是退出代码还是信号)。这是在进程初始化时通过复制来设置的,父进程是监控进程,子进程则继续运行用户的程序。通过替换文件描述符0、1、2(分别对应stdin、stdout和stderr)为管道,监控进程可以捕获输出,当用户的进程写入例如fd 1时,它实际上是在写入一个监控进程随后读取并转发给桥梁的管道。

桥梁

桥梁是用来收集来自各个监控进程的输出,并在终端上格式化输出的。它在init()内部启动,通过复制进程,使得父进程成为桥梁,而子进程则继续运行用户的程序。

生成

spawn()接受一个函数、一个参数和资源限制,并使用这些参数生成一个新的进程。这是通过调用当前二进制的一个干净副本来实现的execve("/proc/self/exe",argv,envp),在其调用init()时,行为略有不同:它连接回现有的桥梁,而不是返回控制流,而是在正常退出之前调用指定的用户函数。函数指针相对于文本段的固定基址进行调整。

通道

通过创建Sender<T>Receiver<T>来实现通信。创建过程需要Pid,并在幕后进行大量的记录工作,以确保

  • 双工TCP连接被正确且恰当地创建和销毁,以支持用户创建的单工通道。
  • TCP连接在操作系统中的资源消耗与用户持有的通道数量成正比。
  • Pid是唯一的。
  • 每个进程有一个单一的端口(在初始化时临时绑定,以避免饥饿或失败),所有通道支持的TCP连接都是从该端口到或来自。
  • 序列化可以异步发生,即为了避免在套接字未准备好写入时分配无界内存来保存serde序列化的结果,利用libfringe的协程。
  • 可以通过删除和重新创建通道来更改通道消息的类型。

分布式运行

在集群上运行时有四个主要区别

星系节点

监听可配置的地址,接收二进制文件并执行它们。

星系主节点

接收零个或多个其他星系实例的地址和资源作为输入,以及要自动启动的进程——这几乎总是桥梁。

它监听可配置的地址,以接收具有资源要求的二进制文件——但几乎总是只有桥梁向它提供这些二进制文件才有意义。

桥梁

它不是由用户进程内部的fork调用启动的,而是在星系主初始化时自动启动。它监听一个可配置的地址,等待 cargo deploy 部署命令,此时它使用特殊的环境变量运行二进制程序,触发 init() 打印初始进程的资源需求并退出,然后将带有确定资源需求的二进制程序发送给 星系主节点。成功分配后,它由一个 星系 实例执行。在 init() 中,它重新连接到 桥接器,桥接器尽职尽责地将输出转发到 cargo deploy

cargodeploy

这是一个添加到 cargo 的命令,底层调用 cargo run,但与本地运行生成的二进制文件不同,它被发送到 桥接器。然后桥接器将任何输出发送回,该输出在终端以格式化的形式输出。

如何使用

[dependencies]
constellation-rs = "0.1"
use constellation::*;

fn main() {
    init(Resources::default());
    println!("Hello, world!");
}
$ cargo run
3fecd01:
    Hello, world!
    exited: 0

或者,以分布式方式运行

机器2

cargo install constellation-rs
constellation 10.0.0.2:9999 # local address to bind to

机器3

cargo install constellation-rs
constellation 10.0.0.3:9999

机器1

cargo install constellation-rs
constellation 10.0.0.1:9999 nodes.toml

nodes.toml

[[nodes]]
fabric_addr = "10.0.0.1:9999" # local address to bind to
bridge_bind = "10.0.0.1:8888" # local address of the bridge to bind to
mem = "100 GiB"               # resource capacity of the node
cpu = 16                      # number of logical cores

[[nodes]]
fabric_addr = "10.0.0.2:9999"
mem = "100 GiB"
cpu = 16

[[nodes]]
fabric_addr = "10.0.0.3:9999"
mem = "100 GiB"
cpu = 16

您的笔记本电脑

cargo install constellation-rs
cargo deploy --release 10.0.0.1:8888 # address of the bridge
833d3de:
    Hello, world!
    exited

需求

Rust: nightly。

Linux: 内核 >= 3.9; /proc 文件系统。

macOS: 已测试 >= 10.10,也可能在较旧版本上运行。

如果您遇到其他需求,请提交问题。

API

查看 Rust 文档

测试

查看 TESTING.md

为什么?

星系是我正在开发的一个大型数据处理项目的基础。我决定开始完善它,并公开发布为开源软件,万一它对其他人有趣或有用呢!

许可

根据 Apache 许可证版本 2.0 许可,(LICENSE.txthttp://www.apache.org/licenses/LICENSE-2.0)。

除非您明确说明,否则您提交的任何旨在包含在本作品中的贡献(根据 Apache-2.0 许可证定义),均应按上述方式许可,不附加任何其他条款或条件。

依赖

~11–29MB
~519K SLoC