Spark 2.2无法将df写入镶木地板

加利亚诺

我正在建立一个聚类算法,我需要存储模型以供将来加载。我有一个具有以下架构的数据框:

val schema = new StructType()
        .add(StructField("uniqueId", LongType))
        .add(StructField("timestamp", LongType))
        .add(StructField("pt", ArrayType(DoubleType)))
        .add(StructField("norm", DoubleType))
        .add(StructField("kNN", ArrayType(LongType)))
        .add(StructField("kDist", DoubleType))
        .add(StructField("lrd", DoubleType))
        .add(StructField("lof", DoubleType))
        .add(StructField("isClusterCenter", BooleanType))
        .add(StructField("clusterSize", DoubleType))
        .add(StructField("clusterId", IntegerType))

我正在使用parquet()方法编写镶木地板文件:

df.write.mode(SaveMode.Overwrite).parquet(Loader.dataPath("/tmp/milof/model"))

我已经打印了数据框,看起来不错

+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
|uniqueId|    timestamp|                  pt|              norm|         kNN|  kDist|                 lrd|                lof|isClusterCenter|clusterSize|clusterId|
+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
|       1|1516459162000|[14.0, 78.0, 52.0...|219.61784991206886|[2, 3, 5, 4]|54363.0|4.950813666226044E-5| 0.3926170684395501|          false|        5.0|        1|

但是当我到达上述行时,出现以下错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
    at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel$SaveLoadV1_0$.save(MiLOFModel.scala:593)
    at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel.save(MiLOFModel.scala:364)
    at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer$.main(KafkaTrainer.scala:91)
    at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer.main(KafkaTrainer.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 77, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
    at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
    at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
    at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.types.StructType$.fromString(StructType.scala:414)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:80)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:303)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:312)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
    ... 8 more

有人可以解释这意味着什么吗?我怀疑是DataFrameWriter不支持某些类型,但我在Internet上找不到任何有关此类型的信息。任何建议将不胜感激。谢谢。

版本号

Spark 2.2.1
Scala 2.11.11
Json4S 'org.json4s', name: 'json4s-jackson_2.11', version: '3.6.0-M2'
加利亚诺

切换到json4s依赖关系的先前版本有效,我使用了以下内容

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.11</artifactId>
    <version>3.2.11</version>
</dependency>

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Spark在HDFS上写入镶木地板

将包含列表的python 2d列表写入镶木地板文件

如何从 spark2.3 访问 us-east-2 区域上的镶木地板文件(使用 hadoop aws 2.7)

从Spark将镶木地板存储到Kerberos安全的Webhdfs

镶木地板列上的Spark MergeSchema

禁止记录Spark镶木地板

如何解决Data Lake Gen2中过多的ADF / Databricks镶木地板Azure blob写入成本

从Spark写入镶木地板时如何处理空值

Spark保存(写入)镶木地板仅一个文件

从Spark将许多文件写入木地板-缺少一些木地板文件

如何使用Spark将镶木地板数据转换为案例类?

将镶木地板读入Spark数据集而忽略缺少的字段

如何使用Spark将镶木地板文件加载到Hive表中?

从 spark 读取 gzip 压缩的镶木地板文件

如何使用Spark(pyspark)编写镶木地板文件?

在独立Spark上合并镶木地板文件

具有镶木地板和分区的Spark DataFrames

Spark DataFrame分区和镶木地板分区

Spark不利用镶木地板的HDFS分区

在Spark中同时读取几个镶木地板文件

在Spark中获取镶木地板目录的源文件

Spark 2.3.3 输出镶木地板到 S3

在Spark中高效读取嵌套的镶木地板列

在Apache Spark中延迟加载分区镶木地板

Azure数据工厂v2-从镶木地板复制到SQL DB的年份错误

多个Spark作业通过分区将镶木地板数据附加到同一基本路径

如何将记录从镶木地板写入另一个镶木地板?

为什么在写入镶木地板时流查询失败并显示“ InvalidSchemaException:组类型不能为空(...)空组:spark_schema”?

Spark 2 到 Spark 1.6