从卡夫卡获得最新价值

沙克尔

我有一个名为A的卡夫卡主题

主题A中的数据格式为:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

现在在消费者方面,我只想获取一小时窗口的最新数据,这意味着每隔一小时我需要从基于created_at的主题中获取最新价值

我的预期输出是:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

我认为这可以通过ksql解决,但我不确定。请帮我。

预先感谢。

霍贾特

是的,您可以为此使用KSQL。请尝试以下操作:

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');

CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;

结果将具有created_atLinux时间戳格式时间。您可以在新查询中使用TIMESTAMPTOSTRING udf将其更改为所需的格式。如果您发现任何问题,请告诉我。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章