#scope #async #structured #experimental #parallel #data

moro

为异步 Rust 提供实验性的结构化并发支持(类似于 trio 的 nurseries)

5 个版本 (3 个破坏性更新)

0.4.0 2022 年 6 月 4 日
0.3.0 2022 年 6 月 4 日
0.2.0 2022 年 6 月 4 日
0.1.1 2022 年 4 月 17 日
0.1.0 2022 年 4 月 17 日

#381并发

Download history 144/week @ 2024-03-15 94/week @ 2024-03-22 86/week @ 2024-03-29 54/week @ 2024-04-05 4/week @ 2024-04-12 4/week @ 2024-04-19 6/week @ 2024-04-26 17/week @ 2024-05-17 8/week @ 2024-05-24 5/week @ 2024-05-31 5/week @ 2024-06-07 8/week @ 2024-06-14 51/week @ 2024-06-21 16/week @ 2024-06-28

每月 82 次下载
2 个 Crates 中使用(通过 earendil

MIT/Apache

25KB
295

moro

Rust 结构化并发的实验

TL;DR

类似于 rayonstd::thread::scope,moro 允许您使用 moro::async_scope! 宏创建一个 作用域。在此作用域内,您可以启动可以访问作用域外定义的堆栈数据的作业

let value = 22;
let result = moro::async_scope!(|scope| {
    let future1 = scope.spawn(async {
        let future2 = scope.spawn(async {
            value // access stack values that outlive scope
        });

        let v = future2.await * 2;
        v
    });

    let v = future1.await * 2;
    v
})
.await;
eprintln!("{result}"); // prints 88

堆栈访问,核心作用域 API

调用 moro::async_scope!(|scope| ...),您将获得一个表示整个作用域的 future,您可以等待它启动作用域。

在作用域体(...)内,您可以调用 scope.spawn(async { ... }) 来启动一个作业。此作业必须在作用域本身被认为完成之前终止。`scope.spawn` 的结果是返回作业结果的 future。

提前终止和取消

Moro作用域支持早期终止取消。您可以调用scope.terminate(v).await,作用域内的所有子线程将立即停止执行。当v是一个Result并希望取消其中的Err值时,通常使用终止(在引入中我们提供了如unwrap_or_cancel等辅助方法)。

使用取消的一个示例可以在monitor中看到——在这个示例中,生成了几个工作,它们都检查一个输入整数。如果任何整数是负数,整个作用域将被取消。

未来工作:集成与类似rayon的迭代器

我想这样做。 :)

常见问题

moro这个名字的由来是什么?

这个名字来自希腊语中“婴儿”的意思(μωρό)。流行的"trio"库使用“nursery”来指代作用域,因此我想尊重这种传统。

不过,“moros”也是即将到来的毁灭的“仇恨”精神,这是我之前不知道的,但感觉相当酷。

是否有其他异步nursery项目可用,moro与它们相比如何?

是的!我了解...

  • async_nursery,它类似于moro,但提供并行执行(不仅仅是并发),但——作为结果——需要'static约束。
  • FuturesUnordered,它可以作为某种nursery使用,但它也有一些已知问题(这在这里被称为“陷阱”)。此类型目前用于moro实现,但moro的API阻止了这些陷阱的发生。
  • select操作通常用于“建模”并行流;与FuturesUnordered一样,这是一种容易出错的办法,moro部分是为了替代类似select的API而演化的。

为什么moro的spawn只运行并发而不是并行?

在当前的Rust中,无法安全地执行并行moro任务。详细内容在后续问题中,但简而言之,当moro作用域向其调用者让步时,作用域“放弃控制权”,而调用者可以选择——如果它选择的话——完全忘记作用域并停止执行它。这意味着如果moro作用域已经开始并行线程,这些线程将继续访问调用者的数据,这可能导致数据竞争。这很不好。

运行并发不是一个巨大的限制吗?

有点?并行当然很好,但对于许多异步服务器来说,您在连接之间获得并行性,不需要在连接内部获得并行性。您还可以使用其他机制来获得并行性,但需要'static约束。

好吧,但为什么moro的spawn只运行并发而不是并行?给我详细说明!

方法 Future::poll 允许安全地“部分推进”一个future,然后,因为future是一个普通的Rust值,所以“忘记”它(例如,通过 std::mem::forget,尽管还有其他方法)。这可以让您创建一个作用域,执行几次,然后丢弃它,而无需运行任何析构函数

async fn method() {
    let data = vec![1, 2, 3];
    let some_future = moro::async_scope!(|scope| {
        scope.spawn(async { 
            for d in &data {
                tokio::task::yield_now().await;
            }
        });
    });

    // pseudo-code, we'd have to gin up a context etc:
    std::future::Future::poll(some_future);
    std::future::Future::poll(some_future);
    std::mem::forget(some_future);
    return;
}

如果moro任务并行运行,我们就没有办法确保在 method 返回之前,作用域内创建的并行线程被停止。因此,它们会继续从 data 中访问数据,即使栈帧弹出和数据被释放。不好。

但由于moro仅限于并发,这没问题。作用域内的任务只有在它们被轮询时才会推进(它们不是并行的)——所以当您“忘记”作用域时,您只是停止执行任务。

请注意,这个问题在像 rayon 或新的 std::thread::scope 这样的库中不会发生。这是因为同步代码有一个异步代码所缺乏的能力:同步函数可以阻塞其调用者(这是因为安全的Rust禁止longjmp)。但在异步代码中,根据Rust的当前模型,只要您“await”某事,您就是将控制权交给您的调用者,他们可以自由地不再轮询您。这意味着,我相信,不可能有一个像moro那样的“作用域”,它可以安全地引用作用域之外的数据,因为该数据属于您的调用者,您不能强迫他们不返回。换句话说,异步代码 可以 通过与执行器合作,确保某些future运行到完成。任何对 tokio::spawn 的调用都会这样做。但是您不能确保您的future被 嵌入 在运行到完成的其他东西中。

我认为,不修改 Future 特征或以某种方式修改Rust,就不能安全地启用并行执行。有一些提议要更改 Future 特征,以使moro支持并行执行(这些相同的提议将有助于支持io-uring、DMA和其他功能),但前进的具体路径尚未确定。

依赖项

~1.2–2MB
~38K SLoC