14次发布
使用旧的Rust 2015
0.1.13 | 2019年11月21日 |
---|---|
0.1.12 | 2018年8月27日 |
0.1.11 | 2018年7月21日 |
0.1.9 | 2018年1月15日 |
0.1.1 | 2016年12月21日 |
#264 in 异步
708 每月下载量
用于 32 个crate(28 个直接使用)
130KB
2.5K SLoC
fibers
这是一个用于执行多个轻量级异步任务(即纤维)的库。
请注意,fibers
严重使用了 futures 来表示异步任务。如果您不熟悉它,我们建议您在阅读以下内容之前参考 futures 的 README.md
和 TUTORIAL.md
。
此库还使用了 mio 来实现高效的异步I/O处理(主要用于网络原语)。然而,其对用户是隐藏的,因此您通常不需要担心它。
Future
是表示异步任务的绝佳方式。它直观,易于与其他futures组合来表示复杂任务,而无需运行时开销。但是,还有一个未解决的问题:“如何有效地执行(可能数量非常庞大的)并发任务?”fibers
是对这个问题的回答。
从概念上讲,fibers
的责任非常简单。它将异步任务(即纤维)表示为一个future实例。并且有一个执行器可以像下面那样接受futures并执行它们。
// Creates an executor.
let mut executor = ThreadPoolExecutor::new().unwrap();
// Spawns fibers (i.e., passes futures to the executor).
executor.spawn(futures::lazy(|| { println!("Hello"); Ok(())} ));
executor.spawn(futures::lazy(|| { println!("World!"); Ok(())} ));
// Executes them.
executor.run().unwrap();
纤维可以在不同的后台线程上运行,但用户不需要注意它。如果它在拥有大量处理器的机器上运行,性能将自然提高。
简单来说,如果一个未来对Async::NotReady
响应一个Future::poll
方法的调用,那么与该未来关联的纤程将进入“等待”状态。然后,它将被挂起(取消调度),直到未来感兴趣的事件发生(例如,等待目标TCP套接字上数据到达)。最后,如果未来返回Async::Ready
响应,纤程将被视为完成,执行器将丢弃纤程。
这个库为以高效异步方式编写程序提供了原语(有关更多详细信息,请参阅net、sync、io、time模块的文档)。
这个库的主要关注点是“如何执行纤程”。因此,建议使用外部crate(例如,handy_async)来描述“如何表示异步任务”。
安装
首先,将以下行添加到您的Cargo.toml
中
[dependencies]
fibers = "0.1"
futures = "0.1" # In practical, `futures` is mandatory to use `fibers`.
接下来,将此添加到您的crate中
extern crate fibers;
extern crate futures;
下一节提供了几个可运行的示例。
示例
以下是如何编写执行异步任务的代码的示例。
其他示例可以在“fibers/examples”目录中找到。您可以执行以下命令来运行一个示例。
$ cargo run --example ${EXAMPLE_NAME}
斐波那契数计算
// See also: "fibers/examples/fibonacci.rs"
extern crate fibers;
extern crate futures;
use fibers::{Spawn, Executor, ThreadPoolExecutor};
use futures::Future;
fn main() {
// Creates an executor instance.
let mut executor = ThreadPoolExecutor::new().unwrap();
// Creates a future which will calculate the fibonacchi number of `10`.
let input_number = 10;
let future = fibonacci(input_number, executor.handle());
// Spawns and executes the future (fiber).
let monitor = executor.spawn_monitor(future);
let answer = executor.run_fiber(monitor).unwrap();
// Checkes the answer.
assert_eq!(answer, Ok(55));
}
fn fibonacci<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
if n < 2 {
Box::new(futures::finished(n))
} else {
/// Spawns a new fiber per recursive call.
let f0 = handle.spawn_monitor(fibonacci(n - 1, handle.clone()));
let f1 = handle.spawn_monitor(fibonacci(n - 2, handle.clone()));
Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
}
}
TCP Echo服务器和客户端
以下是一个TCP echo服务器示例,监听在地址“127.0.0.1:3000”上
// See also: "fibers/examples/tcp_echo_srv.rs"
extern crate fibers;
extern crate futures;
extern crate handy_async;
use std::io;
use fibers::{Spawn, Executor, ThreadPoolExecutor};
use fibers::net::TcpListener;
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;
fn main() {
let server_addr = "127.0.0.1:3000".parse().expect("Invalid TCP bind address");
let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
let handle0 = executor.handle();
let monitor = executor.spawn_monitor(TcpListener::bind(server_addr)
.and_then(move |listener| {
println!("# Start listening: {}: ", server_addr);
// Creates a stream of incoming TCP client sockets
listener.incoming().for_each(move |(client, addr)| {
// New client is connected.
println!("# CONNECTED: {}", addr);
let handle1 = handle0.clone();
// Spawns a fiber to handle the client.
handle0.spawn(client.and_then(move |client| {
// For simplicity, splits reading process and
// writing process into differrent fibers.
let (reader, writer) = (client.clone(), client);
let (tx, rx) = fibers::sync::mpsc::channel();
// Spawns a fiber for the writer side.
// When a message is arrived in `rx`,
// this fiber sends it back to the client.
handle1.spawn(rx.map_err(|_| -> io::Error { unreachable!() })
.fold(writer, |writer, buf: Vec<u8>| {
println!("# SEND: {} bytes", buf.len());
writer.async_write_all(buf).map(|(w, _)| w).map_err(|e| e.into_error())
})
.then(|r| {
println!("# Writer finished: {:?}", r);
Ok(())
}));
// The reader side is executed in the current fiber.
let stream = vec![0;1024].allow_partial().into_stream(reader);
stream.map_err(|e| e.into_error())
.fold(tx, |tx, (mut buf, len)| {
buf.truncate(len);
println!("# RECV: {} bytes", buf.len());
// Sends received to the writer half.
tx.send(buf).expect("Cannot send");
Ok(tx) as io::Result<_>
})
})
.then(|r| {
println!("# Client finished: {:?}", r);
Ok(())
}));
Ok(())
})
}));
let result = executor.run_fiber(monitor).expect("Execution failed");
println!("# Listener finished: {:?}", result);
}
以下是客户端的代码
// See also: "fibers/examples/tcp_echo_cli.rs"
extern crate fibers;
extern crate futures;
extern crate handy_async;
use fibers::{Spawn, Executor, InPlaceExecutor};
use fibers::net::TcpStream;
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;
fn main() {
let server_addr = "127.0.0.1:3000".parse().unwrap();
// `InPlaceExecutor` is suitable to execute a few fibers.
// It does not create any background threads,
// so the overhead to manage fibers is lower than `ThreadPoolExecutor`.
let mut executor = InPlaceExecutor::new().expect("Cannot create Executor");
let handle = executor.handle();
// Spawns a fiber for echo client.
let monitor = executor.spawn_monitor(TcpStream::connect(server_addr).and_then(move |stream| {
println!("# CONNECTED: {}", server_addr);
let (reader, writer) = (stream.clone(), stream);
// Writer: It sends data read from the standard input stream to the connected server.
let stdin_stream = vec![0; 256].allow_partial().into_stream(fibers::io::stdin());
handle.spawn(stdin_stream.map_err(|e| e.into_error())
.fold(writer, |writer, (mut buf, size)| {
buf.truncate(size);
writer.async_write_all(buf).map(|(w, _)| w).map_err(|e| e.into_error())
})
.then(|r| {
println!("# Writer finished: {:?}", r);
Ok(())
}));
// Reader: It outputs data received from the server to the standard output stream.
let stream = vec![0; 256].allow_partial().into_stream(reader);
stream.map_err(|e| e.into_error())
.for_each(|(mut buf, len)| {
buf.truncate(len);
println!("{}", String::from_utf8(buf).expect("Invalid UTF-8"));
Ok(())
})
}));
// Runs until the above fiber is terminated (i.e., The TCP stream is disconnected).
let result = executor.run_fiber(monitor).expect("Execution failed");
println!("# Disconnected: {:?}", result);
}
使用fibers
的真正示例
以下是一个使用fibers
的已知项目的列表
- erl_dist:Erlang分布协议实现
- miasht:最小异步HTTP服务器/客户端
- rustun:STUN(RFC5389)服务器/客户端实现
- rusturn:TURN(RFC5766)服务器/客户端实现
许可证
此库根据MIT许可证发布。
有关完整的许可证信息,请参阅LICENSE文件。
版权(c)2016 DWANGO Co., Ltd. 版权所有。
依赖关系
~0.8–1.1MB
~18K SLoC