18 个版本
0.6.6 | 2023 年 11 月 4 日 |
---|---|
0.6.4 | 2022 年 8 月 21 日 |
0.6.3 | 2022 年 2 月 4 日 |
0.6.2 | 2021 年 5 月 15 日 |
0.4.2 | 2020 年 6 月 25 日 |
#101 在 文件系统 中
969 次每月下载
用于 atm0s-media-server
150KB
2.5K SLoC
Yaque: 另一个队列
Yaque 是一个基于磁盘的 Rust 持久队列(和互斥锁)。它使用操作系统的文件系统实现一个 SPSC 通道。与简单的 VecDeque<T>
相比,它的主要优点是:
- 您不受 RAM 大小的限制,只受磁盘大小的限制。这意味着您可以在不发生 OOM 杀死的情况下存储千兆字节的数据。
- 即使程序崩溃,您的数据也是安全的。队列的所有状态都会在队列释放时写入磁盘。
- 您的数据可以 持久化,即可以通过程序的多轮执行存在。将其视为一种非常基础的数据库类型。
- 您可以在两个进程之间传递数据。
Yaque 是 异步的,并且直接构建在 mio
和 notify
之上。因此,它与您为应用程序使用的运行时完全无关。它可以与 tokio
、async-std
或您选择的任何其他执行器无缝工作。
示例用法
要创建一个新队列,只需使用 channel
函数,传递一个用于挂载队列的目录路径。如果创建时目录不存在,它(以及可能的所有父目录)将被创建。
use yaque::channel;
futures::executor::block_on(async {
let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
})
如果您需要,还可以使用 Sender::open
和 Receiver::open
只打开通道的一半。
用法类似于标准库中的 MPSC 通道,除了接收方法 Receiver::recv
是异步的。使用发送者写入队列基本上是无锁和原子的。
use yaque::{channel, queue::try_clear};
futures::executor::block_on(async {
// Open using the `channel` function or directly with the constructors.
let (mut sender, mut receiver) = channel("data/my-queue").unwrap();
// Send stuff with the sender...
sender.send(b"some data").await.unwrap();
// ... and receive it in the other side.
let data = receiver.recv().await.unwrap();
assert_eq!(&*data, b"some data");
// Call this to make the changes to the queue permanent.
// Not calling it will revert the state of the queue.
data.commit();
});
// After everything is said and done, you may delete the queue.
// Use `clear` for awaiting for the queue to be released.
try_clear("data/my-queue").unwrap();
返回值 data
是一种守卫,它实现了对底层类型的 Deref
和 DerefMut
。
queue::RecvGuard
和事务行为
需要注意的一个重要事项是,从队列中的读取是事务性的。函数Receiver::recv
返回一个queue::RecvGuard
,它充当一个死权门开关。如果它被丢弃,它将撤销出队操作,除非显式调用queue::RecvGuard::commit
。这确保了操作在恐慌和从错误中提前返回(例如使用?
表示法时)会回滚。然而,在回滚过程中,还需要执行一个文件系统操作。在丢弃过程中,这是以“尽力而为”的方式完成的:如果发生错误,它会被记录并忽略。这是因为在丢弃之外,错误无法传播,并且在丢弃中的恐慌可能会使程序被中止。如果你有任何从回滚中清理错误的行为,你可以调用queue::RecvGuard::rollback
,这将确实返回底层错误。
批量操作
您还可以使用yaque
队列发送和接收数据批量。保证与单个读取和写入相同,只是在发送项目时可以节省OS开销,因为只进行一次磁盘操作。有关接收者批量的更多信息,请参阅Sender::send_batch
、Receiver::recv_batch
和Receiver::recv_until
。
厌倦了等待.await
?支持超时
如果您需要您的应用程序在队列上没有内容时不会停滞,您可以使用Receiver::recv_timeout
和Receiver::recv_batch_timeout
来接收数据,等待提供的未来(例如延迟或通道)完成。以下是一个示例
use yaque::channel;
use std::time::Duration;
use futures_timer::Delay;
futures::executor::block_on(async {
let (mut sender, mut receiver) = channel("data/my-queue-2").unwrap();
// receive some data up to a second
let data = receiver
.recv_timeout(Delay::new(Duration::from_secs(1)))
.await
.unwrap();
// Nothing was sent, so no data...
assert!(data.is_none());
drop(data);
// ... but if you do send something...
sender.send(b"some data").await.unwrap();
// ... now you receive something:
let data = receiver
.recv_timeout(Delay::new(Duration::from_secs(1)))
.await
.unwrap();
assert_eq!(&*data.unwrap(), b"some data");
});
Ctrl+C
和其他意外事件
首先,“不要慌张”!向队列写入是一个原子操作。因此,除非你的操作系统真的出了问题,否则在大多数时候你不用担心数据损坏。
在发生恐慌(程序的不是程序员的)的情况下,队列将保证保存接收者最新的所有元数据。对于读者来说甚至更简单:一开始就没有什么需要保存的。唯一例外是,如果保存操作由于IO错误失败。请记住,程序不允许在恐慌期间进行恐慌。因此在这种情况下,yaque
不会尝试从错误中恢复。
不能这么说操作系统信号。操作系统信号不是由这个库自动处理的。理解到应用程序程序员最了解如何处理它们。如果您选择在Ctrl+C
或其他信号上关闭队列,那么您很幸运!保存队列的每一侧都是async-signal-safe,因此您可以直接使用例如signal_hook
(https://docs.rs/signal-hook/)设置裸信号钩子,如果您喜欢unsafe
代码。如果不这样做,有大量完全安全的替代方案。选择最适合您的那一个。
不幸的是,有时您会遇到中止
或被杀
的情况。这些信号无法被任何库处理。当这种情况发生时,并不是所有的东西都丢失了。我们提供了一个完整的模块recovery
,以帮助您进行自动队列恢复。请检查该模块以获取具体的函数名称。从架构角度来看,我们提供了两种不同的队列恢复方法,可能适用于不同的用例
- 使用重放(标准方法):我们可以在崩溃期间重建队列实际状态的下限,该下限由以下两个位置的最大值组成:
- 目录中最小段的底部。
- 元数据文件中指示的位置。
由于这是一个下限,一些元素可能需要重放。如果您的处理是幂等的,这不会成为问题,您不会丢失任何数据。
- 使用损失恢复:我们还可以重建队列实际状态的上限:队列中第二小段的底部。在这种情况下,最小段被简单地删除,接收器继续像什么都没有发生一样运行。如果重放是不可接受的,但某些数据损失是可以接受的,这可能是一个适合您的选择。您可以通过在
SenderBuilder
上配置此选项来限制段大小,从而限制数据损失。
如果您真的想更安全,可以使用Receiver::save
定期备份接收器状态。只需选择您喜欢的定时器实现,并设置一个每数百毫秒执行一次的简单周期性任务。但是,请注意,这只是一个缓解措施,而不是解决方案。
已知问题和下一步
这是一个全新的项目。尽管我已经测试过它,并且它肯定不会让您的计算机崩溃,但您还不能依赖它。此代码正在非关键应用程序的生产环境中运行。- 当队列足够小并且发送者频繁发送许多非原子的小消息时,会浪费过多的内核时间。您可以通过批量写入队列来缓解这个问题。
- 可能存在一些未知的错误隐藏在某些边缘情况中。如果您发现一个错误,请在GitHub上提交问题。我们也非常欢迎pull requests和贡献。
依赖关系
~2-13MB
~102K SLoC