如何将CSV文件转换为RDD

拉米亚:

我是新来的火花。我想对CSV记录中的特定数据执行一些操作。

我正在尝试读取CSV文件并将其转换为RDD。我的进一步操作基于CSV文件中提供的标题。

(摘自评论)到目前为止,这是我的代码:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 
    @Override public Iterable<String> call(String s) { 
    return Arrays.asList(EOL.split(s)); 
    } 
});
final String heading=lines.first().toString();

我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。

final String[] header=heading.split(" "); 

我可以获得这样的标题值。我想将此映射到CSV文件中的每个记录。

在Java中,我CSVReader record.getColumnValue(Column header)用来获取特定值。我需要做类似这里的事情。

maasg:

一种简单的方法是拥有一种保留标头的方法。

假设您有一个file.csv,例如:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

我们可以定义一个标题类,该标题类使用第一行的解析版本:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}

我们可以使用该标头来处理以后的数据:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

请注意,header仅仅不过是助记符到数组索引的简单映射。几乎所有这些操作都可以在数组中元素的顺序位置上完成,例如user = row(0)

PS:欢迎来到Scala :-)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章