Preface

This article examines consecutive windowed operations in Flink.

Example

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
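  • This example first partitions the stream by key and sums each key's values within a 5-second tumbling event-time window (reduce with Summer). It then applies windowAll, with the same window assigner, to the per-key results and runs TopKWindowFunction over them. The result is a per-partition summary followed by a global summary within the same time window. This pattern can solve problems such as finding the top-k elements.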
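The following plain-Java sketch makes the example's data flow concrete. It simulates both stages without any Flink dependency: first per-key sums in 5-second tumbling windows, then a top-k over those per-key sums within each window. It assumes non-negative millisecond timestamps and a zero window offset. Event, sumPerKey and topKPerWindow are hypothetical names. The sketch illustrates the semantics only and does not show how Flink executes the job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java simulation (no Flink dependency) of the two-stage pipeline:
// stage 1 ~ keyBy + 5 s tumbling event-time window + reduce(sum)
// stage 2 ~ windowAll over the same window + top-k
// All names here are hypothetical.
class ConsecutiveWindowsSim {

    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final long WINDOW_SIZE_MS = 5_000;

    // Tumbling window start for a non-negative timestamp with zero offset.
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    // Stage 1: sum per (window start, key).
    static Map<Long, Map<String, Integer>> sumPerKey(List<Event> events) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestamp), w -> new HashMap<>())
                  .merge(e.key, e.value, Integer::sum);
        }
        return result;
    }

    // Stage 2: for each window, the k keys with the largest sums (ties broken by key).
    static Map<Long, List<String>> topKPerWindow(Map<Long, Map<String, Integer>> perKey, int k) {
        Map<Long, List<String>> result = new TreeMap<>();
        perKey.forEach((window, sums) -> result.put(window, sums.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList())));
        return result;
    }

    static List<Event> sampleEvents() {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 3, 1_000));
        events.add(new Event("b", 5, 2_000));
        events.add(new Event("a", 4, 4_999));
        events.add(new Event("c", 1, 3_000));
        events.add(new Event("b", 1, 6_000));
        events.add(new Event("c", 7, 7_000));
        return events;
    }

    public static void main(String[] args) {
        // prints {0=[a, b], 5000=[c, b]}
        System.out.println(topKPerWindow(sumPerKey(sampleEvents()), 2));
    }
}
```

The element at 4999 still belongs to the first window [0, 5000). The elements at 6000 and 7000 land in the second window.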

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T>
		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

	private static final long serialVersionUID = 1L;

	private transient long watermarkInterval;

	private transient long currentWatermark;

	public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
		super(assigner);
		this.chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();

		currentWatermark = Long.MIN_VALUE;
		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

		if (watermarkInterval > 0) {
			long now = getProcessingTimeService().getCurrentProcessingTime();
			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
		}
	}

	@Override
	public void processElement(StreamRecord<T> element) throws Exception {
		final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
				element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

		output.collect(element.replace(element.getValue(), newTimestamp));
	}

	@Override
	public void onProcessingTime(long timestamp) throws Exception {
		// register next timer
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}

		long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

	/**
	 * Override the base implementation to completely ignore watermarks propagated from
	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
	 * watermarks from here).
	 */
	@Override
	public void processWatermark(Watermark mark) throws Exception {
		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
		// to signal the end of input and to not block watermark progress downstream
		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
			currentWatermark = Long.MAX_VALUE;
			output.emitWatermark(mark);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();

		// emit a final watermark
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}
	}
}
  • When assignTimestampsAndWatermarks is called with an AssignerWithPeriodicWatermarks, Flink creates a TimestampsAndPeriodicWatermarksOperator. In open(), this operator registers a delayed task based on the configured watermarkInterval, but only if the interval is greater than 0.
  • When the delayed task fires, it calls back onProcessingTime. That method calls AssignerWithPeriodicWatermarks.getCurrentWatermark() to obtain a watermark, then registers the next delayed task at getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. Here watermarkInterval is the value set via env.getConfig().setAutoWatermarkInterval().
  • Besides re-registering the delayed task to achieve periodic execution, onProcessingTime emits the watermark returned by getCurrentWatermark() only when its timestamp is greater than currentWatermark. As a result, the emitted watermarks are strictly increasing.
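The emit-only-if-greater rule in onProcessingTime can be simulated in plain Java. This sketch assumes a hypothetical assigner that reports the largest timestamp seen so far minus a fixed out-of-orderness bound. It also replaces the processing-time timer with explicit onTick() calls. None of the class names are Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the periodic-watermark path (no Flink dependency).
// Each onTick() call stands in for one timer callback, i.e. one onProcessingTime invocation.
// BoundedOutOfOrdernessAssigner and PeriodicWatermarkSim are hypothetical names.
class PeriodicWatermarkSim {

    // Hypothetical periodic assigner: watermark = largest timestamp seen minus a fixed bound.
    static final class BoundedOutOfOrdernessAssigner {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedOutOfOrdernessAssigner(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // start so that the first watermark is exactly Long.MIN_VALUE, without underflow
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        long extractTimestamp(long elementTimestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
            return elementTimestamp;
        }

        long getCurrentWatermark() {
            return currentMaxTimestamp - maxOutOfOrderness;
        }
    }

    private final BoundedOutOfOrdernessAssigner assigner;
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSim(BoundedOutOfOrdernessAssigner assigner) {
        this.assigner = assigner;
    }

    void processElement(long timestamp) {
        assigner.extractTimestamp(timestamp);
    }

    // Same rule as onProcessingTime: emit only if strictly greater than the last watermark.
    void onTick() {
        long watermark = assigner.getCurrentWatermark();
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            emitted.add(watermark);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    static List<Long> demo() {
        PeriodicWatermarkSim op = new PeriodicWatermarkSim(new BoundedOutOfOrdernessAssigner(1_000));
        op.onTick();                 // nothing seen yet: watermark is Long.MIN_VALUE, not emitted
        op.processElement(5_000);
        op.onTick();                 // emits 4000
        op.processElement(3_000);    // out-of-order element does not raise the maximum
        op.onTick();                 // still 4000, so nothing is emitted
        op.processElement(7_000);
        op.onTick();                 // emits 6000
        return op.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [4000, 6000]
    }
}
```

The third tick emits nothing. The out-of-order element at 3000 does not raise the maximum, so the candidate watermark is not greater than currentWatermark.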

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService {

	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

	private static final int STATUS_ALIVE = 0;
	private static final int STATUS_QUIESCED = 1;
	private static final int STATUS_SHUTDOWN = 2;

	// ------------------------------------------------------------------------

	/** The containing task that owns this time service provider. */
	private final AsyncExceptionHandler task;

	/** The lock that timers acquire upon triggering. */
	private final Object checkpointLock;

	/** The executor service that schedules and calls the triggers of this task. */
	private final ScheduledThreadPoolExecutor timerService;

	private final AtomicInteger status;

	public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
		this(failureHandler, checkpointLock, null);
	}

	public SystemProcessingTimeService(
			AsyncExceptionHandler task,
			Object checkpointLock,
			ThreadFactory threadFactory) {

		this.task = checkNotNull(task);
		this.checkpointLock = checkNotNull(checkpointLock);

		this.status = new AtomicInteger(STATUS_ALIVE);

		if (threadFactory == null) {
			this.timerService = new ScheduledThreadPoolExecutor(1);
		} else {
			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
		}

		// tasks should be removed if the future is canceled
		this.timerService.setRemoveOnCancelPolicy(true);

		// make sure shutdown removes all pending tasks
		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
	}

	@Override
	public long getCurrentProcessingTime() {
		return System.currentTimeMillis();
	}

	@Override
	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

		// delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
		// T says we won't see elements in the future with a timestamp smaller or equal to T.
		// With processing time, we therefore need to delay firing the timer by one ms.
		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			return timerService.schedule(
					new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(delay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	//......
}
  • SystemProcessingTimeService.registerTimer schedules a delayed TriggerTask for the given timestamp, with a delay of max(timestamp - now, 0) + 1 ms. The timerService is the JDK's own ScheduledThreadPoolExecutor, created with a single thread. When the service status is STATUS_ALIVE, TriggerTask's run method acquires checkpointLock and invokes the ProcessingTimeCallback's onProcessingTime method. Here that callback is TimestampsAndPeriodicWatermarksOperator.onProcessingTime.
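The same timer mechanism can be reproduced with the JDK alone. The sketch below configures a single-threaded ScheduledThreadPoolExecutor with the policies from the constructor above and applies the same +1 ms delay rule. It runs each callback under a lock that stands in for checkpointLock, and its callback re-registers itself the way onProcessingTime does. TimerServiceSketch, Callback and runPeriodic are hypothetical names. The status and quiesce handling of the real service is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the timer mechanism; all names are hypothetical, not Flink classes.
class TimerServiceSketch {

    interface Callback {
        void onProcessingTime(long timestamp) throws Exception;
    }

    private final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    private final Object lock = new Object(); // plays the role of checkpointLock

    TimerServiceSketch() {
        // same executor policies as the SystemProcessingTimeService constructor
        timerService.setRemoveOnCancelPolicy(true);
        timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    // Same delay rule as registerTimer: fire 1 ms after the requested timestamp.
    ScheduledFuture<?> registerTimer(long timestamp, Callback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
        return timerService.schedule(() -> {
            synchronized (lock) {
                try {
                    target.onProcessingTime(timestamp);
                } catch (Exception e) {
                    e.printStackTrace(); // the real service reports this to its AsyncExceptionHandler
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timerService.shutdownNow();
    }

    // A callback that re-registers itself every intervalMs, like onProcessingTime does;
    // it stops after `ticks` invocations, and the method returns how many ran.
    static int runPeriodic(long intervalMs, int ticks) {
        TimerServiceSketch service = new TimerServiceSketch();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        Callback periodic = new Callback() {
            @Override
            public void onProcessingTime(long timestamp) {
                count.incrementAndGet();
                done.countDown();
                if (done.getCount() > 0) {
                    long now = service.getCurrentProcessingTime();
                    service.registerTimer(now + intervalMs, this);
                }
            }
        };
        service.registerTimer(service.getCurrentProcessingTime() + intervalMs, periodic);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        service.shutdown();
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(runPeriodic(10, 3) + " ticks");
    }
}
```

Re-registering a one-shot timer on each callback, rather than using scheduleAtFixedRate, mirrors the operator code. Each tick computes its next deadline from the current processing time.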

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

	//......
	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {

			//......

		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null) {
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}

	/**
	 * Emits the contents of the given window using the {@link InternalWindowFunction}.
	 */
	@SuppressWarnings("unchecked")
	private void emitWindowContents(W window, ACC contents) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
		processContext.window = window;
		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
	}

	//......
}
  • WindowOperator.processElement assigns the element to its windows, skips any window that is already late, and adds the element to windowState (here a HeapAggregatingState). It then calls triggerContext.onElement, which delegates to trigger.onElement (here the trigger is EventTimeTrigger), to obtain a TriggerResult. If the result says FIRE, it calls emitWindowContents; if it says PURGE, it clears windowState. emitWindowContents stamps the output with window.maxTimestamp() and calls userFunction.process to run the user-defined window function.
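For a single key, with one tumbling window per element, the non-merging branch of processElement can be modeled in plain Java. This simplified sketch uses a running sum as its window state and applies EventTimeTrigger's onElement rule without registering timers. It assumes a window is late once window.maxTimestamp() + allowedLateness is at or below the watermark. All names are hypothetical, and this is not the WindowOperator implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-key model of the non-merging branch of processElement.
// All names are hypothetical; event-time timers are deliberately left out.
class WindowOperatorSketch {

    // Mirrors the four TriggerResult values and their isFire()/isPurge() flags.
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    static final long SIZE = 5_000;

    final long allowedLateness;
    long currentWatermark = Long.MIN_VALUE;
    final Map<Long, Integer> windowState = new HashMap<>(); // window start -> running sum
    final List<String> output = new ArrayList<>();
    final List<Long> lateElements = new ArrayList<>();      // stands in for the late side output

    WindowOperatorSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    static long maxTimestamp(long windowStart) {
        return windowStart + SIZE - 1;
    }

    // EventTimeTrigger.onElement rule, without the timer registration.
    TriggerResult onElement(long windowStart) {
        return maxTimestamp(windowStart) <= currentWatermark ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    void processElement(int value, long timestamp) {
        long window = timestamp - (timestamp % SIZE); // single tumbling window, timestamp >= 0
        // assumed lateness check: window end + allowed lateness already passed by the watermark
        if (maxTimestamp(window) + allowedLateness <= currentWatermark) {
            lateElements.add(timestamp); // skipped and late: side output (or drop)
            return;
        }
        windowState.merge(window, value, Integer::sum);
        TriggerResult result = onElement(window);
        if (result.fire) {
            output.add("window " + window + " -> " + windowState.get(window)); // emitWindowContents
        }
        if (result.purge) {
            windowState.remove(window);
        }
    }

    static WindowOperatorSketch demo() {
        WindowOperatorSketch op = new WindowOperatorSketch(2_000);
        op.processElement(3, 1_000);   // CONTINUE: watermark has not reached 4999
        op.currentWatermark = 5_500;
        op.processElement(4, 2_000);   // FIRE: 4999 <= 5500, still within allowed lateness
        op.currentWatermark = 8_000;
        op.processElement(1, 3_000);   // 4999 + 2000 <= 8000: window is late, element skipped
        return op;
    }

    public static void main(String[] args) {
        WindowOperatorSketch op = demo();
        System.out.println(op.output + " late=" + op.lateElements); // [window 0 -> 7] late=[3000]
    }
}
```

Because timers are omitted, the window fires here only through onElement. In the real operator, the timer registered for the first element would already have fired once the watermark passed 4999.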

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private EventTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteEventTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the watermark is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the watermark is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
			ctx.registerEventTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "EventTimeTrigger()";
	}

	public static EventTimeTrigger create() {
		return new EventTimeTrigger();
	}
}
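  • EventTimeTrigger.onElement returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark(), telling WindowOperator that it can call emitWindowContents right away. Otherwise it registers an event-time timer at window.maxTimestamp() and returns TriggerResult.CONTINUE. When that timer fires later, onEventTime returns FIRE.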
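EventTimeTrigger only works together with the event-time timer service, which calls onEventTime as the watermark advances. The plain-Java sketch below models both for a single key. onElement either fires immediately or registers a timer at the window's maxTimestamp, and each new watermark fires every registered timer at or below it, in timestamp order. Windows are identified by their maxTimestamp, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Single-key, plain-Java model of EventTimeTrigger plus the event-time timers that drive it.
// All names are hypothetical; a window is identified by its maxTimestamp().
class EventTimeTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private final TreeSet<Long> eventTimeTimers = new TreeSet<>(); // registered timer timestamps
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> fired = new ArrayList<>(); // windows fired by timers, in firing order

    // onElement: fire at once if the watermark is already past the window, else register a timer.
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        eventTimeTimers.add(windowMaxTimestamp); // a set, so registering twice is a no-op
        return Result.CONTINUE;
    }

    // A new watermark fires every timer whose timestamp is <= the watermark, in order.
    // Each timer was registered at its window's maxTimestamp, so onEventTime returns FIRE.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        trigger.onElement(4_999);                      // window [0, 5000): CONTINUE, timer at 4999
        trigger.onElement(9_999);                      // window [5000, 10000): CONTINUE, timer at 9999
        trigger.advanceWatermark(5_000);               // timer 4999 fires
        System.out.println(trigger.onElement(4_999));  // late element: FIRE immediately
        trigger.advanceWatermark(10_000);              // timer 9999 fires
        System.out.println(trigger.fired);             // [4999, 9999]
    }
}
```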

Summary

  • Flink supports consecutive windowed operations. For example, you can partition the stream by key, aggregate each key in a given window, and then apply windowAll to the resulting DataStream using the same time WindowAssigner as before. emitWindowContents stamps each first-stage result with window.maxTimestamp(), so the result falls into the same time window downstream. The effect is a per-partition summary followed by a global summary for the same time window, which can solve problems such as finding the top-k elements.
  • An AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks has two jobs: extracting a timestamp from each element to use as its eventTime, and emitting watermarks. Elements do not necessarily arrive in strict eventTime order. The watermark therefore bounds how long a window waits for late data, so the window does not wait indefinitely for elements that might still belong to it. A watermark tells the window that all elements with eventTime less than or equal to the watermark can be considered to have arrived. With the help of a trigger, the window can then decide from its own time range whether it is complete and start processing its data. In consecutive windowed operations, the upstream watermark is forwarded to the downstream operators.
  • A Trigger tells the WindowOperator when to fire the window, that is, when to start computing on the window data (the case where it returns TriggerResult.FIRE). For EventTimeTrigger, onElement depends on the watermark: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it returns TriggerResult.FIRE. Otherwise it registers an event-time timer at window.maxTimestamp(), and onEventTime returns FIRE when that timer fires.
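Simple arithmetic shows why the second stage lines up with the first. emitWindowContents stamps each first-stage result with window.maxTimestamp(), which is the window end minus 1. When the downstream windowAll uses the same tumbling size, the result therefore falls back into the same window. The sketch assumes a zero window offset and non-negative timestamps, and the helper names are hypothetical.

```java
// Plain-Java check that a first-stage result, stamped with window.maxTimestamp(),
// lands in the same tumbling window downstream. Helper names are hypothetical;
// zero offset and non-negative timestamps are assumed.
class ChainedWindowAlignment {

    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // maxTimestamp = end - 1, matching TimeWindow.maxTimestamp()
    static long maxTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }

    // Timestamp that emitWindowContents puts on a first-stage result for this element's window.
    static long resultTimestamp(long elementTimestamp, long size) {
        return maxTimestamp(windowStart(elementTimestamp, size), size);
    }

    // True if a downstream stage with the same window size assigns the result to the same window.
    static boolean sameDownstreamWindow(long elementTimestamp, long size) {
        return windowStart(resultTimestamp(elementTimestamp, size), size) == windowStart(elementTimestamp, size);
    }

    public static void main(String[] args) {
        long size = 5_000;
        for (long ts : new long[] {0, 1_234, 4_999, 5_000, 12_345}) {
            System.out.println(ts + " -> result at " + resultTimestamp(ts, size)
                    + ", same window downstream: " + sameDownstreamWindow(ts, size));
        }
    }
}
```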

doc

  • Consecutive windowed operations