#connection-pool #pool #nodejs #traits #service #node #cueball

cueball-tcp-stream-connection

这是对 Rust 标准库中 TcpStream 的 cueball 连接 trait 的实现

2 个版本

0.3.1 2020 年 5 月 19 日
0.3.0 2020 年 1 月 8 日

#74 in #connection-pool


用于 cueball-static-resolver

MPL-2.0 许可证

82KB
1.5K SLoC

cueball

关于

多节点服务连接池

Cueball 是一个用于“玩台球”的库——管理对多节点服务的连接池。这个 cueball 的实现受到了 Joyent 的许多服务和软件组件使用的原始 Node.js 实现 cueball 的启发。Rust 实现依赖于两个主要特质来管理服务节点上的连接集。这些是 Resolver 特质和 Connection 特质。

解析器

解析器负责定位逻辑服务中所有可用的节点或后端,获取它们的 IP 地址和端口信息(或连接所需的其他信息),并跟踪它们。这通常是某种形式的服务发现客户端。一个例子是一个基于 DNS 的解析器实现,它使用 DNS SRV 记录作为服务发现机制来查找后端。

连接

在 cueball 中,连接不一定是 TCP 套接字。它可以是有某种逻辑连接到服务的东西,只要它遵守与套接字相似的接口。

这旨在允许 API 的用户将“连接”表示为应用层概念。例如,这可能在构造连接到 LDAP 服务器的连接池时很有用,在它们被视为连接之前,它们需要执行绑定操作(认证)。

除了 ResolverConnection 实现,cueball 用户还向 cueball 连接池提供一个建立到所需服务的连接的函数。cueball 连接池为此函数建立的特质约束如下

FnMut(&Backend) -> C + Send + 'static
where C: Connection

需求是一个函数,它接受解析器中的一个 Backend 引用,并返回 Connection 的某个实例。

此函数的目的是提供一种方式来捕获建立服务连接所需的应用层配置信息。例如,数据库连接可能需要特定于应用程序的配置,如数据库名称或用户名,才能建立连接。

重新平衡

随着服务后端的出现和消失,连接池会重新分配配置的连接数量(max_connections)到可用的后端集合中。当Resolver通知连接池添加了新的后端或删除了现有后端时,就会发生重新分配。连接池会根据这些事件之一重新分配连接,以保持连接在可用后端之间的均匀分布。

连接池在执行实际重新分配之前,会使用一个可配置的延迟来处理从Resolver接收到的消息。这种延迟是为了应对Resolver在极短的时间内发送多个消息的情况,并使连接池在重新分配连接时更加高效。默认的重新分配延迟时间为100毫秒。

重新分配可能导致连接池暂时超过配置的池最大连接数。如果Resolver通知连接池删除了一个后端,但该后端的连接仍在使用中,则连接数可能超过最大值,直到这些连接返回连接池并被丢弃。

去相干性

在cueball中,去相干性被用来指连接池中连接顺序的周期性随机洗牌。去相干性的目标是避免在连接池生命周期中可能出现的不良模式。例如,假设一个服务有三个后端,分别为ABC,连接池的最大连接数为九。初始连接分布可能如下所示:

1 2 3 4 5 6 7 8 9
A B C A B C A B C

cueball连接池内部使用队列来存储连接。由于连接池中的连接可能被非均匀地占用一段时间,队列可能从初始状态达到以下状态:

1 4 7 2 5 8 3 6 9
A A A B B B C C C

这种情况并不理想,因为同一后端必须处理多个连续请求,而其他后端处于空闲状态。对于cueball来说,理想的状况是后端的工作负载均匀分布,不仅从连接数来看,而且从随时间分布的请求来看。诚然,上述示例是一个极端情况,但根据工作负载,这种模式可能会很快自行解决,但无法保证这一点。cueball周期性去相干性洗牌的目标是破坏可能出现的并持续一段时间的此类模式。

有关去相干性有一个相关的配置选项:decoherence_intervaldecoherence_interval表示去相干性洗牌周期的长度(以秒为单位)。如果在ConnectionPoolOptions结构中未指定此值,则默认值为300秒。

示例

使用cueball进行连接管理需要实现Resolver特性和Connection特性。Resolver特性的实现者向连接池提供有关提供特定服务的可用节点信息。Connection特性定义了建立和关闭特定服务的连接的行为。

以下是一个示例,使用假设的ResolverConnection实现来创建cueball连接池。

use std::thread;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Barrier, Mutex};
use std::sync::mpsc::Sender;
use std::{thread, time};

use slog::{Drain, Logger, info, o};

use cueball::backend;
use cueball::backend::{Backend, BackendAddress, BackendPort};
use cueball::connection::Connection;
use cueball::connection_pool::ConnectionPool;
use cueball::connection_pool::types::ConnectionPoolOptions;
use cueball::error::Error;
use cueball::resolver::{BackendAddedMsg, BackendMsg, Resolver};

fn main() {
    let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
    let log = Logger::root(
        Mutex::new(
            slog_term::FullFormat::new(plain).build()
        ).fuse(),
        o!("build-id" => "0.1.0")
    );

    let be1 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55555);
    let be2 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55556);
    let be3 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55557);

    let resolver = FakeResolver::new(vec![be1, be2, be3]);

    let pool_opts = ConnectionPoolOptions::<FakeResolver> {
        max_connections: 15,
        claim_timeout: Some(1000)
        resolver: resolver,
        log: log.clone(),
        decoherence_interval: None,
    };

    let pool = ConnectionPool::<DummyConnection, FakeResolver>::new(pool_opts);

    for _ in 0..10 {
        let pool = pool.clone();
        thread::spawn(move || {
            let conn = pool.claim()?;
            // Do stuff here
            // The connection is returned to the pool when it falls out of scope.
        })
    }
}

存在几个ResolverConnection特性的实现,可能对希望开始使用cueball的人有用。

Resolver特性实现者

Connection特性实现者

最低支持的Rust版本

当前最低支持的Rust版本是1.39。

依赖项

~5.5MB
~102K SLoC