RocketMQ’s storage core class is DefaultMessageStore, and the entry method for storing messages is putMessage.

Message store analysis

The core attributes

  • messageStoreConfig
  • Store-related configuration, such as the storage path, commitlog file size, and flush frequency.
  • CommitLog commitLog
  • The CommitLog core processing class; messages are stored in commitlog files.
  • ConcurrentMap<String /* topic */, ConcurrentMap<Integer /* queueId */, ConsumeQueue>> consumeQueueTable
  • Consume queue information for each topic.
  • FlushConsumeQueueService flushConsumeQueueService
  • ConsumeQueue disk-flush thread.
  • CleanCommitLogService cleanCommitLogService
  • Thread that deletes expired CommitLog files.
  • CleanConsumeQueueService cleanConsumeQueueService
  • Thread that deletes expired ConsumeQueue files.
  • IndexService indexService
  • Index service.
  • AllocateMappedFileService allocateMappedFileService
  • Thread that allocates MappedFile instances. RocketMQ uses memory mapping to process commitlog and consumequeue files.
  • ReputMessageService reputMessageService
  • Reput service that forwards CommitLog entries to the ConsumeQueue and index files.
  • HAService haService
  • Master/slave synchronization service.
  • ScheduleMessageService scheduleMessageService
  • Scheduler that executes scheduled (delayed message) tasks.
  • StoreStatsService storeStatsService
  • Storage statistics service.
  • TransientStorePool transientStorePool
  • ByteBuffer pool (off-heap memory), covered in more detail later.
  • RunningFlags runningFlags
  • Storage service status flags.
  • BrokerStatsManager brokerStatsManager
  • Broker statistics service.
  • MessageArrivingListener messageArrivingListener
  • Listener invoked when a message arrives.
  • StoreCheckpoint storeCheckpoint
  • Flush-to-disk checkpoint.
  • LinkedList dispatcherList
  • Dispatchers that forward CommitLog entries to the ConsumeQueue and index files.

The message storage method putMessage

    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
// Check whether the store is ready to accept messages
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }
// Check whether the message is valid; if not, return immediately
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }
// getSystemClock().now() is essentially System.currentTimeMillis()
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
// elapsedTime is how long the commitLog.putMessage call took
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }

        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
// If the put failed or returned null, increment the put-message failure counter
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

The batch variant putMessages

    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return new PutMessageResult(checkStoreStatus, null);
        }

        PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return new PutMessageResult(msgCheckStatus, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }

        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

Notice that apart from the parameter type, the logic is identical to putMessage.

Actually storing the message

The putMessages method of the CommitLog class

    public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
        AppendMessageResult result;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// Get the transaction type
        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setStoreHostAddressV6Flag();
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        //fine-grained lock instead of the coarse-grained
        MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get();

        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
// Appending the message is done under a lock
        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
// Different processing is performed depending on the result of the message returned
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
// END_OF_FILE: the current file does not have enough space, so create a new file and append again
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, messageExtBatch);

        handleHA(result, putMessageResult, messageExtBatch);

        return putMessageResult;
    }
  1. Get message type
  2. Gets a MappedFile object, a concrete implementation of memory mapping.
  3. Appending a message is done under a lock, so writes are serialized.
  4. Validate the MappedFile object to get a usable MappedFile (create a new one if needed).
  5. Write files through the MappedFile object.
  6. Flush disks according to the flush policy.
  7. Master-slave synchronization

The storage core class MappedFile

Basic attributes

// OS page size (4 KB)
   public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Class variable, the total number of bytes used by all MappedFile instances.
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// Total number of MappedFile instances.
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// The current write pointer to the MappedFile object.
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// The pointer to the current submission.
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
// Pointer to the current flush to disk.
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
// Total file size
    protected int fileSize;
// File channel.
    protected FileChannel fileChannel;
// If transientStorePoolEnable is enabled, messages are written to off-heap memory, committed to PageCache, and finally written to disk.
    protected ByteBuffer writeBuffer = null;
// Buffer pool of ByteBuffer, off-heap memory, transientStorePoolEnable takes effect when true.
    protected TransientStorePool transientStorePool = null;
// File name
    private String fileName;
// The starting offset of this file within the whole commitlog (taken from the file name).
    private long fileFromOffset;
// File object
    private File file;
// The PageCache of the operating system.
    private MappedByteBuffer mappedByteBuffer;
// Timestamp of the last message stored in this file.
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;

Initialization method init

    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
// Make sure the directory exists, if not, a directory is created
        ensureDirOK(this.file.getParent());

        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

The init method is overloaded mainly to distinguish whether off-heap memory (the TransientStorePool) is enabled; the three-argument version still calls the two-argument init shown above. init ensures the parent directory exists, opens the file as a RandomAccessFile, memory-maps it with fileChannel.map, and updates the global counters that track mapped memory.

appendMessage and appendMessagesInner

    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

    public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
        return appendMessagesInner(messageExtBatch, cb);
    }

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
// Get the current write position
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
// Different processing depending on the type of message is single message or batch message
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
// Change the write position
            this.wrotePosition.addAndGet(result.getWroteBytes());
// Change the final timestamp
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

doAppend

// fileFromOffset: the offset of this file within the whole commitlog.
// ByteBuffer byteBuffer: the NIO byte container written into.
// int maxBlank: the maximum number of bytes that can still be written to this file.
// MessageExtBrokerInner msgInner: the internal message wrapper entity.
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

            // Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consumer queuec
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            // Serialize the message
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
        }

Logic:

  1. Get the queue offset (the logical write position) from topicQueueTable, keyed by topic-queueId; if absent, insert a new entry with offset 0.
  2. Transaction messages need special handling (PREPARED and ROLLBACK messages are never consumed, so their queue offset is reset to 0).
  3. The serialized message properties must not exceed Short.MAX_VALUE (32767) bytes.
  4. Calculate the total stored length of the message (see the length sketch below).
  5. If the message length exceeds the configured maximum message size, MESSAGE_SIZE_EXCEEDED is returned.
  6. If the remaining space in the MappedFile is not enough for the message, an end-of-file marker is written and END_OF_FILE is returned.
  7. Otherwise the message is written into the MappedFile buffer (still in memory at this point).
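
For reference, the total length computed in step 4 just sums the fixed-width fields plus the variable-length body, topic, and properties. The sketch below is a reconstruction that mirrors the field list (items 1 to 17) in doAppend above; it is not the exact CommitLog#calMsgLength source.

    // Reconstruction of the stored-message length, following fields 1-17 written in doAppend above.
    // MessageSysFlag is the same helper class used earlier; this is a sketch, not the exact source.
    public static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        // BORNHOST / STOREHOSTADDRESS take 4 + 4 bytes for IPv4 (address + port) or 16 + 4 bytes for IPv6
        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        return 4                      // 1 TOTALSIZE
            + 4                       // 2 MAGICCODE
            + 4                       // 3 BODYCRC
            + 4                       // 4 QUEUEID
            + 4                       // 5 FLAG
            + 8                       // 6 QUEUEOFFSET
            + 8                       // 7 PHYSICALOFFSET
            + 4                       // 8 SYSFLAG
            + 8                       // 9 BORNTIMESTAMP
            + bornHostLength          // 10 BORNHOST
            + 8                       // 11 STORETIMESTAMP
            + storeHostLength         // 12 STOREHOSTADDRESS
            + 4                       // 13 RECONSUMETIMES
            + 8                       // 14 Prepared Transaction Offset
            + 4 + bodyLength          // 15 BODY (4-byte length + body)
            + 1 + topicLength         // 16 TOPIC (1-byte length + topic)
            + 2 + propertiesLength;   // 17 PROPERTIES (2-byte length + properties)
    }

This also shows why the topic length is stored in a single byte and the properties length in a short.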

AppendMessageResult

    public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId,
        long storeTimestamp, long logicsOffset, long pagecacheRT) {
// Append results (success, end of file (insufficient file space), message length exceeded, message attribute length exceeded, unknown error).
        this.status = status;
// The offset of the message (relative to the entire COMMITlog).
        this.wroteOffset = wroteOffset;
// Number of bytes written.
        this.wroteBytes = wroteBytes;
// Message ID.
        this.msgId = msgId;
// Message write timestamp.
        this.storeTimestamp = storeTimestamp;
// Message queue offset.
        this.logicsOffset = logicsOffset;
// Time spent writing to the page cache, in milliseconds.
        this.pagecacheRT = pagecacheRT;
    }

Once this method returns, putMessages moves on to flushing the message to disk.

Flushing messages to disk

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    //flushOK=false;
                }
                if (flushStatus != PutMessageStatus.PUT_OK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

  1. For synchronous flush there are two cases, depending on whether the producer waits for the message to be stored before the call returns (isWaitStoreMsgOK, true by default).
  2. If it waits, a GroupCommitRequest is submitted and the caller blocks on its future until the flush result arrives or the sync-flush timeout expires.
  3. If it does not wait, the synchronous flush thread is simply woken up.
  4. For asynchronous flush, the flush thread is woken up (or the commit thread, when the off-heap store pool is enabled).

Synchronous flush: GroupCommitRequest

    public static class GroupCommitRequest {
        private final long nextOffset;
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
        private final long startTimestamp = System.currentTimeMillis();
        private long timeoutMillis = Long.MAX_VALUE;

        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            this.timeoutMillis = timeoutMillis;
        }

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }


        public long getNextOffset() {
            return nextOffset;
        }

        public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
            this.flushOKFuture.complete(putMessageStatus);
        }

        public CompletableFuture<PutMessageStatus> future() {
            return flushOKFuture;
        }
    }

A small note here: earlier versions of GroupCommitRequest used a CountDownLatch for this wait, while the current version uses a CompletableFuture.
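
To make that note concrete, here is a rough side-by-side of the two waiting styles. The CountDownLatch version is a from-memory sketch of how the older request waited (hence the waitForFlush name mentioned later in this article); the CompletableFuture version matches the class shown above. Both are illustrative, not exact source.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Older style (sketch): the caller blocks on a latch, then reads a boolean flag.
    class LatchStyleRequest {
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

        public boolean waitForFlush(long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.flushOK;
        }
    }

    // Current style: the flush thread completes a future; the caller waits with get(timeout)
    // and receives a PutMessageStatus (the RocketMQ enum used throughout this article)
    // instead of a plain boolean, and could also chain asynchronous callbacks onto the future.
    class FutureStyleRequest {
        private final CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();

        public void wakeupCustomer(final PutMessageStatus status) {
            this.flushOKFuture.complete(status);
        }

        public CompletableFuture<PutMessageStatus> future() {
            return flushOKFuture;
        }
    }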

run

// Core method in GroupCommitService under Commitlog
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

waitForRunning

    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

doCommit

        private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            CommitLog.this.mappedFileQueue.flush(0);
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        }

                        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

The flush method (MappedFileQueue#flush)

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }
  1. Gets the current MappedFile object based on the location of the last refresh.
  2. Execute flush on MappedFile.
  3. Updates the location of the last refresh.

The actual flush method (MappedFile#flush)

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

The flush implementation ultimately just calls the force method of FileChannel or MappedByteBuffer to push dirty pages to disk.
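
As a standalone illustration of the underlying NIO mechanism (independent of RocketMQ's own classes), mapping a file and forcing it to disk looks roughly like this; the file name and sizes here are made up for the demo.

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class MmapFlushDemo {
        public static void main(String[] args) throws Exception {
            int fileSize = 1024 * 1024; // 1 MB demo file (RocketMQ commitlog files default to 1 GB)
            try (RandomAccessFile raf = new RandomAccessFile("demo_commitlog", "rw");
                 FileChannel channel = raf.getChannel()) {
                // Map the file into memory; writes land in the OS page cache first.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                mapped.put("hello rocketmq".getBytes());
                // force() asks the OS to write the dirty pages back to disk,
                // which is what MappedFile#flush ultimately relies on.
                mapped.force();
            }
        }
    }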

Asynchronous flush

The related service classes (threads) are CommitLog.FlushRealTimeService and CommitLog.CommitRealTimeService.

  • commitIntervalCommitLog: the interval of the CommitRealTimeService commit loop. Default: 200 ms.
  • commitCommitLogLeastPages: the minimum number of dirty pages (4 by default) required before each commit to the FileChannel.
  • flushCommitLogLeastPages: the minimum number of dirty pages (4 by default) required before each flush of the commitlog.
  • flushIntervalCommitLog: the interval at which the asynchronous flush thread runs. Default: 500 ms. (A sketch of how these parameters drive the commit loop follows this list.)
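
To show how these parameters fit together, below is a condensed sketch of the commit loop in the style of CommitRealTimeService. The getter names follow the MessageStoreConfig properties listed above, but the loop body is simplified and is not the exact RocketMQ source.

    // Condensed sketch of the CommitRealTimeService run() loop (simplified, not the exact source).
    // messageStoreConfig stands for this.defaultMessageStore.getMessageStoreConfig(); mappedFileQueue
    // and flushCommitLogService are the CommitLog fields used elsewhere in this article.
    public void run() {
        while (!this.isStopped()) {
            int interval = messageStoreConfig.getCommitIntervalCommitLog();               // default 200 ms
            int commitDataLeastPages = messageStoreConfig.getCommitCommitLogLeastPages(); // default 4 pages

            // Move dirty data from the off-heap writeBuffer into the FileChannel (page cache).
            boolean result = mappedFileQueue.commit(commitDataLeastPages);
            if (!result) {
                // commit() returns false when new data was actually committed,
                // so wake the flush thread so it can force that data to disk.
                flushCommitLogService.wakeup();
            }

            // Wait until the next commit cycle.
            this.waitForRunning(interval);
        }
    }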

MappedFileQueue#commit

    public boolean commit(final int commitLeastPages) {
        boolean result = true;
// findMappedFileByOffset finds the mapped file that contains the given offset
// It takes two arguments: the offset, and whether to return the first file when no matching file is found
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
// this commit calls the following commit method, the MappedFile#commit method
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }

        return result;
    }

MappedFile#commit

    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }
  1. Check whether a commit should happen at all (i.e. whether enough dirty pages have accumulated to meet the minimum commit requirement).

isAbleToCommit

    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }

        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;
    }
  1. Gets the offset of the last commit (committedPosition).
  2. Gets the current write offset (wrotePosition).
  3. Returns true if the file is full.
  4. If commitLeastPages is greater than 0, compares the number of whole pages between the write offset and the last committed offset, and commits only if it is at least commitLeastPages.
  5. Otherwise (commitLeastPages is 0), commits as long as any new data has been written.

commit0

MappedFile#commit0

    protected void commit0(final int commitLeastPages) {
// Get the current write offset
        int writePos = this.wrotePosition.get();
// Get the offset of the last commit
        int lastCommittedPosition = this.committedPosition.get();
// Commit only if enough new data has been written since the last commit
        if (writePos - lastCommittedPosition > commitLeastPages) {
            try {
// slice() creates a new buffer that shares the writeBuffer's content but has an independent position and limit
// e.g. for a buffer of capacity 5 with position 2, slice() yields a new buffer that starts there with position 0 and capacity 3
                ByteBuffer byteBuffer = writeBuffer.slice();
// Sets the buffer's position. If the mark is defined and larger than the new position, it is discarded.
                byteBuffer.position(lastCommittedPosition);
// Sets the buffer's limit. If the position is larger than the new limit, it is set to the new limit. If the mark is defined and larger than the new limit, it is discarded.
                byteBuffer.limit(writePos);
// Set the location
                this.fileChannel.position(lastCommittedPosition);
// Write data
                this.fileChannel.write(byteBuffer);
// Update the committed position
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

Master/slave synchronization mechanism

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Only a SYNC_MASTER waits for the slave to replicate the message
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
// Whether the producer waits for the message to be stored before returning
            if (messageExt.isWaitStoreMsgOK()) {
                // Decide whether to wait, depending on whether the slave node receives the message
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    PutMessageStatus replicaStatus = null;
                    try {
                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                                TimeUnit.MILLISECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    }
                    if (replicaStatus != PutMessageStatus.PUT_OK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }

Message transfer process

  1. After the Producer sends a message to the Broker, the Broker writes the message to the CommitLog, either synchronously or asynchronously. All RocketMQ messages are stored in the CommitLog, and a lock is taken before writing so that messages are appended sequentially. As long as the message is persisted to the CommitLog file on disk, it is not lost.
  2. After persisting, the CommitLog dispatches the message to its ConsumeQueue, a logical queue roughly equivalent to a Partition in Kafka. Each entry stores the message's CommitLog offset, its size, and the hash code of its MessageTag (see the entry-layout sketch after this list).
  3. When a consumer consumes a message, it reads the ConsumeQueue first. This logical consumption queue stores, for each message of a Topic, the starting physical offset in the CommitLog, the message size, and the hash code of the message Tag.
  4. The ConsumeQueue holds no message body, so the consumer then reads the actual message content from the CommitLog.
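
Each ConsumeQueue entry mentioned in step 2 is a fixed 20-byte record. The sketch below shows the layout schematically (8-byte commitlog offset + 4-byte size + 8-byte tag hash code); it is an illustration of the format, not the actual ConsumeQueue write path.

    import java.nio.ByteBuffer;

    // Schematic of one 20-byte ConsumeQueue entry:
    // 8 bytes commitlog physical offset + 4 bytes message size + 8 bytes tag hash code.
    public class ConsumeQueueUnitSketch {
        public static ByteBuffer buildUnit(long commitLogOffset, int size, long tagsCode) {
            ByteBuffer unit = ByteBuffer.allocate(20);
            unit.putLong(commitLogOffset); // where the full message lives in the CommitLog
            unit.putInt(size);             // total length of that message
            unit.putLong(tagsCode);        // hash code of the message tag, used for server-side filtering
            unit.flip();
            return unit;
        }
    }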

Conclusion

RocketMQ stores messages as a file system, including CommitLog files, ConsumeQueue files, and IndexFile files.

  • Commitlogs are the physical file for message storage. Messages for all message topics are stored in the CommitLog file. Commitlogs on each Broker are shared by all ConsumeQueues on the current machine. The default size of files in the CommitLog is 1 GB and can be dynamically configured. When a file is full, a new CommitLog file is generated. All Topic data is written sequentially to the CommitLog file.
  • ConsumeQueue is the logical queue for message consumption. After messages reach the CommitLog file, they are asynchronously forwarded to the message consumption queue for consumers to consume. Each entry records the message's physical offset in the CommitLog, the size of the message, and the hash value of the message Tag. Each ConsumeQueue file defaults to about 6,000,000 bytes (300,000 entries of 20 bytes each); when a file is full, a new one is generated.
  • IndexFile is the message index file. It provides a way to look messages up in the CommitLog by key or by time range. On disk the file name is the timestamp at which it was created. A single IndexFile is about 400 MB and can hold about 20 million index entries (see the size estimate below).
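
The "about 400 MB / 20 million indexes" figures follow from the IndexFile's fixed layout, commonly described as a 40-byte header, 5 million hash slots of 4 bytes each, and 20 million index entries of 20 bytes each. Treat the arithmetic below as an illustration of those commonly cited defaults rather than as source analysis.

    // Rough size of one IndexFile with the commonly cited defaults:
    // 40-byte header + 5,000,000 hash slots * 4 bytes + 20,000,000 index entries * 20 bytes.
    public class IndexFileSizeEstimate {
        public static void main(String[] args) {
            long fileSize = 40L + 5_000_000L * 4 + 20_000_000L * 20;
            System.out.println(fileSize);                      // 420000040 bytes
            System.out.println(fileSize / (1024.0 * 1024.0));  // roughly 400 MB
        }
    }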

A single CommitLog file defaults to 1 GB, and all messages are stored across multiple commitlog files; when one file is full, the next one is created. Each commitlog file is named after the offset of its first byte within the whole CommitLog, as in the example below.

For example, suppose each commitlog file were only 1024 bytes:

First file: 00000000000000000000

Second file: 00000000000000001024
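
Since the name is just the starting offset zero-padded to 20 digits, a message's physical offset immediately tells you which file it lives in and where inside that file. A minimal sketch of the idea (simplified; RocketMQ's own utility and MappedFileQueue classes do the real work):

    // Commitlog files are named by their starting offset, zero-padded to 20 digits.
    public class CommitLogFileNameSketch {
        public static String offsetToFileName(long offset) {
            // 0 -> "00000000000000000000", 1024 -> "00000000000000001024"
            return String.format("%020d", offset);
        }

        // With a fixed file size (1 GB by default, 1024 bytes in the toy example above),
        // the file index and the position inside that file follow directly from the offset.
        public static int fileIndexOf(long offset, long firstFileFromOffset, int mappedFileSize) {
            return (int) ((offset - firstFileFromOffset) / mappedFileSize);
        }

        public static int positionInFile(long offset, int mappedFileSize) {
            return (int) (offset % mappedFileSize);
        }
    }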

MappedFile encapsulates a single CommitLog file, while MappedFileQueue encapsulates the logical CommitLog: a queue of MappedFile instances ordered from the smallest starting offset to the largest.

MappedFile in turn wraps a MappedByteBuffer, the NIO memory-mapped buffer backing the file.

1. Synchronous flush: every time a message is written, it is first placed in the MappedFile's mappedByteBuffer, and force() is then called to write it to disk. Only after the force flush succeeds does the call return to the caller (the caller waits on the GroupCommitRequest, historically via GroupCommitRequest#waitForFlush); this is what makes the flush synchronous.

2. Flush disks asynchronously

There are two scenarios: whether to enable the off-heap memory cache pool. The specific configuration parameter is MessageStoreConfig#transientStorePoolEnable.

1) transientStorePoolEnable = true

Messages are first appended to the off-heap writeBuffer, then committed to the FileChannel, and finally flushed to disk.

2) transientStorePoolEnable=false (default)

When a message is appended, it is written directly into the MappedByteBuffer (page cache) and flushed to disk periodically.
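
The buffer that appendMessagesInner writes into is exactly what distinguishes the two paths (the writeBuffer != null ? writeBuffer.slice() : mappedByteBuffer.slice() branch shown earlier). Below is a condensed sketch of an off-heap buffer pool in the spirit of TransientStorePool; it is simplified (the real class also pins the memory) and not the exact source.

    import java.nio.ByteBuffer;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentLinkedDeque;

    // Condensed sketch of an off-heap buffer pool in the spirit of TransientStorePool (not the exact source).
    public class SimpleTransientStorePool {
        private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

        public SimpleTransientStorePool(int poolSize, int fileSize) {
            for (int i = 0; i < poolSize; i++) {
                // Pre-allocate fixed-size direct (off-heap) buffers, one per commitlog file in flight.
                availableBuffers.offerLast(ByteBuffer.allocateDirect(fileSize));
            }
        }

        // Borrowed as MappedFile's writeBuffer when transientStorePoolEnable = true (see MappedFile.init above).
        public ByteBuffer borrowBuffer() {
            return availableBuffers.pollFirst();
        }

        // Returned once all data has been committed to the FileChannel (see MappedFile#commit above).
        public void returnBuffer(ByteBuffer buffer) {
            buffer.position(0);
            buffer.limit(buffer.capacity());
            availableBuffers.offerFirst(buffer);
        }
    }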

Reference

  • Source analysis: the RocketMQ flush-to-disk mechanism