#actor #scheduler #message #channel #input #piece

bin+lib acto-rs

实验性actor库,正在开发中。(原名minions_rs)。

13个不稳定版本 (3个重大变更)

使用旧的Rust 2015

0.5.2 2016年11月7日
0.5.1 2016年11月2日
0.5.0 2016年10月23日
0.4.0 2016年10月20日
0.2.9 2016年8月30日

#1177并发


用于 tcp-echo

MIT/Apache

110KB
3K SLoC

acto-rs 库

这个库是一个概念验证,从未在任何生产环境中运行过,测试也不充分。自行承担风险。已被警告。


这个库是连接独立部件形成数据处理管道的多种概念混合体。这些独立部件可以具有

  • 内部状态
  • 类型化通道与其他部件通信
  • 调度规则

这些部件(actor)由一个 调度器 管理,该调度器有预定义的线程数量来运行它们。输入和输出通道的数量由actor的类型确定。可能的类型包括

  • Source: 1个输出
  • Sink: 1个输入
  • Filter: 1个输入,1个输出
  • Y-split: 1个输入,2个可能的输出类型
  • Y-merge: 2个可能的输入类型,1个输出
  • Scatter: 1个输入,多个相同类型的输出
  • Gather: 多个相同类型的输入,1个输出

调度规则决定了何时运行actor

  • Loop - 持续地,与其他调度任务轮询
  • OnMessage - 当消息到达其输入通道之一时
  • OnExternalEvent - 当通过Scheduler::notify(..)传递外部事件时(例如与MIO集成)
  • Periodic(PeriodLengthInUsec) - 定期

用法

您需要设计组件的拓扑结构,因为组件的连接需要在传递给调度器之前完成。调度器拥有组件,并且您无法从外部更改它们。

当您将组件传递给调度器时,您需要告诉它如何根据上述规则之一来调度它们的执行。最后,您需要启动调度器。启动调度器后,您还可以向其中添加新的actor。

软件包

[dependencies]
acto-rs = "0.5.2"

概述

  • 基于elem traits实现actor
  • 启动/停止调度器
  • 将actor实例传递给调度器

创建actors

actors需要实现上述特之一。示例

创建源元素

这是一个相对更真实的元素,它从网络读取UDP消息并将其传递到拓扑中的下一个元素。

use actors::*;
use std::net::{UdpSocket, SocketAddr, Ipv4Addr, SocketAddrV4};
use std::io;
use std::mem;

pub struct ReadBytes {
  socket: UdpSocket
}

//
// this item reads 1024 bytes on UDP and passes the data forward with
// the data size and the sender address. if an error happens, then the
// error goes forward instead.
//
impl source::Source for ReadBytes {

  type OutputValue = ([u8; 1024], (usize, SocketAddr));
  type OutputError = io::Error;

  fn process(&mut self,
             output: &mut Sender<Message<Self::OutputValue, Self::OutputError>>,
             _stop: &mut bool)
  {
    output.put(|value| {
      if let &mut Some(Message::Value(ref mut item)) = value {
        // re-use the preallocated space in the queue
        match self.socket.recv_from(&mut item.0) {
          Ok((read_bytes, from_addr)) => {
            item.1 = (read_bytes, from_addr);
          },
          Err(io_error) => {
            // swap in the error message
            let error_message = Some(Message::Error(ChannelPosition(output.seqno()), io_error));
            mem::swap(value, &mut error_message);
          }
        };
      } else {
        // allocate new buffer and swap it in
        let dummy_address  = Ipv4Addr::from(0);
        let dummy_sockaddr = SocketAddrV4::new(dummy_address, 1);
        let item = ([0; 1024],(0, SocketAddr::V4(dummy_sockaddr)));

        match self.socket.recv_from(&mut item.0) {
          Ok((read_bytes, from_addr)) => {
            item.1 = (read_bytes, from_addr);
            let message = Some(Message::Value(item));
            mem::swap(value, &mut message);
          },
          Err(io_error) => {
            // swap in the error message
            let error_message = Some(Message::Error(ChannelPosition(output.seqno()), io_error));
            mem::swap(value, &mut error_message);
          }
        };
      }
    });
  }
}

启动调度器

调度器允许在运行时或启动前添加新任务。调度器只能启动/停止一次。任务本身决定何时停止,并且将通过执行时传递给它们的stop标志通知调度器。

let mut sched1 = Scheduler::new();
sched1.start(); // this uses one single execution thread
sched1.stop();

// to use more threads, do:
let mut sched_multi = Scheduler::new();
sched_multi.start_with_threads(12);
sched_multi.stop();

将actors传递给调度器

let mut sched = Scheduler::new();
sched.start_with_threads(4);

// create two dummy tasks
let dummy_queue_size = 2_000;
let (source_task, mut source_out) = source::new( "Source", dummy_queue_size, Box::new(DummySource{}));
let mut sink_task = sink::new( "Sink", Box::new(DummySink{}));

// connect the sink to the source
sink_task.connect(&mut source_out).unwrap();

// add the source and the sink to the scheduler and tell
// the scheduler how to run them
let source_id = sched.add_task(source_task, SchedulingRule::OnExternalEvent).unwrap();
sched.add_task(sink_task, SchedulingRule::OnMessage).unwrap();

// send an example notification to the source task
sched.notify(&source_id).unwrap();

sched.stop();

项目目标

主要目标是可预测的低延迟处理。我不希望做出任何关于性能的声明。我能说的是,我在测量组件、调度器和结果管道的延迟上投入了相当多的时间。

另一个目标是我想将消息传递的开销降到最低。这既包括发送者执行发送操作所需的时间,也包括从发送开始到实际接收消息的端到端延迟。我发现前者的估计(未命名的)actor实现中偏低。我想将这个开销保持在几十纳秒的范围内。

我想这个库在超负荷下表现良好。在实践中这意味着(可能)丢弃消息。组件使用有界消息队列进行通信。发送者可以检测是否即将覆盖先前的消息,并可能采取相应措施,但我认为这不是正确的方法。如果消息队列读取器落后,这意味着系统无法应对负载。在这种情况下,我们不应该堆积消息,填满所有内存,然后让系统崩溃。

项目非目标

我从其他actor模型中获取了一些想法,但我不想严格遵循它们。Erlang/Elixir actor模型是一个很好的灵感来源,我非常钦佩他们的工作。

许可证

MITApache 2.0

依赖项

~70KB