14 个版本
0.5.3 | 2021 年 10 月 31 日 |
---|---|
0.5.2 | 2021 年 6 月 10 日 |
0.5.1 | 2021 年 2 月 18 日 |
0.4.2 | 2019 年 11 月 13 日 |
0.2.0 | 2019 年 7 月 18 日 |
#56 在 异步
237,907 每月下载量
在 404 个 Crates 中使用 (22 个直接使用)
52KB
842 行
pharos
Pharos 的介绍以多种格式提供:视频、维基百科,甚至被许多艺术家如 这幅由 Micheal Turner 绘制的画 所赞誉。
更严肃地说,pharos 是一个小型的 观察者 库,允许您创建观察者可以监听的未来 0.3 流。
我创建它是为了利用我们可以通过使用异步 Stream 和 Sink 从 futures 库中创建的互操作性。因此,您可以使用所有流组合器,将其转发到 Sinks 等。
最小 rustc 版本:1.39。
目录
安全性
这个 crate 当前主要的问题是可观察对象可能会超过观察者。当使用有界通道时,存在背压,这可能允许在到达网络数据包上使用该模式时进行 DDoS 攻击。当使用无界通道时,如果观察者落后,可能会导致过度消耗内存。
待办事项:为了有效地缓解这些问题,我将添加一个环形通道,其中通道将只缓冲一定数量的事件,并在缓冲区满时覆盖最旧的事件,而不是阻塞发送者。
这个 crate 有: #![ forbid( unsafe_code ) ]
,但它的依赖(futures 库)使用了大量不安全代码。
限制
- 目前只支持有界和无界通道作为后端
Events
当前不可克隆(需要我们使用的后端通道的支持,例如广播类型通道)- 性能调整仍需进行
未来工作
请查看待办事项以获取目标。
安装
使用cargo add:cargo add pharos
dependencies:
pharos: ^0.5
使用原始Cargo.toml
[dependencies]
pharos = "0.5"
升级
升级时请查看变更日志。
依赖关系
此crate只有一个依赖。Cargo会自动为您处理。此依赖包含unsafe
代码。
dependencies:
futures: { version: ^0.3, default-features: false }
用法
pharos
仅在异步代码中工作,实现Sink以通知观察者。您可以通过直接调用Sink实现中的poll方法来在poll_*
方法内部通知观察者。在异步上下文中,您可以使用SinkExt::send。观察者必须足够快地消费消息,否则它们将减慢可观察的(有界通道)或导致内存泄漏(无界通道)。
当观察者想要取消订阅时,他们只需丢弃流或在该流上调用close
。如果您是一个可观察的,并且想要通知观察者没有更多消息将跟随,只需丢弃pharos对象。如果失败,创建一个表示EOF的事件类型并将其发送给观察者。
您的事件类型将针对每个观察者克隆一次,因此如果它大于2个指针大小(例如,没有必要将没有数据的枚举放入Arc中),您可能想将其放入Arc中。
当您需要从多个异步任务中通知pharos对象时,可以使用SharedPharos
。此类型允许使用共享引用进行观察和通知,并在内部处理同步。
示例可以在示例目录中找到。以下是最基本的示例
use
{
pharos :: { * } ,
futures :: { executor::block_on, StreamExt, SinkExt } ,
};
// here we put a pharos object on our struct
//
struct Goddess { pharos: Pharos<GoddessEvent> }
impl Goddess
{
fn new() -> Self
{
Self { pharos: Pharos::default() }
}
// Send Goddess sailing so she can tweet about it!
//
pub async fn sail( &mut self )
{
// It's infallible. Observers that error will be dropped, since the only kind of errors on
// channels are when the channel is closed.
//
self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" );
}
}
// Event types need to implement clone, but you can wrap them in Arc if not. Also they will be
// cloned, so if you will have several observers and big event data, putting them in an Arc is
// definitely best. It has no benefit to put a simple dataless enum in an Arc though.
//
#[ derive( Clone, Debug, PartialEq, Copy ) ]
//
enum GoddessEvent
{
Sailing
}
// This is the needed implementation of Observable. We might one day have a derive for this,
// but it's not so interesting, since you always have to point it to your pharos object,
// and when you want to be observable over several types of events, you might want to keep
// pharos in a hashmap over type_id, and a derive would quickly become a mess.
//
impl Observable<GoddessEvent> for Goddess
{
type Error = PharErr;
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Observe< '_, GoddessEvent, Self::Error >
{
self.pharos.observe( options )
}
}
#[ async_std::main ]
//
async fn main()
{
let mut isis = Goddess::new();
// subscribe, the observe method takes options to let you choose:
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( Channel::Bounded( 3 ).into() ).await.expect( "observe" );
// trigger an event
//
isis.sail().await;
// read from stream and let's put on the console what the event looks like.
//
let evt = dbg!( events.next().await.unwrap() );
// After this reads on the event stream will return None.
//
drop( isis );
assert_eq!( GoddessEvent::Sailing, evt );
assert_eq!( None, events.next().await );
}
过滤器
有时您对可观察的可以发出的所有事件类型不感兴趣。一个常见的用例是仅在网络连接上监听关闭事件。观察方法接受选项,允许您设置谓词。对于给定的观察者,您只能设置一个谓词。
use pharos::*;
#[ derive( Clone, Debug, PartialEq, Copy ) ]
//
enum NetworkEvent
{
Open ,
Error ,
Closing ,
Closed ,
}
struct Connection { pharos: Pharos<NetworkEvent> }
impl Observable<NetworkEvent> for Connection
{
type Error = PharErr;
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Observe< '_, NetworkEvent, Self::Error >
{
self.pharos.observe( options )
}
}
#[ async_std::main ]
//
async fn main()
{
let mut conn = Connection{ pharos: Pharos::default() };
// We will only get close events. Note that here we don't need access to any surrounding variables in
// the closure, so we can use a function pointer which avoids having to box the closure.
//
// Filter also has a variant `Closure` which allows you to pass in a `Box<dyn FnMut(&Event) -> bool + Send>`
// if you need access to surrounding context to make the decision.
//
let filter = Filter::Pointer( |e| e == &NetworkEvent::Closed );
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() ).await.expect( "observe" );
// Combine both options.
//
let filter = Filter::Pointer( |e| e != &NetworkEvent::Closed );
let opts = ObserveConfig::from( filter ).channel( Channel::Bounded(5) );
// Get everything but close events over a bounded channel with queue size 5.
//
let bounded_observer = conn.observe( opts ).await.expect( "observe" );
}
API
API文档可以在docs.rs上找到。
贡献
请查看贡献指南。
行为准则
任何在公民行为准则第4点“不可接受的行为”中描述的行为都不受欢迎,可能会让您被禁止。如果包括维护者和项目管理员在内的任何人都未能尊重这些/您的限制,您有权对他们提出批评。
许可
依赖关系
~0.8–1.2MB
~21K SLoC