Spark 过滤器功能出错

用户1670805

以下是样本原始数据

tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)

下面是我的程序

val data = sc.textFile("/user/inputs/Tweets.csv")
val map_data = data.map(x=> x.split(","))
val filterdata  = map_data.filter(x=> x(5) == "Virgin America").count()  

它给出了如下异常:

[Stage 0:>                                                                                                                                                                        (0 + 2) / 2]
20/02/21 21:50:41 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-1-10.ec2.internal, executor 1): java.lang.ArrayIndexOutOfBoundsException: 5                       
        at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)                                                                                               
        at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)   
拉姆·加迪亚拉姆

您的数据不可拆分,这就是为什么数组索引越界的原因,请参见下面的代码.... 它将在选项 1 中复制您的版本

我使用 spark csv api 对其进行了改进,这可能对您有用。


    package examples

    import org.apache.log4j.Level

    object CSVTest extends App {
      import org.apache.spark.sql.{Dataset, SparkSession}
      val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
      val logger = org.apache.log4j.Logger.getLogger("org")
      logger.setLevel(Level.WARN)
      import spark.implicits._
      import org.apache.spark.sql.functions._
      val csvData: Dataset[String] = spark.sparkContext.parallelize(
        """
          |tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
          |570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
        """.stripMargin.lines.toList).toDS()


      println("option 2 : spark csv version ")
      val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
      frame.show()
      frame.printSchema()
     println( frame.filter($"airline" === "Virgin America").count())

     println("option 1: your version which is not splittable thats the reason getting arrayindex out of bound ")
      val filterdata = csvData.map(x=> x.split(","))
      filterdata.foreach(x => println(x.mkString))
    //    filterdata.show(false)
    //    filterdata.filter {x=> {
    //      println(x)
    //      x(5) == "Virgin America"
    //    }
    //    }
    //  .count()


    }

结果 :

option 2 : spark csv version 
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|   name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|570306133677760513|          neutral|                         1.0|          null|                     null|Virgin America|                  null|cairdin|               null|            0|@VirginAmerica Wh...|       null|2015-02-24 11:35:...|          null|Eastern Time (US ...|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+

root
 |-- tweet_id: long (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: double (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)

1
option 1: your version which is not splittable thats the reason getting arrayindex out of bound 
tweet_idairline_sentimentairline_sentiment_confidencenegativereasonnegativereason_confidenceairlineairline_sentiment_goldnamenegativereason_goldretweet_counttexttweet_coordtweet_createdtweet_locationuser_timezone
570306133677760513neutral1.0Virgin Americacairdin0@VirginAmerica What @dhepburn said.2015-02-24 11:35:52 -0800Eastern Time (US & Canada)


Process finished with exit code 0

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章