在scala中,当我在scala中使用全局地图变量而不进行广播时会发生什么?
例如,如果我使用collect*
(例如collectAsMap
)获得了一个变量,则它似乎是一个全局变量,并且可以在所有RDD.mapValues()
函数中使用它而无需显式广播它。
但是我知道spark是分布式工作的,并且不广播它就不能处理全局存储的变量。所以发生了什么事?
代码示例(此代码在文本中称为tf-idf,其中df存储在Map中):
//dfMap is a String->int Map in memory
//Array[(String, Int)] = Array((B,2), (A,3), (C,1))
val dfMap = dfrdd.collectAsMap;
//tfrdd is a rdd, and I can use dfMap in its mapValues function
//tfrdd: Array((doc1,Map(A -> 3.0)), (doc2,Map(A -> 2.0, B -> 1.0)))
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.getOrElse(x._1, 1) ) );
tfidfrdd.saveAsTextFile("/somedir/result/");
该代码工作正常。我的问题是那里发生了什么?驱动程序是否像广播一样将dfMap发送给所有工作人员?
如果我像这样显式编码广播,有什么区别:
dfMap = sc.broadcast(dfrdd.collectAsMap)
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.value.getOrElse(x._1, 1) )
我检查了更多的资源并汇总了其他人的答案,并进行了整理。使用外部变量DIRECTLY(作为我所谓的“全局变量”)与使用sc.broadcast()广播变量之间的区别是这样的:
1)直接使用外部变量时,spark将与每个任务一起发送序列化变量的副本。而通过sc.broadcast,该变量将在每个EXECUTOR中发送一份。任务的数量通常比执行程序大10倍。
因此,当变量(例如映射)足够大(超过20K)时,前一个操作可能会花费大量时间进行网络转换并导致频繁的GC,从而减慢了触发时间。因此,建议显式广播较大的变量(> 20K)。
2)当直接使用外部变量时,该变量不持久,它以任务结束,因此无法重用。而通过sc.broadcast()将该变量自动持久化在执行者的内存中,它一直持续到您明确取消持久化为止。因此,sc.broadcast变量可用于任务和阶段。
因此,如果预计该变量将被多次使用,则建议使用sc.broadcast()。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句