默认情况下,Spark sql模式中的可空性是建议性的。严格执行此规则的最佳方法是什么?

克里斯·贝德福德

我正在研究一个简单的ETL项目,该项目读取CSV文件,对每列进行一些修改,然后将结果写为JSON。我希望读取结果的下游进程可以确信我的输出符合约定的模式,但是我的问题是,即使我为所有字段都使用nullable = false定义了输入模式,null也会潜入并破坏我的输出文件,而且似乎没有让我的Spark输入字段强制执行“非null”的(性能)方法。

正如Spark在《权威指南》中所述,这似乎是一项功能:

当您定义一个架构,其中所有列均声明为不具有空值时,Spark将不会强制执行该操作,而是会乐于让空值进入该列。可为空的信号仅是为了帮助Spark SQL优化处理该列。如果列中的空值不应包含空值,则可能会得到错误的结果,或者会看到难以调试的奇怪异常。

我编写了一个小小的检查实用程序来遍历数据帧的每一行,如果在任何列中检测到空值(在任何嵌套级别,对于字段或子字段,例如map,struct或array),都会引发错误。 )

我特别想知道:我是否已使用此检查实用程序重新插入车轮?是否有任何现有的库或Spark技术可以为我做到这一点(理想情况是比实现的方法更好)?

检查实用程序和管道的简化版本如下所示。如图所示,对check实用程序的调用已被注释掉。如果在未启用检查实用程序的情况下运行,则会在/tmp/output.csv中看到此结果。

cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5

标头后的第二行应该是一个数字,但它是一个空字符串(我想这是spark如何写出null的方式。)对于读取我的ETL作业输出的下游组件,此输出将有问题:这些组件只需要整数。

现在,我可以通过取消注释行来启用检查

   //checkNulls(inDf)

当我这样做时,我得到一个异常,通知我无效的空值并打印出整个有问题的行,如下所示:

        java.lang.RuntimeException: found null column value in row: [null,4]

Spark /权威指南中给出的一种可能的替代方法

权威指南Spark提到了执行此操作的可能性:

<dataframe>.na.drop() 

但这将(AFAIK)静默删除坏记录,而不是标记坏记录。然后,我可以在删除前后对输入进行“设置减法”,但这似乎对查找无效和无效的性能造成了很大的影响。乍一看,我更喜欢我的方法。。。但是我仍然想知道是否还有更好的方法。完整的代码如下。谢谢 !

package org

import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {

  import NullCheckMethods._

  //val input = "2,3\n\"xxx\",4"          // this will be dropped as malformed
  val input = "2,3\n,4"                   // BUT.. this will be let through

  new PrintWriter("/tmp/foo.csv") { write(input); close }

  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
  val spark = sparkSession

  val schema = new StructType(
    Array(
      StructField("one", IntegerType, nullable = false),
      StructField("two", IntegerType, nullable = false)
    )
  )

  val inDf: DataFrame =
    spark.
      read.
      option("header", "false").
      option("mode", "dropMalformed").
      schema(schema).
      csv("/tmp/foo.csv")

  //checkNulls(inDf)

  val plusOneDf = inDf.selectExpr("one+1", "two+1")
  plusOneDf.show()

  plusOneDf.
    write.
    option("header", "true").
    csv("/tmp/output.csv")

}

object NullCheckMethods extends Serializable {

  def checkNull(columnValue: Any): Unit = {
    if (columnValue == null)
      throw new RuntimeException("got null")
    columnValue match {
      case item: Seq[_] =>
        item.foreach(checkNull)
      case item: Map[_, _] =>
        item.values.foreach(checkNull)
      case item: Row =>
        item.toSeq.foreach {
          checkNull
        }
      case default =>
        println(
          s"bad object [ $default ] of type: ${default.getClass.getName}")
    }
  }

  def checkNulls(row: Row): Unit = {
    try {
      row.toSeq.foreach {
        checkNull
      }
    } catch {
      case err: Throwable =>
        throw new RuntimeException(
          s"found null column value in row: ${row}")
    }
  }


  def checkNulls(df: DataFrame): Unit = {
    df.foreach { row => checkNulls(row) }
  }
}
鲁塔

您可以使用内置的Row方法anyNull拆分数据帧并以不同的方式处理这两种拆分:

val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)

如果您不打算手动进行空值处理,则使用内置的DataFrame.na方法会更简单,因为它已经实现了所有自动处理空值的常用方法(即使用默认值删除或填充空值)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么在SPARK中默认情况下未启用KryoSerializer?

默认情况下什么是TextView Gravity?

为什么默认情况下默认情况下默认保留opt文件夹?

默认情况下如何激活MathJax可访问性?

默认情况下可以删除的安装内容是什么?

Python 3默认情况下是什么“ __new__”?

让VSCode Intellisense建议默认情况下使用“ if block”而不是“ if”

默认情况下__eq __()方法中的内容

默认情况下,Spring是否使用工厂模式?

默认情况下,需要启用xeditable表单的“编辑”模式

默认情况下,IE 11文档模式

默认情况下以大图片模式启动Steam

在模式匹配的默认情况下,如何访问匹配值?

默认情况下是否启用了约束语言模式?

飞行模式无响应,默认情况下已激活

默认情况下如何启动tmux执行命令?

为什么默认情况下无法调试Android应用?

为什么默认情况下禁用CORS?

默认情况下,Perl解释器使用什么版本?

为什么默认情况下Hibernate不设置@DynamicInsert

默认情况下,什么是世界可写目录?

为什么默认情况下未设置@F?

为什么默认情况下iframe内联元素?

为什么默认情况下,scala的集合不是“视图”?

为什么默认情况下禁用PHP ZTS?

16.04:为什么默认情况下会安装“奶酪”?

默认情况下,MySQL在哪里存储.sql文件?

SQL-SELECT,如果为null,则默认情况下

|默认情况下,PyCharm在模板中说|预期}}