如何将Spark数据帧转换为Pandas并返回Kedro?

德米特里·德里亚宾

我试图了解在Kedro中将一个节点发出的Spark数据帧转换为另一个节点的输入所需的Pandas的最佳方法,而无需创建冗余转换步骤。

德米特里·德里亚宾

Kedro目前支持以下两种策略:

使用转码功能

这需要一个定义两个DataCatalog条目同样的数据集,在一个共同的格式相同的文件工作(实木复合地板,JSON,CSV,等),您catalog.yml

my_dataframe@spark:
  type: kedro.contrib.io.pyspark.SparkDataSet
  filepath: data/02_intermediate/data.parquet

my_dataframe@pandas:
  type: ParquetLocalDataSet
  filepath: data/02_intermediate/data.parquet

然后像这样在管道中使用它们:

Pipeline([
    node(my_func1, "spark_input", "my_dataframe@spark"),
    node(my_func2, "my_dataframe@pandas", "output"),
])

在这种情况下,应kedro了解这my_dataframe两种情况下的数据集相同,并正确解析节点的执行顺序。同时,kedro将使用SparkDataSet实现进行保存和ParquetLocalDataSet加载,因此第一个节点应输出pyspark.sql.DataFrame,而第二个节点将接收pandas.Dataframe

使用Pandas进行SparkSpark进行Pandas节点装饰

注意: Spark <-> Pandas内存转换因其内存需求臭名昭著,因此仅当已知数据帧较小时,这才是可行的选择。

可以按照文档装饰节点:

from spark import get_spark
from kedro.contrib.decorators import pandas_to_spark

@pandas_to_spark(spark_session)
def my_func3(data):
    data.show() # data is pyspark.sql.DataFrame

甚至整个管道:

Pipeline([
    node(my_func4, "pandas_input", "some_output"),
    ...
]).decorate(pandas_to_spark)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章