#task #coordinator #task-runner #load-balancing #task-queue

coordinator derive

coordinator_derive 是宏的辅助库

1 个不稳定版本

0.1.0 2024年7月21日

#5#coordinator

Download history 116/week @ 2024-07-21 7/week @ 2024-07-28

123 每月下载量
coordinator 中使用

MIT 协议

29KB
545

Coordinator

描述

Coordinator 是一个简单的库,可以将任务均衡分配给异步运行的任务运行器。每个添加到协调器的工人将有一个队列来处理工作单元(或任务)。每个工人在给定时间内只处理一个任务。

您可以使用以下API选择哪个工人来处理任务:

  • TaskPrefs::Any (my_coordinator.any() 对于 #[coordinator] 宏):这将告诉协调器将任务排队到可用工人最多的队列中
  • TaskPrefs::Preferred(worker_id) (my_coordinator.prefer(worker_id) 对于 #[coordinator] 宏):这将告诉协调器如果工人ID为 worker_id 的工人当前未满,则将任务排队到该工人,否则将任务排队到任何工人。
  • TaskPrefs::Required(worker_id) (my_coordinator.require(worker_id) 对于 #[coordinator] 宏):这将告诉协调器将任务排队到工人ID为 worker_id 的工人。

协调器将尝试使用平均任务完成时间和工人队列中的任务数量来找到最可用的工人。

目录

安装

此crate可在crates.io上找到。请访问此链接以获取最新版本和安装说明。

使用

有关完整示例,请查看playground/examples

// Create a worker that sleeps for 1 sec and return a number that double the input
struct Doubler(String);
impl TaskProcessor<i32> for Doubler {
    type Output = i32;
    async fn do_work(&mut self, task: i32) -> Self::Output {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task {} computed {}", self.0, task * 2);
        task * 2
    }
}

// the queue thershold of a single queue, if the number of task item in queue exceeded the thershold
// any `TaskPref::Preferred(x)` will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);

// Add `Doubler` as task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))
    .await;

// Add a closure as a task processor. Any `FnMut` closure can be used as task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;

// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle = b.run(2, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");

// Do other works.....

// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);

如果您的任务处理器可以处理不同类型的任务(例如:CalculatorProcessor可以处理addsubtract任务),您可以使用#[coordinator]属性宏,以避免在实现TaskProcessor时需要定义自己的输入和输出枚举以及手动调度它们

pub trait InteractableObject {
    fn size(&self) -> [f32; 3];
    fn weight(&self) -> f32;
    fn set_weight(&mut self, val: f32);
}

pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]

// Type alias for not having to type out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;

#[coordinator]
pub trait CatFamily<I>
where
    I: InteractableObject + RefUnwindSafe,
{
    fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
    fn upgrade<O: InteractableObject>(obj: ArcMut<I>, material: O);
    fn meow() -> bool;
    fn meow_repeatedly(times: usize)
    where
        Self: Send,
    {
        async move {
            for _ in 0..times {
                self.meow().await;
            }
        }
    }
}

pub struct DomesticatedCat {
    name: String,
    exp: usize,
}

impl DomesticatedCat {
    pub fn new(name: String) -> Self {
        Self { name, exp: 0 }
    }
}

// Instead of implementing the [`TaskProcessor`]  trait, we implement the trait generated by `#[coordinator]` instead, this way we don't have to enum dispatch ourself. The trait name will always be `[Name]Processor`
impl<I> CatFamilyProcessor<I> for DomesticatedCat
where
    I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
{
    async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
        // ...
    }

    async fn upgrade<O: InteractableObject>(&mut self, obj: ArcMut<I>, material: O) {
        // ...
    }

    async fn meow(&mut self) -> bool {
        // ...
    }
}

pub struct RobotCat /* ... */; // Another CatProcessor impl

async fn main() -> Result<(), Box<dyn Error>> {
    // The `CatFamily` struct is generated automatically, with `From<Coordinator>` impl so you can convert any `Coordinator` into it using `into()`
    let cat_family: CatFamily<Ball, Crystal, &str> = Coordinator::new(3).into();
    cat_family
        .add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))
        .await;

    cat_family
        .add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))
        .await;

    for _ in 0..10 {
        // Cloning here is only cloning the `Arc` under the hood, not creating a new `Coordinator`
        let cat_family = cat_family.clone();
        tokio::spawn(async move {
            let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
                size: [2.2, 3.3, 4.4],
                weight: 5.9,
                bounciness: 10.2,
            })));

            let crystal = Crystal {
                size: [5.2, 3.1, 6.4],
                weight: 15.9,
                purity: 0.9,
            };

            let (pos, cat) = cat_family
                .any()
                .locate_object(balls.clone())
                .await?
                .join()
                .await?;

            let Some(pos) = pos else {
                println!("Cat {} cannot find the object!", cat);
                return Ok(());
            };

            println!("Cat {} has found the ball at {:?}", cat, pos);

            let (_, cat) = cat_family
                .prefer(&cat)
                .upgrade(balls.clone(), crystal)
                .await?
                .join()
                .await?;

            println!(
                "Cat {} has upgrade ball to {}",
                cat,
                balls.0.lock().await.weight
            );

            // We don't care about the result here so no need to join
            cat_family.require(&cat).meow_repeatedly(3).await?;
            return Ok::<(), Box<dyn Error + Send + Sync + 'static>>(());
        });
    }

    Ok(())
}

贡献

我们欢迎对这个项目做出任何贡献。在提交pull请求之前,请先打开一个问题以检查是否有人已经在该特性上工作。

许可

本项目采用MIT许可证。

依赖项

~225–660KB
~16K SLoC