在 KSQL 中,可以EXTRACTJSONFIELD
用于嵌套结构,但我不知道如何处理可变长度列表。例如:
{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}
我可以quux
作为基本流的 varchar处理,
create stream mystream (id bigint, quux varchar)
with (kafka_topic='mytopic', value_format='json')
但我希望能够把它变成一张桌子:
quuxid x y
1 1 2
1 3 4
1 5 6
如何处理 KSQL 中的可变长度列表?
这在 KSQL 中目前是不可能的。
如您所见,您可以专门通过索引访问数组数据:填充测试数据:
echo '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}' | \
kafkacat -b localhost:9092 -t quux
在 KSQL 中检查消息:
ksql> print 'quux' from beginning;
Format:JSON
{"ROWTIME":1528791985250,"ROWKEY":"null","id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}
创建流:
create stream mystream (id bigint, quux varchar) \
with (kafka_topic='quux', value_format='json');
查询流:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'
ksql>
ksql> select * from mystream;
1528791985250 | null | 1 | [{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]
创建流array
:
ksql> CREATE STREAM mystream2 (id bigint, quux array<varchar>) with (kafka_topic='quux', value_format='json');
Message
----------------
Stream created
----------------
按索引访问单个项目:
ksql> SELECT quux[0], EXTRACTJSONFIELD(quux[0],'$.x') AS X, EXTRACTJSONFIELD(quux[0],'$.y') AS Y from mystream2;
{"x":1,"y":2} | 1 | 2
但是您正在寻找EXPLODE
KSQL 中尚不存在的函数的等价物。
相关github问题:
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句