Avoid __TypeId__ and spring_json_header_types magic headers set by Spring Kafka

dmitryb

I'm trying to configure Spring Kafka, so that events sent through KafkaTemplate don't include magic __TypeId__ header. From what I read in the documentation and online, the JsonSerializer.ADD_TYPE_INFO_HEADERS should give me the desired result. But I'm not using Spring's JsonSerializer, instead I'm using org.apache.kafka.common.serialization.ByteArraySerializer in combination with ByteArrayJsonMessageConverter. So, I'm confused, where that header is being set in my case and how to disable it?

Below is my configuration in code:

@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {

        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);

        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);

        var messageConverter = new ByteArrayJsonMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);

        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}

If I use JsonSerializer, then the JsonSerializer.ADD_TYPE_INFO_HEADERS setting is respected, and I don't get the __TypeId__ header set. But, then I can't use my customised MessageConverter with SimpleKafkaHeaderMapper and mapAllStringsOut which prevents another magic header spring_json_header_types.

My overall goal is to avoid Spring Kafka setting any magic headers, like __TypeId__ and spring_json_header_types.

dmitryb

As was explained by @artem-bilan, it's not that easy to get rid of the __TypeId__ when using ByteArraySerializer and ByteArrayJsonMessageConverter. So, I used JsonSerializer and MessagingMessageConverter instead. Below is my configuration which produces Kafka events without any magic headers.

@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {

        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);

        // Use SimpleKafkaHeaderMapper which does not add the json types header, `spring_json_header_types`.
        // The mapAllStringsOut when set to true, all string-valued headers will be converted to byte[] using
        // the charset property (default UTF-8).
        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);

        var messageConverter = new MessagingMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);

        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring Kafka and Kafka Streams

Spring Webflux Webclient | Content type headers set issue

Spring Kafka, Spring Cloud Stream, and Avro compatibility Unknown magic byte

Spring Conversion Magic

How do you set cache headers in Spring MVC?

Set static headers, avoid allocation?

How to set timeout for onFailure event (Spring, Kafka)?

Validate accept Headers in spring

Spring Kafka Error Handler how to avoid endless loop

Spring Kafka @SendTo Not Sending Headers

Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

Spring MockMvc and @AutoConfigureMockMvc() set headers in request

Spring @RequestMapping headers OR instead of AND?

Does Spring Cloud Stream Kafka supports embedded headers?

Kafka Spring Integration: Headers not coming for kafka consumer

Spring Kafka - Encountering "Magic v0 does not support record headers" error

Spring Boot CORS headers

spring kafka No type information in headers and no default type provided

How to set multiple headers at once in Spring WebClient?

Spring Security Headers included, still getting 'Header Not Set' vulnerability warning

Spring Cloud Stream Kafka add headers to only one binding

How to avoid copying headers from input to output in a Spring Integration @ServiceActivator method

Mask Headers in spring restdocs

Conditional (content-based) routing on Kafka headers in Spring Cloud Stream

Spring kafka - kafka streams topology set up

Kafka spring listener spring validation on headers

Problem reading kafka headers in a RecordInterceptor after upgrading to spring-kafka 2.8

Prevent __TypeId__ to be used in Spring Cloud Stream

How to set ID header in Spring Integration Kafka Message?