#future #async #async-await #macro #few #futures-await #definition

nightly macro futures-await-async-macro

#[async] 宏的定义,用于 futures-await 库以及其他一些杂项宏

6 个版本

使用旧的 Rust 2015

0.2.0-alpha2018年3月8日
0.1.4 2018年5月25日
0.1.2 2018年4月11日
0.1.1 2017年11月19日
0.1.0 2017年10月28日

5#futures-await

Download history 71/week @ 2024-03-11 60/week @ 2024-03-18 27/week @ 2024-03-25 109/week @ 2024-04-01 39/week @ 2024-04-08 64/week @ 2024-04-15 80/week @ 2024-04-22 50/week @ 2024-04-29 54/week @ 2024-05-06 54/week @ 2024-05-13 70/week @ 2024-05-20 49/week @ 2024-05-27 61/week @ 2024-06-03 43/week @ 2024-06-10 49/week @ 2024-06-17 57/week @ 2024-06-24

每月215 次下载
futures-await 中使用

MIT/Apache

27KB
569

futures-await

现在 async/await 已经正式成为 Rust 语言的组成部分,这个库就不再必要了,感谢所有为此做出贡献的人!

Rust 和 futures 库的 async/await 语法

这是什么?

在 Rust 中,今天使用 futures 的主要方式是通过 Future 特性上的各种组合器。这并不完全是 "回调地狱",但随着每个新闭包附加到代码中的右边移动,有时会感觉像它。async/await 的目的是提供一种更 "同步" 的代码感觉,同时保留所有异步代码的功能!

这里有一个这个库所做的小例子

#[async]
fn fetch_rust_lang(client: hyper::Client) -> io::Result<String> {
    let response = await!(client.get("https://rust-lang.net.cn"))?;
    if !response.status().is_success() {
        return Err(io::Error::new(io::ErrorKind::Other, "request failed"))
    }
    let body = await!(response.body().concat())?;
    let string = String::from_utf8(body)?;
    Ok(string)
}

这里值得注意的是

  • 函数被标记为 #[async],这意味着它们是 异步 的,在调用时返回一个 future,而不是返回列出的结果。
  • await! 允许在 future 完成时进行阻塞。但这并不会实际阻塞线程,它只是阻止从 fetch_rust_lang 返回的 future 继续执行。可以将 await! 宏视为从 future 到其 Result 的函数,在消费 future 的过程中。
  • #[async] 函数中,错误处理要自然得多。我们可以使用 ? 操作符以及简单的 return Err 语句。使用 futures 包中的组合器,这通常要复杂得多。
  • fetch_rust_lang 返回的 future 实际上是一个在编译时生成的状态机,并且不会进行隐式的内存分配。这是基于以下描述的生成器构建的。

您还可以有异步方法

impl Foo {
    #[async]
    fn do_work(self) -> io::Result<u32> {
        // ...
    }
}

您还可以指定您实际上希望返回的是一个 trait 对象,例如 Box<Future<Item = i32, Error = io::Error>>

#[async(boxed)]
fn foo() -> io::Result<i32> {
    // ...
}

您还可以有“异步 for 循环”,该循环操作在 Stream trait 上

#[async]
for message in stream {
    // ...
}

异步 for 循环将错误传播到函数外部,因此 messagestream 传入的 Item 类型。请注意,异步 for 循环只能在 #[async] 函数中使用。

最后,您可以通过 #[async_stream(item = _)] 创建一个 Stream 而不是 Future

#[async]
fn fetch(client: hyper::Client, url: &'static str) -> io::Result<String> {
    // ...
}

/// Fetch all provided urls one at a time
#[async_stream(item = String)]
fn fetch_all(client: hyper::Client, urls: Vec<&'static str>) -> io::Result<()> {
    for url in urls {
        let s = await!(fetch(client, url))?;
        stream_yield!(s);
    }
    Ok(())
}

#[async_stream] 必须通过 item = some::Path 指定一个项目类型,并且从流中输出的值必须被包装到 Result 中,并通过 stream_yield! 宏进行产生。此宏也支持与 #[async] 相同的功能,额外的 boxed 参数以返回一个 Box<Stream>,异步 for 循环等。

如何使用它?

此实现目前基本上是基于生成器/协程构建的。此功能于2017年8月29日刚刚落地,并将作为 Rust 的 nightly 通道的一部分。您可以通过以下方式获取 nightly 通道;

rustup update nightly

完成此操作后,您可能需要确保 Cargo 使用 nightly 工具链,如下所示:

cargo +nightly build

接下来,您需要将以下内容添加到您的 Cargo.toml

[dependencies]
futures-await = "0.1"

然后...

#![feature(proc_macro, generators)]

extern crate futures_await as futures;

use futures::prelude::*;

#[async]
fn foo() -> Result<i32, i32> {
    Ok(1 + await!(bar())?)
}

#[async]
fn bar() -> Result<i32, i32> {
    Ok(2)
}

fn main() {
    assert_eq!(foo().wait(), Ok(3));
}

这个crate的目的是“伪装”成futures crate,重新导出其整个层次结构,并仅增加必要的运行时支持、async属性和await!宏。当您导入时,这些导入都包含在futures::prelude::*中。

对于一大堆代码中的许多示例,您还可以查看sccacheasync-await分支,该分支正在进行许多位置的异步/await语法的过渡。您通常会发现在此之后代码的可读性要好得多,特别是这些更改

技术细节

如前所述,这个crate在Rust的生成器特性基础上构建。生成器,在本例中通常称为无栈协程,允许编译器为函数生成一个最优的Future实现,将看似同步的代码块转换成可以异步执行的Future。这里的反编译过程从您编写的代码到rustc编译的代码非常简单,您可以浏览futures-async-macro crate的源代码以获取更多详细信息。

此外,这个crate还提供了一些主要的“API”

  • #[async] - 这个属性可以应用于方法和函数,表示它是一个异步函数。函数的签名必须返回某种形式的Result(尽管它可以返回结果的typedef)。此外,函数的参数必须全部是拥有值,换句话说,不能包含任何引用。这种限制可能在将来被放宽!

    一些例子如下

    // attribute on a bare function
    #[async]
    fn foo(a: i32) -> Result<u32, String> {
        // ...
    }
    
    // returning a typedef works too!
    #[async]
    fn foo() -> io::Result<u32> {
        // ...
    }
    
    impl Foo {
        // methods also work!
        #[async]
        fn foo(self) -> io::Result<u32> {
            // ...
        }
    }
    
    // For now, however, these do not work, they both have arguments which contain
    // references! This may work one day, but it does not work today.
    //
    // #[async]
    // fn foo(a: &i32) -> io::Result<u32> { /* ... */ }
    //
    // impl Foo {
    //     #[async]
    //     fn foo(&self) -> io::Result<u32> { /* ... */ }
    // }
    

    请注意,一个#[async]函数的行为与其同步版本非常相似。例如,?操作符在内部工作,您可以使用早期的return语句等。

    在底层,一个#[async]函数被编译成一个状态机,该状态机表示该函数的未来,状态机的挂起点将位于await!宏的位置。

  • async_block! - 这个宏与 #[async] 类似,因为它都会创建一个未来(future),但与 #[async] 不同的是,它可以在表达式上下文中使用,并且不需要专门的函数。这个宏可以被视为“异步运行此代码块”,其中提供的代码块,就像异步函数一样,返回某种形式的结果。例如

    let server = TcpListener::bind(..);
    let future = async_block! {
        #[async]
        for connection in server.incoming() {
            spawn(handle_connection(connection));
        }
        Ok(())
    };
    core.run(future).unwrap();
    

    或者如果您想在函数中执行一些预计算

    fn hash_file(path: &Path, pool: &CpuPool)
        -> impl Future<Item = u32, Error = io::Error>
    {
        let abs_path = calculate_abs_path(path);
        async_block! {
            let contents = await!(read_file)?;
            Ok(hash(&contents))
        }
    }
    
  • await! - 这是一个在 futures-await-macro 库中提供的宏,允许等待一个未来(future)完成。await! 宏只能在 #[async] 函数或 async_block! 中使用,可以将其视为一个类似函数的对象

    fn await!<F: Future>(future: F) -> Result<F::Item, F::Error> {
        // magic
    }
    

    此宏的一些示例

    #[async]
    fn fetch_url(client: hyper::Client, url: String) -> io::Result<Vec<u8>> {
        // note the trailing `?` to propagate errors
        let response = await!(client.get(url))?;
        await!(response.body().concat())
    }
    
  • #[async] for 循环 - 能够异步迭代一个 Stream。您可以通过将 #[async] 属性附加到一个 for 循环上,其中正在迭代的对象实现了 Stream 特性来实现这一点。

    流中的错误将自动传播,否则,for 循环将遍历流直到完成,将每个元素绑定到提供的模式上。

    一些例子如下

    #[async]
    fn accept_connections(listener: TcpListener) -> io::Result<()> {
        #[async]
        for connection in server.incoming() {
            // `connection` here has type `TcpStream`
        }
        Ok(())
    }
    

    请注意,与 await! 一样,#[async] for 循环只能在异步函数或异步块中使用。

  • #[async_stream(item = ...)] - 定义了一个实现 Stream 而不是 Future 的函数。此函数使用 stream_yield! 宏产生项,并与其他 await! 宏和 #[async] for 循环一起工作。

    声明的函数必须返回一个 Result<(), E>,其中 E 变成了返回的 Stream 的错误。通过从函数返回 Ok(()) 或返回一个错误来终止流。函数内部可以使用 ? 等操作来传播错误。

    示例

    #[async_stream(item = u32)]
    fn accept_connections(listener: TcpListener) -> io::Result<()> {
        #[async]
        for object in fetch_all_objects() {
            let description = await!(fetch_object_description(object));
            stream_yield!(description);
        }
        Ok(())
    }
    

夜间功能

目前这个包需要两个nightly特性才能使用

  • #![feature(generators)] - 这是一个实验性的语言特性,尚未稳定,但它是async/await实现的基础。最近实现了这个特性,有关稳定化的进度可以在其跟踪问题上找到。

  • #![feature(proc_macro)] - 这也被称作 "宏 2.0",是此包中定义 #[async] 属性的方式,而不是编译器本身。我们还利用了其他宏 2.0 特性,例如通过 use 导入宏,而不是 #[macro_use]。有关此特性的跟踪问题包括#38356#35896

接下来是什么?

这个包仍然相当新,生成器刚刚在Rust的nightly通道上落地。尽管目前没有计划对这个包进行重大更改,但我们仍需看看这一切会如何发展!将生成器尽快实现到语言中的主要动机之一是促进async/await的实现,我们需要您的帮助来评估这一点!

值得注意的是,我们希望对此包中的async/await以及生成器作为语言特性本身收集反馈。我们想确保,如果我们稳定了生成器或过程宏,它们能够提供最佳的async/await体验!

目前建议您在使用此包进行生产时谨慎行事。这个包本身只进行了非常有限的测试,并且到目前为止,语言特性 generators 也只进行了有限的测试。建议您保持警惕,注意查找错误!如果您遇到任何问题或疑问,欢迎创建问题

注意事项

正如许多nightly特性所预期的那样,当与此项目一起工作时,有一些需要注意的问题。尽管欢迎提交错误报告或经验报告,但了解需要解决的问题总是好的!

借用

当前借用并不十分有效。编译器可能会拒绝许多借用,或者由于生成器特性正在开发中,可能存在一些安全隐患。例如,如果您有一个函数

#[async]
fn foo(s: &str) -> io::Result<()> {
    // ..
}

这可能无法编译!原因是返回的未来通常需要遵守 'static 约束。异步函数在调用时不会执行任何代码,只有在轮询时才会取得进展。这意味着当您调用异步函数时,它会创建一个未来,包装参数,然后将其返回给您。在这种情况下,它必须返回一个 &str,这并不总是符合生存期的要求。

使上述函数编译的一个示例可能是这样做

#[async]
fn foo(s: String) -> io::Result<()> {
    // ...
}

或以其他方式使用所有权的值而不是借用引用。

注意,借用的问题不仅仅是参数。例如,以下代码今天(或者至少不应该)无法编译:

for line in string.lines() {
    await!(process(line));
    println!("processed: {}", line);
}

这里的问题是line是一个在yield点(即对await!的调用)持续存在的借用值。这意味着当future可能返回到栈上时,它是await!过程的一部分,它必须在重新进入future时恢复line变量。这实际上并没有完全实现,并且在今天可能是不安全的。作为目前的经验法则,你只需在调用await!或异步for循环期间,只拥有值(没有借用内部值)即可。

正在投入大量精力来解决这个问题!借用是Rust中许多舒适模式的精髓,我们希望这能够实现!

最后一点,今天“没有借用参数”的一个后果是,函数签名如下:

#[async]
fn foo(&self) -> io::Result<()> {
    // ...
}

很遗憾,将无法工作。你可能需要按值取self,或者推迟到不同的#[async]函数。

特质的Future

假设你有一个这样的特质:

trait MyStuff {
    fn do_async_task(??self) -> Box<Future<...>>;
}

在这里我们暂时忽略self的细节,但本质上我们有一个在特质中返回future的函数。不幸的是,实际上使用它相当困难!目前有几个需要注意的问题:

  • 理想情况下,你想标记这个#[async],但这不起作用,因为编译器今天尚未实现返回impl Future的特质函数。我被告知这最终会实现,但至少目前还没有!
  • 那么,接下来最好的选择是使用#[async(boxed)]来返回一个boxed特质对象,而不是返回impl Future。这在运行时可能并不是我们想要的,但我们也有其他问题...
  • 但现在这让我们来到了处理self的问题。由于今天#[async]的限制,我们只有两种选择:selfself: Box<Self>。前者很遗憾不是对象安全的(现在我们不能使用这个特质的虚拟调度),后者通常很浪费(每次调用现在都需要新的分配)。理想情况下,我们想要的是self: Rc<Self>!但遗憾的是,编译器还没有实现这一点 😦

总的来说,目前在 traits 中返回 futures 有两种选择

trait MyStuff {
    // Trait is not object safe because of `self` so can't have virtual
    // dispatch, and the allocation of `Box<..>` as a return value is required
    // until the compiler implements returning `impl Future` from traits.
    //
    // Note that the upside of this approach, though, is that `self` could be
    // something like `Rc` or have a bunch fo `Rc` inside of `self`, so this
    // could be cheap to call.
    #[async]
    fn do_async_task(self) -> Box<Future<...>>;
}

或者说是另一种选择

trait MyStuff {
    // Like above we returned a trait object but here the trait is indeed object
    // safe, allowing virtual dispatch. The downside is that we must have a
    // `Box` on hand every time we call this function, which may be costly in
    // some situations.
    #[async]
    fn do_async_task(self: Box<Self>) -> Box<Future<...>>;
}

在 traits 中实现 futures 的理想目标是这样的

trait MyStuff {
    #[async]
    fn do_async_task(self: Rc<Self>) -> Result<i32, u32>;
}

但需要实现两个部分

  • 编译器必须接受返回 implTrait 的 trait 函数
  • 编译器需要支持 self: Rc<Self>,基本上是 traits 中的对象安全自定义智能指针。

你可以通过以下方式在当今实现这个功能,但这种方法并不十分方便!

trait Foo {
    #[async]
    fn do_async_task(me: Rc<Self>) -> Result<i32, u32>;
}

fn foo<T: Foo>(t: Rc<T>) {
    let x = Foo::trait_fn(t);
    // ...
}

但这并不是最人性化的!

关联类型

当在 traits 中使用 futures 时,另一个限制是与关联类型相关。比如说,你有一个这样的 trait

trait Service {
    type Request;
    type Response;
    type Error;
    type Future: Future<Item = Self::Response, Error = Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future;
}

如果你想要用 implFuture 实现 async_block!,或者从一个用 #[async] 生成的函数中返回一个 future,你可能想使用 implTrait。不幸的是,目前无法用 Service::Future 这样的关联常量来表示。

目前最好的解决方案是使用 Box<Future<...>>

impl Service for MyStruct {
    type Request = ...;
    type Response = ...;
    type Error = ...;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        // ...
        Box::new(future)
    }
}

许可证

本项目可使用以下任一许可证

任由您选择。

贡献

除非您明确声明,否则根据 Apache-2.0 许可证定义,您提交给 futures-await 的任何贡献将按照上述方式双许可,无需附加条款或条件。


lib.rs:

用于 #[async] 属性的过程宏。

这个包是作为过程宏实现的 #[async] 属性。目前,它只用于 nightly 版本,因为它使用过程宏的不稳定特性。此外,它生成使用新关键字 yield 和新结构 generator 的代码,这两者也都是不稳定的。

目前这个包依赖于 synquote 来进行所有繁重的工作,这只是一个创建从 generator 到闭包/未来的非常小的适配器。

依赖关系

~2MB
~47K SLoC