如何使用Spring Boot设置Kafka使用者并发

用户3842182:

我正在编写一个基于Java的Kafka Consumer应用程序。我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring boot。虽然Spring boot使我可以轻松地编写Kafka使用者(而无需真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/自定义某些属性。但是,我找不到使用Spring Boot的简单方法。例如:我想要设置的一些属性是-

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

我接过一看春天开机前定义的属性在这里

另外,基于这里的一个较早的问题,我想在使用者上设置并发性,但是找不到使用Spring Boot进行配置,由application.properties驱动的方式来做到这一点。

一种明显的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory在Spring Context中再次定义类并从那里开始工作。我想了解是否有更清洁的方法,尤其是因为我使用的是Spring Boot。

版本-

  • kafka客户端-0.10.0.0-SASL
  • 春季卡夫卡-1.1.0.RELEASE
  • 弹簧靴-1.5.10.RELEASE
加里·罗素:

在您引用的网址上,向下滚动到

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

春季卡夫卡-1.1.0.RELEASE

我建议至少升级到1.3.5;多亏了KIP-62,它的线程模型更为简单。

编辑

使用Boot 2.0,您可以设置任意的生产者,使用者,管理员,公共属性,如引导文档中所述

spring.kafka.consumer.properties.heartbeat.interval.ms

对于Boot 1.5,只有此处spring.kafka.properties所述

这将为生产者和使用者设置属性,但是您可能会在日志中看到关于生产者未使用/不受支持的属性的噪音。

另外,您可以简单地覆盖Boot的消费者工厂并根据需要添加属性。

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

无法使用Spring Boot构建kafka使用者

Spring Kafka使用者-云中的并发侦听器与非并发侦听器

在Spring Boot中控制启用/禁用Kafka使用者

如何减慢或设置Kafka流使用者的给定速度?

在Spring Boot应用中为kafka使用者设置重试策略时,何时使用ExponentialBackOffPolicy和FixedBackOffPolicy?

如何禁用自动启动的 KAFKA 使用者而无需进行任何代码更改,包括在 Spring Boot 中设置 autoStartup = "{xyz}"?

如何为ActiveMQ队列创建Spring Boot使用者?

MDB bean池与Spring JMS并发使用者

Spring Boot Kafka:由于NoSuchBeanDefinitionException而无法启动使用者

Kafka使用者使用ClassNotFoundException

Sprint Boot Kafka使用者无法连接到Kafka容器

不使用ConsumerRecordRecoverer提交Kafka Spring使用者补偿

无法启动多个Kafka使用者

Kafka多个使用者用于分区

与多个使用者暂停kafka主题

无法建立kafka使用者

分区特定的Flink Kafka使用者

Python-Kafka:使用者失败

.Net Core中的Kafka使用者

Apache Kafka多个使用者实例

错误处理-使用者-Apache Kafka和Spring

向Spring Integration Kafka使用者倒带偏移

如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

KafkaConnect接收器设置Kafka使用者隔离级别

多个延迟的使用者,并发句柄,BlockingQueue

具有独立使用者的单个InputStream的并发处理

kafka使用者问题:创建使用者之后,连接断开:Kafka + kubernetes

当消息数大于并发使用者数时,如何使用Spring IntegrationFlow中所需的所有消息?

不确定如何激活Kafka使用者的消耗()方法