19个版本

0.2.4 2024年7月31日
0.2.3 2024年1月16日
0.2.0 2023年11月10日
0.1.13 2023年7月18日
0.1.9 2023年6月29日

#67异步

Download history 34/week @ 2024-04-27 3/week @ 2024-05-04 10/week @ 2024-05-11 73/week @ 2024-05-18 80/week @ 2024-05-25 57/week @ 2024-06-01 30/week @ 2024-06-08 23/week @ 2024-06-15 76/week @ 2024-06-22 14/week @ 2024-06-29 28/week @ 2024-07-06 54/week @ 2024-07-13 7/week @ 2024-07-20 158/week @ 2024-07-27 32/week @ 2024-08-03 12/week @ 2024-08-10

每月214次下载
3 crate 中使用

MIT/Apache

75KB
842

Service Async

Crates.io

一个类似于tower-service https://docs.rs/tower/latest/tower/trait.Service.html 的 Service 特性,采用纯异步风格

动机:克服Tower服务模型的局限性

Tower框架的 Service 特性,虽然功能强大,但也存在一些挑战

  1. 捕获范围有限:作为用于串行和并行执行的future工厂,Tower的 Service futures无法捕获 &self&mut self。这需要克隆并将所有权移动到future中。

  2. 复杂的Poll风格实现:Tower的 Service 特性定义为poll风格,需要手动管理状态。这通常会导致使用 Box<Pin<...>> 来利用async/await语法的冗长实现。

这些局限性通常会导致以下代码模式

impl<S, Req> tower::Service<Req> for SomeStruct<S>
where
    // ...
{
    type Response = // ...;
    type Error = // ...;
    type Future = Pin<Box<dyn Future<Output = ...> + Send + 'static>>;
    
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }
    
    fn call(&mut self, req: Req) -> Self::Future {
        let client = self.client.clone();
        Box::pin(async move {
            client.get(req).await;
            // ...
        })
    }
}

引入精炼的服务特性

此crate利用 impl Trait 引入一个新的 Service 特性,旨在简化实现并提高性能

  1. 高效借用:通过在返回位置使用 impl Trait,futures现在可以捕获 &self&mut self,消除了不必要的克隆。

  2. 零成本抽象:使用 impl(特质)而非 Box<dyn...>,可以提供更深入的代码内联优化,尤其是对于不跨越 await 点的操作。

这种方法结合了 impl 的强大功能和改进的 Service 特质,以提供灵活性和性能改进。

为了启用与此新设计并行执行,我们提出了两种方法

  1. 共享不可变访问:使用单个 Service 实例与 &self
  2. 独占可变访问:使用 &mut self 并为每个调用创建一个新的 Service 实例。

第一种方法通常更受欢迎,因为对于单次使用场景,通常不需要可变的 Service 实例。

我们改进的 Service 特质定义如下

pub trait Service<Request> {
    /// Responses given by the service.
    type Response;
    /// Errors produced by the service.
    type Error;
    /// Process the request and return the response asynchronously.
    fn call(&self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>>;
}

此设计消除了对 poll_ready 函数的需求,因为状态被维护在未来的自身中。

关键差异和优势

与 Tower 的方法相比,我们的 Service 特质代表了范式转变

  • 角色:它作为请求处理器而不是未来工厂。
  • 状态管理:可变状态需要显式同步原语,如 MutexRefCell
  • 资源效率:我们的方法维护引用关系,仅在需要可变性时才产生成本,而 Tower 的共享所有权模型中,每个共享都与关联的成本相关。

此改进的 Service 特质为在 Rust 中构建异步服务提供了一种更直观、高效和灵活的方法。

MakeService

MakeService 特质提供了一种灵活的方式来构建服务链,同时允许从先前实例迁移状态。这在服务管理状态资源(如连接池)时特别有用,您需要使用新配置更新服务链,同时保留现有资源。

关键特性

  • make_via_ref 方法允许创建新的服务,同时可选地引用现有的一个。
  • 在服务实例之间启用状态保留和资源重用。
  • make 方法提供了一种方便的方式来创建服务,而无需现有的引用。

示例用法

struct SvcA {
    pass_flag: bool,
    not_pass_flag: bool,
}

struct InitFlag(bool);

struct SvcAFactory {
    init_flag: InitFlag,
}

impl MakeService for SvcAFactory {
    type Service = SvcA;
    type Error = Infallible;

    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
        Ok(match old {
            // SvcAFactory can access state from the older service
            // which was created. 
            Some(r) => SvcA {
                pass_flag: r.pass_flag,
                not_pass_flag: self.init_flag.0,
            },
            // There was no older service, so create SvcA from
            // service factory config.
            None => SvcA {
                pass_flag: self.init_flag.0,
                not_pass_flag: self.init_flag.0,
            },
        })
    }
}

此方法允许对服务链进行有效更新,在重新配置服务时保留宝贵的资源。

服务工厂和组合

服务工厂

在复杂系统中,创建和管理服务通常需要比简单构造函数更多的灵活性。这就是服务工厂概念发挥作用的地方。服务工厂负责创建服务的实例,可能具有复杂的初始化逻辑或状态管理。

MakeService 特质

MakeService 特质是我们服务工厂系统的基石。它提供了一种灵活的方式来构建服务链,同时允许从先前实例迁移状态。这在服务管理状态资源(如连接池)时特别有用,您需要使用新配置更新服务链,同时保留现有资源。

MakeService 的关键特性

  • make_via_ref 方法允许创建新的服务,同时可选地引用现有的一个。
  • 在服务实例之间启用状态保留和资源重用。
  • make 方法提供了一种方便的方式来创建服务,而无需现有的引用。

示例用法

struct SvcA {
    pass_flag: bool,
    not_pass_flag: bool,
}

struct InitFlag(bool);

struct SvcAFactory {
    init_flag: InitFlag,
}

impl MakeService for SvcAFactory {
    type Service = SvcA;
    type Error = Infallible;

    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
        Ok(match old {
            // SvcAFactory can access state from the older service
            // which was created. 
            Some(r) => SvcA {
                pass_flag: r.pass_flag,
                not_pass_flag: self.init_flag.0,
            },
            // There was no older service, so create SvcA from
            // service factory config.
            None => SvcA {
                pass_flag: self.init_flag.0,
                not_pass_flag: self.init_flag.0,
            },
        })
    }
}

此方法允许对服务链进行有效更新,在重新配置服务时保留宝贵的资源。

FactoryLayer

为了启用更复杂的服务组合,我们引入了FactoryLayer的概念。FactoryLayer是一个特质,用于定义如何将一个工厂包装在另一个工厂中,创建一个新的复合工厂。这允许创建可重用、模块化的功能部件,可以轻松组合。

为了简化链式组装,工厂可以定义一个layer函数来创建一个工厂包装器。这个概念类似于Tower框架的Layer,但有一个关键的区别

  1. Tower的Layer:创建一个包装内部ServiceService
  2. 我们的layer:创建一个包装内部FactoryFactory,然后可以用来创建整个Service链。

FactoryStack

FactoryStack是一个强大的抽象,允许创建复杂的服务链。它管理一个服务工厂栈,提供将新层推入栈中以及从组装的栈中创建服务的方法。

FactoryStack通过将多个FactoryLayer组合在一起来工作。栈中的每一层都会包装其下方的层,创建一个嵌套的工厂结构。当你在FactoryStack上调用makemake_async时,它会从最外层遍历到最内层,创建完整的服务链。

这种方法允许用户通过直观地将多个工厂层链接在一起来创建复杂的服务工厂。每一层都可以添加自己的功能,修改内部层的行为,甚至完全转换服务链。

使用FactoryStack创建服务链

  1. 从初始化你的配置的FactoryStack开始。
  2. 使用push方法将层添加到栈中。
  3. 每一层都可以修改或增强内部层的功能。
  4. 最后,调用makemake_async来创建完整的服务链。

这个系统提供了一种强大而灵活的方式来构建和更新服务链,同时高效地管理状态和资源。它允许模块化和可重用的功能部件,易于重新配置服务链,并在服务逻辑的不同部分之间清晰地分离关注点。

将所有内容整合在一起

此示例演示了MakeServiceFactoryLayerFactoryStack概念的实用应用。它定义了几个服务(SvcASvcB)及其相应的工厂。然后使用FactoryStack以分层的方式组合这些服务。Config结构体提供初始配置,该配置通过层传递。最后,在main函数中创建一个服务栈,结合SvcAFactorySvcBFactory。然后多次调用生成的服务,展示了服务链如何处理请求并维护状态。

为了更全面地展示这些概念及其高级用法,鼓励读者检查项目示例目录中的demo.rs文件。

use std::{
    convert::Infallible,
    sync::atomic::{AtomicUsize, Ordering},
};

use service_async::{
    layer::{layer_fn, FactoryLayer},
    stack::FactoryStack,
    AsyncMakeService, BoxedMakeService, BoxedService, MakeService, Param, Service,
};

#[cfg(unix)]
use monoio::main as main_macro;
#[cfg(not(unix))]
use tokio::main as main_macro;

// ===== Svc*(impl Service) and Svc*Factory(impl NewService) =====

struct SvcA {
    pass_flag: bool,
    not_pass_flag: bool,
}

// Implement Service trait for SvcA
impl Service<()> for SvcA {
    type Response = ();
    type Error = Infallible;

    async fn call(&self, _req: ()) -> Result<Self::Response, Self::Error> {
        println!(
            "SvcA called! pass_flag = {}, not_pass_flag = {}",
            self.pass_flag, self.not_pass_flag
        );
        Ok(())
    }
}

struct SvcAFactory {
    init_flag: InitFlag,
}

struct InitFlag(bool);

impl MakeService for SvcAFactory {
    type Service = SvcA;
    type Error = Infallible;

    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
        Ok(match old {
            // SvcAFactory can access state from the older service
            // which was created. 
            Some(r) => SvcA {
                pass_flag: r.pass_flag,
                not_pass_flag: self.init_flag.0,
            },
            // There was no older service, so create SvcA from
            // service factory config.
            None => SvcA {
                pass_flag: self.init_flag.0,
                not_pass_flag: self.init_flag.0,
            },
        })
    }
}

struct SvcB<T> {
    counter: AtomicUsize,
    inner: T,
}

impl<T> Service<usize> for SvcB<T>
where
    T: Service<(), Error = Infallible>,
{
    type Response = ();
    type Error = Infallible;

    async fn call(&self, req: usize) -> Result<Self::Response, Self::Error> {
        let old = self.counter.fetch_add(req, Ordering::AcqRel);
        let new = old + req;
        println!("SvcB called! {old}->{new}");
        self.inner.call(()).await?;
        Ok(())
    }
}

struct SvcBFactory<T>(T);

impl<T> MakeService for SvcBFactory<T>
where
    T: MakeService<Error = Infallible>,
{
    type Service = SvcB<T::Service>;
    type Error = Infallible;

    fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
        Ok(match old {
            Some(r) => SvcB {
                counter: r.counter.load(Ordering::Acquire).into(),
                inner: self.0.make_via_ref(Some(&r.inner))?,
            },
            None => SvcB {
                counter: 0.into(),
                inner: self.0.make()?,
            },
        })
    }
}

// ===== impl layer fn for Factory instead of defining manually =====

impl SvcAFactory {
    fn layer<C>() -> impl FactoryLayer<C, (), Factory = Self>
    where
        C: Param<InitFlag>,
    {
        layer_fn(|c: &C, ()| SvcAFactory {
            init_flag: c.param(),
        })
    }
}

impl<T> SvcBFactory<T> {
    fn layer<C>() -> impl FactoryLayer<C, T, Factory = Self> {
        layer_fn(|_: &C, inner| SvcBFactory(inner))
    }
}


// ===== Define Config and impl Param<T> for it =====
#[derive(Clone, Copy)]
struct Config {
    init_flag: bool,
}

impl Param<InitFlag> for Config {
    fn param(&self) -> InitFlag {
        InitFlag(self.init_flag)
    }
}

#[main_macro]
async fn main() {
    let config = Config { init_flag: false };
    let stack = FactoryStack::new(config)
        .push(SvcAFactory::layer())
        .push(SvcBFactory::layer());

    let svc = stack.make_async().await.unwrap();
    svc.call(1).await.unwrap();
    svc.call(2).await.unwrap();
    svc.call(3).await.unwrap();
}

依赖项