1 个不稳定版本

0.1.2 2022年11月7日

#19 in #csp


csplib中使用

MIT许可协议

7KB
219

CSPLib

Crates.io documentation

通信顺序进程(CSP)

背景

通信顺序进程(CSP)是一种通过通道进行消息传递来编写并发应用程序的方法。它在实际中用于Go语言的通道中线程间的通信。

1_sMCQYHFh16sEPzNs1Dq1GA

计算图

深度学习的构建块是计算图,它可以作为CSP实现。

54-1

逻辑电路

逻辑电路也可以作为CSP实现。

Logic-Circuit-in-CircuiTikZ-IEEE-style

库设计

在CSP的教科书中,作者应该在读者消耗传递的值之前阻塞,以保持通道中只有一个值,我认为这是数学分析的好特性。然而,当写者将值放入通道时不存在任何读者的情况并不实用。此外,仅允许一个读者也限制了使用场景。

因此,在这个库中,写者永远不会因为读者而阻塞,并且基于读者在写者将值放在通道上时已准备好的假设,允许多个读者(SPMC)。如果不存在任何读者,则写者失败。

示例

flowchart LR
  Main -->|1| ch1
  ch1 -->|1| P1(x+2)
  ch1 -->|1| P2(x*2)
  P1 -->|3| ch2
  P2 -->|2| ch3
  ch2 -->|3| P4(x*y)
  ch3 -->|2| P4
  P4 -->|6| ch4
  ch4 -->|6| Main

等同于

#[csplib::process]
struct P1 {
    #[input]
    a: i32,
    #[output]
    b: i32,
}
// λx. x+2
async fn run_p1(inner: P1Inner) -> Result<()> {
    let x = inner.a.reader().get().await?;
    inner.b.put(x + 2)?;
    Ok(())
}
#[csplib::process]
struct P2 {
    #[input]
    a: i32,
    #[output]
    b: i32,
}
// λx. x*2
async fn run_p2(inner: P2Inner) -> Result<()> {
    let x = inner.a.reader().get().await?;
    // Emulating expensive I/O
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    inner.b.put(x * 2)?;
    Ok(())
}
#[csplib::process]
struct P3 {
    #[input]
    a: i32,
    #[input]
    b: i32,
    #[output]
    c: i32,
}
// λxy. x*y
async fn run_p3(inner: P3Inner) -> Result<()> {
    let x = inner.a.reader().get().await?;
    let y = inner.b.reader().get().await?;
    inner.c.put(x * y)?;
    Ok(())
}

let (main_w, main_r) = channel();
let (p1, p1_inner) = P1::new();
let (p2, p2_inner) = P2::new();
let (p3, p3_inner) = P3::new();

tokio::spawn(run_p1(p1_inner));
tokio::spawn(run_p2(p2_inner));
tokio::spawn(run_p3(p3_inner));

tokio::spawn(connect(main_r.reader(), p1.a));
tokio::spawn(connect(main_r.reader(), p2.a));
tokio::spawn(connect(p1.b.reader(), p3.a));
tokio::spawn(connect(p2.b.reader(), p3.b));

// Wait for all spawnings.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

main_w.put(1).unwrap();
let ans = p3.c.reader().get().await.unwrap();
assert_eq!(ans, 6);

依赖项

~2MB
~43K SLoC