1 个不稳定版本
0.1.0 | 2022年6月6日 |
---|
#1662 在 数据库接口
190KB
2.5K SLoC
简介
LiquidML 是一个用于分布式、可扩展数据集分析的平台,这些数据集太大,无法在一个机器的内存中运行。它旨在易于使用,允许用户轻松创建自己的 map
和 filter
操作,将其他一切留给 liquid_ml
。它附带了多个示例用法,包括内置的决策树和随机森林机器学习算法,展示了该平台的功能和易用性。
LiquidML 用 Rust 编写,出于性能和安全原因,允许更轻松地进行许多优化,而不会出现内存安全问题。这有助于确保我们客户的数据安全,因为许多内存安全问题可能被恶意黑客利用。
LiquidML 目前处于 MVP 阶段。可以在 LiquidML 上构建工具,并且在此包中包含几个示例以展示各种用例。
架构
LiquidML 由以下架构组件组成
- SoRer,用于推断模式并解析
.sor
文件 - 网络层,用于通过 TCP 进行通信
- KV 存储,用于将数据帧块的所有权与节点关联
- DataFrame,用于本地和分布式数据帧
- 应用层,用于方便使用整个系统
SoRer
SoRer
的主要目的是处理未知模式且可能存在损坏行的文件的解析。模式基于前 500 行推断,任何损坏的行都会被丢弃。有关 SoRer
的架构和实现的具体信息,请参阅这里
网络
网络层是liquid_ml
实例的最低级别组件。它提供了一个简单的注册Server
和一个由Server
分配唯一ID的分布式Client
网络,每个客户端都有一个唯一的ID。网络层使用TCP
进行通信。Client
在注册到Server
后,能够直接向同类型的任何其他Client
发送消息。
KV存储
KVStore
存储与Key
相关联的序列化数据块,并在内存中缓存反序列化值。在liquid_ml
中,KVStore
存储LocalDataFrame
,以便将数据帧的块与系统中的不同节点相关联。
数据帧
liquid_ml
中的数据帧受到R
或pandas
中数据帧的轻度启发,并支持可选命名的列。提供了许多构造函数,可以轻松以不同的方式创建任何类型的数据帧。您可以通过实现Rower
特质来分析数据帧中的数据,以执行map
或filter
操作。这些操作可以轻松地对适合内存的LocalDataFrame
或太大而不能在一个机器中适合的DistributedDataFrame
执行。
dataframe
模块为数据帧提供了两种实现:一个LocalDataFrame
和一个DistributedDataFrame
,差异在实现部分有进一步解释。
注意:如果您需要DistributedDataFrame
,强烈建议您查看LiquidML
结构,因为它提供了许多方便的辅助函数,用于处理DistributedDataFrame
。除非您真的知道自己在做什么,否则不建议直接使用DistributedDataFrame
。在LiquidML
文档中也有map
和filter
的有用示例。
应用层
应用层,也称为 LiquidML
结构,是一个更高级别的 API,用于编写在整个分布式系统上执行数据分析的程序。它允许用户创建和分析多个 DistributedDataFrame
,而无需担心 KVStore
、节点、网络或其他分布式系统的复杂性,这使得用户能够轻松运行 map
或 filter
操作。
LiquidML
还提供了一个 run
方法,该方法接收一个函数并执行该函数。用户实现的函数签名是 KVStore -> ()
。这允许更高级的用户获得更底层的访问权限,以便他们可以更强大和更通用地使用系统,而不仅仅是我们的 map
和 filter
实现之外。
请参阅用例部分和 examples
目录中的示例。
实现
网络
网络层由 Client
和 Server
结构组成,以及 network
模块中的某些辅助函数。
在网络层中,对与分布式系统相关的许多复杂边缘情况的处理很少。假设大多数事情都无错误发生,尽管进行了基本的错误检查,例如检查连接是否已关闭等。
客户端
可以使用 Client
通过以下方法在任何时候将消息直接发送到同一类型的任何其他 Client
:
客户端.send_msg(target_id: usize,message:Serialize)
当 Client
首次创建时,它必须注册到 Server
并连接到所有其他现有的 Client
。
从 Client
视角的注册过程
- 连接到
Server
- 向
Server
发送包含此Client
的Message<ControlMsg::Introduction>
消息,其中包含IP:Port
和network_name
。 - 服务器Server将以包含所有当前连接的
Client
的IP:Port
的消息ControlMsg::Directory>
响应。 - 新创建的
Client
连接到所有其他现有的Client
。 Client
等待所有其他尚未开始连接到它的Client
,除非我们已经连接到所有节点。
服务器
Server
通过异步注册新的Client
,使用Server::accept_new_connections
,并允许发送任何Message<ControlMsg>
类型,例如用于有序关闭的Kill
消息。
由于服务器功能相对简单,默认的服务器实现包含在LiquidML
系统中,可以通过运行以下命令启动
cargo运行 --bin服务器 ----address<可选IP地址>
如果没有提供IP地址,服务器默认为127.0.0.1:9000
。
KV存储
内部KVStore
将数据以序列化块的形式存储在内存中(也称为Vec<u8>
)。KVStore
基于最少使用原则将反序列化的值缓存在它们的类型T
上。缓存大小的硬限制设置为机器总内存的1/3
,尽管这将改为可配置。
KVStore
具有内部异步消息处理任务,因为它们直接使用网络并需要与其他KVStore
通信。此外,KVStore
还提供了一个较低级别的网络通信接口,通过公开一个方法直接将任何序列化数据块发送到任何其他KVStore
。
数据帧
本地数据帧
LocalDataFrame
实现了实际的数据存储和处理。数据以列格式存储,并具有明确的结构,LocalDataFrame
定义了实际的单一和多线程的 map
和 filter
操作。需要注意的是,所有的 map
和 filter
操作都是按行处理的,但数据以列格式存储,以避免boxed类型并减少内存使用。
DistributedDataFrame
DistributedDataFrame
是一个在运行 KVStore
的分布式节点系统上的抽象,这些 KVStore
包含了 LocalDataFrame
的数据块。因此,每个 DistributedDataFrame
只包含一个 KVStore
的指针以及一个将行索引范围映射到具有该行索引范围的数据块 [Key
] 的映射。由于 DistributedDataFrame
是不可变的,因此可以轻松保持数据帧的全局状态一致性。
因此,DistributedDataFrame
的实现主要关注网络通信以及获取和放置来自不同 KVStore
的数据块。其中一个主要问题是创建一个新的 DistributedDataFrame
意味着将所有数据块 [Key
] 分发到所有节点。
在创建时,DistributedDataFrame
的节点1将从 SoR
文件、迭代器和其他方便添加数据的方式将数据块分布到多个节点。请注意,我们的实验测试发现,使用每个节点上可容纳的最大数据块可以提高性能超过 2x
。
由于我们的实验测试发现大块数据最适合 map
和 filter
性能,我们不能简单地通过发送数据块来支持 KVStore
以支持 DistributedDataFrame
的API,因为每个数据块太大而无法通过网络传输,所以像 get
这样的方法将不起作用,除非每个 DistributedDataFrame
都有一种(有意义的)与其他 DistributedDataFrame
通信的方式,这意味着它们需要自己的 Client
。
应用层,即 LiquidML
由于将大部分工作委托给DistributedDataFrame
,`LiquidML`结构体的实现非常简单。它只管理自己节点的状态,并允许创建和分析多个DistributedDataFrame
。
示例和用例
请查看`examples/`目录以获取更多功能齐全的示例。
使用`LiquidML`创建`DistributedDataFrame`并使用简单的`Rower`
此示例展示了Rower
的简单实现,并使用整个LiquidML
系统执行map
操作,同时不了解系统的分布式内部结构。
use liquid_ml::dataframe::{Data, Rower, Row};
use liquid_ml::LiquidML;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Clone)]
struct MyRower {
sum: i64
}
impl Rower for MyRower {
fn visit(&mut self, r: &Row) -> bool {
let i = r.get(0).unwrap();
match i {
Data::Int(val) => {
if *val < 0 {
return false;
}
self.sum += *val;
true
},
_ => panic!(),
}
}
fn join(mut self, other: Self) -> Self {
self.sum += other.sum;
self
}
}
#[tokio::main]
async fn main() {
// This main does the following:
// 1. Creates a `LiquidML` struct, which registers with the `Server`
// running at the address "192.168.0.0:9000"
// 2. Construct a new `DistributedDataFrame` with the name "my-df". If
// we are node 1, schema-on-read and parse the file, distributing
// chunks to all the other nodes. Afterwards, all nodes will have
// an identical `LiquidML` struct and we can call `map`
// 3. We call `map` and each node performs the operation on their local
// chunk(s). Since the `Rower` trait defines how to join chunks, the
// results from each node will be joined until we have a result
let mut app = LiquidML::new("192.155.22.11:9000",
"192.168.0.0:9000",
20)
.await
.unwrap();
app.df_from_sor("foo.sor", "my-df").await.unwrap();
let r = MyRower { sum: 0 };
app.map("my-df", r);
}
通用、低级用例
use liquid_ml::dataframe::LocalDataFrame;
use liquid_ml::LiquidML;
use liquid_ml::kv::KVStore;
use std::sync::Arc;
use tokio::sync::RwLock;
async fn something_complicated(kv: Arc<KVStore<LocalDataFrame>>) {
println!("Use your imagination :D");
}
#[tokio::main]
async fn main() {
let app =
LiquidML::new("192.15.2.1:900", "192.16.0.0:900", 20).await.unwrap();
app.run(something_complicated).await;
}
随机森林
分布式随机森林实现可以在`examples/random_forest.rs`中找到,此示例展示了在我们分布式平台上从头开始构建的随机森林。
这目前是一个非常初步的概念验证,因为它假设最后一列是布尔标签,并且不支持布尔标签以外的标签。
该程序可以按以下方式运行:
- 使用以下命令启动
Server
:cargo run --bin server
- 使用以下命令启动3个客户端,每个客户端具有不同的
IP:Port
:
cargo运行 --发布 --示例随机森林 ---m<我的IP:端口> -s<服务器IP:端口> -d<数据文件路径>
有关可用的命令行参数的完整描述,可以通过运行以下命令查看:
cargo运行 --示例随机森林 ----help
Linus的度数
此示例展示了如何计算有多少人在n
度内与Linus Torvalds共事。它需要一个非常大的数据文件来运行,因此数据文件不包括在此存储库中。如果您需要数据文件,请与我们联系。
代码可以在examples/seven_degrees.rs
中找到
该程序可以按以下方式运行:
- 使用以下命令启动
Server
:cargo run --bin server
- 使用以下命令启动3个客户端,每个客户端具有不同的
IP:Port
:
cargo运行 --发布 --示例seven_degrees ---m<IP:端口> -c<数据文件的全路径>
有关可用的命令行参数的完整描述,可以通过运行以下命令查看:
cargo运行 --示例seven_degrees ----help
我们发现,即使在使用交换空间的情况下,`seven_degrees`示例的峰值内存使用量达到了18GB
,并且在使用完整尺寸的文件时,4度内耗时约5分钟。
一台电脑是只有8GB RAM和i5-4690k的台式机,另一台是带有16GB RAM和i7-8550u的(插拔)笔记本电脑。
单词计数
此程序统计给定文本文件中每个唯一单词出现的次数。程序通过空格分割单词,不考虑标点符号(尽管这可以通过仅修改示例本身轻松实现),因此“foo”和“foo,”被视为不同的单词。
代码可以在examples/word_count.rs
中找到
此程序运行单词计数示例,可以按以下方式运行:
- 使用以下命令启动
Server
:cargo run --bin server
- 使用以下命令启动3个客户端,每个客户端具有不同的
IP:Port
:
cargo运行 --发布 --示例word_count ---m<IP:端口>
有关可用的命令行参数的完整描述,可以通过运行以下命令查看:
cargo运行 --示例word_count ----help
简单演示
我们实现了一个简单的演示,将一些数字放入kv
中,然后将它们相加并验证加法是否正确。
本程序运行演示程序,可以按以下方式运行:
- 使用以下命令启动
服务器
:cargo run --bin server
- 使用以下命令启动3个客户端,每个客户端具有不同的
IP:Port
:
cargo运行 --示例demo_client ---m<IP:端口>
有关可用的命令行参数的完整描述,可以通过运行以下命令查看:
cargo运行 --示例demo_client ----help
第三个客户端将在末尾打印 SUCCESS
。
路线图
该项目不再处于积极开发状态,因为它作为学习工具已经完成了其使命,并且开发者没有足够的带宽来改进它。然而,以下是我们认为最重要的、将会有很好的改进的项目:
- 添加从 sor 构建的功能,假设文件位于每个节点上(以减少测试/调试时的网络延迟)
.sor
->.csv
- 将复制粘贴的代码(由于原始类型类型检查)替换为宏
- 测试(特别是在分块时,以确保没有错误)
- 调查是否应该在任何地方使用
tokio::spawn_blocking
- 使用位图代替克隆行作为过滤结果
依赖项
~13MB
~217K SLoC