3个不稳定版本
0.2.1 | 2020年5月16日 |
---|---|
0.2.0 | 2020年5月16日 |
0.1.0 | 2020年5月13日 |
#619 in 异步
44KB
631 行
cb_fut
此crate提供实用宏,将需要回调来处理返回值的函数调用转换为返回值或生成值的Future或Stream函数。它将额外的语法-> ()
和-> () ->
引入宏调用签名中。
限制
以下是此crate的限制。
- 如果回调不是用于返回值,不要使用这两个宏。这将破坏函数。
- 如果函数也返回值,则返回的值将被静默丢弃。
- 该宏只支持单个回调转换。如果函数需要多个回调在不同情况下返回值,则无法使用。
- 与使用原始回调相比,这将更慢。这是因为它需要某种间接方式将回调通过通道指向Future。
如何使用
此crate提供四组宏,分为两个类别。
非阻塞
它使用futures::channel::unbound
在函数回调和Future之间进行通信。此类别有两个宏。
once
- 将函数调用转换为返回作为回调参数使用的值的Future。stream
- 将函数调用转换为返回作为回调参数传递的值的Stream。
阻塞
它使用两个通道在CBBlockResult
和回调之间保持同步。它启动一个新线程来执行函数,并阻塞函数等待调用者执行CBBlockResult
上的return_value
方法或直到CBBlockResult
被丢弃。此类别有两个宏。
once_blocked
- 从带有回调函数的调用中获取结果的Future
。它与once
宏不同,因为它会阻塞函数,直到结果被丢弃或显式调用方法return_value
。当CBBlockResult
被丢弃时,此宏将加入执行函数的线程。stream_blocked
- 获取一个CBStreamBlocked
,该阻塞会产生一个结果,CBBlockResult
包含类似于发送到回调的值。与常规的stream
宏相比,其结果不同,因为它会阻塞函数,直到结果被丢弃或显式调用return_value
方法。
这两个的主要区别在于,如果回调将恰好被调用一次,则应使用once
和once_blocked
,而如果回调将被多次调用,则应使用stream
和stream_blocked
。
这两个类别需要一个特殊的语法来识别期望的返回变量数。语法是-> ()
和->()->v
。前者类似于返回类型的函数签名,例如Fn(i32) -> ()
。后者是从前者扩展而来的,因为回调本身会返回一些东西给函数,这可能会影响该函数的执行。当调用者没有调用return_value
方法时,将使用->v
部分作为默认值。这是为了保证函数会接收到返回值以继续执行。
这些转换会产生一个Future
或Stream
,该值会解析为值的元组。请参阅下面的示例。
示例
将带有回调的函数转换为future。
// A function that need a callback
fn func(v: i32, cb: impl FnOnce(i32, i32)) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb(v, v * 2)
}
// Use once! to convert the call to `func` to return a `Future` instead.
// We use ->(a, b) to tell macro that `Future` shall return two variables.
let (a, b) = futures::executor::block_on(once!(func(2 + 3, ->(a, b))));
assert_eq!(5, a);
assert_eq!(10, b);
回调可以在函数签名中的任何位置。回调占位符只需要反映这一点即可。
// A function that put callback in the middle between other two parameters
fn func(u: i32, cb: impl FnOnce(i32, i32), v: i32) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb(u, v)
}
// We use `->(a, b)` between 1, and, 2 + 3 to tell macro that this parameter is a callback and it take 2 parameters.
let (a, b) = futures::executor::block_on(once!(func(1, ->(a, b), 2 + 3)));
assert_eq!(1, a);
assert_eq!(5, b);
如果回调不带参数,我们需要放置-> ()
// a function that take no arguments callback
fn func(_v: i32, cb: impl FnOnce()) {
std::thread::sleep(std::time::Duration::from_secs(2));
cb()
}
// A callback placeholder with no argument
futures::executor::block_on(once!(func(2 + 3, -> ())));
如果回调将被多次调用,请使用stream!
use futures::stream::StreamExt;
// A function that take callback as first argument.
// It'll call callback 5 times with two arguments, an original value and the original value times number of called.
fn func(mut cb: impl FnMut(i32, i32), v: i32) {
for i in 0..5 {
cb(v, v * i)
}
}
let mut counter = 0;
// `stream!` will return `CBStream` which implement `Stream` trait. We use `enumerate` and `for_each` from `StreamExt` trait to iterate over each values tuples that suppose to be passed to callback function.
// The `for_each` method signature require a return value of type `Future` for given callback. The final return value from `for_each` is a single consolidated `Future` which when resolve, all `Future`s inside it are all resolved.
futures::executor::block_on(stream!(func(->(a, b), 2 + 3)).enumerate().for_each(|(i, fut)| {
counter += 1;
async move {
let (a, b) = fut;
assert_eq!(5, a);
assert_eq!(5 * i as i32, b);
}
}));
assert_eq!(5, counter);
回调在回调执行后对函数有副作用。
fn func(v: i32, cb: impl FnOnce(i32, i32) -> i32) {
if cb(v, v * 2) == 0i32 {
dbg!("Ok !");
} else {
panic!("Something wrong")
}
}
let mut ret = futures::executor::block_on(once_blocked!(func(2 + 3, ->(a, b) -> 1i32)));
let (a, b) = *ret;
assert_eq!(5, a);
assert_eq!(10, b);
if a + b == 15 && a * b == 50 {
ret.return_value(0).unwrap();
}
assert_eq!(ret.return_value(0).unwrap_err(), super::AlreadyReturnError);
控制Stream控制流的回调
use futures::stream::StreamExt;
fn func(u: i32, mut cb: impl FnMut(i32, i32)->i32, v: i32) {
let mut j = 0;
while j < 5 {
j = cb(u + j, v * j)
}
}
let mut counter = 0;
futures::executor::block_on(cb_fut::stream_blocked!(func(2 * 3, ->(a, b)->0i32, 2 + 3)).enumerate().for_each(|(i, mut fut)| {
counter += 1;
async move {
let (a, b) = *fut;
assert_eq!(2 * 3 + i as i32, a);
assert_eq!((2 + 3) * i as i32, b);
fut.return_value(i as i32 + 1);
}
}));
依赖关系
~0.6–1MB
~17K SLoC