How to build a real-time computing platform based on Spark Streaming
### Introduction
With the rapid development of Internet technology, users have ever higher requirements for the timeliness, accuracy and stability of data processing. How to build a stable, easy-to-use real-time computing platform that provides complete monitoring and warning functions has become a great challenge for many companies.
Since the establishment of Ctrip's real-time computing platform in 2015, after more than two years of continuous technological evolution, the real-time cluster has grown to several hundred nodes, covering hundreds of real-time applications across the SBUs and public departments, and the JStorm cluster has maintained 100% stability throughout the year. At present, the real-time platform is built mainly on JStorm and Spark Streaming. Readers who follow Ctrip's real-time platform may have seen last year's share: Practice of Ctrip's Real-time Big Data Platform.
This sharing will focus on how Ctrip builds a real-time computing platform based on Spark Streaming. The article will explain the construction and application of the platform from the following aspects:
- Spark Streaming vs JStorm
- Spark Streaming design and encapsulation
- Spark Streaming practice in Ctrip
- Pits we have stepped in
- Future outlook
### Spark Streaming vs JStorm
Before Ctrip's real-time platform adopted Spark Streaming, JStorm had been running stably for a year and a half and could meet most application scenarios. The move to Spark Streaming was driven by the following considerations. First, the JStorm version used by Ctrip is 2.1.1, whose level of encapsulation and abstraction is low: it provides no high-level abstractions or support for windows, state, SQL and similar functions. This greatly raises the threshold for implementing real-time applications with JStorm and the difficulty of developing complex real-time scenarios. Spark Streaming performs much better in these areas, not only providing highly integrated abstractions (various operators) but also letting users process data directly with SQL through Spark SQL.
Secondly, users often need to maintain two sets of data processing logic: JStorm for real-time computing and Hive or Spark for offline computing. Spark provides good support for reducing development and maintenance costs by unifying the streaming and offline computing engines.
Finally, before introducing Spark Streaming, we compared the costs of adopting Spark and Flink; at the time Flink was at version 1.2 and Spark at 2.0.1. Compared with Spark, Flink's support for SQL and machine learning libraries was relatively weak. In addition, many departments of the company already develop offline tasks and algorithm models based on Spark SQL and MLlib, which greatly reduces the learning cost of Spark.
Below is a simple comparison of our current use of Spark Streaming and JStorm:
### Spark Streaming design and encapsulation
In the initial stage of introducing Spark Streaming, the first consideration was how to embed it seamlessly into the existing real-time platform. The original platform already included functions such as metadata management, monitoring and alarming, so the first step was to encapsulate Spark Streaming and provide rich functionality. The whole system consists of Muise Spark Core, Muise Portal and external systems.
#### Muise Spark Core
Muise Spark Core is our secondary encapsulation of Spark Streaming. It supports the various message queues used at Ctrip: Hermes Kafka and native Kafka are consumed with the Direct Approach, while Hermes MySQL and QMQ are consumed with the Receiver approach. Most of the features covered next focus on Kafka-type data sources.
Muise Spark Core contains the following features:
- Automatic Kafka Offset management
- Support for Exactly Once and At Least Once semantics
- A metric registration system through which users can register custom metrics
- Alarms based on system and user-defined metrics
- Long running on Yarn, with a fault-tolerance mechanism
##### Kafka Offset automatic management
The first goal of the Muise Spark Core encapsulation is to be simple and easy to use, so that users can get started with Spark Streaming with minimal effort. First, we implemented automatic reading and storing of Kafka Offsets; users do not need to care how offsets are handled. Second, we validate the stored Kafka Offsets: a job that has been stopped for a long time may fail to restart because its stored offsets have expired in Kafka, and we handle this situation accordingly. The following shows a simple example of a Spark Streaming job written with Muise Spark Core; the user needs only a few lines of code to initialize the job and create the corresponding DStream:
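Muise Spark Core's own API is not public, so the sketch below is a hedged stand-in: it uses the open-source Kafka 0.8 Direct API that the encapsulation is built on, and shows the kind of job (offset handling aside) that the wrapper reduces to a few lines. The broker address and topic are placeholders.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordCountJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("muise-demo")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // kafkaParams and the topic set are what Muise would fill in from its metadata
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("demo_topic"))

    // per-batch processing: a trivial word count over the message values
    stream.map(_._2).flatMap(_.split("\\s+")).map((_, 1L)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```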
By default, each run continues consuming from the last stored Kafka Offset, but the user can also decide where consumption starts. Three ways of setting the consumption starting point are provided:
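With the Kafka 0.8 Direct API, the three starting points map onto the following mechanisms (a sketch continuing the snippet above; `loadOffsetsFromStore` is a hypothetical stand-in for Muise's offset store, not a real API):

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// 1. Resume from the last stored offsets (the default described above)
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromStore()
val resumed = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets,
  (m: MessageAndMetadata[String, String]) => (m.key, m.message))

// 2. Start from the earliest offsets still retained by Kafka
val earliestParams = kafkaParams + ("auto.offset.reset" -> "smallest")

// 3. Start from the latest offsets
val latestParams = kafkaParams + ("auto.offset.reset" -> "largest")
```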
##### Exactly Once implementation
If a real-time job is to achieve end-to-end Exactly Once, the semantics must be guaranteed at all three stages: data source, data processing and data storage. Currently, the first two stages can realize Exactly Once based on the Kafka Direct API and Spark RDD operators. Implementing Exactly Once in the data storage stage typically requires the store operation to be idempotent or transactional. Many systems support idempotent operations; for example, writing the same data to the same file in HDFS is itself idempotent, guaranteeing that repeated writes produce the same value. HBase, ElasticSearch and Redis all support idempotent operations, and relational databases generally support transactional operations.
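As a hedged sketch of the transactional variant for a relational database (the table names, SQL dialect and connection details are illustrative, not from the article), the batch result and the consumed offset range can be committed in a single transaction, so a replayed batch either repeats the whole transaction or none of it:

```scala
import java.sql.DriverManager

// Store a batch's aggregate and the offset range it covered atomically.
def storeTransactionally(batchCount: Long, topic: String, partition: Int,
                         fromOffset: Long, untilOffset: Long): Unit = {
  val conn = DriverManager.getConnection("jdbc:mysql://db:3306/rt", "user", "pass")
  try {
    conn.setAutoCommit(false)

    val upsert = conn.prepareStatement(
      "REPLACE INTO counts(topic, part, from_offset, cnt) VALUES (?, ?, ?, ?)")
    upsert.setString(1, topic); upsert.setInt(2, partition)
    upsert.setLong(3, fromOffset); upsert.setLong(4, batchCount)
    upsert.executeUpdate()

    val saveOffset = conn.prepareStatement(
      "REPLACE INTO offsets(topic, part, until_offset) VALUES (?, ?, ?)")
    saveOffset.setString(1, topic); saveOffset.setInt(2, partition)
    saveOffset.setLong(3, untilOffset)
    saveOffset.executeUpdate()

    conn.commit() // result and offset become visible atomically
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally conn.close()
}
```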
By default, Spark Streaming's Kafka Direct API stores the Kafka Offset through DirectKafkaInputDStream after each batch of jobs is generated. The saved Offset is the End Offset of this batch, and the next consumption starts from the last End Offset. This causes problems when the program goes down or the task is restarted. If the Offset is stored before data processing completes and the job then fails or goes down, the data being processed cannot be traced back after the restart, resulting in data loss. If the Offset is stored after data processing completes but the store fails or the job goes down, the data consumed last time will be consumed again after the restart. In addition, there is no guarantee that the amount of data consumed after the restart is the same as before the downtime, which introduces another problem: if the update operation is based on aggregate statistics, it is impossible to determine whether the previous batch's data was updated successfully.
Therefore, in Muise Spark Core we added our own implementation on top of DirectKafkaInputDStream to guarantee Exactly Once semantics. In addition, when storing the Kafka Offset we save both the start Offset and the end Offset of each batch, in the following format:
The purpose is to guarantee that, whether after a crash or a manual restart, the first batch after the restart covers exactly the same data as the last batch before the restart. This design gives the user great flexibility in handling the first batch. If the user simply discards the first batch, At Most Once semantics are guaranteed, because we cannot know whether the last batch before the restart was stored successfully. If the user processes the first batch with the original logic and does no deduplication, At Least Once semantics are guaranteed, and the final result may contain duplicates. Finally, if the user wants Exactly Once, Muise Spark Core provides a function that generates a UID from the topic, partition and offset; as long as two batches consume the same offsets, the generated UID is the same, so the UID can be used to determine whether the last batch was stored successfully. The following briefly describes the possible behaviors of the first batch after a restart.
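The exact Muise implementation is not public, but any deterministic digest of the batch's offset range realizes the UID idea, for example:

```scala
import java.security.MessageDigest

// A replayed first batch covers exactly the same offsets and therefore the same UID.
def batchUid(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): String = {
  val key = s"$topic-$partition-$fromOffset-$untilOffset"
  MessageDigest.getInstance("MD5").digest(key.getBytes("UTF-8"))
    .map("%02x".format(_)).mkString
}
// Before writing a batch's result, look its UID up in the sink: if it is already
// present, the batch was stored successfully last time and can be skipped.
```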
##### The Metrics system
Muise Spark Core is based on Spark's own metrics system, adding many customized metrics and exposing a metric registration interface to users, who can easily register their own metrics and update their values in the application. Finally, all metrics are written to Graphite at the batch interval configured for the job; alarms are raised by the company's customized early-warning system, and the front end displays the metrics through Grafana.
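The Muise registration interface itself is internal; as a minimal sketch, the snippet below shows the underlying Dropwizard-metrics-to-Graphite plumbing that Spark's metrics system also uses. The host, port and prefix are placeholders.

```scala
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}

object MuiseMetrics {
  val registry = new MetricRegistry
  private val graphite = new Graphite(new InetSocketAddress("graphite.host", 2003))
  private val reporter = GraphiteReporter.forRegistry(registry)
    .prefixedWith("muise.myjob")
    .build(graphite)
  reporter.start(10, TimeUnit.SECONDS) // align the report period with the batch interval
}

// e.g. inside foreachRDD: MuiseMetrics.registry.counter("events.processed").inc(n)
```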
Metrics customized by Muise Spark Core include the following three types:
- Fail: alarms when the number of failed Spark tasks within a batch exceeds four, to monitor the running status of the program.
- Ack: alarms when the amount of data processed by Spark Streaming within a batch is zero, to monitor whether the program is consuming data normally.
- Lag: alarms when the consumption delay of the data within a batch exceeds the configured threshold.
Since most of our jobs have Back Pressure enabled, the Spark UI shows every batch being consumed within the normal time, even though a large backlog may already exist in Kafka. Therefore, for each batch we compute the average difference between the current consumption time and the event time carried by the data itself; if this difference is greater than the batch interval, consumption of the data is already delayed.
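A sketch of this lag computation, assuming the stream has already been parsed into a case class carrying the record's event time (`Event`, `parsedStream` and the alarm hook are illustrative names, not Muise internals):

```scala
import org.apache.spark.streaming.dstream.DStream

case class Event(eventTimeMs: Long, payload: String)

def monitorLag(parsedStream: DStream[Event], batchIntervalMs: Long): Unit =
  parsedStream.foreachRDD { rdd =>
    val now = System.currentTimeMillis()
    // sum of (processing time - event time) and record count for this batch
    val (sum, cnt) = rdd
      .map(e => (now - e.eventTimeMs, 1L))
      .fold((0L, 0L)) { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
    if (cnt > 0 && sum / cnt > batchIntervalMs) {
      // average lag exceeds the batch interval: update the Lag metric / raise the alarm
    }
  }
```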
The following figure shows the early-warning system based on Metrics registered by users and Metrics customized by the system.
##### Fault tolerance
The Exactly Once section above has already described in detail how Muise Spark Core guarantees correct data processing after an application goes down. To keep Spark Streaming running stably on a Yarn cluster for a long time, however, a number of additional configurations are needed; see Long Running Spark Streaming Jobs on Yarn Cluster.
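For reference, the long-running settings discussed in that article typically look like the following (the values are illustrative and should be tuned per cluster; they are passed via `--conf` at submit time):

```
spark.yarn.maxAppAttempts=4
spark.yarn.am.attemptFailuresValidityInterval=1h
spark.yarn.max.executor.failures=32
spark.yarn.executor.failuresValidityInterval=1h
spark.task.maxFailures=8
```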
In addition to the above fault-tolerance guarantees, Muise Portal (described later) also checks Spark Streaming jobs periodically. Every 5 minutes it scans all jobs marked as Running in the database, queries each job's state on Yarn through Yarn's REST API using the job's Application Id, and tries to restart the job if the state is not Running.
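A sketch of such a check: `GET /ws/v1/cluster/apps/{appId}` is the real ResourceManager endpoint, while the host, the sample application id and the restart hook are placeholders.

```scala
import scala.io.Source

// crude extraction of the "state" field; a real portal would use a JSON parser
val StatePattern = """"state"\s*:\s*"([A-Z_]+)"""".r

def yarnAppState(appId: String): String = {
  val json = Source.fromURL(s"http://rm-host:8088/ws/v1/cluster/apps/$appId").mkString
  StatePattern.findFirstMatchIn(json).map(_.group(1)).getOrElse("UNKNOWN")
}

// every 5 minutes, for each job marked Running in the database:
if (yarnAppState("application_1500000000000_0001") != "RUNNING") {
  // resubmit the job through the portal's Spark client
}
```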
#### Muise Portal
With Spark Streaming fully encapsulated, we needed a platform to manage and configure jobs, and that is what Muise Portal provides. Muise Portal currently supports Storm and Spark Streaming jobs, with functions such as creating jobs, publishing Jar packages, and running and stopping jobs. The following image shows the interface for creating a new job:
Spark Streaming jobs run in Yarn Cluster mode. All jobs are submitted to the Yarn Cluster by the Spark client on the Muise Portal. The specific operation flow of a job is shown in the figure below:
#### Overall structure
Finally, here is the overall architecture of Ctrip's real-time platform.
### Spark Streaming practice in Ctrip
At present, Spark Streaming's business scenarios in Ctrip fall into the following categories: ETL, real-time report statistics, personalized recommendation and marketing, and risk control and security applications. In the abstract, these reduce to data filtering and extraction, data indicator statistics, and the use of model algorithms.
#### ETL
There are a variety of tools on the market that can consume data from Kafka in real time, filter and clean it, and finally land it in the corresponding storage system, such as Camus and Flume. Compared with such products, Spark Streaming supports more complex processing logic, and resource scheduling on Yarn makes it more flexible. Finally, Spark RDDs can be converted into Spark DataFrames and, combined with Spark SQL, stored as formatted data such as Parquet when written to distributed file systems such as HDFS and Alluxio, letting users process the data much more easily with Spark SQL.
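A minimal sketch of this RDD-to-DataFrame-to-Parquet path (the `Booking` schema, the parse function and the output path are illustrative assumptions, not from the article):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

case class Booking(orderId: String, uid: String, ts: Long)

// user-defined parse/filter: malformed records are dropped by returning None
def parseBooking(raw: String): Option[Booking] = ???

// kafkaStream: the (key, value) DStream produced by the Direct API
def etl(kafkaStream: DStream[(String, String)]): Unit =
  kafkaStream.map(_._2).foreachRDD { rdd =>
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val df = rdd.flatMap(parseBooking).toDF() // RDD -> DataFrame with the Booking schema

    df.write.mode("append")
      .parquet("alluxio://master:19998/datalake/booking") // or an hdfs:// path
  }
```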
The typical ETL application at present is the Data Lake application of Ctrip's Vacation department, which uses Spark Streaming to perform ETL on the data and finally stores it in Alluxio. In the process, Muise Spark Core's customized metric function is used to verify and monitor the data volume, field count, data format and duplicate data; the corresponding monitoring and alarms were described above.
#### Real-time report statistics
Real-time report statistics and presentation is another scenario where Spark Streaming is widely used. Statistics can be based on Process Time or Event Time. Since the successive batches of a Spark Streaming job behave like a single rolling window, and one window may contain data from multiple time periods, statistics based on Event Time are limited in Spark Streaming. The more common approach is to compute, in each batch, cumulative values for the different time dimensions and export them to an external system such as ES; at presentation time, a second aggregation by time yields the final complete cumulative values. The figure below shows Ctrip IBU's real-time dashboard based on Spark Streaming.
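A sketch of this per-batch partial aggregation (the `Metric` fields and the ES wiring are illustrative): each batch emits counts keyed by (minute, dimension), and the reporting layer sums these documents again by time.

```scala
import org.apache.spark.streaming.dstream.DStream

case class Metric(eventTimeMs: Long, dimension: String)

def report(metricStream: DStream[Metric]): Unit =
  metricStream.foreachRDD { rdd =>
    val partials = rdd
      .map { m =>
        val minute = m.eventTimeMs / 60000 * 60000 // truncate event time to the minute
        ((minute, m.dimension), 1L)
      }
      .reduceByKey(_ + _)
      .collect()

    // upsert each ((minute, dimension), count) into ES; at display time the dashboard
    // aggregates the documents again by time to obtain the final cumulative values
  }
```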
#### Personalized recommendation and risk control security
What these two applications have in common is that both need to predict or classify user behavior based on algorithmic models. At present, all of Ctrip's models are trained offline at regular daily intervals on offline data. After Spark Streaming was introduced, many departments began actively trying real-time feature extraction and online model training, since Spark Streaming combines well with Spark MLlib. The most successful case is in the security department, which used to capture attack requests with various filtering conditions and later combined offline model training with Spark Streaming and Spark MLlib to make real-time predictions on users. Performance improved tenfold over the JStorm implementation (which matched users against a large number of regular expressions and was very CPU-intensive), and the missed-alarm rate dropped by 20%.
### Pits we have stepped in
Currently, Ctrip's Spark Streaming jobs run in the same Yarn cluster as offline jobs, which affects their performance and stability. In particular, when the Yarn or Hadoop cluster needs to be updated, maintained or restarted, a large proportion of Spark Streaming jobs report errors or die; despite the many fault-tolerance guarantees, data backlog and processing delays still occur. A separate Hadoop and Yarn cluster will therefore be deployed later, so that all real-time jobs run on an independent cluster free from external interference, which also eases the later development and maintenance of Flink jobs. Alluxio is used to share data between the main cluster and the sub-cluster.
In the course of use we have also encountered a variety of bugs; here are a few of the more serious ones. Each batch of a Spark Streaming job obtains the latest Kafka topic offsets in DirectKafkaInputDStream's compute method. When a partition's leader is unavailable, this raises `java.lang.RuntimeException: No leader found for partition xx`. Since this code runs on the Driver side, without any configuration and handling the program crashes outright. The fix is to set `spark.streaming.kafka.maxRetries` to a value greater than 1 and use the `refresh.leader.backoff.ms` parameter to configure the interval between retries.
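Put together, the two settings look like this (the values are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRetries", "3") // retry the Driver-side offset fetch

val kafkaParams = Map(
  "metadata.broker.list"      -> "broker1:9092",
  "refresh.leader.backoff.ms" -> "2000" // wait 2s before refreshing leader metadata on retry
)
```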
Secondly, combining Spark Streaming with Spark SQL raised quite a few problems. For example, `java.lang.OutOfMemoryError: PermGen space` may occur, because Spark SQL's code generator makes heavy use of PermGen space; this can be fixed by adding `-XX:MaxPermSize=1024m -XX:PermSize=512m` to `spark.driver.extraJavaOptions`. Also, Spark SQL needs to create a Spark Warehouse directory. When running on Yarn, the default directory may be created in HDFS, and without write permission a `Permission denied` error is raised; setting `config("spark.sql.warehouse.dir", "file:${system:user.dir}/spark-warehouse")` solves the problem.
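Combined in one place (note that in yarn-cluster mode the driver JVM options must be passed at submit time, e.g. via `--conf`, rather than set in code):

```scala
// spark-submit --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024m -XX:PermSize=512m" ...
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.warehouse.dir", "file:${system:user.dir}/spark-warehouse")
  .getOrCreate()
```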
### Future outlook
The application of Spark Streaming on Ctrip's real-time platform has been introduced in detail above. Some pain points remain: the window functions are relatively limited, statistics based on Event Time are too cumbersome, and the official releases have added few new features, all of which make us more inclined to try Flink. Flink implements essentially all of Google's real-time processing concepts, including WaterMark; for details see the Google article The World Beyond Batch: Streaming 102.
With Flink 1.4 about to be released, and Spark 2.2.0 supporting more Structured Streaming features on Kafka data sources, we have done thorough research on Flink in the early stage, and the main work in the second half of the year will be integrating Flink. Going forward, we will focus on making it easier for users to implement real-time computing logic: Apache Beam provides good encapsulation for various real-time scenarios and supports multiple real-time computing engines, and implementing complex real-time applications based on Stream SQL will be our main research direction.
Transcript: Pan Guoqing: Practical Analysis of Building real-time Computing Platform Based on Spark Streaming