#async-task #async #observable #async-await #synchronization #reactive #await

async-observable

异步与响应式同步模型,以保持多个异步任务/线程的部分同步

7 个不稳定版本

0.4.2 2024 年 4 月 8 日
0.4.1 2024 年 4 月 3 日
0.4.0 2022 年 10 月 11 日
0.3.0 2022 年 10 月 7 日
0.1.0 2022 年 2 月 17 日

#558 in 异步

Download history 23/week @ 2024-04-13 4/week @ 2024-04-20 3/week @ 2024-04-27 13/week @ 2024-05-11 26/week @ 2024-05-18 8/week @ 2024-05-25 68/week @ 2024-06-01 79/week @ 2024-06-08 9/week @ 2024-06-15 1/week @ 2024-06-22 33/week @ 2024-06-29 8/week @ 2024-07-06 1/week @ 2024-07-13 16/week @ 2024-07-20 35/week @ 2024-07-27

84 每月下载量
async-subscription-map 中使用

MIT 许可证

28KB
493 代码行

async-observable

异步与响应式同步模型,以保持多个异步任务/线程的部分同步。

示例

简单分叉

use async_observable::Observable;

#[async_std::main]
async fn main() {
    let (mut a, mut b) = Observable::new(0).split();

    a.publish(1);

    assert_eq!(b.wait().await, 1);
}

通知任务

use async_std::task::{sleep, spawn};
use async_observable::Observable;

#[async_std::main]
async fn main() {
    let (mut main, mut task) = Observable::new(0).split();

    let task = spawn(async move {
        loop {
            let update = task.next().await;
            println!("task received update {}", update);

            if update >= 3 {
                break;
            }
        }
    });

    main.publish(1);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(2);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(3);

    task.await;
}

执行控制

您可以用互斥锁的行为来模拟,但如果有可观察的值发生变化,您可以用可观察的来启动许多异步任务。我们只使用一个布尔型可观察值,我们只发布一次。

use async_std::task::{sleep, spawn};
use async_observable::Observable;
use futures::join;

#[async_std::main]
async fn main() {
    let mut execute = Observable::new(false);
    let mut execute_fork_one = execute.clone();
    let mut execute_fork_two = execute.clone();

    let task_one = spawn(async move {
        println!("task one started");
        execute_fork_one.next().await;
        println!("task one ran");
    });

    let task_two = spawn(async move {
        println!("task two started");
        execute_fork_two.next().await;
        println!("task two ran");
    });

    join!(
        task_one,
        task_two,
        spawn(async move {
            println!("main task started");

            // run some fancy business logic
            sleep(std::time::Duration::from_millis(100)).await;
            // then release our tasks to do stuff when we are done
            execute.publish(true);

            println!("main task ran");
        })
    );
}

您可能会争论说,您可以在启动某些内容的那一刻就启动任务 - 这是真的,如果您只是想启动子任务,这是一个更好的解决方案。 但如果你想要通知程序中的完全不同的部分,这就会变得很困难。 或者例如,如果您想在任务的一半运行时等待另一个任务所做的事情,然后再继续。


此代码最初由 HUM Systems 发布。这个仓库继续开发这个库,因为他们不幸地停止了开源工作。

依赖项

~0.7–1MB
~20K SLoC