Preface

This article looks at the join operation of Flink's DataStream.

Example

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
  • Calling join on a DataStream merges it with another stream and returns a JoinedStreams. JoinedStreams has a where operation that builds a Where object; Where has an equalTo operation that builds an EqualTo object; EqualTo has a window operation that builds a WithWindow object. WithWindow can set the windowAssigner, trigger, evictor and allowedLateness, and it provides the apply operation.

DataStream.join

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
	//......

	/**
	 * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
	 * and window can be specified.
	 */
	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

	//......
}
  • DataStream provides a join method that creates a join operation and returns a JoinedStreams object.

JoinedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

@Public
public class JoinedStreams<T1, T2> {

	/** The first input stream. */
	private final DataStream<T1> input1;

	/** The second input stream. */
	private final DataStream<T2> input2;

	public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
		this.input1 = requireNonNull(input1);
		this.input2 = requireNonNull(input2);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
		requireNonNull(keySelector);
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
		requireNonNull(keySelector);
		requireNonNull(keyType);
		return new Where<>(input1.clean(keySelector), keyType);
	}

	//......
}
  • JoinedStreams mainly provides the where operation, which builds a Where object.

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public class Where<KEY> {

		private final KeySelector<T1, KEY> keySelector1;
		private final TypeInformation<KEY> keyType;

		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
			this.keySelector1 = keySelector1;
			this.keyType = keyType;
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			requireNonNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
			requireNonNull(keySelector);
			requireNonNull(keyType);

			if (!keyType.equals(this.keyType)) {
				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
						"first key = " + this.keyType + " , second key = " + keyType);
			}

			return new EqualTo(input2.clean(keySelector));
		}

		//......
	}
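  • The Where object mainly provides the equalTo operation, which builds an EqualTo object. It also checks that the key types of the two inputs are equal and throws an IllegalArgumentException if they are not.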

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

		@Public
		public class EqualTo {

			private final KeySelector<T2, KEY> keySelector2;

			EqualTo(KeySelector<T2, KEY> keySelector2) {
				this.keySelector2 = requireNonNull(keySelector2);
			}

			/**
			 * Specifies the window on which the join operation works.
			 */
			@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}
		}
  • The EqualTo object provides the window operation to build the WithWindow object

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {

		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;
		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

		@PublicEvolving
		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {

			this.input1 = requireNonNull(input1);
			this.input2 = requireNonNull(input2);

			this.keySelector1 = requireNonNull(keySelector1);
			this.keySelector2 = requireNonNull(keySelector2);
			this.keyType = requireNonNull(keyType);

			this.windowAssigner = requireNonNull(windowAssigner);

			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
				windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				JoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				FlatJoinFunction.class,
				0,
				1,
				2,
				new int[]{2, 0},
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
			return (SingleOutputStreamOperator<T>) apply(function);
		}

		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new JoinCoGroupFunction<>(function), resultType);
		}

		@PublicEvolving
		@Deprecated
		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			return (SingleOutputStreamOperator<T>) apply(function, resultType);
		}

		@VisibleForTesting
		Time getAllowedLateness() {
			return allowedLateness;
		}

		@VisibleForTesting
		CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
			return coGroupedWindowedStream;
		}
	}
  • WithWindow can set the windowAssigner, trigger, evictor and allowedLateness, and it provides the apply operation (the with operation is marked as deprecated).
  • The apply operation accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream's coGroup method to create a CoGroupedStreams, passes its own where and equalTo keySelectors, windowAssigner, trigger, evictor and allowedLateness on to it, and finally calls the apply method of CoGroupedStreams' WithWindow object.
  • The apply method of CoGroupedStreams' WithWindow takes a different parameter from the one in JoinedStreams: it accepts a CoGroupFunction. The apply method of JoinedStreams' WithWindow therefore wraps the JoinFunction or FlatJoinFunction as a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) and passes it to the apply method of CoGroupedStreams' WithWindow.
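The trigger, evictor and allowedLateness methods in the listing never modify the receiver; each one returns a fresh WithWindow with a single field changed. Below is a minimal plain-Java sketch of that pattern with no Flink dependency. WindowConfig and its String/Long fields are hypothetical stand-ins for the real WithWindow, Trigger, Evictor and Time types.

```java
import java.util.Objects;

// Hypothetical stand-in for JoinedStreams.WithWindow: every "setter" returns a new object.
final class WindowConfig {
    final String assigner;        // stands in for WindowAssigner (required)
    final String trigger;         // stands in for Trigger (optional, may be null)
    final String evictor;         // stands in for Evictor (optional, may be null)
    final Long allowedLatenessMs; // stands in for Time (optional, may be null)

    WindowConfig(String assigner, String trigger, String evictor, Long allowedLatenessMs) {
        this.assigner = Objects.requireNonNull(assigner);
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Each method copies all fields and replaces exactly one of them.
    WindowConfig trigger(String newTrigger) {
        return new WindowConfig(assigner, newTrigger, evictor, allowedLatenessMs);
    }

    WindowConfig evictor(String newEvictor) {
        return new WindowConfig(assigner, trigger, newEvictor, allowedLatenessMs);
    }

    WindowConfig allowedLateness(Long newLatenessMs) {
        return new WindowConfig(assigner, trigger, evictor, newLatenessMs);
    }
}

class WindowConfigDemo {
    public static void main(String[] args) {
        WindowConfig base = new WindowConfig("tumbling-5s", null, null, null);
        WindowConfig tuned = base.trigger("count-10").allowedLateness(1000L);
        System.out.println(base.trigger);  // the original object is unchanged
        System.out.println(tuned.trigger); // the copy carries the new trigger
    }
}
```

Because every call returns a new object, a partially configured instance can be reused as a starting point for several differently tuned joins.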

JoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java

@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @return The resulting element.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	OUT join(IN1 first, IN2 second) throws Exception;
}
  • JoinFunction extends Function and Serializable and defines the join operation. It has inner-join semantics by default; if an outer join is required, CoGroupFunction can be used instead.
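A join function is only ever called for pairs, so a key that appears on one side only produces no output. The sketch below shows, in plain Java without Flink, how a coGroup-style function that receives both groups at once can emit left-outer-join rows instead. SimpleCoGroup is a hypothetical interface that mirrors the shape of CoGroupFunction.coGroup, and java.util.function.Consumer stands in for Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical mirror of CoGroupFunction: both groups for one key arrive together.
interface SimpleCoGroup<A, B, O> {
    void coGroup(Iterable<A> first, Iterable<B> second, Consumer<O> out);
}

class LeftOuterJoinSketch {
    // Emits "left|right" for every pair, or "left|null" when the right group is empty.
    static final SimpleCoGroup<String, String, String> LEFT_OUTER = (first, second, out) -> {
        for (String left : first) {
            boolean matched = false;
            for (String right : second) {
                matched = true;
                out.accept(left + "|" + right);
            }
            if (!matched) {
                out.accept(left + "|null");
            }
        }
    };

    // Runs the function on one key's two groups and collects the emitted rows.
    static List<String> run(List<String> first, List<String> second) {
        List<String> result = new ArrayList<>();
        LEFT_OUTER.coGroup(first, second, result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a1", "a2"), Arrays.asList("b1")));
        System.out.println(run(Arrays.asList("a1"), Collections.<String>emptyList()));
    }
}
```

The unmatched branch is exactly what a pairwise join function cannot express, since it is never invoked when one group is empty.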

FlatJoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java

@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

	/**
	 * The join method, called once per joined pair of elements.
	 *
	 * @param first The element from first input.
	 * @param second The element from second input.
	 * @param out The collector used to return zero, one, or more elements.
	 *
	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
	 *                   to fail and may trigger recovery.
	 */
	void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
  • FlatJoinFunction extends Function and Serializable and defines the join operation. It also has inner-join semantics by default; if an outer join is required, CoGroupFunction can be used. Unlike JoinFunction's join method, FlatJoinFunction's join method takes a Collector parameter, which can be used to emit zero, one or more elements, hence the name Flat.
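The difference between the two join signatures can be illustrated with a small plain-Java sketch. SimpleJoin and SimpleFlatJoin are hypothetical mirrors of JoinFunction and FlatJoinFunction, and Consumer stands in for Collector; nothing here depends on Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical mirror of JoinFunction: exactly one result per pair.
interface SimpleJoin<A, B, O> {
    O join(A first, B second);
}

// Hypothetical mirror of FlatJoinFunction: results go to a collector, so any count is possible.
interface SimpleFlatJoin<A, B, O> {
    void join(A first, B second, Consumer<O> out);
}

class FlatJoinSketch {
    // Always returns one value for a pair.
    static final SimpleJoin<Integer, Integer, Integer> SUM = (a, b) -> a + b;

    // Emits the word 'times' times: zero, one or many results for a single pair.
    static final SimpleFlatJoin<String, Integer, String> REPEAT = (word, times, out) -> {
        for (int i = 0; i < times; i++) {
            out.accept(word);
        }
    };

    // Collects everything REPEAT emits for one pair.
    static List<String> collect(String word, int times) {
        List<String> result = new ArrayList<>();
        REPEAT.join(word, times, result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(SUM.join(2, 3));
        System.out.println(collect("x", 0));
        System.out.println(collect("x", 3));
    }
}
```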

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {
	//......

@Public
	public static class WithWindow<T1, T2, KEY, W extends Window> {
		private final DataStream<T1> input1;
		private final DataStream<T2> input2;

		private final KeySelector<T1, KEY> keySelector1;
		private final KeySelector<T2, KEY> keySelector2;

		private final TypeInformation<KEY> keyType;

		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

		private final Time allowedLateness;

		private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

		protected WithWindow(DataStream<T1> input1,
				DataStream<T2> input2,
				KeySelector<T1, KEY> keySelector1,
				KeySelector<T2, KEY> keySelector2,
				TypeInformation<KEY> keyType,
				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
				Time allowedLateness) {
			this.input1 = input1;
			this.input2 = input2;

			this.keySelector1 = keySelector1;
			this.keySelector2 = keySelector2;
			this.keyType = keyType;

			this.windowAssigner = windowAssigner;
			this.trigger = trigger;
			this.evictor = evictor;

			this.allowedLateness = allowedLateness;
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, newTrigger, evictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, newEvictor, allowedLateness);
		}

		@PublicEvolving
		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
					windowAssigner, trigger, evictor, newLateness);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

			TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
				function,
				input1.getType(),
				input2.getType(),
				"CoGroup",
				false);

			return apply(function, resultType);
		}

		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
					.map(new Input1Tagger<T1, T2>())
					.setParallelism(input1.getParallelism())
					.returns(unionType);
			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
					.map(new Input2Tagger<T1, T2>())
					.setParallelism(input2.getParallelism())
					.returns(unionType);

			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

			// we explicitly create the keyed stream to manually pass the key type information in
			windowedStream =
					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
					.window(windowAssigner);

			if (trigger != null) {
				windowedStream.trigger(trigger);
			}
			if (evictor != null) {
				windowedStream.evictor(evictor);
			}
			if (allowedLateness != null) {
				windowedStream.allowedLateness(allowedLateness);
			}

			return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
		}

		//......
	}

	//......
}
  • The overall class structure of CoGroupedStreams is similar to that of JoinedStreams. CoGroupedStreams provides the where operation to build a Where object; the Where object mainly provides the equalTo operation to build an EqualTo object; the EqualTo object provides the window operation to build the WithWindow object; WithWindow can set the windowAssigner, trigger, evictor and allowedLateness, and it provides the apply operation. The one difference is that the apply operation of CoGroupedStreams' WithWindow accepts a CoGroupFunction, whereas the apply operation of JoinedStreams' WithWindow accepts a JoinFunction or a FlatJoinFunction.
  • Inside apply, each input is mapped to a TaggedUnion (Input1Tagger and Input2Tagger), the two tagged streams are unioned, the union is keyed with a UnionKeySelector and windowed with the windowAssigner, and the CoGroupFunction is wrapped in a CoGroupWindowFunction before being applied to the WindowedStream.
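The tag, union and key steps in the apply method can be sketched in plain Java on in-memory lists. This is a simplified illustration only: windowing is left out, Tagged and Groups are hypothetical stand-ins for TaggedUnion and for the two Iterables handed to coGroup, and the step that splits the tagged elements back into two groups is an assumption about what CoGroupWindowFunction does, since that class is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for TaggedUnion<T1, T2>: exactly one of the two fields is set.
final class Tagged<A, B> {
    final A one;
    final B two;

    private Tagged(A one, B two) {
        this.one = one;
        this.two = two;
    }

    static <A, B> Tagged<A, B> one(A value) {
        return new Tagged<>(value, null);
    }

    static <A, B> Tagged<A, B> two(B value) {
        return new Tagged<>(null, value);
    }

    boolean isOne() {
        return one != null;
    }
}

// The two per-key groups that a coGroup call would receive.
final class Groups<A, B> {
    final List<A> first = new ArrayList<>();
    final List<B> second = new ArrayList<>();
}

class TaggedUnionSketch {
    static <A, B, K> Map<K, Groups<A, B>> coGroupByKey(
            List<A> input1, Function<A, K> key1,
            List<B> input2, Function<B, K> key2) {
        // Step 1: tag both inputs and union them into a single list.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) {
            union.add(Tagged.<A, B>one(a));
        }
        for (B b : input2) {
            union.add(Tagged.<A, B>two(b));
        }

        // Step 2: key the union with a selector that dispatches on the tag,
        // then split each key's elements back into the two groups (assumed behaviour).
        Map<K, Groups<A, B>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            Groups<A, B> groups = byKey.computeIfAbsent(key, k -> new Groups<>());
            if (t.isOne()) {
                groups.first.add(t.one);
            } else {
                groups.second.add(t.two);
            }
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Key both inputs by string length.
        Map<Integer, Groups<String, String>> result = coGroupByKey(
                Arrays.asList("a", "bb"), String::length,
                Arrays.asList("cc", "ddd"), String::length);
        for (Map.Entry<Integer, Groups<String, String>> e : result.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().first + " / " + e.getValue().second);
        }
    }
}
```

Tagging lets two differently typed inputs share one keyed, windowed stream, which is why the WindowAssigner, Trigger and Evictor in the listings are all typed on TaggedUnion<T1, T2>.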

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

	/**
	 * This method must be implemented to provide a user implementation of a
	 * coGroup. It is called for each pair of element groups where the elements share the
	 * same key.
	 *
	 * @param first The records from the first input.
	 * @param second The records from the second.
	 * @param out A collector to return elements.
	 *
	 * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
	 *                   and may trigger the recovery logic.
	 */
	void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
  • CoGroupFunction extends Function and Serializable and defines the coGroup operation, which can be used to implement an outer join. Its parameters are of type Iterable, whereas the join methods of JoinFunction and FlatJoinFunction take single objects.

WrappingFunction

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java

@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

	private static final long serialVersionUID = 1L;

	protected T wrappedFunction;

	protected WrappingFunction(T wrappedFunction) {
		this.wrappedFunction = wrappedFunction;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		FunctionUtils.openFunction(this.wrappedFunction, parameters);
	}

	@Override
	public void close() throws Exception {
		FunctionUtils.closeFunction(this.wrappedFunction);
	}

	@Override
	public void setRuntimeContext(RuntimeContext t) {
		super.setRuntimeContext(t);

		FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
	}

	public T getWrappedFunction () {
		return this.wrappedFunction;
	}
}
  • WrappingFunction extends AbstractRichFunction and overrides its open, close and setRuntimeContext methods so that each call is forwarded to the wrappedFunction.
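The forwarding idea behind WrappingFunction can be shown with a small plain-Java sketch. Lifecycle, Wrapper and RecordingFunction are hypothetical names introduced for illustration, and the sketch assumes the wrapped object always has open and close methods, which is a simplification of what the FunctionUtils helpers in the listing deal with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface standing in for a rich function's open/close.
interface Lifecycle {
    void open();
    void close();
}

// Mirrors the idea of WrappingFunction: lifecycle calls are forwarded to the wrapped object.
abstract class Wrapper<T extends Lifecycle> implements Lifecycle {
    protected final T wrapped;

    protected Wrapper(T wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void open() {
        wrapped.open();
    }

    @Override
    public void close() {
        wrapped.close();
    }
}

// A user function that records which lifecycle calls reached it.
class RecordingFunction implements Lifecycle {
    final List<String> events = new ArrayList<>();

    @Override
    public void open() {
        events.add("open");
    }

    @Override
    public void close() {
        events.add("close");
    }
}

class WrapperDemo {
    public static void main(String[] args) {
        RecordingFunction inner = new RecordingFunction();
        Wrapper<RecordingFunction> outer = new Wrapper<RecordingFunction>(inner) { };
        outer.open();
        outer.close();
        System.out.println(inner.events); // the wrapped function saw both calls
    }
}
```

Forwarding matters because the runtime only knows about the outer wrapper; without it a user function that relies on open or close would never be initialised or cleaned up.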

JoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result.
	 */
	private static class JoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<JoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					out.collect(wrappedFunction.join(val1, val2));
				}
			}
		}
	}
  • JoinCoGroupFunction extends WrappingFunction and implements the coGroup method defined by the CoGroupFunction interface. It iterates over the first collection and, for each of its elements, iterates over the second collection, calling wrappedFunction.join on each pair and emitting the result.
  • JoinedStreams defines JoinCoGroupFunction as a private static class. The apply method of JoinedStreams' WithWindow object uses it to wrap the JoinFunction before calling the apply method of CoGroupedStreams' WithWindow.
  • The join method defined by JoinFunction receives two single objects, while the coGroup method implemented by JoinCoGroupFunction receives two Iterable arguments.
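The nested-loop adaptation described above can be sketched without Flink. In this sketch BiFunction stands in for JoinFunction and the returned list stands in for the Collector; NestedLoopJoinSketch is a hypothetical name. It also shows why the result has inner-join semantics: if either group is empty, the inner call is never reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

class NestedLoopJoinSketch {
    // Adapts a pairwise join function to two groups by visiting every (first, second) pair.
    static <A, B, O> List<O> coGroup(Iterable<A> first, Iterable<B> second, BiFunction<A, B, O> join) {
        List<O> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(join.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two elements on each side give four joined results.
        List<Integer> sums = coGroup(Arrays.asList(1, 2), Arrays.asList(10, 20), (a, b) -> a + b);
        System.out.println(sums);

        // An empty side gives no results at all.
        List<Integer> none = coGroup(Arrays.asList(1, 2), Collections.<Integer>emptyList(), (a, b) -> a + b);
        System.out.println(none);
    }
}
```

The number of join calls is the product of the two group sizes, so large windows with skewed keys can make this step expensive.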

FlatJoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

	/**
	 * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
	 */
	private static class FlatJoinCoGroupFunction<T1, T2, T>
			extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
			implements CoGroupFunction<T1, T2, T> {
		private static final long serialVersionUID = 1L;

		public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
			super(wrappedFunction);
		}

		@Override
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					wrappedFunction.join(val1, val2, out);
				}
			}
		}
	}
  • FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method defined by the CoGroupFunction interface. It iterates over the first collection and, for each of its elements, iterates over the second collection, calling wrappedFunction.join on each pair and passing it the collector so that the wrapped function emits the results itself.
  • JoinedStreams defines FlatJoinCoGroupFunction as a private static class. The apply method of JoinedStreams' WithWindow object uses it to wrap the FlatJoinFunction before calling the apply method of CoGroupedStreams' WithWindow.
  • The join method defined by FlatJoinFunction receives two single objects plus a Collector, while the coGroup method implemented by FlatJoinCoGroupFunction receives two Iterable arguments plus a Collector.

Summary

  • DataStream provides a join method that creates a join operation and returns a JoinedStreams. JoinedStreams mainly provides the where operation to build a Where object; the Where object mainly provides the equalTo operation to build an EqualTo object; the EqualTo object provides the window operation to build the WithWindow object; WithWindow can set the windowAssigner, trigger, evictor and allowedLateness, and it provides the apply operation.
  • The apply operation accepts either a JoinFunction or a FlatJoinFunction. Internally it calls DataStream's coGroup method to create a CoGroupedStreams, passes its own where and equalTo keySelectors, windowAssigner, trigger, evictor and allowedLateness on to it, and finally calls the apply method of CoGroupedStreams' WithWindow object. JoinFunction and FlatJoinFunction both extend Function and Serializable and define the join operation, with inner-join semantics by default; if an outer join is required, CoGroupFunction can be used. The difference between the two is that FlatJoinFunction's join method takes a Collector parameter, which can be used to emit zero, one or more elements, hence the name Flat.
  • The apply method of CoGroupedStreams' WithWindow takes a different parameter from the one in JoinedStreams: it accepts a CoGroupFunction. The apply method of JoinedStreams' WithWindow therefore wraps the JoinFunction or FlatJoinFunction as a CoGroupFunction (JoinCoGroupFunction for a JoinFunction, FlatJoinCoGroupFunction for a FlatJoinFunction) and then calls the apply method of CoGroupedStreams' WithWindow. Both JoinCoGroupFunction and FlatJoinCoGroupFunction extend WrappingFunction (which extends AbstractRichFunction and overrides its open, close and setRuntimeContext methods to manage the wrappedFunction) and implement the coGroup method of the CoGroupFunction interface. The difference is that the former wraps a JoinFunction and the latter wraps a FlatJoinFunction, so the latter's call to join also passes the out argument.
