#service #pub-sub #tower-service #subscription #back-end #front-end #request

linera-alloy-pubsub

Ethereum JSON-RPC 发布-订阅塔服务和类型定义

1 个不稳定版本

0.1.0 2024年6月1日

#1921神奇豆


6 个crate 中使用

MIT/Apache

145KB
2.5K SLoC

linera-alloy-pubsub

Ethereum JSON-RPC 发布-订阅 塔服务和类型定义。

概述

与常规 RPC 服务不同,PubSub 服务是长期存在的双向服务。它们用于订阅服务器上的事件,并在这些事件发生时接收通知。

此 PubSub 系统由 3 个逻辑部分组成

  • 前端是用户交互的系统部分。它公开了一个简单的 API,允许用户发出请求和管理订阅。
  • 服务是一个中间层,负责管理请求/响应映射、订阅别名和后端生命周期事件。通过 PubSubConnect::into_service 将启动一个长期运行的服务任务。服务存在是为了管理请求和订阅在重新连接时的生命周期,并为任何数量的前端提供服务。
  • 后端是与服务器建立的活跃连接。用户绝不应直接实例化后端。相反,他们应使用 PubSubConnect::into_service 为某些连接对象。后端负责管理与服务器的连接,从服务接受用户请求并将服务器响应转发给服务。

此 crate 提供以下功能

  • PubSubConnect:通过连接到某些 后端 来实例化 PubSub 服务的 trait。此 trait 的实现者负责精确的连接细节,以及启动 后端 任务。用户应始终调用 PubSubConnect::into_service 来获取具有运行后端的运行服务。
  • ConnectionHandle:指向运行中的 后端 的句柄。此类型由 PubSubConnect::connect 返回,并由 服务 所拥有。丢弃句柄将关闭 后端
  • ConnectionInterface:[ConnectionHandle] 的逆。此类型由 后端 所拥有,并用于与 服务 进行通信。丢弃接口将通知 服务 发生了终端错误。
  • PubSubFrontend:前端。指向运行中的 PubSub 服务 的句柄。它用于向 服务 发出请求和订阅生命周期指令。
  • RawSubscription:指向订阅的句柄。当用户发出 get_subscription() 请求时,由 服务 产生。它是一个接收来自 服务 通知的 tokio::broadcast 通道。
  • Subscription:指向特定响应类型的订阅的句柄。它是对 RawSubscription 的包装,它将通知反序列化为预期类型,并允许用户接受或丢弃意外的响应。
  • SubscriptionItem:一个类型化的 Subscription 中的项。当通过 recv_any() API 接收通知时,由订阅产生此类型。它包含反序列化的项。如果反序列化失败,它包含原始的 JSON 值。

处理订阅

对于正常请求,用户向 前端 发送请求,并通过 tokio oneshot 通道接收响应。这是直接且易于理解的。然而,订阅是其他请求的副作用,并且是长期存在的。它们由 服务 管理,并由一个 U256 id 标识。服务使用此 id 来管理订阅生命周期,并将通知派发到正确的订阅者。

服务器与本地 ID

当用户发起订阅请求时,前端将订阅请求发送至服务。服务通过后端将其分发给RPC服务器。服务随后拦截包含服务ID的RPC服务器响应,并为订阅分配一个local_id。这个local_id用于在服务和消费订阅的任务中识别订阅,而server_id则用于识别订阅到RPC服务器,并将通知与特定的活动订阅关联。

这允许我们使用长生命周期的local_id值来管理多重新连接的订阅,而无需在服务器连接丢失时通知前端用户ID变更。这还防止了在重新连接期间或重新连接后立即取消订阅时的竞态条件。

什么是订阅请求?

服务使用请求中的is_subscription()方法来确定给定的RPC请求是否为订阅。通常,订阅请求使用eth_subscribe方法。但是,也可以使用其他方法来创建订阅,例如admin_peerEvents。为了允许在未知方法上自定义订阅,RequestSerializedRequestRpcCall公开了set_is_subscription(),它可以将任何给定的请求标记为订阅。

在标记请求为订阅时,服务将拦截RPC响应,该响应必须是一个U256值。返回任何除U256之外的值的订阅请求将无法正常工作。

订阅生命周期

常规请求生命周期

  1. 用户向前端发起请求。
  2. 前端将请求发送至服务,并通过oneshot通道接收响应。
  3. 服务将oneshot通道存储在其RequestManager中。
  4. 服务将请求发送至后端。
  5. 后端将请求发送至RPC服务器。
  6. RPC服务器返回一个JSON RPC响应。
  7. 后端将响应发送至服务。
  8. 服务通过oneshot将响应发送至等待的任务。

订阅请求生命周期

  1. 用户向前端发起订阅请求。
  2. 前端将请求发送至服务,并通过oneshot通道接收响应。
  3. 服务将oneshot通道存储在其RequestManager中。
  4. 服务将请求发送至后端。
  5. 后端将请求发送至RPC服务器。
  6. RPC服务器返回一个U256值(即server_id)。
  7. 后端将响应发送至服务。
  8. 服务为订阅分配一个local_id,创建订阅广播通道,并在其SubscriptionManager中存储相关信息。
  9. 服务将JSON RPC响应覆盖为local_id
  10. 服务通过oneshot将响应发送至等待的任务。

订阅通知生命周期

  1. RPC服务器向后端发送通知。
  2. 后端将通知发送至服务。
  3. 服务在其SubscriptionManager中查找local_id
  4. 如果存在,服务将通知发送至相关通道。
    1. 否则,服务忽略该通知。

依赖项

~24MB
~508K SLoC