Spring Integration Kafka listener thread reads multiple partitions when concurrency = partition count

Chad Showalter

I set up a Spring Integration flow to process a topic with 3 partitions and set the listener container's concurrency to 3. As expected, I see three threads processing batches from all 3 partitions. However, in some cases a single listener thread processes a batch containing messages from multiple partitions. My data is partitioned in Kafka by an id, so that messages for different ids can be processed concurrently while messages for the same id are never processed on another thread at the same time (which, to my surprise, is what I observed happening). From reading the docs, I thought each thread would be assigned one partition. I'm using a KafkaMessageDrivenChannelAdapter like this:

private static final Class<List<MyEvent>> payloadClass = (Class<List<MyEvent>>) (Class) List.class;

public KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<String, MyEvent> myChannelAdapterSpec() {
    return Kafka.messageDrivenChannelAdapter(tstatEventConsumerFactory(),
            KafkaMessageDrivenChannelAdapter.ListenerMode.batch, "my-topic") // 3 partitions
            .configureListenerContainer(c -> {
                c.ackMode(ContainerProperties.AckMode.BATCH);
                c.id(_ID);
                c.concurrency(3);
                RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
                        (record, exception) -> log.error("failed to handle record at offset {}: {}",
                                record.offset(), record.value(), exception),
                        new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
                );
                c.errorHandler(errorHandler);
            });
}

@Bean
public IntegrationFlow myIntegrationFlow() {
    return IntegrationFlows.from(myChannelAdapterSpec())
            .handle(payloadClass, (payload, headers) -> {
                service.performSink(payload);
                return null;
            })
            .get();
}

How do I set this up so that each listener container thread only processes messages from one partition?

Artem Bilan

"But is there additionally a way that I can keep from ever getting a batch with messages from multiple partitions, even if a rebalance does occur?"

That's not how a consumer group works. If you would like to have "sticky" consumers, consider using manual assignment. See the channel adapter factory method based on TopicPartitionOffset... topicPartitions:

/**
 * Create an initial
 * {@link KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec}.
 * @param consumerFactory the {@link ConsumerFactory}.
 * @param listenerMode the {@link KafkaMessageDrivenChannelAdapter.ListenerMode}.
 * @param topicPartitions the {@link TopicPartitionOffset} vararg.
 * @param <K> the Kafka message key type.
 * @param <V> the Kafka message value type.
 * @return the KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec.
 */
public static <K, V>
KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<K, V> messageDrivenChannelAdapter(
        ConsumerFactory<K, V> consumerFactory,
        KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode,
        TopicPartitionOffset... topicPartitions) {

Then it is not going to be treated as a consumer group, and you have to create several channel adapters, each pointing to its specific partition. All of these channel adapters may emit messages to the same MessageChannel.
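
As a rough illustration of that suggestion, here is a minimal sketch with one channel adapter per partition, all feeding the same channel. It assumes Spring Integration 5.x (where IntegrationFlows is available), that the consumer factory from the question is exposed as a ConsumerFactory<String, MyEvent> bean, and that the topic has exactly the 3 partitions mentioned above. The class name PerPartitionFlows, the helper partitionFlow and the channel name "myEventsChannel" are hypothetical names chosen for the example. The listener container settings from the question (ack mode, error handler) are left out for brevity.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;

// Hypothetical configuration class; MyEvent is the payload type from the question.
@Configuration
public class PerPartitionFlows {

    // Builds a flow whose adapter is manually assigned to a single partition
    // of "my-topic" and forwards every batch to the shared channel.
    private IntegrationFlow partitionFlow(ConsumerFactory<String, MyEvent> consumerFactory, int partition) {
        return IntegrationFlows.from(
                        Kafka.messageDrivenChannelAdapter(consumerFactory,
                                KafkaMessageDrivenChannelAdapter.ListenerMode.batch,
                                new TopicPartitionOffset("my-topic", partition)))
                .channel("myEventsChannel") // assumed channel name, shared by all three flows
                .get();
    }

    @Bean
    public IntegrationFlow partition0Flow(ConsumerFactory<String, MyEvent> consumerFactory) {
        return partitionFlow(consumerFactory, 0);
    }

    @Bean
    public IntegrationFlow partition1Flow(ConsumerFactory<String, MyEvent> consumerFactory) {
        return partitionFlow(consumerFactory, 1);
    }

    @Bean
    public IntegrationFlow partition2Flow(ConsumerFactory<String, MyEvent> consumerFactory) {
        return partitionFlow(consumerFactory, 2);
    }
}
```

The handler from the question can then start from the same channel, for example with IntegrationFlows.from("myEventsChannel"). Because each adapter is assigned exactly one partition, a batch it emits contains records from that partition only. If the shared channel is a DirectChannel (the default when the DSL creates it by name), the handler runs on the listener thread of whichever adapter produced the batch.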
