Preface
The previous article documented how to build a distributed Flink cluster environment.
This article covers how a Flink client connects to that Flink cluster.
Example: Flink reads files from Hadoop and then processes the data in the cluster environment
Hadoop
Hadoop cluster environment setup
Building the big data runtime environment (part 1)
Building the big data runtime environment (part 2)
Hadoop cluster port description
Hadoop cluster exceptions
Cannot format the storage directory
Detailed Exception Information
org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:
192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty
The JournalNode port is 8485
Handling
Delete the specified directory on each Hadoop JournalNode
rm -rf /usr/local/hadoop/jn/data/nameservices001
Upload files to HDFS
cd /usr/local/hadoop/sbin
# Create a folder
hdfs dfs -mkdir /hdfsdata
# Create the file
sudo vi /home/aaa.txt
# Upload the file to the specified folder
hdfs dfs -put /home/aaa.txt /hdfsdata
File upload exception
Hadoop DataNode cannot be started
Detailed Exception Information
File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).
There are 0 datanode(s) running and no node(s) are excluded in this operation
Check the DataNode status on the WebUI
http://192.168.84.128:50070/dfshealth.html#tab-datanode
Solution 1
Stop the cluster
cd /usr/local/hadoop/sbin
./stop-all.sh
Delete the data directory configured for HDFS
- View the data directory
The hadoop.tmp.dir property is configured in core-site.xml
cat /usr/local/hadoop/etc/hadoop/core-site.xml
- Delete the directory
rm -rf /usr/local/hadoop/tmp/*
Reformat the NameNode
./hadoop namenode -format
Restart the cluster
./start-all.sh
Solution 2
If the above method still fails to start the DataNode, use this method
When the file system is formatted, a current/VERSION file is saved in the NameNode data folder (that is, the dfs.name.dir path configured on the local file system). It records the namespaceID, which marks the formatted version of the NameNode.
If the NameNode is formatted repeatedly, the current/VERSION file saved on each DataNode (under the dfs.data.dir path on the local file system) still holds the namespaceID from the first format.
As a result, the IDs of the NameNode and the DataNodes become inconsistent.
- Solution A (recommended):
Delete all DataNode data: remove the VERSION file from /dfs/data/current on each DataNode in the cluster
Then run the hadoop namenode -format command again to reformat
Restart the cluster and the error disappears
- Solution B:
Copy the clusterID from the VERSION file under name/current into the VERSION file under data/current, overwriting the original clusterID
Check the DataNode status
The DataNode is up
View the uploaded file
http://192.168.84.128:50070
The file path
hdfs://192.168.84.128:8020/hdfsdata/aaa.txt
Flink reads the data source and processes the data
Demo source code
https://gitee.com/pingfanrenbiji/flink-examples-streaming
Flink reads HDFS files and processes the data
Create the Flink execution environment
- First parameter: the IP address of the remote Flink cluster's JobManager
- Second parameter: 8081, the JobManager Web UI port
- Third parameter: the JAR package built from the current project code (see the sketch below)
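A minimal sketch of this call in Java, assuming a hypothetical JobManager address and JAR path (replace both with your own values; the class and variable names are mine):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical values -- replace with your own JobManager address and job JAR path
        String jobManagerHost = "192.168.84.128";      // first parameter: remote JobManager IP
        int jobManagerPort = 8081;                     // second parameter: JobManager Web UI port
        String jarFile = "target/flink-demo-1.0.jar";  // third parameter: JAR containing the job code

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(jobManagerHost, jobManagerPort, jarFile);
    }
}
```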
The data source
Read HDFS file data
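Continuing the sketch above, reading the file uploaded to HDFS earlier looks roughly like this (the path is the one shown above; it requires the Hadoop file-system support discussed later in this article):

```java
// Read the file uploaded to HDFS earlier as a stream of text lines
// (DataStream is org.apache.flink.streaming.api.datastream.DataStream)
DataStream<String> text = env.readTextFile("hdfs://192.168.84.128:8020/hdfsdata/aaa.txt");
```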
Introduction to various operators
Take word counting as an example.
The string data is first parsed into words and counts represented by Tuple2:
the first field is the word and the second field is the count,
with the initial count set to 1.
flatMap
flatMap does the parsing, because a line of data may contain more than one word.
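A minimal sketch of this parsing step; the Tokenizer class (intended as a nested class of the job) and the split pattern are my own assumptions:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Splits each line into words and emits (word, 1) pairs
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1)); // initial count is 1
            }
        }
    }
}

// Applied to the text stream read from HDFS above:
DataStream<Tuple2<String, Integer>> parsed = text.flatMap(new Tokenizer());
```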
keyBy
Groups the data stream by the word field, i.e. index field 0.
keyBy(int index) returns a Tuple2 stream keyed by the word.
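Continuing the sketch, grouping by index field 0 looks like this (KeyedStream and Tuple come from org.apache.flink.streaming.api.datastream and org.apache.flink.api.java.tuple):

```java
// Group the (word, count) stream by the word field (index 0)
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = parsed.keyBy(0);
```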
timeWindow
Specifies the desired window on the stream,
and the result is calculated from the data in that window.
Here the word counts are aggregated every 5 seconds,
and each window starts counting from zero.
timeWindow specifies a 5-second tumbling window.
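Continuing the sketch, a 5-second tumbling window on the keyed stream (this uses the pre-1.12 timeWindow API that matches the Flink version implied by this article):

```java
// 5-second tumbling window per key; each window starts counting from zero
// (Time is org.apache.flink.streaming.api.windowing.time.Time,
//  WindowedStream/TimeWindow come from the streaming datastream/windowing packages)
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowed = keyed.timeWindow(Time.seconds(5));
```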
sum
The third call specifies the sum aggregation function for each key and each window,
based on the count field (i.e. index field 1),
and yields the resulting data stream:
every 5 seconds it emits the number of occurrences of each word during those 5 seconds.
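Continuing the sketch, the per-window aggregation on the count field:

```java
// Sum the count field (index 1) for each key and each window
DataStream<Tuple2<String, Integer>> counts = windowed.sum(1);
```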
Print the data to the console
All operator calls (creating the source, aggregating, printing) only build up a graph of internal operators.
The job runs only when execute() is invoked, which submits it to the cluster or runs it on the local machine.
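Continuing the sketch, printing and submitting the job (the job name is my own):

```java
// print() only adds a sink to the operator graph; nothing runs yet
counts.print();

// Only this call builds the job graph and submits it to the (remote or local) cluster
env.execute("HDFS word count");
```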
Execution error, although no problem was found in the code
Specific exception information
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2
The solution
- Package the current project as a JAR package, using the Maven plugin maven-jar-plugin
- Point the third argument to that JAR package
View the task execution process on the Flink Web UI
Compilation exception
Invalid flag
--add-exports=java.base/sun.net.util=ALL-UNNAMED
The HDFS file system is not supported
Specific exception information
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded
Handling
- Download the Flink Hadoop resource JAR package
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
- Place it in the lib directory of the Flink installation
The JAR package must be placed on every node, and then the Flink cluster must be restarted
The Hadoop NameNode is in the standby state
Detailed information
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
The solution
Reformat the two NameNode nodes
For the specific details, see:
Building the big data runtime environment (part 2)
Remaining issue
The Flink data source comes from a socket
Start the socket service and enter data
The problem is that Flink does not listen on the socket
The cause has not yet been identified
If you understand this, please contact me and point me in the right direction
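For reference, a minimal sketch of the kind of socket-source job described here, assuming the socket service is started locally with something like 'nc -lk 9999' (host, port, and job name are assumptions):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        // Local execution environment, as suggested in the afterword below
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen to the socket service and print whatever is typed into it
        DataStream<String> lines = env.socketTextStream("localhost", 9999);
        lines.print();

        env.execute("Socket source demo");
    }
}
```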
With the local environment, however, it is accessible
Afterword
To solve this problem,
I consulted teacher Xie Bo in the DingTalk group of the "Apache Flink China Community".
He told me:
Java or Scala is normally used to create a local execution environment:
'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();'
There are few cases that use
'final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(ip, port, jarFiles);'
in that way.
If you use a distributed Flink environment, you can simply upload the JAR package through the Web UI to submit the job.
That explains why I could not find anything about it:
I was 'crossing the river by feeling for the stones' on my own.
Conclusion
When learning something new,
after some hard work and struggling along your own path,
you may find you were heading in the wrong direction,
but you will still come to understand it a little better.