Flink reads a text data stream from Kafka, performs a WordCount word-frequency computation, and writes the results to standard output. In this article you will learn how to write and run a Flink program.

Code walkthrough

The first step is to set up Flink’s execution environment:

// Create Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
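The environment object also carries job-level settings. As an optional tweak for local debugging (our own addition, not part of the original example), you can pin the parallelism to 1 so that printed results arrive in a single, easier-to-read stream:

// Optional: run all operators with parallelism 1 for easier-to-read local output
env.setParallelism(1);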

Set the Kafka parameters (the server address and port to connect to), read the Topic named Shakespeare as the data source, and create the source stream:

// Kafka parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
String outputTopic = "WordCount";

// Source
FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
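By default, the consumer resumes from the consumer group's committed offsets in Kafka. If you want to replay the whole topic during testing, FlinkKafkaConsumer also lets you start from the earliest offset; this line is optional and not part of the original example:

// Optional: ignore committed offsets and read the topic from the beginning
consumer.setStartFromEarliest();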

Use the Flink operator to process this data stream:

// Transformations
// Use the Flink operator to operate on the text in the input stream
// Cut words by space, count, partition, set time window, aggregate
DataStream<Tuple2<String, Integer>> wordCount = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
      String[] tokens = line.split("\\s");
      // Output result (word, 1)
      for (String token : tokens) {
        if (token.length() > 0) {
          collector.collect(new Tuple2<>(token, 1));
        }
      }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

Flink's DataStream API is used here; it covers transformation, grouping, windowing, and aggregation operations.
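One detail worth noting: Java erases generic type parameters at compile time, so Flink cannot infer the lambda's output type, which is why the .returns(Types.TUPLE(...)) hint is needed. An equivalent sketch using an anonymous FlatMapFunction class (it requires importing org.apache.flink.api.common.functions.FlatMapFunction) preserves the type information and needs no returns() call:

// Equivalent flatMap as an anonymous class; the generic types survive
// erasure here, so no .returns(...) hint is necessary.
DataStream<Tuple2<String, Integer>> words = stream
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
            for (String token : line.split("\\s")) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    });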

Print the data stream:

// Sink
wordCount.print();

Finally, execute this program:

// execute
env.execute("kafka streaming word count");

env.execute() is required to start a Flink job: the transformations called earlier only build up a dataflow graph, and the job is actually submitted to the cluster, or run on the local machine, only when execute() is called.
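execute() blocks until the job terminates and returns a JobExecutionResult (org.apache.flink.api.common.JobExecutionResult). For this unbounded Kafka job it would block until the job is cancelled, so the following sketch is illustrative and mainly useful for bounded jobs:

// Blocking submission; returns only after the job has finished
JobExecutionResult result = env.execute("kafka streaming word count");
System.out.println("Job finished in " + result.getNetRuntime() + " ms");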

The complete code is as follows:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // Create Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformations
        // Use the Flink operator to operate on the text in the input stream
        // Cut words by space, count, partition, set time window, aggregate
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // Output result (word, 1)
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink
        wordCount.print();

        // execute
        env.execute("kafka streaming word count"); }}Copy the code

Execute the program

In our Introduction to Kafka article, we explained how to start a Kafka cluster and send a data stream to a Topic. Before starting the Flink job, start a Kafka cluster as described in that article, create the corresponding Topic, and write some data to it.
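As a reminder, with a local Kafka installation the steps look roughly like this; the exact flags depend on your Kafka version (older releases use --zookeeper for topic creation and --broker-list for the console producer):

# Create the input topic (Kafka 2.2+ flag syntax)
$ bin/kafka-topics.sh --create --topic Shakespeare --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

# Type lines of text into the topic
$ bin/kafka-console-producer.sh --topic Shakespeare --bootstrap-server localhost:9092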

Debugging in IntelliJ IDEA

In IntelliJ IDEA, click a green run button to execute the program. Either of the two green buttons in the image below can start the program.

IntelliJ IDEA displays the program's output, including the word-count results printed by the sink, in its standard output window.
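The printed records look roughly like the lines below; the counts here are made up for illustration, and the number before > is the index of the parallel subtask that emitted the record (it is omitted when the parallelism is 1):

2> (hello,2)
4> (world,1)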

Congratulations, your first Flink program is running successfully!

Submit the job to the cluster

Earlier we downloaded and set up a local Flink cluster, added our code to the project template, and were able to debug and run it in IntelliJ IDEA. In a production environment, the code is typically compiled, packaged, and submitted to the cluster.

Note that two directories are involved here: the project directory where the code we just wrote lives (referred to below as the project directory), and the Flink home directory downloaded from Flink's official website. The bin directory inside the home directory contains the command-line tools that Flink provides.

Go to the project directory and use the Maven command line to compile and package:

# Use Maven to compile and package your own code
# The package will be placed in the target subfolder of the project directory
$ mvn clean package

Go back to the Flink home directory you just downloaded and decompressed, and use the flink command-line tool that Flink provides to submit the job we just packaged to the cluster. The command-line argument --class specifies which main class is the entry point. We will explain the command line in more detail later.

$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar

At this point, the job appears on the Flink dashboard (served at http://localhost:8081 by default).

The output of the program is written to .out files in the log directory under Flink's home directory. Use the following command to view the results:

$ tail -f log/flink-*-taskexecutor-*.out

Stopping a local cluster:

$ ./bin/stop-cluster.sh

During Flink development and debugging, there are generally several ways to execute the program:

  1. Use the run button built into IntelliJ Idea. This method is mainly used for local debugging.
  2. Submit jobs to the cluster, including Java and Scala programs, using the standard command-line tools provided by Flink. This approach is more suitable for production environments.
  3. Use other command-line tools provided by Flink, such as the interactive environments for Scala, Python, and SQL. This approach is also used during debugging; see the sketch below.
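For reference, option 3 looks roughly like this; which tools ship with your distribution depends on the Flink version (the interactive Scala shell, for example, was removed in Flink 1.15):

# Interactive Scala shell on a local mini-cluster (Flink 1.14 and earlier)
$ bin/start-scala-shell.sh local

# Interactive SQL client
$ bin/sql-client.sh embedded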