Author: Wang Xinchun
Sorted out: Guo Xuce
This article is shared by Wang Xinchun in The Offline Meetup· Shanghai station of Flink China community on July 29, 2018. Wang Xinchun is currently in charge of real-time platform related content in VipSHOP, mainly including real-time computing framework, providing real-time basic data, and machine learning platform. I was also responsible for the big data platform at Meituan Dianping. He has accumulated rich working experience in real-time processing of big data.
This paper mainly includes the following aspects:
-
Current situation of ViPSHOP real-time platform
-
Practice of Apache Flink (hereinafter referred to as Flink) in ViPSHOP
-
Flink On K8S
-
Subsequent planning
I. Current situation of ViPSHOP’s real-time platform
At present, the real-time platform of VipSHOP is not a unified computing framework, but three main computing frameworks including Storm, Spark and Flink. Due to historical reasons, the Storm platform currently has the largest number of jobs, but since last year, the focus of business has gradually shifted to Flink, so the number of apps on Flink has increased significantly this year.
The core business of real-time platform includes eight parts: Real-time recommendation, as a key business of e-commerce, contains multiple real-time features; The big Promotion Kanban board contains various dimensions of statistical indicators (such as: orders, UV, conversion rate, funnel, etc.) for leadership, operations, product decisions; Real-time data cleaning: collect data from users’ buried points for real-time cleaning and association to provide better data for downstream businesses; In addition, there are Internet finance, security risk control, price comparison with other suppliers, Logview, Mercury, Titan as the internal service monitoring system, VDRC real-time data synchronization system, etc.
In architectural design, there are two data sources. One is buried data in App, wechat, H5 and other applications. The original data is collected and sent to Kafka. The other is the MySQL Binlog with live data online. Data is cleaned and associated in the computing framework, and raw data is provided to downstream business applications (including off-line wide meters, etc.) for easier use through real-time ETL.
Ii. Flink’s practice in VIPSHOP
Scenario 1: Dataeye Real-time Kanban
Dataeye real-time kanban is to support the need for real-time calculation of all buried data, order data, etc., with the characteristics of large amount of data, and there are many statistical dimensions, such as the total station, secondary platform, department, schedule, crowd, activity, time dimension, etc., improve the complexity of calculation. Statistical data output indicators can reach hundreds of thousands per second.
Taking UV calculation as an example, the buried point data in Kafka is cleaned first, and then associated with Redis data, the associated data is written into Kafka; Subsequent Flink computing tasks consume Kafka’s associated data. Usually, the amount of calculation results of tasks is large (tens of millions due to the large number of calculation dimensions and indicators). Data output is buffered by Kafka and synchronized to HBase for real-time data display. Synchronization tasks limit the flow of data written to HBase and merge indicators of the same type to protect HBase. At the same time, there is another calculation scheme for disaster recovery.
In the calculation engine based on Storm, Redis is used as the intermediate state storage, while Flink has its own state storage after switching to Flink, saving storage space. Without Redis access, performance is improved and overall resource consumption is reduced by 1/3.
In the process of migrating computing tasks from Storm to Flink, the two schemes are migrated successively, and the computing tasks and synchronization tasks are separated, relieving the pressure of writing data to HBase.
Switching to Flink also required some tracking and improvement. For FlinkKafkaConsumer, the Aotu Commit in Kafka is modified for service reasons and the offset is set. You need to implement the kafka cluster switching function by yourself. You need to manually clear state data without Windows. There is also the problem of data skew, which is common to computing frameworks. Meanwhile, for the synchronization task chasing problem, Storm can take the value from Redis, and Flink can only wait.
Scenario 2: Kafka data is deployed to the HDFS
Spark Streaming used to be implemented. Now, you are switching to Flink and using OrcBucketingTableSink to ground buried data to Hive tables in HDFS. In Flink processing, the single Task Write can reach about 3.5K/s, the resource consumption is reduced by 90% after using Flink, and the delay is reduced from 30s to 3s. We are also working on Flink support for Spark Bucket tables.
Scenario 3: Real-time ETL
One pain point for ETL processing is that dictionary tables are stored in HDFS and are constantly changing, and real-time data flows need to be joined with dictionary tables. Dictionary tables change is caused by the offline batch processing tasks, the current approach is to use ContinuousFileMonitoringFunction and ContinuousFileReaderOperator monitor HDFS data change regularly, keep brush into the new data, Use the latest data to join real-time data.
We plan to make a more general way to support Hive table and stream join, and realize the effect of automatic data push after Hive table data changes.
Three, Flink On K8S
There are some different computing frameworks in VipSHOP, including real-time computing, machine learning and offline computing. Therefore, a unified underlying framework is needed for management, so Flink is migrated to K8S.
Cisco network components are used on K8S, and each Docker container has its own IP, which is also visible to the outside world. The overall architecture of the real-time platform fender is shown in the figure below.
Vipshop’s implementation plan on K8S is quite different from that provided by Flink community. Vipshop is deployed in K8S StatefulSet mode and internally implements some interfaces related to cluster. A job corresponds to a mini cluster and supports HA. For Flink, the biggest reason to use StatefulSet is that pod’s hostname is sorted; The potential benefits include:
1. When hostname is -0 or -1, pod can be directly specified as jobManager. You can start a cluster with one statefulset, whereas deployment must have two; Jobmanager and TaskManager are deployed separately.
- After pod fails for various reasons, cluster recover can theoretically be faster than Deployment (deployment hosts are randomly named each time) because the hostname of pod pulled back by StatefulSet stays the same.
The container’s entrypoint
The host name determines whether to start JobManager or TaskManager. Therefore, you need to check whether the host name of jobManager is the same in entryPoint. The passed parameter is “cluster HA “. Automatically start the role according to the host name; You can also specify the role name. The script content of docker-entrypoint.sh is as follows:
#! /bin/sh
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="The $1"
# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD}= ="cluster" ]; then
jobmanagers=(${JOB_MANGER_HOSTS//,/ })
ACTION_CMD="taskmanager"
for i in The ${! jobmanagers[@]}
do
if [ "$(hostname -s)"= ="${jobmanagers[i]}" ]; then
ACTION_CMD="jobmanager"
echo "pod hostname match jobmanager config host, change action to jobmanager."
fi
done
fi
# if ha model, replace ha configuration
if [ "$2"= ="ha" ]; then
sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi
if [ ${ACTION_CMD}= ="help" ]; then
echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
exit 0
elif [ ${ACTION_CMD}= ="jobmanager" ]; then
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
echo "Starting Job Manager"
sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
elif [ ${ACTION_CMD}= ="taskmanager" ]; then
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
echo "Starting Task Manager"
sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"
echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ ${ACTION_CMD}= ="local" ]; then
echo "Starting local cluster"
exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi
exec "$@"
Copy the code
Entrypoint variable description
The docker entrypoint script needs to set the environment variable Settings:
Environment variable name | parameter | Example **** content | instructions |
---|---|---|---|
JOB_MANGER_HOSTS | StatefulSet.name-0,StatefulSet.name-1 | flink-cluster-0,flink-cluster-1 | JM host name, short host name; You don’t have to use FQDN |
FLINK_CLUSTER_IDENT | namespace/StatefulSet.name | default/flink-cluster | Used to do zK HA setup and HDFS checkpiont root directory |
TASK_MANAGER_NUMBER_OF_TASK_SLOTS | containers.resources.cpu.limits | 2 | TM slot number, set according to resources.cpu.limits |
FLINK_ZK_QUORUM | env:FLINK_ZK_QUORUM | ...: 2181 | Address of HA ZK |
JOB_MANAGER_HEAP_MB | env:JOB_MANAGER_HEAP_MBvalue:containers.resources.memory.limit -1024 | 4096 | JM Heap size, due to out of memory, need less than container. The resources. The memory. The limits; Otherwise easy OOM kill |
TASK_MANAGER_HEAP_MB | env:TASK_MANAGER_HEAP_MB value: containers.resources.memory.limit -1024 | 4096 | TM’s Heap size, due to the existence of Netty Heap memory, need less than container. Resources. The memory. The limits; Otherwise easy OOM kill |
Use ConfigMap to maintain the configuration
Other configurations on which the Flink cluster depends, such as HDFS, can be managed and maintained by creating configMap.
kubectl create configmap hdfs-conf –from-file=hdfs-site.xml –from-file=core-site.xml
[hadoop@flink-jm-0 hadoop]$ ll /home/vipshop/conf/hadoop total 0 lrwxrwxrwx. 1 root root 20 Apr 9 06:54 core-site.xml -> . data/core-site.xml lrwxrwxrwx. 1 root root 20 Apr 9 06:54 hdfs-site.xml -> .. data/hdfs-site.xmlCopy the code
Iv. Follow-up plan
The current real-time systems, machine learning platform to deal with the data distribution in various kinds of data storage components, such as Kafka, Redis, Tair and HDFS, how convenient and efficient access, processing, sharing these data is a big challenge, for the current data access and parse often requires a lot of energy, the main pain points include:
-
For binary (PB/Avro, etc.) data in Kafka, Redis and Tair, users cannot quickly and directly understand the schema and content of the data, and the cost of collecting the data content and communicating with the writer is very high.
-
Due to the lack of independent unified data system service, access to binary data in Kafka, Redis, Tair, etc., depends on the information provided by the writer, such as proto generation class, data format wiki definition, etc., which is costly to maintain and error-prone.
-
The lack of Relational Schema prevents users from developing businesses directly based on more efficient and easy-to-use SQL or LINQ layer apis.
-
Data cannot be easily published and shared through a separate service.
-
Real-time data cannot be directly supplied to the Batch SQL engine.
-
In addition, for most of the current data source access also lack of audit, permission management, access monitoring, tracking and other features.
The UDM includes modules such as Location Manager, Schema Metastore, and Client Proxy. Its main functions include:
-
Provides a name-to-address mapping service, where the consumer accesses data through an abstract name rather than a concrete address.
-
Users can easily view data Schema and probe data content through the Web GUI interface.
-
Provides Client API Proxy to support audit, monitoring, traceability and other additional functions.
-
In frameworks such as Spark/Flink/Storm, provide encapsulation of these data sources in the form most appropriate for use.
The following figure shows the overall architecture of the UDM.
UDM consumers include producers and consumers of data in real-time, machine learning, and offline platforms. When using Sql API or Table API, the Schema is registered first, and then Sql is used for development, which reduces the amount of development code.
The sequence diagram of Spark accessing Kafka PB data is used to illustrate the internal flow of the UDM
In Flink, UDMExternalCatalog is used to bridge the gap between Flink computing framework and UDM. By implementing various interfaces of ExternalCatalog and TableSourceFactory of respective data sources, Complete functions such as Schema and access control. At the same time, the functions of Flink SQL Client can be enhanced, and the UDM Schema can be queried by calling API to complete the generation and submission of SQL tasks.
For more information, please visit the Apache Flink Chinese community website