how to set kafka consumer concurrency using spring boot

user3842182 :

I am writing a Java based Kafka Consumer application. I am utilizing kafka-clients, Spring Kafka and Spring boot for my application. While Spring boot lets me easily write Kafka Consumers (without really writing the ConcurrentKafkaListenerContainerFactory, ConsumerFactory etc), I want to be able to define / customize some of the properties for these consumers. However, I could not find out an easy way to do it using Spring boot. For eg: some of the properties that I would be interested in setting up are -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

I took a look at the Spring Boot pre-defined properties here.

Also, based on an earlier question here, I want to setup the concurrency on the consumers, but cannot find a configuration, application.properties driven way to do that using Spring Boot.

An obvious way is to define the ConcurrentKafkaListenerContainerFactory, ConsumerFactory classes again in my Spring Context and work from there. I wanted to understand if there is a cleaner way of doing that, especially since I am using Spring Boot.

Versions-

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • spring boot - 1.5.10.RELEASE
Gary Russell :

At the URL you cited, scroll down to

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

spring-kafka - 1.1.0.RELEASE

I recommend upgrading to at least 1.3.5; it has a much simpler threading model, thanks to KIP-62.

EDIT

With Boot 2.0, you can set arbitrary producer, consumer, admin, common properties, as described in the boot documentation.

spring.kafka.consumer.properties.heartbeat.interval.ms

With Boot 1.5 there is only spring.kafka.properties as described here.

This sets the properties for both producers and consumers, but you may see some noise in the log about unused/unsupported properties for the producer.

Alternatively, you can simply override Boot's consumer factory and add properties as needed...

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring boot Kafka consumer

How to load Kafka Consumer lazily in Spring boot?

How to read 2 Kafka topics sequentially in one consumer class using spring boot?

How to run bin/kafka-console-consumer.sh in Spring boot

How multiple consumer can listen to multiple topic in spring boot Kafka?

How to get spring boot default Kafka consumer object

How to pause a specific kafka consumer thread when concurrency is set to more than 1?

How to set Kafka offset for consumer?

Spring Boot Kafka message consumer and lost message

Spring Boot Kafka Listener vs Consumer

Failed to construct kafka consumer with Spring Boot

Spring Boot Kafka consumer lags and reads wrong

How to set amqp RabbitMQ consumer tag in Spring Boot?

Trigger one Kafka consumer by using values of another consumer In Spring Kafka

How to get consumer-id using a @KafkaListener spring boot 2 consumer

Spring Kafka Without spring boot consumer not consuming messages

How to make Kafka consumer reads from specific topic partition Spring Boot

How to set Principal in Kafka console producer/consumer?

How to set consumer.id for new Kafka consumer

Spring boot Kafka doesn't work - consumer not receiving messages

Spring Boot Kafka Consumer throwing No bean named 'kafkaListenerContainerFactory' available

Spring boot kafka duplicated properties in producer, consumer, admin

Spring Boot/Jhipster Integration tests build fail for Kafka consumer

Spring Boot Kafka: Unable to start consumer due to NoSuchBeanDefinitionException

Multi thread transactional Kafka producer and consumer with Spring Boot

How to list Kafka consumer group using python

How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

How to create a Spring Boot consumer for an ActiveMQ queue?

Spring Kafka Consumer Retry