Understand Flink ideas and architecture

Big data thinking

What is Big Data? Everyone knows how to process data in general: get the data, run some aggregation, and in the simplest stand-alone system do the calculation and get the desired result.

So where does the "big" come in? A stand-alone system hits a practical ceiling (we do not all have the Tianhe-2 supercomputer); commodity machines are limited, so the work has to be spread across many of them. Big data is therefore about distributed computing and distributed storage.

Do we need to implement that distribution ourselves? Obviously not: existing frameworks already do it.

A big data framework takes care of the distribution and lets us keep writing essentially stand-alone computing logic. The "distributed" part shows up mainly as parallel processing, which is what lets it handle massive data.

Big Data Framework

The popular big data computing frameworks are MapReduce, Spark, and Flink.

MapReduce, because of its verbose and complex code, has largely left the stage even for offline (batch) computing.

Spark uses functional programming, the RDD abstraction, and resilient data flow to make offline computing simple to program.

Flink saw the limits of purely offline computing and added real-time computing. What really matters here is the dimension of real-time computation: global real-time aggregation, aggregation within a time window, and per-event streaming (a single element as the minimum processing unit). Flink provides all of them.

Simple business monitoring framework

The first thing we will build is a simple business-monitoring pipeline of this kind.

What is Flink

Flink's own description is, in essence: stateful computations over data streams.

It supports a wide range of event-processing scenarios.

Its core ways of working are: SQL and SQL-like APIs, bounded/unbounded streams, and stateful computation.

In code, a job is basically input (source) -> processing -> output (sink), where the input can be one or more streams.

An introductory demo

Getting started: wordCount

private StreamExecutionEnvironment env;

@Before
public void init() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}

@Test
public void testWordCount1() throws Exception {
    // Read lines from a local socket (start `nc -l 9999` first)
    DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
    source.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split(" ");
            for (String s : split) {
                out.collect(Tuple2.of(s, 1));
            }
        }
    }).keyBy(0).sum(1).print();
    env.execute("testWordCount1");
}

Before running the test, start a socket server for the source to connect to:

nc -l 9999

Source

1. Bounded and unbounded streams

In Flink's view, data sources, i.e. streams, are divided into bounded streams and unbounded streams.

An unbounded stream has a defined start but no defined end. It produces data endlessly. Unbounded data must be processed continuously, that is, immediately after it is ingested; we cannot wait for all the data to arrive, because the input is infinite and will never be complete at any point in time. Processing unbounded data often requires ingesting events in a particular order, such as the order in which they occurred, so that the completeness of the results can be reasoned about.

A bounded stream has a defined start and end. It can be computed after all of its data has been ingested; since all the data can be sorted, no ordered ingestion is required. Bounded stream processing is usually called batch processing.
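For example, a stream built from a fixed collection is bounded. A minimal sketch (the element values are just placeholders):

// The job processes these three elements and then finishes
DataStreamSource<String> bounded = env.fromElements("flink", "spark", "mapreduce");
bounded.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
}).print();
env.execute("boundedDemo");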

Historically, Spark started from the batch-oriented RDD model and later borrowed streaming ideas, while Flink started from real-time streaming (it initially could not match Spark's offline capabilities) and later added batch. The two keep converging toward each other.

socketTextStream, used above, is basically an unbounded stream; it returns a DataStreamSource.

The SourceFunction interface behind it has just two methods: run() and cancel().

The second kind is the bounded stream, for which many sources are predefined, such as fromCollection, fromElements, and readTextFile.

These can be understood as bounded streams, although a file source can also monitor the path and thereby behave as an unbounded stream.
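A rough sketch of the file-listening case (the path and scan interval are illustrative; with PROCESS_CONTINUOUSLY a file is re-processed whenever it changes):

TextInputFormat format = new TextInputFormat(new Path("/tmp/flink-input"));
// The path is re-scanned every 1000 ms, so new data keeps arriving: a bounded source turned unbounded
DataStream<String> lines = env.readFile(format, "/tmp/flink-input",
        FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
lines.print();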

2. Parallel and non-parallel streams

1. SourceFunction

Sources have another dimension: they can be parallel or non-parallel. A parallel source pulls data with multiple threads (subtasks); a non-parallel source has only a single one.

SourceFunction produces a non-parallel stream.

private StreamExecutionEnvironment env;

@Before
public void before() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}

@Test
public void testSourceFunction() throws Exception {
    // A custom non-parallel source that emits a record every 100 ms
    env.addSource(new SourceFunction<String>() {
        int count = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    }).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
        }
    }).setParallelism(1);
    env.execute("testSourceFunction");
}

Output:

[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 0}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 1}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 2}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 3}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 4}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 5}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)] : 6}


setParallelism(1) here sets the sink's parallelism to 1, the same as the source's. Because both operators run with parallelism 1 they are chained into the same task, which is why the sink thread and the source thread in the output above are the same thread.

Now drop the setParallelism(1) on the sink:

@Test
public void testSourceFunction2() throws Exception {
    env.addSource(new SourceFunction<String>() {
        int count = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    }).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
        }
    });
    env.execute("testSourceFunction2");
}

The output is now:

[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 0}
[sink-thread Sink: Unnamed (6/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 1}
[sink-thread Sink: Unnamed (7/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 2}
[sink-thread Sink: Unnamed (8/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 3}
[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 4}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 5}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 6}
[sink-thread Sink: Unnamed (4/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 7}
[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 8}
[sink-thread Sink: Unnamed (6/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 9}

Now there are 8 sink subtasks, i.e. the default parallelism of this environment (typically the number of CPU cores) is 8, so records from the single source thread are distributed across 8 sink subtasks.
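If you want a different default for the whole job, it can be set on the environment, e.g.:

// Every operator that does not set its own parallelism will use this value
env.setParallelism(4);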

We said SourceFunction is non-parallel, so now let's try forcing the source's parallelism up:

@Test
public void testSourceFunction3() throws Exception {
    env.addSource(new SourceFunction<String>() {
        int count = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    }).setParallelism(2).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
        }
    });
    env.execute("testSourceFunction3");
}

This time the job fails:

java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.

The exception says it plainly: the maximum parallelism of a non-parallel operator is 1.

2. ParallelSourceFunction

As the name suggests, it produces a parallel stream.

@Test
public void testParallelSourceFunction() throws Exception {
    env.addSource(new ParallelSourceFunction<String>() {
        int count = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {

        }
    }).setParallelism(2).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
        }
    });
    env.execute("testParallelSourceFunction");
}

Output:

There are now two source subtasks, Custom Source (1/2) and Custom Source (2/2), and each has its own independent count.

[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 0}
[sink-thread Sink: Unnamed (7/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 0}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 1}
[sink-thread Sink: Unnamed (8/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 1}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 2}
[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 2}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 3}
[sink-thread Sink: Unnamed (4/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 3}
[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 4}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 4}

3. RichParallelSourceFunction

It enriches ParallelSourceFunction, so let's see what it adds.

getRuntimeContext() returns the runtime context. We will come back to this context later; for now it can be read as a per-operator context that exposes global configuration and runtime information.

open() is called once when the operator starts, before run().

@Test
public void testRichParallelSourceFunction() throws Exception {

    env.addSource(new RichParallelSourceFunction<String>() {
        int count = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            RuntimeContext context = getRuntimeContext();
            System.out.println("open context: " + context);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("run context : " + ctx);
            ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
            TimeUnit.MILLISECONDS.sleep(100);
        }

        @Override
        public void cancel() {

        }
    }).setParallelism(2).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
        }
    }).setParallelism(1);
    env.execute("testRichParallelSourceFunction");
}

Output:

open context: org.apache.flink.streaming.api.operators.StreamingRuntimeContext@56d05a06
open context: org.apache.flink.streaming.api.operators.StreamingRuntimeContext@6ee7263b
run context : org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext@491e50f9
run context : org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext@595e4aa9
[sink-thread Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 0}
[sink-thread Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 0}

3. Merge multiple streams

1. Simple merge

Two streams, even of different element types, can be connected and processed together:

public static void test() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // stream 1
    DataStreamSource<Integer> intStream = env.fromCollection(Arrays.asList(1, 2, 3));
    // stream 2
    DataStreamSource<String> source = env.fromCollection(Arrays.asList("a", "b", "c"));
    // Connect the two streams
    ConnectedStreams<String, Integer> connect = source.connect(intStream);

    connect.process(new CoProcessFunction<String, Integer, Object>() {
        @Override
        public void processElement1(String value, Context ctx, Collector<Object> out) throws Exception {
            out.collect(value);
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<Object> out) throws Exception {
            out.collect(value);
        }
    }).print();
    env.execute("demo");
}

2. Broadcast merge

Broadcast is used to distribute information, typically configuration, to every parallel subtask of a downstream operator.

@Test
public void testBroadcast() throws Exception {
    // A "business" stream with 8 parallel subtasks
    DataStreamSource<String> source1 = environment.addSource(new RichParallelSourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect("=========");
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {

        }
    }).setParallelism(8);

    // A "configuration" stream, broadcast to all downstream subtasks
    DataStream<String> source2 = environment.addSource(new RichParallelSourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                ctx.collect(new Random().nextInt(100) + "");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("refresh");
            }
        }

        @Override
        public void cancel() {

        }
    }).setParallelism(1).broadcast();

    source1.connect(source2).flatMap(new CoFlatMapFunction<String, String, String>() {
        String config;

        @Override
        public void flatMap1(String value, Collector<String> out) throws Exception {
            System.out.println(String.format("process [%s] config %s", Thread.currentThread().getName(), config));
        }

        @Override
        public void flatMap2(String value, Collector<String> out) throws Exception {
            this.config = value;
        }
    }).addSink(new SinkFunction<String>() {
        @Override
        public void invoke(String value, Context context) throws Exception {
        }
    });
    environment.execute("");
}

As the output shows, a single broadcast() call is enough to distribute the configuration value to every subtask.

process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (4/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (5/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (7/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (8/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (4/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (5/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (7/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (8/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18

Operator Transformation

1. map

map turns each element a into exactly one element b.

SingleOutputStreamOperator<Tuple2<String, Integer>> map = env.socketTextStream("localhost", 8888)
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, value.split(" ").length);
            }
        });

2. flatMap

SingleOutputStreamOperator<Object> flatMap = env.socketTextStream("localhost", 8888)
        .flatMap(new FlatMapFunction<String, Object>() {
            @Override
            public void flatMap(String value, Collector<Object> out) throws Exception {
            }
        });

The difference from map is the Collector: for each input element, flatMap can emit zero, one, or many elements through the Collector.
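A minimal sketch of a flatMap that splits each line into words and skips empty tokens (host and port are placeholders):

env.socketTextStream("localhost", 8888).flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            if (!word.isEmpty()) {
                out.collect(word);   // 0..n outputs per input line
            }
        }
    }
}).print();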

3. split

@Test
public void testSplit() throws Exception {
    DataStreamSource<Integer> source = environment.addSource(new SourceFunction<Integer>() {
        Random random = new Random();

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (true) {
                ctx.collect(random.nextInt(1000));
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    });
    // split is deprecated and not recommended
    SplitStream<Integer> split = source.split((OutputSelector<Integer>) value ->
            Collections.singletonList(value % 2 == 0 ? "even" : "uneven"));
    DataStream<Integer> even = split.select("even");
    even.print();
    environment.execute("testSplit");
}

It feels very similar to filter, except that one stream can be cut into several named streams; however, the split API is deprecated and not recommended.
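The recommended replacement is side outputs via a ProcessFunction. A rough sketch against an Integer stream like the one above (the tag name is arbitrary):

final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};
SingleOutputStreamOperator<Integer> main = source.process(new ProcessFunction<Integer, Integer>() {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        if (value % 2 == 0) {
            ctx.output(evenTag, value);  // route even numbers to the side output
        } else {
            out.collect(value);          // odd numbers stay on the main stream
        }
    }
});
main.getSideOutput(evenTag).print();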

4. keyBy

Similar to group by

keyBy is a partitioning strategy. In the example below we sum odd numbers and even numbers separately, emitting the running sum every four elements per key.

@Test
public void testKeyBy() throws Exception {
    DataStreamSource<Integer> source = environment.addSource(new SourceFunction<Integer>() {
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            List<Integer> list = Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2);
            list.forEach(integer -> {
                ctx.collect(integer);
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        @Override
        public void cancel() {
        }
    });

    // Partition by key
    KeyedStream<Integer, String> integerObjectKeyedStream = source.keyBy(new KeySelector<Integer, String>() {
        @Override
        public String getKey(Integer value) throws Exception {
            return value % 2 == 0 ? "even" : "uneven";
        }
    });

    integerObjectKeyedStream.flatMap(new RichFlatMapFunction<Integer, Integer>() {
        ValueState<Integer> state = null;
        int trigger = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            // State is a core concept: a per-key runtime context, similar to a context object
            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("", TypeInformation.of(Integer.class));
            this.state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            Integer sum = state.value();
            if (sum == null) {
                sum = 0;
            }
            sum += value;
            state.update(sum);
            // Every four elements, emit the sum and clear the state
            if (++trigger % 4 == 0) {
                out.collect(sum);
                state.clear();
            }
        }
    }).print();
    environment.execute("testKeyBy");
}

That's a simple demo.

Watermark and Time

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Event time is introduced at cloud.tencent.com/developer/a…

Watermarks are introduced at www.jianshu.com/p/9db56f81f… and developer.aliyun.com/article/682…

The basic idea: Flink can compute over different notions of time. When event time is used, the user supplies a function that extracts the timestamp from the data. The problem is that data may arrive out of order: say A's event time is 1 s and B's is 2 s, and the window covers up to 2 s; because of network delay, B only arrives at 3 s. That is what the watermark is for: a window is evaluated only once the watermark passes the window end (watermark > window_end_time). It is essentially a fault-tolerance mechanism for lateness; set the allowed lateness according to the actual situation.

It is written as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Add a kafka source
FlinkKafkaConsumer011<EventLog> consumer = new FlinkKafkaConsumer011<>(KafkaProducer.topic, new EventLogSer(), getKafkaSourceProperties());
DataStreamSource<EventLog> streamSource = env.addSource(consumer);
// Watermark: allowed out-of-orderness is 100 ms
streamSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventLog>(Time.of(100, TimeUnit.MILLISECONDS)) {
    @Override
    public long extractTimestamp(EventLog element) {
        // The time recorded in the log itself is used as the window dimension
        return element.time;
    }
    // For counting, each event is mapped to a 1
}).map(new MapFunction<EventLog, Tuple1<Integer>>() {
    @Override
    public Tuple1<Integer> map(EventLog value) throws Exception {
        return new Tuple1<>(1);
    }
}).timeWindowAll(Time.seconds(1)).sum(0).print();

There are two types of watermark: perfect watermark and heuristic watermark.
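BoundedOutOfOrdernessTimestampExtractor above is a heuristic watermark: it assumes nothing arrives later than a fixed bound. A rough sketch of writing the same idea by hand with the legacy AssignerWithPeriodicWatermarks API (the 3-second bound is only an example):

streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<EventLog>() {
    private final long maxOutOfOrderness = 3000; // allow events to be up to 3 s late
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(EventLog element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.time);
        return element.time;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Heuristic: assume nothing older than (largest timestamp seen - bound) is still coming
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
});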

Window concept

Here is a recommended article: juejin.cn/post/684490…

Take a look at the following two pieces of code

env.socketTextStream("localhost", 8888).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] split = value.split(" ");
        for (String s : split) {
            out.collect(new Tuple2<>(s, 1));
        }
    }
}).keyBy(0).sum(1).print();

This one aggregates over the whole stream: the counts keep accumulating forever.

env.socketTextStream("localhost", 8888).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] split = value.split(" ");
        for (String s : split) {
            out.collect(new Tuple2<>(s, 1));
        }
    }
}).keyBy(0).timeWindow(Time.seconds(1)).sum(1).print();

This one aggregates per one-second time window.

So what’s the time window?

For example, we usually handle things periodically, such as counting the number of logs per minute, aggregated by time. The question is: which time? The time recorded in the log itself (event time), the time the event enters Flink (ingestion time), or the time Flink processes it (processing time)? Flink supports all of them.
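In the API version used in this article, this is controlled by the time characteristic on the environment; a quick reference:

// Choose exactly one of the three:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);      // the time carried by the event itself
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);  // the time the event enters Flink
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // the operator's wall-clock time (the default)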

Here's an example of how to do that, using event time:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Add a kafka source
FlinkKafkaConsumer011<EventLog> consumer = new FlinkKafkaConsumer011<>(KafkaProducer.topic, new EventLogSer(), getKafkaSourceProperties());
DataStreamSource<EventLog> streamSource = env.addSource(consumer);

// Watermark: allowed out-of-orderness is 100 ms
streamSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventLog>(Time.of(100, TimeUnit.MILLISECONDS)) {
    @Override
    public long extractTimestamp(EventLog element) {
        // The time recorded in the log itself is used as the window dimension
        return element.time;
    }
    // For counting, each event is mapped to a 1
}).map(new MapFunction<EventLog, Tuple1<Integer>>() {
    @Override
    public Tuple1<Integer> map(EventLog value) throws Exception {
        return new Tuple1<>(1);
    }
}).timeWindowAll(Time.seconds(1)).sum(0).print();

This is one way to do it.

State

juejin.cn/post/684490…

There are two dimensions of state: task level (my test of this case failed, see below) and key level.

1. Task level

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    ...
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:185)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:142)
    at state.StateDemo$1.open(StateDemo.java:39)
    ...

The cause at the bottom says it all: keyed state can only be used after a keyBy() operation.

2. Key level

Keyed state is actually easy to understand: it stores a context per key, aggregated by that key, and records the state of each key independently. There are six kinds of keyed state (a declaration sketch follows the list):

  1. ValueState
  2. ListState
  3. MapState
  4. ReducingState
  5. AggregatingState
  6. FoldingState
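Besides ValueState, the others are obtained the same way, through a descriptor in open() of a Rich function on a keyed stream. A minimal sketch for ListState and MapState (the state names are illustrative):

// Declared in open() of a Rich function, the same way as the ValueState in the demo below
ListStateDescriptor<Integer> listDescriptor = new ListStateDescriptor<>("recent-values", Integer.class);
ListState<Integer> listState = getRuntimeContext().getListState(listDescriptor);

MapStateDescriptor<String, Integer> mapDescriptor = new MapStateDescriptor<>("count-per-word", String.class, Integer.class);
MapState<String, Integer> mapState = getRuntimeContext().getMapState(mapDescriptor);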

ValueState, in its simplest form, is a single state value:

The following demo counts how many records have been processed for each key (partition); the aggregation dimension is the key, and the code is very simple.

private StreamExecutionEnvironment env;

@Before
public void init() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}

@Test
public void testKeyState() throws Exception {
    env.addSource(new ParallelSourceFunction<String>() {
        private Random random = new Random();

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (true) {
                // Keys 0-4, i.e. 5 partitions
                int count = random.nextInt(5);
                ctx.collect(count + ",content");
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }

        @Override
        public void cancel() {
        }
    }).setParallelism(8).keyBy(new KeySelector<String, Integer>() {
        @Override
        public Integer getKey(String value) throws Exception {
            String[] split = value.split(",");
            // Use 0,1,2,3,4 as the key, similar to kafka's partitioning strategy
            return Integer.parseInt(split[0]);
        }
    }).flatMap(new RichFlatMapFunction<String, Integer>() {
        private ValueState<Integer> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("key", Integer.class);
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(String value, Collector<Integer> out) throws Exception {
            Integer count = state.value();
            if (count == null) {
                count = 0;
            }
            System.out.printf("[%s]%s\n", Thread.currentThread().getName(), value);
            count++;
            state.update(count);
            out.collect(count);
        }
    }).print();
    env.execute("testKeyState");
}

Here is the output:

[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 1
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 2
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 3
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 4
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 5
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 6
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 7
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content // This is 8
8> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 1
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 2
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 3
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 4
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 5
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 6
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 7
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 9
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 10
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 11
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 12
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 13
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 14
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 15
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 16
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 1
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 2
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 3
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 4
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 5
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 6
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 7
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 17
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 18
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 19
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 20
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 21
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 22
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 23
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 24
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 1
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 2
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 3
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 4
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 5
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 6
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 7
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 8
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 9
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 10
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 11
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 12
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 13
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 14
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 15
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 16
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 9
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 10
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 11
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 12
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 13
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 14
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 15
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 16
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content // Key 3 appears again; the count continues at 9, so the state was kept.
8> 9
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 10
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 11
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 12
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 13
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 14
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 15
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 16
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 25
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 26
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 27
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 28
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 29
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 30
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 31
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 32
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 17
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 18
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 19
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 20
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 21
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 22
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 23
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 24
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 25
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 26
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 27
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 28
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 29
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 30
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 31
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 32

One more thing the output shows: each key is consistently bound to the same subtask (thread), which is convenient for aggregation.