#task-queue #actix #generic #service #worker #i32 #pair

actix-taskqueue

为 Actix 提供的通用任务队列服务

1 个不稳定版本

0.1.0 2020 年 8 月 17 日

#34 in #task-queue

自定义许可证

16KB
123 代码行(不含注释)

Actix 任务队列

Actix 任务队列是一个通用任务队列服务。在您的应用程序中,您可以为许多不同的 Task 和 Task Result 对提供许多不同的任务队列。

一对 Task 和 Task Result 的任务队列将是一个 SystemService

要使用队列,您首先需要为您的 Task 和 Task Result 定义一个数据结构

例如

#[derive(Debug, Default, Clone, Copy)]
struct Task(i32);
struct TaskResult(i32);

接下来,您需要为您的一对 Task 和 Task Result 实现 QueueConsumer 特性

#[async_trait]
impl QueueConsumer<Task, TaskResult> for TaskWorker<Task, TaskResult> {
    async fn execute(&self, task: Task) -> Result<TaskResult, WorkerExecuteError> {
      ...
    }

    fn get_queue(&self) -> Addr<TaskQueue<Task>> {
      ...
    }

    fn retry(&self, task: Task) -> Task {
      ...
    }

    fn drop(&self, task: Task) {
      ...
    }

    fn result(&self, result: TaskResult) {
      ...
    }
}

当您需要运行队列中下一个可用的任务时,调用您的任务工作者的 next() 方法。

worker.next().await

此方法将返回一个 Result<TaskResult, WorkerExecuteError>


以下示例描述了一个 i32 数字的任务队列,在执行时,工作者将

  • 如果数字大于或等于 5,则将该数字加 5。
  • 如果数字大于 0 但小于 5,则将数字加 1 并将其送回队列以重试。
  • 如果数字小于 0,则将数字从队列中删除。
#[derive(Debug, Default, Clone, Copy)]
struct PlusFive(i32);
struct PlusFiveResult(i32);

#[async_trait]
impl QueueConsumer<PlusFive, PlusFiveResult> for TaskWorker<PlusFive, PlusFiveResult> {
    async fn execute(&self, task: PlusFive) -> Result<PlusFiveResult, WorkerExecuteError> {
        let PlusFive(n) = task;
        if n >= 5 {
            return Ok(PlusFiveResult(n + 5));
        } else if n > 0 {
            return Err(WorkerExecuteError::Retryable);
        } else {
            return Err(WorkerExecuteError::NonRetryable);
        }
    }

    fn get_queue(&self) -> Addr<TaskQueue<PlusFive>> {
        TaskQueue::<PlusFive>::from_registry()
    }

    fn retry(&self, task: PlusFive) -> PlusFive {
        let PlusFive(n) = task;
        println!("RETRYING VALUE = {}", n);
        PlusFive(n + 1)
    }

    fn drop(&self, task: PlusFive) {
        let PlusFive(n) = task;
        println!("DROPPED TASK WITH VALUE = {}", n);
    }

    fn result(&self, result: PlusFiveResult) {
        let PlusFiveResult(n) = result;
        println!("RESULT = {}", n);
    }
}

#[actix_rt::main]
async fn main() {
    let queue = TaskQueue::<PlusFive>::from_registry();
    let worker = TaskWorker::<PlusFive, PlusFiveResult>::new();

    queue.do_send(Push::new(PlusFive(5)));
    queue.do_send(Push::new(PlusFive(8)));
    queue.do_send(Push::new(PlusFive(3)));
    queue.do_send(Push::new(PlusFive(11)));
    queue.do_send(Push::new(PlusFive(0)));
    queue.do_send(Push::new(PlusFive(20)));

    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
    worker.next().await;
}

依赖关系

~8MB
~132K SLoC