Preface
This article focuses on Storm's IWaitStrategy.
IWaitStrategy
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java
public interface IWaitStrategy {
    static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) {
        IWaitStrategy producerWaitStrategy =
            ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        return producerWaitStrategy;
    }

    void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);

    /**
     * Implementations of this method should be thread-safe (preferably no side-effects and lock-free)
     * <p>
     * Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to estimate how long caller has been idling.
     * <p>
     * <pre>
     * <code>
     *  int idleCounter = 0;
     *  int consumeCount = consumeFromQ();
     *  while (consumeCount==0) {
     *     idleCounter = strategy.idle(idleCounter);
     *     consumeCount = consumeFromQ();
     *  }
     * </code>
     * </pre>
     *
     * @param idleCounter managed by the idle method until reset
     * @return new counter value to be used on subsequent idle cycle
     */
    int idle(int idleCounter) throws InterruptedException;

    enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}
}
- This interface provides a factory method, createBackPressureWaitStrategy, which reads the topology.backpressure.wait.strategy parameter, instantiates producerWaitStrategy, and initializes it with WAIT_SITUATION.BACK_PRESSURE_WAIT
- There are three types of WAIT_SITUATION: SPOUT_WAIT, BOLT_WAIT, and BACK_PRESSURE_WAIT
- This interface defines the int idle(int idleCounter) method for static or dynamic backoff
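The calling pattern from the Javadoc above can be tried without Storm on the classpath. The following is a minimal sketch: IdleStrategy, IdleLoopSketch, and drainOne are hypothetical names, and the queue refill only simulates a producer. It shows the caller owning the counter and handing it back to idle on every empty poll.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for the idle(int) contract of IWaitStrategy.
interface IdleStrategy {
    int idle(int idleCounter) throws InterruptedException;
}

class IdleLoopSketch {
    // Polls the queue until an item arrives and returns how many idle cycles were needed.
    // After `refillAfter` idle cycles an item is added to simulate a producer.
    static int drainOne(Queue<String> queue, IdleStrategy strategy, int refillAfter) {
        int idleCounter = 0; // owned by the caller, updated by the strategy
        int idleCycles = 0;
        while (queue.poll() == null) {
            try {
                idleCounter = strategy.idle(idleCounter);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return idleCycles;
            }
            idleCycles++;
            if (idleCycles == refillAfter) {
                queue.add("tuple");
            }
        }
        return idleCycles;
    }

    public static void main(String[] args) {
        // Static backoff: park for a fixed 1 microsecond and bump the counter.
        IdleStrategy park = counter -> {
            LockSupport.parkNanos(1_000L);
            return counter + 1;
        };
        System.out.println(drainOne(new ArrayDeque<>(), park, 3)); // prints 3
    }
}
```

A dynamic strategy would inspect the counter it receives and choose a longer wait as the value grows, which is why the counter is passed in and returned instead of being stored in the strategy.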
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
public class SpoutExecutor extends Executor {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
    private final IWaitStrategy spoutWaitStrategy;
    private final IWaitStrategy backPressureWaitStrategy;
    private final AtomicBoolean lastActive;
    private final MutableLong emittedCount;
    private final MutableLong emptyEmitStreak;
    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
    private final boolean hasAckers;
    private final SpoutExecutorStats stats;
    private final BuiltinMetrics builtInMetrics;
    SpoutOutputCollectorImpl spoutOutputCollector;
    private Integer maxSpoutPending;
    private List<ISpout> spouts;
    private List<SpoutOutputCollector> outputCollectors;
    private RotatingMap<Long, TupleInfo> pending;
    private long threadId = 0;

    public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
        super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
        this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
        this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        //...
    }
    //...
}
- Two IWaitStrategy instances are created: spoutWaitStrategy and backPressureWaitStrategy
- spoutWaitStrategy reads the topology.spout.wait.strategy parameter; its value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
- backPressureWaitStrategy reads the topology.backpressure.wait.strategy parameter; its value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
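The constructor above follows an "instantiate by class name, then prepare" pattern. The sketch below reproduces that pattern with plain JDK reflection. It is not Storm's ReflectionUtils: Strategy, ParkStrategy, and create are hypothetical, and the key string topology.spout.wait.park.microsec is assumed to be the value behind Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC.

```java
import java.util.HashMap;
import java.util.Map;

class ReflectiveStrategySketch {
    // Hypothetical strategy contract with a prepare step, loosely modeled on IWaitStrategy.
    public interface Strategy {
        void prepare(Map<String, Object> conf);
        long parkMicros();
    }

    // Hypothetical implementation; reflection needs the public no-arg constructor.
    public static class ParkStrategy implements Strategy {
        private long parkMicros;

        public ParkStrategy() { }

        @Override
        public void prepare(Map<String, Object> conf) {
            // Assumed key name; the value may arrive as Integer or Long, so go through Number.
            parkMicros = ((Number) conf.get("topology.spout.wait.park.microsec")).longValue();
        }

        @Override
        public long parkMicros() {
            return parkMicros;
        }
    }

    // Looks up the class name stored under `key`, instantiates it, then calls prepare().
    static Strategy create(Map<String, Object> conf, String key) {
        try {
            String className = (String) conf.get(key);
            Strategy s = (Strategy) Class.forName(className).getDeclaredConstructor().newInstance();
            s.prepare(conf);
            return s;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate strategy for " + key, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.spout.wait.strategy", ParkStrategy.class.getName());
        conf.put("topology.spout.wait.park.microsec", 100);
        System.out.println(create(conf, "topology.spout.wait.strategy").parkMicros()); // prints 100
    }
}
```

Because the class name is only a configuration value, swapping the wait strategy for a topology amounts to changing one string in the topology configuration.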
BoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
public class BoltExecutor extends Executor {
    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
    private final BooleanSupplier executeSampler;
    private final boolean isSystemBoltExecutor;
    private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
    private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
    private final BoltExecutorStats stats;
    private final BuiltinMetrics builtInMetrics;
    private BoltOutputCollectorImpl outputCollector;

    public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
        super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
        this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
        this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
        if (isSystemBoltExecutor) {
            this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
        } else {
            this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
            this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
        }
        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
                                           ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
        this.builtInMetrics = new BuiltinBoltMetrics(stats);
    }

    private static IWaitStrategy makeSystemBoltWaitStrategy() {
        WaitStrategyPark ws = new WaitStrategyPark();
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
        ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
        return ws;
    }
    //...
}
- Two IWaitStrategy instances are created: consumeWaitStrategy and backPressureWaitStrategy
- For a non-system bolt executor, consumeWaitStrategy reads the topology.bolt.wait.strategy parameter, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive; for the system bolt executor, WaitStrategyPark is used instead
- backPressureWaitStrategy reads the topology.backpressure.wait.strategy parameter; its value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive
WaitStrategyPark
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java
public class WaitStrategyPark implements IWaitStrategy {
    private long parkTimeNanoSec;

    public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter
    }

    // Convenience alternative to prepare() for use in Tests
    public WaitStrategyPark(long microsec) {
        parkTimeNanoSec = microsec * 1_000;
    }

    @Override
    public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
        } else {
            throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
        }
    }

    @Override
    public int idle(int idleCounter) throws InterruptedException {
        if (parkTimeNanoSec == 0) {
            return 1;
        }
        LockSupport.parkNanos(parkTimeNanoSec);
        return idleCounter + 1;
    }
}
- This strategy waits by calling LockSupport.parkNanos(parkTimeNanoSec); when parkTimeNanoSec is 0 it returns immediately without parking
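To see the park-based idle in isolation, here is a small sketch that uses only the JDK. ParkIdleSketch and its methods are hypothetical and take the park time as an argument instead of reading it from a configuration map.

```java
import java.util.concurrent.locks.LockSupport;

class ParkIdleSketch {
    // Converts configured microseconds to the nanoseconds that parkNanos expects.
    static long microsToNanos(long microsec) {
        return microsec * 1_000L;
    }

    // Same shape as the idle() shown above, with the park time passed in explicitly.
    static int idle(int idleCounter, long parkTimeNanoSec) {
        if (parkTimeNanoSec == 0) {
            return 1; // no parking configured: return immediately
        }
        LockSupport.parkNanos(parkTimeNanoSec); // may return early (spurious wakeup)
        return idleCounter + 1;
    }

    public static void main(String[] args) {
        long parkNanos = microsToNanos(5000); // the value the system bolt uses above
        long start = System.nanoTime();
        int counter = idle(0, parkNanos);
        long elapsedMicros = (System.nanoTime() - start) / 1_000L;
        System.out.println("counter=" + counter + ", parked for about " + elapsedMicros + " us");
    }
}
```

The measured time depends on the operating system scheduler and can be shorter or longer than the requested 5000 microseconds, so the output should be read as an approximation.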
WaitStrategyProgressive
storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java
/**
 * A Progressive Wait Strategy
 * <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level.
 * Level 1 - No idling. Returns immediately. Stays in this level for `level1Count` iterations. Level 2 - Calls LockSupport.parkNanos(1).
 * Stays in this level for `level2Count` iterations Level 3 - Calls Thread.sleep(). Stays in this level until wait situation changes.
 *
 * <p>
 * The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when the upstream component is a bit
 * relatively slower. Allows downstream bolt can enter deeper wait states only if the traffic to it appears to have reduced.
 * <p>
 */
public class WaitStrategyProgressive implements IWaitStrategy {
    private int level1Count;
    private int level2Count;
    private long level3SleepMs;

    @Override
    public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else {
            throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
        }
    }

    @Override
    public int idle(int idleCounter) throws InterruptedException {
        if (idleCounter < level1Count) {                     // level 1 - no waiting
            ++idleCounter;
        } else if (idleCounter < level1Count + level2Count) { // level 2 - parkNanos(1L)
            ++idleCounter;
            LockSupport.parkNanos(1L);
        } else {                                              // level 3 - longer idling with Thread.sleep()
            Thread.sleep(level3SleepMs);
        }
        return idleCounter;
    }
}
- WaitStrategyProgressive is a progressive wait strategy with three levels of idling
- Level 1 does no idling; after level1Count iterations it moves on to level 2
- Level 2 calls LockSupport.parkNanos(1); after level2Count iterations it moves on to level 3
- Level 3 calls Thread.sleep(level3SleepMs) and stays there until the wait situation changes
- Each WAIT_SITUATION reads its own LEVEL1_COUNT, LEVEL2_COUNT, and LEVEL3_SLEEP_MILLIS parameters. The default values are 0, 0, and 1 for spout; 1, 1000, and 1 for bolt; and 1, 1000, and 1 for back pressure
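The level boundaries are easier to check with concrete numbers. The sketch below extracts only the branching of idle() into pure functions (ProgressiveLevelSketch, levelFor, and nextCounter are hypothetical names) and leaves out the actual parking and sleeping, using the default counts listed above.

```java
class ProgressiveLevelSketch {
    // Returns 1, 2, or 3: the level that idle() would execute for this counter value.
    static int levelFor(int idleCounter, int level1Count, int level2Count) {
        if (idleCounter < level1Count) {
            return 1; // no waiting
        }
        if (idleCounter < level1Count + level2Count) {
            return 2; // parkNanos(1)
        }
        return 3;     // Thread.sleep(level3SleepMs)
    }

    // The counter advances in levels 1 and 2 and stays put once level 3 is reached.
    static int nextCounter(int idleCounter, int level1Count, int level2Count) {
        return levelFor(idleCounter, level1Count, level2Count) == 3 ? idleCounter : idleCounter + 1;
    }

    public static void main(String[] args) {
        // Bolt and back pressure defaults: level1Count = 1, level2Count = 1000
        System.out.println(levelFor(0, 1, 1000));    // prints 1
        System.out.println(levelFor(1, 1, 1000));    // prints 2
        System.out.println(levelFor(1000, 1, 1000)); // prints 2
        System.out.println(levelFor(1001, 1, 1000)); // prints 3
        // Spout defaults: level1Count = 0, level2Count = 0, so the first idle already sleeps
        System.out.println(levelFor(0, 0, 0));       // prints 3
    }
}
```

With the spout defaults the strategy goes straight to the 1 ms sleep, while a bolt spins once and parks up to 1000 times before it starts sleeping.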
SpoutExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@Override
public Callable<Long> call() throws Exception {
    init(idToTask, idToTaskBase);
    return new Callable<Long>() {
        final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
        int recvqCheckSkips = 0;
        int swIdleCount = 0; // counter for spout wait strategy
        int bpIdleCount = 0; // counter for back pressure wait strategy
        int rmspCount = 0;

        @Override
        public Long call() throws Exception {
            int receiveCount = 0;
            if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                receiveCount = receiveQueue.consume(SpoutExecutor.this);
                recvqCheckSkips = 0;
            }
            long currCount = emittedCount.get();
            boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
            boolean isActive = stormActive.get();
            if (!isActive) {
                inactiveExecute();
                return 0L;
            }
            if (!lastActive.get()) {
                lastActive.set(true);
                activateSpouts();
            }
            boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
            boolean noEmits = true;
            long emptyStretch = 0;
            if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                    spouts.get(j).nextTuple();
                }
                noEmits = (currCount == emittedCount.get());
                if (noEmits) {
                    emptyEmitStreak.increment();
                } else {
                    emptyStretch = emptyEmitStreak.get();
                    emptyEmitStreak.set(0);
                }
            }
            if (reachedMaxSpoutPending) {
                if (rmspCount == 0) {
                    LOG.debug("Reached max spout pending");
                }
                rmspCount++;
            } else {
                if (rmspCount > 0) {
                    LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                }
                rmspCount = 0;
            }
            if (receiveCount > 1) {
                // continue without idling
                return 0L;
            }
            if (!pendingEmits.isEmpty()) { // then facing backpressure
                backPressureWaitStrategy();
                return 0L;
            }
            bpIdleCount = 0;
            if (noEmits) {
                spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                return 0L;
            }
            swIdleCount = 0;
            return 0L;
        }

        private void backPressureWaitStrategy() throws InterruptedException {
            long start = Time.currentTimeMillis();
            if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
            }
            bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
            spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
        }

        private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
            emptyEmitStreak.increment();
            long start = Time.currentTimeMillis();
            swIdleCount = spoutWaitStrategy.idle(swIdleCount);
            if (reachedMaxSpoutPending) {
                spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
            } else {
                if (emptyStretch > 0) {
                    LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                }
            }
        }

        // returns true if pendingEmits is empty
        private boolean tryFlushPendingEmits() {
            for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                if (executorTransfer.tryTransfer(t, null)) {
                    pendingEmits.poll();
                } else { // to avoid reordering of emits, stop at first failure
                    return false;
                }
            }
            return true;
        }
    };
}
- The spout maintains a pendingEmits queue, which holds tuples whose emit did not succeed or that are still waiting to be emitted. It also maintains the pending RotatingMap, which holds the IDs and data of tuples waiting for an ack
- The spout reads the TOPOLOGY_MAX_SPOUT_PENDING setting from topology.max.spout.pending: maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); when the parameter is not set, maxSpoutPending is therefore 0
- The spout calls nextTuple to emit data only when !reachedMaxSpoutPending && pendingEmitsIsEmpty holds; it triggers backPressureWaitStrategy when pendingEmits is not empty, and triggers spoutWaitStrategy when noEmits (currCount == emittedCount.get()) is true
- On each call, currCount = emittedCount.get() is recorded before nextTuple is invoked; if nextTuple emits anything, emittedCount is updated in the emit or emitDirect methods of SpoutOutputCollectorImpl; noEmits = (currCount == emittedCount.get()) then tells whether any data was emitted
- The spout maintains bpIdleCount and swIdleCount, used for backPressureWaitStrategy.idle(bpIdleCount) and spoutWaitStrategy.idle(swIdleCount) respectively
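The order of the checks in the spout loop matters, so it helps to reduce them to pure functions. This is a simplified sketch (SpoutLoopDecisionSketch, Action, and the method names are hypothetical) that ignores activation, metrics, and logging, and keeps only the conditions discussed above.

```java
class SpoutLoopDecisionSketch {
    enum Action { CONTINUE_WITHOUT_IDLING, BACK_PRESSURE_WAIT, SPOUT_WAIT, NO_WAIT }

    // maxSpoutPending == 0 means the limit is disabled.
    static boolean reachedMaxSpoutPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    // nextTuple is only invoked when nothing is queued locally and the limit is not reached.
    static boolean shouldCallNextTuple(boolean reachedMaxSpoutPending, boolean pendingEmitsIsEmpty) {
        return !reachedMaxSpoutPending && pendingEmitsIsEmpty;
    }

    // Mirrors the order of the checks at the end of the loop body.
    static Action decide(int receiveCount, boolean pendingEmitsEmpty, boolean noEmits) {
        if (receiveCount > 1) {
            return Action.CONTINUE_WITHOUT_IDLING;
        }
        if (!pendingEmitsEmpty) {
            return Action.BACK_PRESSURE_WAIT; // downstream could not accept everything
        }
        if (noEmits) {
            return Action.SPOUT_WAIT;         // nothing was produced in this iteration
        }
        return Action.NO_WAIT;
    }

    public static void main(String[] args) {
        System.out.println(reachedMaxSpoutPending(0, 100)); // prints false
        System.out.println(reachedMaxSpoutPending(10, 10)); // prints true
        System.out.println(decide(0, false, true));         // prints BACK_PRESSURE_WAIT
        System.out.println(decide(0, true, true));          // prints SPOUT_WAIT
        System.out.println(decide(0, true, false));         // prints NO_WAIT
    }
}
```

Since noEmits starts out true and only changes after nextTuple runs, a spout that has reached max spout pending with an empty pendingEmits queue ends up in the spout wait branch.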
BoltExecutor.call
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@Override
public Callable<Long> call() throws Exception {
    init(idToTask, idToTaskBase);
    return new Callable<Long>() {
        int bpIdleCount = 0;
        int consumeIdleCounter = 0;
        private final ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();

        @Override
        public Long call() throws Exception {
            boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
            if (pendingEmitsIsEmpty) {
                if (bpIdleCount != 0) {
                    LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
                }
                bpIdleCount = 0;
                int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
                if (consumeCount == 0) {
                    if (consumeIdleCounter == 0) {
                        LOG.debug("Invoking consume wait strategy");
                    }
                    consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                } else {
                    if (consumeIdleCounter != 0) {
                        LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
                    }
                    consumeIdleCounter = 0;
                }
            } else {
                if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in a idle loop
                    LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
            }
            return 0L;
        }

        // returns true if pendingEmits is empty
        private boolean tryFlushPendingEmits() {
            for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                if (executorTransfer.tryTransfer(t, null)) {
                    pendingEmits.poll();
                } else { // to avoid reordering of emits, stop at first failure
                    return false;
                }
            }
            return true;
        }
    };
}
- The bolt executor also maintains pendingEmits; when pendingEmits is not empty, it triggers backPressureWaitStrategy.idle(bpIdleCount)
- When pendingEmits is empty, receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits) returns consumeCount; if consumeCount is 0, consumeWaitStrategy.idle(consumeIdleCounter) is triggered
- The bolt executor maintains bpIdleCount and consumeIdleCounter, used for backPressureWaitStrategy.idle(bpIdleCount) and consumeWaitStrategy.idle(consumeIdleCounter) respectively
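The flush-then-decide step shared by both executors can be reproduced with JDK queues. In this sketch a bounded ArrayBlockingQueue plays the role of the downstream receive queue and its non-blocking offer() stands in for executorTransfer.tryTransfer; FlushPendingSketch and tryFlush are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FlushPendingSketch {
    // Moves pending items downstream in order; returns true if pending ends up empty.
    static boolean tryFlush(Queue<String> pending, BlockingQueue<String> downstream) {
        for (String t = pending.peek(); t != null; t = pending.peek()) {
            if (downstream.offer(t)) { // non-blocking: false when downstream is full
                pending.poll();
            } else {
                return false;          // stop at the first failure to keep the order
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        pending.add("a");
        pending.add("b");
        pending.add("c");
        BlockingQueue<String> downstream = new ArrayBlockingQueue<>(2);

        System.out.println(tryFlush(pending, downstream)); // prints false: "c" does not fit
        downstream.poll();                                 // downstream consumes "a"
        System.out.println(tryFlush(pending, downstream)); // prints true: "c" was delivered
    }
}
```

A false result is the signal that leads to backPressureWaitStrategy.idle in the loop above, and the next iteration simply retries the flush.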
Summary
- Both the spout and bolt executors use backPressureWaitStrategy, which reads the topology.backpressure.wait.strategy parameter (for any producer (spout/bolt/transfer thread) when the downstream Q is full). The implementation class configured in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive, and the strategy is applied when a downstream component's recv queue is full. In the call method of the spout or bolt, pendingEmitsIsEmpty is determined on every iteration by calling tryFlushPendingEmits, which first tries to send the queued data; if the downstream accepts all of it, the pendingEmits queue becomes empty. This mechanism gauges the downstream load dynamically and decides whether to trigger back pressure
- The spout uses spoutWaitStrategy, which reads the topology.spout.wait.strategy parameter (employed when there is no data to produce). The implementation class configured in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive, and it is applied when there is no data to emit; whether data was emitted is determined through emittedCount
- The bolt uses consumeWaitStrategy, which for a non-system bolt executor reads the topology.bolt.wait.strategy parameter (employed when there is no data in its receive buffer to process). The implementation class configured in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive, and it is applied when the receive buffer has no data to process; this is determined by the consumeCount returned from receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits)
- The spout differs from the bolt in that, besides pendingEmitsIsEmpty, it also checks reachedMaxSpoutPending to decide whether to keep producing data, whereas the bolt uses only pendingEmitsIsEmpty to decide whether it can keep consuming data
- Besides WaitStrategyProgressive, IWaitStrategy has a second implementation, WaitStrategyPark, which is used when the bolt is the system bolt