将Spark UDF与结构序列一起使用

史蒂芬·谢菲

给定一个数据帧,其中一列是由以下序列生成的结构序列

val df = spark
  .range(10)
  .map((i) => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
  .toDF("a","b","c")
  .groupBy("a")
  .agg(collect_list(struct($"b",$"c")).as("my_list"))
df.printSchema
df.show(false)

产出

root
 |-- a: long (nullable = false)
 |-- my_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- b: integer (nullable = false)
 |    |    |-- c: integer (nullable = false)

+---+-----------------------------------+
|a  |my_list                            |
+---+-----------------------------------+
|0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
|1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
+---+-----------------------------------+

我需要在每个结构列表上运行一个函数。函数原型类似于下面的函数

case class DataPoint(b: Int, c: Int)
def do_something_with_data(data: Seq[DataPoint]): Double = {
  // This is an example. I don't actually want the sum
  data.map(data_point => data_point.b + data_point.c).sum
}

我想将此函数的结果存储到另一个DataFrame列。

我试着跑

val my_udf = udf(do_something_with_data(_))
val df_with_result = df.withColumn("result", my_udf($"my_list"))
df_with_result.show(false)

并得到

17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
    at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)

是否可以使用这样的UDF,而无需先通过DataFrame API将我的行强制转换为容器结构?

做类似的事情:

case class MyRow(a: Long, my_list: Seq[DataPoint])
df.as[MyRow].map(_ => (a, my_list, my_udf(my_list)))

使用DataSet api可以,但是如果可能的话,我更喜欢使用DataFrame API。

拉斐尔·罗斯(Raphael Roth)

您不能将案例类用作UDF的输入参数(但是您可以从UDF返回案例类)。要映射结构数组,您可以将传递Seq[Row]给UDF:

val  my_uDF = udf((data: Seq[Row]) => {
  // This is an example. I don't actually want the sum
  data.map{case Row(x:Int,y:Int) => x+y}.sum
})

df.withColumn("result", my_udf($"my_list")).show

+---+--------------------+------+
|  a|             my_list|result|
+---+--------------------+------+
|  0|[[0,3], [5,5], [3...|    41|
|  1|[[0,9], [4,9], [6...|    54|
+---+--------------------+------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

将malloc与结构一起使用

如何将完整的SQL与Spark结构化流一起使用

将 Scalapb 与 Spark 结构化流和 Java 生成的 protobuf 类一起使用

将Celery Chunk与对象或整数序列一起使用

将扫描仪与整数序列一起使用

MATLAB:如何将cellfun与结构一起使用?

将bsearch与结构内的数组一起使用

将.NET结构与WINAPI函数一起使用

在c中,将if语句与结构项一起使用

将QDeclarativeContext层次结构与QDeclarativeView一起使用

如何将XCTAssertNil与可选结构一起使用?

如何将结构与堆栈一起使用?

将STL函数与结构一起使用?

将malloc与函数中的结构一起使用

Apache Spark-UDF似乎无法与spark-submit一起使用

将Spring和Spark一起使用

如何将 directJoin 与 spark (scala) 一起使用?

将Spark软件包与RStudio一起使用

将结构体(字节)与SWIFT一起使用-结构体为NSData,而NSData为结构

是否可以使 Spark UDF 与可能为 None 的 Array 一起使用?

如何将Pin结构与自引用结构一起使用?

将内联汇编与序列化说明一起使用

如何将INSERT ... ON CONFLICT(id)UPDATE ...语法与序列ID一起使用?

如何将DRF序列化器与石墨烯一起使用

将整数序列与元组和可变参数模板一起使用

将参数传递给 Django REST 框架序列化程序以与模型一起使用

将Prometheus与Connexion一起使用-ValueError:CollectorRegistry中的时间序列重复

使用XCTest,如何将{期望->等待}的多个离散序列链接在一起?

如何将序列化的CRFClassifier与StanfordCoreNLP prop'ner'一起使用