#protobuf #zstd #stream #prost #async-stream

prosto

Compress prost! messages with zstd, optional tokio channels support

20个版本

0.6.4 2021年10月25日
0.6.1 2021年7月8日
0.5.0 2021年1月23日
0.4.5 2020年12月16日
0.4.3 2020年7月24日

#424 in 压缩

自定义许可

15KB
252 代码行

Compress prost! messages with zstd, async streams support

docs

简单的压缩/解压缩

/// Round-trips `dummies` through the synchronous coders.
///
/// Every message is written to a `ProstEncoder` created with the given
/// compression `level`; the finished buffer is then read back through a
/// `ProstDecoder` and each decoded message is compared, in order, with the
/// corresponding input.
///
/// # Panics
///
/// Panics if encoding or decoding fails, if a decoded message differs from
/// its input, or if the number of decoded messages differs from
/// `dummies.len()`.
fn do_roundtrip_coders(level: i32, dummies: Vec<proto::Dummy>) {
    // The result of installing the subscriber is deliberately discarded.
    tracing_subscriber::fmt::try_init().ok();

    // Encode all messages into an in-memory buffer.
    let buffer = vec![];
    let mut encoder = ProstEncoder::new(buffer, level).unwrap();
    for message in dummies.iter() {
        encoder.write(message).unwrap();
    }
    let compressed = encoder.finish().unwrap();

    let mut decoder = ProstDecoder::<proto::Dummy>::new_decompressed(&compressed[..]).unwrap();

    // Walk the inputs in lockstep with the decoded output.
    let mut expected = dummies.iter();
    let mut decoded_count: usize = 0;
    loop {
        let decoded = match decoder.next() {
            Some(item) => item.unwrap(),
            None => break,
        };
        // Running out of expected messages means the decoder produced extras.
        assert_eq!(&decoded, expected.next().unwrap());
        decoded_count += 1;
    }

    // Nothing may be lost either.
    assert_eq!(dummies.len(), decoded_count);
}

异步流支持

`enable-async` Cargo 功能(默认启用)暴露 `Compressor` 和 `Decompressor` 两个结构体:

  • Compressor::build_stream 将 prost! 消息流转换为字节流;
  • Decompressor::stream 将压缩字节流转换为 prost! 消息流。

尽管此示例使用了 tokio 通道,但此 crate 本身并不依赖 tokio;tokio 仅在测试中使用。

/// Round-trips `dummies` through the async `Compressor` / `Decompressor`
/// streams, wired together with bounded mpsc channels.
///
/// Pipeline: source channel ~> `Compressor::build_stream` ~> channel of
/// compressed chunks ~> `Decompressor::stream` ~> sink channel. The messages
/// received on the sink are compared, in order, with the inputs.
///
/// * `chunk_size` and `level` are forwarded unchanged to
///   `Compressor::build_stream`.
/// * `dummies` are the messages to send through the pipeline; an empty vector
///   is accepted.
///
/// # Panics
///
/// Panics if the runtime cannot be created, if any stage of the pipeline
/// fails, if a received message differs from its input, or if the number of
/// received messages differs from `dummies.len()`.
fn do_roundtrip_channels(chunk_size: usize, level: i32, dummies: Vec<proto::Dummy>) {
    // The result of installing the subscriber is deliberately discarded.
    tracing_subscriber::fmt::try_init().ok();

    let mut rt = Runtime::new().unwrap();

    // A bounded channel must have a capacity of at least one slot (tokio's
    // `mpsc::channel` panics on 0), so clamp it for empty `dummies`.
    let capacity = dummies.len().max(1);

    // Dummy source ~> Compressor
    let (mut source, dummy_rx) = mpsc::channel::<proto::Dummy>(capacity);
    // Compressor ~> Decompressor
    let (compressed_tx, compressed_rx) = mpsc::channel::<Vec<u8>>(capacity);
    // Decompressor ~> Dummy sink
    let (dummy_tx, mut sink) = mpsc::channel::<proto::Dummy>(capacity);

    let compressor = Compressor::build_stream(dummy_rx, level, chunk_size).unwrap();
    let decompressor = Decompressor::stream(compressed_rx);

    rt.block_on(async move {
        // Forward every compressed chunk into the intermediate channel. The
        // sender is threaded through the fold as its accumulator.
        let compress_task = tokio::task::spawn(
            compressor
                .map_err(anyhow::Error::new)
                .try_fold(compressed_tx, |mut ctx, compressed| async {
                    ctx.send(compressed)
                        .await
                        .map_err(|_| anyhow!("Failed to send compressed"))?;
                    Ok(ctx)
                })
                .map_ok(|_| ()),
        );
        // Forward every decoded message into the sink channel the same way.
        let decompress_task = tokio::task::spawn(
            decompressor
                .map_err(anyhow::Error::new)
                .try_fold(dummy_tx, |mut utx, message| async {
                    utx.send(message)
                        .await
                        .map_err(|_| anyhow!("Failed to send decompressed"))?;
                    Ok(utx)
                })
                .map_ok(|_| ()),
        );

        for dummy in &dummies {
            source
                .send(dummy.clone())
                .await
                .map_err(|_| anyhow!("Failed to send to source"))
                .unwrap();
        }

        // Drop the only sender before draining the sink; presumably this is
        // what signals end-of-input to the compressor so the pipeline can
        // finish (NOTE(review): confirm against the channel's close semantics).
        std::mem::drop(source);

        let mut i: usize = 0;
        while let Some(dummy) = sink.recv().await {
            assert_eq!(&dummy, dummies.get(i).unwrap());
            i += 1;
        }

        // Surface join errors first, then each task's own result.
        let (compress, decompress) =
            futures::try_join!(compress_task, decompress_task).unwrap();
        compress.unwrap();
        decompress.unwrap();
        assert_eq!(dummies.len(), i);
    });
}

依赖项

~5.5MB
~96K SLoC