Python数据仓库与ETL构建实战_Airflow调度流程详解

13次阅读

airflow在etl中核心作用是调度与编排流程而非执行数据处理,通过DAG定义任务依赖、重试策略、定时触发及通知机制,协调python/sql/spark等实际执行工具

Python数据仓库与ETL构建实战_Airflow调度流程详解

什么是Airflow在ETL中的核心作用

Airflow不是执行ETL任务的工具,而是调度和编排ETL流程的“指挥官”。它不直接处理数据清洗或加载,但能精准控制:哪个任务先跑、失败后怎么重试、依赖关系如何串联、每天几点触发、出错时通知谁。实际项目中,真正干活的是python脚本、SQL、Spark或dbt,Airflow负责把它们按逻辑串起来、稳住节奏、留下记录。

用DAG定义一个典型的数据仓库ETL流程

DAG(有向无环图)是Airflow调度的蓝图。比如构建一张销售宽表,典型DAG包含:拉取原始订单数据 → 清洗并去重 → 关联用户维度 → 计算日销售额指标 → 写入数仓汇总表 → 发送完成通知。每个步骤是一个operator(如PythonOperator、PostgresOperator),通过set_downstream>>明确先后顺序。

  • @task装饰器写轻量python函数,比传统Operator更易调试
  • 关键任务加retries=3retry_delay=timedelta(minutes=2)防临时故障
  • 跨天任务设schedule_interval='0 2 * * *'(每天凌晨2点跑昨日数据)
  • 敏感任务用trigger_rule='all_success'确保前置全成功才执行

让Airflow真正适配数据仓库场景的实操要点

纯演示DAG跑得通,但上线后常卡在权限、性能和可观测性上。真实数据仓库ETL需注意:

  • 连接数仓(如redshift、BigQuery)时,用Connection管理凭证,避免硬编码;密码存于Airflow密钥后端(如AWS Secrets Manager)
  • 大表全量同步容易OOM,改用分页查询或增量字段(如updated_at > '{{ ds }}')配合execution_date变量
  • 在任务里加Logging.info(f"Processed {row_count} rows"),方便在ui的Task Logs里快速定位瓶颈
  • Sensor(如ExternalTaskSensor)等待上游DAG完成,避免数仓表未就绪就启动下游计算

排查调度异常的三个高频入口

Airflow报错不总在代码里,常藏在环境与配置中:

立即学习Python免费学习笔记(深入)”;

  • Web UI的Graph View:一眼看出哪步断开、是否被跳过(skipped)、是否因上游失败而未触发(upstream_failed)
  • Task Instance Details页的Log:点击具体任务→View Log,重点看最后一屏——不是开头的INFO,而是真正的Traceback或SQL错误码
  • airflow.cfg里的parallelism和max_active_tasks_per_dag并发超限会导致任务排队甚至假死,数仓批量作业建议调高但不超过数据库连接池上限
text=ZqhQzanResources