13 个版本
0.2.0-alpha.2 | 2020 年 7 月 15 日 |
---|---|
0.2.0-alpha.1 | 2020 年 2 月 8 日 |
0.1.10 | 2020 年 1 月 20 日 |
0.1.8 | 2019 年 11 月 15 日 |
0.1.4 | 2019 年 8 月 31 日 |
在 异步 中排名第 636
每月下载量 46
在 amadeus 中使用
255KB
7K SLoC
一个使 Rust 成为分布式计算前沿的项目。
星座是一个用于 Rust (nightly) 的框架,有助于编写、调试和部署分布式程序。它大量借鉴了 Erlang/OTP、MPI 和 CSP,并在 Rust 生态系统中有用,包括 serde + bincode 用于网络序列化,以及 mio 和 futures-rs 用于 TCP 的异步通道。
大多数用户将通过高级库利用星座,例如
要直接利用星座,请继续阅读。
星座框架
- 星座是一个框架,您可以在程序开始时通过调用
init()
来初始化。 - 您可以使用
spawn(closure)
创建新进程,这些进程运行closure
。 spawn(closure)
返回新进程的 Pid。- 您可以通过使用
Sender::new(remote_pid)
和Receiver::new(remote_pid)
创建通道来在进程之间进行通信。 - 通道可以使用异步方式与
sender.send(value).await
和receiver.recv().await
一起使用。 - futures-rs 为使用通道提供了有用的函数和适配器,包括
select()
和join()
。 - 您还可以使用
.block()
方便方法在通道上进行阻塞:sender.send().block()
和receiver.recv().block()
。 - 有关Rust中异步编程的更多信息,请参阅 Async Book!
以下是一个递归地生成进程以分配查找斐波那契数任务的简单示例
点击显示 Cargo.toml。
[dependencies]
# The core APIs, including init(), spawn(), Sender, Receiver and select().
# Always required when using Constellation.
constellation-rs = "0.1"
# Support for FnOnce!(), FnMut!() and Fn!() macros to create Serde serializable
# closures. Required to pass a closure to spawn().
serde_closure = "0.1"
use constellation::*;
use serde_closure::FnOnce;
fn fibonacci(x: usize) -> usize {
if x <= 1 {
return x;
}
let left_pid = spawn(
Resources::default(),
FnOnce!(move |parent_pid| {
println!("Left process with {}", x);
Sender::<usize>::new(parent_pid)
.send(fibonacci(x - 1))
.block()
}),
)
.block()
.unwrap();
let right_pid = spawn(
Resources::default(),
FnOnce!(move |parent_pid| {
println!("Right process with {}", x);
Sender::<usize>::new(parent_pid)
.send(fibonacci(x - 2))
.block()
}),
)
.block()
.unwrap();
Receiver::<usize>::new(left_pid).recv().block().unwrap()
+ Receiver::<usize>::new(right_pid).recv().block().unwrap()
}
fn main() {
init(Resources::default());
println!("11th Fibonacci number is {}!", fibonacci(10));
}
点击显示输出。
** TODO! 这是错误的截图! **
请查看此示例的更真实版本,包括异步和错误处理,在此!
分布式运行
Constellation 有两个组件
- 一组函数库,使您能够
spawn()
进程,以及在这些进程之间send()
和recv()
。 - 对于您想要在多个服务器上运行的情况,还有一个分布式执行框架,以及添加到 cargo 中的
deploy
命令,用于将程序部署到该框架。
两者都输出到命令行,如上所示 - 唯一的区别是后者已经通过网络转发。
Constellation 仍然处于起步阶段 - 开发和测试正在进行,以支持 Windows(目前仅支持 Linux 和 macOS)并达到更高的成熟度。
目前的重点是在测试、文档、API 精炼(特别是错误信息和异步原语)以及移植到 Windows 上。
功能
Constellation 负责以下工作
- 使用
spawn()
在具有可用资源的服务器上分配具有定义内存和CPU资源需求的过程 - TODO:尽力执行这些内存和资源需求,以避免有缺陷的/贪婪的过程饿死其他进程
- TCP 之间的进程通道,具有自动设置和删除
- 异步(反)序列化通过通道发送/接收的值(利用
serde
、bincode 和可选的libfringe
以避免分配) - 通道实现
std::future::Future
、futures::stream::Stream
和futures::sink::Sink
,使得可以使用来自futures-rs
的有用函数和适配器,包括select()
和join()
,以及与tokio
和runtime
的兼容性。 - 由一个运行高效边缘触发 epoll 循环的后台线程提供支持
- 确保在进程退出之前发送和确认数据,以避免连接重置和数据丢失(利用
atexit
和TIOCOUTQ
) - 地址:所有通道都在集群范围的
Pid
之间,而不是(ip,port)
- 高性能:设计用于在底层操作系统之上提供最小的开销
这是为了什么
Constellation 使编写分布式程序更加容易。像 MPI 一样,它抽象出套接字,让您专注于业务逻辑,而不是寻址、连接、多路复用、异步、事件和删除。与 MPI 不同,它有一个现代、简洁的接口,使用 serde
处理(反)序列化,提供像 select()
这样的强大异步构建块,并与 Rust 异步生态系统集成。
它是如何工作的
有两种执行模式:使用 cargo run
正常运行和使用 cargo deploy
将其部署到集群。我们将讨论第一种,然后介绍第二种的不同之处。
监控进程
每个进程都有一个监控进程,用于捕获进程的输出,并调用waitpid
来捕获退出状态(无论是退出代码还是信号)。这是在进程初始化时通过复制来设置的,父进程是监控进程,子进程则继续运行用户的程序。通过替换文件描述符0、1、2(分别对应stdin、stdout和stderr)为管道,监控进程可以捕获输出,当用户的进程写入例如fd 1时,它实际上是在写入一个监控进程随后读取并转发给桥梁的管道。
桥梁
桥梁是用来收集来自各个监控进程的输出,并在终端上格式化输出的。它在init()
内部启动,通过复制进程,使得父进程成为桥梁,而子进程则继续运行用户的程序。
生成
spawn()
接受一个函数、一个参数和资源限制,并使用这些参数生成一个新的进程。这是通过调用当前二进制的一个干净副本来实现的execve("/proc/self/exe",argv,envp)
,在其调用init()
时,行为略有不同:它连接回现有的桥梁,而不是返回控制流,而是在正常退出之前调用指定的用户函数。函数指针相对于文本段的固定基址进行调整。
通道
通过创建Sender<T>
和Receiver<T>
来实现通信。创建过程需要Pid
,并在幕后进行大量的记录工作,以确保
- 双工TCP连接被正确且恰当地创建和销毁,以支持用户创建的单工通道。
- TCP连接在操作系统中的资源消耗与用户持有的通道数量成正比。
Pid
是唯一的。- 每个进程有一个单一的端口(在初始化时临时绑定,以避免饥饿或失败),所有通道支持的TCP连接都是从该端口到或来自。
- 序列化可以异步发生,即为了避免在套接字未准备好写入时分配无界内存来保存serde序列化的结果,利用
libfringe
的协程。 - 可以通过删除和重新创建通道来更改通道消息的类型。
分布式运行
在集群上运行时有四个主要区别
星系节点
监听可配置的地址,接收二进制文件并执行它们。
星系主节点
接收零个或多个其他星系实例的地址和资源作为输入,以及要自动启动的进程——这几乎总是桥梁。
它监听可配置的地址,以接收具有资源要求的二进制文件——但几乎总是只有桥梁向它提供这些二进制文件才有意义。
桥梁
它不是由用户进程内部的fork调用启动的,而是在星系主初始化时自动启动。它监听一个可配置的地址,等待 cargo deploy
部署命令,此时它使用特殊的环境变量运行二进制程序,触发 init()
打印初始进程的资源需求并退出,然后将带有确定资源需求的二进制程序发送给 星系主节点。成功分配后,它由一个 星系 实例执行。在 init()
中,它重新连接到 桥接器,桥接器尽职尽责地将输出转发到 cargo deploy
。
cargodeploy
这是一个添加到 cargo 的命令,底层调用 cargo run
,但与本地运行生成的二进制文件不同,它被发送到 桥接器。然后桥接器将任何输出发送回,该输出在终端以格式化的形式输出。
如何使用
[dependencies]
constellation-rs = "0.1"
use constellation::*;
fn main() {
init(Resources::default());
println!("Hello, world!");
}
$ cargo run
3fecd01:
Hello, world!
exited: 0
或者,以分布式方式运行
机器2
cargo install constellation-rs
constellation 10.0.0.2:9999 # local address to bind to
机器3
cargo install constellation-rs
constellation 10.0.0.3:9999
机器1
cargo install constellation-rs
constellation 10.0.0.1:9999 nodes.toml
nodes.toml
[[nodes]]
fabric_addr = "10.0.0.1:9999" # local address to bind to
bridge_bind = "10.0.0.1:8888" # local address of the bridge to bind to
mem = "100 GiB" # resource capacity of the node
cpu = 16 # number of logical cores
[[nodes]]
fabric_addr = "10.0.0.2:9999"
mem = "100 GiB"
cpu = 16
[[nodes]]
fabric_addr = "10.0.0.3:9999"
mem = "100 GiB"
cpu = 16
您的笔记本电脑
cargo install constellation-rs
cargo deploy --release 10.0.0.1:8888 # address of the bridge
833d3de:
Hello, world!
exited
需求
Rust: nightly。
Linux: 内核 >= 3.9; /proc
文件系统。
macOS: 已测试 >= 10.10,也可能在较旧版本上运行。
如果您遇到其他需求,请提交问题。
API
测试
为什么?
星系是我正在开发的一个大型数据处理项目的基础。我决定开始完善它,并公开发布为开源软件,万一它对其他人有趣或有用呢!
许可
根据 Apache 许可证版本 2.0 许可,(LICENSE.txt 或 http://www.apache.org/licenses/LICENSE-2.0)。
除非您明确说明,否则您提交的任何旨在包含在本作品中的贡献(根据 Apache-2.0 许可证定义),均应按上述方式许可,不附加任何其他条款或条件。
依赖
~11–29MB
~519K SLoC