Docker and Docker Compose are required to follow this article. I use a Mac with Docker.
Starting the Flink cluster with Docker
First, download the Flink image with docker pull flink. I downloaded version 1.9.0.
Then write docker-compose.yml:
version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Sample code
The code reads text from a socket, splits it into words, and counts the occurrences of each word within each time window. Only the key code is listed here; the full project is available on my GitHub.
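import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // Host and port of the nc listener, as seen from the Flink containers.
        // 192.168.65.2 is the value from the author's Mac + Docker setup; adjust it for yours.
        final int port = 9008;
        final String host = "192.168.65.2";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket.
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        DataStream<WordWithCount> windowCounts = text
            // Split each line on whitespace and emit (word, 1).
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            // 5-second window that slides every second.
            .timeWindow(Time.seconds(5), Time.seconds(1))
            // Sum the counts of each word within a window.
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // Print with a single parallel instance so the output is not interleaved.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // POJO holding a word and its count; keyBy("word") relies on the
    // public fields and the public no-argument constructor.
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }
}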
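To see what the sliding window does to the counts, the same logic can be sketched in plain Java without a cluster. This is only an illustrative sketch, not how Flink implements windows internally: SlidingWindowSketch and its methods are hypothetical names, timestamps are passed in by hand (and assumed non-negative) rather than taken from the clock, and empty tokens are skipped, which the job above does not do. Each line is counted in every 5-second window that covers its timestamp, so one word contributes to five overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch of sliding-window word counting (size 5 s, slide 1 s).
 * Illustrative only: no Flink, no timers, no state backend.
 */
class SlidingWindowSketch {
    static final long SIZE_MS = 5_000;   // window length, as in Time.seconds(5)
    static final long SLIDE_MS = 1_000;  // slide interval, as in Time.seconds(1)

    /** Start times of every window [start, start + SIZE_MS) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % SLIDE_MS);
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** Adds each whitespace-separated word of a line to every window covering the timestamp. */
    static void add(Map<Long, Map<String, Long>> counts, long timestampMs, String line) {
        for (String word : line.split("\\s")) {
            if (word.isEmpty()) {
                continue; // assumption of this sketch: ignore empty tokens
            }
            for (long start : windowStarts(timestampMs)) {
                counts.computeIfAbsent(start, k -> new TreeMap<>()).merge(word, 1L, Long::sum);
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        add(counts, 6_200, "hello flink"); // line received at t = 6.2 s
        add(counts, 7_900, "hello");       // line received at t = 7.9 s
        counts.forEach((start, words) ->
            System.out.println("[" + start + ", " + (start + SIZE_MS) + ") " + words));
    }
}
```

With these two inputs, the windows starting at 3000 ms through 6000 ms contain both lines and count hello twice, the window starting at 2000 ms contains only the first line, and the window starting at 7000 ms contains only the second.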
Run the example
First, start Flink from the directory that contains docker-compose.yml:
$ docker-compose up -d
$ docker ps
CONTAINER ID   IMAGE   COMMAND                  CREATED      STATUS         PORTS                              NAMES
dc54c9cf6304   flink   "/docker-entrypoint.…"   3 days ago   Up 4 seconds   6121-6123/tcp, 8081/tcp            flink_taskmanager_1
2eab6b0fd0f1   flink   "/docker-entrypoint.…"   3 days ago   Up 3 seconds   6123/tcp, 0.0.0.0:8081->8081/tcp   flink_jobmanager_1
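Besides reading the docker ps output, you can check from code that the published web port accepts connections. The following is a minimal sketch, assuming the 8081 mapping from docker-compose.yml; PortProbe and isPortOpen are illustrative names, and a successful TCP connection only shows that something is listening, not that Flink is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Minimal TCP reachability check; PortProbe and isPortOpen are illustrative names. */
class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // 8081 is the web UI port published by the jobmanager service.
        System.out.println("Flink web UI reachable: " + isPortOpen("localhost", 8081, 1000));
    }
}
```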
Both containers are running. Next, open a new terminal window and start an nc (netcat) listener:
$ nc -l 9008
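The listener is needed because socketTextStream connects to the port as a client, so something must already be accepting connections on 9008. If nc is not available, its role here can be approximated in Java. This is a rough sketch under stated assumptions, not a full netcat replacement: LineServer and serve are hypothetical names, only one client is accepted, and each line is terminated with "\n" to match the delimiter used by the job.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for "nc -l 9008": forwards lines to the first client that connects. */
class LineServer {
    /** Waits for one client on the server socket and sends it every line read from input. */
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line);
                out.print('\n'); // explicit "\n" so the delimiter is platform independent
                out.flush();     // send each line immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Listen on the port the job connects to and forward whatever is typed on stdin.
        try (ServerSocket server = new ServerSocket(9008);
             BufferedReader stdin = new BufferedReader(
                 new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

Start it before submitting the job, then type words into its terminal exactly as you would with nc.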
Open the Flink web interface at http://localhost:8081 (the port published in docker-compose.yml), select Submit New Job, and upload the compiled JAR package.
After submitting, you can see in the interface that the running job has received data.
Run docker logs -f on the taskmanager container (flink_taskmanager_1 in the docker ps output), then type some words in the nc window. The word counts appear in the Docker log output.