Need to parse a JSON file

Karthikeyan
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

I need to parse a JSON file with the above schema into a structured format using a Spark DataFrame. The entries of the keys column are the column names, and their corresponding values are in the values column.

Sample data file: {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}

Expected output:

eid  crt_ts          id        upd_ts        km      pivl  distance  speed  type
1    12343.0000.012  AAGA1567  1333.333.333  565656  10.5  121       64     logs
Srinivas

Please check the code below; I have used groupBy, pivot, and agg:

scala> val js = Seq(""" {'type': 'logs', 'eid': '1', 'keys': ['crt_ts', 'id', 'upd_ts', 'km', 'pivl', 'distance', 'speed'], 'values': [['12343.0000.012', 'AAGA1567', '1333.333.333', '565656', '10.5', '121', '64']]}""").toDS
js: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val jdf = spark.read.json(js)
jdf: org.apache.spark.sql.DataFrame = [eid: string, keys: array<string> ... 2 more fields]

scala> jdf.printSchema
root
 |-- eid: string (nullable = true)
 |-- keys: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)


scala> jdf.show(false)
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|eid|keys                                           |type|values                                                           |
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
|1  |[crt_ts, id, upd_ts, km, pivl, distance, speed]|logs|[[12343.0000.012, AAGA1567, 1333.333.333, 565656, 10.5, 121, 64]]|
+---+-----------------------------------------------+----+-----------------------------------------------------------------+
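
Before pivoting, the nested values array has to be flattened to one row per (key, value) pair. As a minimal sketch of that intermediate shape (my addition, assuming the jdf above and Spark 2.4+ for arrays_zip):

val flat = jdf
  .select($"eid", $"type", $"keys", explode($"values").as("values"))             // unwrap the outer array: one row per inner string array
  .select($"eid", $"type", explode(arrays_zip($"keys", $"values")).as("azip"))   // pair keys(i) with values(i) as structs, one row each
  .select($"eid", $"type", $"azip.keys".as("key"), $"azip.values".as("value"))
flat.show(false)  // 7 rows: (1, logs, crt_ts, 12343.0000.012), (1, logs, id, AAGA1567), ...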

scala> :paste
// Entering paste mode (ctrl-D to finish)

jdf.select($"eid", $"keys", explode($"values").as("values"), $"type")            // one row per inner values array
  .select($"eid", $"type", explode(arrays_zip($"keys", $"values")).as("azip"))   // one row per (key, value) struct
  .select($"eid", $"azip.*", $"type")                                            // unpack struct into keys/values columns
  .groupBy($"type", $"eid")
  .pivot($"keys")                                                                // key names become column names
  .agg(first("values"))
  .show(false)

// Exiting paste mode, now interpreting.

+----+---+--------------+--------+--------+------+----+-----+------------+
|type|eid|crt_ts        |distance|id      |km    |pivl|speed|upd_ts      |
+----+---+--------------+--------+--------+------+----+-----+------------+
|logs|1  |12343.0000.012|121     |AAGA1567|565656|10.5|64   |1333.333.333|
+----+---+--------------+--------+--------+------+----+-----+------------+
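
For comparison, a minimal alternative sketch (my addition, not from the original answer) that avoids the groupBy/pivot shuffle: since every row carries the same keys array (as in the sample data), the key names can be read once and used to index into the exploded values array positionally.

// Alternative sketch: index into the values array by position instead of pivoting.
// Assumes the keys array is identical across rows, as in the sample data above.
val exploded = jdf.select($"eid", $"type", $"keys", explode($"values").as("vals"))
val keyNames = exploded.select($"keys").first.getSeq[String](0)   // crt_ts, id, upd_ts, ...
val cols = col("eid") +: keyNames.zipWithIndex.map { case (k, i) => $"vals".getItem(i).as(k) } :+ col("type")
exploded.select(cols: _*).show(false)

This trades generality for speed: pivot handles rows with differing key sets, while positional indexing only works when every row shares one schema, but it needs no aggregation.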
