20个版本
0.6.4 | 2021年10月25日 |
---|---|
0.6.1 | 2021年7月8日 |
0.5.0 | 2021年1月23日 |
0.4.5 | 2020年12月16日 |
0.4.3 | 2020年7月24日 |
#424 in 压缩
15KB
252 代码行
Compress prost! messages with zstd, async streams support
简单的压缩/解压缩
fn do_roundtrip_coders(level: i32, dummies: Vec<proto::Dummy>) {
tracing_subscriber::fmt::try_init().ok();
let writer = vec![];
let mut encoder = ProstEncoder::new(writer, level).unwrap();
for dummy in &dummies {
encoder.write(dummy).unwrap();
}
let compressed = encoder.finish().unwrap();
let mut decoder = ProstDecoder::<proto::Dummy>::new_decompressed(&compressed[..]).unwrap();
let mut i: usize = 0;
while let Some(dummy) = decoder.next() {
let dummy = dummy.unwrap();
assert_eq!(&dummy, dummies.get(i).unwrap());
i += 1;
}
assert_eq!(dummies.len(), i);
}
异步流支持
enable-async
Cargo功能(默认启用)暴露 Compressor
和 Decompressor
结构体
Compressor::build_stream
将 prost! 消息流转换为字节流;Decompressor::stream
将压缩字节流转换为 prost! 消息流。
尽管此示例使用了tokio通道,但此crate不依赖于tokio,它只是在测试中使用。
fn do_roundtrip_channels(chunk_size: usize, level: i32, dummies: Vec<proto::Dummy>) {
tracing_subscriber::fmt::try_init().ok();
let mut rt = Runtime::new().unwrap();
// Dummy source ~> Compressor
let (mut source, dummy_rx) = mpsc::channel::<proto::Dummy>(dummies.len());
// Compressor ~> Decompressor
let (compressed_tx, compressed_rx) = mpsc::channel::<Vec<u8>>(dummies.len());
// Decompressor ~> Dummy sink
let (dummy_tx, mut sink) = mpsc::channel::<proto::Dummy>(dummies.len());
let compressor = Compressor::build_stream(dummy_rx, level, chunk_size).unwrap();
let decompressor = Decompressor::stream(compressed_rx);
rt.block_on(async move {
let compress_task = tokio::task::spawn(
compressor
.map_err(anyhow::Error::new)
.try_fold(compressed_tx, |mut ctx, compressed| async {
ctx.send(compressed)
.await
.map_err(|_| anyhow!("Failed to send compressed"))?;
Ok(ctx)
})
.map_ok(|_| ()),
);
let decompress_task = tokio::task::spawn(
decompressor
.map_err(anyhow::Error::new)
.try_fold(dummy_tx, |mut utx, message| async {
utx.send(message)
.await
.map_err(|_| anyhow!("Failed to send decompressed"))?;
Ok(utx)
})
.map_ok(|_| ()),
);
for dummy in &dummies {
source
.send(dummy.clone())
.await
.map_err(|_| anyhow!("Failed to send to source"))
.unwrap();
}
std::mem::drop(source);
let mut i: usize = 0;
while let Some(dummy) = sink.recv().await {
assert_eq!(&dummy, dummies.get(i).unwrap());
i += 1;
}
let (compress, decompress) =
futures::try_join!(compress_task, decompress_task).unwrap();
compress.unwrap();
decompress.unwrap();
assert_eq!(dummies.len(), i);
});
}
依赖项
~5.5MB
~96K SLoC