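I want to send messages from an MQ queue to an EMS queue on a different server. I'm not sure how to do this in Java. How can I make sure that no messages are lost while they are being sent?
I am already able to consume messages from MQ using Java: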
try {
// Create a connection factory
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
JmsConnectionFactory cf = ff.createConnectionFactory();
// Set the properties
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QMGR);
cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "JmsPutGet (JMS)");
cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
cf.setStringProperty(WMQConstants.USERID, APP_USER);
cf.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
// Create JMS objects
context = cf.createContext();
destination = context.createQueue("queue:///" + QUEUE_NAME);
long uniqueNumber = System.currentTimeMillis() % 1000;
TextMessage message = context.createTextMessage("Your lucky number today is " + uniqueNumber);
producer = context.createProducer();
producer.send(destination, message);
System.out.println("Sent message:\n" + message);
consumer = context.createConsumer(destination); // autoclosable
String receivedMessage = consumer.receiveBody(String.class, 15000); // in ms or 15 seconds
System.out.println("\nReceived message:\n" + receivedMessage);
recordSuccess();
} catch (JMSException jmsex) {
recordFailure(jmsex);
}
System.exit(status);
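Your code is nearly correct, but if you want to switch JMS providers within your program, you also have to switch the JMS ConnectionFactory. This is the central (provider-specific) class from which consumers and messages are created.
So when you create the connection, producer, and message for EMS, you need a TIBCO connection factory. You can either look it up through JNDI using com.tibco.tibjms.naming.TibjmsInitialContextFactory, or instantiate com.tibco.tibjms.TibjmsConnectionFactory directly, e.g.: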
try {
// 1) Create a MQ connection factory
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
JmsConnectionFactory cf = ff.createConnectionFactory();
// 2) Set the properties
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QMGR);
cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "JmsPutGet (JMS)");
cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
cf.setStringProperty(WMQConstants.USERID, APP_USER);
cf.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
// 3) Create MQ consumer
JMSContext mqContext = cf.createContext();
destination = mqContext.createQueue("queue:///" + QUEUE_NAME);
consumer = mqContext.createConsumer(destination); // autoclosable
// 4) wait for message from MQ (returns null if nothing arrives before the timeout)
String receivedText = consumer.receiveBody(String.class, 15000); // timeout in ms (15 seconds)
System.out.println("\nReceived message:\n" + receivedText);
// 5) Create TIBCO EMS ConnectionFactory and an EMS MessageProducer
TibjmsConnectionFactory emsCF = new com.tibco.tibjms.TibjmsConnectionFactory( "tcp://1.2.3.4:7222");
Connection emsConnection = emsCF.createConnection(user, password);
Session emsSession = emsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer emsProducer = emsSession.createProducer( emsSession.createQueue(QUEUE_NAME) );
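// 6) Create EMS TextMessage from MQ TextMessage (skip if the receive timed out)
if (receivedText != null) {
    TextMessage emsMsg = emsSession.createTextMessage(receivedText);
    // 7) publish to EMS
    emsProducer.send(emsMsg);
}
// 8) cleanup: close both the EMS connection and the MQ context
emsConnection.close();
mqContext.close();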
recordSuccess();
} catch (JMSException jmsex) {
recordFailure(jmsex);
}
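On the "no lost messages" part of the question: with the default auto-acknowledge mode, a message is removed from MQ as soon as it is received, so it is gone if the EMS send then fails. One common approach is to consume from MQ in a transacted session and commit only after the EMS send has succeeded. In JMS 2.0 that would mean creating the MQ context with cf.createContext(JMSContext.SESSION_TRANSACTED) and calling mqContext.commit() or mqContext.rollback(). The sketch below shows only the ordering of those steps. It uses in-memory stand-ins so it runs without a broker; TransactedSource, Sink, and forwardAll are hypothetical names for illustration, not part of the IBM MQ or TIBCO EMS APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ForwardSketch {

    /** Hypothetical stand-in for a transacted MQ consumer (JMSContext.SESSION_TRANSACTED). */
    static class TransactedSource {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> inFlight = new ArrayDeque<>();

        TransactedSource(List<String> messages) { queue.addAll(messages); }

        /** Returns the next message, or null if the queue is empty. */
        String receive() {
            String m = queue.poll();
            if (m != null) inFlight.push(m);
            return m;
        }

        /** Permanently removes everything received since the last commit. */
        void commit() { inFlight.clear(); }

        /** Puts uncommitted messages back at the head of the queue, in order. */
        void rollback() {
            while (!inFlight.isEmpty()) queue.addFirst(inFlight.pop());
        }

        int depth() { return queue.size(); }
    }

    /** Hypothetical stand-in for the EMS MessageProducer. */
    interface Sink { void send(String body) throws Exception; }

    /** Forwards messages one at a time; returns how many were committed. */
    static int forwardAll(TransactedSource source, Sink sink) {
        int forwarded = 0;
        String body;
        while ((body = source.receive()) != null) {
            try {
                sink.send(body);      // 1) publish to the target first
                source.commit();      // 2) only then remove it from the source
                forwarded++;
            } catch (Exception e) {
                source.rollback();    // send failed: message stays on the source
                break;                // stop and let the caller retry later
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        boolean[] failOnce = { true };
        TransactedSource mq = new TransactedSource(Arrays.asList("a", "b", "c"));
        Sink ems = body -> {
            if (body.equals("b") && failOnce[0]) {
                failOnce[0] = false;
                throw new Exception("simulated EMS outage");
            }
            delivered.add(body);
        };
        int first = forwardAll(mq, ems);   // stops at the simulated failure
        int second = forwardAll(mq, ems);  // retry picks up the rolled-back message
        System.out.println(first + " then " + second + " forwarded; delivered=" + delivered);
    }
}
```

Note that this ordering gives at-least-once delivery: if the commit on the MQ side fails after the EMS send succeeded, the message is redelivered and may arrive on EMS twice, so the receiving side should tolerate duplicates. Setting persistent delivery on both queues is also worth checking, but that depends on your broker configuration.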