#async #future #async-await #io #proc-macro #syntax #generator

nightly futures-await

通过过程宏为 Rust 实现的异步/await 语法。这个包定义了用于函数的 #[async] 宏,用于函数体的 await! 宏,并打算与 crates.io 上的 futures 包一起使用。

3 个不稳定版本

使用旧的 Rust 2015

0.2.0-alpha2018 年 3 月 8 日
0.1.1 2018 年 4 月 11 日
0.1.0 2017 年 10 月 28 日

#149 in #async-await

Download history 74/week @ 2024-03-11 76/week @ 2024-03-18 90/week @ 2024-03-25 176/week @ 2024-04-01 59/week @ 2024-04-08 84/week @ 2024-04-15 88/week @ 2024-04-22 75/week @ 2024-04-29 69/week @ 2024-05-06 74/week @ 2024-05-13 81/week @ 2024-05-20 66/week @ 2024-05-27 72/week @ 2024-06-03 51/week @ 2024-06-10 66/week @ 2024-06-17 72/week @ 2024-06-24

268 每月下载量
用于 少于 8 个

MIT/Apache

41KB
398

futures-await

Rust 的异步/await 语法和 futures

这是什么?

在 Rust 中,目前使用 futures 的主要方式是通过 Future 特性上的各种组合子。这并不完全是“回调地狱”,但随着每个新闭包的添加,代码的右移可能会让人感觉像它。异步/await 的目的是在保持异步代码所有功能的同时,为代码提供一种更“同步”的感觉!

这里这个小例子展示了这个包的功能

#[async]
fn fetch_rust_lang(client: hyper::Client) -> io::Result<String> {
    let response = await!(client.get("https://rust-lang.net.cn"))?;
    if !response.status().is_success() {
        return Err(io::Error::new(io::ErrorKind::Other, "request failed"))
    }
    let body = await!(response.body().concat())?;
    let string = String::from_utf8(body)?;
    Ok(string)
}

这里值得注意的点是

  • 函数被标记为 #[async],这意味着它们是 异步的,在调用时返回一个 future 而不是返回列出的结果。
  • await! 宏允许在 future 上进行阻塞,直到完成。但这实际上并没有阻塞线程,它只是“阻塞”了从 fetch_rust_lang 返回的 future。您可以将 await! 宏视为一个从 future 到其 Result 的函数,在过程中消耗 future。
  • ? 函数中,错误处理变得更加自然。我们可以使用 ? 操作符以及简单的 return Err 语句。在 futures crate 的组合子中,这通常要繁琐得多。
  • fetch_rust_lang 返回的未来实际上是在编译时生成的一个状态机,并且不做隐式内存分配。这是基于下面描述的生成器构建的。

您还可以有异步方法

impl Foo {
    #[async]
    fn do_work(self) -> io::Result<u32> {
        // ...
    }
}

您可以指定您实际上更喜欢返回一个特质对象,例如 Box<Future<Item = i32, Error = io::Error>>

#[async(boxed)]
fn foo() -> io::Result<i32> {
    // ...
}

您还可以有“异步 for 循环”,它操作于 Stream 特质

#[async]
for message in stream {
    // ...
}

异步 for 循环会传播函数外的错误,因此 message 具有传入 streamItem 类型。请注意,异步 for 循环只能用于 #[async] 函数内。

最后,您可以通过 #[async_stream(item = _)] 创建一个 Stream 而不是 Future

#[async]
fn fetch(client: hyper::Client, url: &'static str) -> io::Result<String> {
    // ...
}

/// Fetch all provided urls one at a time
#[async_stream(item = String)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
    for url in urls {
        let s = await!(fetch(client, url))?;
        stream_yield!(s);
    }
    Ok(())
}

#[async_stream] 必须指定一个项目类型,通过 item = some::Path,并且从流输出的值必须被包装成一个 Result,并通过 stream_yield! 宏产出。此宏还支持与 #[async] 相同的功能,额外的 boxed 参数用于返回一个 Box<Stream>,异步 for 循环等。

如何使用这个功能?

此实现目前基本上基于生成器/协程。这个特性在 刚刚发布,并于 2017-08-29 成为 Rust 的夜间频道的一部分。您可以通过以下方式获取夜间频道;

rustup update nightly

完成此操作后,您需要确保 Cargo 使用夜间工具链,通过以下方式调用它

cargo +nightly build

接下来,您将将其添加到您的 Cargo.toml

[dependencies]
futures-await = "0.1"

然后...

#![feature(proc_macro, conservative_impl_trait, generators)]

extern crate futures_await as futures;

use futures::prelude::*;

#[async]
fn foo() -> Result<i32, i32> {
    Ok(1 + await!(bar())?)
}

#[async]
fn bar() -> Result<i32, i32> {
    Ok(2)
}

fn main() {
    assert_eq!(foo().wait(), Ok(3));
}

这个crate旨在“伪装”成futures crate,重新导出其整个层次结构,并仅通过必要的运行时支持、async属性以及await!宏对其进行增强。当你导入它时,这些导入都包含在futures::prelude::*中。

如果你想要查看大量代码示例,也可以查看async-await分支的sccache,它正在使用async/await语法进行多地点的过渡。你会发现代码在之后要容易阅读得多,尤其是这些更改

技术细节

如前所述,这个crate本质上基于Rust的生成器功能。生成器,在本例中通常称为无栈协程,允许编译器为函数生成最优的Future实现,将看起来同步的代码块转换为可以异步执行的Future。这里的反编译过程从你编写的代码到rustc编译的代码非常直接,你可以浏览futures-async-macro crate的源代码来获取更多详细信息。

除此之外,该crate还提供了一些主要的“API”

  • #[async] - 这个属性可以应用于方法和函数,表示它是一个异步函数。函数的签名必须返回某种形式的Result(尽管它可以返回结果的类型定义)。此外,函数的参数必须全部是拥有值,换句话说,不能包含任何引用。这个限制可能在未来被取消!

    一些例子是

    // attribute on a bare function
    #[async]
    fn foo(a: i32) -> Result<u32, String> {
        // ...
    }
    
    // returning a typedef works too!
    #[async]
    fn foo() -> io::Result<u32> {
        // ...
    }
    
    impl Foo {
        // methods also work!
        #[async]
        fn foo(self) -> io::Result<u32> {
            // ...
        }
    }
    
    // For now, however, these do not work, they both have arguments which contain
    // references! This may work one day, but it does not work today.
    //
    // #[async]
    // fn foo(a: &i32) -> io::Result<u32> { /* ... */ }
    //
    // impl Foo {
    //     #[async]
    //     fn foo(&self) -> io::Result<u32> { /* ... */ }
    // }
    

    请注意,#[async]函数的行为非常类似于它的同步版本。例如,?操作符在内部是如何工作的,你可以使用提前的return语句等。

    在底层,#[async]函数被编译成一个表示该函数未来的状态机,状态机的挂起点将位于await!宏所在的位置。

  • async_block! - 这个宏与#[async]类似,因为它也创建了一个future,但它可以在表达式上下文中使用,不需要一个专门的功能。这个宏可以被认为是“异步运行这段代码”,其中提供的代码块,就像一个异步函数一样,返回某种形式的结果。例如

    let server = TcpListener::bind(..);
    let future = async_block! {
        #[async]
        for connection in server.incoming() {
            spawn(handle_connection(connection));
        }
        Ok(())
    };
    core.run(future).unwrap();
    

    或者如果你想在函数中做一些预计算

    fn hash_file(path: &Path, pool: &CpuPool)
        -> impl Future<Item = u32, Error = io::Error>
    {
        let abs_path = calculate_abs_path(path);
        async_block! {
            let contents = await!(read_file)?;
            Ok(hash(&contents))
        }
    }
    
  • await! - 这是一个由 futures-await-macro 包提供的宏,它允许等待一个future的完成。宏 await! 只能在 #[async] 函数或 async_block! 中使用,可以将其视为一个函数,如下所示

    fn await!<F: Future>(future: F) -> Result<F::Item, F::Error> {
        // magic
    }
    

    该宏的一些示例包括

    #[async]
    fn fetch_url(client: hyper::Client, url: String) -> io::Result<Vec<u8>> {
        // note the trailing `?` to propagate errors
        let response = await!(client.get(url))?;
        await!(response.body().concat())
    }
    
  • #[async] 循环 - 能够异步遍历一个 Stream。你可以通过将 #[async] 属性附加到一个迭代对象实现了 Stream 特质的 for 循环上来实现这一点。

    流中的错误将自动传播,否则,for循环将遍历流直到完成,并将每个元素绑定到提供的模式。

    一些例子是

    #[async]
    fn accept_connections(listener: TcpListener) -> io::Result<()> {
        #[async]
        for connection in server.incoming() {
            // `connection` here has type `TcpStream`
        }
        Ok(())
    }
    

    请注意,与 await! 一样,#[async] for 循环只能用在异步函数或异步块中。

  • #[async_stream(item = ...)] - 定义了一个实现了 Stream 而不是 Future 的函数。此函数使用 stream_yield! 宏产生项,并且与其他 await! 宏和 #[async] for 循环一起工作。

    声明的函数必须返回一个 Result<(), E>,其中 E 成为返回的 Stream 的错误。通过从函数中返回 Ok(()) 或返回一个错误来终止流。在函数内部,操作如 ? 也可以用于传播错误。

    一个示例是

    #[async_stream(item = u32)]
    fn accept_connections(listener: TcpListener) -> io::Result<()> {
        #[async]
        for object in fetch_all_objects() {
            let description = await!(fetch_object_description(object));
            stream_yield!(description);
        }
        Ok(())
    }
    

夜间功能

目前,此包需要两个夜间功能才能使用,实际上需要三个功能才能充分利用。这三个功能是

  • #![feature(generators)] - 这是一个实验性语言特性,尚未稳定,但它是实现 async/await 的基础。最近已经实现的 功能,稳定化的进展可以在其 跟踪问题 上找到。

  • #![feature(proc_macro)] - 这也被称为 "宏 2.0",是本 crate 中定义 #[async] 属性的方式,而不是编译器本身。我们还可以利用其他宏 2.0 功能,例如通过 use 导入宏,而不是 #[macro_use]。此功能的跟踪问题包括 #38356#35896

  • #![feature(conservative_impl_trait)] - 如果您选择始终通过 #[async(boxed)] 使用 trait 对象,则此功能并非必须使用此 crate,但它对于优雅地使用此 crate 实际上是必需的。此功能被用于因为每个函数都返回 impl Future<...> 而不是具体类型的未来。此功能在 #34511#42183 上进行跟踪。

此 crate 的意图是,最新的功能 generators 将是最后一个稳定的。其他两个,proc_macroconservative_impl_trait,希望能在 generators 之前稳定下来!

接下来是什么?

此 crate 仍然相当新,并且 generators 只是刚刚在 Rust 的 nightly 频道上落地。虽然目前没有计划对此 crate 进行重大更改,但我们将看看这一切会如何发展!将 generators 这样快地纳入语言的主要动机之一是间接地增强 async/await 的实现,您在评估这一点时需要提供帮助!

值得注意的是,我们正在寻找对此 crate 中 async/await 以及作为语言特性的 generators 的反馈。我们想确保,如果我们稳定了 generators 或过程宏,它们将提供最佳的 async/await 体验!

目前建议在生产环境中使用此功能时谨慎行事。该软件包本身只经过了非常有限的测试,并且 generators 语言特性也仅进行了有限的测试。请谨慎操作,并留意可能出现的问题!如果您有任何疑问或遇到任何问题,欢迎您提交问题

注意事项

正如许多夜间特性一样,在处理此项目时,有一些需要注意的问题。尽管欢迎提交错误报告或经验报告,但了解需要修复的内容总是好事!

借用

目前,借用并不十分有效。编译器将拒绝许多借用,或者由于生成器特性正在开发中,可能存在一些潜在的不安全性。例如,如果您有一个这样的函数:

#[async]
fn foo(s: &str) -> io::Result<()> {
    // ..
}

这可能会编译失败!原因在于返回的未来通常需要遵守 'static 绑定。异步函数在调用时不会执行任何代码,它们只有在轮询时才会取得进展。这意味着当您调用异步函数时,它将创建一个未来,包装参数,然后将其返回给您。在这种情况下,它必须返回 &str,但这并不总是符合生命周期的工作方式。

使上述函数编译的一个例子是

#[async]
fn foo(s: String) -> io::Result<()> {
    // ...
}

或以某种方式使用拥有的值而不是借用引用。

请注意,问题不仅限于参数。例如,以下代码(或至少今天不应该)可以编译

for line in string.lines() {
    await!(process(line));
    println!("processed: {}", line);
}

问题在于 line 是一个在某个 yield 点(即对 await! 的调用)中活跃的借用值。这意味着当未来可能返回到堆栈时,它是 await! 过程的一部分,它必须在重新进入未来时恢复 line 变量。这尚未完全实现,并且可能今天是不安全的。目前,作为经验法则,您需要仅在 await! 调用或异步 for 循环期间拥有值(没有借用内部)。

目前正在投入大量精力来找出如何减轻这种限制!借用是 Rust 中许多易于使用的模式的核心,我们希望它能够正常工作!

作为最后的要点,今天“没有借用参数”的后果之一是,以下函数签名

#[async]
fn foo(&self) -> io::Result<()> {
    // ...
}

不幸的是,将无法工作。您要么需要通过值传递 self,要么依赖于不同的 #[async] 函数。

特质中的未来

假设您有一个这样的特质

trait MyStuff {
    fn do_async_task(??self) -> Box<Future<...>>;
}

我们将在一段时间内忽略 self 的细节,但本质上,我们有一个在特质中返回未来的函数。不幸的是,使用它实际上相当困难!目前有一些注意事项

  • 理想情况下,您想为它标记 #[async],但这不起作用,因为今天编译器中没有实现返回 impl Future 的特质函数。我被告知,这最终会工作!
  • 那么接下来最好的选择是返回一个包装的特质对象,而不是在短时间内返回 impl Future。这可能并不是我们想要的运行时特性,但我们也有其他问题...
  • 但现在这让我们面临处理 self 的问题。由于今天 #[async] 的限制,我们只有两种选择,selfself: Box<Self>。前者很不幸不是对象安全的(现在我们无法使用这个特质的虚拟分发),而后者通常效率低下(每次调用都需要新的分配)。理想情况下,self: Rc<Self> 正是我们需要的!但不幸的是,编译器还没有实现这个功能 😦

所以基本上,总结一下,你现在有两种方法可以在特质中返回未来

trait MyStuff {
    // Trait is not object safe because of `self` so can't have virtual
    // dispatch, and the allocation of `Box<..>` as a return value is required
    // until the compiler implements returning `impl Future` from traits.
    //
    // Note that the upside of this approach, though, is that `self` could be
    // something like `Rc` or have a bunch fo `Rc` inside of `self`, so this
    // could be cheap to call.
    #[async]
    fn do_async_task(self) -> Box<Future<...>>;
}

或者另一种选择

trait MyStuff {
    // Like above we returned a trait object but here the trait is indeed object
    // safe, allowing virtual dispatch. The downside is that we must have a
    // `Box` on hand every time we call this function, which may be costly in
    // some situations.
    #[async]
    fn do_async_task(self: Box<Self>) -> Box<Future<...>>;
}

特质中未来(futures-in-traits)的理想最终目标是这个

trait MyStuff {
    #[async]
    fn do_async_task(self: Rc<Self>) -> Result<i32, u32>;
}

但这需要两个部分的实现

  • 编译器必须接受返回 impl Trait 的特质函数
  • 编译器需要支持 self: Rc<Self>,基本上是在特质中对象安全的自定义智能指针。

你现在可以通过以下方式通过非对象安全的实现来模拟这个功能

trait Foo {
    #[async]
    fn do_async_task(me: Rc<Self>) -> Result<i32, u32>;
}

fn foo<T: Foo>(t: Rc<T>) {
    let x = Foo::trait_fn(t);
    // ...
}

但这并不是最方便的!

关联类型

当使用特质中的未来时,另一个限制是与关联类型相关。比如说,你有一个这样的特质:

trait Service {
    type Request;
    type Error;
    type Future: Future<Item = Self::Response, Error = Self::Error>;

    fn call(&self) -> Self::Future;
}

如果你想要使用 callasync_block! 实现,或者通过返回另一个由 #[async] 生成的函数返回的未来,你可能想要使用 impl Future。不幸的是,目前无法用 impl Trait 表达一个关联常量,如 Service::Future

目前最好的解决方案是使用 Box<Future<...>>

impl Service for MyStruct {
    type Request = ...;
    type Error = ...;
    type Future = Box<Future<Item = Self::Item, Error = Self::Error>>;

    fn call(&self) -> Self::Future {
        // ...
        Box::new(future)
    }
}

许可证

本项目许可采用以下之一:

供您选择。

贡献

除非您明确声明,否则您根据Apache-2.0许可证定义的,有意提交供 futures-await 包含的贡献,应按上述方式双授权,没有任何额外条款或条件。

依赖关系

~3MB
~64K SLoC