该Sample为用JMS直接访问MQ的简单例子,其中牵涉3个最基本的JMS模型,Producer/Consumer/Browser,因为在该Sample中Browser作用更像是个Monitor,所以类名使用的是Monitor。
注意:该Sample使用的是Bindings(绑定)连接方式,该方式会直接连接本机上的MQ队列管理器。如果改成Client方式,除非连接的是远程主机并且正确配置了用户认证,否则即使host写成本机IP,依然会抛出 2035 (MQRC_NOT_AUTHORIZED) 错误。
Producer
package jms.direct;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;
/**
 * Minimal JMS producer for IBM MQ.
 *
 * <p>Connects in bindings mode to the queue manager {@code MW_QM} and sends
 * messages to the queue {@code MW_QUEUE}. The payload travels in the string
 * property {@code STRMSG}; the caller-supplied id is stored as the JMS
 * correlation id.
 */
public class JMSProducer {
    // Client-mode settings. They are only referenced by the commented-out
    // client configuration in the constructor and are kept for that purpose.
    private static final String MQ_HOST = "localhost";
    private static final int MQ_PORT = 1416;
    private static final String MQ_CHANNEL = "";

    private static final String MQ_QUEUEMANAGER_NAME = "MW_QM";
    private static final String MQ_QUEUE_NAME = "MW_QUEUE";

    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer = null;

    /**
     * Opens the connection, session and producer.
     *
     * @throws IllegalStateException if any JMS object cannot be created; the
     *         original {@link JMSException} is attached as the cause. Failing
     *         here avoids handing out an instance whose first
     *         {@link #sendMsg(String, String)} would die with a
     *         {@code NullPointerException}.
     */
    public JMSProducer() {
        try {
            // Create a connection factory
            JmsFactoryFactory jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
            JmsConnectionFactory jcf = jff.createConnectionFactory();

            // Set the properties.
            // To switch to client mode, replace the WMQ_CM_BINDINGS line with:
            // jcf.setStringProperty(WMQConstants.WMQ_HOST_NAME, MQ_HOST);
            // jcf.setIntProperty(WMQConstants.WMQ_PORT, MQ_PORT);
            // jcf.setStringProperty(WMQConstants.WMQ_CHANNEL, MQ_CHANNEL);
            // jcf.setStringProperty(WMQConstants.USERID, "");
            // jcf.setStringProperty(WMQConstants.PASSWORD, "");
            // jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
            jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
            jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);

            // Create JMS objects
            connection = jcf.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(MQ_QUEUE_NAME);
            producer = session.createProducer(queue);
            connection.start();
        } catch (JMSException e) {
            // Release whatever was opened before the failure. Closing the
            // connection also closes any session/producer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // The set-up failure below is the error worth reporting.
                }
            }
            throw new IllegalStateException(
                    "Unable to set up JMS producer for queue " + MQ_QUEUE_NAME
                            + " on queue manager " + MQ_QUEUEMANAGER_NAME, e);
        }
    }

    /**
     * Sends one message.
     *
     * <p>A {@link JMSException} is reported on standard error and the success
     * banner is skipped, so the output never claims a send that did not happen.
     *
     * @param msgStr text stored in the {@code STRMSG} string property
     * @param msgId  value stored as the JMS correlation id
     */
    public void sendMsg(String msgStr, String msgId) {
        try {
            Message msg = session.createMessage();
            msg.setStringProperty("STRMSG", msgStr);
            msg.setJMSCorrelationID(msgId);
            producer.send(msg);
        } catch (JMSException e) {
            e.printStackTrace();
            return; // do not print the success banner for a failed send
        }
        System.out.println("-------------");
        System.out.println("Message send complete!");
        System.out.println("MessageId: " + msgId);
        System.out.println("Message: " + msgStr);
    }

    /**
     * Closes the producer, the session and the connection, in that order.
     * Each later resource is still closed when an earlier close fails.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        try {
            producer.close();
        } finally {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }

    /**
     * Sends three sample messages with correlation ids 000000001..000000003.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            System.out.println("Starting ...");
            JMSProducer producer = new JMSProducer();
            try {
                producer.sendMsg("Hello1", "000000001");
                producer.sendMsg("Hello2", "000000002");
                producer.sendMsg("Hello3", "000000003");
            } finally {
                // Always release the MQ connection, even on an unexpected error.
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer
package jms.direct;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;
/**
 * Minimal JMS consumer for IBM MQ.
 *
 * <p>Connects in bindings mode to the queue manager {@code MW_QM} and reads
 * messages from the queue {@code MW_QUEUE}. The payload is read from the
 * string property {@code STRMSG} and the id from the JMS correlation id.
 */
public class JMSConsumer {
    private static final String MQ_QUEUEMANAGER_NAME = "MW_QM";
    private static final String MQ_QUEUE_NAME = "MW_QUEUE";

    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;

    /**
     * Opens the connection, session and consumer and starts delivery.
     *
     * @throws IllegalStateException if any JMS object cannot be created; the
     *         original {@link JMSException} is attached as the cause. Failing
     *         here avoids handing out an instance whose {@link #consume()}
     *         would die with a {@code NullPointerException}.
     */
    public JMSConsumer() {
        try {
            // Create a connection factory
            JmsFactoryFactory jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
            JmsConnectionFactory jcf = jff.createConnectionFactory();

            // Set the properties
            jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
            jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);

            // Create JMS objects
            connection = jcf.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(MQ_QUEUE_NAME);
            consumer = session.createConsumer(queue);
            connection.start();
        } catch (JMSException e) {
            // Release whatever was opened before the failure. Closing the
            // connection also closes any session/consumer created from it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // The set-up failure below is the error worth reporting.
                }
            }
            throw new IllegalStateException(
                    "Unable to set up JMS consumer for queue " + MQ_QUEUE_NAME
                            + " on queue manager " + MQ_QUEUEMANAGER_NAME, e);
        }
    }

    /**
     * Blocks until one message arrives, then prints its correlation id and
     * its {@code STRMSG} property. A {@link JMSException} is reported on
     * standard error.
     */
    public void consume() {
        try {
            Message msg = consumer.receive();
            if (msg == null) {
                // receive() returns null when the consumer is closed while waiting.
                System.out.println("No message received: consumer was closed while waiting.");
                return;
            }
            String msgId = msg.getJMSCorrelationID();
            String msgStr = msg.getStringProperty("STRMSG");
            System.out.println("-------------");
            System.out.println("Message received!");
            System.out.println("MessageId: " + msgId);
            System.out.println("Message: " + msgStr);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the consumer, the session and the connection, in that order.
     * Each later resource is still closed when an earlier close fails.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        try {
            consumer.close();
        } finally {
            try {
                session.close();
            } finally {
                connection.close();
            }
        }
    }

    /**
     * Receives and prints a single message, then shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            System.out.println("Receiving ...");
            JMSConsumer receiver = new JMSConsumer();
            try {
                receiver.consume();
            } finally {
                // Always release the MQ connection, even on an unexpected error.
                receiver.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Browser
package jms.direct;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;
public class JMSMonitor {
private String MQ_QUEUEMANAGER_NAME = "MW_QM";
private String MQ_QUEUE_NAME = "MW_QUEUE";
private JmsFactoryFactory jff = null;
private JmsConnectionFactory jcf = null;
private Connection connection = null;
private Session session = null;
private Queue queue = null;
private QueueBrowser monitor = null;
private Message msg = null;
/** Correlation ids that the monitor looks for on the queue. */
private List<String> msgIdList = null;

/**
 * Resets the tracked correlation ids to the three ids used by the sample
 * producer (000000001..000000003).
 */
public void initMsgIdList() {
    msgIdList = new ArrayList<String>();
    msgIdList.add("000000001");
    msgIdList.add("000000002");
    msgIdList.add("000000003");
}

/**
 * Seeds the tracked id list, then opens a bindings-mode connection and a
 * browser on the monitored queue.
 *
 * @throws IllegalStateException if any JMS object cannot be created; the
 *         original {@link JMSException} is attached as the cause, so a
 *         half-initialised monitor (null browser) is never handed out.
 */
public JMSMonitor() {
    initMsgIdList();
    try {
        // Create a connection factory
        jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
        jcf = jff.createConnectionFactory();

        // Set the properties
        jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
        jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);

        // Create JMS objects
        connection = jcf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        queue = session.createQueue(MQ_QUEUE_NAME);
        monitor = session.createBrowser(queue);
        connection.start();
    } catch (JMSException e) {
        // Release whatever was opened before the failure. Closing the
        // connection also closes any session/browser created from it.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // The set-up failure below is the error worth reporting.
            }
        }
        throw new IllegalStateException(
                "Unable to set up JMS monitor for queue " + MQ_QUEUE_NAME
                        + " on queue manager " + MQ_QUEUEMANAGER_NAME, e);
    }
}
public void startMonitor() {
while (true) {
if (msgIdList.size() > 0) {
try {
Enumeration<message> msgs = monitor.getEnumeration();
List<string> remainMsgIdList = new ArrayList<string>();
while (msgs.hasMoreElements()) {
Message msg = msgs.nextElement();
String msgId = msg.getJMSCorrelationID();
if (msgIdList.contains(msgId)) {
remainMsgIdList.add(msgId);
System.out.println("Message ID [" + msgId + "] found, this message still not be consumed!");
} else if (!msgIdList.contains(msgId)) {
System.out.println("Message ID [" + msgId + "] is not be recorded by the sender yet, just ignnore it.");
}
}
msgIdList = remainMsgIdList;
// All message scan complete.
if (msgIdList.size() > 0) {
System.out.println("------------------------------");
System.out.println("Following Msg not be consumed!");
System.out.println("------------------------------");
for (String msgId : msgIdList) {
System.out.println(msgId);
}
} else {
System.out.println("No message record in the data base.");
// The sample end here, for the list will not be added
// again.
}
} catch (JMSException e) {
e.printStackTrace();
}
}
synchronized (this) {
try {
this.wait(5000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}