I have a question similar to this one, but with the addition that extra columns need to be handled and I need to know which element was the last of the list that the sliding window was applied to.
I'll give an example:
Given a df:
input_df = spark.createDataFrame([
    (2, [1, 2, 3, 4, 5], ["a", "b", "c", "c", "b"], ["a", "a", "c", "c", "d"]),
], ("id", "target", "feature1", "feature2"))
input_df.show():
+---+---------------+---------------+---------------+
| id| target| feature1| feature2|
+---+---------------+---------------+---------------+
| 2|[1, 2, 3, 4, 5]|[a, b, c, c, b]|[a, a, c, c, d]|
+---+---------------+---------------+---------------+
I would like to break each single line into multiple rows by applying a fixed-size sliding window over the line. The resultant df would look like this:
output_df = spark.createDataFrame([
    (2, [1, 2], 3, ["a", "b"], ["a", "a"], False),
    (2, [2, 3], 4, ["b", "c"], ["a", "c"], False),
    (2, [3, 4], 5, ["c", "c"], ["c", "c"], True),
], ("id", "past-target", "future-target", "past-feature1", "past-feature2", "islast"))
output_df.show():
+---+-----------+-------------+-------------+-------------+------+
| id|past-target|future-target|past-feature1|past-feature2|islast|
+---+-----------+-------------+-------------+-------------+------+
| 2| [1, 2]| 3| [a, b]| [a, a]| false|
| 2| [2, 3]| 4| [b, c]| [a, c]| false|
| 2| [3, 4]| 5| [c, c]| [c, c]| true|
+---+-----------+-------------+-------------+-------------+------+
The logic should be: take the columns ["target", "feature1", "feature2"] and apply a sliding window of size N (given as a parameter, 2 in this case), where a pointer is placed on the element right after the window; the N past values of each column become the lists [past-target, past-feature1, past-feature2], and the value under the pointer becomes future-target. The current value of the feature columns can be ignored. The first row of the output df is created by looking at the first index after N (the 3rd, since N=2) and using it as future-target, then looking at the 1st and 2nd values of the lists in ["target", "feature1", "feature2"] to create the values [1,2], [a,b], [a,a] of [past-target, past-feature1, past-feature2]. The value of islast is set to False, since the pointer is not on the last element of target. This is done over and over to create the output_df.
The logic is hard to follow, and I don't really know how to do it with PySpark (happy to explain more if needed); a plain-Python sketch of what I mean is below.
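To make it concrete, here is a plain-Python sketch of the intended logic (just a reference, not the PySpark solution; the function name and parameters are mine):

def sliding_windows(target, features, n=2):
    # For each pointer position i (0-based), emit the n values before it,
    # the value under it, and whether it is the last element of the list.
    rows = []
    for i in range(n, len(target)):
        past_target = target[i - n:i]                  # e.g. [1, 2] for i = 2
        future_target = target[i]                      # e.g. 3
        past_features = [f[i - n:i] for f in features]
        islast = (i == len(target) - 1)
        rows.append((past_target, future_target, past_features, islast))
    return rows

sliding_windows([1, 2, 3, 4, 5], [["a", "b", "c", "c", "b"], ["a", "a", "c", "c", "d"]])
# -> [([1, 2], 3, [['a', 'b'], ['a', 'a']], False),
#     ([2, 3], 4, [['b', 'c'], ['a', 'c']], False),
#     ([3, 4], 5, [['c', 'c'], ['c', 'c']], True)]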
So I found an answer that also supports padding (adding filler values to the lists so that every value can appear as a future-target).
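In the same plain-Python terms, padding would look like this (again just a sketch, reusing sliding_windows from above and the filler value -1 used in the answer's code):

def sliding_windows_padded(target, features, n=2, filler=-1):
    # Prepend n filler values so that every element of target,
    # including the first one, appears once as future_target.
    pad = [filler] * n
    return sliding_windows(pad + target, [pad + f for f in features], n)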
Code:
from pyspark.sql import functions as f
from pyspark.sql.functions import array_contains, col

filler_token = -1  # numeric, so it also matches the integer values in `target`
main_feature = "target"
extra_features = ["feature1", "feature2"]
past_length = 2

# For each element i of the main column, build a struct holding the past_length
# previous values of every column (filler_token where the window reaches before
# index 0), the current element as `future`, and whether i is the last index.
expr = f'TRANSFORM({main_feature}, (element, i) -> STRUCT(' + \
    f'TRANSFORM(sequence({past_length}, 1), k -> COALESCE({main_feature}[i - k], {filler_token})) AS pasts, ' + \
    ''.join([f'TRANSFORM(sequence({past_length}, 1), k -> COALESCE({feature}[i - k], {filler_token})) AS past{feature}, ' for feature in extra_features]) + \
    f'element AS future, i = SIZE({main_feature}) - 1 AS final))'

output_df = (input_df
    .withColumn(main_feature, f.expr(expr))
    .selectExpr('id', f'inline({main_feature})'))  # inline() turns the array of structs into rows

padding = False
if not padding:
    # drop the windows that reach before the start of the list (contain the filler)
    output_df = output_df.filter(~array_contains(col("pasts"), filler_token))
output_df.show()
+---+------+------------+------------+------+-----+
| id| pasts|pastfeature1|pastfeature2|future|final|
+---+------+------------+------------+------+-----+
|  2|[1, 2]|      [a, b]|      [a, a]|     3|false|
|  2|[2, 3]|      [b, c]|      [a, c]|     4|false|
|  2|[3, 4]|      [c, c]|      [c, c]|     5| true|
+---+------+------------+------------+------+-----+
# If we set padding=True, the output will look like this:
+---+--------+------------+------------+------+-----+
| id|   pasts|pastfeature1|pastfeature2|future|final|
+---+--------+------------+------------+------+-----+
|  2|[-1, -1]|    [-1, -1]|    [-1, -1]|     1|false|
|  2| [-1, 1]|     [-1, a]|     [-1, a]|     2|false|
|  2|  [1, 2]|      [a, b]|      [a, a]|     3|false|
|  2|  [2, 3]|      [b, c]|      [a, c]|     4|false|
|  2|  [3, 4]|      [c, c]|      [c, c]|     5| true|
+---+--------+------------+------------+------+-----+
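For reuse, the whole thing can be wrapped in a small helper (my own wrapper around the answer's expression; the name and signature are hypothetical, and it assumes an `id` column as in the example):

from pyspark.sql import DataFrame, functions as f

def explode_sliding_window(df: DataFrame, main_feature: str, extra_features: list,
                           past_length: int, filler_token=-1, padding: bool = False) -> DataFrame:
    # Build the same higher-order-function expression as above, parameterized.
    past = lambda c: (f'TRANSFORM(sequence({past_length}, 1), '
                      f'k -> COALESCE({c}[i - k], {filler_token}))')
    extras = ''.join(f'{past(c)} AS past{c}, ' for c in extra_features)
    expr = (f'TRANSFORM({main_feature}, (element, i) -> STRUCT('
            f'{past(main_feature)} AS pasts, {extras}'
            f'element AS future, i = SIZE({main_feature}) - 1 AS final))')
    out = (df.withColumn(main_feature, f.expr(expr))
             .selectExpr('id', f'inline({main_feature})'))
    if not padding:
        # same filter as above: drop windows that reach before the start of the list
        out = out.filter(~f.array_contains(f.col('pasts'), filler_token))
    return out

explode_sliding_window(input_df, 'target', ['feature1', 'feature2'], past_length=2).show()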
I didn't go with a UDF, since UDFs were much slower than this solution.