59 个版本

0.21.1 2020 年 12 月 16 日
0.20.23 2020 年 10 月 15 日
0.20.21 2020 年 7 月 31 日
0.20.2 2020 年 3 月 29 日
0.1.4 2018 年 12 月 23 日

#3#sqs

Download history 3/week @ 2024-03-13 54/week @ 2024-03-27 57/week @ 2024-04-03

每月 132 次下载
用于 2 crates

MIT 许可证

77KB
2K SLoC

sqs-lambda

一个用于编写由 SQS 触发的 AWS Lambda 的 Rust crate

这个库允许您将单事件 SQS 触发的 Lambda 转换为一个更像流处理器的工具。这对于高吞吐量工作负载非常有用。

此示例显示了与 lambda_runtime 一起使用此库的方法。

在这个例子中,我们

  • 由 S3 通知事件触发的 SqsEvents 触发
  • S3PayloadRetriever 使用 ZstdDecoder 下载并解压缩有效载荷
  • 我们创建了 40 个 EventProcessors,将我们的 CustomService 封装起来
  • 完成处理程序将在上传到 S3 之前合并完成的事件
fn my_handler(event: SqsEvent, ctx: Context) -> Result<(), HandlerError> {
    tokio_compat::run_std(
        async {

            let queue_url = std::env::var("QUEUE_URL").expect("QUEUE_URL");
            info!("Queue Url: {}", queue_url);
            let output_bucket = "event-destination-bucket";

            let region = {
                let region_str = std::env::var("AWS_REGION").expect("AWS_REGION");
                Region::from_str(&region_str).expect("Region error")
            };

            info!("Defining consume policy");
            let consume_policy = ConsumePolicy::new(
                ctx, // Use the Context.deadline from the lambda_runtime
                Duration::from_secs(2), // Stop consuming when there's 2 seconds left in the runtime
                3, // If we get 3 empty receives in a row, stop consuming
            );

            info!("Defining consume policy");
            let (tx, shutdown_notify) = tokio::sync::oneshot::channel();

            info!("SqsConsumer");
            let sqs_consumer = SqsConsumerActor::new(
                SqsConsumer::new(SqsClient::new(region.clone()), queue_url.clone(), consume_policy, tx)
            );

            info!("SqsCompletionHandler");
            let sqs_completion_handler = SqsCompletionHandlerActor::new(
                SqsCompletionHandler::new(
                    SqsClient::new(region.clone()),
                    queue_url.to_string(),
                    SubgraphSerializer { proto: Vec::with_capacity(1024) },
                    S3EventEmitter::new(
                        S3Client::new(region.clone()),
                        bucket.to_owned(),
                        time_based_key_fn,
                    ),
                    CompletionPolicy::new(
                        1000, // Buffer up to 1000 messages
                        Duration::from_secs(30), // Buffer for up to 30 seconds
                    ),
                )
            );

            info!("EventProcessors");
            let event_processors: Vec<_> = (0..40)
                .map(|_| {
                    EventProcessorActor::new(EventProcessor::new(
                        sqs_consumer.clone(),
                        sqs_completion_handler.clone(),
                        CustomService {},
                        S3EventRetriever::new(S3Client::new(region.clone()), ZstdDecoder::default()),
                    ))
                })
                .collect();

            info!("Start Processing");

            futures::future::join_all(event_processors.iter().map(|ep| ep.start_processing())).await;

            let mut proc_iter = event_processors.iter().cycle();
            for event in event.records {
                let next_proc = proc_iter.next().unwrap();
                next_proc.process_event(
                    map_sqs_message(event)
                ).await;
            }

            info!("Waiting for shutdown notification");
            // Wait for the consumers to shutdown
            let _ = shutdown_notify.await;

            tokio::time::delay_for(Duration::from_millis(100)).await;
            info!("Consumer shutdown");

        });


    info!("Completed execution");
    Ok(())
}

依赖项

~31–44MB
~773K SLoC