我正在尝试使用推断的架构导入CSV文件。它以orderdate
列作为字符串。因此,我尝试通过使用将其设置为日期格式spark.sql.function
。但是,当我尝试显示前4行时,会发生错误。
这是代码!如果我不应用代码来更正日期数据类型,它将很好地工作。或者,如果我只是打印模式(使用printSchema()
函数)而不是show()
。
import pyspark.sql.functions as f
sample_df_inferred = spark.read.csv(
'../data/sample_data.csv'
, header=True
, inferSchema = True
)
# code to correct the date datatype
sample_df_inferred = (
sample_df_inferred
.withColumn('OrderDate'
, f.to_date('OrderDate', 'MM/dd/yy')
)
)
sample_df_inferred.show(4)
错误如下:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-e3bd2200e9b7> in <module>
14 )
15
---> 16 sample_df_inferred.show(4)
D:\program_files\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
D:\program_files\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
D:\program_files\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
D:\program_files\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o49.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, LAPTOP-ARQ1E3J3, executor driver): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '1/6/16' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set it to CORRECTED and treat it as an invalid datetime string.
at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:150)
at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:141)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:86)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:77)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.time.format.DateTimeParseException: Text '1/6/16' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(Unknown Source)
at java.time.format.DateTimeFormatter.parse(Unknown Source)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:78)
... 20 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '1/6/16' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:150)
at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:141)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:86)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:77)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Caused by: java.time.format.DateTimeParseException: Text '1/6/16' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(Unknown Source)
at java.time.format.DateTimeFormatter.parse(Unknown Source)
at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:78)
... 20 more
如何解决呢?
无需(实际上是不适当的)设置传统时间解析器策略。如果您的日期/月份可以有1或2位数字,则应使用一个M / d。M / d的数字表示一天/每月的最小位数。这适用于Spark 2或3。
df.show()
+---------+
|OrderDate|
+---------+
| 1/6/16|
| 11/12/16|
+---------+
df.withColumn('OrderDate', F.to_date('OrderDate', 'M/d/yy')).show()
+----------+
| OrderDate|
+----------+
|2016-01-06|
|2016-11-12|
+----------+
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句