Polars UDF 行级映射与多行结果 DataFrame 的高效拼接

1次阅读

Polars UDF 行级映射与多行结果 DataFrame 的高效拼接

本文详解如何在 Polars 中正确实现「每行输入生成一个同结构但行数可变的 DataFrame」的 UDF 模式,对比 map_rows、iter_rows + concat 与表达式向量化三种方案,强调性能陷阱与最佳实践。

本文详解如何在 polars 中正确实现「每行输入生成一个同结构但行数可变的 dataframe」的 udf 模式,对比 `map_rows`、`iter_rows + concat` 与表达式向量化三种方案,强调性能陷阱与最佳实践。

在 Polars 中,当需要对每一行输入执行复杂逻辑(如基于参数切片参考数据、生成变长结果集)并最终合并为单个 DataFrame 时,开发者常误用 map_rows 返回嵌套 DataFrame,导致性能劣化与类型混乱。核心原则是:map_rows 本质是逐行标量映射,不适用于返回多行结构;真正支持“1→N”行扩展的操作应使用 iter_rows + pl.concat() 或(更优)完全向量化表达式重构

✅ 推荐方案一:避免 map_rows,改用 iter_rows + pl.concat()

这是最直接、语义清晰且可控性高的方式,尤其适合逻辑无法轻易向量化(如需调用外部库、动态切片大表等)的场景:

import polars as pl  df = pl.DataFrame({     "foo": [1, 2, 3],     "bar": [6.0, 7.0, 8.0],     "ham": ["a", "b", "c"], })  reference_data = pl.DataFrame({     "x": list(range(10)),     "y": [chr(ord("a") + i % 26) for i in range(10)], })  def myUDF(row_tuple):     foo, bar, ham = row_tuple     # 动态切片:从 foo 开始取 floor(bar/2) 行     n_rows = int(bar / 2)     return (         reference_data         .slice(foo, n_rows)         .with_columns(name=pl.lit(ham))     )  # ✅ 正确做法:逐行调用 UDF 并拼接 result = pl.concat([myUDF(row) for row in df.iter_rows()], how="vertical") print(result)

输出:

shape: (10, 3) ┌─────┬─────┬──────┐ │ x   ┆ y   ┆ name │ │ --- ┆ --- ┆ ---  │ │ i64 ┆ str ┆ str  │ ╞═════╪═════╪══════╡ │ 1   ┆ b   ┆ a    │ │ 2   ┆ c   ┆ a    │ │ 3   ┆ d   ┆ a    │ │ 2   ┆ c   ┆ b    │ │ 3   ┆ d   ┆ b    │ │ 4   ┆ e   ┆ b    │ │ 3   ┆ d   ┆ c    │ │ 4   ┆ e   ┆ c    │ │ 5   ┆ f   ┆ c    │ │ 6   ┆ g   ┆ c    │ └─────┴─────┴──────┘

⚠️ 注意事项:

  • pl.concat(…, how=”vertical”) 要求所有子 DataFrame 具有完全一致的 schema(列名、类型、顺序),否则会报错;
  • iter_rows() 返回 Python tuple,默认按列顺序解包,确保 UDF 内部解构顺序与 DataFrame 列序严格一致;
  • 若 reference_data 极大(如千万级),建议预先 .lazy() 化并在 UDF 中构建 LazyFrame,最后统一 .collect(),避免重复 eager 计算。

✅ 推荐方案二:彻底向量化 —— 用 Polars 表达式替代 UDF(最高性能)

若逻辑可表达为列运算(如本例中“切片范围由 foo 和 bar 决定”),应优先放弃行级循环,转为纯表达式操作:

# ✅ 向量化实现(无 Python 循环,充分利用 Polars 引擎) result_vec = (     df     .with_row_index("row_idx")  # 添加索引用于后续 join 或 expand     .with_columns(         start = pl.col("foo"),         stop = (pl.col("bar") / 2).cast(pl.Int64),         name = pl.col("ham")     )     # 生成每个 row 对应的 range(start, stop),再 explode 展开     .with_columns(         range_list = pl.arange(pl.col("start"), pl.col("stop"), eager=False)     )     .explode("range_list")     .join(         reference_data.rename({"x": "range_list"}),         on="range_list",         how="left"     )     .select("x", "y", "name") )

该方式零 Python 解释器开销,支持线程并行,是 Polars 的设计哲学首选。

❌ 不推荐方案:滥用 map_rows 返回 DataFrame

map_rows 的设计目标是返回标量元组(如 (a, b)),其输出会被自动构造成新 DataFrame 的一行。若在函数内创建 DataFrame 并返回(如 return (result,)),Polars 仅将其作为 Object 类型存储,后续需 .unnest().explode() 等昂贵操作,丧失 Polars 的内存与计算优势:

# ❌ 反模式:低效、类型丢失、难以调试 df.map_rows(lambda r: (pl.DataFrame({"a": [r[0]+r[1]], "b": [r[2]]}),))

总结:选择路径的决策树

场景 推荐方式 理由
逻辑完全可用 Polars 表达式描述(如算术、条件、窗口、join) 纯表达式链 最高性能、可并行、惰性优化、类型安全
需动态访问外部资源 / 调用非 Polars 函数 / 复杂控制流 iter_rows() + pl.concat() 语义明确、易于调试、可控性强;注意 schema 一致性
临时调试或极小数据集 map_rows + 标量元组(非 DataFrame) 仅限简单 1→1 映射,避免嵌套结构

记住:Polars 的强大源于列式计算与查询优化器,而非模拟 pandas 的 .apply(axis=1)。拥抱向量化,谨慎使用行级迭代——这才是“正确的 Polars 方式”。

text=ZqhQzanResources