Preface
I have long wanted to explore message queues (MQ) in depth: delayed consumption, broadcast messages, failure retries, and similar mechanisms. There are many MQ middleware options on the market, such as Kafka, ActiveMQ, and RocketMQ, so I struggled to decide which one to start with and which is friendliest to newcomers. I had previously studied xuxueli's distributed scheduling system (XXL-JOB) in depth and then joined the XXL community, where I came across another open source project: the distributed message queue XXL-MQ. In short, the idea is to see the big picture through a small project: understand one, and you understand them all.
Questions
1 How does the consumer consume messages, and what is the data flow between the consumer and the MQ center?
2 How are consumption timeouts and consumption retries controlled?
Data flow chart
Process summary
1 When the consumer program starts, it initializes the MQ connection configuration bean XxlMqSpringClientFactory. This class implements Spring's ApplicationContextAware extension interface.
2 Spring calls the setApplicationContext() method of ApplicationContextAware, which starts initializing XxlMqClientFactory.
3 The factory scans the project for classes that implement the MqConsumer interface, that is, all topic classes, and stores them in memory in a List<ConsumerThread>.
4 For each ConsumerThread, a background polling thread is started for its topic. This is the thread that pulls MQ messages: ConsumerThread.execute().
5 Inside the ConsumerThread thread, the xxlMqlBroker.pullMessages() method is called to pull MQ messages.
6 xxlMqlBroker is actually a dynamic proxy created by XxlRpcReferenceBean, so the call enters the XxlRpcReferenceBean dynamic proxy class.
7 The proxy assembles the remote call parameter XxlRpcRequest and sends it to the MQ center over Netty.
8 The MQ center's Netty server receives the request, parses the XxlRpcRequest, and looks up the target method on the Spring service bean.
9 The target method XxlMqBrokerImpl.pullNewMessage() is invoked via reflection to fetch messages from the MQ center, and the result is returned to the consumer over Netty.
10 The consumer checks whether the message has a timeout limit. If it does, FutureTask.get() is used to bound the execution time of consume().
11 After consumption, the consumer calls callbackMessage(msg) to report the consumption result, which lays the groundwork for the subsequent failure retry mechanism.
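The pull, consume, and callback loop in steps 4, 5, 10 and 11 can be sketched without any networking. The following is a minimal sketch, not the XXL-MQ source: TopicConsumer, InMemoryBroker, startPolling and the SUCCESS/FAIL strings are hypothetical names, and the in-memory broker stands in for the remote MQ center.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a per-topic polling consumer; not the XXL-MQ implementation.
class PollingConsumerSketch {

    /** Stand-in for the interface that topic classes implement. */
    interface TopicConsumer {
        String topic();
        void consume(String data) throws Exception;
    }

    /** Stand-in for the MQ center: in-memory queues instead of a remote call. */
    static class InMemoryBroker {
        private final Map<String, Queue<String>> queues = new ConcurrentHashMap<>();
        final List<String> callbacks = new CopyOnWriteArrayList<>();

        void publish(String topic, String data) {
            queues.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(data);
        }

        /** Returns up to max pending messages for the topic. */
        List<String> pull(String topic, int max) {
            List<String> batch = new ArrayList<>();
            Queue<String> q = queues.get(topic);
            String msg;
            while (q != null && batch.size() < max && (msg = q.poll()) != null) {
                batch.add(msg);
            }
            return batch;
        }

        /** Records the consumption result reported by the consumer. */
        void callback(String data, String status) {
            callbacks.add(data + ":" + status);
        }
    }

    /** Starts one background thread that polls the broker for the consumer's topic. */
    static Thread startPolling(InMemoryBroker broker, TopicConsumer consumer, CountDownLatch done) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                List<String> batch = broker.pull(consumer.topic(), 100);
                for (String data : batch) {
                    try {
                        consumer.consume(data);
                        broker.callback(data, "SUCCESS");
                    } catch (Exception e) {
                        broker.callback(data, "FAIL");
                    }
                    done.countDown();
                }
                if (batch.isEmpty()) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(10); // back off when nothing is pending
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop exits
                    }
                }
            }
        }, "consumer-" + consumer.topic());
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Publishes three messages, consumes them, and returns the reported results. */
    static List<String> runDemo() {
        InMemoryBroker broker = new InMemoryBroker();
        broker.publish("topic_1", "a");
        broker.publish("topic_1", "b");
        broker.publish("topic_1", "boom");
        TopicConsumer consumer = new TopicConsumer() {
            public String topic() { return "topic_1"; }
            public void consume(String data) {
                if (data.equals("boom")) throw new IllegalStateException("simulated failure");
            }
        };
        CountDownLatch done = new CountDownLatch(3);
        Thread t = startPolling(broker, consumer, done);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        t.interrupt();
        return new ArrayList<>(broker.callbacks);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Running main prints one entry per message, with the deliberately failing message reported as FAIL. A real consumer would replace InMemoryBroker with the remote proxy described in steps 6 to 9.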
Source code flow in the debugger
1 When the consumer program starts, it initializes the MQ connection configuration bean XxlMqSpringClientFactory. This class implements Spring's ApplicationContextAware extension interface.
2 Spring calls the setApplicationContext() method of ApplicationContextAware, which starts initializing XxlMqClientFactory.
3 The factory scans the project for classes that implement the MqConsumer interface, that is, all topic classes, and stores them in memory in a List<ConsumerThread>.
4 For each ConsumerThread, a background polling thread is started for its topic. This is the thread that pulls MQ messages: ConsumerThread.execute().
5 Inside the ConsumerThread thread, the xxlMqlBroker.pullMessages() method is called to pull MQ messages.
6 xxlMqlBroker is actually a dynamic proxy created by XxlRpcReferenceBean, so the call enters the XxlRpcReferenceBean dynamic proxy class.
7 The proxy assembles the remote call parameter XxlRpcRequest and sends it to the MQ center over Netty.
8 The MQ center's Netty server receives the request, parses the XxlRpcRequest, and looks up the target method on the Spring service bean.
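The mechanics of steps 6 to 8, plus the reflective call that step 9 performs, can be illustrated with the JDK alone. This is a minimal sketch under stated assumptions: Broker, Request, Server and createProxy are hypothetical stand-ins for the XXL-RPC classes named above, and a plain Function replaces the Netty transport.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of proxy-based RPC; not the XXL-RPC implementation.
class RpcProxySketch {

    /** Hypothetical broker API used only for this illustration. */
    interface Broker {
        List<String> pullMessages(String topic, int max);
    }

    /** Hypothetical request object: everything the server needs to find the method. */
    static class Request {
        final String className;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        Request(String className, String methodName, Class<?>[] parameterTypes, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.args = args;
        }
    }

    /** Server side: finds the service bean by interface name and invokes it via reflection. */
    static class Server {
        private final Map<String, Class<?>> interfaces = new HashMap<>();
        private final Map<String, Object> beans = new HashMap<>();

        void register(Class<?> iface, Object bean) {
            interfaces.put(iface.getName(), iface);
            beans.put(iface.getName(), bean);
        }

        Object handle(Request request) {
            try {
                Class<?> iface = interfaces.get(request.className);
                Object bean = beans.get(request.className);
                Method method = iface.getMethod(request.methodName, request.parameterTypes);
                return method.invoke(bean, request.args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("RPC dispatch failed", e);
            }
        }
    }

    /**
     * Client side: every call on the returned proxy becomes a Request handed to the transport.
     * Methods inherited from Object (toString and so on) are not special-cased in this sketch.
     */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, Function<Request, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> transport.apply(
                new Request(iface.getName(), method.getName(), method.getParameterTypes(), args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    /** Wires client and server together in one process and performs one call. */
    static List<String> demo() {
        Server server = new Server();
        server.register(Broker.class, (Broker) (topic, max) -> Collections.singletonList(topic + "#1"));
        Broker broker = createProxy(Broker.class, server::handle); // transport = direct method call
        return broker.pullMessages("topic_1", 100);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller only sees the Broker interface, which is why the consumer code in step 5 looks like a local method call even though the work happens in the MQ center.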
9 The target method XxlMqBrokerImpl.pullNewMessage() is invoked via reflection to fetch messages from the MQ center, and the result is returned to the consumer over Netty. The messages are fetched with the following SQL query:
    SELECT * FROM xxl_mq_message t
    WHERE t.topic = 'topic_1'
      AND t.group = 'DEFAULT'
      AND t.status = 'NEW'
      AND t.effectTime < NOW()
      AND ((t.shardingId = 0 AND MOD(t.id, 2) = 0) OR (t.shardingId > 0 AND MOD(t.shardingId, 2) = 0))
    ORDER BY t.id ASC
    LIMIT 100
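The MOD conditions in the query decide which consumer receives a given message. The sketch below restates that predicate in Java, assuming the literal 2 is the number of consumers in the group and the literal 0 is this consumer's rank among them; the names belongsTo, consumerCount and consumerRank are hypothetical.

```java
// Hypothetical restatement of the sharding predicate from the SQL query.
class ShardingSketch {

    /**
     * Mirrors: (shardingId = 0 AND MOD(id, count) = rank)
     *       OR (shardingId > 0 AND MOD(shardingId, count) = rank)
     */
    static boolean belongsTo(long id, long shardingId, int consumerCount, int consumerRank) {
        if (shardingId == 0) {
            // No explicit shard: spread messages across consumers by message id.
            return id % consumerCount == consumerRank;
        }
        if (shardingId > 0) {
            // Explicit shard: all messages with the same shardingId go to the same consumer.
            return shardingId % consumerCount == consumerRank;
        }
        return false; // negative values match neither branch of the query
    }

    public static void main(String[] args) {
        for (long id = 1; id <= 4; id++) {
            System.out.println("id=" + id + " rank0=" + belongsTo(id, 0, 2, 0)
                    + " rank1=" + belongsTo(id, 0, 2, 1));
        }
    }
}
```

Under this reading, messages that share a positive shardingId always land on the same consumer, while messages without one are spread evenly by id.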
10 The consumer checks whether the message has a timeout limit. If it does, the consumption runs under FutureTask.get() with a timeout; if not, the topic's consumption method, IMqConsumer.consume(), is called directly.
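Bounding a call with FutureTask.get() is a standard JDK technique and can be shown in isolation. This is a minimal sketch rather than the XXL-MQ code: consumeWithTimeout and the result strings are hypothetical, and a Callable stands in for the consume method.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of timeout-limited consumption using FutureTask.
class TimeoutConsumeSketch {

    /** Runs consume directly when timeoutMillis <= 0, otherwise bounds it with FutureTask.get(). */
    static String consumeWithTimeout(Callable<String> consume, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            try {
                return consume.call(); // no limit: run on the calling thread
            } catch (Exception e) {
                return "FAIL: " + e.getMessage();
            }
        }
        FutureTask<String> task = new FutureTask<>(consume);
        Thread worker = new Thread(task, "consume-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            task.cancel(true); // interrupts the worker thread
            return "TIMEOUT";
        } catch (ExecutionException e) {
            return "FAIL: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAIL: interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeWithTimeout(() -> "SUCCESS", 1000));
        System.out.println(consumeWithTimeout(() -> {
            Thread.sleep(2000); // simulated slow consumer
            return "SUCCESS";
        }, 100));
    }
}
```

Note that cancel(true) only interrupts the worker; a consumer that ignores interruption keeps running in the background, so the timeout limits how long the caller waits, not how long the work takes.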
11 After consumption, the consumer calls callbackMessage(msg) to report the consumption result, which lays the groundwork for the subsequent failure retry mechanism.
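One way a reported result could feed a retry mechanism is sketched below. This is an assumption about the general approach, not a description of how XXL-MQ does it: the retryCount field, the retry delay and the callback signature are hypothetical, while status and effectTime echo the columns in the query from step 9.

```java
// Hypothetical sketch of failure retry driven by the consumer's callback.
class RetryCallbackSketch {

    static class Message {
        String status = "NEW";
        int retryCount;   // hypothetical: remaining retries
        long effectTime;  // earliest time (ms) at which the message may be pulled

        Message(int retryCount, long effectTime) {
            this.retryCount = retryCount;
            this.effectTime = effectTime;
        }
    }

    /** Applies a consumption result; a failure re-queues the message while retries remain. */
    static void callback(Message msg, boolean success, long now, long retryDelayMillis) {
        if (success) {
            msg.status = "SUCCESS";
            return;
        }
        if (msg.retryCount > 0) {
            msg.retryCount--;
            msg.status = "NEW";                       // eligible for the pull query again
            msg.effectTime = now + retryDelayMillis;  // but only after the delay
        } else {
            msg.status = "FAIL";
        }
    }

    public static void main(String[] args) {
        Message msg = new Message(1, 0L);
        callback(msg, false, 1000L, 500L);
        System.out.println(msg.status + " retryCount=" + msg.retryCount + " effectTime=" + msg.effectTime);
        callback(msg, false, 2000L, 500L);
        System.out.println(msg.status);
    }
}
```

Because a re-queued message goes back to status NEW with a later effectTime, a pull query that filters on t.status = 'NEW' AND t.effectTime < NOW() would pick it up again without any separate retry path on the consumer.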
Closing thoughts
A question left for further thought: how should sequential (ordered) consumption of messages be handled?