4个版本

0.2.3 2024年1月31日
0.2.2 2024年1月29日
0.2.1 2024年1月29日
0.2.0 2024年1月29日

#755 in 异步

MIT 许可证

25KB
336

Tokiactor

关于 Tokiactor

tokiactor 是基于 tokio 运行时对 actor 模式的精简实现。不涉及 SystemContextMessage 等概念,只有 HandleActor

tokiactor 中,Actor 是一个包装函数,可以是 syncasync

  • sync 函数将在多个系统线程中执行
  • async 函数将在 tokio 绿色线程中异步执行。

可以利用来自crate futuresfutures::StreamExt trait进行大批量任务,如处理数千张图片,并行执行。

安装

tokiactor 添加到 Cargo.toml

[dependencies]
tokiactor = "*"

tokiactor 需要 tokio 来使事情正常工作。

入门

以下代码将创建 Adder Actor,然后,Actor通过 Handle::handle 方法异步调用。

use tokio;
use tokiactor::*;

let rt = tokio::runtime::Runtime::new().unwrap().block_on(
	async move {
		// create handle, then spawn a closure.
		let handle = Handle::new(1).spawn(move |i: i32| i+ 41);
		// call actor thru 'handle' method
		assert_eq!(handle.handle(1).await, 42);
	}
);

或者,我们可以从 async Closure 创建 Actor

use tokio;
use tokiactor::*;

let rt = tokio::runtime::Runtime::new().unwrap().block_on(
	async move {
		let handle = Handle::new(1).spawn_tokio(move |i: i32| async move {i + 41});
		assert_eq!(handle.handle(1).await, 42);
	}
);

或者从阻塞 fn 创建 Actor,然后在 parallel 中运行 Actor

use tokio;
use tokiactor::*;
use futures::StreamExt;

fn adder_fn(i: i32) -> i32 {
	std::thread::sleep(std::time::Duration::from_secs(1));
	i+ 41
}

let rt = tokio::runtime::Runtime::new().unwrap().block_on(
	async move {
		let handle = Handle::new(10).spawn_n(10, adder_fn);
		let results = futures::stream::iter((0..10))
			.map(|i| handle.handle(i))
			.buffered(10)
			.collect::<Vec<_>>().await;
		assert_eq!(results[9], 50)
	}
);

Actor启动

有不同的方式启动 Actor

  • 启动 sync Actor

    • Handle::spawn:在 1 个后台线程中创建 Actor

    • Handle::spawn_n:在 fn 实现的 Clone 中创建 nActor,在 n 个后台线程中。

  • 创建异步 async Actor

    • Handle::spawn_tokio:在后台 tokio 线程中创建 Actor,每个异步 handle 都会在后台创建一个新的 tokio 线程。

请查阅 docs.rs 获取更多信息。

Handle

Handle 可以连接在一起,构建另一种类型的 Handle

use tokio;
use tokiactor::*;

let rt = tokio::runtime::Runtime::new().unwrap().block_on(
	async move {
		let add = Handle::new(1).spawn(move |i: i32| i + 1);
		let sub = Handle::new(1).spawn(move |i: i32| i - 1);
		let div = Handle::new(1).spawn(move |i: i32| {
			match i == 0 {
				false => Some(10/i),
				true => None
			}
		});
		let mul = Handle::new(1).spawn(move |i: i32| i * 10);

		let handle = add.then(sub).then(div).map(mul);

		assert_eq!(handle.handle(0).await, None);
		assert_eq!(handle.handle(2).await, Some(50));
	}
);

Handle 可以同时创建异步和同步的 actor

use tokio;
use tokiactor::*;

let rt = tokio::runtime::Runtime::new().unwrap().block_on(
	async move {
		// Just a demo, don't do it in real code.
		// spawn a fn
		let handle = Handle::new(1).spawn(move |i: i32| i + 1);
		// spawn an async fn
		handle.spawn_tokio(move |i: i32| async move {i * 2});
	}
);

请查看 examples/icecream.rs 以了解更复杂的使用案例。

依赖关系

~4–12MB
~111K SLoC