Continuing from the multithreading discussion in the previous section, we first look at Java's built-in BlockingQueue, and then at JMS and one of its implementations, the ActiveMQ message queue. Both are covered here under the heading of message services.
1. Blocking Queue
BlockingQueue is an interface in java.util.concurrent. It addresses the problem of passing data between threads efficiently and safely. With these efficient, thread-safe classes we can build high-quality multithreaded applications; they are primarily a tool for coordinating threads. The BlockingQueue interface declares the following methods:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
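To make the differences between these methods concrete, here is a minimal sketch that exercises them on a queue of capacity 2. The class name `BlockingQueueMethodsDemo` and the 50 ms timeouts are assumptions made for illustration; only the `BlockingQueue` methods themselves come from the interface above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not from the original article).
class BlockingQueueMethodsDemo {

    // Fills a queue of capacity 2 and records what each method does
    // when the queue is full or empty.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            log.add("add a -> " + queue.add("a"));       // true: there is room
            log.add("offer b -> " + queue.offer("b"));   // true: the queue is now full
            log.add("offer c -> " + queue.offer("c"));   // false: full, returns at once

            try {
                queue.add("c");                          // full: add throws instead
            } catch (IllegalStateException e) {
                log.add("add c -> IllegalStateException");
            }

            // The timed offer waits up to 50 ms for space, then gives up.
            log.add("timed offer c -> " + queue.offer("c", 50, TimeUnit.MILLISECONDS));
            log.add("remainingCapacity -> " + queue.remainingCapacity());

            log.add("take -> " + queue.take());          // head of the queue (FIFO)

            List<String> drained = new ArrayList<>();
            int moved = queue.drainTo(drained);          // moves everything that is left
            log.add("drainTo -> " + moved + " " + drained);

            // The timed poll on an empty queue waits, then returns null.
            log.add("timed poll -> " + queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // restore the interrupt flag
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Calling `put` on the full queue or `take` on the empty queue would block the calling thread, which is why the sketch uses the timed variants at those two points.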
- Insert:
- add(anObject): Adds anObject to the BlockingQueue. Returns true if the queue can accept it; otherwise throws an exception.
- offer(anObject): Adds anObject to the BlockingQueue if possible. Returns true if the queue can accept it, false otherwise.
- put(anObject): Adds anObject to the BlockingQueue. If there is no room in the queue, the calling thread blocks until space becomes available.
- Read:
- poll(time): Retrieves and removes the first object in the BlockingQueue. If none is available immediately, it waits up to the time given by the time parameter and returns null if nothing arrives within that time.
- take(): Retrieves and removes the first object in the BlockingQueue. If the queue is empty, the calling thread blocks until a new object is added.
- Other:
- int remainingCapacity(): Returns the remaining capacity of the queue. Because other threads may be inserting or removing elements at the same time, the value may already be out of date when it is used.
- boolean remove(Object o): Removes a single instance of the element from the queue if it is present, and returns true if the queue changed as a result.
- public boolean contains(Object o): Checks whether the element exists in the queue and returns true if it does.
- int drainTo(Collection<? super E> c): Removes all available elements from this queue and adds them to the given collection (that is, takes them out and puts them into the collection).
- int drainTo(Collection<? super E> c, int maxElements): Same as the method above, except that at most maxElements elements are moved.

The main methods come in pairs: put and take for blocking access, add and poll for non-blocking access.

BlockingQueue is an interface with several implementation classes, the most commonly used of which are:
- ArrayBlockingQueue: A bounded BlockingQueue backed by an array. Its constructor must take an int giving the capacity. Elements are ordered FIFO (first in, first out).
- LinkedBlockingQueue: A BlockingQueue whose capacity is optional. If no capacity is specified, the default maximum is Integer.MAX_VALUE. The put and take methods are the ones mainly used: put blocks when the queue is full until an element has been consumed, and take blocks when the queue is empty until an element is put in.

LinkedBlockingQueue and ArrayBlockingQueue compared: the two are backed by different data structures. As a result, LinkedBlockingQueue typically has higher throughput than ArrayBlockingQueue, but its performance is less predictable than that of ArrayBlockingQueue when the number of threads is large.

Here is an example of a producer and a consumer implemented with BlockingQueue. Producer (Product):
import java.util.concurrent.BlockingQueue;

public class Product implements Runnable {
    BlockingQueue<String> queue;

    public Product(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " Start production.");
            String temp = Thread.currentThread().getName() + ": production thread";
            queue.put(temp); // blocks if the queue is full
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Consumer:
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Random random = new Random();
        try {
            while (true) {
                Thread.sleep(random.nextInt(10));
                System.out.println(Thread.currentThread().getName() + " Ready to consume...");
                String temp = queue.take(); // blocks if the queue is empty
                System.out.println(Thread.currentThread().getName() + " Get the job ====" + temp);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Test class TestQueue:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class TestQueue {
    public static void main(String[] args) {
        // Create a blocking queue with capacity 5
        BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
        //BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
        Consumer consumer = new Consumer(queue);
        Product product = new Product(queue);
        for (int i = 0; i < 3; i++) {
            new Thread(product, "product" + i).start();
        }
        //for (int i = 0; i < 5; i++) {
        new Thread(consumer, "consumer").start();
        //}
    }
}
This code creates a queue of String elements with capacity 5 and uses three producer threads to simulate three user requests. Each request is placed in the BlockingQueue temporarily, and the consumer thread keeps taking tasks from the queue; once the queue is empty, take() blocks until more arrive. From this we can see two major characteristics of message queues: decoupling and peak shaving. The producer and the consumer have no direct relationship with each other: the producer puts data into the queue, the consumer takes data out of it, and each of them deals only with the queue, so they are decoupled. Under high concurrency, the producer simply puts the data in the queue and the consumer works through it at its own pace, so a burst of requests does not immediately drag the server down. Reference: http://blog.csdn.net/ghsau/article/details/8108292
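The peak-shaving idea can be sketched with one fast producer and one deliberately slow consumer. This is only an illustration under stated assumptions: the class name `PeakShavingDemo`, the `STOP` marker and the 1 ms processing delay are all made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class illustrating decoupling and peak shaving.
class PeakShavingDemo {

    // Marker object that tells the consumer to stop (an assumption of this sketch).
    private static final String STOP = "__STOP__";

    // Enqueues a burst of `requests` items, lets one slow consumer drain them,
    // and returns how many requests the consumer handled.
    static int handleBurst(int requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String task = queue.take();   // blocks while the queue is empty
                    if (task.equals(STOP)) {
                        return;
                    }
                    Thread.sleep(1);              // simulate slow processing
                    handled.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // The producer only pays the cost of an enqueue per request.
        for (int i = 0; i < requests; i++) {
            queue.offer("request-" + i);
        }
        System.out.println("burst accepted, backlog = " + queue.size());
        queue.offer(STOP);

        try {
            consumer.join();                      // wait until the backlog is processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled = " + handleBurst(200));
    }
}
```

The producer loop finishes almost immediately, while the consumer needs roughly one millisecond per request; the queue holds the difference. An unbounded queue is used here for simplicity, whereas a bounded one (as in TestQueue) would push back on the producer once it fills up.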
2. Java Message Service
2.1 Introduction to JMS
JMS, the Java Message Service, is an API for sending messages between two applications, or across a distributed system, for asynchronous communication. JMS is a vendor-independent (platform-independent) API. It is similar to Java Database Connectivity (JDBC): JDBC is an API that can be used to access many different relational databases, and JMS likewise provides a vendor-independent way to access messaging services. Many vendors support JMS, including IBM's MQSeries, BEA's WebLogic JMS Service, and Progress's SonicMQ, to name a few. JMS lets you send messages from one JMS client to another through a messaging service. A message is a JMS object made up of two parts: a header and a body. The header holds routing information and metadata about the message; the body carries the application's data, the payload. Depending on the type of payload, messages fall into several types: plain text (TextMessage), a serializable object (ObjectMessage), a collection of name/value pairs (MapMessage), a byte stream (BytesMessage), and a stream of primitive values (StreamMessage). There are also messages with no payload.
2.2 Components of JMS
JMS consists of the following elements:
- JMS provider: An implementation of the JMS specification by a message-oriented middleware product. The provider can be a JMS implementation on the Java platform or an adapter to message-oriented middleware on a non-Java platform.
- JMS client: A Java application or object that produces or consumes messages (producers and consumers are both JMS clients).
- JMS producer: A JMS client that creates and sends messages.
- JMS consumer: A JMS client that receives messages.
- JMS queue: An area that holds messages that have been sent and are waiting to be read. Once a message has been read, it is removed from the queue.
- JMS topic: A mechanism for delivering messages to multiple subscribers.
2.3 Java Message Service Models
- Point-to-point model: Under the point-to-point (queue) model, a producer sends messages to a specific queue and a consumer reads messages from that queue. The producer knows the consumer's queue and sends messages directly to it. This model has the following characteristics: only one consumer gets each message; the producer does not need to be running when the consumer consumes the message, and the consumer does not need to be running when the producer sends it; every successfully processed message is acknowledged by the consumer.
- Publish/subscribe model: The publish/subscribe model supports publishing messages to a specific message topic. Publishers and subscribers do not know about each other, much like an anonymous bulletin board. This model has the following characteristics: multiple consumers can get each message; there is a timing dependency between publisher and subscriber. The publisher must create a topic so that consumers can subscribe to it, and a subscriber must stay continuously active to receive messages unless it has established a durable subscription.
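The difference between the two models can be sketched with plain collections. The code below is an in-memory toy model, not the JMS API: the class name `MessagingModelsDemo` and the round-robin choice of consumer are assumptions made for illustration, and a real broker decides for itself which consumer receives a queued message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model; this is NOT the JMS API.
class MessagingModelsDemo {

    // Point-to-point: consumers share one queue, and each message
    // is removed by exactly one of them.
    static List<List<String>> pointToPoint(List<String> messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        String message;
        while ((message = queue.poll()) != null) {
            // poll() removes the message, so no other consumer sees it again
            received.get(next).add(message);
            next = (next + 1) % consumers;   // round-robin is an assumption
        }
        return received;
    }

    // Publish/subscribe: the topic hands a copy of each message
    // to every subscriber that is currently registered.
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue: " + pointToPoint(messages, 2));
        System.out.println("topic: " + publishSubscribe(messages, 2));
    }
}
```

With two consumers, the queue version prints `queue: [[m1, m3], [m2, m4]]`, while the topic version prints `topic: [[m1, m2, m3, m4], [m1, m2, m3, m4]]`: the queue splits the messages between the consumers, and the topic gives each subscriber all of them.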
2.4 Message Queue (ActiveMQ)
ActiveMQ is an implementation of the JMS specification. To use it:
- Download ActiveMQ from the official website: http://activemq.apache.org/
- Run ActiveMQ: decompress apache-activemq-5.5.1-bin.zip (as with Tomcat, it can be used right after decompression). Searching online, I found that some people modify the connector addresses and protocols in the configuration file activemq.xml. If the default setup does not work for you, you can change it as follows:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
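Before running any JMS code, it can help to check that the broker is actually listening on the configured port. The helper below is a hypothetical sketch that uses only the JDK and is not part of ActiveMQ; the class name `BrokerPortCheck` and the one-second timeout are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened
    // within timeoutMillis, false otherwise.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;   // refused, timed out, or host unreachable
        }
    }

    public static void main(String[] args) {
        // 61616 is the openwire port from the transportConnectors above.
        System.out.println("openwire reachable: " + isListening("localhost", 61616, 1000));
    }
}
```

A result of false only tells us that nothing accepted the TCP connection; it does not say whether the cause is the broker not running, a different port in activemq.xml, or a firewall.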
The test code is as follows:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Product {
    private String username = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageProducer producer = null;

    // Initialize the connection, session, destination and producer
    private void init() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void productMessage(String message) throws JMSException {
        this.init();
        TextMessage textMessage = session.createTextMessage(message);
        connection.start();
        System.out.println("Producer ready to send message:" + textMessage);
        producer.send(textMessage);
        System.out.println("The producer has finished sending messages...");
    }

    public void close() throws JMSException {
        System.out.println("The producer begins to close the connection");
        if (null != producer) {
            producer.close();
        }
        if (null != session) {
            session.close();
        }
        if (null != connection) {
            connection.close();
        }
    }
}
Consumer:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements MessageListener, ExceptionListener {
    private String name = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    private ActiveMQConnectionFactory connectionFactory = null;
    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageConsumer consumer = null;
    // volatile so that the thread polling this flag in TestConsumer sees updates
    public static volatile boolean isconnection = false;

    // Initialize the connection, session, destination and consumer
    private void init() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(name, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    public void consumerMessage() throws JMSException {
        this.init();
        connection.start();
        // Set the message listener and the exception listener
        consumer.setMessageListener(this);
        connection.setExceptionListener(this);
        System.out.println("Consumers start listening....");
        isconnection = true;
        //Message receive = consumer.receive();
    }

    public void close() throws JMSException {
        if (null != consumer) {
            consumer.close();
        }
        if (null != session) {
            session.close();
        }
        if (null != connection) {
            connection.close();
        }
    }

    /**
     * Exception handler
     */
    @Override
    public void onException(JMSException exception) {
        // Mark the connection as broken so that the polling thread can exit
        isconnection = false;
    }

    /**
     * Message handler
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMsg = (TextMessage) message;
                String text = textMsg.getText();
                System.out.println("Messages received by consumers =======" + text);
            } else {
                System.out.println("Received message does not match");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Note: The consumer implements MessageListener and ExceptionListener so that it can handle incoming messages and react when an error occurs. Producer test class TestProduct:
import javax.jms.JMSException;

public class TestProduct {
    public static void main(String[] args) throws JMSException {
        for (int i = 0; i < 100; i++) {
            Product product = new Product();
            product.productMessage("Hello World!" + i);
            product.close();
        }
    }
}
TestProduct simulates producing 100 messages and writing them to the ActiveMQ queue. TestConsumer:
import javax.jms.JMSException;

public class TestConsumer implements Runnable {
    static Thread thread = null;

    public static void main(String[] args) throws InterruptedException {
        thread = new Thread(new TestConsumer());
        thread.start();
        while (true) {
            boolean alive = thread.isAlive();
            System.out.println("Current thread state:" + alive);
            if (!alive) {
                // The consumer thread has died: start a new one
                thread = new Thread(new TestConsumer());
                thread.start();
                System.out.println("Thread restart completed");
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void run() {
        try {
            Consumer consumer = new Consumer();
            consumer.consumerMessage();
            // Keep this thread alive while the connection is up
            while (Consumer.isconnection) {
                //System.out.println(123);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
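The `while (Consumer.isconnection)` loop in `run()` keeps the thread alive by spinning, which occupies a CPU core while it waits. One possible alternative is to park the thread on a `CountDownLatch` that the exception handler releases. The sketch below contains no JMS code and only shows the waiting mechanism; the class `LatchKeepAliveDemo` and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener thread; it does not use JMS.
class LatchKeepAliveDemo {

    // Released once when the connection is reported as broken.
    private final CountDownLatch disconnected = new CountDownLatch(1);

    // Would be called from onException(...) in a real listener.
    void connectionLost() {
        disconnected.countDown();
    }

    // Parks the calling thread without spinning until connectionLost()
    // is called or the timeout elapses. Returns true if the latch was released.
    boolean awaitDisconnect(long timeoutMillis) {
        try {
            return disconnected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatchKeepAliveDemo demo = new LatchKeepAliveDemo();
        Thread listener = new Thread(() ->
                System.out.println("released: " + demo.awaitDisconnect(5_000)), "listener");
        listener.start();
        Thread.sleep(100);       // the listener thread is parked here, not spinning
        demo.connectionLost();   // simulate the broker connection failing
        listener.join();
    }
}
```

In the consumer example, the same idea would mean counting the latch down in `onException` and awaiting it in `run()`, so that the watchdog in `main` can still restart the thread once it ends.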
TestConsumer uses a monitoring loop that restarts the consumer thread whenever it dies, so that there is always a live thread waiting for messages from the ActiveMQ queue and handing them to the consumer for processing.

Summary: my understanding is that an in-process queue such as BlockingQueue is used for communication between threads, while JMS, as implemented by ActiveMQ for example, is used for communication between processes. For reference, here is an article on message queues by Shen Jian, an architect at 58: http://dwz.cn/78yLxL

It is worth emphasizing that adopting any technology should serve to solve a business problem, not to show off skills. Take a message service as an example: when a user registers on a website, I call the email and SMS services to notify them, and I may also use the information they filled in to recommend users they might know. The core business here is registration; sending notifications and recommending other users can be handled through a message queue. The registration response is returned first, and the other services are then invoked to handle the notifications and the recommendations. In its early days, however, the site may have relatively few users, and my needs can be met without a message queue. Introducing one increases the complexity of the project, so new technology should be adopted to solve business problems rather than for its own sake.

References:
http://blog.csdn.net/fanzhigang0/article/details/43764121
http://blog.csdn.net/u010702229/article/details/18085263