SparkSQL: adding a new column to a DataFrame based on an existing column

I am running into the following problem with a DataFrame in SparkSQL. The DataFrame, which I load into spark-shell from an Avro file, has this structure:

[id: bigint, Nachrichtentyp: bigint, MelderID: bigint, Wartung: bigint, Ruestung: bigint, Fehler: bigint, Leerlauf: bigint, Zeitstempel: string]

I am trying to add a new column that converts Zeitstempel (-> timestamp, format: 1990-10-10 19:30:30) into milliseconds:

  1. val df = sqlContext.load("hdfs://quickstart/user/hive/warehouse/ma_transport2/077cf09f-b157-40a7-9d70-b5b9f70550d9.avro", "com.databricks.spark.avro").orderBy("Zeitstempel")

  2. val d = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

  3. val dtFunc: (String => Long) = (arg1: String) => DateTime.parse(arg1, d).getMillis()

  4. val dtFunc2 = udf(dtFunc)

  5. val x = df.withColumn("dt", dtFunc2(col("Zeitstempel")))

  6. x.show()
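
The steps above assume the Joda-Time classes and the Spark SQL column/UDF helpers are already in scope, roughly:

// Imports assumed by the steps above (not shown there)
import org.apache.spark.sql.functions.{col, udf}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat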

Error message:

scala> x.show()
15/08/25 06:55:33 INFO MemoryStore: ensureFreeSpace(283428) called with curMem=2248728, maxMem=278302556
15/08/25 06:55:33 INFO MemoryStore: Block broadcast_48 stored as values in memory (estimated size 276.8 KB, free 263.0 MB)
15/08/25 06:55:33 INFO MemoryStore: ensureFreeSpace(22390) called with curMem=2532156, maxMem=278302556
15/08/25 06:55:33 INFO MemoryStore: Block broadcast_48_piece0 stored as bytes in memory (estimated size 21.9 KB, free 263.0 MB)
15/08/25 06:55:33 INFO BlockManagerInfo: Added broadcast_48_piece0 in memory on 192.168.72.167:60712 (size: 21.9 KB, free: 265.2 MB)
15/08/25 06:55:33 INFO BlockManagerMaster: Updated info of block broadcast_48_piece0
15/08/25 06:55:33 INFO SparkContext: Created broadcast 48 from hadoopFile at AvroRelation.scala:75
15/08/25 06:55:34 INFO FileInputFormat: Total input paths to process : 1
15/08/25 06:55:34 INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:88
15/08/25 06:55:34 INFO DAGScheduler: Got job 32 (RangePartitioner at Exchange.scala:88) with 2 output partitions (allowLocal=false)
15/08/25 06:55:34 INFO DAGScheduler: Final stage: Stage 52(RangePartitioner at Exchange.scala:88)
15/08/25 06:55:34 INFO DAGScheduler: Parents of final stage: List()
15/08/25 06:55:34 INFO DAGScheduler: Missing parents: List()
15/08/25 06:55:34 INFO DAGScheduler: Submitting Stage 52 (MapPartitionsRDD[100] at RangePartitioner at Exchange.scala:88), which has no missing parents
15/08/25 06:55:34 INFO MemoryStore: ensureFreeSpace(4040) called with curMem=2554546, maxMem=278302556
15/08/25 06:55:34 INFO MemoryStore: Block broadcast_49 stored as values in memory (estimated size 3.9 KB, free 263.0 MB)
15/08/25 06:55:34 INFO MemoryStore: ensureFreeSpace(2243) called with curMem=2558586, maxMem=278302556
15/08/25 06:55:34 INFO MemoryStore: Block broadcast_49_piece0 stored as bytes in memory (estimated size 2.2 KB, free 263.0 MB)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_49_piece0 in memory on 192.168.72.167:60712 (size: 2.2 KB, free: 265.2 MB)
15/08/25 06:55:34 INFO BlockManagerMaster: Updated info of block broadcast_49_piece0
15/08/25 06:55:34 INFO SparkContext: Created broadcast 49 from broadcast at DAGScheduler.scala:839
15/08/25 06:55:34 INFO DAGScheduler: Submitting 2 missing tasks from Stage 52 (MapPartitionsRDD[100] at RangePartitioner at Exchange.scala:88)
15/08/25 06:55:34 INFO YarnScheduler: Adding task set 52.0 with 2 tasks
15/08/25 06:55:34 INFO TaskSetManager: Starting task 0.0 in stage 52.0 (TID 840, quickstart.cloudera, NODE_LOCAL, 1425 bytes)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_49_piece0 in memory on quickstart.cloudera:61000 (size: 2.2 KB, free: 530.1 MB)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_48_piece0 in memory on quickstart.cloudera:61000 (size: 21.9 KB, free: 530.1 MB)
15/08/25 06:55:34 INFO TaskSetManager: Starting task 1.0 in stage 52.0 (TID 841, quickstart.cloudera, NODE_LOCAL, 1425 bytes)
15/08/25 06:55:34 INFO TaskSetManager: Finished task 0.0 in stage 52.0 (TID 840) in 95 ms on quickstart.cloudera (1/2)
15/08/25 06:55:34 INFO TaskSetManager: Finished task 1.0 in stage 52.0 (TID 841) in 22 ms on quickstart.cloudera (2/2)
15/08/25 06:55:34 INFO DAGScheduler: Stage 52 (RangePartitioner at Exchange.scala:88) finished in 0.117 s
15/08/25 06:55:34 INFO YarnScheduler: Removed TaskSet 52.0, whose tasks have all completed, from pool 
15/08/25 06:55:34 INFO DAGScheduler: Job 32 finished: RangePartitioner at Exchange.scala:88, took 0.132854 s
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:96)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:103)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:320)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:75)
    at $iwC$$iwC$$iwC.<init>(<console>:77)
    at $iwC$$iwC.<init>(<console>:79)
    at $iwC.<init>(<console>:81)
    at <init>(<console>:83)
    at .<init>(<console>:87)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 66 more
Caused by: java.lang.ArrayIndexOutOfBoundsException

Any help is really appreciated!

Thanks! PL

Problem solved:

Instead of splitting this across steps 2 and 3, combine them into a single definition:

def dtFunc: (String => Long) = (arg1: String) => DateTime.parse(arg1, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis()
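
The root cause seems to be that the formatter val defined at the spark-shell top level (step 2) is captured by the UDF's closure together with the REPL's enclosing wrapper object, which is not serializable; creating the formatter inside the function avoids that capture. A minimal end-to-end sketch of the working version, using the same path and column names as above:

import org.apache.spark.sql.functions.{col, udf}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Build the formatter inside the function so the closure does not capture
// anything from the shell's outer scope.
def dtFunc: (String => Long) = (arg1: String) =>
  DateTime.parse(arg1, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis()

val dtFunc2 = udf(dtFunc)

val df = sqlContext.load("hdfs://quickstart/user/hive/warehouse/ma_transport2/077cf09f-b157-40a7-9d70-b5b9f70550d9.avro", "com.databricks.spark.avro").orderBy("Zeitstempel")

// "dt" holds the epoch milliseconds parsed from the Zeitstempel string.
val x = df.withColumn("dt", dtFunc2(col("Zeitstempel")))
x.show()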
