如何使用Spring Boot等待完整的Kafka消息批处理?

托比亚斯·赫尔曼(Tobias Hermann):

当批量使用Kafka消息时,可以使用来限制批量大小max.poll.records

如果使用方非常快并且其提交偏移量不会显着滞后,则这意味着大多数批次将小得多。我只想接收“完整”批次,即仅调用我的使用者函数,然后达到批次大小。因此,我正在寻找类似的东西min.poll.records,该形式不存在。

这是我正在做的最小示例:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}

当出现一些消费者滞后时,它会输出如下内容:

[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]

sleep当满足以下两个条件之一时,是否有任何方法(如果没有“不完整”的批处理,则不手动在使用者处理程序中启用)来调用该函数?-仅当至少有n消息存在时-或至少m花费了毫秒的等待时间

加里·罗素:

卡夫卡没有min.poll.records; fetch.min.bytes如果您的记录长度相似,则可以使用来近似另请参阅fetch.max.wait.ms

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在融合的Kafka Python中读取批处理消息?

Spring Boot如何运行批处理作业

Spring Boot Kafka消息传递。如何简化处理程序的dto映射?

我们如何在Kafka中快速编写单个消息(非批处理)?

如何使用Spring Kafka批处理侦听器进行有状态重试

如何在批处理脚本中等待?

Spring Kafka批处理选项:使用SeekToCurrentBatchErrorHandler的ClassCastException

如何使用Apache骆驼向批处理消息发送批处理消息

如何使用 Spring Kafka 处理两个时间戳之间的所有消息?

如何在 Spring Boot 中搜索乘法(批处理)参数?

Spring集成-在处理消息之前等待条件

如何使用core.async正确批处理消息?

如何使用MyBatis / Spring进行批处理操作?

使用 Spring Cloud Stream 和 Kafka 处理重复消息

使用Spring Kafka处理有关死信主题的消息

使用KafkaNativeOffsetManager时Spring Integration Kafka慢速消息处理

使用Kafka模板批处理记录

Messenger:如何等待消息被处理?

如何使用Spring Boot设置Kafka使用者并发

如何使Spring Boot使用MultiTenantSpringLiquibase?

如何在Spring Boot应用程序的@Transactional方法中使用纯Hibernate保证原子批处理插入

Spring Kafka-使用哪个批处理错误处理程序?

使用Spring Kafka框架时如何处理错误/异常?

如何连续使用Spring Boot从IBM MQ JMS接收消息?

Windows批处理:在FOR / F中使用文件的完整路径

使用Wildfly进行批处理/批量消息(JMS)处理

使用Spring Boot时如何使用SpringTemplateEngine

如何使用 Kafka 流式 DSL 函数处理重复消息

如何等待多个批处理文件完成