RocketMQ message sending and message consumption

Sending messages

In RocketMQ, producers write messages to message queues. Different business scenarios require different write strategies for producers.

Examples include synchronous sending, asynchronous sending, Oneway sending, delayed sending, and sending transactional messages.

Procedure for sending messages

By default, messages are sent with the DefaultMQProducer class, and sending a message involves five steps:

  • 1) Set the producer's GroupName.

  • 2) Set the InstanceName. When one JVM needs to start multiple producers, give each one a distinct InstanceName to tell them apart; otherwise the system uses the default name “DEFAULT”.

  • 3) Set the number of retries for failed sends. When the network is unstable, this value determines how many times sending a message is retried; to avoid losing messages, configure several retries.

  • 4) Set the NameServer address.

  • 5) Assemble the message and send it.
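In the RocketMQ Java client these steps correspond to DefaultMQProducer setters such as setProducerGroup, setInstanceName, setRetryTimesWhenSendFailed and setNamesrvAddr, followed by send. Running those calls needs the client library and a live NameServer, so the sketch below only models step 3: how a retry count bounds the number of send attempts. The SendAttempt interface and RetryingSender class are hypothetical, and counting retries as extra attempts on top of the first one is an assumption of this sketch.

```java
// Hypothetical stand-in for one network send; returns true on success.
interface SendAttempt {
    boolean trySend(int attemptNumber);
}

final class RetryingSender {
    private RetryingSender() {}

    /**
     * Makes the first attempt plus up to `retries` additional attempts (assumed semantics).
     * Returns the 1-based attempt number that succeeded, or -1 if every attempt failed.
     */
    static int sendWithRetries(SendAttempt attempt, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        int totalAttempts = 1 + retries;
        for (int i = 1; i <= totalAttempts; i++) {
            if (attempt.trySend(i)) {
                return i;
            }
        }
        return -1;
    }
}
```

With retries set to 2, a send that only succeeds on its third try still gets through, while one that would need a fourth try is reported as failed.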

Message send status

The send result status (SendResult#SendStatus) can take one of the following four values:

  • FLUSH_DISK_TIMEOUT

Indicates that flushing to disk did not complete within the specified time (this error is reported only when the Broker’s flush policy is set to SYNC_FLUSH).

  • FLUSH_SLAVE_TIMEOUT

Indicates that, in master/slave mode with the Broker set to SYNC_MASTER, master-to-slave synchronization did not complete within the specified time.

  • SLAVE_NOT_AVAILABLE

This situation is similar to FLUSH_SLAVE_TIMEOUT: in master/slave mode with the Broker set to SYNC_MASTER, no Broker configured as a Slave can be found.

  • SEND_OK

The message was sent successfully. What "successful" means in detail (Has the message been saved to disk? Has it been synchronized to the Slave? Has the Slave written it to disk?) depends on the configured disk flush policy and master/slave policy. Put simply, SEND_OK means that none of the three problems listed above occurred.
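A caller that needs strong durability might treat only SEND_OK as fully persisted and log the other three for follow-up. The sketch uses a local enum, SendStatusSketch, that mirrors the four status names (the real type ships with the RocketMQ client); the SendStatusPolicy helper and its messages are hypothetical, and what to do on a non-OK status (retry, alert, or accept) is an application decision the text above does not prescribe.

```java
// Local stand-in mirroring the four status names described above.
enum SendStatusSketch {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE
}

final class SendStatusPolicy {
    private SendStatusPolicy() {}

    /** True only when none of the three durability problems was reported. */
    static boolean fullyPersisted(SendStatusSketch status) {
        return status == SendStatusSketch.SEND_OK;
    }

    /** Short explanation suitable for a log line. */
    static String explain(SendStatusSketch status) {
        switch (status) {
            case SEND_OK:
                return "stored according to the configured flush and replication policy";
            case FLUSH_DISK_TIMEOUT:
                return "disk flush did not finish in time (SYNC_FLUSH broker)";
            case FLUSH_SLAVE_TIMEOUT:
                return "replication to the Slave did not finish in time (SYNC_MASTER broker)";
            case SLAVE_NOT_AVAILABLE:
                return "no Slave available (SYNC_MASTER broker)";
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }
}
```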

How can I improve write performance

Sending a message involves three steps:

  • 1. The client sends a request to the server.
  • 2. The server processes the request.
  • 3. The server replies to the client.

The time taken to send a message is the sum of these three steps.

Oneway mode

Oneway mode suits scenarios that demand very low latency but can tolerate lower reliability, such as log collection.

Oneway only sends the request and does not wait for a reply: the data is written into the client's Socket buffer and the call returns immediately, without waiting for a result.

With this approach, the time to send a message can drop to the microsecond level.
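To make the difference concrete, the toy model below (not RocketMQ code) splits a send into the three steps above: the caller hands a request to a ToyServer (step 1), the server processes it (step 2), and the reply arrives as the value of a CompletableFuture (step 3). sendSync waits for all three steps; sendOneway returns right after step 1. ToyServer, SendModes and the latch that holds the server back are hypothetical; in the real client the choice is made by calling methods such as send or sendOneway on DefaultMQProducer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Toy "server" whose processing step blocks until `release` is counted down.
final class ToyServer {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    final CountDownLatch release = new CountDownLatch(1);
    final AtomicInteger processed = new AtomicInteger();

    // Steps 2 and 3: process the request, then reply (the reply is the future's value).
    CompletableFuture<String> handle(String msg) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            processed.incrementAndGet();
            return "ACK:" + msg;
        }, pool);
    }

    void shutdown() {
        pool.shutdown();
    }
}

final class SendModes {
    private SendModes() {}

    // Synchronous: the caller waits for request + processing + reply.
    static String sendSync(ToyServer server, String msg) {
        return server.handle(msg).join();
    }

    // Oneway: the caller only issues the request and never looks at the reply.
    static void sendOneway(ToyServer server, String msg) {
        server.handle(msg);
    }
}
```

The oneway call returns while the server is still blocked, which is exactly why it is fast and also why the caller never learns whether the message was stored.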

Increase producer concurrency by sending from multiple producers at the same time

Concurrent writes from multiple producers do not reduce disk write efficiency. RocketMQ introduces a concurrency window within which messages can be written into DirectMem concurrently; contiguous, gap-free chunks of that data are then flushed asynchronously to the file system.

Writing the CommitLog sequentially lets RocketMQ maintain high write performance on both HDDs and SSDs.
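The storage idea above (many producers appending into memory, with contiguous data later flushed to disk in one sequential pass) can be illustrated with a toy append-only log. ToyCommitLog is hypothetical: a single lock serializes appends into a direct ByteBuffer standing in for DirectMem, and flush() writes the range not yet on disk. RocketMQ's real CommitLog is considerably more involved (memory-mapped files and dedicated flush threads, among other things), none of which is reproduced here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, heavily simplified append-only log (not RocketMQ's CommitLog).
final class ToyCommitLog implements AutoCloseable {
    private final ByteBuffer buffer;   // off-heap buffer standing in for "DirectMem"
    private final FileChannel channel;
    private int writePos = 0;          // end of fully written data; no holes below it
    private int flushedPos = 0;        // end of data already written to the file

    ToyCommitLog(Path file, int capacity) throws IOException {
        buffer = ByteBuffer.allocateDirect(capacity);
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
    }

    // Many producer threads may call this; the lock keeps appends strictly sequential.
    synchronized int append(String msg) {
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > buffer.capacity() - writePos) {
            throw new IllegalStateException("buffer full");
        }
        ByteBuffer slot = buffer.duplicate();
        slot.position(writePos);
        slot.put(bytes);
        int offset = writePos;
        writePos += bytes.length;
        return offset;
    }

    // Writes the contiguous range [flushedPos, writePos) to the file.
    // Holding the lock during I/O keeps the sketch simple; a real store would not block appenders.
    synchronized int flush() throws IOException {
        ByteBuffer pending = buffer.duplicate();
        pending.limit(writePos);
        pending.position(flushedPos);
        int written = 0;
        while (pending.hasRemaining()) {
            written += channel.write(pending);
        }
        channel.force(false);
        flushedPos = writePos;
        return written;
    }

    @Override
    public synchronized void close() throws IOException {
        channel.close();
    }

    // Demo: several producer threads append concurrently, then one flush; returns the file size.
    static long demo(int producers, int messagesEach) {
        try {
            Path file = Files.createTempFile("toy-commitlog", ".bin");
            try (ToyCommitLog log = new ToyCommitLog(file, 1 << 20)) {
                Thread[] threads = new Thread[producers];
                for (int p = 0; p < producers; p++) {
                    threads[p] = new Thread(() -> {
                        for (int m = 0; m < messagesEach; m++) {
                            log.append("0123456789"); // 10 bytes per message
                        }
                    });
                    threads[p].start();
                }
                for (Thread t : threads) {
                    t.join();
                }
                log.flush();
            }
            long size = Files.size(file);
            Files.delete(file);
            return size;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```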

At present, servers tuned internally at Alibaba reach a write throughput of more than 900,000 TPS; this figure can serve as a reference when optimizing your own system.

The EXT4 file system and the deadline I/O scheduler are recommended.

Message consumption

Message consumption mainly involves the following aspects:

  • 1. How messages are retrieved (Pull and Push)
  • 2. Consumption model (broadcasting mode and clustering mode)
  • 3. Flow control (can be combined with Sentinel; covered separately later)
  • 4. Setting the number of concurrent consumer threads
  • 5. Message filtering (Tag, Key): for example TagA || TagB || TagC, or * (or null) to subscribe to all tags
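Tag filtering is configured through the subscription expression, for example consumer.subscribe(topic, "TagA || TagB || TagC") on DefaultMQPushConsumer. The matcher below is a hypothetical client-side sketch of how such an expression can be interpreted: tags separated by ||, with *, null or an empty string meaning "all tags". It is not RocketMQ's own parser, and its exact rules are an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical matcher for subscription expressions like "TagA || TagB || TagC".
final class TagExpression {
    private final boolean matchAll;
    private final Set<String> tags;

    private TagExpression(boolean matchAll, Set<String> tags) {
        this.matchAll = matchAll;
        this.tags = tags;
    }

    static TagExpression parse(String expr) {
        // Assumed rule: null, empty or "*" subscribes to every tag.
        if (expr == null || expr.trim().isEmpty() || expr.trim().equals("*")) {
            return new TagExpression(true, Collections.emptySet());
        }
        Set<String> parsed = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return new TagExpression(false, parsed);
    }

    boolean matches(String tag) {
        return matchAll || (tag != null && tags.contains(tag));
    }
}
```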

Consumption method: Push or Pull

RocketMQ supports two subscription modes. In Push mode (MQPushConsumer), the MQ Server actively pushes messages to the consumer; in Pull mode (MQPullConsumer), the consumer actively pulls messages from the MQ Server when it needs them. Under the hood, however, both modes are implemented as pulls: the consumer polls the broker for messages.

  • Push

The advantage is high real-time performance. The disadvantage is that the consumer's processing capacity is limited: when a large number of messages is pushed to the consumer at once, messages easily pile up on the consumer side and can overwhelm the client.

  • Pull

The advantage is that the consumer is in control and can pull according to its own processing capacity. The disadvantage is controlling the pull frequency: if the interval is too long, timeliness suffers; if it is too short, resources are wasted on "useless work" (pulls that return nothing). Long polling is a compromise between the two.
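Long polling can be sketched with a blocking queue: a pull waits on the server side until a message arrives or a timeout expires, so an idle consumer does not spin on empty pulls, and a new message is returned without waiting for the next polling interval. LongPollingQueue is a hypothetical in-process stand-in for the broker; the real broker suspends network pull requests rather than working like this class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy broker-side queue: a pull is "held" until a message arrives or the timeout expires.
final class LongPollingQueue {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void publish(String msg) {
        messages.add(msg);
    }

    // Returns a message as soon as one is available, or null if none arrived within timeoutMillis.
    String longPoll(long timeoutMillis) {
        try {
            return messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A consumer simply calls longPoll in a loop; a null result just means "nothing yet, ask again".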

  • Differences between Push mode and Pull mode

In Push mode, the consumer wraps the long-polling logic internally and lets you register a MessageListener. When messages arrive, it calls the MessageListener's consumeMessage() method to consume them, so to the user the messages appear to be pushed.
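The push facade can be sketched as a background thread that keeps long-polling and hands each message to a registered callback. ToyPushConsumer and its registerListener method are hypothetical stand-ins for the idea behind registerMessageListener; the real consumer also manages offsets, retries and a thread pool, which are omitted here.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical push-style facade: a background thread pulls and invokes the listener.
final class ToyPushConsumer {
    private final BlockingQueue<String> source;
    private volatile Consumer<String> listener;
    private volatile boolean running;
    private Thread puller;

    ToyPushConsumer(BlockingQueue<String> source) {
        this.source = source;
    }

    void registerListener(Consumer<String> listener) {
        this.listener = listener;
    }

    void start() {
        running = true;
        puller = new Thread(this::pullLoop);
        puller.start();
    }

    private void pullLoop() {
        while (running) {
            try {
                // Long poll: wait up to 100 ms for a message, then loop again.
                String msg = source.poll(100, TimeUnit.MILLISECONDS);
                Consumer<String> current = listener;
                if (msg != null && current != null) {
                    current.accept(msg); // the user only ever sees this callback
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        running = false;
        try {
            puller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Demo: feeds the messages through the facade and returns what the listener received.
    static List<String> demo(List<String> input) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(input);
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(input.size());
        ToyPushConsumer consumer = new ToyPushConsumer(queue);
        consumer.registerListener(msg -> {
            received.add(msg);
            done.countDown();
        });
        consumer.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.shutdown();
        return received;
    }
}
```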

In Pull mode, the user must actively drive message retrieval: first obtain the set of MessageQueues for the Topic to be consumed, then iterate over it and pull messages in batches from each MessageQueue, recording the queue's next starting offset after each batch, until that queue is exhausted; then move on to the next MessageQueue.
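The pull loop described above can be sketched without a broker by modelling each MessageQueue as an in-memory list keyed by queue id. In the legacy DefaultMQPullConsumer the corresponding steps are calls such as fetchSubscribeMessageQueues and pull; the maps, batch size and ToyPullLoop class below are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pull-mode loop over in-memory "message queues" (queue id -> messages).
final class ToyPullLoop {
    private ToyPullLoop() {}

    /**
     * For each queue, pulls batches of up to batchSize starting at the stored offset,
     * records the next offset after every batch, and moves on once the queue is exhausted.
     */
    static List<String> consumeAll(Map<Integer, List<String>> queues,
                                   Map<Integer, Integer> offsets,
                                   int batchSize) {
        List<String> consumed = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : queues.entrySet()) {
            int queueId = entry.getKey();
            List<String> queue = entry.getValue();
            int offset = offsets.getOrDefault(queueId, 0);
            while (offset < queue.size()) {
                int end = Math.min(offset + batchSize, queue.size());
                consumed.addAll(queue.subList(offset, end)); // one "pull"
                offset = end;
                offsets.put(queueId, offset); // next starting offset for this queue
            }
        }
        return consumed;
    }
}
```

Because offsets are stored per queue, a restart can resume each queue where it left off (queue 1 in the test below starts at offset 1).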

Improve Consumer processing power

  • 1. Improve the parallelism of consumption

Within the same Consumer group (clustering mode), you can increase parallelism by adding Consumer instances.

You can increase the number of Consumer instances either by adding machines or by starting multiple Consumer processes on existing machines.

Note: the total number of Consumer instances should not exceed the number of read queues of the Topic; any extra Consumer instances will not receive messages.

In addition, you can raise parallelism inside a single Consumer instance to improve throughput by increasing its number of processing threads (via consumeThreadMin and consumeThreadMax).
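Why extra consumers sit idle becomes clear from how queues are divided within a group. The sketch below uses a simple round-robin assignment; RocketMQ ships several allocation strategies, and this QueueAllocation class is a hypothetical illustration rather than any of them verbatim. With any such scheme, each read queue goes to exactly one consumer, so once consumers outnumber queues some get nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin allocation of read queues among consumers in one group.
final class QueueAllocation {
    private QueueAllocation() {}

    /** Returns the queue ids assigned to the consumer at consumerIndex (0-based). */
    static List<Integer> assign(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        // Consumer i takes every queue q with q % consumerCount == i.
        for (int q = consumerIndex; q < queueCount; q += consumerCount) {
            result.add(q);
        }
        return result;
    }
}
```

With 4 read queues and 5 consumers, the fifth consumer (index 4) is assigned an empty list.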
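Inside one instance, the thread settings size a worker pool that runs the consume callbacks. The sketch below builds a plain JDK ThreadPoolExecutor from two parameters named after consumeThreadMin and consumeThreadMax; the ConsumeWorkers class is hypothetical and does not claim to reproduce the client's internal executor.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer-side worker pool sized by min/max thread settings.
final class ConsumeWorkers {
    private ConsumeWorkers() {}

    /** Processes every message on the pool and returns how many were handled. */
    static int processAll(List<String> messages, int consumeThreadMin, int consumeThreadMax) {
        // Note: with an unbounded work queue, a JDK ThreadPoolExecutor never grows past
        // its core size, so consumeThreadMin is what effectively sets parallelism here.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                consumeThreadMin, consumeThreadMax, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicInteger handled = new AtomicInteger();
        for (String msg : messages) {
            pool.execute(() -> {
                // Business logic for msg would run here; the sketch only counts it.
                handled.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```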

  • 2. Consume in batches

In some business scenarios, processing several messages together takes far less time than processing them one by one. For example, if consuming a message involves a database update, applying 10 updates in one batch takes much less time than applying 10 updates one at a time.

Batch consumption can increase consumption throughput. To enable it, set the Consumer's consumeMessageBatchMaxSize parameter. The default is 1; if it is set to N, each call to the listener receives a list of up to N messages.
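The effect of consumeMessageBatchMaxSize (set on DefaultMQPushConsumer via setConsumeMessageBatchMaxSize) can be pictured as chunking the pulled messages into lists of at most N before calling the listener. BatchDispatcher is a hypothetical sketch of that chunking, not the client's dispatch code; a listener receiving the list could, for example, issue one multi-row database update per list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical dispatcher that hands the listener lists of at most batchMaxSize messages.
final class BatchDispatcher {
    private BatchDispatcher() {}

    static void dispatch(List<String> pulled, int batchMaxSize, Consumer<List<String>> listener) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be >= 1");
        }
        for (int start = 0; start < pulled.size(); start += batchMaxSize) {
            int end = Math.min(start + batchMaxSize, pulled.size());
            // Copy so the listener may keep the list after this call returns.
            listener.accept(new ArrayList<>(pulled.subList(start, end)));
        }
    }
}
```

With 10 pulled messages and a batch size of 4, the listener is called three times, with lists of 4, 4 and 2 messages.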

  • 3. Check the lag and skip unimportant messages

During consumption, if a consumer finds that messages have piled up severely for some reason and the backlog cannot be cleared in a short time, it can choose to discard unimportant messages so that it catches up with the Producer as quickly as possible.
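One way to decide what to skip is to compare a message's queue offset with the queue's current maximum offset. In the hypothetical LagPolicy below, the backlog is the difference between the two, and a message is skipped only if it is marked unimportant and the backlog exceeds a threshold. The threshold (100,000 in the test) is an arbitrary example, and the importance flag is an application-defined assumption. Skipped messages should still be reported as consumed (for example by returning success from the listener), so they are not redelivered and are effectively discarded.

```java
// Hypothetical lag check: skip unimportant messages while the backlog is too large.
final class LagPolicy {
    private LagPolicy() {}

    /** Backlog = messages already stored in the queue that this consumer has not reached yet. */
    static long backlog(long messageOffset, long maxOffset) {
        return Math.max(0, maxOffset - messageOffset);
    }

    /** True if the message should be acknowledged without running the business logic. */
    static boolean shouldSkip(long messageOffset, long maxOffset, long threshold, boolean important) {
        return !important && backlog(messageOffset, maxOffset) > threshold;
    }
}
```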