Message queues have gradually become a core means of internal communication in enterprise IT systems. They provide low coupling, reliable delivery, broadcasting, flow control, eventual consistency, and more, and have become one of the main vehicles for asynchronous RPC. There are many mainstream message middleware options on the market today: the veterans ActiveMQ and RabbitMQ, the popular Kafka, and Alibaba's home-grown Notify, MetaQ, and RocketMQ. Rather than walking through all the features of these message queues, this article looks at the important things you need to consider and design when building a message queue yourself, borrowing many key ideas from these mature systems. It starts by explaining when you actually need a message queue, then uses the push model to analyze the issues involved in designing one from scratch, such as RPC, high availability, sequential and duplicate messages, reliable delivery, and resolving consumption relationships. It also analyzes the advantages of the pull model, represented by Kafka. Finally come advanced topics: improving performance through batching and asynchrony, the design philosophy of the pull model, storage subsystem design, flow control, and fair scheduling. The last four of these will be covered in the next part.

When message queues are needed

Before reaching for a message queue, first consider whether it is necessary. There are many scenarios where MQ fits; the most common are service decoupling, eventual consistency, broadcasting, and peak shaving/flow control. Conversely, if you need strong consistency and care about the result of the business logic, RPC is more appropriate.

Decoupling

Decoupling is the most fundamental problem message queues solve. Put simply, a transaction should only care about its core flow; less important things that depend on other systems just need to be notified, without waiting for their results. In other words, the message model is about "notification", not "processing".

Take Meituan Travel's product center as an example. Upstream it connects to the main site, the mobile backend, the travel supply chain, and other data sources; downstream it feeds the screening system, the API system, and other display systems. When upstream data changes, without a messaging system every source would have to call our interface to update the data, making them depend heavily on the stability and throughput of the product center's interface. In reality, perhaps only the self-built travel supply chain actually cares whether the product center update succeeded; for external systems such as group-buying, the success or failure of the update is not their responsibility. They only need to make sure they notify us when the information changes. Downstream, we may need to update indexes, refresh caches, and so on, but from the sources' point of view that is not their job either. Bluntly, if downstream systems pulled data on a schedule, the data would still get updated, just not in real time; yet having everything flow through interface calls is clearly too "heavyweight" for the product center. Simply publishing a notification that a product ID has changed, and leaving the handling to downstream systems, may be more reasonable.

As another example, in our order system we may need to send the user an SMS and award points after a payment finally succeeds, but that is no longer part of the system's core flow. If the external system is slow (say the SMS gateway is having a bad day), the main flow takes much longer, and users certainly don't want to wait several minutes after clicking "pay" to see a result. We only need to notify the SMS system that "we have been paid successfully", without waiting for it to finish processing.

Eventual consistency

Eventual consistency means that the states of two systems end up the same: either both succeed or both fail. There is of course a time dimension; in theory the sooner the better, and in practice various failures may delay reaching the consistent state, but the two systems' final states are the same. Several message queues are designed specifically for "eventual consistency", such as Notify (Alibaba) and QMQ (Qunar), built for highly reliable notifications in trading systems. To understand eventual consistency, consider a bank transfer. The requirement is simple: if system A successfully deducts the money, system B must successfully add it; otherwise, both roll back together as if nothing had happened. However, many things can go wrong along the way:

  1. A deducts the money successfully, but the call to B's add-money interface fails.
  2. A deducts the money successfully, and the call to B's add-money interface succeeds, but the response is lost to a network timeout.
  3. A deducts the money successfully, B fails to add the money, and A wants to roll back the deduction, but A's machine goes down.

Clearly, getting this seemingly simple thing truly right is not so easy. For all consistency problems that span machines, the common technical solutions are:

  1. Strong consistency through distributed transactions, which is difficult and costly to land in practice, as discussed later.
  2. Eventual consistency, mainly in the form of "record" and "compensate". Before doing anything uncertain, write it down; then do the uncertain thing, whose result may be success, failure, or unknown, where "unknown" (e.g., a timeout) can be treated as failure. On success, clean up the record; for failures and unknowns, rely on scheduled tasks to redo everything that failed until it succeeds. Back to the transfer example: when A deducts the money successfully, it records the pending "notification" to B in its database (for maximum reliability, the notification to B and the deduction can be maintained in one local transaction at A); on successful notification the record is deleted, and on failure or uncertainty a scheduled compensation task keeps retrying until the status is updated to the correct one. This model could still be built on bare RPC, but it can be abstracted into a unified "enterprise bus" model based on a message queue. Concretely: a local transaction lands the business change and the notification message together (rolling both back on failure), then an RPC carries the message to the broker; once the broker lands it successfully, the RPC returns success and the local message can be deleted, otherwise a scheduled task keeps polling and resending local messages, guaranteeing the message reliably lands on the broker. The broker delivers the message to the consumer in a similar way, until the consumer returns a successful consumption acknowledgement. Setting duplicate messages aside for a moment, landing the message twice plus compensation guarantees the downstream receives it; the downstream then deduplicates with a state machine, version numbers, or the like, updates itself, and eventual consistency is achieved (a minimal sketch of the producer side follows this list).
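The "record and compensate" producer side can be sketched in a few lines of Java. This is a minimal illustration assuming hypothetical Message, Broker, and MessageDao types; it is not the API of any queue mentioned here.

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Message { long id; long getId() { return id; } }  // hypothetical message type
interface Broker { boolean send(Message m); }           // RPC to the broker
interface MessageDao {                                  // local table in the business DB
    void insertPending(Message m);
    List<Message> findPending();
    void delete(long messageId);
}

class ReliableSender {
    private final MessageDao messageDao;
    private final Broker broker;

    ReliableSender(MessageDao messageDao, Broker broker) {
        this.messageDao = messageDao;
        this.broker = broker;
        // compensation: keep redoing everything still pending until it succeeds
        Executors.newSingleThreadScheduledExecutor()
                 .scheduleWithFixedDelay(this::compensate, 5, 5, TimeUnit.SECONDS);
    }

    // call this inside the same local transaction that lands the business change
    void record(Message msg) {
        messageDao.insertPending(msg);
    }

    private void compensate() {
        for (Message msg : messageDao.findPending()) {
            if (broker.send(msg)) {              // broker confirmed the message landed
                messageDao.delete(msg.getId());  // success: clean up the record
            }                                    // failure or timeout: keep it, retry next round
        }
    }
}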

Eventual consistency is not a mandatory feature of message queues, but it is something a message queue can be relied on to provide. Note that, in theory, any message queue that cannot guarantee 100% no message loss cannot deliver eventual consistency on its own ("100%" here sets aside severe system failures and bugs). Designs like Kafka accept the possibility of losing messages at the design level (for example, with periodic flushing, a power failure can drop messages). Even if only 1 in 1,000 messages is lost, the business must use other means to ensure the final result is correct.

Broadcast

Broadcasting is one of the basic capabilities of a message queue. Without one, we would have to call a new interface every time a new business consumer came on board. With a message queue, we only need to care that the message is delivered to the queue; who subscribes to it is a downstream concern, which greatly reduces development and coordination work. For example, as mentioned at the start of this article, the product center publishes product-change messages, and the scenic-spot database has plenty of information to propagate. There may be many interested parties, but the product center and the scenic-spot database only need to publish the change messages, and whoever cares can subscribe.

Peak shaving and flow control

Imagine upstream and downstream systems with different processing capacities. Handling tens of millions of requests per second at the web front end is nothing magical: add a few machines and set up LVS load balancers, Nginx, and the like. A database's capacity, however, is very limited; even with SSDs and sharding, a single machine's throughput is still on the order of tens of thousands. For cost reasons we cannot expect the number of database machines to keep up with the front end. The same problem exists between systems: an SMS system may be bottlenecked at its gateway (hundreds of requests per second) due to the weakest-link effect, a different order of magnitude from front-end concurrency, yet a text message arriving half a minute late at night is generally not a big deal. Without a message queue, building negotiation or sliding-window schemes between the two systems is not impossible, but system complexity grows exponentially: either upstream or downstream must store the backlog and deal with timing, congestion, and a series of other problems, and every pair of systems with a capacity gap needs its own separate logic developed and maintained. So dumping the communication between two systems into an intermediate system, which delivers messages when the downstream can handle them, is a fairly common approach.

In short, message queues are not a panacea. For work that needs strong transaction guarantees and is latency-sensitive, RPC is the better fit. Message queues suit things that are unimportant to you, or important to others but not to you. A message queue that supports eventual consistency can handle latency-insensitive "distributed transaction" scenarios, and may be a better option than a cumbersome distributed transaction. When there is a capacity gap between upstream and downstream, use a message queue as a universal "funnel", dispatching work when the downstream is able to handle it. And if many downstream systems care about notifications from your system, don't hesitate to use one.

How to design a message queue

Overview

With the usage scenarios for message queues clear, the next step is to design and implement one.



A message-based system model does not necessarily require a broker (message queue server). Akka (the Actor model), ZeroMQ, and others are in fact message-based design paradigms without a broker.

To design a message queue equipped with a broker, we need to do two things:

  1. Dump messages, delivering them at a more suitable time, or helping them reach the consumer machine through a series of other means.
  2. Standardize a paradigm and a common pattern that satisfies decoupling, eventual consistency, peak shaving, and so on. Broken down, the simplest message queue is a message forwarder that turns one RPC into two: the sender delivers the message to the broker, and the broker forwards it to the receiver.

In general, the approach to designing a message queue is: first lay out the overall data flow, for example producer sends to broker, broker sends to consumer, consumer replies with a consumption acknowledgement, broker deletes or backs up the message, and so on. String this data flow together with RPC, then make the RPC highly available and as stateless as possible, for easy horizontal scaling. Next, consider how to carry a message backlog and deliver messages at the right time. The best way to handle a backlog is storage, and the choice of storage must weigh performance, reliability, development and maintenance cost, and many other factors. To implement broadcasting, consumption relationships must be maintained, which can be kept in ZooKeeper or a config server. With all of the above done, a message queue is basically implemented, and advanced features such as reliable delivery, transactions, and performance optimization can follow. Below we design the message queue with its modules as the main thread, interleaving how various features are implemented, and analyze each aspect of the design in concrete terms.

Implementing the basic functions of a queue

RPC communication protocol

As mentioned earlier, a message queue is really just two RPCs plus a dump, or three RPCs if the consumer must explicitly confirm consumption. Since it is RPC, the usual topics inevitably come up: load balancing, service discovery, communication protocols, serialization protocols, and so on. Here my strong advice is not to reinvent the wheel: leverage your company's existing RPC framework, whether Thrift, Dubbo, or any other in-house framework, because message queue RPCs are not fundamentally different from ordinary RPCs. It is not impossible to build your own transport on the Memcached or Redis protocol (MetaQ uses its own encapsulation of the Gecko NIO framework, and Kafka uses a similar custom protocol), but the cost and difficulty multiply; unless you have extreme efficiency requirements, use an off-the-shelf RPC framework. In simple terms, the server provides two RPC services, one to receive messages and one to acknowledge their consumption, and no matter which server instance receives a message or an acknowledgement, the result must be the same. This may also involve cross-IDC calls; the principle is the same as for ordinary RPC: prefer delivery within the local data center. You might ask: what if the producer and the consumer themselves sit in two data centers? First, the broker must be aware of all consumers; second, producers should simply prefer the nearest data center.
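In code, the broker's RPC surface can be as small as the two services below. This is an illustrative sketch; all type names are hypothetical, not the API of any framework mentioned above.

enum SendResult { OK, FAIL }
enum AckResult  { OK, FAIL }

public interface BrokerService {
    // producer -> broker: returns OK only after the broker has safely accepted the message
    SendResult send(Message message);

    // consumer -> broker: confirms that a delivered message has been consumed
    AckResult ack(String messageId, String consumerGroup);
}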

High availability

In fact, all high availability here reduces to the high availability of RPC and of storage. Look at RPC first: frameworks such as Meituan's MTThrift-based RPC framework and Alibaba's Dubbo provide automatic service discovery and load balancing. A message queue achieves high availability by making the broker's receive and acknowledge interfaces idempotent, and by making message processing on the several consumer machines idempotent, thereby handing availability off to the RPC framework. So how is idempotence guaranteed? The easiest way is shared storage: multiple broker machines sharing one DB or one distributed file/KV system makes message processing naturally idempotent, and even if one node fails, the others can take over immediately. Moreover, failover can rely on scheduled-task compensation, which message queues naturally support, and we need not worry much about the availability of the storage system itself. For queues that do not share storage, such as Kafka with its partitions plus primary/replica mode, things are a little more troublesome: high availability must be ensured within each partition, that is, each partition must have at least one replica with data synchronization. For details of this kind of HA, refer to the pull-model system design in the next part.

The server's capacity to absorb message backlog

If a message reached the server and went straight to the receiver without any processing, the broker would lose its meaning. To satisfy our needs for peak shaving, flow control, and eventual delivery, it is natural to store messages and then choose when to deliver them. The storage can take many forms: in memory, in a distributed KV store, on disk, in a database, and so on, but it boils down to two kinds: persistent and non-persistent. The persistent form gives greater message reliability (surviving power failures, for instance) and can in theory absorb a larger backlog (external storage is far larger than memory). But not every message needs persistent storage: many messages value delivery performance over reliability and come in huge volume (logs, for example). For those, the message can be kept in memory, with a few failover attempts before final delivery. Mainstream message queues generally support both forms; the specific choice should of course match the company's business.

Storage subsystem selection

Now look at the storage subsystem options if data must be persisted. In terms of speed, roughly: local file system > distributed KV > distributed file system > database, while reliability runs the other way. Still, make the choice based on the business scenario you support. If your message queue backs payments or transactions, with high reliability requirements but modest performance and volume requirements, and you don't have the time and energy to research file storage systems, a DB is the best choice. But a DB is constrained by IOPS; if you need five-digit QPS on a single broker, file-based storage is the better solution, generally built from data files plus index files. That design is complicated; see the storage subsystem design in the next part. Distributed KV stores (such as MongoDB or HBase) or persistent Redis, with their friendly programming interfaces and respectable performance, are also good choices in scenarios with lower reliability requirements.

Resolving consumption relationships

Our message queue can now tentatively dump messages; the next important step is to resolve the send/receive relationships and deliver messages correctly. The message queues on the market define a pile of confusing terminology: Topic/Queue in the JMS specification, Topic/Partition/ConsumerGroup in Kafka, Exchange in RabbitMQ, and so on. Setting the surface aside, the essence is nothing more than the difference between unicast and broadcast: unicast is point-to-point, broadcast is point-to-multipoint. For most Internet applications, broadcast between groups with unicast within a group is the most common scenario: a message must be delivered to multiple business clusters, and within each cluster of many machines only one machine needs to consume it. This is not absolute, of course; intra-group broadcast has its scenarios too, such as updating local caches. Consumption relationships could also form multi-level trees beyond within-group and between-group, but such cases are too complex to be worth considering. A common design is therefore to support broadcast between groups, with different groups registering different subscriptions; within a group, machines registering the same ID get unicast, and machines registering different IDs (such as IP address + port) get broadcast. As for storing the broadcast relationships, since message queues are themselves clusters, the relationships are generally maintained in common storage such as a config server or ZooKeeper, and the work of maintaining them is much the same everywhere (a sketch of such a registry follows).
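Here is a sketch of how a subscription registry can encode "broadcast between groups, unicast within a group". The names (Subscription, registry, sendTo) are hypothetical, and a real implementation would keep this data in ZooKeeper or a config server as described above.

// Hypothetical subscription record kept in shared storage (ZooKeeper / config server).
class Subscription {
    String topic;
    String group;       // each subscribed group gets its own copy (inter-group broadcast)
    String consumerId;  // same id across a group's machines -> intra-group unicast;
                        // distinct ids (e.g. "ip:port")     -> intra-group broadcast
}

// Delivery: one copy per group, and within a group one copy per distinct consumerId.
void deliver(Message msg) {
    for (String group : registry.groupsFor(msg.getTopic())) {
        for (String consumerId : registry.idsFor(msg.getTopic(), group)) {
            sendTo(group, consumerId, msg);  // pick any live machine registered under this id
        }
    }
}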

Advanced queue feature design

Everything above concerns the basic functions of a message queue; below we look at features. Whether it is reliable delivery, message loss and duplication, transactions, or performance, not every message queue takes care of every one of these, so weigh the implementation cost, benefits, and drawbacks of each feature against business requirements to reach the most reasonable design.

Reliable delivery (eventual consistency)

This is an exciting topic. Is losing no messages at all possible? The answer is yes, provided messages may be duplicated and, in exceptional cases, message delays are acceptable. The solution is brutally simple: whenever anything unreliable is about to happen (an RPC, etc.), land the message first, then send it. When a send fails, or succeeds without our knowing it (e.g., a timeout), the message's status stays "to be sent", and a scheduled task keeps polling all to-be-sent messages, so every message is eventually delivered. Specifically:

  1. The producer lands the message before sending it to the broker.
  2. After receiving a message, the server lands it before replying to the client that the send succeeded.
  3. A broadcast-capable message queue persists a send state for each endpoint, and deletes the message only after every endpoint's state is OK (point 3 is sketched just below).
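A minimal sketch of point 3 on the broker side; sendStateDao and messageStore are hypothetical names, not a real API.

// One send-state row per subscribed endpoint; the message itself is deleted
// only once every endpoint has acknowledged. Names are hypothetical.
void onAck(String messageId, String endpoint) {
    sendStateDao.markOk(messageId, endpoint);
    if (sendStateDao.allOk(messageId)) {
        messageStore.delete(messageId);  // safe: every subscriber has confirmed
    }
}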

All kinds of uncertainty (timeout, crash, message not delivered, delivered but not landed, landed but the reply lost) look the same to the sender: the message was not delivered. Pushing messages again brings the problem of duplicates. Duplication and loss are like two nightmares; you must face one of them. Fortunately, duplicate messages can still be handled, while lost messages are very hard to get back. In any case, a mature message queue should try to reduce the chance of duplicate delivery at every step, rather than deliver carelessly just because deduplication solutions exist. Finally, not every system needs eventual consistency or reliable delivery: in a forum or recruiting system, a duplicated résumé or post can be more annoying to users than a lost one, so the occasional loss is acceptable there. Any infrastructure component must serve its business scenario.

Consumer acknowledgement

When the broker delivers a message, the consumer can immediately respond that it received it. But receiving the message is only the first step; whether it can actually be processed is not certain, perhaps because of a consumption failure, because the system's load leaves no capacity, or because (as in the state machine case below) the message is not the one expected and a redelivery is requested. Separating message receipt from message processing is what makes a message queue truly decoupling, so consumers must be allowed to acknowledge consumption actively. For messages with no special logic a default auto-ack is fine, but active ack must remain possible. Acking a correctly consumed message is nothing special; reject and error deserve explanation. Reject is something the business side is often unaware of: judging the system's load, health, and processing capacity is genuinely complex. As an extreme example, a message that triggers an index build may take half an hour to process even though message volume is tiny. Reject is therefore best driven by a sliding-window or thread-pool model: when processing capacity does not match, reject directly and let the broker resend after a while, reducing the burden on the business. A business error, on the other hand, is something only the business side knows, like the state machine case; the business should be allowed to ack an error actively and agree with the broker on when the next delivery happens.
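Expressed as code, the consumer's reply to the broker might look like the enum below; this is a hypothetical illustration, not any particular queue's API.

enum ConsumeResult {
    ACK,     // processed successfully: the broker may clear the send state
    REJECT,  // overloaded (sliding window / thread pool full): redeliver after a back-off
    ERROR    // business-level refusal (e.g. state machine mismatch): redeliver at an agreed time
}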

Repeated and sequential messages

As noted above, duplicate messages cannot be 100% avoided unless loss is allowed. Can sequential messages be 100% guaranteed? The answer is yes, but under harsh conditions:

  1. Message loss is allowed.
  2. The path from sender through server to receiver is a single point and single-threaded.

So absolutely sequential messages are basically impossible in practice. That said, in pull-model queues such as MetaQ/Kafka, single-threaded production and consumption, setting message loss aside, is a workable approach to ordered messages. In general, the design stance of a mainstream message queue should be: minimize duplicates without losing messages, and do not guarantee delivery order. Around duplicate messages there are two main topics:

  1. How to identify duplicate messages and handle them idempotently.
  2. How a message queue can deliver as few duplicates as possible.

Start with the first topic. Every message should have a unique identity; whether the MessageId is assigned by the business side or generated from IP/PID/timestamp, as long as there is somewhere to record it, arriving messages can be compared against the record and duplicates identified. A database unique key, a Bloom filter, or a key in a distributed KV store are all good choices. Since messages cannot be stored forever, it is theoretically possible that the upstream redelivers a message at the very moment it is removed from persistent storage (the upstream failed to confirm delivery for some reason and kept retrying until the downstream's cleanup time arrived). This can only happen under unusual circumstances and is rare; after all, if a message has not arrived within two minutes, what difference does one more delivery make? Idempotent handling of messages is an art, because duplicated or disordered messages will still arrive for all sorts of reasons. There are two common solutions:
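An in-memory sketch of duplicate identification follows; in production a database unique key or a distributed KV plays the same role, and entries must be expired after the retention window rather than kept forever, as just discussed.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class Deduplicator {
    // ids seen recently; a real system evicts entries once the retention window passes
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    boolean isDuplicate(String messageId) {
        // add() returns false when the id was already present
        return !seen.add(messageId);
    }
}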

Version numbers

A simple example: a product has an online/offline state, message 1 takes it offline, message 2 takes it online. Unfortunately, message 1's delivery acknowledgement fails, it is delivered twice, and the second delivery arrives after message 2. Without duplicate detection, the final state is obviously wrong. But suppose each message carries a version number: upstream stamps message 1 with version 1 and message 2 with version 2, and a further offline message would carry version 3. Downstream maintains the version number of the last processed message and only accepts messages with a larger version. The initial version is 0; when message 1 arrives, the version is updated to 1; message 2 is accepted because its version is greater than 1, and the version is updated to 2. When another offline message arrives, a version of 3 means a genuine offline message, while a version of 1 means a duplicate. If the business only cares about duplicates, the problem is solved.

But often there is another headache: the arrival order differs from the intended order. For example, the intended order is 1, 2 but the arrival order is 2, 1, which again ends in a wrong state. Borrowing from TCP/IP: to reassemble out-of-order messages properly, still only accept messages larger than the current version number, but keep each message's version for the duration of a "session". If the arrival order is 2, 1, store 2 first, and when 1 arrives, process 1 and then 2, satisfying both deduplication and ordering.
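A sketch of this version-number scheme for a single entity, buffering out-of-order arrivals until the missing versions show up; the class and the apply() hook are illustrative, not from any real queue.

import java.util.SortedMap;
import java.util.TreeMap;

class VersionedConsumer {
    private long currentVersion = 0;                  // version of the last message applied
    private final SortedMap<Long, Message> pending = new TreeMap<>();

    void onMessage(long version, Message msg) {
        if (version <= currentVersion) {
            return;                                   // duplicate or stale: drop it
        }
        pending.put(version, msg);                    // buffer out-of-order arrivals
        // apply every consecutive version now available (handles 2 arriving before 1)
        while (!pending.isEmpty() && pending.firstKey() == currentVersion + 1) {
            apply(pending.remove(pending.firstKey()));
            currentVersion++;
        }
    }

    private void apply(Message msg) { /* the actual business update */ }
}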

State machines

Handling duplicate and out-of-order messages with version numbers sounds good, but it has flaws. The biggest problems with version numbers are:

  1. The sender must be required to stamp messages with a business version number.
  2. The downstream must store message version numbers, and strictly in order.

It is not enough to store just the latest version number; all out-of-order arrivals must be stored and then processed. Imagine a "session" that never expires, such as a product's state moving online and offline forever: every intermediate message must be retained until all versions before a given number have arrived, which is too costly. What if messages carry no version number? Then the business side only needs to maintain a state machine defining the legal transitions between states: for example, only an online message can be accepted in the offline state, and only an offline message in the online state. If an online message arrives while already online, then assuming messages are not lost and the upstream business is correct, the message was either resent or arrived out of order. The consumer simply tells the sender "I cannot process this message" and asks for a resend after a while; with a bound on retries (say 5) to avoid endless loops, the problem is solved.

For example, suppose the product starts offline, message 1 is online, message 2 is offline, and message 3 is online. The intended order is 1, 2, 3, but the received order is 3, 1, 2, 3. Receiving message 3 first, the downstream checks the transition offline -> online, which is legal, and accepts. Then message 1 arrives; online -> online is illegal, so it is refused and a resend requested. Then message 2 arrives; online -> offline is legal, and it is accepted. Whichever of the resent messages 1 or 3 arrives next can still be accepted (offline -> online), after which remaining retransmissions are refused until they stop after the retry bound, and the business ends up correct.
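The transition check itself is tiny; here is a sketch with illustrative names.

// Accept a message only if it is a legal transition from the current state;
// otherwise reply "cannot process, please resend later". Names are illustrative.
enum State { ONLINE, OFFLINE }

boolean accept(State current, State target) {
    return (current == State.OFFLINE && target == State.ONLINE)
        || (current == State.ONLINE && target == State.OFFLINE);
    // false => duplicate or out-of-order: ask the sender to redeliver after a delay
}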

Middleware handling of repeated messages

Back to the message queue itself. Of the generic solutions above, version numbers, state machines, and ID-based deduplication, which should the queue do, and which should be left to the business side? There is no strict rule, but return to our starting point: the queue guarantees as few duplicates as possible without losing messages, and does not guarantee consumption order. Correctness under duplicated and disordered messages should then be guaranteed by the consumer, while the queue's job is to reduce duplicate sends. The queue has no way to define business version numbers or state machines for the business side, and forcing a version number into the API would hold users hostage. Moreover, maintaining all that state on the consumer side raises the problem of landing messages per consumer and sharing consumption state across machines, which multiplies complexity while solving only part of the problem. The key steps for reducing duplicate messages are:

  1. The broker records MessageIds and clears them only after successful delivery; duplicate IDs arriving in the meantime are not processed. Thus, as long as the sender learns of delivery success within the clearing cycle, few duplicates are generated on the server side.
  2. For messages sent from the server to the consumer, since it is unclear whether the peer crashed mid-processing or the message was lost, record the IP the message was delivered to, and before deciding to redeliver, ask that IP whether the message was processed successfully; if the inquiry fails, resend.

Transactions

Persistence is one property of a transaction, but persistence alone does not make a transaction. Take the deduct/add-money example again: to satisfy the consistency property of a transaction, either nothing proceeds or everything succeeds. There are two broad solutions:

  1. Two-phase commit, i.e., distributed transactions.
  2. Local transactions: land locally, then send with compensation.

The biggest problem with distributed transactions is cost. The two-phase commit protocol is almost an unsolvable black hole in the face of coordinator crashes and single points of failure, and transaction-intensive or I/O-intensive applications cannot bear such network latency and system complexity. Mature distributed transactions must also be built on more reliable commercial DBs and commercial middleware, which is too expensive. So how do local transactions solve the distributed transaction problem? Take the approach of putting the message table in the same database instance as the business tables: insert the message into the local table in the same transaction as the business operation that deducts the money. If the message insert fails, the business rolls back; if it succeeds, the transaction commits, and then the message is sent (note it can be sent in real time, without waiting for the scheduled task to find it, improving message latency). What remains is exactly the eventual consistency problem discussed earlier: as long as the send has not succeeded, the scheduled task keeps retrying.

A key point: the local transaction spans the business landing and the message landing, not the business landing and the RPC success. Many people confuse these; the latter nests an RPC inside a transaction, which is taboo and risks long transactions, deadlocks, and the like. Once the message has landed, there is essentially no risk of loss (barring physical disk damage). As long as the message is confirmed by the server before the local copy is deleted, producer-to-broker delivery is reliable, and the business can roll back whenever the message store fails. The two biggest obstacles to using local transactions:

  1. Complex configuration that "kidnaps" the business side: the local database instance must provide a table for messages.
  2. It does not suit services that are sensitive to message latency.

On the other hand, not every business needs strong transactions. Deducting and adding money need transactional guarantees, but placing an order and sending an SMS do not; the ordering business cannot be rolled back just because storing the SMS message failed. So a complete message queue should define the kinds of messages it can deliver, such as transactional messages, locally non-persistent messages, and unreliable messages that are not even landed on the server, letting different business scenarios choose differently. Also, using transactions should be as cheap and transparent as possible, for example by extending an existing mature framework such as Spring's declarative transactions, so the business side only needs an @Transactional annotation.
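As a sketch of how transparent this can look with Spring's declarative transactions; the repositories and Message.of() are hypothetical stand-ins, not a real library API.

// Business change and pending message land in one local transaction; a real-time
// send plus a polling compensation task later pushes the pending message to the
// broker and deletes it on confirmed success. Repository names are hypothetical.
@Transactional
public void placeOrder(Order order) {
    orderRepository.save(order);                       // business landing
    pendingMessageRepository.save(Message.of(order));  // message landing, same transaction
}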

Performance related

Asynchronous/synchronous

First, let's clarify a concept: asynchronous, synchronous, and oneway are three different things. Asynchronous still means you care about the result, just not necessarily at that moment; you can use polling or a callback to handle it. Synchronous means you need the result right now. Oneway is fire-and-forget, fine for scenarios that require no reliability at all, but not what we are discussing here. Back on topic: any RPC has a client side and a server side, each of which can be synchronous or asynchronous, in any combination: client sync/server sync, client async/server sync, client sync/server async, client async/server async. For the client, the difference between synchronous and asynchronous is getting a Result versus a Future (Listenable), which can be implemented with thread pools, NIO, or other event mechanisms, not elaborated here. Server-side asynchrony may be a little harder to understand and requires support from the RPC protocol: following the Servlet 3.0 specification, for example, the server can hand back a Future and notify the client when the Future completes. The whole set of combinations can be seen in the following code:

Client synchronous, server asynchronous:

Future future = request(server);
synchronized (future) {
    while (!future.isDone()) {
        future.wait();
    }
}
return future.get();

Client synchronous, server synchronous:

Result result = request(server);

Client asynchronous, server synchronous (implemented here with a thread pool):

Future<Result> future = executor.submit(new Callable<Result>() {
    public Result call() {
        return request(server);
    }
});
return future;

Client asynchronous, server asynchronous:

Future future = request(server); // the server returns a future immediately
return future;

All of the above is really meant to clear up two misconceptions:

  1. That RPC can only be asynchronous on the client, not on the server.
  2. That asynchrony can only be implemented with thread pools.

So what is the biggest benefit of server-side asynchrony? Ultimately, it frees threads and I/O. Imagine a server with a pile of I/O waiting to be processed: if every request demands a synchronous, immediately returned response, I/O merging is nearly impossible (the interface could be designed as a batch interface, but the number of messages arriving per call may still be small). Returning a Future to the client asynchronously gives you the chance to merge I/O, landing several batches of messages together (this merging works especially well for databases such as MySQL that allow batch inserts), and frees the threads entirely: the supported concurrency grows without needing ever more threads. As for the second misconception: thread pools are not the only way to return a Future; as shown above, you can do asynchrony (NIO, events) without one.

Returning to message queues: we certainly don't want sending a message to block the main flow (as noted, a server using the async model may add a little latency from message merging), so we can submit the send request to a thread pool and let the main flow continue. But the task in the pool does care about the result: a message counts as sent only after it lands successfully on the server. So in this model, strictly speaking, the client is half-synchronous, half-asynchronous (the thread pool does not block the main flow, but tasks in the pool wait for the server's reply), while the server is purely asynchronous: the client's pooled task waits on the Future the server threw back, until the server finishes processing. In conclusion, synchrony guarantees results, asynchrony guarantees throughput, and a sensible combination achieves the best of both.
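Here is a sketch of that server side: each request gets a Future at once, while a single writer thread merges whatever has accumulated into one batched landing. The MessageStore interface and the batch size of 100 are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncBroker {
    // hypothetical persistent store that supports batched landing
    interface MessageStore { void batchInsert(List<PendingWrite> batch); }

    static class PendingWrite {
        final Message msg;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        PendingWrite(Message msg) { this.msg = msg; }
    }

    private final BlockingQueue<PendingWrite> buffer = new LinkedBlockingQueue<>();
    private MessageStore store;  // hypothetical, wired up elsewhere

    // RPC entry point: return a future immediately, freeing the request thread
    CompletableFuture<Void> receive(Message msg) {
        PendingWrite w = new PendingWrite(msg);
        buffer.offer(w);
        return w.future;
    }

    // single writer thread: merge many messages into one batched insert/fsync
    void writerLoop() throws InterruptedException {
        List<PendingWrite> batch = new ArrayList<>(100);
        while (true) {
            batch.add(buffer.take());     // block until at least one write is waiting
            buffer.drainTo(batch, 99);    // merge everything else already queued
            store.batchInsert(batch);     // one landing for the whole batch
            for (PendingWrite w : batch) {
                w.future.complete(null);  // now tell every waiting client "sent"
            }
            batch.clear();
        }
    }
}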

Batching

Batching inevitably brings up the producer-consumer model, whose biggest pain point is deciding when the consumer should actually consume. Broadly, consumption is event-driven, and the main triggers are:

  1. A certain quantity has accumulated.
  2. A certain amount of time has passed.
  3. New data has arrived in the queue.

For latency-sensitive data, trigger 3 fits, for example when the client delivers data to the server: as long as the queue has data, flush it all out; otherwise suspend and wait for new data. While the first flush is in flight, more data accumulates, so the second flush naturally forms a batch. The pseudocode is as follows:

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10000); // capacity added; the original omitted it

private Runnable task = new Runnable() {
    public void run() {
        // drain up to 20 messages and send them as one batch
        List<Message> messages = new ArrayList<>(20);
        queue.drainTo(messages, 20);
        doSend(messages);
    }
};

// while all four threads are busy, the queue gets a chance to accumulate new messages
public void send(Message message) {
    queue.offer(message);
    executor.submit(task);
}

This approach strikes a decent balance between message latency and batching, but it prioritizes a low-latency response: the extra delay a message can suffer is bounded by the duration of the previous flush. The downside is that the flushing is so eager that the batch size may never grow large enough for maximum throughput.

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10000); // capacity added; the original omitted it
volatile long last = System.currentTimeMillis();

// a timer flushes periodically so a quiet queue never holds messages forever
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
    public void run() {
        flush();
    }
}, 500, 500, TimeUnit.MILLISECONDS);

private Runnable task = new Runnable() {
    public void run() {
        // drain up to 20 messages and send them as one batch;
        // while all four threads are busy, the queue gets a chance to accumulate new messages
        List<Message> messages = new ArrayList<>(20);
        queue.drainTo(messages, 20);
        doSend(messages);
    }
};

public void send(Message message) {
    last = System.currentTimeMillis();
    queue.offer(message);
    flush();
}

private void flush() {
    // trigger a send once enough messages have accumulated or enough time has passed
    if (queue.size() > 200 || System.currentTimeMillis() - last > 200) {
        executor.submit(task);
    }
}

By contrast, for scenarios that can trade a reasonable amount of latency for throughput, the timing/quantity approach above may be more ideal: send once a certain quantity has accumulated, but with a time bound so that messages never wait indefinitely. Concretely, both time and quantity are checked before the submit, and a timer is maintained so that an old task still gets a chance to trigger the send condition even when no new task arrives. For landing data on the server side, this approach is very convenient.

One final note: I was once asked why merging many small network requests into larger packets improves performance. There are two main reasons:

  1. Fewer wasted headers. If each request carries only a few bytes of payload but tens of bytes of headers, the efficiency is terrible.
  2. Fewer ACK packets in reply. Merged requests mean fewer ACKs, reducing the cost of acknowledgement and retransmission.

Push or pull

Most of the message queues discussed above are designed around the push model. There are now many classic, mature pull-model message queues on the market, such as Kafka and MetaQ, a big departure from JMS's traditional push approach. Let's briefly analyze the strengths and weaknesses of the push and pull models.

Slow consumption

Slow consumers are undoubtedly the push model's biggest fatal flaw. If consumers are much slower than senders, messages inevitably pile up in the broker; assuming the messages are useful and cannot be discarded, they stay on the broker side. And that is not even the worst part: the worst is when the broker pushes a consumer a pile of messages it cannot process, the consumer rejects or errors, and the ball gets kicked back and forth. In the pull model, by contrast, consumers consume at their own pace without being harassed by messages they cannot handle, and the broker can stack messages relatively cheaply by maintaining queues and offsets for all messages instead of recording per-message send state. So the pull model suits slow-consumption workloads such as index building, where message volume is bounded but arrival rates are uneven.

Message latency and busy-waiting

This is the pull model's biggest weakness. Since the initiative lies with the consumer, the consumer cannot know exactly when to pull the latest message. After a pull that returns messages, it can pull again immediately; after an empty pull, it must wait a while before retrying, and how long to wait is hard to judge. You might propose some clever adaptive pull-interval algorithm, but the essence of the problem is that whether a message arrives is not the consumer's decision. Maybe 1,000 messages arrive in one minute and then nothing for half an hour; your algorithm may predict the next message 31 or 60 minutes out, and the next one arrives in 10. Frustrating, isn't it?

Of course, there are mitigations for the latency. A fairly mature industry practice is exponential back-off starting from a short interval (which does not burden the broker much): wait 5ms, then 10ms, then 20ms, then 40ms... until a message arrives, then drop back to 5ms. Even so, latency remains: if a message arrives at the 50ms mark, between the 40ms and 80ms polls, it is delivered 30ms late, and for a message that arrives once every half hour the polling overhead is wasted. Alibaba's RocketMQ has an optimization, long polling, that balances the weaknesses of the push and pull models: if a pull attempt finds nothing, instead of returning immediately, the connection is parked, and when the server receives a new message it notifies the parked connection. Still, large numbers of blocked long connections are not free for the system, so the interval should be evaluated sensibly and the wait given a time limit.
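The exponential back-off loop can be sketched as follows; the bounds and the PullBroker interface are illustrative assumptions, not a real API.

import java.util.List;

// Hypothetical pull API, for illustration only.
interface PullBroker {
    List<Message> pull(String topic, long offset, int maxBatch);
}

class BackoffPuller {
    private final PullBroker broker;
    private final String topic;
    private long offset;
    private volatile boolean running = true;

    BackoffPuller(PullBroker broker, String topic, long startOffset) {
        this.broker = broker;
        this.topic = topic;
        this.offset = startOffset;
    }

    void pullLoop() throws InterruptedException {
        long waitMs = 5;
        while (running) {
            List<Message> batch = broker.pull(topic, offset, 32);
            if (!batch.isEmpty()) {
                process(batch);
                offset += batch.size();
                waitMs = 5;                          // messages are flowing: poll eagerly again
            } else {
                Thread.sleep(waitMs);
                waitMs = Math.min(waitMs * 2, 1000); // 5 -> 10 -> 20 ... capped at 1s
            }
        }
    }

    private void process(List<Message> batch) { /* hand to business code */ }
}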

Sequential messages

With a push-model message queue, global order sounds achievable if the queue supports partitions, each partition allows only one consumer, the next message is pushed only after the previous one is acknowledged, and the sender also guarantees global order. But the cost is too high; in particular, requiring an acknowledgement per message before the next push is a disaster for a push queue, whose stacking capacity and slow consumers are already its bottleneck. The pull model, on the other hand, makes global sequential messages much easier (see the sketch after the list below):

  1. Producers map to partitions and are single-threaded.
  2. Consumers map to partitions, acknowledge consumption (or acknowledge in batches), and continue consuming.
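A sketch of the consumer side; broker, topic, and running are assumed fields, and the partitioned pull/commit API is an illustrative assumption.

// One such loop runs per partition, on a single thread, so order within the
// partition is preserved; offsets are committed in batches. Hypothetical API.
void consumePartition(int partition) throws InterruptedException {
    long offset = broker.committedOffset(topic, partition);
    while (running) {
        List<Message> batch = broker.pull(topic, partition, offset, 100);
        if (batch.isEmpty()) {
            Thread.sleep(50);       // idle back-off, as discussed above
            continue;
        }
        for (Message m : batch) {
            process(m);             // single thread => order preserved
        }
        offset += batch.size();
        broker.commitOffset(topic, partition, offset);  // batch acknowledgement
    }
}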

So for scenarios like log pushing, where global order is desirable but small errors are tolerable, the pull model fits very well (nobody wants to read a completely scrambled log). In any case, scenarios that truly require sequential messages are rare and expensive; think carefully before committing to them.

Conclusion

This article started with why message queues are used, then focused on how to design one from scratch, covering RPC, transactions, eventual consistency, broadcasting, message acknowledgement, and other key issues. It briefly analyzed the push and pull models of message queues, and finally looked at performance optimization from the angle of batching and asynchrony. The next part will focus on advanced topics such as storage system design, flow control and peak-shaving design, and fair scheduling. I hope this gives you an overview of message queues and some ideas for building one. This article largely comes from my own thinking and source-code reading while developing a message queue, so it is relatively unofficial and will inevitably have gaps; discussion is welcome.

A follow-up article on advanced message queue design will cover the following areas:

  • Design philosophy of pull-model message systems
  • Storage subsystem design
  • Flow control
  • Fair scheduling

Stay tuned

About the author

Wang Ye is a programmer on the R&D team of Meituan Travel. He previously worked at Baidu, Qunar, and Youku, focusing on Java backend development. He has a strong interest in network programming and concurrent programming, has built some infrastructure components and dug through a fair amount of source code, and is a typical homebody. He looks forward to meeting more friends on the road of coding.