Preface
This article looks at how window triggering works in Storm.
WindowTridentProcessor.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
this.topologyContext = context;
List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
if (parents.size() != 1) {
throw new RuntimeException("Aggregation related operation can only have one parent");
}
Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);
this.tridentContext = tridentContext;
collector = new FreshCollector(tridentContext);
projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
windowStore = windowStoreFactory.create(stormConf);
windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
tridentWindowManager = storeTuplesInStore ?
new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
: new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
tridentWindowManager.prepare();
}
- At the end, prepare calls tridentWindowManager.prepare()
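The windowTaskId built in prepare is embedded in the keys this task writes to the WindowsStore, such as windowTriggerCountId and the per-trigger result keys. The Java sketch below reproduces that string composition and the single-parent guard in isolation. The class name is hypothetical, and the separator value `"|"` is an assumption, so check WindowsStore.KEY_SEPARATOR in your Storm version.

```java
import java.util.List;

// Hypothetical sketch of two details of WindowTridentProcessor.prepare().
// ASSUMPTION: the separator value "|" is illustrative only.
final class WindowTaskIdSketch {
    static final String KEY_SEPARATOR = "|"; // assumed, not taken from Storm

    // windowId + SEP + taskId + SEP, the same concatenation as in prepare()
    static String windowTaskId(String windowId, int taskId) {
        return windowId + KEY_SEPARATOR + taskId + KEY_SEPARATOR;
    }

    // Same guard as prepare(): windowed aggregation accepts exactly one parent
    static void checkSingleParent(List<?> parents) {
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }
    }

    public static void main(String[] args) {
        checkSingleParent(List.of("parentFactory"));
        System.out.println(windowTaskId("w1", 7)); // prints w1|7|
    }
}
```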
AbstractTridentWindowManager.prepare
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
Aggregator aggregator, BatchOutputCollector delegateCollector) {
this.windowTaskId = windowTaskId;
this.windowStore = windowStore;
this.aggregator = aggregator;
this.delegateCollector = delegateCollector;
windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
windowManager.setEvictionPolicy(evictionPolicy);
triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
windowManager.setTriggerPolicy(triggerPolicy);
}
public void prepare() {
preInitialize();
initialize();
postInitialize();
}
private void postInitialize() {
// start trigger once the initialization is done.
triggerPolicy.start();
}
- The AbstractTridentWindowManager constructor calls windowStrategy.getTriggerPolicy to obtain triggerPolicy. The prepare method then calls postInitialize, which starts the trigger via triggerPolicy.start().
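The constructor-then-prepare split follows the template method pattern. The subclass decides how to initialize, and the base class guarantees that the trigger starts only afterwards. Below is a minimal sketch using simplified, hypothetical class names. Only prepare, preInitialize, initialize and postInitialize come from the code above, and the real method bodies differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical concrete manager; in Storm the subclasses are
// InMemoryTridentWindowManager and StoreBasedTridentWindowManager.
final class InMemoryLikeManager extends WindowManagerTemplate {
    @Override
    protected void initialize() { calls.add("initialize"); }

    public static void main(String[] args) {
        InMemoryLikeManager m = new InMemoryLikeManager();
        m.prepare();
        System.out.println(m.calls); // prints [preInitialize, initialize, triggerPolicy.start]
    }
}

// Simplified base class: records the call order instead of doing real work
abstract class WindowManagerTemplate {
    final List<String> calls = new ArrayList<>();

    // Fixed order: pre -> init -> post, so the trigger never fires on a half-built manager
    public final void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    protected void preInitialize() { calls.add("preInitialize"); }

    protected abstract void initialize();

    private void postInitialize() {
        // start trigger once the initialization is done (stands in for triggerPolicy.start())
        calls.add("triggerPolicy.start");
    }
}
```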
SlidingDurationWindowStrategy.getTriggerPolicy
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
/**
* Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
*
* @param triggerHandler
* @param evictionPolicy
* @return
*/
@Override
public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
}
- Taking SlidingDurationWindowStrategy as an example, it creates a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager.
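The following stripped-down sketch makes the strategy-to-policy relationship concrete. All types are hypothetical stand-ins for Storm's WindowStrategy, TriggerPolicy and TriggerHandler. The only behavior carried over is that the trigger period equals the sliding length, not the window length.

```java
// Hypothetical, simplified mirror of SlidingDurationWindowStrategy.getTriggerPolicy
final class SlidingTriggerDemo {
    // Stand-in for Storm's TriggerHandler (WindowManager implements it in Storm)
    interface TriggerHandler { boolean onTrigger(); }

    interface TriggerPolicy {
        long periodMs();
        TriggerHandler handler();
    }

    // Time-based policy: fires every periodMs and calls back the handler
    record TimeTrigger(long periodMs, TriggerHandler handler) implements TriggerPolicy {}

    record SlidingDurationStrategy(long windowLengthMs, long slidingLengthMs) {
        // The period is the sliding length; the window length only matters for eviction
        TriggerPolicy getTriggerPolicy(TriggerHandler handler) {
            return new TimeTrigger(slidingLengthMs, handler);
        }
    }

    public static void main(String[] args) {
        TriggerHandler manager = () -> true; // stands in for WindowManager
        TriggerPolicy p = new SlidingDurationStrategy(10_000, 2_000).getTriggerPolicy(manager);
        System.out.println(p.periodMs()); // prints 2000
    }
}
```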
TimeTriggerPolicy.start
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java
public void start() {
executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
}
private Runnable newTriggerTask() {
return new Runnable() {
@Override
public void run() {
// do not process current timestamp since tuples might arrive while the trigger is executing
long now = System.currentTimeMillis() - 1;
try {
/*
* set the current timestamp as the reference time for the eviction policy
* to evict the events
*/
if (evictionPolicy != null) {
evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
}
handler.onTrigger();
} catch (Throwable th) {
LOG.error("handler.onTrigger failed ", th);
/*
* propagate it so that task gets canceled and the exception
* can be retrieved from executorFuture.get()
*/
throw th;
}
}
};
}
- The start method schedules a fixed-rate task that first fires after duration milliseconds and then every duration milliseconds (here windowConfig.getSlidingLength()). Its run method calls handler.onTrigger(), which is WindowManager.onTrigger().
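The scheduling itself is plain java.util.concurrent. The sketch below reproduces the pattern with JDK APIs only. The Runnable stands in for WindowManager.onTrigger(), the AtomicLong stands in for setting the eviction context, and the durations are illustrative. Class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the TimeTriggerPolicy.start() scheduling pattern
final class FixedRateTriggerSketch {
    // Reference time of the latest run; stands in for evictionPolicy.setContext(...)
    static final AtomicLong lastReferenceTime = new AtomicLong();

    // First run after durationMs, then every durationMs, as in TimeTriggerPolicy.start()
    static ScheduledFuture<?> start(ScheduledExecutorService executor, long durationMs, Runnable onTrigger) {
        return executor.scheduleAtFixedRate(() -> {
            // skip the current millisecond, since tuples may still arrive during the trigger
            lastReferenceTime.set(System.currentTimeMillis() - 1);
            onTrigger.run(); // stands in for handler.onTrigger()
        }, durationMs, durationMs, TimeUnit.MILLISECONDS);
    }

    // Returns true if the trigger fired at least `times` times within timeoutMs
    static boolean awaitTriggers(long durationMs, int times, long timeoutMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        ScheduledFuture<?> future = start(executor, durationMs, latch::countDown);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(false);
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitTriggers(20, 3, 2_000));
    }
}
```

The Storm task rethrows any Throwable. With scheduleAtFixedRate, the JDK then suppresses all later executions, and the failure surfaces as an ExecutionException from the future's get(). This is the behavior the comment in run() refers to.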
WindowManager.onTrigger
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java
/**
* The callback invoked by the trigger policy.
*/
@Override
public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
try {
lock.lock();
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
*/
windowEvents = scanEvents(true);
expired = new ArrayList<>(expiredEvents);
expiredEvents.clear();
} finally {
lock.unlock();
}
List<T> events = new ArrayList<>();
List<T> newEvents = new ArrayList<>();
for (Event<T> event : windowEvents) {
events.add(event.get());
if (!prevWindowEvents.contains(event)) {
newEvents.add(event.get());
}
}
prevWindowEvents.clear();
if (!events.isEmpty()) {
prevWindowEvents.addAll(windowEvents);
LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
windowLifecycleListener.onActivation(events, newEvents, expired);
} else {
LOG.debug("No events in the window, skipping onActivation");
}
triggerPolicy.reset();
return !events.isEmpty();
}
- Here windowLifecycleListener.onActivation(events, newEvents, expired) is called. The windowLifecycleListener is the TridentWindowLifeCycleListener that AbstractTridentWindowManager passes to the WindowManager constructor.
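The newEvents/prevWindowEvents bookkeeping in onTrigger is easy to misread. This sketch isolates it. As a simplification, it models events as strings and prevWindowEvents as a HashSet, whereas Storm works with Event<T>. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the newEvents computation in WindowManager.onTrigger()
final class NewEventsSketch {
    // Events seen in the previous activation
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns [events, newEvents] and updates prevWindowEvents like onTrigger does
    List<List<String>> activate(List<String> windowEvents) {
        List<String> events = new ArrayList<>();
        List<String> newEvents = new ArrayList<>();
        for (String e : windowEvents) {
            events.add(e);
            // "new" means: not part of the previous activation
            if (!prevWindowEvents.contains(e)) {
                newEvents.add(e);
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
        }
        return List.of(events, newEvents);
    }

    public static void main(String[] args) {
        NewEventsSketch m = new NewEventsSketch();
        System.out.println(m.activate(List.of("a", "b"))); // prints [[a, b], [a, b]]
        System.out.println(m.activate(List.of("b", "c"))); // prints [[b, c], [c]]
    }
}
```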
TridentWindowLifeCycleListener.onActivation
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
/**
* Listener to reeive any activation/expiry of windowing events and take further action on them.
*/
class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
@Override
public void onExpiry(List<T> expiredEvents) {
LOG.debug("onExpiry is invoked");
onTuplesExpired(expiredEvents);
}
@Override
public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
LOG.debug("onActivation is invoked with events size: [{}]", events.size());
// trigger occurred, create an aggregation and keep them in store
int currentTriggerId = triggerId.incrementAndGet();
execAggregatorAndStoreResult(currentTriggerId, events);
}
}
private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);
// run aggregator to compute the result
AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
Object state = aggregator.init(currentTriggerId, collector);
for (TridentTuple resultTuple : resultTuples) {
aggregator.aggregate(state, resultTuple, collector);
}
aggregator.complete(state, collector);
List<List<Object>> resultantAggregatedValue = collector.values;
ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
windowStore.putAll(entries);
pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
}
- TridentWindowLifeCycleListener.onActivation mainly calls execAggregatorAndStoreResult
- execAggregatorAndStoreResult in turn calls the aggregator's init, aggregate and complete methods
- Finally, it writes the next trigger count and the aggregated result to windowStore, and adds a TriggerResult to pendingTriggers
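The init, aggregate, complete sequence can be exercised without a topology. The interface below is a hypothetical simplification of Trident's Aggregator: the collector is a plain list instead of a TridentCollector, and tuples are lists instead of TridentTuple. The driver calls it in the same order as execAggregatorAndStoreResult.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the call order in execAggregatorAndStoreResult
final class AggregatorSequenceSketch {
    // Simplified mirror of Trident's Aggregator; the collector is a plain list
    interface SimpleAggregator<S> {
        S init(int triggerId, List<List<Object>> collector);
        void aggregate(S state, List<Object> tuple, List<List<Object>> collector);
        void complete(S state, List<List<Object>> collector);
    }

    // Sums the first field of every tuple and emits one value on complete
    static final class SumAggregator implements SimpleAggregator<long[]> {
        public long[] init(int triggerId, List<List<Object>> collector) { return new long[1]; }

        public void aggregate(long[] state, List<Object> tuple, List<List<Object>> collector) {
            state[0] += ((Number) tuple.get(0)).longValue();
        }

        public void complete(long[] state, List<List<Object>> collector) {
            collector.add(List.<Object>of(state[0]));
        }
    }

    // Same order as execAggregatorAndStoreResult: init, aggregate per tuple, complete
    static <S> List<List<Object>> run(SimpleAggregator<S> agg, int triggerId, List<List<Object>> tuples) {
        List<List<Object>> collected = new ArrayList<>();
        S state = agg.init(triggerId, collected);
        for (List<Object> t : tuples) {
            agg.aggregate(state, t, collected);
        }
        agg.complete(state, collected);
        return collected;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = List.of(List.<Object>of(3), List.<Object>of(4), List.<Object>of(5));
        System.out.println(run(new SumAggregator(), 1, tuples)); // prints [[12]]
    }
}
```

In Storm the collected values are then written to windowStore and wrapped in a TriggerResult. The sketch stops at the aggregation step.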
Summary
- In Storm, TimeTriggerPolicy.start registers a TriggerTask that runs periodically. With SlidingDurationWindowStrategy, the scheduling interval is windowConfig.getSlidingLength()
- The TriggerTask periodically invokes WindowManager.onTrigger, which calls back windowLifecycleListener.onActivation
- AbstractTridentWindowManager supplies TridentWindowLifeCycleListener as that listener, and its onActivation mainly calls execAggregatorAndStoreResult. execAggregatorAndStoreResult drives the aggregator through a fixed sequence: init first, then aggregate for each tuple in resultTuples, and finally complete. This makes the invocation logic and order of the methods on the Aggregator interface easy to see.
Docs
- Windowing Support in Core Storm