Pandas groupby aggregation in Dask

user702846

I have some Pandas code that I want to translate to Dask.

Let's start with some dummy data:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8], 'rating': [3, 4, 2, 1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=2)

Here is the Pandas code:

bb = df[['item_id', 'rating']].\
        groupby(['item_id']).agg(
        item_hist_rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x))),
        item_hist_rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x))),
        item_hist_rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.25 )),
        item_hist_rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q = 0.75 )),
        item_hist_rating_min=pd.NamedAgg(column='rating', aggfunc='min'),
        item_hist_rating_count=pd.NamedAgg(column='rating', aggfunc='count'),
        item_hist_rating_max=pd.NamedAgg(column='rating', aggfunc='max'),
        item_hist_rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean),
    ).reset_index().round(2)
bb

I know that with Dask, four of these numbers can be computed as follows:

ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min']).compute()

and two more like this:

ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean().compute()
ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean().compute()

But I don't know 1) how to do groupby.quantile, and 2) how to join these results.
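The two boolean-mask expressions work because the mean of a True/False column is the fraction of True values, so multiplying by 100 gives the same percentage as the lambda in the Pandas version (before its rounding). A minimal check in plain Pandas, using the same dummy data; the variable names `share_up` and `lambda_up` are illustrative only:

```python
import pandas as pd

df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8],
                   'rating': [3, 4, 2, 1, 2, 3]})

# Mean of a boolean mask per group = share of ratings >= 3.75, scaled to percent
share_up = df['rating'].ge(3.75).groupby(df['item_id']).mean() * 100

# The same quantity written as the explicit lambda from the Pandas code (unrounded)
lambda_up = df.groupby('item_id')['rating'].agg(
    lambda x: 100 * sum(x >= 3.75) / len(x))

# Both Series are indexed by item_id, so they can be compared directly
print(pd.concat([share_up.rename('mask_mean'),
                 lambda_up.rename('lambda')], axis=1).round(2))
```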

克罗基洛夫
import numpy as np
import pandas as pd
import dask.dataframe as dd

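# Basic statistics computed with a regular Dask groupby aggregation
stats_df = ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min', 'count'])
# The mean of a boolean mask is the share of True values; scale it to a percentage
stats_df['rating_up'] = ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean() * 100
stats_df['rating_down'] = ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean() * 100

# Quantiles: apply the pandas quantile to each group.
# meta declares the output name and dtype so Dask does not have to infer them.
q25 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.25), meta=('rating', 'f8'))

q75 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.75), meta=('rating', 'f8'))

# Join the pieces on the item_id index, then give the columns readable names
qdf = dd.merge(stats_df, q25, left_index=True, right_index=True)
edf = dd.merge(stats_df, q75, left_index=True, right_index=True)
ldf = dd.merge(qdf, edf[['rating']], left_index=True, right_index=True)
ldf.columns = ['sum', 'mean', 'max', 'min', 'count', 'rating_up', 'rating_down', 'q25', 'q75']
ldf.compute().reset_index().round(2)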

Output:

Out[24]: 
   item_id  sum  mean  max  min  count  rating_up  rating_down  q25  q75
0        8    6   2.0    3    1      3       0.00       100.00  1.5  2.5
1       10    9   3.0    4    2      3      33.33        66.67  2.5  3.5

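Alternatively, use dask.delayed to run the Pandas commands in parallel. According to the Dask documentation, parallelism comes from making many delayed calls, so the aggregation is broken down into the following delayed functions.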

from dask import delayed

@delayed
def rating_up(x):
    return x.groupby(['item_id']).agg(
        rating_up=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x>=3.75) / len(x), 2)))

@delayed
def rating_down(x):
    return x.groupby(['item_id']).agg(
        rating_down=pd.NamedAgg(column='rating', aggfunc=lambda x: round(100 * sum(x<3.75) / len(x), 2)))

@delayed
def q_25(x):
    return x.groupby(['item_id']).agg(
        rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.25 )))

@delayed
def q_75(x):
    return x.groupby(['item_id']).agg(
        rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda x: np.quantile(x, q=0.75 )))

@delayed
def rating_min(x):
    return x.groupby(['item_id']).agg(
        rating_min=pd.NamedAgg(column='rating', aggfunc='min'))

@delayed
def rating_max(x):
    return x.groupby(['item_id']).agg(
        rating_max=pd.NamedAgg(column='rating', aggfunc='max'))

@delayed
def rating_count(x):
    return x.groupby(['item_id']).agg(
        rating_count=pd.NamedAgg(column='rating', aggfunc='count'))

@delayed
def rating_avg(x):
    return x.groupby(['item_id']).agg(
        rating_avg=pd.NamedAgg(column='rating', aggfunc=np.mean))


def stats(x):
    count_ = rating_count(x)
    up = rating_up(x)
    down = rating_down(x)
    q25 = q_25(x)
    q75 = q_75(x)
    rate_min = rating_min(x)
    rate_max = rating_max(x)
    rate_avg = rating_avg(x)
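    # Each value above is a Delayed object; .join is recorded lazily and
    # only runs when .compute() is called on the result
    joined = count_.join(up).join(down).join(q25).join(q75).join(rate_min).\
        join(rate_max).join(rate_avg)
    return joined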

stats_df = stats(df)
print(stats_df.compute().reset_index().round(2))

Output:

   item_id  rating_count  rating_up  rating_down  rating_q25  rating_q75  rating_min  rating_max  rating_avg
0        8             3       0.00       100.00         1.5         2.5           1           3           2
1       10             3      33.33        66.67         2.5         3.5           2           4           3
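The eight delayed functions above differ only in the aggregation they apply, so the same idea can be sketched more compactly with one generic delayed helper and a dictionary of aggregations. The names `AGGS` and `one_stat` are hypothetical, introduced here for illustration; as in the code above, every task receives the whole Pandas DataFrame.

```python
import dask
import pandas as pd
from dask import delayed

df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8],
                   'rating': [3, 4, 2, 1, 2, 3]})

# Output column name -> aggregation applied to each item's ratings
AGGS = {
    'rating_count': 'count',
    'rating_up': lambda s: 100 * s.ge(3.75).mean(),
    'rating_down': lambda s: 100 * s.lt(3.75).mean(),
    'rating_q25': lambda s: s.quantile(0.25),
    'rating_q75': lambda s: s.quantile(0.75),
    'rating_min': 'min',
    'rating_max': 'max',
    'rating_avg': 'mean',
}

@delayed
def one_stat(frame, name, func):
    """Compute a single per-item statistic as a named pandas Series."""
    return frame.groupby('item_id')['rating'].agg(func).rename(name)

# Build one lazy task per statistic, then run them all in one compute call
tasks = [one_stat(df, name, func) for name, func in AGGS.items()]
pieces = dask.compute(*tasks)

# Each piece is indexed by item_id, so concatenating along columns joins them
stats_df = pd.concat(pieces, axis=1).round(2)
print(stats_df.reset_index())
```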
