我有一個 Airflow DAG,我需要從 Airflow 上下文中獲取觸發 DAG 的參數。
以前,我擁有在 DAG 步驟中獲取這些參數的代碼(我使用的是 Airflow 2 中的 Taskflow API)——類似於:
from typing import Dict, Any, List
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago
default_args = {"owner": "airflow"}
@dag(
default_args=default_args,
start_date=days_ago(1),
schedule_interval=None,
tags=["my_pipeline"],
)
def my_pipeline():
@task(multiple_outputs=True)
def get_params() -> Dict[str, Any]:
context = get_current_context()
params = context["params"]
assert isinstance(params, dict)
return params
params = get_params()
pipeline = my_pipeline()
這按預期工作。
但是,我需要分幾個步驟來獲取這些參數,所以我認為最好將代碼移動到全局範圍內的一個單獨的函數中,如下所示:
# ...
from airflow.operators.python import get_current_context
# other top-level code here
def get_params() -> Dict[str, Any]:
context = get_current_context()
params = context["params"]
return params
@dag(...)
def my_pipeline():
@task()
def get_data():
params = get_params()
# other DAG tasks here
get_data()
pipeline = my_pipeline()
現在,這在 DAG 導入時中斷,出現以下錯誤(名稱已更改以匹配上面的示例):
Broken DAG: [/home/airflow/gcs/dags/my_pipeline.py] Traceback (most recent call last):
File "/home/airflow/gcs/dags/my_pipeline.py", line 26, in get_params
context = get_context()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 467, in get_context
raise AirflowException(
airflow.exceptions.AirflowException: Current context was requested but no context was found! Are you running within an airflow task?
我知道錯誤在說什麼以及如何修復它(移動代碼以將上下文返回到 a 中@task
)。但我的問題是——為什麼在 DAG 導入時會出現錯誤?
get_params
不會在其他任務之外的任何地方被調用,並且這些任務顯然在 DAG 運行之前不會運行。那麼為什麼在get_params
導入 DAG 時代碼運行正常呢?
在這一點上,我想理解這一點,因為這個錯誤出現的事實打破了我對導入時如何評估 Python 模塊的理解。在函數運行之前,函數中的代碼不應運行,並且在運行之前可能出現的唯一錯誤是SyntaxError
(也許還有一些我現在不記得的其他核心錯誤)。
Airflow 是在做一些特殊的魔法,還是我遺漏了一些更簡單的事情?
我正在運行由 Google Cloud Composer 1.17.2 管理的 Airflow 2.1.2。
不幸的是,我無法重現您的問題。下面的類似代碼在 Airflow 2.0、2,1 和 2.2 上解析、呈現 DAG 並成功完成:
from datetime import datetime
from typing import Any, Dict
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
def get_params() -> Dict[str, Any]:
context = get_current_context()
params = context["params"]
return params
@dag(
dag_id="get_current_context_test",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
params={"my_param": "param_value"},
)
def my_pipeline():
@task()
def get_data():
params = get_params()
print(params)
get_data()
pipeline = my_pipeline()
但是,context
對象可以在任務修飾的函數中直接訪問。您可以更新任務簽名以包含一個 arg for params=None
(使用默認值,以便文件解析無TypeError
異常),然後對該 arg 應用您需要的任何邏輯。這是可以做到的ti
,dag_run
等過。也許這有幫助?
@dag(
dag_id="get_current_context_test",
start_date=datetime(2021, 1, 1),
schedule_interval=None,
params={"my_param": "param_value"},
)
def my_pipeline():
@task()
def get_data(params=None):
print(params)
get_data()
pipeline = my_pipeline()
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句