11个版本 (5个重大更改)
0.6.1 | 2021年2月2日 |
---|---|
0.6.0 | 2021年1月19日 |
0.5.0 | 2021年1月11日 |
0.4.0 | 2020年10月24日 |
0.1.0 | 2020年8月24日 |
#17 in #actors
每月下载量50
用于 6 crates
92KB
1.5K SLoC
lifeline-rs
生命线是一个基于消息的依赖注入库。生命线产生的应用程序是
- 干净的:总线实现提供了应用程序的高级概述,并且服务清楚地定义了它们发送和接收的消息。
- 解耦的:服务和任务不依赖于它们的同伴,因为它们只依赖于它们发送和接收的消息类型。
- 可停止的:服务和任务可以轻松取消。例如,当客户端断开连接时,您可以终止与连接关联的所有任务。
- 可搜索的:通过在源代码中搜索类型,可以轻松理解消息的影响/范围。
- 可测试的:生命线应用程序通过消息进行通信,这使得单元测试变得容易。创建总线,启动服务,发送消息,并期望输出消息。
为了实现这些目标,生命线提供模式、特性和实现
- 总线,它构建并分配通道发送器/接收器和资源。
- 载体,它将消息在两个总线实例之间进行转换。载体在构建大型应用程序时至关重要,有助于最小化每个总线上的消息复杂度。
- 服务,它从总线获取通道,并启动发送和接收消息的任务。
- 任务,是一个异步future,当启动时返回生命线。当生命线被丢弃时,future立即取消。
- 资源,一个可以存储在总线中的结构体,当服务启动时可以获取(或克隆)。
为了快速入门,请参阅hello.rs示例。对于全规模应用程序,请参阅tab-rs。
快速入门
生命线可以与tokio和async-std运行时一起使用。默认情况下,生命线使用tokio
。
lifeline = "0.6"
async-std可以通过启用async-std-executor
功能启用。并且可以使用mpsc
实现启用async-std-channels
功能
lifeline = { version = "0.6", default-features = false, features = ["dyn-bus", "async-std-executor", "async-std-channels"] }
Lifeline还支持postage channels,这是一个提供可移植的通道实现集合(与任何执行器兼容)的库。Postage还提供了Stream和Sink组合器(类似于futures StreamExt),这些组合器针对异步通道进行了优化。
Postage旨在替换在lifeline v0.6.0中移除的LifelineSender/LifelineReceiver包装器。
升级
v0.6.0包含几个破坏性更改
- LifelineSender和LifelineReceiver包装器已被移除。这是由于Stream生态系统近期发生变化,以及即将到来的Stream RFC的稳定化。如果您需要Stream/Sink组合器,请查看postage或tokio-stream。
- 屏障通道已被移除。它可以被postage::barrier所替代。
- 订阅通道已被移除。如果您需要它,可以在移除之前找到代码这里。
- 从预导入中移除了Sender和Receiver特质。这样做是为了防止导入lifeline预导入与Sink/Stream特质冲突。您可以按如下方式导入它们:
use lifeline::{Sender, Receiver}
。
总线
总线承载通道和资源。当服务启动时,它们会收到总线的引用。
可以从总线上获取通道。如果通道端点是可复制的,它将可供其他服务使用。如果通道不可复制,未来的调用将收到一个Err
值。Rx/Tx类型参数是类型安全的,如果尝试获取总线不携带的消息类型的通道,将产生编译错误。
总线示例
use lifeline::lifeline_bus;
use lifeline::Message;
use lifeline::Bus;
use myapp::message::MainSend;
use myapp::message::MainRecv;
use tokio::sync::mpsc;
lifeline_bus!(pub struct MainBus);
impl Message<MainBus> for MainSend {
type Channel = mpsc::Sender<Self>;
}
impl Message<MainBus> for MainRecv {
type Channel = mpsc::Sender<Self>;
}
fn use_bus() -> anyhow::Result<()> {
let bus = MainBus::default();
let tx_main = bus.tx::<MainRecv>()?;
tx_main.send(MainRecv {});
Ok(())
}
载体
载体提供在总线间移动消息的方法。载体可以转换、忽略或收集信息,为每个总线提供所需的消息。
大型应用有一个总线树。这是好的,因为它将您的应用分解成小块。
- MainBus
| ConnectionListenerBus
| | ConnectionBus
| DomainSpecificBus
| | ...
载体允许每个总线定义最小表示其服务需要的信息的消息,并防止消息爆炸,这些消息被复制到所有总线。
载体集中了总线间的通信,使得大型应用更容易推理。
载体示例
树中更深层的总线应该为其父级实现FromCarrier
- 更多详情请参阅carrier.rs示例。
let main_bus = MainBus::default();
let connection_listener_bus = ConnectionListenerBus::default();
let _carrier = connection_listener_bus.carry_from(&main_bus)?;
// you can also use the IntoCarrier trait, which has a blanket implementation
let _carrier = main_bus.carry_into(&main_bus)?;
服务
服务同步地从总线获取通道,并启动一个异步任务树(发送和接收消息)。当启动时,服务返回一个或多个Lifeline值。当Lifeline被丢弃时,相关任务立即取消。
通常Service::spawn
会返回一个Result。获取通道端点是可能失败的。这是因为,根据通道类型,端点可能不可复制。Lifeline在可能的情况下会克隆端点(mpsc::Sender
、broadcast::*
和watch::Receiver
)。Lifeline会尽可能早地完成这个操作。
use lifeline::{Service, Task};
pub struct ExampleService {
_greet: Lifeline,
}
impl Service for ExampleService {
type Bus = ExampleBus;
type Lifeline = Lifeline;
fn spawn(bus: &Self::Bus) -> Self::Lifeline {
let mut rx = bus.rx::<ExampleRecv>()?;
let mut tx = bus.tx::<ExampleSend>()?;
let config = bus.resource::<ExampleConfig>()?;
Self::task("greet", async move {
// drive the channels!
})
}
}
任务
任务执行一个 Future
,并在启动时返回一个 Lifeline
值。当生命线释放时,未来任务将被立即取消。
Task
是一个对所有类型都实现的 trait,您可以导入它并在任何类型中使用 Self::task
。在生命线中,它最常用于服务实现。
任务可以是不可靠的
Self::task("greet", async move {
// do something
})
或者,如果您有一个可能出错的任务,您可以使用 anyhow::Result<T>
返回。Anyhow 用于解决类型推断问题。
Self::try_task("greet", async move {
// do something
let something = do_something()?;
Ok(())
})
测试
Lifeline 的一个目标是为提供非常容易测试的接口。Lifeline 运行时在测试中很容易构建。
#[tokio::test]
async fn test() -> anyhow::Result<()> {
// this is zero-cost. channel construction is lazy.
let bus = MainBus::default();
let service = MainService::spawn(&bus)?;
// the service took `bus.rx::<MainRecv>()`
// + `bus.tx::<MainSend>()`
// let's communicate using channels we take.
let tx = bus.tx::<MainRecv>()?;
let rx = bus.rx::<MainSend>()?;
// drop the bus, so that any 'channel closed' errors will occur during our test.
// this would likely happen in practice during the long lifetime of the program
drop(bus);
tx.send(MainRecv::MyMessage)?;
// wait up to 200ms for the message to arrive
// if we remove the 200 at the end, the default is 50ms
lifeline::assert_completes!(async move {
let response = rx.recv().await;
assert_eq!(MainSend::MyResponse, response);
}, 200);
Ok(())
}
详细信息
日志记录
任务(通过 log)在启动、结束或取消时提供调试日志。
如果任务返回一个值,它也会使用 Debug
打印到调试日志中。
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/valued_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/valued_task: MyStruct {}
如果任务被取消(因为其生命线被释放),也会打印出来。
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/cancelled_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] CANCEL ExampleService/cancelled_task
如果使用 Task::try_task
启动任务,则使用 Display
打印 Ok
/Err
值。
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] OK ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/ok_task
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] START ExampleService/err_task
2020-08-23 16:45:10,422 ERROR [lifeline::service] ERR: ExampleService/err_task: my error
2020-08-23 16:45:10,422 DEBUG [lifeline::spawn] END ExampleService/err_task
关于自动补全的说明
rust-analyzer
目前不支持在宏中定义的结构体的自动导入。Lifeline 真的需要在宏中定义的结构体,因为它在运行时注入了存储通道的魔法字段。
有一个解决方案:在您的 crate 根目录中定义一个 prelude.rs
文件,该文件导出所有您的总线实现的 pub use
。
pub use lifeline::*;
pub use crate::bus::MainBus;
pub use crate::other::OtherBus;
...
然后在所有您的模块中:use crate::prelude::*
资源
资源可以存储在总线上。这对于配置(例如 MainConfig
)或连接(例如 TcpStream
)非常有用。
资源实现了 Storage
trait,可以使用 impl_storage_clone!
和 impl_storage_take!
宏轻松实现。
use lifeline::{lifeline_bus, impl_storage_clone};
lifeline_bus!(MainBus);
pub struct MainConfig {
pub port: u16
}
impl_storage_clone!(MainConfig);
fn main() {
let bus = MainBus::default()
bus.store_resource::<MainConfig>(MainConfig { port: 12345 });
// from here
}
Lifeline 不提供通道端点的 Resource
实现 - 使用 bus.rx()
和 bus.tx()
。
通道
通道发送者必须实现 Channel
trait 才能在 impl Message
绑定中使用。
在大多数情况下,通道端点仅实现 Storage
,这决定了在 bus.rx()
或 bus.tx()
调用中是 'take' 还是 'clone' 端点。
下面是一个示例实现
use lifeline::Channel;
use crate::{impl_channel_clone, impl_channel_take};
use tokio::sync::{broadcast, mpsc, oneshot, watch};
impl<T: Send + 'static> Channel for mpsc::Sender<T> {
type Tx = Self;
type Rx = mpsc::Receiver<T>;
fn channel(capacity: usize) -> (Self::Tx, Self::Rx) {
mpsc::channel(capacity)
}
fn default_capacity() -> usize {
16
}
}
impl_channel_clone!(mpsc::Sender<T>);
impl_channel_take!(mpsc::Receiver<T>);
广播发送者应实现带有重写 clone_rx
方法的特型,从 Rx
中获取,然后订阅 Tx
。
依赖项
~3–15MB
~195K SLoC