为了学习Apache Kafka
,我开发了一个Spring Boot
应用程序,Kafka
如果我将POST请求发送到调用KafkaTemplate
send方法的控制器,该应用程序会将消息发送到主题。我正在运行Ubuntu 19.04,并且已成功在本地安装Kafka
和安装Zookeeper
。一切正常。
当我关闭Zookeeper
或时,就会发生问题Kafka
。如果执行此操作,则在启动时Kafka AdminClient
,我的应用程序会定期尝试查找信息,但将此消息发送到控制台
Connection to node -1 could not be established. Broker may not be available.
我实施了此处建议的修复程序Kafka + Zookeeper:无法建立到节点-1的连接。经纪人可能不可用,这里是Spring-Boot和Kafka:如何处理经纪人不可用?。但是,如果我跑maven clean install
那么构建,如果无法完成Zookeeper
,并Kafka
没有运行。为什么会这样,并且有一种方法可以配置应用程序,以便Kafka
在启动时检查可用性并在服务不可用时正常处理?
这是我的服务类别, KafkaTemplate
@Autowired
public PingMessageServiceImpl(KafkaTemplate kafkaTemplate, KafkaTopicConfiguration kafkaTopicConfiguration) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaTopicConfiguration = kafkaTopicConfiguration;
}
@Override
public void sendMessage(String message) {
log.info(String.format("Received following ping message %s", message));
if (!isValidPingRequest(message)) {
log.warn("Received invalid ping request");
throw new InvalidPingRequestException();
}
log.info(String.format("Sending message=[%s]", message));
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopicConfiguration.getPingTopic(), message);
future.addCallback(buildListenableFutureCallback(message));
}
private boolean isValidPingRequest(String message) {
return "ping".equalsIgnoreCase(message);
}
private ListenableFutureCallback<SendResult<String, String>> buildListenableFutureCallback(String message) {
return new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(String.format("Sent message=[%s] with offset=[%d]", message, result.getRecordMetadata().offset()));
}
@Override
public void onFailure(Throwable ex) {
log.info(String.format("Unable to send message=[%s] due to %s", message, ex.getMessage()));
}
};
}
这是我用来从属性文件中提取Kafka的配置属性的配置类
@NotNull(message = "bootstrapAddress cannot be null")
@NotBlank(message = "bootstrapAddress cannot be blank")
private String bootstrapAddress;
@NotNull(message = "pingTopic cannot be null")
@NotBlank(message = "pingTopic cannot be blank")
private String pingTopic;
@NotNull(message = "reconnectBackoffMs cannot be null")
@NotBlank(message = "reconnectBackoffMs cannot be blank")
@Value("${kafka.reconnect.backoff.ms}")
private String reconnectBackoffMs;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configurations.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
return new KafkaAdmin(configurations);
}
@Bean
public NewTopic pingTopic() {
return new NewTopic(pingTopic, 1, (short) 1);
}
@PostConstruct
private void displayOnStartup() {
log.info(String.format("bootstrapAddress is %s", bootstrapAddress));
log.info(String.format("reconnectBackoffMs is %s", reconnectBackoffMs));
}
如果Spring-boot
在加载ApplicationContext
spring kafka bean(例如)时进行了任何集成测试KafakTemplate
,KafkaAdmin
将尝试使用yml
或properties
file中指定的属性连接kafka服务器。
因此,为避免这种情况,可以使用spring-embedded-kafka-server,以便在测试执行期间kafka bean将连接到嵌入式服务器。
或者简单地,您可以在集成测试用例中使用批注来模拟KafakTemplate
和KafkaAdmin
bean@MockBean
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句