PySpark Dataframe在所有列上向前填充

表:

我有以下问题。我有一个跟踪状态变化的数据集。

id  valid  eventdate
 1  False 2020-05-01
 1   True 2020-05-06
 2   True 2020-05-04
 2  False 2020-05-07
 2   True 2020-05-09
 3  False 2020-05-11

目标

SELECT valid FROM table WHERE id = 1 AND eventdate = "2020-05-05"

我需要知道在任何给定日期(从开始到今天)的状态。例如,对于id1个有效日期,仍然False是5月5日。

在熊猫中,我有一个像这样的解决方案,我在其中使用pivotffill填写空值。我使用融合将其还原为三列数据框。

from datetime import datetime
import pandas as pd

test_data = [
  [1,"False","2020-05-01"],
  [1,"True","2020-05-06"],
  [2,"True","2020-05-04"],
  [2,"False","2020-05-07"],
  [2,"True","2020-05-09"],
  [3,"False","2020-05-11"]
]
# Create inputframe
df = pd.DataFrame(test_data, columns=['id', 'valid', 'eventdate'])
df['id'] = df['id'].astype(str)
df['valid'] = df['valid'] == "True"
df['eventdate'] = pd.to_datetime(df['eventdate'])
print(df.head(6))
#   id  valid  eventdate
# 0  1  False 2020-05-01
# 1  1   True 2020-05-06
# 2  2   True 2020-05-04
# 3  2  False 2020-05-07
# 4  2   True 2020-05-09
# 5  3  False 2020-05-11

# Create full time range as frame
timeframe = pd.date_range(start=min(df['eventdate']),
                          end=datetime.now().date()).to_frame().reset_index(drop=True).rename(columns={0: 'eventdate'})
print(timeframe.head())
#    eventdate
# 0 2020-05-01
# 1 2020-05-02
# 2 2020-05-03
# 3 2020-05-04
# 4 2020-05-05

# Merge timeframe into original frame
df = df.merge(timeframe,
              left_on='eventdate',
              right_on='eventdate',
              how='right')
print(df.sort_values('eventdate').head())
#     id  valid  eventdate
# 0    1  False 2020-05-01
# 6  NaN    NaN 2020-05-02
# 7  NaN    NaN 2020-05-03
# 2    2   True 2020-05-04
# 8  NaN    NaN 2020-05-05

# 1. Pivot to get dates on rows and ids as columns
# 2. Forward fill values per id
# 3. Fill remaining NaNs with False
df = df.pivot(index='eventdate',
              columns='id',
              values='valid')\
       .fillna(method='ffill')\
       .fillna(False)
print(df.head())
# id            NaN      1      2      3
# eventdate                             
# 2020-05-01  False  False  False  False
# 2020-05-02  False  False  False  False
# 2020-05-03  False  False  False  False
# 2020-05-04  False  False   True  False
# 2020-05-05  False  False   True  False

# Drop NaN column and reset the index
df = df.loc[:, df.columns.notnull()].reset_index()
# Melt the columns back
out = pd.melt(df,
              id_vars='eventdate',
              value_name='valid')
print(out.head(10))
#    eventdate id  valid
# 0 2020-05-01  1  False
# 1 2020-05-02  1  False
# 2 2020-05-03  1  False
# 3 2020-05-04  1  False
# 4 2020-05-05  1  False
# 5 2020-05-06  1   True
# 6 2020-05-07  1   True
# 7 2020-05-08  1   True
# 8 2020-05-09  1   True
# 9 2020-05-10  1   True

我正在尝试在Spark中实现相同的功能,但是不存在正向填充。我知道如何达到最新状态id

w = Window().partitionBy("id").orderBy(F.col("eventdate").desc())
df.withColumn("rn", F.row_number().over(w)) \
  .where(F.col("rn") == 1) \
  .selectExpr("id", "valid", "eventdate AS last_change") \
  .dropna() \
  .show()

可以使用以下方法进行透视:

df\
.select(["id", "valid", "eventdate"])\
.groupBy(["eventdate"])\
.pivot("id")\
.agg(F.min("valid"))\
.drop('null')\
.sort('eventdate')\
.show()

为了进行正向填充,我通过将数据集限制为一个来做到这一点id

import sys
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window

test_data = [
  [1,"False","2020-05-01"],
  [1,"True","2020-05-06"],
  [2,"True","2020-05-04"],
  [2,"False","2020-05-07"],
  [2,"True","2020-05-09"],
  [3,"False","2020-05-11"]
]
# Create dataframe
df = sc\
  .parallelize(test_data)\
  .toDF(("id", "valid", "eventdate"))\
  .withColumn("eventdate", F.to_date(F.to_timestamp("eventdate")))\
  .withColumn("valid", F.when(F.col("valid") == "True", 1).otherwise(0))
df.createOrReplaceTempView("df")
# Create event frame
event_dates = spark.sql("SELECT sequence(min(eventdate), CURRENT_DATE(), interval 1 day) as eventdate FROM df")\
                   .withColumn("eventdate",
                               F.explode(F.col("eventdate")))
# Join dates and data
df = df.join(event_dates, on='eventdate', how='right')

df2 = df.where(df.id == 1)\
  .join(event_dates, on='eventdate', how='right')\
  .withColumn('id', F.lit(1))
#df2.sort('eventdate').show()
# +----------+---+-----+
# | eventdate| id|valid|
# +----------+---+-----+
# |2020-05-01|  1|    0|
# |2020-05-02|  1| null|
# |2020-05-03|  1| null|
# |2020-05-04|  1| null|
# |2020-05-05|  1| null|
# |2020-05-06|  1|    1|
# |2020-05-07|  1| null|
# |2020-05-08|  1| null|
# |2020-05-09|  1| null|
# |2020-05-10|  1| null|
# |2020-05-11|  1| null|
# |2020-05-12|  1| null|
# |2020-05-13|  1| null|
# +----------+---+-----+

# Forward fill
window = Window.partitionBy('id')\
               .orderBy('eventdate')\
               .rowsBetween(-sys.maxsize, 0)
# Set filter
read_last = F.last(df2['valid'], ignorenulls=True).over(window)
df2.withColumn("ffill", read_last).show()
# +----------+---+-----+-----+
# | eventdate| id|valid|ffill|
# +----------+---+-----+-----+
# |2020-05-01|  1|    0|    0|
# |2020-05-02|  1| null|    0|
# |2020-05-03|  1| null|    0|
# |2020-05-04|  1| null|    0|
# |2020-05-05|  1| null|    0|
# |2020-05-06|  1|    1|    1|
# |2020-05-07|  1| null|    1|
# |2020-05-08|  1| null|    1|
# |2020-05-09|  1| null|    1|
# |2020-05-10|  1| null|    1|
# |2020-05-11|  1| null|    1|
# |2020-05-12|  1| null|    1|
# |2020-05-13|  1| null|    1|
# +----------+---+-----+-----+

我认为第一件事就是这种回答问题的方法是否正确。这样做pivot将创建一个包含几列的长表,同时存储了大量冗余数据。Spark不是解决该问题的正确工具,或者更好的是,该问题并不适合使用Spark。我知道理想情况下,您将需要使用并行处理,也许需要广播timeframe所有节点并计算id每个节点的前向填充量

最好使用一些不同的方法,例如,存储enddate事件的和当您查询时使用类似以下的方法:

id  valid  eventdate enddate
 1  False 2020-05-01 2020-05-06
 1   True 2020-05-06 2999-12-31
 2   True 2020-05-04 2020-05-07
 2  False 2020-05-07 2020-05-08
 2   True 2020-05-09 2999-12-31
 3  False 2020-05-11 2999-12-31

SELECT valid FROM table WHERE id = 1 AND "2020-05-05" between eventdate and enddate

请让我知道Spark方法是否可以实现,对于这种稀疏数据集,在任何给定日历状态下查找状态的最佳方法是什么?

谢谢。

Mohammad Murtaza Hashmi:

对于spark2.4+可以使用sequence,然后explode它转发填充。我也假设你的日期是这种格式yyyy-MM-dd

df.show() #sample dataframe

#+---+-----+----------+
#| id|valid| eventdate|
#+---+-----+----------+
#|  1|false|2020-05-01|
#|  1| true|2020-05-06|
#|  2| true|2020-05-04|
#|  2|false|2020-05-07|
#|  2| true|2020-05-09|
#|  3|false|2020-05-11|
#+---+-----+----------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("id").orderBy(F.to_date("eventdate","yyyy-MM-dd"))

df.withColumn("lead", F.lead("eventdate").over(w))\
  .withColumn("sequence", F.when(F.col("lead").isNotNull(),
                                 F.expr("""sequence(to_date(eventdate),date_sub(to_date(lead),1), interval 1 day)"""))\
                                 .otherwise(F.array("eventdate")))\
 .select("id","valid",F.explode("sequence").alias("eventdate"))\
 .show(truncate=False)


#+---+-----+----------+
#|id |valid|eventdate |
#+---+-----+----------+
#|1  |false|2020-05-01|
#|1  |false|2020-05-02|
#|1  |false|2020-05-03|
#|1  |false|2020-05-04|
#|1  |false|2020-05-05|
#|1  |true |2020-05-06|
#|3  |false|2020-05-11|
#|2  |true |2020-05-04|
#|2  |true |2020-05-05|
#|2  |true |2020-05-06|
#|2  |false|2020-05-07|
#|2  |false|2020-05-08|
#|2  |true |2020-05-09|
#+---+-----+----------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章