考虑下面的示例,该示例GROUP BY
以相对大量的聚合和相对大量的组运行:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
这项工作的输入只有8MB,输出约为2.4GB,我正在一个群集中运行此群集,该群集具有三台工作机,每台工作机具有61GB的内存。结果:所有工作程序都因OutOfMemory异常而崩溃。即使使用较低的值,num_columns
由于GC开销,作业也会变得异常缓慢。
我们尝试过的事情包括:
有没有更好的方法来达到预期的效果?
一般来说,解决此类问题的几乎通用方法是将分区大小保持在合理的大小。虽然“合理的”有点主观,并且可能因情况而异,但100-200MB似乎是一个不错的起点。
我可以轻松地汇总您在单个工作spark.executor.memory
线程上提供的示例数据,并保持默认值(1GB),并将总可用资源限制为8个内核和8GB RAM。所有这些都通过使用50个分区并将聚合时间保持在3秒左右而没有任何特殊调整(这在1.5.2到2.0.0之间或多或少是一致的)。
总结一下:如果可能spark.default.parallelism
,在创建时增加或显式设置分区数DataFrame
。spark.sql.shuffle.partitions
对于这样的小型数据集,默认值应足够。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句