#queue #persistent #disk #data-structures

yaque

Yaque 是一个基于磁盘的 Rust 持久队列

18 个版本

0.6.6 2023 年 11 月 4 日
0.6.4 2022 年 8 月 21 日
0.6.3 2022 年 2 月 4 日
0.6.2 2021 年 5 月 15 日
0.4.2 2020 年 6 月 25 日

#101文件系统

Download history 150/week @ 2024-03-13 148/week @ 2024-03-20 210/week @ 2024-03-27 222/week @ 2024-04-03 168/week @ 2024-04-10 172/week @ 2024-04-17 238/week @ 2024-04-24 100/week @ 2024-05-01 144/week @ 2024-05-08 118/week @ 2024-05-15 114/week @ 2024-05-22 178/week @ 2024-05-29 288/week @ 2024-06-05 248/week @ 2024-06-12 270/week @ 2024-06-19 132/week @ 2024-06-26

969 次每月下载
用于 atm0s-media-server

Apache-2.0

150KB
2.5K SLoC

Yaque: 另一个队列

Yaque 是一个基于磁盘的 Rust 持久队列(和互斥锁)。它使用操作系统的文件系统实现一个 SPSC 通道。与简单的 VecDeque<T> 相比,它的主要优点是:

  • 您不受 RAM 大小的限制,只受磁盘大小的限制。这意味着您可以在不发生 OOM 杀死的情况下存储千兆字节的数据。
  • 即使程序崩溃,您的数据也是安全的。队列的所有状态都会在队列释放时写入磁盘。
  • 您的数据可以 持久化,即可以通过程序的多轮执行存在。将其视为一种非常基础的数据库类型。
  • 您可以在两个进程之间传递数据。

Yaque 是 异步的,并且直接构建在 mionotify 之上。因此,它与您为应用程序使用的运行时完全无关。它可以与 tokioasync-std 或您选择的任何其他执行器无缝工作。

示例用法

要创建一个新队列,只需使用 channel 函数,传递一个用于挂载队列的目录路径。如果创建时目录不存在,它(以及可能的所有父目录)将被创建。

use yaque::channel;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
})

如果您需要,还可以使用 Sender::openReceiver::open 只打开通道的一半。

用法类似于标准库中的 MPSC 通道,除了接收方法 Receiver::recv 是异步的。使用发送者写入队列基本上是无锁和原子的。

use yaque::{channel, queue::try_clear};

futures::executor::block_on(async {
    // Open using the `channel` function or directly with the constructors.
    let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
    
    // Send stuff with the sender...
    sender.send(b"some data").await.unwrap();

    // ... and receive it in the other side.
    let data = receiver.recv().await.unwrap();

    assert_eq!(&*data, b"some data");

    // Call this to make the changes to the queue permanent.
    // Not calling it will revert the state of the queue.
    data.commit();
});

// After everything is said and done, you may delete the queue.
// Use `clear` for awaiting for the queue to be released.
try_clear("data/my-queue").unwrap();

返回值 data 是一种守卫,它实现了对底层类型的 DerefDerefMut

queue::RecvGuard 和事务行为

需要注意的一个重要事项是,从队列中的读取是事务性的。函数Receiver::recv返回一个queue::RecvGuard,它充当一个死权门开关。如果它被丢弃,它将撤销出队操作,除非显式调用queue::RecvGuard::commit。这确保了操作在恐慌和从错误中提前返回(例如使用?表示法时)会回滚。然而,在回滚过程中,还需要执行一个文件系统操作。在丢弃过程中,这是以“尽力而为”的方式完成的:如果发生错误,它会被记录并忽略。这是因为在丢弃之外,错误无法传播,并且在丢弃中的恐慌可能会使程序被中止。如果你任何从回滚中清理错误的行为,你可以调用queue::RecvGuard::rollback,这将确实返回底层错误。

批量操作

您还可以使用yaque队列发送和接收数据批量。保证与单个读取和写入相同,只是在发送项目时可以节省OS开销,因为只进行一次磁盘操作。有关接收者批量的更多信息,请参阅Sender::send_batchReceiver::recv_batchReceiver::recv_until

厌倦了等待.await?支持超时

如果您需要您的应用程序在队列上没有内容时不会停滞,您可以使用Receiver::recv_timeoutReceiver::recv_batch_timeout来接收数据,等待提供的未来(例如延迟或通道)完成。以下是一个示例

use yaque::channel;
use std::time::Duration;
use futures_timer::Delay;

futures::executor::block_on(async {
    let (mut sender, mut receiver) = channel("data/my-queue-2").unwrap();
    
    // receive some data up to a second
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    // Nothing was sent, so no data...
    assert!(data.is_none());
    drop(data);
    
    // ... but if you do send something...
    sender.send(b"some data").await.unwrap();
 
    // ... now you receive something:
    let data = receiver
        .recv_timeout(Delay::new(Duration::from_secs(1)))
        .await
        .unwrap();

    assert_eq!(&*data.unwrap(), b"some data");  
});

Ctrl+C和其他意外事件

首先,“不要慌张”!向队列写入是一个原子操作。因此,除非你的操作系统真的出了问题,否则在大多数时候你不用担心数据损坏。

在发生恐慌(程序的不是程序员的)的情况下,队列将保证保存接收者最新的所有元数据。对于读者来说甚至更简单:一开始就没有什么需要保存的。唯一例外是,如果保存操作由于IO错误失败。请记住,程序不允许在恐慌期间进行恐慌。因此在这种情况下,yaque不会尝试从错误中恢复。

不能这么说操作系统信号。操作系统信号不是由这个库自动处理的。理解到应用程序程序员最了解如何处理它们。如果您选择在Ctrl+C或其他信号上关闭队列,那么您很幸运!保存队列的每一侧都是async-signal-safe,因此您可以直接使用例如signal_hookhttps://docs.rs/signal-hook/)设置裸信号钩子,如果您喜欢unsafe代码。如果不这样做,有大量完全安全的替代方案。选择最适合您的那一个。

不幸的是,有时您会遇到中止被杀的情况。这些信号无法被任何库处理。当这种情况发生时,并不是所有的东西都丢失了。我们提供了一个完整的模块recovery,以帮助您进行自动队列恢复。请检查该模块以获取具体的函数名称。从架构角度来看,我们提供了两种不同的队列恢复方法,可能适用于不同的用例

  1. 使用重放(标准方法):我们可以在崩溃期间重建队列实际状态的下限,该下限由以下两个位置的最大值组成:
    • 目录中最小段的底部。
    • 元数据文件中指示的位置。

由于这是一个下限,一些元素可能需要重放。如果您的处理是幂等的,这不会成为问题,您不会丢失任何数据。

  1. 使用损失恢复:我们还可以重建队列实际状态的上限:队列中第二小段的底部。在这种情况下,最小段被简单地删除,接收器继续像什么都没有发生一样运行。如果重放是不可接受的,但某些数据损失是可以接受的,这可能是一个适合您的选择。您可以通过在SenderBuilder上配置此选项来限制段大小,从而限制数据损失。

如果您真的想更安全,可以使用Receiver::save定期备份接收器状态。只需选择您喜欢的定时器实现,并设置一个每数百毫秒执行一次的简单周期性任务。但是,请注意,这只是一个缓解措施,而不是解决方案。

已知问题和下一步

  • 这是一个全新的项目。尽管我已经测试过它,并且它肯定不会让您的计算机崩溃,但您还不能依赖它。此代码正在非关键应用程序的生产环境中运行。
  • 当队列足够小并且发送者频繁发送许多非原子的小消息时,会浪费过多的内核时间。您可以通过批量写入队列来缓解这个问题。
  • 可能存在一些未知的错误隐藏在某些边缘情况中。如果您发现一个错误,请在GitHub上提交问题。我们也非常欢迎pull requests和贡献。

依赖关系

~2-13MB
~102K SLoC