Background
With the rapid development of the company, the volume of messages pushed each day keeps increasing. The old message system could no longer meet the functional requirements of current push scenarios, so we set out to build a complete message system from scratch.
The past of messaging platforms
There were various pain points and challenges in the original architecture:
- Slow access and slow delivery
Service access was slow, and different APIs were required for sending different types of messages. Message consumption was also slow, which hurt the experience of campaign operations.
- No traffic analysis; services directly affected each other
Per-service usage statistics were unclear, and traffic limiting could not be applied per service.
- No priority for urgent messages
Without message priorities, a large burst of marketing messages would cause normal order-related messages to pile up and be delayed.
We optimized in the following four areas:
- Unified access interfaces and per-service identification
- Faster sending: from single consumption to concurrent consumption by multiple consumers
- Support for message priority handling
- Switched the data store to MongoDB, since our workload has far more writes than reads, which suits MongoDB better than ES
The present of messaging platforms
The current overall architecture of the messaging platform:
How is the priority queue implemented in the above process?
At present the messaging platform implements priority in two ways: first, the traditional message queue, Kafka, is used for prioritized sending and consumption; second, a priority thread pool handles prioritized dispatch of thread tasks.
Queue priority
Kafka itself does not support priorities; we emulate them by sending messages of different priorities to different Kafka topics.
We create a separate topic per priority and, during consumption, pull from the different topics in different proportions.
Currently the highest priority pulls roughly twice as much as the second highest, and so on down the list; whatever remains of the per-pull budget is added to the highest priority. For example, with 50 messages per pull and 3 topics, the pulls are 30 (25 plus the 5 left over), 13, and 7.
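Under that rule (each lower priority pulls about half as much as the one above it, leftovers go back to the highest), the per-topic quota can be computed as follows. This is an illustrative sketch; `PullAllocator` is a hypothetical helper, not the platform's actual code:

```java
import java.util.Arrays;

public class PullAllocator {
    // Split one pull budget across topics ordered from highest to lowest
    // priority: the highest tier gets half the budget, each further tier
    // gets half of the previous (rounded up), and whatever is left over
    // is added back to the highest tier.
    public static int[] allocate(int total, int topics) {
        int[] quota = new int[topics];
        int share = total / 2;
        int used = 0;
        for (int i = 0; i < topics; i++) {
            quota[i] = share;
            used += share;
            share = (share + 1) / 2; // halve, rounding up
        }
        quota[0] += total - used;    // leftover boosts the highest priority
        return quota;
    }

    public static void main(String[] args) {
        // 50 messages per pull over 3 priority topics -> [30, 13, 7]
        System.out.println(Arrays.toString(allocate(50, 3)));
    }
}
```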
When there are no high-priority messages, how can low-priority messages be pulled at full capacity, i.e., 50 at a time instead of the small lowest-priority quota above?
We introduced a message-pull state machine to increase consumption of low-priority messages. The consumer state machine has states such as initialization, low load, and high load; by checking how many messages the previous poll processed, it determines the consumer's current state and adjusts the pull parameter accordingly. At present we use reflection to modify the Kafka consumer's pull count.
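The idea can be sketched as a small state machine. The names (`LoadState`, `LowPriorityPullController`) and thresholds below are illustrative assumptions, not the platform's real code; the real system additionally pushes the new size into the Kafka consumer via reflection:

```java
public class LowPriorityPullController {
    // Illustrative load states for the consumer.
    enum LoadState { INIT, HIGH_PRIORITY_BUSY, HIGH_PRIORITY_IDLE }

    static final int QUOTA = 7;   // normal low-priority share (from the 50-message example)
    static final int FULL  = 50;  // full pull when high-priority topics are idle

    private LoadState state = LoadState.INIT;

    // Called after each round with how many high-priority messages were just
    // processed; returns the pull size to use for low-priority topics next.
    public int nextPullSize(int highPriorityProcessed) {
        state = (highPriorityProcessed > 0)
                ? LoadState.HIGH_PRIORITY_BUSY
                : LoadState.HIGH_PRIORITY_IDLE;
        return state == LoadState.HIGH_PRIORITY_IDLE ? FULL : QUOTA;
    }

    public static void main(String[] args) {
        LowPriorityPullController c = new LowPriorityPullController();
        System.out.println(c.nextPullSize(25)); // high priority busy -> 7
        System.out.println(c.nextPullSize(0));  // high priority idle -> 50
    }
}
```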
To speed up delivery we also use local thread pools, and for local thread tasks we use a priority task queue. The figure above shows the flow for submitting a thread task: the execution order of a task is uniquely determined by comparing a predefined priority value, with an incremented sequence number breaking ties between tasks of equal priority.
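A minimal version of such a task, ordered by (priority value, submission sequence), might look like the sketch below; the class and field names are illustrative:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// A Runnable ordered first by its priority value (lower runs earlier) and
// then by a global sequence number, so equal-priority tasks stay FIFO.
public class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private static final AtomicLong SEQ = new AtomicLong();

    final int priority;
    final long seq = SEQ.getAndIncrement();
    private final Runnable body;

    public PriorityTask(int priority, Runnable body) {
        this.priority = priority;
        this.body = body;
    }

    @Override public void run() { body.run(); }

    @Override public int compareTo(PriorityTask o) {
        int byPriority = Integer.compare(priority, o.priority);
        return byPriority != 0 ? byPriority : Long.compare(seq, o.seq);
    }

    public static void main(String[] args) {
        // A pool whose waiting queue orders tasks by compareTo.
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        pool.execute(new PriorityTask(1, () -> System.out.println("marketing push")));
        pool.execute(new PriorityTask(0, () -> System.out.println("order push")));
        pool.shutdown();
    }
}
```

Note that the ordering only applies to tasks still waiting in the queue; a task a worker has already picked up is not preempted.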
How do we implement the delay queue in the overall flow chart?
First, we define the delay/scheduling policy. There are three bands:
- Delayed push of more than 30 minutes
- Delayed push of between 15 seconds and 30 minutes
- Delayed push of less than 15 seconds
After classifying delayed/scheduled messages into the three bands above, each band has its own implementation.

For delays below 15 seconds, we directly use Java's own delayed-task mechanism to wait out the delay and push the message.

For delays above 15 seconds and below 30 minutes, we built a single-level time wheel with second granularity to push messages, and partially optimized it by borrowing from Kafka's delay queue: when no task in the wheel needs to run, the executing thread simply waits until the next task submission notifies it, instead of ticking idly.

For delayed tasks longer than 30 minutes, we first persist the message task; about 30 minutes before it is due, we load it into the second-level time wheel and send it via the second method. (We did not use a day/hour-level wheel simply to avoid wasting memory.)
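A simplified sketch of such a second-granularity wheel with the wait/notify optimization is shown below. This is illustrative only, not the production implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A minimal single-level, second-granularity time wheel: a task with delay d
// lands in bucket (cursor + d) % size, and the ticker waits instead of
// spinning when every bucket is empty (the Kafka-inspired optimization).
public class SecondTimeWheel {
    private final List<Runnable>[] buckets;
    private int cursor = 0;
    private int pending = 0;

    @SuppressWarnings("unchecked")
    public SecondTimeWheel(int seconds) {
        buckets = new List[seconds];
        for (int i = 0; i < seconds; i++) buckets[i] = new ArrayList<>();
    }

    public synchronized void schedule(int delaySeconds, Runnable task) {
        buckets[(cursor + delaySeconds) % buckets.length].add(task);
        pending++;
        notifyAll();               // wake the ticker if it was parked on empty
    }

    // One tick = one second of wheel time; returns the tasks now due.
    public synchronized List<Runnable> tick() {
        while (pending == 0) {     // nothing anywhere: park instead of spinning
            try { wait(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Collections.emptyList();
            }
        }
        cursor = (cursor + 1) % buckets.length;
        List<Runnable> due = new ArrayList<>(buckets[cursor]);
        buckets[cursor].clear();
        pending -= due.size();
        return due;
    }

    public static void main(String[] args) {
        SecondTimeWheel wheel = new SecondTimeWheel(60);
        wheel.schedule(1, () -> System.out.println("push"));
        for (Runnable r : wheel.tick()) r.run(); // prints "push"
    }
}
```

In a real deployment a dedicated thread would call `tick()` once per second; here the caller drives it directly to keep the sketch short.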
During message push we also need anti-fatigue controls to avoid over-messaging users. The message center currently covers the following anti-fatigue scenarios:
- A user receives at most M messages in N days
- A given scenario sends a user at most M messages in N days
- A specific service sends a user at most one message per day
In the above scenarios we mainly use MongoDB for data storage and aggregation queries, because Redis would be operated on very frequently in multi-user scenarios, which does not hold up well. Moreover, most anti-fatigue data is kept for at most one week, so Mongo collections easily cover our needs. For the scenario where a specific service may send only one message per day, we instead use Redis HyperLogLog for fatigue control, reducing queries and memory consumption. It has a small error rate, but that has little impact in our usage scenario.
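The "at most M messages in N days" rule itself is simple. Below is an in-memory sketch of the check, a hypothetical stand-in for the Mongo aggregation (the class and method names are ours, not the platform's):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Keep per-user send timestamps and reject a push once M messages
// were already sent within the last N days.
public class FatigueGuard {
    private final int maxMessages;                         // M
    private final long windowMs;                           // N days in ms
    private final Map<String, Deque<Long>> sent = new HashMap<>();

    public FatigueGuard(int maxMessages, long windowMs) {
        this.maxMessages = maxMessages;
        this.windowMs = windowMs;
    }

    // Returns true if the user may receive another message at time `now`.
    public boolean tryAcquire(String userId, long now) {
        Deque<Long> log = sent.computeIfAbsent(userId, k -> new ArrayDeque<>());
        // Drop timestamps that have fallen out of the N-day window.
        while (!log.isEmpty() && now - log.peekFirst() > windowMs) log.pollFirst();
        if (log.size() >= maxMessages) return false;
        log.addLast(now);
        return true;
    }

    public static void main(String[] args) {
        FatigueGuard guard = new FatigueGuard(2, 7L * 24 * 60 * 60 * 1000); // M=2, N=7 days
        long now = System.currentTimeMillis();
        System.out.println(guard.tryAcquire("user-1", now));     // true
        System.out.println(guard.tryAcquire("user-1", now + 1)); // true
        System.out.println(guard.tryAcquire("user-1", now + 2)); // false
    }
}
```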
Finally, drawing on our hands-on experience, we summarize the points that need to be considered when building a message platform:
- Simple and easy access
- Fast response without affecting services
- Urgent messages reach users first, and messages can be classified by priority
- Messages are traceable and retractable
- Effect visualization
- Content security