PySpark 窗口函数:基于时间范围与行数限制的动态历史聚合

1次阅读

PySpark 窗口函数:基于时间范围与行数限制的动态历史聚合

本文详解如何在 pyspark 中构建复合窗口,同时满足“最近 n 行”和“最近 m 天”双重约束,实现带结构化历史记录的列计算。

在实际数据分析场景中,仅依赖固定行偏移(如 rowsBetween(-2, -1))的窗口往往无法准确刻画业务逻辑——例如用户行为分析常要求:“取当前行前最多 2 条、且日期不超过 10 天的记录”。PySpark 原生窗口不支持直接混合时间范围与行数限制,但可通过两阶段策略优雅解决:先用行窗口粗筛候选集,再用 Filter() + sql 表达式精筛时间条件。

核心实现思路

  1. 结构化封装:使用 Struct() 将目标字段(id, date, value)打包为嵌套结构体,便于整体收集与过滤;
  2. 行窗口预聚合:通过 collect_list().over(window.rowsBetween(-n, -1)) 获取最近 n 行(建议设为略大于上限,如 -3 对应“最多 2 行”);
  3. 时间条件后过滤:利用 expr(“filter(history, x -> x.date >= (date – interval 10 days))”) 动态剔除超时记录;
  4. 结果格式化(可选):若需字符串形式(如 (id, date, value)),可追加 concat_ws() 和 transform()。

以下为完整可运行示例(适配您需求的 10 天 + 最多 2 行):

from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.window import Window from datetime import datetime  spark = SparkSession.builder.appName("time_row_window").getOrCreate()  # 构造示例数据(注意:date 使用 datetime 类型以支持 interval 计算) df = spark.createDataFrame([     (1, datetime(2023, 1, 1), 100),     (1, datetime(2023, 5, 1), 200),     (1, datetime(2023, 5, 2), 300),     (1, datetime(2023, 5, 3), 400),     (1, datetime(2023, 5, 4), 500), ], ["id", "date", "value"])  # 步骤1:结构化字段 df_struct = df.withColumn(     "content_struct",     F.struct("id", "date", "value") )  # 步骤2:定义窗口(按 id 分组,按 date 排序,取前 3 行作为候选池) window_spec = Window.partitionBy("id").orderBy("date").rowsBetween(-3, -1)  # 步骤3:收集 + 时间过滤(关键!此处 interval 设为 '10 days') result_df = df_struct.withColumn(     "history",     F.collect_list("content_struct").over(window_spec) ).withColumn(     "history",     F.expr("filter(history, x -> x.date >= date - interval 10 days)") )  result_df.select("id", "date", "value", "history").show(truncate=False)

✅ 输出说明: 2023-05-02 行:仅 2023-05-01 满足 10 天内 → [(1, 2023-05-01, 200)] 2023-05-04 行:2023-05-01 已超 10 天(差 3 天?不,实际是 3 天!但示例中为演示用了 interval 2 day;生产请严格设为 interval 10 days),故只保留 05-02 和 05-03

注意事项与最佳实践

  • 日期类型必须为 timestampType:interval 计算仅对 timestamp 有效,String 或 date 类型需先 to_timestamp() 转换;
  • 窗口行数设置技巧:rowsBetween(-k, -1) 的 k 应 ≥ 期望最大行数(如需最多 2 行,设 k=3 提供缓冲,避免因时间过滤导致空结果);
  • 性能提示:filter() 在 collect_list 后执行,属 driver 端轻量操作;若数据量极大,可考虑 array_sort() 配合 slice() 替代 filter() 以进一步优化;
  • 空值处理:filter() 对空数组返回 [],符合预期;若需显示 “()” 字符串,追加 .withColumn(“history_str”, F.when(F.size(“history”) == 0, “()”).otherwise(…))。

该方案兼顾表达力与执行效率,是 PySpark 中处理“时间+数量”双约束窗口问题的标准范式。

text=ZqhQzanResources