4个版本
使用旧Rust 2015
0.1.3 | 2018年8月15日 |
---|---|
0.1.2 | 2018年6月27日 |
0.1.1 | 2018年6月26日 |
0.1.0 | 2018年6月24日 |
#1579 in 异步
每月 31 次下载
在 2 crates 中使用
32KB
974 代码行
一个将Rust打造成分布式计算前沿的项目。
Constellation是一个用于Rust(nightly)的框架,它帮助编写、调试和部署分布式程序。它从Erlang/OTP、MPI和CSP中汲取了大量灵感;并利用Rust生态系统,包括用于网络序列化的serde + bincode,以及用于TCP异步通道的mio和futures-rs。
大多数用户将通过更高级的库来利用Constellation,例如:
要直接利用Constellation,请继续阅读。
Constellation框架
- Constellation是一个框架,您需要在程序开始时通过调用
init()
来初始化。 - 您可以使用
spawn(closure)
创建新进程,这些进程运行closure
。 spawn(closure)
返回新进程的Pid。- 您可以通过创建通道来在进程之间进行通信,使用
Sender::new(remote_pid)
和Receiver::new(remote_pid)
。 - 通道可以异步使用,通过
sender.send(value).await
和receiver.recv().await
。 - futures-rs 为处理通道提供了有用的函数和适配器,包括
select()
和join()
。 - 您还可以使用
.block()
便捷方法在通道上阻塞:sender.send().block()
和receiver.recv().block()
。 - 有关 Rust 中异步编程的更多信息,请查看 Async Book!
以下是一个递归生成进程以分配寻找斐波那契数任务的任务的简单示例
点击显示 Cargo.toml。
[dependencies]
# The core APIs, including init(), spawn(), Sender, Receiver and select().
# Always required when using Constellation.
constellation-rs = "0.1"
# Support for FnOnce!(), FnMut!() and Fn!() macros to create Serde serializable
# closures. Required to pass a closure to spawn().
serde_closure = "0.1"
use constellation::*;
use serde_closure::FnOnce;
fn fibonacci(x: usize) -> usize {
if x <= 1 {
return x;
}
let left_pid = spawn(
Resources::default(),
FnOnce!(move |parent_pid| {
println!("Left process with {}", x);
Sender::<usize>::new(parent_pid)
.send(fibonacci(x - 1))
.block()
}),
)
.block()
.unwrap();
let right_pid = spawn(
Resources::default(),
FnOnce!(move |parent_pid| {
println!("Right process with {}", x);
Sender::<usize>::new(parent_pid)
.send(fibonacci(x - 2))
.block()
}),
)
.block()
.unwrap();
Receiver::<usize>::new(left_pid).recv().block().unwrap()
+ Receiver::<usize>::new(right_pid).recv().block().unwrap()
}
fn main() {
init(Resources::default());
println!("11th Fibonacci number is {}!", fibonacci(10));
}
点击显示输出。
** TODO!这是错误的截图!**
查看此示例的更真实版本,包括异步和错误处理,请访问 此处!
运行分布式
Constellation 有两个组件
- 一个函数库,可以让你
spawn()
进程,以及在这些进程之间send()
和recv()
数据 - 当您想在多个服务器上运行时,一个分布式执行织布机,以及添加到 cargo 的
deploy
命令,以便将程序部署到它上面。
它们都输出到命令行,如上所示 - 唯一的区别是后者已通过网络转发。
Constellation 仍处于起步阶段 - 正在开发和测试以支持 Windows(目前仅支持 Linux 和 macOS)并达到更高的成熟度。
目前的主要工作集中在测试、文档、改进 API(特别是错误信息和异步原语)以及移植到 Windows 上。
功能
Constellation 负责以下事项
spawn()
将具有定义的内存和CPU资源需求的进程分配到具有可用资源的服务器上- TODO:尽可能强制执行这些内存和资源需求,以避免有缺陷/贪婪的进程饿死其他进程
- 使用TCP的进程间通道,具有自动设置和拆除
- 异步(反)序列化通过通道发送/接收的值(利用
serde
,bincode 和可选的libfringe
以避免分配) - 通道实现了
std::future::Future
,futures::stream::Stream
和futures::sink::Sink
,使得可以使用诸如select()
和join()
等有用的函数和适配器,以及与futures-rs
的兼容性,以及与tokio
和runtime
的兼容性。 - 由一个运行高效边缘触发epoll循环的后台线程提供动力
- 确保在进程退出之前发送和确认数据,以避免连接重置和数据丢失(利用
atexit
和TIOCOUTQ
) - 地址:所有通道都是集群范围内的
Pid
之间的,而不是(ip,port)
的 - 性能:设计用于在底层操作系统之上带来最小的开销
用途是什么
Constellation使编写分布式程序变得更容易。像MPI一样,它抽象化了套接字,让您专注于业务逻辑,而不是寻址、连接、复用、异步性、事件和拆除。与MPI不同,它具有现代、简洁的接口,使用 serde
处理(反)序列化,提供如 select()
一样强大的异步构建块,并集成到Rust异步生态系统中。
它如何工作
有两种执行模式:使用 cargo run
正常运行和使用 cargo deploy
将其部署到集群。我们将讨论第一种,然后介绍第二种的不同之处。
监控进程
每个进程都有一个用于捕获进程输出的监控进程,并对其调用waitpid
来获取退出状态(无论是退出码还是信号)。这是在进程初始化时通过分叉来设置的,父进程作为监控进程,而子进程继续运行用户的程序。通过替换文件描述符0、1、2(对应stdin、stdout和stderr)为管道,它捕获输出,使得当用户的进程写入例如fd 1时,它实际上是写入一个管道,监控进程随后从该管道读取并转发给桥接器。
桥接器
桥接器是用来从各个监控进程收集输出并在终端上格式化输出的。它是在init()
内部启动的,进程通过分叉,使得父进程成为桥接器,而子进程继续运行用户的程序。
创建进程
spawn()
接受一个函数、一个参数和资源限制,并使用它们创建一个新的进程。这是通过调用当前二进制文件的干净副本来实现的,使用execve("/proc/self/exe",argv,envp)
,在其调用init()
时,它的工作略有不同:它会连接回现有的桥接器,而不是返回控制流,而是在正常退出之前调用指定的用户函数。函数指针相对于文本节区的固定基址进行调整。
通道
通过创建Sender<T>
和Receiver<T>
来进行通信。创建操作需要Pid
,并在幕后进行大量记录工作以确保
- 双工TCP连接能够正确且恰当地创建和销毁,以支持用户创建的单工通道。
- TCP连接在操作系统中的资源消耗与用户持有的通道数量成正比。
Pid
是唯一的。- 每个进程有一个单独的端口(在初始化时临时绑定以避免饥饿或失败),所有通道支持的TCP连接都是从该端口到或从该端口。
- (反)序列化可以异步发生,即为了避免在套接字未准备好写入时不得不分配无界内存来存储serde序列化的结果,利用
libfringe
的协程。 - 可以通过丢弃并重新创建它来改变通道消息的类型。
运行分布式
在集群上运行时存在四个主要区别
星系节点
监听可配置的地址,接收二进制文件并执行它们。
星系主节点
接受零个或多个其他星系实例的地址和资源作为输入,以及要自动启动的进程——这几乎总是指桥接器。
它监听可配置的地址,以部署具有资源要求的二进制文件——但几乎总是只有当桥接器给它这些二进制文件才有意义。
桥接器
它不是由用户进程内的fork调用启动的,而是在星座主初始化时自动启动。它监听一个可配置的地址,用于接收 cargo deploy
ments,此时它会运行二进制程序,并使用特殊的env变量来触发 init()
打印初始进程的资源需求并退出,然后将具有确定资源需求的二进制文件发送到 星座主。成功分配后,它由一个 星座 实例执行。在 init()
内,它回连到 网桥,该网桥尽职尽责地将其输出转发到 cargo deploy
。
cargodeploy
这是一个添加到 cargo 的命令,它底层调用 cargo run
,但是生成的二进制文件不是在本地运行,而是发送到 网桥。然后网桥将任何输出发送回,输出将在终端上格式化。
如何使用
[dependencies]
constellation-rs = "0.1"
use constellation::*;
fn main() {
init(Resources::default());
println!("Hello, world!");
}
$ cargo run
3fecd01:
Hello, world!
exited: 0
或者,运行分布式
机器 2
cargo install constellation-rs
constellation 10.0.0.2:9999 # local address to bind to
机器 3
cargo install constellation-rs
constellation 10.0.0.3:9999
机器 1
cargo install constellation-rs
constellation 10.0.0.1:9999 nodes.toml
nodes.toml
[[nodes]]
fabric_addr = "10.0.0.1:9999" # local address to bind to
bridge_bind = "10.0.0.1:8888" # local address of the bridge to bind to
mem = "100 GiB" # resource capacity of the node
cpu = 16 # number of logical cores
[[nodes]]
fabric_addr = "10.0.0.2:9999"
mem = "100 GiB"
cpu = 16
[[nodes]]
fabric_addr = "10.0.0.3:9999"
mem = "100 GiB"
cpu = 16
您的笔记本电脑
cargo install constellation-rs
cargo deploy --release 10.0.0.1:8888 # address of the bridge
833d3de:
Hello, world!
exited
需求
Rust: nightly。
Linux: 内核 >= 3.9; /proc
文件系统。
macOS: 已测试 >= 10.10,也可能在旧版本上工作。
如果您遇到其他需求,请提交问题。
API
测试
为什么?
星座是我正在从事的一个大型数据处理项目的基础。我决定开始对其进行打磨,并作为开源软件发布,以防它对其他人可能有趣甚至有用!
许可
在 Apache License,版本 2.0 下许可,(LICENSE.txt 或 https://apache.ac.cn/licenses/LICENSE-2.0).
除非您明确声明,否则您有意提交以包含在作品中的任何贡献,根据 Apache-2.0 许可证定义,应按上述方式许可,不附加任何其他条款或条件。
依赖项
~4.5MB
~89K SLoC