如何将条件应用于 PySpark 中的 groupby 数据框

亚瑟

我有一个这样的数据框:

ID   Transaction_time     Status     final_time
1     1981-01-12           hit    
1     1981-01-13           hit        
1     1981-01-14           good     1981-01-15   
1     1981-01-15           OK       1981-01-16
2     1981-01-06           good     1981-01-17
3     1981-01-07           hit      1981-01-16
4     1981-01-06           hit      
4     1981-01-07           good      
4     1981-01-08           good     1981-01-10

我想保留,ID如果:

  • Status 有“命中”和“好”/“好”
  • final_time不是空的最后Transaction_time

然后,我想提取:

  • id - 这 ID
  • status - 最后 Transaction_time
  • start_time-Transaction_time什么时候Status从“命中”变为“好”
  • finish_time -final_time最后Transaction_time

对于上面的示例,它将是:

id    status       start_time       finish_time
1     OK           1981-01-14       1981-01-16
4     good         1981-01-07       1981-01-10

如何在 PySpark 中做到这一点?

齐格

我主要使用窗口函数而不是 groupby:

w1 = Window.partitionBy('ID').orderBy(F.col('Transaction_time').desc())
w2 = Window.partitionBy('ID').orderBy(F.col('final_time').desc())

df2 = df1.withColumn('next_st', F.lag('Status', 1).over(w1)) \
         .withColumn('next_tt', F.lag('Transaction_time', 1).over(w1)) \
         .withColumn('max_tt', F.max('Transaction_time').over(w1)) \
         .withColumn('max_ft', F.max('final_time').over(w2))
df3 = df2.join(df2.filter((F.col('Transaction_time') == F.col('max_tt')) & F.col('final_time').isNotNull()), 'ID', 'leftsemi')
df4 = df3.filter((F.col('Status') == 'hit') & F.col('next_st').isin(['good', 'OK']))
df5 = (
    df4.alias('df4')
    .join(df1.alias('df1'), (df1.ID == df4.ID) & (F.col('df1.final_time') == F.col('df4.max_ft')))
    .select(
        F.col('df4.ID').alias('id'),
        F.col('df1.Status').alias('status'),
        F.col('df4.next_tt').alias('start_time'),
        F.col('df4.max_ft').alias('finish_time')
    )
)
df5.show()
#  +---+------+----------+-----------+
#  | id|status|start_time|finish_time|
#  +---+------+----------+-----------+
#  |  4|  good|1981-01-07| 1981-01-10|
#  |  1|    OK|1981-01-14| 1981-01-16|
#  +---+------+----------+-----------+

进口:

from pyspark.sql import functions as F, Window

原始数据集:

data = [
(1, '1981-01-12', 'hit', None),
(1, '1981-01-13', 'hit', None),
(1, '1981-01-14', 'good', '1981-01-15'),
(1, '1981-01-15', 'OK', '1981-01-16'),
(2, '1981-01-06', 'good', '1981-01-17'),
(3, '1981-01-07', 'hit', '1981-01-16'),
(4, '1981-01-06', 'hit', None),
(4, '1981-01-07', 'good', None),
(4, '1981-01-08', 'good', '1981-01-10')]
df1 = spark.createDataFrame(data, ['ID', 'Transaction_time', 'Status', 'final_time'])
df1 = df1.withColumn('Transaction_time', F.col('Transaction_time').cast('date')) \
         .withColumn('final_time', F.col('final_time').cast('date'))
df1.show()
#  +---+----------------+------+----------+
#  | ID|Transaction_time|Status|final_time|
#  +---+----------------+------+----------+
#  |  1|      1981-01-12|   hit|      null|
#  |  1|      1981-01-13|   hit|      null|
#  |  1|      1981-01-14|  good|1981-01-15|
#  |  1|      1981-01-15|    OK|1981-01-16|
#  |  2|      1981-01-06|  good|1981-01-17|
#  |  3|      1981-01-07|   hit|1981-01-16|
#  |  4|      1981-01-06|   hit|      null|
#  |  4|      1981-01-07|  good|      null|
#  |  4|      1981-01-08|  good|1981-01-10|
#  +---+----------------+------+----------+

中级dfs:

df1
+---+----------------+------+----------+
| ID|Transaction_time|Status|final_time|
+---+----------------+------+----------+
|  1|      1981-01-12|   hit|      null|
|  1|      1981-01-13|   hit|      null|
|  1|      1981-01-14|  good|1981-01-15|
|  1|      1981-01-15|    OK|1981-01-16|
|  2|      1981-01-06|  good|1981-01-17|
|  3|      1981-01-07|   hit|1981-01-16|
|  4|      1981-01-06|   hit|      null|
|  4|      1981-01-07|  good|      null|
|  4|      1981-01-08|  good|1981-01-10|
+---+----------------+------+----------+

df2
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st|   next_tt|    max_tt|    max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
|  1|      1981-01-15|    OK|1981-01-16|   null|      null|1981-01-15|1981-01-16|
|  1|      1981-01-14|  good|1981-01-15|     OK|1981-01-15|1981-01-15|1981-01-16|
|  1|      1981-01-13|   hit|      null|   good|1981-01-14|1981-01-15|1981-01-16|
|  1|      1981-01-12|   hit|      null|    hit|1981-01-13|1981-01-15|1981-01-16|
|  3|      1981-01-07|   hit|1981-01-16|   null|      null|1981-01-07|1981-01-16|
|  2|      1981-01-06|  good|1981-01-17|   null|      null|1981-01-06|1981-01-17|
|  4|      1981-01-08|  good|1981-01-10|   null|      null|1981-01-08|1981-01-10|
|  4|      1981-01-07|  good|      null|   good|1981-01-08|1981-01-08|1981-01-10|
|  4|      1981-01-06|   hit|      null|   good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+

df3
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st|   next_tt|    max_tt|    max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
|  1|      1981-01-15|    OK|1981-01-16|   null|      null|1981-01-15|1981-01-16|
|  1|      1981-01-14|  good|1981-01-15|     OK|1981-01-15|1981-01-15|1981-01-16|
|  1|      1981-01-13|   hit|      null|   good|1981-01-14|1981-01-15|1981-01-16|
|  1|      1981-01-12|   hit|      null|    hit|1981-01-13|1981-01-15|1981-01-16|
|  3|      1981-01-07|   hit|1981-01-16|   null|      null|1981-01-07|1981-01-16|
|  2|      1981-01-06|  good|1981-01-17|   null|      null|1981-01-06|1981-01-17|
|  4|      1981-01-08|  good|1981-01-10|   null|      null|1981-01-08|1981-01-10|
|  4|      1981-01-07|  good|      null|   good|1981-01-08|1981-01-08|1981-01-10|
|  4|      1981-01-06|   hit|      null|   good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+

df4
+---+----------------+------+----------+-------+----------+----------+----------+
| ID|Transaction_time|Status|final_time|next_st|   next_tt|    max_tt|    max_ft|
+---+----------------+------+----------+-------+----------+----------+----------+
|  1|      1981-01-13|   hit|      null|   good|1981-01-14|1981-01-15|1981-01-16|
|  4|      1981-01-06|   hit|      null|   good|1981-01-07|1981-01-08|1981-01-10|
+---+----------------+------+----------+-------+----------+----------+----------+

df5
+---+------+----------+-----------+
| id|status|start_time|finish_time|
+---+------+----------+-----------+
|  4|  good|1981-01-07| 1981-01-10|
|  1|    OK|1981-01-14| 1981-01-16|
+---+------+----------+-----------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何将条件计数(重置)应用于PySpark中的分组数据?

如何将groupBy和聚合函数应用于PySpark DataFrame中的特定窗口?

将StringIndexer应用于PySpark数据框中的几列

将函数应用于 PySpark 数据框中的列

使用pyspark将功能应用于groupBy数据

我如何使用groupby和aggregation将pyspark数据框中的行与多列连接

Pyspark数据框中的条件

将函数应用于pandas groupby 数据框中的所有列

如何将条件颜色样式应用于多索引熊猫数据框中的列

将条件应用于数据框中的段号

如何将格式编辑应用于数据框中的所有数据

如何将日期时间应用于存储在字典中的数据框中的列?

如何将 .agg 应用于数据框中的前 n 个项目

如何将函数应用于数据框中的一定数量的行?

如何将统计测试应用于 R 中数据框的几列

如何将.astype()方法应用于Python中的数据框?

如何将权重应用于数据框中的特定列以汇总新的“得分”列?

如何将公式应用于数据框中的每个值?

如何将功能应用于GROUPWISELY数据框的所有列?(在python熊猫中)

如何将字典应用于包含 Pandas 中的 numpy 数组的数据框列

如何将函数应用于列表中的数据框?

如何将函数应用于 R 中的数据框列?

在Julia中,如何将函数应用于具有数组的数据框?

如何将函数应用于数据框中的每个元素?

¿如何将权重应用于r中的数据框?

如何将函数应用于R中的每一行数据框?

基于 Spark 数据框中条件的 GroupBy

将转换应用于多列pyspark数据框

将UDF应用于pyspark数据框的子集