13个不稳定版本 (3个重大变更)
使用旧的Rust 2015
0.5.2 | 2016年11月7日 |
---|---|
0.5.1 | 2016年11月2日 |
0.5.0 | 2016年10月23日 |
0.4.0 | 2016年10月20日 |
0.2.9 | 2016年8月30日 |
#1177 在 并发
用于 tcp-echo
110KB
3K SLoC
acto-rs 库
这个库是一个概念验证,从未在任何生产环境中运行过,测试也不充分。自行承担风险。已被警告。
这个库是连接独立部件形成数据处理管道的多种概念混合体。这些独立部件可以具有
- 内部状态
- 类型化通道与其他部件通信
- 调度规则
这些部件(actor)由一个 调度器 管理,该调度器有预定义的线程数量来运行它们。输入和输出通道的数量由actor的类型确定。可能的类型包括
- Source: 1个输出
- Sink: 1个输入
- Filter: 1个输入,1个输出
- Y-split: 1个输入,2个可能的输出类型
- Y-merge: 2个可能的输入类型,1个输出
- Scatter: 1个输入,多个相同类型的输出
- Gather: 多个相同类型的输入,1个输出
调度规则决定了何时运行actor
- Loop - 持续地,与其他调度任务轮询
- OnMessage - 当消息到达其输入通道之一时
- OnExternalEvent - 当通过Scheduler::notify(..)传递外部事件时(例如与MIO集成)
- Periodic(PeriodLengthInUsec) - 定期
用法
您需要设计组件的拓扑结构,因为组件的连接需要在传递给调度器之前完成。调度器拥有组件,并且您无法从外部更改它们。
当您将组件传递给调度器时,您需要告诉它如何根据上述规则之一来调度它们的执行。最后,您需要启动调度器。启动调度器后,您还可以向其中添加新的actor。
软件包
[dependencies]
acto-rs = "0.5.2"
概述
- 基于elem traits实现actor
- 启动/停止调度器
- 将actor实例传递给调度器
创建actors
actors需要实现上述特之一。示例
- 来源: dummy source
- 过滤器: dummy filter
- 汇: dummy sink
创建源元素
这是一个相对更真实的元素,它从网络读取UDP消息并将其传递到拓扑中的下一个元素。
use actors::*;
use std::net::{UdpSocket, SocketAddr, Ipv4Addr, SocketAddrV4};
use std::io;
use std::mem;
pub struct ReadBytes {
socket: UdpSocket
}
//
// this item reads 1024 bytes on UDP and passes the data forward with
// the data size and the sender address. if an error happens, then the
// error goes forward instead.
//
impl source::Source for ReadBytes {
type OutputValue = ([u8; 1024], (usize, SocketAddr));
type OutputError = io::Error;
fn process(&mut self,
output: &mut Sender<Message<Self::OutputValue, Self::OutputError>>,
_stop: &mut bool)
{
output.put(|value| {
if let &mut Some(Message::Value(ref mut item)) = value {
// re-use the preallocated space in the queue
match self.socket.recv_from(&mut item.0) {
Ok((read_bytes, from_addr)) => {
item.1 = (read_bytes, from_addr);
},
Err(io_error) => {
// swap in the error message
let error_message = Some(Message::Error(ChannelPosition(output.seqno()), io_error));
mem::swap(value, &mut error_message);
}
};
} else {
// allocate new buffer and swap it in
let dummy_address = Ipv4Addr::from(0);
let dummy_sockaddr = SocketAddrV4::new(dummy_address, 1);
let item = ([0; 1024],(0, SocketAddr::V4(dummy_sockaddr)));
match self.socket.recv_from(&mut item.0) {
Ok((read_bytes, from_addr)) => {
item.1 = (read_bytes, from_addr);
let message = Some(Message::Value(item));
mem::swap(value, &mut message);
},
Err(io_error) => {
// swap in the error message
let error_message = Some(Message::Error(ChannelPosition(output.seqno()), io_error));
mem::swap(value, &mut error_message);
}
};
}
});
}
}
启动调度器
调度器允许在运行时或启动前添加新任务。调度器只能启动/停止一次。任务本身决定何时停止,并且将通过执行时传递给它们的stop
标志通知调度器。
let mut sched1 = Scheduler::new();
sched1.start(); // this uses one single execution thread
sched1.stop();
// to use more threads, do:
let mut sched_multi = Scheduler::new();
sched_multi.start_with_threads(12);
sched_multi.stop();
将actors传递给调度器
let mut sched = Scheduler::new();
sched.start_with_threads(4);
// create two dummy tasks
let dummy_queue_size = 2_000;
let (source_task, mut source_out) = source::new( "Source", dummy_queue_size, Box::new(DummySource{}));
let mut sink_task = sink::new( "Sink", Box::new(DummySink{}));
// connect the sink to the source
sink_task.connect(&mut source_out).unwrap();
// add the source and the sink to the scheduler and tell
// the scheduler how to run them
let source_id = sched.add_task(source_task, SchedulingRule::OnExternalEvent).unwrap();
sched.add_task(sink_task, SchedulingRule::OnMessage).unwrap();
// send an example notification to the source task
sched.notify(&source_id).unwrap();
sched.stop();
项目目标
主要目标是可预测的低延迟处理。我不希望做出任何关于性能的声明。我能说的是,我在测量组件、调度器和结果管道的延迟上投入了相当多的时间。
另一个目标是我想将消息传递的开销降到最低。这既包括发送者执行发送操作所需的时间,也包括从发送开始到实际接收消息的端到端延迟。我发现前者的估计(未命名的)actor实现中偏低。我想将这个开销保持在几十纳秒的范围内。
我想这个库在超负荷下表现良好。在实践中这意味着(可能)丢弃消息。组件使用有界消息队列进行通信。发送者可以检测是否即将覆盖先前的消息,并可能采取相应措施,但我认为这不是正确的方法。如果消息队列读取器落后,这意味着系统无法应对负载。在这种情况下,我们不应该堆积消息,填满所有内存,然后让系统崩溃。
项目非目标
我从其他actor模型中获取了一些想法,但我不想严格遵循它们。Erlang/Elixir actor模型是一个很好的灵感来源,我非常钦佩他们的工作。
许可证
依赖项
~70KB