1个不稳定版本

使用旧的Rust 2015

0.1.0 2016年5月30日

#1317 in 异步

MIT许可

55KB
1.5K SLoC

mai

mio 之上构建的高层次事件循环。 mai 管理缓冲区和流,以便您可以专注于发送和接收您协议的帧。

状态

功能基本完备。API可能会更改。

入门指南

使用 mai 需要三个步骤

  • 选择一个数据类型作为您协议的 Frame,一个可操作的消息。
  • 定义一个 Codec,它知道如何将 Frame 编码和解码到字节缓冲区。
  • 指定一个 Handler 来响应当前连接、传入的 Frame 和错误。

缓冲区池、底层 readswrites 以及 Token 管理由 mai 处理。

回声客户端示例

协议

通过指定您将要使用的类型家族来实现 Protocol 特性。

use mai::*;

struct EchoCodec;
struct EchoClientHandler;
struct EchoClient;

impl Protocol for EchoClient {
  type ByteStream = TcpStream; // vs a UnixStream, for example
  type Frame = String;
  type Codec = EchoCodec;
  type Handler = EchoClientHandler;
  type Timeout = usize;
}

编解码器

定义编码和解码帧的方法。使用返回代码来指示您已获得一个帧、尚未有足够的字节来读取一个帧或遇到协议错误。


// For a simple Echo server, we can use `String` as our Frame type.
// This codec would work for both a client and server connection.
impl Codec<String> for EchoCodec {
  // Provide a method to try to write a given frame to a byte buffer
  fn encode(&mut self, message: &String, buffer: &mut [u8]) -> EncodingResult {
    let bytes = message.as_bytes();
    // If the buffer isn't big enough, say so via the return value
    if bytes.len() > buffer.len() {
      return Err(EncodingError::InsufficientBuffer);
    }
    // Copy the bytes of our String into the buffer
    for (index, &byte) in bytes.iter().enumerate() {
        buffer[index] = byte;
    }
    // Tell the frame engine how many bytes we wrote
    Ok(BytesWritten(bytes.len()))
  }

  // Provide a method to try to parse a frame from a byte buffer
  fn decode(&mut self, buffer: &[u8]) -> DecodingResult<String> {
    use std::str;
    // Validate that the buffer contains a utf-8 String
    let message: String = match str::from_utf8(buffer) {
      Ok(message) => message.to_owned(),
      // For this example, assume that an invalid message means 
      // that we just don't have enough bytes yet
      Err(error) => return Err(DecodingError::IncompleteFrame)
    };
    Ok(DecodedFrame::new(message, BytesRead(buffer.len())))
  }
}

帧处理器

定义回调以处理字节流事件:连接、帧、超时、错误和断开连接。

use mai::*;

impl Handler<EchoClient> for EchoClientHandler {
  fn on_ready(&mut self, context: &mut Context<EchoClient>) {
    let stream = context.stream();
    println!("Connected to {:?}", stream.peer_addr());
    let message: String = "Supercalifragilisticexpialidocious!".to_owned();
    stream.send(message);
  }
  fn on_frame(&mut self, stream: &mut Context<EchoClient>, message: String) {
    let stream = context.stream();
    println!("Received a message from {:?}: '{}'", stream.peer_addr(), &message.trim_right());
  }
  fn on_timeout(&mut self, timeout: usize) {
    println!("A timeout has occurred: {:?}", timeout);
  }
  fn on_error(&mut self, context: &mut Context<EchoClient>, error: &Error) {
    let stream = context.stream();
    println!("Error. {:?}, {:?}", stream.peer_addr(), error);
  }
  fn on_closed(&mut self, stream: &Context<EchoClient>) {
    let stream = context.stream();
    println!("Disconnected from {:?}", stream.peer_addr());
  }
}

开始工作

创建一个 ProtocolEngine 并将其传递给任何 mio 类型,该类型是 Evented+Read+Write。看它运行吧!

fn main() {
  // Create a TcpStream connected to `nc` running as an echo server
  // nc -l -p 2000 -c 'xargs -n1 echo'
  println!("Connecting to localhost:9999...");
  let address = "0.0.0.0:9999".parse().unwrap();
  let socket = TcpSocket::v4().unwrap();
  let (stream, _complete) = socket.connect(&address).unwrap();
  
  // Hand the TcpStream off to our new `ProtocolEngine` configured to treat its
  // byte streams as Echo clients.
  let protocol_engine: ProtocolEngine<EchoClient> = mai::protocol_engine(EchoClientHandler)
    .with(InitialBufferSize(Kilobytes(32))
    .with(InitialBufferPoolSize(16))
    .with(MaxBufferPoolSize(128))
    .build();
  let token = protocol_engine.manage(stream);
  let _ = protocol_engine.wait();
}

创建服务器

目前 mai 没有内置的方式来管理传入的连接。这正在 进行中

运行服务器在概念上是一个简单的过程:使用 mio 创建一个单独的线程来监听传入的连接。每次客户端连接可用时,将相应的 TcpStream 传递给后台运行的 ProtocolEngine。在正式API出现之前,您可以通过运行 protocol_engine.command_sender.clone() 并发送一个包含您希望它管理的 ByteStream 的 Command::Manage(P::ByteStream) 消息来获取一个通道,发送命令到 ProtocolEngine 实例。

依赖项

~3.5MB
~70K SLoC