📖 1. Introduction to Flink

  • Apache Flink® — Stateful Computations over Data Streams
  • Apache Flink is a distributed big data processing engine that performs stateful computations over bounded and unbounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale.
    • Website: flink.apache.org

1.1 Processing of unbounded and bounded data

  • Any type of data can form a stream of events. Credit card transactions, sensor measurements, machine logs, records of user interactions on websites or mobile apps, all of these data form a stream.

  • Data can be treated as an ==unbounded== or ==bounded== stream.

(1) An unbounded stream has a defined start but no defined end. It produces data endlessly, so it must be processed continuously, that is, immediately after each event is ingested. We cannot wait for all the data to arrive before processing it, because the input is infinite and will never be complete at any point in time. Processing unbounded data usually requires ingesting events in a specific order, such as the order in which they occurred, so that the completeness of the results can be reasoned about. (2) A bounded stream has a defined start and a defined end. A bounded stream can be processed after all of its data has been ingested; because all the data can be sorted, ordered ingestion is not required. Bounded stream processing is often referred to as batch processing.

  • Apache Flink excels at processing both unbounded and bounded data sets. Precise control of time and state enables Flink’s runtime to run any kind of application on unbounded streams. Bounded streams are processed internally by algorithms and data structures specifically designed for fixed-size data sets, yielding excellent performance.
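  • To make the distinction concrete, the sketch below wires one unbounded source (a socket) and one bounded source (a file) into the same streaming job. The host, port, and file path are placeholders borrowed from the examples later in this document.

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object BoundedVsUnbounded {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Unbounded stream: a socket keeps producing events, so the job runs until it is cancelled
        val unbounded: DataStream[String] = env.socketTextStream("node01", 9999)

        // Bounded stream: a file has a fixed size, so this source finishes once the file is fully read
        val bounded: DataStream[String] = env.readTextFile("d:\\words.txt")

        unbounded.print()
        bounded.print()

        env.execute("BoundedVsUnbounded")
      }
    }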

1.2 Deploying Applications anywhere

  • Apache Flink is a distributed system that requires computing resources to execute applications. Flink integrates with all the usual cluster resource managers, such as Hadoop YARN, Apache Mesos, and Kubernetes, but can also run as a standalone cluster.

    Flink is designed to work well with each of the above resource managers through resource-manager-specific deployment modes, so it can interact with each resource manager in its own idiomatic way. When a Flink application is deployed, Flink automatically identifies the required resources based on the application's configured parallelism and requests them from the resource manager. In the event of a failure, Flink replaces the failed container by requesting new resources. All communication for submitting or controlling an application happens through REST calls, which simplifies Flink's integration with a wide variety of environments.

1.3 Running applications of any scale

  • Flink is designed to run stateful streaming applications at any scale. Applications are parallelized into possibly thousands of tasks that are distributed across a cluster and executed concurrently, so an application can make use of virtually unlimited amounts of CPU, memory, disk, and network I/O. Flink also makes it easy to maintain very large application state: its asynchronous and incremental checkpointing algorithms minimize the impact on processing latency while guaranteeing exactly-once state consistency.

    Flink users have reported impressive scalability numbers from their production environments: applications processing multiple trillions of events per day, applications maintaining multiple terabytes of state, and applications running on clusters of thousands of nodes.

1.4 Utilize memory performance

  • Stateful Flink programs are optimized for local state access. Task ==state is always kept in memory==, or, if the state size exceeds the available memory, in disk data structures that can be accessed efficiently. Tasks therefore perform all their computations by accessing local, usually in-memory, state, which yields very low processing latency. Flink periodically and asynchronously checkpoints the local state to durable storage to guarantee state consistency in failure scenarios.
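  • The periodic, asynchronous checkpointing mentioned above can be configured from the application code. Below is a minimal sketch; the 5-second interval is chosen arbitrarily and the socket source is just a placeholder pipeline.

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object CheckpointConfigExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Snapshot the local (usually in-memory) state every 5 seconds
        env.enableCheckpointing(5000)

        // Exactly-once is already the default; shown here only to make it explicit
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

        // Allow at most one checkpoint to be in flight at a time
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

        // Placeholder pipeline so the job has something to run
        env.socketTextStream("node01", 9999).print()

        env.execute("CheckpointConfigExample")
      }
    }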

📖 2. Application scenarios of Flink

In real production systems, large amounts of data are generated continuously: financial transaction data, Internet order data, GPS positioning data, sensor signals, data generated by mobile terminals, communication signals, as well as the familiar network traffic monitoring data and server logs. The common characteristic of these data is that they are produced in real time by different sources and then transmitted to downstream analysis systems. The typical real-time business scenarios include real-time intelligent recommendation, complex event processing, real-time fraud detection, real-time data warehousing and ETL, streaming data analysis, and real-time reporting, and Flink supports all of these scenarios very well.
  • 1. Real-time intelligent recommendation

    Intelligent recommendation uses recommendation algorithms to train models on users' historical purchase behaviour and predict what they are likely to buy in the future. For individual users, the recommendation system filters information; for Web/App services, it satisfies users' personalized needs and improves user satisfaction. Recommendation systems themselves are also evolving rapidly: besides increasingly sophisticated algorithms, the latency requirements are becoming more and more demanding and real-time. With Flink stream processing, user behaviour metrics can be computed in real time, models can be updated in real time, and predictions can be pushed to the Web/App side immediately. On the one hand this helps users find the products they want; on the other hand it helps enterprises increase sales and create greater business value.
  • 2. Complex event processing

    For complex event processing, the common cases are concentrated in the industrial field, such as real-time fault detection for vehicle sensors and mechanical equipment. These workloads usually involve large data volumes and demand low processing latency. Using the CEP (Complex Event Processing) library provided by Flink to extract event patterns, and Flink SQL to transform the event data, a real-time rule engine can be built inside the streaming system: once an event triggers an alarm rule, the alarm result is immediately forwarded to the downstream notification system, achieving fast fault warning and monitoring for equipment and real-time vehicle status monitoring.
  • 3. Real-time fraud detection

    In recent years, many financial companies and banks have faced the common challenge of keeping their users' and their own funds secure. As fraud techniques evolve, traditional anti-fraud methods are no longer sufficient. In the past it could take several hours to compute a user's behaviour metrics from transaction data, identify users suspected of fraud through rules, and then investigate the case; by that time the funds may already have been transferred away, causing significant losses to the enterprise and its users. With Flink streaming computation, fraud-related behaviour metrics can be computed within milliseconds, and real-time rule checks or model predictions can be applied to the transaction stream. Once a suspicious transaction is detected, it can be intercepted in real time, avoiding the economic losses caused by slow processing.
  • 4. Real-time data warehouse and ETL

    Combined with an offline data warehouse, the advantages of streaming computation and the flexible processing power of SQL can be used to clean, merge, and structure streaming data in real time, supplementing and optimizing the offline warehouse. At the same time, by combining this with real-time ETL processing, stateful streaming computation can greatly reduce the complexity of the scheduling logic in offline data pipelines, produce the statistics that the enterprise needs efficiently and quickly, and help the enterprise make better use of the results of real-time data analysis.
  • 5. Flow data analysis

    Computing all kinds of data metrics in real time and using the results to adjust online system strategies in a timely manner has a large number of applications in content delivery and intelligent mobile push. Streaming computation makes data analysis real-time, helping enterprises analyse the metrics of their Web or App applications in real time, including version distribution, crash detection and distribution, and so on. It also provides multi-dimensional user behaviour analysis and supports custom log analysis, helping developers carry out fine-grained operations based on big data technology, improve product quality and experience, and increase user engagement.
  • 6. Real-time report analysis

    Real-time report analysis is one of the reporting solutions adopted by many companies in recent years, the most prominent application being real-time dashboards. The results produced by streaming computation are pushed directly to front-end applications to display changes of key metrics in real time. The most typical case is Taobao's "Double 11" event: during the annual Double 11 shopping festival, apart from the shopping frenzy itself, the most eye-catching thing is the continuously updating transaction total on Tmall's big screen. Across the whole computation pipeline, including order placement and purchase on Tmall, data collection, data computation, and data validation, the end-to-end latency displayed on the Double 11 big screen is under 5 seconds, with peak computing performance as high as 300,000 orders per second, and redundant computation across multiple streaming pipelines ensures that nothing goes wrong. Enterprises in other industries are also building their own real-time reporting systems so that they can rely on their own business data to extract more value quickly and better support their operations.

📖 3. Flink Basic technology stack

The overall software architecture of Flink follows a layered design, which reduces the coupling of the system and provides rich, friendly interfaces for upper-layer users to build Flink applications.

  • ==API & libraries layer ==
As a distributed data processing framework, Flink provides interfaces that support both stream and batch computation, and abstracts component libraries for different application types on top of them, such as CEP (complex event processing library), SQL & Table library, FlinkML (machine learning library), and Gelly (graph processing library). There is a streaming API and a batch API: stream processing supports event processing and table operations; batch processing supports machine learning, graph computation, and table operations.
  • ==Runtime core layer ==
This layer provides the basic services for the interfaces of the layer above; it is the core implementation of Flink's distributed computation and the underlying execution engine of Flink.
  • == Physical deployment layer ==
This layer covers Flink's deployment modes. Currently, Flink supports several deployment modes: Local, cluster (Standalone/YARN), cloud (GCE/EC2, i.e. Google Cloud and Amazon Cloud), and Kubernetes.

📖 4. Basic architecture of Flink

The Flink system consists mainly of two components, the JobManager and the TaskManager. The Flink architecture follows the master-slave design principle: the JobManager is the Master node and the TaskManager is the Worker (Slave) node. All components communicate with each other using the Akka framework, including task statuses and Checkpoint triggers.

  • Client

    The client is responsible for submitting the task to the cluster: it establishes an Akka connection to the JobManager, submits the task to it, and obtains the task execution status by interacting with the JobManager. The client can submit tasks using the CLI or the Flink WebUI, or it can specify the JobManager's RPC network port to build an ExecutionEnvironment and submit the Flink application from the program itself (see the sketch after this list).
  • JobManager

    The JobManager is responsible for task scheduling and resource management of the entire Flink cluster. It obtains the submitted application from the client and, according to the TaskSlot usage of the TaskManagers in the cluster, assigns TaskSlot resources to the application and instructs the TaskManagers to start the application. The JobManager is the Master node of the cluster; only one active JobManager is responsible for task and resource management of the whole cluster. The JobManager and the TaskManagers communicate through the Actor System to exchange task execution status, which is also forwarded to the client. During task execution, the Flink JobManager triggers Checkpoint operations; after each TaskManager receives the Checkpoint command, it completes its part of the Checkpoint. All Checkpoint coordination is done in the Flink JobManager. When the task is complete, Flink reports the task execution information back to the client and frees the resources in the TaskManager for the next task submission.
  • TaskManager

    The TaskManager acts as a Slave node in the cluster. It executes the concrete tasks and applies for and manages the resources for those tasks on its node. The client compiles and packages the Flink application and submits it to the JobManager; the JobManager then assigns tasks to TaskManager nodes that have resources, based on the TaskManager resources registered with it, and starts and runs the tasks. A TaskManager receives the tasks to be deployed from the JobManager, starts them using Slot resources, establishes network connections for data access, receives the data, and begins processing. TaskManagers exchange data with each other via data streams. Note that Flink tasks run in a multi-threaded fashion, which is very different from the multi-JVM, process-per-task model of MapReduce; this greatly improves CPU utilization efficiency. Tasks share system resources through TaskSlots, and each TaskManager manages a pool of multiple TaskSlots to manage resources effectively.
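As mentioned for the Client above, a program can also build its execution environment against a running JobManager and submit itself over the network. Below is a minimal sketch, assuming a standalone cluster on node01; the port (typically the web/REST port of the cluster in Flink 1.9) and the jar path are placeholder assumptions, not values from this document.

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object RemoteSubmitExample {
      def main(args: Array[String]): Unit = {
        // Build an execution environment that talks directly to a running JobManager
        val env = StreamExecutionEnvironment.createRemoteEnvironment(
          "node01",                                       // assumed JobManager host
          8081,                                           // assumed JobManager port
          "target/original-flink_study-1.0-SNAPSHOT.jar") // assumed path of the packaged job jar

        env.socketTextStream("node01", 9999)
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
          .print()

        env.execute("RemoteSubmitExample")
      }
    }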

📖 5. Flink source code compilation (understand)

  • We can compile Flink’s source code ourselves to adapt it to various Hadoop versions

  • See also: blog.csdn.net/h335146502/…

cd /kkb/soft

# Compile the flink-shaded package
wget https://github.com/apache/flink-shaded/archive/release-7.0.tar.gz
tar -zxvf flink-shaded-release-7.0.tar.gz -C /kkb/install/
cd /kkb/install/flink-shaded-release-7.0/
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2

# Compile the flink source code
wget http://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-src.tgz
tar -zxf flink-1.9.2-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.9.2/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2

📖 6. Install in Local mode

  • 1. Install the JDK and configure JAVA_HOME. JDK 1.8 or above is recommended

  • 2. Installation package download address:

    Mirrors.tuna.tsinghua.edu.cn/apache/flin…

  • 3. Directly upload the installation package to the server

  • 4. Decompress the installation package and configure environment variables

    tar -zxf flink-1.9.2-bin-scala_2.11.tgz -C /kkb/install/

    # Configure the environment variables
    sudo vim /etc/profile
    export FLINK_HOME=/kkb/install/flink-1.9.2
    export PATH=:$FLINK_HOME/bin:$PATH
  • 5. Start the service

    • In local mode, no configuration items need to be set; just start the service directly

      cd /kkb/install/flink-1.9.2
      #Start the flink
      bin/start-cluster.sh 
      #Stop flink
      bin/stop-cluster.sh 
  • 6. Web page browsing

    • http://node01:8081/#/overview

📖 7. Standalone mode Installation

  • (1) Cluster planning
| Host name | JobManager | TaskManager |
| --------- | ---------- | ----------- |
| node01    | yes        | yes         |
| node02    | yes        | yes         |
| node03    |            | yes         |
  • (2) Dependencies

    • JDK 1.8 or higher, with JAVA_HOME configured
    • Passwordless SSH login configured between the hosts
  • (3) Installation steps

Modify the following configuration files on node01: (a) Modify conf/flink-conf.yaml
#The jobmanager address
jobmanager.rpc.address: node01
#Use ZooKeeper to set up high availability
high-availability: zookeeper
## Store JobManager metadata in HDFS
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

(b) Modify conf/slaves
node01
node02
node03

(c) Modify conf/masters
node01:8081
node02:8081

(d) Upload flink-shaded-hadoop-2-uber-2.7.5-10.0.jar to the lib directory of the Flink installation

(e) Copy the installation to the other nodes
scp -r /kkb/install/flink-1.9.2 node02:/kkb/install
scp -r /kkb/install/flink-1.9.2 node03:/kkb/install

(f) Start the cluster from the node01 (JobManager) node
cd /kkb/install/flink-1.9.2
bin/start-cluster.sh

(g) Visit the web UI
http://node01:8081
http://node02:8081

(h) Shut down the Flink cluster by running the following on the active JobManager node
cd /kkb/install/flink-1.9.2
bin/stop-cluster.sh
  • (4) The parameters to consider in StandAlone mode
jobmanager.heap.mb: memory available to the JobManager node
taskmanager.heap.mb: memory available to each TaskManager node
taskmanager.numberOfTaskSlots: number of available CPUs (slots) on each TaskManager node
parallelism.default: default parallelism of tasks
taskmanager.tmp.dirs: temporary data storage directory of the TaskManager
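For reference, a hypothetical flink-conf.yaml fragment that sets these parameters could look like the following; the values are only illustrative, not recommendations.

    jobmanager.heap.mb: 1024
    taskmanager.heap.mb: 2048
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 2
    taskmanager.tmp.dirs: /kkb/install/flink-1.9.2/tmp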

📖 8. Install Flink on Yarn mode

  1. Install Hadoop (YARN) first
  2. Upload a Flink package and configure the Hadoop environment variables
  • There are two modes of flink on YARN

8.1 The first method

  • Centralized memory management mode (Yarn Session)

    • After a Flink cluster is initialized in YARN with the specified resources, all Flink jobs submitted afterwards run in this flink-yarn-session. That is, no matter how many jobs are submitted, they share the resources requested from YARN at the beginning. The Flink cluster resides in the YARN cluster until it is stopped manually.

8.2 The second way

  • Per-job memory management mode ==[yarn-cluster recommended]==
    • In YARN, a new Flink cluster is created each time a job is submitted. The jobs are independent of each other and easy to manage. The cluster created for a job disappears after the job completes.

8.3 Task Submission in different Modes

  • The first mode

    • yarn-session.sh (create resources) + flink run (submit tasks)

      • 1. Start yarn-session in the Flink directory

        bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d
        
        #-n applies for two containers.
        #-s Indicates how many slots are started for each container
        #-tm Indicates that each TaskManager applies for 1024 MB memory
        #-d indicates running in the background program mode
      • 2. Submit the task using the Flink script

        bin/flink run examples/batch/WordCount.jar \
        -input  hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output/result.txt
      • 3. Stop the task

        yarn application -kill application_1587024622720_0001
  • The second model

    • Flink run -m yarn-cluster (Create resource + submit task)

      • 1. Start the cluster and execute tasks

        bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 \
        examples/batch/WordCount.jar \
        -input hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output1

        Note: The YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_HOME environment variable must be set on the client so that the YARN and HDFS configuration can be read; otherwise, the startup fails.
  • Help information

    • yarn-session.sh script parameters
Usage:
Mandatory
  -n,--container <arg>             How many YARN containers to allocate (= number of TaskManagers)
Optional
  -D <arg>                         Dynamic properties
  -d,--detached                    Run in detached (background) mode
  -jm,--jobManagerMemory <arg>     JobManager memory [in MB]
  -nm,--name                       Set a custom name for the application on YARN
  -q,--query                       Display the available YARN resources (memory, cores)
  -s,--slots <arg>                 Number of slots per TaskManager
  -tm,--taskManagerMemory <arg>    TaskManager memory [in MB]
  -z,--zookeeperNamespace <arg>    Namespace to create on ZooKeeper for the HA mode
  -id,--applicationId <yarnAppId>  Attach to a running YARN session with the given application ID
  • Flink Run script parameters
run [OPTIONS] <jar-file> <arguments>

"run" options:
  -c,--class <classname>           The class with the program entry point ("main" method)
  -m,--jobmanager <host:port>      Address of the JobManager (master node) to connect to. Use this parameter to specify a JobManager different from the one in the configuration file
  -p,--parallelism <parallelism>   The parallelism of the program. Overrides the default value in the configuration file

By default, the client looks up the existing yarn-session information [/tmp/.yarn-properties-root] in the current YARN cluster:
  ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

Connect to a JobManager with the specified host and port:
  ./bin/flink run -m node01:6123 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

Start a new yarn-session:
  ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

Note: the yarn-session command-line options are also available with the ./bin/flink tool. They are prefixed with "y" or "yarn", for example: flink run -m yarn-cluster -yn 2 examples/batch/WordCount.jar

8.4 Flink on YARN Cluster Deployment

  • (1) The operation principle of Flink on YARN

  • In fact, Flink on YARN deployment is very simple: as long as a Hadoop (YARN) cluster is already deployed, we only need to deploy a Flink client and then submit Flink tasks from that client. The mode is similar to Spark on YARN.

📖 9. Introductory case demonstration

9.1 Real-time Requirement Analysis

Every second, count in real time the number of occurrences of each word within the last 2 seconds.
  • Create a Maven project and add POM dependencies
 <properties>
        <flink.version>1.9.2</flink.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        
9.1.1 Real-time Code Development (Scala version)
  • Code development

    package com.kaikeba.demo1
    
     import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /** * Use the sliding window * count the number of occurrences of each word in the last 2 seconds */
    object FlinkStream {
    
       def main(args: Array[String]): Unit = {
          // Build the environment for the flow processing
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
         // Get data from socket
           val sourceStream: DataStream[String] = env.socketTextStream("node01", 9999)
    
         // Import the implicit transformation package
          import org.apache.flink.api.scala._
    
         // Process the data
         val result: DataStream[(String, Int)] = sourceStream
              .flatMap(x => x.split(" ")) // Split by space
              .map(x => (x, 1))           // Each word counts as 1
              .keyBy(0)                   // Group the words with subscript 0
              .timeWindow(Time.seconds(2),Time.seconds(1)) // Process 2s data every 1s
              .sum(1)            // Add the number of occurrences of the same word with subscript 1
    
            // Print the data
            result.print()
    
            // Start the task
             env.execute("FlinkStream")
      }
    }
  • Sending socket data

    #Install the NC service on node01
    sudo yum -y install nc
    nc -lk 9999
  • Create a JAR package and submit it to YARN to run

    flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.kaikeba.demo1.FlinkStream original-flink_study-1.0-SNAPSHOT.jar
9.1.2 Real-time Code Development (Java version)
  • Code development

    package com.kaikeba.demo1;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    /** * Java version: every second, count in real time the number of occurrences of each word within the last 2 seconds */
    public class WindowWordCountJava {
        public static void main(String[] args) throws Exception {
    
            // Step 1: Get the streaming environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Step 2: Obtain socket data
            DataStreamSource<String> sourceDstream = env.socketTextStream("node01", 9999);
    
            // Step 3: Process the data
            DataStream<WordCount> wordAndOneStream = sourceDstream.flatMap(new FlatMapFunction<String, WordCount>() {
                public void flatMap(String line, Collector<WordCount> collector) throws Exception {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        collector.collect(new WordCount(word, 1L));
                    }
                }
            });

            DataStream<WordCount> resultStream = wordAndOneStream
          .keyBy("word")  // Group by word
          .timeWindow(Time.seconds(2), Time.seconds(1)) // Count the data of 2s every 1s
          .sum("count");   // Add the result according to the count field
    
      // Step 4: Print the result
      resultStream.print();
    
       // Step 5: Start the task
      env.execute("WindowWordCountJava");
        }
    
    
        public static class WordCount{
            public String word;
            public long count;
            // Remember to include a no-arg constructor
            public WordCount() {
            }

            public WordCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return "WordCount{" +
                        "word='" + word + '\' ' +
                        ", count=" + count +
                        '} '; }}}Copy the code
  • Sending socket data

    #Run the command on node01 to send data
    nc -lk 9999

9.2 Offline Requirement Analysis

Count the number of occurrences of each word in the file.
9.2.1 Offline Code Development (Scala)
package com.kaikeba.demo1

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/** * Scala develops flink's batch program */
object FlinkFileCount {
  def main(args: Array[String]): Unit = {

     // Todo :1, build Flink batch environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //todo: 2. Read the data file
    val fileDataSet: DataSet[String] = env.readTextFile("d:\\words.txt")

     import org.apache.flink.api.scala._

    //todo: 3. Process the data
      val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
                                                .flatMap(x => x.split(" "))
                                                .map(x=>(x,1))
                                                .groupBy(0)
                                                .sum(1)

    //todo: 4. Print the result
        resultDataSet.print()

    //todo: 5. Save the result to a file
       resultDataSet.writeAsText("d:\\result")

       env.execute("FlinkFileCount")
  }
}

📖 10. Flink parallelism & Slot & Task

Each TaskManager in Flink provides slots to the cluster. Each task slot represents a fixed-size subset of the TaskManager's resources. The number of slots is usually proportional to the number of CPU cores available on each TaskManager node; typically, the number of slots equals the number of CPU cores per node.

10.1 the parallelism

A Flink program consists of multiple tasks (Source, Transformation, and Sink). A task is executed by multiple parallel instances (threads), and the number of parallel instances (threads) of a task is called the parallelism of the task.

10.2 Setting of parallelism

  • The parallelism setting for a task can be specified at multiple levels
    • Operator Level
    • Execution Environment Level
    • Client Level
    • System Level
  • The priority of these parallelism degrees is
    • Operator Level > Execution Environment Level > Client Level > System Level
10.2.1 Operator level
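  • At the operator level, parallelism is set by calling setParallelism() on an individual operator, as in the TestParallelism example further below. A minimal sketch (host and port are placeholders):

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object OperatorLevelParallelism {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.socketTextStream("node01", 9999)
          .flatMap(_.split(" ")).setParallelism(5) // this flatMap runs with 5 parallel subtasks
          .map((_, 1)).setParallelism(3)           // this map runs with 3 parallel subtasks
          .keyBy(0)
          .sum(1)
          .print()

        env.execute("OperatorLevelParallelism")
      }
    }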

10.2.2 Executing the Environment Level

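  • At the execution environment level, calling setParallelism() on the environment sets the default parallelism for every operator of the job unless an operator overrides it (the commented-out environment.setParallelism(4) call in the TestParallelism example below does exactly this). A minimal sketch under the same placeholder assumptions:

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object EnvironmentLevelParallelism {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Every operator of this job uses parallelism 4 unless an operator overrides it
        env.setParallelism(4)

        env.socketTextStream("node01", 9999)
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
          .print()

        env.execute("EnvironmentLevelParallelism")
      }
    }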

10.2.3 Client Level
  • The parallelism can be set when the client submits the job to Flink. For the CLI client, you can specify the parallelism by using the -p parameter

    bin/flink run -p 10 examples/batch/WordCount.jar
10.2.4 System Level
  • At the system level, you can specify the default parallelism for all execution environments by setting the ==parallelism.default== property in the ==flink-conf.yaml== file

    parallelism.default: 1

10.3 Demonstration of parallelism operation

  • To facilitate testing and observing task parallelism information locally, you can add the following dependencies to the local project

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
  • case

    • Note that the way the program obtains its execution environment has changed
    • ==val environment=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()==
    package com.kaikeba.demo1
    
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /** * local debug parallelism */
    object TestParallelism {
    
      def main(args: Array[String]): Unit = {
    
         // Use the createLocalEnvironmentWithWebUI method to construct the local stream processing environment
        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
         // Execute the environment level
        //environment.setParallelism(4)
    
        import org.apache.flink.api.scala._
    
        // Accept socket data
        val sourceStream: DataStream[String] = environment.socketTextStream("node01", 9999)
    
        val countStream: DataStream[(String, Int)] = sourceStream
                            .flatMap(x => x.split(" ")).setParallelism(5) // Operator level
                            .map(x => (x, 1))
                            .keyBy(0)
                            .timeWindow(Time.seconds(2), Time.seconds(1))
                            .sum(1)
    
        countStream.print()
    
        environment.execute()
    
      }
    }
    
    • Set the parallelism and observe the localhost:8081 interface