Cannot import Parquet data in PySpark

Riccardo Tonaghi

I saved a Pandas DataFrame in Parquet format. Since it is large and I need to run gradient-boosted classification on it, I want to use PySpark to speed up the process. My pandas df looks like this:

Y     X
a     3.0
b     3.5
c     4.9
d     6.8

All of my X columns are of type int64 or float64, and Y is object. So I saved the dataset to Parquet (df.to_parquet('DF.parquet')) and, following this documentation https://spark.apache.org/docs/2.3.0/ml-classification-regression.html#gradient-boosted-tree-classifier, I did:

In: data = spark.read.load("DF.parquet")
Out: DataFrame[X: double, Y: string]
In: labelIndexer = StringIndexer(inputCol="Y", outputCol="indexedLabel").fit(data)
Out: Py4JJavaError: An error occurred while calling o492.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[StringIndexerAggregator(org.apache.spark.sql.Row)#1256])
+- Exchange SinglePartition, true, [id=#279]
   +- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
      +- *(1) ColumnarToRow
         +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
    at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
    at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
    at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
    at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:144)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, true, [id=#279]
+- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
   +- *(1) ColumnarToRow
      +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:95)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 33 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
+- *(1) ColumnarToRow
   +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:98)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 41 more
Caused by: org.apache.spark.sql.AnalysisException: Attribute name "Y" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:574)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2(ParquetWriteSupport.scala:472)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2$adapted(ParquetWriteSupport.scala:472)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.setSchema(ParquetWriteSupport.scala:472)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:221)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:398)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:389)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:484)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:519)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
    at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:196)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 53 more
gsoni

It looks like your column name Y contains a space or a \t character.

Please check the column name and remove it.
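A minimal sketch of how you might check for and fix this, assuming the stray whitespace was introduced on the pandas side before the Parquet file was written (the DF_clean.parquet output path is illustrative, not from the question):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

# Inspect the raw column names on the pandas side; repr() makes a stray
# space or tab such as 'Y ' or 'Y\t' visible.
df = pd.read_parquet("DF.parquet")
print([repr(c) for c in df.columns])

# Strip whitespace from every column name and rewrite the file,
# dropping the pandas index so no extra column is stored.
df.columns = [c.strip() for c in df.columns]
df.to_parquet("DF_clean.parquet", index=False)

# Re-read the cleaned file in Spark; StringIndexer should now fit without
# the "invalid character(s)" AnalysisException.
spark = SparkSession.builder.getOrCreate()
data = spark.read.parquet("DF_clean.parquet")
labelIndexer = StringIndexer(inputCol="Y", outputCol="indexedLabel").fit(data)

Fixing the name before the Parquet file is written is the safer route here, since Spark validates the stored field names when it builds the Parquet reader, so renaming the column after the read may not avoid the error.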

