File read/write IO

**1, HeapByteBuffer, DirectByteBuffer, MappedByteBuffer**

Apart from CMS, the JVM garbage collectors move objects around the heap. To pass a reference to a Java byte[] to native code so that the native code can access the array contents directly, you must guarantee that the byte[] cannot be moved while the native code is accessing it.

If a HeapByteBuffer is passed to an I/O operation, the byte[] behind it is first copied into a DirectByteBuffer, and the data is then sent from that DirectByteBuffer. Using a DirectByteBuffer directly saves the HeapByteBuffer -> DirectByteBuffer copy.

However, using DirectByteBuffer also has a cost. It is more expensive to create DirectByteBuffer than HeapByteBuffer. Therefore, if you want to use DirectByteBuffer, it is better to reuse it to avoid excessive creation.
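The reuse advice above can be sketched as follows. This is a minimal illustration, not RocketMQ code: the class `ReusableDirectWriter` and its method are hypothetical names, and it assumes every payload fits in the buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of RocketMQ.
final class ReusableDirectWriter {
    // Allocated once and reused for every write, because direct buffers are costly to create.
    private final ByteBuffer buffer;

    ReusableDirectWriter(int capacity) {
        this.buffer = ByteBuffer.allocateDirect(capacity);
    }

    // Appends each payload to the file through the single reused direct buffer.
    // Returns the total number of bytes written. Each payload must fit in the buffer.
    long writeAll(Path file, byte[]... payloads) {
        long total = 0;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (byte[] payload : payloads) {
                buffer.clear();          // reset position and limit, keep the same memory
                buffer.put(payload);
                buffer.flip();           // switch from filling to draining
                while (buffer.hasRemaining()) {
                    total += ch.write(buffer);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```

Because the buffer is already direct, the channel write does not need the extra HeapByteBuffer -> DirectByteBuffer copy described earlier, and no new direct buffer is allocated per message.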

Internally, RocketMQ relies on java.nio.MappedByteBuffer: it calls the map method of FileChannel to obtain an mmap buffer, which is used as part of the page cache.
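As a rough sketch of what obtaining and using such an mmap buffer looks like with the standard NIO API (the class name `MmapSketch` and the method are hypothetical, and this is not how RocketMQ structures its own code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical example class; illustrates FileChannel#map only.
final class MmapSketch {
    // Maps `size` bytes of the file, writes the text at position 0, forces it to disk,
    // then reads the same bytes back through the mapping.
    static String writeAndRead(Path file, int size, String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
            mapped.put(data);   // the write lands in the page cache first
            mapped.force();     // ask the OS to write the dirty pages to the device
            byte[] back = new byte[data.length];
            mapped.position(0);
            mapped.get(back);
            return new String(back, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Mapping a region larger than the current file in READ_WRITE mode grows the file to that size, which is why a mapped log file occupies its full size on disk from the start.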

Read/write separation with DirectByteBuffer and PageCache (MappedByteBuffer)

The two-tier DirectByteBuffer + PageCache architecture separates message reads from message writes: messages are written to a DirectByteBuffer in off-heap memory, while messages are read from the PageCache. (Data in the DirectByteBuffer reaches the disk in two steps: it is first committed to the PageCache and then flushed to the disk file.) This avoids many memory operations that tend to block, such as page faults, memory locking, and dirty page write-back, and so reduces latency.

Page Cache mechanism

Page cache is a Linux kernel optimization for file I/O. Linux sets aside an area of memory to cache file pages. When a file page on disk is accessed, it is first copied into memory and then read or written there. Because disk I/O is much slower than memory access, the page cache greatly speeds up file reads and writes.

Before the page cache grows to occupy a large amount of memory, cache management calls provided by the Linux kernel, such as posix_fadvise, can be used to proactively release page cache that is no longer needed and relieve memory pressure.

Second, file storage structure

The main storage files for messages include CommitLog files, ConsumeQueue files, and IndexFile files

Messages for all topics are stored in the CommitLog, which is split into 1 GB files by default, each named after its starting physical offset. Index information is stored under the consumequeue/topic/queue directory; each entry is a fixed 20 bytes: an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hash code.
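The 20-byte entry layout can be sketched with a plain ByteBuffer. The class `ConsumeQueueEntry` is a hypothetical name used for illustration, and the big-endian byte order is simply ByteBuffer's default, assumed here rather than taken from this article.

```java
import java.nio.ByteBuffer;

// Hypothetical value class mirroring the 20-byte entry described above.
final class ConsumeQueueEntry {
    static final int SIZE = 8 + 4 + 8; // offset + message length + tag hash = 20 bytes

    final long commitLogOffset;
    final int messageSize;
    final long tagHash;

    ConsumeQueueEntry(long commitLogOffset, int messageSize, long tagHash) {
        this.commitLogOffset = commitLogOffset;
        this.messageSize = messageSize;
        this.tagHash = tagHash;
    }

    // Serializes the three fields in order into exactly 20 bytes.
    byte[] encode() {
        return ByteBuffer.allocate(SIZE)
                .putLong(commitLogOffset)
                .putInt(messageSize)
                .putLong(tagHash)
                .array();
    }

    // Reads the three fields back in the same order.
    static ConsumeQueueEntry decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long offset = buf.getLong();
        int size = buf.getInt();
        long hash = buf.getLong();
        return new ConsumeQueueEntry(offset, size, hash);
    }

    public static void main(String[] args) {
        byte[] raw = new ConsumeQueueEntry(1073741824L, 256, "TagA".hashCode()).encode();
        System.out.println("entry length = " + raw.length);
    }
}
```

Because every entry has the same size, the n-th entry of a queue can be found by multiplication alone, without scanning.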

The broker builds indexes on UNIQ_KEY and on topic + "#" + key. The index file is essentially a hash map. Writing to the ConsumeQueue and the IndexFile is an asynchronous process.

1. Logical structure

2. Physical structure

    rocketmq
    |-store
      |-commitlog
      | |-00000000000000000000
      | |-00000000001073741824
      |-config
      | |-consumerFilter.json
      | |-consumerOffset.json
      | |-delayOffset.json
      | |-subscriptionGroup.json
      | |-topics.json
      |-consumequeue
      | |-SCHEDULE_TOPIC_XXXX
      | |-topicA
      | |-topicB
      |   |-0
      |   |-1
      |   |-2
      |   |-3
      |     |-00000000000000000000
      |     |-00000000001073741824
      |-index
      | |-00000000000000000000
      | |-00000000001073741824
      |-abort
      |-checkpoint

The CommitLog is the physical file in which messages are stored; messages for all topics go into it. The CommitLog on each Broker is shared by all ConsumeQueues on that machine. Each CommitLog file is 1 GB by default; when a file is full, a new one is created.
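Since files are 1 GB by default and named by their starting physical offset, the file holding any given offset can be derived arithmetically. The sketch below assumes the 20-digit zero-padded naming visible in the directory listing; `CommitLogFiles` is a hypothetical helper, not a RocketMQ class.

```java
import java.util.Locale;

// Hypothetical helper: maps a physical offset to a CommitLog file name and position.
final class CommitLogFiles {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB default

    // The file name is the starting offset of the file, zero-padded to 20 digits.
    static String fileNameFor(long physicalOffset) {
        long fileStart = physicalOffset - (physicalOffset % FILE_SIZE);
        return String.format(Locale.ROOT, "%020d", fileStart);
    }

    // Position of the offset relative to the start of its file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 100;
        System.out.println(fileNameFor(offset) + " @ " + positionInFile(offset));
    }
}
```

With this scheme the second file is named 00000000001073741824, matching the listing, because 1073741824 is exactly 1 GB.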

ConsumeQueue is the logical queue for message consumption. After a message reaches the CommitLog, it is asynchronously dispatched to the consume queue for consumers to read. Each entry holds the message's physical offset in the CommitLog, the size of the message, and the hash of the message tag. Each file is about 6,000,000 bytes by default (300,000 entries of 20 bytes each); when a file is full, a new one is created.
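A short sketch of how a logical queue offset could be turned into a file and a position, assuming 20-byte entries and a file size of 6,000,000 bytes (that is, 300,000 entries per file, which is how the "about 600W bytes" figure is read here). `ConsumeQueueLocator` is a hypothetical name.

```java
// Hypothetical helper: locates a ConsumeQueue entry from its logical queue offset.
final class ConsumeQueueLocator {
    static final int ENTRY_SIZE = 20;            // bytes per entry
    static final int ENTRIES_PER_FILE = 300_000; // assumed default
    static final long FILE_SIZE = (long) ENTRY_SIZE * ENTRIES_PER_FILE; // 6,000,000 bytes

    // Byte offset of the entry within the whole logical queue.
    static long byteOffset(long queueOffset) {
        return queueOffset * ENTRY_SIZE;
    }

    // Index of the file (0, 1, 2, ...) that contains the entry.
    static long fileIndex(long queueOffset) {
        return byteOffset(queueOffset) / FILE_SIZE;
    }

    // Byte position of the entry inside that file.
    static long positionInFile(long queueOffset) {
        return byteOffset(queueOffset) % FILE_SIZE;
    }

    public static void main(String[] args) {
        long queueOffset = 300_001;
        System.out.println("file " + fileIndex(queueOffset) + ", position " + positionInFile(queueOffset));
    }
}
```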

IndexFile is the message index file. It provides retrieval over the CommitLog: messages can be looked up by key or by time range. On disk, each file is named after its creation timestamp. A single IndexFile has a fixed size of about 400 MB and can hold 20 million index entries.
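The "about 400M" figure can be checked with simple arithmetic. The sketch below assumes a 40-byte file header, 4-byte hash slots, 20-byte index entries, and 5,000,000 hash slots; these constants are not stated in this article and are assumptions about the default layout. The slot computation is likewise only an illustration of a hash-map style lookup on a topic + "#" + key string.

```java
// Hypothetical helper: estimates IndexFile size and picks a hash slot for a key.
final class IndexFileSize {
    static final int HEADER_SIZE = 40; // assumed header size in bytes
    static final int SLOT_SIZE = 4;    // assumed bytes per hash slot
    static final int ENTRY_SIZE = 20;  // assumed bytes per index entry

    static long totalBytes(int slotCount, int entryCount) {
        return HEADER_SIZE + (long) slotCount * SLOT_SIZE + (long) entryCount * ENTRY_SIZE;
    }

    // Maps a key to a slot index in [0, slotCount), guarding against Integer.MIN_VALUE.
    static int slotFor(String key, int slotCount) {
        int hash = key.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % slotCount;
    }

    public static void main(String[] args) {
        System.out.println(totalBytes(5_000_000, 20_000_000) + " bytes");
        System.out.println("slot = " + slotFor("TopicA" + "#" + "order-1", 5_000_000));
    }
}
```

Under these assumptions a file with 20 million entries comes to 420,000,040 bytes, which is consistent with the stated size of about 400 MB.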

3. CommitLog file storage sequence diagram

1) Synchronous flush: each time a message is sent, it is written directly into the mappedByteBuffer of the MappedFile, and the force() method is then called immediately to flush it to disk. Only after the flush succeeds does the call return to the caller; GroupCommitRequest#waitForFlush implements this synchronous wait.
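The wait-until-flushed handshake can be sketched with a CountDownLatch. This is a simplified illustration of the idea, not the actual GroupCommitRequest implementation; `FlushRequest`, `wakeup`, and the timeout parameter are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical request object shared between the sending thread and the flush thread.
final class FlushRequest {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean flushOk;

    // Called by the flush thread once force() has returned.
    void wakeup(boolean ok) {
        this.flushOk = ok;
        done.countDown();
    }

    // Called by the sending thread; blocks until the flush finishes or the timeout expires.
    boolean waitForFlush(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushRequest request = new FlushRequest();
        // Stand-in for the flush thread: a real one would call force() before waking the sender.
        new Thread(() -> request.wakeup(true)).start();
        System.out.println("flushed = " + request.waitForFlush(1000));
    }
}
```

The sender only reports success after the flush thread has signalled completion, which is what makes the flush synchronous from the caller's point of view.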

2) Asynchronous flush: there are two cases, depending on whether the off-heap memory cache pool is enabled (MessageStoreConfig#transientStorePoolEnable).

transientStorePoolEnable = true: messages are first appended to writeBuffer, then committed to the FileChannel, and finally flushed.

transientStorePoolEnable = false (default): messages are appended directly to the MappedByteBuffer (page cache) and then flushed periodically.
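The append, commit, flush sequence used when the off-heap pool is enabled can be modelled in memory with three positions. The class below is a hypothetical model for illustration only: plain byte arrays stand in for the page cache and the disk file, and none of the names come from RocketMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical in-memory model of the three stages: writeBuffer -> page cache -> disk.
final class StagedAppendModel {
    private final ByteBuffer writeBuffer; // stands in for the off-heap writeBuffer
    private final byte[] pageCache;       // stands in for the FileChannel / page cache
    private final byte[] disk;            // stands in for the file on disk
    private int committed;                // bytes copied from writeBuffer to pageCache
    private int flushed;                  // bytes copied from pageCache to disk

    StagedAppendModel(int capacity) {
        this.writeBuffer = ByteBuffer.allocateDirect(capacity);
        this.pageCache = new byte[capacity];
        this.disk = new byte[capacity];
    }

    // Step 1: append the message to the off-heap buffer only.
    void append(byte[] message) {
        writeBuffer.put(message);
    }

    // Step 2: copy everything appended since the last commit into the page cache.
    void commit() {
        int wrote = writeBuffer.position();
        ByteBuffer view = writeBuffer.duplicate();
        view.limit(wrote);
        view.position(committed);
        view.get(pageCache, committed, wrote - committed);
        committed = wrote;
    }

    // Step 3: copy everything committed since the last flush to disk.
    void flush() {
        System.arraycopy(pageCache, flushed, disk, flushed, committed - flushed);
        flushed = committed;
    }

    int wrotePosition() { return writeBuffer.position(); }
    int committedPosition() { return committed; }
    int flushedPosition() { return flushed; }
}
```

With the pool disabled, the first two stages collapse into one: appending to the mapped buffer already places the data in the page cache, so only the periodic flush remains.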

Queue file format

CommitLog: the files in which RocketMQ stores messages.

ConsumeQueue: stores indexes into the CommitLog; each one represents a single queue.

MessageQueue: an offset is saved for each consumer group that consumes the queue, recording how far that group has consumed in the ConsumeQueue. Every group therefore consumes at its own offset, while the queue itself is shared.

1, commitlog

| Field | Size (bytes) | Meaning |
|---|---|---|
| msgSize | 4 | Size of the message |
| magiccode | 4 | MAGICCODE = daa320a7 |
| body crc | 4 | CRC of the message body; validated during recovery when the broker restarts |
| queueId | 4 | |
| flag | 4 | |
| queueoffset | 8 | An incrementing counter, not the actual byte offset in the consume queue. It represents the number of messages in the consumeQueue or tranStateTable queue. For non-transactional messages or committed transactional messages, queueoffset * 20 is the offset address; for PREPARED or ROLLBACK transactional messages, the value is used to look up data in tranStateTable |
| physicaloffset | 8 | Physical start offset of the message in the CommitLog |
| sysflag | 4 | Read as bits from right to left. All four low bits 0 (value 0): non-transactional message. First bit 1 (value 1): the message is compressed. Second bit 1 (value 2): MultiTags. Third bit 1 (value 4): prepared message. Fourth bit 1 (value 8): commit message. Third and fourth bits both 1 (value 12): rollback message. Third and fourth bits both 0: non-transactional message |
| bornTIMESTAMP | 8 | Timestamp at the producer |
| bornHOST | 8 | Address of the message producer (address:port) |
| storeTIMESTAMP | 8 | Time the message was stored at the broker |
| storeHOSTADDRESS | 8 | Address of the broker where the message is stored (address:port) |
| reconsumetimes | 4 | Number of times the message has been re-consumed by a subscription group (counted independently per subscription group); retry messages are sent to queueId=0 of the topic %RETRY%groupName |
| PreparedTransaction Offset | 8 | Indicates a transaction message in prepared state |
| messagebodyLength | 4 | Size of the message body |
| messagebody | bodyLength | Message body content |
| topicLength | 1 | Size of the topic name |
| topic | topicLength | Topic name |
| propertiesLength | 2 | Size of the property data |
| properties | propertiesLength | Property data |
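The sysflag values listed in the table (1, 2, 4, 8, 12) behave as bit flags, with the transaction state occupying the third and fourth bits. A minimal decoding sketch follows; the class and constant names are hypothetical and chosen for illustration.

```java
// Hypothetical decoder for the sysflag values listed in the table above.
final class SysFlagSketch {
    static final int COMPRESSED = 0x1;       // value 1
    static final int MULTI_TAGS = 0x1 << 1;  // value 2
    static final int TX_MASK = 0x3 << 2;     // third and fourth bits together
    static final int TX_PREPARED = 0x1 << 2; // value 4
    static final int TX_COMMIT = 0x2 << 2;   // value 8
    static final int TX_ROLLBACK = 0x3 << 2; // value 12

    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED) != 0;
    }

    static boolean hasMultiTags(int sysFlag) {
        return (sysFlag & MULTI_TAGS) != 0;
    }

    // Only the two transaction bits decide the transaction type.
    static String transactionType(int sysFlag) {
        switch (sysFlag & TX_MASK) {
            case TX_PREPARED: return "PREPARED";
            case TX_COMMIT:   return "COMMIT";
            case TX_ROLLBACK: return "ROLLBACK";
            default:          return "NONE";
        }
    }

    public static void main(String[] args) {
        int flag = TX_ROLLBACK | COMPRESSED;
        System.out.println(transactionType(flag) + ", compressed = " + isCompressed(flag));
    }
}
```

Masking before comparing matters: a compressed rollback message has the value 13, which equals none of the individual constants.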

2, consumequeue

3, IndexFile

Production and consumption

1. Timing logic for message pull and rebalance

Entry method: DefaultMQPushConsumerImpl#start()

Core classes: RebalanceService and PullMessageService

Two core methods:

  • RebalanceImpl#rebalanceByTopic

① Fetch all queues under the topic

② Get the ids of all consumers in the group under the topic

③ Sort the queues and the consumer ids (queues on the same broker are grouped together and then ordered by queue ID), then delegate to the allocation strategy to assign queues, ensuring that consumers in the same consumer group are assigned different queues

④ Pass in the queues allocated to the current consumer and add or remove queues accordingly (RebalanceImpl#updateProcessQueueTableInRebalance)

⑤ Handle the logic that follows a change in queue allocation (RebalanceImpl#messageQueueChanged)

  • PullMessageService#pullMessage

Get the processing queue ProcessQueue and call the pullAPI method to pull the message
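Step ③ of the rebalance can be illustrated with one possible averaging strategy. The sketch below is not RocketMQ's strategy implementation; it only shows why sorting both lists first lets every consumer compute a disjoint share independently. Queue and consumer identifiers are plain strings chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical averaging allocation: every consumer runs the same code on the same sorted input.
final class AverageAllocationSketch {
    static List<String> allocate(String currentConsumerId, List<String> queues, List<String> consumerIds) {
        List<String> sortedQueues = new ArrayList<>(queues);
        List<String> sortedConsumers = new ArrayList<>(consumerIds);
        Collections.sort(sortedQueues);     // same broker first, then queue id
        Collections.sort(sortedConsumers);

        List<String> mine = new ArrayList<>();
        int index = sortedConsumers.indexOf(currentConsumerId);
        if (index < 0) {
            return mine; // unknown consumer gets nothing
        }
        int base = sortedQueues.size() / sortedConsumers.size();
        int extra = sortedQueues.size() % sortedConsumers.size();
        int count = base + (index < extra ? 1 : 0);          // the first `extra` consumers get one more
        int start = index * base + Math.min(index, extra);   // skip the shares of earlier consumers
        for (int i = 0; i < count; i++) {
            mine.add(sortedQueues.get(start + i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a:1", "broker-a:0", "broker-b:0", "broker-b:1", "broker-a:2");
        List<String> consumers = Arrays.asList("c2", "c1");
        System.out.println("c1 -> " + allocate("c1", queues, consumers));
        System.out.println("c2 -> " + allocate("c2", queues, consumers));
    }
}
```

Because each consumer sorts the same inputs and applies the same arithmetic, no coordination is needed for the shares to come out non-overlapping.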

2. Producer side

DefaultMQProducer is the default implementation. TransactionMQProducer extends DefaultMQProducer and adds the methods for sending transaction messages.

MQProducer offers three kinds of send interface. Oneway: returns immediately after sending, does not process the response, and does not care whether the message was sent successfully. Sync: sends the message and waits for the response. Async: returns immediately after sending and processes the response in the callback supplied by the caller.
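The difference between the three modes can be shown without the RocketMQ client, using CompletableFuture as a stand-in for the network round trip. Everything in this sketch is hypothetical, including the method names and the "ACK:" response format.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a producer; it does not use the RocketMQ client API.
final class SendModesSketch {
    // Simulates the broker round trip on another thread and returns an acknowledgement.
    private static CompletableFuture<String> transmit(String message) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + message);
    }

    // Oneway: fire and forget, the response is never inspected.
    static void sendOneway(String message) {
        transmit(message);
    }

    // Sync: block the caller until the response arrives.
    static String sendSync(String message) {
        return transmit(message).join();
    }

    // Async: return at once and hand the response to the callback when it arrives.
    static CompletableFuture<Void> sendAsync(String message, Consumer<String> callback) {
        return transmit(message).thenAccept(callback);
    }

    public static void main(String[] args) {
        sendOneway("m1");
        System.out.println(sendSync("m2"));
        sendAsync("m3", System.out::println).join(); // join only so the demo does not exit early
    }
}
```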

Entry method: DefaultMQProducerImpl#start() method

Five, the transaction

RocketMQ transaction messages can be thought of as a form of two-phase commit: at the start of the transaction, a half-message is sent to the Broker.

A half-message is a message that is not yet visible to the Consumer: it is not placed in the queue it will actually be delivered from, but in a special queue.

The local transaction is executed after the half-message is sent, and the result of the local transaction determines whether to send a commit message or a rollback message to the Broker.

1. The producer sends a transaction message to the broker

2. The broker sends a message to the producer asking whether the local transaction was executed successfully.

3. The producer informs the broker of the result of the local transaction.
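The half-message idea can be sketched as a small in-memory model: the message sits in a special queue until the local transaction result is known, and only a commit makes it visible. This is a hypothetical illustration that leaves out the broker's check-back step and all networking; none of the names are RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical in-memory model of half-message handling.
final class HalfMessageSketch {
    private final Map<String, String> halfQueue = new HashMap<>(); // special queue, invisible to consumers
    private final List<String> visibleQueue = new ArrayList<>();   // what consumers can actually read

    // Phase 1 stores the half-message; phase 2 commits or rolls back based on the local transaction.
    boolean sendInTransaction(String messageId, String body, BooleanSupplier localTransaction) {
        halfQueue.put(messageId, body);
        boolean committed;
        try {
            committed = localTransaction.getAsBoolean();
        } catch (RuntimeException e) {
            committed = false; // a failing local transaction leads to rollback
        }
        String half = halfQueue.remove(messageId);
        if (committed) {
            visibleQueue.add(half); // commit: the message becomes visible
        }
        return committed;           // rollback: the half-message is simply dropped
    }

    List<String> visibleMessages() { return visibleQueue; }

    int pendingHalfMessages() { return halfQueue.size(); }

    public static void main(String[] args) {
        HalfMessageSketch broker = new HalfMessageSketch();
        broker.sendInTransaction("m1", "order-created", () -> true);
        broker.sendInTransaction("m2", "order-failed", () -> false);
        System.out.println(broker.visibleMessages());
    }
}
```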