scala spark UDF ClassCastException:WrappedArray $ ofRef无法强制转换为[Lscala.Tuple2

马莫努

所以我执行必要的导入等

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._

然后定义一些拉特隆点

val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)  
val york = (4.0, 4.0)  

然后,我像这样创建一个Spark Dataframe并检查其是否有效:

val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
    (List(york,london),List(southampton,suburbia))).toDF("AR1","AR2")
exampleDF.show()

数据框由以下类型组成

DataFrame = [AR1: array<struct<_1:double,_2:double>>, AR2: array<struct<_1:double,_2:double>>]

我创建一个函数来创建点的组合

// function to do what I want
val latlongexplode =  (x: Array[(Double,Double)], y: Array[(Double,Double)]) => {
 for (a <- x; b <-y) yield (a,b)
}

我检查功能是否正常

latlongexplode(Array(london,york),Array(suburbia,southampton))

确实如此。但是在我通过此功能创建UDF之后

// declare function into a Spark UDF
val latlongexplodeUDF = udf (latlongexplode) 

当我尝试在spark数据框中使用它时,我已经像上面这样创建了:

exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2")).show(false)

我得到了一个很长的stacktrace,基本上可以归结为:

java.lang.ClassCastException:scala.collection.mutable.WrappedArray $ ofRef无法转换为 [Lscala.Tuple2;
org.apache.spark.sql.catalyst.expressions.ScalaUDF。$ anonfun $ f $ 3(ScalaUDF.scala:121)org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)组织。 apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:151)org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:50)org.apache.spark.sql。 catalyst.expressions.InterpretedProjection.apply(Projection.scala:32)scala.collection.TraversableLike。$ anonfun $ map $ 1(TraversableLike.scala:273)

如何获得此udf在Scala Spark中工作?(如果有帮助,我目前正在使用2.4)

编辑:这可能是我构建示例df的方式存在问题。但是我作为实际数据所拥有的是每列上经/长元组的数组(大小未知)。

麦克

在UDF中使用结构类型时,它们表示为Row对象,而数组列则表示为Seq。另外,您需要以Row的形式返回结构,并且需要定义一个架构以返回结构。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)  
val york = (4.0, 4.0)
val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
    (List(york,london),List(southampton,suburbia))).toDF("AR1","AR2")
exampleDF.show(false)
+------------------------+------------------------+
|AR1                     |AR2                     |
+------------------------+------------------------+
|[[1.0, 1.0], [2.0, 2.0]]|[[3.0, 3.0], [4.0, 4.0]]|
|[[4.0, 4.0], [1.0, 1.0]]|[[3.0, 3.0], [2.0, 2.0]]|
+------------------------+------------------------+
val latlongexplode = (x: Seq[Row], y: Seq[Row]) => {
    for (a <- x; b <- y) yield Row(a, b)
}

val udf_schema = ArrayType(
    StructType(Seq(
        StructField(
            "city1",
            StructType(Seq(
                StructField("lat", FloatType),
                StructField("long", FloatType)
            ))
        ),
        StructField(
            "city2",
            StructType(Seq(
                StructField("lat", FloatType),
                StructField("long", FloatType)
            ))
        )
    ))
)

// include this line if you see errors like 
// "You're using untyped Scala UDF, which does not have the input type information."
// spark.sql("set spark.sql.legacy.allowUntypedScalaUDF = true")

val latlongexplodeUDF = udf(latlongexplode, udf_schema)
result = exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2"))
result.show(false)
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+
|AR1                     |AR2                     |latlongexplode                                                                                          |
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+
|[[1.0, 1.0], [2.0, 2.0]]|[[3.0, 3.0], [4.0, 4.0]]|[[[1.0, 1.0], [3.0, 3.0]], [[1.0, 1.0], [4.0, 4.0]], [[2.0, 2.0], [3.0, 3.0]], [[2.0, 2.0], [4.0, 4.0]]]|
|[[4.0, 4.0], [1.0, 1.0]]|[[3.0, 3.0], [2.0, 2.0]]|[[[4.0, 4.0], [3.0, 3.0]], [[4.0, 4.0], [2.0, 2.0]], [[1.0, 1.0], [3.0, 3.0]], [[1.0, 1.0], [2.0, 2.0]]]|
+------------------------+------------------------+--------------------------------------------------------------------------------------------------------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Spark java.lang.ClassCastException:scala.collection.mutable.WrappedArray $ ofRef无法转换为java.util.ArrayList

无法将scala.collection.mutable.WrappedArray $ ofRef强制转换为Integer

Spark 2 将 Scala 数组转换为 WrappedArray

Spark:将JavaRDD <Tuple2>转换为JavaPairRDD <>

我可以从Apache Spark UDF中返回Tuple2吗(在Java中)?

无法将wrappedArray$ofRef 转换为scala.collection.immutable.Seq

将JavaRDD <Tuple2 <Object,long [] >>转换为Java中的Spark数据集<Row>

在Scala Spark中并置UDF

Scala和Spark UDF功能

在Spark Scala中定义UDF

UDF无法在Spark Scala中获取文件名

Spark在UDF Java中从WrappedArray <WrappedArray <Double >>获取值形式

如何在带有Scala的Apache Spark中将特定函数转换为udf函数?

在 Spark 中调用 Scala UDF 时如何将 BinaryType 转换为 Array[Byte]?

cas 异常:scala.collection.mutable.WrappedArray$ofRef 不能转换为 [D

Spark Scala 字符串匹配 UDF

如何使用反射从Scala调用Spark UDF?

通过UDF,Spark加密CSV列-Scala

Spark Scala UDF参数限制为10

Spark Scala数据框udf返回行

从Spark-Scala UDF返回Seq [Row]

在scala中将Spark Dataframe(带有WrappedArray)转换为RDD [labelPoint]

如何遍历包含scala中的列表的tuple2?

SCALA:生成满足某些条件的 Tuple2 对象列表

Spark-将Array [Array [Map [Map [String,String]]]类型的列转换为udf时,java.lang.ClassCastException

如何将 UDF 应用于收集的行?(因“java.lang.ClassCastException:java.lang.String 无法转换为 org.apache.spark.sql.Column”而失败)

从 Scala.WrappedArray 转换为 List<>

Spark (Scala) udf 修改数据框列中的地图

Scala中使用Spark udf对范围进行模式匹配