6 个版本
使用旧的 Rust 2015
0.1.5 | 2019 年 2 月 15 日 |
---|---|
0.1.4 | 2016 年 6 月 21 日 |
0.1.2 | 2016 年 5 月 6 日 |
#5 in #socket-server
41KB
582 行
hydrogen
hydrogen 是一个基于 epoll 的非阻塞套接字服务器框架。它负责处理线程间的繁琐的连接和 I/O 传输,并通过特质的实现将 I/O 读写具体细节留给消费者。
流
hydrogen 与 Stream
特质对象一起工作,因此可以使用任何自定义类型。与 simple-stream 一起构建,提供包括普通和加密流以及基本和 WebSocket 封装在内的多种流抽象和类型。
多线程
hydrogen 是多线程的。它使用一个线程来接受传入的连接,一个线程来更新 epoll 报告的事件,以及一个线程用于将 I/O 传输到用户指定大小的线程池中。
堆块分配
连接池被管理为一个堆块,这意味着遍历时间类似于遍历 Vector,插入和删除时间为 O(1)。
示例
simple-stream
extern crate hydrogen;
extern crate simple_stream as ss;
use hydrogen;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
use ss::frame::Frame;
use ss::frame::simple::{SimpleFrame, SimpleFrameBuilder};
use ss::{Socket, Plain, NonBlocking, SocketOptions};
// Hydrogen requires a type that implements `hydrogen::Stream`.
// We'll implement it atop the `simple-stream` crate.
#[derive(Clone)]
pub struct Stream {
inner: Plain<Socket, SimpleFrameBuilder>
}
impl HydrogenStream for Stream {
// This method is called when epoll reports data is available for reading.
fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
match self.inner.nb_recv() {
Ok(frame_vec) => {
let mut ret_buf = Vec::<Vec<u8>>::with_capacity(frame_vec.len());
for frame in frame_vec.iter() {
ret_buf.push(frame.payload());
}
Ok(ret_buf)
}
Err(e) => Err(e)
}
}
// This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
// and epoll has reported that the socket is now writable.
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
let frame = SimpleFrame::new(buf);
self.inner.nb_send(&frame)
}
// This method is called when connection has been reported as reset by epoll, or when any
// `std::io::Error` has been returned.
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown()
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
// Do any secific flag/option setting on the underlying listening fd.
// This will be the fd that accepts all incoming connections.
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
// With the passed fd, create your type that implements `hydrogen::Stream`
// and return it.
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
// Called when a complete, consumer defined, chunk of data has been read.
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
// Called when a connection has been removed from the watch list, with the
// `std::io::Error` as the reason removed.
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
std::net::TcpStream
extern crate hydrogen;
use std::cell::UnsafeCell;
use std::io::{Read, Write, Error, ErrorKind};
use std::net::{TcpStream, Shutdown};
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use hydrogen::{Stream as HydrogenStream, HydrogenSocket};
pub struct Stream {
inner: TcpStream
}
impl Stream {
pub fn from_tcp_stream(tcp_stream: TcpStream) -> Stream {
tcp_stream.set_nonblocking(true);
Stream {
inner: tcp_stream
}
}
}
impl HydrogenStream for Stream {
// This method is called when epoll reports data is available for reading.
fn recv(&mut self) -> Result<Vec<Vec<u8>>, Error> {
let mut msgs = Vec::<Vec<u8>>::new();
// Our socket is set to non-blocking, we need to read until
// there is an error or the system returns WouldBlock.
// TcpStream offers no guarantee it will return in non-blocking mode.
// Double check OS specifics on this when using.
// https://doc.rust-lang.net.cn/std/io/trait.Read.html#tymethod.read
let mut total_read = Vec::<u8>::new();
loop {
let mut buf = [0u8; 4098];
let read_result = self.inner.read(&mut buf);
if read_result.is_err() {
let err = read_result.unwrap_err();
if err.kind() == ErrorKind::WouldBlock {
break;
}
return Err(err);
}
let num_read = read_result.unwrap();
total_read.extend_from_slice(&buf[0..num_read]);
}
// Multiple frames, or "msgs", could have been gathered here. Break up
// your frames here and save remainer somewhere to come back to on the
// next reads....
//
// Frame break out code goes here
//
msgs.push(total_read);
return Ok(msgs);
}
// This method is called when a previous attempt to write has returned `ErrorKind::WouldBlock`
// and epoll has reported that the socket is now writable.
fn send(&mut self, buf: &[u8]) -> Result<(), Error> {
self.inner.write_all(buf)
}
// This method is called when connection has been reported as reset by epoll, or when any
// `std::io::Error` has been returned.
fn shutdown(&mut self) -> Result<(), Error> {
self.inner.shutdown(Shutdown::Both)
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() }
}
// The following will be our server that handles all reported events
struct Server;
impl hydrogen::Handler for Server {
fn on_server_created(&mut self, fd: RawFd) {
// Do any secific flag/option setting on the underlying listening fd.
// This will be the fd that accepts all incoming connections.
}
fn on_new_connection(&mut self, fd: RawFd) -> Arc<UnsafeCell<HydrogenStream>> {
// With the passed fd, create your type that implements `hydrogen::Stream`
// and return it.
}
fn on_data_received(&mut self, socket: HydrogenSocket, buffer: Vec<u8>) {
// Called when a complete, consumer defined, chunk of data has been read.
}
fn on_connection_removed(&mut self, fd: RawFd, err: Error) {
// Called when a connection has been removed from the watch list, with the
// `std::io::Error` as the reason removed.
}
}
fn main() {
hydrogen::begin(Server, hydrogen::Config {
addr: "0.0.0.0".to_string(),
port: 1337,
max_threads: 8,
pre_allocated: 100000
});
}
作者
Nathan Sizemore,[email protected]
许可证
hydrogen 在 MPL-2.0 许可证下可用。有关更多信息,请参阅 LICENSE 文件。
依赖关系
~320KB