17个版本
0.0.17 | 2021年9月19日 |
---|---|
0.0.16 | 2021年8月22日 |
#526 in 并发
每月 38 次下载
31KB
531 行
Rust Python集成变得简单
这是为什么
PIME是一个Rust crate,它允许基于tokio的Rust程序轻松执行Python代码片段。
PIME基于PyO3、Serde和neotasker Python模块。
PIME允许Rust并行执行Python阻塞代码,使用标准的ThreadPoolExecutor和await调用并发futures对象的返回结果。
PIME完全线程安全,目标是使Python集成到Rust尽可能简单。
PIME允许Rust程序具有基于Python的扩展、插件、将Python脚本直接集成到Rust Web服务器等。
它是如何工作的
让我们看看PIME集成到Rust程序内部
Rust线程 | Rust线程 | Rust线程 | Python GIL线程 |
---|---|---|---|
rust代码 | rust代码 | rust代码 | ThreadPoolExecutor |
rust代码 | await task1 | rust代码 | task1 |
await task2 | await task3 | rust代码 | task2 task3 |
当Rust协程想要执行Python任务时,它创建一个
在Python方面,所有任务都由代理函数处理。该函数有两个参数:command和params(没有基于关键字的参数,抱歉——当前asyncio循环的
run_in_executor
函数的限制)。代理函数实例使用Python的ThreadPoolExecutor并行启动。当代理返回结果或引发异常时,这会报告回Rust代码。
通信通过线程安全的mpsc通道进行。
使用示例
准备
为Python安装neotasker模块
pip3 install neotasker
Cargo依赖项
[dependencies]
tokio = { version = "1.4", features = ["full"] }
pyo3 = { version = "0.14.1" }
serde-value = "0.7.0"
pime = "*"
Rust代码
use pyo3::prelude::*;
use serde_value::Value;
use std::collections::BTreeMap;
use std::env;
// create tokio runtime or use #[tokio::main]
// ...............................................
// ...............................................
// init and start PIME
tokio::task::spawn_blocking(move || {
// omit if auto-prepared
pyo3::prepare_freethreaded_python();
Python::with_gil(|py| {
// as Python has GIL,
// all work with the Python object MUST be performed in this thread
// after there is no way to reconfigure it
let engine = pime::PySyncEngine::new(&py).unwrap();
// inserts directories into Python's sys.path
let cwd = env::current_dir().unwrap().to_str().unwrap().to_owned();
engine.add_import_path(&cwd).unwrap();
// enables debug mode
engine.enable_debug().unwrap();
// sets ThreadPoolExecutor size to min = 10, max = 10
engine.set_thread_pool_size(10, 10).unwrap();
let module = py.import("mymod").unwrap();
let broker = module.getattr("broker").unwrap();
// Perform additional work, e.g. add Rust functions to Python modules
// .................................
// fire and go
engine.launch(&py, broker).unwrap();
});
});
// wait engine to be started
pime::wait_online().await;
// Done! Now tasks can be called from any coroutine
// ...............................................
// ...............................................
let mut params = BTreeMap::new();
params.insert("name".to_owned(), Value::String("Rust".to_owned()));
let mut task = pime::PyTask::new(Value::String("hello".to_owned()), params);
// If the task result is not required, the task can be marked to be executed
// forever in ThreadPoolExecutor, until finished. In this case, "call" always
// returns result None
//task.no_wait();
// If a task performs calculations only, it can be marked as exclusive.
// Tasks of this type lock Python thread until completed. Use with care!
//task.mark_exclusive();
match pime::call(task).await {
Ok(result) => {
// The result is returned as Option<Value>
println!("{:?}", result);
},
Err(e) if e.kind == pime::ErrorKind::PyException => {
println!("Exception raised {}: {}", e.exception.unwrap(), e.message);
println!("{}", e.traceback.unwrap());
}
Err(e) => {
println!("An error is occurred: {}", e.message);
}
};
// stop the engine gracefully
pime::stop().await;
Python代码(mymod/__init__.py)
def broker(command, params):
if command == 'hello':
return f'Hi from Python, {params["name"]}!'
elif command == 'bye':
return 'Bye bye'
else:
raise RuntimeError('command unsupported')
更多示例
依赖关系
~8MB
~136K SLoC