我正在读取一个文件,其中包含一些损坏的数据。我的文件如下所示:
30149;E;LDI0775100 350000003221374461
30153;168034601 350000003486635135
第二行应该是什么样子。第一行在第一列中有一些额外的字符。因此,我想捕捉由于数据损坏而引发的所有异常。不仅仅是上面的示例。下面是将文件加载到RDD中并尝试在map Function中捕获异常的代码。
val rawCustfile = sc.textFile("/tmp/test_custmap")
case class Row1(file_id:Int,mk_cust_id: String,ind_id:Long)
val cleanedcustmap = rawCustfile.map(x => x.replaceAll(";",
"\t").split("\t")).map(x => Try{
Row1(x(0).toInt, x(1), x(2).toLong)}match {
case Success(map) => Right(map)
case Failure(e) => Left(e)
})
//get the good columns
val good_rows=cleanedcustmap.filter(_.isRight)
//get the errors
val error=cleanedcustmap.filter(_.isLeft)
good_rows.collect().foreach(println)
error.collect().foreach(println)
val df = sqlContext.createDataFrame(cleanedcustmap.filter(_.isRight))
此good_rows.collect()。foreach(println)打印:
Right(Row1(30153,168034601,350000003486635135))
此error.collect()。foreach(println)打印:
Left(java.lang.NumberFormatException: For input string: "LDI0775100")
一切正常,直到尝试将rdd转换为DataFrame为止。我收到以下异常:
Name: scala.MatchError
Message: Product with Serializable with
scala.util.Either[scala.Throwable,Row1] (of class scala.reflect.internal.Types$RefinedType0)
StackTrace: org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:676)
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
$line79.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
$line79.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
$line79.$read$$iwC$$iwC.<init>(<console>:63)
$line79.$read$$iwC.<init>(<console>:65)
$line79.$read.<init>(<console>:67)
$line79.$read$.<init>(<console>:71)
$line79.$read$.<clinit>(<console>)
$line79.$eval$.<init>(<console>:7)
$line79.$eval$.<clinit>(<console>)
$line79.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:361)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
我的第一个问题是,我是否以正确的方式捕获异常?我想得到错误,因为我要打印它们。有没有更好的办法?。我的第二个问题是将RDD转换为DataFrama时我做错了什么
我是否以正确的方式捕捉到异常
好吧,差不多。不确定将映射Try
到anEither
是否很重要(您可以将aTry
视为Either
左侧类型是Throwable
...的一种特殊形式),但是两者都可以工作。您可能要解决的另一个问题是-的使用replaceAll
-它会在第一个参数之外生成一个正则表达式(在这种情况下不需要),因此它比慢replace
,请参阅String replace()和replaceAll()之间的区别。
将RDD转换为DataFrame时,我在做什么错
DataFrame仅支持有限的一组“标准”类型:
Either
不是这些类型之一(也不是Try
),因此不能在DataFrame中使用。
您可以通过以下方法解决它:
仅使用DataFrame中的“成功”记录(错误除了记录外,还有什么用?这些错误会有效吗?无论哪种方式,都应该分别处理它们):
// this would work:
val df = spark.createDataFrame(good_rows.map(_.right.get))
将转换Either
为受支持的类型,例如Tuple2(Row1, String)
,字符串是错误消息,元组中的值之一是null
(对于错误的记录,左边为null,对于成功的记录,右边为null)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句