Author: Shen Lei

Introduction: Today’s main content to share is Flink in the good practice and application. The contents include:

  1. Flink’s containerization transformation and practice
  2. Flink SQL practice and application
  3. Future planning.

I. Flink’s container transformation and practice

1. Excellent cluster evolution history

  • In July 2014, the first Storm mission was launched;

  • In 2016, Spark Streaming was introduced to run on Hadoop Yarn;

  • In 2018, Flink was introduced, and the operation mode is Flink on Yarn Per Job.

  • In June 2020, 100% Flink Jar task K8s was realized, K8s was used as the default Flink Jar computing resource, Flink SQL task On Yarn, Flink unified real-time development;

  • The Storm cluster went offline in November 2020. All storm missions have been moved to Flink;

  • In 2021, we intend to make all Flink missions K8s.

2. Business scenarios internally supported by Flink

Flink supports business scenarios such as risk control, buried real-time tasks, payments, algorithm-based real-time feature processing, real-time kanban for BI, and real-time monitoring. The current real-time mission size is 500+.

3. There are like pain points in Flink on Yarn

There are three main parts:

  • First, the CPU is not isolated. In Flink On Yarn mode, cpus are not isolated. If a real-time task causes excessive CPU usage On a machine, other real-time tasks On the machine are affected.

  • Second, the cost of promoting expansion and shrinkage is high. The Yarn and HDFS services use physical machines. The capacity of physical machines is not flexible during the expansion period, and a certain amount of human and material resources are required.

  • Third, the need to invest in human operation and maintenance. The application resources at the bottom of the enterprise are K8S, and the operation and maintenance of the Yarn cluster is performed independently. As a result, the human operation and maintenance cost of the Yarn cluster increases.

4. Advantages of Flink on K8S over Yarn

It can be summed up in four points:

  • First, unified operation and maintenance. The company has a unified operation and maintenance department.

  • Second, CPU isolation. CPU isolation between K8S pods, real-time tasks do not affect each other, more stable;

  • Third, storage computing separation. Flink separates computing resources from state storage. Computing resources can be mixed with other component resources to improve machine utilization.

  • Fourth, elastic expansion and contraction capacity. During the promotion period, it can expand and shrink the capacity flexibly to save labor and material costs better.

5. Real-time cluster deployment

Generally divided into three layers. The first layer is the storage layer; The second layer is the real-time computing resource layer. The third layer is the real-time computing engine layer.

  • The storage layer is divided into two parts:

    • The first one is the cloud disk, which mainly stores the local status of Flink tasks and the logs of Flink tasks.
    • The second part is real-time calculation of HDFS cluster, which mainly stores the remote status of Flink tasks.
  • The second layer is the resource layer of real-time computing, which is divided into two parts:

    • One is Hadoop Yarn cluster;
    • The other is Flink K8S cluster, further subdivided, there will be Flink K8S and offline HDFS mixed cluster resources, as well as a separate type of Flink K8S cluster resources.
  • The top layer has some real-time Flink Jars, spark Streaming tasks, and Flink SQL tasks.

The reason we consider mixing is that the offline HDFS cluster has low machine usage during the day. The offline HDFS cluster computing resources are allocated to real-time tasks, and the elastic computing resources of other internal components are used offline to improve machine usage and reduce costs.

6. Flink on k8S containerization process

As shown below:

  1. Step 1: Real-time platform Flink Jar task submission, Flink Jar task version management, Docker Flink task image construction, upload image to Docker image warehouse;
  2. The second step is to start the task.
  3. Step 3: Create a YAML file.
  4. Step 4: Command interaction with K8S Api Server;
  5. Step 5: Pull Flink task image from Docker image warehouse to Flink K8S cluster;
  6. Finally, the task runs. Here are a few tips:
    • The Job Standalone mode is Flink.

    • One image for each Flink Jar task, with the task name + time cut as the version of the image;

    • JobManager needs to be created as Deployment rather than Job;

    • Dockerfile specifies HADOOP_USER_NAME, consistent with the online task.

7. Some practice in Flink on K8S

  • The first practice is to solve the problem of under-resourced tasks failing to start.

    First, to describe the problem, Flink on K8S is not cloud native and can not achieve real-time task resources on demand application. If the resources configured on the platform are less than those used by real-time tasks (for example, the concurrency of user code writing is dead, but the concurrency configured by the user is less than the value), real-time tasks may fail to start.

    To solve this problem, we added an internal automatic detection mechanism for the concurrency of Flink Jar tasks. Its main flow is shown below. First of all, the user will submit Flink Jar job on our platform. After submitting the job, the user will put the Jar job and run parameters in the background to build the PackagedProgram. Get the pre-execution plan of the task through PackagedProgram. It is then used to obtain the true concurrency of the task. If the concurrency configured by the user in the code is less than the resources configured on the platform side, we will use the configuration on the platform side to apply for resources, and then launch. Instead, we use its true task concurrency to request resources and start the task.

  • The second practice is the resource analysis tool for the Flink on K8S task.

    First, let’s talk about the background. Flink K8S task resources are self-configured by users. When the concurrency of configuration or the memory is too large, there will be the problem of computing resource waste, which will increase the cost of the underlying machine. How to solve this problem, we made a platform administrator tool. For an administrator, there are two ways to see if a task’s resources are overmatched:

    • The first is the view of task memory. We used an open source tool, GC Viewer, to get the memory usage metrics of this real-time task based on the task’s GC logs.

    • The second is the perspective of message processing power. We added data source input Record /s and task message processing time Metric in the Flink source layer. Find the task or operator with the slowest message processing based on the metric to determine whether the concurrency is properly configured.

    The administrator presets Flink resources according to the memory analysis indicators, the rationality of concurrency, and the optimization rules. Then we will communicate and adjust with the business side. The image on the right is the result of two kinds of analysis, and the image on the top is the result of Flink on K8S POD memory analysis. Below is the analysis result of Flink K8S task processing capability. Finally, according to these indicators, we can carry out a resource readjustment to reduce the waste of resources. At present, we plan to make it an automated analysis and adjustment tool.

  • Next are other related practices of Flink on K8s.
    • First, based on Ingress Flink Web UI and Rest API use. Each task has an Ingress domain name, which is always used to access the Flink Web UI as well as the Resti API;

    • Second, mount multiple HostPath volumes to resolve I/O restrictions on a single cloud disk. The write bandwidth and I/O capability of a single cloud disk are limited. Multiple cloud disks are used to reduce the Checkpoint status and local write pressure of cloud disks.

    • Third, Flink related general configuration ConfigMap, Flink image upload successful detection. Create configMap for general configuration of Filebeat and Flink jobs and mount it to real-time tasks to ensure that each Flink task image is successfully uploaded to the image repository.

    • Fourth, HDFS disk SSD and log collection based on Filebeat. SSD disk is mainly in order to reduce the disk IO Wait time, adjust the DFS. Block. Invalidate. Limit, reduce the HDFS Pending delete block number. Task logs are collected using Filebeat and output to Kafka. Task logs are then viewed using custom LogServer and offline public LogServer.

8. Flink on K8s current pain points

  • First, the JobManager HA problem. If the JobManager Pod hangs, with the help of the K8S Deployment capability, JobManager will restart according to the YAML file and the status may be lost. If the YAML configuration Savepoint is restored, however, the messages may be heavily duplicated. We hope to support Jobmanager HA with ZK or ETCD in the future.

  • Second, modify the code, upload again a long time. Once the code changes the logic, the Flink Jar task upload time and mirror time may be at the level of minutes, which may affect services with high real-time requirements. We hope that in the future we can refer to the implementation of the community and pull the task Jar from HDFS to run.

  • Third, the K8S Node is Down, and the JobManager recovers slowly. Once the K8S Node goes down, Jobmanager Pod takes about 8 minutes to recover. It takes about 8 minutes for K8S internal exceptions to be discovered and jobs to be started. Some services, such as real-time CPS tasks, are affected. To solve the problem, the platform periodically checks the K8s node status. Once it detects that the K8s node is in down state, stop the tasks that JobManager belongs to on the node and restore them from their previous checkpoint.

  • Fourth, Flink on K8S is not cloud native. Currently, Flink Jar task concurrency automatic detection tool is used to solve the problem of insufficient resource allocation and failure to start. However, if the pre-execution plan of the task cannot be obtained, the concurrency of the code configuration cannot be obtained. Our thinking is: Flink on K8S cloud native features and the previous issues 1, 2, if the community support is relatively fast, we may consider the Flink version and the community version alignment.

9. Some recommendations of Flink on K8s

  • The first option is for the platform to build and manage a mirror of the task itself.
    • Advantages: The platform side has self-control over the overall process of building the image and running real-time tasks, and specific problems can be corrected in time.
    • Disadvantages: Docker and K8S-related technologies need to have a certain understanding, the threshold is relatively high to use, and non-cloud native issues need to be considered. It is available in Flink 1.6 and above.
  • The second scheme is Flink K8S Operator.
    • Advantages are: the user as a whole encapsulated a lot of low-level details, the use of a relatively lower threshold.
    • Disadvantages: the overall use is not as flexible as the first scheme, once there is a problem, because the bottom layer is using its encapsulated functions, the bottom layer is not easy to modify. It is available in Flink 1.7 and above.
  • The final solution is based on the community Flink K8s feature.
    • Advantages: cloud native, more friendly for resource application. At the same time, users will be more convenient to use, shielding many low-level implementation.

    • The downside: K8s cloud native features are experimental and still in development, such as K8s Per Job mode. It is available in version Flink above 1.10.

Second, Flink SQL practice and application

1. Praised the development history of Flink SQL

  • In September 2019, we explored and experimented with Flink 1.9 and 1.10 SQL capabilities and enhanced some Flink SQL capabilities.

  • In October 2019, we conducted SQL function verification, and verified Flink SQL Hbase dimension table association function based on buried real-time requirements, and the results were in line with expectations.

  • In February 2020, we extended the function of SQL. Flink 1.10 was used as SQL computing engine to develop and optimize the function of Flink SQL. The real-time platform supports full SQL development.

  • In April 2020, it began to support real-time data warehouse, yizan education, beauty industry, retail and other related real-time demand.

  • In August 2020, the new version of the real-time platform was officially launched. Currently, the main focus of Flink SQL development is our real-time task.

2. Some practice in Flink SQL

It is mainly divided into three aspects:

  • First, the practice of Flink Connector includes: Flink SQL supports Flink NSQ Connector, Flink SQL supports Flink HA Hbase Sink and dimension table, Flink SQL supports non-dense Mysql Connector, Flink SQL Support for standard output (community support already), Flink SQL support for Clickhouse Sink;

  • Second, platform level practices include: Flink SQL supports UDF and UDF management, task recovery from Checkpoint, idempotent functions, JSON-related functions, and Flink parameter configuration, such as status time setting. Aggregation optimization parameters, Flink real-time task blood data automatic collection, Flink syntax correctness detection function;

  • Third, Flink Runtime practices include: Flink source code to add a single Task and Operator record processing time indicators; Fixed Flink SQL retractable stream TOP N BUG.

Business practice

  • The first practice is our internal real-time kanban for customer service robots. The process is divided into three layers:

    • The first layer is the real-time data source. The first layer is the online MySQL business table. We will synchronize its Binlog to the corresponding Kafka Topic through DTS service.
    • The ODS layer for real-time tasks has three Kafka topics;
    • In the real-time DWD layer, there are two Flink SQL tasks.
      • Flink SQL A consumes two topics and then associates the data in these two topics with the corresponding data according to the functions of some Windows through Interval Join. At the same time, a state retention time is set for this real-time task. After Join, some ETL processing will be carried out, and finally its data will be input into a topic C.
      • Flink SQL B, another real-time task, consumes a topic, cleans the data in the topic, and then associates a dimension table in HBase to remove some additional data required by it. The associated data will eventually be input to Topic D.

    Upstream, Druid will consume the data from these two topics, perform some metrics queries, and ultimately provide them to the business side for use.

  • The second practice is the real-time user behavior middle tier. Users will search, browse, add to shopping cart and so on on our platform, which will generate corresponding events. The original solution was based on offline. We would store the data into Hive tables, and then the algorithm folks would combine user characteristics, machine learning models, and offline data to generate some user rating estimates that they would input into HBase.

    In this context, there are the following appeals: current user scoring is mainly based on offline tasks, while algorithm students hope to improve the accuracy of recommendations in a more timely and accurate manner by combining real-time user characteristics. This requires building a real-time user behavior intermediate layer, inputting user-generated events into Kafka, processing these data through Flink SQL jobs, and exporting the corresponding results to HBase. The algorithm students will update some parameters in the model in real time in combination with the algorithm model, and finally perform user score estimation in real time. It will also be stored in HBase and then used online.

    The construction process of the user behavior middle layer is divided into three steps:

    • At the first level, our data source is in Kafka.

    • The second layer is the ODS layer, in the Flink SQL job there will be some flow table definition, some ETL logic processing. Then define the associated sink table, dimension table, and so on. There will also be some aggregation operations, and then input into Kafka;

    • In the DWS layer, there are also Flink SQL jobs for users, which will involve users’ own UDF Jar, multi-stream Join, and UDF use. Then read some data from the ODS layer and store it in HBase for the algorithm team to use.

    Here are a few practical lessons:

    • First, Kafka Topic, Flink task name, Flink SQL Table name, according to the data store naming convention.

    • Second, index aggregation calculation, Flink SQL task to set the retention time of idle state, to prevent the infinite increase of task state.

    • Third, if data skew or read state pressure is high, you need to configure Flink SQL optimization parameters.

4. Practice in HAHBase Connector

Community HBase Connector Data association or writing is used by a single HBase cluster. If the HBase cluster is unavailable, the writing or writing of real-time task data is affected, which may affect service usage. As to how to solve this problem. First, there are two clusters in HBase, the primary cluster and the secondary cluster. Master slave replication is performed between them through WAL. Flink SQL jobs are written to the primary cluster first. When the primary cluster becomes unavailable, Flink SQL jobs are automatically demoted to the secondary cluster. Online services are not affected.

5. No secret Mysql Connector and index extension practice

The Flink mysql-sink syntax solves three problems:

  • First, Mysql database user names and passwords are not exposed and stored in plain text.

  • Mysql user name and password can be updated periodically.

  • Third, internal automatically authenticate table permissions based on user names. The main goal is to make the real-time task database safer to use.

Then, in the bottom left figure, we add the Task and Operator per-message processing Metric at the Flink source level. The purpose is to help the business side to troubleshoot and optimize Flink real-time tasks according to the monitoring indicators of message processing time.

6. The practice of automatic collection of blood metadata of Flink task

The process of blood metadata collection of Flink task is shown in the figure below. After the real-time task is started, the platform takes two different paths according to whether the current task is a Flink Jar task or a Flink SQL task to obtain the blood data of the task, and then reports the blood data to the metadata system. The value of this is twofold:

  • First, help the business side to understand the real-time task processing link. The service side can more clearly understand the relationship and impact between real-time tasks, and can timely notify other downstream service sides when performing tasks.

  • Second, better build real-time data warehouse. Based on the real-time task blood map, the common layer of real-time data is extracted to improve the reusability and better build the real-time data warehouse.

Iii. Future planning

Finally, the plan for the future includes four points:

  • First, promote Flink real-time task SQL. Promote Flink SQL development real-time tasks and increase the proportion of Flink SQL tasks.

  • Second, Flink task computing resources are automatically optimized. From memory, task processing capacity, input rate, etc., the task resources are analyzed, and the unreasonable task allocation is automatically configured, so as to reduce the machine cost.

  • Third, Flink SQL task K8S and K8S cloud native. The underlying computing resources of Flink are unified as K8S to reduce operation and maintenance costs. Flink K8S cloud is native to make more reasonable use of K8S resources.

  • Fourth, Flink and data lake and CDC functional technology research. The research reserve of new technology lays the technical foundation for other real-time requirements in the future.