我正在使用气流1.10.2,但气流似乎忽略了我为DAG设置的超时。
我正在使用dagrun_timeout参数设置DAG的超时时间(例如20秒),并且我有一个任务需要2分钟才能运行,但是气流将DAG标记为成功!
args = {
'owner': 'me',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}
dag = DAG('test_timeout',
schedule_interval=None,
default_args=args,
dagrun_timeout=timedelta(seconds=20))
def this_passes(**kwargs):
return
def this_passes_with_delay(**kwargs):
time.sleep(120)
return
would_succeed = PythonOperator(task_id='would_succeed',
dag=dag,
python_callable=this_passes,
email=to)
would_succeed_with_delay = PythonOperator(task_id='would_succeed_with_delay',
dag=dag,
python_callable=this_passes_with_delay,
email=to)
would_succeed >> would_succeed_with_delay
不会引发任何错误消息。我使用的参数不正确吗?
如源代码中所述:
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns, and only once the
# of active DagRuns == max_active_runs.
因此,这可能是您设置的预期行为schedule_interval=None
。这里的想法是确保计划的DAG不会永远持续下去并阻止后续的运行实例。
现在,您可能会对execution_timeout
所有运营商提供的功能感兴趣。例如,您可以这样设置60s超时PythonOperator
:
would_succeed_with_delay = PythonOperator(task_id='would_succeed_with_delay',
dag=dag,
execution_timeout=timedelta(seconds=60),
python_callable=this_passes_with_delay,
email=to)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句