Spring @KafkaListener and concurrency

Sviatlana :

I am working with Spring Boot and Spring's @KafkaListener. The behavior I expect is that my Kafka listener reads messages in 10 threads, so that if one of the threads hangs, the others continue reading and handling messages.

I defined this bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
{

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}

And spring boot config:

spring.kafka.listener.concurrency=10

I can see that the configuration is applied: JMX shows my 10 listener threads.

But then I ran this test:

@KafkaListener(topics = { "${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false,
        id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
    if (record.getVersion() < 3) {
        // Simulate a hung handler for versions 1 and 2
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    else {
        System.out.println("It works!");
    }
}

If the version is < 3, the listener hangs; otherwise it works. I send 3 messages with versions 1, 2 and 3. I expect the messages with versions 1 and 2 to hang, and the message with version 3 to be processed as soon as it reaches the listener. Unfortunately, the message with version 3 waits for messages 1 and 2 before its processing starts.

Maybe my expectations are wrong and this is the correct behavior of a Kafka listener. Please help me understand Kafka concurrency: why does it act like that?

Gary Russell :

Kafka doesn't work that way; you need at least as many partitions as consumers (controlled by concurrency in the spring container).

Also, only one consumer (in a group) can consume from a partition at a time so, even if you increase the partitions, records in the same partition behind the "stuck" consumer will not be received by other consumers.
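
To illustrate the point about partitions and consumers, here is a minimal sketch in plain Java (no Kafka dependency). The class PartitionAssignmentSketch and its methods are hypothetical names, and the round-robin distribution is a simplification: Kafka's real assignors differ in detail, but they share the property that each partition has exactly one owner within a group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Kafka's actual assignment code.
public class PartitionAssignmentSketch {

    // Distributes partitions 0..partitionCount-1 over consumerCount consumers
    // round-robin. Every partition ends up with exactly one owner.
    public static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    // Counts consumers that received no partition and therefore get no records.
    public static long idleConsumers(List<List<Integer>> owned) {
        return owned.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // concurrency=10 against a single-partition topic
        List<List<Integer>> onePartition = assign(1, 10);
        System.out.println("1 partition:   " + onePartition
                + " idle=" + idleConsumers(onePartition));

        // concurrency=10 against a ten-partition topic
        List<List<Integer>> tenPartitions = assign(10, 10);
        System.out.println("10 partitions: " + tenPartitions
                + " idle=" + idleConsumers(tenPartitions));
    }
}
```

With a single partition, nine of the ten consumers own nothing, so all three test messages queue up behind one thread. With ten partitions every consumer owns one, but records that land in the same partition as a slow record still wait behind it.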
