#channel #multi-threading #sync #multi-consumer #thread-safe

multichannel

具有动态通道注册和冻结功能的 mpmc 优先多通道

2 个不稳定版本

0.2.0 2024 年 4 月 29 日
0.1.0 2024 年 4 月 26 日

#501并发

MIT/Apache

28KB
523

MIT License License

Logo

multichannel

具有动态通道注册和冻结功能的 mpmc 优先多通道。

安装

添加到您的 Cargo.toml 文件中

[dependencies]
multichannel = "0.2.0"

功能

  • 动态通道创建和删除
  • 基于优先级的消息选择
  • 加权消息选择
  • 通道冻结
  • 有界和无界通道
  • 线程安全
  • 无不安全代码
  • 多生产者和多消费者

性能

DynMultiReceiver 提供的功能很多,但代价也很大。由于冻结功能,每次 receive() 调用的最坏情况复杂度为 O(n),其中 n 是通道的数量。这是因为 DynMultiReceiver 必须遍历所有通道来找到具有非冻结消息的最高优先级通道。否则使用堆是一个不错的选择,但冻结功能使得这一点变得不可能。

因此,如果您有很多通道且不使用冻结功能,您可能需要考虑使用不同的实现。对于大多数用例,性能应该足够好。

如果您可以使用基本通道实现您的逻辑,您应该这样做。此实现旨在处理需要更高级功能的情况。

Hello World

  use multichannel::DynMultiReceiver;

 #[derive(Debug)]
 enum Msg {
     Shutdown,
     IntegerData(i32),
     FloatingData(f32),
 }

 #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
 enum Priority {
     High,
     Low,
 }

 fn main() {
     let mrx = DynMultiReceiver::<Msg, Priority>::new();
     // Create an unfrozen channel with high priority, a dummy weight and capacity of 1
     let shutdown_sender = mrx.new_channel(Priority::High, 1, false, Some(1));

     // Create two channels with low priority
     // int_sender has a weight of 33 and float_sender has a weight of 66
     // meaning that float_sender will be twice as likely to be selected
     // when calling receive() on mrx and no higher priority channel has a msg
     let int_sender = mrx.new_channel(Priority::Low, 33, false, None);
     let float_sender = mrx.new_channel(Priority::Low, 66, false, None);

     // Send some messages
     int_sender.send(Msg::IntegerData(33)).unwrap();
     int_sender.send(Msg::IntegerData(4031)).unwrap();
     float_sender.send(Msg::FloatingData(3.14)).unwrap();
     int_sender.send(Msg::IntegerData(2)).unwrap();
     float_sender.send(Msg::FloatingData(10.0)).unwrap();
     float_sender.send(Msg::FloatingData(0.0)).unwrap();

     // Receive some messages
     for _ in 0..4 {
         println!("{:?}", mrx.receive());
     }

     // Send a shutdown message
     shutdown_sender.send(Msg::Shutdown).unwrap();

     // There are still messages left in the int channel and float channel,
     // but the shutdown message will be received first, as it has higher priority
     match mrx.receive() {
         Msg::Shutdown => println!("Received shutdown message"),
         _ => unreachable!("Expected a shutdown message"),
     }
 }

贡献

除非您明确声明,否则您提交给作品的所有贡献,如 Apache-2.0 许可证中定义的,应作为上述双重许可,没有任何附加条款或条件。

依赖项

~2–2.7MB
~49K SLoC