PySpark 中基于左连接合并 DataFrame 并动态标记数据来源表

9次阅读

PySpark 中基于左连接合并 DataFrame 并动态标记数据来源表

本文介绍如何在 pyspark 中对两个 dataframe 执行左连接,填充缺失字段,并新增 `src` 列标识数据来源(如 `tbl1` 或 `tbl2`),最终统一输出为标准格式。

在 PySpark 数据处理中,常需将主表(如用户邮箱表)与参考表(如员工编号映射表)进行关联,以补全或校验关键字段。本例中,df1 包含 mail_id 和 e_no(员工号),df2 包含 email 和 emp_no,但 df2 中部分 email 为空 —— 这意味着:当 df1 的邮箱在 df2 中未匹配时,应优先使用 df1 自身的 e_no 值作为 emp_no,并标记来源为 tbl1;反之,若成功匹配,则取 df2 的 emp_no 并标记为 tbl2。

关键点在于连接条件与 src 判断逻辑的一致性:原问题中错误地用 e_no == emp_no 连接,但实际业务语义是“用邮箱对齐”,即 df1.mail_id == df2.email;同时,src 的判断应基于连接后 emp_no 是否为空(因左连接下 df2.emp_no 为 NULL 表示未匹配),而非检查 mail_id 字段本身。

以下是完整、可运行的解决方案:

from pyspark.sql import functions as F from pyspark.sql.functions import col, when  # 步骤1:执行左连接(以 df1 为主表,按邮箱对齐) result = df1.join(df2, df1.mail_id == df2.email, "left")  # 步骤2:构造 src 列——emp_no 为 null 表示未从 df2 匹配到,故来源为 tbl1;否则为 tbl2 result = result.withColumn(     "src",      when(col("emp_no").isNull(), "tbl1").otherwise("tbl2") )  # 步骤3:选择并重命名字段,统一输出结构 # 若 emp_no 为空,取 df1 的 e_no;否则取 df2 的 emp_no final_result = result.select(     col("mail_id").alias("email"),     when(col("emp_no").isNull(), col("e_no")).otherwise(col("emp_no")).alias("emp_no"),     "src" )  final_result.show(truncate=False)

✅ 输出效果(与预期一致):

+------------------+------+-----+ |email             |emp_no|src  | +------------------+------+-----+ |[email protected] |111   |tbl1 | |[email protected] |222   |tbl2 | |[email protected] |333   |tbl2 | +------------------+------+-----+

⚠️ 注意事项:

  • 连接键必须语义正确:本例是邮箱关联,不是员工号相等,切勿误用 df1.e_no == df2.emp_no;
  • isNull() 判断对象是连接后的列:col(“emp_no”).isNull() 可靠反映 df2 是否提供有效值;
  • 避免空字符串陷阱:df2.email 含空值(非 null),但左连接后 emp_no 仍可能为 null,因此直接判 emp_no 更健壮;
  • 若后续需去重或处理多匹配,建议在连接前对 df2 按 email 去重(如 df2.dropDuplicates([“email”]))。

该方案结构清晰、逻辑明确,适用于 etl 中常见的“主表补全 + 来源追踪”场景。

text=ZqhQzanResources