Understanding Flink's memory calculation from a cluster startup exception log

Note: this article is based on Flink 1.10.

While starting a Flink cluster, I used the following memory configuration:

    taskmanager.memory.process.size: 1728m
    taskmanager.memory.managed.size: 0m
    taskmanager.memory.task.heap.size: 1024M

Unfortunately, running start-cluster.sh reported an error.

To understand this error, I took a quick look at how Flink distributes its memory.
If any part of the analysis is wrong or one-sided, please point it out in the comments so we can learn and improve together.

Basic knowledge


The figure above shows the detailed memory model of a Flink TaskManager.
Total Process Memory [ taskmanager.memory.process.size ]: The total amount of memory allocated to the Flink JVM process. It is mainly used in containerized deployments (such as K8s, Yarn, etc.), where it corresponds to the memory size of the requested container.
Total Flink Memory [ taskmanager.memory.flink.size ]: The amount of memory allocated to Flink itself, that is, Total Process Memory minus JVM Metaspace and JVM Overhead. It is mainly used in Standalone deployment mode.
Framework Heap [ taskmanager.memory.framework.heap.size ]: The framework heap memory size of the TaskExecutor process. The default value is 128 mb. This is an advanced parameter and should not be changed without good reason.
Task Heap [ taskmanager.memory.task.heap.size ]: The size of the heap memory available to Flink tasks.
Managed Memory [ taskmanager.memory.managed.size ]: The managed memory size of the TaskExecutor. It is mainly used for sorting, hash tables, and caching of intermediate results in batch jobs, and for the RocksDB state backend in streaming jobs.
Framework Off-Heap [ taskmanager.memory.framework.off-heap.size ]: The off-heap memory size reserved for the TaskExecutor framework. The default value is 128 mb. This is an advanced parameter and should not be changed without good reason.
Task Off-Heap [ taskmanager.memory.task.off-heap.size ]: The size of the off-heap memory available to Flink tasks. The default is 0 bytes.
Network: This memory is mainly used to store shuffle data, such as network buffers. The related parameters are: taskmanager.memory.network.fraction [default value: 0.1], taskmanager.memory.network.max [default value: 1 gb], taskmanager.memory.network.min [default value: 64 mb].
JVM Metaspace [ taskmanager.memory.jvm-metaspace.size ]: The JVM Metaspace size of the TaskExecutor process.
JVM Overhead: This memory is reserved for JVM overhead, such as thread stack space and the code cache. The related parameters are: taskmanager.memory.jvm-overhead.fraction [default value: 0.1], taskmanager.memory.jvm-overhead.max [default value: 1 gb], taskmanager.memory.jvm-overhead.min [default value: 192 mb].

Finally, when starting the TaskExecutor process, Flink sets the memory-related JVM parameters according to the configured or derived sizes of the memory components:

JVM Arguments              Value
-Xmx and -Xms              Framework Heap + Task Heap Memory
-XX:MaxDirectMemorySize    Framework Off-Heap + Task Off-Heap + Network Memory
-XX:MaxMetaspaceSize       JVM Metaspace
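
The mapping in the table can be illustrated with a minimal sketch. The class name, the method name, the byte-valued flag format, and the sample network (64 MB) and metaspace (96 MB) sizes are assumptions made for illustration; they are not taken from the Flink source.

```java
final class JvmArgsSketch {
    static final long MB = 1024L * 1024L;

    /** Builds the memory-related JVM flags from component sizes given in bytes. */
    static String jvmArgs(long frameworkHeap, long taskHeap,
                          long frameworkOffHeap, long taskOffHeap,
                          long network, long metaspace) {
        // -Xmx and -Xms = Framework Heap + Task Heap
        long heap = frameworkHeap + taskHeap;
        // -XX:MaxDirectMemorySize = Framework Off-Heap + Task Off-Heap + Network
        long direct = frameworkOffHeap + taskOffHeap + network;
        return "-Xmx" + heap + " -Xms" + heap
                + " -XX:MaxDirectMemorySize=" + direct
                + " -XX:MaxMetaspaceSize=" + metaspace;
    }

    public static void main(String[] args) {
        // Hypothetical sample values: network = 64 MB, metaspace = 96 MB.
        System.out.println(jvmArgs(128 * MB, 1024 * MB, 128 * MB, 0, 64 * MB, 96 * MB));
    }
}
```

Note that managed memory and JVM overhead do not appear in any of the three flags.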

These JVM parameters are printed to the log while the TaskExecutor starts, as shown in the figure:

Calculation formula and source code analysis

Memory calculation execution logic

    # step1: start-cluster.sh
    # Start TaskManager instance(s)
    TMSlaves start    # TMSlaves is a function defined in config.sh

    # step2: config.sh
    # starts or stops TMs on all slaves
    TMSlaves() {
        ...
        if [[ $? -ne 0 ]]; then
            for slave in ${SLAVES[@]}; do
                ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        ...
    }

    # step3: taskmanager.sh
    jvm_params_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_JVM_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
    dynamic_configs_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_DYNAMIC_CONFIGS ${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar) "${ARGS[@]}")

    # step4: config.sh
    runBashJavaUtilsCmd() {
        ...
        local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
        ...
        echo "$output"
    }

From the script call chain above, we can see that what is ultimately executed is the org.apache.flink.runtime.util.BashJavaUtils class.

    public class BashJavaUtils {

        private static final String EXECUTION_PREFIX = "BASH_JAVA_UTILS_EXEC_RESULT:";

        public static void main(String[] args) throws Exception {
            checkArgument(args.length > 0, "Command not specified.");

            switch (Command.valueOf(args[0])) {
                case GET_TM_RESOURCE_DYNAMIC_CONFIGS:
                    getTmResourceDynamicConfigs(args);
                    break;
                case GET_TM_RESOURCE_JVM_PARAMS:
                    getTmResourceJvmParams(args);
                    break;
                default:
                    // unexpected, Command#valueOf should fail if a unknown command is passed in
                    throw new RuntimeException("Unexpected, something is wrong.");
            }
        }

        private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
            Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
            TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
            System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
        }

        private static void getTmResourceJvmParams(String[] args) throws Exception {
            Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
            TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
            System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateJvmParametersStr(taskExecutorProcessSpec));
        }
        ...
    }

In BashJavaUtils, a different method is called depending on whether the command asks for the JVM parameters ( GET_TM_RESOURCE_JVM_PARAMS ) or for the Flink TaskExecutor memory components ( GET_TM_RESOURCE_DYNAMIC_CONFIGS ). In fact, both cases run the same derivation logic (PS: the two commands probably exist so that the memory-related JVM configuration and the sizes of the TaskExecutor memory components can be printed separately).
Take getTmResourceDynamicConfigs() as an example:

    private static void getTmResourceDynamicConfigs(String[] args) throws Exception {
        // Handles the legacy Flink 1.9 memory options for compatibility; not covered here
        Configuration configuration = getConfigurationForStandaloneTaskManagers(args);
        // Analyze the configuration in flink-conf.yaml and derive the memory allocation
        TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration);
        System.out.println(EXECUTION_PREFIX + TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec));
    }

TaskExecutorProcessUtils.processSpecFromConfig(configuration) analyzes flink-conf.yaml and derives the size of each Flink TaskExecutor memory component.

    public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
        if (isTaskHeapMemorySizeExplicitlyConfigured(config) && isManagedMemorySizeExplicitlyConfigured(config)) {
            // both task heap memory and managed memory are configured, use these to derive total flink memory
            return deriveProcessSpecWithExplicitTaskAndManagedMemory(config);
        } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
            // either of task heap memory and managed memory is not configured, total flink memory is configured,
            // derive from total flink memory
            return deriveProcessSpecWithTotalFlinkMemory(config);
        } else if (isTotalProcessMemorySizeExplicitlyConfigured(config)) {
            // total flink memory is not configured, total process memory is configured,
            // derive from total process memory
            return deriveProcessSpecWithTotalProcessMemory(config);
        } else {
            throw new IllegalConfigurationException(String.format(
                "Either Task Heap Memory size (%s) and Managed Memory size (%s), or Total Flink"
                    + " Memory size (%s), or Total Process Memory size (%s) need to be configured explicitly.",
                TaskManagerOptions.TASK_HEAP_MEMORY.key(),
                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                TaskManagerOptions.TOTAL_FLINK_MEMORY.key(),
                TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
        }
    }

The code above shows that the way the Flink memory components are calculated depends on what the user has configured. There are three cases, checked in priority order from top to bottom:

  • Task heap memory and managed memory are both set: derive network memory and total flink memory.
  • Total flink memory is set: derive managed memory, network memory, and task heap memory. If task heap memory is also set, only managed memory and network memory are derived.
  • Total process memory is set: derive jvm-overhead, managed memory, network memory, and task heap memory.
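
The priority order can be sketched as a small decision function. The class, enum, and method names are hypothetical, and the sketch assumes that a key set to 0m (such as taskmanager.memory.managed.size: 0m) still counts as explicitly configured.

```java
final class DerivationPathSketch {
    enum Path { EXPLICIT_TASK_HEAP_AND_MANAGED, TOTAL_FLINK_MEMORY, TOTAL_PROCESS_MEMORY }

    /** Mirrors the if/else-if chain of processSpecFromConfig with plain booleans. */
    static Path choose(boolean taskHeapSet, boolean managedSet,
                       boolean totalFlinkSet, boolean totalProcessSet) {
        if (taskHeapSet && managedSet) {
            return Path.EXPLICIT_TASK_HEAP_AND_MANAGED;
        } else if (totalFlinkSet) {
            return Path.TOTAL_FLINK_MEMORY;
        } else if (totalProcessSet) {
            return Path.TOTAL_PROCESS_MEMORY;
        }
        throw new IllegalArgumentException("No usable memory option is configured.");
    }

    public static void main(String[] args) {
        // Configuration from the top of the article:
        // process.size = 1728m, managed.size = 0m, task.heap.size = 1024M, flink.size unset.
        System.out.println(choose(true, true, false, true));
    }
}
```

Under that assumption, the configuration from the beginning of the article takes the first branch even though total process memory is also set.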

Memory configuration and calculation

Set task heap memory and managed memory, calculate network memory and total flink memory

  • Step 1: Read task heap memory, managed memory, framework heap, framework off-heap, and task off-heap from the configuration, then sum them up and record the result as totalFlinkExcludeNetworkMemorySize.

    final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
    final MemorySize managedMemorySize = getManagedMemorySize(config);
    final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);        // 128m
    final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);  // 128m
    final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);            // 0m
    final MemorySize networkMemorySize;
    final MemorySize totalFlinkExcludeNetworkMemorySize =
        frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
  • Step 2: Calculate the network memory size.
  • Step 2.1: Total flink memory is configured in addition to task heap memory and managed memory. In this case, read the total flink memory value and calculate network memory as: total flink memory - totalFlinkExcludeNetworkMemorySize.

    // derive network memory from total flink memory, and check against network min/max
    final MemorySize totalFlinkMemorySize = getTotalFlinkMemorySize(config);
    networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);

  • Step 2.2: Task heap memory and managed memory are configured, but total flink memory is not. Because network memory is defined as a fraction of a total that already includes it, the calculation becomes: totalFlinkExcludeNetworkMemorySize * (network.fraction / (1 - network.fraction)). The result is then capped to the range [network.min, network.max]: a value below the minimum becomes the minimum, and a value above the maximum becomes the maximum.

    final MemorySize relative = base.multiply(rangeFraction.fraction / (1 - rangeFraction.fraction));
    capToMinMax(memoryDescription, relative, rangeFraction);
  • Step 3: Calculate jvm-overhead.
  • Step 3.1: Total process memory is configured in addition to the above. In this case, read jvm metaspace and total process memory from the configuration and calculate jvm-overhead as: total process memory - (total flink memory + jvm metaspace).

    final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
    final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
    final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
    final MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);

  • Step 3.2: Total process memory is not configured. In this case, read jvm metaspace from the configuration and calculate jvm-overhead as: (total flink memory + jvm metaspace) * (jvm-overhead.fraction / (1 - jvm-overhead.fraction)). The result is then capped to the range [jvm-overhead.min, jvm-overhead.max].

    final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
    final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
    final MemorySize jvmOverheadSize = deriveJvmOverheadWithInverseFraction(config, totalFlinkAndJvmMetaspaceSize);
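
To make the arithmetic of this case concrete, here is a minimal sketch that runs the steps with the configuration from the beginning of the article and the defaults listed in the basic knowledge section. It works on plain byte counts instead of Flink's MemorySize, the class and helper names are illustrative, and the JVM Metaspace size of 96 MB is an assumed value that the article does not state.

```java
final class ExplicitTaskAndManagedSketch {
    static final long MB = 1024L * 1024L;

    /** Caps a value to [min, max]: below min becomes min, above max becomes max. */
    static long capToMinMax(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }

    /** base * fraction / (1 - fraction), then capped to [min, max]. */
    static long deriveWithInverseFraction(long base, double fraction, long min, long max) {
        long relative = (long) (base * (fraction / (1 - fraction)));
        return capToMinMax(relative, min, max);
    }

    public static void main(String[] args) {
        // Step 1: sum of everything except network memory.
        long frameworkHeap = 128 * MB, frameworkOffHeap = 128 * MB, taskOffHeap = 0;
        long taskHeap = 1024 * MB, managed = 0;
        long excludeNetwork = frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap + managed;

        // Step 2.2: total flink memory is not configured, so use the inverse fraction.
        long network = deriveWithInverseFraction(excludeNetwork, 0.1, 64 * MB, 1024 * MB);
        long totalFlink = excludeNetwork + network;

        // Step 3.1: total process memory (1728m) is configured.
        long metaspace = 96 * MB; // ASSUMPTION: not given in the article
        long jvmOverhead = 1728 * MB - (totalFlink + metaspace);

        System.out.println("network bytes      = " + network);
        System.out.println("total flink bytes  = " + totalFlink);
        System.out.println("jvm-overhead bytes = " + jvmOverhead);
    }
}
```

The jvm-overhead obtained by subtraction depends directly on the metaspace size, so a different metaspace value shifts the result by the same amount.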

Specify total flink memory, calculate managed memory and network memory

  • Step 1: Read total flink memory from the configuration.

    final MemorySize totalFlinkMemorySize = getTotalFlinkMemorySize(config);
  • Step 2: Calculate managed memory and network memory.
  • Step 2.1: If task heap memory is also specified, read framework heap, framework off-heap, task off-heap, and task heap from the configuration, then calculate managed memory and network memory (see the code comments).

    final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
    final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
    final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);
    final MemorySize taskHeapMemorySize = getTaskHeapMemorySize(config);
    // If the managed memory size is configured, take the value from the configuration;
    // if not, calculate it with the fraction
    final MemorySize managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
    final MemorySize totalFlinkExcludeNetworkMemorySize =
        frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
    // network memory = total flink memory - totalFlinkExcludeNetworkMemorySize
    final MemorySize networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);

  • Step 2.2: If task heap memory is not specified, use managed.fraction to calculate managed memory and network.fraction to calculate network memory [managed memory = total flink memory * managed.fraction; network memory = total flink memory * network.fraction]. Task heap memory is then whatever remains of total flink memory.

    managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
    networkMemorySize = deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
    final MemorySize totalFlinkExcludeTaskHeapMemorySize =
        frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
    taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
  • Step 3: Calculate jvm-overhead.
  • Step 3.1: If total process memory is configured, then jvm-overhead = total process memory - (total flink memory + jvm metaspace).

    final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
    final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
    final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
    final MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);

  • Step 3.2: If total process memory is not specified, jvm-overhead is obtained with the formula: (total flink memory + jvm metaspace) * (jvm-overhead.fraction / (1 - jvm-overhead.fraction)).

    final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
    final MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
    final MemorySize jvmOverheadSize = deriveJvmOverheadWithInverseFraction(config, totalFlinkAndJvmMetaspaceSize);
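
As a worked example of Step 2.2 in this case, the sketch below derives managed memory, network memory, and task heap memory from a total flink memory of 1024 MB. The class and method names are illustrative, the total of 1024 MB is a hypothetical input, and the managed.fraction value of 0.4 is an assumption because the article does not list it; the other defaults come from the basic knowledge section.

```java
final class TotalFlinkMemorySketch {
    static final long MB = 1024L * 1024L;

    /** base * fraction, capped to [min, max]. */
    static long deriveWithFraction(long base, double fraction, long min, long max) {
        long relative = (long) (base * fraction);
        return Math.max(min, Math.min(max, relative));
    }

    /** Returns {managed, network, taskHeap} in bytes when task heap is not configured. */
    static long[] derive(long totalFlink, double managedFraction, double networkFraction) {
        long frameworkHeap = 128 * MB;    // default
        long frameworkOffHeap = 128 * MB; // default
        long taskOffHeap = 0;             // default
        long managed = (long) (totalFlink * managedFraction);
        long network = deriveWithFraction(totalFlink, networkFraction, 64 * MB, 1024 * MB);
        // Task heap is whatever remains of total flink memory.
        long taskHeap = totalFlink
                - (frameworkHeap + frameworkOffHeap + taskOffHeap + managed + network);
        return new long[] {managed, network, taskHeap};
    }

    public static void main(String[] args) {
        // ASSUMPTION: managed.fraction = 0.4; total flink memory = 1024 MB is hypothetical.
        long[] r = derive(1024 * MB, 0.4, 0.1);
        System.out.println("managed bytes   = " + r[0]);
        System.out.println("network bytes   = " + r[1]);
        System.out.println("task heap bytes = " + r[2]);
    }
}
```

Because task heap is the remainder, a small total flink memory combined with large fractions can leave very little, or even a negative amount, for task heap.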

Specify total process memory, calculate jvm-overhead, managed memory, network memory, task heap memory

  • Step 1: Read total process memory and jvm metaspace from the configuration, and derive jvm-overhead from jvm-overhead.fraction [jvm-overhead = total process memory * jvm-overhead.fraction, capped to the range between jvm-overhead.min and jvm-overhead.max].

    final MemorySize totalProcessMemorySize = getTotalProcessMemorySize(config);
    final MemorySize jvmMetaspaceSize = getJvmMetaspaceSize(config);
    final MemorySize jvmOverheadSize = deriveJvmOverheadWithFraction(config, totalProcessMemorySize);

  • Step 2: Derive total flink memory [total flink memory = total process memory - jvm metaspace - jvm-overhead].

    final MemorySize totalFlinkMemorySize = totalProcessMemorySize.subtract(jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
  • Step 3: Read framework heap, framework off-heap, and task off-heap from the configuration, then derive the sizes of the remaining flink memory components.

    final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
    final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
    final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);

  • Step 3.1: If task heap memory is also specified, read it from the configuration, then derive managed memory, and finally calculate network memory (see the code comments).

    taskHeapMemorySize = getTaskHeapMemorySize(config);
    // If the managed size is configured, take the value directly;
    // if not, calculate it with the fraction: total flink memory * managed.fraction
    managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
    final MemorySize totalFlinkExcludeNetworkMemorySize =
        frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
    // network = total flink memory - totalFlinkExcludeNetworkMemorySize
    networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);

  • Step 3.2: If task heap memory is not configured, first derive managed memory and network memory, and finally obtain task heap memory (see the code comments).

    // If the managed size is configured, take the value directly;
    // if not, calculate it with the fraction: total flink memory * managed.fraction
    managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
    // network = total flink memory * network.fraction, capped to [network.min, network.max]
    networkMemorySize = deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
    final MemorySize totalFlinkExcludeTaskHeapMemorySize =
        frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
    // task heap memory = total flink memory - totalFlinkExcludeTaskHeapMemorySize
    taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
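
The whole chain for this case (Steps 1, 2, and 3.2) can be sketched as follows for a setup where only total process memory is configured. The names are illustrative; the JVM Metaspace size (96 MB) and managed.fraction (0.4) are assumed values not given in the article, and the total process memory of 1728 MB is reused here purely as a sample input.

```java
final class TotalProcessMemorySketch {
    static final long MB = 1024L * 1024L;

    /** base * fraction, capped to [min, max]. */
    static long deriveWithFraction(long base, double fraction, long min, long max) {
        long relative = (long) (base * fraction);
        return Math.max(min, Math.min(max, relative));
    }

    /** Returns {jvmOverhead, totalFlink, managed, network, taskHeap} in bytes. */
    static long[] derive(long totalProcess, long metaspace, double managedFraction) {
        // Step 1: jvm-overhead = total process * 0.1, capped to [192 MB, 1 GB].
        long jvmOverhead = deriveWithFraction(totalProcess, 0.1, 192 * MB, 1024 * MB);
        // Step 2: total flink = total process - metaspace - jvm-overhead.
        long totalFlink = totalProcess - metaspace - jvmOverhead;
        // Step 3.2: task heap is not configured, so derive managed and network first.
        long frameworkHeap = 128 * MB, frameworkOffHeap = 128 * MB, taskOffHeap = 0;
        long managed = (long) (totalFlink * managedFraction);
        long network = deriveWithFraction(totalFlink, 0.1, 64 * MB, 1024 * MB);
        long taskHeap = totalFlink
                - (frameworkHeap + frameworkOffHeap + taskOffHeap + managed + network);
        return new long[] {jvmOverhead, totalFlink, managed, network, taskHeap};
    }

    public static void main(String[] args) {
        // ASSUMPTIONS: metaspace = 96 MB and managed.fraction = 0.4.
        long[] r = derive(1728 * MB, 96 * MB, 0.4);
        String[] names = {"jvm-overhead", "total flink", "managed", "network", "task heap"};
        for (int i = 0; i < r.length; i++) {
            System.out.println(names[i] + " = " + (r[i] / MB) + " MB");
        }
    }
}
```

In this sample, 10% of 1728 MB is below jvm-overhead.min, so the cap raises jvm-overhead to 192 MB before total flink memory is derived.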