#channel #process #session #state-transition #framework #api #reactive

nightly apis

反应式、会话导向、异步进程演算框架

17 个版本

0.5.7 2024年7月30日
0.5.6 2023年11月26日
0.5.4 2023年1月8日
0.5.0 2022年7月9日
0.4.0 2018年5月15日

#67异步

Download history 129/week @ 2024-07-02 62/week @ 2024-07-09 160/week @ 2024-07-30

每月下载量 351 次

MPL-2.0 许可证

345KB
3K SLoC

Apis

反应式、会话导向、异步进程演算框架

文档

此 crate 提供了用于定义反应式线程(在“进程演算”中的“进程”)的“会话”的特性和宏接口,通过固定拓扑的“通道”传递消息。它还提供了一个宏,用于将“程序”定义为会话节点上的状态转换系统,称为“模式”,在该模式中,线程本地状态可以从一个模式的过程传递到下一个过程。

当前功能

  • 在会话内验证进程和通道的连接性和一致性
  • 异步发送和无界通道
  • 按连接拓扑分类的三种通道
    • 'Simplex' — 单向 SPSC 流
    • 'Sink' — 标准的 MPSC 通道
    • 'Source' — SPMC 单播通道
  • 四种具有不同接收和更新行为的进程,大致分为两类,由接收机制划分
    • "异步" — 对单个端点的阻塞等待接收('异步')
    • "同步" — 在任意数量的端点上轮询循环,具有各种定时方案('等时', '中时', '非等时')
  • 使用 log 日志 API 记录事件
  • 输出会话数据流图和程序状态转换图的 Graphviz DOT 文件

当前限制

  • 进程和通道定义是特定会话定义的局部定义;它们不能被重用。部分缓解此限制的一种方法是创建一个上下文类型,封装预期的“角色”,并在多个进程中重用它。
  • 在会话之间传递状态采用 continuation-passing style 实现,Rust 编译器无法优化这种情况下的尾递归;注意这仅在状态显式在会话之间传递时发生;请注意,这仅在显式在会话之间传递状态时发生,否则不受影响的线程将重新连接到主线程,不会发生尾递归。

用法

该库的功能是通过两个顶级宏定义实现的,分别是 def_session!def_program!,分别用于定义会话和程序。

内部这些宏使用 enum_iterator::IntoEnumIterator 继承宏,该宏要求在 Cargo.toml 中导入 enum-iterator

apis = "0.4"
enum-iterator = "0.7"

会话

def_session! 宏扩展为定义过程、通道和消息的数据类型和函数实现。

示例

定义一个名为 'IntSource' 的会话,其中源线程交替地向两个对等节点发送 u64 值,这些节点将接收到的值相加,并在 会话结果 中返回最终的总和。

extern crate apis;

pub mod int_source {
  use apis;

  const MAX_UPDATES : u64 = 10;

  apis::def_session! {
    context IntSource {
      PROCESSES where
        let process    = self,
        let message_in = message_in
      [
        process IntGen (update_count : u64) {
          kind { apis::process::Kind::Isochronous { tick_ms: 20, ticks_per_update: 1 } }
          sourcepoints   [Ints]
          endpoints      []
          handle_message { unreachable!() }
          update         { process.int_gen_update() }
        }
        process Sum1 (sum : u64) -> (u64) {
          kind           { apis::process::Kind::asynchronous_default() }
          sourcepoints   []
          endpoints      [Ints]
          handle_message { process.sum1_handle_message (message_in) }
          update         { apis::process::ControlFlow::Continue }
        }
        process Sum2 (sum : u64) -> (u64) {
          kind           { apis::process::Kind::asynchronous_default() }
          sourcepoints   []
          endpoints      [Ints]
          handle_message { process.sum2_handle_message (message_in) }
          update         { apis::process::ControlFlow::Continue }
        }
      ]
      CHANNELS  [
        channel Ints <Intsmessage> (Source) {
          producers [IntGen]
          consumers [Sum1, Sum2]
        }
      ]
      MESSAGES [
        message Intsmessage {
          Anint (u64),
          Quit
        }
      ]
    }
  }

  impl IntGen {
    pub fn int_gen_update (&mut self) -> apis::process::ControlFlow {
      use apis::Process;
      let to_id = (self.update_count % 2 + 1) as apis::process::IdReprType;
      let anint = self.update_count;
      let mut result = self.send_to (
        ChannelId::Ints,
        ProcessId::try_from (to_id).unwrap(),
        Intsmessage::Anint (anint)
      ).into();
      self.update_count += 1;
      if result == apis::process::ControlFlow::Break || MAX_UPDATES < self.update_count {
        // quit
        let _ = self.send_to (ChannelId::Ints, ProcessId::Sum1, Intsmessage::Quit);
        let _ = self.send_to (ChannelId::Ints, ProcessId::Sum2, Intsmessage::Quit);
        result = apis::process::ControlFlow::Break
      }
      result
    }
  }
  impl Sum1 {
    fn sum1_handle_message (&mut self, message : GlobalMessage) -> apis::process::ControlFlow {
      match message {
        GlobalMessage::Intsmessage (Intsmessage::Anint (anint)) => {
          self.sum += anint;
          apis::process::ControlFlow::Continue
        }
        GlobalMessage::Intsmessage (Intsmessage::Quit) => {
          self.result = self.sum;
          apis::process::ControlFlow::Break
        }
      }
    }
  }
  impl Sum2 {
    fn sum2_handle_message (&mut self, message : GlobalMessage) -> apis::process::ControlFlow {
      match message {
        GlobalMessage::Intsmessage (Intsmessage::Anint (anint)) => {
          self.sum += anint;
          apis::process::ControlFlow::Continue
        }
        GlobalMessage::Intsmessage (Intsmessage::Quit) => {
          self.result = self.sum;
          apis::process::ControlFlow::Break
        }
      }
    }
  }
}

fn main() {
  use int_source::*;
  use apis::session::Context;
  // verifies the validity of the session definition
  let session_def = IntSource::def().unwrap();
  // create the session in the 'Ready' state
  let mut session : apis::Session <IntSource> = session_def.into();
  // run the session and collect results
  let results = session.run();
  println!("results: {:?}", results);
}

请注意,在会话定义中引入变量标识符(此处为 processmessage_in)是必要的,以便它们可以在 handle_messageupdate 块或可选的 initializeterminate 块(未显示)中进行引用。在此处,标识符 process 将在每个块中作为对本地过程的可变自我引用,而 message_in 将仅在 handle_message 块的作用域内作为接收到的消息的别名。

生成表示会话数据流图的 graphviz DOT 文件并将其写入文件

  let session_def = IntSource::def().unwrap();
  use std::io::Write;
  let mut f = std::fs::File::create ("intsource.dot").unwrap();
  f.write_all (session_def.dotfile().as_bytes()).unwrap();
  drop (f);

使用 $ dot -Tpng intsource.dot > intsource.png 以 PNG 格式渲染

请注意,会话在宏调用的作用域中定义了一些类型。将每个会话放入自己的模块中可以按顺序将它们组合成 "程序",如下所述。

程序

示例

在模块 char_sink 中定义另一个名为 CharSink 的会话,具有不同的行为和相反的消息流(实现省略,见 ./examples/readme.rs

然后可以定义一个运行两个会话的 程序

apis::def_program! {
  program Myprogram where let result = session.run() {
    MODES [
      mode int_source::IntSource {
        use apis::Process;
        let sum1 = int_source::Sum1::extract_result (&mut result).unwrap();
        let sum2 = int_source::Sum2::extract_result (&mut result).unwrap();
        println!("combined sums: {}", sum1 + sum2);
        Some (EventId::ToCharSink)
      }
      mode char_sink::CharSink
    ]
    TRANSITIONS  [
      transition ToCharSink <int_source::IntSource> => <char_sink::CharSink>
    ]
    initial_mode: IntSource
  }
}

fn main() {
  use apis::Program;
  // create a program in the initial mode
  let mut myprogram = Myprogram::initial();
  // run to completion
  myprogram.run();
}

请注意,在这里引入 result 标识符是必要的,以便在模式(在此为 'IntSource')的(可选)'转换选择块'中访问会话的 run 调用的结果。在这里,转换始终相同,但是可以使用会话结果的内容非确定性地选择任何具有与完成会话匹配的源头的转换。如果没有定义转换选择块(如上面的 'CharSink' 所示),或者如果转换选择块的评估结果为 'None',则程序将退出,不会转换到任何其他会话。

有关将一个会话的过程的状态传递到下一个会话的程序示例,请参阅 ./examples/ 目录中的 program.rsinteractive.rsgraphical.rs

程序作为状态机实现,可以生成一个 DOT 文件以显示程序状态转换系统

  use std::io::Write;
  let mut f = std::fs::File::create ("myprogram.dot").unwrap();
  f.write_all (Myprogram::dotfile().as_bytes()).unwrap();
  drop (f);

进程控制

进程运行循环将在以下任一条件满足后结束:所有端点通道都从 handle_message() 返回 ControlFlow::Break,或者如果 update() 返回 ControlFlow::Break。注意,在最后一个端点通道关闭后,最终的 update() 仍然会被处理。当 update() 返回 ControlFlow::Break 时,将不再调用 handle_message()

示例

./examples/ 目录中提供了多个示例程序。可以使用 ./run-examples.sh 脚本运行非交互式示例,该脚本还将从生成的 DOT 文件中构建图像。graphical.rsinteractive.rs 示例是交互式的,需要用户输入。可以使用 ./run-interactive.sh 脚本运行这些示例,该脚本也将为这些示例生成从生成的 DOT 文件中的图像。

大多数这些示例都会有意生成警告,请参阅各个示例的文档注释以获取详细信息。

运行测试

需要使用 --features "test" 运行进程和通道定义的文档测试才能成功编译

$ cargo test --features "test"

(见 https://github.com/rust-lang/rust/issues/45599).

依赖项

依赖项

~1–11MB
~70K SLoC