我有以下任務一直在運行我知道這是因為它在 Snowflake 中運行查詢並且我不斷收到 DUO 推送通知。每一個。5秒!我能做些什麼來阻止它並且只在 DAG 運行時運行它
這是任務:
create_foreign_keys = SnowflakeQueryOperator(
dag=dag,
task_id='check_and_run_foreign_key_query',
sql=SnowHook().run_fk_alter_statements(schema,query),
trigger_rule=TriggerRule.ALL_DONE
)
這是在sql部分調用的方法:
def run_fk_alter_statements(self, schema, additional_fk):
fk_query_path = "/fkeys.sql"
fd = open(f'{fk_query_path}', 'r')
query = fd.read()
fd.close()
additions = []
for fk in additional_fk:
additions.append(f""" or (t2.table_name = '{fk['table_name']}' and t2.column_name = '{fk['column_name']}'
and t1.table_name = '{fk['ref_table_name']}' and t1.column_name = '{fk['ref_column_name']}')\n""".upper())
raw_out = self.execute_query(query.format(schema=schema, fks=''.join(additions)), fetch_all=True)
query_jobs = []
for raw_query in raw_out:
query_jobs.append(raw_query[0])
return query_jobs
在sql=SnowHook().run_fk_alter_statements(schema,query)
你的實例調用SnowflakeQueryOperator
實際上是頂級的代碼,以便將執行每次DAG由調度分析時間。您需要找到一種方法來在運算符的execute()
方法中調用該函數。
您可以添加一個 TaskFlow 函數/PythonOperator
任務以將輸出推run_fk_alter_statements()
送到XCom
,然後SnowflakeQueryOperator
使用它XCom
來執行生成的 SQL。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句