Even now, talking about message queues stirs something in me.

Message queues, caches, and sharded tables are the three musketeers of high-concurrency solutions, and message queues are my favorite: the technology I have pondered the most.

I'd like to share my story with message queues across four phases; it is also a review of my own technical growth.

  • Beginnings: ActiveMQ
  • Advancing: Redis & RabbitMQ
  • Sublimation: MetaQ
  • Love: RocketMQ

1 Beginnings: ActiveMQ

1.1 Asynchrony & decoupling

In early 2011, I was doing research and development at an Internet lottery company.

I was responsible for the user center system, which provided basic functions such as user registration, queries, and profile updates. After a user registered successfully, the system needed to send an SMS message to the user.

Because the original code was written in a procedural style, I put the user-creation logic and the SMS-sending logic together.

Everything was fine at first, but then the problems began to manifest themselves.

  • The SMS channel was unstable, and sending a message took about 5 seconds, which made the registration interface slow and hurt the front-end user experience;
  • Whenever the SMS channel's interface changed, the user center code had to be modified. But the user center is a core system, and every release had to be handled with care. It felt awkward that a non-core function could affect the core system.

For the first problem, I could use a thread pool to make the call asynchronous. But the second problem left me stumped.

I asked the technical manager, who told me to introduce message queues to solve the problem.

  • Separate the SMS-sending function into an independent Job service;
  • After a user registers successfully, the user center sends a message to the message queue. The Job service receives the message and invokes the SMS service to send the text (a minimal sketch follows this list).
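Here is a minimal sketch of that decoupling, assuming Spring's JmsTemplate over ActiveMQ; the queue name and message shape are illustrative, not the actual system's:

```java
import org.springframework.jms.core.JmsTemplate;

public class UserCenterService {

    private final JmsTemplate jmsTemplate;

    public UserCenterService(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void register(String phone) {
        // ... create the user record ...

        // Enqueue and return immediately; the Job service consumes
        // "sms.queue" and calls the SMS channel, so a slow channel
        // no longer blocks registration.
        jmsTemplate.convertAndSend("sms.queue", phone);
    }
}
```

With this split, an SMS channel change only touches the Job service; the user center never needs to be redeployed for it.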

At this point, I understood that the core capabilities of a message queue are asynchrony and decoupling.

1.2 The dispatch center

The business of a lottery system is complex. In its life cycle, a lottery order passes through many stages: creation, splitting into sub-orders, ticket issuing, prize calculation, and so on. Each stage is handled by a different service; each system has its own independent tables, and the business functions are relatively independent. If every application modified the master order table directly, things would get quite chaotic.

The company's architects designed a dispatch center. Its responsibilities were to maintain the order's core state machine, run the order prize-settlement process, and generate the core lottery data.

The dispatch center exchanged information with the ticket gateway, the lottery service, and other systems through the message queue.

To my eyes at the time, this design was like the droplet versus the human fleet in The Three-Body Problem: a dimensionality-reduction strike.

As I learned more about the business, I vaguely thought, “Good architecture is simple and should be easy to maintain.”

Even when the lottery business handled tens of millions of transactions a day, the dispatch center was maintained by only two R&D engineers. Its source code was excellent in its business logic, logging, and coding conventions.

In my later programming life, I have subconsciously imitated the coding style of the dispatch center: no clever tricks; code is written for people to read.

1.3 The restart method

As the lottery business exploded, daily volume grew from 300,000 messages to around 1.5 to 2 million, and everything seemed to be going smoothly.

Then one day, just before betting closed for the Double Color Ball lottery, the dispatch center stopped consuming data from the message queue. The message bus was sending but not receiving. The whole technical team was in a state of extreme anxiety: "If the tickets can't be issued, that's a loss of millions. And what if a user hits the jackpot?" Everyone was like ants on a hot pan.

It was also the first time the entire technical team had encountered a consumer backlog, and no one had any experience with it.

The first idea was to deploy several more dispatch center instances. But after deployment, the dispatch center still hung after consuming a few thousand messages. At that point, the architect could only fall back on the restart strategy. You read that right: the restart method. It was embarrassing, but it was the only way.

The dispatch center restarted, consumed about 10,000 messages, and hung again. The only option was another restart. Back and forth more than 20 times, like squeezing toothpaste. As the ticket-issuing deadline approached, the tension and fear intensified. Finally, after an hour of manual restarts, all the messages were consumed.

I happened to be reading Bi Xuan's Distributed Java Applications: Fundamentals and Practice at the time and wondered whether threads were blocked, so I used the jstack command to inspect the stacks. As expected, the threads were blocked on the method that committed data to the database.

We immediately spoke with the DBAs and found that the Oracle database was executing many large transactions, each taking more than 30 minutes, which blocked the dispatch center's scheduling threads.

Afterwards, the technical department adopted the following measures to avoid backlogs:

  1. When sending messages, producers split one large message into multiple batches, reducing the probability that the dispatch center would have to execute a large transaction;
  2. Configure a timeout on the data source: if a transaction runs past a set limit, automatically throw an exception and roll back (a minimal sketch of this follows the list).
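Point 2 might look like the following sketch, assuming Spring's DataSourceTransactionManager; the 30-second limit is illustrative, not the team's actual setting:

```java
import javax.sql.DataSource;

import org.springframework.jdbc.datasource.DataSourceTransactionManager;

public class TransactionConfig {

    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        DataSourceTransactionManager tm = new DataSourceTransactionManager(dataSource);
        // Any transaction running longer than 30 seconds is aborted and
        // rolled back, so one runaway transaction can no longer block the
        // dispatch center's consumer threads for half an hour.
        tm.setDefaultTimeout(30);
        return tm;
    }
}
```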

1.4 Review

The Spring-wrapped ActiveMQ API is very simple and easy to use; working with it was genuinely comfortable.

Limited by the technical level and vision of the lottery technical team at that time, we encountered some problems in using ActiveMQ.

  1. Under high throughput, ActiveMQ was prone to hang once messages accumulated past a certain point;

The technical team found that in particularly high-throughput scenarios, ActiveMQ tended to hang as the message backlog grew.

Some messages did not need to be consumed immediately, but to avoid hanging the message queue, the ticket gateway first persisted consumed messages to local disk as XML files and then processed them asynchronously. This greatly increased the ticket gateway's consumption speed and basically eliminated backlogs in its queues.

Still, the approach felt odd: to consume a message, you first had to store a copy of it locally, and if the disk failed, those messages could be lost.

  2. The high availability mechanism needed improvement.

We adopted a master/slave deployment mode, one master and one slave, on servers with 4 cores and 8 GB of RAM.

In this mode, two ActiveMQ instances run at the same time, and only one slave may connect to the master; in other words, the cluster can hold just two nodes. Between the two services runs a one-way backup channel through which the master replicates data to the slave. The scheme proved inconvenient in production: when the master went down, the slave could not automatically take over client requests. Manual intervention was needed, stopping the slave and restarting it as the master to restore service.

There were also strange message-loss cases: the producer sent a message successfully, yet the message could not be found on the master's console while it did show up on the slave's.

But consumers had no way to consume the messages sitting on the slave, so manual intervention was required to process them.

2 Advancing: Redis & RabbitMQ

In 2014, I worked on optimizing the red envelope system and the coupon system at Elong.

2.1 Can Redis be a message queue?

The hotel coupon calculation service used Storm, a first-generation stream computing framework. I won't cover Storm in detail here; the logic diagram below shows the flow:

Here, the water source (data source) of our Storm cluster was a Redis cluster, which used the list data structure to implement the push/pop semantics of a message queue (a minimal sketch follows).
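A minimal sketch of this pattern, assuming the Jedis client; the key name and payload are illustrative:

```java
import java.util.List;

import redis.clients.jedis.Jedis;

public class RedisListQueue {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Producer: push a message onto the head of the list.
            jedis.lpush("hotel:events", "{\"hotelId\":1001,\"op\":\"update\"}");

            // Consumer: blocking pop from the tail, waiting up to 5 seconds.
            // brpop returns [key, value], or null on timeout.
            List<String> msg = jedis.brpop(5, "hotel:events");
            if (msg != null) {
                System.out.println("consumed: " + msg.get(1));
            }
        }
    }
}
```

This is also why backlog monitoring matters so much here: a Redis list lives in memory, so an unconsumed pile-up eats RAM directly.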

The overall flow of streaming computing:

  1. The hotel information service sends hotel data to Redis clusters A/B;
  2. A Storm spout fetches data from Redis clusters A/B and emits tuple messages to the bolts;
  3. The bolt components receive the messages and clean the data according to the configured business rules;
  4. Storm sends the processed data on to Redis cluster C;
  5. The storage service fetches data from Redis cluster C and persists it to the database;
  6. The search team scans the database tables to generate indexes.

The streaming service processed tens of millions of records a day and did so fairly well. But there were dissenting voices within the team:

  • Storm occasionally lost messages during topology updates or when the coupon service restarted. The business was not very sensitive to message loss, though, and we provided a manual refresh function, so this was tolerated;
  • The team had to keep a constant eye on Redis memory usage, worrying that a backlog in the Redis queues would lead to out-of-memory errors;
  • The architect felt that having the search team scan the database directly was not decoupled enough, and recommended replacing Redis cluster C with Kafka, so the search team could consume messages straight from Kafka and generate indexes.

In my view, using Redis as a message queue should satisfy the following conditions:

  1. The business tolerates a small probability of message loss and reaches eventual consistency through scheduled tasks or manual triggers;
  2. The probability of message backlog is low, and backlogs are covered by alerting and monitoring;
  3. The consumer's consumption model is simple enough.

2.2 RabbitMQ: a pipe, not a pool

RabbitMQ is written in Erlang. It met two of my requirements:

  1. A high availability mechanism. Elong used the mirrored-queue high availability mode internally; it had been in service for a long time, and its stability was reasonably proven;
  2. Proven volume. The RabbitMQ system I was responsible for also handled around a million messages a day, all sent and consumed without a hitch.

The coupon service originally used SQL Server. As data volume grew, the technical team decided on a sharding strategy (separate databases and tables), using DDA, the company's in-house distributed database middleware.

Since this was our first time using a distributed database, we wanted to test DDA's stability: we simulated sending 10 million messages to RabbitMQ, then had the refactored coupon service consume the messages and hash them into different MySQL databases by user number.

The RabbitMQ cluster ran in mirrored high availability mode on 3 servers, each with 4 cores and 8 GB of RAM.

We sent messages at a rate of 3 million per hour. For the first hour, producers and consumers performed well, but because the consumers could not keep up with the producers, a backlog built up in the queue. By the third hour, more than 5 million messages had accumulated, and the producers' send latency had ballooned from 2 milliseconds to around 500 milliseconds. The RabbitMQ console was a sea of red alerts.

It was an unintentional stress test. RabbitMQ itself is good, but its support for message accumulation is not: when a large number of messages pile up, its performance degrades dramatically.

A friend of mine said to me: "RabbitMQ is a pipe; why must you use it as a pool?"

As data volumes across the Internet surge, many business scenarios do allow a reasonable amount of accumulation, as long as consumers keep consuming steadily and the business as a whole shows no big fluctuations.

I have come to believe that message queues can be used as both pipes and pools.

3 Sublimation: MetaQ

Metamorphosis originated from my study of LinkedIn's open-source MQ, now Apache Kafka. Kafka's design is unique: it uses a pull model rather than the push model of typical MQ systems, and it makes heavy use of ZooKeeper for service discovery and offset storage. I really appreciate and agree with its design philosophy, and I strongly recommend reading its design documentation. Metamorphosis's design is, on the whole, completely consistent with Kafka's. — Zhuang Xiaodan, author of MetaQ

3.1 A stunning consumption model

In 2015, I mainly worked on the R&D of the Shenzhou special car order system.

MetaQ fulfilled my fantasy of what a message queue should be: distributed, high-throughput, and tolerant of large backlogs.

MetaQ supports two consumption models: cluster consumption and broadcast consumption. The consumption models I had used before were all queue models, so when I first encountered this publish/subscribe model, I was amazed.

Cluster consumption

After an order is created successfully, a message is sent to MetaQ. The message can be consumed by both the dispatch service and the BI service.

Broadcast consumption

When an order is assigned to a driver, the dispatch service sends him a push message. The push is implemented with the broadcast consumption mode.

The general process is as follows:

  1. The driver-side push service is a TCP service. After it starts, it consumes MetaQ's PushTopic in broadcast mode (a consumer sketch follows this list);
  2. The driver app periodically sends TCP requests to the push service. Once authentication succeeds, the push service saves a mapping from the driver number to the channel reference;
  3. The dispatch service sends a push message to MetaQ;
  4. Every instance of the push service receives the message, then checks whether it holds that driver's channel reference in memory; if so, it pushes the message to the driver.
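A minimal sketch of the push service's consumer, using the open-source RocketMQ API (MetaQ's descendant); topic, group, and the channel lookup are illustrative:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class DriverPushConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_service_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // BROADCASTING: every push-service instance receives every message,
        // so each instance can check its own in-memory channel table.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("PushTopic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            for (MessageExt msg : msgs) {
                String driverId = msg.getKeys();
                // If this instance holds the driver's TCP channel, push the
                // payload down that channel (channel table omitted here).
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
```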

This is a classic case of broadcast consumption. I have since studied the design of JD's JingMai TCP gateway, and its push mechanism works in a similar way.

3.2 Radical peak shaving

2015 was the year of the ride-hailing wars.

For Shenzhou special car, the growing order volume was gratifying, but performance pressure grew with it. During the morning and evening rush hours, users would often tap to place an order and get no response. At the system level, the ride-hailing API gateway saw massive timeouts because the order service's performance had dropped dramatically. At the database level, the pressure was even greater: at peak times, inserting a single record took 8 seconds.

The whole technical team needed to improve the performance of the ride-hailing system as quickly as possible. The database had already been split by module, but the system's bottleneck remained obvious.

We came up with a solution that now seems a little radical:

  1. Design an order cache. (If you're interested in caching, we can talk about it another time; there's a lot to say.)
  2. During an order's passenger-carrying life cycle, every modification to the order updates the cache first, then sends a message to MetaQ. The order-persistence service consumes the messages, checks that the order information is valid (for example, not out of order), and writes the data to the database only if it is correct.

There were two details here:

  1. Consumers needed to consume in order. This was achieved by routing messages to partitions by order number, so that messages with the same order number always land in the same partition (a sketch follows this list);

  2. A daemon task periodically polled in-progress orders, repaired the data whenever the cache and the database disagreed, and raised an alert.
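The routing in the first detail can be sketched with the open-source RocketMQ API (MetaQ's descendant), selecting the queue by order number so all messages of one order stay in one queue; names and values are illustrative:

```java
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OrderedOrderProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        long orderId = 10001L;
        Message msg = new Message("OrderTopic",
                ("order-updated:" + orderId).getBytes(StandardCharsets.UTF_8));

        // Same order number -> same queue index -> strict per-order ordering.
        SendResult result = producer.send(msg, (mqs, m, arg) -> {
            long id = (Long) arg;
            return mqs.get((int) (id % mqs.size()));
        }, orderId);

        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}
```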

This optimization greatly improved the overall performance of the order service and laid a solid foundation for the later sharding and heterogeneous storage of the order data. According to our statistics, cache/database inconsistencies essentially never occurred. But the solution does demand very high cache availability, which is why it was a bit radical.

3.3 Message SDK encapsulation

Anyone who has done infrastructure work knows the convention: "wrap third-party components in a layer of your own." The Shenzhou architecture team likewise wrapped metaq-client in a layer.

In my mind, wrapping a layer reduced developers' mental overhead in using third-party components and unified the technology stack, and that was all there was to it.

Then an incident changed my thinking. One afternoon, the whole ride-hailing service was down for a long time. The tech team found that the system used ZooKeeper for service discovery, and the ZK cluster's leader machine had gone down, leaving the cluster stuck in repeated leader elections.

After a temporary fix, we discovered that MetaQ and service discovery were sharing the same ZK cluster, and that MetaQ's consumer offset commits and rebalancing generated a heavy write load on it.

In order to reduce MetaQ’s influence on ZK clusters, our goal was: “MetaQ uses a separate ZK cluster”.

  1. A new ZK cluster had to be deployed;
  2. MetaQ's ZK data had to be synchronized to the new cluster;
  3. The switch to the new cluster had to be invisible to the application services.

Curious, I asked a colleague in the architecture department. He said the new cluster was already deployed, but the ZK data still needed to be synchronized over, so he had added a double-write operation to the client: every write went to the original ZK cluster and also to the new one. A few weeks later, MetaQ was running on its own separate ZK cluster.

The experience left me marveling: "You can do it like that?" It also got me thinking: wrapping third-party components is not as easy as it looks.

Consider the SDK encapsulation strategy of Kuaishou's messaging platform (a hypothetical facade sketch follows this list):

  1. Only basic APIs are provided, and all access must go through the SDK's interfaces. A clean API is like the tip of an iceberg: apart from the simple interfaces exposed to the outside, everything below can be upgraded without breaking compatibility;
  2. Business development is easy: to produce and consume, a team only needs to supply a topic (globally unique) and a group, without providing environments, NameServer addresses, and so on. The SDK internally resolves the cluster's NameServer address from the topic and connects to the corresponding cluster; production and test environments resolve to different addresses, achieving isolation;
  3. The diagram has three layers: the second layer is generic, and the third corresponds to a specific MQ implementation, so in theory the middleware can be swapped for another without modifying client programs;
  4. The SDK integrates a hot-config mechanism, so clients can be reconfigured dynamically without restarting, for example pushing routing policies (changing the cluster's NameServer address or connecting to another cluster), client thread counts, and timeout durations. Combined with Maven's force-update mechanism, this keeps the SDK versions used by businesses basically up to date.
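A minimal, hypothetical facade capturing point 1; every name and signature here is illustrative of the idea, not Kuaishou's actual SDK:

```java
import java.util.function.Consumer;

// The "tip of the iceberg": business code sees only topic/group and two
// methods. Cluster addressing, environment isolation, and the concrete MQ
// client all live below the interface and can change without breaking callers.
public interface MessageClient {

    // The SDK resolves the cluster's NameServer address from the topic.
    void send(String topic, byte[] payload);

    // group identifies the subscriber; handler receives each payload.
    void subscribe(String topic, String group, Consumer<byte[]> handler);
}
```

Because callers depend only on this interface, the third layer (RocketMQ, Kafka, and so on) can be swapped without touching business code, which is exactly point 3.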

3.4 Refactoring MetaQ into a self-contained system

I have a habit of regularly chatting with operations engineers, DBAs, and architects to learn what problems the current systems have and how they plan to solve them. It gives me another perspective on how the company's systems really run.

MetaQ also had its drawbacks:

  1. MetaQ's underlying communication framework was Gecko. MetaQ occasionally had unresponsive RPCs and apparent application freezes, and such problems were hard to pin down;
  2. MetaQ's operational tooling was weak. It had only a simple dashboard and could not support automated topic provisioning or message tracing.

One day, I noticed that a consumer server in my test environment kept reporting connection exceptions after startup, with high CPU usage. I checked with the netstat command and found several hundred connections had been created. Out of curiosity, I opened the source code and discovered that the network communication framework Gecko had been replaced with Netty. I got in touch with the architecture department right away.

It dawned on me: they had already started refactoring MetaQ. Refactoring open-source software had never even crossed my mind; it felt too far beyond me. Or rather, at the time, I didn't believe I was capable of it.

Later, the message queue Shenzhou developed grew into a system of its own and has been running well in production.

To this day, I still admire the Shenzhou architecture team. They built their own message queue, DataLink (a data heterogeneity middleware), sharding middleware, and more. They were willing to innovate and had the courage to build better technical products.

I learned a lot from them.

Maybe the seed was planted in my heart when I saw them reconstructing MetaQ.

4 Love: RocketMQ

4.1 An open-source feast

In 2014, I dug up a lot of information about Taobao's message queue. I knew MetaQ had been upgraded to version 3.0, but the open-source release was not yet out.

Around the fall, I joined the RocketMQ technology chat group. [RocketMQ founder] said in the group: "Open source is coming soon; fork it when it lands." His message set the group buzzing. I looked forward to finally meeting Alibaba's internal messaging middleware.

Finally, RocketMQ was open-sourced, and I couldn't wait to get a look at it.

I wanted to learn network programming, and RocketMQ's communication module, remoting, happens to be written with Netty, so RocketMQ's communication layer was where I started.

I wrote a toy RPC framework modeled on RocketMQ's remoting, which greatly boosted my confidence. Just then, Elong held a technology innovation contest, and I thought: maybe I could rewrite Cobar's communication module with Netty. So, referring to Cobar's source code, I spent two weeks writing a Netty version of the proxy. It was rough and many functions were unfinished. The contest later gave me an encouragement award, which seems funny in hindsight.

Because I had used MetaQ at Shenzhou, learning RocketMQ came more easily. To really understand the source code, I often read it and reinvented a few wheels to verify what I had learned.

But although I had done these exercises, I had never used RocketMQ in a business setting. 2018 was the year I really put it to use, and it was a rewarding year.

SMS service

SMS services are used everywhere: registration and login verification codes, marketing messages, order confirmation notifications, and so on. When I first designed our SMS service, I wanted to learn how the industry did it, so I set my sights on Tencent Cloud's SMS service, which has these characteristics:

  • A unified SDK; the back end is an HTTP/HTTPS service with appId/appSecret authentication;
  • A simple API design: single send, batch send, marketing single send, marketing batch send, template single send, template batch send.

So I borrowed this design approach:

  1. Imitate Tencent Cloud's SDK design and provide a simple, easy-to-use SMS interface;
  2. Design an SMS API service that receives SMS requests and sends them to the message queue (a sketch of this handoff follows the list);
  3. A worker service consumes the messages and invokes the SMS interfaces of different channel providers according to a load balancing algorithm;
  4. A dashboard for viewing SMS sending records and configuring channel provider information.
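A minimal sketch of the handoff between steps 2 and 3, assuming the open-source RocketMQ producer API; topic, group, and payload format are illustrative:

```java
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class SmsApiService {

    private final DefaultMQProducer producer;

    public SmsApiService() throws Exception {
        producer = new DefaultMQProducer("sms_api_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
    }

    // The API layer validates the request and enqueues it; the worker
    // service consumes "SmsTopic" and calls a channel provider's interface.
    public void sendSms(String phone, String content) throws Exception {
        String payload = phone + "|" + content;
        producer.send(new Message("SmsTopic",
                payload.getBytes(StandardCharsets.UTF_8)));
    }
}
```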

SMS was my first real production use of RocketMQ, and watching the messages flow through one by one gave me a great sense of accomplishment.

MQ Console

Anyone who has used RocketMQ will be familiar with the console shown above. Our team had multiple RocketMQ clusters, and each required its own separately deployed console. I wondered whether I could modify the console slightly to make it support multiple clusters.

So I rolled up my sleeves and got to work. In about 20 days, I adapted the open-source version to support multiple clusters. It met my initial goal, but it was very rough. Then Sohu open-sourced their MQCloud, and after studying its design, I realized that what I had built was still far from a message governance platform.

Later, I read the articles "NetEase Cloud Music's Transformation of Message Queues" and "Toutiao's Practice and Thinking on Building a Message Service Platform and Disaster Recovery System." The itch grew stronger: I really wanted to build a message governance platform in the true sense. Unfortunately, I have had neither the scenario nor the opportunity, which is a bit of a pity.

Recently, I read "Hellobike's Practice in Distributed Message Governance and Microservice Governance" by Liang Yong, an architecture expert at Hellobike, which I recommend:

mp.weixin.qq.com/s/N-vd6he4n…

A window: starting to develop components

Later, I pushed my use of RocketMQ further still:

  • Encapsulated a message SDK in the style described earlier;
  • Enabled smooth, operations-side scaling of message queues;
  • Tried the DefaultMQPullConsumer consumption mode in production.

Once these were done, we went on to develop a registry, a configuration center, and a task scheduling system. Designing them, I drew a great deal of inspiration from the RocketMQ source code. In hindsight they had many design flaws and the code quality needed work, but building them gave me a lot of confidence.

RocketMQ gave me a window into the wider Java world. For me, this was the feast of open source.

4.2 Kafka: An integral part of the big data ecosystem

Kafka is distributed messaging and stream-processing middleware featuring high throughput, persistence, horizontal scalability, and support for streaming data processing. Built on a distributed publish/subscribe mechanism, it is widely used in log collection, streaming data transport, online/offline analysis, and real-time monitoring.

Log synchronization

In large-scale service systems, logs from all the applications usually need to be collected and analyzed centrally, so that problems can be located quickly, full-link traces can be followed, and faults can be monitored in time.

Kafka was designed for exactly this kind of massive log transfer. Applications ship log messages to the messaging service asynchronously and reliably; other components then analyze the logs in real time or offline, and key log information is collected for application monitoring.

Log synchronization has three key parts: the log collection client, the Kafka message queue, and the back-end log processing applications.

  1. The log collection client gathers log data from the various application services and sends the logs, in batches and asynchronously, to the Kafka client as messages. The Kafka client commits and compresses the messages in batches, with minimal impact on application performance (a producer sketch follows this list);
  2. Kafka persists the logs in message files;
  3. Log processing applications such as Logstash subscribe to and consume the log messages in Kafka, feeding a file search service that makes the logs retrievable; alternatively, Kafka passes the messages on to Hadoop and other big data applications for systematic storage and analysis.
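A minimal sketch of a batching, compressing log producer, assuming the standard Kafka Java client; the topic name and tuning values are illustrative:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogShipper {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Batch and compress so log shipping barely touches the application:
        props.put("compression.type", "lz4"); // compress each batch
        props.put("batch.size", 64 * 1024);   // up to 64 KB per batch
        props.put("linger.ms", 50);           // wait up to 50 ms to fill a batch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Asynchronous send; the client batches in the background.
            producer.send(new ProducerRecord<>("app-logs", "order-service",
                    "2021-01-01T00:00:00 INFO order created"));
        }
    }
}
```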

Log synchronization diagram:

Stream processing

In many fields, such as stock market analysis, meteorological telemetry, and website user behavior analysis, data is generated quickly, demands real-time handling, and arrives in huge volumes, so collecting it all into a database before processing is impractical; the traditional data processing architecture cannot meet the need. Kafka, together with stream computing engines such as Storm, Samza, and Spark, emerged to solve exactly these problems. In the stream computing model, data is captured and processed in real time as it flows, computed and analyzed according to business needs, and the results are finally saved or delivered to the components that need them.

Data transfer hub

Over the past decade, specialized systems have emerged: KV stores (HBase), search (Elasticsearch), stream processing (Storm, Spark, Samza), time-series databases (OpenTSDB), and so on. Each was built for a single purpose, and that simplicity makes it easier and more cost-effective to build them as distributed systems on commodity hardware. Often the same data set needs to be fed into several of these specialized systems. For example, application logs may serve both offline log analysis and individual log search, yet building a separate pipeline for every data type and every destination is clearly unrealistic. Using Kafka as a data transfer hub, the same data can be imported once and then distributed to each specialized system.

The following figure shows the architecture of Meituan's real-time synchronization of MySQL data to Hive, another classic case.

4.3 How to choose a technology

Databases are specializing; the "one size fits all" approach no longer applies. (MongoDB design philosophy)

First point: scenarios come first, technologies second. Choose the technology that fits the scenario.

Second point: reality is often complicated. When we actually select a technology and have to land it, technical reserves and cost are the two key factors to weigh.

Technical reserves

  • Whether the technical team has experience with the technology, whether they have stepped into its pits in production, and whether they have complete solutions to those pits;
  • Whether the architecture team has a mature SDK, toolchain, or even a full technical product.

Cost

  • Labor costs for R&D, testing, and operations;
  • Server resource costs;
  • Recruiting costs, etc.

The last factor is the human one, especially management. Every major technology selection tests a technical manager's vision, breadth, and management wisdom.

5 In closing

While writing these stories, Han Han's remarks during the promotion of his film The Continent kept coming back to me.

I don't think anything in this world comes for no reason; really, you can't do things well without a great deal of accumulation and a great deal of thought... In short, after this film, I feel I have far too much to learn, and there are far too many capable people in this world. What you think is your limit may be nothing more than someone else's starting point. So only by pressing forward constantly can you keep from embarrassing yourself. You don't necessarily have to attend school, but you do have to learn. Really.

Just learn a little every day and make a little progress. Let’s go for it.

Next time, I'd like to write a message queue by hand together with you. It's bound to be fun.


If this article helps you, please like, comment, and share. Your support will encourage me to produce higher-quality articles. Thank you!

[Yongge Java combat sharing]