Here is a sample of the raw data:
tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
Here is my program:
val data = sc.textFile("/user/inputs/Tweets.csv")
val map_data = data.map(x=> x.split(","))
val filterdata = map_data.filter(x=> x(5) == "Virgin America").count()
It fails with the following exception:
[Stage 0:> (0 + 2) / 2]
20/02/21 21:50:41 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-1-10.ec2.internal, executor 1): java.lang.ArrayIndexOutOfBoundsException: 5
at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
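Splitting each line with x.split(",") does not reliably break your data into fields, and that is why you get the array index out of bounds error: whenever a line yields fewer than six elements, x(5) throws ArrayIndexOutOfBoundsException: 5. On the real file this most likely comes from records that break the one-record-per-line, comma-only assumption, for example a quoted tweet text that contains a line break, so that one record is spread over several short lines. Note also that String.split drops trailing empty strings, so a line such as "a,b,," yields only two elements. The test program further down reproduces your version as option 1.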
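The failure mode described above can be reproduced without Spark. This is a minimal plain-Scala sketch, not part of the original answer: fragment is a hypothetical tail of a record whose quoted tweet text contained a line break (not a line taken from your file), and airlineField / naiveLookupThrows are illustrative helper names. It also shows a guarded lookup with lift(5), so short lines are skipped instead of throwing.

```scala
object SplitDemo {
  // The sample record from the question (15 fields, no quoted commas).
  val sampleRecord =
    "570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)"

  // Hypothetical fragment: the tail of a record whose quoted tweet text spanned two lines.
  val fragment = "see you soon\",,2015-02-24 11:00:00 -0800,,"

  // Returns the field at index 5 (the airline column) if the line has one.
  // The limit -1 keeps trailing empty fields, which plain split(",") drops.
  def airlineField(line: String): Option[String] =
    line.split(",", -1).lift(5)

  // True if indexing the naive split at position 5 throws, as in the question.
  def naiveLookupThrows(line: String): Boolean =
    try { line.split(",")(5); false }
    catch { case _: ArrayIndexOutOfBoundsException => true }

  def main(args: Array[String]): Unit = {
    println(sampleRecord.split(",").length) // 15
    println(fragment.split(",").length)     // 3: trailing empty strings are dropped
    println(fragment.split(",", -1).length) // 5
    println(naiveLookupThrows(fragment))    // true
    // Guarded count: lines without a sixth field are skipped instead of throwing
    val lines = Seq(sampleRecord, fragment, "")
    println(lines.count(l => airlineField(l).contains("Virgin America"))) // 1
  }
}
```

The same guard can be dropped into the RDD version, e.g. data.filter(l => SplitDemo.airlineField(l).contains("Virgin America")).count(), but it only avoids the exception; records that really span several lines are still miscounted, which is why the CSV reader is the better fix.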
As option 2, I rewrote it with Spark's CSV reader (spark.read.csv), which parses quoted fields properly; this may be useful to you.
package examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}

object CSVTest extends App {
  val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
  // Only show warnings and errors from Spark's own loggers
  Logger.getLogger("org").setLevel(Level.WARN)
  import spark.implicits._

  // The header and the sample record from the question, as a Dataset[String].
  // split("\n") is used instead of .lines: on JDK 11+ .lines resolves to
  // java.lang.String.lines and returns a Java Stream. Blank lines are dropped.
  val csvData: Dataset[String] = spark.sparkContext.parallelize(
    """
      |tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
      |570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
    """.stripMargin.split("\n").filter(_.trim.nonEmpty).toList).toDS()

  println("option 2 : spark csv version ")
  // header: take column names from the first line; inferSchema: guess column types
  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
  frame.show()
  frame.printSchema()
  println(frame.filter($"airline" === "Virgin America").count())

  println("option 1: your version which is not splittable thats the reason getting arrayindex out of bound ")
  // Naive split on every comma, as in the question
  val filterdata = csvData.map(x => x.split(","))
  // mkString without a separator prints the fields run together
  filterdata.foreach(x => println(x.mkString))
  // The question's filter; it throws ArrayIndexOutOfBoundsException: 5 on any line
  // with fewer than six fields (e.g. an empty line or part of a multi-line record):
  // filterdata.filter(x => x(5) == "Virgin America").count()
}
Result:
option 2 : spark csv version
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
| tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence| airline|airline_sentiment_gold| name|negativereason_gold|retweet_count| text|tweet_coord| tweet_created|tweet_location| user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|570306133677760513| neutral| 1.0| null| null|Virgin America| null|cairdin| null| 0|@VirginAmerica Wh...| null|2015-02-24 11:35:...| null|Eastern Time (US ...|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
root
|-- tweet_id: long (nullable = true)
|-- airline_sentiment: string (nullable = true)
|-- airline_sentiment_confidence: double (nullable = true)
|-- negativereason: string (nullable = true)
|-- negativereason_confidence: string (nullable = true)
|-- airline: string (nullable = true)
|-- airline_sentiment_gold: string (nullable = true)
|-- name: string (nullable = true)
|-- negativereason_gold: string (nullable = true)
|-- retweet_count: integer (nullable = true)
|-- text: string (nullable = true)
|-- tweet_coord: string (nullable = true)
|-- tweet_created: string (nullable = true)
|-- tweet_location: string (nullable = true)
|-- user_timezone: string (nullable = true)
1
option 1: your version which is not splittable thats the reason getting arrayindex out of bound
tweet_idairline_sentimentairline_sentiment_confidencenegativereasonnegativereason_confidenceairlineairline_sentiment_goldnamenegativereason_goldretweet_counttexttweet_coordtweet_createdtweet_locationuser_timezone
570306133677760513neutral1.0Virgin Americacairdin0@VirginAmerica What @dhepburn said.2015-02-24 11:35:52 -0800Eastern Time (US & Canada)
Process finished with exit code 0
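Option 2 works because Spark's CSV reader understands quoting, whereas split(",") cuts at every comma, including commas inside a tweet's text. To illustrate the difference, here is a minimal plain-Scala sketch of a quote-aware splitter for a single line. It assumes RFC 4180-style quoting (fields wrapped in double quotes, "" for a literal quote), it is not Spark's implementation, and hypotheticalRow is a made-up record whose tweet text contains a comma.

```scala
object QuoteAwareSplit {
  // Hypothetical row: the quoted tweet text contains a comma.
  val hypotheticalRow =
    "1,neutral,1.0,,,Virgin America,,someone,,0,\"@VirginAmerica late again, really?\",,2015-02-24 11:00:00 -0800,,"

  // Splits one CSV line into fields, treating commas inside double quotes as data.
  // Assumes RFC 4180-style quoting: "" inside a quoted field is a literal quote.
  // Does not handle records that span several lines.
  def splitCsvLine(line: String): Vector[String] = {
    val fields = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    var i = 0
    while (i < line.length) {
      val c = line.charAt(i)
      if (inQuotes) {
        if (c == '"') {
          if (i + 1 < line.length && line.charAt(i + 1) == '"') {
            current.append('"') // escaped quote inside a quoted field
            i += 1
          } else inQuotes = false // closing quote
        } else current.append(c)
      } else if (c == '"') inQuotes = true
      else if (c == ',') { fields += current.toString; current.clear() }
      else current.append(c)
      i += 1
    }
    fields += current.toString // last field (possibly empty)
    fields.result()
  }

  def main(args: Array[String]): Unit = {
    // Naive split: the quoted comma adds a field and trailing empties are dropped
    println(hypotheticalRow.split(",").length) // 14
    val fields = splitCsvLine(hypotheticalRow)
    println(fields.length) // 15
    println(fields(12))    // 2015-02-24 11:00:00 -0800 (tweet_created)
  }
}
```

With the naive split, every column after the tweet text shifts by one (index 12 holds the empty tweet_coord value instead of tweet_created), so filters on later columns silently return wrong results. The sketch still assumes one record per line; for the real file, reading it with spark.read.option("header", true).csv(...) is the better choice, and if tweet texts contain line breaks, adding .option("multiLine", true) (Spark 2.2 and later) should let a record span several lines.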