I. Background and pain points

Before the first half of 2017, the streaming framework behind TalkingData's App Analytics and Game Analytics products was the self-developed TD-ETL-framework. The framework reduced the complexity of developing streaming tasks: each task only needed its own Changer chain. It also supported horizontal scaling with acceptable performance, and it used to meet business requirements.

However, between the end of 2016 and the first half of 2017, we found that the framework had the following major limitations:

  1. Performance problems: the App Analytics-ETL-Adaptor and Game Analytics-ETL-Adaptor modules had serious performance problems (full GC) during holidays, which delayed metric calculation.
  2. Weak fault tolerance: by relying on offsets stored in Kafka or ZK, the framework achieves at-least-once at best. Exactly-once requires other services and stores, and data can be lost when the service restarts after an exception.
  3. Insufficient expressiveness: the framework cannot fully express a DAG. For complex streaming problems, several services built on the framework have to be combined to solve the problem.

These two TalkingData products mainly provide data analysis services for mobile apps and games. As the business has kept growing in recent years, we needed a streaming engine with better performance and more complete features so that we could upgrade our streaming services step by step. The research started at the end of 2016 and mainly compared Flink, Heron and Spark Streaming.

In the end, we chose Flink based on the following considerations:

  1. Flink has a sound fault-tolerance mechanism and supports exactly-once semantics.
  2. Flink ships with a rich set of streaming operators, makes it easy to write custom operators, offers API calls for splitting and joining streams, and can fully express a DAG.
  3. Flink implements its own memory management and does not rely entirely on the JVM, which avoids some of the full-GC problems seen in some services of the current ETL-Framework.
  4. Flink's window mechanism can compute distribution metrics in Game Analytics (GA), such as the distribution of the number of game sessions in a single day.
  5. Flink's design philosophy was the most advanced among the streaming frameworks of the time: batch is treated as a special case of streaming, with the ultimate goal of unifying batch and stream processing.

II. Evolution route

1. Standalone cluster (1.1.3->1.1.5->1.3.2)

We initially deployed Flink in standalone cluster mode. Starting in the first half of 2017, we gradually migrated some low-traffic ETL jobs of Game Analytics to Flink. By April, we had completely migrated the ETL jobs that receive data from the product's various SDK versions and merged them into one job. This produced the following data flow and stream graph:

Figure 1. Data flow diagram after migration of Game Analytics- ETL-Adaptor to Flink

Figure 2. The Stream Graph for Game Analytics- ETL

In the data flow diagram above, the Flink job calls etl-service through Dubbo, so the logic for accessing external storage is abstracted into etl-service. The Flink job does not need to handle complicated access logic or maintain its own cache. This makes the service reusable and also reduces GC pressure on the job itself.

In addition, we built our own monitor service. Flink 1.1.3 exposed few monitoring metrics, and because its Kafka connector uses the low-level API of Kafka 0.8, Kafka consumption offsets are not committed to ZK. We therefore needed a monitor to track each Flink job's liveness, instantaneous speed, consumption backlog and other metrics, and we connected it to OWL, the company's monitoring and alerting system.
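The two metrics named above, consumption backlog and instantaneous speed, can be derived from sampled offsets. The following is a minimal sketch, not the monitor service described in the article: the class name `ConsumerLagSketch` and its methods are hypothetical, and it assumes the latest and consumed offsets per partition have already been collected by some other means.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: derives backlog and consumption speed from sampled Kafka offsets. */
final class ConsumerLagSketch {

    /** Backlog = sum over partitions of (latest offset in Kafka - offset consumed by the job). */
    static long totalLag(Map<Integer, Long> latestOffsets, Map<Integer, Long> consumedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            long consumed = consumedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0, e.getValue() - consumed);
        }
        return lag;
    }

    /** Messages per second between two samples of the total consumed offset. */
    static double speedPerSecond(long consumedBefore, long consumedAfter, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return (consumedAfter - consumedBefore) * 1000.0 / intervalMillis;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new LinkedHashMap<>();
        latest.put(0, 1_000L);
        latest.put(1, 2_500L);
        Map<Integer, Long> consumed = new LinkedHashMap<>();
        consumed.put(0, 900L);
        consumed.put(1, 2_000L);
        System.out.println("backlog = " + totalLag(latest, consumed));
        System.out.println("speed = " + speedPerSecond(2_900L, 32_900L, 1_000L) + " msg/s");
    }
}
```

A monitor could sample these values on a fixed interval and raise an alarm when the backlog keeps growing while the speed stays flat.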

At this point, the Flink standalone cluster had taken on all the traffic from Game Analytics, processing about 1 billion messages a day with a total throughput of 12 TB a day. By the summer vacation, the daily log volume had risen to 1.8 billion, throughput had reached about 20 TB per day, and TPS peaked at 30,000.

During this process, we encountered problems such as unbalanced consumption within a Flink job and unbalanced job deployment in the standalone cluster, which led to a consumption backlog in production. We also saw cases where the cluster restarted automatically for no apparent reason and the job could not be restarted afterwards. (We detail the typical symptoms of these problems and the solutions we used at the time in Chapter III.)

After the summer vacation, we concluded that Flink had proven itself, so we started migrating App Analytics' ETL jobs to Flink. This produced the following data flow diagram:

Figure 3. Data flow diagram of App Analytics- ETL-Adaptor’s standard SDK processing work after migration to Flink

Figure 4. Stream graph of App Analytics- Etl-Flink job

A large number of users started migrating to the unified JSON SDK in March 2017, and the peak traffic of the Kafka topic for the new SDK grew from 8,000 messages per second in mid-year to 30,000 per second by the end of the year. By then, the Flink standalone cluster was running four jobs from two products, with an average daily throughput of 35 TB.

There were two very serious problems:

  • Jobs in the same standalone cluster compete for resources, and standalone mode can only use task slots to isolate the on-heap memory of the TaskManager. In addition, the way Flink deploys jobs in a standalone cluster, as mentioned above, leads to unbalanced resource allocation, so the Game Analytics line builds up a backlog when the App Analytics line has heavy traffic.
  • The parallelism of our source operator equals the number of Kafka topic partitions consumed, while the parallelism of the intermediate ETL operators is often much greater than that. Therefore, the final job graph cannot be fully linked into one operator chain, and data transfer between operators has to request and release Flink network buffers. In the 1.1.x versions, however, requesting and releasing network buffers under heavy data volume easily causes deadlock. As a result, Flink clearly has many messages to process, but most threads are in the waiting state, which causes large service delays.

These issues forced us to split the jobs of the two products into two standalone clusters and to carry out a major Flink upgrade from 1.1.3 (via 1.1.5) to 1.3.2. The upgrade to 1.3.2 was completed in Q1 2018. Version 1.3.2 supports incremental checkpoints and is a huge improvement over 1.1.x in performance and stability. After the upgrade, the Flink clusters were basically stable. Problems such as uneven consumption remained, but they could largely be handled by adding machines as business volume grew.

2. Flink on YARN (1.7.1)

The standalone cluster isolates resources poorly and deploys jobs unevenly, while Flink on YARN is already very mature in the community. Therefore, in Q4 2018 we began planning the migration from Flink standalone clusters to Flink on YARN. Flink has also improved batch processing considerably in recent versions, so we plan to gradually replace the current batch engine with Flink as well.

Figure 5. Flink on YARN Cluster planning

As shown in Figure 5, the future Flink on YARN cluster will handle both streaming and batch computing. Users of the cluster can build, optimize and submit stream and batch jobs through a build service. After submission, jobs are assigned to different YARN queues based on the business team the user belongs to and the customer's business volume. In addition, the cluster needs a comprehensive monitoring system that collects user submission records, the traffic and load of each queue, and the runtime metrics of each job, and that is connected to the company's OWL.

We started migrating some stream jobs of App Analytics to Flink on YARN 1.7 in Q1 2019, and we completed the migration of the App Analytics streaming tasks that process the unified JSON SDK before Q2 2019. The current Flink on YARN cluster handles a peak of 300,000 messages per second and an average of about 5 billion logs per day, or about 60 TB. After the migration to Flink on YARN, performance improved with the version upgrade, and resource isolation between jobs is better than in the standalone cluster. After the migration, we adopted Prometheus + Grafana for monitoring, which made monitoring more convenient and intuitive.

In the future, we will migrate the Flink jobs of Game Analytics and the log-export jobs to the Flink on YARN cluster, which is expected to save one quarter of the machine resources.

III. Description and solution of key problems

While putting Flink into practice, we ran into many pitfalls along the way. We pick out a few of the key ones and explain them briefly.

1. Use static variables sparingly and release resources properly when a job is cancelled

When we implement a Flink operator function, we can generally extend AbstractRichFunction, which provides the lifecycle methods open() and close(). Initializing and releasing the resources an operator depends on should be done by overriding these methods. When we initialize resources such as a Spring context or a Dubbo config, we should hold them as singletons wherever possible and initialize them only once per TaskManager. Similarly, we should release them only once per TaskManager, in the close() method.
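One way to get "initialize once, release once" per JVM is reference counting. The sketch below is an illustration under assumptions, not the authors' code: `SharedResourceHolder` is a hypothetical name, the `StringBuilder` stands in for a heavier resource such as a Spring context, and the operator's open() and close() would call `acquire()` and `release()` respectively.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical holder for a resource shared by all operator instances in one JVM.
 * The first acquire() initializes it; the last matching release() frees it.
 */
final class SharedResourceHolder {
    private static int refCount = 0;
    private static StringBuilder resource; // stand-in for a Spring context or Dubbo config

    // Counters kept only so the example can show what happened.
    static final AtomicInteger initCalls = new AtomicInteger();
    static final AtomicInteger closeCalls = new AtomicInteger();

    /** Intended to be called from the operator's open(). */
    static synchronized StringBuilder acquire() {
        if (refCount == 0) {
            resource = new StringBuilder("initialized");
            initCalls.incrementAndGet();
        }
        refCount++;
        return resource;
    }

    /** Intended to be called from the operator's close(). */
    static synchronized void release() {
        if (refCount == 0) {
            return; // tolerate an unmatched close()
        }
        refCount--;
        if (refCount == 0) {
            resource = null; // a real resource would be shut down here
            closeCalls.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        // Three parallel subtasks in the same JVM open and later close.
        for (int i = 0; i < 3; i++) acquire();
        for (int i = 0; i < 3; i++) release();
        System.out.println(initCalls.get() + " init, " + closeCalls.get() + " close");
    }
}
```

Because the resource is freed when the last subtask closes, cancelling the job leaves nothing behind for the next start.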

Static variables should be used with caution. Otherwise, the corresponding resources may not be released when the job is cancelled, which causes problems when the job restarts. To avoid initializing static variables, we can use org.apache.flink.configuration.Configuration (1.3) or org.apache.flink.api.java.utils.ParameterTool (1.7) to hold our resource configuration, register it through the ExecutionEnvironment when the job is submitted, and read it back at job runtime.

Sample code:

Flink 1.3 Setup and Registration Configuration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration parameters = new Configuration();
parameters.setString("zkConnects", zkConnects);
parameters.setBoolean("debug", debug);
env.getConfig().setGlobalJobParameters(parameters);

Getting the configuration (in the operator's open method):

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Configuration globConf = (Configuration) globalParams;
    // Read each setting with a default in case it was not registered
    debug = globConf.getBoolean("debug", false);
    String zks = globConf.getString("zkConnects", "");
    // .. do more ..
}

Flink 1.7 Setup and Registration configuration:

ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

Get the configuration:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Read back the ParameterTool registered as global job parameters
        ParameterTool parameters = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        parameters.getRequired("input");
        // .. do more ..
    }
}

2. NetworkBuffer and operator chain

As mentioned above, when the upstream and downstream operators of a Flink job are not chained together and their subtasks are distributed across different TaskManager nodes, network buffers must be requested and released during data transfer between the operators, and the data travels over network I/O.

The process can be summarized as follows. The result produced by the upstream operator is serialized by the RecordWriter, which requests a Buffer from the BufferPool and writes the serialized result into it. The Buffer is then added to a ResultSubPartition of the ResultPartition. Buffers in the ResultSubPartition are transmitted via Netty to an InputChannel of the downstream operator's InputGate. Similarly, before data enters the InputChannel, a Buffer must be requested from the BufferPool of the TaskManager where the downstream operator runs. The RecordReader reads the Buffer and deserializes the data in it. The BufferPool is limited in size. If it is empty, the thread running the RecordWriter or RecordReader waits for a period of time while requesting a Buffer. For details, see [1], [2].
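The waiting behaviour of a limited pool can be illustrated with a toy model. This is a simplified sketch and not Flink's BufferPool implementation: the class `BoundedBufferPoolSketch` and its method signatures are assumptions made for illustration, and only the idea of a fixed number of buffers that must be recycled before reuse is taken from the description above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of a bounded buffer pool; not Flink's actual BufferPool. */
final class BoundedBufferPoolSketch {
    private final BlockingQueue<byte[]> free;

    BoundedBufferPoolSketch(int numBuffers, int bufferSize) {
        free = new ArrayBlockingQueue<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            free.add(new byte[bufferSize]);
        }
    }

    /** Waits up to timeoutMillis for a free buffer; returns null if none was recycled in time. */
    byte[] requestBuffer(long timeoutMillis) {
        try {
            return free.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Returns a buffer to the pool so that a waiting writer or reader can proceed. */
    void recycle(byte[] buffer) {
        free.offer(buffer);
    }

    public static void main(String[] args) {
        BoundedBufferPoolSketch pool = new BoundedBufferPoolSketch(2, 32 * 1024);
        byte[] a = pool.requestBuffer(10);
        byte[] b = pool.requestBuffer(10);
        // The pool is now empty: a third request waits and then gives up.
        System.out.println("third request got a buffer: " + (pool.requestBuffer(10) != null));
        pool.recycle(a);
        System.out.println("after recycle got a buffer: " + (pool.requestBuffer(10) != null));
    }
}
```

When many writer and reader threads spend their time inside the equivalent of `requestBuffer`, throughput drops even though there is plenty of data to process.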

A simplified diagram is shown below:

Figure 6. Flink's network stack, where RP is ResultPartition, RS is ResultSubPartition, IG is InputGate, IC is InputChannel

When using Flink 1.1.x and 1.3.x, if too few network buffers are configured and data throughput increases, the following symptoms appear:

Figure 7. Upstream operator blocks in the requestBuffer() method that gets the network buffer

Figure 8. The downstream operator is blocked waiting for new data input

Figure 9. The downstream operator is blocked waiting for new data input

Our worker threads (the RecordWriter and RecordReader threads) spend most of their time requesting buffers from the BufferPool. At this point, CPU usage fluctuates dramatically and job consumption slows down. In version 1.1.x, the job can be blocked for a long time, which triggers back pressure across the entire job and causes serious service delays.

In that case, we need to calculate the number of buffers required by the ResultPartitions and InputGates from the parallelism of the upstream and downstream operators, and set taskmanager.network.numberOfBuffers to a sufficiently large value.
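As a starting point for sizing, the Flink configuration documentation for these older versions gives a rule of thumb of slots per TaskManager squared, times the number of TaskManagers, times 4. The sketch below only encodes that rule of thumb. It is not the per-operator calculation the authors performed, which the article does not show, and the class name `NetworkBufferEstimate` is hypothetical.

```java
/** Rule-of-thumb estimate for taskmanager.network.numberOfBuffers (older Flink versions). */
final class NetworkBufferEstimate {

    /** slotsPerTaskManager^2 * numTaskManagers * 4 */
    static long estimate(int slotsPerTaskManager, int numTaskManagers) {
        if (slotsPerTaskManager <= 0 || numTaskManagers <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        return (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4L;
    }

    public static void main(String[] args) {
        // Example: 4 TaskManagers with 8 slots each.
        System.out.println("taskmanager.network.numberOfBuffers: " + estimate(8, 4));
    }
}
```

The result is only an initial value. Jobs with many non-chained operators at high parallelism may need more, which is why measuring CPU behaviour under load remains necessary.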

Figure 10. The impact of different network buffers on CPU utilization

When an adequate number of network buffers is configured, CPU jitter decreases and Job consumption speed increases.

Since Flink 1.5, a credit-based flow control mechanism has been part of its network stack [2]. This mechanism largely avoids blocking when requesting a Buffer from a BufferPool. In our preliminary tests, the network stack of 1.7 did perform better than that of 1.3.

However, this is still not optimal. As long as network buffers are used to transfer data between upstream and downstream operators, serialization and deserialization cannot be avoided, and exchanging credit information has its own latency and cost. All of this can be avoided by linking the upstream and downstream operators into an operator chain.

Therefore, when constructing the execution graph of a streaming task, we should chain as many operators as possible. If Kafka resources allow, you can increase the number of partitions of the Kafka topic so that the source operator and its successors can be chained together. However, the partitions of a Kafka topic cannot be increased without limit, so make the trade-off according to business volume and machine resources. For more detailed tuning of operator chaining and task slots, see [4].
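Why the partition count matters for chaining can be shown with a simplified model. The sketch below is an assumption-laden illustration, not Flink's chaining logic: Flink checks further conditions (for example slot sharing group and chaining strategy), and the names `ChainingSketch`, `Connection` and `canChain` are hypothetical. It keeps only the two conditions relevant here: a forward connection and equal parallelism.

```java
/** Simplified model of when two adjacent operators can be chained. */
final class ChainingSketch {

    enum Connection { FORWARD, REBALANCE, KEY_BY }

    /** Only a forward connection between operators of equal parallelism is chainable here. */
    static boolean canChain(int upstreamParallelism, int downstreamParallelism, Connection connection) {
        return connection == Connection.FORWARD && upstreamParallelism == downstreamParallelism;
    }

    public static void main(String[] args) {
        int kafkaPartitions = 4; // source parallelism equals the partition count
        int etlParallelism = 12;

        // Different parallelism forces a redistribution, so data crosses network buffers.
        System.out.println("4 -> 12 chainable: "
                + canChain(kafkaPartitions, etlParallelism, Connection.REBALANCE));

        // With 12 partitions the source can run at 12 and forward directly to the ETL operator.
        System.out.println("12 -> 12 chainable: "
                + canChain(12, etlParallelism, Connection.FORWARD));
    }
}
```

In this model, raising the partition count to match the ETL parallelism is what turns the first case into the second.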

3. Recommendations for the choice of serializer in Flink

As we saw in the previous section, data transferred between Flink tasks on different nodes must be serialized and deserialized, so serialization is also an important factor in Flink performance. Flink has its own type system, with its own type description class (TypeInformation). Flink tries to learn as much as possible about the data types entering and leaving operators and describes them with TypeInformation. There are two main reasons for this:

  • The more type information Flink has, the better the serialization method it can choose and the more efficiently it can use memory.
  • TypeInformation internally encapsulates its own serializer, available through createSerializer(), which frees users from dealing with the serialization framework themselves (for example, registering their custom types with it, although user customization and registration can improve performance).

In general, Flink recommends that the data passed between operators be POJOs. For POJO types, Flink uses its own PojoSerializer by default. Data types that Flink cannot describe or infer on its own are treated as GenericType and serialized with Kryo. Flink is more efficient when dealing with POJOs. POJO types also make grouping, joining and aggregating streams simpler, because data fields can be referenced directly, for example with dataStream.keyBy("username").
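For reference, Flink treats a class as a POJO when the class is public, has a public no-argument constructor, and every non-static, non-transient field is either public and non-final or has a public getter and setter. The sketch below shows one class that follows these rules and one that does not, together with a simplified reflective check. The check is an approximation written for illustration, not Flink's TypeExtractor, and all class and method names in it are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoRulesSketch {

    /** Follows the POJO rules: public class, public no-arg constructor, accessible fields. */
    public static class UserEvent {
        public String username;   // public, non-final field
        private long timestamp;   // private field exposed through getter and setter

        public UserEvent() {}

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** Breaks the rules: no public no-arg constructor, private field without accessors. */
    public static class OpaqueEvent {
        private final String payload;
        public OpaqueEvent(String payload) { this.payload = payload; }
    }

    /** Simplified approximation of the POJO rules; Flink's own analysis is more thorough. */
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // requires a public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) continue;
            if (Modifier.isPublic(m) && !Modifier.isFinal(m)) continue;
            if (!hasAccessors(clazz, f)) return false;
        }
        return true;
    }

    private static boolean hasAccessors(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("UserEvent: " + looksLikePojo(UserEvent.class));
        System.out.println("OpaqueEvent: " + looksLikePojo(OpaqueEvent.class));
    }
}
```

A type like `OpaqueEvent` would fall back to generic (Kryo) serialization, which is the slower path described above.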

In addition, we can do further optimization:

  • Explicitly call the returns method to give Flink a type hint:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)

The returns method eventually calls TypeExtractor.createTypeInfo(typeClass) to build the TypeInformation for our custom type. If our type satisfies the POJO rules or matches one of Flink's other basic types, createTypeInfo "translates" it, as far as possible, into a type that Flink knows well, such as a POJO type or another basic type, so that Flink can choose a more efficient serialization method on its own.

//org.apache.flink.api.java.typeutils.PojoTypeInfo
@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
   if (config.isForceKryoEnabled()) {
      return new KryoSerializer<>(getTypeClass(), config);
   }

   if (config.isForceAvroEnabled()) {
      return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
   }

   return createPojoSerializer(config);
}

For types that Flink cannot “translate”, GenericTypeInfo is returned and Kryo is used for serialization:

//org.apache.flink.api.java.typeutils.TypeExtractor

@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
      ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
   checkNotNull(clazz);
   // Try to convert clazz to PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo, etc.
   // ...
   // If the above attempts do not succeed, then return a generic type
   return new GenericTypeInfo<OUT>(clazz);
}

  • Register subtypes: call registerType(clazz) on the StreamExecutionEnvironment or ExecutionEnvironment instance to register our data classes, their subclasses, and the types of their fields. The more Flink knows about the types, the better the performance.
  • For further optimization, Flink also allows users to register their own custom serializers and to create the TypeInformation for their types manually. Details can be found on the Flink website: [3].

In our practice, we initially passed data between operators as JsonNode for extensibility, but performance was not as good as expected, so we replaced JsonNode with types that conform to the POJO rules. This alone yielded a performance improvement of more than 30% on Flink 1.1.x. Performance improved further when we used Flink's type hints and env.getConfig().enableForceAvro(). We kept using these methods through version 1.3.x.

After upgrading to 1.7.x, however, our code throws an exception when validating null fields if env.getConfig().enableForceAvro() is used. So we removed this setting, switched to Kryo serialization, and registered all subclasses of our types with Flink's ExecutionEnvironment. So far, performance looks good and is better than the old version using Avro. Best practices, however, can only be summed up after comparing and stress-testing KryoSerializer, the Avro serializer created by AvroUtils.getAvroUtils().createAvroSerializer, and PojoSerializer. You should choose the proper serializer based on your business scenario and data types.

4. Job deployment and resource isolation in standalone mode

In our experience, a Flink standalone cluster deploys jobs somewhat randomly. For example, suppose the cluster has two 8-core machines, each running one TaskManager instance with 8 task slots, and our job has a parallelism of 12. We may then see the following:

The slots of the first TaskManager are fully occupied, while the second TaskManager uses only half of its resources. Resources are severely unbalanced. As the traffic handled by the job grows, tasks on TM1 consume slowly, while tasks on TM2 consume much faster than those on TM1. Suppose business growth forces us to raise the job's parallelism to 24 and to add two more powerful machines (12 cores each), with a 12-slot TaskManager deployed on each new machine. After the expansion, the task slot usage of the cluster might look like this:

The newly added, more powerful machines did not take on more tasks, the load on the old machines was still heavy, and resource usage remained fundamentally unbalanced.
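The imbalance in the first scenario can be reproduced with a small simulation. This is a sketch under a stated assumption: it models a simple "fill one TaskManager before the next" policy and contrasts it with a utilization-based spread. Neither method is Flink's scheduler, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Toy slot allocation policies; neither is Flink's actual scheduler. */
final class SlotAllocationSketch {

    /** Fill-first: occupy every slot of one TaskManager before moving to the next. */
    static int[] fillFirst(int[] slotsPerTaskManager, int parallelism) {
        int[] used = new int[slotsPerTaskManager.length];
        int remaining = parallelism;
        for (int i = 0; i < used.length && remaining > 0; i++) {
            used[i] = Math.min(slotsPerTaskManager[i], remaining);
            remaining -= used[i];
        }
        return used;
    }

    /** Spread: place each subtask on the TaskManager with the lowest slot utilization. */
    static int[] spread(int[] slotsPerTaskManager, int parallelism) {
        int[] used = new int[slotsPerTaskManager.length];
        for (int task = 0; task < parallelism; task++) {
            int best = -1;
            for (int i = 0; i < used.length; i++) {
                if (used[i] >= slotsPerTaskManager[i]) continue; // TaskManager is full
                // Compare used[i]/slots[i] with used[best]/slots[best] without division.
                if (best < 0 || (long) used[i] * slotsPerTaskManager[best]
                        < (long) used[best] * slotsPerTaskManager[i]) {
                    best = i;
                }
            }
            if (best < 0) break; // not enough slots in the cluster
            used[best]++;
        }
        return used;
    }

    public static void main(String[] args) {
        int[] twoSmall = {8, 8};
        System.out.println("fill-first, parallelism 12: " + Arrays.toString(fillFirst(twoSmall, 12)));
        System.out.println("spread,     parallelism 12: " + Arrays.toString(spread(twoSmall, 12)));

        int[] expanded = {8, 8, 12, 12};
        System.out.println("fill-first, parallelism 24: " + Arrays.toString(fillFirst(expanded, 24)));
        System.out.println("spread,     parallelism 24: " + Arrays.toString(spread(expanded, 24)));
    }
}
```

Under the fill-first assumption, the first case yields 8 and 4 occupied slots, matching the description above, while the spread policy yields 6 and 6.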

Besides the unbalanced job deployment policy of standalone cluster mode, poor resource isolation is also a problem. We tend to deploy more than one job in a cluster, and these jobs share the JVM on each machine, so they naturally compete for resources. At first, we adopted the following solutions:

  1. Reduce the granularity of TaskManager, that is, multiple instances are deployed on one machine and each instance holds fewer slots.
  2. Isolate large service jobs on different clusters.

These solutions increase the number of instances and clusters, which in turn increases maintenance costs. So we decided to migrate to Flink on YARN, which handles resource allocation and isolation better than standalone mode.

IV. Summary and outlook

Flink was just a spark in 2016, but in only two years it has grown into the most popular stream processing platform and has the potential to unify batch and stream processing. After two years of practice, Flink has proven that it can handle the streaming needs of both TalkingData's App Analytics and Game Analytics products. Next, we will migrate more complex business logic and batch processing to Flink, complete the cluster deployment and the unification of the technology stack, and finally realize the Flink on YARN cluster plan shown in Figure 5, so as to support more business volume at lower cost.

This article is original content of the Yunqi Community and may not be reproduced without permission.