我想通过气流从自定义python项目调用脚本
我的目录结构是:
/home/user/
├──airflow/
│ ├──dags
├──.venv_airflow (virtual environment for airflow)
│ ├──my_dag.py
├──my_project
├──.venv (virtual environment for my_project)
├──folderA
├──__init__.py
├──folderB
├──call_me.py (has a line "from my_project.folderA.folderB import import_me")
├──import_me.py
我的dag文件如下所示:
from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'arpita',
'start_date': dt.datetime(2019, 11, 20),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
}
with DAG('sample',
default_args=default_args,
schedule_interval='30 * * * *',
) as dag:
enter_project = BashOperator(task_id='enter_project',
bash_command='cd /home/user/my_project',
retries=2)
setup_environment = BashOperator(task_id='setup_environment',
bash_command='source /home/user/my_project/.venv/bin/activate',
retries=2)
call_script = BashOperator(task_id='call_script',
bash_command='python -m my_project.folderA.folderB.call_me,
retries=2)
enter_project >> setup_environment >> call_script
但是我收到这个错误
[2019-11-22 11:56:49,311] {bash_operator.py:115} INFO - Running command: python -m my_project.folderA.folderB.call_me
[2019-11-22 11:56:49,315] {bash_operator.py:124} INFO - Output:
[2019-11-22 11:56:49,349] {bash_operator.py:128} INFO - /home/user/airflow/.venv/bin/python: Error while finding spec for 'my_project.folderA.folderB.call_me' (ImportError: No module named 'my_project')
项目和脚本在外部气流中正常工作。在气流中,它会导入其他程序包(如pandas和tensorflow),但不会导入自定义程序包。我尝试使用sys.path.insert插入路径,但这不起作用。感谢您的阅读:)
您的bash命令在三个单独的bash运算符中运行。它应该合而为一。
call_script = BashOperator(
task_id='call_script',
bash_command='cd /home/user/my_project;'
'source /home/user/my_project/.venv/bin/activate;'
'python -m my_project.folderA.folderB.call_me',
retries=2)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句