5个不稳定版本
使用旧的Rust 2015
0.7.0 | 2017年7月31日 |
---|---|
0.6.2 | 2017年7月25日 |
0.6.1 | 2017年7月25日 |
0.6.0 | 2017年7月18日 |
0.5.0 | 2017年7月17日 |
#8 in #fibers
170KB
3.5K SLoC
easyfibers
easyfibers 是一个无闭包协程库,旨在尽可能轻松地执行异步任务。它是在 mio 和 context-rs 之上的一层薄薄的封装。
描述
easyfibers 允许你编写类似使用阻塞套接字的代码,无需将你的代码放入尴尬的闭包中。它将在读取、写入和接受函数调用上无缝轮询和调度纤程。
愿景
让 easyfibers 纤程运行协议,让客户端代码在主堆栈上(使用 Fiber::join_main)。
这样
-
协议实现可以高效且直接地进行。
-
协议客户端和服务器用户不会遇到堆栈大小问题。
-
用户可以自由选择他们的代码实现方式,无论是否受到框架限制;无需强制将代码放入回调或闭包中。
警告
每个纤程在自己的堆栈中执行。这些堆栈非常有限,因此必须小心,不要超出限制(否则它将使用 SIGBUS 杀死你的应用程序)。
这种风险值得吗?我认为是的。与其他协程/纤程库相比,它的易用性。
过度使用闭包会使代码难看,产生可怕的编译错误,并使与其他代码的集成变得困难。
文档
TODO
- 读取、写入、接受上的纤程调度
- 子纤程
- 流式响应
- 文件(使用线程池)
- join_main 从纤程调用主堆栈,并使用 resume_fiber 恢复它
- SSL/TLS
- Fiber::hibernate_for_read 用于保持连接的场景
- 多个运行者,启用运行多个不同的服务
- 基于计时器的纤程
- 异步DNS查找
示例 - 随机 http/1.1 代理
使用3种类型的纤程
-
接受连接的 TcpListener
-
接收请求并启动 http 客户端纤程的 TcpStream 服务器
-
具有两个角色的 TcpStream 客户端
-
从外部服务创建请求并流式传输响应回父纤程
-
主堆栈上的请求调用我们的代理
-
在一个终端上运行下面的示例
cargo test -- --nocapture
extern crate easyfibers;
extern crate rand;
use easyfibers::*;
use mio::net::{TcpStream,TcpListener};
use std::io::{Write,Read};
use std::time::{Duration};
use std::net::{SocketAddr,Ipv4Addr,IpAddr};
use std::str;
use native_tls::{TlsConnector};
use std::io;
#[derive(Clone)]
struct Param {
chosen: Option<String>,
is_https: bool,
proxy_client: bool,
http_hosts: Vec<String>,
https_hosts: Vec<String>,
}
#[derive(PartialEq)]
enum Resp<'a> {
Done,
Bytes(&'a[u8])
}
// Receive list of hosts.
// Return slices.
fn get_http(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
// We will read in 500B chunks
let mut v = [0u8;2000];
let host = p.chosen.unwrap();
if p.is_https {
let connector = TlsConnector::builder().unwrap().build().unwrap();
fiber.tcp_tls_connect(connector, host.as_str()).unwrap();
// https requires longer timeout
fiber.socket_timeout(Some(Duration::from_millis(2000)));
} else {
fiber.socket_timeout(Some(Duration::from_millis(1000)));
};
// We want to time out so use keep-alive
let req = format!("GET / HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\nUser-Agent: test\r\n\r\n",host);
fiber.write(req.as_bytes()).expect("Can not write to socket");
loop {
// Whenever socket would normally return WouldBlock, fiber gets executed out and another
// one takes its place in the background.
match fiber.read(&mut v[..]) {
Ok(sz) => {
// Return slice to parent, directly from our stack!
fiber.resp_chunk(Resp::Bytes(&v[0..sz]));
}
Err(e) => {
assert_eq!(e.kind(), io::ErrorKind::TimedOut);
break;
}
}
}
println!("Client fiber closing {}", p.proxy_client);
Some(Resp::Done)
}
fn rand_http_proxy(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
fiber.socket_timeout(Some(Duration::from_millis(500)));
// Pick a random host from our list.
let chosen = rand::random::<usize>() % p.http_hosts.len();
// Pick http or https.
let port = if rand::random::<u8>() % 2 == 0 { 80 } else { 443 };
let p1 = if port == 443 {
Param {
chosen: Some(p.https_hosts[chosen].clone()),
is_https: port == 443,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
proxy_client: true,
}
} else {
Param {
chosen: Some(p.http_hosts[chosen].clone()),
is_https: port == 443,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
proxy_client: true,
}
};
let addr = if let &Some(ref ch) = &p1.chosen {
ch.clone()
} else {
panic!("")
};
fiber.join_resolve_connect(addr.as_str(), SocketType::Tcp, port, Duration::from_millis(3000), get_http, p1).unwrap();
println!("Returning: {}{}", if port == 443 { "https://" } else { "http://" }, addr);
// Fibers can stream response to parent. So we iterate on responses.
// We could also create multiple children and iterate on all of them.
while let Some(resp) = fiber.get_child() {
if let Resp::Bytes(slice) = resp {
println!("Server got {}", slice.len());
fiber.write(slice);
}
}
println!("Server socket fiber closing");
// return empty slice, so main stack knows a server connection has closed
None
}
// Accept sockets in an endless loop.
fn sock_acceptor(fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
loop {
// If no sockets available, fiber will be scheduled out for execution until something connects.
match fiber.accept_tcp() {
Ok((sock,_)) => {
// Create a new fiber on received socket. Use rand_http_proxy function to run it.
fiber.new_tcp(sock,rand_http_proxy, p.clone()).unwrap();
}
_ => {
println!("Listen socket error");
break;
}
}
}
None
}
fn main() {
// First time calling random requires a large stack, we must initialize it on main stack!
rand::random::<u8>();
let p = Param {
chosen: None,
is_https: false,
proxy_client: false,
http_hosts: vec!["www.liquiddota.com".to_string(),"www.google.com".to_string(),
"www.sqlite.org".to_string(),"edition.cnn.com".to_string()],
https_hosts: vec!["www.reddit.com".to_string(), "www.google.com".to_string(),
"arstechnica.com".to_string(), "news.ycombinator.com".to_string()],
};
// Start our poller.
// Set this stack lower to see some SIGBUS action.
let poller:Poller = Poller::new(Some(4096*10)).unwrap();
// Start runner with Param and Resp types.
let runner:Runner<Param,Resp> = Runner::new().unwrap();
// Start a TCP listener socket
let listener = TcpListener::bind(&"127.0.0.1:10000".parse().unwrap()).unwrap();
// Create a fiber from it. Listener socket will use sock_acceptor function.
runner.new_listener(listener, sock_acceptor, p).unwrap();
// Run 20 requests and exit.
let mut reqs_remain = 20;
// Start requests. We can directly start a TcpStream because we are not resolving anything.
// Requests will call our own server.
for _ in 0..reqs_remain {
let p = Param {
chosen: Some("127.0.0.1:10000".to_string()),
is_https: false,
proxy_client: false,
http_hosts: Vec::new(),
https_hosts: Vec::new(),
};
let addr = IpAddr::V4(Ipv4Addr::new(127,0,0,1));
let client_sock = TcpStream::connect(&SocketAddr::new(addr, 10000)).unwrap();
runner.new_tcp(client_sock, get_http, p).unwrap();
}
while reqs_remain > 0 {
if !poller.poll(Duration::from_millis(10)) {
continue;
}
if !runner.run() {
continue;
}
while let Some(r) = runner.get_response() {
if Resp::Done == r {
reqs_remain -= 1;
println!("Finished executing, req_remain: {}", reqs_remain);
} else if let Resp::Bytes(slice) = r {
println!("Main stack got {} bytes", slice.len());
}
}
while let Some(_) = runner.get_fiber() {
}
}
println!("poll out");
}
依赖项
~1.5–9.5MB
~76K SLoC