6 releases

Uses old Rust 2015

0.1.5 Feb 15, 2019
0.1.4 Jun 21, 2016
0.1.2 May 6, 2016

#5 in #socket-server

MPL-2.0 license

41KB
582

hydrogen

Documentation

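hydrogen is a non-blocking socket server framework built atop epoll. It takes care of the tedious connection and I/O marshalling across threads, and leaves the specifics of I/O reading and writing to the consumer through trait implementations.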


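hydrogen works with Stream trait objects, so any custom type can be used. It was built alongside simple-stream, which provides several stream abstractions and types, including plain and encrypted streams with basic and WebSocket framing.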
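To illustrate why working with trait objects lets any custom type plug in, here is a minimal, standard-library-only sketch. The trait `ByteStream`, the type `Loopback`, and the function `echo_once` are hypothetical names made up for this illustration. The method shapes mirror the examples further down in this README, but this is not hydrogen's own trait definition.

```rust
use std::io::{Error, ErrorKind};

// Hypothetical stand-in for a stream trait. The method shapes follow the
// examples in this README; this is NOT hydrogen's actual definition.
pub trait ByteStream {
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error>;
    fn send(&mut self, buf: &[u8]) -> Result<(), Error>;
    fn shutdown(&mut self) -> Result<(), Error>;
}

// An in-memory "stream" that hands back whatever was sent to it.
pub struct Loopback {
    pending: Vec<Vec<u8>>,
    open: bool,
}

impl Loopback {
    pub fn new() -> Loopback {
        Loopback { pending: Vec::new(), open: true }
    }
}

impl ByteStream for Loopback {
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        if !self.open {
            return Err(Error::new(ErrorKind::NotConnected, "stream is shut down"));
        }
        // Hand over every queued message and leave the queue empty.
        Ok(std::mem::replace(&mut self.pending, Vec::new()))
    }

    fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
        if !self.open {
            return Err(Error::new(ErrorKind::NotConnected, "stream is shut down"));
        }
        self.pending.push(buf.to_vec());
        Ok(())
    }

    fn shutdown(&mut self) -> Result<(), Error> {
        self.open = false;
        Ok(())
    }
}

// The caller only sees the trait object, so any implementor can be swapped in.
pub fn echo_once(stream: &mut dyn ByteStream, msg: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
    stream.send(msg)?;
    stream.recv()
}
```

Because `echo_once` takes `&mut dyn ByteStream`, a plain socket wrapper, an encrypted stream, or a test double like `Loopback` can all be passed without changing the caller.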

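Multithreaded

hydrogen is multithreaded. It uses one thread for accepting incoming connections, one for updating epoll-reported events, and one for marshalling I/O into a thread pool of a user-specified size.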
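The hand-off from an event thread to a fixed-size worker pool can be sketched with standard-library channels. This is only an illustration of the general pattern, not hydrogen's internals: the function `run_pipeline` is a hypothetical name, plain integers stand in for epoll events, and doubling a number stands in for the actual I/O work.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Pushes `events` through a dispatcher thread into a pool of `workers`
// threads. Results are returned sorted, because completion order across
// worker threads is not deterministic.
pub fn run_pipeline(events: Vec<u32>, workers: usize) -> Vec<u32> {
    let (event_tx, event_rx) = mpsc::channel::<u32>();
    let (job_tx, job_rx) = mpsc::channel::<u32>();
    let (done_tx, done_rx) = mpsc::channel::<u32>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    // Stand-in for the thread that reports readiness events.
    let producer = thread::spawn(move || {
        for ev in events {
            event_tx.send(ev).unwrap();
        }
        // `event_tx` is dropped here, which closes the event channel.
    });

    // Dispatcher: forwards each reported event to the worker pool.
    let dispatcher = thread::spawn(move || {
        for ev in event_rx {
            job_tx.send(ev).unwrap();
        }
        // `job_tx` is dropped here, which tells the workers to stop.
    });

    let mut pool = Vec::with_capacity(workers);
    for _ in 0..workers {
        let job_rx = Arc::clone(&job_rx);
        let done_tx = done_tx.clone();
        pool.push(thread::spawn(move || loop {
            // The lock is held only for this one statement.
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok(n) => done_tx.send(n * 2).unwrap(), // placeholder "work"
                Err(_) => break,                        // channel closed
            }
        }));
    }
    // Drop the original sender so `done_rx` ends once all workers finish.
    drop(done_tx);

    producer.join().unwrap();
    dispatcher.join().unwrap();
    for worker in pool {
        worker.join().unwrap();
    }

    let mut out: Vec<u32> = done_rx.iter().collect();
    out.sort();
    out
}
```

Sharing a single receiver behind a `Mutex` keeps the sketch short; a real server would likely prefer a queue designed for multiple consumers.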

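Slab allocation

The connection pool is managed as a slab, which means traversal times are similar to traversing a Vector, with insertion and removal times of O(1).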
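The slab idea can be sketched as a `Vec` of slots plus a free list of vacant indices. The type below is a hypothetical, minimal illustration of the data structure in general, not the slab hydrogen actually uses.

```rust
// Minimal slab sketch: occupied slots hold `Some(value)`, vacant slots
// hold `None`, and `free` remembers which indices can be reused.
pub struct Slab<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> Slab<T> {
    // Pre-allocating avoids reallocation until `cap` entries are live.
    pub fn with_capacity(cap: usize) -> Slab<T> {
        Slab { slots: Vec::with_capacity(cap), free: Vec::new() }
    }

    // O(1) (amortized once capacity is exceeded): reuse a vacant slot if
    // one exists, otherwise append. Returns the index of the entry.
    pub fn insert(&mut self, value: T) -> usize {
        match self.free.pop() {
            Some(idx) => {
                self.slots[idx] = Some(value);
                idx
            }
            None => {
                self.slots.push(Some(value));
                self.slots.len() - 1
            }
        }
    }

    // O(1): take the value out and mark the slot as reusable.
    pub fn remove(&mut self, idx: usize) -> Option<T> {
        let taken = self.slots.get_mut(idx).and_then(|slot| slot.take());
        if taken.is_some() {
            self.free.push(idx);
        }
        taken
    }

    // Traversal is a linear walk over contiguous memory, like a Vec,
    // skipping vacant slots.
    pub fn iter<'a>(&'a self) -> impl Iterator<Item = (usize, &'a T)> + 'a {
        self.slots
            .iter()
            .enumerate()
            .filter_map(|(idx, slot)| slot.as_ref().map(|value| (idx, value)))
    }
}
```

Removal never shifts the remaining entries, so an index handed out by `insert` stays valid until that entry itself is removed.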

Examples

simple-stream

extern crate hydrogen;
extern crate simple_stream as ss;

use std::cell::UnsafeCell;
use std::io::Error;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;

use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
use ss::frame::Frame;
use ss::frame::simple::{SimpleFrame, SimpleFrameBuilder};
use ss::{Socket, Plain, NonBlocking, SocketOptions};



// Hydrogen requires a type that implements `hydrogen::Stream`.
// We'll implement it atop the `simple-stream` crate.
#[derive(Clone)]
pub struct Stream {
    inner: Plain<Socket, SimpleFrameBuilder>
}

impl HydrogenStream for Stream {
    // This method is called when epoll reports data is available for reading.
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        // Collect the payload of every frame that was read; errors are
        // passed through to the caller unchanged.
        let frame_vec = self.inner.nb_recv()?;
        Ok(frame_vec.iter().map(|frame| frame.payload()).collect())
    }

    // This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
    // and epoll has reported that the socket is now writable.
    fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
        let frame = SimpleFrame::new(buf);
        self.inner.nb_send(&frame)
    }

    // This method is called when connection has been reported as reset by epoll, or when any
    // `std::io::Error` has been returned.
    fn shutdown(&mut self) -> Result<(), Error> {
        self.inner.shutdown()
    }
}
impl AsRawFd for Stream {
    fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}


// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
    fn on_server_created(&mut self, fd: RawFd) {
        // Do any specific flag/option setting on the underlying listening fd.
        // This will be the fd that accepts all incoming connections.
    }

    fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
        // With the passed fd, create your type that implements `hydrogen::Stream`
        // and return it.
        // Placeholder so the example compiles; replace it with real construction.
        unimplemented!()
    }

    fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
        // Called when a complete, consumer defined, chunk of data has been read.
    }

    fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
        // Called when a connection has been removed from the watch list, with the
        // `std::io::Error` as the reason removed.
    }
}


fn main() {
    hydrogen::begin(Server, hydrogen::Config {
        addr: "0.0.0.0".to_string(),
        port: 1337,
        max_threads: 8,
        pre_allocated: 100000
    });
}

std::net::TcpStream

extern crate hydrogen;

use std::cell::UnsafeCell;
use std::io::{Read, Write, Error, ErrorKind};
use std::net::{TcpStream, Shutdown};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;

use hydrogen::{Stream as HydrogenStream, HydrogenSocket};


pub struct Stream {
    inner: TcpStream
}

impl Stream {
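    pub fn from_tcp_stream(tcp_stream: TcpStream) -> Stream {
        // `set_nonblocking` returns a `Result`; surface a failure instead of
        // silently continuing with a blocking socket.
        tcp_stream
            .set_nonblocking(true)
            .expect("failed to set TcpStream to non-blocking mode");
        Stream {
            inner: tcp_stream
        }
    }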
}

impl HydrogenStream for Stream {
    // This method is called when epoll reports data is available for reading.
    fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        let mut msgs = Vec::<Vec<u8>>::new();

        // Our socket is set to non-blocking, we need to read until
        // there is an error or the system returns WouldBlock.
        // TcpStream offers no guarantee it will return in non-blocking mode.
        // Double check OS specifics on this when using.
        // https://doc.rust-lang.net.cn/std/io/trait.Read.html#tymethod.read
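        let mut total_read = Vec::<u8>::new();
        loop {
            let mut buf = [0u8; 4098];
            match self.inner.read(&mut buf) {
                // A zero-length read means the peer closed the connection.
                // Report it as an error so this loop cannot spin forever.
                Ok(0) => {
                    return Err(Error::new(ErrorKind::UnexpectedEof,
                                          "connection closed by peer"));
                }
                Ok(num_read) => total_read.extend_from_slice(&buf[..num_read]),
                // Nothing more to read for now.
                Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
                Err(err) => return Err(err),
            }
        }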

        // Multiple frames, or "msgs", could have been gathered here. Break up
        // your frames here and save the remainder somewhere to come back to on
        // the next read.
        //
        // Frame break out code goes here
        //

        msgs.push(total_read);

        return Ok(msgs);
    }

    // This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
    // and epoll has reported that the socket is now writable.
    fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
        self.inner.write_all(buf)
    }

    // This method is called when connection has been reported as reset by epoll, or when any
    // `std::io::Error` has been returned.
    fn shutdown(&mut self) -> Result<(), Error> {
        self.inner.shutdown(Shutdown::Both)
    }
}

impl AsRawFd for Stream {
    fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}

// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
    fn on_server_created(&mut self, fd: RawFd) {
        // Do any specific flag/option setting on the underlying listening fd.
        // This will be the fd that accepts all incoming connections.
    }

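    fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
        use std::os::unix::io::FromRawFd;

        // With the passed fd, create your type that implements `hydrogen::Stream`
        // and return it.
        // Assumption: `fd` is an open, connected TCP socket that nothing else
        // owns, which `from_raw_fd` requires.
        let tcp_stream = unsafe { TcpStream::from_raw_fd(fd) };
        Arc::new(UnsafeCell::new(Stream::from_tcp_stream(tcp_stream)))
    }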

    fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
        // Called when a complete, consumer defined, chunk of data has been read.
    }

    fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
        // Called when a connection has been removed from the watch list, with the
        // `std::io::Error` as the reason removed.
    }
}


fn main() {
    hydrogen::begin(Server, hydrogen::Config {
        addr: "0.0.0.0".to_string(),
        port: 1337,
        max_threads: 8,
        pre_allocated: 100000
    });
}
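The `recv` implementation above leaves the frame break-out step to the reader. One way to approach it is sketched below, assuming a hypothetical wire format in which every frame is a 4-byte big-endian length followed by that many payload bytes. The type `FrameSplitter` and the format itself are assumptions made for illustration; neither comes from hydrogen or simple-stream.

```rust
// Hypothetical wire format: 4-byte big-endian payload length, then payload.
// Bytes that do not yet form a complete frame are kept for the next call.
pub struct FrameSplitter {
    pending: Vec<u8>,
}

impl FrameSplitter {
    pub fn new() -> FrameSplitter {
        FrameSplitter { pending: Vec::new() }
    }

    // Appends freshly read bytes and returns every complete frame payload.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(bytes);

        let mut frames = Vec::new();
        let mut offset = 0;
        loop {
            let rest = &self.pending[offset..];
            if rest.len() < 4 {
                break; // length header not complete yet
            }
            let len = u32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
            if rest.len() - 4 < len {
                break; // payload not complete yet
            }
            frames.push(rest[4..4 + len].to_vec());
            offset += 4 + len;
        }

        // Discard the consumed bytes and keep the unfinished remainder.
        self.pending.drain(..offset);
        frames
    }
}
```

A stream type could hold one `FrameSplitter` per connection and call `push` with the bytes gathered in `recv`, returning the resulting frames instead of the raw buffer. A production version would also cap `len` so a malformed header cannot make the buffer grow without bound.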

Author

Nathan Sizemore, [email protected]

License

hydrogen is available under the MPL-2.0 license. See the LICENSE file for more information.

Dependencies

~320KB