6个版本

0.3.5 2020年5月11日
0.3.3 2020年3月4日
0.3.2 2020年1月30日
0.3.0 2019年12月31日
0.2.0 2019年10月25日

#55 in #解析器

6 星 & 26 关注者

MPL-2.0 许可证

78KB
1.5K SLoC

多节点服务连接池

Cueball是一个用于“玩池”的库——管理到多节点服务的连接池。这个Cueball的实现受到Joyent许多服务和软件组件使用的原始Node.js实现cueball的启发。Rust实现依赖于两个主要的特质来管理一组跨越多个服务节点的连接。这些是Resolver特质和Connection特质。

解析器

解析器负责定位一个逻辑服务中所有可用的节点或后端,获取它们的IP地址和端口号(或连接它们所需的任何信息),并跟踪它们。这通常是一种服务发现客户端。一个例子是使用DNS SRV记录作为服务发现机制来查找后端的基于DNS的解析器实现。

连接

在cueball中,连接不一定是TCP套接字。它可以是有某种逻辑连接到服务的东西,只要它遵守与套接字类似的面相。

这是为了让API的用户将“连接”表示为应用层或会话层概念。例如,在它们被认为是“已连接”之前,构建到LDAP服务器的连接池执行绑定操作(认证)可能是有用的。

除了ResolverConnection实现之外,cueball用户还为cueball连接池提供了一个建立到所需服务的连接的功能。cueball连接池为此函数定义的特质界限如下

FnMut(&Backend) -> C + Send + 'static
where C: Connection

该要求是一个函数,它接受一个从解析器中获取的Backend的引用,并返回Connection的某个实例。

此函数的目的是提供一种方式来捕获建立到服务连接所需的应用程序级配置信息。例如,数据库连接可能需要特定于应用程序的配置,例如数据库名称或用户名,以便建立连接。

重新平衡

随着服务Backend的来去,连接池将配置的连接数(max_connections)重新平衡到可用的Backend集合中。当解析器通知连接池添加了新的后端或已删除现有后端时,将发生重新平衡。连接池通过响应这些事件之一来重新平衡连接,以保持连接在可用后端之间均匀分布。

在执行实际重新平衡之前,连接池使用可配置的延迟来处理从解析器接收到的消息。这种延迟是为了考虑到解析器可能在很短的时间内发送多个消息的情况,并使连接池在重新平衡连接时更加高效。默认的重新平衡延迟时间为100毫秒。

重新平衡可能导致连接池暂时超过池中配置的最大连接数。如果解析器通知连接池删除后端,但该后端的连接仍在使用中,则连接计数可能会超过最大值,直到那些连接返回到连接池并被丢弃。

不一致性

在cueball中,不一致性是指连接池中连接顺序的周期性随机打乱。不一致性的目标是避免在连接池的生命周期中出现的不良模式。例如,假设一个服务有三个后端,ABC,并且连接池的最大连接数为九。初始连接分布可能如下所示

1 2 3 4 5 6 7 8 9
A B C A B C A B C

cueball连接池使用队列内部存储连接。鉴于池中的连接可能被声明为非均匀时间段,队列可能从其初始状态到达以下状态

1 4 7 2 5 8 3 6 9
A A A B B B C C C

这种情况并不理想,因为同一后端必须处理多个连续请求,而其他后端处于空闲状态。对于cueball来说,理想的方案是在后端之间均匀分配工作,不仅与连接数有关,而且与随时间分布的请求有关。现在诚然上述示例是一个极端情况,该模式可能会根据工作量迅速解决,但无法保证这种情况会发生。cueball中周期性去相干洗牌的目标是破坏可能产生并持续很长时间的这种模式。

与去相干相关的配置选项有一个:decoherence_intervaldecoherence_interval表示去相干洗牌周期的长度(秒)。如果在ConnectionPoolOptions结构中未指定此值,则默认值为300秒。

示例

使用cueball进行连接管理需要实现Resolver特性和Connection特性。Resolver特性的实现者向连接池提供有关可提供给定服务的节点信息。Connection特性定义了建立和关闭特定服务的连接的行为。

以下是一个示例,使用假设的ResolverConnection实现来创建cueball连接池。

use std::thread;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Barrier, Mutex};
use std::sync::mpsc::Sender;
use std::{thread, time};

use slog::{Drain, Logger, info, o};

use cueball::backend;
use cueball::backend::{Backend, BackendAddress, BackendPort};
use cueball::connection::Connection;
use cueball::connection_pool::ConnectionPool;
use cueball::connection_pool::types::ConnectionPoolOptions;
use cueball::error::Error;
use cueball::resolver::{BackendAddedMsg, BackendMsg, Resolver};

let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let log = Logger::root(
    Mutex::new(
        slog_term::FullFormat::new(plain).build()
    ).fuse(),
    o!("build-id" => "0.1.0")
);

let be1 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55555);
let be2 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55556);
let be3 = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 55557);

let resolver = FakeResolver::new(vec![be1, be2, be3]);

let pool_opts = ConnectionPoolOptions::<FakeResolver> {
    max_connections: 15,
    claim_timeout: Some(1000)
    resolver: resolver,
    log: log.clone(),
    decoherence_interval: None,
};

let pool = ConnectionPool::<DummyConnection, FakeResolver>::new(pool_opts);

for _ in 0..10 {
    let pool = pool.clone();
    thread::spawn(move || {
        let conn = pool.claim()?;
        // Do stuff here
        // The connection is returned to the pool when it falls out of scope.
    })
}

有几个ResolverConnection特性的实现,可能对任何想开始使用cueball

Resolver特性实现者

Connection特性实现者

依赖项

~6MB
~110K SLoC