This is the second day of my participation in the Gengwen Challenge.

Flink API details & practical operation


  • Stateful Stream Processing

The lowest level of abstraction is the stateful stream processing interface. It is integrated into the DataStream API through ProcessFunction and allows users to freely process events from one or more streams with consistent, fault-tolerant state. Users can also register event-time and processing-time callbacks to implement complex computations.

  • DataStream/DataSet API

The DataStream/DataSet API is the core API provided by Flink. DataSet handles bounded data sets, while DataStream handles bounded or unbounded data streams. Users can transform and compute over the data with various operators (map/flatMap/window/keyBy/sum/max/min/avg/join, etc.).

  • Table API

The Table API provides operations such as select, project, join, group-by, and aggregate. It is simpler to use than the DataStream/DataSet API, can switch seamlessly between a Table and a DataStream/DataSet, and allows applications to mix the Table API with the DataStream and DataSet APIs.

  • SQL

The highest level of abstraction that Flink provides is SQL. This layer is similar to the Table API in syntax and expressiveness. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly on tables defined with the Table API.
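A minimal sketch of how the two layers interact (not part of the original demos; it assumes Flink 1.13+ with the flink-table-api-java-bridge and planner dependencies on the classpath, and the class/table names are made up for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Turn a DataStream into a Table and register it under a name
        Table source = tableEnv.fromDataStream(env.fromElements("1,huang,26", "2,kun,24"));
        tableEnv.createTemporaryView("source_table", source);

        // Run a SQL query directly on the table defined through the Table API
        Table result = tableEnv.sqlQuery("SELECT * FROM source_table");

        // Switch back to a DataStream and print it
        tableEnv.toDataStream(result).print();
        env.execute("table & sql sketch");
    }
}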

Dataflows (dataflow diagrams)

In Flink’s world view, everything is a data stream, so for batch computing, that’s just a special case of stream computing

A Flink dataflow consists of three parts: Source, Transformation, and Sink.

The Source reads data from external systems, Transformations apply the business logic to the data, and finally the Sink writes the results to external systems (console, Kafka, Redis, DB, ...).

A program developed with Flink can be mapped onto such a dataflow.
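For example, the following minimal sketch maps one-to-one onto that structure, with a built-in element source and the console print sink standing in for real external systems:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataflowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)                           // Source
                .map(new MapFunction<Integer, Integer>() {  // Transformation
                    @Override
                    public Integer map(Integer value) {
                        return value * 10;
                    }
                })
                .print();                                   // Sink (console)

        env.execute("dataflow sketch");
    }
}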

Dataflows Transformations

FlatMap

DataStream → DataStream: iterates over each element in the DataStream and produces N output elements for each input, where N = 0, 1, 2, ...


/**
 * flatMap Demo
 * DataStream → DataStream
 * Iterates over each element in the DataStream, producing N elements (N = 0, 1, 2, ...)
 */
public class FlinkDemo0001 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180".8888);
        DataStream<User> newStream= stream
                .flatMap(new FlatMapFunction<String, User>() {
            @Override
            public void flatMap(String s, Collector<User> collector)  {
                try {
                    String[] sp=s.split(",");
                    collector.collect(User.builder()
                            .id(Long.parseLong(sp[0]))
                            .name(sp[1])
                            .age(sp[2])
                            .build());
                }catch (Exception ex){
                    System.out.println(ex);
                }
            }
        }).setParallelism(2);

        newStream.print();
        environment.execute("flatMap Demo"); }}/*User Model*/
@Data
@Builder
public class User {
    private  Long id;
    private  String name;
    private  String age;
}

The test data
### console
nc -lk 8888
### Input (the blank third line triggers the exception below)
1,1,1
2,2,2

3,3,3
#### Output
User(id=1, name=1, age=1)
User(id=2, name=2, age=2)
java.lang.NumberFormatException: For input string: ""
User(id=3, name=3, age=3)

Filter

DataStream → DataStream: the filter operator evaluates a Boolean for each element of the DataStream; true means the element is kept, false means it is filtered out.

/**
 * filter Demo
 * DataStream → DataStream
 * The filter operator evaluates a Boolean for each element: true means keep, false means filter out
 */
public class FlinkDemo0002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180".8888);
        DataStream<User> newStream= stream
                .flatMap(new FlatMapFunction<String, User>() {
                    @Override
                    public void flatMap(String s, Collector<User> collector)  {
                        try {
                            String[] sp=s.split(",");
                            collector.collect(User.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .name(sp[1])
                                    .age(sp[2])
                                    .build());
                        }catch (Exception ex){
                            System.out.println(ex);
                        }
                    }
                })
                .filter(new RichFilterFunction<User>() {
                    @Override
                    public boolean filter(User user) throws Exception {
                        /** Only users whose age is greater than 25 are processed */
                        return Integer.parseInt(user.getAge()) > 25;
                    }
                }).setParallelism(2);

        newStream.print();
        environment.execute("filter demo"); }}Copy the code
### console
nc -lk  8888
### Input parameters
1,huang,26
2,kun,24
3,jie,23
4,la,29
#### Output
 User(id=1, name=huang, age=26)
 User(id=4, name=la, age=29)
## Records whose age is not greater than 25 are filtered out

keyBy

(Generally used together with grouped aggregation)

DataStream → KeyedStream: partitions the data according to the specified field of the stream. Records with the same value of that field are guaranteed to land in the same partition; internally a HashPartitioner is used.

There are three ways to specify the partition field (all three are sketched after the list):

  1. Specified by index number
  2. Specified by an anonymous function
  3. Specify partition fields by implementing the KeySelector interface
public class FlinkDemo0003_1 {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180".8888);
     DataStream <CoffeeShop> dataStream=   stream.flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
         @Override
         public void flatMap(String s, Collector<CoffeeShop> collector) {
             try{
                 String [] sp= s.split(",");
                 collector.collect(CoffeeShop.builder()
                         .id(Long.parseLong(sp[0]))
                         .date(format.parse(sp[1]))
                         .sales(Integer.parseInt(sp[2]))
                         .build());
             } catch (Exception ex) {
                 System.out.println(ex);
             }
         }
     })
             /** Group by id */
                .keyBy(CoffeeShop::getId)
                 /** Sum the sales */
                .sum("sales");
        dataStream.print();
        environment.execute("keyBy & sum demo"); }}Copy the code

### console
nc -lk 8888
### Input (id,date,sales)
1,2021-10-1,1
2,2021-10-1,1
3,2021-10-2,2
1,2021-10-2,1
3,2021-10-2,1
2,2021-10-2,2
1,2021-10-3,2
2,2021-10-3,2
3,2021-10-3,3
#### Output
## Each incoming record is accumulated into the running sum for its id, so the last record emitted for an id holds the total
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=4)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=2)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=5)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=6)

Reduce

KeyedStream → DataStream: reduce aggregates over a partitioned (keyed) stream, which means a plain DataStream cannot call reduce directly; the stream must be grouped by key first.

public class FlinkDemo0004 {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180".8888);
        DataStream<CoffeeShop> dataStream = stream
                .flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
            @Override
            public void flatMap(String s, Collector<CoffeeShop> collector) {
                try {
                    String[] sp = s.split(",");
                    collector.collect(CoffeeShop.builder()
                            .id(Long.parseLong(sp[0]))
                            .date(format.parse(sp[1]))
                            .sales(Integer.parseInt(sp[2]))
                            .build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            }
        })
                /** Group by id */
                .keyBy(CoffeeShop::getId).reduce(new ReduceFunction<CoffeeShop>() {
                    @Override

                    public CoffeeShop reduce(CoffeeShop coffeeShop, CoffeeShop t1)
 throws Exception {
                        /** Within each key, keep only the larger sales value after aggregation */
                        return new CoffeeShop(
                                coffeeShop.getId(),
                                coffeeShop.getDate(),
                                Math.max(coffeeShop.getSales(), t1.getSales()));
                    }
                });
        dataStream.print().setParallelism(1);
        environment.execute("reduce demo");
    }
}

Aggregations

KeyedStream → DataStream: Aggregations is a family of built-in aggregation operators (a sketch contrasting max and maxBy follows the list):

  • sum
  • min
  • max
  • minBy
  • maxBy
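Only sum appears in the demos above. The main subtlety in this family is min/max versus minBy/maxBy: min and max only update the aggregated field and keep the remaining fields from the first record of the key, while minBy and maxBy emit the entire record that holds the extreme value. A small sketch on a hypothetical Tuple3 stream (id, day, sales):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple3<Long, String, Integer>> sales = env.fromElements(
                Tuple3.of(1L, "2021-10-01", 1),
                Tuple3.of(1L, "2021-10-02", 5),
                Tuple3.of(1L, "2021-10-03", 3));

        // max(2): only the sales field tracks the maximum; the day stays "2021-10-01"
        sales.keyBy(t -> t.f0).max(2).print("max");

        // maxBy(2): the whole record with the largest sales so far is emitted, including its day
        sales.keyBy(t -> t.f0).maxBy(2).print("maxBy");

        env.execute("aggregation sketch");
    }
}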

Union (true merge)

DataStream* → DataStream: merges two or more data streams into a new DataStream that contains the elements of all merged streams. Note: the elements of all the DataStreams must be of the same type.

/**
 * union Demo
 * DataStream* → DataStream
 * Merges two or more data streams into a new DataStream containing the elements of the merged streams
 * Note: the elements of all streams must be of the same type
 */
public class FlinkDemo0006 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180".8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181".8888);
        stream1.union(stream2).print();
        environment.execute("union demo"); }}Copy the code
### Console 192.168.88.180
nc -lk 8888
### Input
1,2021-10-1,1
### Console 192.168.88.181
nc -lk 8888
### Input
1,2021-10-1,2
#### Output
1,2021-10-1,1
1,2021-10-1,2
## You can see that the two streams are merged into one stream for processing

Connect (pseudo merge)

DataStream, DataStream → ConnectedStreams: combines two data streams while keeping each stream's own data type; the two streams can share state.

public class FlinkDemo0007 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
           StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180".8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181".8888);
        ConnectedStreams connectedStreams = stream1.connect(stream2);
        connectedStreams.getFirstInput().print();
        connectedStreams.getSecondInput().print();
        DataStream dataStream = connectedStreams
                       .map(new RichCoMapFunction<String, String, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s;
            }

            @Override
            public String map2(String s) throws Exception {
                return s;
            }
        });
        environment.execute("connect demo");
    }
}
### Console 192.168.88.180
nc -lk 8888
### Input
1,2021-10-1,1
### Console 192.168.88.181
nc -lk 8888
### Input
1,2021-10-1,2
#### Output
1,2021-10-1,1
1,2021-10-1,2
## The two streams are combined into one stream, but each stream is still processed by its own function

Question: What is the difference between connect and union?

  1. connect can merge streams of different data types, but can only combine two streams;
  2. union can merge multiple streams, but all of them must have the same data type.

CoMap, CoFlatMap

ConnectedStreams → DataStream: CoMap and CoFlatMap are not names of single operators but of a class of operations: a map traversal over a ConnectedStreams data stream is called CoMap, and a flatMap traversal over a ConnectedStreams data stream is called CoFlatMap.

public class FlinkDemo0008 {

    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180".8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181".8888);
        ConnectedStreams connectedStreams = stream1.connect(stream2);
        DataStream dataStream = connectedStreams.flatMap(new RichCoFlatMapFunction<String,String, CoffeeShop>() {

            @Override
            public void flatMap1(String s, Collector<CoffeeShop> collector) 
                    throws Exception {
             String[] sp=s.split(",");
                collector.collect(CoffeeShop.builder()
                        .id(Long.parseLong(sp[0]))
                        .date(format.parse(sp[2]))
                        .sales(Integer.parseInt(sp[3]))
                        .build());
            }
            @Override
            public void flatMap2(String s, Collector<CoffeeShop> collector) 
                    throws Exception {
                String[] sp=s.split(",");
                collector.collect(CoffeeShop.builder()
                        .id(Long.parseLong(sp[0]))
                        .date(format.parse(sp[1]))
                        .sales(Integer.parseInt(sp[2]))
                        .build());
            }
        });
        dataStream.print(">>>>>>>>").setParallelism(1);
        environment.execute("connect demo");
    }
}

### Console 192.168.88.180
nc -lk 8888
### Input
1,Uni-President Ulemei,2021-10-1,1
### Console 192.168.88.181
nc -lk 8888
### Input
1,2021-10-1,1
#### Output
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
## The two streams are combined into one stream, but each stream is still handled by its own flatMap method
## coFlatMap/coMap can merge two streams with different formats into one stream and process each accordingly


Split (removed in Flink 1.12)

DataStream → SplitStream

Divide a stream into two or more streams depending on conditions

Select (removed in Flink 1.12)

SplitStream → DataStream

Select one or more data streams from SplitStream

Side output (side output stream)

During stream processing you may need to split a data stream into several streams based on different conditions. Doing this with filter processes the same data repeatedly, which is unnecessary duplication; side outputs avoid it.

public class FlinkDemo0009 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180".8888);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("side-output") {}; SingleOutputStreamOperator dataStream = stream .flatMap(new RichFlatMapFunction<String, CoffeeShop>() {

                    @Override
                    public void flatMap(String s, Collector<CoffeeShop> collector) {
                        try {
                            String sp[] = s.split(",");

                            collector.collect(CoffeeShop.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .date(format.parse(sp[1]))
                                    .sales(Integer.parseInt(sp[2]))
                                    .build());
                        } catch (Exception ex) {

                            System.out.println(ex);

                        }
                    }
                })
                .process(new ProcessFunction<CoffeeShop, CoffeeShop>() {

                    @Override
                    public void processElement(CoffeeShop s, Context context, Collector<CoffeeShop> collector) {
                        try {
                            if (s.getId() % 2 == 0) {
                                collector.collect(s);
                            } else {
                                context.output(outputTag, s);
                            }
                        } catch (Exception ex) {
                            System.out.println(ex);
                        }
                    }
                });
        dataStream.getSideOutput(outputTag).print("outputTag>>>>>>>>>>>>>");
        dataStream.print("stream>>>>>>>>>>>>>");

        environment.execute("process demo"); }}Copy the code
### console 192.168.88.180
nc -lk 8888
### Input
1,2021-10-1,1
2,2021-10-1,2
3,2021-10-1,3
4,2021-10-2,4
5,2021-10-2,5
6,2021-10-2,6
7,2021-10-3,7
8,2021-10-3,8
9,2021-10-3,9

#### Output
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=4)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=7, date=Sun Oct 03 00:00:00 CST 2021, sales=7)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=8, date=Sun Oct 03 00:00:00 CST 2021, sales=8)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=6, date=Sat Oct 02 00:00:00 CST 2021, sales=6)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=5, date=Sat Oct 02 00:00:00 CST 2021, sales=5)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=9, date=Sun Oct 03 00:00:00 CST 2021, sales=9)
## You can see that the stream is split into two streams by id parity

Iterate (important)

DataStream → IterativeStream → DataStream: provides support for iterating over a data stream. An iteration consists of two parts: the iteration body and the termination condition. Records that do not yet meet the termination condition are fed back into the iteration body for another pass, while records that meet it are sent downstream.


public class FlinkDemo0010 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180".8888);
        IterativeStream<Long> iterate = stream
                .map(new RichMapFunction<String, Long>() {
                    @Override
                    public Long map(String s) {
                        return Long.parseLong(s);
                    }
                }).iterate();

        DataStream<Long> feedback = iterate
                .map(new RichMapFunction<Long, Long>() {

                    @Override
                    public Long map(Long aLong) throws Exception {

                        return aLong > 2 ? aLong : (aLong + 1);
                    }
                }).filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long s) throws Exception {
                        if (s > 2) {
                            return false;
                        } else {
                            return true;
                        }
                    }
                });
        iterate.closeWith(feedback).print("feedback");
        SingleOutputStreamOperator<Long> result = iterate.
filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value >= 2;
            }
        });
        result.print("result:");
        environment.execute("iterate demo");
    }
}
### console 192.168.88.180
nc -lk 8888
### Input
1
2
3
4
5
#### Output
feedback> 2
result> 2
result> 2
result> 3
result> 4
result> 5
## You can see that a number such as 1, which does not yet meet the exit condition, keeps going through the map until the exit condition is met

Function classes and rich function classes

When using Flink operators, you can pass in either an anonymous function or a function-class object.

Function classes fall into two kinds: ordinary function classes and rich function classes (an informal distinction).

Compared with ordinary function classes, rich function classes can obtain the context of the running environment, have lifecycle methods, and can manage state, so they can implement more complex functionality.

Function class classification

Rich function classes inherit from AbstractRichFunction and can use its methods to obtain the runtime context and perform special operations.

public class FlinkDemo0011 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180".8888);
         stream.flatMap(new RichFlatMapFunction<String, Long>() {
             @Override
             public void flatMap(String s, Collector<Long> collector) throws Exception {
                 collector.collect(Long.parseLong(s));
             }
         }).filter(new RichFilterFunction<Long>() {
             @Override
             public void open(Configuration parameters)  {
                 /**
                  * getRuntimeContext(), provided by AbstractRichFunction,
                  * gives access to runtime context information
                  */
                 System.out.println(getRuntimeContext().getTaskName());
                 System.out.println(getRuntimeContext().getTaskNameWithSubtasks());
             }
            @Override
            public boolean filter(Long s)  {
                return s<100;
            }
         }).print();

        environment.execute("demo"); }}Copy the code

The low-level API (ProcessFunction API)

This is a low-level API. Operators such as map, filter, and flatMap described above are high-level encapsulations built on top of it. The lower the level, the more powerful the functionality and the more information available to the user, such as element state, event time, and timer registration.


public class FlinkDemo0012 {

    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180".8888);
        final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("output") {}; SingleOutputStreamOperator dataStream = stream. flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
                    @Override
                    public void flatMap(String s, Collector<CoffeeShop> collector) {
                        try {
                            String[] sp = s.split(",");
                            collector.collect(CoffeeShop.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .date(format.parse(sp[1]))
                                    .sales(Integer.parseInt(sp[2]))
                                    .build());
                        } catch (Exception ex) {
                            System.out.println(ex);
                        }
                    }
                })
                /** Group by id */
                .keyBy(CoffeeShop::getId)
                .process(new KeyedProcessFunction<Long, CoffeeShop, String>() {

                    @Override
                    public void processElement(CoffeeShop coffeeShop, Context context, Collector<String> collector) {
                        try{
                            /** Current processing time */
                            long currentTime=  context.timerService()
                                    .currentProcessingTime();
                            if (coffeeShop.getSales() > 100) {
                                long timerTime = currentTime + 3 * 1000;
                                 context.timerService()
                                         .registerProcessingTimeTimer(timerTime);
                            } else {
                                collector.collect(coffeeShop.toString() + " did not reach 100");
                            }
                        } catch (Exception ex) {
                            System.out.println(ex);
                        }
                    }

                    /** Fires when the registered timer time is reached */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                        out.collect(ctx.getCurrentKey() + " sales reached 100, 3 seconds later");
                    }
                });
        dataStream.print("dataStream").setParallelism(1);

        environment.execute("process demo");
    }
}
### console 192.168.88.180
nc -lk 8888
### Input
1,2021-10-1,99
2,2021-10-1,101
3,2021-10-1,100
4,2021-10-2,88
5,2021-10-2,101
6,2021-10-2,102
#### Output
dataStream> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=99) did not reach 100
dataStream> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=88) did not reach 100
dataStream> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=100) did not reach 100
dataStream> 2 sales reached 100, 3 seconds later
dataStream> 5 sales reached 100, 3 seconds later
dataStream> 6 sales reached 100, 3 seconds later

Conclusion

When using operators such as map and filter, you can pass in an anonymous function directly,

an ordinary function class object (MapFunction, FilterFunction),

or a rich function class object (RichMapFunction, RichFilterFunction).

With a rich function class object you can access the task execution context, lifecycle methods, state management, and so on.

If the business logic is more complex and the operators Flink provides cannot meet the requirements, use the lower-level API directly through the process operator, which exposes the context, lifecycle methods, side output streams, and the timer service.

A keyed stream (KeyedStream) calls process with a KeyedProcessFunction;

a DataStream calls process with a ProcessFunction.

When writing the actual code, just follow the IDE's hints.