假设我有一个data
type的对象Array[RDD]
。我想在此对象中的每一个上学习独立的机器学习模型RDD
。例如,对于随机森林:
data.map{ d => RandomForest.trainRegressor(d,2,Map[Int,Int](),2,"auto","gini",2,10) }
当我使用开展这项工作时spark-submit --master yarn-client ...
,独立学习任务似乎并未在多个节点上并行化。从应用程序UI的屏幕截图中可以看出,几乎所有工作都仅由一个节点(在此即节点10)完成:
附录
为了完整起见,整个代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
object test {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
// Load data
val rawData = sc.textFile("data/mllib/sample_tree_data.csv")
val data = rawData.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
val CV_data = (1 to 100).toArray.map(_ => {val splits = data.randomSplit(Array(0.7, 0.3)) ; splits(0)})
CV_data.map(d => RandomForest.trainClassifier(d, 2, Map[Int, Int](), 2, "sqrt", "gini", 2, 100))
sc.stop()
System.exit(0)
}
}
问题在于这RandomForest.trainClassifier
可以看作是一个动作,因为它会急切地触发某些涉及的RDD计算的执行。因此,无论何时调用RandomForest.trainClassifier
,Spark作业都会被提交到集群并执行。
由于map
对Scala的操作Array
是按顺序执行的,因此您最终会trainClassifier
接连执行一个作业。为了并行执行作业,您必须调用map
并行集合。以下代码段可以解决问题:
CV_data.par.map(d => RandomForest.trainClassifier(d, 2, Map[Int, Int](), 2, "sqrt", "gini", 2, 100))
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句