Kafka Streams 中的消息键长

我正在尝试使用 Long 作为消息键的类型,但我得到

Exception in thread "kafka_stream_app-f236aaca-3f90-469d-9d32-20ff694806ff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=test, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

我查了一下,data.length7

在我设置的流配置中

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

我用

KStream<Long, GenericRecord> stream = builder.stream(topic);

我试过通过一个简单的应用程序发送消息,也有kafka-avro-console-producer

/opt/confluent-3.3.0/bin/kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic test \
--property key.separator=, \
--property parse.key=true \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"string"}' \
--property schema.registry.url=http://localhost:8081

有消息

123,"293"

使用kafka-avro-console-consumer我可以使用消息并查看(使用--property print.key=true正确发送的密钥123

知道解码消息时可能出现什么问题吗?

马蒂亚斯·J·萨克斯

因为您使用kafka-avro-console-producer的密钥不是序列化为普通的,Long而是作为 Avro 类型的。因此,您需要使用与您在写入路径(即'{"type":"long"}"上使用的架构相同的相应 Avro Serde

此外,您的返回类型不会Long只是 Avro 类型。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章