25个版本 (3个稳定版)

1.1.0 2024年5月1日
0.8.1 2023年11月2日
0.6.0 2023年3月3日
0.5.3 2022年11月25日
0.2.1 2022年2月17日

#118异步

Download history 763/week @ 2024-04-30 580/week @ 2024-05-07 585/week @ 2024-05-14 627/week @ 2024-05-21 617/week @ 2024-05-28 512/week @ 2024-06-04 458/week @ 2024-06-11 530/week @ 2024-06-18 556/week @ 2024-06-25 17/week @ 2024-07-02 357/week @ 2024-07-09 611/week @ 2024-07-16 565/week @ 2024-07-23 512/week @ 2024-07-30 521/week @ 2024-08-06 498/week @ 2024-08-13

每月2,178次下载

MIT/Apache

17KB
186

Rust事件总线

为Rust中事件驱动系统提供灵活且易于配置的事件总线。

通过发送强类型有效负载来创建自己的struct并集成你的服务/微服务。

创建处理器

从队列接收到的消息将被发送到处理器。

你的消息处理器必须实现trait MessageHandler,并告知它处理的消息类型。

pub struct NotifyUserHandler {
    received_messages: Arc<Mutex<Vec<UserCreatedMessage>>>,
}

impl NotifyUserHandler {
    pub fn new(received_messages: Arc<Mutex<Vec<UserCreatedMessage>>>) -> Self {
        Self { received_messages }
    }
}

impl MessageHandler<UserCreatedMessage> for NotifyUserHandler {
    fn handle(&self, message: Box<UserCreatedMessage>) -> Result<(), HandleError> {
        self.received_messages.lock().unwrap().push(*message.clone());

        if message.user_id == "100".to_owned() {
            return Err(HandleError::new("ID 100 rejected on NotifyUserHandler".to_owned(), false));
        }
        println!("Message received on NotifyUserHandler: {:?}", message);

        Ok(())
    }
}

此方法将接收配置类型的消息,从订阅者监听的队列中。

HandleError结构体用于告知过程没有正确发生,尽管消息已被接收。

使用错误对象,你还可以告诉总线是否应该将此消息重新入队以尝试再次处理。

在容器中创建RabbitMQ实例以进行本地测试

在终端运行 make up 访问 https://127.0.0.1:15672/ 用户名和密码都输入为guest。

在终端运行 cargo test 以运行测试。

你可以在RabbitMQ管理页面上检查交换、队列和消息。

代码示例

文件send_receive_integration_test包含发送消息和订阅交换以接收消息的有用示例。

创建事件消息

请注意,我们希望在服务之间发送和接收的消息类型是UserCreatedMessage

当我们创建事件消息struct和消息处理器时,我们正在定义

  • 我们在服务之间发送的事件消息的格式。
  • 将处理传入消息的struct/method。

以下是我们当前示例中使用的消息示例

#[derive(Debug, Clone, BorshDeserialize, BorshSerialize)]
pub struct UserCreatedMessage {
    pub user_id: String,
    pub user_name: String,
    pub user_email: String,
}

请注意,你的struct必须从BorshDeserialize和BorshSerialize派生,这样Crosstow Bus才能将你定义的struct序列化以发送到RabbitMQ,并将来自RabbitMQ的消息反序列化为你的自定义格式。

所以,不要忘记将导入添加到你的cargo.toml文件中。

borsh = "1.4.0"
borsh-derive = "1.4.0"

监听事件消息

首先,让我们创建一个Receiver对象

let subscriber = CrosstownBus::new_subscriber("amqp://guest:guest@localhost:5672".to_owned())?;

之后,调用subscribe_event方法,传递您想要订阅的事件名/队列名。如果队列尚未在RabbitMQ中创建,则在接收器订阅时将创建它。

subscriber.subscribe(
        "user_created".to_owned(),
        NotifyUserHandler::new(received_messages.clone()),
        QueueProperties {
            auto_delete: false,
            durable: false,
            use_dead_letter: true,
            consume_queue_name: Some("queue2".to_string()),
        },
    )?;

请注意,subscribe_event方法为异步,因此我在调用它时使用了await。另一个选项是使用以下符号来阻塞它

futures::executor::block_on(receiver.receive("notify_user".to_owned(), NotifyUserEventHandler, None));

参数queue_properties是可选的,它包含特定的队列配置,例如队列是否应该自动删除和持久化。

!!! 重要:consume_queue_name !!!

当您订阅一个事件时,订阅者有自己的队列。

发布者将消息发布到交换机,而不是队列。

每个队列都会接收到消息的一个副本。

每个订阅者可能都有自己的队列。所有这些队列都会接收到消息的一个副本。

使用相同队列的订阅者将争夺消息,因此这些消息将在它们之间分配。结果是它们不会每次都接收到所有消息。

因此,如果您有多个需要处理相同消息的订阅者,请为每个订阅者选择不同的队列名。

发布/发送事件消息

创建发送器的过程基本上是相同的,只是创建方法不同。

let mut publisher = CrosstownBus::new_publisher("amqp://guest:guest@localhost:5672".to_owned())?;

_ = publisher.publish("notify_user".to_owned(), 
        UserCreatedMessage {
            user_id: "asdf".to_owned(),
            user_name: "Billy Gibbons".to_owned(),
            email: "[email protected]".to_owned()
        });

由于send方法接收一个通用的Message参数,我们可以使用同一个发送对象将多个对象类型发布到多个队列。 警告:如果您在队列上发布的消息类型与订阅者处理程序期望的类型不匹配,则无法解析消息,并将记录一条消息。

死信交换机

当您想要处理未正确处理的消息时,死信交换机非常有用。

在订阅事件时,您可以设置use_dead_letter属性为true,这样未正确处理的消息就会被发送到死信交换机。

            use_dead_letter: true,

订阅死信交换机

当处理程序在尝试处理消息时失败,该消息就会被发送到死信交换机。

您可以创建一个处理这些消息的处理程序,并订阅死信交换机。

dl_subscriber.subscribe(
        "insert_user.dlx".to_owned(),
        AddUserToDBDeadLetterHandler::new(received_messages.clone()),
        QueueProperties {
            auto_delete: false,
            durable: false,
            use_dead_letter: false,
            consume_queue_name: Some("handle_insert_user_dl".to_string()),
        },
    )?;

关闭与RabbitMQ的连接

如果需要,您也可以手动关闭连接

_ = publisher.close_connection();

依赖项

~13–26MB
~417K SLoC