77个版本 (30个重大变更)
0.42.0 | 2024年8月14日 |
---|---|
0.41.3 | 2024年7月2日 |
0.41.2 | 2024年6月24日 |
0.38.3 | 2024年3月18日 |
0.12.1 | 2021年2月13日 |
#20 在 数据库实现
每月下载量167,045
在 168 个crate中使用(8个直接使用)
7.5MB
180K SLoC
polars-lazy
polars-lazy
作为 Polars DataFrame库的延迟查询引擎。它允许以延迟方式对DataFrames进行操作,仅在必要时执行。这可以显著提高大型数据集的性能。
重要提示:此crate 不适用于外部使用。请参阅主 Polars crate 了解预期用途。
lib.rs
:
Polars的延迟API
Polars的延迟API支持急切API的部分子集。除了分布式计算之外,它与 Apache Spark 非常相似。您可以使用特定领域语言编写查询。这些查询被转换为逻辑计划,表示查询步骤。在执行之前,此逻辑计划将被优化,可能改变操作顺序以提高性能。或者,可以添加隐式类型转换,以防止查询执行时出现类型错误(如果可以解决)。
Polars的延迟DSL
Polars的延迟API通过将 DataFrame
替换为 LazyFrame
来替换急切API。 LazyFrame
表示逻辑执行计划:对具体数据源执行的一系列操作。这些操作在调用 collect
之前不会执行。这允许Polars优化/重新排序查询,从而可能提高查询速度或减少类型错误。
通常,一个LazyFrame
需要一个具体的数据源,比如一个DataFrame
、磁盘上的文件等。然后,polars-lazy会应用用户指定的操作序列。要从现有的DataFrame
获取一个LazyFrame
,我们可以在DataFrame
上调用lazy
方法。还可以通过文件读取器的lazy版本,如LazyCsvReader
,来获取LazyFrame
。
polars-lazy API的另一个主要组件是Expr
,它表示要在LazyFrame
上执行的操作,如映射列、过滤或groupby聚合。可以在dsl模块中找到Expr
及其生成函数。
大多数对LazyFrame
的操作都会消耗LazyFrame
,并返回一个包含更新计划的新的LazyFrame
。如果您需要多次使用同一个LazyFrame
,您应该先调用clone
,并可选地预先cache
它。
示例
向懒DataFrame添加新列
#[macro_use] extern crate polars_core;
use polars_core::prelude::*;
use polars_lazy::prelude::*;
let df = df! {
"column_a" => &[1, 2, 3, 4, 5],
"column_b" => &["a", "b", "c", "d", "e"]
}.unwrap();
let new = df.lazy()
// Note the reverse here!!
.reverse()
.with_column(
// always rename a new column
(col("column_a") * lit(10)).alias("new_column")
)
.collect()
.unwrap();
assert!(new.column("new_column")
.unwrap()
.equals(
&Series::new("new_column", &[50, 40, 30, 20, 10])
)
);
基于某些谓词修改列
#[macro_use] extern crate polars_core;
use polars_core::prelude::*;
use polars_lazy::prelude::*;
let df = df! {
"column_a" => &[1, 2, 3, 4, 5],
"column_b" => &["a", "b", "c", "d", "e"]
}.unwrap();
let new = df.lazy()
.with_column(
// value = 100 if x < 3 else x
when(
col("column_a").lt(lit(3))
).then(
lit(100)
).otherwise(
col("column_a")
).alias("new_column")
)
.collect()
.unwrap();
assert!(new.column("new_column")
.unwrap()
.equals(
&Series::new("new_column", &[100, 100, 3, 4, 5])
)
);
Groupby + 聚合
use polars_core::prelude::*;
use polars_core::df;
use polars_lazy::prelude::*;
use arrow::legacy::prelude::QuantileInterpolOptions;
fn example() -> PolarsResult<DataFrame> {
let df = df!(
"date" => ["2020-08-21", "2020-08-21", "2020-08-22", "2020-08-23", "2020-08-22"],
"temp" => [20, 10, 7, 9, 1],
"rain" => [0.2, 0.1, 0.3, 0.1, 0.01]
)?;
df.lazy()
.group_by([col("date")])
.agg([
col("rain").min().alias("min_rain"),
col("rain").sum().alias("sum_rain"),
col("rain").quantile(lit(0.5), QuantileInterpolOptions::Nearest).alias("median_rain"),
])
.sort(["date"], Default::default())
.collect()
}
调用任何函数
以下我们懒调用一个自定义闭包,其类型为Series => Result<Series>
。因为闭包改变了Series的类型/变体,所以我们还定义了返回类型。这很重要,因为由于延迟,类型应该在事先就知道。注意,通过应用这些自定义函数,您可以访问Series/ChunkedArrays的整个急切API。
#[macro_use] extern crate polars_core;
use polars_core::prelude::*;
use polars_lazy::prelude::*;
let df = df! {
"column_a" => &[1, 2, 3, 4, 5],
"column_b" => &["a", "b", "c", "d", "e"]
}.unwrap();
let new = df.lazy()
.with_column(
col("column_a")
// apply a custom closure Series => Result<Series>
.map(|_s| {
Ok(Some(Series::new("", &[6.0f32, 6.0, 6.0, 6.0, 6.0])))
},
// return type of the closure
GetOutput::from_type(DataType::Float64)).alias("new_column")
)
.collect()
.unwrap();
连接、过滤和投影
在下面的查询中,我们进行了一个懒连接,然后根据谓词a < 2
过滤行。最后,我们选择列"b"
和"c_first"
。在急切API中,此查询将非常低效,因为我们将在比所需的更多列和行上连接DataFrame。在这种情况下,查询优化器将在连接之前执行列的选择(投影)和行的过滤(选择),从而减少查询所需的工作量。
fn example(df_a: DataFrame, df_b: DataFrame) -> LazyFrame {
df_a.lazy()
.left_join(df_b.lazy(), col("b_left"), col("b_right"))
.filter(
col("a").lt(lit(2))
)
.group_by([col("b")])
.agg(
vec![col("b").first().alias("first_b"), col("c").first().alias("first_c")]
)
.select(&[col("b"), col("c_first")])
}
如果我们想要对所有列进行聚合,可以使用通配符运算符*
来实现。
fn aggregate_all_columns(df_a: DataFrame) -> LazyFrame {
df_a.lazy()
.group_by([col("b")])
.agg(
vec![col("*").first()]
)
}
依赖关系
~11-50MB
~765K SLoC