
This article examines the intervalJoin operation of Flink's KeyedStream.

Example

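DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(...)
    .intervalJoin(greenStream.keyBy(...))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // emit one "left,right" string per joined pair
            out.collect(left + "," + right);
        }
    });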
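In the example, between(Time.milliseconds(-2), Time.milliseconds(1)) pairs an orange element at time a with every green element of the same key at time b where a - 2 <= b <= a + 1. The plain-Java sketch below has no Flink dependency; IntervalJoinSemantics and join are hypothetical names used only to enumerate the matching timestamp pairs for a single key.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free illustration of the interval-join condition for ONE key:
// a left element at time a joins a right element at time b when
// a + lowerBound <= b <= a + upperBound (both bounds inclusive, the
// default after between()). Names are hypothetical, not Flink API.
final class IntervalJoinSemantics {

    // Returns "leftTs,rightTs" for every matching combination, in input order.
    static List<String> join(long[] leftTs, long[] rightTs, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long a : leftTs) {
            for (long b : rightTs) {
                if (b >= a + lowerBound && b <= a + upperBound) {
                    out.add(a + "," + b);
                }
            }
        }
        return out;
    }
}
```

For instance, orange timestamps {0, 5} and green timestamps {-2, -1, 0, 1, 2, 4} with bounds (-2, 1) yield the pairs 0,-2 / 0,-1 / 0,0 / 0,1 / 5,4; the green element at 2 falls outside both windows.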
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/

public class KeyedStream<T, KEY> extends DataStream<T> {

	// ...

	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
		return new IntervalJoin<>(this, otherStream);
	}

	// ...
}
  • KeyedStream's intervalJoin method creates and returns an IntervalJoin that wraps this stream and the other stream.


flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/

	public static class IntervalJoin<T1, T2, KEY> {

		private final KeyedStream<T1, KEY> streamOne;
		private final KeyedStream<T2, KEY> streamTwo;

		IntervalJoin(
				KeyedStream<T1, KEY> streamOne,
				KeyedStream<T2, KEY> streamTwo
		) {
			this.streamOne = checkNotNull(streamOne);
			this.streamTwo = checkNotNull(streamTwo);
		}

		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

			TimeCharacteristic timeCharacteristic =
				streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

			if (timeCharacteristic != TimeCharacteristic.EventTime) {
				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
			}

			checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
			checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

			// bounds are passed on in milliseconds, both inclusive by default
			return new IntervalJoined<>(
				streamOne,
				streamTwo,
				lowerBound.toMilliseconds(),
				upperBound.toMilliseconds(),
				true,
				true
			);
		}
	}
  • IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval. between throws an UnsupportedTimeCharacteristicException if the stream's TimeCharacteristic is not EventTime; otherwise it creates and returns an IntervalJoined.
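The chain intervalJoin, then between, then process is a staged builder: each call returns a different type, so process is only reachable once the bounds have been set and validated. The following Flink-free sketch mirrors that shape under assumed names (StagedJoinBuilder, Join, Joined); it substitutes UnsupportedOperationException for Flink's UnsupportedTimeCharacteristicException and takes bounds that are already converted to milliseconds.

```java
// Hypothetical, Flink-free sketch of the staged builder behind intervalJoin.
// Join.between(...) validates its input and returns a Joined stage whose
// bounds start out inclusive; the exclusive toggles exist only on Joined.
final class StagedJoinBuilder {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Stage 1: roughly what intervalJoin(...) hands back.
    static final class Join {
        private final TimeCharacteristic timeCharacteristic;

        Join(TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
        }

        // Mirrors the checks in IntervalJoin.between, in the same order.
        Joined between(Long lowerBoundMs, Long upperBoundMs) {
            if (timeCharacteristic != TimeCharacteristic.EventTime) {
                // Stand-in for Flink's UnsupportedTimeCharacteristicException.
                throw new UnsupportedOperationException(
                    "Time-bounded stream joins are only supported in event time");
            }
            if (lowerBoundMs == null || upperBoundMs == null) {
                throw new NullPointerException("Both bounds need to be provided");
            }
            return new Joined(lowerBoundMs, upperBoundMs);
        }
    }

    // Stage 2: roughly what between(...) hands back.
    static final class Joined {
        final long lowerBound;
        final long upperBound;
        boolean lowerBoundInclusive = true; // inclusive until told otherwise
        boolean upperBoundInclusive = true;

        Joined(long lowerBound, long upperBound) {
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
        }

        Joined lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        Joined upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }
    }
}
```

Because between returns a new type, a caller cannot reach the exclusive toggles (or, in Flink, process) without first supplying both bounds under event time.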


flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/

	public static class IntervalJoined<IN1, IN2, KEY> {

		private final KeyedStream<IN1, KEY> left;
		private final KeyedStream<IN2, KEY> right;

		private final long lowerBound;
		private final long upperBound;

		private final KeySelector<IN1, KEY> keySelector1;
		private final KeySelector<IN2, KEY> keySelector2;

		private boolean lowerBoundInclusive;
		private boolean upperBoundInclusive;

		public IntervalJoined(
				KeyedStream<IN1, KEY> left,
				KeyedStream<IN2, KEY> right,
				long lowerBound,
				long upperBound,
				boolean lowerBoundInclusive,
				boolean upperBoundInclusive) {

			this.left = checkNotNull(left);
			this.right = checkNotNull(right);

			this.lowerBound = lowerBound;
			this.upperBound = upperBound;

			this.lowerBoundInclusive = lowerBoundInclusive;
			this.upperBoundInclusive = upperBoundInclusive;

			this.keySelector1 = left.getKeySelector();
			this.keySelector2 = right.getKeySelector();
		}

		public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
			this.upperBoundInclusive = false;
			return this;
		}

		public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
			this.lowerBoundInclusive = false;
			return this;
		}

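		public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {

			final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(/* ... */);

			return process(processJoinFunction, outputType);
		}

		public <OUT> SingleOutputStreamOperator<OUT> process(
				ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
				TypeInformation<OUT> outputType) {

			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
				new IntervalJoinOperator<>(
					lowerBound,
					upperBound,
					lowerBoundInclusive,
					upperBoundInclusive,
					left.getType().createSerializer(left.getExecutionConfig()),
					right.getType().createSerializer(right.getExecutionConfig()),
					cleanedUdf
				);

			// connect both streams, key them, and apply the join operator
			return left
				.connect(right)
				.keyBy(keySelector1, keySelector2)
				.transform("Interval Join", outputType, operator);
		}
	}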
  • IntervalJoined treats both lowerBound and upperBound as inclusive by default, and provides lowerBoundExclusive and upperBoundExclusive to make either bound exclusive. IntervalJoined also provides the process operation, which takes a ProcessJoinFunction: process creates an IntervalJoinOperator, then calls left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator) and returns the resulting SingleOutputStreamOperator. (In this example, orangeStream is left and greenStream is right.)
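The inclusive/exclusive flags are resolved once, in the IntervalJoinOperator constructor: an exclusive lower bound is shifted up by 1 ms and an exclusive upper bound down by 1 ms, so every later comparison can treat both bounds as inclusive. Because event timestamps are whole milliseconds, b > a + lower is the same test as b >= a + lower + 1. A minimal sketch of that normalization (BoundNormalization and normalize are hypothetical names):

```java
// Sketch of the bound normalization done in the IntervalJoinOperator
// constructor: exclusive bounds become inclusive ones by a 1 ms shift.
// Hypothetical names; plain Java, no Flink dependency.
final class BoundNormalization {

    /** Returns {effectiveLowerBound, effectiveUpperBound} in milliseconds. */
    static long[] normalize(long lowerBound, long upperBound,
                            boolean lowerBoundInclusive, boolean upperBoundInclusive) {
        // Same precondition the operator checks, evaluated BEFORE shifting.
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        long lower = lowerBoundInclusive ? lowerBound : lowerBound + 1L;
        long upper = upperBoundInclusive ? upperBound : upperBound - 1L;
        return new long[] {lower, upper};
    }
}
```

With between(-2, 1) followed by lowerBoundExclusive() and upperBoundExclusive(), the effective window becomes [-1, 0]. Since the check runs before the shift, equal bounds that are both exclusive pass it and simply produce an empty window.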


flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/

@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

	private static final long serialVersionUID = -2444626938039012398L;

	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

	public abstract class Context {
		public abstract long getLeftTimestamp();
		public abstract long getRightTimestamp();
		public abstract long getTimestamp();
		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}
}
  • ProcessJoinFunction extends AbstractRichFunction. It declares the abstract processElement method and its own Context class, which in turn declares four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp, and output.
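In the operator source, collect sets the joined record's timestamp to the larger of the two input timestamps before calling processElement, and that value is what Context.getTimestamp() reports. The sketch below is a hypothetical, Flink-free stand-in for the Context (JoinContextSketch is an assumed name; the side-output method output(OutputTag, X) is not modeled).

```java
// Hypothetical stand-in for ProcessJoinFunction.Context (not the Flink class).
// updateTimestamps mimics what IntervalJoinOperator.collect does before each
// call to processElement: remember both input timestamps and use the larger
// one as the timestamp of the joined result.
final class JoinContextSketch {
    private long leftTimestamp;
    private long rightTimestamp;
    private long resultTimestamp;

    void updateTimestamps(long left, long right) {
        this.leftTimestamp = left;
        this.rightTimestamp = right;
        this.resultTimestamp = Math.max(left, right);
    }

    long getLeftTimestamp() { return leftTimestamp; }
    long getRightTimestamp() { return rightTimestamp; }
    long getTimestamp() { return resultTimestamp; }

    // A toy "user function" in the spirit of processElement: it formats the
    // pair together with the timestamp the joined record would carry.
    static String processElement(int left, int right, JoinContextSketch ctx) {
        return left + "," + right + "@" + ctx.getTimestamp();
    }
}
```

A user function can therefore read both original event times through getLeftTimestamp and getRightTimestamp, while downstream operators see the later of the two as the record's event time.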


flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/

public class IntervalJoinOperator<K, T1, T2, OUT>
		extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

	private static final long serialVersionUID = -5380774605111543454L;

	private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

	private static final String LEFT_BUFFER = "LEFT_BUFFER";
	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

	private final long lowerBound;
	private final long upperBound;

	private final TypeSerializer<T1> leftTypeSerializer;
	private final TypeSerializer<T2> rightTypeSerializer;

	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

	private transient TimestampedCollector<OUT> collector;
	private transient ContextImpl context;

	private transient InternalTimerService<String> internalTimerService;

	public IntervalJoinOperator(
			long lowerBound,
			long upperBound,
			boolean lowerBoundInclusive,
			boolean upperBoundInclusive,
			TypeSerializer<T1> leftTypeSerializer,
			TypeSerializer<T2> rightTypeSerializer,
			ProcessJoinFunction<T1, T2, OUT> udf) {

		super(Preconditions.checkNotNull(udf));
		Preconditions.checkArgument(lowerBound <= upperBound,
			"lowerBound <= upperBound must be fulfilled");

		// Move buffer by +1 / -1 depending on inclusiveness in order not needing
		// to check for inclusiveness later on
		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
	}

	@Override
	public void open() throws Exception {
		super.open();

		collector = new TimestampedCollector<>(output);
		context = new ContextImpl(userFunction);
		internalTimerService =
			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
	}

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);

		// one MapState per side: event timestamp -> elements seen at that timestamp
		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			LEFT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
		));

		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			RIGHT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
		));
	}

	@Override
	public void processElement1(StreamRecord<T1> record) throws Exception {
		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
	}

	@Override
	public void processElement2(StreamRecord<T2> record) throws Exception {
		// the right side uses the mirrored window [-upperBound, -lowerBound]
		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
	}

	private <THIS, OTHER> void processElement(
			final StreamRecord<THIS> record,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
			final long relativeLowerBound,
			final long relativeUpperBound,
			final boolean isLeft) throws Exception {

		final THIS ourValue = record.getValue();
		final long ourTimestamp = record.getTimestamp();

		if (ourTimestamp == Long.MIN_VALUE) {
			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
					"interval stream joins need to have timestamps meaningful timestamps.");
		}

		// elements behind the current watermark are dropped
		if (isLate(ourTimestamp)) {
			return;
		}

		addToBuffer(ourBuffer, ourValue, ourTimestamp);

		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
			final long timestamp  = bucket.getKey();

			if (timestamp < ourTimestamp + relativeLowerBound ||
					timestamp > ourTimestamp + relativeUpperBound) {
				continue;
			}

			for (BufferEntry<OTHER> entry: bucket.getValue()) {
				if (isLeft) {
					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
				} else {
					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
				}
			}
		}

		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
		if (isLeft) {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
		} else {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
		}
	}
	private boolean isLate(long timestamp) {
		long currentWatermark = internalTimerService.currentWatermark();
		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
	}

	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
		final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

		collector.setAbsoluteTimestamp(resultTimestamp);
		context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

		userFunction.processElement(left, right, context, collector);
	}

	@Override
	public void onEventTime(InternalTimer<K, String> timer) throws Exception {

		long timerTimestamp = timer.getTimestamp();
		String namespace = timer.getNamespace();

		logger.trace("onEventTime @ {}", timerTimestamp);

		switch (namespace) {
			case CLEANUP_NAMESPACE_LEFT: {
				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
				logger.trace("Removing from left buffer @ {}", timestamp);
				leftBuffer.remove(timestamp);
				break;
			}
			case CLEANUP_NAMESPACE_RIGHT: {
				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
				break;
			}
			default:
				throw new RuntimeException("Invalid namespace " + namespace);
		}
	}

	@Override
	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
		// do nothing.
	}

	// ...
}
