Scala-Spark: how to convert a DataFrame containing a single string column into a DF with correctly typed columns?

Question by 逃避

I'm currently facing a problem I can't solve. I'm using Spark 1.6.

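I have a text DataFrame in which one column holds a JSON string with many fields. According to a schema I inferred from well-formed JSON, some fields must be typed as String, others as Array, and some as Long.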

 {"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":["20295930","20295930"]}

I have only managed to turn it into a DataFrame whose columns are all Strings. I can't get them converted to the correct types.

The desired schema is in df_schema. The "value" column contains the JSON string I need to parse. Here is my code:

     var b = sqlContext.createDataFrame(df_txt.rdd,df_schema)
     val z= {
     b.select( b.columns.map(c => get_json_object(b("value"), s"$$.$c").alias(c)): _*)
     }
    var c = sqlContext.createDataFrame(z.rdd,df_schema)
    c.show(1)

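I end up with the following exception, because the array in the "prices_vat" field is treated as a String rather than as an Array, which is what df_schema specifies: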

   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 32, localhost): scala.MatchError: ["20295930","20295930"] (of class java.lang.String)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:159)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

Please help!

Answer by 伊万曼

Fortunately, Spark has built-in functionality for handling JSON data:

scala> val jsonRDD = sc.parallelize(
     |      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":["20295930","20295930"]}""" :: Nil)
jsonRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> val df = sqlContext.read.json(jsonRDD)
df: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<string>]

scala> df.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+


scala> df.printSchema
root
 |-- email: string (nullable = true)
 |-- eventid: string (nullable = true)
 |-- prices_vat: array (nullable = true)
 |    |-- element: string (containsNull = true)

Also note that if you want Spark to recognize the values in the prices_vat field as numbers, they must be formatted as numbers (without quotes):

scala> val jsonRDD2 = sc.parallelize(
     |      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":[20295930,20295930]}""" :: Nil)
jsonRDD2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> val df2 = sqlContext.read.json(jsonRDD2)
df2: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>]

scala> df2.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+


scala> df2.printSchema
root
 |-- email: string (nullable = true)
 |-- eventid: string (nullable = true)
 |-- prices_vat: array (nullable = true)
 |    |-- element: long (containsNull = true)

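If you cannot change whoever produces the JSON, a sketch of an alternative (not part of the original answer) is to let Spark infer `array<string>` as in the first example and then cast the column. Spark SQL supports casting one array type to another element by element; with the default (non-ANSI) cast behavior, a string that is not a valid integer becomes null instead of failing the job. `CastPrices`, `castPrices` and the placeholder email are hypothetical names used only for illustration, and the sketch assumes Spark 1.6 APIs.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, LongType}

// Hypothetical example object, not part of any Spark API.
object CastPrices {
  // Replace the inferred array<string> column with an array<bigint> column.
  // Elements that are not valid integers become null.
  def castPrices(df: DataFrame): DataFrame =
    df.withColumn("prices_vat", col("prices_vat").cast(ArrayType(LongType)))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cast-prices").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Same shape as the question: the numbers are quoted, so inference yields array<string>.
    // The email address is a placeholder.
    val jsonRDD = sc.parallelize(Seq(
      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"user@example.com","prices_vat":["20295930","20295930"]}"""
    ))
    val df = sqlContext.read.json(jsonRDD)

    val typed = castPrices(df)
    typed.printSchema() // prices_vat is now array<bigint>
    typed.show()
    sc.stop()
  }
}
```

`withColumn` with an existing column name replaces that column in place, so the other columns and their order are left untouched.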
If your JSON is already in a DataFrame column, you can do the following:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val df = sc.parallelize(
     |      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"[email protected]","prices_vat":[20295930,20295930]}""" :: Nil).toDF("json")
df: org.apache.spark.sql.DataFrame = [json: string]

scala> df.show
+--------------------+
|                json|
+--------------------+
|{"eventid":"3bc1c...|
+--------------------+


scala> val rdd = df.rdd.map{case Row(json: String) => json}
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at map at <console>:30

scala> val outDF = sqlContext.read.json(rdd)
outDF: org.apache.spark.sql.DataFrame = [email: string, eventid: string, prices_vat: array<bigint>]

scala> outDF.show
+-------------+--------------------+--------------------+
|        email|             eventid|          prices_vat|
+-------------+--------------------+--------------------+
|[email protected]|3bc1c5d2-c10f-48d...|[20295930, 20295930]|
+-------------+--------------------+--------------------+

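Because the question already defines the target types in `df_schema`, one option is to pass that schema to the JSON reader instead of letting Spark infer one, which also skips the inference pass over the data. This also accounts for the original error: `get_json_object` always returns strings, and `createDataFrame(rdd, schema)` does not cast values; it expects each Row to already hold values of the declared types, so the array arrives as a `String` and Catalyst's array converter throws the `MatchError`. The sketch below assumes Spark 1.6, a JSON column named `value` as in the question, and numbers written without quotes. `JsonWithSchema`, `targetSchema` (a hypothetical stand-in for `df_schema`) and `parseWithSchema` are illustrative names, not Spark APIs. If the numbers are quoted, declaring them as `LongType` may not coerce them, so reading them as strings and casting afterwards is the safer route.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

// Hypothetical example object, not part of any Spark API.
object JsonWithSchema {
  // Hypothetical stand-in for the question's df_schema.
  val targetSchema: StructType = StructType(Seq(
    StructField("eventid", StringType, nullable = true),
    StructField("email", StringType, nullable = true),
    StructField("prices_vat", ArrayType(LongType, containsNull = true), nullable = true)
  ))

  // Extract the raw JSON strings from the single "value" column (dropping nulls)
  // and let the JSON reader parse them against an explicit schema.
  def parseWithSchema(sqlContext: SQLContext, raw: DataFrame, schema: StructType): DataFrame = {
    val jsonRdd: RDD[String] = raw.select("value").rdd
      .filter(row => !row.isNullAt(0))
      .map(row => row.getString(0))
    sqlContext.read.schema(schema).json(jsonRdd)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json-with-schema").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // One JSON document per row in a column called "value"; the email is a placeholder.
    val raw = sc.parallelize(Seq(
      """{"eventid":"3bc1c5d2-c10f-48d6-8b35-05db8665415c","email":"user@example.com","prices_vat":[20295930,20295930]}"""
    )).toDF("value")

    val typed = parseWithSchema(sqlContext, raw, targetSchema)
    typed.printSchema()
    typed.show()
    sc.stop()
  }
}
```

With an explicit schema, the resulting columns follow the order of the schema's fields rather than the alphabetical order that inference produces.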