I. Introduction and installation of ActivemQ
1. Introduction of ActiveMQ
ActiveMQ is an open source Message system provided by Apache, which is completely implemented by Java. Therefore, it can well support JMS (Java Message Service) specification proposed by J2EE. JMS is a set of Java application interfaces that provide services for message creation, sending, reading, and so on. JMS provides a common set of application programming interfaces and response syntax, similar to JDBC, the unified access interface for Java databases. It is a vendor-neutral API that enables Java programs to communicate well with message components from different vendors.
Message queue middleware is an important component in distributed system. It mainly solves the problems of asynchronous message, application decoupling, traffic cutting and so on, so as to achieve high performance, high availability, scalability and final consistency architecture
The most popular message queues are ActiveMQ, RabbitMQ, Kafka, MetaMQ and so on
2. Download and install ActivemQ
- Activemq.apache.org/download.ht…
2. Directory structure
Start ActivemQ step: bin –> Win64 (or win32) –> Activemq. bat Click to run
3. After startup, http://localhost:8161/admin/index.jsp username: admin password admin
Activemq entry level code
Multiple consumers can be started to distinguish between queue and topic patterns
1. Queue (session)
- producers
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Yang * Producers * / public class Producer {/ * * * userName * / private static final String userName = ActiveMQConnection. DEFAULT_USER; / * * * * / passWord private static final String passWord = ActiveMQConnection. DEFAULT_PASSWORD; /** * url */ private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL; public void send(String message) { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl); final Connection connection = connectionFactory.createConnection(); connection.start(); Session = connection.createsession (Boolea.false, session.auto_acknowledge); Final Queue Queue = session.createQueue("test"); final MessageProducer producer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage(message); / / send a producer. The send (textMessage); // session.com MIT (); producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { Producer producer = new Producer(); producer.send("hello world"); }}Copy the code
2. Consumers
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Yang * Consumers * / public class Consumer {/ * * * userName * / private static final String userName = ActiveMQConnection. DEFAULT_USER; / * * * * / passWord private static final String passWord = ActiveMQConnection. DEFAULT_PASSWORD; /** * url */ private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL; public void receive() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl); final Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Final Queue Queue = session.createQueue("test"); final MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(n -> { try { TextMessage msg = (TextMessage) n; final String text = msg.getText(); If (text.equalsignorecase ("hello world")) {system.out.println (" accept message: "+ msg.gettext ()); } else {// Default is 6 system.out.println (" test retransmission times "); int i = 1 / 0; } } catch (JMSException e) { // e.printStackTrace(); }}); } catch (JMSException e) { // e.printStackTrace(); } } public static void main(String[] args) { Consumer consumer = new Consumer(); consumer.receive(); }Copy the code
2. Topic will first start consumers to subscribe
1. The producer producers
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Yang * Producer */ public class TopicProducer {/** * userName */ private static final String userName = ActiveMQConnection.DEFAULT_USER; / * * * * / passWord private static final String passWord = ActiveMQConnection. DEFAULT_PASSWORD; /** * url */ private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL; public void send(String message) { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl); final Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Final Topic = session.createTopic("topic-test"); final MessageProducer producer = session.createProducer(topic); TextMessage textMessage = session.createTextMessage(message); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // producer.setTimeToLive(10); / / send a producer. The send (textMessage); // producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24); // session.commit(); producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { TopicProducer producer = new TopicProducer(); producer.send("hello world"); }Copy the code
2. The consumer consumers
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author Yang * Consumer */ public class TopicConsumer {/** * userName */ private static final String userName = ActiveMQConnection.DEFAULT_USER; / * * * * / passWord private static final String passWord = ActiveMQConnection. DEFAULT_PASSWORD; /** * url */ private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL; public void receive() { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl); final Connection connection = connectionFactory.createConnection(); // Set the client ID // connection.setclientid ("client-1"); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Final Topic = session.createTopic("topic-test"); final MessageConsumer messageConsumer = session.createConsumer(topic); / / normal subscription / / MessageConsumer consumer = session. CreateDurableSubscriber (topic, "bb"); / / durable subscriptions messageConsumer. SetMessageListener (n - > {try {TextMessage MSG = (TextMessage) n; final String text = msg.getText(); If (text.equalsignorecase ("hello world")) {system.out.println (" accept message: "+ msg.gettext ()); } else {system.out.println (" test reprint times "); int i = 1 / 0; } } catch (JMSException e) { // e.printStackTrace(); }}); } catch (JMSException e) { // e.printStackTrace(); } } public static void main(String[] args) { TopicConsumer consumer = new TopicConsumer(); consumer.receive(); }}Copy the code