#fibers #mio #context #async #coroutine #http-proxy #http-client

easyfibers

easyfibers 是一个无闭包协程库,旨在尽可能轻松地执行异步任务

5个不稳定版本

使用旧的Rust 2015

0.7.0 2017年7月31日
0.6.2 2017年7月25日
0.6.1 2017年7月25日
0.6.0 2017年7月18日
0.5.0 2017年7月17日

#8 in #fibers

MIT/Apache

170KB
3.5K SLoC

easyfibers

easyfibers 是一个无闭包协程库,旨在尽可能轻松地执行异步任务。它是在 mio 和 context-rs 之上的一层薄薄的封装。

描述

easyfibers 允许你编写类似使用阻塞套接字的代码,无需将你的代码放入尴尬的闭包中。它将在读取、写入和接受函数调用上无缝轮询和调度纤程。

愿景

让 easyfibers 纤程运行协议,让客户端代码在主堆栈上(使用 Fiber::join_main)。

这样

  • 协议实现可以高效且直接地进行。

  • 协议客户端和服务器用户不会遇到堆栈大小问题。

  • 用户可以自由选择他们的代码实现方式,无论是否受到框架限制;无需强制将代码放入回调或闭包中。

警告

每个纤程在自己的堆栈中执行。这些堆栈非常有限,因此必须小心,不要超出限制(否则它将使用 SIGBUS 杀死你的应用程序)。

这种风险值得吗?我认为是的。与其他协程/纤程库相比,它的易用性。

过度使用闭包会使代码难看,产生可怕的编译错误,并使与其他代码的集成变得困难。

文档

TODO

  • 读取、写入、接受上的纤程调度
  • 子纤程
  • 流式响应
  • 文件(使用线程池)
  • join_main 从纤程调用主堆栈,并使用 resume_fiber 恢复它
  • SSL/TLS
  • Fiber::hibernate_for_read 用于保持连接的场景
  • 多个运行者,启用运行多个不同的服务
  • 基于计时器的纤程
  • 异步DNS查找

示例 - 随机 http/1.1 代理

使用3种类型的纤程

  • 接受连接的 TcpListener

  • 接收请求并启动 http 客户端纤程的 TcpStream 服务器

  • 具有两个角色的 TcpStream 客户端

    • 从外部服务创建请求并流式传输响应回父纤程

    • 主堆栈上的请求调用我们的代理

在一个终端上运行下面的示例

cargo test -- --nocapture
extern crate easyfibers;
extern crate rand;
use easyfibers::*;
use mio::net::{TcpStream,TcpListener};
use std::io::{Write,Read};
use std::time::{Duration};
use std::net::{SocketAddr,Ipv4Addr,IpAddr};
use std::str;
use native_tls::{TlsConnector};
use std::io;

#[derive(Clone)]
struct Param {
    chosen: Option<String>,
    is_https: bool,
    proxy_client: bool,
    http_hosts: Vec<String>,
    https_hosts: Vec<String>,
}

#[derive(PartialEq)]
enum Resp<'a> {
    Done,
    Bytes(&'a[u8])
}

// Receive list of hosts.
// Return slices.
fn get_http(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
    // We will read in 500B chunks
    let mut v = [0u8;2000];
    let host = p.chosen.unwrap();

    if p.is_https {
        let connector = TlsConnector::builder().unwrap().build().unwrap();
        fiber.tcp_tls_connect(connector, host.as_str()).unwrap();
        // https requires longer timeout
        fiber.socket_timeout(Some(Duration::from_millis(2000)));
    } else {
        fiber.socket_timeout(Some(Duration::from_millis(1000)));
    };

    // We want to time out so use keep-alive
    let req = format!("GET / HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\nUser-Agent: test\r\n\r\n",host);
    fiber.write(req.as_bytes()).expect("Can not write to socket");
    loop {
        // Whenever socket would normally return WouldBlock, fiber gets executed out and another
        // one takes its place in the background.
        match fiber.read(&mut v[..]) {
            Ok(sz) => {
                // Return slice to parent, directly from our stack!
                fiber.resp_chunk(Resp::Bytes(&v[0..sz]));
            }
            Err(e) => {
                assert_eq!(e.kind(), io::ErrorKind::TimedOut);
                break;
            }
        }
    }
    println!("Client fiber closing {}", p.proxy_client);
    Some(Resp::Done)
}

fn rand_http_proxy(mut fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
    fiber.socket_timeout(Some(Duration::from_millis(500)));

    // Pick a random host from our list.
    let chosen = rand::random::<usize>() % p.http_hosts.len();
    // Pick http or https.
    let port = if rand::random::<u8>() % 2 == 0 { 80 } else { 443 };
    let p1 = if port == 443 {
        Param {
            chosen: Some(p.https_hosts[chosen].clone()),
            is_https: port == 443,
            http_hosts: Vec::new(),
            https_hosts: Vec::new(),
            proxy_client: true,
        }
    } else {
            Param {
            chosen: Some(p.http_hosts[chosen].clone()),
            is_https: port == 443,
            http_hosts: Vec::new(),
            https_hosts: Vec::new(),
            proxy_client: true,
        }
    };
    let addr = if let &Some(ref ch) = &p1.chosen {
            ch.clone()
    } else {
        panic!("")
    };
    fiber.join_resolve_connect(addr.as_str(), SocketType::Tcp, port, Duration::from_millis(3000), get_http, p1).unwrap();
    println!("Returning: {}{}", if port == 443 { "https://" } else { "http://" },  addr);

    // Fibers can stream response to parent. So we iterate on responses.
    // We could also create multiple children and iterate on all of them.
    while let Some(resp) = fiber.get_child() {
        if let Resp::Bytes(slice) = resp {
            println!("Server got {}", slice.len());
            fiber.write(slice);
        }
    }
    println!("Server socket fiber closing");
    // return empty slice, so main stack knows a server connection has closed
    None
}

// Accept sockets in an endless loop.
fn sock_acceptor(fiber: Fiber<Param,Resp>, p: Param) -> Option<Resp> {
    loop {
        // If no sockets available, fiber will be scheduled out for execution until something connects. 
        match fiber.accept_tcp() {
            Ok((sock,_)) => {
                // Create a new fiber on received socket. Use rand_http_proxy function to run it.
                fiber.new_tcp(sock,rand_http_proxy, p.clone()).unwrap();
            }
            _ => {
                println!("Listen socket error");
                break;
            }
        }
    }
    None
}

fn main() {
    // First time calling random requires a large stack, we must initialize it on main stack!
    rand::random::<u8>();
    let p = Param {
        chosen: None,
        is_https: false,
        proxy_client: false,
        http_hosts: vec!["www.liquiddota.com".to_string(),"www.google.com".to_string(),
            "www.sqlite.org".to_string(),"edition.cnn.com".to_string()],
        https_hosts: vec!["www.reddit.com".to_string(), "www.google.com".to_string(),
            "arstechnica.com".to_string(), "news.ycombinator.com".to_string()],
    };
    // Start our poller.
    // Set this stack lower to see some SIGBUS action.
    let poller:Poller = Poller::new(Some(4096*10)).unwrap();
    // Start runner with Param and Resp types.
    let runner:Runner<Param,Resp> = Runner::new().unwrap();
    // Start a TCP listener socket
    let listener = TcpListener::bind(&"127.0.0.1:10000".parse().unwrap()).unwrap();
    // Create a fiber from it. Listener socket will use sock_acceptor function.
    runner.new_listener(listener, sock_acceptor, p).unwrap();
    // Run 20 requests and exit.
    let mut reqs_remain = 20;
    // Start requests. We can directly start a TcpStream because we are not resolving anything.
    // Requests will call our own server.
    for _ in 0..reqs_remain {
        let p = Param {
            chosen: Some("127.0.0.1:10000".to_string()),
            is_https: false,
            proxy_client: false,
            http_hosts: Vec::new(),
            https_hosts: Vec::new(),
        };
        let addr = IpAddr::V4(Ipv4Addr::new(127,0,0,1));
        let client_sock = TcpStream::connect(&SocketAddr::new(addr, 10000)).unwrap();
        runner.new_tcp(client_sock, get_http, p).unwrap();
    }
    while reqs_remain > 0 {
        if !poller.poll(Duration::from_millis(10)) {
            continue;
        }
        if !runner.run() {
            continue;
        }
        while let Some(r) = runner.get_response() {
            if Resp::Done == r {
                reqs_remain -= 1;
                println!("Finished executing, req_remain: {}", reqs_remain);
            } else if let Resp::Bytes(slice) = r {
                println!("Main stack got {} bytes", slice.len());
            }
        }
        while let Some(_) = runner.get_fiber() {
        }
    }
    println!("poll out");
}

依赖项

~1.5–9.5MB
~76K SLoC