This post briefly walks through installing Flink and using IDEA to develop a simple stream-processing job, then debugging it locally or submitting it to a Flink cluster. Maven and JDK installation are not covered here.
I. Introduction to Flink
Flink grew out of Stratosphere, a European big data research project at the Technical University of Berlin. In its early days the project focused on batch computing, but in 2014 core members of Stratosphere incubated Flink, which became an Apache top-level big data project that same year. At the same time, the mainstream direction of Flink was positioned as streaming: using stream computing for all big data workloads. This is the background against which Flink was born.
In 2015, Alibaba got involved with Flink to optimize resource scheduling and streaming SQL, and built its internal version, Blink. With the recently released version 1.9, Blink began to be merged back into Flink.
In the future, Flink will support Python and more languages alongside Java and Scala, and will also play a growing role in machine learning.
II. Flink development environment setup
First of all, to run Flink we need to download and unzip the Flink binary package. The download address is: flink.apache.org/downloads.h…
We can choose a Flink and Scala version combination; here we pick the latest 1.9 release, Apache Flink 1.9.0 for Scala 2.12.
Flink installation and deployment on Windows and Linux is covered in Flink Quick Start – Setup and sample run; this post demonstrates the Windows version.
After the installation succeeds, start the CLI, go to the flink folder, and run start-cluster.bat in the bin directory:
$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
After a successful startup, we can visit http://localhost:8081/ in a browser and see the Flink management page.
III. A quick taste of Flink
Make sure you have Flink installed, along with Maven 3.0.4 (or later) and Java 8 (or later). Here is a brief overview of the Maven build process.
For other build methods in more detail, see: Quickly build the first Flink project
1. Set up Maven project
Build a project using the Flink Maven archetype.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.0
You can set your own groupId and artifactId.
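For example, the coordinates can be passed on the command line instead of through the interactive prompts; the groupId, artifactId, and version values below are placeholders:

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.9.0 \
    -DgroupId=com.example \
    -DartifactId=flink-demo \
    -Dversion=1.0-SNAPSHOT \
    -DinteractiveMode=false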
The directory structure is as follows:
├── pom.xml
└── src
    └── main
        ├── java
        │   └── (your package)
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties
Core dependencies in the POM:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Write code
StreamingJob
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStreaming = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStreaming.print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            // split each line into words on spaces and emit a (word, 1) pair per word
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
3. Debug the program
Install netcat for simple debugging.
Start netcat listening on port 9999:
nc -l 9999
Start the program
Type a few words into netcat, separated by spaces
Watch the results on the program side
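A sample session might look like this (input and output are illustrative; counts are printed once per 5-second window, and the exact output prefixes depend on parallelism):

$ nc -l 9999
hello flink hello

Program console output:

(hello,2)
(flink,1)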
4. Submit the program to Flink
Start Flink
Windows: start-cluster.bat
Linux: ./start-cluster.sh
Visit localhost:8081 to view the management page
Package the code with Maven
Submit the resulting jar to Flink (sample commands below)
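A minimal sketch of these two steps, with mvn run in the project directory and flink run in the Flink installation directory; the jar name depends on your artifactId and version (flink-demo here is a placeholder), and the -c argument is the fully qualified name of your main class:

$ mvn clean package
$ bin/flink run -c StreamingJob target/flink-demo-1.0-SNAPSHOT.jar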
View the log:
tail -f log/flink-***-jobmanager.out
Continue typing words into netcat; you can check the job status under Running Jobs and see the output in the log.
IV. Flink programming model
Flink provides different levels of abstraction to develop stream/batch applications.
The lowest level of abstraction offers stateful stream processing.
In practice, most applications do not need this low-level abstraction and instead program against the core APIs, such as the DataStream API (bounded/unbounded streams) and the DataSet API (bounded datasets).
The Table API is a declarative API centered on tables, following the relational model.
The highest abstraction is SQL.
We only use the DataStream API here.
The basic building blocks of a Flink program are streams and transformations.
The basic elements of a program:
Obtain the execution environment
Load/create the raw data
Specify transformations on the data
Specify where to put the results of the computation
Trigger program execution
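As a minimal sketch, these five elements map onto code like this (the input data and job name are arbitrary examples):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Skeleton {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Load/create the raw data
        DataStream<String> text = env.fromElements("flink", "streaming");
        // 3. Specify transformations on the data
        DataStream<String> upper = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 4. Specify where to put the results
        upper.print();
        // 5. Trigger program execution
        env.execute("Skeleton");
    }
}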
V. DataStream API usage
1. Obtain the execution environment
StreamExecutionEnvironment is the foundation of every Flink program. It can be obtained in three ways:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String … jarFiles)
Typically, getExecutionEnvironment() is all you need: when the program runs in an IDE or as a regular Java program, it creates a local environment on your machine; when the program is packaged into a jar and submitted through the command-line client, it returns the environment of the cluster that executes the jar. To submit a program directly to a specific remote cluster, use createRemoteEnvironment().
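A short sketch of the three variants; the host, port, and jar path in the remote case are placeholders (the port is the JobManager RPC port, 6123 by default):

// Adapts automatically: local in an IDE, cluster when submitted via the CLI
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Explicitly local, useful for debugging
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Submit straight to a remote cluster, shipping the given jar
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "localhost", 6123, "target/flink-demo-1.0-SNAPSHOT.jar");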
2. Load/create raw data
StreamExecutionEnvironment provides several data source interfaces:
(1) File-based data source
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
(2) Socket-based data source (used in this article)
socketTextStream
(3) Collection-based data source
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
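A few examples of these sources; the file path is a placeholder, and env is the StreamExecutionEnvironment obtained above:

// Collection-based sources
DataStream<Integer> nums = env.fromElements(1, 2, 3, 4);
DataStream<Long> seq = env.generateSequence(0, 100);

// File-based source
DataStream<String> lines = env.readTextFile("file:///path/to/input.txt");

// Socket-based source (used in this article)
DataStream<String> socketLines = env.socketTextStream("localhost", 9999);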
3. Transformation methods
(1) Map: DataStream -> DataStream
Function: takes one element and produces one element, similar to a Hive UDF
For example:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
(2) FlatMap: DataStream -> DataStream
Function: takes one element and produces zero or more elements, similar to a Hive UDTF
For example:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
            throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
(3) Filter: DataStream -> DataStream
Function: Checks whether the function returns true for each element, leaving only elements that return true
For example:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
(4) KeyBy: DataStream -> KeyedStream
Function: Logically divides a stream into disjoint partitions, each with elements of the same key
For example:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
(5) Reduce: KeyedStream -> DataStream
Function: performs a rolling reduce on a keyed data stream, combining the current element with the last reduced value.
For example:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
            throws Exception {
        return value1 + value2;
    }
});
(6) Aggregations: KeyedStream -> DataStream
Function: performs rolling aggregations on a keyed data stream. Note that min returns the minimum value, while minBy returns the element that has the minimum value in the given field (and likewise for max/maxBy).
For example:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
(7) Window: KeyedStream -> WindowedStream
Function: used on a KeyedStream to group the data for each key into windows according to some characteristic (for example, the data that arrived within the last 5 seconds).
For example:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(8) WindowAll: DataStream -> AllWindowedStream
Function: defines windows on a regular DataStream, grouping all events according to some characteristic. In many cases this is a non-parallel operation.
For example:
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(9) Union: DataStream* -> DataStream
Function: Merge multiple data streams into a new data stream
For example:
dataStream.union(otherStream1, otherStream2, ...);
(10) Split: DataStream -> SplitStream
Function: Split a stream into multiple streams
For example:
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
(11) Select: SplitStream -> DataStream
Function: select one or more streams from a split stream
For example:
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
4. Output data
writeAsText()
writeAsCsv(...)
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
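For instance (a minimal sketch; the output path and socket address are placeholders, SimpleStringSchema lives in org.apache.flink.api.common.serialization, and PrintSinkFunction in org.apache.flink.streaming.api.functions.sink):

DataStream<String> result = //...
result.print();                                                    // print to stdout
result.writeAsText("file:///path/to/output");                      // write as text file(s)
result.writeToSocket("localhost", 9999, new SimpleStringSchema()); // write to a socket
result.addSink(new PrintSinkFunction<String>());                   // attach a (custom) sink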
More articles on Flink principles:
Flink: a real-time computing framework for time travel
The king of real-time processing of big data -Flink
Unified batch stream processing – Flink batch stream integrated implementation principle
Flink Quick Start – Setup and sample run
Quickly build the first Flink project
For more posts on real-time computing, Flink, Kafka, and related technologies, follow Real-Time Streaming Computing.