#slot #signal #thread-safe #connection #callback

signals2

受boost::signals2启发的线程安全信号/槽库

7个版本

0.3.3 2023年4月28日
0.3.2 2022年1月18日
0.3.1 2021年8月1日
0.2.1 2021年7月31日
0.1.0 2021年7月27日

#150并发

Download history 90/week @ 2024-03-13 42/week @ 2024-03-20 46/week @ 2024-03-27 62/week @ 2024-04-03 35/week @ 2024-04-10 31/week @ 2024-04-17 22/week @ 2024-04-24 6/week @ 2024-05-01 9/week @ 2024-05-08 11/week @ 2024-05-15 20/week @ 2024-05-22 46/week @ 2024-05-29 21/week @ 2024-06-05 30/week @ 2024-06-12 10/week @ 2024-06-19 31/week @ 2024-06-26

104 每月下载量
用于 wolfengine

BSL-1.0 许可证

62KB
730

关于

signals2 是一个受 boost::signals2 C++ 库启发的线程安全信号/槽库。信号是包含要执行时调用的回调函数列表("槽")的对象。信号及其对应的槽可以通过连接和共享连接块来管理。

signals2 不包含不安全代码,并且可以在稳定的 Rust 1.53 上编译。

signals2Boost 软件许可证,版本 1.0 下分发。

基本用法

这些基本使用模式应该涵盖了大多数使用场景。

创建信号并连接槽

以下示例创建了一个具有无参数且无返回值的槽函数的信号。给 Signal 模板的 () 参数指示槽函数将接受的参数类型。在这种情况下,槽函数不接受任何参数。

use signals2::*; 

let sig: Signal<()> = Signal::new();
sig.connect(|| println!("Hello, world!"));
sig.emit(); // prints "Hello, world!"

作为一个更复杂的例子,我们创建了一个接受两个参数的槽的信号。

let sig: Signal<(i32, i32)> = Signal::new();
sig.connect(|x, y| println!("x + y = {}", x + y));
sig.emit(2, 3); // prints "x + y = 5"

在创建仅接受一个参数的槽的信号时必须特别小心。单个参数类型必须仍然在信号类型签名中表现为一个元组。Rust 编译器不将 (T) 识别为一元组类型的元组,而是简单地识别为类型 T。必须使用逗号来强制 Rust 编译器将其识别为元组,例如 (T,)

let sig: Signal<(i32,)> = Signal::new(); // Note that using Signal<(i32)> or Signal<i32> will not compile!
sig.connect(|x| println!("x = {}", x));
sig.emit(7); // prints "x = 7"

槽可以返回值,调用 emit 的槽的返回值由信号的组合器确定。默认情况下,emit 将简单地返回一个表示最后执行的槽返回值的 Option<R>。如果没有槽被执行,将返回 None

let sig: Signal<(i32, i32), i32> = Signal::new();
assert_eq!(sig.emit(2, 3), None); // no slots have been added yet
sig.connect(|x, y| x + y);
assert_eq!(sig.emit(2, 3), Some(5));

复制一个信号将导致产生一个新的信号,该信号“指向”与原始信号相同的底层槽集合。对复制信号的任何修改都将影响原始信号,反之亦然。

let sig1: Signal<()> = Signal::new();
let sig2 = sig1.clone();
sig1.connect(|| println!("Hello, world!")); // connect to the first signal
sig2.emit(); // prints "Hello, world!" because sig1 and sig2 share the same set of slots.

使用连接来管理槽

连接用于管理槽的生命周期。当一个新的槽连接到一个信号时,会创建一个相应的连接。连接管理特定信号的槽的生命周期。可以使用连接将槽从其信号断开。

let sig: Signal<()> = Signal::new();
let conn = sig.connect(|| println!("Hello, world!"));
conn.disconnect(); // disconnect the slot
sig.emit(); // prints nothing, the slot has been disconnected

单个槽可能具有任意数量的连接。可以通过在现有连接上调用 Clone 来创建槽的更多连接。请注意,复制连接会创建一个指向相同槽的新连接。断开一个槽的连接将断开该槽的所有其他连接。一旦槽被断开,它将永久地从信号中删除,无法重新连接。

let sig: Signal<()> = Signal::new();
let conn1 = sig.connect(|| println!("Hello, world!"));
let conn2 = conn1.clone();

conn2.disconnect();
assert!(conn1.connected() == false); // disconnecting conn2 has also disconnected conn1
assert!(conn2.connected() == false);

sig.emit(); // prints nothing, the slot has been disconnected.

连接在它们被丢弃时不会自动断开。如果您希望连接在丢弃时自动断开,请使用作用域连接。可以通过在它们上调用 scoped() 将常规连接转换为作用域连接。

let sig: Signal<()> = Signal::new();
{
    let _conn = sig.connect(|| println!("Hello, world!")).scoped(); // create a scoped connection
    sig.emit(); // prints "Hello, world!"
} // _conn is dropped and disconnects

sig.emit(); // prints nothing, the slot has been disconnected.

弱信号

槽函数可能需要访问其自己的信号,例如在槽希望递归地发出其自己的信号或连接新的槽到信号的情况下。为了实现这一点,可能的第一反应是克隆信号,然后将克隆的信号移动到闭包中,然后将其连接到原始信号,如下面的示例所示。这将导致内存泄漏。

let sig: Signal<()> = Signal::new();
let sig_clone = sig.clone();
sig.connect(move || { // memory leak!
    sig_clone.connect(|| println!("Hello, world!"));
});

sig.emit(); // prints nothing (see the "Concurrency" section for why nothing is printed here)
sig.emit(); // prints "Hello, world!" once
sig.emit(); // prints "Hello, world!" twice
// etc...

信号保留对其槽的所有权,因此槽不能同时拥有其自己的信号。可以使用弱信号来打破这种循环依赖。

let sig: Signal<()> = Signal::new();
let weak_sig = sig.weak();
sig.connect(move || { // no memory leak!
    weak_sig.upgrade().unwrap().connect(|| println!("Hello, world!"));
});

sig.emit(); // prints nothing
sig.emit(); // prints "Hello, world!" once
sig.emit(); // prints "Hello, world!" twice
// etc...

高级用法

较少见的用法模式。

共享连接块

用户可能希望暂时阻止槽执行,而不永久断开槽。可以使用共享连接块来实现这一点。任何特定槽都可以有任意数量的共享连接块。如果有任何共享连接块正在阻止槽执行,则当信号发出时,该槽将不会执行。

let sig: Signal<(), i32> = Signal::new();
let conn = sig.connect(|| 4);
assert_eq!(sig.emit(), Some(4));

let blocker1 = conn.shared_block(true); // blocking
let blocker2 = blocker1.clone(); // also blocking, since blocker1 is blocking

assert_eq!(conn.blocker_count(), 2);
assert_eq!(sig.emit(), None); // slot is blocked and will not execute 

blocker1.unblock();
assert_eq!(conn.blocker_count(), 1);
assert_eq!(sig.emit(), None); // slot is still blocked

blocker2.unblock();
assert_eq!(conn.blocker_count(), 0);
assert_eq!(sig.emit(), Some(4)); // no more active blockers

共享连接块在丢弃时会自动解除阻塞。

let sig: Signal<(), i32> = Signal::new();
let conn = sig.connect(|| 4);
assert_eq!(sig.emit(), Some(4));
{
    let _blocker = conn.shared_block(true);
    assert_eq!(conn.blocker_count(), 1);
    assert_eq!(sig.emit(), None);
}

assert_eq!(conn.blocker_count(), 0);
assert_eq!(sig.emit(), Some(4)); // blocker was dropped

使用 ConnectHandlesEmitHandles 限制对信号的访问

可能存在一些情况,在这些情况下,虽然需要连接新的槽或发出信号,但仍然不允许公开访问信号。例如,考虑一个具有公共信号成员的结构体的库。期望的编程模式可能是库的用户将槽连接到该结构体的信号,同时结构体定期发出自己的信号。但是,如果信号是公共成员,这将使用户能够完全访问信号API,包括发出信号的能力。如果结构体本身应该是唯一有权发出信号的对象,这将是一个问题。同样的问题也可能发生:一个结构体具有公共信号,库用户应该能够发出信号,但不能连接到或从信号断开槽。

这两个问题的解决方案是 ConnectHandlesEmitHandles。一个 ConnectHandle 是一个对象,它允许向信号添加新的槽,同时不允许发出信号。同样,一个 EmitHandle 是一个对象,它允许发出信号,而不允许连接或断开槽。以下是一个使用 ConnectHandle 的示例。

let sig: Signal<(), i32> = Signal::new();
let connect_handle = sig.get_connect_handle();
let conn = connect_handle.connect(|| 1);
assert!(conn.connected());
assert_eq!(sig.emit(), Some(1));

std::mem::drop(sig);
let conn = connect_handle.connect(|| 2);
assert!(!conn.connected());

使用 EmitHandle 的示例。

let sig: Signal<(), i32> = Signal::new();
let emit_handle = sig.get_emit_handle();
sig.connect(|| 1);
assert_eq!(emit_handle.emit(), Some(Some(1)));

std::mem::drop(sig);
assert_eq!(emit_handle.emit(), None);

自定义组合器

在信号上调用emit的返回值由信号的组合器类型决定。默认组合器简单返回一个表示最后一个要执行的槽的返回值的Option(如果没有槽被执行,则为None)。可以通过在某个类型上实现Combiner<R>特质来创建自定义组合器。注意,消耗传递给Combiner特质的combine()函数的迭代器会导致槽执行。每次在迭代器上调用next函数时都会执行一个槽。

#[derive(Default)]
struct ProductCombiner {}

impl Combiner<i32> for ProductCombiner {
    type Output = i32;

    /// ProductCombiner computes the product of all values returned by the slots
    fn combine(&self, iter: impl Iterator<Item=i32>) -> Self::Output {
        iter.product()
    }
}

let sig: Signal<(), i32, ProductCombiner> = Signal::new();

sig.connect(|| 2);
sig.connect(|| 3);
sig.connect(|| 4);

assert_eq!(sig.emit(), 24);

使用组和位置控制槽的执行顺序

内部,信号按组存储其槽。槽组是有序的,优先级较高的组先执行。默认情况下存在两个“未命名”的槽组。这些组被称为“前组”和“后组”。“前组”中的槽将始终在所有其他槽组执行之前执行。“后组”中的槽将始终在所有其他槽组执行之后执行。可以创建任意数量的“命名”组,它们将根据它们的顺序执行。命名组将始终在“前组”之后和“后组”之前执行。

此外,当槽连接到一个组时,它可以连接到组的FrontBack。当一个槽连接到组的开头时,它保证在连接时组中存在的所有其他槽之前执行。当一个槽连接到组的末尾时,它保证在连接时组中存在的所有其他槽之后执行。

以下示例显示了如何使用GroupPosition枚举来控制槽的执行顺序。

let sig: Signal<()> = Signal::new();

sig.connect_group_position(|| println!("World!"), Group::Front, Position::Front);
sig.connect_group_position(|| print!("Hello,"), Group::Front, Position::Front);

sig.emit(); // prints "Hello, world!"

使用命名组的更复杂示例。

let sig: Signal<()> = Signal::new();

sig.connect_group_position(|| print!("lazy "), Group::Back, Position::Back);
sig.connect_group_position(|| print!("brown "), Group::Named(5), Position::Back);
sig.connect_group_position(|| print!("over "), Group::Named(7), Position::Back);
sig.connect_group_position(|| print!("the "), Group::Front, Position::Back);
sig.connect_group_position(|| print!("jumps "), Group::Named(7), Position::Front);
sig.connect_group_position(|| println!("dog"), Group::Back, Position::Back);
sig.connect_group_position(|| print!("the "), Group::Named(7), Position::Back);
sig.connect_group_position(|| print!("quick "), Group::Named(-8), Position::Back);
sig.connect_group_position(|| print!("fox "), Group::Named(5), Position::Back);

sig.emit(); // prints "the quick brown fox jumps over the lazy dog"

命名组根据为它们的底层类型实现的Ord特质进行排序。默认情况下,它们的底层类型是i32。可以通过在信号的类型中指定类型来将其更改为任何其他实现了Ord特质的其他类型。例如,命名组可以由字符串标识并按字典顺序排序。

use signals2::*;
use combiner::DefaultCombiner;

let sig: Signal<(), (), DefaultCombiner, String> = Signal::new();

基本用法部分中的示例仅使用 connect 函数,而不是 connect_position_group 函数。使用 connect(f) 与使用 connect_position_group(f, Group::Back, Position::Back) 相同。还有一个 connect_group(f, group) 函数,它只允许指定一个组。它等同于 connect_position_group(f, group, Position::Back)。同样,还有一个 connect_position(f, position) 函数,它只允许指定一个位置。它与 connect_position_group(f, Group::Back, position) 相等。

扩展槽位

在基本用法部分,我们看到了槽位如何使用弱信号来保持对其信号持久引用。这在槽位函数可能需要递归地发出自己的信号或连接新槽位到信号的情况下很有用。然而,如果一个槽位需要能够断开/阻止自己,怎么办?断开信号需要连接。槽位如何访问自己的连接?以下尝试将会失败。

let sig: Signal<(i32,)> = Signal::new();
let weak_sig = sig.weak();

let conn = sig.connect(move |x| {
    if x == 5 {
        println!("Slot recursively called itself 5 times! Disconnecting now.");
        conn.disconnect(); // compiler error
    }

    weak_sig.upgrade().unwrap().emit(x + 1);
});

sig.emit(0);

问题在于槽位函数无法捕获自己的连接,因为其连接是在实际连接到信号之后才创建的。可以通过使用“扩展槽位”来解决这个问题。扩展槽位与常规槽位类似,只是它们的函数多了一个参数:一个连接。

let sig: Signal<(i32,)> = Signal::new();
let weak_sig = sig.weak();

sig.connect_extended(move |conn, x| { // the connection is passed in as the first parameter to the slot
    if x == 5 {
        println!("Slot recursively called itself 5 times! Disconnecting now.");
        conn.disconnect();
    }

    weak_sig.upgrade().unwrap().emit(x + 1);
});

sig.emit(0);

还有相应的 connect_position_extendedconnect_group_extendedconnect_position_group_extended 函数。

并发

信号是线程安全的,可以在线程之间共享(前提是槽位函数、组合器和组类型是线程安全的)。一个信号可以被并发发出(即两个或更多线程可以同时发出信号)。一个信号可以在发出时连接新的槽位。这两种情况都不会导致死锁。信号的内部 RWLock 不会因使用信号的不同线程数量或递归发出的次数而导致死锁。然而,在修改正在发出的信号时有一些微妙之处。

在信号发出过程中修改信号是可能的(并且是安全的)。问题是:在信号发出时对信号所做的修改是否对当前正在发出的槽位“可见”?答案是:这取决于。在信号发出时阻塞/断开槽位将是可见的。新阻塞/断开的槽位将不会执行(假设槽位尚未开始执行)。然而,在信号发出时连接新的槽位到信号或更改其组合器都是对当前正在发出的槽位不可见的更改。考虑以下示例。

let sig: Signal<()> = Signal::new();
let weak_sig = sig.weak();
sig.connect(move || {
    weak_sig.upgrade().unwrap().connect(|| println!("Hello, world!"));
});

sig.emit(); // prints nothing
sig.emit(); // prints "Hello, world!" once
sig.emit(); // prints "Hello, world!" twice
// etc...

在这个例子中,即使当信号正在发射时连接了一个新的插槽,第一次调用emit也不会打印任何内容。新连接的插槽将不会在下次信号发射之前执行。

无运行时依赖