为什么在写入镶木地板时流查询失败并显示“ InvalidSchemaException:组类型不能为空(...)空组:spark_schema”?

我将Spark 2.2.1与Parquet 1.8.1一起使用。

我想从Kafka读取JSON数据并进行一些转换,然后将数据写入实木复合地板文件中,然后可以由Apache Hive加载。但是当writeStream到实木复合地板时遇到了以下错误。

Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
    at org.apache.parquet.schema.GroupType.<init>(GroupType.java:92)
    at org.apache.parquet.schema.GroupType.<init>(GroupType.java:48)
    at org.apache.parquet.schema.MessageType.<init>(MessageType.java:50)
    at org.apache.parquet.schema.Types$MessageTypeBuilder.named(Types.java:1256)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<init>(ParquetSchemaConverter.scala:563)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<clinit>(ParquetSchemaConverter.scala)
    ... 22 more

我用谷歌搜索,发现其他人遇到了相同的问题,其根本原因不是所有字段都是实木复合地板不支持的叶子字段,而是在我的数据框中有所有叶子字段。为什么?提前致谢!

这是我的代码:

  val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
  val jsonOptions: Map[String, String] = Map{ "timestampFormat" -> nestTimestampFormat }
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "wikipedia-edits")
    .option("startingOffsets", "earliest")
    .option("group.id", "SparkProcessor")
    .load()
    .select(from_json(col("value").cast("string"), schema, jsonOptions) as "wikiEdit")

  val parsed = df.select("wikiEdit.bot", "wikiEdit.title", "wikiEdit.user", "wikiEdit.wiki")

  parsed.printSchema()

  //parsed.writeStream.format("console").option("truncate", false).start().awaitTermination(30000)

  parsed.writeStream.format("parquet")
    .option("path","hdfs://localhost:9000/wiki-parquet-spark")
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoint")
    .trigger(Trigger.ProcessingTime(10*1000))
    .start.awaitTermination()

该程序可以打印模式并在数据框中显示一些数据。

root
 |-- bot: boolean (nullable = true)
 |-- title: string (nullable = true)
 |-- user: string (nullable = true)
 |-- wiki: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----------------------------+----------+-----------+
|bot  |title                        |user      |wiki       |
+-----+-----------------------------+----------+-----------+
|false|Jak Roberto                  |WikiPedant|enwiki     |
|false|File:Oostkamp01.jpg          |Herpoel   |commonswiki|
|false|Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|false|Category:Self-published work |Herpoel   |commonswiki|
|false|Category:Geography of Belgium|Herpoel   |commonswiki|
|false|Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|false|Category:Self-published work |Herpoel   |commonswiki|
|false|Category:Geography of Belgium|Herpoel   |commonswiki|
|false|Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|false|Category:Self-published work |Herpoel   |commonswiki|
|false|Category:Geography of Belgium|Herpoel   |commonswiki|
|false|Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|false|Category:Self-published work |Herpoel   |commonswiki|
|false|Category:Geography of Belgium|Herpoel   |commonswiki|
|false|Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|false|Category:Self-published work |Herpoel   |commonswiki|
|false|Category:Geography of Belgium|Herpoel   |commonswiki|
|true |Category:CC-BY-SA-4.0        |Herpoel   |commonswiki|
|true |Category:Self-published work |Herpoel   |commonswiki|
|true |Category:Geography of Belgium|Herpoel   |commonswiki|
+-----+-----------------------------+----------+-----------+
only showing top 20 rows
杰西克·拉斯考夫斯基(Jacek Laskowski)

TL; DR升级到Spark 2.2.0(甚至更好地升级到2.2.1)。

认为这与PARQUET-363有关。无法为确实提到错误消息和Spark的ReadContext.requestedSchema构造空的MessageType

在parquet-mr 1.8.1中,不再允许构造空的GroupType(因此也不能构造MessageType)(请参阅PARQUET-278)。在大多数情况下,此更改是有意义的,因为Parquet不支持空组。但是,在一个用例中,一个空的MessageType有效,即在对Parquet文件中的行进行计数时,将一个空的MessageType作为ReadContext的requiredSchema构造函数参数传递。它起作用的原因是,Parquet可以从块元数据中检索行数,而无需实现任何列。

然后在问题报告中:

我们可以看到,Spark SQL没有将请求的列传递给基础的Parquet阅读器。

该问题报告链接到请求请求263请求显示:

这将删除PARQUET-278中添加的拒绝没有字段的架构组的检查。Hive和SparkSQL允许从文件中选择0列,并用于实现诸如select count(1)之类的查询。

拉请求确实删除了检查(您在流式数据集/ Spark结构化流式处理中一直面临着)。

throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);

这样,我们了解到Spark中的Parquet版本可能与1.8版本分支有所不同。

这导致了对请求请求的讨论,该请求最终由于另一个请求而被关闭,该请求被接受为将实木复合地板版本提高到1.8.2的一部分这就是我们想要摆脱错误消息的Parquet的版本。

由于Spark从Spark 2.2.0开始使用Parquet 1.8.2,因此我的建议是升级到Spark 2.2.0(甚至更好地升级到2.2.1)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章