#任务队列 #任务 #任务运行器 #负载均衡 #异步

协调器

协调器是一个用于将任务负载均衡到异步任务运行器的库

2 个版本

0.1.1 2024年7月21日
0.1.0 2024年7月21日

#425Rust 模式

Download history 194/week @ 2024-07-21 26/week @ 2024-07-28

每月 220 次下载

MIT 许可证

31KB
581

协调器

License Cargo Documentation

描述

协调器是一个简单的库,可以将任务负载均衡到异步任务运行器。每个添加到协调器的工人将有一个队列来处理工作单元(或任务)。每个工人一次只能处理一个任务。

您可以使用以下 API 选择哪个工人来处理任务:

  • TaskPrefs::Any (my_coordinator.any() for #[coordinator] 宏): 这将告诉协调器将任务队列到可用的工人数量最多的工人
  • TaskPrefs::Preferred(worker_id) (my_coordinator.prefer(worker_id) for #[coordinator] 宏): 这将告诉协调器如果工人 worker_id 目前没有满,则将任务队列到具有该 ID 的工人,否则将任务队列到任何工人。
  • TaskPrefs::Required(worker_id) (my_coordinator.require(worker_id) for #[coordinator] 宏): 这将告诉协调器将任务队列到具有 ID worker_id 的工人。

协调器将尝试使用平均任务完成时间和工人的队列中任务数量来找到最可用的工人。

目录

安装

该包可在 crates.io 上找到。请访问链接以获取最新版本和安装说明。

使用

要查看完整示例,请检查 playground/examples

// Create a worker that sleeps for 1 sec and return a number that double the input
struct Doubler(String);
impl TaskProcessor<i32> for Doubler {
    type Output = i32;
    async fn do_work(&mut self, task: i32) -> Self::Output {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task {} computed {}", self.0, task * 2);
        task * 2
    }
}

// the queue thershold of a single queue, if the number of task item in queue exceeded the thershold
// any `TaskPref::Preferred(x)` will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);

// Add `Doubler` as task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))
    .await;

// Add a closure as a task processor. Any `FnMut` closure can be used as task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;

// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle = b.run(2, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");

// Do other works.....

// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);

如果你的任务处理器可以处理不同类型的任务(例如:CalculatorProcessor 可以处理 addsubtract 任务),你可以使用 #[coordinator] 属性宏来避免在实现 TaskProcessor 时需要定义自己的输入和输出枚举以及手动调度它们。

pub trait InteractableObject {
    fn size(&self) -> [f32; 3];
    fn weight(&self) -> f32;
    fn set_weight(&mut self, val: f32);
}

pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]

// Type alias for not having to type out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;

#[coordinator]
pub trait CatFamily<I>
where
    I: InteractableObject + RefUnwindSafe,
{
    fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
    fn upgrade<O: InteractableObject>(obj: ArcMut<I>, material: O);
    fn meow() -> bool;
    fn meow_repeatedly(times: usize)
    where
        Self: Send,
    {
        async move {
            for _ in 0..times {
                self.meow().await;
            }
        }
    }
}

pub struct DomesticatedCat {
    name: String,
    exp: usize,
}

impl DomesticatedCat {
    pub fn new(name: String) -> Self {
        Self { name, exp: 0 }
    }
}

// Instead of implementing the [`TaskProcessor`]  trait, we implement the trait generated by `#[coordinator]` instead, this way we don't have to enum dispatch ourself. The trait name will always be `[Name]Processor`
impl<I> CatFamilyProcessor<I> for DomesticatedCat
where
    I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
{
    async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
        // ...
    }

    async fn upgrade<O: InteractableObject>(&mut self, obj: ArcMut<I>, material: O) {
        // ...
    }

    async fn meow(&mut self) -> bool {
        // ...
    }
}

pub struct RobotCat /* ... */; // Another CatProcessor impl

async fn main() -> Result<(), Box<dyn Error>> {
    // The `CatFamily` struct is generated automatically, with `From<Coordinator>` impl so you can convert any `Coordinator` into it using `into()`
    let cat_family: CatFamily<Ball, Crystal, &str> = Coordinator::new(3).into();
    cat_family
        .add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))
        .await;

    cat_family
        .add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))
        .await;

    for _ in 0..10 {
        // Cloning here is only cloning the `Arc` under the hood, not creating a new `Coordinator`
        let cat_family = cat_family.clone();
        tokio::spawn(async move {
            let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
                size: [2.2, 3.3, 4.4],
                weight: 5.9,
                bounciness: 10.2,
            })));

            let crystal = Crystal {
                size: [5.2, 3.1, 6.4],
                weight: 15.9,
                purity: 0.9,
            };

            let (pos, cat) = cat_family
                .any()
                .locate_object(balls.clone())
                .await?
                .join()
                .await?;

            let Some(pos) = pos else {
                println!("Cat {} cannot find the object!", cat);
                return Ok(());
            };

            println!("Cat {} has found the ball at {:?}", cat, pos);

            let (_, cat) = cat_family
                .prefer(&cat)
                .upgrade(balls.clone(), crystal)
                .await?
                .join()
                .await?;

            println!(
                "Cat {} has upgrade ball to {}",
                cat,
                balls.0.lock().await.weight
            );

            // We don't care about the result here so no need to join
            cat_family.require(&cat).meow_repeatedly(3).await?;
            return Ok::<(), Box<dyn Error + Send + Sync + 'static>>(());
        });
    }

    Ok(())
}

贡献

我们欢迎对该项目的任何贡献。在提交拉取请求之前,请打开一个问题来检查是否有人已经在处理该功能。

许可证

本项目采用 MIT 许可证。

依赖关系

~4–11MB
~100K SLoC