Error when using pyspark.sql.functions in a Jupyter notebook

Sourabh Prakash

I am trying to import a CSV file with an inferred schema. It reads the orderdate column in as a string, so I tried to convert it to a date type using pyspark.sql.functions. However, when I try to show the first 4 rows, an error occurs.

Here is the code. It works fine if I don't apply the code that corrects the date datatype, or if I only print the schema (using printSchema()) instead of calling show().

import pyspark.sql.functions as f

sample_df_inferred = spark.read.csv(
    '../data/sample_data.csv'
    , header=True
    , inferSchema = True
)

# code to correct the date datatype
 
sample_df_inferred = (
    sample_df_inferred
    .withColumn('OrderDate'
                , f.to_date('OrderDate', 'MM/dd/yy')
               )
)

sample_df_inferred.show(4)
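A likely reason the job only fails at show() is Spark's lazy evaluation: withColumn and to_date merely build the query plan, and the parse runs only when an action materializes rows. A minimal sketch of that distinction, reusing the DataFrame above:

# Inspecting the schema only looks at the plan; nothing is parsed yet.
sample_df_inferred.printSchema()   # succeeds

# An action forces to_date to actually execute, which is where it fails:
# sample_df_inferred.show(4)       # raises the SparkUpgradeException below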

The error is as follows:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-e3bd2200e9b7> in <module>
     14 )
     15 
---> 16 sample_df_inferred.show(4)

D:\program_files\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

D:\program_files\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

D:\program_files\spark-3.0.1-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

D:\program_files\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o49.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, LAPTOP-ARQ1E3J3, executor driver): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '1/6/16' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set it to CORRECTED and treat it as an invalid datetime string.
    at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:150)
    at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:141)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:86)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:77)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.time.format.DateTimeParseException: Text '1/6/16' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(Unknown Source)
    at java.time.format.DateTimeFormatter.parse(Unknown Source)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:78)
    ... 20 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '1/6/16' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
    at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:150)
    at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:141)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:86)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:77)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: java.time.format.DateTimeParseException: Text '1/6/16' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(Unknown Source)
    at java.time.format.DateTimeFormatter.parse(Unknown Source)
    at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.$anonfun$parse$1(TimestampFormatter.scala:78)
    ... 20 more

How do I fix this?

Mike

There is no need (and it is in fact inappropriate) to set the legacy time parser policy. If your day/month can have one or two digits, you should use a single M / d: the number of pattern letters in M / d specifies the minimum number of digits for the month/day. This works in both Spark 2 and 3. (F below is pyspark.sql.functions.)

df.show()
+---------+
|OrderDate|
+---------+
|   1/6/16|
| 11/12/16|
+---------+

df.withColumn('OrderDate', F.to_date('OrderDate', 'M/d/yy')).show()
+----------+
| OrderDate|
+----------+
|2016-01-06|
|2016-11-12|
+----------+
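For completeness, the error message itself names a fallback config. A minimal sketch of that option, assuming your SparkSession is bound to the name spark; the pattern fix above is the better approach, so this is only for cases where the pre-3.0 parser behavior must be kept:

# Restore the pre-Spark-3.0 parser behavior (config key quoted in the error):
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')

# Or keep the new parser and have mismatched strings treated as invalid
# (per the error message; to_date then yields null for them):
# spark.conf.set('spark.sql.legacy.timeParserPolicy', 'CORRECTED')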
