使用 Spark Dataframe 遍历记录并根据某些条件将当前值与先前值连接起来

维杰 B

我对 Spark 和 Scala 编码很陌生。我目前正在研究 Spark DataFrames。我需要遍历记录并重复相同的值,直到满足下一个条件。请在示例下方找到,我在给定的文件中只有一列。该示例有两种类型的值,一种是标题数据,另一种是详细信息数据。标题数据始终为 10 个字符长度,详细数据始终为 15 个字符长度。我想将前 10 个字符与下一个记录的 15 个字符连接起来,直到我们达到下一个 10 个字符,依此类推...

df
---------------
1RHGTY567U //header data 
6786TYUIOPTR141 //detail data
6786TYUIOPTYU67 //detail data
T7997999HHBFFE6 //detail data
8YUITY567U      //header data 
HJS7890876997BB //detail data
BFJFBFKFN787897
GS678790877656H
BFJFDK786WQ4243
74849469GJGNVFM
67YUBMHJKH
VFJF788968FJFJD
HFJFGKJD789768D
GFJFHFFLLJFJDLD

我已经通过收集 DataFrame、循环遍历它并将其与其他记录连接起来进行了尝试,如下所示。我遵循的方法是一个代价高昂的操作,因为 collect() 是不可取的。我可以使用滞后窗口函数将当前值与前一个值连接起来,但我的场景几乎没有什么不同。

val srcDF = spark.read.format("csv").load(location + "/" + filename)

   //Adding another column to the DataFrame which shows length of the value in the column
   var newDF = srcDF.withColumn("col_length", length($"_c0"))

   //Converting DataFrame to RDD
   var RDD = newDF.map(row => row(0).toString + "," + row(1).toString).rdd

   //Iterating through RDD to concatenate Header data with the detail
   for (row <- RDD.collect) {
      if (row.split(",")(1).toInt == 16) { Rec = row.split(",")(0).toString }
      if (row.split(",")(1).toInt > 16) {
         srcModified += Rec + row.split(",")(0).toString
      } 
      else {
         srcModified += Rec
      }
   }

   //Converting ListBuffer to RDD
   val modifiedRDD = sc.parallelize(srcModified.toSeq)

我期待的输出如下所示:

new_DF
------

1RHGTY567U //header data 
1RHGTY567U6786TYUIOPTR141 //header data concatenated with detail data
1RHGTY567U6786TYUIOPTYU67 //header data concatenated with detail data
1RHGTY567UT7997999HHBFFE6 //header data concatenated with detail data
8YUITY567U      //header data 
8YUITY567UHJS7890876997BB //header data concatenated with detail data
8YUITY567UBFJFBFKFN787897 //header data concatenated with detail data
8YUITY567UGS678790877656H //header data concatenated with detail data
8YUITY567UBFJFDK786WQ4243 //header data concatenated with detail data
8YUITY567U74849469GJGNVFM //header data concatenated with detail data
67YUBMHJKH
67YUBMHJKHVFJF788968FJFJD
67YUBMHJKHHFJFGKJD789768D
67YUBMHJKHGFJFHFFLLJFJDLD

请问有什么建议吗?

帕夏701

增量列可以添加到Dataframe,增量列的Window会通过“last”函数找到最新的标题:

val withId = originalDF.select($"value", monotonically_increasing_id().alias("id"))

val idWindow = Window.orderBy("id")
withId
  .withColumn("previousHeader",
      last( when(length($"value") < 15, $"value")
            .otherwise(null), true).over(idWindow)
          )
  .select(
      when($"value"=== $"previousHeader", $"value")
      .otherwise(concat($"previousHeader", $"value")).alias("value")
  )

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

使用rbind将具有零值的数据帧列表连接起来

将Spark中的稀疏向量连接起来?

使用条件条件LINQ将2个数据表连接起来

将空的Pandas DataFrame与Multiindex DataFrame连接起来

如何将 Spark 与 Elastic Search 连接起来

如何从SparkContext将Apache Spark与Yarn连接起来?

将平面索引与分层索引的DataFrame连接起来

使用Python Pandas将两个具有范围条件的表连接起来

使用列值作为spark DataFrame函数的参数

使用JavaRdd <Row>映射Spark DataFrame Colunm值

使用Spark Dataframe API格式化时间戳记值

无法使用整数值替换Spark Dataframe中的空值

条件聚合 Spark DataFrame

如何使用Java连接Spark DataFrame中的所有列?

仅使用for循环体内的条件将带空格的字符串连接起来

Spark:如何在A的ID数组列不包含B的ID列的条件下,将两个`Dataset'的A和B连接起来?

使用Spark / scala将String转换为DataFrame

使用Scala API将TSV读取到Spark Dataframe中

使用Spark DataFrame将数据插入Cassandra表

Scala/Spark:仅使用 RDD 函数将 DataFrame 展平

Spark 使用 Scala 将 json 数据转换为 DataFrame

根据条件分割Spark DataFrame

Pandas:如何将 MultiIndex DataFrame 与单个索引 DataFrame 连接起来,以及自定义排序

使用箭头将一个项目内但跨组的点连接起来

使用视图的委托方法(PySide / Qt / PyQt)将QMainWindow中的动作连接起来

尝试使用 ffmpeg 将多个视频与多个音频流连接起来

如何使用css将椭圆或圆与线连接起来

使用Play Framework和JPA将两个表连接起来

使用CodeIginter将两个带有where子句的表连接起来