我想写一个自定义Transformer
的火花2.0阶管道。到目前为止,这是不是真的清楚我的什么copy
或transformSchema
方法应返回。他们返回a是否正确null
?https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java复制吗?
由于Transformer
延长PipelineStage
我的结论,一个fit
调用的transformSchema
方法。我是否正确理解transformSchema
类似于sk-learns fit?
由于我Transformer
应该将数据集与(非常小的)第二个数据集结合在一起,因此我也想将其存储在序列化管道中。我应如何将其存储在转换器中以正确使用管道序列化机制?
一个简单的转换器看起来如何,该转换器可以计算单个列的平均值并填充nan值+保持该值?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
transformSchema
应该返回该应用后预计的模式Transformer
。例:
如果transfomer添加的列IntegerType
,则输出列名称为foo
:
import org.apache.spark.sql.types._
override def transformSchema(schema: StructType): StructType = {
schema.add(StructField("foo", IntegerType))
}
因此,如果未更改数据集的架构,因为仅填充了用于均值插补的名称值,我应该返回原始案例类作为架构?
在Spark SQL(以及MLlib)中,这是不可能的,因为创建后aDataset
是不可变的。只能添加或“替换”(这就是增加之后drop
的操作)列。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句