3 个版本
0.1.2 | 2022 年 11 月 7 日 |
---|---|
0.1.1 | 2022 年 11 月 6 日 |
0.1.0 | 2022 年 11 月 5 日 |
#528 in 并发
8KB
89 行
CSPLib
通信顺序进程 (CSP)
背景
通信顺序进程 (CSP) 是一种通过通道传递消息来编写并发应用程序的方法。它在 Go 的通道中用于线程间的通信。
计算图
深度学习的构建块是计算图,它可以实现为 CSP。
逻辑电路
逻辑电路也可以实现为 CSP。
库设计
在 CSP 的教科书中,作者应该阻塞直到读取器消耗传递的值,以在通道中保持只有一个值,这可能是数学分析的一个好属性。然而,当写入器在通道中放置一个值时不存在任何读取器的情况并不是实际案例。此外,仅允许一个读取器也限制了使用场景。
因此,在这个库中,写入器永远不会被读取器阻塞,并允许基于假设读取器在写入器将值放置在通道上时已准备就绪的多个读取器(SPMC)。如果不存在任何读取器,则写入器失败。
示例
flowchart LR
Main -->|1| ch1
ch1 -->|1| P1(x+2)
ch1 -->|1| P2(x*2)
P1 -->|3| ch2
P2 -->|2| ch3
ch2 -->|3| P4(x*y)
ch3 -->|2| P4
P4 -->|6| ch4
ch4 -->|6| Main
等价于
#[csplib::process]
struct P1 {
#[input]
a: i32,
#[output]
b: i32,
}
// λx. x+2
async fn run_p1(inner: P1Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
inner.b.put(x + 2)?;
Ok(())
}
#[csplib::process]
struct P2 {
#[input]
a: i32,
#[output]
b: i32,
}
// λx. x*2
async fn run_p2(inner: P2Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
// Emulating expensive I/O
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
inner.b.put(x * 2)?;
Ok(())
}
#[csplib::process]
struct P3 {
#[input]
a: i32,
#[input]
b: i32,
#[output]
c: i32,
}
// λxy. x*y
async fn run_p3(inner: P3Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
let y = inner.b.reader().get().await?;
inner.c.put(x * y)?;
Ok(())
}
let (main_w, main_r) = channel();
let (p1, p1_inner) = P1::new();
let (p2, p2_inner) = P2::new();
let (p3, p3_inner) = P3::new();
tokio::spawn(run_p1(p1_inner));
tokio::spawn(run_p2(p2_inner));
tokio::spawn(run_p3(p3_inner));
tokio::spawn(connect(main_r.reader(), p1.a));
tokio::spawn(connect(main_r.reader(), p2.a));
tokio::spawn(connect(p1.b.reader(), p3.a));
tokio::spawn(connect(p2.b.reader(), p3.b));
// Wait for all spawnings.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
main_w.put(1).unwrap();
let ans = p3.c.reader().get().await.unwrap();
assert_eq!(ans, 6);
依赖关系
~1.5–2.2MB
~44K SLoC