1. Overview

Word count (WordCount) is the classic introductory example for big-data processing. The following implements a Flink WordCount program in Java.

2. Create a Maven project

Here is the `pom.xml` file:


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.duniqb</groupId>
    <artifactId>learn-flink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>11</java.version>
        <!-- Hand the Java version to maven-compiler-plugin; java.version alone is not read by it. -->
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.1</flink.version>
        <!-- Scala binary-version suffix of the Scala-dependent Flink artifacts below. -->
        <scala.version>2.12</scala.version>
        <!-- "compile" lets the job run straight from the IDE; use "provided" when the jar is
             submitted to a Flink cluster that already ships these libraries. -->
        <project.build.scope>compile</project.build.scope>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
    </properties>

    <dependencies>
        <!-- Logging binding (SLF4J over log4j 1.x) -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Java API for Streaming -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink's Web UI (needed by createLocalEnvironmentWithWebUI) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>
</project>
Copy the code

SocketWordCount.java

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // The data source is a Linux host, which transmits data through sockets
        DataStreamSource<String> socket = env.socketTextStream("ubuntu".6666."\n");
// lambda(socket);
// function(socket);
// lambdaAndFunction(socket);
        richFunction(socket);
        env.execute();
    }
	// rich function mode
    private static void richFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void close(a) throws Exception {
                super.close();
            }

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split("");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1)); }}}); KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = flatMap.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                returnvalue.f0; }}); tuple2StringKeyedStream.sum(1).print();
    }

	// Lambda and function are mixed
    private static void lambdaAndFunction(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split("");
                for (String s : strings) {
                    out.collect(Tuple2.of(s, 1)); }}}); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(f -> f.f0).sum(1);
        sum.print();
    }
	
    // The pure function is complete
    private static void function(DataStreamSource<String> socket) {
        SingleOutputStreamOperator<String> flatMap = socket.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] strings = value.split("");
                for(String s : strings) { out.collect(s); }}}); SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1); }}); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);
        sum.print();
    }
	
    // Lambda mode
    private static void lambda(DataStreamSource<String> socket) {
        socket
                .flatMap((String value, Collector<String> out) -> {
                    Arrays.stream(value.split("")).forEach(out::collect);
                }).returns(Types.STRING)
                .map(f -> Tuple2.of(f, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .sum(1) .print(); }}Copy the code

4. Running the example

On a Linux host (the program connects to the host `ubuntu`, port 6666), run `nc -lk 6666` to start a temporary server that sends socket text, then type some strings into it:

Then start the Java program. The running word counts are printed to the terminal, as shown below: