0.1.1 2020年11月30日

#20 in #query-execution

GPL-3.0 许可证

99KB
2.5K SLoC

GitHub Workflow Status License: GPL v3 Codecov Crates.io Crates.io GitHub contributors GitHub Release Date Scc Count Badge Scc Count Badge

HETNETDB

HETNETDB 是一个旨在极其灵活且易于使用的数据库。在 hetnetdb(het,代表异构,net,代表网络化)中,目标是通过 SQL 查询轻松访问异构存储平台和网络中的数据。数据库的目标包括支持存储在多个存储平台(包括本地、服务器、边缘和流式传输)上的 CSV 和 JSON。为了实现这一目标,将支持浏览器、iOS、android、linux 和 macOS 的代理支持。(不包括 windows,我们明确不希望支持 windows 用户!)计算操作将通过不透明的 HTTP 端点执行,以实现极大的易用性。

目录

  1. HETNETDB
    1. 目录
    2. 优先级、IO、压缩、网络
    3. 通用架构和命名法
      1. 执行图特质和结构体
    4. 拓扑
      1. v0 目标拓扑
      2. v1 目标拓扑
      3. v3 目标拓扑
    5. 性能目标
    6. 开发
    7. 示例用法
  2. 记账
  3. 功能路线图和愿望清单
  4. 改进愿望清单
  5. 里程碑交互
    1. 第一次点赞
    2. 第一次查询 1 亿行
    3. 钻探基准测试
    4. 第一次执行的图

优先级、IO、压缩、网络

在执行图中以单元形式支持边缘外围设备,我们为支持极其庞大的设备网络打开了大门。这使得处理由小设备生成的数字变得非常容易且可追踪,但这也可能增加整体查询执行时间,并创建执行时间各个阶段的长期尾部。

在公开这些限制的情况下,执行图必须考虑到磁盘、网络和处理的巨大差异,以便为各种拓扑提供低延迟查询性能。提高此性能是次要目标;设计决策将侧重于实用性第一!

通用架构和命名法

在定义术语之前,让我们定义一个等级列表:好(TG)、坏(TB)和丑(TU)。此等级列表将用于引用属性的等级。这些排名表明数量级差异。例如,通过 BLE 通信的边缘设备将在丑陋等级或简称为 TU 上具有网络功能。您的智能手机可能具有 TB 级别的网络功能,而数据中心的服务器将获得 TG。

  • 查询服务器:这是云中的主机。它运行 HTTP 服务器,具有提交查询请求、请求执行等端点。通常,查询服务器应该是 TG。
  • 代理:这是一个执行者的集合。代理在hetnetdb中异构,具有覆盖整个层级的各种能力集合。除非拥有TU排名,否则代理不应与其他代理共享资源。代理可以管理执行者以重试或平衡工作负载,但当他们找不到可接受的执行者配置时,查询将失败。
  • 执行者:实际的数据操作发生在执行者内部。它们尊重自己的资源限制,并尽力完成工作。它们彼此完全独立。它们产生结果或可能的可恢复错误。
  • 解析器:访问数据的接口是SQL。解析器将您的查询转换为错误消息或执行图。
  • 执行节点:这是查询的高级待办事项列表。查询服务器在遍历执行图时跟踪代理并将节点委派给代理。
  • 执行图:执行图是执行节点的解析执行计划。每个查询都有一个待最终确定的图,作为错误或结果。

执行图特质和结构体

存在一个名为 GraphBuilder 的结构体,用于将构建模式公开,以简洁的语法创建、验证和优化执行图。通过 GraphBuilder::new(query_id) 开始构建一个持久化查询的执行图,通过 query_id 进行创建,以便在执行期间可以渲染新的执行图,通过 query_id 进行搜索。在 GraphBuilder::build(&mut self)-> RootNode 上,图被优化并准备好查询(或继续查询)。

执行图本身有一个 RootNode,它旨在充当一个作为元数据、透传节点的元节点,支持与图中其他 HyperNode 相同的异步 Node 接口。每个 HyperNode 是执行图中的一个执行节点,与 关系代数 有直接联系。像 RootNode 一样,HyperNode 提供了一个 async fn curse() -> Arc<WorkNodeCursor> 接口,用于遍历由异步处理接口的 WorkNodeCursor 产生的结果集。一个 HyperNode 是一个不实际执行计算或读取的元节点;它在 WorkNode 实例上初始化并产生计算和读取的结果。 WorkNode 实例旨在灵活部署和分区,它们实现了支持各种 OpTypeIoType 的逻辑。

在以下拓扑部分中讨论和说明,一个 HyperNode 可能在一个代理上的执行者中运行,并向在另一个代理上的执行者中运行的 WorkNode 提供异步游标。

拓扑

显然,有一些雄心勃勃的目标在不久的将来不会完成。

v0 目标拓扑

  1. 查询服务器 + (输入通用客户端)代理:用户将CSV/JSON上传到(网站应用程序)。他们在(浏览器手机)上运行SQL查询。
  2. 查询服务器 + 本地代理:用户有一个CSV/JSON目录。他们运行SQL查询来处理这些数据,而不需要上传。
  3. 查询服务器 + 本地代理 + 远程流式代理:用户有一个CSV/JSON目录。远程流式代理正在测量传感器数据并将其上传到查询服务器。远程流式代理的数据在查询服务器 + 本地代理上处理。

v1 目标拓扑

  1. 查询服务器服务 + 多租户代理池 + 弹性存储:用户向查询服务提交查询。数据从代理池和弹性存储流向。计算发生在查询服务和代理池中。
  2. 查询服务器服务 + 弹性存储 + 本地代理 + 远程流式代理:用户向查询服务提交查询。数据从弹性存储和远程流式代理流向。流式代理的新数据持久化在弹性存储和/或本地代理中。
  3. 查询服务器服务 + 边缘代理:僵尸网络也可以进行计算和存储
  4. 查询服务器服务 + 数据重新分配:用户可以将数据移动到所需的代理,或者让服务决定最佳位置

v3 目标拓扑

  1. 查询服务器服务 + 多租户代理池 + 边缘代理:运行一个社交媒体平台,用广告收入支付代理。

性能目标

考虑到目标数据集将包括各种属性,以下是一些重要的工作负载:

  1. 时间过滤器
  2. 地理过滤器
  3. 标识符过滤器
  4. 捕获后过滤
  5. 数据分布洗牌

实际上,考虑到v0的目标架构,我们所能期望的最好结果是亚秒级延迟。在每个执行图跳转处都有实时数据解析和HTTP服务器,500毫秒的查询执行时间将是惊人的。随着代理进入池中,索引变得更加复杂,将为新的工作负载设定新的目标。

开发

  1. 安装:cargo, libpq, diesel_cli(与postgres一起使用),systemfd,cargo-watch
  2. 构建/测试:cargo buildcargo test
  3. 运行开发服务器:systemfd --no-pid -s http::6969 -- cargo watch -x run
  4. 运行生产服务器:cargo run --release

示例用法

  1. 安装httpie
  2. 分部分地或整体提交查询到端点
jwtrueb@jbmp hetnetdb % echo '{ "text": "SELECT count(*) from agents" }' | http post :6969/query/submit
HTTP/1.1 200 OK
content-length: 87
content-type: application/json
date: Sat, 14 Nov 2020 21:23:58 GMT

{
    "records": [
        {
            "columns": [
                {
                    "i64": 42
                }
            ],
            "ready": {
                "dt_utc": "2020-11-14T21:23:58.730786Z"
            }
        }
    ]
}

记账

从master分支创建和标记版本,使用语义版本控制。README应该是最新的。目录表可以自动更新,使用markdown toc生成器: cargo install markdown-tocmd-toc README.md。使用cargo install cargo-license检查了许可证,但运行工具时很奇怪 rustup run nightly cargo-license

功能路线图和愿望清单

在项目的早期阶段,此TODO列表将包含临时的功能路线图。

  • 创建一个HTTP服务器,通过它路由所有操作。所有请求将通过HTTP请求进行消息传递,并通过与postgres交易来操作共享状态。
  • 为提交选择查询创建路由
    • 尝试解析SQL查询
    • 优化SQL查询
    • 执行优化后的SQL查询
    • 提交未经检查的SQL查询并等待结果
  • 将优化后的查询膨胀为执行图
    • 定义基本图类型和关系
  • 为数据创建执行节点
    • 数据过滤:WHERE
    • 数据分组:GROUP BY
    • 数据排序:ORDER BY
    • 数据函数
      • COUNT(*)
      • COUNT()
      • COUNT(DISTINCT)
      • min
      • max
      • avg
      • ?stddev
  • 为数据加载创建执行节点
    • 数据限制:LIMIT
    • 数据偏移:OFFSET
  • 创建具有模式强制的数据加载路由
    • 上传要缓存的CSV
    • 解析缓存的CSV
    • 将CSV流式传输到缓存表
    • 注册S3配置以下载数据(通过HTTP请求)
    • 注册代理配置以本地处理数据(需要机构CLI/守护程序服务)
  • 创建机构CLI
    • 注册能力
    • 心跳系统负载
    • 在计算组中标记注册
    • 在存储组中标记注册
    • 从查询服务器服务处理计算事件
    • 从查询服务器服务处理存储事件
  • 配置监控仪表板
    • TICK堆栈
    • 对单个查询进行LogDNA事件跟踪
  • 创建执行图可视化工具

改进愿望清单

  • 创建执行成本模型和基准
  • 改进查询优化
  • 改进执行图膨胀
  • 在CSV被引用的地方添加JSON支持
    • 选择一个更快的serde格式
  • 在心跳系统负载事件上重新路由工作负载
  • 切换到支持不同的解析器
    • 公共表表达式
    • 窗口函数
    • 合理任意语法扩展
  • 在约10亿行和/或100GB未压缩CSV上运行基准测试
  • 在从边缘设备流式传输产生行的地方运行机构CLI服务
  • 提供跟踪部分摄取数据的可靠性机制(在端点之前)。

里程碑交互

第一次点赞

数据是

  • 未持久化
  • seq 1 100000生成
  • 并行上传100次
  • 只能来自1个表
  • 预计要逐行遍历以计数
jwtrueb@jbmp hetnetdb % for i in `seq 1 100`; do http --multipart POST :6969/tables/upload/1 'Authorization: Bearer zKpze8PrHL0RfEoZwTeFKCrzL56RprSwJRm1hFp6KwTOfInwAzW8btLHuiMtfD12' csv@./sequence.csv & ; done
jwtrueb@jbmp hetnetdb % echo '{ "text": "select count(*) from simple" }' | http POST :6969/query/submit 'Authorization: Bearer zKpze8PrHL0RfEoZwTeFKCrzL56RprSwJRm1hFp6KwTOfInwAzW8btLHuiMtfD12'
HTTP/1.1 200 OK
content-length: 92
content-type: application/json
date: Mon, 16 Nov 2020 04:36:29 GMT

{
    "records": [
        {
            "columns": [
                {
                    "i64": 1000000
                }
            ],
            "ready": {
                "dt_utc": "2020-11-16T04:36:29.126917Z"
            }
        }
    ]
}

第一次查询 1 亿行

数据是

  • 未持久化
  • for i in $(seq 0 999999); do echo $i,$i,$i >> data/i64_i64_i64.csv; done(21MB csv)
  • 并行上传100次在 🔥 29.44s 🔥
  • 在上传期间验证和缓存,速度为 🔥 600Mbps 🔥(使用请求完成,因此包括上传时间(峰值1.34Gbps))
  • 通过在缓存的共享应用数据中求和行来计算
  • 查询在 ⚡ 9.4ms ⚡ 上运行

客户端

jwtrueb@jbmp hetnetdb % ./query.sh $(cat target/auth.txt)
HTTP/1.1 200 OK
content-length: 94
content-type: application/json
date: Sun, 22 Nov 2020 20:46:46 GMT

{
    "records": [
        {
            "columns": [
                {
                    "i64": 100000000
                }
            ],
            "ready": {
                "dt_utc": "2020-11-22T20:46:46.697957Z"
            }
        }
    ]
}

服务器

[2020-11-22T20:46:46Z INFO  hetnetdb::query::routes] /query/execute Query { text: "select count(*) from hndefault", parse: Some(Select(SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [Col(Column { name: "count(*)", alias: None, table: None, function: Some(CountStar) })], join: [], where_clause: None, group_by: None, order: None, limit: None })), optimal_parse: Some(Select(SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [Col(Column { name: "count(*)", alias: None, table: None, function: Some(CountStar) })], join: [], where_clause: None, group_by: None, order: None, limit: None })) }
[2020-11-22T20:46:46Z INFO  actix_web::middleware::logger] 127.0.0.1:52091 "POST /query/submit HTTP/1.1" 200 94 "-" "HTTPie/2.3.0" 0.009403

钻探基准测试

基准测试套件的开始使用drill(cargo install drill)直接击中HTTP服务器。配置在 drill.yml。第一种情况是并行运行1000次count star。

有趣的是,此代码片段是从上一个里程碑中加载的1亿行数据中查询得到的。

jwtrueb@jbmp hetnetdb % drill --benchmark drill.yml --stats
Concurrency 4
Iterations 1000
Rampup 2
Base URL http://localhost:6969

Run queries               http://localhost:6969/query/submit 200 OK 13ms
Run queries               http://localhost:6969/query/submit 200 OK 17ms
Run queries               http://localhost:6969/query/submit 200 OK 14ms
Run queries               http://localhost:6969/query/submit 200 OK 16ms
Run queries               http://localhost:6969/query/submit 200 OK 7ms
...
Run queries               http://localhost:6969/query/submit 200 OK 7ms
Run queries               http://localhost:6969/query/submit 200 OK 6ms

Run queries               Total requests            1000
Run queries               Successful requests       1000
Run queries               Failed requests           0
Run queries               Median time per request   6ms
Run queries               Average time per request  7ms
Run queries               Sample standard deviation 1ms

Time taken for tests      1.8 seconds
Total requests            1000
Successful requests       1000
Failed requests           0
Requests per second       547.94 [#/sec]
Median time per request   6ms
Average time per request  7ms
Sample standard deviation 1ms

第一次执行的图

通过drill运行了一个select star,其中包含重新排序、项目和从缓存中选择。

jwtrueb@jbmp hetnetdb % drill --benchmark drill.yml --stats
Concurrency 4
Iterations 1000
Rampup 2
Base URL http://localhost:6969
...
Run queries               http://localhost:6969/query/submit 200 OK 24ms
Run queries               http://localhost:6969/query/submit 200 OK 24ms
Run queries               http://localhost:6969/query/submit 200 OK 25ms

Run queries               Total requests            1000
Run queries               Successful requests       1000
Run queries               Failed requests           0
Run queries               Median time per request   17ms
Run queries               Average time per request  23ms
Run queries               Sample standard deviation 107ms

Time taken for tests      6.1 seconds
Total requests            1000
Successful requests       1000
Failed requests           0
Requests per second       165.13 [#/sec]
Median time per request   17ms
Average time per request  23ms
Sample standard deviation 107ms

以下是一些日志片段,显示了语句的编译和执行过程。

[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Found SelectStatement: SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [All], join: [], where_clause: None, group_by: None, order: None, limit: None }
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
        Whole,
    )
    NodeInfo {
        input: Single(
            HyperNode {
                name: "project",
                columns: None,
                info: NodeInfo {
                    input: Single(
                        HyperNode {
                            name: "select_hndefault",
                            columns: None,
                            info: NodeInfo {
                                input: Leaf,
                                personality: Leaf(
                                    Ram(
                                        "hndefault",
                                    ),
                                ),
                            },
                            execution_info: Mutex {
                                is_locked: false,
                                has_waiters: false,
                            },
                        },
                    ),
                    personality: Op(
                        Project,
                    ),
                },
                execution_info: Mutex {
                    is_locked: false,
                    has_waiters: false,
                },
            },
        ),
        personality: Op(
            Reorder,
        ),
    }
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Op Reorder
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
        Whole,
    )
    NodeInfo {
        input: Single(
            HyperNode {
                name: "select_hndefault",
                columns: None,
                info: NodeInfo {
                    input: Leaf,
                    personality: Leaf(
                        Ram(
                            "hndefault",
                        ),
                    ),
                },
                execution_info: Mutex {
                    is_locked: false,
                    has_waiters: false,
                },
            },
        ),
        personality: Op(
            Project,
        ),
    }
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Op Project
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
        Whole,
    )
    NodeInfo {
        input: Leaf,
        personality: Leaf(
            Ram(
                "hndefault",
            ),
        ),
    }
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Leaf Ram("hndefault")
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Loading table_data from ram cache
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Found table_data with 1 partitions
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Processing 50 records for partition 0

依赖关系

~31–44MB
~758K SLoC