How do you choose the right stream computing engine for your business? Beyond comparing feature matrices engine by engine, benchmarks are an important and common way to evaluate system performance.
In the field of stream computing, however, there is no industry-standard benchmark. This article explores the challenges of designing a stream computing benchmark, shares how we designed Nexmark, a stream computing benchmark framework, and outlines plans for the future.
Background
As the timeliness of data becomes more and more important to the fine-grained operation of enterprises, “real-time is the future”, “real-time data warehouse” and “data lake” have become buzzwords in recent years. The landscape of stream computing has also changed dramatically: Apache Flink keeps pushing toward unified stream and batch processing, Apache Spark has a certain audience for near-real-time processing, and Apache Kafka has entered stream computing with the high-profile ksqlDB. Apache Storm, on the other hand, has begun to fade away.
Each engine has its own strengths, so choosing a stream computing engine suitable for your business has become a perennial topic. Besides comparing the different feature matrices the engines provide, performance is an evaluation factor that cannot be bypassed, and benchmarking is an important and common process for evaluating system performance.
Problems with existing stream computing benchmarks
There is currently no industry-standard benchmark for stream computing. One of the best-known benchmarks in the industry today is the Yahoo Streaming Benchmarks, published five years ago by the Yahoo Storm team [4]. Faced with a lack of real-world benchmarks, Yahoo's original intention was to simulate a simple advertising scenario to compare the various stream computing frameworks, and the benchmark has since been widely cited. The scenario consumes an ad click stream from Kafka, joins it with the ads' campaign information stored in Redis, and then performs a windowed aggregation count.
However, because the Yahoo team tried so hard to reproduce a real production environment, these external services (Kafka, Redis) became the bottleneck. Ververica ran an extended experiment in this article [5], replacing the Kafka data source with a built-in Datagen source, which improved performance 37-fold! Thus, the introduction of the Kafka component means the benchmark does not accurately reflect the engine's true performance. More importantly, the Yahoo Benchmark contains only one very simple, “word count”-like job, which cannot fully reflect today's complex stream computing systems and businesses. Who would use a simple “word count” to measure and compare the performance differences between databases? These are the reasons the Yahoo Benchmark has not become an industry-standard benchmark, and they are what we set out to solve.
Therefore, we believe that an industry-standard benchmark should have the following characteristics:
1. Reproducibility
Reproducibility is an essential requirement for a benchmark to be trusted, yet many benchmark results are hard to reproduce. Some publish only the results, without the code used to produce them. Some use hardware that is not readily available. And some results are inconsistent because the benchmark depends on too many external services.
2. Representative coverage of real industry business scenarios (query set)
For example, the well-known TPC-H and TPC-DS benchmarks in the database field cover large query sets in order to capture the subtle differences between query engines. Moreover, these query sets are based on real business scenarios (the retail industry) and, thanks to their large data scale, are also widely used by big data systems.
3. An adjustable job workload (data volume, data distribution)
In big data, different data scales can pose completely different challenges to an engine. For example, the Yahoo Benchmark uses only 100 campaign IDs, making the state so small that it fits entirely in memory, so the impact of I/O and checkpoint synchronization is negligible. Real scenarios, however, often have to face large state, where the challenges are far more complex and difficult. A data generation tool like TPC-DS provides a Scale Factor parameter to control the amount of data. Furthermore, the data distribution should be close to real-world data, for example exhibiting data skew with an adjustable skew ratio. Only then can a benchmark comprehensively reflect the differences between engines across business scenarios.
4. Unified performance metrics, plus tools to collect and summarize them
Benchmark performance metrics need to be clearly defined, consistent, and applicable to a variety of computing engines. However, performance metrics for stream computing are harder to define and collect than for traditional batch processing, which makes this one of the most challenging problems for stream benchmarking; it is discussed in more detail below.
We also looked at a number of other benchmarks related to stream computing, including StreamBench [3], HiBench, and BigDataBench, but they all fall short of the fundamentals above. The gold standard in benchmarking is undoubtedly the series published by TPC, such as TPC-H and TPC-DS. However, these benchmarks were designed for traditional databases and traditional data warehouses and are not suitable for today's stream computing systems: they do not consider event time, out-of-order data, windows, and other scenarios common in stream computing. So we decided to design and open-source a new stream computing benchmark framework, Nexmark.
Repository: github.com/nexmark/nex…
Design of Nexmark benchmark framework
To provide a stream computing benchmark that satisfies the fundamentals above, we designed and developed the Nexmark benchmark framework, with the goal of making it the standard benchmark for stream computing.
The Nexmark benchmark framework derives from the NEXMark research paper [1] and the Apache Beam Nexmark Suite [6], on which it expands and improves. It does not depend on any third-party service: you only need to deploy the engine and Nexmark, run the nexmark/bin/run_query.sh all script, and wait for the benchmark results for all queries. Let's look at some of the design decisions behind Nexmark.
Remove external source and sink dependencies
As mentioned above, the Yahoo Benchmark uses a Kafka data source, so its results do not accurately reflect the true performance of the engine. In addition, we found that in benchmark scenarios with a fast/slow dual-stream JOIN, using a Kafka data source lets the slow stream be over-consumed (the fast stream is easily backpressured), so the JOIN node's state ends up caching a large amount of data that arrived ahead of time. This does not reflect the real world, where the slow stream cannot be consumed ahead of time (the data has not been generated yet). So in Nexmark we use a Datagen source: data is generated directly in memory and sent to downstream nodes without ever landing on disk. The multiple event streams are produced by a single data generator, so when the fast stream is backpressured, the generation of the slow stream is suppressed as well, which better reflects real scenarios.
Similarly, we remove the dependency on external sinks: instead of writing to Kafka/Redis, the output goes to an empty sink that discards every record it receives.
In this way, we ensure that any bottleneck lies within the engine itself, allowing us to accurately measure the subtle differences between engines.
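To make this concrete, here is a minimal, self-contained sketch of the same idea using Flink's built-in datagen and blackhole connectors (both ship with Flink). The table names, columns, and rate are illustrative, not Nexmark's actual definitions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InMemorySourceSinkSketch {
    public static void main(String[] args) {
        // Streaming-mode table environment.
        TableEnvironment env = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // In-memory source: records are generated on the fly and never land on disk.
        env.executeSql(
                "CREATE TABLE bid (" +
                "  auction BIGINT," +
                "  price BIGINT" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '1000000'" +  // illustrative generator rate
                ")");

        // Empty sink: discards every record, so the sink can never be the bottleneck.
        env.executeSql(
                "CREATE TABLE discard_sink (" +
                "  auction BIGINT," +
                "  price BIGINT" +
                ") WITH ('connector' = 'blackhole')");

        // Submits the pipeline; all data flows from memory into the void.
        env.executeSql("INSERT INTO discard_sink SELECT auction, price FROM bid");
    }
}
```

Because the source generates records in memory and the sink drops them, any throughput limit observed comes from the engine itself.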
Metrics
A batch system benchmark is usually measured by total execution time. But the data processed by a stream computing system is unbounded, so there is no total running time to measure. We therefore propose three main metrics: throughput, latency, and CPU. The Nexmark testing framework collects and summarizes these metrics automatically, without requiring the deployment of any third-party metric service.
Throughput
Throughput, also known as TPS, describes how many records a stream computing system can process per second. Since we have multiple event streams, all produced by a single data generator, we use the TPS of the data generator rather than that of any single event stream, in order to unify the point of observation. We take the maximum throughput a query can reach as its throughput metric. For example, for the Flink engine, we read the <source_operator_name>.numRecordsOutPerSecond metric exposed by the Flink REST API to obtain the current throughput.
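As an illustration, a probe like the following could poll that metric over the Flink REST API. The host, port, and IDs are placeholders, and the subtask-prefixed metric name reflects how the vertex-metrics endpoint exposes operator metrics (naming details can vary across Flink versions):

```java
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ThroughputProbe {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];    // job ID, e.g. obtained from GET /jobs/overview
        String vertexId = args[1]; // ID of the source vertex within that job
        // Ask the vertex-metrics endpoint for subtask 0's output rate; summing the
        // rates of all subtasks gives the operator's total records per second.
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId
                + "/metrics?get=0.numRecordsOutPerSecond");
        try (InputStream in = url.openStream()) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```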
Latency
Latency describes the time between data entering the stream computing system and its result being emitted. For window aggregations, the Yahoo Benchmark uses output_system_time - window_end as the latency metric, which ignores the time data has to wait inside the window before being emitted and is also heavily affected by backpressure, so it is not accurate. A more accurate measure is output_system_time - max(ingest_time). For example, if the last record of a window is ingested at 12:09:30 but the window result (with window end 12:10:00) is emitted at 12:10:05, the Yahoo metric reports 5 seconds while the more accurate formula reports 35 seconds. However, latency is calculated differently again for non-windowed aggregations or dual-stream joins.
Thus the definition and collection of latency in stream computing systems involves many practical problems and needs to be analyzed per query. This is discussed in detail in reference [2], and it is also why we have not implemented the latency metric in Nexmark yet.
CPU
Resource consumption is a metric overlooked by many stream computing benchmarks. In a real production environment, we usually don't limit the number of cores the stream computing engine may use, giving the system more flexibility. So we introduce CPU usage as an auxiliary metric: how many cores a job consumes. With throughput/cores, we can compute the average contribution of each core to throughput. To collect per-process CPU usage, we do not rely on the JVM's CPU load; instead, we sample each process's /proc/<pid>/stat and compute usage from it, which yields the process's real CPU usage. For this reason, the Nexmark testing framework deploys a CPU-sampling agent on each machine before a test begins.
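A minimal sketch of this sampling approach, assuming the standard Linux /proc layout and the common USER_HZ value of 100 ticks per second:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

public class CpuSampler {
    // Returns utime + stime (fields 14 and 15 of /proc/<pid>/stat) in clock ticks.
    static long cpuTicks(long pid) throws Exception {
        String stat = new String(Files.readAllBytes(Paths.get("/proc/" + pid + "/stat")));
        // Field 2 (the command name) may contain spaces, so skip past its closing ')'.
        String[] fields = stat.substring(stat.indexOf(')') + 2).split(" ");
        return Long.parseLong(fields[11]) + Long.parseLong(fields[12]); // utime + stime
    }

    public static void main(String[] args) throws Exception {
        long pid = Long.parseLong(args[0]);
        final double userHz = 100.0; // assumption: USER_HZ is 100 on most Linux systems
        long before = cpuTicks(pid);
        Thread.sleep(1000);
        long after = cpuTicks(pid);
        // CPU seconds consumed per wall-clock second, i.e. the number of cores used.
        System.out.printf("cores used: %.2f%n", (after - before) / userHz);
    }
}
```

Sampling over a longer interval and summing across all of a job's processes gives the "cores used" figure that throughput/cores is based on.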
Queries and Schema
Nexmark's business model is based on a real-world online auction system. All queries are based on the same three data streams, produced by a single data generator that controls their proportions, data skew, correlations, and so on. The three data streams are:
- Person: Represents a user who submits an item for auction or participates in bidding.
- Auction: Represents an item up for auction.
- Bid: Represents a bid on an item up for auction.
We define a total of 16 queries, all written in ANSI SQL. Building on SQL makes it easier to extend the query set to support more engines. However, because of Spark's limitations in stream computing, most of the queries cannot be implemented with Structured Streaming, so for now we only support testing the Flink SQL engine.
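For a taste of what such queries look like, here is a simplified, self-contained example in the spirit of the windowed Nexmark queries: counting bids per auction over 10-second tumbling windows. The trimmed-down schema and the datagen source are stand-ins for the real Nexmark generator:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowedBidCountSketch {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A trimmed-down Bid stream; the real Nexmark schema carries more fields.
        env.executeSql(
                "CREATE TABLE bid (" +
                "  auction BIGINT," +
                "  price BIGINT," +
                "  bidtime AS PROCTIME()" +  // processing time stands in for event time
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '100000'" +
                ")");

        // Count bids per auction over 10-second tumbling windows and print results.
        env.executeSql(
                "SELECT auction," +
                "       TUMBLE_END(bidtime, INTERVAL '10' SECOND) AS window_end," +
                "       COUNT(*) AS bids " +
                "FROM bid " +
                "GROUP BY auction, TUMBLE(bidtime, INTERVAL '10' SECOND)")
           .print();
    }
}
```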
Configuring the job workload
We also support tuning the workload of a job, including the throughput and throughput curve of the data generator, the proportions of the three data streams, the average record size of each stream, and the data skew ratio. See the Source DDL parameters for details.
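As a sketch of what such knobs could look like in a source definition, the fragment below assumes a TableEnvironment named env, as in the earlier sketches. The connector name and option keys here are hypothetical; the real keys are documented with the Nexmark source DDL:

```java
// Hypothetical option keys, shown only for illustration; see the Nexmark
// source DDL for the real ones. `env` is a TableEnvironment as above.
env.executeSql(
        "CREATE TABLE nexmark_source (" +
        "  event_type INT," +
        "  payload STRING" +
        ") WITH (" +
        "  'connector' = 'nexmark'," +        // hypothetical connector name
        "  'tps' = '10000000'," +             // generator throughput: 10 M events/s
        "  'bid.proportion' = '92'," +        // 92% of generated events are Bids
        "  'auction.proportion' = '6'," +     // 6% are Auctions
        "  'person.proportion' = '2'" +       // 2% are Persons
        ")");
```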
Experimental Results
We benchmarked Flink with Nexmark on three machines on Aliyun. Each machine is an ecs.i2g.2xlarge instance with a 2.5 GHz Xeon CPU (8 vCPUs), 32 GB of RAM, and an 800 GB local SSD. The bandwidth between machines is 2 Gbps.
Testing Flink 1.11, we deployed a Flink standalone cluster on these 3 machines, consisting of 1 JobManager and 8 TaskManagers (each with 1 slot), each with 4 GB of memory. The cluster's default parallelism is 8. Checkpointing is enabled in exactly-once mode with a 3-minute checkpoint interval, using the RocksDB state backend. Testing showed that for stateful queries each checkpoint exceeds a gigabyte in size, so the large-state scenario is effectively covered.
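For reference, the checkpointing side of this setup corresponds roughly to the following programmatic configuration (a sketch only; the checkpoint path is a placeholder):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // the cluster's default parallelism in the experiment
        // Exactly-once checkpoints every 3 minutes, as in the test setup.
        env.enableCheckpointing(3 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        // RocksDB state backend; the checkpoint path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));
        // ... define and execute the job here ...
    }
}
```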
The Datagen source generates data continuously at a rate of 10 million events per second, split across the three streams as Bid: 92%, Auction: 6%, and Person: 2%. Each query first runs for a 3-minute warm-up, followed by 3 minutes of performance metric collection.
After the nexmark/bin/run_query.sh all command finishes, the framework prints the test results. [Results table omitted]
Summary
We designed and developed Nexmark to provide a standard set of benchmark tests and testing procedures for stream computing. Although it only supports the Flink engine at present, it is already useful. For example, it can:
- Drive the development and standardization of stream computing benchmarks.
- Serve as a performance testing tool between iterations of the Flink engine, or even as a daily regression tool, to catch performance regressions in time.
- Verify the effect of performance optimizations when developing them for Flink.
- Let companies that maintain internal versions of Flink compare performance between their internal builds and the open source version.
Of course, we also plan to continuously improve and extend the Nexmark testing framework, such as supporting the latency metric and more engines, including Spark Structured Streaming, Spark Streaming, ksqlDB, and Flink DataStream. Contributions and extensions are very welcome.
References:
[1] Pete Tucker and Kristin Tufte. "NEXMark - A Benchmark for Queries Over Data Streams". June 2010.
[2] Jeyhun Karimov and Tilmann Rabl. "Benchmarking Distributed Stream Data Processing Systems". arXiv:1802.08496v2 [cs.DB], Jun 2019.
[3] Yangjun Wang. "Stream Processing Systems Benchmark: StreamBench". May 2016.
[4] github.com/yahoo/strea…
[5] www.ververica.com/blog/extend…
[6] beam.apache.org/documentati…