Preface
This article looks at the aggregation operations of Flink's KeyedStream.
Example
@Test
public void testMax() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    WordCount[] data = new WordCount[]{
            new WordCount(1, "Hello", 1),
            new WordCount(1, "World", 3),
            new WordCount(2, "Hello", 1)};
    env.fromElements(data)
            // group the stream by the "word" field
            .keyBy("word")
            // keep the running maximum of "frequency" for each key
            .max("frequency")
            .addSink(new SinkFunction<WordCount>() {
                @Override
                public void invoke(WordCount value, Context context) throws Exception {
                    LOGGER.info("value:{}", value);
                }
            });
    env.execute("testMax");
}
- The stream is keyed by the word field, and KeyedStream's max method then computes the running maximum of the frequency field for each key.
KeyedStream.aggregate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
public SingleOutputStreamOperator<T> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
public SingleOutputStreamOperator<T> sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}
public SingleOutputStreamOperator<T> max(int positionToMax) {
return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
getExecutionConfig()));
}
public SingleOutputStreamOperator<T> max(String field) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
false, getExecutionConfig()));
}
public SingleOutputStreamOperator<T> min(int positionToMin) {
return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
getExecutionConfig()));
}
public SingleOutputStreamOperator<T> min(String field) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
false, getExecutionConfig()));
}
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
getExecutionConfig()));
}
public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
first, getExecutionConfig()));
}
public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
getExecutionConfig()));
}
public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
first, getExecutionConfig()));
}
protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
clean(aggregate), getType().createSerializer(getExecutionConfig()));
return transform("Keyed Aggregation", getType(), operator);
}
- KeyedStream's aggregate method is protected. sum, max, min, maxBy and minBy all delegate to it: sum passes a SumAggregator, while the others pass a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY or MINBY respectively
- sum, max, min, maxBy and minBy each come in two overloads, one taking an int field position and one taking a String field name
- maxBy and minBy accept one more argument than sum, max and min: first (boolean), which specifies whether to return the first element when several elements have equal compare values. The overloads without this argument pass true
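The effect of the first argument can be sketched in plain Java without a Flink dependency. The class FirstFlagSketch, its WordCount stand-in and the maxByFrequency helper are hypothetical names introduced for illustration only; the helper folds the elements of one key the way maxBy's tie-breaking rule is described above, and returns only the final result (a real stream would emit a value for every incoming element).

```java
import java.util.Arrays;
import java.util.List;

public class FirstFlagSketch {

    // Hypothetical stand-in for the WordCount POJO used in the example test.
    public static final class WordCount {
        public final int id;
        public final String word;
        public final int frequency;

        public WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount{id=" + id + ", word=" + word + ", frequency=" + frequency + "}";
        }
    }

    // Folds the elements of a single key, applying the maxBy tie-breaking rule:
    // on equal frequencies, keep the earlier element if first is true, else the later one.
    public static WordCount maxByFrequency(List<WordCount> elements, boolean first) {
        WordCount current = elements.get(0);
        for (WordCount next : elements.subList(1, elements.size())) {
            int c = Integer.compare(current.frequency, next.frequency);
            if (c == 0) {
                current = first ? current : next;
            } else if (c < 0) {
                current = next;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Two elements for the key "Hello" with the same frequency.
        List<WordCount> hello = Arrays.asList(
                new WordCount(1, "Hello", 1),
                new WordCount(2, "Hello", 1));
        System.out.println(maxByFrequency(hello, true));   // the element with id 1
        System.out.println(maxByFrequency(hello, false));  // the element with id 2
    }
}
```

With first set to true the earliest of the tied elements survives; with false the latest one does.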
ComparableAggregator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
private Comparator comparator;
private boolean byAggregate;
private boolean first;
private final FieldAccessor<T, Object> fieldAccessor;
private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
this.comparator = Comparator.getForAggregation(aggregationType);
this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
this.first = first;
this.fieldAccessor = fieldAccessor;
}
public ComparableAggregator(int positionToAggregate,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
ExecutionConfig config) {
this(positionToAggregate, typeInfo, aggregationType, false, config);
}
public ComparableAggregator(int positionToAggregate,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
}
public ComparableAggregator(String field,
TypeInformation<T> typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
}
@SuppressWarnings("unchecked")
@Override
public T reduce(T value1, T value2) throws Exception {
Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
Object o2 = fieldAccessor.get(value2);
int c = comparator.isExtremal(o1, o2);
if (byAggregate) {
// if they are the same we choose based on whether we want to first or last
// element with the min/max.
if (c == 0) {
return first ? value1 : value2;
}
return c == 1 ? value1 : value2;
} else {
if (c == 0) {
value1 = fieldAccessor.set(value1, o2);
}
return value1;
}
}
}
- ComparableAggregator extends AggregationFunction, which in turn implements the ReduceFunction interface. Its reduce method first calls comparator.isExtremal on the compared field of value1 and value2. For byAggregate operations (maxBy and minBy), a result of 0 means the two fields are equal, and first decides whether value1 (the element seen first) or value2 is returned; otherwise value1 is returned when the result is 1 and value2 when it is -1. For the other operations (max and min), a result of 0 means value1's field is not strictly more extreme than value2's (for max, it is less than or equal to it), so fieldAccessor.set writes value2's field value into value1; value1 is returned in every case
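The two branches of reduce lead to visibly different results, which a small plain-Java sketch can show. MaxReduceSketch, its mutable WordCount stand-in, reduceMax and reduceMaxBy are hypothetical names; they imitate the branches for a single int field and are not Flink's actual classes.

```java
public class MaxReduceSketch {

    // Hypothetical mutable stand-in for the WordCount POJO.
    public static final class WordCount {
        public int id;
        public String word;
        public int frequency;

        public WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Imitates the non-byAggregate branch with MAX semantics:
    // only the compared field of value1 is overwritten, value1 itself is kept.
    public static WordCount reduceMax(WordCount value1, WordCount value2) {
        int c = value1.frequency > value2.frequency ? 1 : 0;
        if (c == 0) {
            value1.frequency = value2.frequency;
        }
        return value1;
    }

    // Imitates the byAggregate branch with MAXBY semantics:
    // a whole element is returned, and first breaks ties.
    public static WordCount reduceMaxBy(WordCount value1, WordCount value2, boolean first) {
        int c = Integer.compare(value1.frequency, value2.frequency);
        if (c == 0) {
            return first ? value1 : value2;
        }
        return c > 0 ? value1 : value2;
    }

    public static void main(String[] args) {
        WordCount byResult = reduceMaxBy(
                new WordCount(1, "Hello", 1), new WordCount(2, "Hello", 3), true);
        // maxBy-style: the whole second element wins, so id is 2 and frequency is 3.
        System.out.println(byResult.id + " " + byResult.frequency);

        WordCount maxResult = reduceMax(
                new WordCount(1, "Hello", 1), new WordCount(2, "Hello", 3));
        // max-style: the first element is kept with an updated field, so id is 1 and frequency is 3.
        System.out.println(maxResult.id + " " + maxResult.frequency);
    }
}
```

The practical consequence is that max and min only guarantee the aggregated field; the remaining fields come from the first element seen for the key, whereas maxBy and minBy return the complete element that holds the extreme value.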
AggregationFunction
@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
private static final long serialVersionUID = 1L;
/**
* Aggregation types that can be used on a windowed stream or keyed stream.
*/
public enum AggregationType {
SUM, MIN, MAX, MINBY, MAXBY,
}
}
- AggregationFunction is declared to implement ReduceFunction and defines the AggregationType enum with five values: SUM, MIN, MAX, MINBY and MAXBY
Comparator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
@Internal
public abstract class Comparator implements Serializable {
private static final long serialVersionUID = 1L;
public abstract <R> int isExtremal(Comparable<R> o1, R o2);
public static Comparator getForAggregation(AggregationType type) {
switch (type) {
case MAX:
return new MaxComparator();
case MIN:
return new MinComparator();
case MINBY:
return new MinByComparator();
case MAXBY:
return new MaxByComparator();
default:
throw new IllegalArgumentException("Unsupported aggregation type.");
}
}
private static class MaxComparator extends Comparator {
private static final long serialVersionUID = 1L;
@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
return o1.compareTo(o2) > 0 ? 1 : 0;
}
}
private static class MaxByComparator extends Comparator {
private static final long serialVersionUID = 1L;
@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
int c = o1.compareTo(o2);
if (c > 0) {
return 1;
}
if (c == 0) {
return 0;
} else {
return -1;
}
}
}
private static class MinByComparator extends Comparator {
private static final long serialVersionUID = 1L;
@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
int c = o1.compareTo(o2);
if (c < 0) {
return 1;
}
if (c == 0) {
return 0;
} else {
return -1;
}
}
}
private static class MinComparator extends Comparator {
private static final long serialVersionUID = 1L;
@Override
public <R> int isExtremal(Comparable<R> o1, R o2) {
return o1.compareTo(o2) < 0 ? 1 : 0;
}
}
}
- Comparator implements the Serializable interface, declares the abstract isExtremal method, and provides the getForAggregation factory method, which creates a different Comparator for each supported AggregationType (SUM is not supported and falls through to the IllegalArgumentException)
- Four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, are defined inside Comparator, and each implements the isExtremal method
- MaxComparator calls the compareTo method defined by the Comparable interface directly, but returns only 0 or 1: 1 if compareTo returns a value greater than 0, and 0 otherwise. MaxByComparator is also based on compareTo, but returns three values: 1 if the result is greater than 0, 0 if it equals 0, and -1 if it is less than 0
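The difference in return values can be checked with a small standalone sketch. IsExtremalSketch and its two static methods are hypothetical names that copy the bodies of MaxComparator.isExtremal and MaxByComparator.isExtremal shown above, so that they can be called outside Flink.

```java
public class IsExtremalSketch {

    // Same body as MaxComparator.isExtremal: only 0 or 1.
    public static <R> int maxIsExtremal(Comparable<R> o1, R o2) {
        return o1.compareTo(o2) > 0 ? 1 : 0;
    }

    // Same logic as MaxByComparator.isExtremal: 1, 0 or -1.
    public static <R> int maxByIsExtremal(Comparable<R> o1, R o2) {
        int c = o1.compareTo(o2);
        if (c > 0) {
            return 1;
        }
        return c == 0 ? 0 : -1;
    }

    public static void main(String[] args) {
        Integer one = 1;
        Integer two = 2;
        Integer three = 3;

        // MAX cannot tell "equal" from "smaller": both yield 0.
        System.out.println(maxIsExtremal(three, one)); // 1
        System.out.println(maxIsExtremal(two, two));   // 0
        System.out.println(maxIsExtremal(one, three)); // 0

        // MAXBY keeps the three cases apart, which is what makes the first flag meaningful.
        System.out.println(maxByIsExtremal(three, one)); // 1
        System.out.println(maxByIsExtremal(two, two));   // 0
        System.out.println(maxByIsExtremal(one, three)); // -1
    }
}
```

Because MaxComparator and MinComparator fold "equal" and "not extreme" into the same 0, the max and min path in reduce treats both cases identically and simply copies value2's field into value1.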
Summary
- KeyedStream offers the aggregations sum, max, min, maxBy and minBy, all of which call the protected aggregate method internally. sum passes a SumAggregator; the others pass a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY or MINBY
- ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. In its reduce method, a byAggregate operation with a comparison value of 0 returns the first element seen if first is true and the last one otherwise; with a non-zero comparison value it returns the element whose field is more extreme. A non-byAggregate operation with a comparison value of 0 uses fieldAccessor.set to write value2's field value into value1, and value1 is returned
- Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, all of which implement isExtremal. MaxComparator returns 1 when the first value is greater and 0 otherwise, whereas MaxByComparator returns 1 when it is greater, 0 when the two are equal and -1 when it is less. This difference shows up in ComparableAggregator's reduce method, and it is why maxBy and minBy take the extra first (boolean) parameter, which selects the element to return when the comparison yields 0. For non-byAggregate operations, reduce always returns value1; when the comparison yields 0 (for max, value1's field is less than or equal to value2's), value1's field is first updated with value2's value
doc
- DataStream Transformations