当Kafka关闭时,我该如何处理IOException?

Grzegorz Piwowarek:

我正在尝试发布消息,而Apache Kafka掉线了。我应该如何处理这样的紧急情况?

KafkaProducer :: send()方法不会抛出任何可以处理的异常。生产者吞下它们并记录错误,所以我被这样的消息淹没,一切都挂起,直到Kafka重新连接。

2014-03-31 09:38:23.752 ERROR o.a.kafka.common.network.Selector - Error in I/O: 
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_51]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_51]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:205) ~[kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212) [kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:150) [kafka-clients-0.8.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
彼得·戴维斯(Peter Davis):

调用Producer.send(...)get()返回的future ,或者如果您不想阻塞代码,则传递一个callback

try { producer.send(new ProducerRecord("mytopic", key, value)) .get(); // block until acknowledged } catch(Exception e) { // handle message wasn't acknowledged }

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章