当批量使用Kafka消息时,可以使用来限制批量大小max.poll.records
。
如果使用方非常快并且其提交偏移量不会显着滞后,则这意味着大多数批次将小得多。我只想接收“完整”批次,即仅调用我的使用者函数,然后达到批次大小。因此,我正在寻找类似的东西min.poll.records
,该形式不存在。
这是我正在做的最小示例:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component
@SpringBootApplication
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
@Component
class TestConsumer {
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
}
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
)
fun batchListen(values: List<ConsumerRecord<String, String>>) {
println(values.count())
}
}
当出现一些消费者滞后时,它会输出如下内容:
[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]
sleep
当满足以下两个条件之一时,是否有任何方法(如果没有“不完整”的批处理,则不手动在使用者处理程序中启用)来调用该函数?-仅当至少有n
消息存在时-或至少m
花费了毫秒的等待时间
卡夫卡没有min.poll.records
; fetch.min.bytes
如果您的记录长度相似,则可以使用来近似。另请参阅fetch.max.wait.ms
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句