在PySpark中导入用户定义的模块失败

martin32_

我有以下python代码:

from service import Api
from pyspark.sql import SparkSession
...
spark = SparkSession.builder.appName("App Name").enableHiveSupport().getOrCreate()
myApi= Api()

df = spark.sql('SELECT * FROM hive_table')

def map_function(row):
        sql = 'SELECT Name FROM sql_table LIMIT 1'
        result = myApi.executeSQL(sql)
        if int(row[4]) > 100:
            return (result[0][0], row[4])
        else:
            return (row[3], row[4])

schema = StructType([StructField('Name', StringType(), True), StructField('Value', IntegerType(), True)])
rdd_data = df.rdd.map(map_function)
df1 = spark.createDataFrame(rdd_data, schema)
df1.show()

我创建一个Spark DataFrame并使用map函数进行迭代。在map函数中,我访问SQL表格的先前定义的Api。

该代码在控制台和Apache Zeppelin Notebook中成功运行,没有错误。但是,如果我在脚本中执行它,则会发生以下错误:

ImportError: No module named Api

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

它是在访问map函数中的myApi对象时发生的。在服务模块的文件夹中是一种__init__.py方法,因此这不是问题。

有谁知道可能是什么问题?

Giorgos Myrianthous

如果您正在运行作业,则spark-submit需要使用--py-files标记提供python文件首先,创建一个.zip具有所有依赖关系文件:

pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .

最后使用--py-files

spark-submit --py-files dependencies.zip your_spark_job.py

最后,在您的spark作业的脚本中添加以下行:

sc.addPyFile("dependencies.zip")

另外,如果您使用的是Jupyter Notebook,您所要做的就是将模块的路径附加到PYTHONPATH:

export PYTHONPATH="${PYTHONPATH}:/path/to/your/service.py"

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

TOP 榜单

  1. 1

    来自Microsoft Office加载项taskpane.js的MySQL驱动程序模块的空引用

  2. 2

    使用AWS Cognito和React的仅限Facebook / Google的登录名(无用户名/密码)

  3. 3

    创建Windows Phone 8应用并将其连接到数据库的最佳方法(最好是SQL Server)

  4. 4

    为什么Java中的System.out.println()打印到控制台?

  5. 5

    卷曲函数无法解析来自bash中变量的代理

  6. 6

    是什么在Android的consumer-rules.pro和proguard-rules.pro之间的区别?

  7. 7

    设置与Apache POI Excel表散点图标记图标的颜色

  8. 8

    将Qt Pyside2与asyncio await语法一起使用?

  9. 9

    崇高的文字+蟒蛇的蟒蛇

  10. 10

    任务':app:minifyReleaseWithR8'.java.lang.NullPointerException的执行失败(无错误消息)

  11. 11

    OpenJDK的和AdoptOpenJDK的区别

  12. 12

    大型数据集缓存到Spark内存中时,“超出了GC开销限制”(通过sparklyr和RStudio)

  13. 13

    “执行测试CMAKE_HAVE_LIBC_PTHREAD”失败实际上是什么意思?

  14. 14

    使用Core 2.2中的Identity,如何在关闭浏览器15分钟后保持会话活动?

  15. 15

    React中的ForwardRefExoticComponent和ForwardRefRenderFunction有什么区别?

  16. 16

    猫鼬查找结果,然后将字段替换为findOne

  17. 17

    如何降级Google Colab的Torch版本

  18. 18

    Keras提前停止回调错误,val_loss指标不可用

  19. 19

    如何避免VSCode中的“导入路径不能以.ts扩展名结尾”错误?

  20. 20

    Nuxt.JS:如何在页面中获取路由URL参数

  21. 21

    是否有为什么会AccessibilityManager.sInstance导致内存泄漏的一个原因?

热门标签

归档