1 个不稳定版本
0.1.2 | 2022年11月7日 |
---|
#19 in #csp
在csplib中使用
7KB
219 行
CSPLib
通信顺序进程(CSP)
背景
通信顺序进程(CSP)是一种通过通道进行消息传递来编写并发应用程序的方法。它在实际中用于Go语言的通道中线程间的通信。
计算图
深度学习的构建块是计算图,它可以作为CSP实现。
逻辑电路
逻辑电路也可以作为CSP实现。
库设计
在CSP的教科书中,作者应该在读者消耗传递的值之前阻塞,以保持通道中只有一个值,我认为这是数学分析的好特性。然而,当写者将值放入通道时不存在任何读者的情况并不实用。此外,仅允许一个读者也限制了使用场景。
因此,在这个库中,写者永远不会因为读者而阻塞,并且基于读者在写者将值放在通道上时已准备好的假设,允许多个读者(SPMC)。如果不存在任何读者,则写者失败。
示例
flowchart LR
Main -->|1| ch1
ch1 -->|1| P1(x+2)
ch1 -->|1| P2(x*2)
P1 -->|3| ch2
P2 -->|2| ch3
ch2 -->|3| P4(x*y)
ch3 -->|2| P4
P4 -->|6| ch4
ch4 -->|6| Main
等同于
#[csplib::process]
struct P1 {
#[input]
a: i32,
#[output]
b: i32,
}
// λx. x+2
async fn run_p1(inner: P1Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
inner.b.put(x + 2)?;
Ok(())
}
#[csplib::process]
struct P2 {
#[input]
a: i32,
#[output]
b: i32,
}
// λx. x*2
async fn run_p2(inner: P2Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
// Emulating expensive I/O
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
inner.b.put(x * 2)?;
Ok(())
}
#[csplib::process]
struct P3 {
#[input]
a: i32,
#[input]
b: i32,
#[output]
c: i32,
}
// λxy. x*y
async fn run_p3(inner: P3Inner) -> Result<()> {
let x = inner.a.reader().get().await?;
let y = inner.b.reader().get().await?;
inner.c.put(x * y)?;
Ok(())
}
let (main_w, main_r) = channel();
let (p1, p1_inner) = P1::new();
let (p2, p2_inner) = P2::new();
let (p3, p3_inner) = P3::new();
tokio::spawn(run_p1(p1_inner));
tokio::spawn(run_p2(p2_inner));
tokio::spawn(run_p3(p3_inner));
tokio::spawn(connect(main_r.reader(), p1.a));
tokio::spawn(connect(main_r.reader(), p2.a));
tokio::spawn(connect(p1.b.reader(), p3.a));
tokio::spawn(connect(p2.b.reader(), p3.b));
// Wait for all spawnings.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
main_w.put(1).unwrap();
let ans = p3.c.reader().get().await.unwrap();
assert_eq!(ans, 6);
依赖项
~2MB
~43K SLoC