如何使用DataFrames在PySpark中使用窗口函数?

埃文·扎米尔(Evan Zamir)

试图弄清楚如何在PySpark中使用窗口功能。这是我想做的一个示例,只需计算用户发生“事件”的次数(在这种情况下,“ dt”是模拟时间戳)即可。

from pyspark.sql.window import Window
from pyspark.sql.functions import count

df = sqlContext.createDataFrame([{"id": 123, "dt": 0}, {"id": 123, "dt": 1}, {"id": 234, "dt":0}, {"id": 456, "dt":0}, {"id": 456, "dt":1}, {"id":456, "dt":2}])
df.select(["id","dt"], count("dt").over(Window.partitionBy("id").orderBy("dt")).alias("count")).show()

这会产生一个错误。使用窗口功能的正确方法是什么?我读到1.4.1(我们需要使用的版本,因为这是AWS上的标准)应该能够通过DataFrame API来完成。

FWIW,有关此主题的文档很少。而且我很难获得任何实际运行的示例。

零323

因为您传递了列列表,所以引发了异常。DataFrame.select外观签名如下

df.select(self, *cols)

使用窗口函数的表达式是一个与其他列一样的列,因此您需要的是这样的内容:

w = Window.partitionBy("id").orderBy("dt") # Just for clarity
df.select("id","dt", count("dt").over(w).alias("count")).show()

## +---+---+-----+
## | id| dt|count|
## +---+---+-----+
## |234|  0|    1|
## |456|  0|    1|
## |456|  1|    2|
## |456|  2|    3|
## |123|  0|    1|
## |123|  1|    2|
## +---+---+-----+

一般来说,Spark SQL窗口函数的行为与任何现代RDBMS中的行为完全相同。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章