Quarkus Kafka concurrency

Vijay Veeraraghavan

I use Quarkus to process incoming messages from a Kafka topic that has 4 partitions. When I publish multiple messages to this topic, I see the Quarkus Kafka consumer code process the messages one by one, sequentially.

Below is my logic that processes the incoming messages. Since the topic has 4 partitions, shouldn't the processEvent method handle 4 messages in parallel, even though I run only one instance of this consumer app? Am I missing anything?

@Incoming("topic-1")
@Retry(delay = 3, maxRetries = 3, retryOn = { MyException.class })
@NonBlocking
public CompletionStage<Void> processEvent(KafkaRecordBatch<String, String> payload) {
    // Convert each record of the batch to a log event, drop empty ones,
    // process the rest one after another, and count the results.
    Multi.createFrom().iterable(payload.getRecords())
            .onItem().transform(EventProcessor1::newLogEvent)
            .filter(l -> !l.isEmptyEvent())
            .onItem().transform(this::processEvent)
            .onItem().transformToUniAndConcatenate(this::process2)
            .collect().with(Collectors.counting())
            .subscribe().with(c -> {
                log.info("completed {} events", c);
            });

    // Acknowledge the whole batch; note that this does not wait for the pipeline above.
    return payload.ack().whenCompleteAsync((s, e) -> {
        if (e != null) {
            throw new RuntimeException(e);
        }
        log.debug("batch processed & committed successfully");
    });
}

Ladicek

You can configure the number of partitions that should be consumed concurrently. The configuration looks like this:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.xxx=xxx
mp.messaging.incoming.my-channel.partitions=4

The partitions configuration property defaults to 1, so a single consumer reads all 4 partitions of the topic. That is the sequential processing you see.

See https://smallrye.io/smallrye-reactive-messaging/3.18.0/kafka/receiving-kafka-records/#configuration-reference for details.
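
Applied to the question, and assuming the channel is named topic-1 as in the @Incoming annotation, the setting might look like the sketch below. The connector line is copied from the example above, and the value 4 is chosen to match the partition count of the topic. Newer SmallRye Reactive Messaging releases also document a connector-agnostic concurrency attribute, so it is worth checking the configuration reference for the version in use.

```properties
# Assumption: the channel name is "topic-1", taken from @Incoming("topic-1") in the question
mp.messaging.incoming.topic-1.connector=smallrye-kafka
# One consumer per partition of the 4-partition topic
mp.messaging.incoming.topic-1.partitions=4
```

With this in place, records from different partitions can be handled in parallel, while records within a single partition are still delivered in order.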
