如何基于同一字段加入两个rdds?

学习大数据

我是Scala的新手,并开始与之合作RDDs我有两个具有以下标头和数据的csv文件:csv1.txt

id,"location", "zipcode" 
1, "a", "12345"
2, "b", "67890"
3, "c" "54321"

csv2.txt

"location_x", "location_y", trip_hrs
"a", "b", 1
"a", "c", 3
"b", "c", 2
"a", "b", 1
"c", "b", 2

基本上,csv1数据是一组不同的位置和邮政编码,而csv2数据的行程持续时间在location_x和location_y之间。

在这两个数据集信息的公共部分是定位csv1location_xCSV2,即使他们有不同的头名。

我想创建两个RDDs包含从数据的一个csv1,并从其他CSV2

然后,我想join对这两个RDDs位置进行返回,并返回该位置的邮政编码,所有行程时间的总和,如下所示:

("a", "zipcode", 5)
("b", "zipcode", 2)
("c", "zipcode", 2)

我想知道你们中的一个人是否可以帮助我解决这个问题。谢谢。

呼叫

我将给您代码(在IntelliJ中是一个完整的应用程序),并附带一些解释。希望对您有所帮助。

请阅读Spark文档以获取详细信息。

使用键值对

可以使用Spark Dataframes解决此问题,您可以自己尝试。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object Joining {

  val spark = SparkSession
    .builder()
    .appName("Joining")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Joining")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val path = "/home/cloudera/files/tests/"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      // read the files
      val file1 = sc.textFile(s"${path}join1.csv")
      val header1 = file1.first // extract the header of the file
      val file2 = sc.textFile(s"${path}join2.csv")
      val header2 = file2.first // extract the header of the file

      val rdd1 = file1
        .filter(line => line != header1) // to leave out the header
        .map(line => line.split(",")) // split the lines => Array[String]
        .map(arr => (arr(1).trim,arr(2).trim)) // to make up a pairRDD with arr(1) as key and zipcode

      val rdd2 = file2
          .filter(line => line != header2)
          .map(line => line.split(",")) // split the lines => Array[String]
          .map(arr => (arr(0).trim, arr(2).trim.toInt)) // to make up a pairRDD with arr(0) as key and trip_hrs

      val joined = rdd1 // join the pairRDD by its keys
          .join(rdd2)
          .cache()  // cache joined in memory

      joined.foreach(println) // checking data
      println("**************")

//      ("c",("54321",2))
//      ("b",("67890",2))
//      ("a",("12345",1))
//      ("a",("12345",3))
//      ("a",("12345",1))

      val result = joined.reduceByKey({ case((zip, time), (zip1, time1) ) => (zip, time + time1) })

      result.map({case( (id,(zip,time)) ) => (id, zip, time)}).foreach(println) // checking output

//      ("b","67890",2)
//      ("c","54321",2)
//      ("a","12345",5)

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在Django中选择同一字段的两个范围?

如何在同一字段中查找/打印两个符号相反的连续浮点数?

比较两个不同列中同一字段的值

Cosmos DB Querying文档必须在同一字段中具有两个值

基于同一字段中2个值的条件分组

如何选择两个唯一字段的结果并显示为一列结果?

如何从同一字段中选择包含不同数据的两列

如何通过同一字段的两次查找过滤查询集?

两个唯一字段上的ForeignKey关系

通过Google表格中的两个唯一字段获取整行

如何使用codeigniter在同一字段中搜索2个值?

Spring Data:是否可以在不编写实现的情况下通过同一字段的两个值来查找?

使用sed在同一字段的两个字符之间插入空格

从一个表进行MySQL查询-两次选择同一字段

如何将具有唯一字段的两个不同对象列表合并到一个单独的列表中?

列表按第一字段排序,如何在第一字段相同的行上加入第二字段?

通过两个唯一字段搜索spring boot的唯一性

根据不同的字段条件对同一字段进行两次求和

匹配两个制表符分隔文件的第一字段并打印匹配值

选择加入时的唯一字段

Swift 解码 JSON,同一字段有两种可能的类型

Spring Mongo条件查询同一字段两次

Mongo聚合,按2个不同阵列中的同一字段分组

JavaScript-在同一字段上对2个对象数组进行排序

在同一字段中查找和替换

Birt报告同一字段的多个参数

匹配同一字段中的多个值

可选绑定变量以匹配同一字段

查询同一字段的多个 Firestore 集合