我正在编写一个基于Java的Kafka Consumer应用程序。我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring boot。虽然Spring boot使我可以轻松地编写Kafka使用者(而无需真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/自定义某些属性。但是,我找不到使用Spring Boot的简单方法。例如:我想要设置的一些属性是-
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
我接过一看春天开机前定义的属性在这里。
另外,基于这里的一个较早的问题,我想在使用者上设置并发性,但是找不到使用Spring Boot进行配置,由application.properties驱动的方式来做到这一点。
一种明显的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory
在Spring Context中再次定义类并从那里开始工作。我想了解是否有更清洁的方法,尤其是因为我使用的是Spring Boot。
版本-
在您引用的网址上,向下滚动到
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
春季卡夫卡-1.1.0.RELEASE
我建议至少升级到1.3.5;多亏了KIP-62,它的线程模型更为简单。
编辑
使用Boot 2.0,您可以设置任意的生产者,使用者,管理员,公共属性,如引导文档中所述。
spring.kafka.consumer.properties.heartbeat.interval.ms
对于Boot 1.5,只有此处spring.kafka.properties
所述。
这将为生产者和使用者设置属性,但是您可能会在日志中看到关于生产者未使用/不受支持的属性的噪音。
另外,您可以简单地覆盖Boot的消费者工厂并根据需要添加属性。
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProps = properties.buildConsumerProperties();
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句