#batch #low-latency #async #synchronize

benjamin_batchly

低延迟批量工具。将大量单个并发操作打包成顺序工作批次。

3 个版本

0.1.2 2024年6月28日
0.1.1 2022年7月14日
0.1.0 2022年6月4日

#206并发

Download history 126/week @ 2024-06-23 22/week @ 2024-06-30 17/week @ 2024-07-07 92/week @ 2024-07-28 4/week @ 2024-08-04

每月 96 次下载

Apache-2.0

15KB
170 代码行

benjamin_batchly crates.io 文档

低延迟批量工具。将大量单个并发操作打包成顺序工作批次。

use benjamin_batchly::{BatchMutex, BatchResult};

let batcher = BatchMutex::default();

// BatchMutex synchronizes so only one `Work` happens at a time (for a given batch_key).
// All concurrent submissions made while an existing `Work` is being processed will
// await completion and form the next `Work` batch.
match batcher.submit(batch_key, item).await {
    BatchResult::Work(mut batch) => {
        db_bulk_insert(&batch.items).await?;
        batch.notify_all_done();
        Ok(())
    }
    BatchResult::Done(_) => Ok(()),
    BatchResult::Failed => Err("failed"),
}

lib.rs:

低延迟批量工具。将大量单个并发操作打包成顺序工作批次。

例如,许多并发竞争的单个数据库更新任务可以打包成批量更新。

示例

use benjamin_batchly::{BatchMutex, BatchResult};

let batcher = BatchMutex::default();

// BatchMutex synchronizes so only one `Work` happens at a time (for a given batch_key).
// All concurrent submissions made while an existing `Work` is being processed will
// await completion and form the next `Work` batch.
match batcher.submit(batch_key, item).await {
    BatchResult::Work(mut batch) => {
        db_bulk_insert(&batch.items).await?;
        batch.notify_all_done();
        Ok(())
    }
    BatchResult::Done(_) => Ok(()),
    BatchResult::Failed => Err("failed"),
}

示例:返回值

每个项目也可以在其内部的 BatchResult::Done 中收到它自己的返回值。

例如,一个 Result 来传递为什么某些批量项目失败给它们的提交者。

use anyhow::anyhow;
use benjamin_batchly::{BatchMutex, BatchResult};

// 3rd type is value returned by BatchResult::Done
let batcher: BatchMutex<_, _, anyhow::Result<()>> = BatchMutex::default();

match batcher.submit(batch_key, my_item).await {
    BatchResult::Work(mut batch) => {
        let results = db_bulk_insert(&batch.items).await;

        // iterate over results and notify each item's submitter
        for (index, success) in results {
            if success {
                batch.notify_done(index, Ok(()));
            } else {
                batch.notify_done(index, Err(anyhow!("insert failed")));
            }
        }

        // receive the local `my_item` return value
        batch.recv_local_notify_done().unwrap()
    }
    BatchResult::Done(result) => result,
    BatchResult::Failed => Err(anyhow!("batch failed")),
}

依赖项

~3.5–9.5MB
~81K SLoC