火花流:为数据帧中的每个ID选择带有最大时间戳的记录(pyspark)

马克·B

我有一个带有架构的数据框-

 |-- record_id: integer (nullable = true)
 |-- Data1: string (nullable = true)
 |-- Data2: string (nullable = true)
 |-- Data3: string (nullable = true)
 |-- Time: timestamp (nullable = true)

我想检索数据中的最后一条记录,并按record_id分组并使用最大的时间戳。

因此,如果数据最初是这样的:

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+-------------------------------------------+
 |        1 | aaa     | null    |  null   | 2018-06-04 21:51:53.0 |
 |        1 | null    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        1 | aaa     | null    |  dddd   | 2018-06-06 21:51:53.0 |
 |        1 | qqqq    | wwww    |  eeee   | 2018-06-07 21:51:53.0 |
 |        2 | aaa     | null    |  null   | 2018-06-04 21:51:53.0 |
 |        2 | aaaa    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        3 | aaa     | null    |  dddd   | 2018-06-06 21:51:53.0 |
 |        3 | aaaa    | bbbb    |  eeee   | 2018-06-08 21:51:53.0 |

我希望输出是

 +----------+---------+---------+---------+-----------------------+
 |record_id |Data1    |Data2    |Data3    |                   Time|
 +----------+---------+-------------------------------------------+
 |        1 | qqqq    | wwww    |  eeee   | 2018-06-07 21:51:53.0 |
 |        2 | aaaa    | bbbb    |  cccc   | 2018-06-05 21:51:53.0 |
 |        3 | aaaa    | bbbb    |  eeee   | 2018-06-08 21:51:53.0 |

我试图在同一流上加入2个查询,类似于此处的答案我的代码(其中df1是原始数据帧):

df1=df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()
query="select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2=spark.sql(query)
df2=df2.withWatermark("Timetemp", "2 seconds")

qws=df1.alias('a').join(df2.alias('b'),((col('a.record_id')==col('b.record_id')) & (col("a.Timetemp")==col("b.Timetemp"))))

query = qws.writeStream.outputMode('append').format('console').start()

query.awaitTermination()  

我仍然收到此错误,尽管:

当流式数据帧/数据集上有流式聚合而没有水印时,不支持追加输出模式;

明显有水印时。该怎么办?我不能使用窗口,因为流中不支持基于非时间的窗口。

米哈伊尔·杜布科夫(Mikhail Dubkov)

我也有同样的任务。尝试了几种将current_timestamp添加到数据集的选项,它们按窗口分组并使用水印记录ID,但没有任何效果。

据我发现,没有可用的API来解决此任务。按和划分的窗口在流数​​据集上不起作用。

我使用MapGroupWithStateAPI解决了此任务,但没有保持如下状态:

import spark.implicits._

val stream = spark.readStream
  .option("maxFileAge", "24h")
  .option("maxFilesPerTrigger", "1000")
  .parquet(sourcePath)
  .as[input.Data]

val mostRecentRowPerPrimaryKey =
  stream
    .groupByKey(_.id)
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)

mostRecentRowPerPrimaryKey
  .repartition(5)
  .writeStream
  .option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
  .option("truncate", "false")
  .format("console")
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .queryName(streamName)
  .start()

def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
  values.maxBy(_.last_modified)
}

注意:仅在Update模式下有效。

希望能帮助到你!

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

查找所有连续时间戳组并为数据帧中的每个组分配一个唯一的 id

根据最新时间戳为每个唯一ID过滤数据帧

火花流为每个触发过程间隔的每条记录选择最新事件

如何在使用pyspark从其自己的数据帧中选择的火花数据帧中执行计算

将带有毫秒的UTC时间戳转换为Python数据帧中HH:MM:SS格式的本地时间戳

如何创建带有增加的时间戳列的数据帧?

数据流中带有时间戳的过程字段

Spark数据帧中具有特定条件的时间戳的唯一ID

根据另一个数据帧的“时间戳”和“id”从一个数据帧中获取最大数量

为每个ID子组在时间序列数据表中添加缺少的日期记录

如何将时间戳类型的PySpark数据帧截断为一天?

为表上的每个组选择最后一个时间戳(记录)

为每个员工选择一天中最早时间戳的完整记录

带有时区的pyspark时间戳

如何获取带有时间戳的帧,其中从python中的视频转换帧

如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

为每个ID选择具有最大日期的值

Pyspark:将具有特定时间戳的行插入数据帧

从 Scala/spark 中时间戳的数据帧列中获取最大值

熊猫数据帧中的时间戳

为表中的每个ID选择最新的3条记录

将数据帧中带有时间戳的多行事件转换为具有开始和结束日期时间的单行

分组数据帧以获取带有时间戳排序的最新消息

合并带有时间戳和间隔的数据帧

将带有时间戳的pandas数据帧转换为String

带有时间戳的pandas数据帧插入mysql的问题

选择带有最大时间戳的唯一条形码

使用Spark Scala为数据中的每个组进行窗口操作后选择最新的时间戳记录

使用时间戳自加入pyspark数据帧