14 个版本

0.5.3 2021 年 10 月 31 日
0.5.2 2021 年 6 月 10 日
0.5.1 2021 年 2 月 18 日
0.4.2 2019 年 11 月 13 日
0.2.0 2019 年 7 月 18 日

#56异步

Download history 49589/week @ 2024-03-14 50810/week @ 2024-03-21 45512/week @ 2024-03-28 48137/week @ 2024-04-04 46261/week @ 2024-04-11 54672/week @ 2024-04-18 52287/week @ 2024-04-25 49884/week @ 2024-05-02 52922/week @ 2024-05-09 55009/week @ 2024-05-16 55583/week @ 2024-05-23 57473/week @ 2024-05-30 57318/week @ 2024-06-06 59771/week @ 2024-06-13 61278/week @ 2024-06-20 48923/week @ 2024-06-27

237,907 每月下载量
404 个 Crates 中使用 (22 个直接使用)

无许可

52KB
842

pharos

standard-readme compliant Build Status Docs Crates.io Crates.io downloads

Pharos 的介绍以多种格式提供:视频维基百科,甚至被许多艺术家如 这幅由 Micheal Turner 绘制的画 所赞誉。

更严肃地说,pharos 是一个小型的 观察者 库,允许您创建观察者可以监听的未来 0.3 流。

我创建它是为了利用我们可以通过使用异步 StreamSink 从 futures 库中创建的互操作性。因此,您可以使用所有流组合器,将其转发到 Sinks 等。

最小 rustc 版本:1.39。

目录

安全性

这个 crate 当前主要的问题是可观察对象可能会超过观察者。当使用有界通道时,存在背压,这可能允许在到达网络数据包上使用该模式时进行 DDoS 攻击。当使用无界通道时,如果观察者落后,可能会导致过度消耗内存。

待办事项:为了有效地缓解这些问题,我将添加一个环形通道,其中通道将只缓冲一定数量的事件,并在缓冲区满时覆盖最旧的事件,而不是阻塞发送者。

这个 crate 有: #![ forbid( unsafe_code ) ],但它的依赖(futures 库)使用了大量不安全代码。

限制

  • 目前只支持有界和无界通道作为后端
  • Events 当前不可克隆(需要我们使用的后端通道的支持,例如广播类型通道)
  • 性能调整仍需进行

未来工作

请查看待办事项以获取目标。

安装

使用cargo addcargo add pharos

使用cargo yaml

dependencies:

  pharos: ^0.5

使用原始Cargo.toml

[dependencies]

   pharos = "0.5"

升级

升级时请查看变更日志

依赖关系

此crate只有一个依赖。Cargo会自动为您处理。此依赖包含unsafe代码。

dependencies:

  futures: { version: ^0.3, default-features: false }

用法

pharos仅在异步代码中工作,实现Sink以通知观察者。您可以通过直接调用Sink实现中的poll方法来在poll_*方法内部通知观察者。在异步上下文中,您可以使用SinkExt::send。观察者必须足够快地消费消息,否则它们将减慢可观察的(有界通道)或导致内存泄漏(无界通道)。

当观察者想要取消订阅时,他们只需丢弃流或在该流上调用close。如果您是一个可观察的,并且想要通知观察者没有更多消息将跟随,只需丢弃pharos对象。如果失败,创建一个表示EOF的事件类型并将其发送给观察者。

您的事件类型将针对每个观察者克隆一次,因此如果它大于2个指针大小(例如,没有必要将没有数据的枚举放入Arc中),您可能想将其放入Arc中。

当您需要从多个异步任务中通知pharos对象时,可以使用SharedPharos。此类型允许使用共享引用进行观察和通知,并在内部处理同步。

示例可以在示例目录中找到。以下是最基本的示例

use
{
   pharos  :: { *                                      } ,
   futures :: { executor::block_on, StreamExt, SinkExt } ,
};


// here we put a pharos object on our struct
//
struct Goddess { pharos: Pharos<GoddessEvent> }


impl Goddess
{
   fn new() -> Self
   {
      Self { pharos: Pharos::default() }
   }

   // Send Goddess sailing so she can tweet about it!
   //
   pub async fn sail( &mut self )
   {
      // It's infallible. Observers that error will be dropped, since the only kind of errors on
      // channels are when the channel is closed.
      //
      self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" );
   }
}


// Event types need to implement clone, but you can wrap them in Arc if not. Also they will be
// cloned, so if you will have several observers and big event data, putting them in an Arc is
// definitely best. It has no benefit to put a simple dataless enum in an Arc though.
//
#[ derive( Clone, Debug, PartialEq, Copy ) ]
//
enum GoddessEvent
{
   Sailing
}


// This is the needed implementation of Observable. We might one day have a derive for this,
// but it's not so interesting, since you always have to point it to your pharos object,
// and when you want to be observable over several types of events, you might want to keep
// pharos in a hashmap over type_id, and a derive would quickly become a mess.
//
impl Observable<GoddessEvent> for Goddess
{
   type Error = PharErr;

   fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Observe< '_, GoddessEvent, Self::Error >
   {
      self.pharos.observe( options )
   }
}


#[ async_std::main ]
//
async fn main()
{
  let mut isis = Goddess::new();

  // subscribe, the observe method takes options to let you choose:
  // - channel type (bounded/unbounded)
  // - a predicate to filter events
  //
  let mut events = isis.observe( Channel::Bounded( 3 ).into() ).await.expect( "observe" );

  // trigger an event
  //
  isis.sail().await;

  // read from stream and let's put on the console what the event looks like.
  //
  let evt = dbg!( events.next().await.unwrap() );

  // After this reads on the event stream will return None.
  //
  drop( isis );

  assert_eq!( GoddessEvent::Sailing, evt );
  assert_eq!( None, events.next().await );
}

过滤器

有时您对可观察的可以发出的所有事件类型不感兴趣。一个常见的用例是仅在网络连接上监听关闭事件。观察方法接受选项,允许您设置谓词。对于给定的观察者,您只能设置一个谓词。

use pharos::*;

#[ derive( Clone, Debug, PartialEq, Copy ) ]
//
enum NetworkEvent
{
   Open    ,
   Error   ,
   Closing ,
   Closed  ,
}

struct Connection { pharos: Pharos<NetworkEvent> }

impl Observable<NetworkEvent> for Connection
{
   type Error = PharErr;

   fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Observe< '_, NetworkEvent, Self::Error >
   {
       self.pharos.observe( options )
   }
}


#[ async_std::main ]
//
async fn main()
{
   let mut conn = Connection{ pharos: Pharos::default() };

   // We will only get close events. Note that here we don't need access to any surrounding variables in
   // the closure, so we can use a function pointer which avoids having to box the closure.
   //
   // Filter also has a variant `Closure` which allows you to pass in a `Box<dyn FnMut(&Event) -> bool + Send>`
   // if you need access to surrounding context to make the decision.
   //
   let filter = Filter::Pointer( |e| e == &NetworkEvent::Closed );

   // By creating the config object through into, other options will be defaults, notably here
   // this will use unbounded channels.
   //
   let observer = conn.observe( filter.into() ).await.expect( "observe" );

   // Combine both options.
   //
   let filter = Filter::Pointer( |e| e != &NetworkEvent::Closed );
   let opts   = ObserveConfig::from( filter ).channel( Channel::Bounded(5) );

   // Get everything but close events over a bounded channel with queue size 5.
   //
   let bounded_observer = conn.observe( opts ).await.expect( "observe" );
}

API

API文档可以在docs.rs上找到。

贡献

请查看贡献指南

行为准则

任何在公民行为准则第4点“不可接受的行为”中描述的行为都不受欢迎,可能会让您被禁止。如果包括维护者和项目管理员在内的任何人都未能尊重这些/您的限制,您有权对他们提出批评。

许可

未经许可

依赖关系

~0.8–1.2MB
~21K SLoC