#send-message #consumer #producer-consumer #multi-consumer #multiple #polling #thread

mp2c

一个多生产者多轮询消费者的库,允许多个生产者完全异步地向多个消费者发送消息

3 个版本

0.1.2 2020年11月18日
0.1.1 2020年9月14日
0.1.0 2020年9月14日

#815 in 并发

MIT 许可证

12KB
186

mp2c

多生产者多消费者

什么是 mp2c?

MP2C 是一种数据结构,允许多个生产者/发布者向多个消费者/订阅者发送消息。异步 mp2c 数据结构称为 Carousel

你说的消息是什么意思?

mp2c 的上下文中,message 是一个 u8 的向量。如何将消息序列化和反序列化取决于生产者/消费者如何最适合他们的需求。

mp2c 是线程安全的吗?

是的。

多个生产者(线程)如何向 mp2c Carousel 发送消息?

克隆一个 mp2c::asynch::Carousel 会创建一个底层的 std::sync::Sender 的克隆,每次调用 Carousel::put 都会将消息发送给消费者。

mp2c 支持异步消息发布吗?

mp2c::asynch::Carousel 支持完全的异步行为。所有放入 Carousel 的消息都异步发送给消费者。

有内存开销吗?

是的。在“不要通过共享内存来通信,而要通过通信来共享内存”的精神下,所有消息都根据 mp2c::asynch::Consumer 的数量进行了克隆。

多生产者多消费者示例

 use mp2c::asynch::{Carousel, Consumer};

 struct TestConsumer1;

 impl Consumer for TestConsumer1 {
   fn consume(&mut self, data: Vec<u8>) {
     let msg = String::from_utf8(data).unwrap();
     // do something with msg
   }
 }

 struct TestConsumer2;

 impl Consumer for TestConsumer2 {
  fn consume(&mut self, data: Vec<u8>) {
    let msg = String::from_utf8(data).unwrap();
    // do something with msg   
  }
 }

 let mut v: Vec<Box<dyn Consumer + Send + 'static>> = Vec::new();
 v.push(Box::new(TestConsumer1));
 v.push(Box::new(TestConsumer2));

 let c = Carousel::new(v);

 for _ in 1..10 {
   let cloned_c = c.clone();
   let t = std::thread::spawn(move || {
     cloned_c.put(String::from("test").into_bytes());
   });
   t.join().unwrap();
 }

接下来是什么?

消息 ID

将消息 ID 添加到要放入数据 Carousel 的每个消息中。

发布历史

v0.1.2

使消费者可变

v0.1.1

更新了 README,包含示例。

v0.1.0

初始发布。

无运行时依赖项