如何设置onFailure事件(春季,卡夫卡)的超时时间?

V.Perfilev:

我正在尝试在Spring MVC中实现将消息发送到Kafka的异步REST方法。一切正常,但是当服务器不可用时,onFailure事件将处理很长时间。如何将ListenableFuture中的响应时间限制为例如三秒。

这是我的代码:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.topic}")
String topic;

@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
        @RequestParam(value = "message") String message
) {

    DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> sendResult) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

        @Override
        public void onFailure(Throwable ex) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

    });

    return deferredResult;
}

我试图使用REQUEST_TIMEOUT_MS_CONFIGKafka 属性和.get(long timeout, TimeUnit unit)ListenableFuture的方法,但没有得到想要的结果。

加里·罗素:

这是因为生产者会阻塞60秒(默认情况下)。

参见max.block.msKafkaDocumentation中的生产者配置

max.block.ms 配置控制了KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,这些方法可以由于缓冲区已满或元数据不可用而被阻塞,用户提供的序列化器或分区器中的阻塞将不计入此超时时间。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章