#actor #compute #pipeline #graph #actor-framework

hollywood_macros

为Hollywood演员框架提供宏

11个版本 (6个重大更新)

0.7.0 2024年5月1日
0.5.2 2024年4月11日
0.5.0 2024年3月8日
0.2.2 2023年10月5日

#1098过程宏

Download history 180/week @ 2024-04-25 53/week @ 2024-05-02 14/week @ 2024-05-09 24/week @ 2024-05-16 46/week @ 2024-05-23 41/week @ 2024-05-30 30/week @ 2024-06-06 29/week @ 2024-06-13 228/week @ 2024-06-20 116/week @ 2024-06-27 4/week @ 2024-07-04 24/week @ 2024-07-11 9/week @ 2024-07-18 72/week @ 2024-07-25 19/week @ 2024-08-01

每月 101 次下载
3 个crate中使用(通过 hollywood

Apache-2.0

49KB
1K SLoC

hollywood

Hollywood是一个专注于表示具有异构输入和输出的演员的Rust演员框架,这些演员被安排在一个非循环计算图/管道中。设计目标是简单性和最小化样板代码。因此,Hollywood是异步Rust的一般抽象,特别是异步tokio运行时。如果你不需要这样的抽象,你可能想直接使用tokio(或另一个异步运行时)。

演员是有状态的实体,它们通过发送消息相互通信。演员演员通过其入站通道接收消息流,处理它们并通过其出站通道将消息发送给其他演员。演员要么是无状态的,因此其出站流是入站流的纯函数,要么有内部状态,该状态由传入的消息更新,并可能影响其出站消息的内容。

演员被安排在一个计算管道中(或更准确地说,是一个有向无环图)。图的第一层由一组一个或多个源演员组成,其出站流由外部资源提供。源演员的典型例子包括传感器驱动程序或日志文件读取器。管道的终端节点是没有出站通道或出站通道未连接的演员。终端演员通常是汇点,将数据提供给外部资源。汇演员的例子包括机器人操作器、日志文件写入器或可视化组件。

除了演员之间的前馈连接之外,演员还可以通过一组请求-回复通道相互通信。没有限制可以连接哪些演员对使用此类请求-回复通道。例如,请求-回复通道可用于在计算流中创建反馈循环。

示例

以下示例演示了如何创建一个简单的流水线,其中包含一个周期性源演员,该演员为移动平均演员提供数据。移动平均演员连接到两个打印演员,这两个演员将时间戳和移动平均数打印到控制台。

use hollywood::actors::{Periodic, Printer, PrinterProp};
use hollywood::prelude::*;
use hollywood::example_actors::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};

let pipeline = Hollywood::configure(&mut |context| {
    let mut timer = Periodic::new_with_period(context, 1.0);
    let mut moving_average = MovingAverage::from_prop_and_state(
        context,
        MovingAverageProp {
            alpha: 0.3,
            timeout: 5.0,
        },
        MovingAverageState::default(),
    );
    let mut time_printer = Printer::<f64>::from_prop_and_state(
        context,
        PrinterProp {
            topic: "time".to_string(),
        },
        NullState::default(),
    );
    let mut average_printer = Printer::<f64>::from_prop_and_state(
        context,
        PrinterProp {
            topic: "average".to_string(),
        },
        NullState::default(),
    );
    timer
        .outbound
        .time_stamp
        .connect(context, &mut moving_average.inbound.value);
    timer
        .outbound
        .time_stamp
        .connect(context, &mut time_printer.inbound.printable);
    moving_average
        .outbound
        .average
        .connect(context, &mut average_printer.inbound.printable);
});
pipeline.print_flow_graph();   
pipeline.run();

print_flow_graph 方法的输出是

*Periodic_0*
|   time_stamp   |
        ⡏⠉⠑⠒⠢⠤⠤⣀⣀
        ⡇        ⠉⠉⠑⠒⠢⠤⠤⣀⣀
        ⠁                 ⠁
|     Value      |                  |   Printable    |
*MovingAverage_0*                  *Printer(time)_0*
|    average     |
        ⡇
        ⡇
        ⠁
|   Printable    |
*Printer(average)*  

依赖关系

~0.8–1.3MB
~25K SLoC