Note: The Flink version discussed here is 1.10.
When starting a Flink cluster, the following memory parameters were configured:
taskmanager.memory.process.size: 1728m
taskmanager.memory.managed.size: 0m
taskmanager.memory.task.heap.size: 1024m
Unfortunately, an error was reported while executing the start-cluster.sh script.
Prompted by this error, this article gives a brief overview of how Flink divides up TaskManager memory.
If any of the analysis is incorrect or incomplete, please point it out in the comments so we can learn together.
Basic knowledge
The figure above shows the memory model of the Flink TaskManager.
Total Process Memory [taskmanager.memory.process.size]: the total memory of the Flink JVM process. It is typically used for containerized deployments (such as Kubernetes and YARN), where it corresponds to the memory size of the requested container.
Total Flink Memory [taskmanager.memory.flink.size]: the memory available to Flink itself, i.e. Total Process Memory without JVM Metaspace and JVM Overhead. It is typically used in standalone deployments.
Framework Heap [taskmanager.memory.framework.heap.size]: the heap memory reserved for the TaskExecutor framework itself. The default value is 128 MB. This is an advanced option and should not be changed without a good reason.
Task Heap [taskmanager.memory.task.heap.size]: the heap memory available to Flink tasks (user code).
Managed Memory [taskmanager.memory.managed.size]: the TaskExecutor's managed memory. It is mainly used for sorting, hash tables, and caching of intermediate results in batch jobs, and for the RocksDB state backend in streaming jobs.
Framework Off-heap [taskmanager.memory.framework.off-heap.size]: the off-heap memory reserved for the TaskExecutor framework itself. The default value is 128 MB. This is an advanced option and should not be changed without a good reason.
Task Off-Heap [taskmanager.memory.task.off-heap.size]: the off-heap memory available to Flink tasks. The default value is 0 bytes.
Network: memory used mainly for shuffle data, such as network buffers. The related options are:
- taskmanager.memory.network.fraction [default: 0.1]
- taskmanager.memory.network.max [default: 1 GB]
- taskmanager.memory.network.min [default: 64 MB]
JVM Metaspace [taskmanager.memory.jvm-metaspace.size]: the JVM Metaspace size of the TaskExecutor process.
JVM Overhead: memory reserved for other JVM overhead, such as thread stacks and the code cache. The related options are:
- taskmanager.memory.jvm-overhead.fraction [default: 0.1]
- taskmanager.memory.jvm-overhead.max [default: 1 GB]
- taskmanager.memory.jvm-overhead.min [default: 192 MB]
Finally, when the TaskExecutor process starts, Flink sets the memory-related JVM options from the configured or derived component sizes:
| JVM argument | Value |
|---|---|
| -Xmx and -Xms | Framework Heap + Task Heap Memory |
| -XX:MaxDirectMemorySize | Framework Off-Heap + Task Off-Heap + Network Memory |
| -XX:MaxMetaspaceSize | JVM Metaspace |
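As a rough illustration of the table, the sketch below assembles the three options from component sizes given in whole MiB. JvmOptionsSketch and jvmOptions are hypothetical names; in Flink the real string comes from TaskExecutorProcessUtils.generateJvmParametersStr(), which works in bytes and may format its output differently. The network and metaspace sizes in main are assumed values chosen only for illustration.

```java
// Hypothetical helper mirroring the table: sizes are whole MiB, while Flink itself
// works in bytes through its MemorySize class.
class JvmOptionsSketch {

    static String jvmOptions(long frameworkHeap, long taskHeap,
                             long frameworkOffHeap, long taskOffHeap,
                             long network, long metaspace) {
        long heap = frameworkHeap + taskHeap;                   // -Xmx and -Xms
        long direct = frameworkOffHeap + taskOffHeap + network; // -XX:MaxDirectMemorySize
        return "-Xmx" + heap + "m -Xms" + heap + "m"
                + " -XX:MaxDirectMemorySize=" + direct + "m"
                + " -XX:MaxMetaspaceSize=" + metaspace + "m";
    }

    public static void main(String[] args) {
        // Illustrative sizes: 128m framework heap and off-heap, 1024m task heap, 0m task off-heap,
        // plus an assumed 142m of network memory and an assumed 96m metaspace.
        System.out.println(jvmOptions(128, 1024, 128, 0, 142, 96));
        // prints: -Xmx1152m -Xms1152m -XX:MaxDirectMemorySize=270m -XX:MaxMetaspaceSize=96m
    }
}
```

Note that managed memory does not appear in any of the three options: according to the table it is neither part of the heap nor counted toward the direct-memory limit.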
These JVM options are also printed to the log when the TaskExecutor starts, as shown in the following figure:
Calculation formula and source code analysis
Memory calculation execution logic
# step1 start-cluster.sh
# Start TaskManager instance(s)
TMSlaves start # TMSlaves is defined in config.sh
# step2 config.sh
# starts or stops TMs on all slaves
TMSlaves() {
...
if [[ $? -ne 0 ]]; then
for slave in ${SLAVES[@]}; do
ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
done
...
}
# step3 taskmanager.sh
jvm_params_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
dynamic_configs_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar) "${ARGS[@]}")
# step4 config.sh
runBashJavaUtilsCmd() {
...
local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
...
echo "$output"
}
Following this chain of script calls, we can see that the actual work is done by the org.apache.flink.runtime.util.BashJavaUtils class.
public class BashJavaUtils {

    private static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";

    public static void main(String[] args) throws Exception {
        checkArgument(args.length > 0, "Command not specified.");

        switch (Command.valueOf(args[0])) {
            case GET_TM_RESOURCE_DYNAMIC_CONFIGS:
                getTmResourceDynamicConfigs(args);
                break;
            case GET_TM_RESOURCE_JVM_PARAMS:
                getTmResourceJvmParams(args);
                break;
            default:
                // unexpected, Command#valueOf should fail if an unknown command is passed in
                throw new RuntimeException("Unexpected, something is wrong.");
        }
    }

    private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
        Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
        TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
        System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
    }

    private static void getTmResourceJvmParams(String[] args) throws Exception {
        Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
        TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
        System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateJvmParametersStr(taskExecutorProcessSpec));
    }

    ...
}
In BashJavaUtils, computing the JVM parameters (GET_TM_RESOURCE_JVM_PARAMS) and computing the sizes of the Flink TaskExecutor memory components (GET_TM_RESOURCE_DYNAMIC_CONFIGS) are handled by different methods. Both, however, run the same derivation, TaskExecutorProcessUtils.processSpecFromConfig(), and differ only in what they print: the memory-related JVM options in one case, and the sizes of the TaskExecutor memory components in the other. Taking getTmResourceDynamicConfigs() as an example:
private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
    // compatibility with the Flink 1.9 memory configuration; not covered here
    Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
    // derive the TaskExecutor memory components from the configuration in flink-conf.yaml
    TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
    System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
}
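Since runBashJavaUtilsCmd merges stderr into stdout (2>&1) and keeps up to the last 1000 lines, the caller presumably needs EXECUTION_PREFIX to tell the real result apart from log output or a stack trace. The following is a minimal sketch of that idea, assuming the caller expects the prefixed result on the last line of the output. ExecResultSketch and extractResult are hypothetical names; Flink's own scripts do this step in Bash, not Java.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper: picks the BashJavaUtils result out of the captured output lines
// and strips the prefix. Assumption: the result is expected on the last line.
class ExecResultSketch {

    static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";

    static Optional<String> extractResult(List<String> outputLines) {
        if (outputLines.isEmpty()) {
            return Optional.empty();
        }
        String last = outputLines.get(outputLines.size() - 1);
        if (!last.startsWith(EXECUTION_PREFIX)) {
            // e.g. the memory derivation threw and a stack trace ended the output
            return Optional.empty();
        }
        return Optional.of(last.substring(EXECUTION_PREFIX.length()));
    }

    public static void main(String[] args) {
        List<String> ok = List.of("some log line", EXECUTION_PREFIX + "-Xmx1152m -Xms1152m");
        List<String> failed = List.of("Exception in thread \"main\" ...", "\tat ...");
        System.out.println(extractResult(ok));     // Optional[-Xmx1152m -Xms1152m]
        System.out.println(extractResult(failed)); // Optional.empty
    }
}
```

Under this assumption, an exception thrown while deriving the memory sizes leaves no prefixed line, so the caller can only report an error instead of starting the TaskManager.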
TaskExecutorProcessUtils.processSpecFromConfig(configuration) parses flink-conf.yaml and derives the sizes of the Flink TaskExecutor memory components.
public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
    if (isTaskHeapMemorySizeExplicitlyConfigured(config) && isManagedMemorySizeExplicitlyConfigured(config)) {
        // both task heap memory and managed memory are configured, use these to derive total flink memory
        return deriveProcessSpecWithExplicitTaskAndManagedMemory(config);
    } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
        // either of task heap memory and managed memory is not configured, total flink memory is configured,
        // derive from total flink memory
        return deriveProcessSpecWithTotalFlinkMemory(config);
    } else if (isTotalProcessMemorySizeExplicitlyConfigured(config)) {
        // total flink memory is not configured, total process memory is configured,
        // derive from total process memory
        return deriveProcessSpecWithTotalProcessMemory(config);
    } else {
        throw new IllegalConfigurationException(String.format("Either Task Heap Memory size (%s) and Managed Memory size (%s), or Total Flink"
            + " Memory size (%s), or Total Process Memory size (%s) need to be configured explicitly.",
            TaskManagerOptions.TASK_HEAP_MEMORY.key(),
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            TaskManagerOptions.TOTAL_FLINK_MEMORY.key(),
            TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
    }
}
The code above shows that how the memory components are derived depends on which options the user has set explicitly. There are three cases, checked in priority order from top to bottom:
- Task heap memory and managed memory are set: derive network memory and total Flink memory.
- Total Flink memory is set: derive managed memory and network memory, plus task heap memory if it is not set.
- Total process memory is set: derive JVM overhead, managed memory, network memory, and task heap memory.
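The priority order can be captured in a few lines. This hypothetical sketch (DerivationPathSketch, choosePath, and the Path names are illustrative, not Flink API) mirrors the if/else chain in processSpecFromConfig. It also shows what that order means for the configuration at the top of this article: task.heap.size and managed.size are both set, so the first case applies, and process.size only comes into play later, when JVM overhead is computed.

```java
// Hypothetical sketch of the branch order in processSpecFromConfig: which options
// were explicitly set decides which derivation path is taken.
class DerivationPathSketch {

    enum Path { EXPLICIT_TASK_HEAP_AND_MANAGED, FROM_TOTAL_FLINK, FROM_TOTAL_PROCESS }

    static Path choosePath(boolean taskHeapSet, boolean managedSet,
                           boolean totalFlinkSet, boolean totalProcessSet) {
        if (taskHeapSet && managedSet) {
            return Path.EXPLICIT_TASK_HEAP_AND_MANAGED;
        } else if (totalFlinkSet) {
            return Path.FROM_TOTAL_FLINK;
        } else if (totalProcessSet) {
            return Path.FROM_TOTAL_PROCESS;
        }
        throw new IllegalArgumentException(
                "Either task heap + managed memory, total Flink memory, or total process memory must be set");
    }

    public static void main(String[] args) {
        // The article's configuration sets task heap, managed memory and process size.
        System.out.println(choosePath(true, true, false, true)); // EXPLICIT_TASK_HEAP_AND_MANAGED
    }
}
```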
Memory configuration and calculation
Set task heap memory and managed memory, calculate network memory and Total Flink memory
- Step 1: read task heap memory, managed memory, framework heap, framework off-heap, and task off-heap from the configuration and sum them; call the sum totalFlinkExcludeNetworkMemorySize.
final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
final MemorySize managedMemorySize = getManagedMemorySize(config);
final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config); // 128m
final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config); // 128m
final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config); // 0m
final MemorySize networkMemorySize;
final MemorySize totalFlinkExcludeNetworkMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
- Step 2: calculate network memory
- Step 2.1: if total Flink memory is configured in addition to task heap memory and managed memory, read it and derive network memory as total Flink memory - totalFlinkExcludeNetworkMemorySize. The result is then checked against the network min/max.
// derive network memory from total flink memory, and check against network min/max
final MemorySize totalFlinkMemorySize = getTotalFlinkMemorySize(config);
networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
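A minimal sketch of Step 2.1 with sizes as MiB doubles. The class and method names are hypothetical, the input sizes in main are assumed, and treating a failed min/max check as an exception is this sketch's reading of "check against network min/max" in the source comment.

```java
// Hypothetical sketch of Step 2.1: total Flink memory is configured explicitly,
// so network memory is whatever is left, and it must fall inside [min, max].
class NetworkFromTotalFlinkSketch {

    static double networkFromTotalFlink(double totalFlink, double totalFlinkExcludeNetwork,
                                        double networkMin, double networkMax) {
        if (totalFlinkExcludeNetwork > totalFlink) {
            throw new IllegalArgumentException("Configured components exceed total Flink memory");
        }
        double network = totalFlink - totalFlinkExcludeNetwork;
        if (network < networkMin || network > networkMax) {
            throw new IllegalArgumentException("Derived network memory " + network
                    + "m is outside [" + networkMin + "m, " + networkMax + "m]");
        }
        return network;
    }

    public static void main(String[] args) {
        // Assumed values: 1536m total Flink memory, 1280m for all other components,
        // and the default network range of 64m to 1024m.
        System.out.println(networkFromTotalFlink(1536, 1280, 64, 1024)); // 256.0
    }
}
```

Unlike the fraction-based path, nothing is adjusted here: the remainder either fits the range or is rejected.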
- Step 2.2: if task heap memory and managed memory are configured but total Flink memory is not, network memory is derived differently: totalFlinkExcludeNetworkMemorySize * (network.fraction / (1 - network.fraction)). The result is then clamped to [network.min, network.max]: a value below network.min is raised to network.min, and a value above network.max is lowered to network.max.
final MemorySize relative = base.multiply(rangeFraction.fraction / (1 - rangeFraction.fraction));
return capToMinMax(memoryDescription, relative, rangeFraction);
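Why the inverse fraction? network.fraction is a fraction of total Flink memory, and here total Flink memory includes the very network memory being computed: network = fraction * (base + network), which solves to base * fraction / (1 - fraction). The sketch below applies this to the configuration at the top of this article, assuming the default 128 MB framework heap and off-heap, 0 task off-heap, and the network defaults (0.1, 64 MB, 1 GB) listed earlier. Class and method names are hypothetical, and sizes are MiB doubles rather than Flink's byte-based MemorySize.

```java
// Hypothetical sketch of Step 2.2: network memory is derived from the other
// components with an "inverse fraction", then clamped to [min, max].
class NetworkInverseFractionSketch {

    static double capToMinMax(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    static double networkFromInverseFraction(double base, double fraction, double min, double max) {
        // Chosen so that network / (base + network) == fraction before clamping.
        double relative = base * (fraction / (1 - fraction));
        return capToMinMax(relative, min, max);
    }

    public static void main(String[] args) {
        // 128 (framework heap) + 128 (framework off-heap) + 1024 (task heap)
        // + 0 (task off-heap) + 0 (managed) = 1280m
        double base = 128 + 128 + 1024 + 0 + 0;
        double network = networkFromInverseFraction(base, 0.1, 64, 1024);
        System.out.printf("network = %.2fm, total Flink = %.2fm%n", network, base + network);
        // network = 142.22m, total Flink = 1422.22m
    }
}
```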
- Step 3: calculate JVM overhead
- Step 3.1: if total process memory is configured as well, read JVM Metaspace and Total Process Memory from the configuration; JVM overhead is then Total Process Memory - (Total Flink Memory + JVM Metaspace).
final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
final MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
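Because JVM overhead is simply the remainder here, whether it lands inside [jvm-overhead.min, jvm-overhead.max] depends heavily on the metaspace size. The sketch below uses the total Flink memory that Step 2.2 yields for this article's configuration (1280m + 1280m * 0.1 / 0.9, about 1422.22m) and tries two metaspace sizes, 96m and 256m; both are assumptions for illustration, so check the default of your exact Flink version. Names are hypothetical, and we assume an out-of-range overhead is rejected as a configuration error.

```java
// Hypothetical sketch of Step 3.1: with total process memory configured,
// JVM overhead is the remainder, and it is then checked against [min, max].
class JvmOverheadFromProcessSketch {

    static double jvmOverhead(double totalProcess, double totalFlink, double metaspace) {
        double totalFlinkAndMetaspace = totalFlink + metaspace;
        if (totalFlinkAndMetaspace > totalProcess) {
            throw new IllegalArgumentException("Total Flink memory + metaspace exceeds total process memory");
        }
        return totalProcess - totalFlinkAndMetaspace;
    }

    // Assumption: an overhead outside [min, max] is treated as a configuration error.
    static boolean inRange(double value, double min, double max) {
        return value >= min && value <= max;
    }

    public static void main(String[] args) {
        double totalFlink = 1280 + 1280 * 0.1 / 0.9; // about 1422.22m, from Step 2.2
        for (double metaspace : new double[] {96, 256}) { // both sizes are assumptions
            double overhead = jvmOverhead(1728, totalFlink, metaspace);
            System.out.printf("metaspace %.0fm -> overhead %.2fm, in [192m, 1024m]: %b%n",
                    metaspace, overhead, inRange(overhead, 192, 1024));
        }
    }
}
```

With these numbers the derived overhead is about 209.78m for a 96m metaspace, but only about 49.78m for 256m, which would fall below the 192m default minimum.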
- Step 3.2: total process memory may also be left unset. In that case JVM Metaspace is read from the configuration and JVM overhead is computed as (Total Flink Memory + JVM Metaspace) * (jvm-overhead.fraction / (1 - jvm-overhead.fraction)). The result is then clamped to [jvm-overhead.min, jvm-overhead.max].
final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
final MemorySize jvmOverheadSize = deriveJvmOverheadWithInverseFraction(config, totalFlinkAndJvmMetaspaceSize);
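The same inverse-fraction idea applies here, because jvm-overhead.fraction is a fraction of total process memory, which is not known yet. A minimal sketch with hypothetical names, in MiB doubles; the 1422.22m total Flink memory and the 96m metaspace are assumed inputs, and it mainly shows the clamp at work.

```java
// Hypothetical sketch of Step 3.2: no total process memory, so JVM overhead is derived with
// the inverse fraction from total Flink memory + metaspace, then clamped. Sizes in MiB.
class JvmOverheadInverseFractionSketch {

    static double jvmOverhead(double totalFlinkAndMetaspace, double fraction, double min, double max) {
        // overhead = fraction * (totalFlinkAndMetaspace + overhead), solved for overhead
        double relative = totalFlinkAndMetaspace * (fraction / (1 - fraction));
        return Math.max(min, Math.min(max, relative));
    }

    public static void main(String[] args) {
        // Assumed: about 1422.22m of total Flink memory plus a 96m metaspace.
        double overhead = jvmOverhead(1422.22 + 96, 0.1, 192, 1024);
        System.out.printf("overhead = %.2fm%n", overhead); // overhead = 192.00m
    }
}
```

The unclamped value would be about 168.69m, so the 192m minimum wins, and total process memory would come out at roughly 1422.22 + 96 + 192 = 1710.22m.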
Specify total Flink memory and calculate managed memory and network memory
- Step 1: read total Flink memory from the configuration.
final MemorySize totalFlinkMemorySize = getTotalFlinkMemorySize(config);
- Step 2: calculate managed memory and network memory.
- Step 2.1: if task heap memory is specified, read framework heap, framework off-heap, task off-heap, and task heap from the configuration, then calculate managed memory and network memory (see the comments).
final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);
final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
// use managed.size if it is configured; otherwise derive managed memory from managed.fraction
final MemorySize managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
final MemorySize totalFlinkExcludeNetworkMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
// network memory = total flink memory - totalFlinkExcludeNetworkMemorySize
final MemorySize networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
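A minimal sketch of Step 2.1 for this case, in MiB doubles. The class and method names are hypothetical, the input sizes are assumed, and the managed.fraction of 0.4 is an assumed default; check the value for your Flink version. It shows the "absolute size or fraction" rule for managed memory and the network remainder.

```java
import java.util.OptionalDouble;

// Hypothetical sketch: total Flink memory and task heap are configured; managed memory
// comes from managed.size or managed.fraction, and network memory is the remainder.
class TotalFlinkWithTaskHeapSketch {

    // Absolute size wins if configured; otherwise fall back to the fraction.
    static double managedAbsoluteOrFraction(OptionalDouble managedSize, double totalFlink, double managedFraction) {
        return managedSize.isPresent() ? managedSize.getAsDouble() : totalFlink * managedFraction;
    }

    static double networkRemainder(double totalFlink, double frameworkHeap, double frameworkOffHeap,
                                   double taskHeap, double taskOffHeap, double managed) {
        double totalFlinkExcludeNetwork = frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap + managed;
        return totalFlink - totalFlinkExcludeNetwork;
    }

    public static void main(String[] args) {
        // Assumed example: 1536m total Flink memory, 512m task heap, managed.size not set,
        // managed.fraction 0.4, 128m framework heap/off-heap, 0m task off-heap.
        double managed = managedAbsoluteOrFraction(OptionalDouble.empty(), 1536, 0.4);
        double network = networkRemainder(1536, 128, 128, 512, 0, managed);
        System.out.printf("managed = %.1fm, network = %.1fm%n", managed, network);
        // managed = 614.4m, network = 153.6m
    }
}
```

With managed.size set explicitly (for example the 0m from this article's configuration), the helper returns that value unchanged and the fraction is ignored.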
- Step 2.2: if task heap memory is not specified, managed memory is derived from managed.fraction (unless managed.size is set) and network memory from network.fraction: managed memory = total Flink memory * managed.fraction, and network memory = total Flink memory * network.fraction. Task heap memory is then whatever remains.
managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
networkMemorySize = deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
final MemorySize totalFlinkExcludeTaskHeapMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
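A minimal sketch of Step 2.2 in MiB doubles, with hypothetical names. The 1536m total Flink memory and the managed.fraction of 0.4 are assumed inputs, and clamping the fraction-based network size to [min, max] is an assumption made to match how the other ranged components behave.

```java
// Hypothetical sketch of Step 2.2: only total Flink memory is configured, so managed and
// network memory come from their fractions and task heap takes what is left. MiB doubles.
class TotalFlinkWithoutTaskHeapSketch {

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    static double taskHeapRemainder(double totalFlink, double frameworkHeap, double frameworkOffHeap,
                                    double taskOffHeap, double managedFraction,
                                    double networkFraction, double networkMin, double networkMax) {
        double managed = totalFlink * managedFraction;
        // Assumption: the fraction-based network size is clamped like other ranged components.
        double network = clamp(totalFlink * networkFraction, networkMin, networkMax);
        double totalFlinkExcludeTaskHeap = frameworkHeap + frameworkOffHeap + taskOffHeap + managed + network;
        if (totalFlinkExcludeTaskHeap > totalFlink) {
            throw new IllegalArgumentException("No memory left for task heap");
        }
        return totalFlink - totalFlinkExcludeTaskHeap;
    }

    public static void main(String[] args) {
        // Assumed example: 1536m total Flink memory, managed.fraction 0.4, network defaults.
        double taskHeap = taskHeapRemainder(1536, 128, 128, 0, 0.4, 0.1, 64, 1024);
        System.out.printf("task heap = %.1fm%n", taskHeap); // task heap = 512.0m
    }
}
```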
- Step 3: calculate JVM overhead
- Step 3.1: if total process memory is specified, JVM overhead = Total Process Memory - (Total Flink Memory + JVM Metaspace).
final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
final MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
- Step 3.2: if total process memory is not specified, JVM overhead = (Total Flink Memory + JVM Metaspace) * (jvm-overhead.fraction / (1 - jvm-overhead.fraction)), clamped to [jvm-overhead.min, jvm-overhead.max].
final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
final MemorySize jvmOverheadSize = deriveJvmOverheadWithInverseFraction(config, totalFlinkAndJvmMetaspaceSize);
Specify total process memory, derive JVM overhead, managed memory, network memory, and task heap memory
- Step 1: read total process memory and JVM Metaspace from the configuration, and derive JVM overhead = total process memory * jvm-overhead.fraction, clamped to [jvm-overhead.min, jvm-overhead.max].
final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
final MemorySize jvmOverheadSize = deriveJvmOverheadWithFraction(config, totalProcessMemorySize);
- Step 2: derive total Flink memory: Total Flink Memory = Total Process Memory - JVM Metaspace - JVM Overhead.
final MemorySize totalFlinkMemorySize = totalProcessMemorySize.subtract(jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
- Step 3: read framework heap, framework off-heap, and task off-heap from the configuration, then derive the remaining Flink memory components.
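In this case total process memory is known, so jvm-overhead.fraction is applied to it directly rather than through the inverse fraction used earlier. A minimal sketch of Steps 1 and 2 in MiB doubles, with hypothetical names; 1728m is the process size from the top of this article, and the 96m metaspace is an assumed value.

```java
// Hypothetical sketch of Steps 1-2 for this case: JVM overhead is a fraction of total
// process memory (clamped), and total Flink memory is what remains. MiB doubles.
class TotalProcessSketch {

    static double jvmOverheadWithFraction(double totalProcess, double fraction, double min, double max) {
        return Math.max(min, Math.min(max, totalProcess * fraction));
    }

    static double totalFlink(double totalProcess, double metaspace, double jvmOverhead) {
        return totalProcess - metaspace - jvmOverhead;
    }

    public static void main(String[] args) {
        double process = 1728; // taskmanager.memory.process.size from the top of this article
        double metaspace = 96; // assumed metaspace size; check the default for your version
        double overhead = jvmOverheadWithFraction(process, 0.1, 192, 1024); // 172.8m raised to 192m
        System.out.printf("overhead = %.1fm, total Flink = %.1fm%n",
                overhead, totalFlink(process, metaspace, overhead));
        // overhead = 192.0m, total Flink = 1440.0m
    }
}
```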
final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);
- Step 3.1: if task heap memory is also specified, read it from the configuration, then derive managed memory, and finally calculate network memory (see the comments).
taskHeapMemorySize = getTaskHeapMemorySize(config);
// use managed.size if it is configured; otherwise managed memory = total flink memory * managed.fraction
managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
final MemorySize totalFlinkExcludeNetworkMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
// network memory = total flink memory - totalFlinkExcludeNetworkMemorySize
networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
- Step 3.2: if task heap memory is not configured, first derive managed memory and network memory, and finally obtain task heap memory (see the comments).
// use managed.size if it is configured; otherwise managed memory = total flink memory * managed.fraction
managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
// if a network size is configured it is used directly; otherwise network memory = total flink memory * network.fraction
networkMemorySize = deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
final MemorySize totalFlinkExcludeTaskHeapMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
// task heap memory = total flink memory - totalFlinkExcludeTaskHeapMemorySize
taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
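Putting the "total process memory only" case together, the sketch below derives every component from a single process size and then adds them back up as a sanity check. All names are hypothetical and sizes are MiB doubles. The inputs are assumptions: 128m framework heap and off-heap, 0m task off-heap, managed.fraction 0.4, a 96m metaspace, and the network and JVM overhead defaults quoted earlier in this article.

```java
// Hypothetical end-to-end sketch of the "total process memory only" case, in MiB doubles.
class ProcessMemoryBreakdownSketch {

    static final double FRAMEWORK_HEAP = 128, FRAMEWORK_OFF_HEAP = 128, TASK_OFF_HEAP = 0;

    static double clamp(double v, double min, double max) {
        return Math.max(min, Math.min(max, v));
    }

    // Returns {jvmOverhead, totalFlink, managed, network, taskHeap}.
    static double[] derive(double totalProcess, double metaspace) {
        double jvmOverhead = clamp(totalProcess * 0.1, 192, 1024);  // Step 1
        double totalFlink = totalProcess - metaspace - jvmOverhead; // Step 2
        double managed = totalFlink * 0.4;                          // Step 3.2
        double network = clamp(totalFlink * 0.1, 64, 1024);
        double taskHeap = totalFlink
                - (FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + managed + network);
        return new double[] {jvmOverhead, totalFlink, managed, network, taskHeap};
    }

    public static void main(String[] args) {
        double metaspace = 96; // assumed metaspace size
        double[] r = derive(1728, metaspace);
        System.out.printf("overhead=%.1f flink=%.1f managed=%.1f network=%.1f taskHeap=%.1f%n",
                r[0], r[1], r[2], r[3], r[4]);
        // overhead=192.0 flink=1440.0 managed=576.0 network=144.0 taskHeap=464.0
        // Every component added back together should give total process memory again.
        double sum = FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP
                + r[2] + r[3] + r[4] + metaspace + r[0];
        System.out.printf("sum = %.1fm%n", sum); // sum = 1728.0m
    }
}
```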