我有一个 .csv 文件,其中包含以下结构的 258 列。
["label", "index_1", "index_2", ... , "index_257"]
现在我想将此 .csv 文件转换为 RDD[Row]:
val data_csv = sc.textFile("~/test.csv")
val rowRDD = data_csv.map(_.split(",")).map(p => Row( p(0), p(1).trim, p(2).trim))
如果我这样进行转换,我必须专门写下 258 列。所以我试过:
val rowRDD = data_csv.map(_.split(",")).map(p => Row( _ => p(_).trim))
和
val rowRDD = data_csv.map(_.split(",")).map(p => Row( x => p(x).trim))
但是这两个也不起作用并报告错误:
error: missing parameter type for expanded function ((x$2) => p(x$2).trim)
谁能告诉我如何进行这种转换?非常感谢。
你应该使用sqlContext
而不是sparkContext
作为
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", true)
.load(("~/test.csv")
这将创建dataframe
. 呼吁.rdd
对df
应该给你RDD[Row]
val rdd = df.rdd
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句