spring kafka No type information in headers and no default type provided

mconner

I've got a spring boot app that defines:

  • a REST Controller that writes to a kafka topic, STREAM_TOPIC_IN_QQQ
  • a KafkaListener that reads from STREAM_TOPIC_IN_QQQ (groupId="bar") and logs
  • a KStream that peeks the topic and logs it, converts it to another type, then writes it to STREAM_TOPIC_OUT_QQQ
  • another KafkaListener that reads from STREAM_TOPIC_OUT_QQQ.

(I've been changing the suffix to avoid any possible confusion, and creating the topics by hand, because otherwise I was getting a warning, STREAM_TOPIC_IN_xxx=LEADER_NOT_AVAILABLE and the stream would not run for a minute or so.)

The first listener and the stream seem to be working, but when the listener on STREAM_TOPIC_OUT_QQQ tries to deserialize the message, I get the exception below. I am providing the serde in the stream with Produced.with. What do I need to do so that the listener knows which type to deserialize to?

Log

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]

Here's the configuration:

REST (spring mvc):

@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}

Kafka Config (spring-kafka):

@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
        logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
    })
          .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
          .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    streams:
      application-id: kafka9000-v0.1
      properties: # properties not explicitly handled by KafkaProperties.streams
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model

Gary Russell

See the documentation.

Specifically...

JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

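The corresponding property name is spring.json.value.default.type; set it to the fully qualified class name of the type to fall back to.

You can also set spring.json.use.type.headers to false (the default is true) to prevent the deserializer from even looking for headers.

The deserializer automatically trusts the package of the default type, so it's not necessary to add that package to spring.json.trusted.packages.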
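
As a rough sketch of what that could look like, here are the consumer properties from the question with the fallback type added, written as a plain Java map. The class name DefaultTypeConsumerProps is made up for illustration, and the package of GreetingResponse is an assumption (the question only shows the package of Greeting).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the consumer properties as plain string keys.
final class DefaultTypeConsumerProps {

    // Assumption: GreetingResponse lives in the same package as Greeting.
    static final String RESPONSE_TYPE =
            "com.teramedica.kafakaex001web.model.GreetingResponse";

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Same error-handling wrapper and delegates as in the question's application.yml.
        props.put("key.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2");
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Fallback type, used only when a record carries no type information in its headers.
        props.put("spring.json.value.default.type", RESPONSE_TYPE);
        return props;
    }
}
```

With the application.yml from the question, the same key would go under spring.kafka.consumer.properties. A map like this could also be handed to a DefaultKafkaConsumerFactory if the consumer is configured in code instead.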

EDIT

However, also see Spring Messaging Message Conversion.

Use a BytesDeserializer together with a BytesJsonMessageConverter; the framework then passes the listener method's parameter type as the target type for the conversion.
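
The consumer-side half of that alternative might look like the sketch below. It only covers the deserializer settings; the BytesJsonMessageConverter still has to be registered with the listener container factory, which is not shown. The class name BytesConsumerProps is hypothetical. As far as I know, the conversion applies to the listener's payload parameter, so the method would declare a GreetingResponse parameter rather than a ConsumerRecord.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: consumer properties for the BytesDeserializer approach.
final class BytesConsumerProps {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Keys are still plain strings.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Values reach the listener container as raw bytes; the JSON conversion
        // is left to the message converter, so no type headers or default type are needed here.
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.BytesDeserializer");
        return props;
    }
}
```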
