macro no-std join_export

Exports of the join!, join_async!, join_spawn!, join_async_spawn!, and async_spawn! macros, which are re-exported by the join crate.

22 versions

0.1.1 Jan 27, 2020
0.1.0 Jan 27, 2020
0.1.0-beta.5 Dec 5, 2019
0.1.0-beta.4 Nov 29, 2019
0.1.0-alpha.5 Oct 28, 2019

128 downloads per month

MIT license

245KB
4K SLoC

Exports of the join!, join_async!, join_spawn!, join_async_spawn!, async_spawn!, try_join!, try_join_async!, try_join_spawn!, try_join_async_spawn!, and try_async_spawn! macros, which are re-exported by the join crate.

join!

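Provides useful shortcut combinators, combines sync/async chains, supports single-threaded and multi-threaded (sync/async) step-by-step execution of branches, and transforms a tuple of results into a result of a tuple.

  • The join! macros return the final values as they are. Use them when you work with iterators, streams, and similar types.
  • The try_join! macros transpose a tuple of Option/Result into an Option/Result of a tuple. Use them when you work with Option or Result. If any branch produces None/Err at the end of a step, execution of the following steps is aborted. The async macros accept only Result, because ::futures::try_join doesn't support Option.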
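The transposition performed by the try macros can be pictured in plain Rust. This is only a sketch for two branches; `transpose2` is a hypothetical helper written for illustration and is not part of the join crate.

```rust
// Hypothetical helper: turns a tuple of `Result`s into a `Result` of a tuple,
// returning the first `Err` it meets.
fn transpose2<A, B, E>(pair: (Result<A, E>, Result<B, E>)) -> Result<(A, B), E> {
    let (a, b) = pair;
    Ok((a?, b?))
}

fn main() {
    // All branches succeeded: the values are collected into one tuple.
    assert_eq!(transpose2((Ok::<_, ()>(1), Ok::<_, ()>("2"))), Ok((1, "2")));
    // One branch failed: the whole result is that error.
    assert_eq!(
        transpose2((Ok::<i32, &str>(1), Err::<i32, &str>("boom"))),
        Err("boom")
    );
}
```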

Use these docs for development; they are more convenient.

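Features

  • Performance. The macros generate well-optimized code (inactive branches are not used during a step, results/options and other values are not cloned, and nothing is allocated on the heap [except when a future is wrapped in Box::pin]). You can check this with cargo expand.
  • Steps let you write code that depends on the branch results of the previous iteration.
  • One-line chains that cannot be created in pure Rust (without macros).
  • Brevity. Less code to express the same flow. Shortcut combinators = fewer parentheses.
  • The async macros produce futures, so they can be used in non-async functions.
  • Configurability. Many options can be configured independently to completely change the macro's behaviour.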

  • try_join! - combines Result/Option values and transposes a tuple of Result/Option into a Result/Option of a tuple.
assert_eq!(
    try_join!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)), 
    Ok::<_,()>((1, "2", 3.0))
);
  • try_join_async! - combines futures and transposes a tuple of Result into a Result of a tuple.
assert_eq!(
    try_join_async!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await, 
    Ok::<_,()>((1, "2", 3.0))
);
  • try_join_spawn! - spawns a std::thread per branch, joins the results, and transposes a tuple of Result/Option into a Result/Option of a tuple.
assert_eq!(
    try_join_spawn!(Ok::<_,()>(1), Ok::<_,()>("2"), Ok::<_,()>(3.0)), 
    Ok::<_,()>((1, "2", 3.0))
);
  • try_spawn! - alias for try_join_spawn!.
  • try_join_async_spawn! - spawns a tokio task per branch using tokio::spawn and transposes a tuple of Result into a Result of a tuple.
assert_eq!(
    try_join_async_spawn!(ok::<_,()>(1), ok::<_,()>("2"), ok::<_,()>(3.0)).await, 
    Ok::<_,()>((1, "2", 3.0))
);
  • try_async_spawn! - alias for try_join_async_spawn!.
  • join! - combines values.
assert_eq!(
    join!(1, "2", 3.0), (1, "2", 3.0)
);
  • join_async! - combines futures.
assert_eq!(
    join_async!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
  • join_spawn! - spawns a std::thread per branch.
assert_eq!(
    join_spawn!(1, "2", 3.0), (1, "2", 3.0)
);
  • spawn! - alias for join_spawn!.
  • join_async_spawn! - spawns a tokio task per branch using tokio::spawn.
assert_eq!(
    join_async_spawn!(ready(1), ready("2"), ready(3.0)).await, (1, "2", 3.0)
);
  • async_spawn! - alias for join_async_spawn!.
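To make the "one std::thread per branch" idea concrete, here is a rough plain-Rust picture for two branches. `join_spawn2` is a hypothetical stand-in written for this note; it is not the code the macro actually generates.

```rust
use std::thread;

// Hypothetical stand-in: runs each branch on its own `std::thread`
// and joins the two results into a tuple.
fn join_spawn2<A, B, RA, RB>(branch_a: A, branch_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send + 'static,
    B: FnOnce() -> RB + Send + 'static,
    RA: Send + 'static,
    RB: Send + 'static,
{
    // Both threads are started before either one is joined,
    // so the branches can run in parallel.
    let handle_a = thread::spawn(branch_a);
    let handle_b = thread::spawn(branch_b);
    (
        handle_a.join().expect("branch 0 panicked"),
        handle_b.join().expect("branch 1 panicked"),
    )
}

fn main() {
    assert_eq!(join_spawn2(|| 1, || "2"), (1, "2"));
}
```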

Combinators

  • Then: ->
join! { value -> expr }; // => expr(value)
  • Map: |>
join! { value |> expr }; // => value.map(expr)
  • AndThen: =>
join! { value => expr }; // => value.and_then(expr)
  • Filter: ?>
join! { value ?> expr }; // => value.filter(expr)
  • Dot: .. or >.
join! { value .. expr }; // => value.expr
join! { value >. expr }; // => value.expr
  • Or: <|
join! { value <| expr }; // => value.or(expr)
  • OrElse: <=
join! { value <= expr }; // => value.or_else(expr)
  • MapErr: !>
join! { value !> expr }; // => value.map_err(expr)
  • Collect: =>[] (type is optional)
join! { value =>[] T }; // => value.collect::<T>()
join! { value =>[] }; // => value.collect()
  • Chain: >@>
join! { value >@> expr }; // => value.chain(expr)
  • FindMap: ?|>@
join! { value ?|>@ expr }; // => value.find_map(expr)
  • FilterMap: ?|>
join! { value ?|> expr }; // => value.filter_map(expr)
  • Enumerate: |n>
join! { value |n> }; // => value.enumerate()
  • Partition: ?&!>
join! { value ?&!> expr }; // => value.partition(expr)
  • Flatten: ^^>
join! { value ^^> }; // => value.flatten()
  • Fold: ^@
join! { value ^@ init_expr, fn_expr }; // => value.fold(init_expr, fn_expr)
  • TryFold: ?^@
join! { value ?^@ init_expr, fn_expr }; // => value.try_fold(init_expr, fn_expr)
  • Find: ?@
join! { value ?@ expr }; // => value.find(expr)
  • Zip: >^>
join! { value >^> expr }; // => value.zip(expr)
  • Unzip: <-> (types are optional)
join! { value <-> A, B, FromA, FromB }; // => value.unzip::<A, B, FromA, FromB>()
join! { value <-> }; // => value.unzip()
  • Inspect: ??
join! { value ?? expr }; // => (|value| { (expr)(&value); value })(value) // for sync
join_async! { value ?? expr }; // => value.inspect(expr) // for async

where value is the previous value.

Every combinator prefixed with ~ acts as a deferred action: in each step, all actions wait for completion before moving on to the next step.
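Going by the `// =>` comments in the list, a chain such as `values.into_iter() |> power2 ?> is_even =>[] Vec<_>` would read as the ordinary iterator chain sketched here. The function `even_squares` and its input data are made up for illustration; `power2` and `is_even` mirror the helper names used in the sync demo.

```rust
fn power2(value: u64) -> u64 {
    value * value
}

fn is_even(value: &u64) -> bool {
    value % 2 == 0
}

// Plain-Rust counterpart of the combinator chain, one method per combinator.
fn even_squares(values: Vec<u64>) -> Vec<u64> {
    values
        .into_iter()
        .map(power2) // |> power2
        .filter(is_even) // ?> is_even
        .collect::<Vec<_>>() // =>[] Vec<_>
}

fn main() {
    // Squares are 1, 4, 9, 16; only the even ones are kept.
    assert_eq!(even_squares(vec![1, 2, 3, 4]), vec![4, 16]);
}
```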

Nested combinators

  • Wrap: combinator >>> combinator(s)...
try_join! { value => >>> |> |v| v + 2 } // => value.and_then(|value| value.map(|v| v + 2))

Use it to enter a nested construction such as

a.and_then(
    // >>>
    |b| b.and_then(
        // >>>
        |c| c.and_then(
            |v| Ok(v + 2)
        )
    )
)
  • Unwrap: <<<
try_join! { 
    value 
    => >>> 
        |> |v| v + 2 
    <<<
    |> |v| Some(v + 4)  
} // => value.and_then(|value| value.map(|v| v + 2)).map(|v| Some(v + 4))

Use it to leave a nested construction

a.and_then(
    // >>>
    |b| b.and_then(
        // >>>
        |c| c.and_then(
            |v| Ok(v + 2)
        )
        // <<<
    )
    // <<<
).map(
    |v| v + 1
)
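The Wrap/Unwrap pair can be checked against plain method calls. The sketch below takes the expansion shown in the comment of the `<<<` example and applies it to a nested Option; the function name `nested` and the sample values are assumptions for illustration.

```rust
// Plain-Rust reading of
// `try_join! { value => >>> |> |v| v + 2 <<< |> |v| Some(v + 4) }`.
fn nested(value: Option<Option<i32>>) -> Option<Option<i32>> {
    value
        // `=> >>> |> ...`: the map is applied inside the and_then closure.
        .and_then(|inner| inner.map(|v| v + 2))
        // After `<<<` the chain continues on the outer value again.
        .map(|v| Some(v + 4))
}

fn main() {
    assert_eq!(nested(Some(Some(1))), Some(Some(7)));
    assert_eq!(nested(Some(None)), None);
    assert_eq!(nested(None), None);
}
```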

Handler

A handler can be one of the following:

  • map => Only valid for try macros. Acts as results.map(|(result0, result1, ..)| handler(result0, result1, ..))
assert_eq!(
    try_join! { 
        Some(1), 
        Some(2), 
        Some(3), 
        map => |a, b, c| a + b + c
    },
    Some(6)
);
  • and_then => Only valid for try macros. Acts as results.and_then(|(result0, result1, ..)| handler(result0, result1, ..))
assert_eq!(
    try_join! { 
        Some(1), 
        Some(2), 
        Some(3), 
        and_then => |a, b, c| Some(a + b + c)
    },
    Some(6)
);
  • then => Only valid for non-try macros. Always executed; acts as handler(result0, result1, ..)
assert_eq!(
    join! { 
        Some(1),
        Some(2),
        Some(3),
        then => |a: Option<u8>, b: Option<u8>, c: Option<u8>| 
            Some(a.unwrap() + b.unwrap() + c.unwrap()) 
    },
    Some(6)
);

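If no handler is specified, the try macros return the transposed value, Result<(result0, result1, ..), Error> or Option<(result0, result1, ..)>, and the non-try macros return the plain tuple (result0, result1, ..).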
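The three handler kinds differ only in how they are applied to the joined results. The following plain-Rust sketch mirrors the "acts as" descriptions; the function names and the `Joined` alias are hypothetical and exist only for this illustration.

```rust
// Branch results after transposition, as a `try` macro would hold them.
type Joined = Option<(i32, i32, i32)>;

// `map => |a, b, c| a + b + c`
fn map_handler(results: Joined) -> Option<i32> {
    results.map(|(a, b, c)| a + b + c)
}

// `and_then => |a, b, c| Some(a + b + c)`
fn and_then_handler(results: Joined) -> Option<i32> {
    results.and_then(|(a, b, c)| Some(a + b + c))
}

// `then => ...` for non-`try` macros: the handler receives the raw branch
// values and always runs, even if some of them are `None`.
fn then_handler(a: Option<i32>, b: Option<i32>, c: Option<i32>) -> i32 {
    [a, b, c].iter().flatten().sum()
}

fn main() {
    assert_eq!(map_handler(Some((1, 2, 3))), Some(6));
    assert_eq!(and_then_handler(Some((1, 2, 3))), Some(6));
    // A failed branch short-circuits `map`/`and_then`, but not `then`.
    assert_eq!(map_handler(None), None);
    assert_eq!(then_handler(Some(1), None, Some(3)), 4);
}
```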

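Custom configuration

You can specify any of these parameters at the beginning of the macro call.

  • futures_crate_path - specifies a custom path to the futures crate, which is used for all futures-related items referenced by the async join! macros. Only valid for async macros.
  • custom_joiner - specifies a custom joiner function (a macro also works, as custom_futures_joiner! in the example below shows), which joins the active branches when there is more than one of them.
  • transpose_results - specifies whether the macro should transpose a tuple of Result/Option into a Result/Option of a tuple. Useful when the joiner already returns a Result of a tuple and no transposition is needed.
  • lazy_branches - wraps every branch into move || {} when passing values to the joiner. Enabled by default for the try_join_spawn!, try_spawn!, join_spawn!, and spawn! macros, because they use thread::spawn. Only valid if the number of active branches is > 1.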
#![recursion_limit="256"]

use join::try_join_async;
use futures::future::ok;

macro_rules! custom_futures_joiner {
    ($($futures: expr),+) => {
        // No trailing semicolon: the macro is used in expression position.
        ::futures::try_join!($($futures),*)
    }
}

#[tokio::main]
async fn main() {
    let value = try_join_async! {
        futures_crate_path(::futures)
        custom_joiner(custom_futures_joiner!)
        transpose_results(false)
        ok::<_,()>(2u16), ok::<_,()>(3u16),
        map => |a, b| a + b
    }.await.unwrap();
    
    assert_eq!(value, 5);
}
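The interplay of custom_joiner and lazy_branches may be easier to see without the macro. With lazy branches the joiner receives closures rather than values, which is the call shape of rayon::join. The sketch below uses a hypothetical `sequential_join` with that shape; it simply runs the branches in order and is not taken from the join crate.

```rust
// Hypothetical joiner with the same call shape as `rayon::join`: it receives
// two closures and returns both results as a tuple.
fn sequential_join<A, B, RA, RB>(branch_a: A, branch_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB,
{
    (branch_a(), branch_b())
}

fn main() {
    let base = 20;
    // With lazy branches, each branch is handed to the joiner wrapped as
    // `move || { ... }`; here the wrapping is written by hand.
    let (a, b) = sequential_join(move || base + 25, move || base * 2);
    assert_eq!((a, b), (45, 40));
}
```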

Rayon demonstration

#![recursion_limit="256"]

use join::{try_join, join};

fn fib(num: u8) -> usize {
    let mut prev = 0;
    let mut cur = if num > 0 { 1 } else { 0 };
    for _ in 1..num as usize {
        let tmp = cur;
        cur = prev + cur;
        prev = tmp;
    }
    cur
} 

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
    let calculated = pool.install(|| 
        try_join! {
            custom_joiner(rayon::join)
            || Some(fib(50)),
            || Some(
                join! {
                    custom_joiner(rayon::join)
                    lazy_branches(true)
                    fib(20) -> |v| v + 25,
                    fib(30) -> |v| vec![v; 10].into_iter() |n> |> |(index, value)| value + index ..sum::<usize>(),
                    then => |a, b| a + b
                }
            ),
            map => |a, b| a * b
        }
    );
    assert_eq!(calculated.unwrap(), 104808819944395875);
}

Let pattern

You can specify a let pattern for each branch in order to share its result with other branches, or when you need a mut value between steps.

assert_eq!(
    try_join! {
        let mut branch_0 = Ok::<_,()>(1) ~|> |v| v + 1,
        let branch_1 = Ok::<_,()>(2) ~|> { let value_0 = branch_0.as_ref().unwrap(); move |v| v + value_0 },
        map => |b_0, b_1| b_0 * b_1
    }.unwrap(), 
    6
);

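Block captures

To capture variables (for example, the values of other branches, as in the example above), pass a block statement instead of a function: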

let mut some_value = Some("capture me");
assert_eq!(try_join! {
    Some(0) |> |v| { 
        // assign `None` to some_value in step expr
        some_value = None; 
        v 
    } |> { 
        // capture value before step and get str len
        let captured_len = some_value.as_ref().unwrap().len(); 
        move |v| v + captured_len
    }
}.unwrap(), 10);

These blocks are placed before the actual step expressions.
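One way to picture the placement is the hand-written equivalent below. It is a sketch under the assumption that the block's statements are hoisted in front of the whole chain, which is consistent with the example evaluating to 10; the actual generated code may differ. `run_steps` is a hypothetical name.

```rust
// Returns the chain result together with the final state of `some_value`.
fn run_steps() -> (Option<usize>, Option<&'static str>) {
    let mut some_value = Some("capture me");

    // The block of the second step runs first, before any step closure,
    // so it still sees `Some("capture me")` and captures its length.
    let captured_len = some_value.as_ref().unwrap().len();

    let result = Some(0)
        .map(|v| {
            // First step: clears `some_value` while the chain is running.
            some_value = None;
            v
        })
        .map(move |v| v + captured_len);

    (result, some_value)
}

fn main() {
    assert_eq!(run_steps(), (Some(10), None));
}
```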

Demos

Sync demo

Using these macros you can write things like

#![recursion_limit = "256"]

use rand::prelude::*;
use std::sync::Arc;
use join::try_join_spawn;

// Problem: generate vecs filled by random numbers in parallel, make some operations on them in parallel,
// find max of each vec in parallel and find final max of 3 vecs

// Solution:
fn main() {
    // Branches will be executed in parallel, each in its own thread
    let max = try_join_spawn! {
        let branch_0 =
            generate_random_vec(1000, 10000000u64)
                .into_iter()
                // .map(power2) (Multiply every element by itself)
                |> power2
                // .filter(is_even) (Filter even values)
                ?> is_even
                // .collect::<Vec<_>>() (Collect values into `Vec<_>`)
                =>[] Vec<_>
                // Arc::new(Some(...))
                // Use `Arc` to share data with branch 1
                -> Arc::new -> Some
                // Find max and clone its value
                // .and_then(|v| v.iter().max().map(Clone::clone))
                ~=> >>> ..iter().max() |> Clone::clone,
        generate_random_vec(10000, 100000000000000f64)
            .into_iter()
            // .map(get_sqrt) (Extract sqrt from every element)
            |> get_sqrt
            // Some(...)
            -> Some
            // .and_then(|v| v...)
            ~=> >>> 
                // .enumerate() (Add index in order to compare with the values of branch_0)
                |n>
                // .map(...)
                |> {
                    // Get data from branch 0 by cloning arc
                    let branch_0 = branch_0.as_ref().unwrap().clone();
                    let len = branch_0.len();
                    // Compare every element of branch 1 with element of branch_0
                    // with the same index and take min
                    move |(index, value)|
                        if index < len && value as u64 > branch_0[index] {
                            branch_0[index]
                        } else {
                            value as u64
                        }
                }..max(),
        generate_random_vec(100000, 100000u32)
            .into_iter()
            -> Some
            // .and_then(|v| v.max())
            ~=> >>> ..max(),
        and_then => |max0, max1, max2|
            // Find final max
            [max0, max1, max2 as u64].iter().max().map(Clone::clone)
    }
    .unwrap();
    println!("Max: {}", max);
}

fn generate_random_vec<T>(size: usize, max: T) -> Vec<T>
where
    T: From<u8>
        + rand::distributions::uniform::SampleUniform
        + rand::distributions::uniform::SampleBorrow<T>
        + Copy,
{
    let mut rng = rand::thread_rng();
    (0..size)
        .map(|_| rng.gen_range(T::from(0u8), max))
        .collect()
}

fn is_even<T>(value: &T) -> bool
where
    T: std::ops::Rem<Output = T> + std::cmp::PartialEq + From<u8> + Copy
{
    *value % 2u8.into() == 0u8.into()
}

fn get_sqrt<T>(value: T) -> T
where
    T: Into<f64>,
    f64: Into<T>,
{
    let value_f64: f64 = value.into();
    value_f64.sqrt().into()
}

fn power2<T>(value: T) -> T
where
    T: std::ops::Mul<Output = T> + Copy,
{
    value * value
}

#![recursion_limit="256"]

extern crate rand;
extern crate join;

use rand::prelude::*;
use join::try_join;

fn main() {
    let mut rng = rand::thread_rng();

    let result = try_join! {
        (0..10)
            // .map(|index| { let value ... })
            |> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Ok(value) } else { Err(value) }}
            // .filter(|result| ...)
            ?> |result| match result { Ok(_) => true, Err(value) => *value > 2 }
            // .map(|v| v.map(|value| value + 1))
            |> >>> |> |value| value + 1
            <<<
            // .try_fold(0i32, |acc, cur| {...})
            ?^@ 0i32, |acc, cur| {
                cur.map(|cur| acc + cur).or_else(|cur| Ok(acc - cur))
            }
            // .and_then(|value| if ...)
            => |value| if value > 0 { Ok(value as u8) } else { Err(0) }
            // Wait for all branches to be successful and then calculate fib
            ~|> fib,
        (0..6)
            // .map(|index| { let value ... })
            |> |index| { let value = rng.gen_range(0, index + 5); if rng.gen_range(0f32, 2.0) > 1.0 { Some(value) } else { None }}
            // .filter_map(|v| v)
            ?|> >>>
            <<<
            ..sum::<u16>()
            // Return `Ok` only if value is less than 20
            -> |value| if value < 20 { Ok(value as u8) } else { Err(0) }
            // Wait for all branches to be successful and then calculate fib
            ~|> fib,
        // In case of success, multiply fibs
        map => |v_1, v_2| v_1 * v_2
    };

    result.map(|value| println!("Result: {}", value)).unwrap_or_else(|err| println!("Error: {:#?}", err));
}

fn fib(num: u8) -> usize {
    println!("CALLED FIB!");
    let mut prev = 0;
    let mut cur = if num > 0 { 1 } else { 0 };
    for _ in 1..num as usize {
        let tmp = cur;
        cur = prev + cur;
        prev = tmp;
    }
    cur
}

Futures demo

Note: this demo uses tokio = "0.2.0-alpha.6"; however, the join! macros are compatible with the latest tokio.

Cargo.toml

[dependencies]
futures = { version = "=0.3.0-alpha.19", package = "futures-preview", features=["async-await"] }
tokio = "0.2.0-alpha.6"
failure = "0.1.6"
futures-timer = "1.0.2"
reqwest = "0.10.0-alpha.2"

And like this

#![recursion_limit="1024"]

use join::try_join_async;
use futures::stream::{iter, Stream};
use reqwest::Client;
use futures::future::{try_join_all, ok, ready};
use failure::{format_err, Error};

#[tokio::main]
async fn main() {
    println!(
        "{} {}\n{}",
        "Hello.\nThis is a game where the winner is the player whose number is closest to",
        "the max count of links (starting with `https://`) found on one of random pages.",
        "You play against random generator (0-500)."
    );

    enum GameResult {
        Won,
        Lost,
        Draw
    }

    let client = Client::new();
    
    let game = try_join_async! {
        // Make requests to several sites
        // and calculate count of links starting from `https://`
        get_urls_to_calculate_link_count()
            |> {
                // If you pass a block statement instead of a fn, it is placed before the current step,
                // so it allows us to capture some variables from the context
                let ref client = client;
                move |url|
                    // `try_join_async!` wraps its content into `Box::pin(async move { })`
                    try_join_async! {
                        client
                            .get(url).send()
                            => |value| value.text()
                            => |body| ok((url, body.matches("https://").collect::<Vec<_>>().len()))
                    }
            }
            // .collect::<Vec<_>>() (Collect values into `Vec<_>`)
            =>[] Vec<_>
            // .map(Ok)
            |> Ok
            // .and_then(try_join_all)
            => try_join_all
            // .map_err(|err| ...)
            !> |err| format_err!("Error retrieving pages to calculate links: {:#?}", err)
            // .and_then(|v| v.into_iter()...)
            => >>>
                ..into_iter()
                .max_by_key(|(_, link_count)| *link_count)
                .ok_or(format_err!("Failed to find max link count"))
                // Wrap previous result into `ready(...)`
                -> ready
            // It waits for input from stdin before logging the max link count
            ~?? >>>
                ..as_ref()
                // .map(|number| ...)
                |> |(url, count)| {
                    let split = url.to_owned().split('/').collect::<Vec<_>>();
                    let domain_name = split.get(2).unwrap_or(&url);
                    println!("Max `https://` link count found on `{}`: {}", domain_name, count)
                }
                ..unwrap_or(()),
        // Concurrently it makes request to the site which generates random number
        get_url_to_get_random_number()
            // Wrap previous result into `ok(...)`
            -> ok
            // .and_then(...)
            => {
                // If you pass a block statement instead of a fn, it is placed before the current step,
                // so it allows us to capture some variables from the context
                let ref client = client;
                let map_parse_error = |error, value| format_err!("Failed to parse random number: {:#?}, value: {}", error, value);
                move |url|
                    try_join_async! {
                        client
                            .get(url)
                            .send()
                            => |value| value.text()
                            !> |err| format_err!("Error retrieving random number: {:#?}", err)
                            => |value| ok(value[..value.len() - 1].to_owned()) // remove \n from `154\n`
                            => |value|  
                                ready(
                                    value
                                        .parse::<u16>()
                                        .map_err(|err| map_parse_error(err, value))
                                )
                    }
            }
            // It waits for input from stdin before logging the random value
            // .inspect(|v| v.as_ref()...)
            ~?? >>>
                ..as_ref()
                // .map(|number| ...)
                |> |number| println!("Random: {}", number)
                ..unwrap_or(()),
        // Concurrently it reads value from stdin
        read_number_from_stdin() |> Ok,
        // Finally, once we have all the results, we can decide who the winner is
        map => |(_url, link_count), random_number, number_from_stdin| {
            let random_diff = (link_count as i32 - random_number as i32).abs();
            let stdin_diff = (link_count as i32 - number_from_stdin as i32).abs();
            match () {
                _ if random_diff > stdin_diff => GameResult::Won,
                _ if random_diff < stdin_diff => GameResult::Lost,
                _ => GameResult::Draw
            }
        }    
    };

    let _ = game.await.map(
        |result|
            println!(
                "You {}",
                match result {
                    GameResult::Won => "won!",
                    GameResult::Lost => "lose...",
                    _ => "have the same result as random generator!"
                }
            )
    ).unwrap_or_else(|error| eprintln!("Error: {:#?}", error));
}

fn get_urls_to_calculate_link_count() -> impl Stream<Item = &'static str> {
    iter(
        vec![
            "https://en.wikipedia.org/w/api.php?format=json&action=query&generator=random&grnnamespace=0&prop=revisions|images&rvprop=content&grnlimit=100",
            "https://github.com/explore",
            "https://twitter.com/search?f=tweets&vertical=news&q=%23news&src=unkn"
        ]
    )   
}

fn get_url_to_get_random_number() -> &'static str {
    "https://www.random.org/integers/?num=1&min=0&max=500&col=1&base=10&format=plain&rnd=new"
}

async fn read_number_from_stdin() -> u16 {
    use tokio::*;
    use futures::stream::StreamExt;
    
    let map_parse_error = |error, value| format_err!("Value from stdin isn't a correct `u16`: {:?}, input: {}", error, value);

    let mut reader = codec::FramedRead::new(io::BufReader::new(io::stdin()), codec::LinesCodec::new());

    loop {
        println!("Please, enter number (`u16`)");

        let next = reader.next();
    
        let result = try_join_async! {
            next
                // .map(|v| v.ok_or()...)
                |> >>>
                    ..ok_or(format_err!("Unexpected end of input"))
                    // .and_then(|v| v.map_err(|err| ...))
                    => >>> !> |err| format_err!("Failed to apply codec: {:#?}", err)
                    <<<
                <<<
                // .and_then(|value| ready(...))
                => |value|
                    ready(
                        value
                            .parse()
                            .map_err(|err| map_parse_error(err, value))
                    )
                // .map_err(|error| ...)
                !> |error| { eprintln!("Error: {:#?}", error); error}
        }.await;

        if let Ok(value) = result {
            break value
        }
    }
}

Single thread combinations

Sync branches

Converts the input into a series of chained results and joins them step by step.

use std::error::Error;
use join::try_join;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

fn action_1() -> Result<u16> {
    Ok(1)
}

fn action_2() -> Result<u8> {
    Ok(2)
}

fn main() {
    let sum = try_join! {
        // action_1(),
        action_1(),
        
        // action_2().map(|v| v as u16),
        action_2() |> |v| v as u16,
        
        // action_2().map(|v| v as u16 + 1).and_then(|v| Ok(v * 4)),
        action_2() |> |v| v as u16 + 1 => |v| Ok(v * 4),
        
        // action_1().and_then(|_| Err("5".into())).or(Ok(2)),
        action_1() => |_| Err("5".into()) <| Ok(2),
        
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}

Futures

Each branch represents a chain of futures. All branches are joined using the ::futures::join!/::futures::try_join! macros, and join_async!/try_join_async! returns an unpolled future.

#![recursion_limit="256"]

use std::error::Error;
use join::try_join_async;
use futures::future::{ok, err};

type Result<T> = std::result::Result<T, Box<dyn Error>>;

async fn action_1() -> Result<u16> {
    Ok(1)
}
async fn action_2() -> Result<u8> {
    Ok(2)
}

#[tokio::main]
async fn main() {
    let sum = try_join_async! {
        // action_1(),
        action_1(),

        // action_2().and_then(|v| ok(v as u16)),
        action_2() => |v| ok(v as u16),

        // action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16)),
        action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),

        // action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16)),
        action_1() => |_| err("5".into()) <= |_| ok(2u16),

        and_then => |a, b, c, d| ok(a + b + c + d)
    }.await.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
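"Unpolled" means that nothing inside the returned future runs until it is awaited or polled. The standard-library-only sketch below shows that property for an ordinary async block; it does not use the join crate, and `NoopWaker`, `poll_once`, and `started_before_and_after_poll` are names invented for this illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll a future that is ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a future exactly once on the current thread.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut context = Context::from_waker(&waker);
    let mut future = pin!(future);
    future.as_mut().poll(&mut context)
}

// Reports whether the future's body had run before and after the first poll.
fn started_before_and_after_poll() -> (bool, bool) {
    let started = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&started);
    let future = async move {
        flag.store(true, Ordering::SeqCst);
        1u16
    };
    // Creating the future did not execute its body.
    let before = started.load(Ordering::SeqCst);
    let polled = poll_once(future);
    assert_eq!(polled, Poll::Ready(1u16));
    (before, started.load(Ordering::SeqCst))
}

fn main() {
    assert_eq!(started_before_and_after_poll(), (false, true));
}
```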

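Multi-thread combinations

To execute several tasks in parallel, you can use join_spawn! (spawn!) for sync tasks and join_async_spawn! (async_spawn!) for futures. Since join_async! already provides concurrent execution of futures in one thread, join_async_spawn! spawns every branch into the tokio executor, so the branches are evaluated on a multi-threaded executor.

Sync threads

join_spawn! spawns one ::std::thread per step of each branch (the number of branches is the maximum number of threads at any one time).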


use std::error::Error;
use join::try_join_spawn;

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

fn action_1() -> Result<usize> {
    Ok(1)
}

fn action_2() -> Result<u16> {
    Ok(2)
}

fn main() {
    // Branches will be executed in parallel
    let sum = try_join_spawn! {
        // thread::spawn(move || action_1()),
        action_1(),
        
        // thread::spawn(move || action_2().map(|v| v as usize)),
        action_2() |> |v| v as usize,
        
        // thread::spawn(move || action_2().map(|v| v as usize + 1).and_then(|v| Ok(v * 4))),
        action_2() |> |v| v as usize + 1 => |v| Ok(v * 4),
        
        // thread::spawn(move || action_1().and_then(|_| Err("5".into())).or(Ok(2))),
        action_1() => |_| Err("5".into()) <| Ok(2),
        
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}

Thread names

At runtime, a thread's name is built from the name of the parent thread and join_%branch_index%.

Example with several branches

extern crate join;

use std::thread;

use join::try_join_spawn;

fn get_current_thread_name() -> String {
    thread::current().name().unwrap().to_owned()
}

fn print_branch_thread_name(index: &Result<usize, ()>) {
    println!("Branch: {}. Thread name: {}.", index.unwrap(), get_current_thread_name());
}

fn main() {
    let _ = try_join_spawn! {
        Ok(0) ?? print_branch_thread_name,
        Ok(1) ?? print_branch_thread_name,
        try_join_spawn! {
            Ok(2) ?? print_branch_thread_name,
            try_join_spawn! {
                Ok(3) ?? print_branch_thread_name,
            }
        }
    }.unwrap();
}

// Branch: 0. Thread name: main_join_0.
// Branch: 1. Thread name: main_join_1.
// Branch: 2. Thread name: main_join_2_join_0.
// Branch: 3. Thread name: main_join_2_join_1_join_0.
// Order could be different.
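The naming scheme can be reproduced with std::thread::Builder. The sketch below is an assumption about how such names could be produced, not the macro's actual implementation; `branch_thread_name` and `spawn_named_branch` are hypothetical helpers.

```rust
use std::thread;

// Builds a child thread name from the parent's name and the branch index,
// following the `<parent>_join_<index>` pattern described above.
fn branch_thread_name(parent: &str, branch_index: usize) -> String {
    format!("{}_join_{}", parent, branch_index)
}

// Spawns a named thread and returns the name observed from inside it.
fn spawn_named_branch(parent: &str, branch_index: usize) -> String {
    thread::Builder::new()
        .name(branch_thread_name(parent, branch_index))
        .spawn(|| thread::current().name().unwrap_or("<unnamed>").to_owned())
        .expect("failed to spawn thread")
        .join()
        .expect("branch thread panicked")
}

fn main() {
    assert_eq!(spawn_named_branch("main", 2), "main_join_2");
    // A nested spawn uses the already extended name as its parent.
    assert_eq!(spawn_named_branch("main_join_2", 0), "main_join_2_join_0");
}
```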

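Future tasks

join_async_spawn! uses the ::tokio::spawn function to spawn tasks, so it must be used inside a tokio runtime (the number of branches is the maximum number of tokio tasks at any one time).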

#![recursion_limit="256"]

use std::error::Error;
use join::try_join_async_spawn;
use futures::future::{ok, err};

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

async fn action_1() -> Result<u16> {
    Ok(1)
}

async fn action_2() -> Result<u8> {
    Ok(2)
}

#[tokio::main]
async fn main() {
    let sum = try_join_async_spawn! {
        // tokio::spawn(Box::pin(action_1()))
        action_1(),

        // tokio::spawn(Box::pin(action_2().and_then(|v| ok(v as u16))))
        action_2() => |v| ok(v as u16),

        // tokio::spawn(Box::pin(action_2().map(|v| v.map(|v| v as u16 + 1)).and_then(|v| ok(v * 4u16))))
        action_2() |> |v| v.map(|v| v as u16 + 1) => |v| ok(v * 4u16),

        // tokio::spawn(Box::pin(action_1().and_then(|_| err("5".into())).or_else(|_| ok(2u16))))
        action_1() => |_| err("5".into()) <= |_| ok(2u16),

        and_then => |a, b, c, d| ok(a + b + c + d)
    }.await.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}

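Detailed steps example

By splitting a chain into deferred actions, you make every action wait until all actions of the current step have completed before the next step begins.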

#![recursion_limit="256"]

use std::error::Error;
use join::try_join;

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

fn action_1() -> Result<u16> {
    Ok(1)
}

fn action_2() -> Result<u8> {
    Ok(2)
}

fn main() {
    let sum = try_join! {
        action_1(),
        let result_1 = action_2() ~|> |v| v as u16 + 1,
        action_2() ~|> {
            // `result_1` now is the result of `action_2()` [Ok(2u8)]
            let result_1 = result_1.as_ref().ok().map(Clone::clone);
            move |v| {
                if result_1.is_some() {
                    v as u16 + 1
                } else {
                    unreachable!()
                }
            }
        } ~=> {
            // `result_1` now is the result of `|v| v as u16 + 1` [Ok(3u16)]
            let result_1 = result_1.as_ref().ok().map(Clone::clone);
            move |v| {
                if let Some(result_1) = result_1 {
                    Ok(v * 4 + result_1)
                } else {
                    unreachable!()
                }
            }
        },
        action_1() ~=> |_| Err("5".into()) <| Ok(2),
        map => |a, b, c, d| a + b + c + d
    }.expect("Failed to calculate sum");

    println!("Calculated: {}", sum);
}
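The step-by-step behaviour can also be written out by hand. The sketch below advances two branches one step at a time and lets the second branch read the first branch's value from the previous step; it is a simplified reading of the deferred (`~`) combinators, and `stepwise` and its variable names are hypothetical.

```rust
// Hand-written reading of two branches advanced one step at a time:
//   let first = Ok(2u8) ~|> |v| v as u16 + 1,
//   Ok(2u8) ~|> { /* capture `first` */ move |v| v as u16 + captured }
fn stepwise() -> Result<(u16, u16), String> {
    // Step 0: every branch produces its initial value.
    let first: Result<u8, String> = Ok(2);
    let second: Result<u8, String> = Ok(2);

    // Before step 1: the capture block of the second branch runs,
    // so it observes the step-0 value of `first` (2, not 3).
    let seen_by_second = first.clone()?;

    // Step 1: both branches apply their map.
    let first = first.map(|v| v as u16 + 1);
    let second = second.map(move |v| v as u16 + seen_by_second as u16);

    // Finally, the tuple of results is transposed into a result of a tuple.
    Ok((first?, second?))
}

fn main() {
    assert_eq!(stepwise(), Ok((3, 4)));
}
```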

Dependencies

~1.5MB
~35K SLoC