Write-Ahead Log (WAL)
A Write-Ahead Log (WAL) abstracts every state update into a command and appends it to a log. The log is append-only, so all writes are sequential and I/O is fast: compared with the random I/O of updating the stored data structure in place on disk, appending is much cheaper. It also provides durability, in the sense that data is not lost and can be recovered from the log.
Background
If the server storing the data fails, for example, it has already acknowledged the client's request, but the process restarts before the data reaches the disk, then after the restart you still need to guarantee that the update the client requested is durably applied.
The solution
Each update is abstracted into a command and appended to a file. Each process appends sequentially to its own independent file, which simplifies processing the log after a restart as well as subsequent online updates. Each log record has a unique ID, which can be used to implement a Segmented Log or a Low-Water Mark to clean up old log entries. Log updates can be applied through the Singular Update Queue design pattern, sketched below.
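As a rough illustration of the Singular Update Queue idea (all names here are hypothetical, not from any particular implementation), appends can be funneled through a single writer thread, so entry IDs are assigned in one place and writes stay strictly sequential without callers contending on the file:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: one writer thread drains the queue and performs all file appends,
// so IDs are assigned in a single place and writes stay strictly sequential.
class SingularUpdateQueueAppender implements Runnable {
    private final BlockingQueue<byte[]> pending = new ArrayBlockingQueue<>(1024);
    private long nextEntryId = 0; // only touched by the single writer thread

    // Called by request-handling threads; they never touch the log file directly.
    public void submit(byte[] serializedCommand) throws InterruptedException {
        pending.put(serializedCommand);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                byte[] data = pending.take();
                appendToFile(nextEntryId++, data); // hypothetical sequential file append
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shut down the single writer
        }
    }

    private void appendToFile(long entryId, byte[] data) {
        // elided: write (entryId, data) sequentially to the active log segment
    }
}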
The structure of a log record looks like this:
class WALEntry {
    // Log ID
    private final Long entryId;
    // Record content
    private final byte[] data;
    // Record type
    private final EntryType entryType;
    // Timestamp
    private long timeStamp;
}
The log file is read on each restart, and all log entries are replayed to rebuild the current data state.
Suppose there is an in-memory key-value database:
class KVStore {
    private final Map<String, String> kv = new HashMap<>();
    private WriteAheadLog wal; // initialised in the constructor shown below

    public String get(String key) {
        return kv.get(key);
    }

    public void put(String key, String value) {
        appendLog(key, value);
        kv.put(key, value);
    }

    private Long appendLog(String key, String value) {
        return wal.writeEntry(new SetValueCommand(key, value).serialize());
    }
}
The put operation is abstracted into a SetValueCommand, which is serialized and appended to the log before the in-memory HashMap is updated. SetValueCommand can be serialized and deserialized:
class SetValueCommand extends Command {
    final String key;
    final String value;

    public SetValueCommand(String key, String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public byte[] serialize() {
        try {
            // Serialize: the type tag first, then key and value
            var baos = new ByteArrayOutputStream();
            var dataOutputStream = new DataOutputStream(baos);
            dataOutputStream.writeInt(Command.SetValueType);
            dataOutputStream.writeUTF(key);
            dataOutputStream.writeUTF(value);
            return baos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static SetValueCommand deserialize(InputStream is) {
        try {
            // Deserialize: the type tag is assumed to have been consumed
            // already by the caller that dispatched on the command type
            DataInputStream dataInputStream = new DataInputStream(is);
            return new SetValueCommand(dataInputStream.readUTF(), dataInputStream.readUTF());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
This ensures that even if the process restarts, the HashMap can be rebuilt at startup by reading the log files:
class KVStore {
    public KVStore(Config config) {
        this.config = config;
        this.wal = WriteAheadLog.openWAL(config);
        this.applyLog();
    }

    public void applyLog() {
        List<WALEntry> walEntries = wal.readAll();
        applyEntries(walEntries);
    }

    private void applyEntries(List<WALEntry> walEntries) {
        for (WALEntry walEntry : walEntries) {
            Command command = deserialize(walEntry);
            if (command instanceof SetValueCommand) {
                SetValueCommand setValueCommand = (SetValueCommand) command;
                kv.put(setValueCommand.key, setValueCommand.value);
            }
        }
    }

    public void initialiseFromSnapshot(SnapShot snapShot) {
        kv.putAll(snapShot.deserializeState());
    }
}
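The WriteAheadLog used above is not shown in the snippets. A minimal sketch, assuming a single append-only file, a record layout of just (ID, length, payload), and raw byte[] payloads instead of WALEntry objects (the real WALEntry also carries a type and timestamp), might look like this:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a single append-only file, each record stored as
// (entryId, length, payload). A real implementation would also write the
// entry type, timestamp, and a checksum, and would manage segments.
class WriteAheadLog {
    private final File file;
    private final DataOutputStream out;
    private long nextEntryId = 0;

    private WriteAheadLog(File file) throws IOException {
        this.file = file;
        this.out = new DataOutputStream(new FileOutputStream(file, true)); // append mode
    }

    // The snippets' openWAL(config) resolves the log path from configuration;
    // here we take the directory directly for brevity.
    static WriteAheadLog openWAL(File dir) throws IOException {
        return new WriteAheadLog(new File(dir, "wal.log"));
    }

    synchronized long writeEntry(byte[] data) throws IOException {
        long id = nextEntryId++;
        out.writeLong(id);
        out.writeInt(data.length);
        out.write(data);
        out.flush(); // flushes stream buffers only; durability needs an OS-level flush (see below)
        return id;
    }

    synchronized List<byte[]> readAll() throws IOException {
        List<byte[]> entries = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
            while (true) {
                long id = in.readLong();
                byte[] data = new byte[in.readInt()];
                in.readFully(data);
                entries.add(data);
                nextEntryId = id + 1; // continue numbering after the last record
            }
        } catch (EOFException endOfLog) {
            // end of log reached; a torn final record is silently dropped
        }
        return entries;
    }
}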
Implementation considerations
The first consideration is ensuring that WAL records actually reach the disk. The file-handling libraries of most programming languages provide a way to force the operating system to flush file changes to disk. When flushing, there is a trade-off to consider: flushing after every log record guarantees strong durability but severely hurts performance and quickly becomes the bottleneck, while flushing asynchronously improves performance but risks losing log records if the program crashes before the flush. Most implementations therefore flush in batches, limiting the performance impact while bounding how much data can be lost.
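A minimal sketch of these options, assuming a FileChannel-based log (the file name and BATCH_SIZE are arbitrary; FileChannel.force is the JDK mechanism for forcing the OS to flush file changes to the device):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the flush policies discussed above. An asynchronous policy would
// instead call force(false) from a background thread on a timer.
class FlushPolicies {
    private static final int BATCH_SIZE = 64; // arbitrary batch size
    private final FileChannel channel;
    private int unflushedRecords = 0;

    FlushPolicies(Path logPath) throws IOException {
        this.channel = FileChannel.open(logPath,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    // Strongest durability: sync after every record; each append pays a disk flush.
    void appendWithSyncPerRecord(ByteBuffer record) throws IOException {
        channel.write(record);
        channel.force(false); // false: file metadata (e.g. mtime) need not be flushed
    }

    // Batched: sync once per BATCH_SIZE records; at most one batch is lost on a crash.
    void appendBatched(ByteBuffer record) throws IOException {
        channel.write(record);
        if (++unflushedRecords >= BATCH_SIZE) {
            channel.force(false);
            unflushedRecords = 0;
        }
    }
}

RocketMQ, discussed below, makes the same choice configurable: synchronous flush per write, or asynchronous background flush.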
In addition, we need to be able to detect corrupted log files. To handle this, each log entry is typically written together with a CRC record, which is validated when the file is read.
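For instance, framing each record with a checksum using java.util.zip.CRC32 might look like this sketch (the record layout is illustrative):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.CRC32;

// Each record is written as (length, crc32, payload); on read the checksum is
// recomputed and compared, so a torn or corrupted tail can be detected and dropped.
final class ChecksummedRecords {
    static void write(DataOutputStream out, byte[] payload) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        out.writeInt(payload.length);
        out.writeLong(crc.getValue());
        out.write(payload);
    }

    // Returns the payload, or null if the record is truncated or fails validation.
    static byte[] read(DataInputStream in) throws IOException {
        try {
            int length = in.readInt();
            long expected = in.readLong();
            byte[] payload = new byte[length];
            in.readFully(payload);
            CRC32 crc = new CRC32();
            crc.update(payload, 0, payload.length);
            return crc.getValue() == expected ? payload : null;
        } catch (EOFException truncatedTail) {
            return null;
        }
    }
}

This is also why the RocketMQ recovery code shown later passes a checkCRCOnRecover flag when validating stored messages.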
At the same time, a single log file can become hard to manage: old entries are difficult to clean up, and the file grows too big to read on restart. To solve this, the Segmented Log and Low-Water Mark patterns mentioned above are usually combined to bound the size of the files read at startup and to clean up old log entries, as the sketch below illustrates.
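A rough sketch of segment rollover and low-water-mark cleanup (the file naming scheme and segment size here are made up for illustration):

import java.io.File;
import java.util.Arrays;

// Sketch of a Segmented Log with Low-Water Mark cleanup. Segment files are named
// after the ID of their first entry, zero-padded so lexicographic order matches
// numeric order.
class SegmentedLog {
    private static final long MAX_SEGMENT_BYTES = 64L * 1024 * 1024;
    private final File dir;
    private File currentSegment;

    SegmentedLog(File dir, long firstEntryId) {
        this.dir = dir;
        this.currentSegment = new File(dir, segmentName(firstEntryId));
    }

    private static String segmentName(long baseEntryId) {
        return String.format("wal-%020d.log", baseEntryId);
    }

    // Roll to a new segment once the active one exceeds the size limit, so no
    // single file grows unbounded and startup can read only the recent segments.
    void maybeRoll(long nextEntryId) {
        if (currentSegment.length() >= MAX_SEGMENT_BYTES) {
            currentSegment = new File(dir, segmentName(nextEntryId));
        }
    }

    // Low-Water Mark cleanup: a segment is safe to delete once the base entry ID
    // of the segment after it is at or below the mark, because then every entry
    // in it is older than anything that still needs to be retained.
    void truncateBefore(long lowWaterMark) {
        File[] segments = dir.listFiles((d, name) -> name.startsWith("wal-") && name.endsWith(".log"));
        if (segments == null) return;
        Arrays.sort(segments);
        for (int i = 0; i + 1 < segments.length; i++) {
            if (baseEntryId(segments[i + 1]) <= lowWaterMark) {
                segments[i].delete();
            }
        }
    }

    private static long baseEntryId(File segment) {
        String name = segment.getName(); // "wal-" + 20 digits + ".log"
        return Long.parseLong(name.substring(4, name.length() - 4));
    }
}

Kafka names its log segment files by base offset in much the same way.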
Finally, consider the duplicate-request (idempotency) problem introduced by retries. Since the WAL is append-only, the log may contain duplicate entries after client communication failures and retries. When replaying log entries, duplicates may need to be ignored. If the store is HashMap-like, where updates to the same key are idempotent, no deduplication mechanism is required, but ABA update problems can still occur. In general, some mechanism is needed to tag each request with a unique identifier and detect duplicates, as sketched below.
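One possible shape for such a mechanism, as a sketch (the client/request naming is hypothetical): each client tags its requests with a monotonically increasing number, and the server remembers the highest number it has applied per client:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: the server tracks, per client, the highest request number already
// applied, and skips anything at or below it as a retry duplicate.
class DuplicateRequestFilter {
    private final Map<String, Long> lastAppliedByClient = new ConcurrentHashMap<>();

    // Returns true if the request is new and should be applied (and logged);
    // returns false if it is a retry of a request that was already applied.
    boolean shouldApply(String clientId, long requestNumber) {
        boolean[] isNew = {false};
        lastAppliedByClient.compute(clientId, (id, last) -> {
            if (last == null || requestNumber > last) {
                isNew[0] = true;
                return requestNumber;
            }
            return last; // duplicate caused by a client retry
        });
        return isNew[0];
    }
}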
Examples
Commitlog-style logs in various MQs
Because of the nature of message queues, message storage in an MQ is naturally log-shaped, so the log is generally used as the storage directly. Such message stores typically follow the WAL design pattern. Taking RocketMQ as an example:
RocketMQ:
RocketMQ first stores messages in Commitlog files, which are written and persisted using mmap (file-mapped memory). For more on this technique, see JDK core JAVA source code parsing (5) - JAVA File MMAP principle parsing.
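As a stand-alone point of reference (this is not RocketMQ code), a minimal example of writing through file-mapped memory in Java looks like:

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Minimal mmap example: map a fixed-size file into memory, write through the
// mapping, and force the mapped region to disk. RocketMQ's MappedFile wraps
// exactly this kind of MappedByteBuffer (plus write positions, pooling, etc.).
public class MmapDemo {
    public static void main(String[] args) throws Exception {
        try (FileChannel channel = FileChannel.open(Paths.get("demo.log"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mapped.put("hello wal".getBytes()); // write goes to the page cache, not via write(2)
            mapped.force(); // flush the mapped region to the device
        }
    }
}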
When a message arrives, the core method that writes it to the file is MappedFile's appendMessagesInner:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // Current write position in the file
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // mappedByteBuffer is the file-mapped memory abstraction of the file;
        // writes to this buffer go into the mapped file. slice() creates a new
        // ByteBuffer sharing the same memory, and position is then set to the
        // current write position. writeBuffer is non-null when a
        // TransientStorePool is in use.
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // Advance the write position by the number of bytes written
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
After storing messages in Commitlog files, RocketMQ asynchronously updates the ConsumeQueue and Index files. The ConsumeQueue and Index files can be seen as the stored state, while the Commitlog plays the role of the WAL: only messages written to the ConsumeQueue are visible to consumers, and only records present in the Index file can be looked up and located. If a message is written to the Commitlog successfully but the RocketMQ process dies before the asynchronous updates run, the files are inconsistent. So when RocketMQ starts, it guarantees eventual consistency between the Commitlog and the ConsumeQueue and Index files via the following mechanism.
The entry point is DefaultMessageStore's load method:
public boolean load() {
    boolean result = true;

    try {
        // The ${ROCKET_HOME}/store/abort file is created when the RocketMQ broker
        // starts, and a JVM shutdown hook deletes it on normal exit. So if it
        // still exists at startup, the last shutdown was abnormal.
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // Load delayed-queue messages; can be ignored for this discussion
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        // Load the Commitlog files
        result = result && this.commitLog.load();

        // Load the ConsumeQueue files
        result = result && this.loadConsumeQueue();

        if (result) {
            // Load the store checkpoint
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

            // Load the Index files
            this.indexService.load(lastExitOK);

            // Recover, depending on whether the last exit was normal
            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}
DefaultMessageStore's recover method then chooses a recovery path:
private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    // Recover in different ways depending on whether the last exit was normal
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    this.recoverTopicQueueTable();
}
When the last exit was normal:
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Scan only the last three files
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Check whether the stored message is valid
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // The message is valid: advance the offset
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // Valid but size == 0 marks the end of the file: switch to the next file
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // An invalid message was found: stop here; everything after it is discarded
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }

        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Based on the valid offset, truncate every file beyond it and the part of
        // the last file past the offset (normally only the last file is affected)
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Based on the valid offset in the commit log, clean up the consume queue
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // All commit log files were deleted: reset offsets and destroy the consume queues
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
When the last exit was abnormal:
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Starting from the last file, look backwards for the first intact file
        // from which messages can be recovered. Replaying from that file
        // guarantees eventual consistency, but some messages that were already
        // written to the consume queue will be dispatched again, i.e. repeated
        // consumption.
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // Find the first file containing an intact message
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }

        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }

        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            // Check whether the stored message is valid
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();

            // The message is valid
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;

                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        // If duplicate forwarding is allowed, only dispatch messages
                        // whose offset is below the confirmed offset
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            // Dispatch the message, i.e. update the consume queue and index
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        // Dispatch the message, i.e. update the consume queue and index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // size == 0 marks the end of the file: switch to the next file
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }

        // Update the offsets
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clean up the consume queue according to the valid commit log offset
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // All commit log files were deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
To sum up:
- First, determine whether the last exit was normal based on the presence of the ABORT file.
- For normal exit:
- Scan at most the last three files, recording the offset of valid messages
- Stop when an invalid message is encountered, or when the scanned files are exhausted
- Set the latest valid offset, and truncate the Commitlog and ConsumeQueue files according to it
- For abnormal exit:
- Starting from the last file, search backwards for the first intact file from which messages can be recovered
- Recover from that file onwards, re-dispatching messages as it goes; replaying from this point guarantees eventual consistency, but some messages that had already been written to the ConsumeQueue and Index will be written again, i.e. repeated consumption
- Update the offset and clean up, as in the normal case
Databases
Almost every database has a WAL-like design, such as the InnoDB redo log in MySQL.
Consensus storage
Examples include ZooKeeper and etcd, and similar consensus middleware.