7 个版本

0.2.1 2024 年 8 月 2 日
0.2.0 2024 年 7 月 16 日
0.1.5 2024 年 7 月 16 日
0.1.3 2024 年 6 月 25 日
0.0.0-预留2023 年 11 月 15 日

413神奇豆

Download history 1/week @ 2024-06-09 3283/week @ 2024-06-16 4888/week @ 2024-06-23 4793/week @ 2024-06-30 6334/week @ 2024-07-07 7629/week @ 2024-07-14 7514/week @ 2024-07-21 8488/week @ 2024-07-28 11034/week @ 2024-08-04 9462/week @ 2024-08-11

37,046 每月下载量
7crate(6 个直接使用)中使用

MIT/Apache

165KB
2.5K SLoC

alloy-pubsub

Ethereum JSON-RPC 发布-订阅塔服务和类型定义。

概述

与常规 RPC 服务不同,PubSub 服务是长期存在且双向的。它们用于订阅服务器上的事件,并在这些事件发生时接收通知。

这里的 PubSub 系统由 3 个逻辑部分组成

  • 前端 是用户与之交互的系统部分。它提供了一个简单的 API,允许用户发出请求和管理订阅。
  • 服务 是一个中间层,负责管理请求/响应映射、订阅别名和后端生命周期事件。运行 PubSubConnect::into_service 将启动一个长期运行的服务任务。服务存在是为了管理请求和订阅在重新连接期间的生命周期,并为任意数量的 前端 服务。
  • 后端是与服务器保持活跃连接的连接。用户绝对不应该直接实例化后端。相反,他们应该使用 PubSubConnect::into_service 来创建某个连接对象。后端负责管理与服务器的连接,接受来自 服务 的用户请求,并将服务器响应转发到 服务

本软件包提供以下功能

  • PubSubConnect:一个通过连接到某个 后端 来实例化 PubSub 服务的特质。此特质的实现者负责确切的连接细节,并生成 后端 任务。用户始终应调用 PubSubConnect::into_service 以获得带有运行后端的服务。
  • ConnectionHandle:运行 后端 的句柄。此类型由 PubSubConnect::connect 返回,并由 服务 拥有。丢弃句柄将关闭 后端
  • ConnectionInterface:[连接句柄]的逆。此类型由 后端 拥有,并用于与 服务 进行通信。丢弃接口将通知 服务 出现终端错误。
  • PubSubFrontend:前端。运行 PubSub 服务 的句柄。它用于向 服务 发出请求和订阅生命周期指令。
  • RawSubscription:订阅的句柄。当用户发出 get_subscription() 请求时,由 服务 产生。这是一个 tokio::broadcast 通道,当服务器为订阅发送通知时,它接收来自 服务 的通知。
  • Subscription:期望特定响应类型的订阅句柄。它是 RawSubscription 的包装器,将通知反序列化为期望的类型,并允许用户接受或丢弃意外的响应。
  • SubscriptionItem:类型化 Subscription 中的一个项。此类型通过订阅通过 recv_any() API产生,当收到通知时包含反序列化的项。如果反序列化失败,它包含原始的 JSON 值。

处理订阅的方法

对于正常请求,用户向前端发送请求,稍后通过tokio单次通道接收响应。这很简单,易于理解。然而,订阅是其他请求的副作用,并且是长期存在的。它们由服务管理,并通过U256 ID识别。服务使用此ID来管理订阅生命周期,并将通知分发给正确的订阅者。

服务器 & 本地 ID

当用户发起订阅请求时,前端将订阅请求发送到服务。服务通过后端将请求发送到RPC服务器。然后服务拦截包含服务器ID的RPC服务器响应,并将local_id分配给订阅。此local_id用于在服务和消费订阅的任务中识别订阅,而server_id用于识别RPC服务器中的订阅,并将通知与特定活动订阅关联。

这允许我们使用长期存在的local_id值来管理跨多个重连的订阅,而无需在服务器连接丢失时通知前端用户ID更改。它还防止在重连期间或之后立即取消订阅时发生竞争条件。

什么是订阅请求?

服务使用请求中的is_subscription()方法来确定给定的RPC请求是否为订阅。通常,订阅请求使用eth_subscribe方法。但是,其他方法也可以用于创建订阅,例如admin_peerEvents。为了允许对未知方法的自定义订阅,RequestSerializedRequestRpcCall公开了set_is_subscription(),可以用来标记任何请求为订阅。

在将请求标记为订阅时,服务将拦截RPC响应,它必须是U256值。返回除U256值以外的任何值的订阅请求将无法工作。

订阅生命周期

常规请求生命周期

  1. 用户向前端发出请求。
  2. 前端通过单次通道将请求发送到服务以接收响应。
  3. 服务将单次通道存储在其RequestManager中。
  4. 服务将请求发送到后端。
  5. 后端将请求发送到RPC服务器。
  6. RPC服务器以JSON RPC响应的形式响应。
  7. 后端将响应发送到服务。
  8. 服务通过单次通道将响应发送到等待的任务。

订阅请求生命周期

  1. 用户向前端发出订阅请求。
  2. 前端通过单次通道将请求发送到服务以接收响应。
  3. 服务将单次通道存储在其RequestManager中。
  4. 服务将请求发送到后端。
  5. 后端将请求发送到RPC服务器。
  6. RPC服务器响应以U256值(即server_id)。
  7. 后端将响应发送到服务。
  8. 服务为订阅分配local_id,创建订阅广播通道,并将相关信息存储在其SubscriptionManager中。
  9. 服务将JSON RPC响应覆盖为local_id
  10. 服务通过单次通道将响应发送到等待的任务。

订阅通知生命周期

  1. RPC服务器向后端发送通知。
  2. 后端将通知发送到服务。
  3. 服务在其SubscriptionManager中查找local_id
  4. 如果存在,该服务会将通知发送到相关频道。
    1. 否则,该服务将忽略通知。

依赖关系

~23MB
~504K SLoC