我正在研究一个简单的ETL项目,该项目读取CSV文件,对每列进行一些修改,然后将结果写为JSON。我希望读取结果的下游进程可以确信我的输出符合约定的模式,但是我的问题是,即使我为所有字段都使用nullable = false定义了输入模式,null也会潜入并破坏我的输出文件,而且似乎没有让我的Spark输入字段强制执行“非null”的(性能)方法。
正如Spark在《权威指南》中所述,这似乎是一项功能:
当您定义一个架构,其中所有列均声明为不具有空值时,Spark将不会强制执行该操作,而是会乐于让空值进入该列。可为空的信号仅是为了帮助Spark SQL优化处理该列。如果列中的空值不应包含空值,则可能会得到错误的结果,或者会看到难以调试的奇怪异常。
我编写了一个小小的检查实用程序来遍历数据帧的每一行,如果在任何列中检测到空值(在任何嵌套级别,对于字段或子字段,例如map,struct或array),都会引发错误。 )
我特别想知道:我是否已使用此检查实用程序重新插入车轮?是否有任何现有的库或Spark技术可以为我做到这一点(理想情况是比实现的方法更好)?
检查实用程序和管道的简化版本如下所示。如图所示,对check实用程序的调用已被注释掉。如果在未启用检查实用程序的情况下运行,则会在/tmp/output.csv中看到此结果。
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
标头后的第二行应该是一个数字,但它是一个空字符串(我想这是spark如何写出null的方式。)对于读取我的ETL作业输出的下游组件,此输出将有问题:这些组件只需要整数。
现在,我可以通过取消注释行来启用检查
//checkNulls(inDf)
当我这样做时,我得到一个异常,通知我无效的空值并打印出整个有问题的行,如下所示:
java.lang.RuntimeException: found null column value in row: [null,4]
Spark /权威指南中给出的一种可能的替代方法
权威指南Spark提到了执行此操作的可能性:
<dataframe>.na.drop()
但这将(AFAIK)静默删除坏记录,而不是标记坏记录。然后,我可以在删除前后对输入进行“设置减法”,但这似乎对查找无效和无效的性能造成了很大的影响。乍一看,我更喜欢我的方法。。。但是我仍然想知道是否还有更好的方法。完整的代码如下。谢谢 !
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}
您可以使用内置的Row方法anyNull拆分数据帧并以不同的方式处理这两种拆分:
val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)
如果您不打算手动进行空值处理,则使用内置的DataFrame.na方法会更简单,因为它已经实现了所有自动处理空值的常用方法(即使用默认值删除或填充空值)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句