导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第二十八课。在数据工程的日常工作中,“回填”(Backfill)是一个高频操作。当你修复了一个数据转换逻辑的 bug、新增了一个数据列的计算、或者需要重新处理因上游系统故障导致的历史缺失数据时,你需要让 DAG 针对过去的时间区间重新执行——这就是 Backfill。与普通的定时调度不同,Backfill 涉及一系列独特的工程挑战:如何避免重复处理已成功的数据?如何控制回填的并发度以免压垮下游系统?如何处理依赖过去执行结果(depends_on_past)的任务?如何在多调度器环境下安全地并行回填?Airflow 3.x 引入了全新设计的 Backfill 模型——一个独立的数据库实体,通过BackfillDagRun关联表将回填操作与 DAG Run 解耦,支持暂停/恢复/取消等生命周期管理,并通过 REST API 和 CLI 提供完整的操作界面。本课将深入分析 Backfill 的数据模型、创建流程、调度集成和最佳实践,帮助你掌握安全高效的大规模数据回填方案。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 Backfill 的概念与应用场景——明确回填在数据工程中解决的核心问题掌握 Backfill 数据模型——理解Backfill、BackfillDagRun的字段设计和关联关系理解 Backfill 与 DagRun 的关系——分析backfill_id如何关联到正常调度流程区分 catchup 与按需回填——理解两种历史数据补充方式的区别与适用场景掌握 ReprocessBehavior 策略——理解 NONE/FAILED/COMPLETED 三种重处理行为分析回填的安全保障机制——depends_on_past验证、未来日期检测、并发回填互斥使用 CLI 和 REST API 管理回填——掌握创建、暂停、取消回填的操作接口设计大规模回填方案——掌握并发控制、分区回填、错误处理等最佳实践正文内容(Main Content)1. Backfill 的概念与应用场景1.1 什么是 BackfillBackfill(回填)是指让一个 DAG 针对过去的时间区间执行,补充历史数据处理。在 Airflow 的调度模型中,每个 DAG Run 对应一个特定的数据区间(data_interval),回填就是为那些"本应执行但未执行"或"需要重新执行"的数据区间创建 DAG Run。时间线: ──────────────────────────────────────────────→ 现在 │ 2024-01-01 │ 2024-01-02 │ 2024-01-03 │ ... │ 2024-06-15 │ 正常调度:只创建当前时间点对应的 DAG Run Backfill:为 2024-01-01 到 2024-03-31 的每一天创建 DAG Run1.2 典型应用场景场景描述回填策略Bug 修复修复了数据转换逻辑,需要重算历史数据全量回填(COMPLETED)新增字段添加了新的计算列,需要填充历史值全量回填(COMPLETED)数据恢复上游系统故障导致某些天数据缺失失败重跑(FAILED)新 DAG 上线新建 DAG 需要处理从 start_date 至今的所有数据增量回填(NONE)Schema 变更数据仓库表结构变化,需要重新加载全量回填(COMPLETED)1.3 Backfill 与正常调度的区别维度正常调度Backfill触发方式调度器自动创建用户手动触发DagRun 类型scheduledbackfill_job并发控制max_active_runs(DAG 级)max_active_runs(Backfill 级)时间方向仅向前可正序或反序生命周期管理无支持暂停/恢复/取消调度器参与创建 + 调度仅调度(创建由 Backfill 系统完成)2. Backfill 数据模型2.1 Backfill 主表Backfill模型是回填操作的核心实体,记录了一次回填的完整配置和状态:# airflow-core/src/airflow/models/backfill.pyclassBackfill(Base):"""Model representing a backfill job."""__tablename__="backfill"id:Mapped[int]=mapped_column(Integer,primary_key=True,autoincrement=True)dag_id:Mapped[str]=mapped_column(StringID(),nullable=False)from_date:Mapped[datetime]=mapped_column(UtcDateTime,nullable=False)to_date:Mapped[datetime]=mapped_column(UtcDateTime,nullable=False)dag_run_conf:Mapped[dict]=mapped_column(sa.JSON(),nullable=False,default={})is_paused:Mapped[bool|None]=mapped_column(Boolean,default=False,nullable=True)reprocess_behavior:Mapped[str]=mapped_column(StringID(),nullable=False,default=ReprocessBehavior.NONE)max_active_runs:Mapped[int]=mapped_column(Integer,default=10,nullable=False)created_at:Mapped[datetime]=mapped_column(UtcDateTime,default=timezone.utcnow)completed_at:Mapped[datetime|None]=mapped_column(UtcDateTime,nullable=True)updated_at:Mapped[datetime]=mapped_column(UtcDateTime,default=timezone.utcnow,onupdate=timezone.utcnow)triggering_user_name:Mapped[str|None]=mapped_column(String(512),nullable=True)字段设计解析:字段作用设计考量dag_id关联的 DAG一个 DAG 同时只允许一个活跃的 Backfillfrom_date/to_date回填时间范围定义需要处理的数据区间dag_run_confDAG Run 配置参数支持传递运行时参数is_paused暂停标志控制是否继续创建新的 DAG Runreprocess_behavior重处理策略控制遇到已有 Run 时的行为max_active_runs最大并发数独立于 DAG 级别的并发控制completed_at完成时间NULL 表示进行中,非 NULL 表示已完成triggering_user_name触发用户审计追踪2.2 BackfillDagRun 关联表BackfillDagRun是 Backfill 和 DagRun 之间的映射表,记录了回填中每个时间点的处理状态:classBackfillDagRun(Base):"""Mapping table between backfill run and Dag run."""__tablename__="backfill_dag_run"id:Mapped[int]=mapped_column(Integer,primary_key=True,autoincrement=True)backfill_id:Mapped[int]=mapped_column(Integer,nullable=False)dag_run_id:Mapped[int|None]=mapped_column(Integer,nullable=True)exception_reason:Mapped[str|None]=mapped_column(StringID(),nullable=True)logical_date:Mapped[datetime]=mapped_column(UtcDateTime,nullable=True)partition_key:Mapped[datetime]=mapped_column(StringID(),nullable=True)sort_ordinal:Mapped[int]=mapped_column(Integer,nullable=False)__table_args__=(UniqueConstraint("backfill_id","dag_run_id",name="ix_bdr_backfill_id_dag_run_id"),ForeignKeyConstraint([backfill_id],["backfill.id"],name="bdr_backfill_fkey",ondelete="cascade",),ForeignKeyConstraint([dag_run_id],["dag_run.id"],name="bdr_dag_run_fkey",ondelete="set null",),)@validates("sort_ordinal")defvalidate_sort_ordinal(self,key,val):ifval1:raiseValueError("sort_ordinal must be = 1")returnval关键设计要点:dag_run_id可为 NULL:当 DAG Run 无法创建时(如已存在同时间点的 Run),dag_run_id为 NULL,原因记录在exception_reason中ondelete="cascade":删除 Backfill 时自动清除所有关联记录ondelete="set null":删除 DagRun 时保留 BackfillDagRun 记录,仅置空dag_run_idsort_ordinal:记录执行顺序,支持正序和反序回填2.3 BackfillDagRunExceptionReason当某个时间点的 DAG Run 无法创建时,原因被记录为枚举值:classBackfillDagRunExceptionReason(str,Enum):IN_FLIGHT="in flight"# 该时间点的 Run 正在执行中ALREADY_EXISTS="already exists"# 已存在且不需要重处理UNKNOWN=