Flink includes automatic state management
Take word count as an example: the program below continuously reads words from a socket, and each new count is added to the word's historical count.
package com.learn.flink.source;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/** DataStream source demo: reading text from a socket. */
public class SourceDemo_Socket {
public static void main(String[] args) throws Exception {
// 0: env
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 1: source
DataStream<String> ds = env.socketTextStream("node01", 999);
// 2. transformation
// split each line into words (separated by spaces)
DataStream<String> words = ds.flatMap((String value, Collector<String> out) -> {
Arrays.stream(value.split(" ")).forEach(out::collect);
}).returns(Types.STRING);
// map each word to (word, 1)
DataStream<Tuple2<String, Integer>> wordAndOne = words.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
// group by word
final KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(value -> value.f0);
// aggregate: sum the counts per word
final SingleOutputStreamOperator<Tuple2<String, Integer>> sum = grouped.sum(1);
// 3: sink
sum.print();
// 4: execute
env.execute();
}
}
The running count for each word is state derived from historical data, and Flink maintains this state automatically.
Where necessary, state can also be maintained manually.
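To make that state concrete, here is a minimal plain-Java sketch, with no Flink dependency, of what keyBy(...).sum(1) keeps for the word count: one running total per word. The class name RunningWordCount is hypothetical, and a real Flink job keeps this per-key state in its state backend rather than in a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (not Flink code) of the state behind keyBy(...).sum(1):
 * a map from each word to the count accumulated so far.
 */
public class RunningWordCount {

    // Per-key state: word -> running count (in Flink, the framework manages this)
    private final Map<String, Integer> countState = new HashMap<>();

    /** Processes one line and returns the updated (word,count) pairs in emission order. */
    public List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            // Add the new count (1) to the historical count for this word
            int updated = countState.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        System.out.println(counter.processLine("hello flink")); // [(hello,1), (flink,1)]
        System.out.println(counter.processLine("hello world")); // [(hello,2), (world,1)]
    }
}
```

The second "hello" is emitted with count 2 because it sees the count stored when the first line was processed.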
Stateless and stateful computing
Stateless computation: as long as the input is the same, the computation always produces the same result. map and filter are examples. Stateless computation is simple.
Stateful computation comes in two forms:
- The input data carries the state
- The operator itself holds state
Stateful computation is more complex.
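A minimal plain-Java illustration of the difference, not using Flink: a stateless function returns the same output for the same input, while a function that holds state (here a hypothetical RunningSum) returns a different result for the same input because it remembers earlier records.

```java
import java.util.function.Function;

/** Contrasts a stateless function with a stateful one (plain Java, no Flink). */
public class StatelessVsStateful {

    // Stateless, like map/filter: output depends only on the current input
    static final Function<Integer, Integer> DOUBLE = x -> x * 2;

    // Stateful: the function keeps a running total, so output also depends on history
    static class RunningSum implements Function<Integer, Integer> {
        private int total = 0; // the "operator state"

        @Override
        public Integer apply(Integer x) {
            total += x;
            return total;
        }
    }

    public static void main(String[] args) {
        RunningSum sum = new RunningSum();
        // Same input twice: the stateless function repeats its answer...
        System.out.println(DOUBLE.apply(5) + " " + DOUBLE.apply(5)); // 10 10
        // ...while the stateful one does not
        System.out.println(sum.apply(5) + " " + sum.apply(5));       // 5 10
    }
}
```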
Typical use cases:
- Computations that need historical data, such as comparing today's figures with yesterday's
- Window computations
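Window computation is stateful because partial results must be kept until a window is complete. The sketch below is plain Java, not the Flink window API: it assigns each (timestamp, value) event to a tumbling window by rounding the timestamp down to a multiple of the window size, and keeps one running sum per window. The class name, the sample events, and the 5-second window size are illustrative assumptions, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch of why windows need state: a partial sum is kept per
 * tumbling window. Assumes non-negative millisecond timestamps.
 */
public class TumblingWindowSum {

    /** Returns window start -> sum of values whose timestamp falls in that window. */
    static Map<Long, Long> sumPerWindow(long[][] events, long windowSizeMs) {
        Map<Long, Long> windowState = new TreeMap<>(); // partial result per window
        for (long[] event : events) {
            long timestamp = event[0];
            long value = event[1];
            // Start of the tumbling window that contains this timestamp
            long windowStart = timestamp - (timestamp % windowSizeMs);
            windowState.merge(windowStart, value, Long::sum);
        }
        return windowState;
    }

    public static void main(String[] args) {
        long[][] events = { {1_000, 3}, {4_000, 2}, {6_000, 7}, {9_500, 1}, {12_000, 5} };
        // 5-second windows: [0,5000) -> 5, [5000,10000) -> 8, [10000,15000) -> 5
        System.out.println(sumPerWindow(events, 5_000)); // {0=5, 5000=8, 10000=5}
    }
}
```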
Classification of state
- Managed State: managed and optimized automatically by Flink, supports a variety of data structures, and suits most situations
- Keyed State: used after grouping with keyBy, i.e. on a KeyedStream
- Operator State: can be used with any operator, typically with sources
- Raw State: managed entirely by the user and supports only byte[]. It can be used when writing custom operators, but is rarely needed.
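The difference in scope between keyed state and operator state can be sketched in plain Java. This is not the Flink state API, and StateScopeSketch is a hypothetical name. Within one parallel subtask, keyed state behaves like one value per key, while operator state is a single value shared by every record that subtask processes; a source, for example, may keep its read offsets this way.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch (not the Flink API) of state scope inside one parallel subtask:
 *  - keyed state: one value per key, visible only while processing that key
 *  - operator state: one value for the whole subtask, shared by all records
 */
public class StateScopeSketch {

    final Map<String, Long> keyedCount = new TreeMap<>(); // keyed state: key -> count
    long operatorCount = 0;                              // operator state: per subtask

    void process(String key) {
        keyedCount.merge(key, 1L, Long::sum); // touches only this key's entry
        operatorCount++;                      // shared across all keys
    }

    public static void main(String[] args) {
        StateScopeSketch subtask = new StateScopeSketch();
        for (String city : new String[] {"Beijing", "Shanghai", "Beijing"}) {
            subtask.process(city);
        }
        System.out.println(subtask.keyedCount);    // {Beijing=2, Shanghai=1}
        System.out.println(subtask.operatorCount); // 3
    }
}
```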
Example
Keyed State:
package com.learn.flink.state;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** Uses ValueState (a kind of keyed state) to compute the maximum value per key; in real development you would simply use maxBy. */
public class StateDemo_KeyedState {
public static void main(String[] args) throws Exception {
//0:env
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//1: source
DataStream<Tuple2<String, Long>> ds = env.fromElements(
Tuple2.of("Beijing", 1L),
Tuple2.of("Beijing", 4L),
Tuple2.of("Shanghai", 8L),
Tuple2.of("Beijing", 3L),
Tuple2.of("Shanghai", 2L));
//2: transformation
// Requirement: find the maximum sales for each city
final SingleOutputStreamOperator<Tuple2<String, Long>> result = ds.keyBy(t -> t.f0).maxBy(1);
// Same computation with keyed ValueState, to show roughly how maxBy works
final SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = ds.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
//1: state holding the maximum value seen so far for the current key
private ValueState<Long> maxValueState;
//2: state initialization
@Override
public void open(Configuration parameters) throws Exception {
//2-1: create a state descriptor
final ValueStateDescriptor<Long> maxValueStateDesc = new ValueStateDescriptor<>("maxValueState", Long.class);
//2-2: obtain the state handle from the descriptor
maxValueState = getRuntimeContext().getState(maxValueStateDesc);
}
//3: use the state
@Override
public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
final Long currentValue = value.f1;
// Read the current maximum for this key (null on first access)
final Long historyMaxValue = maxValueState.value();
if (null == historyMaxValue || historyMaxValue < currentValue) {
maxValueState.update(currentValue);
}
return Tuple3.of(value.f0, value.f1, maxValueState.value());
}
});
//3: sink
result.print("maxBy");
result2.print("state");
//4: execute
env.execute();
}
}
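To check what result2 should print, the per-key logic of the map function can be replayed in plain Java over the same five records. This is a simulation, not Flink code: a HashMap stands in for ValueState, and KeyedMaxSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (no Flink) of the ValueState logic in StateDemo_KeyedState,
 * run over the same input to show the (city, current, max) tuples result2 emits.
 */
public class KeyedMaxSimulation {

    public static List<String> run(String[] cities, long[] sales) {
        // Stands in for ValueState<Long>: one "maxValueState" per key (city)
        Map<String, Long> maxValueState = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < cities.length; i++) {
            Long historyMaxValue = maxValueState.get(cities[i]); // like value(): null at first
            if (historyMaxValue == null || historyMaxValue < sales[i]) {
                maxValueState.put(cities[i], sales[i]);          // like update()
            }
            out.add("(" + cities[i] + "," + sales[i] + "," + maxValueState.get(cities[i]) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        String[] cities = {"Beijing", "Beijing", "Shanghai", "Beijing", "Shanghai"};
        long[] sales = {1L, 4L, 8L, 3L, 2L};
        run(cities, sales).forEach(System.out::println);
    }
}
```

This yields (Beijing,1,1), (Beijing,4,4), (Shanghai,8,8), (Beijing,3,4), (Shanghai,2,8). In the real job the records for each city keep this order, but with parallelism above 1 the interleaving between cities and the subtask prefix added by print() may differ.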