#async-channel #async #actor #send-receive #send-message #async-task

lifeline

生命线是一个用于异步基于消息的应用程序的依赖注入库

11个版本 (5个重大更改)

0.6.1 2021年2月2日
0.6.0 2021年1月19日
0.5.0 2021年1月11日
0.4.0 2020年10月24日
0.1.0 2020年8月24日

#17 in #actors

每月下载量50
用于 6 crates

MIT许可证

92KB
1.5K SLoC

lifeline-rs

生命线是一个基于消息的依赖注入库。生命线产生的应用程序是

  • 干净的:总线实现提供了应用程序的高级概述,并且服务清楚地定义了它们发送和接收的消息。
  • 解耦的:服务和任务不依赖于它们的同伴,因为它们只依赖于它们发送和接收的消息类型。
  • 可停止的:服务和任务可以轻松取消。例如,当客户端断开连接时,您可以终止与连接关联的所有任务。
  • 可搜索的:通过在源代码中搜索类型,可以轻松理解消息的影响/范围。
  • 可测试的:生命线应用程序通过消息进行通信,这使得单元测试变得容易。创建总线,启动服务,发送消息,并期望输出消息。

为了实现这些目标,生命线提供模式、特性和实现

  • 总线,它构建并分配通道发送器/接收器和资源。
  • 载体,它将消息在两个总线实例之间进行转换。载体在构建大型应用程序时至关重要,有助于最小化每个总线上的消息复杂度。
  • 服务,它从总线获取通道,并启动发送和接收消息的任务。
  • 任务,是一个异步future,当启动时返回生命线。当生命线被丢弃时,future立即取消。
  • 资源,一个可以存储在总线中的结构体,当服务启动时可以获取(或克隆)。

为了快速入门,请参阅hello.rs示例。对于全规模应用程序,请参阅tab-rs。

快速入门

生命线可以与tokioasync-std运行时一起使用。默认情况下,生命线使用tokio

lifeline = "0.6"

async-std可以通过启用async-std-executor功能启用。并且可以使用mpsc实现启用async-std-channels功能

lifeline = { version = "0.6", default-features = false, features = ["dyn-bus", "async-std-executor", "async-std-channels"] }

Lifeline还支持postage channels,这是一个提供可移植的通道实现集合(与任何执行器兼容)的库。Postage还提供了Stream和Sink组合器(类似于futures StreamExt),这些组合器针对异步通道进行了优化。
Postage旨在替换在lifeline v0.6.0中移除的LifelineSender/LifelineReceiver包装器。

升级

v0.6.0包含几个破坏性更改

  • LifelineSender和LifelineReceiver包装器已被移除。这是由于Stream生态系统近期发生变化,以及即将到来的Stream RFC的稳定化。如果您需要Stream/Sink组合器,请查看postagetokio-stream
  • 屏障通道已被移除。它可以被postage::barrier所替代。
  • 订阅通道已被移除。如果您需要它,可以在移除之前找到代码这里
  • 从预导入中移除了Sender和Receiver特质。这样做是为了防止导入lifeline预导入与Sink/Stream特质冲突。您可以按如下方式导入它们:use lifeline::{Sender, Receiver}

总线

总线承载通道和资源。当服务启动时,它们会收到总线的引用。

可以从总线上获取通道。如果通道端点是可复制的,它将可供其他服务使用。如果通道不可复制,未来的调用将收到一个Err值。Rx/Tx类型参数是类型安全的,如果尝试获取总线不携带的消息类型的通道,将产生编译错误。

总线示例

use lifeline::lifeline_bus;
use lifeline::Message;
use lifeline::Bus;
use myapp::message::MainSend;
use myapp::message::MainRecv;
use tokio::sync::mpsc;

lifeline_bus!(pub struct MainBus);

impl Message<MainBus> for MainSend {
    type Channel = mpsc::Sender<Self>;
}

impl Message<MainBus> for MainRecv {
    type Channel = mpsc::Sender<Self>;
}

fn use_bus() -> anyhow::Result<()> {
    let bus = MainBus::default();

    let tx_main = bus.tx::<MainRecv>()?;
    tx_main.send(MainRecv {});

    Ok(())
}

载体

载体提供在总线间移动消息的方法。载体可以转换、忽略或收集信息,为每个总线提供所需的消息。

大型应用有一个总线树。这是好的,因为它将您的应用分解成小块。

- MainBus
  | ConnectionListenerBus
  |  | ConnectionBus
  | DomainSpecificBus
  |  | ...

载体允许每个总线定义最小表示其服务需要的信息的消息,并防止消息爆炸,这些消息被复制到所有总线。

载体集中了总线间的通信,使得大型应用更容易推理。

载体示例

树中更深层的总线应该为其父级实现FromCarrier - 更多详情请参阅carrier.rs示例

let main_bus = MainBus::default();
let connection_listener_bus = ConnectionListenerBus::default();
let _carrier = connection_listener_bus.carry_from(&main_bus)?;
// you can also use the IntoCarrier trait, which has a blanket implementation
let _carrier = main_bus.carry_into(&main_bus)?;

服务

服务同步地从总线获取通道,并启动一个异步任务树(发送和接收消息)。当启动时,服务返回一个或多个Lifeline值。当Lifeline被丢弃时,相关任务立即取消。

通常Service::spawn会返回一个Result。获取通道端点是可能失败的。这是因为,根据通道类型,端点可能不可复制。Lifeline在可能的情况下会克隆端点(mpsc::Senderbroadcast::*watch::Receiver)。Lifeline会尽可能早地完成这个操作。

use lifeline::{Service, Task};
pub struct ExampleService {
    _greet: Lifeline,
}

impl Service for ExampleService {
    type Bus = ExampleBus;
    type Lifeline = Lifeline;

    fn spawn(bus: &Self::Bus) -> Self::Lifeline {
        let mut rx = bus.rx::<ExampleRecv>()?;
        let mut tx = bus.tx::<ExampleSend>()?;
        let config = bus.resource::<ExampleConfig>()?;

        Self::task("greet", async move {
            // drive the channels!
        })
    }
}

任务

任务执行一个 Future,并在启动时返回一个 Lifeline 值。当生命线释放时,未来任务将被立即取消。

Task 是一个对所有类型都实现的 trait,您可以导入它并在任何类型中使用 Self::task。在生命线中,它最常用于服务实现。

任务可以是不可靠的

Self::task("greet", async move {
    // do something
})

或者,如果您有一个可能出错的任务,您可以使用 anyhow::Result<T> 返回。Anyhow 用于解决类型推断问题。

Self::try_task("greet", async move {
    // do something
    let something = do_something()?;
    Ok(())
})

测试

Lifeline 的一个目标是为提供非常容易测试的接口。Lifeline 运行时在测试中很容易构建。

#[tokio::test]
async fn test() -> anyhow::Result<()> {
    // this is zero-cost.  channel construction is lazy.
    let bus = MainBus::default();
    let service = MainService::spawn(&bus)?;

    // the service took `bus.rx::<MainRecv>()`
    //                + `bus.tx::<MainSend>()`
    // let's communicate using channels we take.
    let tx = bus.tx::<MainRecv>()?;
    let rx = bus.rx::<MainSend>()?;

    // drop the bus, so that any 'channel closed' errors will occur during our test.
    // this would likely happen in practice during the long lifetime of the program
    drop(bus);

    tx.send(MainRecv::MyMessage)?;

    // wait up to 200ms for the message to arrive
    // if we remove the 200 at the end, the default is 50ms
    lifeline::assert_completes!(async move {
        let response = rx.recv().await;
        assert_eq!(MainSend::MyResponse, response);
    }, 200);

    Ok(())
}

详细信息

日志记录

任务(通过 log)在启动、结束或取消时提供调试日志。

如果任务返回一个值,它也会使用 Debug 打印到调试日志中。

2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/ok_task

2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/valued_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/valued_task: MyStruct {}

如果任务被取消(因为其生命线被释放),也会打印出来。

2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/cancelled_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] CANCEL ExampleService/cancelled_task

如果使用 Task::try_task 启动任务,则使用 Display 打印 Ok/Err 值。

2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] OK ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/ok_task

2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/err_task
2020-08-23 16:45:10,422 ERROR [lifeline::service] ERR: ExampleService/err_task: my error
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/err_task

关于自动补全的说明

rust-analyzer 目前不支持在宏中定义的结构体的自动导入。Lifeline 真的需要在宏中定义的结构体,因为它在运行时注入了存储通道的魔法字段。

有一个解决方案:在您的 crate 根目录中定义一个 prelude.rs 文件,该文件导出所有您的总线实现的 pub use

pub use lifeline::*;
pub use crate::bus::MainBus;
pub use crate::other::OtherBus;
...

然后在所有您的模块中:use crate::prelude::*

资源

资源可以存储在总线上。这对于配置(例如 MainConfig)或连接(例如 TcpStream)非常有用。

资源实现了 Storage trait,可以使用 impl_storage_clone!impl_storage_take! 宏轻松实现。

use lifeline::{lifeline_bus, impl_storage_clone};
lifeline_bus!(MainBus);
pub struct MainConfig {
    pub port: u16
}

impl_storage_clone!(MainConfig);

fn main() {
    let bus = MainBus::default()
    bus.store_resource::<MainConfig>(MainConfig { port: 12345 });
    // from here
}

Lifeline 不提供通道端点的 Resource 实现 - 使用 bus.rx()bus.tx()

通道

通道发送者必须实现 Channel trait 才能在 impl Message 绑定中使用。

在大多数情况下,通道端点仅实现 Storage,这决定了在 bus.rx()bus.tx() 调用中是 'take' 还是 'clone' 端点。

下面是一个示例实现

use lifeline::Channel;
use crate::{impl_channel_clone, impl_channel_take};
use tokio::sync::{broadcast, mpsc, oneshot, watch};

impl<T: Send + 'static> Channel for mpsc::Sender<T> {
    type Tx = Self;
    type Rx = mpsc::Receiver<T>;

    fn channel(capacity: usize) -> (Self::Tx, Self::Rx) {
        mpsc::channel(capacity)
    }

    fn default_capacity() -> usize {
        16
    }
}

impl_channel_clone!(mpsc::Sender<T>);
impl_channel_take!(mpsc::Receiver<T>);

广播发送者应实现带有重写 clone_rx 方法的特型,从 Rx 中获取,然后订阅 Tx

依赖项

~3–15MB
~195K SLoC