Preface
In this article we will walk through Flink's operators.
Getting Flink running is easy: download it, unpack it, go to Flink's bin directory, run start-cluster.sh, and then visit localhost:8081 to see its nice web UI.
To stop the cluster, run the stop-cluster.sh command
1. Flink operators
1.1 (Supplement) Using the Flink Scala shell
Beginners make mistakes easily during development, and packaging the job for every debugging round is tedious and makes problems hard to locate. Instead, you can debug with the Scala shell command line.
The Scala shell supports both streaming and batch processing. When the shell starts, two ExecutionEnvironments are created automatically: senv (streaming) and benv (batch), used for stream and batch programs respectively (similar to the sc variable in spark-shell).
bin/start-scala-shell.sh [local|remote|yarn] [options] <args>
If you hit the above error, read the message: it tells you the execution mode has to be specified, so we need to pass that argument. There are three ways to specify it:
[local | remote <host> <port> | yarn]
So let's try it, starting with local mode:
[root@node1 bin]# ./start-scala-shell.sh local
Here I specified local mode, and it started successfully.
Did it, though? 🤣 Chances are you instead hit the error "Could not create the DispatcherResourceManagerComponent", just like I did.
To fix the problem, cd into /usr/local/flink-1.10.0/conf and adjust one parameter in flink-conf.yaml.
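A minimal sketch of the change, under my assumption (the article only says "modify the port") that the shell's embedded local cluster clashes with the standalone cluster already listening on REST port 8081; rest.port is the standard Flink option for this, and any free port will do:

# flink-conf.yaml
rest.port: 8082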
After changing the port, the shell starts successfully.
The remote and ON YARN modes are similar; you can try them too if you like.
[root@node1 bin]# ./start-scala-shell.sh remote 192.168.200.11 8081
This time it started up successfully. How moving.
It also shows us two examples of Flink’s batch processing and real-time processing.
Of course, this is not that important: the Flink shell is nowhere near as good as spark-shell, so trying it once is enough and we can move on.
1.2 Flink’s data source
Remember what we said before: to understand a real-time program, we mainly need to understand three things, the data source, the data processing, and the data output. So let's take a look at Flink's data sources.
1.2.1 Real-time Source Introduction
A source is the data input of the program; you can use StreamExecutionEnvironment.addSource(sourceFunction) to add a source to your program.
Flink provides a large number of ready-made sources, and you can also implement your own (each has a small demo below that you can copy straight into your IDE and run):
- Implement the SourceFunction interface to define a non-parallel source
- Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to define a parallel source
But most of the time we can simply use the sources Flink already provides.
1.2.2 Ways to obtain a source
1. File-based
readTextFile(path)
Reads a text file following the TextInputFormat rules, line by line, and returns each line.
2. Socket-based
socketTextStream
Reads data from a socket; elements can be split by a delimiter.
3. Collection-based
fromCollection(Collection)
Creates a data stream from a Java collection; all elements in the collection must be of the same type.
4. Custom input
addSource: implements reading from third-party data sources.
Flink also ships a number of built-in connectors, and the corresponding connectors provide source support (for example Kafka).
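As a quick, hedged sketch of the first two kinds (the collection-based one gets its own full demo below), here is roughly what file-based and socket-based sources look like; the file path, host and port are placeholders of my own, not from the original article:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSourcesDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // File-based: read a text file line by line (the path is a placeholder)
        DataStreamSource<String> fileStream = env.readTextFile("file:///tmp/words.txt");
        // Socket-based: read lines from a socket, split on newline (host/port are placeholders)
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999, "\n");
        fileStream.print();
        socketStream.print();
        env.execute("BasicSourcesDemo");
    }
}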
The official site mentions other sources as well, but the one to focus on is Kafka, so it is worth learning well (see the sketch after this list).
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
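Since Kafka is the connector we care about most, here is a minimal, hedged sketch of consuming a Kafka topic, assuming the flink-connector-kafka dependency is on the classpath; the topic name, bootstrap servers and group id below are placeholders I made up for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092"); // placeholder
        props.setProperty("group.id", "flink-demo");          // placeholder
        // Read the "test" topic as plain strings
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
        DataStreamSource<String> kafkaStream = env.addSource(consumer);
        kafkaStream.print();
        env.execute("KafkaSourceDemo");
    }
}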
1.2.3 Collection-based data source (the code can be copied and run directly)
public class StreamingSourceFromCollection {
public static void main(String[] args) throws Exception {
// Step 1: Get environment variables
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Step 2: Simulate data
ArrayList<String> data = new ArrayList<String> ();
data.add("hadoop");
data.add("spark");
data.add("flink");
// Step 3: Get the data source
DataStreamSource<String> dataStream = env.fromCollection(data);
// Step 4: Transformation
SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {
// Prepend a prefix to each element
@Override
public String map(String word) throws Exception {
return "testCollection_" + word;
}
});
// Step 5: Process the result (print)
addPreStream.print().setParallelism(1);
// Step 6: Start the program
env.execute("StreamingSourceFromCollection");
}
}
The output
1.2.4 Custom data source with parallelism 1 (the code can be copied and run directly)
Simulate a data source that generates one record every second.
/**
 * Function: generate one record per second
 */
public class MyNoParalleSource implements SourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
// Emit one record per second
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
Now we process this data source. It is also very simple: one map and one filter, where the filter keeps only the even numbers.
/**
 * Function: take data from the custom source and keep only the even numbers
 */
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Receive the data source
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received data:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
And the result of that is
1.2.5 Custom data source with multiple parallelism
/**
 * Generates one record per second
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
// Emit one record per second
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
Here we can see that we just implement a different interface and then set the parallelism in the business code
public class StreamingDemoWithMyPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// setParallelism(2) sets the source parallelism to 2
DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received data:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.3 Flink's common transformation operators
1.3.1 Map and Filter (just demonstrated)
1.3.2 flatMap, keyBy, sum, union (basically the same as Spark)
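These behave essentially as they do in Spark, so there is little to explain; as a small, hedged sketch, here is what union looks like, reusing the MyNoParalleSource class defined above (the streams being merged must have the same type):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStream<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // union merges two (or more) streams of the same type into one stream
        DataStream<Long> merged = text1.union(text2);
        merged.print().setParallelism(1);
        env.execute("UnionDemo");
    }
}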
1.3.3 connect, CoMapFunction and MapFunction
Spark has no connect operation, so look at this one a little more carefully. It is similar to union, but it can only connect two streams, the data types of the two streams may differ, and different processing can be applied to the data in each stream. The difference between CoMapFunction and MapFunction is that it processes data from two streams instead of one (note: only two).
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
// Get Flink's operating environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Get the data source
// Note: For this source, parallelism can only be set to 1
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String> () {
@Override
public String map(Long value) throws Exception {
// For the second data source, prefix each value with str_
return "str_" + value;
}
});
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
// Business can be processed here
return value;
}
@Override
public Object map2(String value) throws Exception {
// Business processing can also be done here
return value;
}
});
// Print the result
result.print().setParallelism(1);
String jobName = ConnectionDemo.class.getSimpleName();
env.execute(jobName);
}
}
In the output, the records of the two streams do not necessarily alternate one for one; several records from one stream may arrive before the next record of the other stream.
1.3.4 split and select
The purpose of this is to split a data stream into multiple data streams
In practice, a stream may mix several similar kinds of data whose processing rules differ, so you can split one data stream into multiple streams according to some rule, giving each stream its own processing logic; select is what extracts the stream or streams you want from the split result.
public class SplitDemo {
public static void main(String[] args) throws Exception {
// Get Flink's operating environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Get the data source
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);// Note: For this source, parallelism can only be set to 1
// The data is divided according to the parity of the data
SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
ArrayList<String> outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even"); // even
} else {
outPut.add("odd"); // odd
}
return outPut;
}
});
// Select one or more shard streams
DataStream<Long> evenStream = splitStream.select("even");
DataStream<Long> oddStream = splitStream.select("odd");
DataStream<Long> moreStream = splitStream.select("odd", "even");
// Print the result, at this point I select all even numbers
evenStream.print().setParallelism(1);
String jobName = SplitDemo.class.getSimpleName();
env.execute(jobName);
}
}
The results
1.4 Flink's common sink operators
Data output is actually fairly simple, so I don't think every sink needs to be walked through with code; let's go over them quickly.
1.4.1 print() and printToErr()
Prints the value of each element’s toString() method to the standard output or standard error output stream
1.4.2 writeAsText()
/**
 * Data source: 1 2 3 4 5 ... keeps coming
 * Print the received data via map
 * Filter the data; we only need the even numbers
 */
public class WriteTextDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received data:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
// If you do not have a cluster, you can specify a local path and write it to a file
filterDataStream.writeAsText("your path").setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.4.3 User-defined Sink
Besides the built-in connectors we already listed above:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
Here I also have an example of writing data into Redis. For that we first need to add a dependency:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
If you want to learn about Redis, a tutorial site such as Runoob (菜鸟教程) is a good place to look. The code below is commented.
/**
 * Write data to Redis
 */
public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("xxx", xxx, "\n");
//lpush l_words word
// Assemble each line into a Tuple2<String, String> of (redis key, value)
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("l_words", value);
}
});
// Create the redis configuration
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(xxx).setPort(xxx).build();
// Create the RedisSink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
// Get the redis key from the received data
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
// Get the redis value from the received data
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
1.5 Operators in batch processing
Flink's batch processing is not its strong point and is rarely used in enterprise development, but the team behind it is very active, so it should get better before long. In fact this section is mostly a review of the Spark Core functions we covered before.
1.5.1 source
File-based
readTextFile(path)
Collection-based
fromCollection(Collection)
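A minimal sketch of both batch sources; the file path is a placeholder of my own:

import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchSourceDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // File-based: read a text file line by line (the path is a placeholder)
        DataSet<String> fileData = env.readTextFile("file:///tmp/words.txt");
        // Collection-based: build a DataSet from a Java collection
        DataSet<String> collectionData = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        fileData.print();
        collectionData.print();
    }
}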
1.5.2 transform
Overview of the operators (a small runnable sketch follows this list):
- Map: takes one element and returns one element; used for cleaning and conversion in between
- FlatMap: takes one element and returns zero, one or more elements
- MapPartition: similar to Map, but processes one partition of data at a time; recommended when the map logic needs to acquire third-party resources such as connections
- Filter: checks each incoming element and keeps only those that satisfy the condition
- Reduce: aggregates data by combining the current element with the last reduced value and returning a new value
- Aggregate: sum, max and min
- Distinct: returns the deduplicated elements of a dataset, e.g. data.distinct()
- Join: inner join
- OuterJoin: outer join
- Cross: computes the Cartesian product of two datasets
- Union: returns the union of two datasets of the same type
- First-n: returns the first n elements of a dataset
- Sort Partition: sorts all partitions of a dataset locally; sort on multiple fields by chaining sortPartition() calls
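As promised above, here is a small, hedged sketch exercising a few of these (distinct, filter and first-n) on a tiny in-memory dataset of my own choosing:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchTransformDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("hadoop", "spark", "flink", "spark", "flink");
        // distinct removes duplicates, filter keeps only matching elements,
        // and first(n) takes the first n elements of the result
        DataSet<String> result = words.distinct()
                .filter(word -> word.startsWith("f") || word.startsWith("s"))
                .first(2);
        result.print();
    }
}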
1.5.3 sink
- writeAsText(): writes elements line by line as the strings obtained by calling each element's toString() method
- writeAsCsv(): writes tuples to a file as comma-separated values; the row and field delimiters are configurable, and each field value comes from the object's toString() method
- print(): prints the value of each element's toString() method to standard output or standard error
1.5.4 Flink’s broadcast variables
Requirement: Flink gets the user’s name from the data source and eventually needs to print out the user’s name and age information
Analysis: the age has to be looked up inside the intermediate map step, so we treat the user-to-age dataset as a broadcast variable.
Underneath we use RichMapFunction, which adds an initialization step on top of MapFunction; during initialization we can fetch the broadcast variable, look up the age inside map, and then output the result.
public class BroadCastDemo {
public static void main(String[] args) throws Exception{
// Get the operating environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1: Prepare the data to be broadcast
ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
broadData.add(new Tuple2<>("zhangsan", 18));
broadData.add(new Tuple2<>("lisi", 19));
broadData.add(new Tuple2<>("wangwu", 20));
DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
// Process the data that needs to be broadcast and convert the data set to map type, where key is the user name and value is the user age
DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
HashMap<String, Integer> res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
// The source data
DataSource<String> data = env.fromElements("zhangsan", "lisi", "wangwu");
// Note: here we need to use RichMapFunction to get the broadcast variable
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> allMap = new HashMap<String, Integer>();
/**
 * This method is executed only once.
 * You can implement some initialization here,
 * so you can get the broadcast variable data in the open method.
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get broadcast data
this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
@Override
public String map(String value) throws Exception {
Integer age = allMap.get(value);
return value + "," + age;
}
}).withBroadcastSet(toBroadcast, "broadCastMapName");// Perform the operation of broadcasting data
result.print();
}
}
It takes a little while to run, and then it prints zhangsan, lisi and wangwu together with their ages.
1.5.5 Flink's Counter
The Accumulator's usage scenario is similar to MapReduce counters: it lets you observe data changes while a task is running. You can update an accumulator inside an operator function of a Flink job, but the final result can only be obtained after the job has finished.
Counter is a concrete Accumulator; the implementations include IntCounter, LongCounter and DoubleCounter.
Usage:
1: Create an accumulator
private IntCounter numLines = new IntCounter();
2: Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3: Use the accumulator
this.numLines.add(1);
4: Get the accumulator result
myJobExecutionResult.getAccumulatorResult("num-lines")
The sample code
public class CounterDemo {
public static void main(String[] args) throws Exception{
// Get the operating environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("a", "b", "c", "d");
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
//1: create accumulator
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2: registers the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
}
//int sum = 0;
@Override
public String map(String value) throws Exception {
// With parallelism 1 an ordinary sum would work, but with multiple parallel instances it is wrong
//sum++;
//System.out.println("sum: " + sum);
this.numLines.add(1);
return value;
}
}).setParallelism(8);
// The counter value can only be obtained after the whole job has finished
//result.print();
result.writeAsText("d:\\data\\mycounter");
JobExecutionResult jobResult = env.execute("counter");
//3: get the accumulator
int num = jobResult.getAccumulatorResult("num-lines");
System.out.println("num:"+num);
}
}
That wraps up the batch operators. If you're interested, copy the code and run it; if not, that's fine too, since these days nearly everything uses the streaming operators anyway.
1.5.6 Operator state
Back to the word count example
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(new Tuple2<>(word, 1));
}
}
}).keyBy(0)
.sum(1);
result.print();
env.execute("WordCount");
}
}
Note that something must be listening on port 8888 before we start the program, otherwise a connection refused error is thrown. Since I was running on Windows, I installed netcat to help: start netcat first, then run nc -lk 8888 to listen on port 8888.
Then I enter some words, and we look at our printed message
4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
4> (flink,2)
1> (hive,1)
1> (hive,2)
1> (hive,3)
Flink really does process data in real time, one record at a time, and notice how easily we got the running sums that in Spark would need the updateStateByKey or mapWithState operators.
Why is that? Because of what the official site says: Flink is a stateful stream processor.
So state is a key point in studying Flink. We'll talk about it later.
Finally
…