I can't figure out what is wrong with my implementation here, and I can't find any example of how to use UnaryTransformer to compute a custom transformation in a PySpark Pipeline.
from pyspark.ml import Pipeline, UnaryTransformer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType
df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
class ScaleUp(UnaryTransformer):
    def createTransformFunc(self):
        """
        Creates the transform function using the given param map. The input param map already takes
        account of the embedded param map. So the param values should be determined
        solely by the input param map.
        """
        return f.udf(lambda item: item * 10, returnType=DoubleType())

    def outputDataType(self):
        """
        Returns the data type of the output column.
        """
        return DoubleType()

    def validateInputType(self, inputType):
        """
        Validates the input type. Throw an exception if it is invalid.
        """
        assert inputType == DoubleType(), f'Expected DoubleType() and found {inputType}'
scale_up = ScaleUp().setInputCol('categoryIndex2')
pipeline = Pipeline(stages=[scale_up])
pipeline.fit(df).transform(df).show()
The createTransformFunc method must return a plain Python function, not a Spark UDF (UnaryTransformer wraps the returned function in a UDF itself):
class ScaleUp(UnaryTransformer):
    def createTransformFunc(self):
        return lambda item: item * 10

    def outputDataType(self):
        return DoubleType()

    def validateInputType(self, inputType):
        assert inputType == DoubleType(), f'Expected DoubleType() and found {inputType}'