#python #integration #channel #engine #exception #task #error

pime

Rust Python集成变得简单

17个版本

0.0.17 2021年9月19日
0.0.16 2021年8月22日

#526 in 并发

每月 38 次下载

MIT 许可证

31KB
531

Rust Python集成变得简单

这是为什么

PIME是一个Rust crate,它允许基于tokio的Rust程序轻松执行Python代码片段。

PIME基于PyO3Serdeneotasker Python模块。

PIME允许Rust并行执行Python阻塞代码,使用标准的ThreadPoolExecutor和await调用并发futures对象的返回结果。

PIME完全线程安全,目标是使Python集成到Rust尽可能简单。

PIME允许Rust程序具有基于Python的扩展、插件、将Python脚本直接集成到Rust Web服务器等。

它是如何工作的

让我们看看PIME集成到Rust程序内部

Rust线程 Rust线程 Rust线程 Python GIL线程
rust代码 rust代码 rust代码 ThreadPoolExecutor
rust代码 await task1 rust代码 task1
await task2 await task3 rust代码 task2 task3

当Rust协程想要执行Python任务时,它创建一个对象并执行方法。如果执行成功,结果将作为对象返回,否则作为,其中包含Python异常信息或引擎错误。

在Python方面,所有任务都由代理函数处理。该函数有两个参数:commandparams(没有基于关键字的参数,抱歉——当前asyncio循环的

run_in_executor

函数的限制)。代理函数实例使用Python的ThreadPoolExecutor并行启动。

当代理返回结果或引发异常时,这会报告回Rust代码。

通信通过线程安全的mpsc通道进行。

使用示例

准备

为Python安装neotasker模块

pip3 install neotasker

Cargo依赖项

[dependencies]
tokio = { version = "1.4", features = ["full"] }
pyo3 = { version = "0.14.1" }
serde-value = "0.7.0"
pime = "*"

Rust代码

use pyo3::prelude::*;
use serde_value::Value;
use std::collections::BTreeMap;
use std::env;

// create tokio runtime or use #[tokio::main]
// ...............................................
// ...............................................


// init and start PIME
tokio::task::spawn_blocking(move || {
    // omit if auto-prepared
    pyo3::prepare_freethreaded_python();
    Python::with_gil(|py| {
        // as Python has GIL,
        // all work with the Python object MUST be performed in this thread
        // after there is no way to reconfigure it
        let engine = pime::PySyncEngine::new(&py).unwrap();
        // inserts directories into Python's sys.path
        let cwd = env::current_dir().unwrap().to_str().unwrap().to_owned();
        engine.add_import_path(&cwd).unwrap();
        // enables debug mode
        engine.enable_debug().unwrap();
        // sets ThreadPoolExecutor size to min = 10, max = 10
        engine.set_thread_pool_size(10, 10).unwrap();
        let module = py.import("mymod").unwrap();
        let broker = module.getattr("broker").unwrap();
        // Perform additional work, e.g. add Rust functions to Python modules
        // .................................
        // fire and go
        engine.launch(&py, broker).unwrap();
    });
});
// wait engine to be started
pime::wait_online().await;

// Done! Now tasks can be called from any coroutine
// ...............................................
// ...............................................

let mut params = BTreeMap::new();
params.insert("name".to_owned(), Value::String("Rust".to_owned()));
let mut task = pime::PyTask::new(Value::String("hello".to_owned()), params);
// If the task result is not required, the task can be marked to be executed
// forever in ThreadPoolExecutor, until finished. In this case, "call" always
// returns result None
//task.no_wait();
// If a task performs calculations only, it can be marked as exclusive.
// Tasks of this type lock Python thread until completed. Use with care!
//task.mark_exclusive();
match pime::call(task).await {
    Ok(result) => {
        // The result is returned as Option<Value>
        println!("{:?}", result);
    },
    Err(e) if e.kind == pime::ErrorKind::PyException => {
        println!("Exception raised {}: {}", e.exception.unwrap(), e.message);
        println!("{}", e.traceback.unwrap());
    }
    Err(e) => {
        println!("An error is occurred: {}", e.message);
    }
};
// stop the engine gracefully
pime::stop().await;

Python代码(mymod/__init__.py)

def broker(command, params):
    if command == 'hello':
        return f'Hi from Python, {params["name"]}!'
    elif command == 'bye':
        return 'Bye bye'
    else:
        raise RuntimeError('command unsupported')

更多示例

https://github.com/alttch/pime/tree/main/examples/

依赖关系

~8MB
~136K SLoC