I have a requirement using RDDs:
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)
The result is:
(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))
How can I do this with Datasets in Spark 2.0?
I have a way using a custom function, but it feels so complicated. Isn't there a simpler, more direct way?
I'd suggest you start by creating a case class as
case class Monkey(city: String, firstName: String)
This case class should be defined outside your main class. Then you can use the toDS function and do a groupBy with the aggregation function collect_list as below:
import spark.implicits._   // in Spark 2.0, spark is the SparkSession (sqlContext.implicits._ still works too)
import org.apache.spark.sql.functions._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))        // tuple -> Monkey
  .toDS()                                    // RDD[Monkey] -> Dataset[Monkey]
  .groupBy("city")
  .agg(collect_list("firstName") as "list")  // collect all first names per city
  .show(false)
You will get the output as
+-----------+------------------------+
|city |list |
+-----------+------------------------+
|Los Angeles|[Tom] |
|Detroit |[Michael, Peter, George]|
|Chicago |[David, Andrew] |
|Houston |[John] |
|New York |[Jack] |
+-----------+------------------------+
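If you'd rather stay in the typed Dataset API and get tuples back instead of Rows, a minimal sketch using the standard groupByKey and mapGroups operations could look like this (the val name grouped is my own, not from the original answer):
// Typed alternative: group Dataset[Monkey] by city and collect the first names.
val grouped = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupByKey(_.city)   // KeyValueGroupedDataset[String, Monkey]
  .mapGroups((city, monkeys) => (city, monkeys.map(_.firstName).toList))
grouped.collect().foreach(println)   // e.g. (Detroit,List(Michael, Peter, George))
Note that mapGroups keeps compile-time types, but per the Spark docs it does not support partial aggregation, whereas the collect_list approach goes through the aggregation machinery.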
You can always convert back to an RDD by calling the .rdd function:
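A minimal sketch (the val names result and backToRdd are mine, for illustration only):
// Bind the aggregated output, then call .rdd on it.
val result = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
val backToRdd = result.rdd   // Dataset[Row] -> RDD[Row]
backToRdd.foreach(println)   // each element is a Row, e.g. [Detroit,WrappedArray(Michael, Peter, George)]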