#future #async-io #fiber #thread #async-task #async

fibers

Rust库,用于基于futures和mio执行多个轻量级异步任务(即纤维)

14次发布

使用旧的Rust 2015

0.1.13 2019年11月21日
0.1.12 2018年8月27日
0.1.11 2018年7月21日
0.1.9 2018年1月15日
0.1.1 2016年12月21日

#264 in 异步

Download history 207/week @ 2024-01-12 210/week @ 2024-01-19 193/week @ 2024-01-26 190/week @ 2024-02-02 243/week @ 2024-02-09 285/week @ 2024-02-16 400/week @ 2024-02-23 210/week @ 2024-03-01 178/week @ 2024-03-08 184/week @ 2024-03-15 204/week @ 2024-03-22 219/week @ 2024-03-29 167/week @ 2024-04-05 146/week @ 2024-04-12 149/week @ 2024-04-19 220/week @ 2024-04-26

708 每月下载量
用于 32 个crate(28 个直接使用)

MIT 许可证

130KB
2.5K SLoC

fibers

Documentation Build Status License: MIT

这是一个用于执行多个轻量级异步任务(即纤维)的库。

文档

请注意,fibers 严重使用了 futures 来表示异步任务。如果您不熟悉它,我们建议您在阅读以下内容之前参考 futuresREADME.mdTUTORIAL.md

此库还使用了 mio 来实现高效的异步I/O处理(主要用于网络原语)。然而,其对用户是隐藏的,因此您通常不需要担心它。


Future 是表示异步任务的绝佳方式。它直观,易于与其他futures组合来表示复杂任务,而无需运行时开销。但是,还有一个未解决的问题:“如何有效地执行(可能数量非常庞大的)并发任务?”fibers 是对这个问题的回答。

从概念上讲,fibers 的责任非常简单。它将异步任务(即纤维)表示为一个future实例。并且有一个执行器可以像下面那样接受futures并执行它们。

// Creates an executor.
let mut executor = ThreadPoolExecutor::new().unwrap();

// Spawns fibers (i.e., passes futures to the executor).
executor.spawn(futures::lazy(|| { println!("Hello"); Ok(())} ));
executor.spawn(futures::lazy(|| { println!("World!"); Ok(())} ));

// Executes them.
executor.run().unwrap();

纤维可以在不同的后台线程上运行,但用户不需要注意它。如果它在拥有大量处理器的机器上运行,性能将自然提高。

简单来说,如果一个未来对Async::NotReady响应一个Future::poll方法的调用,那么与该未来关联的纤程将进入“等待”状态。然后,它将被挂起(取消调度),直到未来感兴趣的事件发生(例如,等待目标TCP套接字上数据到达)。最后,如果未来返回Async::Ready响应,纤程将被视为完成,执行器将丢弃纤程。

这个库为以高效异步方式编写程序提供了原语(有关更多详细信息,请参阅netsynciotime模块的文档)。

这个库的主要关注点是“如何执行纤程”。因此,建议使用外部crate(例如,handy_async)来描述“如何表示异步任务”。

安装

首先,将以下行添加到您的Cargo.toml

[dependencies]
fibers = "0.1"
futures = "0.1"  # In practical, `futures` is mandatory to use `fibers`.

接下来,将此添加到您的crate中

extern crate fibers;
extern crate futures;

下一节提供了几个可运行的示例。

示例

以下是如何编写执行异步任务的代码的示例。

其他示例可以在“fibers/examples”目录中找到。您可以执行以下命令来运行一个示例。

$ cargo run --example ${EXAMPLE_NAME}

斐波那契数计算

// See also: "fibers/examples/fibonacci.rs"
extern crate fibers;
extern crate futures;

use fibers::{Spawn, Executor, ThreadPoolExecutor};
use futures::Future;

fn main() {
    // Creates an executor instance.
    let mut executor = ThreadPoolExecutor::new().unwrap();

    // Creates a future which will calculate the fibonacchi number of `10`.
    let input_number = 10;
    let future = fibonacci(input_number, executor.handle());

    // Spawns and executes the future (fiber).
    let monitor = executor.spawn_monitor(future);
    let answer = executor.run_fiber(monitor).unwrap();

    // Checkes the answer.
    assert_eq!(answer, Ok(55));
}

fn fibonacci<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
    if n < 2 {
        Box::new(futures::finished(n))
    } else {
        /// Spawns a new fiber per recursive call.
        let f0 = handle.spawn_monitor(fibonacci(n - 1, handle.clone()));
        let f1 = handle.spawn_monitor(fibonacci(n - 2, handle.clone()));
        Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
    }
}

TCP Echo服务器和客户端

以下是一个TCP echo服务器示例,监听在地址“127.0.0.1:3000”上

// See also: "fibers/examples/tcp_echo_srv.rs"
extern crate fibers;
extern crate futures;
extern crate handy_async;

use std::io;
use fibers::{Spawn, Executor, ThreadPoolExecutor};
use fibers::net::TcpListener;
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;

fn main() {
    let server_addr = "127.0.0.1:3000".parse().expect("Invalid TCP bind address");

    let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
    let handle0 = executor.handle();
    let monitor = executor.spawn_monitor(TcpListener::bind(server_addr)
        .and_then(move |listener| {
            println!("# Start listening: {}: ", server_addr);

            // Creates a stream of incoming TCP client sockets
            listener.incoming().for_each(move |(client, addr)| {
                // New client is connected.
                println!("# CONNECTED: {}", addr);
                let handle1 = handle0.clone();

                // Spawns a fiber to handle the client.
                handle0.spawn(client.and_then(move |client| {
                        // For simplicity, splits reading process and
                        // writing process into differrent fibers.
                        let (reader, writer) = (client.clone(), client);
                        let (tx, rx) = fibers::sync::mpsc::channel();

                        // Spawns a fiber for the writer side.
                        // When a message is arrived in `rx`,
                        // this fiber sends it back to the client.
                        handle1.spawn(rx.map_err(|_| -> io::Error { unreachable!() })
                            .fold(writer, |writer, buf: Vec<u8>| {
                                println!("# SEND: {} bytes", buf.len());
                                writer.async_write_all(buf).map(|(w, _)| w).map_err(|e| e.into_error())
                            })
                            .then(|r| {
                                println!("# Writer finished: {:?}", r);
                                Ok(())
                            }));

                        // The reader side is executed in the current fiber.
                        let stream = vec![0;1024].allow_partial().into_stream(reader);
                        stream.map_err(|e| e.into_error())
                            .fold(tx, |tx, (mut buf, len)| {
                                buf.truncate(len);
                                println!("# RECV: {} bytes", buf.len());

                                // Sends received  to the writer half.
                                tx.send(buf).expect("Cannot send");
                                Ok(tx) as io::Result<_>
                            })
                    })
                    .then(|r| {
                        println!("# Client finished: {:?}", r);
                        Ok(())
                    }));
                Ok(())
            })
        }));
    let result = executor.run_fiber(monitor).expect("Execution failed");
    println!("# Listener finished: {:?}", result);
}

以下是客户端的代码

// See also: "fibers/examples/tcp_echo_cli.rs"
extern crate fibers;
extern crate futures;
extern crate handy_async;

use fibers::{Spawn, Executor, InPlaceExecutor};
use fibers::net::TcpStream;
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;

fn main() {
    let server_addr = "127.0.0.1:3000".parse().unwrap();

    // `InPlaceExecutor` is suitable to execute a few fibers.
    // It does not create any background threads,
    // so the overhead to manage fibers is lower than `ThreadPoolExecutor`.
    let mut executor = InPlaceExecutor::new().expect("Cannot create Executor");
    let handle = executor.handle();

    // Spawns a fiber for echo client.
    let monitor = executor.spawn_monitor(TcpStream::connect(server_addr).and_then(move |stream| {
        println!("# CONNECTED: {}", server_addr);
        let (reader, writer) = (stream.clone(), stream);

        // Writer: It sends data read from the standard input stream to the connected server.
        let stdin_stream = vec![0; 256].allow_partial().into_stream(fibers::io::stdin());
        handle.spawn(stdin_stream.map_err(|e| e.into_error())
            .fold(writer, |writer, (mut buf, size)| {
                buf.truncate(size);
                writer.async_write_all(buf).map(|(w, _)| w).map_err(|e| e.into_error())
            })
            .then(|r| {
                println!("# Writer finished: {:?}", r);
                Ok(())
            }));

        // Reader: It outputs data received from the server to the standard output stream.
        let stream = vec![0; 256].allow_partial().into_stream(reader);
        stream.map_err(|e| e.into_error())
            .for_each(|(mut buf, len)| {
                buf.truncate(len);
                println!("{}", String::from_utf8(buf).expect("Invalid UTF-8"));
                Ok(())
            })
    }));

    // Runs until the above fiber is terminated (i.e., The TCP stream is disconnected).
    let result = executor.run_fiber(monitor).expect("Execution failed");
    println!("# Disconnected: {:?}", result);
}

使用fibers的真正示例

以下是一个使用fibers的已知项目的列表

  • erl_dist:Erlang分布协议实现
  • miasht:最小异步HTTP服务器/客户端
  • rustun:STUN(RFC5389)服务器/客户端实现
  • rusturn:TURN(RFC5766)服务器/客户端实现

许可证

此库根据MIT许可证发布。

有关完整的许可证信息,请参阅LICENSE文件。

版权(c)2016 DWANGO Co., Ltd. 版权所有。

依赖关系

~0.8–1.1MB
~18K SLoC