59 个版本
0.21.1 | 2020 年 12 月 16 日 |
---|---|
0.20.23 | 2020 年 10 月 15 日 |
0.20.21 | 2020 年 7 月 31 日 |
0.20.2 | 2020 年 3 月 29 日 |
0.1.4 | 2018 年 12 月 23 日 |
#3 在 #sqs
每月 132 次下载
用于 2 crates
77KB
2K SLoC
sqs-lambda
一个用于编写由 SQS 触发的 AWS Lambda 的 Rust crate
这个库允许您将单事件 SQS 触发的 Lambda 转换为一个更像流处理器的工具。这对于高吞吐量工作负载非常有用。
此示例显示了与 lambda_runtime 一起使用此库的方法。
在这个例子中,我们
- 由 S3 通知事件触发的 SqsEvents 触发
- S3PayloadRetriever 使用 ZstdDecoder 下载并解压缩有效载荷
- 我们创建了 40 个 EventProcessors,将我们的 CustomService 封装起来
- 完成处理程序将在上传到 S3 之前合并完成的事件
fn my_handler(event: SqsEvent, ctx: Context) -> Result<(), HandlerError> {
tokio_compat::run_std(
async {
let queue_url = std::env::var("QUEUE_URL").expect("QUEUE_URL");
info!("Queue Url: {}", queue_url);
let output_bucket = "event-destination-bucket";
let region = {
let region_str = std::env::var("AWS_REGION").expect("AWS_REGION");
Region::from_str(®ion_str).expect("Region error")
};
info!("Defining consume policy");
let consume_policy = ConsumePolicy::new(
ctx, // Use the Context.deadline from the lambda_runtime
Duration::from_secs(2), // Stop consuming when there's 2 seconds left in the runtime
3, // If we get 3 empty receives in a row, stop consuming
);
info!("Defining consume policy");
let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
info!("SqsConsumer");
let sqs_consumer = SqsConsumerActor::new(
SqsConsumer::new(SqsClient::new(region.clone()), queue_url.clone(), consume_policy, tx)
);
info!("SqsCompletionHandler");
let sqs_completion_handler = SqsCompletionHandlerActor::new(
SqsCompletionHandler::new(
SqsClient::new(region.clone()),
queue_url.to_string(),
SubgraphSerializer { proto: Vec::with_capacity(1024) },
S3EventEmitter::new(
S3Client::new(region.clone()),
bucket.to_owned(),
time_based_key_fn,
),
CompletionPolicy::new(
1000, // Buffer up to 1000 messages
Duration::from_secs(30), // Buffer for up to 30 seconds
),
)
);
info!("EventProcessors");
let event_processors: Vec<_> = (0..40)
.map(|_| {
EventProcessorActor::new(EventProcessor::new(
sqs_consumer.clone(),
sqs_completion_handler.clone(),
CustomService {},
S3EventRetriever::new(S3Client::new(region.clone()), ZstdDecoder::default()),
))
})
.collect();
info!("Start Processing");
futures::future::join_all(event_processors.iter().map(|ep| ep.start_processing())).await;
let mut proc_iter = event_processors.iter().cycle();
for event in event.records {
let next_proc = proc_iter.next().unwrap();
next_proc.process_event(
map_sqs_message(event)
).await;
}
info!("Waiting for shutdown notification");
// Wait for the consumers to shutdown
let _ = shutdown_notify.await;
tokio::time::delay_for(Duration::from_millis(100)).await;
info!("Consumer shutdown");
});
info!("Completed execution");
Ok(())
}
依赖项
~31–44MB
~773K SLoC