5 个不稳定版本
0.3.1 | 2022 年 8 月 29 日 |
---|---|
0.3.0 |
|
0.2.1 | 2019 年 12 月 24 日 |
0.2.0 | 2018 年 9 月 13 日 |
0.1.1 | 2018 年 9 月 13 日 |
#216 在 并发 中
15,047 每月下载量
用于 2 crates
57KB
545 行
rsevents
此 crate 包含了一个类似于 Microsoft Windows 中找到的自动和手动重置事件的跨平台实现,它是在核心停车库 crate 的基础上实现的,作为一个跨平台的 futex 抽象。
关于 rsevents
事件最好与可等待的布尔值进行比较,并且可以处于两种状态之一:设置和未设置。调用者可以直接等待事件本身,无需关联条件变量和互斥锁。根据事件的特定类型,事件也可以被视为多生产者、多消费者 Channel<()>
(手动重置事件是相同通道的广播版本)的效率实现。
与 WIN32 事件一样,rsevents 有两种形式,AutoResetEvent
和 ManualResetEvent
,它们在设置(即发出信号)事件时的行为不同。一个 AutoResetEvent
一旦发出信号,将允许恰好一个(并且只有一个)等待同一事件的过去或未来调用者解除阻塞,而一个 ManualResetEvent
则没有这种细粒度控制,对于所有过去/未来等待者要么全部发出信号并解除阻塞,要么都不(直到其状态被手动/显式更改)。它们的使用差异很大,所以请确保您正在使用正确的事件类型来满足您的需求!
核心 rsevents
crate 中的类型通常非常有用,可以用于实现其他同步原语。包含一些您可能发现有用的同步类型的 rsevents-extra
crate 包括
- 倒计时事件,用于在分配给各个线程的任务完成之前有效地等待 n 个挂起的任务。
- 信号量,用于将并发访问某个部分或资源的线程数限制为 n。
示例
以下代码示例展示了主线程将任务调度到一组已生成的线程池中,然后由第一个可用的线程接收。这演示了自动重置事件的独特属性(一次只通知一个线程,调用event.set()
和调用event.wait()
的线程之间的内存一致性,等待工作时的有效阻塞,以及限时等待)。对原始指针(用于SHARED
线程消息变量)的非直观使用仅用于说明自动重置事件的安全性保证——您可以在RwLock
或一个简单的包装类型(如果方便的话,可以公开一个更友好的API)中包装您的共享状态。
use std::time::Duration;
use rsevents::{Awaitable, AutoResetEvent, EventState};
#[derive(Clone, Copy, Debug)]
enum ThreadMessage {
/// Used in lieu of wrapping `ThreadMessage` in an `Option`
None,
/// Hands off a value to a worker thread for processing
Input(u32),
}
// Events are cheap: each one is only a single byte!
static TASK_READY: AutoResetEvent = AutoResetEvent::new(EventState::Unset);
static DISPATCHED: AutoResetEvent = AutoResetEvent::new(EventState::Unset);
pub fn main() {
// The events above synchronize access to this !Sync, !Send shared state
static mut SHARED: ThreadMessage = ThreadMessage::None;
const THREAD_COUNT: usize = 3;
let mut threads = Vec::with_capacity(THREAD_COUNT);
for thread_idx in 0..THREAD_COUNT {
let join_handle = std::thread::spawn(move || {
loop {
// Wait efficiently for the main thread to signal _one_ (and
// only one) worker thread at a time.
if !TASK_READY.wait_for(Duration::from_millis(500)) {
// When there's not enough work, let the thread pool drain
break;
}
// This is safe because our events guarantee that
// * one thread will be accessing this variable at a time
// * shared memory will be consistent betwixt a call to
// event.set() from one thread and a call to event.wait()
// from another.
let work_msg = unsafe { *(&SHARED as *const ThreadMessage) };
// Signal to the main thread that we've taken the value and that
// it can overwrite `shared` at its leisure. Afterwards,
// processing can take as long as it needs.
DISPATCHED.set();
match work_msg {
ThreadMessage::None =>
unreachable!("The AutoResetEvent guarantees against this"),
ThreadMessage::Input(value) =>
eprintln!("Thread {thread_idx} handling value {value}"),
}
}
});
threads.push(join_handle);
}
// Generate some "random" values at an interval, dispatching each exactly
// once to exactly one worker thread.
for value in [4, 8, 15, 16, 23, 42] {
unsafe {
// It's perfectly safe to access - and even write - to SHARED here
// because our two events guarantee exclusive access (as
// AutoResetEvents wake one thread at a time) and take care of
// synchronizing the memory plus any cache coherence issues between
// the writing thread (this one) and the reading worker thread.
*(&mut SHARED as * mut _) = ThreadMessage::Input(value);
}
// Signal a currently idle or the next idle worker thread to handle this
// value.
TASK_READY.set();
// Remember that work is usually dispatched faster than it can be
// handled!
// Wait for a worker thread to signal it has received the payload and we
// can stomp the `SHARED` value and dispatch work to the next worker.
DISPATCHED.wait();
}
// Wait for the thread pool to drain and exit
for jh in threads {
jh.join().expect("Worker thread panicked!");
}
eprintln!("All work completed - exiting!")
}
类型
结构体ManualResetEvent
和AutoResetEvent
都实现了Awaitable
特质,该特质公开了一个API,允许无限期等待、等待零时间以及等待固定时间限制(Duration
)的事件触发。在rsevents
类型上构建自己的同步原语的依赖包应类似地实现Awaitable
,以公开对对象等待的统一接口(并且应该重新导出Awaitable
特质(或所有rsevents
)以使用户不必单独将rsevents
依赖添加到他们的Cargo.toml
中)。
有关更多信息,请参阅文档。
何时使用
一般来说,当涉及到保护关键部分和确保独占访问时,应始终首选互斥锁或条件变量,因为它们具有公认的同步范例和广泛的支持。然而,在其他时候,当需要一个不与显式关键部分或受保护数据耦合的同步原语时,在这种情况下,如果实际上只需要一个单一的替代同步原语,那么使用互斥锁和关键部分就没有意义。
事件有点像一种假设的多生产者、多消费者RwLock
,它不拥有其保护的数据。自动重置事件(如AutoResetEvent
)非常适合于信号,并且通常用于轻松构建其他同步原语,而不需要使用futex或支付一个或多个互斥锁的代价。
因此,事件提供了比标准库同步原语(如Mutex
、RwLock
或CondVar
)更多的自由,但在使用时必须更加小心——有一些例外。
手动重置事件(如ManualResetEvent
)实际上非常容易且灵活,用于向所有线程广播信号(影响已等待和尚未等待的线程),并且对于在某个非线程安全条件(如全局中止指示器)上无限期或固定长度等待非常方便。
关于
rsevents 由 NeoSmart Technologies 的 Mahmoud Al-Qudsi 编写和维护,电子邮件为 [email protected],网址为 https://neosmart.net,并按照 MIT 公共许可证的条款向公众发布。
依赖项
~0.1–5MB