Pyspark explode multiple columns with sliding window

Jaime Ferrando Huertas

I have a question similar to this one, but with two additions: the sliding window also has to be applied to extra columns, and I need to know whether the window ended on the last element of the list.

I'll give an example:

Given a df:

input_df = spark.createDataFrame([
    (2,[1,2,3,4,5], ["a","b","c","c","b"], ["a","a","c","c","d"]),
    ], ("id", "target", "feature1", "feature2"))

input_df.show():
+---+---------------+---------------+---------------+
| id|         target|       feature1|       feature2|
+---+---------------+---------------+---------------+
|  2|[1, 2, 3, 4, 5]|[a, b, c, c, b]|[a, a, c, c, d]|
+---+---------------+---------------+---------------+

I would like to break each row into multiple rows by running a fixed-size sliding window over the lists. The resulting df would look like this:

output_df = spark.createDataFrame([
    (2, [1,2], 3, ["a","b"], ["a","a"], False), (2, [2,3], 4, ["b","c"], ["a","c"], False), (2, [3,4], 5, ["c","c"], ["c","c"], True),
    ], ("id", "past-target", "future-target", "past-feature1", "past-feature2", "islast"))

output_df.show():
+---+-----------+-------------+-------------+-------------+------+
| id|past-target|future-target|past-feature1|past-feature2|islast|
+---+-----------+-------------+-------------+-------------+------+
|  2|     [1, 2]|            3|       [a, b]|       [a, a]| false|
|  2|     [2, 3]|            4|       [b, c]|       [a, c]| false|
|  2|     [3, 4]|            5|       [c, c]|       [c, c]|  true|
+---+-----------+-------------+-------------+-------------+------+

The logic is to take the columns ["target", "feature1", "feature2"] and slide a window of size N (given as a parameter, 2 in this case) over them: a pointer moves along the list, and for each position the N previous values of each column become the lists [past-target, past-feature1, past-feature2], while the value under the pointer becomes future-target. The current value of the feature columns can be ignored.

The first row of the output df is created by taking the first element after the first N values (the 3rd, since N=2) and using it as future-target. The 1st and 2nd values of the lists in ["target", "feature1", "feature2"] then become [1, 2], [a, b] and [a, a] for [past-target, past-feature1, past-feature2]. islast is set to False because the pointer is not on the last element of target. Repeating this for every remaining position produces output_df.

It's not easy logic to follow and I don't really know how to do it in PySpark; happy to explain more if needed.
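To make the intended logic concrete, here is a plain-Python sketch of the windowing over the example lists (the name window_rows is just for illustration; this is not a PySpark solution):

def window_rows(target, feature1, feature2, n=2):
    # Slide a window of size n: the pointer sits on element i, the n previous
    # values form the "past" lists, and the pointed value is the future target.
    rows = []
    for i in range(n, len(target)):
        rows.append({
            "past-target":   target[i - n:i],
            "past-feature1": feature1[i - n:i],
            "past-feature2": feature2[i - n:i],
            "future-target": target[i],
            "islast":        i == len(target) - 1,
        })
    return rows

window_rows([1, 2, 3, 4, 5], ["a", "b", "c", "c", "b"], ["a", "a", "c", "c", "d"])
# -> the three rows of output_df above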

Jaime Ferrando Huertas

So I found an answer that also supports padding (adding filler values to the past lists so that every value of target can appear as a future_target).

Code:

from pyspark.sql import functions as f

filler_token = "-1"        # value used to pad positions before the start of the list

main_feature = "target"
extra_features = ["feature1", "feature2"]
past_length = 2            # window size N

# Build a higher-order-function expression: for every element i of `target`,
# collect the previous `past_length` values of each column, the current value
# as `future`, and a flag marking the last element.
past_cols = ''.join(
    f'TRANSFORM(sequence({past_length}, 1), k -> COALESCE({feature}[i - k], {filler_token})) AS past{feature}, '
    for feature in extra_features
)
expr = (
    f'TRANSFORM({main_feature}, (element, i) -> STRUCT('
    f'TRANSFORM(sequence({past_length}, 1), k -> COALESCE({main_feature}[i - k], {filler_token})) AS pasts, '
    + past_cols +
    f'element AS future, '
    f'i = SIZE({main_feature}) - 1 AS final))'
)

output_df = (input_df
             .withColumn(main_feature, f.expr(expr))
             .selectExpr('id', f'inline({main_feature})'))

padding = False
if not padding:
    # keep only windows fully inside the list, i.e. drop rows whose pasts contain the filler
    output_df = output_df.filter(~f.array_contains(f.col("pasts"), int(filler_token)))


output_df.show()

+---+------+------------+------------+------+-----+
| id| pasts|pastfeature1|pastfeature2|future|final|
+---+------+------------+------------+------+-----+
|  2|[1, 2]|      [a, b]|      [a, a]|     3|false|
|  2|[2, 3]|      [b, c]|      [a, c]|     4|false|
|  2|[3, 4]|      [c, c]|      [c, c]|     5| true|
+---+------+------------+------------+------+-----+

If we set padding=True, the output looks like this instead:

+---+--------+------------+------------+------+-----+
| id|   pasts|pastfeature1|pastfeature2|future|final|
+---+--------+------------+------------+------+-----+
|  2|[-1, -1]|    [-1, -1]|    [-1, -1]|     1|false|
|  2| [-1, 1]|     [-1, a]|     [-1, a]|     2|false|
|  2|  [1, 2]|      [a, b]|      [a, a]|     3|false|
|  2|  [2, 3]|      [b, c]|      [a, c]|     4|false|
|  2|  [3, 4]|      [c, c]|      [c, c]|     5| true|
+---+--------+------------+------------+------+-----+
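If you prefer the column names from the question, an optional final rename gives the same layout as output_df above:

output_df = (output_df
             .withColumnRenamed("pasts", "past-target")
             .withColumnRenamed("pastfeature1", "past-feature1")
             .withColumnRenamed("pastfeature2", "past-feature2")
             .withColumnRenamed("future", "future-target")
             .withColumnRenamed("final", "islast"))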

I didn't go with a UDF because it was much slower than this solution.
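For comparison, a UDF-based version of the same windowing might look like the sketch below (schema and names are illustrative, not the exact code I benchmarked); it builds the windows row by row in Python, which is why it tends to be much slower than the SQL higher-order functions above.

from pyspark.sql import functions as f
from pyspark.sql import types as t

window_schema = t.ArrayType(t.StructType([
    t.StructField("pasts", t.ArrayType(t.LongType())),
    t.StructField("pastfeature1", t.ArrayType(t.StringType())),
    t.StructField("pastfeature2", t.ArrayType(t.StringType())),
    t.StructField("future", t.LongType()),
    t.StructField("final", t.BooleanType()),
]))

@f.udf(returnType=window_schema)
def make_windows(target, feature1, feature2, n=2):
    # Build the windows in plain Python for one row of the DataFrame.
    return [(target[i - n:i], feature1[i - n:i], feature2[i - n:i],
             target[i], i == len(target) - 1)
            for i in range(n, len(target))]

udf_df = (input_df
          .withColumn("w", make_windows("target", "feature1", "feature2"))
          .selectExpr("id", "inline(w)"))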
