Docker and Docker Compose are required to follow along with this article. The author uses Docker on a Mac.

Starting a Flink cluster with Docker

First, pull the Flink image with docker pull flink; the author used version 1.9.0.
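
For example, to pin the version explicitly (this assumes the 1.9.0 tag published on Docker Hub):

$ docker pull flink:1.9.0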

Then write docker-compose.yml:

version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

The sample code

This code reads text from a socket port, splits it into words, and counts how many times each word occurs within each window. Only the key code is listed here; the full project code is available on my GitHub.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        final int port;
        final String host;
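        // Address and port the job reads text from: the port that nc will
        // listen on, and (in the author's Mac + Docker setup) the IP of the
        // host machine as seen from inside the Docker containers.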
        port = 9008;
        host = "192.168.65.2";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.socketTextStream(host, port, "\n");

        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>(){
                @Override
                public void flatMap(String value, Collector<WordWithCount> out){
                    for(String word : value.split("\\s")){
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>(){
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b){
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount(){}

        public WordWithCount(String word, long count){
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString(){
            return word + ":" + count;
        }
    }
}

Run the example

First, start Flink from the directory containing docker-compose.yml:

$ docker-compose up -d
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                              NAMES
dc54c9cf6304        flink               "/docker-entrypoint.…"   3 days ago          Up 4 seconds        6121-6123/tcp, 8081/tcp            flink_taskmanager_1
2eab6b0fd0f1        flink               "/docker-entrypoint.…"   3 days ago          Up 3 seconds        6123/tcp, 0.0.0.0:8081->8081/tcp   flink_jobmanager_1

You can see that both containers are up. Next, open a new terminal window and start an nc listener:

$ nc -l 9008

Open the Flink web UI at http://localhost:8081, select Submit New Job, and upload the compiled JAR package.
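
If you prefer the command line, the job can also be submitted with the Flink CLI inside the JobManager container. This is just a sketch: the JAR name is a placeholder, and the container name depends on the directory you ran docker-compose from.

$ docker cp target/socket-window-word-count.jar flink_jobmanager_1:/tmp/job.jar
$ docker exec -it flink_jobmanager_1 flink run -c SocketWindowWordCount /tmp/job.jar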

After submitting, you can see in the UI that the job is running and receiving data.

Follow the logs with docker logs -f, then type some words into the nc window; the word-count output appears in the Docker logs.
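
For example, since print() writes to the TaskManager's stdout, the output should show up in that container's logs (the container name is taken from the docker ps output above; yours may differ):

$ docker logs -f flink_taskmanager_1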

References

  1. Flink: setting up a Docker-based development environment
  2. Docker Compose quick start