First, realize the function

We hope to use a set of API to realize the function of sending and receiving messages in two modes, which is convenient for business programs to call

1. Send Topic

2. Send Queue

3, Receive Topic

4. Receive Queue

2. Interface design

Design common invocation interfaces based on functionality

/** * Data distribution interface (for sending and receiving message queue data) **@author eguid
 *
 */
public interface MsgDistributeInterface {
 
	/** * to the subject **@paramTopicName - Topic *@paramData - Data *@return* /
	public boolean sendTopic(String topicName, byte[] data);
	
	/** * to the subject *@paramTopicName - Topic *@param* data - data@paramOffset - Offset *@paramLength - Length *@return* /
	boolean sendTopic(String topicName, byte[] data, int offset, int length);
 
	/** * send to queue **@paramQueueName - queueName *@paramData - Data *@return* /
	public boolean sendQueue(String queueName, byte[] data);
 
	/** * send to queue *@paramQueueName - queueName *@paramData - Data *@param offset
	 * @param length
	 * @return* /
	public boolean sendQueue(String queueName, byte[] data,int offset, int length);
 
	/** * Receive queue messages *@paramQueueName queueName *@param listener
	 * @throws JMSException
	 */
	void receiveQueue(String queueName, MessageListener listener) throws JMSException;
 
	/** * subscribe to the topic *@paramTopicName - topicName *@param listener
	 * @throws JMSException
	 */
	void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}
Copy the code

3. Interface implementation based on ActiveMQ

/** * Message producer/consumer implementation based on activeMQ (initializing this object initializes the connection message queue and immediately throws an exception if it cannot connect to the message queue) **@author eguid
 *
 */
public class ActiveMQImpl implements MsgDistributeInterface {
 
	private String userName;
	private String password;
	private String brokerURL;
	private boolean persistentMode;// Persistent mode
	// Connect factory
	ConnectionFactory connectionFactory;
	// The thread that sends the message
	Connection connection;
	// Transaction management
	Session session;
 
	// Store each thread subscription mode producer
	ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
	// Store each thread queue mode producer
	ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
 
	public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
		this(userName, password, brokerURL, true);
	}
	
	public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
		this.userName = userName;
		this.password = password;
		this.brokerURL = brokerURL;
		this.persistentMode=persistentMode;
		init();
	}
 
	public void init(a) throws JMSException {
		try {
			// Create a link factory
			connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
			// Create a link from the factory
			connection = connectionFactory.createConnection();
			// Open the link
			connection.start();
			// Create a transaction (subscription mode, transaction automatic confirmation mode)
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			throwe; }}@Override
	public boolean sendTopic(String topicName, byte[] data) {
		return sendTopic(topicName, data, 0, data.length);
	}
 
	@Override
	public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
		return send(true, topicName, data, offset, length);
	}
 
	@Override
	public boolean sendQueue(String queueName, byte[] data) {
		return sendQueue(queueName, data, 0, data.length);
	}
 
	@Override
	public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
		return send(false, queueName, data, offset, length);
	}
 
	/** * Send data **@param name
	 * @param data
	 * @param offset
	 * @param length
	 * @paramType * - Type *@return* /
	private boolean send(boolean type, String name, byte[] data, int offset, int length) {
		try {
			MessageProducer messageProducer = getMessageProducer(name, type);
			
			BytesMessage msg = createBytesMsg(data, offset, length);
			  System.err.println(Thread.currentThread().getName()+"Send message");
			// Send a message
			messageProducer.send(msg);
		} catch (JMSException e) {
			return false;
		}
		return false;
	}
	
	public void receive(String topicName) throws JMSException {
		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 
		Topic topic =session.createTopic(topicName);
		MessageConsumer consumer=session.createConsumer(topic);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				BytesMessage msg=(BytesMessage) message;
				System.err.println(Thread.currentThread().getName()+"Received a message:"+msg.toString()); }}); }/** * Creates a byte array message **@param data
	 * @param offset
	 * @param length
	 * @return
	 * @throws JMSException
	 */
	private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
		BytesMessage msg = session.createBytesMessage();
		msg.writeBytes(data, offset, length);
		return msg;
	}
	
	/** * Create object serialization message *@param obj
	 * @return
	 * @throws JMSException
	 */
	private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
// MapMessage msg = session.createMapMessage(); // Key-value message
		ObjectMessage msg = session.createObjectMessage(obj);
		return msg;
	}
	
	/** * Create string message *@param text
	 * @return
	 * @throws JMSException
	 */
	private TextMessage createTextMsg(String text) throws JMSException {
		TextMessage msg = session.createTextMessage(text);
		return msg;
	}
 
	
	/** * get creator **@paramName - Name (topic name and queue name) *@paramType - Type (true:topic,false:queue) *@return
	 * @throws JMSException
	 */
	private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
		returntype? getTopicProducer(name):getQueueProducer(name); }/** * Create or get queue *@param queueName
	 * @return
	 * @throws JMSException
	 */
	private MessageProducer getQueueProducer(String queueName) throws JMSException {
		MessageProducer messageProducer = null;
		if ((messageProducer = queueThreadLocal.get()) == null) {
			Queue queue = session.createQueue(queueName);
			messageProducer = session.createProducer(queue);
			// Whether to persist (1- not persistent (if there is no consumer, the message will automatically invalidate), 2- persistent (if there is no consumer to consume, the message queue will also cache the message waiting for the consumer to consume)messageProducer.setDeliveryMode(persistentMode? DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); queueThreadLocal.set(messageProducer); }return messageProducer;
	}
	
	/** * Create or get the theme *@param topicName
	 * @return
	 * @throws JMSException
	 */
	private MessageProducer getTopicProducer(String topicName) throws JMSException {
		MessageProducer messageProducer = null;
		if ((messageProducer = topicThreadLocal.get()) == null) {
			Topic topic = session.createTopic(topicName);
			messageProducer = session.createProducer(topic);
			// Whether to persist (1- not persistent (if there is no consumer, the message will automatically invalidate), 2- persistent (if there is no consumer to consume, the message queue will also cache the message waiting for the consumer to consume)messageProducer.setDeliveryMode(persistentMode? DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); topicThreadLocal.set(messageProducer); }return  messageProducer;
	}
	
	public String getPassword(a) {
		return password;
	}
 
	public void setPassword(String password) {
		this.password = password;
	}
 
	@Override
	public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 
		Queue topic =session.createQueue(queueName);
		MessageConsumer consumer=session.createConsumer(topic);
		consumer.setMessageListener(listener);
		
	}
 
	@Override
	public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 
		Topic topic =session.createTopic(topicName);
		MessageConsumer consumer=session.createConsumer(topic);
		consumer.setMessageListener(listener);
	}
Copy the code

Test Topic and Queue

public static void main(String[] args) throws JMSException{
		// An exception will be thrown immediately if the creation fails
		MsgDistributeInterface  producter = new ActiveMQImpl("system"."manager"."TCP: / / 127.0.0.1:61616");
        Test testMq = new Test();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start(a);
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start(a);
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start(a);
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start(a);
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start(a);
        //Thread 6
        new Thread(testMq.new ProductorMq(producter)).start(a);
        
        // Subscribe to the receiving Thread Thread 1
        new Thread(new Runnable() {
			@Override
			public void run(a) {
				try {
					producter.receiveTopic("eguid-topic".new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg=(BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"Subscribe to topic messages:"+msg.toString()); }}); }catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}).start();
        // Subscribe to the receiving Thread Thread 2
        new Thread(new Runnable() {
			@Override
			public void run(a) {
				try {
					producter.receiveTopic("eguid-topic".new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg=(BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"Subscribe to topic messages:"+msg.toString()); }}); }catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}).start();
        // Queue message production Thread thread-1
        new Thread(testMq.new  QueueProductor(producter)).start(a);
        // Queue message production Thread thread-2
        new Thread(testMq.new  QueueProductor(producter)).start(a);
        // Queue receives Thread 1
        new Thread(new Runnable() {
			@Override
			public void run(a) {
				try {
					producter.receiveQueue("eguid-queue".new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg=(BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"Received queue message:"+msg.toString()); }}); }catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}).start();
      // The queue receives thread Thread2
        new Thread(new Runnable() {
			@Override
			public void run(a) {
				try {
					producter.receiveQueue("eguid-queue".new MessageListener() {
						@Override
						public void onMessage(Message message) {
							BytesMessage msg=(BytesMessage) message;
							System.err.println(Thread.currentThread().getName()+"Received queue message:"+msg.toString()); }}); }catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}).start();
    }
 
    private class ProductorMq implements Runnable{
    	Jtt809MsgProducter producter;
        public ProductorMq(Jtt809MsgProducter producter){
            this.producter = producter;
        }
 
        @Override
        public void run(a) {
            while(true) {try {
                	String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
                    producter.sendTopic("eguid-topic",wang.getBytes());
                    
                    Thread.sleep(2000);
                } catch(InterruptedException e) { e.printStackTrace(); }}}}private class QueueProductor implements Runnable{
    	Jtt809MsgProducter producter;
        public QueueProductor(Jtt809MsgProducter producter){
            this.producter = producter;
        }
 
        @Override
        public void run(a) {
            while(true) {try {
                	String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
                    producter.sendQueue("eguid-queue",eguid.getBytes());
                    Thread.sleep(2000);
                } catch(InterruptedException e) { e.printStackTrace(); }}}}Copy the code

—end—