4个版本
| 0.2.3 | 2024年1月31日 |
|---|---|
| 0.2.2 | 2024年1月29日 |
| 0.2.1 | 2024年1月29日 |
| 0.2.0 | 2024年1月29日 |
#755 in 异步
25KB
336 行
Tokiactor
关于 Tokiactor
tokiactor 是基于 tokio 运行时对 actor 模式的精简实现。不涉及 或 System 或 Context 等概念,只有 MessageHandle 和 Actor。
在 tokiactor 中,Actor 是一个包装函数,可以是 sync 或 async。
sync函数将在多个系统线程中执行async函数将在tokio绿色线程中异步执行。
可以利用来自crate futures 的 futures::StreamExt trait进行大批量任务,如处理数千张图片,并行执行。
安装
将 tokiactor 添加到 Cargo.toml
[dependencies]
tokiactor = "*"
tokiactor 需要 tokio 来使事情正常工作。
入门
以下代码将创建 Adder Actor,然后,Actor通过 Handle::handle 方法异步调用。
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
// create handle, then spawn a closure.
let handle = Handle::new(1).spawn(move |i: i32| i+ 41);
// call actor thru 'handle' method
assert_eq!(handle.handle(1).await, 42);
}
);
或者,我们可以从 async Closure 创建 Actor
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let handle = Handle::new(1).spawn_tokio(move |i: i32| async move {i + 41});
assert_eq!(handle.handle(1).await, 42);
}
);
或者从阻塞 fn 创建 Actor,然后在 parallel 中运行 Actor
use tokio;
use tokiactor::*;
use futures::StreamExt;
fn adder_fn(i: i32) -> i32 {
std::thread::sleep(std::time::Duration::from_secs(1));
i+ 41
}
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let handle = Handle::new(10).spawn_n(10, adder_fn);
let results = futures::stream::iter((0..10))
.map(|i| handle.handle(i))
.buffered(10)
.collect::<Vec<_>>().await;
assert_eq!(results[9], 50)
}
);
Actor启动
有不同的方式启动 Actor
-
启动 sync
Actor-
Handle::spawn:在1个后台线程中创建Actor -
Handle::spawn_n:在fn实现的Clone中创建n个Actor,在n个后台线程中。
-
-
创建异步 async
ActorHandle::spawn_tokio:在后台 tokio 线程中创建Actor,每个异步handle都会在后台创建一个新的 tokio 线程。
请查阅 docs.rs 获取更多信息。
Handle
Handle 可以连接在一起,构建另一种类型的 Handle
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
let add = Handle::new(1).spawn(move |i: i32| i + 1);
let sub = Handle::new(1).spawn(move |i: i32| i - 1);
let div = Handle::new(1).spawn(move |i: i32| {
match i == 0 {
false => Some(10/i),
true => None
}
});
let mul = Handle::new(1).spawn(move |i: i32| i * 10);
let handle = add.then(sub).then(div).map(mul);
assert_eq!(handle.handle(0).await, None);
assert_eq!(handle.handle(2).await, Some(50));
}
);
Handle 可以同时创建异步和同步的 actor
use tokio;
use tokiactor::*;
let rt = tokio::runtime::Runtime::new().unwrap().block_on(
async move {
// Just a demo, don't do it in real code.
// spawn a fn
let handle = Handle::new(1).spawn(move |i: i32| i + 1);
// spawn an async fn
handle.spawn_tokio(move |i: i32| async move {i * 2});
}
);
请查看 examples/icecream.rs 以了解更复杂的使用案例。
依赖关系
~4–12MB
~111K SLoC