我在Scala中使用spark来转换Dataframe,在这里我想计算一个新变量,该变量计算许多变量中每行一个变量的排名。
范例-
Input DF-
+---+---+---+
|c_0|c_1|c_2|
+---+---+---+
| 11| 11| 35|
| 22| 12| 66|
| 44| 22| 12|
+---+---+---+
Expected DF-
+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 11| 35| 2| 3| 1|
| 22| 12| 66| 2| 3| 1|
| 44| 22| 12| 1| 2| 3|
+---+---+---+--------+--------+--------+
使用R-在R中的多列上的每一行的等级来回答这个问题,
但是我需要使用scala在spark-sql中执行相同的操作。谢谢您的帮助!
编辑-4/1。遇到一种情况,如果值相同,则等级应该不同。编辑第一行以复制情况。
如果我理解正确,则希望获得每一行中每一列的排名。
首先定义数据,然后对列进行“排名”。
val df = Seq((11, 21, 35),(22, 12, 66),(44, 22 , 12))
.toDF("c_0", "c_1", "c_2")
val cols = df.columns
然后,我们定义一个UDF来查找数组中元素的索引。
val pos = udf((a : Seq[Int], elt : Int) => a.indexOf(elt)+1)
最后,我们创建一个排序数组(降序排列),并使用UDF查找每列的排名。
val ranks = cols.map(c => pos(col("array"), col(c)).as(c+"_rank"))
df.withColumn("array", sort_array(array(cols.map(col) : _*), false))
.select((cols.map(col)++ranks) :_*).show
+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 12| 35| 3| 2| 1|
| 22| 12| 66| 2| 3| 1|
| 44| 22| 12| 1| 2| 3|
+---+---+---+--------+--------+--------+
编辑:从Spark 2.4开始,pos
我定义的UDF可以由array_position(column: Column, value: Any)
工作方式完全相同的内置函数(第一个索引为1)代替。这样可以避免使用效率可能略低的UDF。
EDIT2:如果键重复,则上面的代码将生成重复的索引。如果要避免这种情况,可以创建数组,将其压缩以记住是哪一列,对其进行排序并再次压缩以得到最终排名。它看起来像这样:
val colMap = df.columns.zipWithIndex.map(_.swap).toMap
val zip = udf((s: Seq[Int]) => s
.zipWithIndex
.sortBy(-_._1)
.map(_._2)
.zipWithIndex
.toMap
.mapValues(_+1))
val ranks = (0 until cols.size)
.map(i => 'zip.getItem(i) as colMap(i) + "_rank")
val result = df
.withColumn("zip", zip(array(cols.map(col) : _*)))
.select(cols.map(col) ++ ranks :_*)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句