I. Foreword

This article consists of five parts:

The first is Youzan's real-time platform architecture.

The second is why we chose Flink during the research stage. This part compares Flink with Spark's Structured Streaming and explains the reasons for choosing Flink.

The third, and most important, part is Flink in practice at Youzan. It covers some of the pitfalls we ran into while using Flink, as well as some concrete experience.

The fourth part covers some of our practices around real-time SQL and the platform interface.

The last part offers a few thoughts on Flink's future, in two halves: how our company will use Flink going forward, and some new features Flink may gain.


II. Youzan's real-time platform architecture

Youzan's real-time platform architecture has several main components.

First, message middleware is essential for real-time data. Besides Kafka, which is widely used in the industry, we also use NSQ. Unlike Kafka, NSQ is written in Go, and the company wraps it with a layer of Java client that uses a push-and-ACK model to guarantee that each message is delivered at least once. This makes it quite different from Kafka, especially when it comes to implementing fault tolerance. When implementing our connector, we referred to the RabbitMQ connector provided by Flink and combined it with the features of the NSQ client.
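As a hedged illustration of that fault-tolerance idea (not Youzan's actual connector code), an at-least-once source can remember the IDs of messages emitted between checkpoints and ACK them only once the checkpoint containing them completes; `NsqClient` and `NsqMessage` below are hypothetical stand-ins for the real Java client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/** Sketch of an at-least-once NSQ source, modeled on Flink's RabbitMQ connector. */
public class NsqSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Hypothetical stand-ins for the company's NSQ Java client.
    interface NsqMessage { String id(); String body(); }
    interface NsqClient { NsqMessage poll() throws Exception; void ack(String id); }

    private transient NsqClient client;
    private volatile boolean running = true;

    // IDs emitted since the last checkpoint, and per-checkpoint batches that
    // may only be ACKed once that checkpoint has completed.
    private final List<String> idsSinceLastCheckpoint = new ArrayList<>();
    private final SortedMap<Long, List<String>> pendingAcks = new TreeMap<>();

    @Override
    public void open(Configuration parameters) {
        // client = ... connect the (hypothetical) NSQ client here ...
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            NsqMessage msg = client.poll();
            // Emit and record under the checkpoint lock, so a checkpoint cannot
            // slip in between "emit record" and "remember its ID".
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(msg.body());
                idsSinceLastCheckpoint.add(msg.id());
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Tie the un-ACKed IDs to the checkpoint being taken.
        pendingAcks.put(context.getCheckpointId(), new ArrayList<>(idsSinceLastCheckpoint));
        idsSinceLastCheckpoint.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The checkpoint is durable, so ACKing now yields at-least-once:
        // a failure before this point simply means NSQ redelivers.
        SortedMap<Long, List<String>> confirmed = pendingAcks.headMap(checkpointId + 1);
        confirmed.values().forEach(ids -> ids.forEach(client::ack));
        confirmed.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Restore logic omitted in this sketch.
    }

    @Override
    public void cancel() { running = false; }
}
```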

Next is the compute engine. The oldest is Storm. Some tasks still run on it, but new tasks are basically no longer developed on Storm: besides the high development cost, its SQL support, exactly-once semantics, and state management are all weak, and its throughput is relatively low. Migrating the Storm jobs to Flink is one of our next goals. Spark Streaming has a relatively good ecosystem, but it is micro-batch based, which brings many limitations on top of high latency and its reliance on external storage to hold intermediate state. Flink is a relatively new engine at Youzan; why we introduced Flink when we already had Spark and Storm is covered in the next section.

For storage engines, in addition to traditional MySQL, we also use HBase, ES, and ZanKV. ZanKV is a Redis-compatible distributed KV database developed in-house, so you can simply think of it as Redis.

The real-time OLAP engine is based on Druid and is widely used for multidimensional statistics.

And finally, our real-time platform. The real-time platform provides cluster management, project management, task management, and alerting and monitoring functions.

That is a brief introduction to the architecture of the real-time platform. Next comes the exploration phase of Flink at Youzan. In this section, I will mainly compare Flink with Spark Structured Streaming.


III. Why we chose Flink

Why compare with Spark Structured Streaming (SSS)? Because these are the two representative engines in the context of real-time SQL.

The first aspect is performance, which we can compare from several angles. The first is latency: there is no doubt that Flink, as a true streaming engine, beats SSS's micro-batch engine here. Spark has also introduced a continuous processing engine, but in terms of semantics and maturity it is not yet a match for Flink. As far as I know, it is implemented by scheduling a long-running RDD on a node.

The second, more intuitive indicator is throughput, where Flink is slightly inferior to Spark in some scenarios. But when it comes to tasks with large intermediate state, Flink's RocksDB-based state management shows its advantage.

Flink can manage intermediate state either purely in memory or with RocksDB. RocksDB is, simply put, an embedded database with a cache; because it can persist to disk, Flink can hold a much larger amount of state than SSS and is less prone to OOM. Furthermore, checkpoints can run in incremental mode, so only the SST files that differ from the previous checkpoint are backed up. In our usage, RocksDB's performance as a state backend has been able to meet our needs.
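As a minimal illustration (the checkpoint path and interval are placeholders), switching a job to RocksDB with incremental checkpoints looks roughly like this:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The second argument enables incremental checkpoints: only SST files
        // that changed since the last checkpoint are uploaded.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60 s

        // ... build and execute the job as usual ...
    }
}
```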

Having covered performance, let's talk about SQL, which is also a major direction right now. When I first tried SSS, I ran multiple aggregate operations in a single SQL statement and got an exception; a closer look at the documentation showed that this is indeed unsupported in SSS. The second case is distinct, which is also unsupported. Flink is far ahead of SSS in both cases, so from the standpoint of real-time SQL, Flink won itself another vote. In addition, Flink has more flexible window support. As for output, referring again to the Dataflow model, Flink supports delete and update operations, while SSS supports only update. (The SSS here is based on Spark 2.3.)
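For instance, a query like the one below, combining several aggregates with a distinct count in a single statement, runs fine on Flink SQL but was exactly the kind of statement that failed on the SSS of that time (the table and field names are made up for illustration, and the environment-setup API differs slightly between Flink versions):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MultiAggExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // "orders" is assumed to be registered already, with fields
        // shop_id, user_id and amount.
        Table result = tableEnv.sqlQuery(
            "SELECT shop_id, " +
            "       COUNT(DISTINCT user_id) AS uv, " + // distinct: unsupported in SSS then
            "       COUNT(*) AS pv, " +                // several aggregates in one statement
            "       SUM(amount) AS gmv " +
            "FROM orders GROUP BY shop_id");
    }
}
```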

Then there is API flexibility. In SSS, the table abstraction is indeed very convenient, but some operations are still better expressed at the DStream or RDD level. SSS does not provide that kind of conversion, so one can only fall back on writing UDFs. In Flink, however, Table and DataStream can be converted into each other flexibly to handle more complex scenarios.
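A small sketch of that round trip (field names are illustrative; the table-environment factory varies slightly across Flink versions):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableStreamInterop {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DataStream -> Table
        DataStream<Tuple2<String, Double>> orders =
            env.fromElements(Tuple2.of("shopA", 9.9), Tuple2.of("shopA", 19.9));
        Table ordersTable = tableEnv.fromDataStream(orders, "shopId, amount");

        // Table-side logic...
        Table gmv = ordersTable.groupBy("shopId").select("shopId, amount.sum as gmv");

        // Table -> DataStream: a continuously updated aggregate comes back as a
        // retract stream; the Boolean marks insert (true) vs. retract (false).
        tableEnv.toRetractStream(gmv, Row.class).print();

        env.execute("table-stream-interop");
    }
}
```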


IV. Flink in practice at Youzan

The first thing to consider before actually starting to use Flink is deployment. Given our existing technology stack, we deploy on YARN and use the single-job (per-job) mode. Although this means more ApplicationMasters, it undoubtedly improves isolation.

4.1 Problem I: FLINK-9567

I ran into one of the odder problems at the very start of deployment. Let me give some background first. Since we were still in the research stage, we used YARN's default queue, which has low priority and is easily preempted when resources are tight. One morning, I started a task that requested five containers to run TaskExecutors, a fairly simple stateful streaming task, to see whether it would run stably for a while. That Flink job ended up holding more than 100 containers, and counting, of which only five were doing any work; the other containers had registered their slots, and those slots sat idle. The following two figures show the normal task and the broken task respectively.

[Figure: the task after the error]

Before getting into the details of this issue, let me explain how Flink integrates with YARN. Following the diagram below, let's go through what these components do, one by one, from the bottom up.

A TaskExecutor is the executor of actual tasks. It may have multiple slots, each of which runs one specific subtask. Each TaskExecutor registers its slots with the SlotManager and reports their status, busy or idle.

The SlotManager is both the manager of slots and the component responsible for providing slots to running tasks. It also records the current backlog of slot requests. If slots are insufficient, it requests new containers from Flink's ResourceManager.

Pending Slots: the backlog of outstanding slot requests and its counter.

Flink's ResourceManager interacts with YARN's ResourceManager to apply for containers, start containers, and handle container exits. Because requests are made asynchronously, it must record the current backlog of container requests to avoid receiving more containers than needed.

Pending Container Requests: the counter of outstanding container requests.

The AMRMClient performs the asynchronous requests; its CallbackHandler notifies Flink's ResourceManager when a container is received and when a container exits.

YARN's ResourceManager acts as the resource distributor: it receives container requests and prepares containers for its clients.

That is a lot of concepts introduced at once, so let me use a simple example to describe the roles these components play at runtime.

First, our configuration: three TaskManagers, each with two slots, six slots in total. At this point only four slots exist, all of them occupied, and the task scheduler requests two more slots to run subtasks.

The SlotManager finds that all existing slots are occupied, so it puts the slot requests into pending slots; you can see the Pending Slots counter jump from 0 to 2. The SlotManager then requests a new TaskExecutor from Flink's ResourceManager, which would satisfy the two pending slot requests. Flink's ResourceManager increments Pending Container Requests to 1 and applies to YARN for the resources through the AMRMClient.

Once YARN has prepared a container, it notifies Flink's ResourceManager through the CallbackHandler. Flink starts one TaskExecutor for each container it receives and decrements Pending Container Requests by one. Once Pending Container Requests reaches zero, any further container received is returned immediately.

When the TaskExecutor starts, it registers its two free slots with the SlotManager, which completes the two backlogged slot requests, notifies the scheduler that the two subtasks can run on the new TaskExecutor, and resets Pending Slots to 0. Up to this point, everything works as expected.

So how does the problem arise? Let's look at the task that was just running normally: it occupies six slots.

Now suppose something causes the TaskExecutors to exit abnormally, for example YARN preempting their resources. YARN notifies Flink's ResourceManager that the three containers exited abnormally, so Flink's ResourceManager immediately applies for three new containers. What follows is a worst case, because this problem does not reproduce deterministically.

The CallbackHandler receives the exit callbacks and finds that the containers exited abnormally, so it immediately requests new containers; Pending Container Requests becomes 3.

If the task restarts at this point, the scheduler requests six slots from the SlotManager. The SlotManager has no free slots, so it requests three more containers from Flink's ResourceManager; Pending Container Requests becomes 6.

The result: six TaskExecutors with twelve slots in total, of which only six are in use and six sit idle.

I made two attempts at fixing this problem. In the first attempt, I stopped applying for a new container immediately after a container exited abnormally. The problem is that if a container fails to start its TaskExecutor, then without this compensation mechanism some slot requests stay backlogged forever, because the SlotManager believes it has already requested containers for them.

The second attempt was to check pending slots before Flink's ResourceManager requests a new container: if the backlogged slot requests can already be covered by the backlogged container requests, there is no need to apply for another container.
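In code terms, the idea of that second fix is roughly the following check (a simplified sketch with illustrative names, not the actual Flink patch):

```java
/** Simplified sketch of the second fix; field and method names are illustrative. */
class ContainerRequestGuard {
    int pendingSlotRequests;        // backlog recorded by the SlotManager
    int pendingContainerRequests;   // containers already requested from YARN
    final int slotsPerTaskExecutor; // slots each new container will provide

    ContainerRequestGuard(int slotsPerTaskExecutor) {
        this.slotsPerTaskExecutor = slotsPerTaskExecutor;
    }

    /** Called whenever a slot request cannot be satisfied from free slots. */
    void maybeRequestContainer() {
        // Slots that the containers already on order will eventually register.
        int slotsOnTheWay = pendingContainerRequests * slotsPerTaskExecutor;
        if (pendingSlotRequests > slotsOnTheWay) {
            pendingContainerRequests++;
            requestYarnContainer(); // only now ask YARN for one more container
        }
        // Otherwise an in-flight container already covers the backlog; asking
        // for more is what produced the idle TaskExecutors of FLINK-9567.
    }

    void requestYarnContainer() { /* AMRMClient call omitted in this sketch */ }
}
```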

4.2 Problem II: Monitoring

The second pitfall we hit was related to latency monitoring. The example is a very simple task: two sources, two operators other than the sources, each with parallelism 2, so every source and every operator has two subtasks.

The logic of the task is simple, but once we turn on latency tracking, even such a simple task records latency data from every source subtask to every subtask of every operator. The latency data also includes average latency, maximum latency, 99th-percentile latency, and so on. So we can derive a formula: the number of latency records is N = n(sources) * n(subtasks per source) * n(operators) * n(subtasks per operator).

Let me make a very simple assumption: the number of subtasks per source and the number of subtasks per operator are both the parallelism p. The formula then shows that the amount of latency data grows quadratically with the parallelism: N = p^2 * n(sources) * n(operators).

If we raise the parallelism of the example task to 10, we get 400 latency records. That already looks like quite a lot, but at this scale it does not yet affect the normal operation of the components.

However, on Flink's dev mailing list, one user reported that after enabling latency tracking, the JobMaster would soon fail. He received more than 24,000 latency records, and the ConcurrentHashMap holding them took up 1.6 GB of memory. How much memory does a Flink JobMaster usually get? I usually allocate 1-2 GB, which leads to long Full GC pauses and OOM.

So how do we solve this problem? When latency tracking starts to affect the system, the simplest remedy is to turn it off. But with it off, on the one hand we cannot know the latency of the current task, and on the other hand we cannot build any alerting on latency.

So there are other solutions. The first is FLINK-10243, which provides more granularity options for latency tracking and reduces the number of metrics at the source. For example, with the single mode, only the latency of each operator subtask is recorded, regardless of which source or source subtask the record came from. This changes the formula and reduces the 400 latency records of the 10-parallelism task above to 40. This feature was released in 1.7.0 and backported to 1.5.5 and 1.6.2.
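Using it takes one setting, the `metrics.latency.granularity` option, normally placed in flink-conf.yaml; it is shown here programmatically on a local environment just to make the key visible:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyGranularityExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "single": one latency distribution per operator subtask, ignoring
        // which source (or source subtask) a record came from.
        conf.setString("metrics.latency.granularity", "single");

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // ... build the job as usual ...
    }
}
```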

In addition, FLINK-10246 proposes improvements to the MetricQueryService. It consists of several subtasks; the first three set up a dedicated low-priority ActorSystem for the monitoring service, which can be simply understood as a separate thread pool with low-priority threads handling monitoring-related work. The goal, again, is to prevent monitoring tasks from affecting the main components. This feature was released in 1.7.0.

Another is FLINK-10252, still under review and improvement, which aims to control the size of monitoring messages.


4.3 Practice I

Next, let's talk about some concrete applications of Flink at Youzan.

The first topic is Flink with Spring. First of all, many services inside the company only expose a Dubbo interface, and users usually obtain a client for such a service through Spring; this is also the case in some real-time computing applications.

In addition, many data application developers are also Java engineers who would like to use Spring, and components from its ecosystem, inside Flink to simplify their development. Such user needs have to be met. Below I will go through some typical mistakes first, and then the correct usage.

The first mistake is quite typical: start a Spring environment in Flink's user code, then fetch the relevant bean inside the operator and make calls on it. In reality, though, the Spring context here only gets launched on the client side, that is, on the machine that submits the job (the red box labeled Spring Context in the figure marks where it is launched), whereas the actual call happens in a TaskSlot of a TaskManager. The two live in different JVMs, which is obviously unreasonable. That brings us to the second mistake.

The second mistake looks much better than the first: use a RichFunction in the operator and obtain a Spring context from the configuration file in the open method. But leaving aside whether launching several Spring contexts in one TaskManager is wasteful, launching two Spring contexts in one JVM is problematic in itself. Some users might think: that's easy, just set the number of TaskSlots to 1. However, the OperatorChain mechanism binds several narrowly dependent operators together to run in one TaskSlot. Then why not simply turn off the OperatorChain? Even then, Flink may still perform CoLocationGroup-based optimizations that place multiple subtasks in one TaskSlot.

The final solution is actually simple: wrap the Spring context in a singleton, ensuring there is exactly one per JVM, and use that singleton to fetch the corresponding bean in the open method of the operator's function.
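A minimal sketch of such a wrapper (the XML path is illustrative):

```java
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * One Spring context per JVM (i.e., per TaskManager), no matter how many
 * slots and chained operators end up sharing that JVM.
 */
public final class SpringContextHolder {
    private static volatile ApplicationContext context;

    private SpringContextHolder() {}

    public static ApplicationContext get() {
        if (context == null) {
            synchronized (SpringContextHolder.class) {
                if (context == null) {
                    // Load whatever Spring config the job ships with.
                    context = new ClassPathXmlApplicationContext("spring/flink-job.xml");
                }
            }
        }
        return context;
    }
}
```

An operator function then simply calls something like `SpringContextHolder.get().getBean(TradeService.class)` in its open method (`TradeService` being a hypothetical Dubbo interface).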

However, invoking a Dubbo service usually takes at least 10 ms per response, which caps the throughput of a single TaskSlot at around 100 requests per second: a huge waste of performance. The solution to this problem is asynchrony plus caching: caching for calls that repeatedly return the same value, and asynchrony to raise throughput.

4.4 Practice II

But what if you want asynchrony and caching at the same time? At first I thought this would be an easy feature to implement, but when I actually wrote the RichAsyncFunction I found there was no way to use the KeyedState managed by Flink. So the initial idea was to build an LRU-style cache of my own, but that would not take advantage of Flink's state management at all. So I dug into the implementation.

So why can't the managed keyed state be used there?

When a record enters the operator, Flink first extracts its key and points the KeyedState to the storage space bound to that key; suppose a record with key4 arrives, so the state now points at key4's space. If the asynchronous operation for key1 happens to complete at this moment and wants to cache its result, that result gets written into the storage space bound to key4. When the next key1-related record enters the operator and looks up the storage space bound to key1, it finds no data there and has to issue the request all over again.

So the solution is to customize an operator that makes every incoming record point to the same public key's storage space, and to use a MapState inside that space as the cache. The function run by this operator extends AbstractRichFunction so that it can obtain the KeyedState in its open method, and implements the AsyncFunction interface to perform the asynchronous operations.
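A condensed sketch of such a function (the custom operator that pins every record to the public key, wires this function in, and hands cache updates back to the operator thread is elided; `queryRemote` is a stand-in for the real asynchronous Dubbo call):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

/**
 * Sketch: async lookups backed by a Flink-managed cache. Only works inside
 * the custom operator described above, which keeps the current key pointed
 * at one public key so all cache entries share a single state space.
 */
public class CachedAsyncLookup extends AbstractRichFunction
        implements AsyncFunction<String, String> {

    private transient MapState<String, String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Legal here only because the enclosing custom operator exposes keyed
        // state; in a plain RichAsyncFunction this call would fail.
        cache = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("lookup-cache", String.class, String.class));
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> out) throws Exception {
        String cached = cache.get(key);
        if (cached != null) {
            out.complete(Collections.singleton(cached)); // cache hit: no remote call
            return;
        }
        queryRemote(key).thenAccept(value -> {
            // cache.put(key, value) must be handed back to the operator thread;
            // keyed state is not safe to update from a callback thread.
            out.complete(Collections.singleton(value));
        });
    }

    private CompletableFuture<String> queryRemote(String key) {
        // Stand-in for the asynchronous Dubbo invocation.
        return CompletableFuture.completedFuture("value-for-" + key);
    }
}
```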


V. Real-time SQL and the platform interface

At first, we provided an SDK to simplify the development of SQL real-time tasks, but it was not very friendly to users. So we are now building an interface for SQL real-time tasks, with Flink as the underlying engine executing them.

When building a SQL real-time task, the first step is to abstract the external systems: data sources and data sinks are abstracted as streaming resources. Users register their schema and meta information with the platform, and the platform manages read and write permissions according to the project team the resources belong to. A unified message format greatly reduces complexity here: at Youzan, users who want to connect must ensure their messages are in JSON format, so a sample message can directly generate the schema information.

Next, based on the data source and data sink the user selects, the platform fetches the corresponding schema and meta information, registers the corresponding external-system table connectors in the Flink task, and then executes the user's SQL statements.
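Schematically, the platform wires each SQL task roughly as follows (the table names are hypothetical, and the source and sink objects are built from the schema and meta information registered above):

```java
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;

/** Sketch of how one SQL task is assembled from registered resources. */
public class SqlTaskBuilder {
    public static void build(StreamTableEnvironment tableEnv,
                             TableSource<?> source, // built from the source's schema/meta info
                             TableSink<?> sink,     // built from the sink's schema/meta info
                             String userSql) {
        tableEnv.registerTableSource("orders", source);  // expose the source as a table
        tableEnv.registerTableSink("shop_gmv", sink);    // expose the sink as a table
        tableEnv.sqlUpdate(userSql);                     // e.g. "INSERT INTO shop_gmv SELECT ..."
    }
}
```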

UDFs are used to extend the functions that SQL semantics cannot express.

With the meta information of data sources and data sinks, the platform can derive the dependencies between real-time tasks and monitor the entire link.


VI. Future and outlook

We will try out Flink's batch processing and ML modules and compare them with Spark to analyze their strengths and weaknesses; this is still at the research stage. At present, I pay particular attention to the combination of Flink and Hive, which corresponds to the issue FLINK-10566.

As for Flink's own development, I follow, and participate in, the optimization of scheduling and resource management most closely. Today Flink's scheduling is coupled with the execution graph and uses a relatively simple scheduling mechanism. By isolating the scheduler and making it pluggable, more scheduling strategies can be applied. Moreover, on top of a new scheduler, more flexible mechanisms for adding and removing resources can be built to achieve auto scaling. This is likely to be an important feature in future releases, corresponding to the issues FLINK-10404 and FLINK-10429.


The infrastructure team is mainly responsible for Youzan's Data Platform (DP), real-time computing (Storm, Spark Streaming, Flink), offline computing (HDFS, YARN, Hive, Spark SQL), online storage (HBase), real-time OLAP (Druid), and other technical products. Interested folks are welcome to contact [email protected]