PySpark窗口不适用于指定的整数范围

黑场

我正在尝试确定特定的处理作业在接下来的五次运行中是否出现负面结果。这是我的数据设置:

from pyspark.sql import SQLContext, functions as func
from pyspark.sql.window import Window
import datetime

job_history_df = sqlContext.createDataFrame(
    [
        ('A', 'X', datetime.datetime.strptime('2018-01-02 19:00:13.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'X', datetime.datetime.strptime('2018-01-03 19:00:09.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-04 19:00:24.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-05 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'X', datetime.datetime.strptime('2018-01-06 19:00:33.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-08 19:00:12.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-09 19:00:22.0', '%Y-%m-%d %H:%M:%S.0')),
        ('A', 'Y', datetime.datetime.strptime('2018-01-10 19:00:21.0', '%Y-%m-%d %H:%M:%S.0')),
        ('S', 'Y', datetime.datetime.strptime('2018-01-10 19:00:23.0', '%Y-%m-%d %H:%M:%S.0')),

    ],
    ['jhr_status', 'ajb_name', 'jhr_run_date']
)

def dt_to_timestamp():
    def _dt_to_timestamp(dt):
        return int(dt.timestamp() * 1000)
    return func.udf(_dt_to_timestamp)

job_history_df = job_history_df.withColumn('jhr_run_date_ts', dt_to_timestamp()(func.col('jhr_run_date')).cast('long'))
job_history_df = job_history_df.withColumn('was_abend', func.when(job_history_df['jhr_status'] == 'A', 1).otherwise(0))

下面是job_history_df看起来像:

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|
+----------+--------+---------------------+---------------+---------+
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |
+----------+--------+---------------------+---------------+---------+

>>> job_history_df.dtypes
[('jhr_status', 'string'), ('ajb_name', 'string'), ('jhr_run_date', 'timestamp'), ('jhr_run_date_ts', 'bigint'), ('was_abend', 'int')]

接下来,我将创建我的Window

base_job_window = Window().partitionBy('ajb_name').orderBy('jhr_run_date_ts')

接下来,我们将指定要累加的范围:

n_next_runs = 5

next_n_runs_window = base_job_window.rangeBetween(1, n_next_runs)
job_history_df = job_history_df.withColumn('n_abends_next_n_runs', func.sum('was_abend').over(next_n_runs_window))

让我们看看我们得到了什么:

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|
+----------+--------+---------------------+---------------+---------+--------------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |
+----------+--------+---------------------+---------------+---------+--------------------+

真奇怪 n_abends_next_n_runs我认为,输出的输出应始终为1s。如果我们将所有先前的失败加在一起怎么办?

all_previous_window = base_job_window.rangeBetween(Window.unboundedPreceding, -1)
job_history_df = job_history_df.withColumn('n_abends_to_pt', func.sum('was_abend').over(all_previous_window))

这给出了正确的结果:

+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|n_abends_to_pt|
+----------+--------+---------------------+---------------+---------+--------------------+--------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |null          |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |0             |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |0             |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |1             |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |null          |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |1             |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |2             |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |2             |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |2             |
+----------+--------+---------------------+---------------+---------+--------------------+--------------+

指定整数范围而不是使用Window.unboundedPrecedingor可能是什么问题Window.unboundedFollowing

作为参考,我在RHEL6 VM上运行Spark版本2.1.1.2.6.2.14-5。

更多分析

进一步研究这一点,我想检查一下“常规的” SQL是否可以工作:

job_history_df.registerTempTable('table')

job_history_df = sqlContext.sql(
    '''
    SELECT
        *,
        SUM(was_abend) OVER (PARTITION BY ajb_name ORDER BY jhr_run_date_ts ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) AS abends_last_5_runs
    FROM table
    '''
)

实际上,确实如此!

>>> job_history_df.show(20, False)
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|jhr_status|ajb_name|jhr_run_date         |jhr_run_date_ts|was_abend|n_abends_next_n_runs|abends_last_5_runs|
+----------+--------+---------------------+---------------+---------+--------------------+------------------+
|S         |Y       |2018-01-08 19:00:12.0|1515459612000  |0        |null                |null              |
|S         |Y       |2018-01-09 19:00:22.0|1515546022000  |0        |null                |0                 |
|A         |Y       |2018-01-10 19:00:21.0|1515632421000  |1        |null                |0                 |
|S         |Y       |2018-01-10 19:00:23.0|1515632423000  |0        |null                |1                 |
|A         |X       |2018-01-02 19:00:13.0|1514941213000  |1        |null                |null              |
|A         |X       |2018-01-03 19:00:09.0|1515027609000  |1        |null                |1                 |
|S         |X       |2018-01-04 19:00:24.0|1515114024000  |0        |null                |2                 |
|S         |X       |2018-01-05 19:00:21.0|1515200421000  |0        |null                |2                 |
|S         |X       |2018-01-06 19:00:33.0|1515286833000  |0        |null                |2                 |
+----------+--------+---------------------+---------------+---------+--------------------+------------------+

尽管这仍然不能解决尝试尝试纯Spark-SQL的问题,但明天的工作确实会轻松很多:)

拉梅什·马哈然(Ramesh Maharjan)

您的spark-sql之所以有效,是因为您在查询中使用了rowsBetween而不是rangeBetweenrangeBetween在前两次尝试中使用rowsBetween过,在查询中使用过。

其语法格式如下两个相同的rowsBetweenrangeBetween,但他们的表现完全不同的方式。

让我用一个例子通过创建一个数据框,并使用相同的逻辑,你有使用rowsBetweenrangeBetween,而是中sumcollect_list用于让很清楚哪些和多少正在考虑

比方说你有一个数据帧作为

df = spark.createDataFrame([('X', -2),
                            ('X', 0),
                            ('X', 2),
                            ('X', 3),
                            ('X', 21),
                            ('X', 1)], ('col1', 'col2'))
+----+----+
|col1|col2|
+----+----+
|   X|  -2|
|   X|   0|
|   X|   2|
|   X|   3|
|   X|  21|
|   X|   1|
+----+----+

行之间

base_job_window = Window().partitionBy('col1').orderBy('col2')
n_next_runs = 4
next_n_runs_window = base_job_window.rowsBetween(2, n_next_runs)

df.withColumn('col2_next_n_runs', func.collect_list('col2').over(next_n_runs_window)).show(truncate=False)

+----+----+----------------+
|col1|col2|col2_next_n_runs|
+----+----+----------------+
|X   |-2  |[1, 2, 3]       |
|X   |0   |[2, 3, 21]      |
|X   |1   |[3, 21]         |
|X   |2   |[21]            |
|X   |3   |[]              |
|X   |21  |[]              |
+----+----+----------------+

如您所见,由于rowBetween中的值是2和4,因此收集了当前行的第二,第三和第四行

rangeBetween

base_job_window = Window().partitionBy('col1').orderBy('col2')
n_next_runs = 4
next_n_runs_window = base_job_window.rangeBetween(2, n_next_runs)

df.withColumn('col2_next_n_runs', func.collect_list('col2').over(next_n_runs_window)).show(truncate=False)

+----+----+----------------+
|col1|col2|col2_next_n_runs|
+----+----+----------------+
|X   |-2  |[0, 1, 2]       |
|X   |0   |[2, 3]          |
|X   |1   |[3]             |
|X   |2   |[]              |
|X   |3   |[]              |
|X   |21  |[]              |
+----+----+----------------+

如您所见 the values of rangeBetween are added to the value of the column used in orderBy in the current row and all the values that lie in between that range in col2 are collected.

如果您阅读Spark SQL中的介绍窗口函数,将会更清楚

此外,我要复制该文章的一部分

在此处输入图片说明

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章