I'm using Spring Cloud Stream 3.x in Spring Boot 2.x application to consume messages from a Kafka topic.
I want to have a listener that consumes messages conditionally on some custom header value, as per doc:
@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
LOGGER.info("Received: {}", message);
}
However listener never gets notified, and if condition is removed I see the following in the log:
Received: ... SomeHeader: [B@1055e4af ...
It turns out that custom headers are left in Kafka byte array raw format, making them not eligible for condition evaluation.
Is some additional configuration needed or am I missing something?
After some digging in sources and stackoveflow I have found the following:
So I added my custom header mapper bean (bean name is important, it allows to omit additional configuration property), which maps my custom header to String:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setRawMappedHeaders(Map.of(
"SomeHeader", true
));
return headerMapper;
}
That fixed the problem:
Received: ... SomeHeader: SomeHeaderValue ...
P.S. It seems like a bug in Spring Cloud Stream:
Collected from the Internet
Please contact [email protected] to delete if infringement.
Comments