使用Avro将spring-kafka序列化/反序列化org.springframework.messaging.Message对象

阴影88:

我想通过在对象上使用Avro serdes来发送和接收spring消息对象。我使用spring.framework.messaging.message是因为我想传递标题并对其进行修改。我的消息类型是事务。

这是我的生产者配置

@Bean
public ProducerFactory<String, Transaction> transactionProducerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Transaction> transactionKafkaTemplate() {
    return new KafkaTemplate<>(transactionProducerFactory());
}

这是我的消费者配置

@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {

    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "something");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> transactionListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(transactionConsumerFactory());
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setErrorHandler(new CustomErrorHandler());
    return factory;
}

我能够用来发送邮件KafkaTemplate<String, Transaction> transactionsProducer;这是我送它

Transaction transaction = new Transaction();
        transaction.setAmount(100.00F);
        transaction.setTxnId("134567776433577");
        transaction.setTimestamp("2020-03-24-18:30:00.000");
Message<Transaction> message = MessageBuilder.withPayload(transaction).
setHeader("messageId", "m-asd-12435435").
setHeader(KafkaHeaders.TOPIC, "topic-1").
build();

kafkaTemplate.send(message);

但是然后在KafkaListener中出现此错误

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data '[30, 49, 51, 52, 53, 54, 55, 55, 55, 54, 52, 51, 51, 53, 55, 55, -128, -6, -14, -14, -1, 1, 0, 0, -56, 66, -128, -102, -79, -12, -108, 3, 46, 50, 48, 50, 48, 45, 48, 51, 45, 50, 52, 45, 49, 56, 58, 51, 48, 58, 48, 48, 46, 48, 48, 48]' from topic 'transaction-request-upstream'
Caused by: java.lang.InstantiationException: org.springframework.messaging.Message
    at java.lang.Class.newInstance(Class.java:427) ~[na:1.8.0_191]
    at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:46) ~[classes/:na]
    at com.ibm.dip.evaluator.configuration.AvroDeserializer.deserialize(AvroDeserializer.java:17) ~[classes/:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1034) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:990) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NoSuchMethodException: org.springframework.messaging.Message.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_191]
    at java.lang.Class.newInstance(Class.java:412) ~[na:1.8.0_191]
    ... 18 common frames omitted

请帮忙。

加里·罗素:

我在consumerFactory中提供了这个return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new AvroDeserializer(Message.class));吗?

是的,应该Transformer.class,不是Message.class

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

从Kafka使用者反序列化Java对象

Spring Batch无法序列化以下类型的对象:类org.springframework.batch.core.StepExecution

如何使用反序列化的对象?

反序列化avro对象时出现Spring MessageConversionException

使用xsd.exe反序列化-如何反序列化对象而不是DataSet?

使用Jackson将反序列化JSON数组反序列化为单个Java对象

如何使用Jackson将反序列化的JSON反序列化为忽略键的对象?

将JSON反序列化为C#对象-不反序列化任何数据

Java Spring使用RestTemplate反序列化嵌套对象

Spring使用JSONObject字段反序列化对象

使用无参数构造函数对对象进行序列化和反序列化

使用json和python序列化/反序列化对象列表

使用writeValueAsString序列化的对象后杰克逊反序列化失败

如何使用JAXB序列化和反序列化对象?

如何使用Jackson序列化和反序列化对象列表

使用JSON序列化程序反序列化Mongo DB对象ID

使用C#序列化Java可反序列化的对象

使用 JQuery 序列化和反序列化自定义对象数组

如何使用GSON序列化/反序列化部分JSON对象?

如何使用对象作为json的键序列化/反序列化ruby哈希/结构

使用json.net对对象属性进行条件序列化/反序列化

如何使用可序列化对象数组反序列化类

如何使用 ByteArray 进行对象序列化和反序列化

ServiceStack使用对象对字典进行序列化和反序列化

如何使用XStream序列化/反序列化类型层次结构中的对象?

使用原型对象进行序列化/反序列化

在 sql server 中序列化使用 OpenJson() 反序列化的对象

使用PHP中的类型将嵌套对象序列化/反序列化为JSON

如何使用Jackson将Java Enums序列化和反序列化为JSON对象