#dataframe #client-server #distributed #key-value-store #distributed-systems #map #memory

bin+lib liquid-ml

一个用于构建 UDF 分布式计算系统的大学项目

1 个不稳定版本

0.1.0 2022年6月6日

#1662数据库接口

MIT 许可证

190KB
2.5K SLoC

简介

LiquidML 是一个用于分布式、可扩展数据集分析的平台,这些数据集太大,无法在一个机器的内存中运行。它旨在易于使用,允许用户轻松创建自己的 mapfilter 操作,将其他一切留给 liquid_ml。它附带了多个示例用法,包括内置的决策树和随机森林机器学习算法,展示了该平台的功能和易用性。

LiquidML 用 Rust 编写,出于性能和安全原因,允许更轻松地进行许多优化,而不会出现内存安全问题。这有助于确保我们客户的数据安全,因为许多内存安全问题可能被恶意黑客利用。

LiquidML 目前处于 MVP 阶段。可以在 LiquidML 上构建工具,并且在此包中包含几个示例以展示各种用例。

架构

LiquidML 由以下架构组件组成

  • SoRer,用于推断模式并解析 .sor 文件
  • 网络层,用于通过 TCP 进行通信
  • KV 存储,用于将数据帧块的所有权与节点关联
  • DataFrame,用于本地和分布式数据帧
  • 应用层,用于方便使用整个系统

SoRer

SoRer 的主要目的是处理未知模式且可能存在损坏行的文件的解析。模式基于前 500 行推断,任何损坏的行都会被丢弃。有关 SoRer 的架构和实现的具体信息,请参阅这里

网络

网络层是liquid_ml实例的最低级别组件。它提供了一个简单的注册Server和一个由Server分配唯一ID的分布式Client网络,每个客户端都有一个唯一的ID。网络层使用TCP进行通信。Client在注册到Server后,能够直接向同类型的任何其他Client发送消息。

KV存储

KVStore存储与Key相关联的序列化数据块,并在内存中缓存反序列化值。在liquid_ml中,KVStore存储LocalDataFrame,以便将数据帧的块与系统中的不同节点相关联。

数据帧

liquid_ml中的数据帧受到Rpandas中数据帧的轻度启发,并支持可选命名的列。提供了许多构造函数,可以轻松以不同的方式创建任何类型的数据帧。您可以通过实现Rower特质来分析数据帧中的数据,以执行mapfilter操作。这些操作可以轻松地对适合内存的LocalDataFrame或太大而不能在一个机器中适合的DistributedDataFrame执行。

dataframe模块为数据帧提供了两种实现:一个LocalDataFrame和一个DistributedDataFrame,差异在实现部分有进一步解释。

注意:如果您需要DistributedDataFrame,强烈建议您查看LiquidML结构,因为它提供了许多方便的辅助函数,用于处理DistributedDataFrame。除非您真的知道自己在做什么,否则不建议直接使用DistributedDataFrame。在LiquidML文档中也有mapfilter的有用示例。

应用层

应用层,也称为 LiquidML 结构,是一个更高级别的 API,用于编写在整个分布式系统上执行数据分析的程序。它允许用户创建和分析多个 DistributedDataFrame,而无需担心 KVStore、节点、网络或其他分布式系统的复杂性,这使得用户能够轻松运行 mapfilter 操作。

LiquidML 还提供了一个 run 方法,该方法接收一个函数并执行该函数。用户实现的函数签名是 KVStore -> ()。这允许更高级的用户获得更底层的访问权限,以便他们可以更强大和更通用地使用系统,而不仅仅是我们的 mapfilter 实现之外。

请参阅用例部分和 examples 目录中的示例。

实现

网络

网络层由 ClientServer 结构组成,以及 network 模块中的某些辅助函数。

在网络层中,对与分布式系统相关的许多复杂边缘情况的处理很少。假设大多数事情都无错误发生,尽管进行了基本的错误检查,例如检查连接是否已关闭等。

客户端

可以使用 Client 通过以下方法在任何时候将消息直接发送到同一类型的任何其他 Client

客户端.send_msg(target_id: usize,message:Serialize)

Client 首次创建时,它必须注册到 Server 并连接到所有其他现有的 Client

Client 视角的注册过程

  1. 连接到 Server
  2. Server 发送包含此 ClientMessage<ControlMsg::Introduction> 消息,其中包含 IP:Portnetwork_name
  3. 服务器Server将以包含所有当前连接的ClientIP:Port的消息ControlMsg::Directory>响应。
  4. 新创建的Client连接到所有其他现有的Client
  5. Client等待所有其他尚未开始连接到它的Client,除非我们已经连接到所有节点。

服务器

Server通过异步注册新的Client,使用Server::accept_new_connections,并允许发送任何Message<ControlMsg>类型,例如用于有序关闭的Kill消息。

由于服务器功能相对简单,默认的服务器实现包含在LiquidML系统中,可以通过运行以下命令启动

cargo运行 --bin服务器 ----address<可选IP地址>

如果没有提供IP地址,服务器默认为127.0.0.1:9000

KV存储

内部KVStore将数据以序列化块的形式存储在内存中(也称为Vec<u8>)。KVStore基于最少使用原则将反序列化的值缓存在它们的类型T上。缓存大小的硬限制设置为机器总内存的1/3,尽管这将改为可配置。

KVStore具有内部异步消息处理任务,因为它们直接使用网络并需要与其他KVStore通信。此外,KVStore还提供了一个较低级别的网络通信接口,通过公开一个方法直接将任何序列化数据块发送到任何其他KVStore

数据帧

本地数据帧

LocalDataFrame 实现了实际的数据存储和处理。数据以列格式存储,并具有明确的结构,LocalDataFrame 定义了实际的单一和多线程的 mapfilter 操作。需要注意的是,所有的 mapfilter 操作都是按行处理的,但数据以列格式存储,以避免boxed类型并减少内存使用。

DistributedDataFrame

DistributedDataFrame 是一个在运行 KVStore 的分布式节点系统上的抽象,这些 KVStore 包含了 LocalDataFrame 的数据块。因此,每个 DistributedDataFrame 只包含一个 KVStore 的指针以及一个将行索引范围映射到具有该行索引范围的数据块 [Key] 的映射。由于 DistributedDataFrame 是不可变的,因此可以轻松保持数据帧的全局状态一致性。

因此,DistributedDataFrame 的实现主要关注网络通信以及获取和放置来自不同 KVStore 的数据块。其中一个主要问题是创建一个新的 DistributedDataFrame 意味着将所有数据块 [Key] 分发到所有节点。

在创建时,DistributedDataFrame 的节点1将从 SoR 文件、迭代器和其他方便添加数据的方式将数据块分布到多个节点。请注意,我们的实验测试发现,使用每个节点上可容纳的最大数据块可以提高性能超过 2x

由于我们的实验测试发现大块数据最适合 mapfilter 性能,我们不能简单地通过发送数据块来支持 KVStore 以支持 DistributedDataFrame 的API,因为每个数据块太大而无法通过网络传输,所以像 get 这样的方法将不起作用,除非每个 DistributedDataFrame 都有一种(有意义的)与其他 DistributedDataFrame 通信的方式,这意味着它们需要自己的 Client

应用层,即 LiquidML

由于将大部分工作委托给DistributedDataFrame,`LiquidML`结构体的实现非常简单。它只管理自己节点的状态,并允许创建和分析多个DistributedDataFrame

示例和用例

请查看`examples/`目录以获取更多功能齐全的示例。

使用`LiquidML`创建`DistributedDataFrame`并使用简单的`Rower`

此示例展示了Rower的简单实现,并使用整个LiquidML系统执行map操作,同时不了解系统的分布式内部结构。

use liquid_ml::dataframe::{Data, Rower, Row};
use liquid_ml::LiquidML;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Clone)]
struct MyRower {
    sum: i64
}

impl Rower for MyRower {
    fn visit(&mut self, r: &Row) -> bool {
        let i = r.get(0).unwrap();
        match i {
            Data::Int(val) => {
                if *val < 0 {
                    return false;
                }
                self.sum += *val;
                true
            },
            _ => panic!(),
        }
    }

    fn join(mut self, other: Self) -> Self {
        self.sum += other.sum;
        self
    }
}

#[tokio::main]
async fn main() {
    // This main does the following:
    // 1. Creates a `LiquidML` struct, which registers with the `Server`
    //    running at the address "192.168.0.0:9000"
    // 2. Construct a new `DistributedDataFrame` with the name "my-df". If
    //    we are node 1, schema-on-read and parse the file, distributing
    //    chunks to all the other nodes. Afterwards, all nodes will have
    //    an identical `LiquidML` struct and we can call `map`
    // 3. We call `map` and each node performs the operation on their local
    //    chunk(s). Since the `Rower` trait defines how to join chunks, the
    //    results from each node will be joined until we have a result
    let mut app = LiquidML::new("192.155.22.11:9000",
                                "192.168.0.0:9000",
                                20)
                                .await
                                .unwrap();
    app.df_from_sor("foo.sor", "my-df").await.unwrap();
    let r = MyRower { sum: 0 };
    app.map("my-df", r);
}

通用、低级用例

use liquid_ml::dataframe::LocalDataFrame;
use liquid_ml::LiquidML;
use liquid_ml::kv::KVStore;
use std::sync::Arc;
use tokio::sync::RwLock;

async fn something_complicated(kv: Arc<KVStore<LocalDataFrame>>) {
    println!("Use your imagination :D");
}

#[tokio::main]
async fn main() {
    let app =
        LiquidML::new("192.15.2.1:900", "192.16.0.0:900", 20).await.unwrap();
    app.run(something_complicated).await;
}

随机森林

分布式随机森林实现可以在`examples/random_forest.rs`中找到,此示例展示了在我们分布式平台上从头开始构建的随机森林。

这目前是一个非常初步的概念验证,因为它假设最后一列是布尔标签,并且不支持布尔标签以外的标签。

该程序可以按以下方式运行:

  1. 使用以下命令启动Servercargo run --bin server
  2. 使用以下命令启动3个客户端,每个客户端具有不同的IP:Port

cargo运行 --发布 --示例随机森林 ---m<我的IP:端口> -s<服务器IP:端口> -d<数据文件路径>

有关可用的命令行参数的完整描述,可以通过运行以下命令查看:

cargo运行 --示例随机森林 ----help

Linus的度数

此示例展示了如何计算有多少人在n度内与Linus Torvalds共事。它需要一个非常大的数据文件来运行,因此数据文件不包括在此存储库中。如果您需要数据文件,请与我们联系。

代码可以在examples/seven_degrees.rs中找到

该程序可以按以下方式运行:

  1. 使用以下命令启动Servercargo run --bin server
  2. 使用以下命令启动3个客户端,每个客户端具有不同的IP:Port

cargo运行 --发布 --示例seven_degrees ---m<IP:端口> -c<数据文件的全路径>

有关可用的命令行参数的完整描述,可以通过运行以下命令查看:

cargo运行 --示例seven_degrees ----help

我们发现,即使在使用交换空间的情况下,`seven_degrees`示例的峰值内存使用量达到了18GB,并且在使用完整尺寸的文件时,4度内耗时约5分钟。

一台电脑是只有8GB RAM和i5-4690k的台式机,另一台是带有16GB RAM和i7-8550u的(插拔)笔记本电脑。

单词计数

此程序统计给定文本文件中每个唯一单词出现的次数。程序通过空格分割单词,不考虑标点符号(尽管这可以通过仅修改示例本身轻松实现),因此“foo”和“foo,”被视为不同的单词。

代码可以在examples/word_count.rs中找到

此程序运行单词计数示例,可以按以下方式运行:

  1. 使用以下命令启动Servercargo run --bin server
  2. 使用以下命令启动3个客户端,每个客户端具有不同的IP:Port

cargo运行 --发布 --示例word_count ---m<IP:端口>

有关可用的命令行参数的完整描述,可以通过运行以下命令查看:

cargo运行 --示例word_count ----help

简单演示

我们实现了一个简单的演示,将一些数字放入kv中,然后将它们相加并验证加法是否正确。

本程序运行演示程序,可以按以下方式运行:

  1. 使用以下命令启动 服务器cargo run --bin server
  2. 使用以下命令启动3个客户端,每个客户端具有不同的IP:Port

cargo运行 --示例demo_client ---m<IP:端口>

有关可用的命令行参数的完整描述,可以通过运行以下命令查看:

cargo运行 --示例demo_client ----help

第三个客户端将在末尾打印 SUCCESS

路线图

该项目不再处于积极开发状态,因为它作为学习工具已经完成了其使命,并且开发者没有足够的带宽来改进它。然而,以下是我们认为最重要的、将会有很好的改进的项目:

  • 添加从 sor 构建的功能,假设文件位于每个节点上(以减少测试/调试时的网络延迟)
  • .sor -> .csv
  • 将复制粘贴的代码(由于原始类型类型检查)替换为宏
  • 测试(特别是在分块时,以确保没有错误)
  • 调查是否应该在任何地方使用 tokio::spawn_blocking
  • 使用位图代替克隆行作为过滤结果

依赖项

~13MB
~217K SLoC