我想使用Spark读取avro文件(我使用的是Spark 1.3.0,所以没有数据帧)
我使用这段代码读取了avro文件
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
private def readAvro(sparkContext: SparkContext, path: String) = {
sparkContext.newAPIHadoopFile[
AvroKey[GenericRecord],
NullWritable,
AvroKeyInputFormat[GenericRecord]
](path)
}
我执行此操作并获得一个RDD。现在,从RDD中,如何提取特定列的值?像循环遍历所有记录并提供列名的值?
[编辑]如下面贾斯汀的建议,我尝试了
val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](input)
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
但我得到一个错误
<console>:34: error: value get is not a member of org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]
rdd.map(record=> record._1.get("accountId")).toArray().foreach(println)
AvroKey
具有datum
提取包装值的方法。并且GenericRecord
具有get
将列名作为字符串接受的方法。因此,您可以使用map
rdd.map(record=>record._1.datum.get("COLNAME"))
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句