#tokio #worker-thread #async #utility #request-response #rust-patterns

tenorite

并发抽象库。为异步工作提供客户端/服务器模型

3个版本

0.1.2 2022年5月18日
0.1.1 2022年4月9日
0.1.0 2022年4月8日

793并发

Download history

1,528 每月下载量

MIT 许可证

14KB
147

Tenorite

Tenorite旨在简化使用Rust构建并发系统。通过在Rust和Tokio提供的坚实基础之上构建简单抽象,Tenorite帮助构建异步工作,这些工作可以使用客户端/服务器模型从其他线程服务请求。

示例仓库

查看示例仓库,了解如何构建和使用TenoriteService

服务设计

Tenorite服务通过将自定义类型馈入Tenorite核心组件的泛型类型参数来创建。一般设计流程包括构建4个数据类型和一个工作进程,然后通过实现TenoriteSerivce特质将它们绑定在一起,形成一个易于使用的服务。

Service
  - Request
  - Response
  - Error
  - Worker
  - Config

请求、响应、错误和配置

这些结构为每个服务定义。所有这些类型都必须满足Send + 'static'特质界限。RequestResponseError还必须实现Clone特质,以便TenoriteCaller可以被克隆以共享对服务的引用。

以下是基于示例仓库的这些类型示例集

#[derive(Debug, Clone)]
pub enum ExampleRequest {
    Set { key: String, value: String },
    Get { key: String },
    Delete { key: String },
}

#[derive(Debug, Clone)]
pub enum ExampleResponse {
    EmptyResponse,
    StringResponse(String),
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ExampleError {
    #[error("Invalid key!")]
    InvalidKey(String),
    #[error("Unexpected error!")]
    Unexpected,
}

pub struct ExampleConfig {
    pub data: std::collections::HashMap<String, String>,
}

工作进程类型

TenoriteWorker特质被满足,以提供将在其自己的tokio任务中运行的服务的/工作进程实现。在示例项目中,它完全实现了“HashMap-as-a-Service”工作进程

pub struct ExampleWorker {}

#[async_trait]
impl TenoriteWorker<ExampleRequest, ExampleResponse, ExampleError,
                    ExampleConfig>
    for ExampleWorker
{
    async fn task(
        mut receiver: tenorite::Receiver<
            TenoriteRequest<ExampleRequest, ExampleResponse, ExampleError>,
        >,
        mut config: ExampleConfig,
    ) {
        while let Some(request) = receiver.recv().await {
            println!("[ExampleTask] Received Request: {:?}",
                     request.request);

            use ExampleRequest::*;
            use ExampleResponse::*;
            let response = match request.request {
                Set { key, value } => {
                    config.data.insert(key, value);
                    Ok(EmptyResponse)
                }
                Get { key } => match config.data.get(&key) {
                    Some(value) => Ok(StringResponse(value.to_string())),
                    None => Err(ExampleError::InvalidKey(key)),
                },
                Delete { key } => match config.data.remove(&key) {
                    Some(_) => Ok(EmptyResponse),
                    None => Err(ExampleError::InvalidKey(key))
                }
            };

            match request.client.send(response) {
                Err(_result) => {
                    panic!("Error!!!!!")
                }
                _ => {}
            }
        }
    }
}

服务类型

Service的构建不需要太多努力,只需包含所需类型即可!

pub struct ExampleService {}

impl TenoriteService<ExampleRequest, ExampleResponse, ExampleError,
                     ExampleWorker, ExampleConfig>
    for ExampleService
{
}

服务使用

使用这种模式构建的服务甚至比构建服务还要简单!实例化自定义ServiceConfig结构体实例,然后启动工作进程任务,提供一个队列大小(这里为32)和Config结构体。此函数返回工作线程的JoinHandle和一个用于向工作进程发出请求的TenoriteCaller结构体。此调用句柄可以克隆以与其他线程共享。

使用

let service = ExampleService {};
let config = ExampleConfig {
    data: HashMap::new(),
};
let (task, caller) = service.start_task(32, config);

调用模式简单,模仿在Rust中调用普通异步函数的方式。在这种情况下,一个接受单个 ExampleRequest 参数并返回一个 Result<ExampleResponse, TenoriteError> 枚举值的异步函数。如果服务抛出 ExampleError 结构,错误将在 ServiceError(Error) 变体中返回。

let key = "test".to_string();
let value = "weeee".to_string();
let request = ExampleRequest::Set { key, value };
let response = caller.send_request(request).await;

最后,当所有 caller 处理程序超出作用域并释放时,Worker 线程将终止。

task.await;

依赖项

~2.3–4MB
~64K SLoC