我正在建立一个聚类算法,我需要存储模型以供将来加载。我有一个具有以下架构的数据框:
val schema = new StructType()
.add(StructField("uniqueId", LongType))
.add(StructField("timestamp", LongType))
.add(StructField("pt", ArrayType(DoubleType)))
.add(StructField("norm", DoubleType))
.add(StructField("kNN", ArrayType(LongType)))
.add(StructField("kDist", DoubleType))
.add(StructField("lrd", DoubleType))
.add(StructField("lof", DoubleType))
.add(StructField("isClusterCenter", BooleanType))
.add(StructField("clusterSize", DoubleType))
.add(StructField("clusterId", IntegerType))
我正在使用parquet()
方法编写镶木地板文件:
df.write.mode(SaveMode.Overwrite).parquet(Loader.dataPath("/tmp/milof/model"))
我已经打印了数据框,看起来不错
+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
|uniqueId| timestamp| pt| norm| kNN| kDist| lrd| lof|isClusterCenter|clusterSize|clusterId|
+--------+-------------+--------------------+------------------+------------+-------+--------------------+-------------------+---------------+-----------+---------+
| 1|1516459162000|[14.0, 78.0, 52.0...|219.61784991206886|[2, 3, 5, 4]|54363.0|4.950813666226044E-5| 0.3926170684395501| false| 5.0| 1|
但是当我到达上述行时,出现以下错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel$SaveLoadV1_0$.save(MiLOFModel.scala:593)
at it.gagliano.giuseppe.spark.clustering.milof.MiLOFModel.save(MiLOFModel.scala:364)
at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer$.main(KafkaTrainer.scala:91)
at it.gagliano.giuseppe.spark.clustering.milof.KafkaTrainer.main(KafkaTrainer.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 77, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:108)
at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
at org.apache.spark.sql.types.StructType$$anonfun$6.apply(StructType.scala:414)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.types.StructType$.fromString(StructType.scala:414)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:80)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:303)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:312)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
... 8 more
有人可以解释这意味着什么吗?我怀疑是DataFrameWriter不支持某些类型,但我在Internet上找不到任何有关此类型的信息。任何建议将不胜感激。谢谢。
版本号
Spark 2.2.1
Scala 2.11.11
Json4S 'org.json4s', name: 'json4s-jackson_2.11', version: '3.6.0-M2'
切换到json4s依赖关系的先前版本有效,我使用了以下内容
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
<version>3.2.11</version>
</dependency>
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句