We have long used Spark as our framework for offline and near-real-time computing; it performs almost all of the computing tasks in our primary business. Flink has been on the rise lately, and through research and meetups we have found that it genuinely has an edge over Spark in real-time computing. As we move into real-time business, there will always be real-time metrics and real-time recommendations to build.
Flink's official documentation is fairly detailed, and its design concepts are similar to Spark's, so the learning cost is low. Flink clusters can be deployed in many ways (Local, Standalone, YARN, K8s, Mesos, etc.). Since we already have ready-made YARN and ZooKeeper clusters, we configured Flink on YARN directly.
The following figure shows the basic principle of Flink on YARN, which is similar to that of Spark on YARN.
First, set the environment variables of the Hadoop path. Flink uses the environment variables to obtain HDFS and YARN configuration information.
~ vim /etc/profile
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop
Then edit flink-conf.yaml, which contains the basic configuration of Flink.
- Resource parameters
# JobManager heap memory
jobmanager.heap.mb: 1024
# TaskManager heap memory
taskmanager.heap.mb: 2048
# Number of task slots on each TaskManager
taskmanager.numberOfTaskSlots: 4
# Default parallelism
parallelism.default: 12
The preceding four parameters are only default configurations. You can use the corresponding command line parameters (-jm, -tm, -s, and -p) to modify the default configurations.
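For example, the defaults might be overridden like this at startup; the memory, slot, and parallelism values below (and the jar name) are illustrative placeholders:

```shell
# Override JobManager/TaskManager memory (MB) and slots per TaskManager
# when starting a YARN session
./bin/yarn-session.sh -jm 2048 -tm 4096 -s 8
# Override parallelism when submitting a job (some-job.jar is a placeholder)
./bin/flink run -p 16 some-job.jar
```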
- High availability configuration
# Enable ZooKeeper-based high availability
high-availability: zookeeper
# ZK quorum addresses
high-availability.zookeeper.quorum: ha1:2181,ha2:2181,ha3:2181
# Root node for Flink in ZK
high-availability.zookeeper.path.root: /flink
# Where JobManager metadata is persisted; must be a reliable store
high-availability.storageDir: hdfs://mycluster/flink/ha/
# Maximum number of application attempts at startup; should be the same as
# YARN's maximum ApplicationMaster attempts (yarn.resourcemanager.am.max-attempts)
yarn.application-attempts: 4
Note that the default value of YARN's maximum ApplicationMaster attempts (yarn.resourcemanager.am.max-attempts) is only 2, which tolerates few failures, so operations should raise it to 4 or more in advance.
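On the YARN side, this limit is set in yarn-site.xml; a matching entry (using the value 4 chosen above) would look like:

```xml
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
</property>
```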
- StateBackend configuration
# Options: jobmanager (JM's own memory) / filesystem (an external file system)
# / rocksdb (the embedded RocksDB database)
state.backend: filesystem
# Checkpoint directory
state.checkpoints.dir: hdfs://mycluster/flink-checkpoints
# Savepoint directory
state.savepoints.dir: hdfs://mycluster/flink-savepoints
The filesystem and rocksdb backends provide high reliability; for lightweight tasks with simple logic, jobmanager is sufficient. An application can also specify its backend explicitly via the StreamExecutionEnvironment.setStateBackend() method.
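As a minimal sketch (assuming Flink 1.5-era APIs, where FsStateBackend lives in the flink-runtime module; the checkpoint path is illustrative), specifying the backend in code might look like:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Overrides state.backend from flink-conf.yaml for this job only;
        // the HDFS path below is an illustrative placeholder
        env.setStateBackend(new FsStateBackend("hdfs://mycluster/flink-checkpoints"));
    }
}
```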
- Additional JVM parameters
env.java.opts: -server -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError
This is similar to spark.executor.extraJavaOptions in spark-submit.
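For comparison, the spark-submit counterpart might look like this; the class name and jar path are placeholders:

```shell
# Pass the same JVM options to Spark executors via extraJavaOptions
spark-submit \
  --conf "spark.executor.extraJavaOptions=-server -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
  --class com.example.App \
  app.jar
```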
Flink on YARN has two execution modes.
- Session mode: Run yarn-session.sh to create a long-running Flink session in which the JobManager, TaskManagers, and required resources are already allocated; jobs are then submitted into that session.
- Single-job mode: Use the flink run script to submit one job at a time, setting the JobManager to yarn-cluster so that YARN allocates resources independently for each job. This is similar to the yarn-cluster deploy mode of spark-submit and is the mode commonly used in production. Here is a sample script.
# Short option equivalents: --detached (-d): run in detached mode;
# --jobmanager (-m): JobManager address; --yarnname (-ynm): YARN application name;
# --yarncontainer (-yn): number of allocated YARN containers;
# --yarnjobManagerMemory (-yjm) / --yarntaskManagerMemory (-ytm): JM/TM memory (MB);
# --yarnslots (-ys): task slots per TaskManager; --parallelism (-p): parallelism;
# --class (-c): user application entry class
/opt/flink-1.5.1/bin/flink run \
  --detached \
  --jobmanager yarn-cluster \
  --yarnname "test-calendar-rt" \
  --yarncontainer 3 \
  --yarnjobManagerMemory 1024 \
  --yarntaskManagerMemory 2048 \
  --yarnslots 3 \
  --parallelism 9 \
  --class com.xyz.bigdata.rt.flink.CalendarRecordETLTest \
  /var/projects/rt/flink-test-0.1-jar-with-dependencies.jar
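For contrast with the single-job script above, a session-mode sketch might look like the following; the container count, memory values, session name, and jar path are all illustrative:

```shell
# Start a long-running session: 3 TaskManager containers, detached (-d)
./bin/yarn-session.sh -d -n 3 -jm 1024 -tm 2048 -s 4 -nm test-session
# Later submissions via flink run attach to the running session automatically
./bin/flink run -p 6 /var/projects/rt/some-job.jar
```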