Spring for Kafka 2.3 setting an offset during runtime for specific listener with KafkaMessageListenerContainer

Jake Klimes :

I have to implement a functionality to (re-)set a listener for a certain topic/partition to any given offset. So if events are commited to the offset 5 and the admin decides to reset the offset to 2 then the event 3, 4 and 5 should be reprocessed.

We are using Spring for Kafka 2.3 and I was trying to follow the documentation on ConsumerSeekAware which seems to be exactly what I am looking for.

The problem however is that we are using topics that are created on runtime as well. We use a KafkaMessageListenerContainer through a DefaultKafkaConsumerFactory for that purpose and I don't know where to put the registerSeekCallback or something alike.

Is there any way to achieve this? I have problems understanding how the class using the @KafkaListener annotations maps to the way how listeners are created in the factory.

Any help would be appreciated. Even if it is only an explanation on how these things work together.

This is how the KafkaMessageListenerContainer are basically created:

public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
        ContainerPropertiesStrategy containerPropertiesStrategy) {
    MessageListener<String, String> messageListener = getMessageListener(topicName);

    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());

    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
    return kafkaMessageListenerContainer;
}

public MessageListener<String, String> getMessageListener(String topic) {
    MessageListener<String, String> messageListener = new MessageListener<String, String>() {

        @Override
        public void onMessage(ConsumerRecord<String, String> message) {
            try {
                consumerService.consume(topic, message.value());
            } catch (IOException e) {
                log.log(Level.WARNING, "Message couldn't be consumed", e);
            }
        }
    };
    return messageListener;
}

public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
  String topicName, MessageListener<String, String> messageListener, String bootstrapServers, ContainerPropertiesStrategy containerPropertiesStrategy,
  ConsumerFactory<String, Object> consumerFactory) {
ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
containerProperties.setMessageListener(messageListener);

KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
    consumerFactory, containerProperties);
kafkaMessageListenerContainer.setBeanName(topicName);
return kafkaMessageListenerContainer;
}

Hope that helps.

Gary Russell :

The key component is the AbstractConsumerSeekAware. Hopefully this will provide enough to get you started...

@SpringBootApplication
public class So59682801Application {

    public static void main(String[] args) {
        SpringApplication.run(So59682801Application.class, args).close();
    }


    @Bean
    public ApplicationRunner runner(ListenerCreator creator,
            KafkaTemplate<String, String> template, GenericApplicationContext context) {

        return args -> {
            System.out.println("Hit enter to create a listener");
            System.in.read();

            ConcurrentMessageListenerContainer<String, String> container =
                    creator.createContainer("so59682801group", "so59682801");

            // register the container as a bean so that all the "...Aware" interfaces are satisfied
            context.registerBean("so59682801", ConcurrentMessageListenerContainer.class, () -> container);
            context.getBean("so59682801", ConcurrentMessageListenerContainer.class); // re-fetch to initialize

            container.start();

            // send some messages
            IntStream.range(0, 10).forEach(i -> template.send("so59682801", "test" + i));

            System.out.println("Hit enter to reseek");
            System.in.read();
            ((MyListener) container.getContainerProperties().getMessageListener())
                .reseek(new TopicPartition("so59682801", 0), 5L);

            System.out.println("Hit enter to exit");
            System.in.read();
        };
    }

}

@Component
class ListenerCreator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    ListenerCreator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        factory.getContainerProperties().setIdleEventInterval(5000L);
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> createContainer(String groupId, String... topics) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topics);
        container.getContainerProperties().setGroupId(groupId);
        container.getContainerProperties().setMessageListener(new MyListener());
        return container;
    }

}

class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println(data);
    }

    public void reseek(TopicPartition partition, long offset) {
        getSeekCallbackFor(partition).seek(partition.topic(), partition.partition(), offset);
    }

}

Calling reseek() on the listener queues the seek for the consumer thread when it wakes from the poll() (actually before the next one).

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

Question on Spring Kafka Listener Consumer Offset Acknowledgement

Spring kafka consumer, seek offset at runtime?

Spring Kafka - Subscribe new topics during runtime

Setting offset listener to CollapsingToolbarLayout

How to acknowledge specific offset on container.stop in Spring Kafka?

Generic Spring Kafka Listener

setting value in string during the runtime

Setting next LaunchScreen during runtime

How ro resolve cpp symbols from backtrace_symbols() in the offset during runtime for addr2line

Setting authorizationExceptionRetryInterval for Spring Kafka

Spring boot : Kafka commits offset

Spring-kafka listener concurenncy

Spring Kafka Consumer/Listener Group

Spring Kafka Listener on Http Request

Spring Kafka consumer listener configuration

spring-kafka listener shutdown

spring kafka @KafkaListener listener container?

Spring Boot Kafka listener is inconsistent

Will Spring KafkaContainerStoppingErrorHandler commits offset for batch listener

reload spring bean during the runtime

changing kafka retention period during runtime

How to re-send (read) an old kafka message from given topic and partition at specific offset using spring-kafka?

setting a breakpoint in a specific offset inside a function with 'gdb'

Kafka spring listener spring validation on headers

Not able to shutdown the jms listener which posts message to kafka spring boot application with Runtime.exit, context.close, System.exit()

Is there a way to stop Kafka consumer at a specific offset?

Kafka - Consumer group creation with specific offset?

Setting AuthExceptionRetryInterval property in Spring Kafka

Spring Kafka Property for setting SeekToCurrentBatchErrorHandler

TOP Ranking

  1. 1

    Failed to listen on localhost:8000 (reason: Cannot assign requested address)

  2. 2

    Loopback Error: connect ECONNREFUSED 127.0.0.1:3306 (MAMP)

  3. 3

    How to import an asset in swift using Bundle.main.path() in a react-native native module

  4. 4

    pump.io port in URL

  5. 5

    Compiler error CS0246 (type or namespace not found) on using Ninject in ASP.NET vNext

  6. 6

    BigQuery - concatenate ignoring NULL

  7. 7

    ngClass error (Can't bind ngClass since it isn't a known property of div) in Angular 11.0.3

  8. 8

    ggplotly no applicable method for 'plotly_build' applied to an object of class "NULL" if statements

  9. 9

    Spring Boot JPA PostgreSQL Web App - Internal Authentication Error

  10. 10

    How to remove the extra space from right in a webview?

  11. 11

    java.lang.NullPointerException: Cannot read the array length because "<local3>" is null

  12. 12

    Jquery different data trapped from direct mousedown event and simulation via $(this).trigger('mousedown');

  13. 13

    flutter: dropdown item programmatically unselect problem

  14. 14

    How to use merge windows unallocated space into Ubuntu using GParted?

  15. 15

    Change dd-mm-yyyy date format of dataframe date column to yyyy-mm-dd

  16. 16

    Nuget add packages gives access denied errors

  17. 17

    Svchost high CPU from Microsoft.BingWeather app errors

  18. 18

    Can't pre-populate phone number and message body in SMS link on iPhones when SMS app is not running in the background

  19. 19

    12.04.3--- Dconf Editor won't show com>canonical>unity option

  20. 20

    Any way to remove trailing whitespace *FOR EDITED* lines in Eclipse [for Java]?

  21. 21

    maven-jaxb2-plugin cannot generate classes due to two declarations cause a collision in ObjectFactory class

HotTag

Archive