#同步原语 #事件 #同步 #winapi #信号 #futex #sync

rsevents

用于线程信号和编写新的同步原语(如信号量)的手动和自动重置事件

5 个不稳定版本

0.3.1 2022 年 8 月 29 日
0.3.0 2022 年 8 月 25 日
0.2.1 2019 年 12 月 24 日
0.2.0 2018 年 9 月 13 日
0.1.1 2018 年 9 月 13 日

#216并发

Download history 1277/week @ 2024-03-14 1839/week @ 2024-03-21 3494/week @ 2024-03-28 3173/week @ 2024-04-04 2953/week @ 2024-04-11 2864/week @ 2024-04-18 3369/week @ 2024-04-25 1958/week @ 2024-05-02 2546/week @ 2024-05-09 2428/week @ 2024-05-16 4032/week @ 2024-05-23 4829/week @ 2024-05-30 3105/week @ 2024-06-06 3043/week @ 2024-06-13 3218/week @ 2024-06-20 5113/week @ 2024-06-27

15,047 每月下载量
用于 2 crates

MIT 许可证

57KB
545

rsevents

crates.io docs.rs

此 crate 包含了一个类似于 Microsoft Windows 中找到的自动和手动重置事件的跨平台实现,它是在核心停车库 crate 的基础上实现的,作为一个跨平台的 futex 抽象。

关于 rsevents

事件最好与可等待的布尔值进行比较,并且可以处于两种状态之一:设置和未设置。调用者可以直接等待事件本身,无需关联条件变量和互斥锁。根据事件的特定类型,事件也可以被视为多生产者、多消费者 Channel<()>(手动重置事件是相同通道的广播版本)的效率实现。

与 WIN32 事件一样,rsevents 有两种形式,AutoResetEventManualResetEvent,它们在设置(即发出信号)事件时的行为不同。一个 AutoResetEvent 一旦发出信号,将允许恰好一个(并且只有一个)等待同一事件的过去或未来调用者解除阻塞,而一个 ManualResetEvent 则没有这种细粒度控制,对于所有过去/未来等待者要么全部发出信号并解除阻塞,要么都不(直到其状态被手动/显式更改)。它们的使用差异很大,所以请确保您正在使用正确的事件类型来满足您的需求!

核心 rsevents crate 中的类型通常非常有用,可以用于实现其他同步原语。包含一些您可能发现有用的同步类型的 rsevents-extra crate 包括

  • 倒计时事件,用于在分配给各个线程的任务完成之前有效地等待 n 个挂起的任务。
  • 信号量,用于将并发访问某个部分或资源的线程数限制为 n

示例

以下代码示例展示了主线程将任务调度到一组已生成的线程池中,然后由第一个可用的线程接收。这演示了自动重置事件的独特属性(一次只通知一个线程,调用event.set()和调用event.wait()的线程之间的内存一致性,等待工作时的有效阻塞,以及限时等待)。对原始指针(用于SHARED线程消息变量)的非直观使用仅用于说明自动重置事件的安全性保证——您可以在RwLock或一个简单的包装类型(如果方便的话,可以公开一个更友好的API)中包装您的共享状态。

use std::time::Duration;
use rsevents::{Awaitable, AutoResetEvent, EventState};

#[derive(Clone, Copy, Debug)]
enum ThreadMessage {
    /// Used in lieu of wrapping `ThreadMessage` in an `Option`
    None,
    /// Hands off a value to a worker thread for processing
    Input(u32),
}

// Events are cheap: each one is only a single byte!
static TASK_READY: AutoResetEvent = AutoResetEvent::new(EventState::Unset);
static DISPATCHED: AutoResetEvent = AutoResetEvent::new(EventState::Unset);

pub fn main() {
    // The events above synchronize access to this !Sync, !Send shared state
    static mut SHARED: ThreadMessage = ThreadMessage::None;

    const THREAD_COUNT: usize = 3;
    let mut threads = Vec::with_capacity(THREAD_COUNT);
    for thread_idx in 0..THREAD_COUNT {
        let join_handle = std::thread::spawn(move || {
            loop {
                // Wait efficiently for the main thread to signal _one_ (and
                // only one) worker thread at a time.
                if !TASK_READY.wait_for(Duration::from_millis(500)) {
                    // When there's not enough work, let the thread pool drain
                    break;
                }

                // This is safe because our events guarantee that
                // * one thread will be accessing this variable at a time
                // * shared memory will be consistent betwixt a call to
                //   event.set() from one thread and a call to event.wait()
                //   from another.
                let work_msg = unsafe { *(&SHARED as *const ThreadMessage) };

                // Signal to the main thread that we've taken the value and that
                // it can overwrite `shared` at its leisure. Afterwards,
                // processing can take as long as it needs.
                DISPATCHED.set();

                match work_msg {
                    ThreadMessage::None =>
                        unreachable!("The AutoResetEvent guarantees against this"),
                    ThreadMessage::Input(value) =>
                        eprintln!("Thread {thread_idx} handling value {value}"),
                }
            }
        });
        threads.push(join_handle);
    }

    // Generate some "random" values at an interval, dispatching each exactly
    // once to exactly one worker thread.
    for value in [4, 8, 15, 16, 23, 42] {
        unsafe {
            // It's perfectly safe to access - and even write - to SHARED here
            // because our two events guarantee exclusive access (as
            // AutoResetEvents wake one thread at a time) and take care of
            // synchronizing the memory plus any cache coherence issues between
            // the writing thread (this one) and the reading worker thread.
            *(&mut SHARED as * mut _) = ThreadMessage::Input(value);
        }

        // Signal a currently idle or the next idle worker thread to handle this
        // value.
        TASK_READY.set();

        // Remember that work is usually dispatched faster than it can be
        // handled!
        // Wait for a worker thread to signal it has received the payload and we
        // can stomp the `SHARED` value and dispatch work to the next worker.
        DISPATCHED.wait();
    }

    // Wait for the thread pool to drain and exit
    for jh in threads {
        jh.join().expect("Worker thread panicked!");
    }
    eprintln!("All work completed - exiting!")
}

类型

结构体ManualResetEventAutoResetEvent都实现了Awaitable特质,该特质公开了一个API,允许无限期等待、等待零时间以及等待固定时间限制(Duration)的事件触发。在rsevents类型上构建自己的同步原语的依赖包应类似地实现Awaitable,以公开对对象等待的统一接口(并且应该重新导出Awaitable特质(或所有rsevents)以使用户不必单独将rsevents依赖添加到他们的Cargo.toml中)。

有关更多信息,请参阅文档

何时使用

一般来说,当涉及到保护关键部分和确保独占访问时,应始终首选互斥锁或条件变量,因为它们具有公认的同步范例和广泛的支持。然而,在其他时候,当需要一个不与显式关键部分或受保护数据耦合的同步原语时,在这种情况下,如果实际上只需要一个单一的替代同步原语,那么使用互斥锁和关键部分就没有意义。

事件有点像一种假设的多生产者、多消费者RwLock,它不拥有其保护的数据。自动重置事件(如AutoResetEvent)非常适合于信号,并且通常用于轻松构建其他同步原语,而不需要使用futex或支付一个或多个互斥锁的代价。

因此,事件提供了比标准库同步原语(如MutexRwLockCondVar)更多的自由,但在使用时必须更加小心——有一些例外。

手动重置事件(如ManualResetEvent)实际上非常容易且灵活,用于向所有线程广播信号(影响已等待和尚未等待的线程),并且对于在某个非线程安全条件(如全局中止指示器)上无限期或固定长度等待非常方便。

关于

rsevents 由 NeoSmart Technologies 的 Mahmoud Al-Qudsi 编写和维护,电子邮件为 [email protected],网址为 https://neosmart.net,并按照 MIT 公共许可证的条款向公众发布。

依赖项

~0.1–5MB