4个版本
0.2.3 | 2024年1月31日 |
---|---|
0.2.2 | 2024年1月29日 |
0.2.1 | 2024年1月29日 |
0.2.0 | 2024年1月29日 |
#755 in 异步
25KB
336 行
Tokiactor
关于 Tokiactor
tokiactor 是基于 tokio
运行时对 actor
模式的精简实现。不涉及 或 System
或 Context
等概念,只有 Message
Handle
和 Actor
。
在 tokiactor
中,Actor
是一个包装函数,可以是 sync
或 async
。
sync
函数将在多个系统线程中执行async
函数将在tokio
绿色线程中异步执行。
可以利用来自crate futures
的 futures::StreamExt
trait进行大批量任务,如处理数千张图片,并行执行。
安装
将 tokiactor
添加到 Cargo.toml
[dependencies]
tokiactor = "*"
tokiactor
需要 tokio
来使事情正常工作。
入门
以下代码将创建 Adder
Actor,然后,Actor通过 Handle::handle
方法异步调用。
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
// create handle, then spawn a closure.
let handle = Handle::new(1).spawn(move |i: i32| i+ 41);
// call actor thru 'handle' method
assert_eq!(handle.handle(1).await, 42);
}
);
或者,我们可以从 async
Closure
创建 Actor
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let handle = Handle::new(1).spawn_tokio(move |i: i32| async move {i + 41});
assert_eq!(handle.handle(1).await, 42);
}
);
或者从阻塞 fn
创建 Actor
,然后在 parallel
中运行 Actor
use tokio;
use tokiactor::*;
use futures::StreamExt;
fn adder_fn(i: i32) -> i32 {
std::thread::sleep(std::time::Duration::from_secs(1));
i+ 41
}
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let handle = Handle::new(10).spawn_n(10, adder_fn);
let results = futures::stream::iter((0..10))
.map(|i| handle.handle(i))
.buffered(10)
.collect::<Vec<_>>().await;
assert_eq!(results[9], 50)
}
);
Actor启动
有不同的方式启动 Actor
-
启动 sync
Actor
-
Handle::spawn
:在1
个后台线程中创建Actor
-
Handle::spawn_n
:在fn
实现的Clone
中创建n
个Actor
,在n
个后台线程中。
-
-
创建异步 async
Actor
Handle::spawn_tokio
:在后台 tokio 线程中创建Actor
,每个异步handle
都会在后台创建一个新的 tokio 线程。
请查阅 docs.rs 获取更多信息。
Handle
Handle
可以连接在一起,构建另一种类型的 Handle
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let add = Handle::new(1).spawn(move |i: i32| i + 1);
let sub = Handle::new(1).spawn(move |i: i32| i - 1);
let div = Handle::new(1).spawn(move |i: i32| {
match i == 0 {
false => Some(10/i),
true => None
}
});
let mul = Handle::new(1).spawn(move |i: i32| i * 10);
let handle = add.then(sub).then(div).map(mul);
assert_eq!(handle.handle(0).await, None);
assert_eq!(handle.handle(2).await, Some(50));
}
);
Handle
可以同时创建异步和同步的 actor
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
// Just a demo, don't do it in real code.
// spawn a fn
let handle = Handle::new(1).spawn(move |i: i32| i + 1);
// spawn an async fn
handle.spawn_tokio(move |i: i32| async move {i * 2});
}
);
请查看 examples/icecream.rs
以了解更复杂的使用案例。
依赖关系
~4–12MB
~111K SLoC