
本文详解如何在 pyspark 中构建复合窗口,同时满足“最近 n 行”和“最近 m 天”双重约束,实现带结构化历史记录的列计算。
在实际数据分析场景中,仅依赖固定行偏移(如 rowsBetween(-2, -1))的窗口往往无法准确刻画业务逻辑——例如用户行为分析常要求:“取当前行前最多 2 条、且日期不超过 10 天的记录”。PySpark 原生窗口不支持直接混合时间范围与行数限制,但可通过两阶段策略优雅解决:先用行窗口粗筛候选集,再用 Filter() + sql 表达式精筛时间条件。
核心实现思路
- 结构化封装:使用 Struct() 将目标字段(id, date, value)打包为嵌套结构体,便于整体收集与过滤;
- 行窗口预聚合:通过 collect_list().over(window.rowsBetween(-n, -1)) 获取最近 n 行(建议设为略大于上限,如 -3 对应“最多 2 行”);
- 时间条件后过滤:利用 expr(“filter(history, x -> x.date >= (date – interval 10 days))”) 动态剔除超时记录;
- 结果格式化(可选):若需字符串形式(如 (id, date, value)),可追加 concat_ws() 和 transform()。
以下为完整可运行示例(适配您需求的 10 天 + 最多 2 行):
from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.window import Window from datetime import datetime spark = SparkSession.builder.appName("time_row_window").getOrCreate() # 构造示例数据(注意:date 使用 datetime 类型以支持 interval 计算) df = spark.createDataFrame([ (1, datetime(2023, 1, 1), 100), (1, datetime(2023, 5, 1), 200), (1, datetime(2023, 5, 2), 300), (1, datetime(2023, 5, 3), 400), (1, datetime(2023, 5, 4), 500), ], ["id", "date", "value"]) # 步骤1:结构化字段 df_struct = df.withColumn( "content_struct", F.struct("id", "date", "value") ) # 步骤2:定义窗口(按 id 分组,按 date 排序,取前 3 行作为候选池) window_spec = Window.partitionBy("id").orderBy("date").rowsBetween(-3, -1) # 步骤3:收集 + 时间过滤(关键!此处 interval 设为 '10 days') result_df = df_struct.withColumn( "history", F.collect_list("content_struct").over(window_spec) ).withColumn( "history", F.expr("filter(history, x -> x.date >= date - interval 10 days)") ) result_df.select("id", "date", "value", "history").show(truncate=False)
✅ 输出说明: 2023-05-02 行:仅 2023-05-01 满足 10 天内 → [(1, 2023-05-01, 200)] 2023-05-04 行:2023-05-01 已超 10 天(差 3 天?不,实际是 3 天!但示例中为演示用了 interval 2 day;生产请严格设为 interval 10 days),故只保留 05-02 和 05-03
注意事项与最佳实践
- 日期类型必须为 timestampType:interval 计算仅对 timestamp 有效,String 或 date 类型需先 to_timestamp() 转换;
- 窗口行数设置技巧:rowsBetween(-k, -1) 的 k 应 ≥ 期望最大行数(如需最多 2 行,设 k=3 提供缓冲,避免因时间过滤导致空结果);
- 性能提示:filter() 在 collect_list 后执行,属 driver 端轻量操作;若数据量极大,可考虑 array_sort() 配合 slice() 替代 filter() 以进一步优化;
- 空值处理:filter() 对空数组返回 [],符合预期;若需显示 “()” 字符串,追加 .withColumn(“history_str”, F.when(F.size(“history”) == 0, “()”).otherwise(…))。
该方案兼顾表达力与执行效率,是 PySpark 中处理“时间+数量”双约束窗口问题的标准范式。