Preface

The previous article documented the process of building a distributed Flink cluster environment.

This article looks at how a Flink client connects to that Flink cluster environment.

Example: Flink reads a file from Hadoop and then processes the data through the cluster environment.

Hadoop

Hadoop cluster environment setup

Setting up the big data runtime environment, part one

Setting up the big data runtime environment, part two

Hadoop cluster port description


Hadoop cluster exceptions

Cannot format the storage directory

Detailed Exception Information

org.apache.hadoop.hdfs.qjournal.client.QuorumException: Could not format one or more JournalNodes. 1 exceptions thrown:

192.168.84.132:8485: Directory /usr/local/hadoop/jn/data/nameservices001 is in an inconsistent state: Can't format the storage directory because the current directory is not empty


The JournalNode port is 8485.

Handling

Delete the specified directory on each Hadoop JournalNode



rm -rf /usr/local/hadoop/jn/data/nameservices001


Upload files to HDFS

cd /usr/local/hadoop/sbin

# Create the folder
hdfs dfs -mkdir /hdfsdata

# Create a test file
sudo vi /home/aaa.txt

# Upload the file to the specified folder
hdfs dfs -put /home/aaa.txt /hdfsdata


File upload exception

Hadoop DataNode cannot be started

Detailed Exception Information

File /hdfsdata/aaa.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). 

There are 0 datanode(s) running and no node(s) are excluded in this operation


Check the DataNode status on the WebUI

http://192.168.84.128:50070/dfshealth.html#tab-datanode


Solution 1
Stop the cluster
cd /usr/local/hadoop/sbin

./stop-all.sh

Delete the data directory configured for HDFS
  • View the data directory
The data directory is the path configured as hadoop.tmp.dir in core-site.xml



cat /usr/local/hadoop/etc/hadoop/core-site.xml 

  • Delete
rm -rf /usr/local/hadoop/tmp/*

Reformat
./hadoop namenode -format

Restart the cluster
./start-all.sh

Solution 2
If the above method still fails to start the DataNode, use this method



When the file system is formatted, a current/VERSION file is saved in the NameNode data folder

(that is, the path configured as dfs.name.dir on the local file system).

It records the namespaceID, which marks the current formatted version of the NameNode.

If the NameNode is formatted repeatedly, the current/VERSION file saved on each DataNode (the dfs.data.dir path on the local file system) still holds the namespaceID from the first format.

As a result, the IDs of the NameNode and the DataNodes become inconsistent.

  • Solution A (recommended):
Delete all DataNode data and remove the VERSION file from /dfs/data/current on each DataNode in the cluster.

Then run the hadoop namenode -format command again to reformat.

Restart the cluster, and the error disappears

  • Solution B:
Copy the clusterID from the VERSION file under name/current into the VERSION file under data/current, overwriting the original clusterID.


Check the DataNode status


The DataNode is up

View the uploaded file

http://192.168.84.128:50070


The file path


hdfs://192.168.84.128:8020/hdfsdata/aaa.txt


Flink reads the data source and processes the data

Demo source code

https://gitee.com/pingfanrenbiji/flink-examples-streaming


Flink reads HDFS files and processes the data


Create the Flink execution environment

  • First parameter: the IP address of the remote Flink cluster's JobManager
  • Second parameter: 8081, the JobManager Web UI port
  • Third parameter: the JAR package built from the current project folder

The data source

Read HDFS file data
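
Below is a minimal Scala sketch of these two steps, assuming the addresses used in this article (JobManager at 192.168.84.128 with Web UI port 8081, HDFS at 192.168.84.128:8020); the JAR path is a placeholder.

import org.apache.flink.streaming.api.scala._

// Remote execution environment: JobManager host, Web UI port, and the JAR that
// contains this job's classes (the path below is a placeholder).
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
  "192.168.84.128", 8081, "target/flink-examples-streaming.jar")

// Data source: the file uploaded to HDFS earlier.
val text: DataStream[String] =
  env.readTextFile("hdfs://192.168.84.128:8020/hdfsdata/aaa.txt")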


Introduction to various operators

Take word counting as an example.


The string data is first parsed into words and counts, represented as a Tuple2.

The first field is the word and the second field is the count.

The initial count is set to 1.


flatMap

flatMap does the parsing, since a line of data may contain more than one word.
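
A hedged sketch of this parsing step, continuing from the text stream read from HDFS above (the names are illustrative):

// Split each line into words and emit a (word, 1) pair per word;
// one line of input may produce many output tuples.
val wordPairs: DataStream[(String, Int)] = text
  .flatMap(line => line.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map(word => (word, 1))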




keyBy



Group the data stream by the word field, i.e. index field 0.

keyBy(int index) returns a Tuple2 stream keyed by the word.
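
Continuing the sketch, grouping the tuples by the word field at index 0:

// Key the stream by field 0 (the word), so all counts for the same word
// go to the same parallel task.
val keyedWords = wordPairs.keyBy(0)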




timeWindow



Specify the desired window on the stream and compute the result from the data in that window.

Aggregate the word counts every 5 seconds; each window starts counting from zero.

timeWindow specifies a tumbling window of 5 seconds.
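
A sketch of the windowing step, using the 5-second tumbling window described above:

import org.apache.flink.streaming.api.windowing.time.Time

// Tumbling window of 5 seconds: each window starts counting from zero.
val windowedWords = keyedWords.timeWindow(Time.seconds(5))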




sum



This call specifies the sum aggregation function for each key and each window,

aggregating by the count field (i.e. the field at index 1),

and yields the resulting data stream.

The number of occurrences of each word during those 5 seconds is printed every 5 seconds.
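
Sketch of the aggregation, summing the count field at index 1 for each key and window:

// Sum field 1 (the count) per word within each 5-second window.
val windowCounts: DataStream[(String, Int)] = windowedWords.sum(1)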



Print the data to the console



All the operator calls (creating the source, aggregating, printing) only build up the graph of internal operators.

The job runs only when execute() is invoked, submitting it to the cluster or executing it on the local machine.
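
The tail of the sketch, with a placeholder job name:

// print() and the calls above only build the operator graph;
// nothing runs until execute() submits the job.
windowCounts.print()

env.execute("HDFS word count")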


An error occurs on execution, although no problem was found in the code

Specific exception information

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.streaming.scala.examples.remotejob.RemoteJobTest$$anon$2


The solution

  • Package the current directory folder as a JAR package

Use the Maven plugin maven-jar-plugin

  • The third argument points to the JAR package

View the task execution process on the Flink Web UI


Compilation exception

Invalid flag

--add-exports=java.base/sun.net.util=ALL-UNNAMED


The HDFS file system is not supported

Specific exception information

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded


Handling

  • Download the Flink Hadoop resource JAR package
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar

  • Place it in the lib directory of the Flink installation

The JAR package needs to be placed on each node, and the Flink cluster environment restarted afterwards.

The Hadoop NameNode is in standby state

Detailed information

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby


The solution

Reformat the two NameNode nodes



For the specific steps, see:


Setting up the big data runtime environment, part two

Unresolved issue

The Flink data source comes from socket data


Start the socket service and enter data


The problem:

Flink does not listen on the socket.

The cause has not yet been identified.

If you understand this, please contact me and point me in the right direction.


With the local environment, however, it can be accessed


Afterword

To solve this problem,

I consulted teacher Xie Bo in the DingTalk group of the "Apache Flink China Community".

He told me:



Create a local execution environment using Java or Scala



'final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); '



It is rare to see



'final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(ip, port, jarFiles); '



used this way.



If you are using a Flink distributed environment, upload the JAR package through the Web UI instead.
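
For the socket case, a hedged local-environment sketch along the lines of that advice (the host and port are placeholders; start a socket service such as nc -lk 9000 on that host first):

import org.apache.flink.streaming.api.scala._

// Local execution environment, as suggested: no remote JobManager address or JAR needed.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Socket source; "localhost" and 9000 are placeholders for the socket service.
val lines = env.socketTextStream("localhost", 9000)

lines.print()
env.execute("Socket source test")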


That explains why I could not find anything about it.

I was 'crossing the river by feeling for the stones' on my own.


Conclusion

When learning something new,

after some hard work and struggle following your own ideas,

you may find you were going in the wrong direction,

but you will get to know it a little bit better.
