我有一个带有架构的数据框-
|-- record_id: integer (nullable = true)
|-- Data1: string (nullable = true)
|-- Data2: string (nullable = true)
|-- Data3: string (nullable = true)
|-- Time: timestamp (nullable = true)
我想检索数据中的最后一条记录,并按record_id分组并使用最大的时间戳。
因此,如果数据最初是这样的:
+----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 1 | null | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 1 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaa | null | null | 2018-06-04 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaa | null | dddd | 2018-06-06 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |
我希望输出是
+----------+---------+---------+---------+-----------------------+
|record_id |Data1 |Data2 |Data3 | Time|
+----------+---------+-------------------------------------------+
| 1 | qqqq | wwww | eeee | 2018-06-07 21:51:53.0 |
| 2 | aaaa | bbbb | cccc | 2018-06-05 21:51:53.0 |
| 3 | aaaa | bbbb | eeee | 2018-06-08 21:51:53.0 |
我试图在同一流上加入2个查询,类似于此处的答案。我的代码(其中df1是原始数据帧):
df1=df1.withWatermark("Timetemp", "2 seconds")
df1.createOrReplaceTempView("tbl")
df1.printSchema()
query="select t.record_id as record_id, max(t.Timetemp) as Timetemp from tbl t group by t.record_id"
df2=spark.sql(query)
df2=df2.withWatermark("Timetemp", "2 seconds")
qws=df1.alias('a').join(df2.alias('b'),((col('a.record_id')==col('b.record_id')) & (col("a.Timetemp")==col("b.Timetemp"))))
query = qws.writeStream.outputMode('append').format('console').start()
query.awaitTermination()
我仍然收到此错误,尽管:
当流式数据帧/数据集上有流式聚合而没有水印时,不支持追加输出模式;
明显有水印时。该怎么办?我不能使用窗口,因为流中不支持基于非时间的窗口。
我也有同样的任务。尝试了几种将current_timestamp
列添加到数据集的选项,它们按窗口分组并使用水印记录ID,但没有任何效果。
据我发现,没有可用的API来解决此任务。按和划分的窗口在流数据集上不起作用。
我使用MapGroupWithState
API解决了此任务,但没有保持如下状态:
import spark.implicits._
val stream = spark.readStream
.option("maxFileAge", "24h")
.option("maxFilesPerTrigger", "1000")
.parquet(sourcePath)
.as[input.Data]
val mostRecentRowPerPrimaryKey =
stream
.groupByKey(_.id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(takeMostRecentOnly)
mostRecentRowPerPrimaryKey
.repartition(5)
.writeStream
.option("checkpointLocation", s"${streamingConfig.checkpointBasePath}/$streamName")
.option("truncate", "false")
.format("console")
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime(60.seconds))
.queryName(streamName)
.start()
def takeMostRecentOnly(pk: Long, values: Iterator[input.Data], state: GroupState[input.Data]): input.Data = {
values.maxBy(_.last_modified)
}
注意:仅在Update
模式下有效。
希望能帮助到你!
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句