Spark DataFrame基于权重col值n次添加行

红夜

我想做的是“超采样”一个小的csv文件,每行中都有一个权重值列。

Age|City|Weight
20 | NY |2
30 | SF |3

 Age|City|
 20 | NY |
 20 | NY |
 30 | SF |
 30 | SF |
 30 | SF |

我和熊猫和NP一起做的

df = pd.read_csv('file.csv',low_memory=False)
weights=round(df.weight)
df.loc[np.repeat(df.index.values,weights)]

但是它太慢了,它在超过24小时的时间内使用了1个CPU的100%(15个可用)和所有65G内存,最终崩溃了。最终文件应包含超过7,000万行。

所以我尝试使用Spark。

rdd.map(lamba x: rdd.udf())或类似的东西,结合explode()应该会有所帮助,但我不知道如何使它正确。最后,我需要将DataFrame或RDD保存在未分区的CSV中:一个可以与熊猫一起使用的CSV。

谢谢 !

用户9711200

如果可以将数据加载到内存中并且计划使用Pandas在本地读取输出,则没有任何建议表明您需要Spark。简单一点

import csv

with open("input.csv") as fr, open("output.csv", "w") as fw:
    reader = csv.reader(fr)
    writer = csv.writer(fw)
    for age, city, weight in reader:
        if age == "age":
            writer.writerow((age, city))
        else:
            writer.writerows((age, city) for _ in range(int(weight)))

或列数较多(我假设权重是最后一列,请根据实际数据的形状进行调整):

with open("input.csv") as fr, open("output.csv", "w") as fw:
    reader = csv.reader(fr)
    writer = csv.writer(fw)
    for row:
        if row[0] == "age":
            writer.writerow(row[:-1])  
        else:
            writer.writerows(row[:-1] for _ in range(int(row[-1])))

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章