22个版本
0.1.1 | 2020年1月27日 |
---|---|
0.1.0 | 2020年1月27日 |
0.1.0-beta.5 | 2019年12月5日 |
0.1.0-beta.4 | 2019年11月29日 |
0.1.0-alpha.5 | 2019年10月28日 |
#8 在 #re-exported
128 每月下载次数
245KB
4K SLoC
join!
、join_async!
、join_spawn!
、join_async_spawn!
、async_spawn!
、try_join!
、try_join_async!
、try_join_spawn!
、try_join_async_spawn!
、try_async_spawn!
等宏的导出,这些宏由 join
包重新导出。
join!
宏 提供有用的快捷组合子,组合同步/异步链,支持单线程和多线程(同步/异步)逐步执行分支,将结果元组转换为元组结果。
join!
宏将仅返回最终值。如果您正在处理迭代器/流等,请使用它。try_join!
宏将转换元组的Option
/Result
为元组的Option
/Result
。当您处理选项或结果时,请使用它。如果在步骤末尾,某个分支产生None
/Err
,则后续步骤的执行将被中止。在异步宏的情况下,您只能提供Result
,因为::futures::try_join
不支持Option
。
请使用这些文档进行开发,它们更方便。
特性
- 性能。宏生成经过优化的代码(在步骤中不使用不活跃的分支,不克隆结果/选项或其他任何值,不在堆上分配任何内存[除非将future包装进
Box::pin
]) - 您可以使用cargo expand
来检查。 - 步骤允许编写依赖于前一次迭代中分支结果的代码。
- 一行链,无法使用纯
Rust
(没有宏)创建。 - 简洁。表达相同流程的代码更少。快捷组合子 = 更少的括号。
async
宏生成futures,因此它们可以用于非async
函数。- 可配置性。有许多选项可以独立配置,以完全更改宏的行为。
宏
try_join!
- 结合Result
/Option
,将Result
/Option
的元组转换为元组的Result
/Option
。
assert_eq!(
try_join!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)),
Ok::<_,()>((1, "2", 3.0))
);
try_join_async!
- 结合futures,将Result
的元组转换为元组的Result
。
assert_eq!(
try_join_async!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await,
Ok::<_,()>((1, "2", 3.0))
);
try_join_spawn!
- 为每个分支启动一个std::thread
,并将结果合并,将Result
/Option
的元组转换为元组的Result
/Option
。
assert_eq!(
try_join_spawn!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)),
Ok::<_,()>((1, "2", 3.0))
);
try_spawn!
-try_join_spawn!
的别名。try_join_async_spawn!
- 使用tokio::spawn
为每个分支启动tokio任务,将Result
的元组转换为元组的Result
。
assert_eq!(
try_join_async_spawn!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await,
Ok::<_,()>((1, "2", 3.0))
);
try_async_spawn!
-try_join_async_spawn!
的别名。join!
- 合并值。
assert_eq!(
join!(1, "2", 3.0), (1, "2", 3.0)
);
join_async!
- 合并futures。
assert_eq!(
join_async!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
join_spawn!
- 为每个分支启动一个std::thread
。
assert_eq!(
join_spawn!(1, "2", 3.0), (1, "2", 3.0)
);
spawn!
-join_spawn!
的别名。join_async_spawn!
- 使用tokio::spawn
为每个分支启动tokio任务。
assert_eq!(
join_async_spawn!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
async_spawn!
-join_async_spawn!
的别名。
组合子
- 然后:
->
join! { value -> expr }; // => expr(value)
- 映射:
|>
join! { value |> expr }; // => value.map(expr)
- 然后:
=>
join! { value => expr }; // => value.and_then(expr)
- 过滤:
?>
join! { value ?> expr }; // => value.filter(expr)
- 点:
..
或>.
join! { value .. expr }; // => value.expr
join! { value >. expr }; // => value.expr
- 或:
<|
join! { value <| expr }; // => value.or(expr)
- 或否则:
<=
join! { value <= expr }; // => value.or_else(expr)
- MapErr:
!>
join! { value !> expr }; // => value.map_err(expr)
- Collect:
=>[]
(类型为可选)
join! { value =>[] T }; // => value.collect::<T>()
join! { value =>[] }; // => value.collect()
- Chain:
>@>
join! { value >@> expr }; // => value.chain(expr)
- FindMap:
?|>@
join! { value ?|>@ expr }; // => value.find_map(expr)
- FilterMap:
?|>
join! { value ?|> expr }; // => value.filter_map(expr)
- Enumerate:
|n>
join! { value |n> }; // => value.enumerate()
- Partition:
?&!>
join! { value ?&!> expr }; // => value.partition(expr)
- Flatten:
^^>
join! { value ^^> }; // => value.flatten()
- Fold:
^@
join! { value ^@ init_expr, fn_expr }; // => value.fold(init_expr, fn_expr)
- TryFold:
?^@
join! { value ?^@ init_expr, fn_expr }; // => value.try_fold(init_expr, fn_expr)
- Find:
?@
join! { value ?@ expr }; // => value.find(expr)
- Zip:
>^>
join! { value >^> expr }; // => value.zip(expr)
- Unzip:
<->
(类型为可选)
join! { value <-> A, B, FromA, FromB }; // => value.unzip::<A, B, FromA, FromB>()
join! { value <-> }; // => value.unzip()
- Inspect:
??
join! { value ?? expr }; // => (|value| { (expr)(&value); value })(value) // for sync
join_async! { value ?? expr }; // => value.inspect(expr) // for async
其中 value
是上一个值。
所有以 ~
开头的组合子将作为延迟操作(所有操作将在每一步完成后等待,然后才移动到下一步)。
嵌套组合子
- Wrap:
combinator
>>>
combinator
(s)...
try_join! { value => >>> |> |v| v + 2 } // => value.and_then(|value| value.map(|v| v + 2))
用于进入嵌套构造,例如
a.and_then(
// >>>
|b| b.and_then(
// >>>
|c| c.and_then(
|v| Ok(v + 2)
)
)
)
- Unwrap:
<<<
try_join! {
value
=> >>>
|> |v| v + 2
<<<
|> |v| Some(v + 4)
} // => value.and_then(|value| value.map(|v| v + 2)).map(|v| Some(v + 4))
用于从嵌套构造中退出
a.and_then(
// >>>
|b| b.and_then(
// >>>
|c| c.and_then(
|v| Ok(v + 2)
)
// <<<
)
// <<<
).map(
|v| v + 1
)
处理器
可能是以下之一
map
=> 仅对try
宏有效。 将作为results.map(|(result0, result1, ..)| handler(result0, result1, ..))
assert_eq!(
try_join! {
Some(1),
Some(2),
Some(3),
map => |a, b, c| a + b + c
},
Some(6)
);
and_then
=> 仅对try
宏有效。 将作为results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
assert_eq!(
try_join! {
Some(1),
Some(2),
Some(3),
and_then => |a, b, c| Some(a + b + c)
},
Some(6)
);
then
=> 仅对非try
宏有效。 总是执行,作为handler(result0, result1, ..)
assert_eq!(
join! {
Some(1),
Some(2),
Some(3),
then => |a: Option<u8>, b: Option<u8>, c: Option<u8>|
Some(a.unwrap() + b.unwrap() + c.unwrap())
},
Some(6)
);
如果没有指定或指定为 - 对于 try
宏将返回 Result<(result0, result1, ..), Error>
,对于非 try
宏将返回 Option<(result0, result1, ..)>
自定义配置
您可以在宏调用开始处指定任何参数。
futures_crate_path
- 指定用于futures
crate 的自定义 crate 路径,该路径将用于所有与futures
相关的项,由async
join!
宏使用。仅对async
宏有效。custom_joiner
- 指定自定义的连接 函数 或 宏,当活动分支数量大于 1 时,将连接这些分支。transpose_results
- 指定宏是否应该将Result
或Option
的元组转换成元组的Result
或Option
。在连接器已返回元组的Result
且不需要转换时非常有用。lazy_branches
- 当传递值给连接器时,将每个分支包装成move || {}
。默认情况下,对于try_join_spawn!
、try_spawn!
和join_spawn!
宏以及spawn!
宏,因为它们使用thread::spawn
调用。只有当活动分支数量 > 1 时才有效。
#![recursion_limit="256"]
use join::try_join_async;
use futures::future::ok;
macro_rules! custom_futures_joiner {
($($futures: expr),+) => {
::futures::try_join!($($futures),*);
}
}
#[tokio::main]
async fn main() {
let value = try_join_async! {
futures_crate_path(::futures)
custom_joiner(custom_futures_joiner!)
transpose_results(false)
ok::<_,()>(2u16), ok::<_,()>(3u16),
map => |a, b| a + b
}.await.unwrap();
assert_eq!(value, 5);
}
Rayon 示例
#![recursion_limit="256"]
use join::{try_join, join};
fn fib(num: u8) -> usize {
let mut prev = 0;
let mut cur = if num > 0 { 1 } else { 0 };
for _ in 1..num as usize {
let tmp = cur;
cur = prev + cur;
prev = tmp;
}
cur
}
fn main() {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let calculated = pool.install(||
try_join! {
custom_joiner(rayon::join)
|| Some(fib(50)),
|| Some(
join! {
custom_joiner(rayon::join)
lazy_branches(true)
fib(20) -> |v| v + 25,
fib(30) -> |v| vec![v; 10].into_iter() |n> |> |(index, value)| value + index ..sum::<usize>(),
then => |a, b| a + b
}
),
map => |a, b| a * b
}
);
assert_eq!(calculated.unwrap(), 104808819944395875);
}
模式匹配
您可以为每个分支指定 let
模式,以与其他分支共享结果,或者在需要在不同步骤之间有 mut
值的情况下。
assert_eq!(
try_join! {
let mut branch_0 = Ok::<_,()>(1) ~|> |v| v + 1,
let branch_1 = Ok::<_,()>(2) ~|> { let value_0 = branch_0.as_ref().unwrap(); move |v| v + value_0 },
map => |b_0, b_1| b_0 * b_1
}.unwrap(),
6
);
块捕获
为了捕获变量(例如,示例中其他分支的值),您可以通过传递代码块而不是函数来传递
let mut some_value = Some("capture me");
assert_eq!(try_join! {
Some(0) |> |v| {
// assign `None` to some_value in step expr
some_value = None;
v
} |> {
// capture value before step and get str len
let captured_len = some_value.as_ref().unwrap().len();
move |v| v + captured_len
}
}.unwrap(), 10);
这些代码块将放在实际步骤表达式之前。
示例
同步示例
使用此宏,您可以编写类似以下内容
#![recursion_limit = "256"]
use rand::prelude::*;
use std::sync::Arc;
use join::try_join_spawn;
// Problem: generate vecs filled by random numbers in parallel, make some operations on them in parallel,
// find max of each vec in parallel and find final max of 3 vecs
// Solution:
fn main() {
// Branches will be executed in parallel, each in its own thread
let max = try_join_spawn! {
let branch_0 =
generate_random_vec(1000, 10000000u64)
.into_iter()
// .map(power2) (Multiply every element by itself)
|> power2
// .filter(is_even) (Filter even values)
?> is_even
// .collect::<Vec<_>>() (Collect values into `Vec<_>`)
=>[] Vec<_>
// Arc::new(Some(...))
// Use `Arc` to share data with branch 1
-> Arc::new -> Some
// Find max and clone its value
// .and_then(|v| v.iter().max().map(Clone::clone))
~=> >>> ..iter().max() |> Clone::clone,
generate_random_vec(10000, 100000000000000f64)
.into_iter()
// .map(get_sqrt) (Extract sqrt from every element)
|> get_sqrt
// Some(...)
-> Some
// .and_then(|v| v...)
~=> >>>
// .enumerate() (Add index in order to compare with the values of branch_0)
|n>
// .map(...)
|> {
// Get data from branch 0 by cloning arc
let branch_0 = branch_0.as_ref().unwrap().clone();
let len = branch_0.len();
// Compare every element of branch 1 with element of branch_0
// with the same index and take min
move |(index, value)|
if index < len && value as u64 > branch_0[index] {
branch_0[index]
} else {
value as u64
}
}..max(),
generate_random_vec(100000, 100000u32)
.into_iter()
-> Some
// .and_then(|v| v.max())
~=> >>> ..max(),
and_then => |max0, max1, max2|
// Find final max
[max0, max1, max2 as u64].iter().max().map(Clone::clone)
}
.unwrap();
println!("Max: {}", max);
}
fn generate_random_vec<T>(size: usize, max: T) -> Vec<T>
where
T: From<u8>
+ rand::distributions::uniform::SampleUniform
+ rand::distributions::uniform::SampleBorrow<T>
+ Copy,
{
let mut rng = rand::thread_rng();
(0..size)
.map(|_| rng.gen_range(T::from(0u8), max))
.collect()
}
fn is_even<T>(value: &T) -> bool
where
T: std::ops::Rem<Output = T> + std::cmp::PartialEq + From<u8> + Copy
{
*value % 2u8.into() == 0u8.into()
}
fn get_sqrt<T>(value: T) -> T
where
T: Into<f64>,
f64: Into<T>,
{
let value_f64: f64 = value.into();
value_f64.sqrt().into()
}
fn power2<T>(value: T) -> T
where
T: std::ops::Mul<Output = T> + Copy,
{
value * value
}
#![recursion_limit="256"]
extern crate rand;
extern crate join;
use rand::prelude::*;
use join::try_join;
fn main() {
let mut rng = rand::thread_rng();
let result = try_join! {
(0..10)
// .map(|index| { let value ... })
|> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Ok(value) } else { Err(value) }}
// .filter(|result| ...)
?> |result| match result { Ok(_) => true, Err(value) => *value > 2 }
// .map(|v| v.map(|value| value + 1))
|> >>> |> |value| value + 1
<<<
// .try_fold(0i32, |acc, cur| {...})
?^@ 0i32, |acc, cur| {
cur.map(|cur| acc + cur).or_else(|cur| Ok(acc - cur))
}
// .and_then(|value| if ...)
=> |value| if value > 0 { Ok(value as u8) } else { Err(0) }
// Wait for all branches to be successful and then calculate fib
~|> fib,
(0..6)
// .map(|index| { let value ... })
|> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Some(value) } else { None }}
// .filter_map(|v| v)
?|> >>>
<<<
..sum::<u16>()
// Return `Ok` only if value is less than 20
-> |value| if value < 20 { Ok(value as u8) } else { Err(0) }
// Wait for all branches to be successful and then calculate fib
~|> fib,
// In case of success, multilpy fibs
map => |v_1, v_2| v_1 * v_2
};
result.map(|value| println!("Result: {}", value)).unwrap_or_else(|err| println!("Error: {:#?}", err));
}
fn fib(num: u8) -> usize {
println!("CALLED FIB!");
let mut prev = 0;
let mut cur = if num > 0 { 1 } else { 0 };
for _ in 1..num as usize {
let tmp = cur;
cur = prev + cur;
prev = tmp;
}
cur
}
未来示例
请注意:本示例使用 tokio = "0.2.0-alpha.6"
,但是 join!
宏与最新版本的 tokio
兼容。
Cargo.toml
[dependencies]
futures = { version = "=0.3.0-alpha.19", package = "futures-preview", features=["async-await"] }
tokio = "0.2.0-alpha.6"
failure = "0.1.6"
futures-timer = "1.0.2"
reqwest = "0.10.0-alpha.2"
就像这样
#![recursion_limit="1024"]
use join::try_join_async;
use futures::stream::{iter, Stream};
use reqwest::Client;
use futures::future::{try_join_all, ok, ready};
use failure::{format_err, Error};
#[tokio::main]
async fn main() {
println!(
"{} {}\n{}",
"Hello.\nThis's is the game where winner is player, which number is closest to",
"the max count of links (starting with `https://`) found on one of random pages.",
"You play against random generator (0-500)."
);
enum GameResult {
Won,
Lost,
Draw
}
let client = Client::new();
let game = try_join_async! {
// Make requests to several sites
// and calculate count of links starting from `https://`
get_urls_to_calculate_link_count()
|> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will us allow to capture some variables from context
let ref client = client;
move |url|
// `try_join_async!` wraps its content into `Box::pin(async move { })`
try_join_async! {
client
.get(url).send()
=> |value| value.text()
=> |body| ok((url, body.matches("https://").collect::<Vec<_>>().len()))
}
}
// .collect::<Vec<_>>() (Collect values into `Vec<_>`)
=>[] Vec<_>
// .map(Ok)
|> Ok
// .and_then(try_join_all)
=> try_join_all
// .map_err(|err| ...)
!> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
// .and_then(|v| v.into_iter()...)
=> >>>
..into_iter()
.max_by_key(|(_, link_count)| *link_count)
.ok_or(format_err!("Failed to find max link count"))
// Wrap previous result into `ready(...)`
-> ready
// It waits for input in stdin before log max links count
~?? >>>
..as_ref()
// .map(|number| ...)
|> |(url, count)| {
let split = url.to_owned().split('/').collect::<Vec<_>>();
let domain_name = split.get(2).unwrap_or(&url);
println!("Max `https://` link count found on `{}`: {}", domain_name, count)
}
..unwrap_or(()),
// Concurrently it makes request to the site which generates random number
get_url_to_get_random_number()
// Wrap previous result into `ok(...)`
-> ok
// .and_then(...)
=> {
// If pass block statement instead of fn, it will be placed before current step,
// so it will allow us to capture some variables from context
let ref client = client;
let map_parse_error = |error, value| format_err!("Failed to parse random number: {:#?}, value: {}", error, value);
move |url|
try_join_async! {
client
.get(url)
.send()
=> |value| value.text()
!> |err| format_err!("Error retrieving random number: {:#?}", err)
=> |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
=> |value|
ready(
value
.parse::<u16>()
.map_err(|err| map_parse_error(err, value))
)
}
}
// It waits for input in stdin before log random value
// .inspect(|v| v.as_ref()...)
~?? >>>
..as_ref()
// .map(|number| ...)
|> |number| println!("Random: {}", number)
..unwrap_or(()),
// Concurrently it reads value from stdin
read_number_from_stdin() |> Ok,
// Finally, when we will have all results, we can decide, who is winner
map => |(_url, link_count), random_number, number_from_stdin| {
let random_diff = (link_count as i32 - random_number as i32).abs();
let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
match () {
_ if random_diff > stdin_diff => GameResult::Won,
_ if random_diff < stdin_diff => GameResult::Lost,
_ => GameResult::Draw
}
}
};
let _ = game.await.map(
|result|
println!(
"You {}",
match result {
GameResult::Won => "won!",
GameResult::Lost => "lose...",
_ => "have the same result as random generator!"
}
)
).unwrap_or_else(|error| eprintln!("Error: {:#?}", error));
}
fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> {
iter(
vec![
"https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100",
"https://github.com/explore",
"https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn"
]
)
}
fn get_url_to_get_random_number() -> &'static str {
"https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new"
}
async fn read_number_from_stdin() -> u16 {
use tokio::*;
use futures::stream::StreamExt;
let map_parse_error = |error, value| format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value);
let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());
loop {
println!("Please, enter number (`u16`)");
let next = reader.next();
let result = try_join_async! {
next
// .map(|v| v.ok_or()...)
|> >>>
..ok_or(format_err!("Unexpected end of input"))
// .and_then(|v| v.map_err(|err| ...))
=> >>> !> |err| format_err!("Failed to apply codec: {:#?}", err)
<<<
<<<
// .and_then(|value| ready(...))
=> |value|
ready(
value
.parse()
.map_err(|err| map_parse_error(err, value))
)
// .map_err(|error| ...)
!> |error| { eprintln!("Error: {:#?}", error); error}
}.await;
if let Ok(value) = result {
break value
}
}
}
单线程组合
同步分支
将输入转换为一系列链式结果,并逐步将它们连接起来。
use std::error::Error;
use join::try_join;
type Result<T> = std::result::Result<T, Box<dyn Error>>;
fn action_1() -> Result<u16> {
Ok(1)
}
fn action_2() -> Result<u8> {
Ok(2)
}
fn main() {
let sum = try_join! {
// action_1(),
action_1(),
// action_2().map(|v| v as u16),
action_2() |> |v| v as u16,
// action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
// action_1().and_then(|_| Err("5".into())).or(Ok(2)),
action_1() => |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
未来
每个分支将代表一个未来链。所有分支都将使用 ::futures::join!
/::futures::try_join!
宏以及 join_async!
/try_join_async!
返回 未查询
未来。
#![recursion_limit="256"]
use std::error::Error;
use join::try_join_async;
use futures::future::{ok, err};
type Result<T> = std::result::Result<T, Box<dyn Error>>;
async fn action_1() -> Result<u16> {
Ok(1)
}
async fn action_2() -> Result<u8> {
Ok(2)
}
#[tokio::main]
async fn main() {
let sum = try_join_async! {
// action_1(),
action_1(),
// action_2().and_then(|v| ok(v as u16)),
action_2() => |v| ok(v as u16),
// action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),
// action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
action_1() => |_| err("5".into()) <= |_| ok(2u16),
and_then => |a, b, c, d| ok(a + b + c + d)
}.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
多线程组合
要并行执行多个任务,您可以使用 join_spawn!
(spawn!
)来同步任务,以及 join_async_spawn!
(async_spawn!
)来处理未来。由于 join_async
已经在一线程中提供了并发未来执行,所以 join_async_spawn!
将每个分支都放入 tokio
执行器,因此它们将在多线程执行器中进行评估。
同步线程
join_spawn
为每个分支的每个步骤创建一个 ::std::thread
(分支数是当时的最大线程数)。
use std::error::Error;
use join::try_join_spawn;
type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
fn action_1() -> Result<usize> {
Ok(1)
}
fn action_2() -> Result<u16> {
Ok(2)
}
fn main() {
// Branches will be executed in parallel
let sum = try_join_spawn! {
// thread::spawn(move || action_1()),
action_1(),
// thread::spawn(move || action_2().map(|v| v as usize)),
action_2() |> |v| v as usize,
// thread::spawn(move || action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4))),
action_2() |> |v| v as usize + 1 => |v| Ok(v * 4),
// thread::spawn(move || action_1().and_then(|_| Err("5".into())).or(Ok(2))),
action_1() => |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
线程名称
在运行时,线程的名称将由父线程的名称和 join_%分支索引% 构成。
具有多个分支的示例
extern crate join;
use std::thread;
use join::try_join_spawn;
fn get_current_thread_name() -> String {
thread::current().name().unwrap().to_owned()
}
fn print_branch_thread_name(index: &Result<usize, ()>) {
println!("Branch: {}. Thread name: {}.", index.unwrap(), get_current_thread_name());
}
fn main() {
let _ = try_join_spawn! {
Ok(0) ?? print_branch_thread_name,
Ok(1) ?? print_branch_thread_name,
try_join_spawn! {
Ok(2) ?? print_branch_thread_name,
try_join_spawn! {
Ok(3) ?? print_branch_thread_name,
}
}
}.unwrap();
}
// Branch: 0. Thread name: main_join_0.
// Branch: 1. Thread name: main_join_1.
// Branch: 2. Thread name: main_join_2_join_0.
// Branch: 3. Thread name: main_join_2_join_1_join_0.
// Order could be different.
未来任务
join_async_spawn!
使用 ::tokio::spawn
函数来生成任务,因此它应该在 tokio
运行时内完成(分支数是当时的 tokio
任务的最大数量)。
#![recursion_limit="256"]
use std::error::Error;
use join::try_join_async_spawn;
use futures::future::{ok, err};
type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
async fn action_1() -> Result<u16> {
Ok(1)
}
async fn action_2() -> Result<u8> {
Ok(2)
}
#[tokio::main]
async fn main() {
let sum = try_join_async_spawn! {
// tokio::spawn(Box::pin(action_1()))
action_1(),
// tokio::spawn(Box::pin(action_2().and_then(|v| ok(v as u16))))
action_2() => |v| ok(v as u16),
// tokio::spawn(Box::pin(action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16))))
action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),
// tokio::spawn(Box::pin(action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16))))
action_1() => |_| err("5".into()) <= |_| ok(2u16),
and_then => |a, b, c, d| ok(a + b + c + d)
}.await.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
详细步骤示例
通过在操作中分离链,您将使操作在进入下一步之前等待当前步骤中所有操作的完成。
#![recursion_limit="256"]
use std::error::Error;
use join::try_join;
type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;
fn action_1() -> Result<u16> {
Ok(1)
}
fn action_2() -> Result<u8> {
Ok(2)
}
fn main() {
let sum = try_join! {
action_1(),
let result_1 = action_2() ~|> |v| v as u16 + 1,
action_2() ~|> {
// `result_1` now is the result of `action_2()` [Ok(1u8)]
let result_1 = result_1.as_ref().ok().map(Clone::clone);
move |v| {
if result_1.is_some() {
v as u16 + 1
} else {
unreachable!()
}
}
} ~=> {
// `result_1` now is the result of `|v| v as u16 + 1` [Ok(2u16)]
let result_1 = result_1.as_ref().ok().map(Clone::clone);
move |v| {
if let Some(result_1) = result_1 {
Ok(v * 4 + result_1)
} else {
unreachable!()
}
}
},
action_1() ~=> |_| Err("5".into()) <| Ok(2),
map => |a, b, c, d| a + b + c + d
}.expect("Failed to calculate sum");
println!("Calculated: {}", sum);
}
依赖项
~1.5MB
~35K SLoC