在Spring Boot中控制启用/禁用Kafka使用者

Hua

我已经在Spring Boot中配置了多个Kafka使用者。这就是kafka.properties的样子(此处仅列出一个使用者的配置):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

这是配置:

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

这是消费者:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

我是否可以使用道具“ kafka.enabled”来控制此使用者的创建或消息检索?非常感谢!

相同的努纳

您可以通过在使用者中使用属性autoStartup(true / false)来做到这一点,如下所示-

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    //System.out.println("Consumed message: " + message);
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何使用Spring Boot设置Kafka使用者并发

不使用ConsumerRecordRecoverer提交Kafka Spring使用者补偿

可以使用属性启用/禁用Spring Boot @RestController吗?

没有Kafka的Spring Kafka使用者不使用消息

Spring Kafka-选择@TopicPartition会停用组中的使用者

如何在Spring Boot中启用/禁用特定的休息终点?

如何在Spring Boot Security中启用或禁用用户

Spring Kafka多个使用者针对单个主题消耗不同的消息

错误处理-使用者-Apache Kafka和Spring

无法使用Spring Boot构建kafka使用者

Spring Boot Kafka:由于NoSuchBeanDefinitionException而无法启动使用者

Spring Kafka在一个使用者中使用多种消息类型

Spring Kafka使用者,在运行时查找偏移量?

Spring Kafka使用者-云中的并发侦听器与非并发侦听器

在spring-webflux中使用Spring AMQP使用者

如何为ActiveMQ队列创建Spring Boot使用者?

我们如何在Spring Cloud Stream Kafka生产者,使用者和KStreams中为架构配置value.subject.name.strategy?

Spring-Kafka使用者KafkaListener无法将GenericMessage转换为Java对象

Spring Boot运行状况检查-SQS使用者

将自定义kafka使用者与Spring云配置客户端集成

在Spring Boot应用中为kafka使用者设置重试策略时,何时使用ExponentialBackOffPolicy和FixedBackOffPolicy?

如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

Spring Integration Kafka 1.2.1.RELEASE:使用者上下文错误

向Spring Integration Kafka使用者倒带偏移

如何在Spring Boot中实现循环队列使用者

嗨,spring-kafka如何处理使用者线程?

如何禁用自动启动的 KAFKA 使用者而无需进行任何代码更改,包括在 Spring Boot 中设置 autoStartup = "{xyz}"?

Dockerize 并部署在 AKS 上具有 Kafka 使用者的 Spring 应用程序?

無法使用 JmsListenerEndpointRegistry 在 Spring Boot 中停止 JMS 使用者