Abstract: This article is adapted from a talk by Zhang Ying, head of real-time computing at Zuoyebang (Homework Help), at Flink Forward Asia 2021. Flink has played an important role in the evolution of Zuoyebang's real-time computing, and Flink SQL in particular has greatly improved the development efficiency of real-time tasks. This article shares the usage and practical experience of Flink SQL at Zuoyebang, as well as the problems encountered, and their solutions, while building a real-time computing platform from zero to one as the task scale grew. The contents include:

  1. Development history
  2. Flink SQL application practice
  3. Platform construction
  4. Summary and outlook

FFA 2021 Live Playback & Presentation PDF download

I. Development history

Zuoyebang (Homework Help) mainly uses artificial intelligence, big data, and other technologies to provide students with more efficient learning solutions, so its business data chiefly concerns students' class attendance and knowledge mastery. In the overall architecture, both binlogs and ordinary logs are collected and written into Kafka, then written into the storage layer by real-time and offline computing respectively. On top of OLAP, corresponding product services such as workbenches and BI analysis tools are provided externally.

At present, real-time computing at Zuoyebang is mainly based on Flink, and its development has gone through roughly three stages:

  1. In 2019, real-time computing consisted of a small amount of SparkStreaming work serving tutors and lecturers. While fulfilling real-time requirements, we found that development efficiency was very low and the data was almost impossible to reuse;

  2. The usual path after that is to gradually apply Flink JAR jobs in production, then build a platform and adopt Flink SQL once experience has accumulated. But in 2020 the business had many real-time computing requirements and we did not have enough development manpower. At the time, Flink 1.9 had just been released with major changes to its SQL functionality, so our approach was to apply Flink SQL directly in the real-time data warehouse. Currently, more than 90% of the real-time data warehouse's tasks are implemented with Flink SQL.

  3. By November 2020, the number of Flink jobs had quickly grown to several hundred, and we began building a real-time computing platform from zero to one that supports all of the company's important lines of business, with computing deployed on multiple clusters across multiple clouds.

The following introduces two aspects:

  1. Typical problems encountered in Flink SQL practice and their solutions;
  2. Some thoughts on the construction of real-time computing platform.

II. Flink SQL application practice

Here is the complete data flow architecture based on Flink SQL:

Once data is written to Kafka, the topic is automatically registered as a metadata table, which is the starting point for all subsequent real-time SQL jobs. Users can reference this table in SQL jobs without having to write complex DDL.

At the same time, practical applications also need to add or replace table attributes on top of the metadata table:

  1. **Add:** metadata records table-level attributes, but SQL jobs may need task-level attributes. For example, for a Kafka source table, add a per-job group.id to record offsets;
  2. **Replace:** for offline testing, an offline test table can be built quickly by overriding properties such as broker and topic on top of the referenced metadata table.
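The add/replace behavior described above amounts to a property merge over the metadata table's attributes. The following is a minimal sketch in Python; the property names and the helper are illustrative assumptions, not the platform's actual implementation:

```python
def build_table_properties(metadata_props, job_props):
    """Merge table-level properties from metadata with task-level overrides.

    Job-level entries either ADD new keys (e.g. a per-job group.id) or
    REPLACE existing ones (e.g. pointing the source at a test broker).
    """
    merged = dict(metadata_props)   # start from the registered metadata table
    merged.update(job_props)        # job-level properties win on conflict
    return merged

# A metadata-registered Kafka source table (property names are illustrative).
metadata = {
    "connector": "kafka",
    "topic": "log_topic",
    "properties.bootstrap.servers": "prod-broker:9092",
}
# Add a group.id for this job; replace the broker for an offline test.
job = {
    "properties.group.id": "job_a",
    "properties.bootstrap.servers": "test-broker:9092",
}
props = build_table_properties(metadata, job)
```

The key design point is precedence: job-level properties always win, so the same metadata table serves both production and test jobs.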

The framework also needs to let users' SQL jobs easily output metrics and logs, to achieve full-link monitoring and tracing.

This section mainly introduces the DAG optimization practiced when adding a Trace function to SQL, as well as the selection and encapsulation of the physical storage underlying Table.

2.1 Adding a Trace function to SQL

SQL improves development efficiency, but the complexity of the business logic remains, and complex logic leads to very long DML. In such cases we recommend views to improve readability, since the SQL of each view is shorter, much like the code convention that a single function should not be too long.

On the left side of the figure below is a partial DAG of a sample task; you can see there are many SQL nodes. In this situation, locating a case is difficult: with code implemented via the DataStream API you can add logs, but SQL cannot, so the user only sees the input and output of the entire job.

Similar to printing logs inside functions, we want to support adding Trace to views to facilitate case tracing.

But some problems arise when trying to add Trace to SQL. Here is a simplified example:

The SQL in the upper right creates source_table as the source table. The prepare_data view reads that table and calls the foo UDF in its SQL; a StatementSet then inserts into two downstream tables; finally the view is converted to a DataStream, which calls the TraceSDK to write to the Trace system.

Note: we were on Flink 1.9 at the time; for clarity, the example uses some features that were only added in later versions:

issues.apache.org/jira/browse… issues.apache.org/jira/browse…

The actual DAG at the bottom of the figure does not match expectations:

  1. The DAG is split into two unrelated parts. The Kafka source table (DataSource) is read twice.
  2. The foo method is called three times.

Both the pressure on the data source and the computational performance need to be optimized.

To solve this problem, we need to optimize from several angles; here we introduce the idea of DAG merging. Both the table env and the stream env generate their own transformations. What we do is merge them into the stream env, so that the stream env holds the complete list of transformations, from which the StreamGraph is generated and submitted.
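The merging idea can be illustrated with a minimal sketch. This is not Flink's actual internals; `Env` and the dict-based transformations are stand-ins, and the point is only that deduplicating shared nodes into one list lets a single graph be built, so the shared source appears once:

```python
class Env:
    """Toy execution environment that, like Flink's table/stream envs,
    accumulates a list of transformations."""
    def __init__(self):
        self.transformations = []

def merge_into_stream_env(table_env, stream_env):
    """Merge the table env's transformations into the stream env,
    deduplicating shared nodes (e.g. the Kafka source) by object identity
    so one complete list backs a single StreamGraph."""
    seen, merged = set(), []
    for t in table_env.transformations + stream_env.transformations:
        if id(t) not in seen:
            seen.add(id(t))
            merged.append(t)
    stream_env.transformations = merged
    return stream_env

source = {"name": "kafka_source"}          # shared by both envs
table_env, stream_env = Env(), Env()
table_env.transformations += [source, {"name": "insert_a"}, {"name": "insert_b"}]
stream_env.transformations += [source, {"name": "trace_sink"}]
merge_into_stream_env(table_env, stream_env)
```

After merging, the source node is present exactly once, mirroring the optimized DAG in which the source table is read once and foo is called once.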

The bottom left shows our optimized DAG, which reads the source table once and calls foo only once:

The optimized DAG looks very similar to the logical graph we wrote in SQL, and the performance is as expected.

Returning to the problem itself, a business can enable tracing on certain fields of a view with a statement such as prepare_data.trace.fields=f0,f1. Because SQL naturally carries field names, trace data is more readable than ordinary logs.
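To make the readability point concrete, here is a hypothetical sketch of how such a statement could be parsed and turned into field-named trace lines. The config syntax comes from the example above; the function names and output format are assumptions for illustration:

```python
def parse_trace_config(line):
    """Parse a statement like 'prepare_data.trace.fields=f0,f1' into
    (view_name, [field, ...])."""
    key, _, value = line.partition("=")
    view = key.split(".trace.fields")[0]
    return view, value.split(",")

def trace_record(view, fields, row):
    """Render one row of the view as a readable, field-named trace line,
    keeping only the traced fields."""
    body = ", ".join(f"{f}={row[f]}" for f in fields)
    return f"[trace] {view}: {body}"

view, fields = parse_trace_config("prepare_data.trace.fields=f0,f1")
line = trace_record(view, fields, {"f0": "u1", "f1": 3, "f2": "ignored"})
```

Because the field names come straight from the SQL schema, the emitted line is self-describing, unlike a positional log record.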

2.2 Selection and design of Table storage

As mentioned above, our primary goal is to improve human efficiency. Table therefore needs good layering and reuse capabilities and must support template-based development, so that N end-to-end Flink jobs can be chained together quickly.

Our solution is implemented on Redis, which offers several benefits:

  1. High QPS, low latency: this is a core concern of all real-time computing;

  2. TTL: users do not need to care about how data is evicted; setting a reasonable TTL is enough;

  3. By using compact, high-performance serialization such as Protobuf, together with TTL, overall storage stays below 200 GB, so the memory pressure on Redis is acceptable;

  4. Fits the computing model: the computation itself performs keyBy, shuffling data that must be processed together into the same parallel instance to guarantee ordering, so the storage layer does not need to worry much about lock optimization.

The design below addresses two problems: multiple indexes and trigger messages.

The figure above shows an example of a table recording students' attendance for a particular chapter:

  1. **Multiple indexes:** data is first stored in string format, e.g. key=(uid, lesson_id), value=serialize(is_attend, …). A SQL JOIN ON uid AND lesson_id can look up this key directly. But what about a JOIN ON a single field, such as lesson_id? We additionally maintain a set index keyed by lesson_id whose members are the uids under that lesson; for lesson_id = 123, reading the set yields all the uids, from which the string keys (uid, 123) can be assembled to fetch the values;
  2. **Trigger message:** after writing to Redis, an update message is written to Kafka. Consistency, ordering, and loss-free delivery across the two stores are all guaranteed by the Redis Connector implementation.
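The two mechanisms can be sketched with an in-memory mock, where a dict stands in for the Redis string structure, a set per lesson_id stands in for the Redis set index, and a list stands in for the Kafka trigger topic. All names here are illustrative assumptions, not the connector's real API:

```python
class TableStore:
    """In-memory mock of the Redis-backed table."""
    def __init__(self):
        self.strings = {}   # (uid, lesson_id) -> value  (Redis string)
        self.index = {}     # lesson_id -> {uid, ...}    (Redis set index)
        self.poster = []    # trigger messages, stands in for Kafka

    def upsert(self, uid, lesson_id, value):
        self.strings[(uid, lesson_id)] = value
        self.index.setdefault(lesson_id, set()).add(uid)
        # After the write, emit an update message for downstream jobs.
        self.poster.append({"uid": uid, "lesson_id": lesson_id})

    def lookup_by_primary(self, uid, lesson_id):
        """JOIN ON uid AND lesson_id: direct string-key lookup."""
        return self.strings.get((uid, lesson_id))

    def lookup_by_lesson(self, lesson_id):
        """JOIN ON lesson_id only: read the set index to get the uids,
        then assemble primary keys to fetch each value."""
        return {u: self.strings[(u, lesson_id)]
                for u in self.index.get(lesson_id, set())}

store = TableStore()
store.upsert("u1", 123, {"is_attend": True})
store.upsert("u2", 123, {"is_attend": False})
```

Note that in the real system the atomicity of "write Redis, then write Kafka" is what the Redis Connector has to guarantee; the mock simply does both in one call.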

These functions are encapsulated in the Redis Connector, and the business can simply define such a Table through DDL.

Some of the more important attributes in DDL are:

  1. primary defines the primary key, which corresponds to the string data structure, e.g. uid + lesson_id in the example;
  2. index.fields defines index fields to aid lookup, e.g. lesson_id in the example; multiple indexes can be defined;
  3. poster.kafka defines the Kafka table that receives trigger messages. This table is also registered in metadata, so subsequent SQL jobs can read it directly without any definition.

As a result, the whole development pattern is highly reusable: users can easily develop N end-to-end SQL jobs and no longer need to worry about how to trace cases.

III. Platform construction

After the data flow architecture above was built, the number of real-time jobs quickly grew to several hundred by November 2020, much faster than in 2019. At this point we began building the real-time computing platform from zero to one. Below are some thoughts from that process.

There are three starting points for the functions supported by the platform:

  1. **Unification:** unify the different cluster environments, Flink versions, and submission methods across cloud vendors. Previously, Hadoop clients were scattered across users' submission machines, posing risks to cluster data and task security and increasing the cost of later cluster upgrades and migrations. We want the platform to unify the task submission entry point and submission method;
  2. **Ease of use:** platform interaction can provide more user-friendly functions, such as debugging and semantic checks, which improve the human efficiency of task testing; recording tasks' version history supports convenient release and rollback operations;
  3. **Specifications:** permission control and process approval; similar to the release process of online services, the platform can standardize the real-time task development process.

3.1 Specifications – real-time task flow management

Flink SQL makes development simple and efficient, but the simpler development is, the harder specifications are to enforce: it may take two hours to write a piece of SQL and half a day to walk through the release process.

But specifications still need to be enforced, and some of the problems are common to both online services and real-time computing:

  1. **Untraceable requirements:** a task has been running online for a year, and its original requirement may have been passed along by word of mouth; at best it was recorded in a wiki or email, which is easily lost during task handover;
  2. **Non-standard code:** UDF and DataStream code that does not follow specifications is hard to read, so later maintainers cannot upgrade it, or dare not change it, and the task cannot be maintained long term. The SQL of real-time tasks should likewise have writing specifications;
  3. **Hard to locate:** for a task running online that depends on a jar, which Git module and which commitId does it correspond to? When a problem occurs, how do we find the corresponding code immediately;
  4. **Blind modification:** a task that has always been healthy suddenly alarms on the weekend, and the cause turns out to be a modification of the online task's SQL.

The specification is mainly divided into three parts:

  1. **UDF development:** RD can quickly create a UDF module from a UDF Archetype project, modeled on Flink's Quickstart. The generated module compiles out of the box and includes a UDF example like WordCount, as well as helpers such as a default ReadMe and VersionHelper. After modifying it for business needs, it is uploaded to Git through CR;
  2. **Requirement management and compilation:** submitted code is associated with a requirement card and must pass cluster compilation and QA testing before a release ticket can be issued;
  3. **Release:** select which jobs to update or create based on the module and its compilation output, and redeploy after approval by the job owner or leader.

Nothing in the development process can be modified offline, such as swapping a jar package or changing which tasks it affects. Even if a real-time task runs for several years, one can still find out, from the current task, who released it, who approved it, the test records at the time, the corresponding Git code, and the real-time metric requirements originally raised, so the task can be maintained long term.

3.2 Ease of use – Monitoring

All of our Flink jobs currently run on Yarn. After a job starts, we expect Prometheus to scrape the Containers allocated by Yarn and connect to the alerting system, so that users can configure alerts such as Kafka lag and Checkpoint failure. Two main problems arise in building this path:

  1. After PrometheusReporter starts an HTTPServer, how does Prometheus dynamically discover it? We also need to control the metric set to avoid collecting a lot of useless data;
  2. Our SQL source tables are based on Kafka. Configuring Kafka lag alerts on the computing platform is more convenient than with third-party tools, because the platform naturally knows the topic and group.id each task reads and can share the alert group with task-failure alerts. With alert templates, alert configuration becomes very simple.

Solutions:

  1. We added a discovery mechanism on top of the official PrometheusReporter: after a Container's HTTPServer starts, it registers its ip:port as an ephemeral node on ZooKeeper, and Prometheus uses its discovery mechanism to watch for changes on the ZK node. Because the node is ephemeral, it disappears when the Container is destroyed, and Prometheus knows to stop scraping that target. This makes the scraping path easy to set up;
  2. KafkaConsumer.records-lag is a practical and important lag indicator, for which we did two things: we modified the KafkaConnector to expose the metric after KafkaConsumer.poll, so that records-lag becomes visible; in the process we also found that different Kafka versions format this metric differently (cwiki.apache.org/confluence/…), so we aligned the formats into unified Flink metrics, making the metrics exposed by different versions consistent.
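The ephemeral-node discovery in point 1 can be sketched with a tiny in-memory stand-in for ZooKeeper. This is a simulation of the behavior relied upon (node lifetime tied to the owning session), not real ZooKeeper or Prometheus API; paths, ports, and names are assumptions:

```python
class FakeZooKeeper:
    """Stand-in for ZooKeeper ephemeral nodes: a node lives only as long
    as its owning session (here: a Container)."""
    def __init__(self):
        self.nodes = {}                      # path -> owning session

    def create_ephemeral(self, path, owner):
        self.nodes[path] = owner

    def close_session(self, owner):
        # Ephemeral nodes vanish when their session closes (Container dies).
        self.nodes = {p: o for p, o in self.nodes.items() if o != owner}

    def children(self, prefix):
        """What a Prometheus discovery watcher would see as scrape targets."""
        return sorted(p for p in self.nodes if p.startswith(prefix))

zk = FakeZooKeeper()
# Each Container registers its ip:port after its HTTPServer starts.
zk.create_ephemeral("/flink/targets/10.0.0.1:9249", owner="container_1")
zk.create_ephemeral("/flink/targets/10.0.0.2:9249", owner="container_2")
targets_before = zk.children("/flink/targets/")
# Container 1 is destroyed; its ephemeral node disappears automatically,
# so Prometheus stops scraping that target without any explicit cleanup.
zk.close_session("container_1")
targets_after = zk.children("/flink/targets/")
```

The design benefit mirrored here is that no deregistration code is needed: the session lifetime itself keeps the target list accurate.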

IV. Summary and outlook

In the most recent stage, our work has mainly been applying Flink SQL to support the rapid development of real-time jobs, and building a real-time computing platform that supports thousands of Flink jobs.

One big takeaway is that SQL does simplify development, but it also hides more technical details. The need for tools such as Trace, and for task specifications, has not lessened; if anything it is more stringent, because the more details are shielded, the less users know how to handle problems when something goes wrong. It is like an iceberg: the smaller the visible tip, the larger the part underwater, and the more the surrounding systems need to be built up.

Another is to adapt to the situation at hand and meet current needs first, which for us meant improving human efficiency and lowering the development threshold. At the same time, we are exploring more business scenarios, such as replacing the Redis storage behind the Connector with HBase or RPC services. The benefit now is that the underlying storage can be changed while users' SQL jobs are barely affected, because SQL jobs are basically business logic while DDL defines the metadata.

The next step planning is mainly divided into three parts:

  1. Support flexible resource scaling to balance the cost and timeliness of real-time jobs;
  2. We have applied Flink SQL at scale since 1.9, and the version has since changed greatly, so we need to consider how businesses can use new-version features at a low upgrade cost;
  3. Explore stream-batch unification in real business scenarios.
