为什么在reduceByKey之后所有数据最终都集中在一个分区中?

帕特里克·麦格隆(Patrick McGloin)

我有包含以下部分的Spark应用程序:

val repartitioned = rdd.repartition(16)
val filtered: RDD[(MyKey, myData)] = MyUtils.filter(repartitioned, startDate, endDate)
val mapped: RDD[(DateTime, myData)] = filtered.map(kv=(kv._1.processingTime, kv._2))
val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)

当我用一些日志记录运行它时,我看到的是:

repartitioned ======> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
filtered ======> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
mapped ======> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
reduced ======> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]

我的日志记录使用以下两行完成:

val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true)
log.info(s"rdd ======> [${sizes.collect().toList}]")

我的问题是,为什么我的数据会在reduceByKey之后归结为一个分区?在过滤器之后,可以看到数据是均匀分布的,但是reduceByKey导致数据仅在一个分区中。

帕特里克·麦格隆(Patrick McGloin)

自从我弄清楚了以后,我将回答我自己的问题。我的DateTimes都没有秒和毫秒,因为我想对属于同一分钟的数据进行分组。相距一分钟的Joda DateTimes的hashCode()是一个常数:

scala> val now = DateTime.now
now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z

scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode - now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
res42: Int = 60000

从该示例可以看出,如果hashCode值的间隔类似,则它们可以最终位于同一分区中:

scala> val nums = for(i <- 0 to 1000000) yield ((i*20 % 1000), i)
nums: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6), (140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13), (280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20), (420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27), (560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34), (700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41), (840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48), (980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55), (120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62), (260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69), (400,70), (420,71), (440,72), (460,73), (480,74), (500...

scala> val rddNum = sc.parallelize(nums)
rddNum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val reducedNum = rddNum.reduceByKey(_+_)
reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at reduceByKey at <console>:25

scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator, true).collect.toList

res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

为了在分区上更均匀地分布数据,我创建了自己的自定义Partitoiner:

class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
  def numPartitions: Int = rddNumPartitions
  def getPartition(key: Any): Int = {
    key match {
      case dateTime: DateTime =>
        val sum = dateTime.getYear + dateTime.getMonthOfYear +  dateTime.getDayOfMonth + dateTime.getMinuteOfDay  + dateTime.getSecondOfDay
        sum % numPartitions
      case _ => 0
    }
  }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么在reduceByKey之后所有数据最终都集中在一个分区中?

Pyspark 数据帧重新分区将所有数据放在一个分区中

如果我通过将搜索字段保留为空白并点击saarch按钮,并且所有条目都需要一个HTML按钮,为什么还要显示mysql databese中的所有数据?

为什么在使用GROUP BY之后,列表中所有对象的一个字段都设置为零?

在Linux中,为什么所有进程都存在一个全局/ dev / stdin文件?

为什么所有CSS文件都放在一个文件夹中?

我想要一个全新的 linux 安装,所有这些分区都需要吗?为什么有一个 999GB 的?

为什么Openstack Swift服务将其所有数据/文件都放在根目录而不是我的指定分区中?

当所有数据都存储在设备上时,创建一个强密码方案

如何使用php删除最后一个点之后的所有数据?

所有数据都显示在一行中

为什么所有的数组都删除了这个集合中的一个元素而不是被引用的那个?

即使所有数据都显示在 console.log() 中,为什么所有数据都不会显示在浏览器上?

为什么setInterval一次记录所有数据,而不是每次都等待指定的时间间隔?

我希望所有唯一的数组值都集中在 mongodb 的一个字段中

为什么当单击一个项目时,所有项目都打开

为什么所有反应组件都改变高度而不是一个

使用Spark时如何确保属于用户的所有数据都转到同一个文件?

虽然 this.state 是一个 Javascript 对象,但为什么即使在将具有数据的状态传递给它之后 state 还是 null?

在shell中拆分一个字符串,以使所有字段都达到最终的“。”。(重击)

我已经使用了数据库参考,那么为什么我要将所有用户数据都放入recyclerview中,而我只希望将一个用户数据放入recyclerview中?

将所有数字都放在一个字符串数组中

如何过滤掉Z列最后一个值为1之后的所有数据框行?

如何获取一个圈内的所有数据?

我创建了一个 php 表单并连接到 mysql 我的所有数据都添加到数据库中,只有性别没有显示在数据库中

为什么“期望解码数据但找到一个数组”为什么我的代码中没有数组

为什么在Java的TreeMap中重载可比时,键集中只有一个键

当所有数据都在一个表中时,MySQL连接多个SQL结果

postgresql外部联接查询以从一个表中获取所有数据