【 Article source: Lagou Education Java Salary Training Camp 】 All mind maps are made by myself; do not reuse without permission

Climb a mountain and your feelings fill the mountain; gaze at the sea and your thoughts overflow with the sea

Overview of messaging middleware

Part 1 – Distributed Architecture communication

“Principles of Distributed Architecture Communication”

Experience distributed communication by walking through the evolution of two classic distributed architectures:

  • SOA architecture


    • The system is divided into appropriate independent modules according to the actual business, and the modules are independent of each other
      • Advantages: distributed, loosely coupled, flexible to extend, reusable
    • In an SOA architecture, Dubbo and Zookeeper are used for remote communication between services
      • Advantage: Dubbo uses a custom TCP-based protocol to reduce the size of request packets, or uses HTTP/2, improving transmission efficiency
  • Microservices Architecture


    • In Spring Cloud, Feign1 is used to solve remote communication between services
      • Mostly used for synchronous remote calls
    • RPC is mainly based on TCP/UDP, while HTTP is an application-layer protocol built on top of the transport-layer protocol TCP, so RPC is more efficient (RPC keeps long connections: unlike HTTP, it does not need a three-way handshake for every request, which reduces network overhead).

Originally everything went smoothly, until the following problem was encountered……

“Problems with Distributed Synchronous Communication”

In e-commerce projects, when a new product is added to the mall, the information is inserted into the database;

and then we need to update the search engine's inverted index and statically render the product page

  • Method 1:

    After the insertion, judge whether it succeeded, and then call the methods for the follow-up operations. The code is as follows:
    Long commodityId = addCommodity(newCommodity); // Insert a new item
    if (commodityId != null) {
        refreshInvertedIndex(newCommodity); // Update the search engine's inverted index
        refreshStaticPage(newCommodity);    // Statically render the product page
    }
    • Question:
      1. What if update inverted index fails?
      2. What if updating a static page fails?
    • Solution:
      1. If updating the inverted index fails, try again!
      2. If updating the static page fails, try again!
        Long commodityId = addCommodity(newCommodity); // Insert a new item
        if (commodityId != null) {
            boolean indexFlag = refreshInvertedIndex(newCommodity); // Recursively update the search engine's inverted index
            boolean pageFlag = refreshStaticPage(newCommodity);     // Recursively render the static product page
        }
        
        private boolean refreshInvertedIndex(Commodity newCommodity) {
            boolean flag = indexService.refreshIndex(newCommodity);
            if (!flag) {
                return refreshInvertedIndex(newCommodity); // retry on failure
            }
            return true;
        }
        
        private boolean refreshStaticPage(Commodity newCommodity) {
            boolean flag = pageService.refreshPage(newCommodity);
            if (!flag) {
                return refreshStaticPage(newCommodity); // retry on failure
            }
            return true;
        }
      • Question:
        1. If the update keeps failing, won't it recurse endlessly until the call stack overflows?
        2. While the update keeps being retried, isn't the add-commodity method call blocked the whole time?
        3. Wouldn't adding items under heavy concurrency then be very inefficient?
      • Solution:
        1. Perhaps a wait between retries could be added and the number of retries capped, reducing CPU consumption
        2. Perhaps multithreading could be added so the two updates run concurrently, reducing execution time

          But all of this still assumes the calls must succeed within a bounded time; fundamentally, it is the synchronous call that is being mishandled. This problem is particularly acute in distributed architectures.
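The two mitigations above (a capped retry with a wait between attempts) can be sketched roughly as follows. This is an illustrative helper, not the article's code; the attempt limit and delay are arbitrary:

```java
import java.util.function.BooleanSupplier;

public class BoundedRetry {

    /**
     * Retries `task` up to maxAttempts times, sleeping delayMs between
     * attempts, instead of recursing without a limit.
     */
    public static boolean retry(BooleanSupplier task, int maxAttempts, long delayMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (task.getAsBoolean()) {
                return true;               // succeeded
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs); // wait before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;                      // gave up after maxAttempts
    }

    public static void main(String[] args) {
        // Hypothetical flaky call: fails twice, then succeeds.
        int[] calls = {0};
        boolean ok = retry(() -> ++calls[0] >= 3, 5, 10);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```

The caller still blocks while retrying, which is why the article's next step moves the work off the request path entirely.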

  • Method 2:

    Implement only the add-commodity logic first. After the commodity is added successfully, cache the tasks of updating the index and the static page in a shared location, and let the corresponding services fetch the tasks from there and execute them, as shown in the code:
    Long commodityId = addCommodity(newCommodity); // Insert a new item
    if (commodityId != null) {
        commodityTaskService.cache(newCommodity); // Place the follow-up tasks in the cache for other services to perform
    }
    • But there are still problems with this approach:
      • What if the shared task pool goes down and the service becomes unavailable? How do we handle that?
      • How do we make sure a message was actually delivered to the task pool?
      • What if sending a task to the task pool fails?
      • If a retry eventually succeeded but the task was actually sent multiple times, will the inverted-index and static-page services consume it repeatedly?
      • Will repeated consumption change the end result?
      • …

        If we had to solve all of the above problems from scratch, the development difficulty can be imagined. In distributed services, because the business is split, applications must also be split, and even the database is split into separate databases and tables. Yet completing one business flow often requires the coordination of several modules, so communication between modules, between services, and between clients and servers becomes very complex.
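One of the questions above, repeated delivery, is usually answered with idempotent consumption: deduplicate on a message id so that processing the same task twice leaves the same end result. A minimal sketch, with all names illustrative (a real system would persist the processed-id set, not hold it in memory):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {

    // Ids of messages already handled; in production this lives in a store.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private int effectiveRuns = 0;

    /** Returns true only the first time a given messageId is handled. */
    public synchronized boolean handle(String messageId) {
        if (!processedIds.add(messageId)) {
            return false;      // duplicate delivery: skip the side effects
        }
        effectiveRuns++;       // e.g. refresh the index / page exactly once
        return true;
    }

    public int effectiveRuns() {
        return effectiveRuns;
    }

    public static void main(String[] args) {
        IdempotentConsumer c = new IdempotentConsumer();
        c.handle("task-42");
        c.handle("task-42");   // redelivered after a retry
        System.out.println(c.effectiveRuns()); // 1
    }
}
```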

Distributed Asynchronous Communication Mode

This is the typical producer-consumer pattern; it can work cross-platform and support heterogeneous systems, and is usually implemented with the help of message-oriented middleware

  • advantages:

    Decoupling between systems, a certain degree of recoverability, support for heterogeneous systems; downstream consumers usually execute concurrently, which gives the system elasticity.

    Applicable to asynchronous processing, traffic peak clipping, rate limiting, buffering, queuing, eventual consistency, and message-driven scenarios
  • disadvantages:

    Message-oriented middleware introduces some bottlenecks and consistency issues, is not intuitive or easy to debug during development, and carries additional operational cost.

Part 2 – An introduction to messaging middleware

“Message Middleware Concepts”

  • General concept: a message-oriented system (message-oriented middleware) is the basic software for sending and receiving messages in distributed systems
  • Analytical concept: message middleware, also called a message queue, refers to using an efficient and reliable messaging mechanism for platform-independent data communication, and integrating distributed systems on top of that communication. By providing a messaging and message-queue model, it extends inter-process communication to a distributed environment.
  • In a word:
    • Message-oriented middleware cuts the direct communication between upstream and downstream, then uses its decoupling and asynchronous characteristics to build flexible, reliable and stable systems.

“Custom Messaging middleware”

Use Java code to implement the producer-consumer pattern.

BlockingQueue is a common container in Java and is widely used in multithreaded programming. When the queue is full, the producer thread blocks until the queue is no longer full; when the queue is empty, the consumer thread blocks until the queue is no longer empty and a take can proceed.

  1. Mask entity class
public class Mask {
    
    private String id;
    private String type;
    
    public Mask(String id, String type) {
        this.id = id;
        this.type = type;
    }
    
    // get/set
    
    @Override
    public String toString() {
        return "Mask{" +
                "id='" + id + '\'' +
                ", type='" + type + '\'' +
                '}';
    }
}
  2. Producer
public class Producer implements Runnable {
    
    // the blocking queue
    private final BlockingQueue<Mask> blockingQueue;
    
    // constructor (initialize the blocking queue)
    public Producer(BlockingQueue<Mask> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                /* Produce one mask every 0.2s */
                Thread.sleep(200);
                if (blockingQueue.remainingCapacity() > 0) {
                    Mask mask = new Mask(UUID.randomUUID().toString(), "N95");
                    blockingQueue.add(mask);
                    System.out.println("Our factory is producing masks; it currently has " + blockingQueue.size() + " x 10,000 masks...");
                } else {
                    System.out.println("The factory warehouse is full: " + blockingQueue.size() + " x 10,000 masks, come and buy!!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  3. Consumer
public class Consumer implements Runnable {
    
    // the blocking queue
    private final BlockingQueue<Mask> blockingQueue;
    
    // constructor (initialize the blocking queue)
    public Consumer(BlockingQueue<Mask> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                /* Buy one mask every 0.1s */
                Thread.sleep(100);
                long startTime = System.currentTimeMillis(); // start waiting for a mask
                Mask mask = blockingQueue.take();
                long endTime = System.currentTimeMillis();   // mask obtained
                System.out.println("I bought a mask \t (mask info: " + mask.toString() + ")");
                System.out.println("Waiting for the goods cost me " + (endTime - startTime) + "ms");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  4. Main entry point
public class MyTest {
    
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Mask> queue = new ArrayBlockingQueue<>(10); // capacity 10, i.e. at most 100,000 masks (counted in units of 10,000)
    
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
    
        new Thread(producer).start();
        Thread.sleep(3000); // Give the mask factory a 3s head start: it could produce 150,000 masks, but the cap is 100,000
        new Thread(consumer).start(); // Once consumers join, net consumption averages 10,000 masks every 0.2s, so the stock is cleared after about 2s
    }
}
  5. Running effect

The above code is nowhere near production grade: there is no cluster, nothing distributed, and the usage is too simplistic to meet the requirements of enterprise applications…

“Mainstream Messaging Middleware and Selection”

Some of the older systems in traditional financial institutions, banks, government agencies, etc. are still using commercial MQ products from vendors such as IBM

At present, the popular open source messaging middleware in the industry includes: ActiveMQ, RabbitMQ, RocketMQ, Kafka, ZeroMQ and so on, among which RabbitMQ, RocketMQ, Kafka are the most widely used

Redis can also implement mechanisms like a queue and Pub/Sub to some extent, but strictly speaking it is not message-oriented middleware

  • Selection principles:
    • First, the product should be open source. Open source means that if a bug is encountered in queue usage, it can be fixed quickly without waiting for the vendor's update
    • Second, the product must have been popular in recent years and have an active community. That way you can quickly find solutions to problems; popularity also means fewer bugs, and popular products are generally well compatible with surrounding systems.
    • Finally, as a message queue, it has the following features:
      • Reliability of message transmission: To ensure that messages are not lost.
      • Support for clustering, including horizontal scaling, single point of failure can be resolved.
      • The performance must meet the performance requirements of the business.
  1. RabbitMQ: RabbitMQ was originally used for reliable communication in telecommunications and is one of the few products supporting the AMQP protocol.
    • advantages:
      • Lightweight, fast, easy to deploy and use
      • Flexible route configuration is supported. In RabbitMQ there is a switch module between the producer and the queue. According to the routing rules, messages sent by producers can be sent to different queues. Routing rules are flexible and can be implemented by themselves
      • RabbitMQ clients support most programming languages
    • disadvantages:
      • If a large number of messages pile up in a queue, performance degrades dramatically
      • RabbitMQ has the lowest performance of the three (RabbitMQ, RocketMQ, Kafka), processing tens of thousands to hundreds of thousands of messages per second.
      • RabbitMQ is developed in Erlang, so extension and secondary development are expensive
  2. RocketMQ: RocketMQ is an open source message queue implemented in Java. It borrows from Kafka's design and makes many improvements. RocketMQ is primarily used in ordered messaging, transactions, stream computation, message push, log stream processing, binlog distribution, and other scenarios. Having survived every Double 11, its performance and stability are reliable.
    • advantages:
      • It has almost all the features and functions that a message queue should have
      • Java development, reading source code, extension, secondary development is very convenient
      • Many optimizations have been made for response latency in e-commerce. In most cases, the response is at the millisecond level.
      • Performance is an order of magnitude higher than RabbitMQ, processing hundreds of thousands of messages per second
    • disadvantages:
      • Integration and compatibility with peripheral systems are not very good
  3. Kafka: The reliability, stability, and features of Kafka basically meet most application scenarios.

    Kafka is one of the most compatible systems around, especially in big data and streaming computing, where almost all open source software supports Kafka.
    • advantages:
      • Efficient, scalable, message persistence.
      • Support for partitioning, replicas, and fault tolerance
      • Developed in Scala and Java, with a lot of design for batch and asynchronous processing, Kafka can achieve very high performance.
      • Asynchronous message sending and receiving performance is the best of the three
    • disadvantages:
      • Single-machine throughput is in the same order of magnitude as RocketMQ, processing hundreds of thousands of messages per second
      • With asynchronous sending and compression turned on, Kafka can eventually process up to 20 million messages per second, but because it works asynchronously and in batches, latency is high, making it less suitable for e-commerce scenarios

“Message Middleware Application Scenarios”

12306 ticket queuing, e-commerce flash sales (seckill), big-data real-time computation……

[E-commerce seckill case]

For example, a June 18 promotion starts at 0:00 and only the first 200 orders win. Just before the seckill starts, users frantically refresh their app or browser to make sure they see the product as soon as possible

  1. Before the seckill starts, users keep refreshing the page. How should the system handle the high-concurrency read requests?
  2. When the seckill starts, a flood of concurrent users instantly ask the system to generate orders and decrement inventory. How should the system handle the high-concurrency write requests?
  • Handle high-concurrency read requests
    • Use caching strategies so that requests are intercepted at the upper cache layer
    • Make whatever data can be static as static as possible
    • Add rate limiting (for example, discard repeated requests from one user, IP, or device within a short period)
  • Handle high concurrency write requests
    • Use message queues!
      • Traffic peak clipping: cut the peak write traffic in the seckill scenario

        Requests are temporarily stored in the message queue, and the business server responds to the user with "the result is being processed…", freeing system resources to handle other users' requests.

        Suppose there are 1,000 seckill items and processing one purchase request takes 500ms; that is 500s in total. If you deploy 10 queue handlers, processing all seckill requests takes 50s, meaning users wait at most about 50s to see their result, which is acceptable. At any moment only 10 requests hit the database, which puts no great strain on it.

      • Asynchronous processing: simplify the business flow inside a seckill request

        Primary actions are processed first and secondary actions are processed asynchronously. For example, the primary flow is generating the order and decrementing inventory; secondary flows, such as sending coupons and adding member points after a successful purchase, are deferred.

        Users mostly care about whether the seckill succeeded; delays in issuing coupons or "points credited within 24 hours" are tolerable

      • Decoupling: achieve loose coupling between system modules

        The seckill service sends all its data to the message queue, and the data service subscribes to the queue to receive and process the data.
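A rough sketch of the peak-clipping arithmetic above: requests are buffered in a queue and drained by a fixed pool of 10 workers, so the database only ever sees 10 concurrent writes. The numbers are illustrative, and in production the buffer would be message middleware rather than an in-process BlockingQueue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeakClipping {

    static final int WORKERS = 10; // fixed number of queue handlers

    public static int process(int requests) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < requests; i++) {
            queue.add(i);                       // user is told "processing…"
        }
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
        for (int w = 0; w < WORKERS; w++) {
            pool.submit(() -> {
                Integer order;
                while ((order = queue.poll()) != null) {
                    handled.incrementAndGet();  // create order, decrement stock
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(1000));      // 1000
    }
}
```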

[Job Hunting and Recruitment cases]

The Lagou site is divided into a B end and a C end: B serves enterprise users, C serves job seekers. Both ends have their own search engines and caches. The B end needs updates from the C end to refresh its search engine and cache, and the C end likewise needs updates from the B end to refresh its own.

How to solve the problem of data sharing between TERMINALS B and C?

  • Synchronously sharing data ✗: the B and C ends each expose services via RPC or WebService for the other to invoke. Every time a job seeker updates a resume, the C end calls the B end's service to synchronize data; every time an enterprise user updates a job posting, the B end calls the C end's service to synchronize data.
  • Message queue √: the B end publishes its updates to a message queue, and the C end publishes its updates to a message queue; the B end subscribes to the C end's queue, and the C end subscribes to the B end's queue.

【 Case of Alipay buying movie tickets 】

After purchasing a movie ticket in Alipay, the user soon receives a push notification and an SMS (cinema address, hall number, seat number, show time, etc.), and accumulates some member points.

Here the trading system does not have to wait for actions such as message delivery to complete before returning success; some delay and transient inconsistency is allowed (eventual consistency), and the latter two actions can usually execute concurrently.
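The "latter two actions execute concurrently" part can be sketched with CompletableFuture. The method names are illustrative, and the final join exists only so the demo can observe the result; a real trading system would return success right after payment:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class AsyncSideEffects {

    /** Primary flow: pay for the ticket synchronously; returns an order id. */
    static String payForTicket(List<String> log) { log.add("paid"); return "order-1"; }

    /** Secondary flows: push/SMS notification and member points. */
    static void sendNotification(String id, List<String> log) { log.add("notified " + id); }
    static void creditPoints(String id, List<String> log)     { log.add("points " + id); }

    public static List<String> placeOrder() {
        List<String> log = new CopyOnWriteArrayList<>();
        String orderId = payForTicket(log);     // user sees success here

        // Secondary actions fire concurrently; joined only for the demo.
        CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> sendNotification(orderId, log)),
                CompletableFuture.runAsync(() -> creditPoints(orderId, log))
        ).join();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(placeOrder());
    }
}
```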

Part 3 – JMS specification and AMQP protocol

“JMS classic Mode details”

JMS (Java Message Service) is a Java-platform API for message-oriented middleware (MOM). It is used to send messages between two applications, or within a distributed system, for asynchronous communication. The vast majority of MOM providers support this platform-independent API.

  • JMS message: a message is a type of object in JMS and consists of two parts:
    • Message header: header fields + header properties. Fields are specified by the JMS protocol; properties can be added by the user on demand.

    • Message body: carries the application data or payload, classified by payload type as follows:
      • Simple TextMessage
      • Serializable objects (ObjectMessage)
      • Attribute Collection (MapMessage)
      • Byte streams (BytesMessage)
      • Raw value stream (StreamMessage)
      • Message with no payload.
  • The architecture:
    • JMS provider: an implementation of the JMS interface; the product can be a Java JMS implementation or an adapter for non-Java message-oriented middleware
    • JMS Client: a Java application or object that produces or consumes messages
    • JMS Producer: a JMS client that creates and sends messages
    • JMS Consumer: a JMS client that receives messages
    • JMS Message: an object containing data that is passed between JMS clients
    • JMS Queue: a container caching messages; messages need not be received in the order they were sent, and a message is removed from the queue once consumed
    • JMS Topic: supports the Pub/Sub model
  • The object model:
    • ConnectionFactory interface (ConnectionFactory):

      The managed object that the user uses to create a connection to the JMS provider. JMS clients access connections through portable interfaces so that the code does not need to be modified when the underlying implementation changes. The administrator configures connection factories in the JNDI namespace so that JMS clients can look them up. Depending on the message type, the user will use either a queue connection factory or a topic connection factory.
    • Connection Interface:

      A connection represents a communication link between an application and a message server. Once you have the connection factory, you can create a connection to the JMS provider. Depending on the connection type, connections allow users to create sessions to send and receive queues and topics to targets.
    • Destination interface (Destination):

      A target is a managed object wrapped with an identifier for a message target, either a queue or a topic, where the message is published and received. The JMS administrator creates these objects and the user discovers them through JNDI. As with connection factories, administrators can create two types of targets, queues for the point-to-point model, and topics for the publisher/subscriber model.
    • Session Interface (Session):

      Represents a single-threaded context for sending and receiving messages. Since the session is single-threaded, messages are continuous, meaning they are received one by one in the order they were sent. The advantage of sessions is that they support transactions. If the user chooses transaction support, the session context holds a set of messages that are not sent until the transaction is committed. The user can cancel these messages using a rollback operation before committing the transaction. A session allows users to create messages, producers to send messages, and consumers to receive messages.
    • MessageConsumer interface (MessageConsumer):

      An object created by the session to receive messages sent to a target. Consumers can receive queue- and topic-type messages either synchronously (blocking) or asynchronously (non-blocking).
    • MessageProducer interface (MessageProducer):

      Object created by the session to send messages to the target. A user can create a sender for a target, or a generic sender that specifies a target when sending a message.
    • Message interface (Message):

      An object passed between producers and consumers, that is, from one application to another. A message has three main parts:
      • Message header (required) : Contains action Settings for identifying and routing the message.
      • A set of message properties (optional) : Contains additional properties to support compatibility between other providers and users. Custom fields and filters (message selectors) can be created.
      • A message body (optional) : Allows the user to create five types of messages (text messages, mapping messages, byte messages, stream messages, and object messages).

  • Support mode:
    • Point-to-point (Queue mode) Queue: A producer publishes a message to a particular queue from which a consumer reads the message.
    • Publish/subscribe Topic: Publishes messages to a particular topic, 0 or more subscribers may be interested in receiving messages on a particular message topic.
  • Delivery mode: a message marked NON_PERSISTENT is delivered at most once; a message marked PERSISTENT is delivered using a store-and-forward mechanism
  • supplier:
    • Open source software:
      • Apache ActiveMQ
      • RabbitMQ
      • RocketMQ
      • HornetQ, developed by the JBoss community
      • Joram
      • MantaRay, from Coridan
      • OpenJMS, from The OpenJMS Group
    • Proprietary supplier:
      • BEA WebLogic Server JMS, from BEA
      • TIBCO EMS, from TIBCO Software
      • GigaSpaces, from GigaSpaces Technologies
      • iBus, from Softwired (2006)
      • IONA JMS, from IONA Technologies
      • IQManager, from SeeBeyond (acquired by Sun Microsystems in August 2005)
      • webMethods JMS+
      • Nirvana, from my-channels
      • SonicMQ, from Sonic Software
      • SwiftMQ, from Swiftmq
      • IBM WebSphere MQ

“JMS issues in Application Clusters”

Production applications are almost always deployed in clusters. In Queue mode, consumption within one application is fine: the same application on different nodes competes for messages, which also spreads the load. But when several different applications all need the full message stream, a single Queue will not do.

Solution: create an identical Queue for each application, and each application consumes its own Queue. Disadvantages: wastes space, and the producer must care how many consumers exist downstream, defeating the original purpose of "decoupling".

What about Topic broadcast mode? For a given message, the same application on different nodes will each receive it and act on it: repeated consumption…

Solution: shard the work or use distributed locks so that different nodes compete. Disadvantage: not an elegant solution, because it intrudes heavily into the business logic.

ActiveMQ solves this problem with "virtual topics"

  • In production we seem to need a combination of the two modes:
    • The same application on different nodes should compete, each consuming a share of the messages (P2P), and
    • Different applications should each consume the full message stream (Topic). This way, repeated consumption is avoided.

Download the JMS specification document (JMS-1_1-FR-spec.pdf)

“AMQP Protocol Anatomy”

AMQP (Advanced Message Queuing Protocol) is a standard that plays a role similar to JMS and is compatible with it. RabbitMQ supports AMQP 0-9-1, and version 3.8.4 also supports AMQP 1.0.

  • Concepts in AMQP:

    • Publisher: Message sender

      Send the message to the Exchange and specify a RoutingKey so that the queue can receive the specified message
    • Consumer: Message consumers

      Get messages from queues. A Consumer can subscribe to multiple queues to receive messages from multiple queues
    • Server: A concrete instance of an MQ service, also known as a Broker
    • Virtual host: Virtual host

      A Server can have multiple Virtual hosts to isolate different items. A Virtual host usually contains multiple Exchanges and Message Queues
    • Exchange: the switch

      Receives messages sent by Producer and forwards the messages to the corresponding Message Queue
    • Routing key: the routing key

      Used to specify the routing rule for a message (how an Exchange routes it to a queue); usually used together with a specific Exchange type and the Binding's Routing key
    • Bindings: Specifies the binding between the Exchange and Queue.

      The Exchange decides which queues to dispatch a message to based on the message's Routing key and the Binding configuration (which includes the Binding's own Routing key, etc.); the exact behavior depends on the Exchange type
    • Message Queue: the container that actually stores the message and passes it to the final Consumer
  • AMQP transport layer architecture:

    AMQP is a binary protocol in which information is organized into data frames of many types.

    Data frames carry protocol methods and other information. All data frames have basically the same format: frame header, payload, frame end.

    The format of the data frame payload depends on the type of data frame

    We assume a reliable stream-oriented network transport layer (TCP/IP or equivalent)

    In a single socket connection, there may be multiple independent threads of control called channels. Each data frame is numbered using the channel number.

    By interweaving data frames, different channels share a connection. For any given channel, data frames are transmitted strictly sequentially.

    We use small data types to construct data frames, such as bits, integers, strings, and field tables.

    The fields of a data frame are lightly encapsulated, without making transfer slow or parsing difficult. Mechanically generating the framing layer from the protocol specification is relatively simple.

    The line-level format is designed to be scalable and generic enough to support any high-level protocol (not just AMQP).

    We assume that AMQP will expand, improve, and other changes over time, and require the wire-level format to support these changes.
    • The data type:
      • Integers (1 to 8 octets): used to represent sizes, quantities, limits, etc.; integers are unsigned and may be unaligned within a frame
      • Bits (packed into whole octets): represent on/off values
      • Short strings: hold short text properties; limited to 255 octets
      • Long strings: hold chunks of binary data
      • Field tables: hold key-value pairs; field values are typically strings or integers
    • Protocol negotiation:

      The AMQP client and server negotiate the protocol.

      When the client is connected, the server presents the client with some options that the client must be able to accept or modify. If both parties agree on the outcome of the negotiation, proceed with the connection establishment process.

      Protocol negotiation is a useful technique because it allows us to assert assumptions and preconditions

      Agreeing on a limit may lead both parties to reallocate key buffers and avoid deadlock. Every incoming data frame either obeys the agreed limit, which is safe, or exceeds it, at which point the other party is in error and must disconnect. In keeping with the "either everything works or nothing works" principle, both parties agree on the lower value, as follows:

      • The server must tell the client what limits it imposes
      • The client responds, possibly asking for lower limits for its connection
    • Data frame delimiting: TCP/IP is a streaming protocol with no built-in mechanism for delimiting data frames. Existing protocols solve this in the following ways:
      • Send a single data frame per connection: simple, but slow.
      • Add frame-boundary markers to the stream: simple, but parsing is slow.
      • Put the frame size in each frame header: simple and fast; this is AMQP's choice.
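The third option, the one AMQP chose, can be illustrated with a tiny parser over the AMQP 0-9-1 general frame layout (1-byte type, 2-byte channel, 4-byte payload size, payload, then the frame-end octet 0xCE). The size in the header tells the parser exactly how many bytes to read, with no scanning for boundaries. This class is a sketch, not a real client:

```java
import java.nio.ByteBuffer;

public class AmqpFrame {
    public final int type;
    public final int channel;
    public final byte[] payload;

    private AmqpFrame(int type, int channel, byte[] payload) {
        this.type = type;
        this.channel = channel;
        this.payload = payload;
    }

    /** Parses one frame from the buffer; throws if the end marker is wrong. */
    public static AmqpFrame parse(ByteBuffer buf) {
        int type = buf.get() & 0xFF;            // 1 = method, 2 = header, 3 = body…
        int channel = buf.getShort() & 0xFFFF;  // frames interleave by channel number
        int size = buf.getInt();                // payload length: no scanning needed
        byte[] payload = new byte[size];
        buf.get(payload);
        int end = buf.get() & 0xFF;
        if (end != 0xCE) {
            throw new IllegalStateException("missing frame-end octet");
        }
        return new AmqpFrame(type, channel, payload);
    }

    public static void main(String[] args) {
        byte[] body = {1, 2, 3};
        ByteBuffer raw = ByteBuffer.allocate(8 + body.length);
        raw.put((byte) 3).putShort((short) 1).putInt(body.length)
           .put(body).put((byte) 0xCE);
        raw.flip();
        AmqpFrame f = parse(raw);
        System.out.println(f.type + " ch" + f.channel + " len" + f.payload.length);
    }
}
```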

The RabbitMQ JMS client is built on the RabbitMQ Java client, making it compatible with both the JMS API and the AMQP 0-9-1 protocol

RabbitMQ uses the AMQP protocol. The JMS specification only targets Java, not other languages, while the AMQP protocol is language-independent: any language that implements the protocol can act as a client. This guarantees interoperability between different languages

AMQP protocol document download address


  1. Feign: a lightweight RESTful HTTP service client widely used in Spring Cloud; it fits the interface-oriented programming habit (in essence, it encapsulates the HTTP call flow, similar to a Dubbo service call).↩