我的星火代码计算每个组织的员工有什么问题?

投降王

我正在执行Spark-scala的一些基本动手操作

要求是显示每个组织中的员工人数。

我已经通过使用groupByKey和Mapvalues达到了相同的要求,并且还通过创建一个keyValueRDD作为Array((CTS,1)(CTS,1),(TCS,1))然后应用了reduceByKey(( x,y)=> x + y)。两者均产生了正确的预期结果。

现在,我尝试下面的逻辑样式。我想使用reduceByKey,但我不想使用硬编码值为1的KeyValueRDD来实现员工人数。

请帮我更改下面的代码以获得预期的输出。我也想知道为什么我的代码在这里输出错误

由于reduceByKey是可交换的,我得到了不同的输出。

scala> myList
res34: List[String] = List(100|Surender|CTS|CHN, 101|Raja|CTS|CHN, 102|Kumar|TCS|BNG)

scala> val listRDD = sc.parallelize(myList)
listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:23

scala> val mapRDD = listRDD.map(elem => elem.split("\\|"))
mapRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25

scala> val keyValueRDD = mapRDD.map(elem => (elem(2),elem(0).toInt))
keyValueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:27

scala> val resultRDD = keyValueRDD.reduceByKey((x,y) => { var incr = 0 ; incr+1 } )
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:29

scala> resultRDD.collect
res36: Array[(String, Int)] = Array((TCS,102), (CTS,1)

预期产量:

Array((TCS,1), (CTS,2)
流氓一号

即使问题明确指出,它也不想映射硬编码的值1,这绝对是正确的方法,如下所示。

scala> keyValueRDD.map({case (x,y) => x -> 1 }).reduceByKey(_ + _).collect()
res46: Array[(String, Int)] = Array((TCS,1), (CTS,2))

如果您了解spark的工作原理,则永远不要在需要{ var incr = 0 ; incr+1 }lambda函数的地方编写这样的命令性代码

reduceByKey应该接受一个累加器的两个参数,并且当前值要减小,并且它必须返回该累加器的新值。在您的代码中,您总是返回1,因为对于每个减少的值,incr变量都会实例化为0。因此,累加器值始终保持为1。这解释了为什么CTS在有缺陷的结果中的值为1。

对于TCS,由于spark看到密钥TCS仅具有单个记录,因此不需要进一步减少它,因此返回其原始值。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章