捕获Spark中映射函数引发的异常

达尼洛

我正在读取一个文件,其中包含一些损坏的数据。我的文件如下所示:

30149;E;LDI0775100  350000003221374461
30153;168034601 350000003486635135

第二行应该是什么样子。第一行在第一列中有一些额外的字符。因此,我想捕捉由于数据损坏而引发的所有异常。不仅仅是上面的示例。下面是将文件加载到RDD中并尝试在map Function中捕获异常的代码。

  val rawCustfile = sc.textFile("/tmp/test_custmap")

  case class Row1(file_id:Int,mk_cust_id: String,ind_id:Long)


   val cleanedcustmap = rawCustfile.map(x => x.replaceAll(";", 
 "\t").split("\t")).map(x => Try{
     Row1(x(0).toInt, x(1), x(2).toLong)}match {
      case Success(map) => Right(map)
      case Failure(e) => Left(e)
    }) 

    //get the good columns
   val good_rows=cleanedcustmap.filter(_.isRight)

    //get the errors
    val error=cleanedcustmap.filter(_.isLeft)

     good_rows.collect().foreach(println)

   error.collect().foreach(println)

   val df = sqlContext.createDataFrame(cleanedcustmap.filter(_.isRight))

good_rows.collect()。foreach(println)打印:

 Right(Row1(30153,168034601,350000003486635135))

error.collect()。foreach(println)打印:

 Left(java.lang.NumberFormatException: For input string: "LDI0775100")

一切正常,直到尝试将rdd转换为DataFrame为止。我收到以下异常:

 Name: scala.MatchError
 Message: Product with Serializable with 
 scala.util.Either[scala.Throwable,Row1] (of class scala.reflect.internal.Types$RefinedType0)
    StackTrace: org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:676)
    org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
    org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
    $line79.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
    $line79.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
    $line79.$read$$iwC$$iwC.<init>(<console>:63)
    $line79.$read$$iwC.<init>(<console>:65)
    $line79.$read.<init>(<console>:67)
    $line79.$read$.<init>(<console>:71)
    $line79.$read$.<clinit>(<console>)
    $line79.$eval$.<init>(<console>:7)
    $line79.$eval$.<clinit>(<console>)
    $line79.$eval.$print(<console>)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:361)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
    org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
    org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

我的第一个问题是,我是否以正确的方式捕获异常?我想得到错误,因为我要打印它们。有没有更好的办法?。我的第二个问题是将RDD转换为DataFrama时我做错了什么

扎克·佐哈(Tzach Zohar)

我是否以正确的方式捕捉到异常

好吧,差不多。不确定将映射Try到anEither是否很重要(您可以将aTry视为Either左侧类型是Throwable...的一种特殊形式),但是两者都可以工作。您可能要解决的另一个问题是-的使用replaceAll-它会在第一个参数之外生成一个正则表达式(在这种情况下不需要),因此它比慢replace,请参阅String replace()和replaceAll()之间的区别

将RDD转换为DataFrame时,我在做什么错

DataFrame仅支持有限的一组“标准”类型

  • 基本体(例如,整数,长型,字符串...)
  • 数组/映射(其他受支持的类型)
  • 产品(案例类或其他受支持类型的元组)

Either不是这些类型之一(也不是Try),因此不能在DataFrame中使用

您可以通过以下方法解决它:

  1. 仅使用DataFrame中的“成功”记录(错误除了记录外,还有什么用?这些错误会有效吗?无论哪种方式,都应该分别处理它们):

    // this would work:
    val df = spark.createDataFrame(good_rows.map(_.right.get))
    
  2. 将转换Either为受支持的类型,例如Tuple2(Row1, String),字符串是错误消息,元组中的值之一是null(对于错误的记录,左边为null,对于成功的记录,右边为null)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章