Author: Zhang Xinyu
From the perspectives of data transmission and data reliability, this article benchmarks the stream-processing performance of Storm and Flink, analyzes the test results, and offers suggestions for improving performance when using Flink.
Apache Storm, Apache Spark, and Apache Flink are all very active distributed computing platforms in the open source community, and many companies use two or even all three of them at the same time. For real-time computing, the underlying engines of Storm and Flink are stream-based: they process data record by record in a pipelined fashion, meaning all processing stages exist at the same time and data flows between them. Spark is based on batch processing: data is grouped into small batches, and the processing logic runs only after a batch has been assembled. In this article we compare Storm and Flink, both of which are based on stream processing.
To design our tests, we first examined the performance benchmarks of some existing big data platforms, such as the Yahoo Streaming Benchmarks and Intel HiBench. In addition, many papers have benchmarked distributed computing platforms from different angles. Although each of these tests has a different focus, they all use the same two metrics: throughput and latency. Throughput is the amount of data that can be processed per unit of time and can be increased by raising concurrency; latency is the time required to process a single record and generally trades off against throughput.
When designing the computation logic, we first considered a model of stream processing. The figure above shows a simple stream computation model: data is fetched from the Source, sent to a downstream Task, processed in the Task, and finally output. For such a model, the latency consists of three parts: data transfer time, Task computation time, and data queuing time. We assume that resources are sufficient and data does not queue, so the latency is composed only of data transfer time and Task computation time. The computation time depends closely on the user's logic, so the data transfer time better reflects the capability of the computing platform itself. Therefore, to highlight data transmission, we did not put any computation logic in the Task when designing the test cases.
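Written out as a formula (our notation, not from the figure), the model is simply:

```latex
t_{\mathrm{latency}} = t_{\mathrm{transfer}} + t_{\mathrm{compute}} + t_{\mathrm{queue}}
\quad\xrightarrow{\text{no queuing}}\quad
t_{\mathrm{latency}} \approx t_{\mathrm{transfer}} + t_{\mathrm{compute}}
```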
When choosing the data source, our main approach was to generate data directly in the process, which is also what many existing benchmarks do. This way, data generation is not constrained by the performance of an external data source system. However, since most of the real-time computing data in our company comes from Kafka, we also added tests that read data from Kafka.
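As an illustration, a self-generating source in Flink can be as small as the sketch below; the record format and emission loop are our own placeholders, and the point is only that no external system limits the data rate.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A minimal in-process data generator (hypothetical record format).
// Because records are produced inside the job itself, throughput is not
// constrained by an external data source such as Kafka.
public class SelfGeneratingSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (running) {
            ctx.collect("record-" + i++);   // emit as fast as the engine can consume
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```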
Data transmission can be divided into two modes: inter-process data transmission and intra-process data transmission.
Inter-process data transfer means that the data goes through serialization, network transmission, and deserialization. In Flink, when two processing logics are distributed on different TaskManagers, the data transfer between them is inter-process data transfer; Flink uses Netty for network transport. In Storm, inter-process data transfer happens between workers; Storm's network transport, formerly ZeroMQ, is now also Netty.
In-process data transfer means that the two processing logics run in the same process. In Flink, the two logics are chained together and data is passed within a single thread as a method-call argument. In Storm, the two logics become two threads that transfer data through a shared queue.
Storm and Flink each have their own reliability mechanism. Storm uses ACKs to ensure data reliability, while Flink uses its checkpoint mechanism, which is derived from the Chandy-Lamport algorithm.
In fact, whether exactly-once can be guaranteed also depends on the processing logic and the design of the result output. For example, if results are written to Kafka and data already written to Kafka cannot be rolled back, exactly-once cannot be guaranteed. In our tests we used at-least-once semantics and no reliability guarantee.
The image above shows the environment and platform versions we tested.
The figure above shows Flink's throughput under different transmission modes and reliability settings with self-generated data: in-process + unreliable, in-process + reliable, inter-process + unreliable, and inter-process + reliable. You can see that in-process data transfer delivers 3.8 times the throughput of inter-process transfer, while enabling checkpoints has little effect on Flink's throughput. So when we use Flink we prefer in-process transfer, that is, we chain operators together as much as possible.
So let's look at why chaining performs so much better, and how to write Flink code so that operators are chained and use in-process data transfer.
As you know, we must create an env in Flink code; calling env's disableOperatorChaining() method prevents all operators from being chained. We usually call this method only while debugging, to locate problems.
If chaining is allowed, the Source and MapFunction in the figure above are chained together and executed in one Task. Conversely, if chaining is not allowed, they are placed in two Tasks.
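For reference, a minimal job matching the figure could look like the sketch below (the source is the hypothetical generator from earlier; only the chaining-related call matters here):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Uncommenting the next line forbids chaining for the whole job, so the
        // Source and the map run in separate Tasks (handy when debugging).
        // env.disableOperatorChaining();

        env.addSource(new SelfGeneratingSource())  // generator sketched earlier
           .map(String::toUpperCase)               // chained with the source by default
           .print();

        env.execute("chaining example");
    }
}
```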
For two operators that are not chained, they are placed in two different Tasks, so data transfer between them works like this: the SourceFunction takes the data, serializes it, puts it into memory, and then transmits it over the network to the MapFunction's process, which deserializes the data and uses it.
For two operators that are chained, they are placed in the same Task, so data transfer between them works like this: the SourceFunction takes the data and makes a deep copy, and the MapFunction then takes the deep-copied object as its input.
Although Flink has made many improvements to serialization, its performance is still poor compared with in-process data transfer, which involves neither serialization nor network transport. So we chain operators whenever possible.
Not every two operators can be chained together; several conditions must be met. First, the downstream operator must accept only one upstream data stream. Second, the upstream and downstream parallelism must be the same. Third, the operators must use the same resource group (by default they all belong to the "default" group). Finally, env must not have called the disableOperatorChaining() method, and there must be no redistribution of data between the operators, for example no rebalance(), keyBy(), or broadcast().
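The sketch below shows how these conditions surface in code; the operators are trivial placeholders, and the comments note why each branch does or does not chain:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainConditions {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Chained: single upstream stream, matching parallelism (1), default resource group.
        env.addSource(new SelfGeneratingSource())
           .map(String::toUpperCase).setParallelism(1)
           .print();

        // Not chained: rebalance() redistributes records between source and map.
        env.addSource(new SelfGeneratingSource())
           .rebalance()
           .map(String::toUpperCase).setParallelism(1)
           .print();

        // Not chained: downstream parallelism (4) differs from the source's (1).
        env.addSource(new SelfGeneratingSource())
           .map(String::toUpperCase).setParallelism(4)
           .print();

        env.execute("chain conditions");
    }
}
```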
Now compare the throughput of Flink and Storm when using in-process communication without guaranteeing data reliability. In this case Flink outperforms Storm by a factor of 15, reaching a throughput of 20.6 million records/s. Furthermore, if env.getConfig().enableObjectReuse() is called when writing the job, Flink's per-concurrency throughput can reach 40.9 million records/s.
When enableObjectReuse() is called, Flink skips the intermediate deep-copy step and the SourceFunction's data is passed directly to the MapFunction. However, note that this method cannot be enabled casually: you must ensure that there is only one downstream Function, or that no downstream Function changes the values inside the object; otherwise there could be thread-safety issues.
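A minimal sketch of where this switch lives; the map here deliberately builds a new value instead of mutating its input, which keeps object reuse safe:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive deep copy between chained operators:
        // records are handed to the next operator by reference.
        env.getConfig().enableObjectReuse();

        env.addSource(new SelfGeneratingSource())   // generator sketched earlier
           .map(value -> "prefix-" + value)         // safe: does not mutate the input object
           .print();

        env.execute("object reuse example");
    }
}
```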
Storm's ACK mechanism makes ensuring data reliability relatively expensive.
The picture on the left shows Storm's ACK mechanism. For every record the Spout sends to a Bolt, the Spout sends an ACK message to the acker, and the Bolt sends an ACK message to the acker when it has finished processing the record. When the acker has received all ACK messages for that record, it sends an ACK back to the Spout. In other words, for a two-level topology (Spout + Bolt), three ACK messages are transmitted for every record sent. These three ACK messages are the cost of ensuring reliability.
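For context, the per-tuple bookkeeping on the Bolt side looks roughly like this in Storm's API (a Storm 2.x style signature is assumed; the field names are placeholders). The anchored emit and the ack() call are what generate the extra ACK traffic described above:

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// A pass-through Bolt. Anchoring the emitted tuple to its input and calling ack()
// is what feeds Storm's acker, i.e. the per-tuple ACK cost discussed above.
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        collector.emit(input, new Values(input.getValue(0)));  // anchored emit
        collector.ack(input);                                  // tell the acker this tuple is done
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}
```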
The figure on the right shows Flink's checkpoint mechanism. In Flink, the initiator of checkpoints is the JobManager. There is no per-message ACK cost as in Storm; the cost scales with time instead. You can set the checkpoint interval, for example every 10 seconds, and the cost of each checkpoint is just one checkpoint message sent from the Source to the Map. (The checkpoint message sent by the JobManager goes through the control flow and has nothing to do with the data flow.) Compared with Storm, Flink's reliability mechanism is much cheaper, which is why guaranteeing reliability has little impact on Flink's performance but a large impact on Storm's.
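In code, enabling this is a single setting on the environment; the 10-second interval mirrors the example above, and AT_LEAST_ONCE matches the semantics used in our tests:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 10 seconds. The JobManager injects the checkpoint
        // message into the sources; AT_LEAST_ONCE avoids barrier alignment.
        env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);

        env.addSource(new SelfGeneratingSource())   // generator sketched earlier
           .map(String::toUpperCase)
           .print();

        env.execute("checkpoint example");
    }
}
```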
The last comparison with self-generated data is between Flink and Storm for inter-process data transmission. It shows that with inter-process transfer, Flink's per-concurrency throughput is 4.7 times that of Storm without reliability guarantees, and 14 times that of Storm when reliability is enabled.
The figure above shows the per-concurrency throughput of Storm and Flink when consuming Kafka data. Since the data is consumed from Kafka, throughput is inevitably affected by Kafka itself. We found that the performance bottleneck was in the SourceFunction, so we increased the number of partitions of the topic and the parallelism of the SourceFunction that fetches the data, while keeping the parallelism of the MapFunction at 1. In this setup, Flink's bottleneck moves upstream, to sending data downstream, while Storm's bottleneck is in the downstream deserialization of data.
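A sketch of the Kafka-reading setup: the topic name, broker address, and parallelism values are our own placeholders, and the FlinkKafkaConsumer class assumes the flink-connector-kafka dependency is on the classpath:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaThroughputJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder address
        props.setProperty("group.id", "throughput-test");

        env.addSource(new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props))
           .setParallelism(4)            // roughly one consumer subtask per topic partition
           .map(String::toUpperCase)     // trivial map kept at parallelism 1, as in the test
           .setParallelism(1)
           .print();

        env.execute("kafka throughput test");
    }
}
```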
From the perspectives of data transmission and data reliability, we have briefly analyzed the performance of the Flink and Storm computing platforms. In practice, however, Tasks must contain computation logic, which inevitably involves more resources such as CPU and memory. In the future, we plan to build an intelligent analysis platform to analyze the performance of users' jobs: using the collected metrics, it will identify the bottlenecks of a job and give optimization suggestions.