如果Kafka和Zookeeper没有运行,为什么Maven Clean Install无法完成?

太空少年:

为了学习Apache Kafka,我开发了一个Spring Boot应用程序,Kafka如果我将POST请求发送到调用KafkaTemplatesend方法的控制器,应用程序会将消息发送到主题我正在运行Ubuntu 19.04,并且已成功在本地安装Kafka安装Zookeeper一切正常。

当我关闭Zookeeper时,就会发生问题Kafka如果执行此操作,则在启动时Kafka AdminClient,我的应用程序会定期尝试查找信息,但将此消息发送到控制台

Connection to node -1 could not be established. Broker may not be available.

我实施了此处建议的修复程序Kafka + Zookeeper:无法建立到节点-1的连接。经纪人可能不可用,这里是Spring-Boot和Kafka:如何处理经纪人不可用?但是,如果我跑maven clean install那么构建,如果无法完成Zookeeper,并Kafka没有运行。为什么会这样,并且有一种方法可以配置应用程序,以便Kafka在启动时检查可用性并在服务不可用时正常处理?

这是我的服务类别, KafkaTemplate

@Autowired
public PingMessageServiceImpl(KafkaTemplate kafkaTemplate, KafkaTopicConfiguration kafkaTopicConfiguration) {
    this.kafkaTemplate = kafkaTemplate;
    this.kafkaTopicConfiguration = kafkaTopicConfiguration;
}

@Override
public void sendMessage(String message) {
    log.info(String.format("Received following ping message %s", message));

    if (!isValidPingRequest(message)) {
        log.warn("Received invalid ping request");
        throw new InvalidPingRequestException();
    }
    log.info(String.format("Sending message=[%s]", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopicConfiguration.getPingTopic(), message);
    future.addCallback(buildListenableFutureCallback(message));
}

private boolean isValidPingRequest(String message) {
    return "ping".equalsIgnoreCase(message);
}

private ListenableFutureCallback<SendResult<String, String>> buildListenableFutureCallback(String message) {
    return new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info(String.format("Sent message=[%s] with offset=[%d]", message, result.getRecordMetadata().offset()));
        }
        @Override
        public void onFailure(Throwable ex) {
            log.info(String.format("Unable to send message=[%s] due to %s", message, ex.getMessage()));
        }
    };
}

这是我用来从属性文件中提取Kafka的配置属性的配置类

@NotNull(message = "bootstrapAddress cannot be null")
@NotBlank(message = "bootstrapAddress cannot be blank")
private String bootstrapAddress;

@NotNull(message = "pingTopic cannot be null")
@NotBlank(message = "pingTopic cannot be blank")
private String pingTopic;

@NotNull(message = "reconnectBackoffMs cannot be null")
@NotBlank(message = "reconnectBackoffMs cannot be blank")
@Value("${kafka.reconnect.backoff.ms}")
private String reconnectBackoffMs;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configurations = new HashMap<>();
    configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configurations.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
    return new KafkaAdmin(configurations);
}

@Bean
public NewTopic pingTopic() {
    return new NewTopic(pingTopic, 1, (short) 1);
}

@PostConstruct
private void displayOnStartup() {
    log.info(String.format("bootstrapAddress is %s", bootstrapAddress));
    log.info(String.format("reconnectBackoffMs is %s", reconnectBackoffMs));
}
死侍 :

如果Spring-boot在加载ApplicationContextspring kafka bean(例如)进行了任何集成测试KafakTemplateKafkaAdmin将尝试使用ymlpropertiesfile中指定的属性连接kafka服务器。

因此,为避免这种情况,可以使用spring-embedded-kafka-server,以便在测试执行期间kafka bean将连接到嵌入式服务器。

或者简单地,您可以在集成测试用例中使用批注来模拟KafakTemplateKafkaAdminbean@MockBean

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

在Maven中,为什么要运行“ mvn clean”?

运行MVN Clean Install应用程序尝试将Postgres连接到错误的端口

从VM guest虚拟机到主机的Maven`mvn -PautoInstallPackage clean install`失败

如果我注释掉了这些方法以跳过测试,为什么还要在春季启动项目上运行runnung mvn clean install命令(在Spring Boot项目上)却获得单元测试失败?

Maven无法执行目标org.apache.maven.plugins:maven-clean-plugin:2.5:clean无法删除access_log

我要运行源./vars和./clean-all时无法创建证书

“ mvn clean install”和“ mvn eclipse:clean eclipse:eclipse”命令有什么区别?

mvn -U clean编译和mvn clean编译之间的区别

Gradle Clean任务因无法解决所有依赖关系而失败

使用Ubuntu 15.10的Maven,出现以下错误:mvn clean install;

maven clean install -U有什么作用?

为什么./gradlew clean build和./gradlew clean:build之间有区别?

如何只用“ mvn clean install”编译项目而不必随后运行“ mvn install”?

无法使用def clean()运行表单集验证

“ mvn clean install”运行测试,但不运行“ mvn clean site”

什么是Maven Clean Repo Building?

Diskpart Clean,因此无法启动

使用mvn clean -e install

Gradle Clean Build和connectedAndroidTest

mvn clean install找不到工件

RPM规范文件中每个指令(%build,%install,%clean等)的预期用途是什么?

'mvn clean install':为什么'clean'在'install'之后运行?

gulp clean无法正常工作

Maven clean无法删除文件

运行 mvn clean install 时切换分支会影响 maven 吗?

maven -P 选项不适用于 mvn clean install

“mvn clean”与“mvn clean install”(当插件执行附加到“clean”时)

gulp-clean 和 del 无法清理我想要的文件夹

如何覆蓋 ModelFormMixin 的 clean 和 clean_fieldname