我有一个JAVA应用程序,该应用程序创建了可以收听Rabbitmq的使用者。我需要知道启动的使用者仍然可以正常工作,如果没有,那么我需要重新启动使用者。
他们有什么办法可以做到吗?当前,我的主应用程序创建了一个Executor线程池,并在创建新连接时传递了该executor。
ExecutorService executor = Executors.newFixedThreadPool(30);
Connection connection = factory.newConnection(executor);
然后,主要方法通过使用新通道作为参数调用构造函数来创建30个ConsumerApp对象,并调用listen()方法
for(int i=0;i<=30;i++) {
ConsumerApp consumer = new ConsumerApp(i,connection.createChanell());
consumer.listen() }
ConsumerApp中的listen方法侦听队列并启动DefaultConsumer对象,该对象仅打印接收到的消息
listen() {
try {
channel.queueDeclare("test-queue-name", false, false, false, null);
}
catch {
System.out.println("Exception on creating Queue")
}
Consumer consumer = new DefaultConsumer(this.channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received Message in consumer '"+consumerId+" "+ message + "'");
}
};
//Now starting the consumer
try {
channel.basicConsume(QUEUE_NAME, true, consumer);
}
catch (ShutdownSignalException | IOException ex) {
ex.printStackTrace();
}
}
我想知道他们是否可以检查消费者是否活跃。我的想法是捕获关闭信号异常并重新创建使用者对象并调用listen方法。这是必要的,因为rabbitmq自动恢复并连接回来。?但是我怎么能保证呢?
使用传递给rabbitmq连接器的线程池是否可以实现此目的?
我正在使用最新版本的Rabbitmq客户端5.3.0
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句