#disk #que #store #data-store #database #memory-mapped #data

rustque

这是一个为 Rust 设计的快速磁盘持久内存映射队列,它将 Vec<u8> 存储到磁盘上的文件。

12 个稳定版本

1.1.1 2022年2月13日
1.1.0 2022年2月7日
1.0.9 2022年1月24日
1.0.3 2021年12月21日

81数据库实现 中排名

44 每月下载次数
被用于 letterman

GPL-3.0 许可证

400KB
9K SLoC

rustque

这是一个基于 tokio 的快速优化持久队列,为 Rust 设计,它将 Vec 写入磁盘上的文件,支持的函数包括添加、获取和删除,它保留一个内存中的数据映射,因此可以支持大型数据集,但需要适当的内存。

示例代码


use rustque::{Que,Config};

#[tokio::main]
async fn main(){

    //---------------------------
    //initiate que
    //---------------------------
    let mut que:Que;
    match Que::new(Config::new(
        vec![
            "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(),
            "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(),
            "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string()
        ],                  //que files
        500000000,          //min que size on disk 
        5000000,            //expand file on disk by this many bytes when full
        100                 //no of disk workers per que file
    )).await{
        Ok(v)=>{
            que = v;
            println!("que initiated : {:?}",hold.elapsed());
        },
        Err(e)=>{
            println!("!!! failed-que::new => {:?}",e);
            return;
        }
    }

    //---------------------------
    //write items to the que
    //---------------------------
    if true {
        match que.add(vec![1,2,3]).await{
            Ok(mut que_response)=>{
                collect.push(async move{
                    que_response.check().await
                });
            },
            Err(_e)=>{
                println!("!!! failed-que-add : {:?}",_e);
            }
        }
    }

    //---------------------------
    // please enable get, remove and reset
    // functions once at a time or write 
    // que items for each of them
    //---------------------------

    //---------------------------
    //get qued item from que
    //---------------------------
    if true{
        match que.next().await{
            Ok(mut next_response)=>{
                let _quer_resp = next_response.check().await;
                if !_quer_resp {break;}
                match next_response.data().await{
                    Some((value,pointer))=>{
                        println!("value : {:?}",value);
                    },
                    None=>{}
                }
            },
            Err(_e)=>{
                println!("!!! failed-que-get : {:?}",_e);
            }
        }
    }

    //---------------------------
    //remove item from que
    //---------------------------
    if true{
        match que.next().await{
            Ok(mut next_response)=>{
                if next_response.check().await {
                    match next_response.data().await{
                        Some((_value,pointer))=>{
                            match que.remove(pointer).await{
                                Ok(mut remove_response)=>{
                                    let remove_resp = remove_response.check().await;
                                    println!("remove resp : {:?}",remove_resp);
                                },
                                Err(_e)=>{
                                    println!("!!! failed-que-remove : {:?}",_e);
                                }
                            }
                        },
                        None=>{}
                    }
                }
            },
            Err(_e)=>{
                println!("!!! failed-que-get : {:?}",_e);
            }
        }
    }

    //---------------------------
    //reset item in que
    //---------------------------
    if true{
        match que.next().await{
            Ok(mut next_response)=>{
                if next_response.check().await {
                    match next_response.data().await{
                        Some((_value,pointer))=>{
                            match que.reset(pointer).await{
                                Ok(mut reset_response)=>{
                                    let reset_resp = reset_response.check().await;
                                    println!("reset resp : {:?}",reset_resp);
                                },
                                Err(_e)=>{
                                    println!("!!! failed-que-reset : {:?}",_e);
                                }
                            }
                        },
                        None=>{}
                    }
                }
            },
            Err(_e)=>{
                println!("!!! failed-que-get : {:?}",_e);
            }
        }
    }

}

基准测试

基准测试是一个需要在 cargo.toml 中启用的功能,这个功能提供了测试硬件最佳设置的工具,您需要提供多个设置进行测试,结果将写入您指定的文件。

[dependencies]
//enable bechmark feature in cargo.toml
rustque = {version="1.0.4", features=["benchmark"]}
use rustque::bechmark::{BenchmarkBuilder,Benchmark};

#[tokio::main]
async fn main(){

    //---------------------------
    //init benchmark builder
    //---------------------------
    let mut build = BenchmarkBuilder::new(
        "D://workstation/expo/rust/rust_store/test/rustque/bechmark_8.txt".to_string()
    );

    //---------------------------
    //add a benchmark
    //---------------------------
    build.add(Benchmark{
        no_of_writers:10,
        no_of_writes:5000,
        map_files:vec![
            "D://workstation/expo/rust/rust_store/test/rustque/que1.rustque".to_string(),
            "D://workstation/expo/rust/rust_store/test/rustque/que2.rustque".to_string(),
            "D://workstation/expo/rust/rust_store/test/rustque/que3.rustque".to_string(),
        ],
        write_size:256,
        min_que_size:10000000,
        expansion_size:5000000,
        no_of_disk_workers:10
    });

    //---------------------------
    //run the benchmarks
    //---------------------------
    build.run().await;

}

依赖项

~3–10MB
~79K SLoC