Conditional (content-based) routing on Kafka headers in Spring Cloud Stream

Oleg Efimov

I'm using Spring Cloud Stream 3.x in Spring Boot 2.x application to consume messages from a Kafka topic.

I want to have a listener that consumes messages conditionally on some custom header value, as per doc:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

However listener never gets notified, and if condition is removed I see the following in the log:

Received: ... SomeHeader: [B@1055e4af ...

It turns out that custom headers are left in Kafka byte array raw format, making them not eligible for condition evaluation.

Is some additional configuration needed or am I missing something?

Oleg Efimov

After some digging in sources and stackoveflow I have found the following:

So I added my custom header mapper bean (bean name is important, it allows to omit additional configuration property), which maps my custom header to String:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}

That fixed the problem:

Received: ... SomeHeader: SomeHeaderValue ...

P.S. It seems like a bug in Spring Cloud Stream:

  1. It introduces its own implementation of header mapper (BinderHeaderMapper), but the latter doesn't respect conditional routing feature.
  2. Header mapper is subclassed in KafkaMessageChannelBinder, this added behaviour is non-obvious and will be lost if custom header mapper is provided.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring Cloud Stream content-based routing on payload

Spring Cloud Stream Kafka add headers to only one binding

Does Spring Cloud Stream Kafka supports embedded headers?

Kafka - spring cloud stream

Understanding Spring Cloud Stream Kafka and Spring Retry

Spring Cloud stream - Routing to multiple destinations streamBridge

How to remove the content type header from kafka messages produced by the spring cloud stream app starter sources

Spring cloud stream (Kafka) autoCreateTopics not working

Spring Cloud Stream Kafka Error channel

Spring Cloud Stream embedded header format (Kafka)

Spring Cloud Stream Kafka consumer patterns

Multiple @EnableBinding with Kafka Spring Cloud Stream

Spring Cloud Stream Kafka - Method must be Declarative

spring cloud stream kafka binding config

Auto commit in kafka with spring cloud stream

Error Handling in Spring Cloud Stream - Kafka Binder

Kafka Sticky Partitioner using spring cloud stream

Multiple StreamListeners with Spring Cloud Stream connected to Kafka

Correctly manage DLQ in Spring Cloud Stream Kafka

Spring cloud stream - Kafka binder performance

Spring cloud stream and consume multiple kafka topics

Spring Cloud stream compatibility matrix with kafka

Kafka client id property with spring cloud stream

spring-cloud-stream kafka avro

jackson cannot deserialize (spring cloud stream kafka)

Using Kstream with spring cloud stream and kafka 1.0.1

Asynchronously write to spring cloud kafka stream

Pause Spring Cloud Stream Kafka binder programmatically

Migrate Kafka Streams code to Spring Cloud Stream?