7 releases (4 stable)

1.2.0 July 27, 2021
1.1.0 June 23, 2020
0.1.2 September 11, 2019
0.1.1 August 27, 2019
0.1.0 July 15, 2019

#446 in Algorithms

43 downloads per month
Used in 3 crates

MIT license

79KB
1.5K SLoC


A Rust library of batching algorithm implementations.

Batching works by accumulating items and then automatically flushing them all together once the batch reaches a limit. All items collected in a single batch become available at once for further processing (e.g. batch-inserting into a database).
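For a concrete picture of that flow, here is a minimal sketch built only from the channel-based API used in the example further below (MultiBufBatchChannel::with_producer_thread, the Append command and next); the bulk insert step is simulated with a println and is purely illustrative:

use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;

// Flush a batch once it holds 3 items, or 200 ms after its first item, whichever comes first
let mut batch = MultiBufBatchChannel::with_producer_thread(3, Duration::from_millis(200), 10, |sender| {
	// All items go to the single stream key `0`; the third item fills the batch
	sender.send(Append(0, 1)).unwrap();
	sender.send(Append(0, 2)).unwrap();
	sender.send(Append(0, 3)).unwrap();
});

// The whole batch becomes available at once for further processing
if let Ok((_stream_key, drain)) = batch.next() {
	let rows: Vec<i32> = drain.collect();
	// Stand-in for a bulk operation such as a single multi-row database INSERT
	println!("bulk inserting rows: {:?}", rows);
}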

These implementations will construct batches based on:

  • a limit on the number of items collected in a single batch,
  • a limit on the time elapsed since the first item was added to the batch,
  • a call to one of the batch-consuming methods,
  • a flush command sent in between batched items (channel-based implementations).

See the documentation for the available algorithms.

Example

Collect batches of items from two streams by reaching different individual batch limits and by sending the Flush command.

use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread with a channel-based, multi-stream batching implementation configured with a maximum size
// of 4 items (for each stream) and a maximum batch duration since the first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
	// Send a sequence of `Append` commands with integer stream key and item value
	sender.send(Append(1, 1)).unwrap();
	sender.send(Append(0, 1)).unwrap();
	sender.send(Append(1, 2)).unwrap();
	sender.send(Append(0, 2)).unwrap();
	sender.send(Append(1, 3)).unwrap();
	sender.send(Append(0, 3)).unwrap();
	sender.send(Append(1, 4)).unwrap();
	// At this point batch with stream key `1` should have reached its capacity of 4 items
	sender.send(Append(0, 4)).unwrap();
	// At this point batch with stream key `0` should have reached its capacity of 4 items

	// Send some more to buffer up for next batch
	sender.send(Append(0, 5)).unwrap();
	sender.send(Append(1, 5)).unwrap();
	sender.send(Append(1, 6)).unwrap();
	sender.send(Append(0, 6)).unwrap();

	// Introduce delay to trigger maximum duration timeout
	std::thread::sleep(Duration::from_millis(400));

	// Send items that will be flushed by `Flush` command
	sender.send(Append(0, 7)).unwrap();
	sender.send(Append(1, 7)).unwrap();
	sender.send(Append(1, 8)).unwrap();
	sender.send(Append(0, 8)).unwrap();
	// Flush outstanding items for batch with stream key `1` and `0`
	sender.send(Flush(1)).unwrap();
	sender.send(Flush(0)).unwrap();

	// Last buffered up items will be flushed automatically when this thread exits
	sender.send(Append(0, 9)).unwrap();
	sender.send(Append(1, 9)).unwrap();
	sender.send(Append(1, 10)).unwrap();
	sender.send(Append(0, 10)).unwrap();
	// Exiting closure will shutdown the producer thread
});

// Batches flushed due to individual batch size limit
assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

// Batches flushed due to duration limit
assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

// Batches flushed by sending `Flush` command starting from batch with stream key `1`
assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

// Batches flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);
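After the asserted batches above, a real consumer would more likely drain batches in a loop. A sketch of that pattern, under the assumption (not demonstrated above) that next() returns an Err once the producer thread has exited and all buffered batches have been yielded:

// Keep pulling batches until the channel disconnects.
// Assumption: `next()` returns `Err(_)` after the producer thread exits
// and every remaining batch has been flushed and consumed.
while let Ok((stream_key, drain)) = batch.next() {
	let items: Vec<_> = drain.collect();
	// Hypothetical bulk-processing step, e.g. a batch database insert
	println!("stream {}: processing {:?}", stream_key, items);
}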

Dependencies

~395KB