Spring-Kafka Concurrency Property

Akhil Baby :

I am progressing on writing my first Kafka Consumer by using Spring-Kafka. Had a look at the different options provided by framework, and have few doubts on the same. Can someone please clarify below if you have already worked on it.

Question - 1 : As per Spring-Kafka documentation, there are 2 ways to implement Kafka-Consumer; "You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation". Can someone tell when should I choose one option over another ?

Question - 2 : I have chosen KafkaListener approach for writing my application. For this I need to initialize a container factory instance and inside container factory there is option to control concurrency. Just want to double check if my understanding about concurrency is correct or not.

Suppose, I have a topic name MyTopic which has 4 partitions in it. And to consume messages from MyTopic, I've started 2 instances of my application and these instances are started by setting concurrency as 2. So, Ideally as per kafka assignment strategy, 2 partitions should go to consumer1 and 2 other partitions should go to consumer2. Since the concurrency is set as 2, does each of the consumer will start 2 threads, and will consume data from the topics in parallel ? Also should we consider anything if we are consuming in parallel.

Question 3 - I have chosen manual ack mode, and not managing the offsets externally (not persisting it to any database/filesystem). So should I need to write custom code to handle rebalance, or framework will manage it automatically ? I think no as I am acknowledging only after processing all the records.

Question - 4 : Also, with Manual ACK mode, which Listener will give more performance? BATCH Message Listener or normal Message Listener. I guess if I use Normal Message listener, the offsets will be committed after processing each of the messages.

Pasted the code below for your reference.

Batch Acknowledgement Consumer:

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

Initialising container factory:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}
Gary Russell :
  1. @KafkaListener is a message-driven "POJO" it adds stuff like payload conversion, argument matching, etc. If you implement MessageListener you can only get the raw ConsumerRecord from Kafka. See @KafkaListener Annotation.

  2. Yes, the concurrency represents the number of threads; each thread creates a Consumer; they run in parallel; in your example, each would get 2 partitions.

Also should we consider anything if we are consuming in parallel.

Your listener must be thread-safe (no shared state or any such state needs to be protected by locks.

  1. It's not clear what you mean by "handle rebalance events". When a rebalance occurs, the framework will commit any pending offsets.

  2. It doesn't make a difference; message listener Vs. batch listener is just a preference. Even with a message listener, with MANUAL ackmode, the offsets are committed when all the results from the poll have been processed. With MANUAL_IMMEDIATE mode, the offsets are committed one-by-one.

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Spring Kafka concurrency with spring-integration

Setting AuthExceptionRetryInterval property in Spring Kafka

Spring Kafka Property for setting SeekToCurrentBatchErrorHandler

spring kafka thorws InstanceAlreadyExistsException exception after setting concurrency > 1

how to set kafka consumer concurrency using spring boot

I don't see the concurrency property in Spring docs

Kafka client id property with spring cloud stream

Kafka Streaming Concurrency?

Quarkus Kafka concurrency

Kafka streams concurrency behavior

Spring Cloud Stream: spring.cloud.stream.default.consumer.concurrency property exactly meaning

spring integration kafka listener thread reads multiple partitions when concurrency = partition count

Is Spring Kafka KafkaListener annotation able to take config property as topic name?

what will happen if stopImmediate listener container property set to true in spring kafka?

How do we define kafka request.timeout.ms property in spring kafka properties file

Spring @KafkaListener and concurrency

Spring Integration Concurrency of ServiceActivators

Spring StopWatch concurrency

Spring @Transactional and concurrency

Spring Security Concurrency Control

Spring Security session concurrency

Spring Cloud Stream - Concurrency

Spring Concurrency parallel request

Spring Kafka and Kafka Streams

Spring rabbitmq task queue concurrency

Spring boot application and concurrency issues

Spring: Singleton/session scopes and concurrency

Spring integration concurrency - detecting completion

How to apply spring.cloud.stream.kafka.bindings configuration property to all consumers