Airflow 2.0 - 本地運行保持運行功能

克里斯蒂盧娜

我有以下任務一直在運行我知道這是因為它在 Snowflake 中運行查詢並且我不斷收到 DUO 推送通知。每一個。5秒!我能做些什麼來阻止它並且只在 DAG 運行時運行它

這是任務:

create_foreign_keys = SnowflakeQueryOperator(
    dag=dag,
    task_id='check_and_run_foreign_key_query',
    sql=SnowHook().run_fk_alter_statements(schema,query),
    trigger_rule=TriggerRule.ALL_DONE
)

這是在sql部分調用的方法:

def run_fk_alter_statements(self, schema, additional_fk):

    fk_query_path = "/fkeys.sql"

    fd = open(f'{fk_query_path}', 'r')
    query = fd.read()
    fd.close()

    additions = []


    for fk in additional_fk:
            additions.append(f""" or (t2.table_name = '{fk['table_name']}' and t2.column_name = '{fk['column_name']}'
                            and t1.table_name = '{fk['ref_table_name']}' and t1.column_name = '{fk['ref_column_name']}')\n""".upper())


    raw_out = self.execute_query(query.format(schema=schema, fks=''.join(additions)), fetch_all=True)

    query_jobs = []
    for raw_query in raw_out:
        query_jobs.append(raw_query[0])


    return query_jobs
喬什·菲爾

sql=SnowHook().run_fk_alter_statements(schema,query)你的實例調用SnowflakeQueryOperator實際上是頂級的代碼,以便將執行每次DAG由調度分析時間。您需要找到一種方法來在運算符的execute()方法中調用該函數

您可以添加一個 TaskFlow 函數/PythonOperator任務以將輸出推run_fk_alter_statements()送到XCom,然後SnowflakeQueryOperator使用它XCom來執行生成的 SQL。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

在 Apache Airflow 2 中監控長時間運行的任務的進度

Airflow Dag 不會自動運行

Airflow - 創建在所有其他任務成功運行後運行的任務

如何獲取運行ID並將其放入Airflow中的變量中

如何在不退出代碼 0 的情況下保持我的 docker-compose 運行

PyTorch - 運行時錯誤:張量的大小必須匹配,但維度 2 除外。得到 55 和 54(違規索引為 0)

在 EC2 上運行涼亭

為什麼此獲取 Airflow 上下文的代碼會在 DAG 導入時運行?

為什麼在 python 中使用 map 時運行時為 0

Airflow始终在Airflow本地主机中加载相同的连接

在 Reactor 和 r2dbc 中使用 Thymeleaf 使用火災和忘記策略運行長時間運行的作業

Airflow 2 Push Xcom with Key Name

Airflow 2:获取任务内的执行日期

第 2 次運行代碼後的 C 雙免費錯誤

Apache 使用位置標籤運行 2 laravel 應用程序

在“運行 PL/SQL”窗口中增加 OUT VARCHAR2 變量的默認大小

Android 在任務列表中運行 2 個彼此相鄰的活動

如何使用 Playwright 運行 e2e Angular 測試?

循環運行 5 次但只要求輸入 2 次

为什么 Airflow PythonOperator 任务失败但返回码为 0?

是否可以使用 #if NET6_0_OR_GREATER 從 BenchmarkDotNet 運行中排除基準方法?

在運行 aws ec2 run-instances 時為 ec2 實例指定卷類型

在 Briceno 等人的 A5/2 實現中,他們延遲 LSFR 週期而不運行時鐘週期功能。有人可以幫我理解嗎?

Apache Airflow DAG无法导入本地模块

Airflow 2 - ModuleNotFoundError:没有名为“airflow.operators.sensors”的模块

升级到 Airflow 2,没有名为“airflow.hooks.base”的模块

當它可以在 CMD 上運行時,如何運行此功能

運行 Dijkstra 算法

無法連接到使用 Docker 運行並託管在 EC2 上的 Dash 應用程序