First, realize the function
We hope to use a set of API to realize the function of sending and receiving messages in two modes, which is convenient for business programs to call
1. Send Topic
2. Send Queue
3, Receive Topic
4. Receive Queue
2. Interface design
Design common invocation interfaces based on functionality
/** * Data distribution interface (for sending and receiving message queue data) **@author eguid
*
*/
public interface MsgDistributeInterface {
/** * to the subject **@paramTopicName - Topic *@paramData - Data *@return* /
public boolean sendTopic(String topicName, byte[] data);
/** * to the subject *@paramTopicName - Topic *@param* data - data@paramOffset - Offset *@paramLength - Length *@return* /
boolean sendTopic(String topicName, byte[] data, int offset, int length);
/** * send to queue **@paramQueueName - queueName *@paramData - Data *@return* /
public boolean sendQueue(String queueName, byte[] data);
/** * send to queue *@paramQueueName - queueName *@paramData - Data *@param offset
* @param length
* @return* /
public boolean sendQueue(String queueName, byte[] data,int offset, int length);
/** * Receive queue messages *@paramQueueName queueName *@param listener
* @throws JMSException
*/
void receiveQueue(String queueName, MessageListener listener) throws JMSException;
/** * subscribe to the topic *@paramTopicName - topicName *@param listener
* @throws JMSException
*/
void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}
Copy the code
3. Interface implementation based on ActiveMQ
/** * Message producer/consumer implementation based on activeMQ (initializing this object initializes the connection message queue and immediately throws an exception if it cannot connect to the message queue) **@author eguid
*
*/
public class ActiveMQImpl implements MsgDistributeInterface {
private String userName;
private String password;
private String brokerURL;
private boolean persistentMode;// Persistent mode
// Connect factory
ConnectionFactory connectionFactory;
// The thread that sends the message
Connection connection;
// Transaction management
Session session;
// Store each thread subscription mode producer
ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
// Store each thread queue mode producer
ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
this(userName, password, brokerURL, true);
}
public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
this.userName = userName;
this.password = password;
this.brokerURL = brokerURL;
this.persistentMode=persistentMode;
init();
}
public void init(a) throws JMSException {
try {
// Create a link factory
connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
// Create a link from the factory
connection = connectionFactory.createConnection();
// Open the link
connection.start();
// Create a transaction (subscription mode, transaction automatic confirmation mode)
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
throwe; }}@Override
public boolean sendTopic(String topicName, byte[] data) {
return sendTopic(topicName, data, 0, data.length);
}
@Override
public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
return send(true, topicName, data, offset, length);
}
@Override
public boolean sendQueue(String queueName, byte[] data) {
return sendQueue(queueName, data, 0, data.length);
}
@Override
public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
return send(false, queueName, data, offset, length);
}
/** * Send data **@param name
* @param data
* @param offset
* @param length
* @paramType * - Type *@return* /
private boolean send(boolean type, String name, byte[] data, int offset, int length) {
try {
MessageProducer messageProducer = getMessageProducer(name, type);
BytesMessage msg = createBytesMsg(data, offset, length);
System.err.println(Thread.currentThread().getName()+"Send message");
// Send a message
messageProducer.send(msg);
} catch (JMSException e) {
return false;
}
return false;
}
public void receive(String topicName) throws JMSException {
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic =session.createTopic(topicName);
MessageConsumer consumer=session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage msg=(BytesMessage) message;
System.err.println(Thread.currentThread().getName()+"Received a message:"+msg.toString()); }}); }/** * Creates a byte array message **@param data
* @param offset
* @param length
* @return
* @throws JMSException
*/
private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(data, offset, length);
return msg;
}
/** * Create object serialization message *@param obj
* @return
* @throws JMSException
*/
private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
// MapMessage msg = session.createMapMessage(); // Key-value message
ObjectMessage msg = session.createObjectMessage(obj);
return msg;
}
/** * Create string message *@param text
* @return
* @throws JMSException
*/
private TextMessage createTextMsg(String text) throws JMSException {
TextMessage msg = session.createTextMessage(text);
return msg;
}
/** * get creator **@paramName - Name (topic name and queue name) *@paramType - Type (true:topic,false:queue) *@return
* @throws JMSException
*/
private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
returntype? getTopicProducer(name):getQueueProducer(name); }/** * Create or get queue *@param queueName
* @return
* @throws JMSException
*/
private MessageProducer getQueueProducer(String queueName) throws JMSException {
MessageProducer messageProducer = null;
if ((messageProducer = queueThreadLocal.get()) == null) {
Queue queue = session.createQueue(queueName);
messageProducer = session.createProducer(queue);
// Whether to persist (1- not persistent (if there is no consumer, the message will automatically invalidate), 2- persistent (if there is no consumer to consume, the message queue will also cache the message waiting for the consumer to consume)messageProducer.setDeliveryMode(persistentMode? DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); queueThreadLocal.set(messageProducer); }return messageProducer;
}
/** * Create or get the theme *@param topicName
* @return
* @throws JMSException
*/
private MessageProducer getTopicProducer(String topicName) throws JMSException {
MessageProducer messageProducer = null;
if ((messageProducer = topicThreadLocal.get()) == null) {
Topic topic = session.createTopic(topicName);
messageProducer = session.createProducer(topic);
// Whether to persist (1- not persistent (if there is no consumer, the message will automatically invalidate), 2- persistent (if there is no consumer to consume, the message queue will also cache the message waiting for the consumer to consume)messageProducer.setDeliveryMode(persistentMode? DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); topicThreadLocal.set(messageProducer); }return messageProducer;
}
public String getPassword(a) {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue topic =session.createQueue(queueName);
MessageConsumer consumer=session.createConsumer(topic);
consumer.setMessageListener(listener);
}
@Override
public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic =session.createTopic(topicName);
MessageConsumer consumer=session.createConsumer(topic);
consumer.setMessageListener(listener);
}
Copy the code
Test Topic and Queue
public static void main(String[] args) throws JMSException{
// An exception will be thrown immediately if the creation fails
MsgDistributeInterface producter = new ActiveMQImpl("system"."manager"."TCP: / / 127.0.0.1:61616");
Test testMq = new Test();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start(a);
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start(a);
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start(a);
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start(a);
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start(a);
//Thread 6
new Thread(testMq.new ProductorMq(producter)).start(a);
// Subscribe to the receiving Thread Thread 1
new Thread(new Runnable() {
@Override
public void run(a) {
try {
producter.receiveTopic("eguid-topic".new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage msg=(BytesMessage) message;
System.err.println(Thread.currentThread().getName()+"Subscribe to topic messages:"+msg.toString()); }}); }catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
// Subscribe to the receiving Thread Thread 2
new Thread(new Runnable() {
@Override
public void run(a) {
try {
producter.receiveTopic("eguid-topic".new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage msg=(BytesMessage) message;
System.err.println(Thread.currentThread().getName()+"Subscribe to topic messages:"+msg.toString()); }}); }catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
// Queue message production Thread thread-1
new Thread(testMq.new QueueProductor(producter)).start(a);
// Queue message production Thread thread-2
new Thread(testMq.new QueueProductor(producter)).start(a);
// Queue receives Thread 1
new Thread(new Runnable() {
@Override
public void run(a) {
try {
producter.receiveQueue("eguid-queue".new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage msg=(BytesMessage) message;
System.err.println(Thread.currentThread().getName()+"Received queue message:"+msg.toString()); }}); }catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
// The queue receives thread Thread2
new Thread(new Runnable() {
@Override
public void run(a) {
try {
producter.receiveQueue("eguid-queue".new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage msg=(BytesMessage) message;
System.err.println(Thread.currentThread().getName()+"Received queue message:"+msg.toString()); }}); }catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
private class ProductorMq implements Runnable{
Jtt809MsgProducter producter;
public ProductorMq(Jtt809MsgProducter producter){
this.producter = producter;
}
@Override
public void run(a) {
while(true) {try {
String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
producter.sendTopic("eguid-topic",wang.getBytes());
Thread.sleep(2000);
} catch(InterruptedException e) { e.printStackTrace(); }}}}private class QueueProductor implements Runnable{
Jtt809MsgProducter producter;
public QueueProductor(Jtt809MsgProducter producter){
this.producter = producter;
}
@Override
public void run(a) {
while(true) {try {
String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
producter.sendQueue("eguid-queue",eguid.getBytes());
Thread.sleep(2000);
} catch(InterruptedException e) { e.printStackTrace(); }}}}Copy the code
—end—