Spark如何将RDD [JSONObject]转换为数据集

xstack2000

我正在从com.google.gson.JsonObject类型的Element的RDD中读取数据。尝试将其转换为DataSet,但不知道如何执行此操作。

import com.google.gson.{JsonParser}
import org.apache.hadoop.io.LongWritable
import org.apache.spark.sql.{SparkSession}

object tmp {
  class people(name: String, age: Long, phone: String)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val parser = new JsonParser();
    val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject()
    val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject()

    val PairRDD = sc.parallelize(List(
      (new LongWritable(1l), jsonObject1),
      (new LongWritable(2l), jsonObject2)
    ))

    val rdd1 =PairRDD.map(element => element._2)

    import spark.implicits._

    //How to create Dataset as schema People from rdd1?
  }
}

即使试图打印rdd1元素抛出

object not serializable (class: org.apache.hadoop.io.LongWritable, value: 1)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (1,{"name":"abc","age":23,"phone":"0208"}))

基本上,我从BigQuery表中获得了该RDD [LongWritable,JsonParser] ,我想将其转换为数据集,以便可以将SQL应用于转换。

我故意将第二条记录中的手机留空,BigQuery对该值为null的元素不返回任何内容。

Shoaib伯克

感谢您的澄清。您需要在kryo中将类注册为Serializable。以下显示工作。我在spark-shell中运行,因此必须销毁旧上下文并使用包含已注册的Kryo类的配置创建新的spark上下文。

import com.google.gson.{JsonParser}
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext

sc.stop()

val conf = sc.getConf
conf.registerKryoClasses( Array(classOf[LongWritable], classOf[JsonParser] ))
conf.get("spark.kryo.classesToRegister")

val sc = new SparkContext(conf)

val parser = new JsonParser();
val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject()
val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject()

val pairRDD = sc.parallelize(List(
  (new LongWritable(1l), jsonObject1),
  (new LongWritable(2l), jsonObject2)
))


val rdd = pairRDD.map(element => element._2)

rdd.collect()
// res9: Array[com.google.gson.JsonObject] = Array({"name":"abc","age":23,"phone":"0208"}, {"name":"xyz","age":33})

val jsonstrs = rdd.map(e=>e.toString).collect()
val df = spark.read.json( sc.parallelize(jsonstrs) )    
df.printSchema
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
//  |-- phone: string (nullable = true)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章