Reading the source code of HashMap is relatively easy nowadays, because plenty of blog posts analyze HashMap and ConcurrentHashMap. This article, by contrast, is the first detailed source-code analysis of CopyOnWriteStateTable. Reading the source of a complex collection class is quite a challenge; the author also ran into many questions at the beginning and solved them one by one. The article runs to more than 12,000 words with many illustrations. Reading it carefully will be rewarding, whether for interviews or simply to broaden your horizons.

Disclaimer: this source-code analysis is based on the Flink 1.9.0 release branch. In fact, the exact version matters little when reading source code: the main flow is basically the same across versions. If you are familiar with one version, you can focus on just the changes when a new version comes out.

This article mainly covers CopyOnWriteStateTable in Flink. When MemoryStateBackend or FsStateBackend is used, state data is saved in a CopyOnWriteStateTable by default. A CopyOnWriteStateTable holds the state of multiple KeyGroups, and each KeyGroup corresponds to one CopyOnWriteStateMap.

CopyOnWriteStateMap is a structure similar to HashMap, but with two very interesting features:

  • 1. As a hash structure, it needs an expansion strategy to keep reads and writes fast. CopyOnWriteStateMap expands with a progressive rehash strategy: data is migrated to the new hash table gradually rather than all at once.
  • 2. CopyOnWriteStateMap supports asynchronous snapshots during a Checkpoint. That is, data in the CopyOnWriteStateMap can still be modified while the snapshot is being taken. The question is: how is the correctness of the snapshot guaranteed after the data changes?

Those who know Redis will recall that Redis is also one big hash structure, and its expansion strategy is likewise a progressive rehash. While RDB persists data, Redis keeps serving requests, which means the data may be modified. So how does RDB ensure that the persisted data is correct?

For example, suppose RDB starts persisting data at 17:00:00, a piece of data is modified in Redis one second later, and RDB finishes persisting one minute after that. The expected result of the RDB persistence should be the full Redis snapshot as of 17:00:00. And indeed, Redis guarantees exactly that: the later modification does not affect the snapshot.

Similar to Redis, Flink uses Checkpoint snapshots while keeping service interruption to a minimum. How does Flink do this? Read on with these questions in mind.

1. StateTable overview

The KeyedStateBackend of both MemoryStateBackend and FsStateBackend is HeapKeyedStateBackend, which stores the data. HeapKeyedStateBackend holds `Map<String, StateTable<K, ?, ?>> registeredKVStates`, which stores the mapping between StateName and the specific State. The key of registeredKVStates is the StateName and the value is the corresponding State data, which is in turn stored in a StateTable.

StateTable has two implementations: CopyOnWriteStateTable and NestedMapsStateTable.

  • CopyOnWriteStateTable is a data structure customized by Flink; it supports asynchronous snapshots during a Checkpoint.
  • NestedMapsStateTable stores data directly in two nested layers of Java HashMaps (see the sketch below); its snapshot must be taken synchronously during a Checkpoint.
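For intuition, here is a minimal sketch of that nested layout; the class and member names are illustrative, not the actual Flink fields:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the two-layer nested HashMap layout that NestedMapsStateTable
// uses per KeyGroup. The outer map is keyed by namespace, the inner map by key.
class NestedMapSketch<K, N, S> {
	private final Map<N, Map<K, S>> namespaceMap = new HashMap<>();

	S get(K key, N namespace) {
		Map<K, S> keyedMap = namespaceMap.get(namespace);
		return keyedMap == null ? null : keyedMap.get(key);
	}

	void put(K key, N namespace, S state) {
		namespaceMap.computeIfAbsent(namespace, n -> new HashMap<>()).put(key, state);
	}
}
```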

The following details CopyOnWriteStateTable.

2. CopyOnWriteStateTable

StateTable holds `StateMap[] keyGroupedStateMaps`, which actually stores the data. StateTable initializes one StateMap per KeyGroup to isolate the KeyGroups from each other. When operating on a state, StateTable computes the KeyGroup from the key and fetches the corresponding StateMap before performing the operation, roughly as sketched below.
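A minimal sketch of that routing, under assumptions: plain hashCode() stands in for the murmur-hash spreading Flink applies in KeyGroupRangeAssignment, and the names are illustrative.

```java
public class KeyGroupRoutingSketch {

	static int assignToKeyGroup(Object key, int maxParallelism) {
		// Flink spreads key.hashCode() with a murmur hash before the modulo;
		// plain hashCode() is used here for brevity
		return Math.floorMod(key.hashCode(), maxParallelism);
	}

	public static void main(String[] args) {
		int maxParallelism = 128; // total number of KeyGroups
		int keyGroupOffset = 0;   // first KeyGroup owned by this operator instance

		int keyGroup = assignToKeyGroup("user-42", maxParallelism);
		// The StateTable would then access: keyGroupedStateMaps[keyGroup - keyGroupOffset]
		System.out.println("KeyGroup " + keyGroup + ", local StateMap index " + (keyGroup - keyGroupOffset));
	}
}
```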

CopyOnWriteStateTable stores its data in CopyOnWriteStateMaps, so what follows mainly introduces the implementation of CopyOnWriteStateMap. CopyOnWriteStateMap is simply a hash table made of an array plus linked lists.

The element type in CopyOnWriteStateMap is StateMapEntry. The first level of the hash table is an array of type StateMapEntry: StateMapEntry[]. The StateMapEntry class contains a next pointer of type StateMapEntry, which forms the linked list.

Compared with an ordinary hash table, CopyOnWriteStateMap raises the following questions:

  • CopyOnWriteStateMap’s expansion strategy is an incremental rehash rather than a one-shot migration
  • To support asynchronous snapshots, the StateMap’s snapshot must be preserved. How is that implemented?
  • To support CopyOnWrite, a series of copy operations must happen when data is modified; the original data must not be touched, or the snapshot would be corrupted
  • How is older-version data released during the asynchronous snapshot and after it completes?

3. Progressive Rehash policy of CopyOnWriteStateMap

Progressive rehash means that CopyOnWriteStateMap currently has a hash table serving requests, but that table holds too many elements and must be expanded, so its data needs to be migrated to a larger hash table.

The problem is that if a hash map currently stores 1 GB of data, migrating that 1 GB at once could take a long time. So CopyOnWriteStateMap does not migrate everything during expansion; instead it slowly moves data to the larger hash table on every CopyOnWriteStateMap operation.

For example, each get or put operation can migrate four entries to the large hash table; after some period of gets and puts, all data will have been migrated. The progressive rehash strategy thus moves all data into the new hash table across many small steps.

3.1 Expansion Overview

There are two hash tables in memory: primaryTable as the primary table and rehashTable as the expansion table. Initially only primaryTable is used; once the number of elements in primaryTable exceeds the threshold, expansion begins.

Expansion process: allocate a hash table with twice the capacity of primaryTable and assign it to rehashTable, then slowly migrate elements from primaryTable to rehashTable. When the putEntry method finds size() > threshold, it calls doubleCapacity, which allocates the new hash table and assigns it to rehashTable.

As shown in the following figure, the number of buckets in primaryTable is 4 and that in rehashTable is 8.

During migration, elements at index 0 of primaryTable move to indexes 0 and 4 of rehashTable, and elements at index 1 of primaryTable move to indexes 1 and 5 of rehashTable, as the small demo below shows.
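A tiny runnable demonstration of why an old bucket splits into exactly those two new buckets (assuming power-of-two table sizes, as CopyOnWriteStateMap uses): the new index hash & (newLen - 1) keeps the old index and adds one extra hash bit.

```java
public class RehashIndexDemo {
	public static void main(String[] args) {
		int oldLen = 4, newLen = 8;
		// All of these hash values fall into bucket 0 of the 4-bucket table
		for (int hash : new int[]{0, 4, 8, 12, 16, 20}) {
			int oldIndex = hash & (oldLen - 1); // always 0
			int newIndex = hash & (newLen - 1); // either 0 or 0 + oldLen = 4
			System.out.printf("hash=%2d oldIndex=%d newIndex=%d%n", hash, oldIndex, newIndex);
		}
	}
}
```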

3.2 Selecting a Table Policy

Assume the data in bucket 0 of primaryTable has already been migrated to rehashTable; then any get or put touching bucket 0 must operate on rehashTable. Buckets 1, 2, and 3 have not been migrated yet, so operations on them still go to primaryTable. The source code therefore contains a table-selection step that decides between primaryTable and rehashTable.

The source code implementation is as follows:

```java
// Select primaryTable or incrementalRehashTable for the current element
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
	// Compute which bucket of primaryTable the hashCode falls into
	int curIndex = hashCode & (primaryTable.length - 1);

	// Buckets >= rehashIndex have not been migrated and should be looked up in primaryTable;
	// buckets < rehashIndex have been migrated and should be looked up in incrementalRehashTable
	return curIndex >= rehashIndex ? primaryTable : incrementalRehashTable;
}
```

`int curIndex = hashCode & (primaryTable.length - 1);` computes which bucket of primaryTable the current hashCode falls into.

rehashIndex records the progress of the rehash migration: all buckets before rehashIndex have been migrated from primaryTable to rehashTable. rehashIndex = 1 means every bucket before index 1 has been migrated, i.e. bucket 0 of primaryTable is done.

Policy: buckets at or above rehashIndex have not been migrated and should be looked up in primaryTable; buckets below rehashIndex have been migrated and should be looked up in incrementalRehashTable.

3.3 Migration Process

Every get, put, containsKey, or remove operation triggers a call to the computeHashForOperationAndDoIncrementalRehash method, which performs a migration step.

The computeHashForOperationAndDoIncrementalRehash method does two things:

  • Checks whether a rehash is in progress and, if so, calls incrementalRehash to migrate a batch of data
  • Computes the hashCode for the key and namespace

Focus on the incrementalRehash method implementation:

```java
private void incrementalRehash() {
	StateMapEntry<K, N, S>[] oldMap = primaryTable;
	StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;

	int oldCapacity = oldMap.length;
	int newMask = newMap.length - 1;
	int requiredVersion = highestRequiredSnapshotVersion;
	int rhIdx = rehashIndex;
	int transferred = 0;

	// Migrate at least MIN_TRANSFERRED_PER_INCREMENTAL_REHASH elements (default 4)
	// to the new table on each call
	while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {

		// Take the chain at bucket rhIdx of oldMap
		StateMapEntry<K, N, S> e = oldMap[rhIdx];

		// e != null means the bucket still has elements left to migrate
		while (e != null) {
			// Elements whose version is below highestRequiredSnapshotVersion are still
			// referenced by a snapshot, so a new copy must be created (CopyOnWrite)
			if (e.entryVersion < requiredVersion) {
				e = new StateMapEntry<>(e, stateMapVersion);
			}
			// Save the next node to be migrated in n
			StateMapEntry<K, N, S> n = e.next;
			// Insert e at the head of the corresponding bucket of newMap
			int pos = e.hash & newMask;
			e.next = newMap[pos];
			newMap[pos] = e;
			// Advance e to the next node to be migrated
			e = n;
			++transferred;
		}

		oldMap[rhIdx] = null;
		// All buckets migrated: the rehash is finished, swap the tables and reset the fields
		if (++rhIdx == oldCapacity) {
			primaryTable = newMap;
			incrementalRehashTable = (StateMapEntry<K, N, S>[]) EMPTY_TABLE;
			primaryTableSize += incrementalRehashTableSize + transferred;
			incrementalRehashTableSize = 0;
			rehashIndex = 0;
			return;
		}
	}

	// Update the element counts of both tables and the migration progress
	primaryTableSize -= transferred;
	incrementalRehashTableSize += transferred;
	rehashIndex = rhIdx;
}
```

The outer while loop of incrementalRehash controls the minimum number of elements migrated per call. It then traverses bucket rhIdx of oldMap; e points to the element currently being visited and advances via e = e.next on each iteration. As long as e is not null, there are still elements left in the bucket to migrate. Each migration step always finishes whole buckets: a bucket is never left half migrated.

During migration, the hash of the current element e is recomputed and e is inserted at the head of the corresponding bucket in newMap (head insertion). When e.entryVersion < requiredVersion, a new Entry must be created to support CopyOnWrite, as described below.

4. StateMap Snapshot policy

The StateMap snapshot policy answers the question: to support asynchronous snapshots, how is the StateMap’s snapshot captured and preserved?

The traditional approach would be to deep-copy all StateMap data in memory, snapshot the copy, and let the original data keep serving requests. But a deep copy duplicates all of the real data, which is very inefficient. To improve efficiency, Flink only makes a shallow copy of the data.

4.1 Shallow Copy Analysis

A shallow copy copies references rather than the data itself, as the snippet below illustrates.
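A tiny, generic illustration (plain Java, not Flink code): only the outer array of references is duplicated, which is exactly what the snapshot does with the hash-table buckets.

```java
import java.util.Arrays;

public class ShallowCopyDemo {
	public static void main(String[] args) {
		// Each inner array plays the role of one bucket's linked list
		String[][] buckets = { {"a", "b"}, {"c"} };
		// Shallow copy: only the outer array of references is duplicated
		String[][] shallow = Arrays.copyOf(buckets, buckets.length);
		System.out.println(shallow[0] == buckets[0]); // true: the chain itself is shared
	}
}
```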

If the StateMap is not being expanded, the snapshot process is relatively simple: create the new snapshotData and copy the primaryTable references into it.

As shown in the figure, the shallow copy means bucket 0 of both tables references the same linked list, i.e. snapshotData also points at Entry a. The shallow copies of the other buckets are similar and are not drawn here.

If the StateMap is currently being expanded, the snapshot process is more involved: create the new snapshotData and copy the data of both primaryTable and rehashTable into it.

As shown, both tables are copied into snapshotData, but the length of the snapshotData array is not primaryTable’s length plus rehashTable’s length. Instead, the code counts how many bucket slots of primaryTable and rehashTable can hold data. For example, three buckets of primaryTable and two of rehashTable contain elements, so snapshotData needs only 5 buckets; 4 + 8 = 12 is unnecessary.

Entries are also omitted in the figure above; the shallow copying of Entry references is the same as in the non-expansion case.

4.2 Shallow copy source code details

A snapshot of the whole StateTable starts with a call to CopyOnWriteStateTable’s stateSnapshot method. stateSnapshot creates a CopyOnWriteStateTableSnapshot, whose constructor calls CopyOnWriteStateTable’s getStateMapSnapshotList method.

The getStateMapSnapshotList method is implemented as follows:

```java
List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
	List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList =
		new ArrayList<>(keyGroupedStateMaps.length);

	// Call stateSnapshot() on every CopyOnWriteStateMap and collect the
	// resulting CopyOnWriteStateMapSnapshots into the list
	for (int i = 0; i < keyGroupedStateMaps.length; i++) {
		CopyOnWriteStateMap<K, N, S> stateMap = (CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
		snapshotList.add(stateMap.stateSnapshot());
	}
	return snapshotList;
}
```

CopyOnWriteStateTable maintains one StateMap per KeyGroup in keyGroupedStateMaps; the getStateMapSnapshotList method calls the stateSnapshot method of every CopyOnWriteStateMap.

CopyOnWriteStateMap’s stateSnapshot method is as follows:

```java
public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
	return new CopyOnWriteStateMapSnapshot<>(this);
}

CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
	super(owningStateMap);

	// Shallow-copy the StateMap data to produce snapshotData
	this.snapshotData = owningStateMap.snapshotMapArrays();

	// Record the current StateMap version as snapshotVersion
	this.snapshotVersion = owningStateMap.getStateMapVersion();

	this.numberOfEntriesInSnapshotData = owningStateMap.size();
}
```

CopyOnWriteStateMap’s stateSnapshot method creates a CopyOnWriteStateMapSnapshot. Its constructor calls the StateMap’s snapshotMapArrays method to shallow-copy the StateMap data into snapshotData, and records the current StateMap version in snapshotVersion.

The snapshotMapArrays method of StateMap implements the shallow copy; the code is as follows:

```java
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
	// Current version of the StateMap
	private int stateMapVersion;
	// Versions of all snapshots that are still in progress
	private final TreeSet<Integer> snapshotVersions;
	// Highest version number among the ongoing snapshots
	private int highestRequiredSnapshotVersion;

	StateMapEntry<K, N, S>[] snapshotMapArrays() {
		// 1. Increment stateMapVersion, assign it to highestRequiredSnapshotVersion,
		// and add it to snapshotVersions
		synchronized (snapshotVersions) {
			++stateMapVersion;
			highestRequiredSnapshotVersion = stateMapVersion;
			snapshotVersions.add(highestRequiredSnapshotVersion);
		}

		// 2. Shallow-copy the existing elements of primaryTable and
		// incrementalRehashTable into the copy array
		StateMapEntry<K, N, S>[] table = primaryTable;

		final int totalMapIndexSize = rehashIndex + table.length;
		final int copiedArraySize = Math.max(totalMapIndexSize, size());
		final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];

		if (isRehashing()) {
			final int localRehashIndex = rehashIndex;
			final int localCopyLength = table.length - localRehashIndex;
			// for the primary table, take every index >= rhIdx.
			System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);

			// for the new table, the occupied regions are [0, rhIdx) and
			// [table.length / 2, table.length / 2 + rhIdx)
			table = incrementalRehashTable;
			System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
			System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
		} else {
			System.arraycopy(table, 0, copy, 0, table.length);
		}

		return copy;
	}
}
```

There are three important properties in CopyOnWriteStateMap:

  • stateMapVersion: the current version of the StateMap; incremented by one on every snapshot
  • snapshotVersions: holds the versions of all snapshots currently in progress (several snapshots may run at the same time)
  • highestRequiredSnapshotVersion: the highest version among the ongoing snapshots; set to 0 when no snapshot is in progress (see the version timeline below)
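To make these rules concrete, here is a hypothetical timeline of two overlapping snapshots (values assumed; snapshotMapArrays is shown above, releaseSnapshot is covered in section 6):

```java
// Hypothetical timeline derived from the rules above (snapshotMapArrays
// increments and registers a version; releaseSnapshot removes it and
// recomputes the maximum):
//
// initial state:      stateMapVersion=0  snapshotVersions={}     highestRequiredSnapshotVersion=0
// snapshot A starts:  stateMapVersion=1  snapshotVersions={1}    highestRequiredSnapshotVersion=1
// snapshot B starts:  stateMapVersion=2  snapshotVersions={1,2}  highestRequiredSnapshotVersion=2
// snapshot A ends:    stateMapVersion=2  snapshotVersions={2}    highestRequiredSnapshotVersion=2
// snapshot B ends:    stateMapVersion=2  snapshotVersions={}     highestRequiredSnapshotVersion=0
```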

The first step of snapshotMapArrays updates the three attributes according to the rules above; the second step shallow-copies the existing elements of primaryTable and rehashTable into the copy array.

In theory copiedArraySize = totalMapIndexSize would suffice, but the code actually uses copiedArraySize = Math.max(totalMapIndexSize, size()). In the example above, table.length = 4 and rehashIndex = 1, so totalMapIndexSize = 1 + 4 = 5, matching the five copied bucket regions. The source comment explains that totalMapIndexSize is theoretically enough; size() is considered here mainly for compatibility with the StateMap’s TransformedSnapshotIterator.

5. Implementation principle of CopyOnWrite

The previous section concluded that a snapshot is only a shallow copy, so the Snapshot and the StateMap reference the same real data. If the Snapshot has not yet flushed the data to disk but the StateMap modifies it, the data the Snapshot eventually flushes would be wrong. The Snapshot’s goal is to flush the data exactly as it was at the moment of the snapshot, so once a snapshot has been taken, that data must not be modified.

5.1 Principle of CopyOnWrite

How does the StateMap avoid modifying the Snapshot’s data? The principle is simple: the StateMap and the Snapshot share a large amount of data, and since the Snapshot requires that the data not change, the StateMap makes a copy of whatever it is about to modify. Snapshot and StateMap then each have their own copy, so the StateMap’s modifications no longer affect the Snapshot.

Of course, to save memory and stay efficient, the StateMap copies only the data that actually needs to change and shares as much as possible with the Snapshot.

5.2 Detailed explanation of CopyOnWrite principle

The snapshot part above only shallow-copied the table; the data inside each bucket is unchanged by the copy, and buckets do not overlap. So the analysis below focuses on how the linked list inside a single bucket implements CopyOnWrite.

■ 5.2.1 Modifying the head node of the Linked list

As shown in the figure above, bucket 0 of both primaryTable and snapshotTable points at Entry a. Suppose the application layer wants to modify Entry a’s data; the overall flow is:

  • Deep-copy Entry a to produce Entry a copy
  • Insert Entry a copy into primaryTable’s linked list, with its next pointing at Entry b
  • The application layer modifies Entry a copy, changing data1 to the new data2

Entries b and c have not been modified and need not be copied; they are shared by primaryTable and snapshotTable.

This reveals the design goal of CopyOnWriteStateMap (the author’s own understanding, not an official statement): maintain Snapshot data correctness while copying as little data as possible, for performance.

■ 5.2.2 Modifying the middle node of the Linked List

As shown in the figure above, bucket 0 of both primaryTable and snapshotTable points at Entry a. Suppose the application layer wants to modify Entry b’s data; a first attempt at the flow would be:

  • Deep-copy Entry b to produce Entry b copy
  • Insert Entry b copy into primaryTable’s linked list, with its next pointing at Entry c
  • The application layer modifies Entry b copy’s data to data2

But does this work? As the figure shows, Entries a and c are shared between primaryTable and snapshotTable. Each Entry has only one next pointer, so can Entry a point at both Entry b and Entry b copy? Definitely not, so Entry a cannot be shared. The correct flow follows.

As shown in the figure below, when modifying Entry b, not only must Entry b be copied, but every Entry before Entry b in the list must be copied as well. Only then can Entry b be modified correctly; after all, correctness comes first.

Correct overall process:

  • Deep-copy Entries a and b to produce Entry a copy and Entry b copy
  • Insert Entry a copy and Entry b copy into primaryTable’s linked list, with Entry b copy’s next pointing at Entry c
  • The application layer modifies Entry b copy’s data to data2

**Summary:** to modify Entry b, Entry b and all entries before it in the linked list must be copied; the entries after Entry b can remain shared.

■ 5.2.3 Inserting New Data

As shown in the figure above, new data Entry d is inserted with the head-insertion method. Head insertion needs no copy of the existing list: the new entry simply becomes the new head. primaryTable can then reach the inserted data while snapshotData’s view of the original snapshot is unaffected.

Note: this applies only to inserting genuinely new data. For a Map-type state, inserting an existing key is really a modification.

■ 5.2.4 Inserting a new head node, then modifying a middle node

As shown in the figure above, a new Entry d has been inserted at the head of the list, and then Entry b is modified. The correct flow is:

  • Deep-copy Entries a and b to produce Entry a copy and Entry b copy
  • Attach Entry a copy and Entry b copy after Entry d, with Entry b copy’s next pointing at Entry c
  • The application layer modifies Entry b copy’s data to data2

Before modifying Entry b, all entries before it must be copied, except Entry d. The earlier entries must be copied because they are referenced by snapshotData; Entry d is not referenced by snapshotData (only primaryTable holds it), so it needs no copy.

Exactly which entries before Entry b must be copied and which need not be is explained in detail in the source-code section below.

■ 5.2.5 Get intermediate node scenario

In theory, merely reading the data of a middle node should be perfectly safe.

As shown in the figure below, the Flink application layer accesses Entry b through primaryTable. In theory a read-only access needs no copy: copies exist so that the application layer’s changes land in a private copy while the Snapshot data stays immutable. Surprisingly, though, CopyOnWriteStateMap also copies Entry b and all entries before it on a get.

Why? Because get hands the application layer the data object inside Entry b. What if the application then modifies the attributes of that object? For example, if the data in the Entry is a Person object, Person may have setters for name and age. If the application calls them, the data changes in the middle of the Snapshot process, as the small demo below shows.
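A minimal demonstration of this danger, using a hypothetical Person state object (plain Java, not Flink code):

```java
public class MutableStateDemo {
	static class Person {
		String name;
		int age;
		Person(String name, int age) { this.name = name; this.age = age; }
		void setAge(int age) { this.age = age; }
	}

	public static void main(String[] args) {
		Person stored = new Person("Alice", 29); // state object also referenced by a snapshot
		Person fromGet = stored;                 // what get() would return without a deep copy
		fromGet.setAge(30);                      // an innocent application-level setter call
		System.out.println(stored.age);          // prints 30: the "snapshot" data changed too
	}
}
```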

So CopyOnWriteStateMap treats get the same as put: both require copying the target Entry and the entries before it.

■ 5.2.6 Remove Data Scenario

Two cases must be distinguished: the Entry being removed is the head node of the linked list, or it is not.

**Case 1:** the removed Entry is the head node. This case is simple: just point the bucket directly at Entry a’s next node, Entry b.

**Case 2:** the removed Entry is not the head node. All entries before Entry b must be copied (entries that are already new versions need no copy), and the copy of Entry b’s predecessor points directly at Entry b’s next node. Entry a is copied for the same reason as in put and get: Entry a’s next pointer cannot point at two nodes at once, so primaryTable and snapshotTable must each have their own head node.

■ 5.2.7 COW principle summary

The cases above basically cover all scenarios. To summarize (a condensed code sketch follows the list):

  • New entries are inserted at the head of the linked list

  • To modify Entry b, Entry b and every entry before it in the list must be copied (new-version entries need no copy); entries after Entry b can stay shared

  • Accessing Entry b is handled the same way as modifying Entry b

  • If the data being modified or accessed is already a copy, no further copy is needed: the copy is guaranteed to be exclusive to primaryTable and not shared with any Snapshot

  • Removing data splits into two cases:

    • If the removed Entry is the head of the list, point the bucket directly at the head’s next node.
    • If the removed Entry is not the head, all entries before the target Entry must be copied, and the copy of the target’s predecessor points directly at the target’s next node. Of course, if the predecessor is already a new version, it needs no copy and its next pointer can be modified in place.
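The following self-contained sketch (plain Java, not Flink code) condenses the rule set into one runnable example of "copy the prefix, share the suffix":

```java
// To modify node b in a chain a -> b -> c that a snapshot still references,
// copy a and b and let the copy of b keep pointing at the shared c.
public class CowChainDemo {
	static class Node {
		String data;
		Node next;
		Node(String data, Node next) { this.data = data; this.next = next; }
	}

	public static void main(String[] args) {
		Node c = new Node("data-c", null);
		Node b = new Node("data-b", c);
		Node a = new Node("data-a", b);       // snapshot head: a -> b -> c

		// primary table's copy-on-write modification of b:
		Node bCopy = new Node("data2", c);    // modified value, shares the tail c
		Node aCopy = new Node(a.data, bCopy); // prefix copied so next can be rewired

		// the snapshot still sees the old chain, the primary table sees the new one
		System.out.println(a.next.data);      // data-b (snapshot unchanged)
		System.out.println(aCopy.next.data);  // data2  (primary updated)
	}
}
```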

5.3 CopyOnWriteStateMap operations: source code in detail

■ 5.3.1 Introduction to CopyOnWriteStateMap

The CopyOnWriteStateMap class stores the data and supports CopyOnWrite. First, here are its more important fields (pay attention to the comment on each field):

```java
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
	// Default number of buckets in the hash table: 128
	public static final int DEFAULT_CAPACITY = 128;

	// Minimum number of entries migrated per incremental rehash step: 4
	private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;

	// Serializer for the State
	protected final TypeSerializer<S> stateSerializer;

	// Shared empty table
	private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE = new StateMapEntry[MINIMUM_CAPACITY >>> 1];

	// Current version of the StateMap; incremented on every snapshot
	private int stateMapVersion;

	// Each time a snapshot is taken, its version is added to this set;
	// it holds the versions of all snapshots still in progress
	private final TreeSet<Integer> snapshotVersions;

	// Highest version number in snapshotVersions
	private int highestRequiredSnapshotVersion;

	// Primary hash table
	private StateMapEntry<K, N, S>[] primaryTable;

	// New table used during expansion; its length is twice that of primaryTable
	private StateMapEntry<K, N, S>[] incrementalRehashTable;

	// Number of entries in primaryTable
	private int primaryTableSize;

	// Number of entries in incrementalRehashTable
	private int incrementalRehashTableSize;

	// Next bucket to be migrated during incremental rehash,
	// i.e. all buckets of primaryTable before rehashIndex have been migrated
	private int rehashIndex;

	// Expansion threshold; like HashMap, expansion starts once the number of
	// entries exceeds it (default: capacity * 0.75)
	private int threshold;

	// Incremented on every structural modification
	private int modCount;
}
```

primaryTable is the hash table that stores the data; it is an array of StateMapEntry. StateMapEntry represents one entry of data in the StateMap, introduced next.

■ 5.3.2 StateMapEntry

StateMapEntry is the entity that actually stores data in CopyOnWriteStateMap. In Java’s HashMap, data is likewise wrapped in an Entry; HashMap’s Entry source is:

```java
static class Node<K,V> implements Map.Entry<K,V> {
	// Hash value of the current key
	final int hash;
	final K key;
	V value;
	// next points to the next Node in the same bucket
	Node<K,V> next;
}
```

The static inner class Node in HashMap implements Map.Entry and has four fields: hash, key, value, and next. hash is the hash of the current key, and next points to the next Node in the same bucket.

HashMap’s get(key) lookup flow (a code sketch follows the list) is:

  • Compute the hash from the key to locate the bucket
  • Walk each Entry in the bucket, comparing hash values and keys (keys are compared with equals)
  • If both the hash and the key’s equals check match, the corresponding Entry is found and its value is returned
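A minimal sketch of that lookup loop, reusing the simplified Node class above (a simplification of the JDK’s HashMap.getNode, not the exact code):

```java
static <K, V> Node<K, V> getNode(Node<K, V>[] table, int hash, K key) {
	// 1. locate the bucket from the hash
	Node<K, V> e = table[hash & (table.length - 1)];
	// 2. walk the chain, comparing the hash first (cheap), then key equality
	while (e != null) {
		if (e.hash == hash && (e.key == key || key.equals(e.key))) {
			return e; // 3. both checks passed: Entry found
		}
		e = e.next;
	}
	return null;
}
```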

The StateMapEntry source code is as follows:

```java
protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
	final K key;
	final N namespace;
	S state;
	final int hash;
	StateMapEntry<K, N, S> next;

	// Version at which this entry was created
	int entryVersion;
	// Version at which the state (data) was last updated
	int stateVersion;
}
```

StateMapEntry is similar to HashMap’s Entry: the key, hash, and next fields are the same, and state in StateMapEntry plays the role of value in HashMap, i.e. the stored data.

StateMapEntry has three more fields than HashMap Entry:

  • namespace: a Flink concept used, for example, to distinguish different windows. In StateMapEntry, key plus namespace together form the composite primary key, and state is the value
  • entryVersion: the version at which this Entry was created
  • stateVersion: the version at which the state (data) in this StateMapEntry was last updated

Because key and namespace together form the primary key, the get and put operations of CopyOnWriteStateMap decide whether an Entry matches by checking the hash plus equals on both key and namespace; only when all three match is the Entry considered found. This differs noticeably from HashMap and is worth internalizing.

■ 5.3.3 Insert new data source code process

The put method of the CopyOnWriteStateMap class looks like this:

```java
public void put(K key, N namespace, S value) {
	// putEntry locates (or creates) the corresponding Entry
	final StateMapEntry<K, N, S> e = putEntry(key, namespace);

	// Set the value on the Entry
	e.state = value;
	// The state was updated, so update stateVersion as well
	e.stateVersion = stateMapVersion;
}
```

The put method just calls putEntry, which locates or creates the corresponding Entry; putEntry covers both the modify-data and the insert-new-data scenarios. Once the Entry is found, the value is written into it. The putEntry method is:

```java
private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
	// Compute the hash from key and namespace, and select
	// primaryTable or incrementalRehashTable
	final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
	final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
	int index = hash & (tab.length - 1);

	// Walk the linked list of the bucket
	for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
		// If an Entry matching key and namespace is found, this is a modification
		if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
			// Modification logic (detailed in the next section)
			if (e.entryVersion < highestRequiredSnapshotVersion) {
				e = handleChainedEntryCopyOnWrite(tab, index, e);
			}
			// Modification: return the corresponding Entry directly
			return e;
		}
	}

	// No matching Entry found: insert a new one
	++modCount;
	if (size() > threshold) {
		doubleCapacity();
	}
	return addNewStateMapEntry(tab, key, namespace, hash);
}
```

putEntry first computes the hash from the current key and namespace, uses selectActiveTable to choose between primaryTable and incrementalRehashTable, and then computes the index of the bucket for the current element.

Note again that an ordinary HashMap maps one key to one value; here the combination of key and namespace acts as the map key, while the value works as before.

The bucket’s linked list is then traversed Entry by Entry. If the hash, key, and namespace all match, the corresponding Entry has been found and the operation proceeds as a modification.

If no Entry matches after traversing the whole bucket list, a new Entry is inserted: addNewStateMapEntry inserts it at the head of the list (head insertion).

■ 5.3.4 Modifying the data source code process

In putEntry, the source code for modifying the data scenario is as follows:

```java
// A normal HashMap has a single key; here the combination of key and
// namespace serves as the map key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {

	// entryVersion is the version at which the Entry was created;
	// highestRequiredSnapshotVersion is the highest version among ongoing snapshots.
	//
	// entryVersion < highestRequiredSnapshotVersion means the Entry was created
	// before some ongoing snapshot, i.e. it is old-version data still held by a
	// Snapshot. To keep the Snapshot data correct, a new copy of e must be
	// created, and some entries before e must be copied as well.
	// handleChainedEntryCopyOnWrite performs these copies and returns the copy
	// of e, which is then returned to the caller for modification.
	if (e.entryVersion < highestRequiredSnapshotVersion) {
		e = handleChainedEntryCopyOnWrite(tab, index, e);
	}

	// Otherwise (entryVersion >= highestRequiredSnapshotVersion), the Entry was
	// created after all ongoing snapshots, so it is held by no Snapshot,
	// has no sharing problem, and can be modified directly.
	return e;
}
```

The previous section showed the insert path; now focus on the modification path. If an Entry is found by key and namespace, the operation is treated as modifying existing data, and the corresponding modification logic runs. The code then checks whether the Entry’s entryVersion is below highestRequiredSnapshotVersion.

entryVersion is the version at which the Entry was created; highestRequiredSnapshotVersion is the highest version among the snapshots currently in progress.

  • entryVersion < highestRequiredSnapshotVersion means the Entry was created before some ongoing snapshot, i.e. it is old-version data still held by another Snapshot. To keep the Snapshot’s data correct, a new copy of e must be created, and some of the entries before e must be copied as well. The handleChainedEntryCopyOnWrite method performs these copies and returns the copy of e, which is handed back to the caller for the actual modification.
  • Conversely, entryVersion >= highestRequiredSnapshotVersion means the Entry was created after every ongoing snapshot. A Snapshot can never reference data of a higher version, so this Entry is new-version data held by no Snapshot. e is then a brand-new Entry with no sharing problem, so it can be modified directly and returned.

handleChainedEntryCopyOnWrite’s job: create a new copy of Entry e, copy certain entries before e in the linked list as well, and finally return the copy of e.

So which elements must be copied and which not? Entries created after the snapshot started need no copy; entries created before it are referenced by the Snapshot and must be copied.

The handleChainedEntryCopyOnWrite source is as follows:

```java
private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(
		StateMapEntry<K, N, S>[] tab,
		int mapIdx,
		StateMapEntry<K, N, S> untilEntry) {

	StateMapEntry<K, N, S> current = tab[mapIdx];
	StateMapEntry<K, N, S> copy;

	// Check whether the head node was created before highestRequiredSnapshotVersion.
	// If so, the head node is referenced by a Snapshot, so a new Entry must be created
	if (current.entryVersion < highestRequiredSnapshotVersion) {
		copy = new StateMapEntry<>(current, stateMapVersion);
		tab[mapIdx] = copy;
	} else {
		copy = current;
	}

	// Walk the bucket until reaching untilEntry, the Entry to be modified
	while (current != untilEntry) {
		current = current.next;

		// Entries whose entryVersion is below highestRequiredSnapshotVersion
		// must be copied; newer ones need not be
		if (current.entryVersion < highestRequiredSnapshotVersion) {
			// The new copy gets the current StateMap version as its entryVersion
			copy.next = new StateMapEntry<>(current, stateMapVersion);
			copy = copy.next;
		} else {
			copy = current;
		}
	}

	return copy;
}
```

As the source shows, the bucket is traversed from the head node up to the Entry to be modified, and current.entryVersion < highestRequiredSnapshotVersion decides for each node whether it was created before the highest ongoing snapshot version.

  • If it is lower, the node is referenced by a Snapshot, so a new Entry is created, i.e. a copy is made.
  • Otherwise, no copy is needed.

Whenever a new Entry is created, its entryVersion is set to the current StateMap version, marking it as new-version data referenced by no Snapshot. The next time it needs to be modified, it can be changed in place without another copy.

■ 5.3.5 Accessing the data source code process

The get method of CopyOnWriteStateMap resembles putEntry: it walks the bucket’s entries until it finds the Entry matching the key and namespace and returns it; if no Entry matches after the whole bucket has been traversed, it returns null.

When an Entry is found, a copy may still be required to guarantee that data referenced by a Snapshot is not modified. Apart from this copying, the code is straightforward and similar to putEntry, so the focus here is the code that runs after the Entry is found:

```java
if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {

	// The data is handed to the application layer, which may modify the
	// attributes inside it. So check whether the current State value belongs
	// to an old version; if so, it must be deep-copied before being returned.
	if (e.stateVersion < requiredVersion) {
		// 1. If the Entry itself is also an old version, it must be copied too,
		//    following the handleChainedEntryCopyOnWrite strategy analyzed before.
		// 2. If the Entry is a new version, no Entry copy is needed and its
		//    State can be replaced directly.
		if (e.entryVersion < requiredVersion) {
			e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
		}
		// Update its stateVersion
		e.stateVersion = stateMapVersion;
		// Deep-copy the State with the serializer
		e.state = getStateSerializer().copy(e.state);
	}

	return e.state;
}
```

Because the data object is handed to the application layer, which might modify its internal attributes, get must make sure it returns an up-to-date State and update its stateVersion. It first checks whether the current State (value) is an old version:

  • If it is an old version, the value must be deep-copied first
  • Otherwise the value is already a new version and can be returned to the application layer directly

If the value is an old version, there are two sub-cases:

  • 1. If the current Entry is also an old version, the Entry itself must be copied too, following the handleChainedEntryCopyOnWrite strategy analyzed earlier
  • 2. If the current Entry is a new version, no Entry copy is needed and its State can be replaced directly

Case 1 is easy to understand: accessing Entry b in the figure below is a case-1 scenario. handleChainedEntryCopyOnWrite copies Entry b and Entry a, and then Entry b’s value object is deep-copied, so Entry b and Entry b copy no longer share the data object.

Note that although Entry a is also copied into Entry a copy, Entry a’s value object is not deep-copied: both still share the data1 object, i.e. the state of Entry a and of Entry a copy reference data1 together. If Entry a is modified later, the next get hits the case-2 scenario above: its stateVersion is old, but Entry a copy itself is a new version, so the Entry needs no copy; only the State must be deep-copied so that Entry a’s original State is not handed to the application layer.

■ 5.3.6 remove data source code process

The removeEntry source code is as follows:

```java
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
	final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
	final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
	int index = hash & (tab.length - 1);

	for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
		if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
			if (prev == null) {
				// The Entry to delete has no predecessor, so it is the head node:
				// point the bucket directly at the head's next node
				tab[index] = e.next;
			} else {
				// The Entry to delete is not the head: all entries before it may need
				// to be copied, and the copy of its predecessor then points directly
				// at the Entry's next node. If the predecessor is already a new
				// version, no copy is needed and its next pointer is modified directly.
				// copy-on-write check for entry
				if (prev.entryVersion < highestRequiredSnapshotVersion) {
					prev = handleChainedEntryCopyOnWrite(tab, index, prev);
				}
				prev.next = e.next;
			}
			// Update the counters
			++modCount;
			if (tab == primaryTable) {
				--primaryTableSize;
			} else {
				--incrementalRehashTableSize;
			}
			return e;
		}
	}
	return null;
}
```

Remove data scenarios are divided into two cases:

  • If the removed Entry is the head of the list, point the bucket directly at the head’s next node.
  • If the removed Entry is not the head, all entries before the target Entry must be copied, and the copy of the target’s predecessor points directly at the target’s next node. Of course, if the predecessor is already a new version, it needs no copy and its next pointer can be modified in place.

The source is fairly clear, and put and get have already been analyzed in detail, so the remove source can be read directly from the comments together with the principle above.

6. Snapshot process and release operation after completion

The previous sections analyzed the principle and source of CopyOnWriteStateMap’s rehash expansion, the snapshot’s shallow copy, and the CopyOnWrite implementation.

CopyOnWrite exists mainly to shorten the synchronous pause of a Checkpoint and make the data-snapshot process as asynchronous as possible. This section describes the asynchronous snapshot process and the release work after the snapshot completes.

The AsyncSnapshotCallable anonymous inner class in HeapSnapshotStrategy calls, in its callInternal method, the writeStateInKeyGroup method of AbstractStateTableSnapshot, passing in each KeyGroupId in turn.

The writeStateInKeyGroup method is as follows:

```java
public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) {
	// Get the CopyOnWriteStateMapSnapshot corresponding to keyGroupId
	StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> stateMapSnapshot =
		getStateMapSnapshotForKeyGroup(keyGroupId);

	// Serialize the State data referenced by stateMapSnapshot to the output
	stateMapSnapshot.writeState(localKeySerializer, localNamespaceSerializer,
		localStateSerializer, dov, stateSnapshotTransformer);

	// Release the snapshot once serialization is finished
	stateMapSnapshot.release();
}
```

writeStateInKeyGroup fetches the CopyOnWriteStateMapSnapshot for the given KeyGroupId and then serializes the State data of stateMapSnapshot to the output: this step writes all data referenced by the stateMapSnapshot to external storage. Once serialization completes, the snapshot can be released.

The release call eventually invokes CopyOnWriteStateMap’s releaseSnapshot method:

```java
void releaseSnapshot(int snapshotVersion) {
	synchronized (snapshotVersions) {
		// Remove the given snapshotVersion from snapshotVersions
		snapshotVersions.remove(snapshotVersion);
		// Update highestRequiredSnapshotVersion to the maximum of snapshotVersions;
		// if snapshotVersions is empty, reset it to 0
		highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
	}
}
```

releaseSnapshot removes the given snapshotVersion from snapshotVersions and updates highestRequiredSnapshotVersion to the maximum of the remaining snapshotVersions, or to 0 if snapshotVersions is now empty.

One small question remains: by the earlier analysis, if the Flink application layer performs many gets and puts during the snapshot, many Entries and States end up with multiple copies. After the snapshot ends, the old-version data should be cleaned up, yet no such cleanup appears anywhere. Why?

As shown in the figure above, Entry b and Entry a have copies. After the snapshot ends, the fresh data lives in Entry a copy and Entry b copy, so Entry a and Entry b ought to be removed, keeping only the copies. But no code cleans up Entry a and Entry b. Is this a memory leak?

No: once the snapshot ends, the hash table behind snapshotData is no longer referenced by the asynchronous snapshot thread, so Entry a and Entry b become unreachable objects and are reclaimed by the JVM’s GC.

7. Summary

This article introduced the design principles and source code of CopyOnWriteStateTable in detail, with in-depth analysis of two aspects: rehash and CopyOnWrite. Hopefully it helps.

The GitHub repository is on the feature/source-code-read-1-9-0 branch and will continue to be updated:

Github.com/1996fanrui/…