preface

We have finally reached the main topic. The code below covers the operators and state types that the official Flink documentation explains, so if the official site feels hard to follow, you can work through them slowly here instead.

I. Introduction

1.1 Flink wordCount complete code

Let’s go back to the word count from last time

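import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Word count
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a local socket (for example one opened with Netcat)
        DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Split each line on commas and emit (word, 1) for every word
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy("0")    // group by tuple field 0, the word
                .sum(1); // running sum of tuple field 1, the count

        result.print();

        env.execute("WordCount");
    }
}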

1.2 Code flow analysis

The word-count logic itself needs no explanation; it is the same as always. Let's go through it statement by statement. First we define a configuration

Configuration conf = new Configuration();

Then comes the entry point of the program

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

One line of this word count deserves attention

env.execute("test word count");

Here I let main throw the exception instead of catching it. Why? If the job fails to start in production, catching the exception serves no purpose, so we simply let it propagate.

Then we read data from a socket; the corresponding code is

DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888).setParallelism(1);

After that, it's word counting.

If you open Netcat and type in a few words, you will see that Flink keeps the intermediate state (the running counts). To do the same with Spark Streaming you would need checkpointing together with updateStateByKey or mapWithState, or you would store the intermediate state in an external store such as Redis or HBase. Let's look at how Flink implements this kind of accumulation through state
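To make "intermediate state" concrete, here is a minimal plain-Java sketch, not Flink code: a HashMap stands in for the per-key state that Flink keeps between records. The class name RunningWordCountSketch and its method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for keyBy(word).sum(1): one running count per word. */
class RunningWordCountSketch {

    // Hypothetical stand-in for the state Flink keeps per key between records
    private final Map<String, Integer> countPerWord = new HashMap<>();

    /** Processes one input line and returns the updated (word,count) pairs it emits. */
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(",")) {
            // merge() adds 1 to the stored count, or stores 1 if the word is new
            int updated = countPerWord.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCountSketch job = new RunningWordCountSketch();
        System.out.println(job.processLine("flink,spark")); // [(flink,1), (spark,1)]
        System.out.println(job.processLine("flink"));       // [(flink,2)]
    }
}
```

The second line only contains "flink", yet the count printed is 2, because the count from the first line was remembered. In Flink that memory is the managed state discussed next, and unlike this map it can be checkpointed and restored.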

1.3 Flink state

State is the data that a specific task/operator keeps between records. State can be persisted and the data can be restored after a failure. Flink has two basic kinds of state, Keyed State and Operator State, and both can exist in two forms: raw state and managed state

For example, in the word count the operators after keyBy() work with keyed state; if I removed keyBy(), they would work with operator state. Put simply, the difference is whether keyBy() has been applied

Managed state: state managed by the Flink framework; this is what we normally use.

Raw state: the user manages the data structure of the state himself. At checkpoint time the state content is read and written as byte[], and Flink knows nothing about its internal data structure. Managed state is generally recommended on a DataStream; raw state is meant for implementing user-defined operators. In everyday Flink use, raw state is almost never needed.

1.3.1 Operator State

This is the state of a stream that has not been shuffled by key, in other words, without keyBy

  1. Operator State is a task-level state. In other words, each task has its own state
  2. A typical example is the Kafka connector: each source task needs to record information such as the partitions and offsets of the topic it consumes.
  3. Managed Operator State is list-based: it is exposed as ListState (ValueState belongs to Keyed State)

1.3.2 Keyed State

  1. Keyed state records the state of each key
  2. There are six types of Keyed state:
     - ValueState
     - ListState
     - MapState
     - ReducingState
     - AggregatingState
     - FoldingState (not too important; it is deprecated in newer Flink versions)

II. Demonstration of various states

2.1 ValueState

Here is the requirement: whenever the number of elements seen for the same key reaches 3, compute the average of those elements. In other words, compute the average of every 3 elements per key in a keyed stream

This requirement is easy to understand. In the code below, the main method simulates some data

DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
        Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
        Tuple2.of(1L, 7L), Tuple2.of(2L, 4L),
        Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

Here key 1 appears three times and key 2 appears three times, so we simply average each group. The result must therefore be (1,5.0) and (2,3.6666...).
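As a quick sanity check, the two averages can be worked out by hand from the six sample elements (plain arithmetic, nothing Flink-specific):

```latex
\[
\text{avg}_{1} = \frac{3 + 5 + 7}{3} = \frac{15}{3} = 5,
\qquad
\text{avg}_{2} = \frac{4 + 2 + 5}{3} = \frac{11}{3} \approx 3.6667
\]
```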

2.1.1 Define a TestKeyedStateMain class

public class TestKeyedStateMain {}


2.3 Preliminary code

This part is simple: get the entry point of the program (the execution environment) and feed in the simulated data mentioned above

Because the simulated data is already in key-value form, we don't need a flatMap to split it; we can keyBy(0) directly (field 0 is the first element of each Tuple2.of()). Flink only provides a limited set of built-in aggregation operators, and averaging every three elements is not one of them, so we need to do some extra work.

We therefore choose flatMap with state that we manage ourselves, which is a little like writing a custom operator

2.1.2 Code that manages the state itself

public class CountWindowAverageWithValueState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

}

Here we define the input type Tuple2<Long, Long> and the output type Tuple2<Long, Double>, which correspond to the key-value pairs of the simulated data and to the outputs (1,5.0) and (2,3.6666...).

Since we need a ValueState to store our state, we declare one

// The first Long stores the number of occurrences of the key
// The second Long stores the sum of the values for the key
ValueState<Tuple2<Long, Long>> countAndSum;

Note that each key in the data source has its own ValueState
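The idea that every key owns its own state slot can be sketched without Flink. The following is a minimal plain-Java simulation under stated assumptions: a HashMap from key to {count, sum} plays the role of the keyed ValueState, and the class and method names (KeyedAverageSketch, process, run) are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (no Flink) of keyed ValueState: each key owns one {count, sum} slot. */
class KeyedAverageSketch {

    // Hypothetical stand-in for ValueState<Tuple2<Long, Long>>: key -> {count, sum}
    private final Map<Long, long[]> countAndSumPerKey = new HashMap<>();

    /** Handles one (key, value) element; returns "(key,avg)" on the key's third element, else null. */
    String process(long key, long value) {
        long[] state = countAndSumPerKey.get(key);   // plays the role of value()
        if (state == null) {
            state = new long[] {0L, 0L};             // first element seen for this key
        }
        state[0] += 1;                               // count
        state[1] += value;                           // sum
        countAndSumPerKey.put(key, state);           // plays the role of update()
        if (state[0] >= 3) {
            double avg = (double) state[1] / state[0];
            countAndSumPerKey.remove(key);           // plays the role of clear()
            return "(" + key + "," + avg + ")";
        }
        return null;
    }

    /** Runs all elements through process() and collects the emitted results in order. */
    static List<String> run(long[][] elements) {
        KeyedAverageSketch sketch = new KeyedAverageSketch();
        List<String> results = new ArrayList<>();
        for (long[] e : elements) {
            String emitted = sketch.process(e[0], e[1]);
            if (emitted != null) {
                results.add(emitted);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Keys 1 and 2 interleaved on purpose: each key still counts only its own elements
        long[][] data = {{1, 3}, {2, 4}, {1, 5}, {2, 2}, {1, 7}, {2, 5}};
        System.out.println(run(data));
    }
}
```

In real Flink code the map lookup disappears: after keyBy, the runtime scopes the ValueState to the key of the element currently being processed, so the function only ever sees the slot of that key.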

After extending RichFlatMapFunction, we can override two methods: open() and flatMap(). The open() method is executed only once, when the function is initialized. In open() we register the state, which is then managed by Flink.

Registering state always follows the same pattern; for ValueState it uses a ValueStateDescriptor

@Override
public void open(Configuration parameters) throws Exception {

    // Registering the state starts with a descriptor that takes two parameters:
    // the first is a name, the second describes the type of the data stored in the state.
    // For Tuple2<Long, Long> that is Types.TUPLE(Types.LONG, Types.LONG)
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
            "average", // The name of the state
            Types.TUPLE(Types.LONG, Types.LONG)); // The data type stored in the state

    // Get the state from Flink through its descriptor
    countAndSum = getRuntimeContext().getState(descriptor);
}

The above is boilerplate that you can simply memorize for now. Put simply: register the state, then fetch it for use

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Double>> out) throws Exception {

    // The number of occurrences of the current key and the sum of its values
    Tuple2<Long, Long> currentState = countAndSum.value();
    // On the first element for a key the state is null, so initialize it to (0, 0)
    if (currentState == null) {
        currentState = Tuple2.of(0L, 0L);
    }
    // Update the element count in the state
    currentState.f0 += 1;
    // Update the sum in the state
    currentState.f1 += element.f1;
    // Write the state back
    countAndSum.update(currentState);

    // Check whether the key has reached 3 elements
    if (currentState.f0 >= 3) {
        double avg = (double) currentState.f1 / currentState.f0;
        // Emit the key and its average value
        out.collect(Tuple2.of(element.f0, avg));

        // Clear the state so that counting starts over for the next 3 elements
        countAndSum.clear();
    }
}

Note that currentState.f0 and currentState.f1 are the two Long fields of the Tuple2, the count and the sum. Finally, back in the main method, we pass the stateful CountWindowAverageWithValueState() to flatMap

public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        // (1,5.0)
        // (2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.1.3 Complete code overview

// CountWindowAverageWithValueState.java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * ValueState<T>: this state holds one value for each key
 * value() gets the state value
 * update() updates the state value
 * clear() clears the state
 */
public class CountWindowAverageWithValueState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // Stores the number of occurrences of each key and the sum of the values for that key
    // managed keyed state
    // 1. ValueState stores one state value per key
    private ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<Long, Long>>(
                        "average", // The name of the state
                        Types.TUPLE(Types.LONG, Types.LONG)); // The data type stored in the state
        countAndSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // Get the current state of the key
        Tuple2<Long, Long> currentState = countAndSum.value();

        // If the state has not been initialized yet, initialize it
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }

        // Update the number of elements in the state
        currentState.f0 += 1;

        // Update the sum in the state
        currentState.f1 += element.f1;

        // Write the state back
        countAndSum.update(currentState);

        // Once the current key has appeared three times, compute the average and emit it
        if (currentState.f0 >= 3) {
            double avg = (double) currentState.f1 / currentState.f0;
            // Emit the key and its average value
            out.collect(Tuple2.of(element.f0, avg));
            // Clear the state
            countAndSum.clear();
        }
    }
}

// TestKeyedStateMain.java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        // (1,5.0)
        // (2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.2 ListState

This implements the same function as above. The pattern is basically the same, so I won't explain it in detail

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
// Shaded Guava helper; the exact package depends on the Flink version in use
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;

public class CountWindowAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // The list holds all elements seen so far for the current key
    ListState<Tuple2<Long, Long>> elementByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Tuple2<Long, Long>> descriptor = new ListStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG));
        elementByKey = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        Iterable<Tuple2<Long, Long>> currentState = elementByKey.get();
        // Initialization
        if (currentState == null) {
            elementByKey.addAll(Collections.emptyList());
        }
        // Update the state
        elementByKey.add(element);
        ArrayList<Tuple2<Long, Long>> allElement = Lists.newArrayList(elementByKey.get());
        if (allElement.size() >= 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> ele : allElement) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(new Tuple2<>(element.f0, avg));
            // Clear the state
            elementByKey.clear();
        }
    }
}

2.4 MapState

This implements the same function again, but there is a catch: unlike the two states above, MapState overwrites the existing value when the same map key is put again. Take the same data

Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L), Tuple2.of(2L, 4L),
Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

With this data, after Tuple2.of(1L, 3L) arrives, Tuple2.of(1L, 5L) arrives next. If the element's key were used as the map key, 5L would replace the previous 3L instead of being counted alongside it. This is exactly how a Java Map behaves

Entries with the same map key share a single slot in MapState, so this time we use String map keys that are unique per element, for example of the form 1_1, 1_2, 1_3
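The overwrite behaviour is easy to reproduce with an ordinary java.util.Map. The sketch below is plain Java rather than Flink's MapState, and the class and method names are made up for illustration; it contrasts using the element's own key as map key with using a sequenced key of the form key_n.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Shows why the map key must be unique per element if every value is to be kept. */
class MapOverwriteSketch {

    /** Map key = the element's own key: a later put replaces the earlier value. */
    static Map<String, Long> putByElementKey(long[][] elements) {
        Map<String, Long> map = new HashMap<>();
        for (long[] e : elements) {
            map.put(String.valueOf(e[0]), e[1]);
        }
        return map;
    }

    /** Map key = key_sequence (1_1, 1_2, ...): every value keeps its own entry. */
    static Map<String, Long> putBySequencedKey(long[][] elements) {
        Map<String, Long> map = new LinkedHashMap<>();
        for (int i = 0; i < elements.length; i++) {
            map.put(elements[i][0] + "_" + (i + 1), elements[i][1]);
        }
        return map;
    }

    public static void main(String[] args) {
        long[][] keyOne = {{1, 3}, {1, 5}, {1, 7}};
        System.out.println(putByElementKey(keyOne));   // {1=7}
        System.out.println(putBySequencedKey(keyOne)); // {1_1=3, 1_2=5, 1_3=7}
    }
}
```

With the element key as map key only the last value, 7, survives, so an average over the map would be wrong. With unique map keys all three values are still there.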

2.4.1 Prerequisites

So this time the map key is a String. The open method follows the usual pattern: register the state, then fetch it for use

public class CountWindowAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
                "average",
                String.class, Long.class);
        mapState = getRuntimeContext().getMapState(descriptor);
    }
}

2.4.2 Writing flatMap

Each incoming element is simply put into this map structure

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Double>> out) throws Exception {
    // A random UUID (java.util.UUID) as map key, so no element overwrites another
    mapState.put(UUID.randomUUID().toString(), element.f1);
    ArrayList<Long> arrayList = Lists.newArrayList(mapState.values());
    if (arrayList.size() >= 3) {
        long count = 0;
        long sum = 0;
        for (Long ele : arrayList) {
            count++;
            sum += ele;
        }
        double avg = (double) sum / count;
        out.collect(new Tuple2<Long, Double>(element.f0, avg));
        mapState.clear();
    }
}

We use a UUID as the map key. UUIDs practically never repeat, so every element gets its own entry. We don't know what the keys look like, but we never read them, so it doesn't matter.

We then check whether the ArrayList holds at least 3 values, and compute the average if it does. The result comes out as expected

At first this result may look suspicious: why is the key in the output still a normal number when we used a UUID?

Note that the key in the output is not the UUID at all but element.f0, the key that came in with the data, so the output is fine

2.5 Use ReducingState to realize the function of sum operator

ReducingState aggregates as values are added: every value added to it is combined with the value already stored, so it can simulate the accumulation done by the sum operator and give the running result
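The "combine each new value with the stored one" behaviour can be sketched in plain Java with a BinaryOperator. This is only an illustration of the idea, not Flink's ReducingState; the names RunningSumSketch and runningReduce are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java sketch of a reducing state for a single key. */
class RunningSumSketch {

    /** Feeds the values one by one and records the stored value after each add. */
    static List<Long> runningReduce(long[] values, BinaryOperator<Long> reduce) {
        List<Long> emitted = new ArrayList<>();
        Long state = null; // hypothetical stand-in for the ReducingState of one key
        for (long v : values) {
            // The first value is stored as-is; later values are reduced into the stored one
            state = (state == null) ? Long.valueOf(v) : reduce.apply(state, v);
            emitted.add(state);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The values of key 1 from the sample data, reduced with addition
        System.out.println(runningReduce(new long[] {3, 5, 7}, Long::sum)); // [3, 8, 15]
    }
}
```

Swapping Long::sum for Math::max or Math::min gives a running maximum or minimum with no other change, which is the point of passing the reduce function in from outside.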

2.5.1 Preliminary code

The setup differs slightly this time: we create a ReducingStateDescriptor, and besides the name we pass an implementation of the ReduceFunction interface, followed by the data type Long.class

In practice, the only extra step is implementing that interface

public class SumFunction
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // Stores the accumulated value for the current key
    ReducingState<Long> reducingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "sum", // The name of the state
                new ReduceFunction<Long>() { // The aggregate function
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }
                }, Long.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

After that, the rest is easy

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Long>> out) throws Exception {
    // Add the value to the state; it is reduced into the stored sum
    reducingState.add(element.f1);
    // Emit the key together with the current sum
    out.collect(Tuple2.of(element.f0, reducingState.get()));
}

2.6 AggregatingState

Now, with the same data as before, we want to produce output like this

(1,Contains:3 and 5 and 7)
(2,Contains:4 and 2 and 5)

That is, we record all the values that share a key. This state is arguably the most complex one 😂. If the earlier sections already felt difficult, you can skip this one and go straight to the simulated requirement in 2.8; that example is explained in detail. We will use this state again later, so there is no hurry, and the details can wait until then

2.6.1 Preliminary code

First of all, the key is a Long but the aggregated value is a String, so the output is a Tuple2<Long, String>

The descriptor class is a little different from the earlier ones: its source code shows that it takes three type parameters

The first is the input data type, the third is the output data type, and the middle one is the accumulator, an auxiliary variable used while aggregating. When you implement the interface with new AggregateFunction(), you will find four methods that all have to be implemented

createAccumulator -- creates the accumulator variable
    We just want to concatenate the values one by one, so this plays the role of String str = "" (here it starts as "Contains:")

add -- does the concatenation
    return accumulator + " and " + value; for the first value it is "Contains:" + value

merge -- not needed in this example

getResult -- returns the result
    This is the final value of accumulator + " and " + value

merge exists for cases where results computed separately have to be combined. Here, however, all elements with the same key are processed by the same task. In other words, we never have values for one key computed by different tasks that would need to be combined. Even with three tasks, everything with key 1 is in one task (say Task1) and everything with key 2 in another (say Task2)

So in this example merge is effectively never needed
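To see how the four methods fit together, here is a minimal plain-Java sketch that mimics them with static methods on a String accumulator. It does not implement Flink's AggregateFunction interface; the class name ContainsAccumulatorSketch is hypothetical, and the merge shown is just one reasonable way to join two partial strings.

```java
/** Plain-Java sketch of the accumulator life cycle: create, add, merge, getResult. */
class ContainsAccumulatorSketch {

    static final String EMPTY = "Contains:";

    /** Starts an empty accumulator. */
    static String createAccumulator() {
        return EMPTY;
    }

    /** Appends one value; the first value needs no " and " separator. */
    static String add(long value, String accumulator) {
        if (EMPTY.equals(accumulator)) {
            return accumulator + value;
        }
        return accumulator + " and " + value;
    }

    /** Joins two partial accumulators (one possible merge; unused in the keyed example). */
    static String merge(String a, String b) {
        if (EMPTY.equals(a)) {
            return b;
        }
        if (EMPTY.equals(b)) {
            return a;
        }
        return a + " and " + b.substring(EMPTY.length());
    }

    /** The accumulator already is the result. */
    static String getResult(String accumulator) {
        return accumulator;
    }

    public static void main(String[] args) {
        String acc = createAccumulator();
        for (long v : new long[] {3, 5, 7}) {
            acc = add(v, acc);
        }
        System.out.println("(1," + getResult(acc) + ")"); // (1,Contains:3 and 5 and 7)
    }
}
```

Here the accumulator type and the output type are both String, which is why getResult simply returns its argument. An average, by contrast, would accumulate a (count, sum) pair and only turn it into a double in getResult.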

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ContainsValueFunction
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
    AggregatingState<Long, String> aggregatingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        AggregatingStateDescriptor<Long, String, String> descriptor = new AggregatingStateDescriptor<>(
                "totalStr", // The name of the state
                new AggregateFunction<Long, String, String>() {
                    @Override
                    public String createAccumulator() {
                        return "Contains:";
                    }

                    @Override
                    public String add(Long value, String accumulator) {
                        // The first value is appended without a separator
                        if ("Contains:".equals(accumulator)) {
                            return accumulator + value;
                        }
                        return accumulator + " and " + value;
                    }

                    @Override
                    public String getResult(String s) {
                        return s;
                    }

                    @Override
                    public String merge(String accumulator1, String accumulator2) {
                        if ("Contains:".equals(accumulator1)) {
                            return accumulator2;
                        }
                        if ("Contains:".equals(accumulator2)) {
                            return accumulator1;
                        }
                        // Append the values of accumulator1 after those of accumulator2
                        String[] fields = accumulator1.split(":");
                        return accumulator2 + " and " + fields[1];
                    }
                }, String.class);

        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
    }

In flatMap we add the value to the aggregatingState and then emit the key together with the aggregated result read back from the aggregatingState.

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out) throws Exception {
        aggregatingState.add(element.f1);
        out.collect(new Tuple2<>(element.f0, aggregatingState.get()));
    }
}

2.7 FoldingState

2.8 Simulate a requirement

Records with the same order number in two streams need to be merged. Different lines of business may write different logs, so there are many scenarios where data from different lines of business has to be stitched together, which is effectively a real-time ETL

So why don't we use a join? Don't forget that this is a real-time scenario: the records of the two streams arrive at different speeds, so the matching record may not be there yet. Some special scenarios and techniques are not considered here

The OrderInfo1 data is in one Kafka topic

Order No.: 123, product name: Mop, price: 30.0
Order No.: 234, product name: Toothpaste, price: 20.0
Order No.: 345, product name: Quilt, price: 114.4
Order No.: 333, product name: Cups, price: 112.2
Order No.: 444, product name: Mac Computer, price: 30000.0

The OrderInfo2 data is in another Kafka topic

Order No.: 123, order time: 2019-11-11 10:11:12, place of order: Jiangsu
Order No.: 234, order time: 2019-11-11 11:11:13, place of order: Yunnan
Order No.: 345, order time: 2019-11-11 12:11:14, place of order: Anhui
Order No.: 333, order time: 2019-11-11 13:11:15, place of order: Beijing
Order No.: 444, order time: 2019-11-11 14:11:16, place of order: Shenzhen

In the input files each record is a single comma-separated line holding just the values (for example 123,Mop,30.0), which is the form the parsing code below expects

The output should look like this: (123, Mop, 30.0, 2019-11-11 10:11:12, Jiangsu)

2.8.1 Setting up the skeleton

As always, these two lines of code are fixed

public class OrderETLStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.execute("OrderETLStream");
    }
}

2.8.2 Two Data streams

Normally Kafka would provide the data, but I have no local environment for it right now, so I simulate it with a custom data source, in an object-oriented way

OrderInfo1 and OrderInfo2 are very simple: they define the fields and provide constructors, getters, setters and toString, plus a static method that parses a line

OrderInfo1.java

public class OrderInfo1 {
    // Order number
    private Long orderId;
    // Product name
    private String productName;
    // Price
    private double price;

    // Parses one comma-separated line: orderId,productName,price
    public static OrderInfo1 line2Info1(String line) {
        String[] fields = line.split(",");
        OrderInfo1 orderInfo1 = new OrderInfo1();
        orderInfo1.setOrderId(Long.parseLong(fields[0]));
        orderInfo1.setProductName(fields[1]);
        orderInfo1.setPrice(Double.parseDouble(fields[2]));
        return orderInfo1;
    }

    public OrderInfo1() {
    }

    @Override
    public String toString() {
        return "OrderInfo1{" +
                "orderId=" + orderId +
                ", productName='" + productName + '\'' +
                ", price=" + price +
                '}';
    }

    public OrderInfo1(Long orderId, String productName, double price) {
        this.orderId = orderId;
        this.productName = productName;
        this.price = price;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

OrderInfo2.java

public class OrderInfo2 {
    // Order number
    private Long orderId;
    // Order date
    private String orderDate;
    // Place of order
    private String address;

    // Parses one comma-separated line: orderId,orderDate,address
    public static OrderInfo2 line2Info2(String line) {
        String[] fields = line.split(",");
        OrderInfo2 orderInfo2 = new OrderInfo2();
        orderInfo2.setOrderId(Long.parseLong(fields[0]));
        orderInfo2.setOrderDate(fields[1]);
        orderInfo2.setAddress(fields[2]);
        return orderInfo2;
    }

    public OrderInfo2() {
    }

    @Override
    public String toString() {
        return "OrderInfo2{" +
                "orderId=" + orderId +
                ", orderDate='" + orderDate + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public OrderInfo2(Long orderId, String orderDate, String address) {
        this.orderId = orderId;
        this.orderDate = orderDate;
        this.address = address;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(String orderDate) {
        this.orderDate = orderDate;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

2.8.3 Implementation of data source

It's not that hard either. Once we implement the SourceFunction interface, we have to implement two methods: run and cancel

The class takes a filePath. Why do we need it? Because we have two data sources and must be able to tell which file each instance reads. That is what this parameter is for, and we pass the value in through the constructor of the class

The logic is simple: run creates a reader on our file and reads it line by line, and cancel closes the reader if it has been opened

In run, each line that is read is sent downstream with collect(line)

FileSource.java

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class FileSource implements SourceFunction<String> {
    private String filePath;
    BufferedReader reader;
    Random random = new Random();

    public FileSource(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void run(SourceContext<String> sct) throws Exception {
        reader = new BufferedReader(
                new InputStreamReader(
                        new FileInputStream(filePath)));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // Sleep for a random time (up to 500 ms) to simulate data trickling in as a stream
            TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
            sct.collect(line);
        }
    }

    @Override
    public void cancel() {
        try {
            // Close the reader only if run() has already opened it
            if (reader != null) {
                reader.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.8.4 Constants class

We usually define a class to hold our constants. This example is simple enough that it would work without one, but in practice we recommend doing this

Constants.java

public class Constants {
    public static final String ORDER_INFO1_PATH = "I:/OrderInfo1.txt";
    public static final String ORDER_INFO2_PATH = "I:/OrderInfo2.txt";
}

2.8.5 Back to OrderETLStream

The first step is to obtain the two data streams

DataStreamSource<String> info1Stream = env.addSource(new FileSource(Constants.ORDER_INFO1_PATH));
DataStreamSource<String> info2Stream = env.addSource(new FileSource(Constants.ORDER_INFO2_PATH));

OrderInfo1 and OrderInfo2 each have a static method (line2Info1 and line2Info2). This static method splits the string and converts each piece to the type of the corresponding field of the object

Here I simply use lambda expressions

SingleOutputStreamOperator<OrderInfo1> orderInfo1Stream = info1Stream
        .map(line -> OrderInfo1.line2Info1(line));

SingleOutputStreamOperator<OrderInfo2> orderInfo2Stream = info2Stream
        .map(line -> OrderInfo2.line2Info2(line));

We then use keyBy to group both streams, with the order number as the key field

KeyedStream<OrderInfo1, Long> keyByInfo1 = orderInfo1Stream
        .keyBy(orderInfo1 -> orderInfo1.getOrderId());

KeyedStream<OrderInfo2, Long> keyByInfo2 = orderInfo2Stream
        .keyBy(orderInfo2 -> orderInfo2.getOrderId());

After that, we use connect to bring the two keyed streams together. Because of the timing issue described earlier, joining them directly could produce wrong results, so we rely on the state we have just learned; ValueState, the most common one, is enough to do the job

2.8.6

Note that we use RichCoFlatMapFunction, as mentioned in the previous article. For two different data sources we use the Co variant, and the output data type is Tuple2, because it combines the two

The routine is the same as before: in open, first create the descriptors, then register the states, and then use them

flatMap1 handles the elements of the first stream and flatMap2 those of the second. Suppose order 123 arrives on the first stream: if the second stream's record for 123 is already stored in state, we merge the two and emit them; if the second stream's record has not arrived yet, we save the first stream's record with update. flatMap2 works the same way, checking the state of the first stream. That's our logic
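The "emit if the other side is already waiting, otherwise wait" logic can be tried out in plain Java before reading the Flink version. In this sketch two HashMaps indexed by order number stand in for the two keyed ValueStates, and the payloads are plain strings; the class and method names (TwoStreamMatchSketch, onInfo1, onInfo2) are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of matching two streams by order number, whichever side arrives first. */
class TwoStreamMatchSketch {

    // Hypothetical stand-ins for the two keyed ValueStates, indexed by order number
    private final Map<Long, String> pendingInfo1 = new HashMap<>();
    private final Map<Long, String> pendingInfo2 = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Corresponds to flatMap1: a record of the first stream arrives. */
    void onInfo1(long orderId, String info1) {
        String info2 = pendingInfo2.remove(orderId);   // value() followed by clear()
        if (info2 != null) {
            output.add("(" + orderId + "," + info1 + "," + info2 + ")");
        } else {
            pendingInfo1.put(orderId, info1);          // update(): wait for the other side
        }
    }

    /** Corresponds to flatMap2: a record of the second stream arrives. */
    void onInfo2(long orderId, String info2) {
        String info1 = pendingInfo1.remove(orderId);
        if (info1 != null) {
            output.add("(" + orderId + "," + info1 + "," + info2 + ")");
        } else {
            pendingInfo2.put(orderId, info2);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TwoStreamMatchSketch sketch = new TwoStreamMatchSketch();
        sketch.onInfo1(123L, "Mop");        // first stream is early for 123
        sketch.onInfo2(234L, "Yunnan");     // second stream is early for 234
        sketch.onInfo2(123L, "Jiangsu");    // completes 123
        sketch.onInfo1(234L, "Toothpaste"); // completes 234
        System.out.println(sketch.output()); // [(123,Mop,Jiangsu), (234,Toothpaste,Yunnan)]
    }
}
```

The order of arrival does not matter: whichever record comes second finds its partner waiting and triggers the output. A record whose partner never arrives simply stays in the pending map, which in a real job is a reason to think about cleaning up old state.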

EnrichmentFunction.java

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * IN1 The data type of the first stream
 * IN2 The data type of the second stream
 * OUT The data type of the output
 */
public class EnrichmentFunction
        extends RichCoFlatMapFunction<OrderInfo1, OrderInfo2,
        Tuple2<OrderInfo1, OrderInfo2>> {

    // Both states are scoped to the same key, the order number
    private ValueState<OrderInfo1> orderInfo1ValueState;
    private ValueState<OrderInfo2> orderInfo2ValueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<OrderInfo1> descriptor1 = new ValueStateDescriptor<>(
                "info1",
                OrderInfo1.class
        );
        ValueStateDescriptor<OrderInfo2> descriptor2 = new ValueStateDescriptor<>(
                "info2",
                OrderInfo2.class
        );
        orderInfo1ValueState = getRuntimeContext().getState(descriptor1);
        orderInfo2ValueState = getRuntimeContext().getState(descriptor2);
    }

    // Called for elements of the first stream, for example order 123
    @Override
    public void flatMap1(OrderInfo1 orderInfo1,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
        // If this method runs, a record of the first stream has arrived
        OrderInfo2 value2 = orderInfo2ValueState.value();
        if (value2 != null) {
            // The second stream's record is already here: emit the pair
            orderInfo2ValueState.clear();
            out.collect(Tuple2.of(orderInfo1, value2));
        } else {
            // Otherwise store this record and wait for the second stream
            orderInfo1ValueState.update(orderInfo1);
        }
    }

    // Called for elements of the second stream
    @Override
    public void flatMap2(OrderInfo2 orderInfo2,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
        OrderInfo1 value1 = orderInfo1ValueState.value();
        if (value1 != null) {
            orderInfo1ValueState.clear();
            out.collect(Tuple2.of(value1, orderInfo2));
        } else {
            orderInfo2ValueState.update(orderInfo2);
        }
    }
}

At this point I run the code and it works as expected

Finally

All the states demonstrated here are the ones covered in the official documentation. I think learning should start from the official site, which is why I have worked through these examples

In the next article we will keep writing code 🤣