我有一个包含Parquet文件的文件夹。像这样:
scala> val df = sc.parallelize(List(1,2,3,4)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.write.parquet("/tmp/test/df/1.parquet")
scala> val df = sc.parallelize(List(5,6,7,8)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.write.parquet("/tmp/test/df/2.parquet")
保存数据帧后,当我去读取文件df
夹中的所有镶木地板文件时,它给了我错误。
scala> val read = spark.read.parquet("/tmp/test/df")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
... 48 elided
我知道我可以通过提供完整路径来读取Parquet文件,但是如果有一种方法可以读取文件夹中的所有Parquet文件,那会更好。
Spark不会像您想象的那样写/读实木复合地板。
它使用Hadoop库写入/读取分区的实木复合地板文件。
因此,您的第一个实木复合地板文件位于目录的路径/tmp/test/df/1.parquet/
下1.parquet
。这意味着从实木复合地板读取时,您需要提供实木复合地板目录的路径或单个文件的路径。
val df = spark.read.parquet("/tmp/test/df/1.parquet/")
我建议您阅读官方文档以了解更多详细信息。[cf. SQL编程指南-Parquet文件]
编辑:
您必须在寻找这样的东西:
scala> sqlContext.range(1,100).write.save("/tmp/test/df/1.parquet")
scala> sqlContext.range(100,500).write.save("/tmp/test/df/2.parquet")
scala> val df = sqlContext.read.load("/tmp/test/df/*")
// df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show(3)
// +---+
// | id|
// +---+
// |400|
// |401|
// |402|
// +---+
// only showing top 3 rows
scala> df.count
// res3: Long = 499
您还可以在文件路径URI中使用通配符。
您可以提供以下多个文件路径:
scala> val df2 = sqlContext.read.load("/tmp/test/df/1.parquet","/tmp/test/df/2.parquet")
// df2: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df2.count
// res5: Long = 499
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句