Message acknowledgement mechanism
Acknowledgement modes for RabbitMQ consumers:
NONE: No acknowledgement is sent. The broker assumes by default that the consumer processes every message correctly.
AUTO: The consumer acknowledges automatically. An ACK is sent if processing succeeds (success means that no exception is thrown), and a NACK is sent if processing fails. RabbitMQ delivers a message and waits for the consumer's response; it does not remove the message until it receives an ACK. How a NACKed message is handled is controlled by the setDefaultRequeueRejected() method. In this mode, messages whose processing failed can be recovered.
MANUAL: Basically the same as AUTO mode, except that the consumer must send the acknowledgement explicitly (for example, by calling channel.basicAck()).
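The AUTO behaviour described above can be illustrated with a small in-memory simulation. This is only a sketch: the `Broker` class, `consume_auto`, and the `requeue_rejected` flag are hypothetical names invented for illustration, not the RabbitMQ or Spring AMQP API.

```python
from collections import deque


class Broker:
    """Toy in-memory broker (hypothetical, not the RabbitMQ API)."""

    def __init__(self, messages):
        self.queue = deque(messages)
        self.unacked = {}   # delivery tag -> message awaiting ACK/NACK
        self.next_tag = 1

    def deliver(self):
        # The message stays tracked by the broker until it is acknowledged.
        msg = self.queue.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = msg
        return tag, msg

    def ack(self, tag):
        self.unacked.pop(tag)

    def nack(self, tag, requeue):
        msg = self.unacked.pop(tag)
        if requeue:
            self.queue.append(msg)


def consume_auto(broker, handler, requeue_rejected=True):
    """AUTO mode: ACK when the handler returns, NACK when it raises."""
    tag, msg = broker.deliver()
    try:
        handler(msg)
    except Exception:
        broker.nack(tag, requeue=requeue_rejected)
        return False
    broker.ack(tag)
    return True
```

In MANUAL mode the `try`/`except` wrapper would disappear and the handler itself would be responsible for calling `ack` or `nack`.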
Cluster consumption and broadcast consumption
Cluster consumption (the default)
Cluster mode: The Consumer instances in a ConsumerGroup are assigned queues by a queue allocation strategy, and by default messages are divided evenly among them. The available strategies are:
- Average allocation
- Circular (round-robin) allocation
- Manual allocation
- Machine-room allocation
- Consistent hashing
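As an illustration of the average strategy, here is a minimal sketch of how queues might be divided among the consumers of one group. The function name and the rule that the first consumers receive the leftover queues are assumptions made for this example, not RocketMQ's actual source.

```python
def allocate_averagely(queues, consumers, me):
    """Return the contiguous slice of queues assigned to consumer `me`.

    Assumption: consumers are sorted so every instance computes the same
    view, and the first `extra` consumers each take one additional queue.
    """
    consumers = sorted(consumers)
    if me not in consumers or not queues:
        return []
    index = consumers.index(me)
    base, extra = divmod(len(queues), len(consumers))
    size = base + 1 if index < extra else base
    start = index * base + min(index, extra)
    return queues[start:start + size]
```

With eight queues and three consumers, the first two consumers get three queues each and the last one gets two. If there are more consumers than queues, the surplus consumers receive nothing.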
Broadcast consumption
Broadcast consumption: A message is consumed by multiple consumers. Even if these consumers belong to the same ConsumerGroup, the message is consumed once by each consumer in the group. In broadcast consumption, the ConsumerGroup concept is effectively meaningless as far as message partitioning is concerned.
In broadcast consumption, every subscriber to the Topic consumes each message. In cluster consumption, only one of the subscribers to the Topic consumes a given message.
Broadcast and cluster consumption also differ in persistence, specifically in how consumption progress is saved. In broadcast consumption, each consumer consumes every message independently, so each consumer saves its own consumption progress.
In cluster consumption, a subscribed Topic may have multiple MessageQueues and each consumer may consume a different MessageQueue, so the overall consumption progress is saved and managed centrally on the Broker.
RocketMQ zero copy
Message consumption uses the zero-copy principle. There are two common approaches:
- mmap + write (the option RocketMQ chooses, because it needs to transfer small chunks of data, for which this is better than sendfile)
Advantages: Efficient for transferring small chunks of a file, even when called frequently. Disadvantages: Cannot make good use of DMA, consumes more CPU than sendfile, and memory safety is complex to control; care is needed to avoid crashing the JVM.
- sendfile
Advantages: Can use DMA, consumes less CPU, is efficient for large file transfers, and introduces no new memory safety problems. Disadvantages: Inefficient for small chunks. The file can be transferred only in BIO mode, not NIO.
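To make the mmap side of the comparison concrete, the sketch below maps a file into memory and reads a small chunk at an offset, the way a store might read one message out of a large log file. The helper names and the temporary file are illustrative assumptions. Slicing the mapping in Python copies the bytes into a new object, so this shows only the mapping step, not a true zero-copy send.

```python
import mmap
import os
import tempfile


def read_via_mmap(path, offset, length):
    """Map the whole file read-only and return `length` bytes at `offset`."""
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as view:
            # No read() call is issued; the kernel pages data in on access.
            return view[offset:offset + length]


def demo():
    """Write a small file, then read one chunk of it through the mapping."""
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, b"hello world")
    finally:
        os.close(fd)
    try:
        return read_via_mmap(path, 6, 5)
    finally:
        os.remove(path)
```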
Ensuring 100% message delivery
- Transaction and confirm modes ensure that the producer does not lose messages.
- The message is persisted to a database and its status is marked.
- Delayed delivery of messages, secondary confirmation, callback checks.
- Transaction mode
- Because of its poor performance, publishing messages in transaction mode is not recommended; asynchronous confirm mode is preferred.
- channel.txSelect() starts transaction mode
- channel.txCommit() commits the transaction
- channel.txRollback() rolls back the transaction
- Publisher confirms
Message backlog
- concurrentConsumers
- prefetchCount
- concurrentConsumers sets the number of concurrent consumers created for each listener at initialization.
- prefetchCount is the number of messages a consumer fetches from the broker at a time. It is the size of the LinkedBlockingQueue, a blocking queue maintained internally by BlockingQueueConsumer. Its effect is that if one consumer's queue is full, that consumer receives no new messages, and they are delivered to other consumers that are not blocked.
To increase the consumption rate, configure the following parameters:
spring.rabbitmq.listener.simple.concurrency: the minimum number of consumers
spring.rabbitmq.listener.simple.max-concurrency: the maximum number of consumers
spring.rabbitmq.listener.simple.prefetch: the number of messages handled in a single request; if transactions are used, it must be greater than or equal to the transaction size
Ordered messages
Ordered messages (FIFO messages) are a message type provided by MQ that is published and consumed in strict order. Ordering has two parts: ordered publication and ordered consumption. The main idea is to set the number of partitions of the topic to 1 and to send messages from a single thread. There are two kinds of ordered messages:
- Partition order: All messages within a Partition are published and consumed in a first-in, first-out order
- Global ordering: All messages within a Topic are published and consumed in a first-in, first-out order
- Ordered messages must be sent from a single thread; with multiple threads, order is no longer guaranteed.
In the MQ model, the order needs to be guaranteed by three phases:
- Messages are sent in order
- Messages are stored in the same order as they were sent
- Messages are consumed in the same order as they are stored
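For partition ordering, a common approach is to route every message that shares a business key (for example an order ID) to the same queue, so that the three phases above hold per key. The sketch below assumes a hypothetical `sharding_key` and uses CRC32 as a stable hash; neither is taken from a specific MQ client API.

```python
import zlib
from collections import defaultdict


def select_queue(sharding_key, queue_count):
    """Map a key to a queue index; the same key always lands on the same queue."""
    # zlib.crc32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(sharding_key.encode("utf-8")) % queue_count


def route(messages, queue_count):
    """Append (key, body) pairs to their queues in the order they were sent."""
    queues = defaultdict(list)
    for sharding_key, body in messages:
        index = select_queue(sharding_key, queue_count)
        queues[index].append((sharding_key, body))
    return queues
```

Messages of different keys may interleave inside one queue, but the relative order of messages with the same key is preserved, provided the producer sends them from a single thread.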
Delayed messages
RabbitMQ itself does not support delayed messages, but they are typically emulated using time-to-live (TTL) and Dead Letter Exchanges. If a message is not consumed within its maximum lifetime, it becomes a dead letter and is redelivered to the dead letter exchange. The dead letter exchange then routes the message to the corresponding dead letter queue according to its bindings. By listening on that queue, a consumer receives the message after the delay.
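The TTL plus dead-letter flow can be modelled with a toy simulation. Everything here is hypothetical (the class name, the explicit `now` clock, the single fixed TTL); it only mirrors the sequence of steps, not the RabbitMQ client API.

```python
class DelayViaDeadLetter:
    """Toy model: a consumer-less queue with a TTL feeding a dead letter queue."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.waiting = []            # (expires_at, message); nobody consumes here
        self.dead_letter_queue = []  # the real consumer listens on this queue

    def publish(self, message, now):
        self.waiting.append((now + self.ttl, message))

    def tick(self, now):
        # Expired messages at the head are dead-lettered, in publish order.
        while self.waiting and self.waiting[0][0] <= now:
            _, message = self.waiting.pop(0)
            self.dead_letter_queue.append(message)
```

Because every message in this model has the same TTL, expiry order equals publish order, which is why checking only the head of the waiting queue is sufficient.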
RocketMQ supports delayed messages, but not with arbitrary time precision, only at specific delay levels. Supporting arbitrary precision would require sorting messages at the Broker, and since persistence is also involved, the sorting would inevitably incur significant performance overhead.
The delay levels are 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, and 2h. When sending a message, you only need to set its delay level. There are three cases:
- If the delay level is 0, the message is not delayed.
- If the delay level is between 1 and 18 inclusive, the message is delayed by the corresponding time. For example, level 1 means a delay of 1s, level 2 means 5s, and so on.
- If the delay level is greater than 18, it is treated as 18. For example, a delay level of 20 gives a delay of 2h.
Implementation: the Broker delivers delayed messages using scheduled tasks.
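The three cases can be written as a small lookup. This sketch assumes the commonly documented default table of 18 levels (1s through 2h, including 10m); the function name is hypothetical, and treating negative levels as "no delay" is an assumption.

```python
DELAY_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split()
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def delay_seconds(level):
    """Translate a delay level into seconds, following the three cases."""
    if level <= 0:
        return 0                              # level 0: not delayed
    level = min(level, len(DELAY_LEVELS))     # levels above 18 are capped at 18
    text = DELAY_LEVELS[level - 1]
    return int(text[:-1]) * UNIT_SECONDS[text[-1]]
```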
Transactional messages
The producer sends a Half message to the broker, which sets queueOffset=0 so that the message is invisible to consumers. After the local transaction succeeds, the producer sends a COMMIT to the broker; the broker then sets queueOffset to its normal value and consumers can consume the message. If the local transaction fails, the producer sends a ROLLBACK and the broker deletes the message. If the producer fails to send a COMMIT or ROLLBACK, the broker periodically checks back with the producer to learn the state of the local transaction before deciding whether to commit or roll back the Half message.
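The half-message lifecycle is easier to follow as a small state machine. The sketch below is an in-memory model with hypothetical names (`TxBroker`, `end_transaction`, `check_pending`, and the `"UNKNOWN"` state); it does not reproduce RocketMQ's storage details.

```python
class TxBroker:
    """Toy model of half messages: invisible until committed."""

    def __init__(self):
        self.half = {}      # msg_id -> body, hidden from consumers
        self.visible = []   # bodies that consumers may consume

    def send_half(self, msg_id, body):
        self.half[msg_id] = body

    def end_transaction(self, msg_id, state):
        body = self.half.pop(msg_id)
        if state == "COMMIT":
            self.visible.append(body)
        # On ROLLBACK the half message is simply dropped.

    def check_pending(self, check_local_state):
        """Periodic check-back: ask the producer about unresolved half messages."""
        for msg_id in list(self.half):
            state = check_local_state(msg_id)
            if state in ("COMMIT", "ROLLBACK"):
                self.end_transaction(msg_id, state)
            # Any other answer leaves the message pending for the next check.
```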
RocketMQ
The overall design of RocketMQ is similar to that of other MQs, with Producer, Consumer, NameServer, and Broker components. NameServer stores information about topics and brokers. Its main function is to manage brokers and the routing information used for production and consumption.
When a Broker starts, it registers with NameServer, keeps the connection alive through heartbeats, and NameServer records the liveness of each node. NameServer also records the request information of producers and consumers and, combined with the node information of the message queues, load-balances message delivery.
RocketMQ's Broker is similar to Kafka's in that it hosts the message store, acts as the entry point for client requests, and manages production and consumption by producers and consumers.
The Broker cluster is also responsible for the high availability of message queues. It can use a replication mechanism that synchronizes data between master and slave nodes, much like Kafka's partition replication.
The Broker can scale horizontally: if the cluster can no longer handle the current business load, new machines can be added to the Broker cluster. Once a new Broker node starts, it registers with NameServer. Producers and consumers learn about the new node through NameServer and can then publish and consume messages on it.
Unlike other message queues, RocketMQ also supports tags, which extend the Topic concept and can be understood as subtopics. With tags, the messages of a business module can be subdivided further, which makes dividing topics more flexible.
RabbitMQ basic concepts
- Message
- Publisher
- Exchange
- Binding
- Queue
- Connection
- Channel
- Consumer
- Virtual Host
- Broker
Exchange types
- fanout: broadcast; every bound queue receives the message
- direct: point-to-point, one-to-one
- topic: wildcard matching
- headers
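The wildcard matching of a topic exchange follows two rules: routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The sketch below implements those rules; the function name is hypothetical and this is not RabbitMQ's own matcher.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

A direct exchange corresponds to the special case where the pattern contains no wildcards, and a fanout exchange ignores the routing key entirely.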
Message persistence
RabbitMQ:
- Persistence of exchanges
An exchange is made persistent by setting durable to true when it is declared. If the exchange is not durable, its metadata is lost after the RabbitMQ service restarts. Messages are not lost, but they can no longer be sent to that exchange.
- Persistence of queues
A queue is made persistent by setting durable to true when it is declared. If the queue is not durable, its metadata is lost when the RabbitMQ service restarts, along with the messages in the queue.
- Persistence of messages
A message is made persistent by setting deliveryMode to 2 in BasicProperties. Queue persistence guarantees that the queue's metadata survives a failure, but not that the messages stored in it do. To ensure that messages are not lost, the messages themselves must be persisted.
In summary:
- Exchange: declared with durable set to true
- Queue: declared with durable set to true
- Message: delivery_mode = 2 (1 is non-persistent)
RocketMQ cluster
There are five cluster deployment modes for brokers:
- Single master: if the Master node fails, the cluster is unavailable
- Single master, single slave: when the master goes down, the cluster cannot accept writes but can still serve reads
- Multiple masters, no slaves: if one master goes down, the others take over and service is not affected; this mode has the best performance
- Multiple masters, multiple slaves, asynchronous replication
- Multiple masters, multiple slaves, synchronous replication
Message synchronization:
- Synchronous replication: success is returned to the producer only after the message has been replicated successfully
- Asynchronous replication: success is returned to the producer without waiting for replication, relying on eventual consistency
NameSrv
It is mainly used to store metadata and improve availability; high availability is achieved by running a NameSrv cluster. Its main function is to temporarily hold and manage Topic routing information. NameSrv nodes are stateless: they do not communicate with each other and are unaware of each other's existence.
At startup, the Broker reports its metadata, that is, its topic routing information, to NameSrv.
Broker
The broker serves primarily as the store in RocketMQ, handling TCP requests and storing messages. Brokers are divided into Master brokers and Slave brokers. The Master provides service, and the Slave serves consumption after the Master goes down.
Like ZooKeeper, a broker stores its information under different file paths:
- commitlog
- consumequeue
Contains the consume queue files for all topics on the broker. Each consume queue is an index into the commitlog; consumers use it to pull messages and to update their consumption offsets.
- Index file
All files in this directory are hash indexes built from message keys. Each file is named after the timestamp at which it was created.
- config
Stores all topics, subscriptions, and consumption progress of the current broker. This data is periodically persisted from memory to disk in case of downtime.
- abort
If the broker shuts down normally, this file is deleted; if it shuts down abnormally, the file remains. At startup the broker checks whether the file exists to decide whether the indexes must be rebuilt.
- checkpoint
Records the time of the most recent operations, for example the last flush time.
Expired files are deleted when any one of the following three conditions is met:
- The current time reaches the configured deletion time
- The disk usage exceeds 85%
- Deletion is triggered manually
How do I prevent data loss?
It is mainly based on three dimensions:
- Producer
The producer sends a message to the message middleware. Using the confirm mechanism and synchronous sending, a message is considered sent only after an OK acknowledgement is received. If sending fails, it is retried twice.
- Broker
Messages are persisted to the commitlog, so unconsumed messages can be reloaded even after a restart.
The Broker cluster supports one Master with N Slaves and both synchronous and asynchronous replication. Synchronous replication ensures that messages are not lost even if the Master's disk fails.
The Broker supports synchronous and asynchronous flush policies to ensure that received messages are written to local storage.
Persistence is based on three main files: commitlog, consumequeue, and the index file. Consumption progress, subscription relationships, and similar data can be found in config. After a crash or shutdown, the abort and checkpoint files are used to determine how to recover.
- Consumer
Using the ACK mechanism, with either manual or automatic ACK, the consumer confirms whether consumption succeeded. If consumption fails, the message can be sent back to the broker.
Consumequeue: Each Topic is a directory under consumequeue, and each queue ID is a directory under its Topic. The files record the location of each message in the commitlog. Each entry has the format commitlog offset / size / tag hash code.
Index: Message index files. The hash key is generated from the topic and the message key, and the value is the commitlog offset of the message.
Config: Records the configuration information of consumers.
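A consumequeue entry is small and fixed-size, which is what makes it usable as an index. The sketch below assumes the commonly documented 20-byte layout (8-byte commitlog offset, 4-byte message size, 8-byte tag hash code, big-endian); the helper names are hypothetical.

```python
import struct

# Assumed layout: int64 commitlog offset, int32 size, int64 tag hash code.
ENTRY = struct.Struct(">qiq")


def pack_entry(commitlog_offset, size, tag_hash):
    """Encode one index entry pointing at a message in the commitlog."""
    return ENTRY.pack(commitlog_offset, size, tag_hash)


def unpack_entry(raw):
    """Decode one index entry into (commitlog_offset, size, tag_hash)."""
    return ENTRY.unpack(raw)


def entry_position(queue_offset):
    """Byte position of the n-th entry; fixed-size entries allow direct seeks."""
    return queue_offset * ENTRY.size
```

Because every entry has the same size, a consumer's logical offset translates directly into a byte position, and the tag hash allows filtering before the message body is read from the commitlog.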
Synchronous and asynchronous disk flushing
- Asynchronous flush: When the write-success status is returned, the message may have been written only to the in-memory page cache. The write returns quickly and throughput is high. Once enough messages have accumulated in memory, a disk write is triggered.
Advantages: High performance
Disadvantages: If the Master machine crashes or its disk is damaged, a small number of messages may be lost, leaving the MQ message state inconsistent with the producer/consumer message state
- Synchronous flush: The message has been written to disk before the write-success status is returned to the application. After the message is written to the page cache, the flush thread is notified immediately and the caller waits for the flush to complete. When the flush finishes, the flush thread wakes the waiting thread, which returns the write-success status to the application.
Advantages: Keeps the MQ message state consistent with the producer/consumer message state
Disadvantages: Lower performance than asynchronous flush
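The difference between the two flush modes comes down to whether the writer waits for the disk before reporting success. The sketch below uses plain POSIX-style file calls from Python to show that choice; `append_message`, `demo`, and the `"SEND_OK"` return value are illustrative names, not RocketMQ code.

```python
import os
import tempfile


def append_message(fd, payload, sync_flush):
    """Append payload; with sync_flush, wait for the disk before returning."""
    os.write(fd, payload)   # the data now sits in the OS page cache
    if sync_flush:
        os.fsync(fd)        # block until the kernel has flushed it to disk
    return "SEND_OK"


def demo(sync_flush):
    """Write one message to a temporary file and read it back."""
    fd, path = tempfile.mkstemp()
    try:
        status = append_message(fd, b"order-1", sync_flush)
    finally:
        os.close(fd)
    with open(path, "rb") as f:
        data = f.read()
    os.remove(path)
    return status, data
```

In both modes the data is readable immediately, because reads are served from the page cache. The modes differ only in what survives a power failure between the write and the next flush.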
Synchronous or asynchronous replication
If a broker group has a Master and a Slave, messages must be copied from the Master to the Slave, either synchronously or asynchronously.
- Synchronous replication: The write-success status is returned to the client only after the data has been written successfully to both the Master and the Slave
Advantages: If the Master fails, the Slave holds a full backup, which makes recovery easy. Consumers can still consume from the Slave, and no messages are lost
Disadvantages: Higher write latency and lower throughput; performance is about 10% lower than with asynchronous replication, and the response time for sending a single message is slightly higher
- Asynchronous replication: The Master returns the write-success status to the client as soon as its own write succeeds
Advantages: Low latency and high throughput. After the Master goes down, consumers can still consume from the Slave. This switch is transparent to the application, requires no human intervention, and performance is almost the same as in multi-Master mode
Disadvantages: If the Master fails, data that has not yet been written to the Slave is lost, so a small number of messages may be lost.
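The trade-off between the two replication modes can be seen in a toy model. The `Master` class, its `pending` list, and `replicate_pending` are hypothetical names for illustration; real replication happens over the network in background threads.

```python
class Master:
    """Toy master that replicates to one in-memory slave log."""

    def __init__(self, sync):
        self.sync = sync
        self.log = []
        self.slave_log = []
        self.pending = []   # written on the master, not yet on the slave

    def put(self, message):
        self.log.append(message)
        if self.sync:
            # Synchronous: the slave has the message before success is returned.
            self.slave_log.append(message)
        else:
            # Asynchronous: return at once and replicate later.
            self.pending.append(message)
        return "SEND_OK"

    def replicate_pending(self):
        """Background replication step for asynchronous mode."""
        self.slave_log.extend(self.pending)
        self.pending.clear()
```

In asynchronous mode, whatever is still in `pending` when the Master fails is exactly the "small number of messages" that can be lost.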