我用以下属性设置了卡夫卡消费者
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers",server);
consumerProperties.put("group.id",groupId);
consumerProperties.put("security.protocol", "SASL_PLAINTEXT");
consumerProperties.put("sasl.mechanism", "PLAIN");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("acks", "all");
consumerProperties.put("request.timeout.ms", 12000);
consumerProperties.put("max.block.ms",500);
consumerProperties.put("session.timeout.ms", 11000);
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//Object creation from above properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
// i have try catch blocks but exceptions aren't being thrown.
如果服务器详细信息不正确,它将在轮询时无限次等待-例如服务器名称错误或端口错误
try{
LOGGER.info("Subscribing to topic.");
consumer.subscribe(Arrays.asList(topic));
LOGGER.info("Subscribed to topic successfully.");
LOGGER.info("Start of polling records for consumer. ");
***records = consumer.poll(100);***
//CODE GETS STUCK IN ABOVE LINE FOR INFINITE TIME AND DOESNT COMES OUT
LOGGER.info("Returning records to microservice.");
}
catch(InterruptException interruptException) {
LOGGER.error("interrupt exception "+interruptException);
}
catch(TimeoutException timeoutException) {
LOGGER.error("Time out exception "+timeoutException);
}
catch (KafkaException kafkaException) {
LOGGER.error("Kafka Exception occurred while consuming records by consumer. Message: "+kafkaException.getMessage());
}
catch(Exception exception){
LOGGER.error("Exception occured while creating consumer object "+exception);
}
请建议我需要进行哪些更改,以中断对不正确服务器的无限轮询?
正如其他人指出的那样,Kafka的内部ConsumerCoordinator当前具有内置超时9223372036854775807ms,同时试图确保协调器准备就绪。
如果您只是想在尝试轮询使用者之前确保主机/端口信息正确,则可以通过简单的调用来解决问题consumer.listTopics()
。org.apache.kafka.common.errors.TimeoutException
如果无法连接,它将抛出一个。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句