I'm trying to configure Spring Kafka so that events sent through KafkaTemplate don't include the magic __TypeId__ header. From what I've read in the documentation and online, setting JsonSerializer.ADD_TYPE_INFO_HEADERS to false should give me the desired result. But I'm not using Spring's JsonSerializer; instead I'm using org.apache.kafka.common.serialization.ByteArraySerializer in combination with ByteArrayJsonMessageConverter. So I'm confused: where is that header being set in my case, and how do I disable it?
Below is my configuration in code:
@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);
        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);
        var messageConverter = new ByteArrayJsonMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);
        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}
If I use JsonSerializer, then the JsonSerializer.ADD_TYPE_INFO_HEADERS setting is respected, and the __TypeId__ header is not set. But then I can't use my customised MessageConverter with SimpleKafkaHeaderMapper and mapAllStringsOut, which prevents another magic header, spring_json_header_types.
My overall goal is to avoid Spring Kafka setting any magic headers, like __TypeId__ and spring_json_header_types.
As was explained by @artem-bilan, it's not that easy to get rid of the __TypeId__ header when using ByteArraySerializer and ByteArrayJsonMessageConverter. So I used JsonSerializer and MessagingMessageConverter instead. Below is my configuration, which produces Kafka events without any magic headers.
@TestConfiguration
public static class TestKafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, MyEvent> kafkaTemplate(
            @Value("${kafka.consumer.my-topic-name}") String topicName, EmbeddedKafkaBroker embeddedKafka) {
        var template = new KafkaTemplate<>(producerFactory(embeddedKafka));
        template.setDefaultTopic(topicName);
        // Use SimpleKafkaHeaderMapper, which does not add the JSON types header, `spring_json_header_types`.
        // When mapAllStringsOut is set to true, all string-valued headers are converted to byte[] using
        // the charset property (default UTF-8).
        var headerMapper = new SimpleKafkaHeaderMapper();
        headerMapper.setMapAllStringsOut(true);
        var messageConverter = new MessagingMessageConverter();
        messageConverter.setHeaderMapper(headerMapper);
        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ProducerFactory<String, MyEvent> producerFactory(EmbeddedKafkaBroker embeddedKafka) {
        return new DefaultKafkaProducerFactory<>(producerConfig(embeddedKafka));
    }

    public Map<String, Object> producerConfig(EmbeddedKafkaBroker embeddedKafka) {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Tell JsonSerializer not to add the `__TypeId__` type info header.
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return props;
    }
}
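To check that a configuration like the one above really produces no magic headers, a small helper along these lines could be used in a test. This is only a sketch: the MagicHeaders class and its findIn method are names made up for illustration (they are not part of Spring Kafka), and it only knows about the two header names mentioned in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Spring Kafka or kafka-clients.
final class MagicHeaders {

    // The two header names this post is trying to avoid.
    static final Set<String> NAMES = Set.of("__TypeId__", "spring_json_header_types");

    private MagicHeaders() {
    }

    /** Returns the magic header names found among the given keys, each at most once, in encounter order. */
    static List<String> findIn(Iterable<String> headerKeys) {
        List<String> found = new ArrayList<>();
        for (String key : headerKeys) {
            // Null check first: Set.of(...) sets throw on contains(null).
            if (key != null && NAMES.contains(key) && !found.contains(key)) {
                found.add(key);
            }
        }
        return found;
    }
}
```

In a test, the keys would come from the consumed record, for example by iterating over ConsumerRecord.headers() and collecting Header.key() into a list, and then asserting that MagicHeaders.findIn(keys) is empty.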