1. Flink environment construction
1.1 Flink Version List:
archive.apache.org/dist/flink/
1.2 Install the latest version 1.12.2
wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz
1.3 Decompressing installation
tar -xzf flink-1.12.2-bin-scala_2.12.tgz
./bin/start-cluster.sh
Check whether the installation was successful: jps -l | grep flink
Web UI page address: http://192.168.9.226:8081/#/overview
2. The wordCount example
2.1 Directory structure of springboot project:
2.2 Adding maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Flink versions match the 1.12.2 / Scala 2.12 cluster installed in section 1.2 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compile for Java 8 -->
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- Skip tests during packaging -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
2.3 Example 1: Batch wordCount
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; // Batch wordCount public class WordCountBatch {public static void main(String[] args) throws Exception{// Create an execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); String filePath = "F:\\ttWork\\flink-demo\\ SRC \\main\ resources\ hell.txt "; DataSet<String> inputDateSet = env.readTextFile(filePath); // DataSet<Tuple2<String, Integer>> sum = inputDatesett.flatMap (new MyFlatMap()).groupby (0) // Sum (1); // DataSet<Tuple2<String, Integer>> sum = inputDateSet. Sum.print (); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>>{ @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code
2.4 Example 2: Stream processing wordCount
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; Public class WordCountStream {public static void main(String[] args) throws Exception {// Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set the parallelism to env.setParallelism(4); String filePath = "F:\\ttWork\\flink-demo\\ SRC \\main\ resources\ hell.txt "; DataStream<String> inputDataStream = env.readTextFile(filePath); / / data flow conversion operations SingleOutputStreamOperator < Tuple2 < String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); // Start task env.execute(); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code
2.5 Example 3: Socket flow processing wordCount
Enter test data through a socket port (for example, open one first with `nc -lk <port>` and type lines of words into it)
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; WordCount public class WordCountSocketStream {public static void main(String[] args) throws Exception { / / create the execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment. GetExecutionEnvironment (); // env.setParallelism(4); ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); // Read data from the socket text stream DataStream<String> inputDataStream = env.sockettextStream (host, port); / / data flow conversion operations SingleOutputStreamOperator < Tuple2 < String, Integer>> sum = inputDataStream.flatMap(new MyFlatMap()) .keyBy(0) .sum(1); sum.print(); // Start task env.execute(); Public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> Collector) throws Exception {// Use the blank word String[] words = s.split(" "); For (String word: words){collector.collect(new Tuple2<>(word, 1)); }}}}Copy the code
2.6 Send Example three to the Flink server using flink Web UI
Enter the class to run, the socket address, and the port:
Check the execution of the Flink program on the Running Jobs page — for example, the parallelism of each operator and the number of records received
View Flink’s log output on the Task Managers page: