使用Spark 1.6.0,cdh 5.7.0
我有一个csv文件,其中包含要处理的表的列表,我想在处理中实现并行性。截至目前,我正在使用收集来处理每个对象,尝试在Scala中使用Future选项,甚至尝试使用此https://blog.knoldus.com/2015/10/21/demystifying-asynchronous-actions-in-spark/
val allTables = sc.textFile("hdfs://.......")
allTables.collect().foreach(
table => {
val processing = sqlContext.sql(s"select * from ${table} ")
processing.saveAsParquetFile("hdfs://.......")
}
)
以及使用scala集合并行功能。
allTables.collect().par.foreach(table => ..)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句