Triggers
Triggers determine when a window (formed by the window allocator) is ready for processing by the window function. Each WindowAssigner comes with a default trigger. If the default trigger doesn’t suit your needs, you can use trigger (…) Specify custom triggers.
The trigger interface has five methods that allow trigger to react to different events:
- OnElement () calls this method for every element that enters the window.
- OnEventTime () is called when the event timer is triggered.
- OnProcessingTime () is called when the processing time timer is triggered.
- OnMerge () is related to stateful triggers and merges the state of both triggers when their corresponding Windows merge, for example using a session window.
- Clear () This method performs a window deletion operation.
Two points to note about the above method:
1). The first three methods determine how to manipulate the input event by returning a TriggerResult.
CONTINUE: Do nothing.
FIRE: Triggers calculation.
PURE: Clears elements of the window.
FIRE_AND_PURE: Triggers calculation and cleaning of window elements.
2). Any of these methods can be used to register a processing or event time timer for a future operation
The Fire and Purge
Once the trigger determines that the window is ready to process, it fires, returning either FIRE or FIRE_AND_PURGE. This is the window operator’s signal for the result of the current window. Given a window with ProcessWindowFunction, all elements will be passed to ProcessWindowFunction (possibly after passing them to the expeller). Windows with ReduceFunction, AggregateFunction, or FoldFunction will simply emit its eager aggregation results.
When the trigger fires, it can be FIRE or FIRE_AND_PURGE. While FIRE keeps the window’s content, FIRE_AND_PURGE deletes it. By default, pre-implemented triggers only FIRE and do not clear window state.
Note ⚠️ : Clearing will only remove the contents of the window and will preserve any potential meta-information about the window as well as any trigger states.Copy the code
Default trigger
WindowAssigner’s default triggers are suitable for many use cases. For example, all event time window allocators have an EventTimeTrigger as the default trigger. Once WaterMark passes the end of the window, the trigger fires.
Note ⚠️ : The default trigger for GlobalWindow is NeverTrigger, which never fires. Therefore, when using GlobalWindow, you must always define a custom trigger. By specifying triggers using trigger (), you override the default triggers for WindowAssigner. For example, if CountTrigger is specified for TumblingEventTimeWindows, window triggers are no longer obtained based on time progress but only by counting. Now, if you want to react based on time and count, you have to write your own custom triggers.Copy the code
Built-in and custom triggers
Flink comes with some built-in triggers.
- EventTimeTrigger triggers calculations of Windows based on event time and the watermark mechanism.
- ProcessingTimeTrigger Triggers based on processing time.
- CountTrigger triggers a calculation if the number of window elements exceeds a predetermined limit.
- PurgingTrigger is converted to a Purging trigger as an argument to other triggers.
If you need to implement custom triggers, you should implement the Trigger class. Please note that the API is still evolving and may change in future releases of Flink.
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/ * * * * < li > < p / > title: DataStream trigger < / li > * < li >@author: li. Pan </li> * <li>Date: 2019/12/29 5:00pm </li> * <li>Version: V1.0</li> * <li>Description: User-defined number of elements trigger </li> */
public class CustomProcessingTimeTrigger extends Trigger<Object.TimeWindow> {
private static final long serialVersionUID = 1L;
private CustomProcessingTimeTrigger(a) {}
private static int flag = 0;
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
// CONTINUE means no output, i.e., we want to implement, say, 100 outputs at a time,
// Instead of output at the end of the window can be implemented here.
if(flag > 9){
flag = 0;
return TriggerResult.FIRE;
}else{
flag++;
}
System.out.println("onElement : "+element);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge(a) {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if(windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); }}@Override
public String toString(a) {
return "ProcessingTimeTrigger()";
}
/** * Create a custom trigger object */
public static CustomProcessingTimeTrigger create(a) {
return newCustomProcessingTimeTrigger(); }}Copy the code