Preface

After migrating a Flink Job from Standalone to on-Yarn, Job performance dropped: before the migration the Job consumed 83,000+ records/s, after moving to Yarn it consumed 78,000+ records/s, and the consumption rate also fluctuated slightly. After analyzing the cause and verifying it with tests, the problem was solved, without changing the total resources allocated to the Job, by halving the number of Containers and changing the resources held by each Container from 1 CPU / 2 GB / 1 Slot to 2 CPU / 4 GB / 2 Slots.

Running into this problem made me realize that I needed a deeper understanding of Slots and the Flink Runtime Graph, so I wrote this article. It has two parts: the first analyzes in detail the relationship between Flink Slots and Job execution, and the second describes the problem we encountered and how it was solved.



Flink Slot

A Flink cluster is composed of JobManagers (JM) and TaskManagers (TM), each JM/TM running in an independent JVM process. The JM is the Master, the cluster's management node; a TM is a Worker, a working node of the cluster. Every TM has at least one Slot. The Slot is the smallest unit in which Flink allocates resources for executing a Job, and the concrete Tasks run inside Slots.

For a TM, the two relevant configuration options are taskmanager.numberOfTaskSlots and taskmanager.heap.size. In fact taskmanager.numberOfTaskSlots only specifies the number of Slots in a TM; it cannot be used to reserve or isolate a given number of CPUs for the TM. Leaving Slot Sharing aside for now (more on that below), one Slot runs one SubTask (a Task implements Runnable; a SubTask is one concrete instance of a Task being executed), so the official recommendation is to set taskmanager.numberOfTaskSlots equal, or proportional, to the number of CPU cores.

In Flink on Yarn mode you can have Yarn allocate a specified amount of CPU to each Yarn Container (Yarn schedules CPU with Cgroups based on time slices), which gives stricter CPU isolation; each Container runs exactly one JM or TM instance. taskmanager.heap.size configures the memory of a TM; if a TM has N Slots, each Slot gets 1/N of the TM's total memory.
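As a minimal sketch of where these two options appear (the values are arbitrary; in a real deployment they are normally set in conf/flink-conf.yaml rather than in code, and they are set programmatically here only to keep the example self-contained):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TaskManagerConfigSketch {
    public static void main(String[] args) throws Exception {
        // The same keys that would appear in conf/flink-conf.yaml.
        Configuration conf = new Configuration();
        conf.setInteger("taskmanager.numberOfTaskSlots", 2); // 2 Slots per TM
        conf.setString("taskmanager.heap.size", "4096m");    // each Slot then corresponds to ~2 GB of TM heap

        // Local environment with parallelism 2, backed by the configuration above
        // (in local mode the heap setting is only illustrative, since everything shares one JVM).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();

        env.execute("tm-config-sketch");
    }
}
```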

For a Job: the number of Slots it needs is greater than or equal to the maximum Parallelism configured on its Operators, and if all Operators have the same slotSharingGroup (the default), the number of Slots it needs is exactly equal to the maximum Operator Parallelism in the Job.
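As a small illustrative sketch of this rule (the operators and parallelism values are made up), the Job below has Operator parallelisms 1, 4, 2 and 1, so with the default slotSharingGroup it needs 4 Slots:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "bb", "ccc")               // non-parallel source: parallelism 1
           .map(String::toUpperCase).setParallelism(4)   // largest parallelism in the Job
           .filter(s -> s.length() > 1).setParallelism(2)
           .print().setParallelism(1);

        // With the default slotSharingGroup the Job needs max(1, 4, 2, 1) = 4 Slots,
        // not 1 + 4 + 2 + 1 = 8.
        env.execute("slot-count-sketch");
    }
}
```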

For the relationship between TM and Slot, refer to the following three figures taken from the official documentation:

Figure 1: the Flink on Yarn Job submission process. Each JM/TM instance lives in a separate Yarn Container, and each Container holds exactly one JM or TM instance. Each Container is an independent process, a physical machine can host multiple Containers, and each Container holds a fixed amount of CPU and Memory, so resources are isolated between Containers. This ensures that multiple TMs on the same machine are isolated from each other (in Standalone mode, multiple TMs on the same machine cannot be CPU-isolated from one another).

Figure 1


Figure 2: a Flink Job at runtime. There are two TMs with three Slots each; two Slots have Tasks running and one is idle. If the two TMs are in different Containers or on different machines, their resources are isolated from each other. Within a TM, each Slot gets 1/3 of the TM's memory, while all Slots share the TM's CPU, network (TCP connections: ZK, Akka, Netty services, etc.), heartbeats, Flink structured data sets, and so on.

Figure 2


Figure 3: the internal structure of Task Slots. The concrete Tasks run inside the Slots; a Task is a Runnable object executed in a thread (each dashed box in the figure represents a thread). The class corresponding to a Task instance in the source code is org.apache.flink.runtime.taskmanager.Task. Each Task is a group of Operators chained together and executed as a unit. The execution of a Flink Job can be viewed as a DAG: Tasks are the vertices of the DAG, and the vertices are linked to one another through data transfers, forming the Execution Graph of the whole Job.

Figure 3


Operator Chain

An Operator Chain links the Operators of a Job together according to a policy (for example, Operators with a single output can be chained) and places them in one Task thread for execution. Operator chaining is enabled by default and can be turned off with StreamExecutionEnvironment.disableOperatorChaining(). A Flink Operator is similar to a Bolt in Storm, but whereas in Storm an upstream Bolt reaches a downstream Bolt via network transfer, Flink's Operator Chain links multiple Operators together and runs them in the same thread, removing data-transfer and thread-switching steps, reducing system overhead, and improving resource utilization and Job performance. In practice, developers need to understand these mechanisms in order to budget Memory and CPU for each Task thread.

Note: by default, data passed between chained Operators is still copied (e.g. Kryo.copy(...)): the output of the upstream Operator is copied into a new object and then handed to the downstream Operator. Object reuse can be enabled with ExecutionConfig.enableObjectReuse(), which turns this copy off and reduces serialization overhead, GC pressure, etc. The relevant source is org.apache.flink.streaming.runtime.tasks.OperatorChain and its inner class CopyingChainingOutput. Developers should not enable object reuse until they fully understand its internals, because it can otherwise introduce bugs into the application.
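As a minimal sketch of turning object reuse on (only do this once you are sure no Operator in the Job mutates or holds on to the records it receives):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive copy (CopyingChainingOutput / Kryo.copy) between chained Operators.
        // Safe only if downstream Operators neither mutate nor cache the records they receive.
        env.getConfig().enableObjectReuse();

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .print();

        env.execute("object-reuse-sketch");
    }
}
```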

For the effect of the Operator Chain, see the following figure from the official documentation:

Figure 4: the upper part of the figure is the StreamGraph view, which shows the Task categories but no parallelism. At runtime this Job has three kinds of Tasks: Source->Map, keyBy/window/apply, and Sink, where Source->Map is one Task in which Source() and Map() are chained together. The lower part shows the actual state of the Job at runtime: the maximum parallelism of the Job is 2 and there are 5 SubTasks (i.e. 5 executing threads). Without the Operator Chain, Source() and Map() would belong to different threads, the number of Task threads would grow from 5 to 7, the overhead of thread switching and data transfer would increase, and processing latency and performance would be worse than before. Note: with the default (or identical) slotSharingGroup names, this Job needs 2 Slots to run (equal to the Job's maximum parallelism).

Figure 4


Slot Sharing

Slot Sharing means that, within the same Job, SubTasks of different Tasks that have the same slotSharingGroup (default name: default) may share one Slot. This gives a single Slot the chance to hold an entire Pipeline of the Job, which is why, with the default slotSharingGroup, the number of Slots a Job needs to start equals the maximum Operator parallelism, as described above. The Slot Sharing mechanism can further improve Job performance: with the number of Slots unchanged, Operators can be given a larger maximum parallelism, so resource-hungry Tasks such as Windows are spread over different TMs at maximum parallelism, while simple operations such as map and filter no longer monopolize Slot resources, reducing the chance of wasted resources.
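As a small illustrative sketch (the key selector, window size and parallelism values are made up), the windowed Job below has a maximum parallelism of 4; with the default slotSharingGroup the light map SubTasks share Slots with the heavier window SubTasks, so the Job needs only 4 Slots:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        env.fromElements("a", "b", "a", "c")
           .map(String::toLowerCase)                                     // light Operator, parallelism 4
           .keyBy(s -> s)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))   // heavier window Task, parallelism 4
           .reduce((x, y) -> x + y)
           .print().setParallelism(1);

        // Default slotSharingGroup: SubTasks of the source, map, window and sink Tasks share Slots,
        // so the Job needs max parallelism = 4 Slots rather than one Slot per SubTask.
        env.execute("slot-sharing-sketch");
    }
}
```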

For the effect of Slot Sharing, refer to the following screenshots from the official documentation:

Figure 5: in the lower left corner is a Job with source and map at parallelism 4 and reduce at parallelism 3, 11 SubTasks in total. The maximum parallelism of this Job is 4, so when it is deployed to the two TMs in the upper left, the runtime diagram on the right results: 4 Slots are occupied, and 3 of them hold a complete source-map-reduce Pipeline. Note: the map results are shuffled to the reduce side; the arrows in the right-hand figure only depict the Pipelines inside the Slots, not the Job's data shuffle.

Figure 5


Figure 6: a Job with source-map [6 parallelism], keyBy/window/apply [6 parallelism] and sink [1 parallelism]. Counting from left to right, the first Slot runs 3 SubTasks [3 threads] and holds a complete Pipeline of the Job; the remaining 5 Slots each run 2 SubTasks [2 threads], whose data is finally sent over the network to the Sink for processing.

Figure 6


Operator Chain & Slot Sharing API

By default, Flink decides a Job's Operator Chain and Slot Sharing on its own. For example, it chains together consecutive SingleOutputStreamOperators with the same parallelism (the chaining conditions are fairly strict, single output being only one of them; see org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...) for the details), and all Tasks of the Job use the default slotSharingGroup for Slot Sharing. However, in real scenarios we may need to intervene manually in a Job's Operator Chain or Slot Sharing policy. This section introduces the APIs for changing the default chaining and sharing behavior.

  • StreamExecutionEnvironment.disableOperatorChaining(): disables the Operator Chain for the entire Job, so each Operator gets its own Task. Taking Figure 4 as an example, with chaining disabled Source->Map is split into a source() Task and a map() Task, and the actual number of Tasks in the Job grows to 7. This setting degrades Job performance; it can be used for testing or profiling in non-production environments and is not recommended for production.
  • someStream.filter(…).map(…).startNewChain().map(…): startNewChain() means that a new chain starts at the current Operator [the first map]. The two maps are chained together, but the filter is not, because startNewChain() cuts the first map off from the filter.
  • someStream.map(…).disableChaining(): disableChaining() means the current Operator [map] refuses to take part in any Operator Chain, i.e. this Operator occupies a Task by itself.
  • someStream.map(…).slotSharingGroup("name"): by default the slotSharingGroup of all Operators is "default"; it can be customized with slotSharingGroup(). Flink runs Operators with the same group name in the same Slots, while Operators with different group names run in other Slots.

There are three Operator Chain strategies: ALWAYS, NEVER and HEAD; see org.apache.flink.streaming.api.operators.ChainingStrategy for details. startNewChain() corresponds to ChainingStrategy.HEAD (the default strategy of StreamOperator), disableChaining() corresponds to ChainingStrategy.NEVER, and ALWAYS means chaining Operators together as much as possible. In general ALWAYS is the most efficient, and many Operators override the default strategy to ALWAYS, for example filter, map and flatMap.
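The sketch below (the operator logic and the group name "etl" are purely illustrative) pulls the four APIs above together:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainAndSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining(); // would give every Operator its own Task; not for production

        DataStream<String> source = env.fromElements("a", "bb", "ccc");

        // startNewChain(): the first map below will not chain with filter,
        // but the second map can still chain onto the first one.
        source.filter(s -> !s.isEmpty())
              .map(String::toUpperCase).startNewChain()
              .map(String::trim)
              .print();

        // disableChaining(): this map chains with nothing and occupies a Task by itself.
        source.map(s -> s + "!").disableChaining()
              .print();

        // slotSharingGroup("etl"): this map (and downstream Operators, unless they set
        // their own group) is scheduled into Slots of the "etl" group, separate from "default".
        source.map(String::length).slotSharingGroup("etl")
              .print();

        env.execute("chain-and-sharing-sketch");
    }
}
```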


Job performance deteriorates after OnYarn migration

Job description:

StreamETL: a purely parallel ETL Job that contains no Window or similar operations.

Environment Description:

  1. Standalone Job Execution Graph: 10 TMs * 10 Slots per TM; GC config: -XX:+UseG1GC -XX:MaxGCPauseMillis=100.
  2. Initial OnYarn Job Execution Graph: 100 TMs * 1 Slot per TM. The Job's Tasks run in 100 Containers; the TM in each Container holds 1 Slot and each Container is allocated 1C2G. GC config: -XX:+UseG1GC -XX:MaxGCPauseMillis=100.
  3. Adjusted OnYarn Job Execution Graph: 50 TMs * 2 Slots per TM. The Job's Tasks run in 50 Containers; the TM in each Container holds 2 Slots and each Container is allocated 2C4G. GC config: -XX:+UseG1GC -XX:MaxGCPauseMillis=100.

Note: the OnYarn deployment uses the same GC configuration as Standalone. Whether the Job runs in the Standalone or the OnYarn environment, the YGC and FGC frequencies are basically the same; since the heap of a single Container on Yarn is smaller, each individual GC takes less time. In production it is worth comparing CMS and G1 to pick the better GC strategy; in the current context GC has a negligible impact on Job performance.

Problem analysis:

The cause of the Job performance degradation is not hard to locate, as can be seen from the thread view of one Container (screenshot from VisualVM):

Figure 7: there are 126 active threads and 78 daemon threads in a 1C2G Container. First, with 126 active threads running in a 1C2G Container, thread switching happens frequently, which makes the already scarce CPU even scarcer. Second, only the 14 threads circled in red are really relevant to data processing: 2 Kafka Partition Consumer threads (the Consumer and the downstream Operators run inside these threads) and 12 Kafka Producer threads that sink data to the Kafka topic. Most of the threads outside these 14 can be shared between different Slots of the same TM, for example zk-curator, dubbo-client, GC, Flink Akka, Flink Netty and Flink metrics threads; by increasing the number of Slots per TM, such threads are shared by multiple SubTasks.

This leads naturally to a solution: with the total resources used by the Job unchanged, reduce the number of Containers while increasing the CPU, Memory and Slots held by each Container, i.e. go from configuration 2 to configuration 3 in the environment description above. After the adjustment the Job runs much more stably and consumes at a rate similar to the Standalone Job.

Figure 7

Note: this problem was encountered during the internal migration of StreamETL-like Jobs. The solution is simple but not universal; Jobs that contain Window operators need more careful analysis. All Jobs deployed to the Yarn cluster have JMX/Prometheus monitoring configured, and the more Slots a Container holds, the more data is collected per scrape; keep an eye on whether Jobs run normally in the production environment. When testing a 3C6G, 3-Slot Container configuration, a java.lang.OutOfMemoryError: Direct buffer memory exception related to the Prometheus Client appeared; it can be resolved by increasing the JVM's MaxDirectMemorySize.

The exception is shown in Figure 8:

Figure 8


Conclusion

An Operator Chain links multiple Operators together into one Task; Slot Sharing, building on the Operator Chain, lets multiple Tasks execute within one Slot. Together these two optimizations make full use of compute resources, reduce unnecessary overhead, and improve Job performance. Incidentally, the Operator Chain source code lives in the streaming package, so only stream-processing jobs have this mechanism; Slot Sharing, under the flink-runtime package, appears to be used more widely (the details remain to be examined).

Finally, a good understanding of what Slot, Operator Chain and Slot Sharing are, what they do, and how they relate to each other is essential to writing good code that runs efficiently on a cluster.


References:

  • https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling

  • https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups

  • https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#configuring-taskmanager-processing-slots

  • https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#background–internals

  • https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources

  • https://flink.apache.org/visualizer/


By Wang Chenglong, data engineer, TalkingData

The cover image is from the Internet; if there is any infringement, please contact us and it will be removed.