#连接池 #postgresql # #解析器 #cueball #特性 #服务

cueball-postgres-connection

这是从rust-postgres包中实现的cueball连接特质的postgres::Client数据库接口

3个版本

0.3.2 2020年6月1日
0.3.1 2020年5月19日
0.3.0 2020年1月8日

#2451 in 数据库接口

MPL-2.0 许可证

89KB
1.5K SLoC

cueball

关于

多节点服务连接池

Cueball是一个用于“玩桌球”的库——管理到多节点服务的连接池。这个cueball的实现灵感来源于Joyent许多服务和软件组件使用的cueball的原始Node.js实现。Rust实现依赖于两个主要特质来管理跨一组提供服务的节点的一组连接。这些是Resolver特质和Connection特质。

解析器

解析器负责定位逻辑服务中所有可用的节点或后端,获取它们的IP地址和端口信息(或连接到它们所需的一切),并跟踪它们。这通常是一种服务发现客户端。一个例子是使用DNS SRV记录作为服务发现机制来查找后端的基于DNS的解析器实现。

连接

在cueball中,连接不一定是一个TCP套接字。它可以是一切可以提供某种逻辑连接到服务的东西,只要它遵守与套接字类似的接口。

这是为了允许API用户将“连接”表示为应用层或会话层概念。例如,构建一个连接到LDAP服务器的连接池,在它们被视为“连接”之前执行绑定操作(认证)可能很有用。

除了ResolverConnection实现之外,cueball用户还向cueball连接池提供一个函数来建立到所需服务的连接。cueball连接池为该函数建立的特质界限如下

FnMut(&Backend) -> C + Send + 'static
where C: Connection

要求是一个函数,它接受解析器中一个Backend的引用,并返回Connection的某个实例。

这个函数的目的是提供一种方法来捕获建立服务连接所需的应用层配置信息。例如,数据库连接可能需要特定的配置信息,如数据库名称或用户名,才能建立连接。

重新平衡

随着服务后端(Backend)的增减,连接池会重新平衡配置的连接数(max_connections)到可用的后端集合中。当Resolver通知连接池已添加新的后端或移除现有的后端时,就会发生重新平衡。连接池会根据这些事件之一重新平衡连接,以保持连接在可用后端之间的均匀分布。

当连接池从Resolver接收到消息并执行实际的重新平衡之前,使用一个可配置的延迟。这种延迟是为了应对Resolver在非常短的时间内可能发送多个消息的情况,允许连接池在重新平衡连接时更加高效。默认的重新平衡延迟时间是100毫秒。

重新平衡可能会导致连接池临时超过池中配置的最大连接数。如果Resolver通知连接池已移除后端,但该后端的连接仍在使用中,连接数可能会超过最大值,直到这些连接返回连接池并被丢弃。

不一致性

在cueball中,“不一致性”一词用来指代连接池中连接顺序的周期性随机打乱。不一致性的目标是避免在连接池的生命周期中出现的不理想模式。例如,假设一个服务有三个后端,分别是ABC,连接池的最大连接数是九。初始连接分布可能如下所示:

1 2 3 4 5 6 7 8 9
A B C A B C A B C

cueball连接池内部使用队列存储连接。考虑到从池中获取连接的时间可能是不均匀的,队列可能从初始状态变为以下状态:

1 4 7 2 5 8 3 6 9
A A A B B B C C C

这种情况并不理想,因为同一个后端必须处理多个连续请求,而其他后端处于空闲状态。对于cueball的理想情况是在后端之间均匀分配工作,这不仅与连接数有关,还与随时间请求分布有关。诚然,上面的例子是一个极端情况,该模式可能会根据工作量迅速解决,但没有保证这一点。cueball周期性不一致性打乱的目标是破坏可能持续一段时间的这些模式。

与不一致性相关的有一个配置选项:decoherence_intervaldecoherence_interval代表不一致性打乱周期的长度,以秒为单位。如果在ConnectionPoolOptions结构中没有指定此值,则默认值是300秒。

示例

使用cueball进行连接管理需要实现Resolver特性和Connection特性。Resolver特性的实现者向连接池提供有关提供特定服务的可用节点信息。Connection特性定义了到特定服务建立和关闭连接的行为。

以下是一个示例,它使用假设的ResolverConnection实现来创建cueball连接池。

use std::thread;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Barrier, Mutex};
use std::sync::mpsc::Sender;
use std::{thread, time};

use slog::{Drain, Logger, info, o};

use cueball::backend;
use cueball::backend::{Backend, BackendAddress, BackendPort};
use cueball::connection::Connection;
use cueball::connection_pool::ConnectionPool;
use cueball::connection_pool::types::ConnectionPoolOptions;
use cueball::error::Error;
use cueball::resolver::{BackendAddedMsg, BackendMsg, Resolver};

fn main() {
    let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
    let log = Logger::root(
        Mutex::new(
            slog_term::FullFormat::new(plain).build()
        ).fuse(),
        o!("build-id" => "0.1.0")
    );

    let be1 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55555);
    let be2 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55556);
    let be3 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55557);

    let resolver = FakeResolver::new(vec![be1, be2, be3]);

    let pool_opts = ConnectionPoolOptions::<FakeResolver> {
        max_connections: 15,
        claim_timeout: Some(1000)
        resolver: resolver,
        log: log.clone(),
        decoherence_interval: None,
    };

    let pool = ConnectionPool::<DummyConnection, FakeResolver>::new(pool_opts);

    for _ in 0..10 {
        let pool = pool.clone();
        thread::spawn(move || {
            let conn = pool.claim()?;
            // Do stuff here
            // The connection is returned to the pool when it falls out of scope.
        })
    }
}

有几种实现 ResolverConnection 特性的方式,这些可能对想要开始使用 cueball 的人很有用。

Resolver 特性实现者

Connection 特性实现者

最低支持的 Rust 版本

当前最低支持的 Rust 版本是 1.39。

依赖项

~10–22MB
~323K SLoC