如何在PySpark中使用UnaryTransformer?

佩蒂纳托

我在这里无法弄清楚实现的问题,也找不到如何使用UnaryTransformer在PySpark管道中计算自定义转换的示例。

from pyspark.ml import Pipeline, UnaryTransformer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

class ScaleUp(UnaryTransformer):
    def createTransformFunc(self):
        """
        Creates the transform function using the given param map. The input param map already takes
        account of the embedded param map. So the param values should be determined
        solely by the input param map.
        """
        return f.udf(lambda item: item * 10, returnType=DoubleType())

    def outputDataType(self):
        """
        Returns the data type of the output column.
        """
        return DoubleType()

    def validateInputType(self, inputType):
        """
        Validates the input type. Throw an exception if it is invalid.
        """
        assert inputType == DoubleType(), f'Expected DoubleType() and found {inputType}'
  
scale_up = ScaleUp().setInputCol('categoryIndex2')
pipeline = Pipeline(stages=[scale_up])
pipeline.fit(df).transform(df).show()
麦克

createTransformFunc函数需要Python函数,而不是Spark UDF:

class ScaleUp(UnaryTransformer):
    def createTransformFunc(self):
        return lambda item: item * 10

    def outputDataType(self):
        return DoubleType()

    def validateInputType(self, inputType):
        assert inputType == DoubleType(), f'Expected DoubleType() and found {inputType}'

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在pyspark中使用链接?

如何在pyspark中使用combinedByKey

如何在PySpark中使用窗口功能?

如何在pyspark中使用udf函数

如何在PySpark中使用Scala UDF?

使用pyspark时如何在条件中使用for循环?

如何在pyspark中使用groupby创建转换矩阵

如何在Pyspark中使用重复键countByValue?

如何在Pyspark的熊猫中使用iloc获得相同的结果?

如何在pyspark结构化流中使用maxOffsetsPerTrigger?

如何在pyspark中使用Spark Riak连接器?

如何在pyspark中使用外部(自定义)包?

如何在Pyspark中使用动态列旋转表

如何在pyspark中使用MultiClassMetrics计算f分数?

如何在PySpark脚本中使用pmml模型?

如何在过滤条件pyspark中使用功能

如何在pyspark中使用udf和class withcolumn

如何在 Pyspark 中使用 groupby 在条件中删除列

使用pyspark时如何在agg和groupBy中使用lambda?

如何在Pyspark中使用Rlike使用多个正则表达式模式

如何在 pyspark 中使用 group by 归一化进行值计数

如何在Jupyter Notebook中使用PySpark时包含外部Spark库

如何在pyspark中使用第一个和最后一个功能?

我们如何在pyspark中使用dense_rank()函数?

如何在Pyspark数据框中使用列表理解变量名称

如何在pyspark的postgres jdbc驱动程序中使用nextval()?

如何在PySpark中使用公共密钥加入/合并数据框列表?

如何在Pyspark的循环中使用相同的火花上下文

如何在PySpark window()中使用毫秒作为参数。功能之间?