我正在执行Spark-scala的一些基本动手操作
要求是显示每个组织中的员工人数。
我已经通过使用groupByKey和Mapvalues达到了相同的要求,并且还通过创建一个keyValueRDD作为Array((CTS,1)(CTS,1),(TCS,1))然后应用了reduceByKey(( x,y)=> x + y)。两者均产生了正确的预期结果。
现在,我尝试下面的逻辑样式。我想使用reduceByKey,但我不想使用硬编码值为1的KeyValueRDD来实现员工人数。
请帮我更改下面的代码以获得预期的输出。我也想知道为什么我的代码在这里输出错误
由于reduceByKey是可交换的,我得到了不同的输出。
scala> myList
res34: List[String] = List(100|Surender|CTS|CHN, 101|Raja|CTS|CHN, 102|Kumar|TCS|BNG)
scala> val listRDD = sc.parallelize(myList)
listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:23
scala> val mapRDD = listRDD.map(elem => elem.split("\\|"))
mapRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25
scala> val keyValueRDD = mapRDD.map(elem => (elem(2),elem(0).toInt))
keyValueRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:27
scala> val resultRDD = keyValueRDD.reduceByKey((x,y) => { var incr = 0 ; incr+1 } )
resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:29
scala> resultRDD.collect
res36: Array[(String, Int)] = Array((TCS,102), (CTS,1)
预期产量:
Array((TCS,1), (CTS,2)
即使问题明确指出,它也不想映射硬编码的值1,这绝对是正确的方法,如下所示。
scala> keyValueRDD.map({case (x,y) => x -> 1 }).reduceByKey(_ + _).collect()
res46: Array[(String, Int)] = Array((TCS,1), (CTS,2))
如果您了解spark的工作原理,则永远不要在需要{ var incr = 0 ; incr+1 }
lambda函数的地方编写这样的命令性代码。
reduceByKey应该接受一个累加器的两个参数,并且当前值要减小,并且它必须返回该累加器的新值。在您的代码中,您总是返回1,因为对于每个减少的值,incr变量都会实例化为0。因此,累加器值始终保持为1。这解释了为什么CTS在有缺陷的结果中的值为1。
对于TCS,由于spark看到密钥TCS仅具有单个记录,因此不需要进一步减少它,因此返回其原始值。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句