1 个不稳定版本
0.1.0 | 2024年1月7日 |
---|
#24 in #orchestrator
用于 lucidity
62KB
434 行
lucidity
基于 lunatic 的分布式执行引擎。
动机
基本上,lunatic
本身是一组 "低级" 功能:运行时、系统调用和语言包装器。
然而,当尝试保持代码可读性时,Process
架构使用起来会有些困难。这个库提供了一个 proc-macro
,以及一些针对常见平台(如 fly.io)的辅助工具,以简化在优秀的 lunatic 运行时之上编写分布式代码。
示例
下面是一个简单的示例。
fn main() {
let results = pythagorean_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean(a: u32, b: u32) -> f32 {
let num = ((square_remote_async(a).await_get() + square_remote_async(b).await_get()) as f32).sqrt();
num
}
#[lucidity::job]
fn square(a: u32) -> u32 {
let num = a * a;
num
}
对于您放置 proc macro 的每个方法,我们将生成一些其他的方法。
{name}_local
,当被调用时,在 本地节点 中启动函数,并阻塞调用Process
。{name}_remote
,当被调用时,在随机 分布式节点 上的Process
中启动函数,并阻塞调用Process
。{name}_local_async
,当被调用时,在 本地节点 中的Process
中启动函数,返回一个包装的Process
引用,可以进行轮询或阻塞。{name}_remote_async
,当被调用时,在随机 分布式节点 上的Process
中启动函数,返回一个包装的Process
引用,可以进行轮询或阻塞。{name}_remote_fanout
,它接受一个Vec
类型的参数元组,并通过轮询将调用分配给该函数,轮询所有Process
,直到所有调用完成,然后返回一个结果Vec
。
上面的示例使用lucidity::job
进程宏生成了一些这样的函数,它们可以像任何其他函数一样被调用。这里的目的是利用lunatic
的出色架构,同时减少编写分布式代码所需的一些样板代码。设置Process
、Mailbox
等都是由库自动处理的。
然而,这个库对编写代码的方式以及你可以用它做什么有特定的看法(尽管欢迎建议)。此外,这个库引入了一些简单的带超时的循环来避免可能的死锁,这会有一些开销。
库的使用
首先,安装lunatic。
$ cargo install lunatic-runtime
将以下内容添加到您的Cargo.toml
中:
[dependencies]
lucidity = "*" # choose a version
在您的.cargo/config.toml
中配置:
[build]
target = "wasm32-wasi"
[target.wasm32-wasi]
runner = "lunatic run"
分布式设置
要在分布式环境中使用此库,您需要做一些事情。此示例也可以通过使用回环地址在本地使用。
首先,您需要在某处运行控制节点。
$ lunatic control --bind-socket [::]:3030
然后,在您希望远程方法运行的任何其他机器上,您需要设置节点。
$ lunatic node --bind-socket [::]:3031 http://{IP_OR_HOST_OF_CONTROL}:3030/
本地测试
对于测试,您会构建您的代码并在lunatic
节点内运行它。例如:
cargo build --release && lunatic node --wasm path/to/built/wasm/exe.wasm --bind-socket [::]:3032 http://{IP_OR_HOST_OF_CONTROL}:3030/
生产设置
在更生产化的设置中,您可能会使用类似fly.io
的东西来部署您的代码(使用fly
功能),您可能希望在容器中构建和运行代码。最简单的方法是运行控制节点和应用程序节点的简单Docker容器。您的入口点可能如下所示:
注意:由于fly.io上的UDP问题,自动fly.io设置功能不起作用,但将在问题解决后启用。
#!bin/bash
/lunatic control --bind-socket [::]:3030 &
/lunatic node --wasm /irl_processor.wasm --bind-socket $NODE_REACHABLE_IP:3031 http://[::1]:3030/
然后,在构建的wasm中,您会在运行分布式方法之前连接到其他机器上的控制节点的某些节点。
lunatic
入门指南
此库建立在lunatic
之上,因此在使用此库之前,了解lunatic
的基本知识非常重要。
进程
lunatic
围绕Processes
的概念构建。一个Process
是运行时产生的轻量级执行线程。每个Process
都有自己的栈,并且与其他Process
隔离。通过Mailbox
进行通信,而Mailbox
是用于在Process
之间发送消息的队列。
在这个库的情况下,您可以直接使用Process
,但lucidity
库的目的是使编写分布式代码更容易,因此我们将重点放在这一点上。
邮箱
Mailbox
是Process
之间通信的主要方式。一个Mailbox
是一个队列,可以用于在Process
之间发送消息。每个Process
都有一个可以用于向该Process
发送消息的Mailbox
。
本库中,您无需担心Mailbox
,因为它们会由库来处理。然而,了解它们的存在很重要,因为本库提供的“语法糖”抽象了这些消息队列。这不同于“异步Rust”或其他任何“异步/await”类型的语言。这些Process
及其Mailbox
,在其他语言中更像协程或goroutine的行为。
因此,本库在“感觉”上类似于异步Rust或阻塞Rust,但它通过使用带超时的等待循环来实现这种感觉。由于本项目旨在将严格的工作“分发”到其他节点,这种开销是可以接受的,但重要的是要了解这并不像“异步Rust”。
WASM
lunatic
是基于WebAssembly(WASM)的概念构建的。WASM是一种二进制格式,旨在在沙盒环境中运行。lunatic
之所以能够如此高效地扩展到分布式模型,是因为它依赖于“运行时”包含一个WASM运行时的概念,同时WASM代码可以执行某些“运行时系统调用”进行通信。WASM抽象了机器码,使得每个节点只需另一个节点的WASM即可正常运行。
理论上,多个节点可以各自初始化自己的WASM,而lunatic
运行时能够在任何这些节点上启动Process
,因为每个节点都会将其WASM发送到其他节点。
远程进程
被远程启动的Process
利用了您的可执行文件是WASM的事实。基本上,lunatic
将您的WASM可执行文件副本发送到远程节点,然后在其中启动一个Process
,本质上使用函数指针来调用WASM可执行文件中的函数。这就是为什么您需要将代码构建为WASM,以及为什么需要使用相同的可执行文件运行控制节点和应用节点。
但是,您无需担心将代码部署到其他节点。lunatic
运行时将自动处理这一点。这也意味着您的“裸”函数“直接可用”。该函数在WASM中,所以如果进程调用该函数,它将在进程运行的节点上调用,因为该节点拥有WASM。
酷吧,不是吗?
示例
让我们看看几个示例,以了解何时使用特定类型的方法。
对于所有这些示例,我们可以假设我们已声明了square
函数如下。
#[lucidity::job]
fn square(a: u32) -> u32 {
a * a
}
没有进程
即使您使用lucidity::job
proc宏标记了一个函数,您仍然可以像正常函数一样调用它。
fn main() {
// Calling `square` here does not span a process, and is called by the currently executing process
// as if it were a normal function.
let result = square(3);
println!("result: {:#?}", result);
}
本地/远程进程
如果您想本地启动一个进程,可以使用{name}_local
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
let results = pythagorean1_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
// Calling `square_local` here spawns a process locally, and blocks the current process until the
// spawned process completes. This is great for allowing the `lunatic` executor to "yield" more often,
// especially if the `square` method had reasonable yield points. However, the process it is called from is blocked,
// so keep that in mind.
//
// In this case, we don't really mind blocking here, since we are in a lightweight process. However, you may notice
// there is an inefficiency in not computing the square of `a` and `b` in parallel.
((square_local(a) + square_local(b)) as f32).sqrt()
}
#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
// Calling `square_remote` here spawns a process on a random remote node, and blocks the current process until the
// spawned process completes. This is great for ensuring a certain set of processes are being distributed,
// but blocks the process it is called from.
((square_remote(a) + square_remote(b)) as f32).sqrt()
}
本地/远程异步进程
如果您需要更精细地控制何时阻塞,可以使用{name}_local_async
和{name}_remote_async
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
let results = pythagorean1_remote_fanout(vec![
(3, 4),
(4, 5),
(5, 6),
(6, 7),
(7, 8),
(8, 9),
(9, 10),
]);
println!("result: {:#?}", results);
}
#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
// This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
// Here, we are going to block with `await_get`. This can be used for either local or remote
// async jobs.
let square_a_job = square_local_async(a);
let square_b_job = square_local_async(b);
// We get to this point "immediately". The `square_a_job` and `square_b_job` are running in
// their own processes, and we can do other work here.
// Maybe do some other work in here ...
let mut square_a = square_a_job.await_get();
let mut square_b = square_b_job.await_get();
((square_a + square_b) as f32).sqrt()
}
#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
// This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
// Here, we are going to loop and check for completion with `try_get` (sort of naively). This can be used for either local or remote
// async jobs.
let square_a_job = square_remote_async(a);
let square_b_job = square_remote_async(b);
// We get to this point "immediately". The `square_a_job` and `square_b_job` are running in
// their own processes, and we can do other work here.
// Maybe do your own looping ...
let (square_a, square_b) = loop {
let Some(a) = square_a_job.try_get() else {
continue;
}
let Some(b) = square_b_job.try_get() else {
continue;
}
break (a, b);
}
((square_a + square_b) as f32).sqrt()
}
远程扩散
如果您实际上想执行相同的操作,但使用不同的参数,并且希望阻塞在所有这些操作上,可以使用{name}_remote_fanout
方法。
fn main() {
// The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
// This function will be called on a set of nodes (round-robin-ed), and each of those nodes
// will run it in a process.
//
// This is great for ensuring a certain set of processes are being distributed, but blocks the process it is called from.
// You may have something where you are processing a set of images. This would be a great use case to put those images
// in a `Vec`, and then call this method to fan out all of the work.
let results = square_remote_fanout(vec![1, 2, 3, 4, 5]);
println!("result: {:#?}", results);
}
作业属性选项
《lucidity::job》过程宏有几个选项,可以用来自定义生成方法的行文。
通常情况下,不需要使用这些选项,但如果需要,它们是可用的。
init_retry_interval_ms
:这是在尝试初始化一个Process
时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。sync_retry_interval_ms
:这是在尝试从Process
获取阻塞操作(例如,{name}_local
或{name}_remote
)时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。async_init_retry_interval_ms
:这是在尝试异步初始化一个Process
时(例如,{name}_local_async
或{name}_remote_async
)时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。async_get_retry_interval_ms
:这是在尝试从Process
获取非阻塞结果(例如,{name}_local_async
或{name}_remote_async
)时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。async_set_retry_interval_ms
:这是在执行Process
尝试设置非阻塞结果(例如,{name}_local_async
或{name}_remote_async
)时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。shutdown_retry_interval_ms
:这是在尝试关闭一个Process
时,两次重试之间的等待时间(以毫秒为单位)。默认为100
。memory
:这是允许Process
使用的最大内存量。默认为100 * 1024 * 1024
(100MB)。fuel
:这是允许Process
使用的最大燃料量。默认为10
(每个燃料单位大约是100,000条WASM指令)。fanout
:这是在扇出时使用的方案类型。默认为roundrobin
。其他选项是random
。
功能标志
fly
:这启用了fly
功能,允许您使用fly.io
平台自动从主lunatic
节点设置节点。默认情况下不启用,因为这需要fly.io
账户和一些设置。有关更多信息,请参阅fly.io
文档。注意:由于在
fly.io
上 UDP 的限制(目前)也使此功能变得无效。我正在与fly.io
团队在论坛上共同努力解决此问题。
测试
cargo test
谢谢
特别感谢 lunatic 的作者和贡献者们的出色工作,并向主要作者 bkolobara 表示特别感谢。
许可证
MIT
依赖项
~3MB
~57K SLoC