我正在尝试将文件目录加载到Spark RDD中,并且需要为每行附加原始文件名。
我无法找到一种使用sc.textFile进行常规RDD操作的方法,因此我现在尝试使用WholeTextFiles方法来加载每个文件的文件名。
我正在使用此代码:
val lines =
sc.wholeTextFiles(logDir).flatMap{ case (filename, content) =>
val hash = modFiles.md5(filename)
content.split("\n")
.filter(line =>
<filter conditions>
.map(line => line+hash)
}
但是这段代码给了我一个Java堆内存不足的错误,我想它正在尝试一次加载所有文件?
是否有一种不通过使用WholeTextFiles来解决此问题的方法和/或是否有一种不使用WholeTextFiles一次加载所有文件的方法?
解决方案是使用此页面上的代码:http : //themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
// Create the text file
val text = sc.hadoopFile(logDir,
classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
// Cast to a HadoopRDD
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
val linesRaw = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) ⇒
// get file name hash - you need to define your own hash function
val fileHash = hash(inputSplit.asInstanceOf[FileSplit].getPath.toString)
// input split is in _1 and line is in _2
iterator.map(splitAndLine => splitAndLine._2+fileHash)
}
与使用sc.textFile相比,使用此代码可降低约10%的性能。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句