3个不稳定版本

0.2.1 2020年5月16日
0.2.0 2020年5月16日
0.1.0 2020年5月13日

#619 in 异步

BSD-3-Clause

44KB
631

cb_fut

此crate提供实用宏,将需要回调来处理返回值的函数调用转换为返回值或生成值的Future或Stream函数。它将额外的语法-> ()-> () ->引入宏调用签名中。

限制

以下是此crate的限制。

  • 如果回调不是用于返回值,不要使用这两个宏。这将破坏函数。
  • 如果函数也返回值,则返回的值将被静默丢弃。
  • 该宏只支持单个回调转换。如果函数需要多个回调在不同情况下返回值,则无法使用。
  • 与使用原始回调相比,这将更慢。这是因为它需要某种间接方式将回调通过通道指向Future。

如何使用

此crate提供四组宏,分为两个类别。

非阻塞

它使用futures::channel::unbound在函数回调和Future之间进行通信。此类别有两个宏。

  1. once - 将函数调用转换为返回作为回调参数使用的值的Future。
  2. stream - 将函数调用转换为返回作为回调参数传递的值的Stream。

阻塞

它使用两个通道在CBBlockResult和回调之间保持同步。它启动一个新线程来执行函数,并阻塞函数等待调用者执行CBBlockResult上的return_value方法或直到CBBlockResult被丢弃。此类别有两个宏。

  1. once_blocked - 从带有回调函数的调用中获取结果的Future。它与once宏不同,因为它会阻塞函数,直到结果被丢弃或显式调用方法return_value。当CBBlockResult被丢弃时,此宏将加入执行函数的线程。
  2. stream_blocked - 获取一个CBStreamBlocked,该阻塞会产生一个结果,CBBlockResult包含类似于发送到回调的值。与常规的stream宏相比,其结果不同,因为它会阻塞函数,直到结果被丢弃或显式调用return_value方法。

这两个的主要区别在于,如果回调将恰好被调用一次,则应使用onceonce_blocked,而如果回调将被多次调用,则应使用streamstream_blocked

这两个类别需要一个特殊的语法来识别期望的返回变量数。语法是-> ()->()->v。前者类似于返回类型的函数签名,例如Fn(i32) -> ()。后者是从前者扩展而来的,因为回调本身会返回一些东西给函数,这可能会影响该函数的执行。当调用者没有调用return_value方法时,将使用->v部分作为默认值。这是为了保证函数会接收到返回值以继续执行。

这些转换会产生一个FutureStream,该值会解析为值的元组。请参阅下面的示例。

示例

将带有回调的函数转换为future。

// A function that need a callback
fn func(v: i32, cb: impl FnOnce(i32, i32)) {
    std::thread::sleep(std::time::Duration::from_secs(2));
    cb(v, v * 2)
}

// Use once! to convert the call to `func` to return a `Future` instead.
// We use ->(a, b) to tell macro that `Future` shall return two variables.
let (a, b) = futures::executor::block_on(once!(func(2 + 3, ->(a, b))));

assert_eq!(5, a);
assert_eq!(10, b);

回调可以在函数签名中的任何位置。回调占位符只需要反映这一点即可。

// A function that put callback in the middle between other two parameters
fn func(u: i32, cb: impl FnOnce(i32, i32), v: i32) {
    std::thread::sleep(std::time::Duration::from_secs(2));
    cb(u, v)
}
// We use `->(a, b)` between 1, and, 2 + 3 to tell macro that this parameter is a callback and it take 2 parameters.
let (a, b) = futures::executor::block_on(once!(func(1, ->(a, b), 2 + 3)));
assert_eq!(1, a);
assert_eq!(5, b);

如果回调不带参数,我们需要放置-> ()

// a function that take no arguments callback
fn func(_v: i32, cb: impl FnOnce()) {
    std::thread::sleep(std::time::Duration::from_secs(2));
    cb()
}
// A callback placeholder with no argument
futures::executor::block_on(once!(func(2 + 3, -> ())));

如果回调将被多次调用,请使用stream!

use futures::stream::StreamExt;
// A function that take callback as first argument.
// It'll call callback 5 times with two arguments, an original value and the original value times number of called.
fn func(mut cb: impl FnMut(i32, i32), v: i32) {
    for i in 0..5 {
        cb(v, v * i)
    }
}
let mut counter = 0;

// `stream!` will return `CBStream` which implement `Stream` trait. We use `enumerate` and `for_each` from `StreamExt` trait to iterate over each values tuples that suppose to be passed to callback function. 
// The `for_each` method signature require a return value of type `Future` for given callback. The final return value from `for_each` is a single consolidated `Future` which when resolve, all `Future`s inside it are all resolved.
futures::executor::block_on(stream!(func(->(a, b), 2 + 3)).enumerate().for_each(|(i, fut)| {
    counter += 1;
    async move {
        let (a, b) = fut;
        assert_eq!(5, a);
        assert_eq!(5 * i as i32, b);
    }
}));

assert_eq!(5, counter);

回调在回调执行后对函数有副作用。

fn func(v: i32, cb: impl FnOnce(i32, i32) -> i32) {
    if cb(v, v * 2) == 0i32 {
        dbg!("Ok !");
    } else {
        panic!("Something wrong")
    }
}
let mut ret = futures::executor::block_on(once_blocked!(func(2 + 3, ->(a, b) -> 1i32)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
if a + b == 15 && a * b == 50 {
    ret.return_value(0).unwrap();
}
assert_eq!(ret.return_value(0).unwrap_err(), super::AlreadyReturnError);

控制Stream控制流的回调

use futures::stream::StreamExt;
fn func(u: i32, mut cb: impl FnMut(i32, i32)->i32, v: i32) {
    let mut j = 0;
    while j < 5 {
        j = cb(u + j, v * j)
    }
}
let mut counter = 0;

futures::executor::block_on(cb_fut::stream_blocked!(func(2 * 3, ->(a, b)->0i32, 2 + 3)).enumerate().for_each(|(i, mut fut)| {
    counter += 1;
    async move {
        let (a, b) = *fut;
        assert_eq!(2 * 3 + i as i32, a);
        assert_eq!((2 + 3) * i as i32, b);
        fut.return_value(i as i32 + 1);
    }
}));

依赖关系

~0.6–1MB
~17K SLoC