将csv RDD转换为地图

多伊

我有一个很大的 CSV(> 500 MB),我把它放入一个 spark RDD,我想把它存储到一个大的 Map[String, Array[Long]] 中。CSV 有多个列,但我目前只需要两列。第一列和第二列的格式为:

A 12312 [some_value] ....
B 123123[some_value] ....
A 1222 [some_value] ....
C 1231 [some_value] ....

我希望我的地图基本上按字符串分组并存储一个 long 数组,对于上述情况,我的地图将是:{"A": [12312, 1222], "B": 123123, "C":1231 }

但是因为这张地图会很大,我不能简单地直接做这个。特卡

我在 sql.dataframe 中获取 CSV

到目前为止我的代码(虽然看起来不正确):

def getMap(df: sql.DataFrame, sc: SparkContext): RDD[Map[String, Array[Long]]] = {
    var records = sc.emptyRDD[Map[String, Array[Long]]]
    val rows: RDD[Row] =  df.rdd
    rows.foreachPartition( iter => {
      iter.foreach(x =>
        if(records.contains(x.get(0).toString)){
        val arr = temp_map.getOrElse()
          records = records + (x.get(0).toString -> (temp_map.getOrElse(x.get(0).toString) :+ x.get(1).toString.toLong))
      }
        else{
          val arr = new Array[Long](1)
          arr(0) = x.get(1).toString.toLong
          records = records + (x.get(0).toString -> arr)
        }



      )
    })

  }

提前致谢!

如果我正确理解你的问题,那么

你可以groupBy第一列和collect_list第二column

import org.apache.spark.sql.functions._
val newDF = df.groupBy("column1").agg(collect_list("column2"))
newDF.show(faslse)

val rdd = newDF.rdd.map(r => (r.getString(0), r.getAs[List[Long]](1)))

这将为您提供RDD[(String, List[Long])]字符串唯一的位置

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章