preface

Ok, we will introduce the operator operation of Flink in this article

Flink is easy to download, decompress, go to Flink’s bin directory, start start-cluster.sh, and then visit localhost:8081 to access its beautiful pages


To stop the cluster, run the stop-cluster.sh command

1.1 (Supplement) Flink Shell use

For beginners, development is prone to errors. If you package debugging every time, it will be troublesome and difficult to locate problems. You can use the Scala shell command line for debugging

The Scala shell approach supports streaming and batch processing. When the shell command line is started, two different ExecutionEnvironments are automatically created. Senv (Stream) and BENV (Batch) are used to process Stream and Batch programs, respectively. (Similar to sc variable in spark-shell)

bin/start-scala-shell.sh [local|remote|yarn] [options] <args>

Copy the code


If we encounter the above Error, we can look at the Error message, which says we need to confirm the mode of execution, so we need to take this part of the argument, there are three different ways to specify, respectively

[local | remote <host> <port> | yarn]

Copy the code

So let’s try it. Let’s start with local

[root@node1 bin]# ./start-scala-shell.sh local

Copy the code

At this point, I specify the mode local, and it opens successfully

Is…?

… 🤣, I now believe that you are also likely to happen, the error “Could not create the DispatcherResourceManagerComponent”

To fix the problem, go to CD /usr/local/flink-1.10.0/conf and add a parameter

After modifying the port, you can run successfully

The remote and ON YARN modes are similar. You can also start them if you like

[root@node1 bin]/start-scala-shell.sh remote 192.168.200.11 8081

Copy the code

At this time we successfully started up, moved

It also shows us two examples of Flink’s batch processing and real-time processing.

Of course, it’s not that important, because flink-shell is nowhere near as good as Spark-shell, so we’ll just try and get past it.

Remember what we said at that time, to understand a real-time program, we mainly need to understand three aspects, data source, data processing and data output, so let’s take a look at the data source of Flink

1.2.1 Real-time Source Introduction

Source is the data input program, you can use the StreamExecutionEnvironment. AddSource (sourceFunction) to add a source for your program.

Flink provides a large number of implemented source methods, you can also customize the source (there will be a corresponding small demo, directly copy your IDEA to run) :

  1. Customize a parallelism free source by implementing the sourceFunction interface

  2. By implementing ParallelSourceFunction interface or inherit RichParallelSourceFunction customize the parallelism of the source

But for the most part, we’ll just use our own source.

1.2.2 Method of obtaining source

1. Based on files

readTextFile(path)

Read text files, which follow TextInputFormat reading rules, line by line and return.

Copy the code

2. Based on the socket

socketTextStream

To read data from the socket, the element can be split with a delimiter.

Copy the code

3. Set based

fromCollection(Collection)

Create a data flow from a Java collection, in which all elements must be of the same type.

Copy the code

4. Customize input

AddSource implements reading data from third-party data sources

The system provides a batch of connectors built-in, and the corresponding connectors will be providedsourceSupport [kafka]

Copy the code

There are other sources mentioned in the official website, but the focus is on Kafka, so learn more

  • Apache Kafka (source/sink
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

1.2.3 Collection of data sources (Code can be directly copied and run)

public class StreamingSourceFromCollection {

    public static void main(String[] args) throws Exception {

        // Step 1: Get environment variables

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: Simulate data

        ArrayList<String> data = new ArrayList<String> ();

        data.add("hadoop");

        data.add("spark");

        data.add("flink");

        // Step 3: Get the data source

        DataStreamSource<String> dataStream = env.fromCollection(data);

        // Step 4: Transformation

        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String.String> () {

            @Override

            // Simply iterate over the data

            public String map(String word) throws Exception {

                return "testCollection_" + word;

            }

        });

        // Step 5: Process the result (print)

        addPreStream.print().setParallelism(1);

        // Step 6: Start the program

        env.execute("StreamingSourceFromCollection");



    }

}

Copy the code

The output

1.2.4 Custom single parallelism data source (code can be directly copied and run)

Simulate a data source that generates one data every second

/ * *

* Function: generate one data per second

* /


public class MyNoParalleSource implements SourceFunction<Long{

    private long number = 1L;

    private boolean isRunning = true;

    @Override

    public void run(SourceContext<Long> sct) throws Exception {

        while (isRunning){

         sct.collect(number);

         number++;

         // Generate one data per second

         Thread.sleep(1000);

        }



    }



    @Override

    public void cancel(a) {

        isRunning=false;

    }

}

Copy the code

At this point, we perform processing on the data source, which is also very simple, a map operation and a filter operation, filter is to select the even number

/ * *

* Function: Get data from a custom data source and filter out even numbers

* /


public class StreamingDemoWithMyNoPralalleSource {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        // Receive the data source

        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {

            @Override

            public Long map(Long value) throws Exception {

                System.out.println("Received data:"+value);

                return value;

            }

        });

        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long number) throws Exception {

                return number % 2= =0;

            }

        });



        filterDataStream.print().setParallelism(1);

        env.execute("StreamingDemoWithMyNoPralalleSource");

    }

}

Copy the code

And the result of that is

1.2.5 Customizing multiple parallelism data sources

/ * *

* Generates one data per second

* /


public class MyParalleSource implements ParallelSourceFunction<Long{

    private long number = 1L;

    private boolean isRunning = true;

    @Override

    public void run(SourceContext<Long> sct) throws Exception {

        while (isRunning){

            sct.collect(number);

            number++;

            // Generate one data per second

            Thread.sleep(1000);

        }



    }



    @Override

    public void cancel(a) {

        isRunning=false;

    }

}

Copy the code

Here we can see that we just implement a different interface and then set the parallelism in the business code

public class StreamingDemoWithMyPralalleSource {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        SetParallelism (2) sets parallelism 2

        DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);

        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {

            @Override

            public Long map(Long value) throws Exception {

                System.out.println("Received data:"+value);

                return value;

            }

        });

        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long number) throws Exception {

                return number % 2= =0;

            }

        });



        filterDataStream.print().setParallelism(1);

        env.execute("StreamingDemoWithMyNoPralalleSource");

    }

}

Copy the code

1.3 Common Transformation operators of Flink

1.3.1 Map and Filter (just demonstrated)

1.3.2 flatMap, keyBy, sum, union (basically the same as Spark)

1.3.3 connect, MapFunction and coMapFunction

The connect operation is not available on Spark, so if you look at it a little bit, it is similar to Union, but it can only connect two streams, the data types of the two streams can be different, and different processing methods are applied to the data in the two streams. The difference between CoMapFunction and MapFunction is that the data processing of one stream is changed to two streams (note that it can only be two).

public class ConnectionDemo {

    public static void main(String[] args) throws Exception {

        // Get Flink's operating environment

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        // Get the data source

        // Note: For this source, parallelism can only be set to 1

        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);



        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);



        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String> () {

            @Override

            public String map(Long value) throws Exception {

                // Here is the second data source, string I added a prefix str_

                return "str_" + value;

            }

        });

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String.Object> () {

            @Override

            public Object map1(Long value) throws Exception {

                // Business can be processed here

                return value;

            }

            @Override

            public Object map2(String value) throws Exception {

                // Business processing can also be done here

                return value;

            }

        });



        // Print the result

        result.print().setParallelism(1);

        String jobName = ConnectionDemo.class.getSimpleName();

        env.execute(jobName);

    }

}

Copy the code

In the output result, two streams of data may not be one for each other, but one stream may be the first to enter multiple streams

1.3.4 the Split and Select

The purpose of this is to split a data stream into multiple data streams

May be in the practical work, mixed with a variety of similar data source data stream, multiple types of data processing rules are different, so can according to certain rules, cut a data stream into multiple data stream, so that each data flow can use need not processing logic, and the select is help us to give different flow to extracted a role

public class SplitDemo {

    public static void main(String[] args) throws  Exception {

        // Get Flink's operating environment

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);// Note: For this source, parallelism can only be set to 1

        // The data is divided according to the parity of the data

        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {

            @Override

            public Iterable<String> select(Long value
{

                ArrayList<String> outPut = new ArrayList<>();

                if (value % 2= =0) {

                    outPut.add("even");/ / even

                } else {

                    outPut.add("odd");/ / odd

                }

                return outPut;

            }

        });



        // Select one or more shard streams

        DataStream<Long> evenStream = splitStream.select("even");

        DataStream<Long> oddStream = splitStream.select("odd");

        DataStream<Long> moreStream = splitStream.select("odd"."even");



        // Print the result, at this point I select all even numbers

        evenStream.print().setParallelism(1);

        String jobName = SplitDemo.class.getSimpleName();

        env.execute(jobName);

    }

}

Copy the code

The results

1.4 Common sink operator of Flink

In fact, the output of data is relatively simple, I think this thing may not need to be combined with the code to expand, roughly go through it

1.4.1 the print () and printToErr ()

Prints the value of each element’s toString() method to the standard output or standard error output stream

1.4.2 writeAsText ()

/ * *

* Data source: 1 2 3 4 5..... Keep coming

* Print the received data through map

* To filter the data, we only need even numbers

* /


public class WriteTextDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {

            @Override

            public Long map(Long value) throws Exception {

                System.out.println("Received data:"+value);

                return value;

            }

        });

        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long number) throws Exception {

                return number % 2= =0;

            }

        });



    // If you do not have a cluster, you can specify a local path and write it to a file

   filterDataStream.writeAsText("your path").setParallelism(1);

        env.execute("StreamingDemoWithMyNoPralalleSource");

    }

}

Copy the code

1.4.3 User-defined Sink

Except for the following ones we mentioned above

  • Apache Kafka (source/sink
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

Of course, I still have the status of writing data into Redis. At this point, we need to introduce a dependency first

<dependency>

    <groupId>org.apache.bahir</groupId>

    <artifactId>Flink - connector - redis_2. 11</artifactId>

    <version>1.0</version>

</dependency>

Copy the code

If you want to know about Redis partners can go to the past rookie tutorial such a website to have a look, the following code has been annotated

/ * *

* Write data to Redis

* /


public class SinkForRedisDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("xxx", xxx, "\n");

        //lpush l_words word

        Tuple2 < string, string >

        DataStream<Tuple2<String.String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String.String> > () {

            @Override

            public Tuple2<String.String> map(String value) throws Exception {

                return new Tuple2<>("l_words", value);

            }

        });

        // Create the redis configuration

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(xxx).setPort(xxx).build();



        / / create redissink

        RedisSink<Tuple2<String.String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");



    }



    public static class MyRedisMapper implements RedisMapper<Tuple2<String.String>> {

        // Get the redis key from the received data

        @Override

        public String getKeyFromData(Tuple2<String.String> data) {

            return data.f0;

        }

        // Get the redis value from the received data

        @Override

        public String getValueFromData(Tuple2<String.String> data) {

            return data.f1;

        }



        @Override

        public RedisCommandDescription getCommandDescription() {

            return new RedisCommandDescription(RedisCommand.LPUSH);

        }

    }

}

Copy the code

1.5 Operators in batch processing

Flink’s batch processing is not very good and is rarely used in enterprise development. However, the team responsible for it is very diligent, so it is expected to become better in the near future. In fact, it is just a review of our previous Spark-Core functions

1.5.1 source

Based on the file

readTextFile(path)

Copy the code

Based on the collection

fromCollection(Collection)

Copy the code

1.5.2 the transform

Overview of operators:

  • Map: Enter an element, then return an element, and do some cleaning and conversion in between

  • FlatMap: Enter an element and return zero, one or more elements

  • MapPartition: similar to Map, data in one partition is processed at a time. MapPartition is recommended if you need to obtain third-party resource links during map processing.

  • Filter: the Filter function checks the incoming data and saves the data that meets the conditions

  • Reduce: Aggregates data, combines the current element with the value returned by the last Reduce, and returns a new value

  • Aggregate: sum, Max, and min

  • Distinct: Returns a de-weighted element from a dataset, data.distinct()

  • Join: internal Join

  • OuterJoin: External link

  • Cross: Gets the Cartesian product of two data sets

  • Union: Returns the sum of two datasets of the same data type

  • First-n: Gets the First n elements of the set

  • Sort Partition: Sorts all partitions of a data set locally, sorting multiple fields through a link call to sortPartition()

1.5.3 sink

  • WriteAsText () : Writes elements line-by-line as strings retrieved by calling each element’s toString() method
  • WriteAsCsv () : Writes tuples to files as comma-separated, with configurable separation between lines and fields. The value for each field comes from the object’s toString() method
  • Print () : Prints the value of each element’s toString() method to the standard output or standard error output stream

Requirement: Flink gets the user’s name from the data source and eventually needs to print out the user’s name and age information

Analysis: Therefore, it is necessary to obtain the user’s age information in the middle map processing, and use broadcast variables to process the user’s relational data set

We use the RichMapFunction at the bottom, which is an initialization process on the basis of the mapFunction. During the initialization process, I can get the broadcast variable, get the age value in the map, and then output the RES.

public class BroadCastDemo {

    public static void main(String[] args) throws Exception{



        // Get the operating environment

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1: Prepare the data to be broadcast

        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();

        broadData.add(new Tuple2<>("zhangsan".18));

        broadData.add(new Tuple2<>("lisi".19));

        broadData.add(new Tuple2<>("wangwu".20));

        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);



        // Process the data that needs to be broadcast and convert the data set to map type, where key is the user name and value is the user age

        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {

            @Override

            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {

                HashMap<String, Integer> res = new HashMap<>();

                res.put(value.f0, value.f1);

                return res;

            }

        });

        / / the source data

        DataSource<String> data = env.fromElements("zhangsan"."lisi"."wangwu");

        // Note: here we need to use RichMapFunction to get the broadcast variable

        DataSet<String> result = data.map(new RichMapFunction<String.String> () {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();

            HashMap<String, Integer> allMap = new HashMap<String, Integer>();



            / * *

* This method is executed only once

* You can implement some initialization functions here

* So, you can get broadcast variable data in the open method

* /


            @Override

            public void open(Configuration parameters) throws Exception {

                super.open(parameters);

                // Get broadcast data

                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");

                for (HashMap map : broadCastMap) {

                    allMap.putAll(map);

                }

            }

            @Override

            public String map(String value) throws Exception {

                Integer age = allMap.get(value);

                return value + "," + age;

            }

        }).withBroadcastSet(toBroadcast, "broadCastMapName");// Perform the operation of broadcasting data

        result.print(a);

    }

}

Copy the code

It’s going to take a long time to run, and it’s going to print out three, four, five and their ages

1.5.5 Flink之Counter

Accumulator is similar to the application scenario of Mapreduce Counter. It can observe the data changes during the running of a task well. Accumulator can be operated in the operator function of Flink job task, but the final result of Accumulator can only be obtained after the task is executed.

Counter is a specific Accumulator that implements IntCounter, LongCounter and DoubleCounter

usage

1: Creates an accumulator

private IntCounter numLines = new IntCounter(); 

2: Registers the accumulator

getRuntimeContext().addAccumulator("num-lines".this.numLines);

3: Use an accumulator

this.numLines.add(1); 

4: Gets the result of the accumulator

myJobExecutionResult.getAccumulatorResult("num-lines")

Copy the code

The sample code

public class CounterDemo {

    public static void main(String[] args) throws Exception{

        // Get the operating environment

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a"."b"."c"."d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {



            //1: create accumulator

            private IntCounter numLines = new IntCounter();

            @Override

            public void open(Configuration parameters) throws Exception {

                super.open(parameters);

                //2: registers the accumulator

                getRuntimeContext().addAccumulator("num-lines".this.numLines);



            }

            //int sum = 0;

            @Override

            public String map(String value) throws Exception {

                // If the degree of parallelism is 1, use the ordinary summation, but set multiple degrees of parallelism, the ordinary summation is incorrect

                //sum++;

                / / System. Out. Println (" sum: "+ sum);

                this.numLines.add(1);

                return value;

            }

        }).setParallelism(8);

        // If you want to get the value of counter, it can only be a task

        //result.print();

        result.writeAsText("d:\\data\\mycounter");

        JobExecutionResult jobResult = env.execute("counter");

        //3: get the accumulator

        int num = jobResult.getAccumulatorResult("num-lines");

        System.out.println("num:"+num);



    }

}

Copy the code

To here the operator of batch processing also mentioned about, interested friends can copy the code to run and see, not interested in words, that is also ok, anyway, now basically are using real-time operator, not too much impact

1.5.6 State of the operator

Back to the word count example

public class WordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> data = env.socketTextStream("localhost".8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override

            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {

                String[] fields = line.split(",");

                for (String word : fields) {

                    collector.collect(new Tuple2<>(word, 1));

                }

            }

        }).keyBy("0")

                .sum(1);

        result.print(a);

        env.execute("WordCount");

    }

}

Copy the code

Note that we must listen on port 8888 to start the program, otherwise a connect Refuse error will be reported. Since I was running under Windows, I set up a Netcat to assist. At this time, I started Netcat first, and then NC-LK 8888 listened on port 8888

Then I enter some words, and we look at our printed message

4 >(hadoop,1)

4 >(hadoop,2)

4 >(flink,1)

4 >(flink,2)

1 >(hive,1)

1 >(hive,2)

1 >(hive,3)

Copy the code

Flink is really real-time processing, one by one, and you’ll find that it’s easy to do the sums that you need to use updateStateByKey or mapWithState operators in Spark

Why is that? Because of what the website says: Flink is a stateful data stream

Therefore, state is a key point in our study of Flink. We’ll talk about that later

finally