获取pyspark中每行最大值的列索引

古斯塔沃蒂

我有一个 pyspark 数据框,例如:

ID 第 1 列 Col2 第 3 列 ... 列宁
1 10 5 21 ... -9
2 87 1 1 ... 1
3 1 95 1 ... 1

如何创建一个 pyspark 数据框列MAX来表示索引列,其中每行的值最大,例如:

ID 第 1 列 Col2 第 3 列 ... 列宁 最大限度
1 10 5 21 ... -9 3
2 87 1 1 ... 1 1
3 1 95 1 ... 1 2
墙壁

在每行中创建一个具有最大值的列

列出可以找到最大值的列

消除列表中的 NaN

下面的代码

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import*
w=Window.partitionBy('ID').orderBy().rowsBetween(Window.unboundedPreceding,0)
df=(df.withColumn(
    "max",
    F.greatest(*[F.col(x) for x in df.columns[1:]])#Find the max in each row
)
.withColumn(
  'maxcol', array(*[when(col(c) ==col('max'), lit(c)) for c in df.columns])#Find intersection of max with all other columns
).withColumn(
  'maxcol', expr("filter(maxcol, x -> x is not null)")#Filter ou the nans in the intersection
).show())

+---+----+----+----+----+---+------+
| ID|Col1|Col2|Col3|ColN|max|maxcol|
+---+----+----+----+----+---+------+
|  1|  10|   5|  21|  -9| 21|[Col3]|
|  2|  87|   1|   1|   1| 87|[Col1]|
|  3|   1|  95|   1|   1| 95|[Col2]|
+---+----+----+----+----+---+------+

您也可以使用 pandas_udf 虽然我不确定 pyspark.sql.functions import pandas_udf 的功效

import pandas as pd
from pyspark.sql.types import *
def max_col(a:pd.DataFrame) -> pd.DataFrame:
  s=a.isin(a.iloc[:,1:].max(1))
  return a.assign(maxcol=s.agg(lambda x: x.index[x].values, axis=1))
schema=StructType([\
                  StructField('ID',LongType(),True),\
                  StructField('Col1',LongType(),True),\
                  StructField('Col2',LongType(),True),\
                  StructField('Col3',LongType(),True),\
                  StructField('ColN',LongType(),True),\
                  StructField('maxcol',ArrayType(StringType(),True),False)\
                 ])
df.groupby('ID').applyInPandas(max_col, schema).show()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章