导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第十五课。在前面的课程中,我们深入探索了 Airflow 的调度引擎、执行框架、资产系统和 Trigger 异步机制。这些底层组件构成了 Airflow 的"引擎",但要让用户和外部系统能够与这台引擎交互——查看 DAG 状态、触发任务、管理连接和变量——就需要一套精心设计的API 服务层。Airflow 3.x 在 API 架构上做出了根本性的变革:彻底抛弃了 Airflow 2.x 中基于 Flask + Connexion 的传统架构,全面拥抱FastAPI作为底层 Web 框架。这不仅仅是换一个框架那么简单——它带来了原生异步支持、自动 OpenAPI 文档生成、类型安全的请求验证、以及基于依赖注入的优雅安全层。更具创新性的是,Airflow 3.x 引入了双层 API 架构:面向用户的Core API(/api/v2)和面向 Worker 的Execution API(/execution)。这种分离体现了 Airflow 架构向微服务化演进的设计哲学——不同的消费者有不同的需求,应该由不同的接口来服务。本课将带你深入 Airflow 的 API 服务架构,从应用入口到路由注册,从认证授权到版本管理,全面剖析这套现代化的 API 系统是如何构建的。学习目标(Learning Objectives)完成本课学习后,你将能够:理解双层 API 架构设计——明确 Core API 与 Execution API 的职责边界和服务对象掌握 FastAPI 应用启动流程——从create_app()到子应用挂载、中间件注册的完整链路分析 Core API 路由体系——理解public_router、authenticated_router的分层设计和各资源路由模块深入 Execution API 安全机制——JWT Token 类型(execution/workload)、JWTBearer验证、ExecutionAPIRoute的 scope 预计算理解认证与授权框架——BaseAuthManager抽象层、SimpleAuthManager实现、RBAC 权限模型掌握 JWT 令牌系统——JWTGenerator/JWTValidator的对称/非对称签名、JWKS 密钥集管理、令牌刷新与撤销实践 API 自动化运维——使用 REST API 构建自动化脚本正文内容(Main Content)1. 双层 API 架构:设计哲学与全局视图1.1 为什么需要两套 API?在 Airflow 2.x 中,所有组件(Web UI、CLI、Worker)共享同一套 REST API。这种设计在功能上可行,但在实践中暴露了几个问题:问题说明安全粒度不足Worker 只需要执行任务相关的接口,却拥有全部 API 的访问权限性能瓶颈用户管理接口的复杂查询与 Worker 的高频状态更新共享同一个服务版本耦合Worker 与 API Server 的协议演进被用户接口拖累认证复杂度用户认证(用户名/密码/OAuth)与组件认证(服务间通信)混在一起Airflow 3.x 通过引入双层架构来解决这些问题:┌─────────────────────────────────────────────────────────┐ │ FastAPI Main App │ │ (Airflow API) │ ├──────────────────────────┬──────────────────────────────┤ │ Core API (/api/v2) │ Execution API (/execution) │ │ │ │ │ ┌─ DAGs │ ┌─ Task Instances │ │ ├─ DagRuns │ ├─ XComs │ │ ├─ TaskInstances │ ├─ Connections │ │ ├─ Connections │ ├─ Variables │ │ ├─ Variables │ ├─ Dag Runs │ │ ├─ Pools │ ├─ Assets │ │ ├─ Assets │ └─ Health │ │ ├─ Config │ │ │ ├─ Monitor │ 认证: JWT (execution/workload │ │ └─ ... │ token scopes) │ │ │ │ │ 认证: JWT + Auth Manager│ 消费者: Task SDK / Worker │ │ 消费者: Web UI / CLI / │ │ │ 外部系统 │ │ └──────────────────────────┴──────────────────────────────┘Core API(/api/v2):面向人类用户和外部系统。提供 DAG 管理、运行监控、连接/变量 CRUD 等管理接口。使用用户身份认证(JWT + Auth Manager),支持 RBAC 权限控制。Execution API(/execution):面向Task SDK 和 Worker 进程。提供任务状态上报、XCom 数据交换、运行时连接/变量获取等执行接口。使用短期 JWT 令牌(Task Identity Token),支持细粒度的 scope 控制。1.2 应用入口:create_app()的启动链路整个 API 服务的入口位于airflow-core/src/airflow/api_fastapi/app.py,通过create_app()函数完成所有初始化:# airflow-core/src/airflow/api_fastapi/app.py@providers_configuration_loadeddefcreate_app(apps:str="all")-FastAPI:apps_list=apps.split(",")ifappselse["all"]app=FastAPI(title="Airflow API",description="...",lifespan=lifespan,root_path=API_ROOT_PATH.removesuffix("/"),version="2",docs_url="/docs"ifconf.getboolean("api","enable_swagger_ui")elseNone,redoc_url="/redoc"ifconf.getboolean("api","enable_swagger_ui")elseNone,)dag_bag=create_dag_bag()if"all"inapps_listor"execution"inapps_list:task_exec_api_app=create_task_execution_api_app()task_exec_api_app.state.dag_bag=dag_bag init_error_handlers(task_exec_api_app)app.mount("/execution",task_exec_api_app)if"all"inapps_listor"core"inapps_list:app.state.dag_bag=dag_bag init_plugins(app)init_auth_manager(app)init_flask_plugins(app)init_views(app)init_error_handlers(app)init_middlewares(app)init_config(app)returnapp关键设计要点:apps参数控制子应用加载:生产环境可以将 Core API 和 Execution API 部署在不同的进程/容器中,通过apps="core"或apps="execution"分别启动。Execution API 作为子应用挂载:通过app.mount("/execution", task_exec_api_app)实现路径隔离。FastAPI 的mount机制使子应用拥有独立的中间件栈、错误处理和生命周期。共享 DagBag:两个 API 共享同一个dag_bag实例,存储在各自的app.state中,避免重复反序列化 DAG 定义。@providers_configuration_loaded装饰器:确保在创建应用之前加载所有 Provider 的配置。初始化顺序有讲究:init_views(app)必须最后调用,因为它注册了一个 catch-all 路由(/{rest_of_path:path})用于 SPA 前端。1.3 保留 URL 前缀RESERVED_URL_PREFIXES=["/api/v2","/ui","/execution"]Airflow 通过预定义保留前缀来防止插件注册冲突——任何试图使用这些前缀的插件都会被拒绝并记录错误日志。2. Core API:面向用户的管理接口2.1 路由注册体系Core API 的路由注册遵循清晰的分层结构,定义在core_api/routes/public/__init__.py中:# airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.pypublic_router=AirflowRouter(prefix="/api/v2")# 需要认证的路由组authenticated_router=AirflowRouter(responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,status.HTTP_403_FORBIDDEN]),)# 注册所有需要认证的资源路由authenticated_router.include_router(assets_router)authenticated_router.include_router(connections_router)authenticated_router.include_router(dag_run_router)authenticated_router.include_router(dags_router)authenticated_router.include_router(task_instances_router)authenticated_router.include_router(variables_router)authenticated_router.include_router(pools_router)authenticated_router.include_router(xcom_router)# ... 更多路由# 将认证路由组纳入公共路由public_router.include_router(authenticated_router)# 不需要认证的路由(健康检查、版本信息、认证端点)public_router.include_router(monitor_router)public_router.include_router(version_router)public_router.include_router(auth_router)这种设计的巧妙之处在于:authenticated_router统一声明了 401/403 的 OpenAPI 文档,所有子路由自动继承。monitor_router和version_router直接挂在public_router下,无需认证即可访问——这对健康检查探针(如 Kubernetes readiness probe)至关重要。AirflowRouter是APIRouter的扩展,自动将函数名作为operation_id,确保 OpenAPI 文档中操作 ID 的一致性。2.2 核心资源路由模块每个资源路由模块遵循统一的设计模式。以 DAG 管理路由为例:# airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.pydags_router=AirflowRouter(tags=["DAG"],prefix="/dags")@dags_router.get("",dependencies=[Depends(requires_access_dag(method="GET"))])defget_dags(limit:QueryLimit,offset:QueryOffset,tags:QueryTagsFilter,owners:QueryOwnersFilter,dag_id_pattern:QueryDagIdPatternSearch,paused:QueryPausedFilter,# ... 更多过滤参数readable_dags_filter:ReadableDagsFilterDep,session:SessionDep,dag_bag:DagBagDep,)-DAGCollectionResponse:"""List DAGs."""...每个路由方法的参数都利用了 FastAPI 的依赖注入机制:参数类型说明QueryLimit/QueryOffset标准分页参数,通过Annotated[int, Query(...)]定义QueryTagsFilter过滤参数工厂生成的类型标注,自动转换为 SQLAlchemy 过滤条件ReadableDagsFilterDep权限过滤器,基于当前用户的 RBAC 角色限制可见 DAGSessionDep数据库会话依赖,自动管理事务生命周期DagBagDep从app.state获取 DagBag 的依赖2.3 Connection 管理路由示例# airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.pyconnections_router=AirflowRouter(tags=["Connection"],prefix="/connections")@connections_router.delete("/{connection_id}",status_code=status.HTTP_204_NO_CONTENT,responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),dependencies=[Depends(requires_access_connection(method="DELETE")),Depends(action_logging()),],)defdelete_connection(connection_id:str,session:SessionDep):"""Delete a connection entry."""connection=session.scalar(select(Connection).filter_by(conn_id=connection_id))ifconnectionisNone:raiseHTTPException(status.HTTP_404_NOT_FOUND,f"The Connection with connection_id: `{connection_id}` was not found",)session.delete(connection)注意dependencies中同时包含了权限检查(requires_access_connection)和操作审计(action_logging),这些都通过依赖注入在路由处理之前执行。2.4 Core API 资源全景路由前缀模块核心功能/dagsdags.py列表查询、暂停/恢复、删除 DAG/dags/{dag_id}/dagRunsdag_run.py触发/查询/清除 DagRun/dags/{dag_id}/dagRuns/{run_id}/taskInstancestask_instances.pyTaskInstance 状态查询与操作/connectionsconnections.py连接的 CRUD、批量操作、测试连接/variablesvariables.py变量的 CRUD、批量操作/poolspools.py资源池管理/assetsassets.py数据资产管理/configconfig.py运行时配置查询/monitor/healthmonitor.py健康检查/versionversion.pyAirflow 版本信息3. Execution API:面向 Worker 的执行接口3.1 设计定位Execution API 是 Airflow 3.x 架构中的内部通信协议——它连接 API Server 与 Task SDK(运行在 Worker 上的任务执行代码)。当 Worker 上的任务需要获取连接信息、存储 XCom、上报状态时,都通过 Execution API 完成。# airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.pyexecution_api_router=APIRouter()execution_api_router.include_router(health.router,prefix="/health",tags=["Health"])# 所有认证路由authenticated_router=VersionedAPIRouter(dependencies=[Security(require_auth)])authenticated_router.include_router(task_instances.router,prefix="/task-instances",tags=["Task Instances"])authenticated_router.include_router(xcoms.router,prefix="/xcoms",tags=["XComs"])authenticated_router.include_router(connections.router,prefix="/connections",tags=["Connections"])authenticated_router.include_router(variables.router,prefix="/variables",tags=["Variables"])authenticated_router.include_router(dag_runs.router,prefix="/dag-runs",tags=["Dag Runs"])authenticated_rou