该Sample为用JMS直接访问MQ的简单例子,其中牵涉3个最基本的JMS模型,Producer/Consumer/Browser,因为在该Sample中Browser作用更像是个Monitor,所以类名使用的是Monitor。

注意一点,该Sample连接方式使用的是Bind方式,该方式将会自动连接本地的MQ服务器,如果改成Client,除非连接远程客户机,并且有正确的用户设置方式,不然即使host写成本地IP,依然会抛 2035 (MQRC_NOT_AUTHORIZED) 错误。

Prducer

 

package jms.direct;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class JMSProducer {
	
	private String MQ_HOST = "localhost";
	private int MQ_PORT = 1416;
	private String MQ_CHANNEL = "";
	private String MQ_QUEUEMANAGER_NAME = "MW_QM";
	private String MQ_QUEUE_NAME = "MW_QUEUE";

	private JmsFactoryFactory jff = null;
	private JmsConnectionFactory jcf = null;
	private Connection connection = null;
	private Session session = null;
	private Queue queue = null;
	private MessageProducer producer = null;
	private Message msg = null;

	public JMSProducer() {
		try {
			// Create a connection factory
			jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
			jcf = jff.createConnectionFactory();
			// Set the properties
//			jcf.setStringProperty(WMQConstants.WMQ_HOST_NAME, MQ_HOST);
//			jcf.setIntProperty(WMQConstants.WMQ_PORT, MQ_PORT);
//			jcf.setStringProperty(WMQConstants.WMQ_CHANNEL, MQ_CHANNEL);
//			jcf.setStringProperty(WMQConstants.USERID, "");
//			jcf.setStringProperty(WMQConstants.PASSWORD, "");
//			jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
			jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
			jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);

			// Create JMS objects
			connection = jcf.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			queue = session.createQueue(MQ_QUEUE_NAME);
			producer = session.createProducer(queue);
			
			connection.start();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public void sendMsg(String msgStr, String msgId) {
		try {
			msg = session.createMessage();
			msg.setStringProperty("STRMSG", msgStr);
			msg.setJMSCorrelationID(msgId);
			producer.send(msg);
		} catch (JMSException e) {
			e.printStackTrace();
		}
		System.out.println("-------------");
		System.out.println("Message send complete!");
		System.out.println("MessageId: " + msgId);
		System.out.println("Message: " + msgStr);
	}
	
	public void close() throws JMSException {
		producer.close();
		session.close();
		connection.close();
    }

	public static void main(String[] args) {
		try {
			System.out.println("Starting ...");
			JMSProducer producer = new JMSProducer();
			producer.sendMsg("Hello1", "000000001");
			producer.sendMsg("Hello2", "000000002");
			producer.sendMsg("Hello3", "000000003");
			producer.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

Consumer

 

package jms.direct;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class JMSConsumer {
	private String MQ_QUEUEMANAGER_NAME = "MW_QM";
	private String MQ_QUEUE_NAME = "MW_QUEUE";

	private JmsFactoryFactory jff = null;
	private JmsConnectionFactory jcf = null;
	private Connection connection = null;
	private Session session = null;
	private Queue queue = null;
	private MessageConsumer consumer = null;
	private Message msg = null;

	public JMSConsumer() {
		try {
			// Create a connection factory
			jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
			jcf = jff.createConnectionFactory();
			// Set the properties
			jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
			jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);

			// Create JMS objects
			connection = jcf.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			queue = session.createQueue(MQ_QUEUE_NAME);
			consumer = session.createConsumer(queue);
			
			connection.start();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public void consume() {
		try {
			msg = consumer.receive();
			
			String msgId = msg.getJMSCorrelationID();
			String msgStr = msg.getStringProperty("STRMSG");
			
			System.out.println("-------------");
			System.out.println("Message received!");
			System.out.println("MessageId: " + msgId);
			System.out.println("Message: " + msgStr);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public void close() throws JMSException {
		consumer.close();
		session.close();
		connection.close();
    }
	
	public static void main(String[] args) {
		try {
			System.out.println("Receiving ...");
			JMSConsumer receiver = new JMSConsumer();
			receiver.consume();
			receiver.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

Browser

 

package jms.direct;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

public class JMSMonitor {
private String MQ_QUEUEMANAGER_NAME = "MW_QM";
private String MQ_QUEUE_NAME = "MW_QUEUE";

private JmsFactoryFactory jff = null;
private JmsConnectionFactory jcf = null;
private Connection connection = null;
private Session session = null;
private Queue queue = null;
private QueueBrowser monitor = null;
private Message msg = null;
private List<string> msgIdList = null;

public void initMsgIdList() {
msgIdList = new ArrayList<string>();
msgIdList.add("000000001");
msgIdList.add("000000002");
msgIdList.add("000000003"); } public JMSMonitor() {
initMsgIdList();
try {
// Create a connection factory
jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
jcf = jff.createConnectionFactory();
// Set the properties
jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_BINDINGS);
jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQ_QUEUEMANAGER_NAME);
// Create JMS objects
connection = jcf.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(MQ_QUEUE_NAME);
monitor = session.createBrowser(queue);
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void startMonitor() {
while (true) {
if (msgIdList.size() > 0) {
try {
Enumeration<message> msgs = monitor.getEnumeration();
List<string> remainMsgIdList = new ArrayList<string>();
while (msgs.hasMoreElements()) {
Message msg = msgs.nextElement();
String msgId = msg.getJMSCorrelationID();
if (msgIdList.contains(msgId)) {
remainMsgIdList.add(msgId);
System.out.println("Message ID [" + msgId + "] found, this message still not be consumed!");
} else if (!msgIdList.contains(msgId)) {
System.out.println("Message ID [" + msgId + "] is not be recorded by the sender yet, just ignnore it.");
}
}
msgIdList = remainMsgIdList;
// All message scan complete.
if (msgIdList.size() > 0) {
System.out.println("------------------------------");
System.out.println("Following Msg not be consumed!");
System.out.println("------------------------------");
for (String msgId : msgIdList) {
System.out.println(msgId);
}
} else {
System.out.println("No message record in the data base.");
// The sample end here, for the list will not be added
// again.
}
} catch (JMSException e) {
e.printStackTrace();
}
}
synchronized (this) {
try {
this.wait(5000);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}