13 个版本

0.2.0-alpha.22020 年 7 月 15 日
0.2.0-alpha.12020 年 2 月 8 日
0.1.10 2020 年 1 月 20 日
0.1.8 2019 年 11 月 15 日
0.1.4 2019 年 8 月 31 日

#401 in 异步


2 个 crate 中使用(通过 constellation-rs

Apache-2.0

67KB
2K SLoC

Constellation

一个旨在使 Rust 成为分布式计算前沿的项目。

Crates.io Apache-2.0 licensed Build Status

文档

Constellation 是一个用于 Rust(nightly)的框架,它帮助编写、调试和部署分布式程序。它从 Erlang/OTPMPICSP 中汲取灵感,并在可能的情况下利用 Rust 生态系统,包括 serde + bincode 用于网络序列化,以及 miofutures-rs 用于 TCP 异步通道。

大多数用户将通过更高级的库来利用 Constellation,例如

  • Amadeus:在 Rust 中实现和谐的分布式数据分析。受 Rayon 启发,它提供分布式进程池和内置的数据科学工具来利用它。
  • 还有更多即将推出!

要直接利用 Constellation,请继续阅读。

Constellation 框架

  • Constellation 是一个框架,在程序开始时通过调用 init() 初始化。
  • 您可以使用 spawn(closure) 启动新进程,这些进程将运行 closure
  • spawn(closure) 返回新进程的 Pid。
  • 您可以通过使用 Sender::new(remote_pid)Receiver::new(remote_pid) 创建通道,在进程之间进行通信。
  • 通道可以异步使用,例如使用 sender.send(value).awaitreceiver.recv().await
  • futures-rs 为使用通道提供了有用的函数和适配器,包括 select()join()
  • 您还可以使用 .block() 便利方法在通道上进行阻塞:sender.send().block()receiver.recv().block()
  • 有关 Rust 中异步编程的更多信息,请参阅 Async Book

以下是一个递归创建进程以分配寻找斐波那契数任务的任务的简单示例

点击显示 Cargo.toml。
[dependencies]

# The core APIs, including init(), spawn(), Sender, Receiver and select().
# Always required when using Constellation.
constellation-rs = "0.1"

# Support for FnOnce!(), FnMut!() and Fn!() macros to create Serde serializable
# closures. Required to pass a closure to spawn().
serde_closure = "0.1"

use constellation::*;
use serde_closure::FnOnce;

fn fibonacci(x: usize) -> usize {
    if x <= 1 {
        return x;
    }
    let left_pid = spawn(
        Resources::default(),
        FnOnce!(move |parent_pid| {
            println!("Left process with {}", x);
            Sender::<usize>::new(parent_pid)
                .send(fibonacci(x - 1))
                .block()
        }),
    )
    .block()
    .unwrap();

    let right_pid = spawn(
        Resources::default(),
        FnOnce!(move |parent_pid| {
            println!("Right process with {}", x);
            Sender::<usize>::new(parent_pid)
                .send(fibonacci(x - 2))
                .block()
        }),
    )
    .block()
    .unwrap();

    Receiver::<usize>::new(left_pid).recv().block().unwrap()
        + Receiver::<usize>::new(right_pid).recv().block().unwrap()
}

fn main() {
    init(Resources::default());

    println!("11th Fibonacci number is {}!", fibonacci(10));
}
点击显示输出。

** TODO! 这是错误的截图! ** 使用 constellation 的截图

查看此示例的更真实版本,包括异步和错误处理,请访问 此处

分布式运行

Constellation 有两个组件

  • 一组函数库,使您能够使用 spawn() 创建进程,并在它们之间发送和接收消息
  • 当您想要在多个服务器上运行时,分布式执行框架,以及添加到 cargo 中的 deploy 命令来部署程序。

两者都像上面一样输出到命令行 - 唯一的区别是后者已通过网络转发。

Constellation 还处于初期阶段 - 正在开发测试,以将支持扩展到 Windows(目前仅限于 Linux 和 macOS)并达到更高的成熟度。

目前的重点在于测试、文档、API 精炼(特别是错误消息和异步原语)以及移植到 Windows。

特性

星系负责

  • spawn() 将具有定义内存和CPU资源需求的进程分配到具有可用资源的服务器
  • 待办事项:尽力执行那些内存和资源需求,以避免有缺陷/贪婪的进程饿死其他进程
  • 使用TCP的进程之间的通道,具有自动设置和拆卸
  • 异步(反)序列化通过通道发送/接收的值(利用 serdebincode 以及可选的 libfringe 以避免分配)
  • 通道实现了 std::future::Futurefutures::stream::Streamfutures::sink::Sink,这使您可以使用来自 futures-rs 的有用函数和适配器,包括 select()join(),以及与 tokioruntime 的兼容性。
  • 由一个运行高效边缘触发epoll循环的后台线程提供动力
  • 确保在进程退出之前发送和确认数据,以避免连接重置和丢失数据(利用 atexitTIOCOUTQ
  • 地址:所有通道都是在集群范围内的 Pid 之间,而不是 (ip,port)
  • 性能:设计用于在底层操作系统之上带来最小的开销

它是做什么用的

星系使编写分布式程序更加容易。像MPI一样,它抽象出套接字,让您专注于业务逻辑,而不是寻址、连接、复用、异步、事件和拆卸。与MPI不同,它有一个现代、简洁的接口,使用 serde 处理序列化,提供了如 select() 之类的强大异步构建块,并集成了Rust异步生态系统。

它的工作原理

有两种执行模式:使用 cargo run 正常运行和使用 cargo deploy 将其部署到集群。我们将讨论第一种,然后介绍第二种的不同之处。

监控进程

每个进程都有一个监控进程来捕获进程的输出,并对其调用waitpid以捕获退出状态(无论是退出代码还是信号)。这是在进程初始化时通过分叉设置的,父进程是监控进程,子进程继续运行用户的程序。它通过替换文件描述符0、1、2(对应于stdin、stdout和stderr)为管道来捕获输出,这样当用户的进程写入例如fd 1时,它实际上是在写入一个监控进程随后从中读取并转发给桥接器的管道。

桥接器

桥接器是收集来自各个监控进程的输出并将其以格式化的方式输出到终端的工具。它在init()内部启动,进程通过分叉,使得父进程成为桥接器,而子进程继续运行用户的程序。

启动

spawn()接受一个函数、一个参数和资源约束,并使用它们启动一个新进程。这是通过调用当前二进制文件的干净副本来实现的,使用execve("/proc/self/exe",argv,envp),在其调用init()时,作用略有不同:它连接回现有的桥接器,并且不是返回控制流,而是在正常退出之前调用指定的用户函数。函数指针相对于文本段中的固定基址进行调整。

通道

通过创建Sender<T>Receiver<T>来进行通信。创建时需要一个Pid,并在幕后进行大量记录以确保

  • 双工TCP连接被正确且适时地创建和销毁,以支持用户创建的单工通道。
  • TCP连接在操作系统中的资源消耗与用户持有的通道数量成比例。
  • Pid是唯一的。
  • 每个进程有一个单一端口(在初始化时临时绑定以避免饥饿或失败)用于所有支持通道的TCP连接。
  • (De)序列化可以异步发生,即为了避免在套接字未准备好写入时分配未限定的内存来保存serde序列化的结果,利用libfringe的协程。
  • 可以通过删除和重新创建来更改通道消息的类型。

分布式运行

在集群上运行时有四个主要区别

星系节点

监听可配置的地址,接收二进制文件并执行它们。

星系主节点

输入零个或多个其他星系实例的地址和资源,以及要自动启动的进程——这几乎总是桥接器。

它监听可配置的地址以接收具有资源要求的二进制文件——但几乎总是只有桥接器向其提供这些二进制文件才有意义。

桥接器

它不是在用户进程内部通过fork来调用,而是在星座主初始化时自动启动。它监听一个可配置的地址,等待 cargo deployments,到那时它会运行二进制程序,并使用特殊的env变量来触发 init() 打印初始进程的资源需求并退出,然后将带有确定资源需求的二进制文件发送到 星座主。成功分配后,它由一个 星座 实例执行。在 init() 中,它回连到 桥接器,桥接器尽职尽责地将它的输出转发到 cargo deploy

cargodeploy

这是一个添加到 cargo 的命令,底层调用 cargo run,但与本地运行生成的二进制文件不同,它被发送到 桥接器。然后桥接器将任何输出发送回来,在终端格式化输出。

如何使用它

[dependencies]
constellation-rs = "0.1"
use constellation::*;

fn main() {
    init(Resources::default());
    println!("Hello, world!");
}
$ cargo run
3fecd01:
    Hello, world!
    exited: 0

或者,运行分布式

机器2

cargo install constellation-rs
constellation 10.0.0.2:9999 # local address to bind to

机器3

cargo install constellation-rs
constellation 10.0.0.3:9999

机器1

cargo install constellation-rs
constellation 10.0.0.1:9999 nodes.toml

nodes.toml

[[nodes]]
fabric_addr = "10.0.0.1:9999" # local address to bind to
bridge_bind = "10.0.0.1:8888" # local address of the bridge to bind to
mem = "100 GiB"               # resource capacity of the node
cpu = 16                      # number of logical cores

[[nodes]]
fabric_addr = "10.0.0.2:9999"
mem = "100 GiB"
cpu = 16

[[nodes]]
fabric_addr = "10.0.0.3:9999"
mem = "100 GiB"
cpu = 16

您的笔记本电脑

cargo install constellation-rs
cargo deploy --release 10.0.0.1:8888 # address of the bridge
833d3de:
    Hello, world!
    exited

要求

Rust: nightly.

Linux: 内核 >= 3.9; /proc 文件系统。

macOS: 已测试 >= 10.10,也可能在较旧版本上运行。

如果您遇到其他要求,请提交问题。

API

查看Rust文档

测试

查看TESTING.md

为什么?

星座是我正在工作的一个大规模数据处理项目的基础。我决定开始对其进行润色,并作为开源发布,以供其他人可能感兴趣或甚至有用的机会!

许可证

根据Apache License,版本2.0授权,(LICENSE.txthttp://www.apache.org/licenses/LICENSE-2.0)。

除非您明确声明,否则根据Apache-2.0许可证定义的,您有意提交给作品的任何贡献,均将根据上述内容授权,不附加任何其他条款或条件。

依赖项

~5MB
~108K SLoC