Airflow无法导入自定义python程序包

阿尔皮塔·夏尔马

我想通过气流从自定义python项目调用脚本

我的目录结构是:

/home/user/
      ├──airflow/
      │  ├──dags
             ├──.venv_airflow (virtual environment for airflow)
      │      ├──my_dag.py
      ├──my_project
         ├──.venv (virtual environment for my_project)
         ├──folderA
            ├──__init__.py
            ├──folderB
               ├──call_me.py (has a line "from my_project.folderA.folderB import import_me")
               ├──import_me.py

我的dag文件如下所示:

from airflow import DAG
import datetime as dt
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'arpita',
    'start_date': dt.datetime(2019, 11, 20),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
}

with DAG('sample',
         default_args=default_args,
         schedule_interval='30 * * * *',
         ) as dag:
    enter_project = BashOperator(task_id='enter_project',
                                 bash_command='cd /home/user/my_project',
                                 retries=2)
    setup_environment = BashOperator(task_id='setup_environment',
                                     bash_command='source /home/user/my_project/.venv/bin/activate',
                                     retries=2)
    call_script = BashOperator(task_id='call_script',
                                 bash_command='python -m my_project.folderA.folderB.call_me,
                                 retries=2)

enter_project >> setup_environment >> call_script

但是我收到这个错误

[2019-11-22 11:56:49,311] {bash_operator.py:115} INFO - Running command: python -m my_project.folderA.folderB.call_me
[2019-11-22 11:56:49,315] {bash_operator.py:124} INFO - Output:
[2019-11-22 11:56:49,349] {bash_operator.py:128} INFO - /home/user/airflow/.venv/bin/python: Error while finding spec for 'my_project.folderA.folderB.call_me' (ImportError: No module named 'my_project')

项目和脚本在外部气流中正常工作。在气流中,它会导入其他程序包(如pandas和tensorflow),但不会导入自定义程序包。我尝试使用sys.path.insert插入路径,但这不起作用。感谢您的阅读:)

果汁

您的bash命令在三个单独的bash运算符中运行。它应该合而为一。

call_script = BashOperator(
    task_id='call_script',
    bash_command='cd /home/user/my_project;'
                 'source /home/user/my_project/.venv/bin/activate;'
                 'python -m my_project.folderA.folderB.call_me',
    retries=2)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

导入自定义程序包显示“错误:程序包com不存在”

CakePHP:无法从自定义程序包加载类

jax-ws导入和自定义程序包到名称空间的映射

如何在IntelliJ IDEA(Java)中导入自定义程序包?

Java Compiler API:导入自定义程序包不起作用

无法从自定义包导入共享的python代码

无法导入自定义包

自定义通知布局:从程序包中发布的错误通知无法展开RemoteViews

找不到模块:无法解析自定义程序包[React Import]

构建自定义RPM,但程序包为空?

如何为自定义程序包设置别名?

在自定义GO程序包之上提交

Laradock-添加自定义npm程序包

定义程序包范围的默认导入

VSCode:无法导入Golang程序包

无法导入Imagick程序包

NameError:在自定义程序包中调用函数时未定义名称“ pd”

proto3->使用自定义扩展名,导致在go代码中导入到程序包(“ google / protobuf”)

导入带有连字符的python自定义包

使用自定义设置导入Python包

Python3关于导入自定义包的问题

如何在自定义程序包中使用tidyselect“ where”?

具有依赖项初始化错误的自定义程序包:ModuleNotFoundError或ImportError

自定义程序包名称cxf-codegen-plugin

buildroot将错误的PREFIX附加到自定义程序包构建中

自定义节点程序包不提供名为

如何通过自定义操作读取程序包代码

如何将更多变体或自定义程序包添加到debootstrap?

外部程序包的自定义JSON封送处理