#分布式 #执行引擎 #编排器 # #编排 #助手

lucidity

一个用于Rust的分布式编排平台

1个不稳定版本

0.1.0 2024年1月7日

#217 in 进程宏

MIT许可证

48KB
186

Build and Test Version Crates.io Documentation Rust License:MIT

lucidity

基于lunatic构建的分布式执行引擎。

动机

基本上,lunatic本身是一套“低级”特性:运行时、系统调用和语言包装器。

然而,当试图保持代码可读性时,Process架构的使用稍微有些困难。这个库提供了一个proc-macro,以及一些针对常见平台(如fly.io)的帮助工具,以使得在优秀的lunatic运行时之上编写分布式代码更加容易。

示例

下面是一个简单的示例。

fn main() {
    let results = pythagorean_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}

#[lucidity::job]
fn pythagorean(a: u32, b: u32) -> f32 {
    let num = ((square_remote_async(a).await_get() + square_remote_async(b).await_get()) as f32).sqrt();

    num
}

#[lucidity::job]
fn square(a: u32) -> u32 {
    let num = a * a;

    num
}

对于您放置进程宏(lucidity::job)的每个方法,我们都会生成几个其他的方法。

  • {name}_local,当调用时,在本地节点中启动函数,并阻塞调用Process
  • {name}_remote,当调用时,在随机分布式节点上的Process中启动函数,并阻塞调用Process
  • {name}_local_async,当调用时,在本地节点中的Process中启动函数,返回一个包装的Process引用,可以轮询或阻塞。
  • {name}_remote_async,当调用时,在随机分布式节点上的Process中启动函数,返回一个包装的Process引用,可以轮询或阻塞。
  • {name}_remote_fanout,它接收一个包含参数元组的 Vec,并使用轮询算法将调用分配给该函数,轮询所有 Process,直到所有调用完成,返回一个包含结果的 Vec

上述示例使用 lucidity::job 进程宏生成了一些这样的函数,并且它们可以像其他任何函数一样被调用。这里的目标是利用 lunatic 的优秀架构,同时减少编写分布式代码所需的样板代码。设置 ProcessMailbox 等操作都由您处理。
代价是,这个库对您编写代码的方式以及您可以做什么有一定的看法(尽管欢迎建议)。此外,这个库引入了一些带有超时时间的简单循环,以避免可能的死锁,这带来了一些开销。

库使用

首先,安装 lunatic。

$ cargo install lunatic-runtime

将以下内容添加到您的 Cargo.toml

[dependencies]
lucidity = "*" # choose a version

在您的 .cargo/config.toml

[build]
target = "wasm32-wasi"

[target.wasm32-wasi]
runner = "lunatic run"

分布式设置

要在分布式环境中使用此库,您需要做一些事情。此示例也可以通过使用回环地址轻松本地使用。

首先,您需要在一个地方运行控制节点。

$ lunatic control --bind-socket [::]:3030

然后,在您希望远程方法运行的任何其他机器上,您需要设置节点。

$ lunatic node --bind-socket [::]:3031 http://{IP_OR_HOST_OF_CONTROL}:3030/

本地测试

对于测试,您将构建您的代码,并在 lunatic 节点内部运行它。例如。

cargo build --release && lunatic node --wasm path/to/built/wasm/exe.wasm --bind-socket [::]:3032 http://{IP_OR_HOST_OF_CONTROL}:3030/

生产设置

在更生产化的设置中,您可能会使用类似 fly.io 的服务来部署您的代码(使用 fly 功能),并且您可能希望在容器中构建和运行您的代码。最简单的方法是运行控制节点和应用节点的简单 Docker 容器。您的入口点可能看起来像这样。

注意:由于 fly.io 上的 UDP 问题,自动 fly.io 设置功能不起作用,但在我解决这些问题后将会启用。

#!bin/bash

/lunatic control --bind-socket [::]:3030 &

/lunatic node --wasm /irl_processor.wasm --bind-socket $NODE_REACHABLE_IP:3031 http://[::1]:3030/

然后,在您的构建好的 wasm 中,您将连接一些节点,这些节点会在运行任何分布式方法之前连接到其他机器上的控制节点。

lunatic 入门指南

此库建立在 lunatic 的基础上,因此在使用此库之前,了解 lunatic 的基础知识很重要。

进程

lunatic 是围绕 Processes 的概念构建的。一个 Process 是由运行时生成的轻量级执行线程。每个 Process 都有自己的堆栈,并且与其他 Process 隔离。进程之间通过 Mailbox 进行通信,Mailbox 实质上是队列,可以用于在进程之间发送消息。

在此库的情况下,您完全可以直接使用 Process,但 lucidity 库的目的是使编写分布式代码更加容易,因此我们将重点关注这一点。

邮箱

Mailbox 是进程之间通信的主要方式。一个 Mailbox 是一个队列,可以用于在进程之间发送消息。每个 Process 都有一个 Mailbox,可以用来向该 Process 发送消息。

对于这个库来说,你不需要担心 Mailbox,因为它们会为你处理。然而,了解它们的存在是很重要的,因为这个库提供的“语法糖”抽象掉了这些消息队列。这与“异步Rust”或任何其他“异步/await”类型的语言不同。这些 Process 和它们的 Mailbox 更像其他语言的协程或goroutine行为。

因此,这个库在“感觉”上添加了一些开销,有点像异步Rust或阻塞Rust,但它通过使用带有等待循环的超时来实现这种感觉。因为这个项目更多是为了将严格的工作“扇出”到其他节点,所以这种开销是可以接受的,但重要的是要理解这并不像“异步Rust”。

WASM

lunatic 是基于 WebAssembly (WASM) 的概念构建的。WASM 是一种二进制格式,旨在在沙盒环境中运行。由于 lunatic 依赖于“运行时”与 WASM 运行时一起打包,并且 WASM 代码可以进行某些“运行时系统调用”以进行通信,因此它能够非常有效地扩展到分布式模型。WASM 抽象掉了机器代码,使得每个节点只需要另一个节点的 WASM 就可以正常工作。

理论上,多个节点可以各自初始化自己的 WASM,而 lunatic 运行时能够在任何这些节点上生成 Process,因为每个节点都会将其 WASM 发送到其他节点。

远程进程

被远程生成的 Process 利用可执行文件实际上是 WASM 的事实。基本上,lunatic 将你的 WASM 可执行文件的副本发送到远程节点,然后在那里生成一个 Process,本质上使用函数指针调用你的 WASM 可执行文件中的函数。这就是为什么你需要将你的代码作为 WASM 构建,以及为什么你需要使用相同的可执行文件来运行控制节点和应用节点。

但是,你不需要担心将你的代码放到其他节点上。lunatic 运行时自动处理这一点。这也意味着你的“裸”函数“直接工作”。该函数在 WASM 中,所以如果进程调用该函数,它将在进程运行的节点上调用,因为该节点WASM。

很酷,对吧?

示例

让我们看看几个示例,以了解你何时会使用特定类型的函数。

对于所有这些示例,我们可以假设我们已经声明了 square 函数如下。

#[lucidity::job]
fn square(a: u32) -> u32 {
    a * a
}

无进程

即使你用 lucidity::job proc 宏标记了一个函数,你仍然可以像正常函数一样调用它。

fn main() {
    // Calling `square` here does not span a process, and is called by the currently executing process
    // as if it were a normal function.
    let result = square(3);

    println!("result: {:#?}", result);
}

本地/远程进程

如果你想在本地生成进程,你可以使用 {name}_local 方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    let results = pythagorean1_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}

#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
    // Calling `square_local` here spawns a process locally, and blocks the current process until the
    // spawned process completes.  This is great for allowing the `lunatic` executor to "yield" more often,
    // especially if the `square` method had reasonable yield points.  However, the process it is called from is blocked,
    // so keep that in mind.
    //
    // In this case, we don't really mind blocking here, since we are in a lightweight process.  However, you may notice
    // there is an inefficiency in not computing the square of `a` and `b` in parallel.
    ((square_local(a) + square_local(b)) as f32).sqrt()
}

#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
    // Calling `square_remote` here spawns a process on a random remote node, and blocks the current process until the
    // spawned process completes.  This is great for ensuring a certain set of processes are being distributed,
    // but blocks the process it is called from.
    ((square_remote(a) + square_remote(b)) as f32).sqrt()
}

本地/远程异步进程

如果你想更精细地控制何时阻塞,你可以使用 {name}_local_async{name}_remote_async 方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    let results = pythagorean1_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}


#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
    // This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
    // Here, we are going to block with `await_get`.  This can be used for either local or remote
    // async jobs.
    let square_a_job = square_local_async(a);
    let square_b_job = square_local_async(b);

    // We get to this point "immediately".  The `square_a_job` and `square_b_job` are running in
    // their own processes, and we can do other work here.

    // Maybe do some other work in here ...

    let mut square_a = square_a_job.await_get();
    let mut square_b = square_b_job.await_get();

    ((square_a + square_b) as f32).sqrt()
}

#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
    // This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
    // Here, we are going to loop and check for completion with `try_get` (sort of naively).  This can be used for either local or remote
    // async jobs.
    let square_a_job = square_remote_async(a);
    let square_b_job = square_remote_async(b);

    // We get to this point "immediately".  The `square_a_job` and `square_b_job` are running in
    // their own processes, and we can do other work here.
    
    // Maybe do your own looping ...
    let (square_a, square_b) = loop {
        let Some(a) = square_a_job.try_get() else {
            continue;
        }

        let Some(b) = square_b_job.try_get() else {
            continue;
        }

        break (a, b);
    }

    ((square_a + square_b) as f32).sqrt()
}

远程扇出

如果你本质上想执行相同的操作,但使用不同的参数,并且你想要阻塞在所有这些操作上,你可以使用 {name}_remote_fanout 方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    //
    // This is great for ensuring a certain set of processes are being distributed, but blocks the process it is called from.
    // You may have something where you are processing a set of images.  This would be a great use case to put those images
    // in a `Vec`, and then call this method to fan out all of the work.
    let results = square_remote_fanout(vec![1, 2, 3, 4, 5]);

    println!("result: {:#?}", results);
}

作业属性选项

《lucidity::job》过程宏有几个选项,可以用来自定义生成方法的行怛。

通常情况下,不需要使用这些选项,但如果需要,它们是可用的。

  • init_retry_interval_ms:这是在尝试初始化一个 Process 时重试之间等待的毫秒数。默认为 100
  • sync_retry_interval_ms:这是在尝试从一个 Process 获取阻塞调用(例如,{name}_local{name}_remote)时重试之间等待的毫秒数。默认为 100
  • async_init_retry_interval_ms:这是在尝试异步初始化一个 Process(例如,{name}_local_async{name}_remote_async)时重试之间等待的毫秒数。默认为 100
  • async_get_retry_interval_ms:这是在尝试从一个 Process 获取非阻塞结果(例如,{name}_local_async{name}_remote_async)时重试之间等待的毫秒数。默认为 100
  • async_set_retry_interval_ms:这是在执行 Process 尝试设置非阻塞结果(例如,{name}_local_async{name}_remote_async)时重试之间等待的毫秒数。默认为 100
  • shutdown_retry_interval_ms:这是在尝试关闭一个 Process 时重试之间等待的毫秒数。默认为 100
  • memory:这是允许 Process 的最大内存量。默认为 100 * 1024 * 1024(100MB)。
  • fuel:这是允许 Process 的最大燃料量。默认为 10(每个燃料单位大约是 100,000 个 WASM 指令)。
  • fanout:这是在广播时使用的方案类型。默认为 roundrobin。另一个选项是 random

特性标志

  • fly:这启用了 fly 特性,允许您使用 fly.io 平台自动从主 lunatic 节点设置节点。默认情况下不启用,因为它需要一个 fly.io 账户和一些设置。有关更多信息,请参阅 fly.io 文档。

    注意:由于在 fly.io 上 UDP 的限制(目前)也使此功能变得无用。我正在与 fly.io 团队在论坛上解决这个问题。

测试

cargo test

感谢

特别感谢 lunatic 的作者和贡献者们的出色工作,特别感谢主要作者 bkolobara

许可协议

MIT

依赖项

~18MB
~552K SLoC