
The previous article documented how to build a distributed Flink cluster environment.

This article covers how a Flink client connects to that cluster.

Example: Flink reads a file stored in Hadoop (HDFS) and then processes the data on the cluster.


Setting up the Hadoop cluster environment

Building a big data runtime environment, part 1

Building a big data runtime environment, part 2

Hadoop cluster port reference

Hadoop cluster errors

Cannot format the storage directory

Exception details

org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty

The JournalNode port is 8485.


Delete that directory on each Hadoop JournalNode:

rm -rf /usr/local/hadoop/jn/data/nameservices001

Upload files to HDFS

cd /usr/local/hadoop/sbin

Create a folder

hdfs dfs -mkdir /hdfsdata

Create a local file

sudo vi /home/aaa.txt

Upload the file to that folder

hdfs dfs -put /home/aaa.txt /hdfsdata

File upload errors

The Hadoop DataNode cannot be started

Exception details

File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation.

Check the DataNode status on the WebUI
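
Besides the web UI, the DataNode count can also be checked from the command line. The sketch below is only an illustration: the function name count_live_datanodes is hypothetical, and it assumes the output of hdfs dfsadmin -report contains a line of the form "Live datanodes (N):", which is what Hadoop 2.x prints as far as I know. If no such line is present, it reports 0.

```shell
#!/bin/sh
# Hypothetical helper: read `hdfs dfsadmin -report` output on stdin and
# print the number of live DataNodes (0 when no matching line is found).
# Assumption: the report contains a line such as "Live datanodes (2):".
count_live_datanodes() {
    count=$(sed -n 's/^Live datanodes (\([0-9][0-9]*\)):.*/\1/p' | head -n 1)
    echo "${count:-0}"
}

# Usage on a cluster node (needs the hdfs command on PATH):
# hdfs dfsadmin -report | count_live_datanodes
```

A result of 0 matches the "There are 0 datanode(s) running" message in the exception.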

Solution 1
Stop the cluster
cd /usr/local/hadoop/sbin


Delete the data directory configured for HDFS
  • Find the data directory
hadoop.tmp.dir is the directory configured in core-site.xml

cat /usr/local/hadoop/etc/hadoop/core-site.xml

  • Delete its contents
rm -rf /usr/local/hadoop/tmp/*

Reformat the NameNode
hadoop namenode -format

Restart the cluster
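
Rather than reading core-site.xml by eye, the hadoop.tmp.dir value used in Solution 1 can be pulled out with a small script. This is a minimal sketch, not an XML parser: the function name get_hadoop_property is hypothetical, and it assumes the name and value elements sit on the same line or on consecutive lines, as in a typical hand-written core-site.xml. On a node with a working Hadoop install, hdfs getconf -confKey hadoop.tmp.dir should give the same answer.

```shell
#!/bin/sh
# Hypothetical helper: print the <value> that follows a given <name> in a
# Hadoop XML configuration file.
# Assumption: <name> and <value> are on the same or on consecutive lines.
get_hadoop_property() {
    file=$1
    name=$2
    grep -F -A 1 "<name>$name</name>" "$file" \
        | sed -n 's:.*<value>\(.*\)</value>.*:\1:p' \
        | head -n 1
}

# Usage:
# get_hadoop_property /usr/local/hadoop/etc/hadoop/core-site.xml hadoop.tmp.dir
```

Checking the printed path before running rm -rf on it helps avoid deleting the wrong directory.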

Solution 2
If the DataNode still fails to start after Solution 1, use this method.

When the file system is formatted, a current/VERSION file is saved in the NameNode data folder (that is, the local file system path set in the configuration file).

This file records the namespaceID, which identifies that formatted version of the NameNode.

If the NameNode is formatted repeatedly, the current/VERSION file saved on each DataNode (also under a path on the local file system) still holds the NameNode ID recorded at the first format.

As a result, the IDs of the NameNode and the DataNodes no longer match.

  • Solution A (recommended):
Delete all DataNode data, including the VERSION file under /dfs/data/current on every DataNode in the cluster.

Then run the hadoop namenode -format command again to reformat the file system.

Restart the cluster, and the error disappears.

  • Solution B:
Copy the clusterID from the VERSION file under name/current into the VERSION file under data/current, overwriting the original clusterID.
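
Solution B can be scripted instead of edited by hand. The following is a minimal sketch under a few assumptions: the function name sync_cluster_id is hypothetical, both VERSION files contain a line starting with clusterID=, and the NameNode's VERSION file is readable from the DataNode host (for example, copied over first). It keeps a .bak copy of the DataNode file before changing it.

```shell
#!/bin/sh
# Hypothetical helper: copy the clusterID from the NameNode VERSION file
# into a DataNode VERSION file, leaving every other line untouched.
sync_cluster_id() {
    name_version=$1   # VERSION file under name/current
    data_version=$2   # VERSION file under data/current
    cluster_id=$(sed -n 's/^clusterID=//p' "$name_version" | head -n 1)
    if [ -z "$cluster_id" ]; then
        echo "no clusterID found in $name_version" >&2
        return 1
    fi
    # Keep a backup, then rewrite only the clusterID line.
    cp "$data_version" "$data_version.bak" || return 1
    sed "s/^clusterID=.*/clusterID=$cluster_id/" "$data_version.bak" > "$data_version"
}

# Usage (paths depend on the configured name and data directories):
# sync_cluster_id NAME_DIR/current/VERSION DATA_DIR/current/VERSION
```

The DataNode needs to be restarted afterwards for the new ID to take effect.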

Check the DataNode status

The DataNode is up

View the uploaded file

The file path

hdfs:///hdfsdata/aaa.txt

Flink reads the data source and processes the data

The demo source code

Flink reads the HDFS file and processes the data

Create the Flink execution environment

  • First parameter: the IP address of the remote Flink cluster's JobManager
  • Second parameter: 8081, the JobManager web UI port
  • Third parameter: the JAR package built from the current project directory

The data source

Read the HDFS file data

Introduction to the operators

Take word counting as an example.

The string data is first parsed into (word, count) pairs represented as Tuple2.

The first field is the word and the second field is the count.

The initial count is set to 1.

flatMap does the parsing, because a line of data may contain more than one word.

Group the data stream by the word field, that is, the field at index 0.

keyBy(int index) returns a Tuple2 stream keyed by the word.

Specify the desired window on the stream and compute the result from the data in that window.

Here the word counts are aggregated every 5 seconds, and each window starts counting from zero.

timeWindow specifies a 5-second tumbling window.

The third call specifies the sum aggregate function for each key and each window, applied to the count field (that is, the field at index 1).

The resulting data stream emits, every 5 seconds, the number of occurrences of each word within those 5 seconds.

Print the data to the console

All operator calls (creating the source, aggregating, printing) only build up the internal graph of operations.

The job actually runs only when execute() is called, which submits it to the cluster or runs it on the local machine.

Execution error: user code could not be found

Exception details

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2

The solution

  • Package the current project directory as a JAR

Use the Maven plugin maven-jar-plugin

  • Point the third parameter at that JAR

View the task execution on the Flink web UI

Compilation error

Invalid flag


The HDFS file system is not supported

Specific exception information

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded



  • Download the Flink Hadoop resource JAR package (...7.5-7.0.jar)

  • Place it in the lib directory of the Flink installation

Every node needs this JAR; restart the Flink cluster afterwards.

The Hadoop NameNode is in standby state

Exception details

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby

The solution

Reformat both NameNodes

For the details, see

Building a big data runtime environment, part 2
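
Before reformatting, it may be worth confirming which NameNode is active and which is standby. The sketch below wraps the hdfs haadmin -getServiceState command. The function name print_namenode_states, the HDFS_CMD override, and the IDs nn1 and nn2 are all hypothetical; the real IDs are whatever the cluster's HA configuration defines.

```shell
#!/bin/sh
# Hypothetical helper: print the HA state of each NameNode ID passed as an
# argument. HDFS_CMD can be overridden, for example to test without a cluster.
print_namenode_states() {
    for nn in "$@"; do
        state=$("${HDFS_CMD:-hdfs}" haadmin -getServiceState "$nn" 2>/dev/null) \
            || state="unknown"
        echo "$nn: $state"
    done
}

# Usage (nn1 and nn2 are placeholder NameNode IDs):
# print_namenode_states nn1 nn2
```

If both NameNodes report standby, no node can serve the READ operation, which would be consistent with the exception above.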


Flink reading from a socket data source

Start the socket service and enter data

The problem

Flink does not listen on the socket.

The cause has not been identified yet.

If you understand what is happening, please contact me and point me in the right direction.

With a local execution environment, the socket is reachable


To solve this problem, I consulted Xie Bo in the "Apache Flink China Community" DingTalk group.

He told me:

In Java or Scala, a local execution environment is usually created with

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

It is rare to use

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(ip, port, jarfiles);

With a distributed Flink environment, the job is submitted by uploading the JAR package through the web UI.

That explains why I could not find any material on the approach I was trying.
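
Besides the web UI upload, the Flink command-line client can submit the same JAR with flink run. The following is a minimal sketch: the function name submit_flink_job and the FLINK_CMD override are hypothetical, and the host, class, and JAR path in the usage line are placeholders. It assumes the flink script from the Flink distribution's bin directory is on PATH.

```shell
#!/bin/sh
# Hypothetical helper: submit a job JAR to a remote JobManager with the
# Flink command-line client. FLINK_CMD can be overridden for a dry run.
submit_flink_job() {
    jobmanager=$1   # JobManager address as host:port
    main_class=$2   # fully qualified entry class of the job
    jar_path=$3     # path to the packaged job JAR
    "${FLINK_CMD:-flink}" run -m "$jobmanager" -c "$main_class" "$jar_path"
}

# Dry run that only prints the arguments (all values are placeholders):
# FLINK_CMD=echo submit_flink_job JOBMANAGER_IP:8081 MAIN_CLASS PATH_TO_JAR
```

Either way, the job code itself can keep using getExecutionEnvironment(), since the cluster supplies the environment at submission time.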

I had been "crossing the river by feeling for the stones" on my own.

When learning something new, working hard and struggling along according to your own ideas may well take you in the wrong direction.

But you will get to know the subject a little better.

