77个版本 (30个重大变更)

0.42.0 2024年8月14日
0.41.3 2024年7月2日
0.41.2 2024年6月24日
0.38.3 2024年3月18日
0.12.1 2021年2月13日

#20数据库实现

Download history 16709/week @ 2024-05-05 20511/week @ 2024-05-12 20744/week @ 2024-05-19 25443/week @ 2024-05-26 25020/week @ 2024-06-02 26741/week @ 2024-06-09 29395/week @ 2024-06-16 31337/week @ 2024-06-23 31898/week @ 2024-06-30 31284/week @ 2024-07-07 31203/week @ 2024-07-14 40310/week @ 2024-07-21 42574/week @ 2024-07-28 35511/week @ 2024-08-04 37657/week @ 2024-08-11 46659/week @ 2024-08-18

每月下载量167,045
168 个crate中使用(8个直接使用)

MIT 许可证

7.5MB
180K SLoC

polars-lazy

polars-lazy 作为 Polars DataFrame库的延迟查询引擎。它允许以延迟方式对DataFrames进行操作,仅在必要时执行。这可以显著提高大型数据集的性能。

重要提示:此crate 不适用于外部使用。请参阅主 Polars crate 了解预期用途。


lib.rs:

Polars的延迟API

Polars的延迟API支持急切API的部分子集。除了分布式计算之外,它与 Apache Spark 非常相似。您可以使用特定领域语言编写查询。这些查询被转换为逻辑计划,表示查询步骤。在执行之前,此逻辑计划将被优化,可能改变操作顺序以提高性能。或者,可以添加隐式类型转换,以防止查询执行时出现类型错误(如果可以解决)。

Polars的延迟DSL

Polars的延迟API通过将 DataFrame 替换为 LazyFrame 来替换急切API。 LazyFrame 表示逻辑执行计划:对具体数据源执行的一系列操作。这些操作在调用 collect 之前不会执行。这允许Polars优化/重新排序查询,从而可能提高查询速度或减少类型错误。

通常,一个LazyFrame需要一个具体的数据源,比如一个DataFrame、磁盘上的文件等。然后,polars-lazy会应用用户指定的操作序列。要从现有的DataFrame获取一个LazyFrame,我们可以在DataFrame上调用lazy方法。还可以通过文件读取器的lazy版本,如LazyCsvReader,来获取LazyFrame

polars-lazy API的另一个主要组件是Expr,它表示要在LazyFrame上执行的操作,如映射列、过滤或groupby聚合。可以在dsl模块中找到Expr及其生成函数。

大多数对LazyFrame的操作都会消耗LazyFrame,并返回一个包含更新计划的新的LazyFrame。如果您需要多次使用同一个LazyFrame,您应该先调用clone,并可选地预先cache它。

示例

向懒DataFrame添加新列

 #[macro_use] extern crate polars_core;
 use polars_core::prelude::*;
 use polars_lazy::prelude::*;

 let df = df! {
     "column_a" => &[1, 2, 3, 4, 5],
     "column_b" => &["a", "b", "c", "d", "e"]
 }.unwrap();

 let new = df.lazy()
     // Note the reverse here!!
     .reverse()
     .with_column(
         // always rename a new column
         (col("column_a") * lit(10)).alias("new_column")
     )
     .collect()
     .unwrap();

 assert!(new.column("new_column")
     .unwrap()
     .equals(
         &Series::new("new_column", &[50, 40, 30, 20, 10])
     )
 );

基于某些谓词修改列

 #[macro_use] extern crate polars_core;
 use polars_core::prelude::*;
 use polars_lazy::prelude::*;

 let df = df! {
     "column_a" => &[1, 2, 3, 4, 5],
     "column_b" => &["a", "b", "c", "d", "e"]
 }.unwrap();

 let new = df.lazy()
     .with_column(
         // value = 100 if x < 3 else x
         when(
             col("column_a").lt(lit(3))
         ).then(
             lit(100)
         ).otherwise(
             col("column_a")
         ).alias("new_column")
     )
     .collect()
     .unwrap();

 assert!(new.column("new_column")
     .unwrap()
     .equals(
         &Series::new("new_column", &[100, 100, 3, 4, 5])
     )
 );

Groupby + 聚合

 use polars_core::prelude::*;
 use polars_core::df;
 use polars_lazy::prelude::*;
 use arrow::legacy::prelude::QuantileInterpolOptions;

 fn example() -> PolarsResult<DataFrame> {
     let df = df!(
         "date" => ["2020-08-21", "2020-08-21", "2020-08-22", "2020-08-23", "2020-08-22"],
         "temp" => [20, 10, 7, 9, 1],
         "rain" => [0.2, 0.1, 0.3, 0.1, 0.01]
     )?;

     df.lazy()
     .group_by([col("date")])
     .agg([
         col("rain").min().alias("min_rain"),
         col("rain").sum().alias("sum_rain"),
         col("rain").quantile(lit(0.5), QuantileInterpolOptions::Nearest).alias("median_rain"),
     ])
     .sort(["date"], Default::default())
     .collect()
 }

调用任何函数

以下我们懒调用一个自定义闭包,其类型为Series => Result<Series>。因为闭包改变了Series的类型/变体,所以我们还定义了返回类型。这很重要,因为由于延迟,类型应该在事先就知道。注意,通过应用这些自定义函数,您可以访问Series/ChunkedArrays的整个急切API

 #[macro_use] extern crate polars_core;
 use polars_core::prelude::*;
 use polars_lazy::prelude::*;

 let df = df! {
     "column_a" => &[1, 2, 3, 4, 5],
     "column_b" => &["a", "b", "c", "d", "e"]
 }.unwrap();

 let new = df.lazy()
     .with_column(
         col("column_a")
         // apply a custom closure Series => Result<Series>
         .map(|_s| {
             Ok(Some(Series::new("", &[6.0f32, 6.0, 6.0, 6.0, 6.0])))
         },
         // return type of the closure
         GetOutput::from_type(DataType::Float64)).alias("new_column")
     )
     .collect()
     .unwrap();

连接、过滤和投影

在下面的查询中,我们进行了一个懒连接,然后根据谓词a < 2过滤行。最后,我们选择列"b""c_first"。在急切API中,此查询将非常低效,因为我们将在比所需的更多列和行上连接DataFrame。在这种情况下,查询优化器将在连接之前执行列的选择(投影)和行的过滤(选择),从而减少查询所需的工作量。


fn example(df_a: DataFrame, df_b: DataFrame) -> LazyFrame {
    df_a.lazy()
    .left_join(df_b.lazy(), col("b_left"), col("b_right"))
    .filter(
        col("a").lt(lit(2))
    )
    .group_by([col("b")])
    .agg(
        vec![col("b").first().alias("first_b"), col("c").first().alias("first_c")]
     )
    .select(&[col("b"), col("c_first")])
}

如果我们想要对所有列进行聚合,可以使用通配符运算符*来实现。


fn aggregate_all_columns(df_a: DataFrame) -> LazyFrame {
    df_a.lazy()
    .group_by([col("b")])
    .agg(
        vec![col("*").first()]
     )
}

依赖关系

~11-50MB
~765K SLoC