7 个版本
0.0.7 | 2024 年 8 月 10 日 |
---|---|
0.0.6 | 2024 年 8 月 8 日 |
0.0.4 | 2024 年 7 月 25 日 |
#190 在 并发
每月 721 次下载
415KB
11K SLoC
这是 C++ 发送者/接收者的一个实现(针对某些实现值)。
有关如何使用此库的说明,请参阅rust 文档。
注意
此代码非常处于开发中,大部分功能尚未实现。API 也仍在变化中。
一个小示例
use senders_receivers::*;
let sender = Just::from((1, 2, 3, 4))
| Then::from(|(a, b, c, d)| (a * b * c * d,));
println!("outcome: {}", sender.sync_wait().expect("no error").expect("no cancelation").0);
这是做什么的
Just::from
:声明发送链的起始值。Then::from
:声明对这些值的转换。|
(管道符号):用于将它们绑定在一起。
在调用 sync_wait
之前,不会执行任何步骤。
发送者?接收者?
此代码的主要概念来自具有发送者和接收者。发送者是一个产生值信号(或错误信号,或完成信号)的类型。接收者是一个接受此信号的类型。
发送者是可组合的:它们可以组合在一起,创建一个新的发送者。这是通过 |
(管道)符号实现的。
如果您正在使用此库,除非您正在实现自己的扩展,否则您将看不到任何接收者。
信号
每个发送者产生一个 value signal
、一个 error signal
或一个 done signal。其中之一将被产生。
value signal
指示发送者成功完成并生成了一个值。error signal
指示发送者失败并生成了一个 [Error]。done signal
指示发送者取消了其工作(既没有产生值,也没有产生错误)。
调度器
每个操作将在一个 [Scheduler] 上运行。(有时会多于一个,例如如果你 [Transfer] 到不同的调度器。)
调度器封装了 CPU 时间概念。不同的调度器将具有不同的特性,与何时/何地运行有关。
目前,已实现了以下调度器
- [ImmediateScheduler] 立即运行每个任务。这几乎是默认调度器。
- ThreadPool 使用线程池运行任务。
与 C++ 的比较
试图保持与C++对应版本的可用性,但不得不做出一些牺牲。
Error
类型是类型擦除/动态的。
对于一个非动态错误类型,我们需要一个具有可变类型参数的变体类型。Value
类型是一个元组。
对于一个非元组类型,我们需要可变类型参数。Value
不是一个变体。
对于一个变体类型,我们需要一个具有可变类型参数的变体类型。此外,Rust闭包在首次使用时绑定它们的类型,向重载分发将会很困难。- 省略基于调度器的特殊化。
我认为对于这一点,我们需要特殊化。(尽管io::write 实现允许特殊化,使用一个可选的默认实现。) - 连接不能再失败。
在C++中,connect
调用可能会失败,这将导致错误被传播。这意味着当connect
调用在let_value
步骤中失败时,错误必须通过接收器传播。相同的接收器也将被传递给连接调用,使用移动语义。
在Rust中,这将导致借用检查器将其标记为错误,我相当喜欢借用检查器。相反,我决定:如果connect
失败,它将必须创建一个操作状态,该状态将传播错误。 - 操作状态不会嵌套。
大多数限制来自Rust对移动语义的非常挑剔(我同意)。此外,Rust不允许就地构造(这是嵌套操作状态所需的;C++设计需要就地构造)。相反,我们在连接调用期间将后续操作包装到接收器中。这有一个小的缺点,即我们无法在OperationState::start 步骤中进行任何准备。 - [LetValue] 参数通过引用传递(类似于C++),但不能获取所有权。
相反,参数被保留,并与LetValue函数返回的发送器链中的值组合。
一些牺牲源于我与C++设计的不同意见:我喜欢设计中关于调度器更改是显式的承诺。但在实践中,使用起来非常困难,我的代码一旦异步,就需要获取接收器调度器(因为发送器没有(已知)调度器的情况太常见了)。
done
和error
通道不再有相关的调度器。
没有方法可以正确地保证错误是在哪个调度器上传播的。value
通道现在始终有一个相关的调度器。
这允许例如非阻塞写入(例如aio_write
)挂起执行,并在之前运行的同一调度器上恢复。- 接收器不再有调度器。
由于值信号现在有一个调度器,我们不再需要它。(此外,C++接收器有一个可选的调度器,这使得针对它们进行编码变得非常繁琐。) - 调度器有一个LocalScheduler类型。
对于大多数调度器,那将是调度器本身。这允许调度器声明在另一个调度器上运行的后续代码。例如,令人尴尬的并行调度器 第一次在任何线程上调度,然后传播一个将在后续调度中始终使用同一线程的调度器。 - 错误/完成恢复操作必须与在值路径中要发送的相同的调度器类型和值类型一起完成。
这是我们将事物限制为单一类型并沿价值信号发送调度程序的结果。(注意,错误和完成信号不会传播调度程序;如果不使[LetValue]更复杂,这将是不可能的。)
error
通道不再与调度程序相关联的原因是,调度程序传输可能会失败,这将破坏错误调度程序的公理。
从done
和error
信号中删除调度程序,意味着调度程序切换只会发生在愉快路径(和恢复路径)上。
依赖项
~0.8–11MB
~62K SLoC