Cabbage Java study room covers core knowledge
RocketMQ (part 1)
1. RocketMQ profile
Originally called MetaQ, RocketMQ changed its name to RocketMQ when MeataQ released version 3.0. It was designed in a similar way to Kafka, but unlike Kafka, it was developed in Java. With a much larger domestic Java audience than Scala, RocketMQ is the first choice for many Java-focused companies. Similarly, RocketMQ and Kafka are both top projects in the Apache Foundation, with high community activity and rapid iterations.
2. RocketMQ architecture diagram
The RocketMQ architecture diagram is not very different from Kafka’s in general, but there are many differences in many details, which will be covered next.
3. RocketMQ nouns
In the RocketMQ architecture diagram, there are multiple producers, multiple master brokers, and multiple slave brokers. Each Producer can correspond to multiple topics, and each Consumer can also consume multiple topics.
The Broker information is reported to the NameServer, from which the Consumer pulls the Broker and Topic information.
-
Producer: Message Producer, a client that sends messages to the Broker
-
Consumer: Message Consumer, the client that reads messages from the Broker
-
Broker: Unlike Kafka, where the Broker can write requests and back up data to other nodes, RocketMQ can only be written to and usually read from the master Broker. Secondary node reads are used when the primary node fails or in some other special case, similar to mysql’s master-slave schema.
-
Topic: A first-class message type to which producers send messages and consumers read their messages.
-
ConsumerGroup: ProducerGroup, ProducerGroup, ConsumerGroup, which represents a certain category of producers and consumers. Generally speaking, the same service can be used as a Group. The same Group generally sends and consumes the same messages.
-
Tag: Kafka does not have this concept, Tag is a secondary message type, generally speaking, business associated can use the same Tag, such as order message queue, use Topic_Order, Tag can be divided into Tag_ food order, Tag_ clothing order and so on.
-
Queue: A Partition in Kafka is an ordered Queue. RocketMQ is divided into read and write queues. Generally, the number of read and write queues is the same.
-
NameServer: In Kafka, ZooKeeper is used to store the address of the Broker and the election of the Leader. RocketMQ uses a stateless NameServer instead of a voting Broker strategy. Since NameServer is stateless, cluster nodes do not communicate with each other. Therefore, data is uploaded to all nodes.
Many of my friends ask what statelessness is. A stateless service is a memory service. NameServer itself is a memory service. All data is stored in memory and will be lost after a restart.
4. RocketMQ Topic and Queue
For each message in RocketMQ, there is a Topic that distinguishes the different messages. A topic typically has multiple message subscribers, and when a producer publishes a message to a topic, consumers subscribed to the topic receive new messages written by the producer.
There are several queues in a Topic, which is actually the smallest unit in which we can send/read messages. We need to specify a Queue when we send messages, and we need to specify a Queue when we pull messages. So our sequential messages can be kept in order based on our Queue dimension, and if we want to be globally ordered then we need to set the Queue size to 1, so that all the data will be in order in the Queue.
In the figure above, our Producer selects a Queue using a number of strategies:
-
Non-sequential messages: Generally, non-sequential messages are sent in rotation mode.
-
Sequential messages: Hash the same type of data in the same queue based on a Key such as the common order Id, user Id, to ensure that we are sequential.
Our group of consumers also selects queues based on a number of policies, such as average allocation or consistent Hash allocation. Rebalance is required when the Consumer is offline or online. This is the Rebalance mechanism for RocketMQ.
- Pull the latest information from the broker and topic regularly
- Rebalance every 20 seconds
- Select one main Broker for the current Topic at random. It is important to note that all main brokers will be selected for each rebalance, as there may be multiple brokers.
- Gets all machine ids of the current Broker, current ConsumerGroup.
- Then the policy is allocated.
Since rebalancing is timed, it is possible for a Queue to be consumed by two consumers at the same time, resulting in repeated messages being delivered.
Unlike RocketMQ, Kafka’s rebalancing mechanism is performed by a Consumer in contact with a Coordinator. When a Coordinator detects a change in the Consumer group, the Coordinator sends a rebalancing signal during the heartbeat. A ConsumerLeader then makes the rebalance selection, and the Coordinator informs all consumers of the results.
What if the number of reads and writes to the Queue is inconsistent?
In RocketMQ, queues are divided into read and write queues. When I first started RocketMQ, I always thought that there would be no problem with inconsistent configuration of read and write queues. For example, when there were many consumer machines, we configured many read queues. In practice, however, it is found that messages cannot be consumed or no messages are consumed at all.
- When the number of write queues is greater than the number of read queues, the data in the write queue that is larger than this part of the read queue will be unconsumable because it will not be allocated to the consumer.
- When the number of read queues is greater than the number of write queues, the number of queues will be so large that no messages will be delivered.
5. Get started with RocketMQ instance
5.1. RocketMQ producer
Define a producer, create a Message, and call the send method.
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes (RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }}Copy the code
5.2. RocketMQ Consumers
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener (new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start(); System.out.printf("Consumer Started.%n"); }}Copy the code
RocketMQ (part 1)