
Author: Brother Tom. WeChat official account: Micro Technology

To decouple systems, we often introduce an MQ framework: each side does its own job while working together on the upstream and downstream business processes.

General process:

  • The producer creates a message and sends it to the MQ server over the network

  • The MQ server stores the message in a partition of the topic

  • The consumer pulls messages from the partition and consumes them (a minimal sketch of this flow follows)
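
For concreteness, here is a minimal sketch of that flow using RocketMQ's Java client (the framework this article uses as its example later). The name server address, topic, and group names are placeholders, and error handling is omitted:

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class QuickStartSketch {
    public static void main(String[] args) throws Exception {
        // 1. Producer: create a message and send it to the MQ server over the network
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");   // placeholder name server address
        producer.start();
        Message msg = new Message("demo_topic", "order created".getBytes(StandardCharsets.UTF_8));
        producer.send(msg);                          // 2. the broker stores it in one partition (MessageQueue) of the topic
        producer.shutdown();

        // 3. Consumer: subscribe to the topic and consume messages from its partitions
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("demo_topic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.forEach(m -> System.out.println(new String(m.getBody(), StandardCharsets.UTF_8)));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}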

But reality is rarely that simple! An MQ architecture has to be designed for high concurrency, high performance, and high availability.

Since a single partition cannot meet our throughput requirements, we consider a multi-partition architecture. As the saying goes, “two heads are better than one”: multiple partitions share the overall load and improve overall system performance.

Take a cluster of two MQ machines: six messages that used to sit in a single partition are now split across two partitions, three messages each, roughly doubling the previous throughput.

It seems to meet our needs, but every coin has two sides!

Let’s look at the following business scenario:

Consider a user placing an order on an e-commerce site. From creation to completion, the order goes through a series of actions and status changes, and each change produces an MQ message: order confirmation, payment, shipment, buyer confirmation of receipt. The consumer must process these messages strictly in the order defined by the business state machine; otherwise the business logic breaks.

We find that the messages now carry state: they are no longer independent, but context-dependent!

This problem brings the HTTP protocol to mind. HTTP itself is stateless, meaning two consecutive requests are unrelated, yet some business features require a login session. How is that solved?

The cookie mechanism was introduced: the client carries some extra data with every request, which ties the requests together into a context.

Returning to the MQ message ordering problem, how do we solve it?

Answer: take a step back and guarantee local order rather than global order.

For example, in the e-commerce case above, as long as all status messages of the same order land in the same partition, the business requirement is met. This scheme covers most business scenarios.

There just needs to be a routing policy component that decides which partition the message should go into!

There are many open-source MQ frameworks on the market, such as Kafka, Pulsar, RabbitMQ, and RocketMQ. Their APIs differ slightly, but the design ideas are the same.

Next, let’s take RocketMQ as an example:

On the producer side, RocketMQ provides an interface, MessageQueueSelector:

public interface MessageQueueSelector {
   MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}


A single select method is defined on the interface, with three parameters:

  • mqs: all message queues (partitions) under the topic

  • msg: the message to be sent

  • arg: the argument passed in when sending the message (this is what the routing sketch below keys on)
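
To make that concrete, here is a hedged sketch of routing by order ID so that every status message of one order lands in the same queue. It assumes a started DefaultMQProducer named producer; the topic name, message body, and orderId value are made up for illustration, and exception handling is omitted:

// Pass the order ID as the arg; the selector maps it to a fixed queue index,
// so all messages of the same order go to the same MessageQueue (partition).
long orderId = 10001L;
Message msg = new Message("order_topic", ("PAID:" + orderId).getBytes(StandardCharsets.UTF_8));

SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
        long id = (Long) arg;                    // the order ID passed below
        int index = (int) (id % mqs.size());     // same order ID -> same index
        return mqs.get(index);
    }
}, orderId);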

The RocketMQ framework provides three default implementation classes for the MessageQueueSelector interface:

  • 1. SelectMessageQueueByHash:

Takes the absolute value of arg.hashCode(), then mods it by mqs.size() to get the index of the target queue in mqs (its rough logic is sketched after this list)

  • 2. SelectMessageQueueByRandom:

Takes a random number in the range [0, mqs.size()) as the index of the target queue in mqs

  • 3. SelectMessageQueueByMachineRoom:

Simply returns null
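
For reference, the hash-based selector's logic is roughly the following (simplified from the RocketMQ client source):

public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);            // absolute value of the hash code
        }
        return mqs.get(value % mqs.size());     // mod the queue count to pick the index
    }
}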

Special attention:

Although message order within a single shard is guaranteed, each shard can only be consumed by a single thread on the consumer side, because the consumption order cannot be controlled across multiple threads. This may cost some performance.
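
On the consumer side this is why ordered consumption registers a MessageListenerOrderly rather than a MessageListenerConcurrently. A minimal sketch, with placeholder group, topic, and name server values:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order_topic", "*");
// With MessageListenerOrderly, the messages of a given MessageQueue are handed to the listener
// by only one thread at a time, preserving their stored order.
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    // process the order status messages strictly in the order they were stored
    msgs.forEach(m -> System.out.println("consumed: " + new String(m.getBody())));
    return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();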

This raises another question: how do you ensure that there is only one consumer in a queue?

1. org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

  • Iterates over all MessageQueues under the topic

  • isOrder && !this.lock(mq): tries to acquire the queue's lock on the broker, ensuring that a MessageQueue can only be consumed by one consumer

2. Add the PullRequest object to the pullRequestQueue of the PullMessageService

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

3. org.apache.rocketmq.client.impl.consumer.PullMessageService#run

  • PullMessageService is a Runnable thread task

  • An infinite loop that takes pull requests from the queue, pulls messages from the broker, and submits them for consumption (a simplified sketch follows)
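
The idea of that run loop can be sketched roughly as follows (simplified; see the actual source for details):

// A single service thread loops forever, taking PullRequests from the blocking queue
// filled during rebalance and pulling messages for each of them.
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);   // pulls from the broker and submits the messages for consumption
        } catch (InterruptedException ignored) {
            // interrupted: loop again and re-check the stopped flag
        } catch (Exception e) {
            log.error("Pull Message Service run method exception", e);
        }
    }
}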

Another question: how do you ensure that only one thread processes the messages of a given queue?

1. DefaultMQPushConsumerImpl#pullMessage

  • ConsumeMessageService has two implementation classes; since we need ordered consumption, ConsumeMessageOrderlyService is chosen to handle the business logic

2. ConsumeMessageOrderlyService.ConsumeRequest

  • Gets the lock object corresponding to the MessageQueue from a ConcurrentMap

  • Uses the synchronized keyword so that threads compete for that lock; the mutual exclusion guarantees that only one thread processes a given MessageQueue at a time (see the sketch below)
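
A simplified illustration of that per-queue lock idea (not the actual RocketMQ source; doConsume stands in for the real handler):

// One lock object per MessageQueue, held in a ConcurrentMap; whichever thread owns the lock
// is the only one consuming that queue at that moment.
private final ConcurrentMap<MessageQueue, Object> messageQueueLock = new ConcurrentHashMap<>();

public void consumeOrderly(MessageQueue mq, List<MessageExt> msgs) {
    Object lock = messageQueueLock.computeIfAbsent(mq, k -> new Object());
    synchronized (lock) {
        // only one thread at a time can enter this block for a given MessageQueue,
        // so that queue's messages are processed strictly one batch after another
        doConsume(msgs);
    }
}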

Moving on, what happens if you expand?

Suppose there were originally 6 partitions and the messages for order_ID_1 went to MessageQueue6. Now capacity is doubled to 12 partitions, and messages produced for order_ID_1 after the expansion may be routed to MessageQueue8. Messages of the same order are now spread across two partitions, so their order can no longer be guaranteed.

One approach is to drain the existing messages first and then expand. For online services, you can set up a temporary topic to buffer incoming messages and, after the expansion, resend them according to the new routing rules.

With sequential messages, what happens if one of them fails? Will it block everything behind it?

1. If consumption fails, no consumption offset is committed and the system automatically retries (up to a retry limit), blocking consumption of the subsequent messages until this one is processed

2. If the message still fails after the retry limit is reached, it enters the dead-letter queue, and the subsequent messages can then be processed (illustrated below)
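
In the orderly listener sketched earlier, this behavior maps onto the return values. A hedged illustration, where tryProcess stands in for the real business handler:

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    try {
        tryProcess(msgs);                                    // hypothetical business handler
        return ConsumeOrderlyStatus.SUCCESS;                 // commit the offset and move on
    } catch (Exception e) {
        // do not commit; suspend this queue briefly so the framework retries the same message,
        // blocking the ones behind it until it succeeds or is finally dead-lettered
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
});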

More:

Github.com/aalansehaiy…

About the author: Brother Tom, master's degree in computer science, joined Alibaba through campus recruitment, P7 technical expert, holds a patent, CSDN blog expert. Has worked on e-commerce transactions, community fresh-food retail, traffic marketing, Internet finance, and other businesses, with many years of first-line team management experience.