Understanding Flink: ideas and architecture
Big data thinking
What is big data? Everyone knows how to handle data: take some data, run an aggregation. In the simplest case a stand-alone system does the calculation and produces the desired result.
So where does the "big" come in? A stand-alone system eventually hits a bottleneck (we do not have a Tianhe-2 supercomputer, only commodity machines), so the work has to be spread across machines. In other words, big data is about solving distributed computing and distributed storage.
Do we, as application developers, need to worry about the distribution itself? Usually not: existing frameworks take care of it.
A big data framework lets us write logic as if it ran on a single machine while the framework makes it distributed. The distribution shows up mainly as parallel processing, which is what allows it to handle massive data.
Big Data Framework
The popular big data computing frameworks are MapReduce, Spark, and Flink.
MapReduce, because its code is verbose and complex, has largely left the stage for offline (batch) computing.
Spark uses functional programming, the RDD abstraction, and resilient data flow to make offline computing simple to program.
Flink saw the limits of offline computing and focused on real-time computing. What really matters is the dimension along which real-time computation happens: global real-time aggregation, aggregation within a time window, and per-record streaming (a single element is the minimum processing unit). Flink provides all of them.
Simple business monitoring framework
The first thing we’ll do is this:
What is Flink
Here is how Flink describes itself: essentially, stateful computations over data streams.
You can see that it supports many event-processing scenarios.
The following are its core features.
Here are some of the ways Flink can be used: SQL, SQL-like APIs, bounded/unbounded streams, and stateful computation.
Here's the code: it is basically input (source) -> processing -> output (sink), where the input can be multiple streams.
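A minimal skeleton of such a pipeline (a sketch for illustration; the host, port, and job name are made up):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// input (source): placeholder host and port
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
// processing: a trivial map
source.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
}).print(); // output (sink)
env.execute("source-process-sink"); // made-up job name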
An introduction to the demo
Getting started: wordCount
private StreamExecutionEnvironment env;
@Before
public void init() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}
@Test
public void testWordCount1() throws Exception {
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
source.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(Tuple2.of(s, 1));
}
}
}).keyBy(0).sum(1).print();
env.execute("testWordCount1");
}
Execute the script:
nc -l 9999
Source
1. Bounded and unbounded streams
According to Flink, data sources, namely streams, are divided into bounded streams and unbounded streams
An unbounded flow defines the beginning of a flow, but not the end. They produce data endlessly. Unbounded stream data must be processed continuously, that is, immediately after it is ingested. We can’t wait for all the data to arrive before processing it, because the input is infinite and will never be complete at any point in time. Processing unbounded data often requires ingesting events in a particular order, such as the order in which they occurred, so that the integrity of the results can be inferred.
A bounded stream defines both the beginning and the end of a stream. Bounded streams can be computed after all the data has been ingested. In a bounded stream all the data can be sorted, so ordered ingestion is not required. Bounded stream processing is often referred to as batch processing.
In the beginning Spark only had the RDD (batch) model; after Flink appeared, Spark borrowed streaming ideas from it. Flink, in turn, started from real-time processing because at first it could not match Spark at offline work. So the two frameworks have been learning from each other.
The socketTextStream used above is basically an unbounded stream; it returns a DataStreamSource.
The SourceFunction interface has just two methods: run() and cancel().
The second kind is the bounded stream, for which the environment defines many factory methods (fromElements, fromCollection, readTextFile, and so on).
These can basically be understood as bounded streams; the file-based sources also have a monitoring mode, which turns them into unbounded streams.
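For example (a minimal sketch, not from the original; the file path is made up), the environment offers both kinds of sources, and a file source can also watch its path continuously:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Unbounded: the socket keeps producing data until the job is cancelled
DataStreamSource<String> unbounded = env.socketTextStream("localhost", 9999);
// Bounded: a fixed collection or a file is finite
DataStreamSource<Integer> boundedNumbers = env.fromElements(1, 2, 3);
DataStreamSource<String> boundedFile = env.readTextFile("/tmp/input.txt"); // made-up path
// Monitoring mode: re-scan the path every second, which turns the file source into an unbounded stream
TextInputFormat format = new TextInputFormat(new Path("/tmp/input.txt"));
DataStreamSource<String> monitored =
        env.readFile(format, "/tmp/input.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);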
2. Parallel and non-parallel streams
1. SourceFunction
Sources have another dimension: they can be parallel or non-parallel. A parallel source pulls data with multiple threads (subtasks); a non-parallel source has only a single thread.
SourceFunction is a non-parallel stream
private StreamExecutionEnvironment env;
@Before
public void before() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}
@Test
public void testSourceFunction() throws Exception {
//
env.addSource(new SourceFunction<String>() {
int count = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect(String.format("[%s] : %d", Thread.currentThread().getName(), count++));
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
}
}).setParallelism(1);
env.execute("testSourceFunction");
}
Output:
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :0}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :1}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :2}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :3}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :4}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :5}
[sink-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source -> Sink: Unnamed (1/1) :6}
setParallelism(1) here sets the parallelism of the sink to 1. Because the source's parallelism is also 1 (a SourceFunction source is non-parallel), Flink chains the source and the sink into one task, which is why the operator name in the output is "Source: Custom Source -> Sink: Unnamed" and the sink thread is the same thread as the source thread.
Now let's see what happens if we write it like this instead:
@Test
public void testSourceFunction2() throws Exception {
env.addSource(new SourceFunction<String>() {
int count = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
}
});
env.execute("testSourceFunction");
}
This time the output is:
[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 0}
[sink-thread Sink: Unnamed (6/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 1}
[sink-thread Sink: Unnamed (7/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 2}
[sink-thread Sink: Unnamed (8/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 3}
[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 4}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 5}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 6}
[sink-thread Sink: Unnamed (4/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 7}
[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] : 8}
[sink-thread Sink: Unnamed (6/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/1)] :}
This time there are 8 sink threads: without an explicit setParallelism, the operator runs with the default parallelism (8 here, which is typically the number of CPU cores on the machine).
We said SourceFunction is non-parallel, so let's now try to change its parallelism.
@Test
public void testSourceFunction3() throws Exception {
env.addSource(new SourceFunction<String>() {
int count = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).setParallelism(2).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
}
});
env.execute("testSourceFunction");
}
This time the output is:
java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.
An exception is thrown: the maximum parallelism of a non-parallel source is 1.
2. ParallelSourceFunction
You can see from the name that it’s a parallel stream
@Test
public void testParallelSourceFunction() throws Exception {
env.addSource(new ParallelSourceFunction<String>() {
int count = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).setParallelism(2).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
}
});
env.execute("testSourceFunction");
}
Output:
There are now two source subtasks, Custom Source (1/2) and Custom Source (2/2), and each source thread keeps its own count.
[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 0}
[sink-thread Sink: Unnamed (7/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 0}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 1}
[sink-thread Sink: Unnamed (8/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 1}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 2}
[sink-thread Sink: Unnamed (1/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 2}
[sink-thread Sink: Unnamed (2/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 3}
[sink-thread Sink: Unnamed (4/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 3}
[sink-thread Sink: Unnamed (5/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 4}
[sink-thread Sink: Unnamed (3/8)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 4}
3. RichParallelSourceFunction
It enriches the ParallelSourceFunction, so let’s see what it has
getRuntimeContext() returns the current runtime context. We'll talk about this context later; for now it can be read as a global context from which you can get configuration and other global information.
open() is called once when the source starts.
@Test
public void testRichParallelSourceFunction() throws Exception {
env.addSource(new RichParallelSourceFunction<String>() {
int count = 0;
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext context = getRuntimeContext();
System.out.println("open context: " + context);
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
System.out.println("run context : " + ctx);
ctx.collect(String.format("[source-thread %s] : %d", Thread.currentThread().getName(), count++));
TimeUnit.MILLISECONDS.sleep(100);
}
@Override
public void cancel() {
}
}).setParallelism(2).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(String.format("[sink-thread %s]{data : %s}", Thread.currentThread().getName(), value));
}
}).setParallelism(1);
env.execute("testRichParallelSourceFunction");
}
Output:
open context: org.apache.flink.streaming.api.operators.StreamingRuntimeContext@56d05a06
open context: org.apache.flink.streaming.api.operators.StreamingRuntimeContext@6ee7263b
run context : org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext@491e50f9
run context : org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext@595e4aa9
[sink-thread Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source (1/2)] : 0}
[sink-thread Sink: Unnamed (1/1)]{data : [source-thread Legacy Source Thread - Source: Custom Source (2/2)] : 0}
3. Merge multiple streams
1. Simple merge
With a simple merge, multiple streams can be connected and processed together.
public static void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// stream 1
DataStreamSource<Integer> intStream = env.fromCollection(Arrays.asList(1, 2, 3));
// stream 2
DataStreamSource<String> source = env.fromCollection(Arrays.asList("a", "b", "c"));
// Connect multiple streams
ConnectedStreams<String, Integer> connect = source.connect(intStream);
connect.process(new CoProcessFunction<String, Integer, Object>() {
@Override
public void processElement1(String value, Context ctx, Collector<Object> out) throws Exception {
out.collect(value);
}
@Override
public void processElement2(Integer value, Context ctx, Collector<Object> out) throws Exception {
out.collect(value);
}
}).print();
env.execute("demo");
}
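As a side note (a sketch, not from the original): when the streams have the same element type, union is an even simpler way to merge them; connect is needed when the types differ or the two streams need separate processing logic.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> s1 = env.fromElements("a", "b"); // example data
DataStreamSource<String> s2 = env.fromElements("c", "d");
// union interleaves streams of the same type into one DataStream
s1.union(s2).print();
env.execute("union-demo"); // made-up job name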
2. Broadcast consolidation
The idea of broadcast is to distribute information to all parallel subtasks, which suits configuration-distribution scenarios.
@Test
public void testBroadcast() throws Exception {
DataStreamSource<String> source1 = environment.addSource(new RichParallelSourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect("=========");
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).setParallelism(8);
DataStream<String> source2 = environment.addSource(new RichParallelSourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect(new Random().nextInt(100) + "");
TimeUnit.SECONDS.sleep(1);
System.out.println("refresh");
}
}
@Override
public void cancel() {
}
}).setParallelism(1).broadcast();
source1.connect(source2).flatMap(new CoFlatMapFunction<String, String, String>() {
String config;
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
System.out.println(String.format("process [%s] config %s", Thread.currentThread().getName(), config));
}
@Override
public void flatMap2(String value, Collector<String> out) throws Exception {
this.config = value;
}
}).addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
}
});
environment.execute("");
}
As we can see, a single broadcast() call is enough to distribute the configuration to every subtask:
process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (4/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (5/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (7/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (8/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (4/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (5/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (7/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (8/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (3/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (6/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (2/8)] config 18
process [Co-Flat Map -> Sink: Unnamed (1/8)] config 18
Operator Transformation
1. map
map transforms each element a into an element b (one-to-one).
SingleOutputStreamOperator<Tuple2<String, Integer>> map = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, value.split(" ").length);
}
});
2. flatMap
SingleOutputStreamOperator<Object> flatMap = env.socketTextStream("", 8888).flatMap(new FlatMapFunction<String, Object>() {
@Override
public void flatMap(String value, Collector<Object> out) throws Exception {
}
});
flatMap differs from map in that it emits results through a Collector: for one input element it can collect zero, one, or many output elements (one-to-many).
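A concrete sketch (not from the original), splitting each line into words so that one input element produces many outputs:
env.socketTextStream("localhost", 8888).flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // one input line -> zero or more output words
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}).print();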
3. split
@Test
public void testSplit() throws Exception {
DataStreamSource<Integer> source = environment.addSource(new SourceFunction<Integer>() {
Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (true) {
ctx.collect(random.nextInt(1000));
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
});
// split is not recommended any more
SplitStream<Integer> split = source.split((OutputSelector<Integer>) value -> Collections.singletonList(value % 2 == 0 ? "even" : "uneven"));
DataStream<Integer> even = split.select("even");
even.print();
environment.execute("testSplit");
}
It feels very similar to filter, except that one split can cut the stream into several outputs. Still, split is deprecated and not recommended.
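The replacement Flink recommends is the side output: a ProcessFunction routes some elements to an OutputTag, and the main stream plus getSideOutput cover what split/select did. A minimal sketch (not from the original), assuming the same source as in the split demo:
// Tag for the side output; the anonymous subclass keeps the generic type
final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};
SingleOutputStreamOperator<Integer> mainStream = source.process(new ProcessFunction<Integer, Integer>() {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        if (value % 2 == 0) {
            ctx.output(evenTag, value); // even numbers go to the side output
        } else {
            out.collect(value); // odd numbers stay in the main stream
        }
    }
});
mainStream.getSideOutput(evenTag).print(); // the "even" stream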
4. keyBy
Similar to group by
keyBy is a partitioning strategy. For example, suppose we want to sum the even numbers and the odd numbers separately, emitting a sum after every four elements of a key.
@Test
public void testKeyBy() throws Exception {
DataStreamSource<Integer> source = environment.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
List<Integer> list = Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2);
list.forEach(integer -> {
ctx.collect(integer);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Override
public void cancel() {
}
});
// Partition by key
KeyedStream<Integer, String> integerObjectKeyedStream = source.keyBy(new KeySelector<Integer, String>() {
@Override
public String getKey(Integer value) throws Exception {
return value % 2 == 0 ? "even" : "uneven";
}
});
integerObjectKeyedStream.flatMap(new RichFlatMapFunction<Integer, Integer>() {
ValueState<Integer> state = null;
int trigger = 0;
@Override
public void open(Configuration parameters) throws Exception {
// State is a very core thing, which can be understood as a runtime state similar to context
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("", TypeInformation.of(Integer.class));
this.state = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
Integer sum = state.value();
if (sum == null) {
sum = 0;
}
sum += value;
state.update(sum);
// Every four elements, emit the sum and reset the state
if (++trigger % 4 == 0) {
out.collect(sum);
state.clear();
}
}
}).print();
environment.execute("testKeyBy");
}
So that's a simple demo.
Watermark and Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Event time is introduced at cloud.tencent.com/developer/a…
Watermarks are introduced at www.jianshu.com/p/9db56f81f… and developer.aliyun.com/article/682…
Basically these are different notions of time, and Flink can compute along any of them. When event time is used, the user must supply a way to extract the timestamp from each record. But the data can arrive out of order: say event A has timestamp 1s and event B has timestamp 2s, our window is 2s wide, and because of network delay B only arrives at 3s. This is where the watermark comes in: the window is only aggregated once the watermark passes the window end time (watermark > window_end_time). It is essentially a fault-tolerance mechanism for late data; set the allowed lateness according to your actual situation.
It is written as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Add a kafka source
FlinkKafkaConsumer011<EventLog> consumer = new FlinkKafkaConsumer011<>(KafkaProducer.topic, new EventLogSer(), getKafkaSourceProperties());
DataStreamSource<EventLog> streamSource = env.addSource(consumer);
// Watermark: allow events to be up to 100 ms late
streamSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventLog>(Time.of(100, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(EventLog element) {
// The time recorded above the log is used as the window to calculate the dimension
return element.time;
}
// Convert each event into a count of 1 so the window can sum them
}).map(new MapFunction<EventLog, Tuple1<Integer>>() {
@Override
public Tuple1<Integer> map(EventLog value) throws Exception {
return new Tuple1<>(1);
}
}).timeWindowAll(Time.seconds(1)).sum(0).print();
There are two types of watermark: perfect watermark and heuristic watermark.
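A rough sketch of the difference (not from the original, reusing the EventLog stream from above): with strictly ascending timestamps an AscendingTimestampExtractor gives a "perfect" watermark, while the BoundedOutOfOrdernessTimestampExtractor used earlier is a heuristic that simply assumes a maximum lateness.
// Perfect watermark: timestamps never go backwards, so no lateness allowance is needed
streamSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventLog>() {
    @Override
    public long extractAscendingTimestamp(EventLog element) {
        return element.time;
    }
});
// Heuristic watermark: tolerate events that arrive at most 100 ms late
streamSource.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<EventLog>(Time.milliseconds(100)) {
            @Override
            public long extractTimestamp(EventLog element) {
                return element.time;
            }
        });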
Window concept
Here is a recommended article: juejin.cn/post/684490…
Take a look at the following two pieces of code
env.socketTextStream("", 8888).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(new Tuple2<>(s, 1));
}
}
}).keyBy(0).sum(1).print();
This computation runs over the full (global) stream: the counts just keep accumulating.
env.socketTextStream("", 8888).flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(new Tuple2<>(s, 1));
}
}
}).keyBy(0).timeWindow(Time.seconds(1)).sum(1).print();
This computation runs over a time window.
So what’s the time window?
When we handle something like this we usually do it periodically, for example counting the number of logs per minute, aggregated by log time. Then the question is: which time do we aggregate by? The time recorded in the log itself, or the time at which Flink processes it? Flink actually supports all of these.
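A minimal sketch of the three choices (not from the original):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time: the time recorded in the data itself (needs timestamps and watermarks)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Ingestion time: the time at which the record enters Flink
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// Processing time: the clock of the machine doing the processing (the default)
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);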
Here's a fuller example of how to do that:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Add a kafka source
FlinkKafkaConsumer011<EventLog> consumer = new FlinkKafkaConsumer011<>(KafkaProducer.topic, new EventLogSer(), getKafkaSourceProperties());
DataStreamSource<EventLog> streamSource = env.addSource(consumer);
// water mark
streamSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventLog>(Time.of(100, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(EventLog element) {
// The time recorded above the log is used as the window to calculate the dimension
return element.time;
}
// Convert each event into a count of 1 so the window can sum them
}).map(new MapFunction<EventLog, Tuple1<Integer>>() {
@Override
public Tuple1<Integer> map(EventLog value) throws Exception {
return new Tuple1<>(1);
}
}).timeWindowAll(Time.seconds(1)).sum(0).print();
This is one way to do it.
State
A recommended article: juejin.cn/post/684490…
State comes in two scopes: task-level (operator) state, which my test of failed (see the exception below), and key-level (keyed) state.
1. Task level
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
...
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
// Keyed state can only be used after a keyBy() operation
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:185)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:142)
at state.StateDemo$1.open(StateDemo.java:39)
...
at java.lang.Thread.run(Thread.java:748)
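The failure is expected: plain getState() is keyed state and can only be used after keyBy(). Task-level (operator) state normally goes through the CheckpointedFunction interface instead; a minimal sketch (not from the original; the class and descriptor names are made up):
public static class CountingMap extends RichMapFunction<String, String> implements CheckpointedFunction {
    private transient ListState<Long> checkpointedCount; // operator state, one entry per parallel subtask
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("count", Long.class));
        for (Long c : checkpointedCount.get()) { // restore after a failure
            count += c;
        }
    }
}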
2. Key level
Keyed state is actually easy to understand: it stores a per-key context, aggregated by key; Flink records state separately for each key. There are six kinds of keyed state:
- ValueState
- ListState
- MapState
- ReducingState
- AggregatingState
- FoldingState
ValueState, in its simplest form, is a single state value:
The following demo counts how many elements have been processed for each key (the aggregation dimension is the key); the code is very simple.
private StreamExecutionEnvironment env;
@Before
public void init() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}
@Test
public void testKeyState() throws Exception {
env.addSource(new ParallelSourceFunction<String>() {
private Random random = new Random();
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
// Keys range over 0-4, so five partitions
int count = random.nextInt(5);
ctx.collect(count + ",content");
TimeUnit.MILLISECONDS.sleep(100);
}
}
@Override
public void cancel() {
}
}).setParallelism(8).keyBy(new KeySelector<String, Integer>() {
@Override
public Integer getKey(String value) throws Exception {
String[] split = value.split(",");
// use 0,1,2,3,4 as the key, which is similar to kafka's partitioning strategy
return Integer.parseInt(split[0]);
}
}).flatMap(new RichFlatMapFunction<String, Integer>() {
private ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("key", Integer.class);
state = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(String value, Collector<Integer> out) throws Exception {
Integer count = state.value();
if (count == null) {
count = 0;
}
System.out.printf("[%s]%s\n", Thread.currentThread().getName(), value);
count++;
state.update(count);
out.collect(count);
}
}).print();
env.execute("testKeyState");
}
Here is the output:
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 1
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 2
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 3
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 4
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 5
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 6
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 7
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content // This is 8
8> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 1
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 2
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 3
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 4
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 5
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 6
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 7
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 9
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 10
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 11
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 12
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 13
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 14
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 15
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 16
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 1
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 2
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 3
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 4
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 5
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 6
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 7
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 8
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 17
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 18
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 19
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 20
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 21
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 22
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 23
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 24
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 1
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 2
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 3
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 4
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 5
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 6
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 7
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 8
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 9
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 10
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 11
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 12
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 13
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 14
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 15
[Flat Map -> Sink: Print to Std. Out (8/8)]2,content
8> 16
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 9
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 10
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 11
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 12
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 13
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 14
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 15
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 16
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content // The next position, here is 9, so successfully get the state.
8> 9
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 10
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 11
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 12
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 13
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 14
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 15
[Flat Map -> Sink: Print to Std. Out (8/8)]3,content
8> 16
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 25
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 26
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 27
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 28
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 29
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 30
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 31
[Flat Map -> Sink: Print to Std. Out (6/8)]0,content
6> 32
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 17
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 18
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 19
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 20
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 21
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 22
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 23
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 24
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 25
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 26
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 27
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 28
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 29
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 30
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 31
[Flat Map -> Sink: Print to Std. Out (6/8)]1,content
6> 32
Looking at the test results, we can also see that each key is bound to a single thread (subtask), which makes per-key aggregation straightforward.
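Finally, the other keyed state kinds listed above are obtained from the runtime context in the same way. A minimal sketch of ListState (not from the original), assuming a keyed stream like the one in the demo (keyedStream is a made-up variable name):
keyedStream.flatMap(new RichFlatMapFunction<String, List<String>>() {
    private ListState<String> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<String>("buffer", String.class));
    }

    @Override
    public void flatMap(String value, Collector<List<String>> out) throws Exception {
        buffer.add(value); // remember every element seen for this key
        List<String> collected = new ArrayList<>();
        for (String v : buffer.get()) {
            collected.add(v);
        }
        if (collected.size() >= 3) { // flush every three elements per key
            out.collect(collected);
            buffer.clear();
        }
    }
}).print();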