16个版本

使用旧Rust 2015

0.5.3 2016年4月12日
0.5.2 2015年11月10日
0.5.1 2015年10月22日
0.5.0 2015年7月28日

#461并发 类别中

Download history 289/week @ 2024-03-16 341/week @ 2024-03-23 310/week @ 2024-03-30 205/week @ 2024-04-06 290/week @ 2024-04-13 282/week @ 2024-04-20 287/week @ 2024-04-27 252/week @ 2024-05-04 267/week @ 2024-05-11 288/week @ 2024-05-18 290/week @ 2024-05-25 302/week @ 2024-06-01 154/week @ 2024-06-08 312/week @ 2024-06-15 265/week @ 2024-06-22 98/week @ 2024-06-29

879 每月下载量
14 个Crates中使用(通过 timebomb

Apache-2.0

105KB
585

Pulse

Build Status

想象一下,你正在构建一个用于在两个线程之间发送数据的复杂高性能通道。在某个时刻,你将需要想出一种方法来在队列上等待新数据变得可用。在 try_recv 上空转很糟糕,而且有些人希望他们的手机电池能使用超过30分钟。我们需要实现 recv

有几种方法可以做到这一点,你可以使用条件变量 + 锁。所以你锁定你的通道,检查是否有数据,然后在没有数据可读的情况下等待条件变量。如果你想在不需要锁的情况下做这件事,你可以使用信号量。只需在信号量上执行 acquire() 即可。如果它返回,你就知道有数据等待读取。作为额外的好处,它允许多个线程在同一个通道上等待。这很棒。

所以你都很高兴,你的基准测试都很不错。然后有人发帖询问是否有一种方法可以在两个通道上等待。嗯,这可不是个简单的问题。信号量不提供一次只能在一个信号量上执行 acquire() 的API。条件变量也不行:)

脉冲来救场!脉冲针对该问题的解决方案是一次性信号。信号表示某事是否发生。它有一个极其简单的状态转换图。它只能是挂起、就绪或已丢弃的状态,并且无法取消设置信号,因为其状态是粘性的。

state

在实践中,它看起来像这样

// Create a new signal and pulse, the pulse is the setting side of a signal
let (signal, pulse) = Signal::new();

thread::spawn(move || {
    // Do awesome things here, like uploading a cat picture to internet

    // Trigger a pulse now that we are done
    pulse.pulse();
});

// Wait for the pulse! :D
signal.wait().unwrap();

你现在可能想知道,“好吧,这如何帮助我处理多个通道?你有一个愚蠢的标志变量,你可以等待它。所以现在,我不再等待信号量,而是等待你的愚蠢信号。”

嗯,你还可以选择多个信号

let mut select = SelectMap::new();
let (signal, pulse) = Signal::new();
let join = thread::spawn(move || {
    // Do something slow
    pulse.pulse();
});
select.add(signal, join);

let (signal, pulse) = Signal::new();
let join = thread::spawn(move || {
    // Do something else slow
    pulse.pulse();
});
select.add(signal, join);

for (_, join) in select {
    join.join(); // \o/
}

你还没看到最精彩的。 SelectSelectMap 都需要等待,它们将如何阻塞...它们需要某种阻塞原语来在信号上等待...你看到了我在哪里吗?

这是Pulse设计真正的美。所有的阻塞原语都是可组合的。选择器只等待一个或多个信号。一旦任何一个信号准备就绪,它就会发出脉冲。这就像一个逻辑或门。就像逻辑或门一样,它们可以被链式连接。

or

如果你想等待所有信号都确认后再继续呢?你可以这样做:

for signal in signals {
    signal.wait();
}

但现在每次信号就绪时你都会被唤醒。这可能需要相当多的上下文切换。我们需要一个与门或者某种类型的屏障。

// A barrier of all the signals, waits until they are all ready
Barrier::new(&signals).wait();

and

当然,屏障也只是信号而已...你可以将它连接到选择器或另一个屏障。它们都是可组合的。

还有更棒的

这些大的信号链可以用来轻松构建自己的工作调度器。由于信号可以表示一个等待点,你可以将信号映射到继续。当信号确认就绪时,继续可以运行。有各种更改等待意义的方案,这使得Pulse对于编写自定义纤程/绿色线程/协程实现非常有用。:D

这一切中一定有丑陋的部分吧?

确实有,Pulse并不能在所有情况下神奇地使你的超快通道工作。知道何时需要唤醒事物仍然是一个难题。如果你安装了一个脉冲,而另一个线程在同时入队,你必须确保你没有让自己处于没有任何线程会触发脉冲的情况。

我的推荐策略是在安装脉冲之前和之后都进行双重检查条件。安装脉冲的线程可以取消自己的脉冲并触发它。但这里有很多未知因素。

你可以使用像atom这样的库,在不需要锁的情况下将脉冲存储在数据结构中。

为什么叫Pulse?

  1. 因为有人已经在crates.io上注册了Event
  2. 这是硬件中设置触发器的一种常见方式。你发送一个脉冲形式的信号,触发器被设置并输出高信号。这个信号可以连接到与门、或门等。

它是1.0.0兼容的吗?

是的。默认情况下,它只使用稳定的API。有对不兼容rust 1.0.0的回调实验性支持,但这被隐藏在功能标志后面。

依赖项

~0.7–1MB
~15K SLoC