為什麼此獲取 Airflow 上下文的代碼會在 DAG 導入時運行?

安娜_希望

我有一個 Airflow DAG,我需要從 Airflow 上下文中獲取觸發 DAG 的參數。

以前,我擁有在 DAG 步驟中獲取這些參數的代碼(我使用的是 Airflow 2 中的 Taskflow API)——類似於:

from typing import Dict, Any, List
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago

default_args = {"owner": "airflow"}

@dag(
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,
    tags=["my_pipeline"],
)
def my_pipeline():
    @task(multiple_outputs=True)
    def get_params() -> Dict[str, Any]:
        context = get_current_context()
        params = context["params"]
        assert isinstance(params, dict)
        return params

    params = get_params()

pipeline = my_pipeline()

這按預期工作。

但是,我需要分幾個步驟來獲取這些參數,所以我認為最好將代碼移動到全局範圍內的一個單獨的函數中,如下所示:

# ...
from airflow.operators.python import get_current_context

# other top-level code here

def get_params() -> Dict[str, Any]:
    context = get_current_context()
    params = context["params"]
    return params

@dag(...)
def my_pipeline():
    @task()
    def get_data():
        params = get_params()

    # other DAG tasks here
    get_data()

pipeline = my_pipeline()

現在,這在 DAG 導入時中斷,出現以下錯誤(名稱已更改以匹配上面的示例):

Broken DAG: [/home/airflow/gcs/dags/my_pipeline.py] Traceback (most recent call last):
  File "/home/airflow/gcs/dags/my_pipeline.py", line 26, in get_params
    context = get_context()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 467, in get_context
    raise AirflowException(
airflow.exceptions.AirflowException: Current context was requested but no context was found! Are you running within an airflow task?

我知道錯誤在說什麼以及如何修復它(移動代碼以將上下文返回到 a 中@task)。但我的問題是——為什麼在 DAG 導入時會出現錯誤?

get_params不會在其他任務之外的任何地方被調用,並且這些任務顯然在 DAG 運行之前不會運行。那麼為什麼在get_params導入 DAG 時代碼運行正常呢?

在這一點上,我想理解這一點,因為這個錯誤出現的事實打破了我對導入時如何評估 Python 模塊的理解。在函數運行之前,函數中的代碼不應運行,並且在運行之前可能出現的唯一錯誤是SyntaxError(也許還有一些我現在不記得的其他核心錯誤)。

Airflow 是在做一些特殊的魔法,還是我遺漏了一些更簡單的事情?

我正在運行由 Google Cloud Composer 1.17.2 管理的 Airflow 2.1.2。

喬什·菲爾

不幸的是,我無法重現您的問題。下面的類似代碼在 Airflow 2.0、2,1 和 2.2 上解析、呈現 DAG 並成功完成:

from datetime import datetime
from typing import Any, Dict

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


def get_params() -> Dict[str, Any]:
    context = get_current_context()
    params = context["params"]
    return params


@dag(
    dag_id="get_current_context_test",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    params={"my_param": "param_value"},
)
def my_pipeline():
    @task()
    def get_data():
        params = get_params()
        print(params)

    get_data()


pipeline = my_pipeline()

任務日誌片段: 任務日誌

但是context對象可以在任務修飾的函數中直接訪問。您可以更新任務簽名以包含一個 arg for params=None(使用默認值,以便文件解析無TypeError異常),然後對該 arg 應用您需要的任何邏輯。這是可以做到的tidag_run等過。也許這有幫助?

@dag(
    dag_id="get_current_context_test",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    params={"my_param": "param_value"},
)
def my_pipeline():
    @task()
    def get_data(params=None):
        print(params)

    get_data()


pipeline = my_pipeline()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

為什麼在只導入類時運行類定義之外的代碼?

Airflow Dag 不會自動運行

Airflow DAG - 使用 SimpleHttpOperator 访问上下文以启用 XCOM 拉取

為什麼此代碼不會導致重新定義錯誤?

為什麼這個閉包代碼在不使用捕獲列表時會拋出錯誤?

當我運行下面的代碼時,它會打印“4294967295”而不是“-1”,為什麼?

為什麼tomcat運行時jsp會渲染?

如何從此代碼中獲取alertdialog.builder 的上下文?

為什麼在運行我的 discord.py 代碼時出現錯誤?

為什麼我需要輸入兩次才能運行代碼

為什麼在組件中導入 VS 硬編碼時,對像數組的行為會有所不同?

為什麼這段代碼會導致hitbox異常?

為什麼相同的生成的彙編代碼不會導致相同的輸出?

為什麼 OCaml 中的模塊類型註釋會導致此代碼無法編譯?

為什麼我的Qt程序中未執行的代碼會導致程序崩潰?

為什麼 for 循環在使用相同的代碼時會調用警告?

Airflow DAG 的可选参数

當 n 等於 -100 並且例如邏輯為假時,為什麼此代碼打印“正”?

為什麼我的 java Scanner.nextLine() 代碼有時會跳過一行?

為什麼我運行代碼時 elif 沒有註冊?簡單的骰子遊戲

為什麼很長的 packageName 會在作為量詞導入之前導致換行?

為什麼當我運行 javascript 函數時我的 RAM 會超載?

為什麼我會收到 bar_index 的運行時錯誤?

為什麼GDB在GCC編譯運行正常時會顯示segmentation fault

為什麼在 bash 中運行 for 循環會導致不需要的輸出而不是循環?

為什麼在嘗試安裝 mongoDB 時會出現此錯誤?

為什麼將此 Google 表格公式設為 ARRAYFORMULA 會改變其行為?

為什麼等待會導致我的流被讀取

為什麼我的代碼會針對此問題返回錯誤?