RocketMQ is a messaging middleware developed and open-source by Alibaba using Java. Thanks to Alibaba’s local strengths and the strength of the Java language, more and more Chinese companies are switching message queues from Kafka to RocketMQ. According to some of the big companies I have met recently, the source of the framework in the interview has been more and more to middleware questions, I think this is reasonable, after all, middleware is a more general problem solving idea, Rather than being limited to languages like frameworks (of course there are a lot of good design ideas in frameworks like Spring that we can learn from). Middleware commonly used in Java development includes message queues, RPC, registries and so on. In my opinion, the usage scenario and principle of message queue are the easiest to understand, so it is a good choice to start from message queue to understand middleware.

Kafka is currently implemented in Scala in MQ, although it is also a JVM language, which is not hard to understand. But RocketMQ in Java is definitely a better option for individuals to follow up with DEBUG and implementation ideas when they encounter problems, without worrying about being confused by some other language’s fancy writing.

I have also seen Apache Pulsar rising online recently. One of its biggest features is that it integrates the flow message model with the queue message model, so it can support both scenarios. In addition, it provides kafka-on-Pulsar (KoP) component support for Kafka applications. Users can use Pulsar directly without changing any Kafka code On the client side, which is a way to promote users to experiment, similar to TiDB’s compatibility with MySQL. However, there are few articles about Pulsar on The Chinese Internet, so I will not give more explanation for the time being.

So, starting with this article, I’ll take a step-by-step look at RocketMQ message-oriented middleware, from awareness to practice to underlying principles.

RocketMQ vs. Kafka

There are four types of MQ commonly discussed: ActiveMQ, RabbitMQ, Kafka, and RocketMQ.

Among them, ActiveMQ is no longer recommended because of its early appearance and relatively poor performance, and the community is not active at present.

RabbitMQ has a lower throughput than Kafka and RocketMQ, but it is based on Erlang and has excellent performance, high concurrency and extremely low latency of microseconds. Erlang is a black hole language in China and there are few companies that can adapt and customize RabbitMQ based on Erlang, so it is not very active in China.

So for the remaining two, let’s do an overall comparison.

First of all, Kafka is a natural fit for big data and log collection, so many domestic companies have also chosen it as an MQ component for convenience.

Back to the subject,

  • throughput

    Kakfa can write 10-byte messages per second on a single machine, while RocketMQ can write up to 120,000 messages per second. Both of these will do the job, after all, when single machine messages reach tens of thousands per second, it’s time to scale the server horizontally.

  • timeliness

    Both Kafka and RocketMQ have ms latency

  • functional

    Kafka does not support a lot of functionality, mainly around message queues for sending and receiving messages. RocketMQ is, after all, fully functional, with better support for message queries, transaction messages, and so on.

All in all, Kafka is a relatively pure messaging system. After all, it was originally intended only for log collection and transmission. RocketMQ has more extensibility and is suitable for more reliable scenarios such as finance, so the choice between the two depends on the data reliability and complexity of the business.

RocketMQ components

RocketMQ consists of four parts: Producer, Consumer, Name Server, and Broker.

  • Producer: A message publisher that uses the Name Server to obtain routing information of Broker clusters and deliver messages. The delivery process supports rapid failure

  • -Penny: Consumer. Supports push and pull mode for message consumption. Broadcasting consumption is also supported

  • Name Server: Topic Routing registry, consistent with ZooKeeper in Kafka (the latest version of Kafka has removed the binding to ZK), supports dynamic registration and discovery of brokers. NameServer tells producers and consumers about the entire Broker cluster. It also manages Broker information, provides a heartbeat mechanism, detects whether the Broker is alive, and maintains the registration of live brokers for use by Producer and Consumer routes

  • Broker: Responsible for storing messages, querying delivery, and ensuring high availability of services

First of these components and its functions have an impression, the subsequent component of each function for the dimension of the underlying source code analysis.

RocketMQ example

Run dependency configuration

Visit RocketMQ Quick Start and follow the documentation step by step.

First, environmental requirements:

  1. 64-bit operating system
  2. 64-bit JDK 1.8 and later
  3. Maven 3.2.x or later
  4. Git
  5. More than 4 GB free disk space

I use Tencent cloud server and CentOS system to build.

First download 4.8.0-source-release, this is the source package, we need to decompress, use Maven to compile.

The command is as follows:

>Unzip rocketmq - all - 4.8.0 - source - the zip
> cdRocketmq - all - 4.8.0 /
> mvn -Prelease-all -DskipTests clean install -U
> cdDistribution/target/rocketmq - 4.8.0 / rocketmq - 4.8.0
Copy the code

At this time, you can see the folder directory as shown below:

The conf directory stores broker configuration files, including cluster configuration files. The bin directory houses scripts for nameserver, Broker, and other components.

In the official documentation, the next step is to start nameserver, but bin/runserver.sh and bin/runbroker.sh are configured to use gigabytes of heap memory by default, which is obviously not enough for weak cloud servers. Therefore, we need to modify some of the VM configuration parameters.

Change the heap size parameters in bin/runserver.sh to

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
Copy the code

Modify bin/runbroker.sh to change the heap size parameters to

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
Copy the code

After the script is changed, for ali Cloud, Tencent Cloud and other users, specify brokerIP as public IP in broker configuration file, so that other machines can access the broker, modify conf/broker.conf to the following content

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Allow automatic creation of topics that do not exist
autoCreateTopicEnable=true
For RocketMQ users deployed on cloud servers, specify this as a public IP address
brokerIP1=111.231.26.137
# specify the nameserver address for the connection
namesrvAddr=111.231.26.137:9876
Copy the code

Linux running RocketMQ

Next, run namesRV and Broker in the background as Quick Start on the official website.

#Namesrv is run in the background
nohup sh bin/mqnamesrv &
#Check whether the Namesrv process starts normally
jps
#Check whether the NamesRV startup log is normal
tail -f ~/log/rockegmqlog/namesrv.log
Copy the code

The result of successful operation is as follows:

The default namesrv.log directory exists under ~/log/rockegmqlog/. RocketMQ does not have permission to create namesrv.log in the specified directory. Run namesrv.sh again.

Next start the broker

#Run the MQbroker in the background, and the -c command is used to specify the configuration file to be used for this startup
nohup sh bin/mqbroker -c conf/broker.conf &
#JPS checks whether the broker process is starting properly
jps
#Check the broker startup log
tail -f ~/logs/rocketmqlogs/broker.log 
Copy the code

The result is as follows:

After a successful startup, you should simulate sending and receiving messages. But before we do that, we need to do two things.

The first step is to modify the vm running parameters in bin/tools.sh to set the heap memory to the following:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
Copy the code

The second is the need to set the environment variable NAMESRV_ADDR. Use export to ensure that the environment variable is valid only for this login.

export NAMESRV_ADDR=localhost:9876
Copy the code

Next, use the test to send the message

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Copy the code

The Producer class attempts to send 100 messages, as follows

SendResult [sendStatus=SEND_OK… Indicates that the message is successfully sent.

Use consumers for consumption:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Copy the code

Receive New Messages: [MessageExt [indicates that the consumption was successful.

In addition to running, we need to know how to gracefully end namesrv and broker. Before using kill -9 to kill the corresponding process, resulting in a lot of online can not find the solution of the error, so two words: elegant!

Close the broker first and then namesRV

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
Copy the code

Namesrv and Broker are then always running on the cloud server, and the application connects to RocketMQ deployed on the cloud server

Application sends messages

Create a SpringBoot application and add the RocketMQ dependency to pom.xml:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
Copy the code

First create a normal Java program to test send:

package me.oldfarmer.lab.rocketmq.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("custom-producer");
        producer.setNamesrvAddr("111.231.26.137:9876");
        producer.start();

        Message message = new Message("topicA"."Hello world".getBytes(StandardCharsets.UTF_8)); SendResult sendResult = producer.send(message); System.out.println(sendResult); producer.shutdown(); }}Copy the code

The code is relatively simple, so the comments are omitted.

The running results are as follows:

Sending succeeded.

Create a consumer

package me.oldfarmer.lab.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("custom_consumer");
        consumer.setNamesrvAddr("111.231.26.137:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// Start spending at the end

        consumer.subscribe("topic"."*"); // Consume all messages under topic
        // Register a message listener to process messages when they are received
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            MessageExt messageExt = list.get(0);
            try {
                System.out.println(messageExt);
            } catch (Exception e) {
                e.printStackTrace();
                // You need to tell the broker how to handle the message based on the result of the message processing
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}Copy the code

The running results are as follows:

So there are four of them printed because of how many times they were sent in the previous test.

It turns out that the consumer processed the message according to the listener after consuming it.


This concludes RocketMQ’s primer, which is relatively easy. It then breaks down what each component does in terms of the major steps.