1个不稳定版本
0.1.0 | 2024年1月7日 |
---|
#16 in #编排器
用于 2 个crate
20KB
lucidity
基于 lunatic 构建的分布式执行引擎。
动机
基本上,lunatic
本身是一套“低级”特性:运行时、系统调用和语言包装器。
然而,在尝试保持代码可读性的同时,Process
架构使用起来稍微有些困难。这个库提供了一个 proc-macro
,以及一些针对常见平台(如fly.io)的辅助工具,以简化在优秀的lunatic运行时之上编写分布式代码的过程。
示例
以下是一个简单示例。
fn main() {
let results = pythagorean_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean(a: u32, b: u32) -> f32 {
let num = ((square_remote_async(a).await_get() + square_remote_async(b).await_get()) as f32).sqrt();
num
}
#[lucidity::job]
fn square(a: u32) -> u32 {
let num = a * a;
num
}
对于每个你放置proc宏(lucidity::job
)的方法,我们生成几个其他的方法。
{name}_local
,当调用时,在本地节点中启动函数,并阻塞调用Process
。{name}_remote
,当调用时,在随机分布的节点上启动函数,并阻塞调用Process
。{name}_local_async
,当调用时,在本地节点中启动函数,返回一个包装的Process
引用,可以轮询或阻塞。{name}_remote_async
,当调用时,在随机分布的节点上启动函数,返回一个包装的Process
引用,可以轮询或阻塞。{name}_remote_fanout
函数接收一个参数元组数组Vec
,然后以轮询方式将调用分配给该函数,轮询所有Process
进程,并阻塞直到所有进程完成,返回一个包含结果的Vec
。
上述示例使用lucidity::job
进程宏生成了一些这样的函数,并且它们可以像任何其他函数一样“调用”。这里的目的是使用lunatic
的优秀架构,同时减少成功编写分布式代码所需的样板代码。设置Process
进程、Mailbox
邮箱等都是由库自动处理的。
这种权衡是,这个库对如何编写代码以及你可以用它做什么有固定的看法(尽管欢迎建议)。此外,这个库引入了一些带超时的简单循环来避免可能的死锁,这会有一些开销。
库使用方法
首先,安装lunatic。
$ cargo install lunatic-runtime
将以下内容添加到您的Cargo.toml
文件中
[dependencies]
lucidity = "*" # choose a version
在您的.cargo/config.toml
文件中
[build]
target = "wasm32-wasi"
[target.wasm32-wasi]
runner = "lunatic run"
分布式设置
要在分布式设置中使用此库,您需要做几件事情。这个例子也可以通过使用环回地址轻松本地使用。
首先,您需要在某处运行控制节点。
$ lunatic control --bind-socket [::]:3030
然后,在您希望远程方法运行的任何其他机器上,您需要设置节点。
$ lunatic node --bind-socket [::]:3031 http://{IP_OR_HOST_OF_CONTROL}:3030/
本地测试
为了测试,您将构建您的代码并在lunatic
节点中运行它。例如:
cargo build --release && lunatic node --wasm path/to/built/wasm/exe.wasm --bind-socket [::]:3032 http://{IP_OR_HOST_OF_CONTROL}:3030/
生产设置
在一个更生产化的设置中,您可能会使用类似fly.io
的服务来部署您的代码(使用fly
功能),并且您可能希望在容器中构建和运行您的代码。最简单的方法是运行控制节点和应用程序节点的简单Docker容器。您的入口点可能看起来像这样。
注意:由于fly.io上的UDP问题,“自动”的fly.io设置功能不工作,但在我解决这些问题后将会启用。
#!bin/bash
/lunatic control --bind-socket [::]:3030 &
/lunatic node --wasm /irl_processor.wasm --bind-socket $NODE_REACHABLE_IP:3031 http://[::1]:3030/
然后,在构建的wasm中,您将启动一些节点,这些节点在运行任何分布式方法之前将连接到其他机器上的控制节点。
lunatic
简介
此库建立在lunatic
之上,因此在使用此库之前,了解lunatic
的基本知识非常重要。
进程
lunatic
围绕Processes
的概念构建。一个Process
是由运行时创建的轻量级执行线程。每个Process
都有自己的栈,并且与其他Process
隔离。通过Mailbox
进行通信,这些是可以在Process
之间发送消息的队列。
在这种情况下,您可以完全直接使用Process
,但lucidity
库的目的是使编写分布式代码更加容易,所以我们将重点关注这一点。
邮箱
Mailbox
是Process
之间通信的主要方式。一个Mailbox
是一个队列,可以用来在Process
之间发送消息。每个Process
都有一个Mailbox
,可以用来向该Process
发送消息。
本库的使用中,您无需担心Mailbox
,因为它们会自动为您处理。然而,了解它们的存在很重要,因为本库提供的“语法糖”抽象了这些消息队列。这不同于“异步Rust”或其他“异步/await”类型的语言。这些Process
及其Mailbox
,更类似于其他语言的协程或goroutine行为。
因此,这个库在“感觉”上增加了一些开销,类似于异步Rust或阻塞Rust,但它通过使用带有等待循环的超时来实现这种感觉。由于本项目的主要目的是将严谨的工作“分发”到其他节点,这种开销是可以接受的,但重要的是要理解,这不同于“异步Rust”。
WASM
lunatic
建立在WebAssembly(WASM)的概念之上。WASM是一种二进制格式,旨在在沙盒环境中运行。lunatic
之所以能够如此出色地扩展到分布式模型,是因为它依赖于“运行时”与WASM运行时捆绑在一起的概念,而WASM代码可以执行某些“运行时系统调用”以进行通信。WASM抽象了机器代码,使得每个节点只需另一个节点的WASM即可正常工作。
理论上,每个节点都可以初始化自己的WASM,并且lunatic
运行时能够在任何这些节点上启动Process
,因为每个节点都会将它的WASM发送到其他节点。
远程进程
远程启动的Process
利用了您的可执行文件实际上是WASM的事实。基本上,lunatic
将您的WASM可执行文件的副本发送到远程节点,然后在那里启动一个Process
,本质上使用函数指针调用WASM可执行文件中的函数。这就是为什么您需要将代码构建为WASM,以及为什么需要使用相同的可执行文件来运行控制节点和应用节点。
但是,您无需担心将代码上传到其他节点。《code>lunatic运行时会自动处理这一点。这也意味着您的“裸”函数“直接工作”。该函数在WASM中,所以如果进程调用该函数,它将在进程运行的节点上调用,因为该节点确实有WASM。
很酷,不是吗?
示例
让我们看看几个示例,以了解您何时会使用特定类型的方法。
对于所有这些示例,我们可以假设我们已经声明了square
函数如下。
#[lucidity::job]
fn square(a: u32) -> u32 {
a * a
}
无进程
即使您使用lucidity::job
proc宏标记一个函数,您仍然可以像普通函数一样调用它。
fn main() {
// Calling `square` here does not span a process, and is called by the currently executing process
// as if it were a normal function.
let result = square(3);
println!("result: {:#?}", result);
}
本地/远程进程
如果您想本地启动一个进程,您可以使用{name}_local
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
let results = pythagorean1_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
// Calling `square_local` here spawns a process locally, and blocks the current process until the
// spawned process completes. This is great for allowing the `lunatic` executor to "yield" more often,
// especially if the `square` method had reasonable yield points. However, the process it is called from is blocked,
// so keep that in mind.
//
// In this case, we don't really mind blocking here, since we are in a lightweight process. However, you may notice
// there is an inefficiency in not computing the square of `a` and `b` in parallel.
((square_local(a) + square_local(b)) as f32).sqrt()
}
#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
// Calling `square_remote` here spawns a process on a random remote node, and blocks the current process until the
// spawned process completes. This is great for ensuring a certain set of processes are being distributed,
// but blocks the process it is called from.
((square_remote(a) + square_remote(b)) as f32).sqrt()
}
本地/远程异步进程
如果您想更精细地控制何时阻塞,您可以使用{name}_local_async
和{name}_remote_async
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
let results = pythagorean1_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
// This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
// Here, we are going to block with `await_get`. This can be used for either local or remote
// async jobs.
let square_a_job = square_local_async(a);
let square_b_job = square_local_async(b);
// We get to this point "immediately". The `square_a_job` and `square_b_job` are running in
// their own processes, and we can do other work here.
// Maybe do some other work in here ...
let mut square_a = square_a_job.await_get();
let mut square_b = square_b_job.await_get();
((square_a + square_b) as f32).sqrt()
}
#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
// This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
// Here, we are going to loop and check for completion with `try_get` (sort of naively). This can be used for either local or remote
// async jobs.
let square_a_job = square_remote_async(a);
let square_b_job = square_remote_async(b);
// We get to this point "immediately". The `square_a_job` and `square_b_job` are running in
// their own processes, and we can do other work here.
// Maybe do your own looping ...
let (square_a, square_b) = loop {
let Some(a) = square_a_job.try_get() else {
continue;
}
let Some(b) = square_b_job.try_get() else {
continue;
}
break (a, b);
}
((square_a + square_b) as f32).sqrt()
}
远程扇出
如果您基本上想执行相同的操作,但使用不同的参数,并且您想阻塞所有这些,您可以使用{name}_remote_fanout
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
//
// This is great for ensuring a certain set of processes are being distributed, but blocks the process it is called from.
// You may have something where you are processing a set of images. This would be a great use case to put those images
// in a `Vec`, and then call this method to fan out all of the work.
let results = square_remote_fanout(vec![1, 2, 3, 4, 5]);
println!("result: {:#?}", results);
}
作业属性选项
lucidity::job
进程宏有几个选项,可以用来自定义生成方法的行怍。
通常情况下,不需要使用这些选项,但如果你需要它们,它们是可用的。
init_retry_interval_ms
:这是在尝试初始化Process
时重试之间的等待时间(毫秒)。默认值为100
。sync_retry_interval_ms
:这是在尝试从Process
获取阻塞(例如,{name}_local
或{name}_remote
)时重试之间的等待时间(毫秒)。默认值为100
。async_init_retry_interval_ms
:这是在尝试异步初始化Process
时重试之间的等待时间(毫秒)(例如,{name}_local_async
或{name}_remote_async
)。默认值为100
。async_get_retry_interval_ms
:这是在尝试从Process
获取非阻塞结果时重试之间的等待时间(毫秒)(例如,{name}_local_async
或{name}_remote_async
)。默认值为100
。async_set_retry_interval_ms
:这是在执行Process
尝试设置非阻塞结果时重试之间的等待时间(毫秒)(例如,{name}_local_async
或{name}_remote_async
)。默认值为100
。shutdown_retry_interval_ms
:这是在尝试关闭Process
时重试之间的等待时间(毫秒)。默认值为100
。memory
:这是允许Process
使用的最大内存量。默认值为100 * 1024 * 1024
(100MB)。fuel
:这是允许Process
使用的最大燃料量。默认值为10
(每个燃料单位大约是100,000 WASM指令)。fanout
:这是在扇出时使用的方案类型。默认值为roundrobin
。另一个选项是random
。
功能标志
fly
:这启用了fly
功能,允许您使用fly.io
平台自动设置从主lunatic
节点。默认情况下,此功能未启用,因为它需要一个fly.io
账户和一些配置。有关更多信息,请参阅fly.io
文档。注意:此功能(目前)也由于
fly.io
上 UDP 的限制而变得无意义。我正在论坛上与fly.io
团队合作解决这个问题。
测试
cargo test
谢谢
特别感谢 lunatic 的作者和贡献者们的出色工作,并特别感谢主要作者 bkolobara。
许可
MIT
依赖关系
~2.1–2.9MB
~55K SLoC