使用spark-sql GROUP BY解决性能和内存问题

丹尼尔·M

考虑下面的示例,该示例GROUP BY以相对大量的聚合和相对大量的组运行:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._

val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt

case class Data(A: Long = (math.random*num_groups).toLong)

val table = (1 to num_rows).map(i => Data()).toDF

val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")

// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")

这项工作的输入只有8MB,输出约为2.4GB,我正在一个群集中运行此群集,该群集具有三台工作机,每台工作机具有61GB的内存。结果:所有工作程序都因OutOfMemory异常而崩溃。即使使用较低的值,num_columns由于GC开销,作业也会变得异常缓慢。

我们尝试过的事情包括:

  • 减小分区大小(减少内存占用,但增加簿记开销)
  • 在进行聚合之前,使用HashPartitioner对数据进行预分区(减少了内存消耗,但需要进行完全重新组合才能进行任何实际工作)

有没有更好的方法来达到预期的效果?

零323

一般来说,解决此类问题的几乎通用方法是将分区大小保持在合理的大小。虽然“合理的”有点主观,并且可能因情况而异,但100-200MB似乎是一个不错的起点。

我可以轻松地汇总您在单个工作spark.executor.memory线程上提供的示例数据,并保持默认值(1GB),并将总可用资源限制为8个内核和8GB RAM。所有这些都通过使用50个分区并将聚合时间保持在3秒左右而没有任何特殊调整(这在1.5.2到2.0.0之间或多或少是一致的)。

总结一下:如果可能spark.default.parallelism,在创建时增加或显式设置分区数DataFramespark.sql.shuffle.partitions对于这样的小型数据集,默认值应足够。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章