I'm running into the following DataFrame problem in Spark SQL. The DataFrame I load into spark-shell from an Avro file has the following schema:
[id: bigint, Nachrichtentyp: bigint, MelderID: bigint, Wartung: bigint, Ruestung: bigint, Fehler: bigint, Leerlauf: bigint, Zeitstempel: string]
I am trying to add a new column that converts Zeitstempel (-> timestamp, format: 1990-10-10 19:30:30) to milliseconds:
val df = sqlContext.load("hdfs://quickstart/user/hive/warehouse/ma_transport2/077cf09f-b157-40a7-9d70-b5b9f70550d9.avro", "com.databricks.spark.avro").orderBy("Zeitstempel")
val d = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val dtFunc: (String => Long) = (arg1: String) => DateTime.parse(arg1, d).getMillis()
val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("Zeitstempel")))
x.show()
Error message:
scala> x.show()
15/08/25 06:55:33 INFO MemoryStore: ensureFreeSpace(283428) called with curMem=2248728, maxMem=278302556
15/08/25 06:55:33 INFO MemoryStore: Block broadcast_48 stored as values in memory (estimated size 276.8 KB, free 263.0 MB)
15/08/25 06:55:33 INFO MemoryStore: ensureFreeSpace(22390) called with curMem=2532156, maxMem=278302556
15/08/25 06:55:33 INFO MemoryStore: Block broadcast_48_piece0 stored as bytes in memory (estimated size 21.9 KB, free 263.0 MB)
15/08/25 06:55:33 INFO BlockManagerInfo: Added broadcast_48_piece0 in memory on 192.168.72.167:60712 (size: 21.9 KB, free: 265.2 MB)
15/08/25 06:55:33 INFO BlockManagerMaster: Updated info of block broadcast_48_piece0
15/08/25 06:55:33 INFO SparkContext: Created broadcast 48 from hadoopFile at AvroRelation.scala:75
15/08/25 06:55:34 INFO FileInputFormat: Total input paths to process : 1
15/08/25 06:55:34 INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:88
15/08/25 06:55:34 INFO DAGScheduler: Got job 32 (RangePartitioner at Exchange.scala:88) with 2 output partitions (allowLocal=false)
15/08/25 06:55:34 INFO DAGScheduler: Final stage: Stage 52(RangePartitioner at Exchange.scala:88)
15/08/25 06:55:34 INFO DAGScheduler: Parents of final stage: List()
15/08/25 06:55:34 INFO DAGScheduler: Missing parents: List()
15/08/25 06:55:34 INFO DAGScheduler: Submitting Stage 52 (MapPartitionsRDD[100] at RangePartitioner at Exchange.scala:88), which has no missing parents
15/08/25 06:55:34 INFO MemoryStore: ensureFreeSpace(4040) called with curMem=2554546, maxMem=278302556
15/08/25 06:55:34 INFO MemoryStore: Block broadcast_49 stored as values in memory (estimated size 3.9 KB, free 263.0 MB)
15/08/25 06:55:34 INFO MemoryStore: ensureFreeSpace(2243) called with curMem=2558586, maxMem=278302556
15/08/25 06:55:34 INFO MemoryStore: Block broadcast_49_piece0 stored as bytes in memory (estimated size 2.2 KB, free 263.0 MB)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_49_piece0 in memory on 192.168.72.167:60712 (size: 2.2 KB, free: 265.2 MB)
15/08/25 06:55:34 INFO BlockManagerMaster: Updated info of block broadcast_49_piece0
15/08/25 06:55:34 INFO SparkContext: Created broadcast 49 from broadcast at DAGScheduler.scala:839
15/08/25 06:55:34 INFO DAGScheduler: Submitting 2 missing tasks from Stage 52 (MapPartitionsRDD[100] at RangePartitioner at Exchange.scala:88)
15/08/25 06:55:34 INFO YarnScheduler: Adding task set 52.0 with 2 tasks
15/08/25 06:55:34 INFO TaskSetManager: Starting task 0.0 in stage 52.0 (TID 840, quickstart.cloudera, NODE_LOCAL, 1425 bytes)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_49_piece0 in memory on quickstart.cloudera:61000 (size: 2.2 KB, free: 530.1 MB)
15/08/25 06:55:34 INFO BlockManagerInfo: Added broadcast_48_piece0 in memory on quickstart.cloudera:61000 (size: 21.9 KB, free: 530.1 MB)
15/08/25 06:55:34 INFO TaskSetManager: Starting task 1.0 in stage 52.0 (TID 841, quickstart.cloudera, NODE_LOCAL, 1425 bytes)
15/08/25 06:55:34 INFO TaskSetManager: Finished task 0.0 in stage 52.0 (TID 840) in 95 ms on quickstart.cloudera (1/2)
15/08/25 06:55:34 INFO TaskSetManager: Finished task 1.0 in stage 52.0 (TID 841) in 22 ms on quickstart.cloudera (2/2)
15/08/25 06:55:34 INFO DAGScheduler: Stage 52 (RangePartitioner at Exchange.scala:88) finished in 0.117 s
15/08/25 06:55:34 INFO YarnScheduler: Removed TaskSet 52.0, whose tasks have all completed, from pool
15/08/25 06:55:34 INFO DAGScheduler: Job 32 finished: RangePartitioner at Exchange.scala:88, took 0.132854 s
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1628)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:96)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:103)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:320)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:75)
at $iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC.<init>(<console>:79)
at $iwC.<init>(<console>:81)
at <init>(<console>:83)
at .<init>(<console>:87)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 66 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
Any help is really appreciated!
Thanks! PL
Problem solved:
Combine steps 2 and 3 into a single definition instead of splitting them:
def dtFunc: (String => Long) = (arg1: String) => DateTime.parse(arg1, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis()
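For context, here is a minimal sketch of the full working version. It assumes the same Joda-Time and Spark imports as the question; the key point is that the DateTimeFormat is created inside the function body, so the closure no longer captures the REPL-bound value `d` (whose enclosing shell wrapper object is not serializable, which is what triggered the "Task not serializable" error):

```scala
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.apache.spark.sql.functions.{col, udf}

// Formatter is built inside the lambda, so nothing from the
// spark-shell session is captured in the task closure.
def dtFunc: (String => Long) = (arg1: String) =>
  DateTime.parse(arg1, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis()

val dtFunc2 = udf(dtFunc)
val x = df.withColumn("dt", dtFunc2(col("Zeitstempel")))
x.show()
```

As an aside, on Spark 1.5+ the built-in `unix_timestamp(col, format)` function can do this conversion (in seconds rather than milliseconds) without any UDF, which sidesteps closure-serialization issues entirely.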