我似乎不明白如何将模块导入到Apache Airflow DAG定义文件中。我希望这样做是为了能够创建一个库,例如,该库使声明具有相似设置的任务不再那么冗长。
这是我能想到的最简单的示例,它重复了该问题:我修改了气流教程(https://airflow.apache.org/tutorial.html#recap),以简单地导入模块并从该模块运行定义。像这样:
目录结构:
- dags/
-- __init__.py
-- lib.py
-- tutorial.py
tutorial.py:
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# Here is my added import
from lib import print_double
# And my usage of the imported def
print_double(2)
## -- snip, because this is just the tutorial code,
## i.e., some standard DAG defintion stuff --
print_double
只是一个简单的def,它将您输入的任何内容乘以2,然后输出结果,但是显然这根本没有关系,因为这是导入问题。
我能够airflow test tutorial print_date 2015-06-01
按照教程文档成功运行-dag运行,而且print_double成功。4
按预期打印到控制台。一切都很好。
然后,我进入Web UI,并被招呼Broken DAG: [/home/airflow/airflow/dags/tutorial.py] No module named 'lib'
。取消暂停dag并尝试使用UI进行手动运行会导致状态为“正在运行”,但它永远不会成功或失败。它只是永远坐在“运行中”。我可以根据需要排队,但他们都只能处于“运行”状态。
我已经检查了气流日志,但没有在其中看到任何有用的调试信息。
那我想念什么呢?
再次添加sys路径对我有用,
import sys
sys.path.insert(0,os.path.abspath(os.path.dirname(__file__)))
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句