当服务器或端口详细信息错误时,kafka使用者将无限等待

Surbhi Bakshi

我用以下属性设置了卡夫卡消费者

    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers",server);
    consumerProperties.put("group.id",groupId);
    consumerProperties.put("security.protocol", "SASL_PLAINTEXT");
    consumerProperties.put("sasl.mechanism", "PLAIN");
    consumerProperties.put("enable.auto.commit", "false");
    consumerProperties.put("acks", "all");
    consumerProperties.put("request.timeout.ms", 12000);
    consumerProperties.put("max.block.ms",500);
    consumerProperties.put("session.timeout.ms", 11000);
    consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 //Object creation from above properties
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
 // i have try catch blocks but exceptions aren't being thrown.

如果服务器详细信息不正确,它将在轮询时无限次等待-例如服务器名称错误或端口错误

            try{
                LOGGER.info("Subscribing to topic.");
                consumer.subscribe(Arrays.asList(topic));
                LOGGER.info("Subscribed to topic successfully.");
                LOGGER.info("Start of polling records for consumer. ");
                ***records = consumer.poll(100);***
         //CODE GETS STUCK IN ABOVE LINE FOR INFINITE TIME AND DOESNT COMES OUT
                LOGGER.info("Returning records to microservice.");
            }
            catch(InterruptException interruptException) {
                LOGGER.error("interrupt exception "+interruptException);
            }
            catch(TimeoutException timeoutException) {
                LOGGER.error("Time out exception "+timeoutException);
            }
            catch (KafkaException kafkaException) {
                LOGGER.error("Kafka Exception occurred while consuming records by consumer. Message: "+kafkaException.getMessage());
            }
            catch(Exception exception){
                LOGGER.error("Exception occured while creating consumer object "+exception);
            }

请建议我需要进行哪些更改,以中断对不正确服务器的无限轮询?

凯兰伯克特

正如其他人指出的那样,Kafka的内部ConsumerCoordinator当前具有内置超时9223372036854775807ms,同时试图确保协调器准备就绪。

如果您只是想在尝试轮询使用者之前确保主机/端口信息正确,则可以通过简单的调用来解决问题consumer.listTopics()org.apache.kafka.common.errors.TimeoutException如果无法连接,它将抛出一个

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

打印服务器证书详细信息时发生异常

要同时启动Kafka使用者和HTTP服务器?

Spring Boot授权服务器将请求发送到外部服务以获取用户详细信息

如何在服务器上验证PayPal Express Checkout详细信息?

动态获取代理服务器详细信息时,来自SOCKS服务器的格式错误的答复

如何获取成员的服务器详细信息

使用nova客户端通过Python OpenStack API显示服务器详细信息

无服务器SQS使用者跳过消息

如何针对服务器上运行的真实kafka代理测试kafka使用者?

使用服务器端JSON数组对象作为AngularJS 1.4中的登录详细信息

如何查看Kafka服务器状态或详细信息?

500-内部服务器错误,无详细信息

内部错误500:服务器中的IIS8错误,但在浏览器中的网页上未显示错误详细信息

TFS2013:将诊断活动日志复制到放置位置时发生错误。详细信息:内部服务器错误

如何在终端中显示服务器的TLS证书详细信息?

服务器在处理请求时遇到错误。请参阅服务器日志以获取更多详细信息

如何获取向nodejs服务器发送数据(.emit())的用户的详细信息?

如何将PHP用户详细信息(会话)解析到Node.js服务器?

有关esxi上服务器硬件的详细信息

如何使用Socket.IO发出服务器端错误并接收有关错误客户端的详细信息?

Postgres从属服务器日志重播详细信息

服务器端程序连接详细信息

如何确保资源显示在Chef服务器运行日志详细信息中

即使将IncludeErrorDetailPolicy设置为Always,也不会显示内部服务器错误[500]的详细信息

VSTS 构建阶段的 ReadyRoll 服务器详细信息

如何查找有关 Azure EventHub 的指标“内部服务器错误”和“其他错误”的详细信息

Keycloak:Admin-cli 添加 SMTP 服务器详细信息?

如何使用 Pivotal Spring Cloud Gateway 从单点登录 (SSO) 服务器获取用户详细信息?

无法连接到远程机器。验证 SSH 端点详细信息。使用 FTP 上传到远程服务器时