Polars 中对连续 n 行进行分组聚合的惯用方法

9次阅读

Polars 中对连续 n 行进行分组聚合的惯用方法

在 polars 中,对 dataframe 按固定行数(如每 n 行)进行滑动或分段聚合,最简洁、高效且符合惯用法的方式是结合 `with_row_index()` 与 `group_by_dynamic()`,并显式将索引列转为有符号整型(如 `int32`)。

当需要将大型 DataFrame(例如百万级行、多列)按连续 n 行(如 n = 100)划分为非重叠段并执行聚合(如均值、求和、统计量等)时,group_by_dynamic(index_column=…, every=”ni”) 是 Polars 官方推荐且性能最优的方案——它专为基于索引/时间的动态分组设计,底层高度优化,支持并行执行,远优于手动切片或 group_by(pl.int_range().floor_div(n)) 等替代写法。

你最初的实现逻辑正确,但可进一步简化与健壮化。推荐写法如下:

import polars as pl  df = pl.DataFrame({"a": [1, 1, 3, 8, 62, 535, 4213], "b": [10, 20, 30, 40, 50, 60, 70]})  n = 3 result = (     df     .with_row_index()  # 自动添加 'index' 列(UInt32 类型)     .group_by_dynamic(         index_column=pl.col("index").cast(pl.Int32),  # 必须转为有符号整型         every=f"{n}i",                                 # "3i" 表示每 3 行一组(i = Integer)         period=f"{n}i",                                # 可选:若需重叠窗口,设 period < every         closed="left"                                  # 默认行为:[0,3), [3,6), ...     )     .agg(         pl.col("a").mean().alias("a_mean"),         pl.col("b").sum().alias("b_sum"),         pl.col("a").count().alias("n_rows")  # 验证每组实际行数(末组可能不足 n 行)     ) ) print(result)

关键要点说明:

  • with_row_index() 比 with_columns(index=pl.int_range(...)) 更简洁、语义更清晰,且避免手写长度计算;
  • group_by_dynamic 要求 index_column 为有符号整型(Int32 或 Int64),因此必须 .cast(pl.Int32);
  • "ni" 中的 i 表示 integer(整数步长),区别于时间单位(如 "3h"),这是对行索引分组的正确语法;
  • 若数据量极大(如 10**6 行),该方案仍保持 O(N) 时间复杂度与良好内存局部性,无显式 python 循环或副本开销;
  • 注意:末组可能不足 n 行(如 7 行分 3 行一组 → 三组:[0–2], [3–5], [6]),closed="left" 确保行为可预测;如需严格丢弃不完整组,可在后续用 Filter(pl.col("n_rows") == n)。

⚠️ 不推荐的替代方式(性能或可维护性较差):

  • df.group_by(pl.int_range(0, pl.len()).floor_div(n)):虽可行,但 int_range + floor_div 构造键列效率略低,且语义不如 group_by_dynamic 直观;
  • 手动切片循环(for i in range(0, len(df), n): ...):完全违背 Polars 向量化原则,性能急剧下降;
  • 使用 rolling:适用于重叠窗口(如移动平均),而非非重叠分段聚合。

综上,你已走在正确的路上——只需将 with_columns(...int_range...) 升级为 with_row_index() 并补上类型转换,便是 Polars 社区公认的惯用、高效、可扩展的最佳实践。

text=ZqhQzanResources