Here is a brief description of the installation process, and the use of IDEA to develop a simple flow handler, local debugging or submitted to Flink to run, Maven and JDK installation is not described here.

I. Introduction to Flink

Flink was born out of StratoSphere, a European big data research project. The project is a research project at the Technical University of Berlin. In its early days, Flink was Batch computing, but in 2014, a core member of The StratoSphere incubated Flink, which later became Apache’s top big data project that year. At the same time, the mainstream direction of Flink computing is positioned as Streaming, which uses Streaming computing to do all the big data calculation, which is the background of the birth of Flink technology.

In 2015, Ali began to intervene in Flink to optimize resource scheduling and streaming SQL, and established alibaba’s internal version Blink. In the recently updated version 1.9, Blink began to merge into Flink.

In the future, Flink will also support Java, Scala, Python and many more languages, as well as play a role in machine learning.

Ii. Flink development environment construction

First of all to run Flink, we need to download and unzip the Flink binary package, download address is as follows: flink.apache.org/downloads.h…

We can choose the Flink and Scala version. Here we choose the latest 1.9 version of Apache Flink 1.9.0 for Scala 2.12 to download.

Flink installation and deployment for Windows and Linux can be seen in Flink Quick Start – Installation and Example Run, which demonstrates the Windows version.

After the installation is successful, start the CLI, go to the flink folder, and run start-cluster.bat in the bin directory

$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
Copy the code

After the successful startup, we can visit http://localhost:8081/ in the browser and see the flink management page.

Iii. Flink fast experience

Make sure you have Flink installed, as well as Maven 3.0.4 and Java 8 or later. Here is a brief overview of the Maven build process.

Other detailed build methods are welcome: Quickly build the first Flink project

1. Set up Maven project

Build a project using Flink Maven Archetype.

$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ - DarchetypeVersion = 1.9.0Copy the code

You can edit your own artifactId groupId

The directory structure is as follows:

├─ Java │ ├─ Java │ ├─ Java │ ├─ Java │ ├─ Java │ ├─ Java │ ├─ Java │ ├─ ├ ─ ├ ─ Java ├ ─ Java ├ ─log4j.properties
Copy the code

Core dependencies in POM:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> The < artifactId > flink - streaming - java_2. 11 < / artifactId > < version >${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Copy the code

2. Write code

StreamingJob

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<String, Integer>> dataStreaming = env
				.socketTextStream("localhost", 9999)
				.flatMap(new Splitter())
				.keyBy(0)
				.timeWindow(Time.seconds(5))
				.sum(1);

		dataStreaming.print();

		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	}
	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

		@Override
		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
			for(String word : sentence.split("")){ out.collect(new Tuple2<String, Integer>(word, 1)); }}}}Copy the code

3. Debug the program

Install netcat for simple debugging.

Start Netcat input:

nc -l 9999
Copy the code

Start the program

Enter a few words in Netcat separated by commas

View the results on one side of the program

4. Submit the program to Flink

Start the flink

Bat for Windows: start-cluster.bat for Linux: start-cluster.shCopy the code

Localhost :8081 View the management page

Package the code through Maven

Submit the typed package to Flink

To view the log

tail -f log/flink-***-jobmanager.out
Copy the code

Continue typing the word in Netcat, view the job status in Running Jobs, and view the output in log.

4. Flink programming model

Flink provides different levels of abstraction to develop stream/batch applications.

The lowest level of abstraction provides only stateful flows.

In practice, most applications do not require the above low-level abstractions and instead program against Core apis such as DataStream API (bounded/unbounded streams) and DataSet API (bounded datasets).

The Table Api declares a Table that follows a relational model.

The highest abstraction is SQL.

We only use the DataStream API here.

The basic building blocks of the Flink program are flows and transformations.

The basic elements of a program:

L Access the Execution Environment

L Load/create raw data

L Specifies the method of converting the data

L Specify the location where the calculation results are stored

L Trigger program execution

5. DataStreaming API usage

1. Capture the Execution Environment

StreamExecutionEnvironment is the foundation of all Flink program, access methods are:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String … jarFiles)

Typically, getExecutionEnvironment is used. If you are in the IDE or regular Java program execution can create based on the local machine through createLocalEnvironment StreamExecutionEnvironment. If you have created a JAR and want to invoke the getExecutionEnvironment method inside it, use the createRemoteEnvironment method.

Load/create raw data

Some StreamExecutionEnvironment provides access to the data source interface

(1) File-based data source

readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
Copy the code

(2) Socket-based data source (used in this article)

socketTextStream

(3) Data source based on Collection

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
Copy the code

3. Transformation method

Map mode: DataStream -> DataStream

The Hive UDF function takes an element and outputs an element

For example:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return2 * value; }});Copy the code

(2) FlatMap: DataStream -> DataStream

Get an element and output multiple values, similar to Hive UDTF

For example:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split("")){ out.collect(word); }}});Copy the code

3. Filter DataStream -> DataStream

Function: Checks whether the function returns true for each element, leaving only elements that return true

For example:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
Copy the code

DataStream -> KeyedStream

Function: Logically divides a stream into disjoint partitions, each with elements of the same key

For example:

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Copy the code

(5) Reduce mode: KeyedStream -> DataStream

Function: Rotate training reduce in keyed data stream.

For example:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        returnvalue1 + value2; }});Copy the code

(6) Aggregations: KeyedStream -> DataStream

Function: Perform aggregation operation in keyed data stream

For example:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
Copy the code

(7) Window mode: KeyedStream -> WindowedStream

Function: Used in KeyedStream to group each key with Windows based on a certain feature.

For example:

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Copy the code

DataStream -> AllWindowedStream

Run the following command to group data based on a feature in DataStream:

For example:

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
Copy the code

Union: DataStream* -> DataStream

Function: Merge multiple data streams into a new data stream

For example:

dataStream.union(otherStream1, otherStream2, ...) ;Copy the code

DataStream -> SplitStream

Function: Split a stream into multiple streams

For example:

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        returnoutput; }});Copy the code

(11) Select SplitStream -> DataStream

Function: Select a stream from a split stream

For example:

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even"."odd");
Copy the code

4. Output data

writeAsText()
writeAsCsv(...)
print(a) /printToErr() 
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
Copy the code

More Flink related principles:

Flink: a real-time computing framework for time travel

The king of real-time processing of big data -Flink

Unified batch stream processing – Flink batch stream integrated implementation principle

Flink Quick Start – Setup and sample run

Quickly build the first Flink project

For more real-time computing,Flink,Kafka and other related technology posts, welcome to real-time streaming computing: