kafka 中的反序列化异常

图沙尔

使用 kafka-streams 时出现以下错误。

[Kafka Stream] 10:09:26.442 ERROR --- o.a.k.s.e.LogAndFailExceptionHandler: Exception caught     
during Deserialization, taskId: 0_0, topic: t.commodity.promotion, partition: 0, offset: 0
                                    
java.lang.IllegalArgumentException: The class'com.course.kafka.kafkaorder.Broker.Message.PromotionMessage'       
is not in the trusted packages: [java.util, java.lang, com.course.stream.broker.message, 
com.course.stream.broker.message.*]. If you believe this class is safe to deserialize,      
please provide its name. If the serialization is only done by a trusted source, you can also enable        
trust all (*).          

[Kafka Stream] 10:09:26.444 ERROR --- o.a.k.s.p.internals.StreamThread: stream-thread [kafka-stream-7972        
450a-443b-8b7b-007e9fdf8e4c-StreamThread-1] Encountered the following exception during 
processing and the thread is going to shut down: 
                
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to        
fail upon a deserialization error. If you would rather have the streaming pipeline continue after a         
deserialization error, please set the default.deserialization.exception.handler appropriately.       
   

我的代码是

@Configuration
public class PromotionJsonSerde {
    
    @Bean
    public KStream<String, PromotionMessage> kStreamPromotionUppercase(StreamsBuilder builder)
    {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(PromotionMessage.class);

      KStream<String, PromotionMessage> sourceStream = builder.stream("t.commodity.promotion", 
                                Consumed.with(stringSerde, jsonSerde));     
      KStream<String, PromotionMessage> uppercaseStream = sourceStream.mapValues(this::uppercasePromotionCode);

      uppercaseStream.to("t.commodity.promotion-uppercase", Produced.with(stringSerde, jsonSerde));

      return sourceStream;

    }

    private PromotionMessage uppercasePromotionCode(PromotionMessage message)
    {
        return new PromotionMessage(message.getPromotionCode().toUpperCase());
    }
}

   

促销代码

public class PromotionMessage {
    
    private String promotionCode;
     // getters,setters, tostring
}          
   

应用程序.yml

logging:
  pattern:
    console: "[Kafka Stream] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"
  
spring:
  main:
    banner-mode: OFF
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

通过查看错误,我尝试在 application.yml 文件中添加以下代码。但我仍然遇到
同样的错误。

spring:
     kafka:    
       producer:
         streams:
          properties:
            default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler,org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler
        consumer:
          properties:
                spring:
                    json:
                      trusted:
                        packages: "*"
      cloud:
        stream:
          kafka:
            streams:
              bindings:
               process-in-0.consumer:
                deserializationExceptionHandler: logAndContinue       
        

为什么我收到反序列化错误?
如果我检查终端上的消费者,我会收到生产者发送的消息。
谁能帮我 ?

加里·拉塞尔

我看到您正在自己创建 serde new JsonSerde<>(PromotionMessage.class);- 我们会自动将该类的包添加到受信任的包中;因此

trusted packages: [java.util, java.lang, com.course.stream.broker.message, com.course.stream.broker.message.*]

创建自己的 serde 时,该属性将被忽略。解串器正在尝试创建com.course.kafka.kafkaorder.Broker.Message.PromotionMessage不同包中的内容;很可能是生产者的不同类。

添加以下内容:((JsonDeserializer) jsonSerde.deserializer()).setUseTypeHeaders(false);告诉反序列化器忽略标头中的类型信息并改用提供的回退类型。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

反序列化异常后继续使用反应堆kafka中的后续记录

Kafka Avro序列化器和反序列化器异常。Avro支持的类型

Kafka-在Consumer中反序列化对象

使用springboot在KafkaConsumer中反序列化kafka消息

Kafka:民意调查中的反序列化问题

为什么在 Apache Kafka 中序列化/反序列化密钥和记录?

Kafka Streams:POJO序列化/反序列化

Flink反序列化Kafka JSON

Kafka流GlobalKTable在逻辑删除上引发反序列化异常-空值-记录

Kafka流GlobalKTable在逻辑删除上引发反序列化异常-空值-记录

Kafka流GlobalKTable在逻辑删除上引发反序列化异常-空值-记录

Kafka流GlobalKTable在逻辑删除上引发反序列化异常-空值-记录

Kafka流异常:org.apache.kafka.streams.errors.StreamsException-反序列化异常处理程序

如何使用 C# 反序列化 Kafka 中的 Avro 消息

如何在Kafka-Spring中捕获反序列化错误?

如何使用Avro克服Spring Kafka 1.3中无法反序列化的消息

NPE同时反序列化Kafka流中的Avro消息

Kafka json反序列化器中的Scala classOf泛型类型

在使用者中反序列化加密的kafka消息

接口的Kafka JSON反序列化器

从Kafka使用者反序列化Java对象

Spring Kafka自定义反序列化器

Spring Kafka无法反序列化JSON

无法使用kafka和springboot反序列化数据

从Kafka反序列化Java对象时出错

如何从Kafka用Python解码/反序列化Avro

从 Kafka Streams 反序列化对象时出错

如何解决Kafka Avro反序列化问题

Java Kafka对象序列化器和反序列化器