#worker #tower #amqp #lapin #building #task #up

lapin-tower-worker

使用lapin和tower构建AMQP工作框架

3个版本

新版本 0.1.3 2024年8月26日
0.1.2 2024年8月26日
0.1.1 2024年8月21日

#5 in #lapin

Download history 140/week @ 2024-08-19

每月 140 次下载

MIT 许可证

18KB
283

lapin-tower-worker

当您需要一种标准化的方式来构建AMQP任务和数据的工作者时

用法

该库在 taskresult 模型上工作,一旦定义了这些,工作者就可以自动化拉取任务、执行您的逻辑并发布结果的过程。

为了标准化应用程序逻辑的集成方式,我们使用 towertokio 作为主要构建模块。当我结束编写工作者重试和超时逻辑以及并发限制时,我想到这个库的想法,因为我只是重新实现了更差且可重用性较低的tower中间件。

定义任务

#[derive(Debug)]
struct MyAwesomeTask {
    value: String
}

impl AMQPTask for MyTask {
    type DecodeError = FromUtf8Error;

    type TaskResult = MyAwesomeResult;

    fn decode(data: Vec<u8>) -> Result<Self, Self::DecodeError> {
        String::from_utf8(data).map(|value| MyTask { value })
    }

    fn queue() -> &'static str {
        "awesome-tasks"
    }
}

要定义任务,您需要定义如何从 Vec<u8> 解码它,以及它从哪个队列中拉取。

另外,别忘了将 Debug 添加到您的任务中或覆盖特质的 debug 函数,这些用于创建可读的跟踪跨度以跟踪任务的执行。

定义结果

#[derive(Debug)]
struct MyAwesomeResult {
    value: String
}

impl AMQPTaskResult for MyAwesomeResult {
    type EncodeError = Infallible;

    fn encode(self) -> Result<Vec<u8>, Self::EncodeError> {
        self.value.into_bytes()
    }

    fn publish_exchange(&self) -> String {
        "" // direct exchange
    }

    fn publish_routing_key(&self) -> String {
        "awesome-results" // through direct exchange this will end up in the queue "awesome-results"
    }
}

我的任务不产生结果...

AmqpTaskResult() 实现,并且 publish 实现是一个无操作。如果您指定 () 作为您的 TaskResult,则不会发布任何内容。

创建工作者

请将您的逻辑放入一个 tower::Service 中,有关更多信息,请参阅 tower 文档。让我们使用我们熟悉的好朋友 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>> 来定义我们的服务未来类型,并将 anyhow::Error 抛入以保持错误简单。

请记住,您的 Service 需要是 Clone 可的,这允许工作器在您的服务上并发运行任务。

poll_readytower 中用于确定服务是否准备好处理请求,我不会详细介绍在每次调用时在您的服务中存储未来以便轮询的细节。如果您希望由于某种原因使您的服务停止接受任务并终止,您可以返回您的 Error 类型。

我承认 tower 不容易使用,但它是一种好且标准化的方法来应用有用的中间件。

#[derive(Clone)]
struct AwesomeService {
    db_conn: DbConnection, // This thing is almost always cloneable
}

impl Service<MyAwesomeTask> for AwesomeService {
    type Response = MyAwesomeResult;
    type Error = anyhow::Error;

    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, task: MyAwesomeTask) -> Self::Future {
        let db_conn = self.db_conn.clone();
        Box::pin(async move {
            let stuff = do_awesome_stuff(&db_conn, task.value).await?;
            Ok(MyAwesomeResult { value: stuff })
        })
    }

让我们将所有内容组合到 worker 中并运行它。

获取一个 lapin 连接并创建一个通道,这就是您将使用此包的大部分内容。

let amqp =
    lapin::Connection::connect("amqp://user:password@localhost:5672", Default::default())
        .await
        .unwrap();
let channel = amqp.create_channel().await.unwrap();

初始化服务并将其交给工作器,然后在 tokio::task::JoinSet 上启动工作器。工作器需要一个用于消费者标签和跟踪的名称。您可以将 WorkerConfig 设置为更改某些行为,例如在解码错误时 ack 消息或不禁用,因为我不想剥夺这个决定。

let service = AwesomeService {
    db_conn: get_db_conn().await.unwrap(),
};
let worker = AMQPWorker::new(
    "awesome-worker",
    service,
    channel.clone(),
    WorkerConfig::default(),
);

让我们为我们的工作器添加一个 ConcurrencyLimitLayer,这将限制可以启动的并发任务数量,否则工作器将拉取队列中的所有任务并并发运行它们,这在分布式架构中可能是不希望的。add_layer 在每次都会返回一个新的工作器实例。

let worker = worker.add_layer(ConcurrencyLimitLayer::new(8));

工作器不能太长时间,让我们添加一个超时。

let worker = worker.add_layer(TimeoutLayer::new(Duration::from_secs(60*5))); // 5 minutes

现在我们可以在 joinset 上启动工作器。

let mut set = tokio::task::JoinSet::new();
let consume_options = BasicConsumeOptions::default();
let consume_arguments = FieldTable::default();
worker.consume_spawn(consume_options, consume_arguments, &mut set);

依赖关系

~10–22MB
~349K SLoC