#分布式 #编排器 # #执行引擎 #编排 #lucidity

lucidity-core

适用于Rust的分布式编排平台

1个不稳定版本

0.1.0 2024年1月7日

#16 in #编排器


用于 2 个crate

MIT 协议

20KB

Build and Test Version Crates.io Documentation Rust License:MIT

lucidity

基于 lunatic 构建的分布式执行引擎。

动机

基本上,lunatic 本身是一套“低级”特性:运行时、系统调用和语言包装器。

然而,在尝试保持代码可读性的同时,Process 架构使用起来稍微有些困难。这个库提供了一个 proc-macro,以及一些针对常见平台(如fly.io)的辅助工具,以简化在优秀的lunatic运行时之上编写分布式代码的过程。

示例

以下是一个简单示例。

fn main() {
    let results = pythagorean_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}

#[lucidity::job]
fn pythagorean(a: u32, b: u32) -> f32 {
    let num = ((square_remote_async(a).await_get() + square_remote_async(b).await_get()) as f32).sqrt();

    num
}

#[lucidity::job]
fn square(a: u32) -> u32 {
    let num = a * a;

    num
}

对于每个你放置proc宏(lucidity::job)的方法,我们生成几个其他的方法。

  • {name}_local,当调用时,在本地节点中启动函数,并阻塞调用 Process
  • {name}_remote,当调用时,在随机分布的节点上启动函数,并阻塞调用 Process
  • {name}_local_async,当调用时,在本地节点中启动函数,返回一个包装的 Process 引用,可以轮询或阻塞。
  • {name}_remote_async,当调用时,在随机分布的节点上启动函数,返回一个包装的 Process 引用,可以轮询或阻塞。
  • {name}_remote_fanout函数接收一个参数元组数组Vec,然后以轮询方式将调用分配给该函数,轮询所有Process进程,并阻塞直到所有进程完成,返回一个包含结果的Vec

上述示例使用lucidity::job进程宏生成了一些这样的函数,并且它们可以像任何其他函数一样“调用”。这里的目的是使用lunatic的优秀架构,同时减少成功编写分布式代码所需的样板代码。设置Process进程、Mailbox邮箱等都是由库自动处理的。
这种权衡是,这个库对如何编写代码以及你可以用它做什么有固定的看法(尽管欢迎建议)。此外,这个库引入了一些带超时的简单循环来避免可能的死锁,这会有一些开销。

库使用方法

首先,安装lunatic。

$ cargo install lunatic-runtime

将以下内容添加到您的Cargo.toml文件中

[dependencies]
lucidity = "*" # choose a version

在您的.cargo/config.toml文件中

[build]
target = "wasm32-wasi"

[target.wasm32-wasi]
runner = "lunatic run"

分布式设置

要在分布式设置中使用此库,您需要做几件事情。这个例子也可以通过使用环回地址轻松本地使用。

首先,您需要在某处运行控制节点。

$ lunatic control --bind-socket [::]:3030

然后,在您希望远程方法运行的任何其他机器上,您需要设置节点。

$ lunatic node --bind-socket [::]:3031 http://{IP_OR_HOST_OF_CONTROL}:3030/

本地测试

为了测试,您将构建您的代码并在lunatic节点中运行它。例如:

cargo build --release && lunatic node --wasm path/to/built/wasm/exe.wasm --bind-socket [::]:3032 http://{IP_OR_HOST_OF_CONTROL}:3030/

生产设置

在一个更生产化的设置中,您可能会使用类似fly.io的服务来部署您的代码(使用fly功能),并且您可能希望在容器中构建和运行您的代码。最简单的方法是运行控制节点和应用程序节点的简单Docker容器。您的入口点可能看起来像这样。

注意:由于fly.io上的UDP问题,“自动”的fly.io设置功能不工作,但在我解决这些问题后将会启用。

#!bin/bash

/lunatic control --bind-socket [::]:3030 &

/lunatic node --wasm /irl_processor.wasm --bind-socket $NODE_REACHABLE_IP:3031 http://[::1]:3030/

然后,在构建的wasm中,您将启动一些节点,这些节点在运行任何分布式方法之前将连接到其他机器上的控制节点。

lunatic简介

此库建立在lunatic之上,因此在使用此库之前,了解lunatic的基本知识非常重要。

进程

lunatic围绕Processes的概念构建。一个Process是由运行时创建的轻量级执行线程。每个Process都有自己的栈,并且与其他Process隔离。通过Mailbox进行通信,这些是可以在Process之间发送消息的队列。

在这种情况下,您可以完全直接使用Process,但lucidity库的目的是使编写分布式代码更加容易,所以我们将重点关注这一点。

邮箱

MailboxProcess之间通信的主要方式。一个Mailbox是一个队列,可以用来在Process之间发送消息。每个Process都有一个Mailbox,可以用来向该Process发送消息。

本库的使用中,您无需担心Mailbox,因为它们会自动为您处理。然而,了解它们的存在很重要,因为本库提供的“语法糖”抽象了这些消息队列。这不同于“异步Rust”或其他“异步/await”类型的语言。这些Process及其Mailbox,更类似于其他语言的协程或goroutine行为。

因此,这个库在“感觉”上增加了一些开销,类似于异步Rust或阻塞Rust,但它通过使用带有等待循环的超时来实现这种感觉。由于本项目的主要目的是将严谨的工作“分发”到其他节点,这种开销是可以接受的,但重要的是要理解,这不同于“异步Rust”。

WASM

lunatic建立在WebAssembly(WASM)的概念之上。WASM是一种二进制格式,旨在在沙盒环境中运行。lunatic之所以能够如此出色地扩展到分布式模型,是因为它依赖于“运行时”与WASM运行时捆绑在一起的概念,而WASM代码可以执行某些“运行时系统调用”以进行通信。WASM抽象了机器代码,使得每个节点只需另一个节点的WASM即可正常工作。

理论上,每个节点都可以初始化自己的WASM,并且lunatic运行时能够在任何这些节点上启动Process,因为每个节点都会将它的WASM发送到其他节点。

远程进程

远程启动的Process利用了您的可执行文件实际上是WASM的事实。基本上,lunatic将您的WASM可执行文件的副本发送到远程节点,然后在那里启动一个Process,本质上使用函数指针调用WASM可执行文件中的函数。这就是为什么您需要将代码构建为WASM,以及为什么需要使用相同的可执行文件来运行控制节点和应用节点。

但是,您无需担心将代码上传到其他节点。《code>lunatic运行时会自动处理这一点。这也意味着您的“裸”函数“直接工作”。该函数在WASM中,所以如果进程调用该函数,它将在进程运行的节点上调用,因为该节点确实有WASM。

很酷,不是吗?

示例

让我们看看几个示例,以了解您何时会使用特定类型的方法。

对于所有这些示例,我们可以假设我们已经声明了square函数如下。

#[lucidity::job]
fn square(a: u32) -> u32 {
    a * a
}

无进程

即使您使用lucidity::job proc宏标记一个函数,您仍然可以像普通函数一样调用它。

fn main() {
    // Calling `square` here does not span a process, and is called by the currently executing process
    // as if it were a normal function.
    let result = square(3);

    println!("result: {:#?}", result);
}

本地/远程进程

如果您想本地启动一个进程,您可以使用{name}_local方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    let results = pythagorean1_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}

#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
    // Calling `square_local` here spawns a process locally, and blocks the current process until the
    // spawned process completes.  This is great for allowing the `lunatic` executor to "yield" more often,
    // especially if the `square` method had reasonable yield points.  However, the process it is called from is blocked,
    // so keep that in mind.
    //
    // In this case, we don't really mind blocking here, since we are in a lightweight process.  However, you may notice
    // there is an inefficiency in not computing the square of `a` and `b` in parallel.
    ((square_local(a) + square_local(b)) as f32).sqrt()
}

#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
    // Calling `square_remote` here spawns a process on a random remote node, and blocks the current process until the
    // spawned process completes.  This is great for ensuring a certain set of processes are being distributed,
    // but blocks the process it is called from.
    ((square_remote(a) + square_remote(b)) as f32).sqrt()
}

本地/远程异步进程

如果您想更精细地控制何时阻塞,您可以使用{name}_local_async{name}_remote_async方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    let results = pythagorean1_remote_fanout(vec![
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        (8, 9),
        (9, 10),
    ]);

    println!("result: {:#?}", results);
}


#[lucidity::job]
fn pythagorean1(a: u32, b: u32) -> f32 {
    // This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
    // Here, we are going to block with `await_get`.  This can be used for either local or remote
    // async jobs.
    let square_a_job = square_local_async(a);
    let square_b_job = square_local_async(b);

    // We get to this point "immediately".  The `square_a_job` and `square_b_job` are running in
    // their own processes, and we can do other work here.

    // Maybe do some other work in here ...

    let mut square_a = square_a_job.await_get();
    let mut square_b = square_b_job.await_get();

    ((square_a + square_b) as f32).sqrt()
}

#[lucidity::job]
fn pythagorean2(a: u32, b: u32) -> f32 {
    // This spawns a local process, and hands back a `Job` that can be polled, or blocked upon.
    // Here, we are going to loop and check for completion with `try_get` (sort of naively).  This can be used for either local or remote
    // async jobs.
    let square_a_job = square_remote_async(a);
    let square_b_job = square_remote_async(b);

    // We get to this point "immediately".  The `square_a_job` and `square_b_job` are running in
    // their own processes, and we can do other work here.
    
    // Maybe do your own looping ...
    let (square_a, square_b) = loop {
        let Some(a) = square_a_job.try_get() else {
            continue;
        }

        let Some(b) = square_b_job.try_get() else {
            continue;
        }

        break (a, b);
    }

    ((square_a + square_b) as f32).sqrt()
}

远程扇出

如果您基本上想执行相同的操作,但使用不同的参数,并且您想阻塞所有这些,您可以使用{name}_remote_fanout方法。

fn main() {
    // The `remote_fanout` is discussed below, but this is sort of the main meat and potatoes.
    // This function will be called on a set of nodes (round-robin-ed), and each of those nodes
    // will run it in a process.
    //
    // This is great for ensuring a certain set of processes are being distributed, but blocks the process it is called from.
    // You may have something where you are processing a set of images.  This would be a great use case to put those images
    // in a `Vec`, and then call this method to fan out all of the work.
    let results = square_remote_fanout(vec![1, 2, 3, 4, 5]);

    println!("result: {:#?}", results);
}

作业属性选项

lucidity::job进程宏有几个选项,可以用来自定义生成方法的行怍。

通常情况下,不需要使用这些选项,但如果你需要它们,它们是可用的。

  • init_retry_interval_ms:这是在尝试初始化Process时重试之间的等待时间(毫秒)。默认值为100
  • sync_retry_interval_ms:这是在尝试从Process获取阻塞(例如,{name}_local{name}_remote)时重试之间的等待时间(毫秒)。默认值为100
  • async_init_retry_interval_ms:这是在尝试异步初始化Process时重试之间的等待时间(毫秒)(例如,{name}_local_async{name}_remote_async)。默认值为100
  • async_get_retry_interval_ms:这是在尝试从Process获取非阻塞结果时重试之间的等待时间(毫秒)(例如,{name}_local_async{name}_remote_async)。默认值为100
  • async_set_retry_interval_ms:这是在执行Process尝试设置非阻塞结果时重试之间的等待时间(毫秒)(例如,{name}_local_async{name}_remote_async)。默认值为100
  • shutdown_retry_interval_ms:这是在尝试关闭Process时重试之间的等待时间(毫秒)。默认值为100
  • memory:这是允许Process使用的最大内存量。默认值为100 * 1024 * 1024(100MB)。
  • fuel:这是允许Process使用的最大燃料量。默认值为10(每个燃料单位大约是100,000 WASM指令)。
  • fanout:这是在扇出时使用的方案类型。默认值为roundrobin。另一个选项是random

功能标志

  • fly:这启用了 fly 功能,允许您使用 fly.io 平台自动设置从主 lunatic 节点。默认情况下,此功能未启用,因为它需要一个 fly.io 账户和一些配置。有关更多信息,请参阅 fly.io 文档。

    注意:此功能(目前)也由于 fly.io 上 UDP 的限制而变得无意义。我正在论坛上与 fly.io 团队合作解决这个问题。

测试

cargo test

谢谢

特别感谢 lunatic 的作者和贡献者们的出色工作,并特别感谢主要作者 bkolobara

许可

MIT

依赖关系

~2.1–2.9MB
~55K SLoC